Modern businesses have an overwhelming amount of data available to them from a huge number of IoT devices and applications, in a wide variety of file formats. Ingesting this data and transforming it into a readable, useful format can be complex and extremely time-consuming. Auto Loader is a tool from Databricks that simplifies the solution, fully automates the process and enables real-time decision making.
Databricks is a scalable big data analytics platform designed for data science and data engineering. It integrates with the major cloud platforms: Amazon Web Services, Microsoft Azure, and Google Cloud Platform.
Auto Loader is a Databricks feature that streams or incrementally ingests newly arriving files from a data lake and can process millions of files per hour. For a detailed explanation, see the Databricks Auto Loader documentation.
Data Mastery recently deployed an Auto Loader solution with an OEM in the mining industry to ingest and transform telemetry data sent from WAGO PLC devices to IoT Hub. Previously, the OEM was manually downloading the data files on site, then manually transforming each data file in Excel to produce a readable report. Auto Loader completely automates this process and delivers a readable report automatically, eliminating one man-hour of manual data processing per shift and saving more than 21 man-hours per week per site (24-hour operation). Needless to say, our client was delighted with the result!
How to ingest and transform WAGO PLC device data
In this article, I will explain how we created a streaming solution to ingest and transform WAGO PLC data being sent to IoT Hub from multiple sites and devices.
The data starts out in 1000x 16-bit Unicode registers. These registers are used for calculations in the ARM assembly language used by the MRA and in the PLC.
In this case, the encoding within the binary registers is subdivided to carry more than one piece of information. An example is the date and time encoding, where seconds and minutes are encoded within the same 16-bit register.
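To illustrate what decoding such a shared register involves, here is a minimal sketch. The article does not give the actual bit layout, so the split used here (minutes in the high byte, seconds in the low byte) and the function name are assumptions for illustration only.

```python
from typing import Tuple


def decode_minutes_seconds(register: int) -> Tuple[int, int]:
    """Split one 16-bit register into (minutes, seconds).

    ASSUMPTION: minutes sit in the high byte and seconds in the low byte.
    The real WAGO register layout is not stated in the article.
    """
    if not 0 <= register <= 0xFFFF:
        raise ValueError("register must fit in 16 bits")
    minutes = (register >> 8) & 0xFF  # upper 8 bits
    seconds = register & 0xFF         # lower 8 bits
    return minutes, seconds


# Example: a register holding 42 minutes and 17 seconds under the assumed layout
example_register = (42 << 8) | 17
print(decode_minutes_seconds(example_register))
```

If the device packs the fields differently (for example with bit fields narrower than a byte), only the shift and mask values would need to change.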
While the data is stored on the WAGO PLC device as 16-bit registers, it is sent to IoT Hub as 2000x 8-bit Unicode registers to reduce the size of the packets being sent. The 8-bit values therefore need to be recombined into 16-bit registers as part of the transformation into a readable format.
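The recombination step can be sketched in plain Python as follows. The byte order is an assumption (high byte first by default, with a flag to swap it), since the article does not say how the PLC splits each register; the function name is also hypothetical.

```python
from typing import List, Sequence


def bytes_to_registers(values: Sequence[int], big_endian: bool = True) -> List[int]:
    """Recombine consecutive pairs of 8-bit values into 16-bit registers.

    ASSUMPTION: each register was split into two adjacent 8-bit values,
    high byte first unless big_endian is False.
    """
    if len(values) % 2 != 0:
        raise ValueError("expected an even number of 8-bit values")
    registers = []
    for i in range(0, len(values), 2):
        first, second = values[i], values[i + 1]
        if not (0 <= first <= 0xFF and 0 <= second <= 0xFF):
            raise ValueError("every value must fit in 8 bits")
        high, low = (first, second) if big_endian else (second, first)
        registers.append((high << 8) | low)
    return registers


# 2000 8-bit values become 1000 16-bit registers
print(len(bytes_to_registers([0] * 2000)))
```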
Solution
The solution used follows the high-level steps below:
1. WAGO PLC messages are sent to Azure IoT Hub, where they are routed in JSON format to specific Azure Storage containers based on their DeviceId.
2. As each file lands in the storage container, an Event Grid subscription creates a message on an Azure Storage Queue with the location and name of the new file.
3. Databricks Auto Loader checks the Storage Queue for new messages and reads the files into a streaming dataframe as they arrive.
4. The input dataframe is then transformed from 2000x 8-bit Unicode registers back to 1000x 16-bit Unicode registers, and subsequently into the desired output values, for example tonnage per hour, day, month and year.
5. The transformed dataframe is then appended to a Delta table for consumption in Power BI.
Prerequisites
Configure WAGO PLC device(s) to send messages to Azure IoT Hub
Register an Azure service principal, which is used to create the Event Grid subscription and Storage Queue automatically. Alternatively, you can create these services manually.
Generate a Storage Queue Shared access signature with the following permissions
Create the following Azure Key Vault secrets for SubscriptionId, TenantId, Bronze-Queue-ConnectionString (created above), Service Principal ClientId & ClientSecret (created above)
To create the Auto Loader streaming solution, the following steps must be completed:
1. Configure WAGO PLC devices to send messages to Azure IoT Hub
2. Route incoming IoT Hub messages to specific storage containers by filtering on DeviceId
3. Create a PySpark function that uses the Spark structured streaming source cloudFiles to read from the input ADLS Gen2 storage container
4. Create a PySpark function to transform the input file stream to the desired output columns
5. Write the stream or batch dataframe to a Databricks Delta table
6. Test :)
Detailed step through:
1. To configure the WAGO PLC to send messages to Azure IoT Hub - Click Here
2. Route incoming IoT Hub messages to specific storage containers depending on DeviceId
2.1. Create a custom Endpoint to an ADLS Gen2 storage container
IoT Hub 🠮 Message routing 🠮 Custom endpoints 🠮 Add
2.2. Create a route and add a query to filter by DeviceId
IoT Hub 🠮 Message routing 🠮 Routes 🠮 Add
3. Create a PySpark function to read newly arriving files on the storage queue
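A minimal sketch of what such a read function might look like is below. It is not the original code from this solution. The option keys are Auto Loader's cloudFiles file-notification options for Azure as I understand them from the Databricks documentation; the function names, the resource group parameter and the schema location path are assumptions added for illustration.

```python
from typing import Dict


def build_autoloader_options(
    queue_connection_string: str,
    subscription_id: str,
    tenant_id: str,
    client_id: str,
    client_secret: str,
    resource_group: str,   # ASSUMPTION: not listed in the article's secrets
    schema_location: str,  # ASSUMPTION: path where the inferred schema is tracked
) -> Dict[str, str]:
    """Collect the cloudFiles options for file-notification mode on Azure."""
    return {
        "cloudFiles.format": "json",            # IoT Hub routes the messages as JSON
        "cloudFiles.useNotifications": "true",  # read the queue instead of listing files
        "cloudFiles.connectionString": queue_connection_string,
        "cloudFiles.subscriptionId": subscription_id,
        "cloudFiles.tenantId": tenant_id,
        "cloudFiles.clientId": client_id,
        "cloudFiles.clientSecret": client_secret,
        "cloudFiles.resourceGroup": resource_group,
        "cloudFiles.schemaLocation": schema_location,
    }


def read_new_files(spark, input_path: str, options: Dict[str, str]):
    """Return a streaming dataframe of files newly arriving under input_path."""
    return (
        spark.readStream
        .format("cloudFiles")
        .options(**options)
        .load(input_path)
    )


# Usage inside a Databricks notebook (secret scope name is hypothetical):
# options = build_autoloader_options(
#     queue_connection_string=dbutils.secrets.get("my-scope", "Bronze-Queue-ConnectionString"),
#     subscription_id=dbutils.secrets.get("my-scope", "SubscriptionId"),
#     ...
# )
# raw_df = read_new_files(spark, "abfss://<container>@<account>.dfs.core.windows.net/", options)
```

Keeping the options in a separate function makes it easy to review exactly what is passed to Auto Loader without starting a stream.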
4. Create a PySpark function to transform the input file stream dataframe to the desired output columns.
NOTE: I have only included a sample transformation, because the full solution contains a large number of transformations.
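As an illustration of the shape such a transformation could take, here is a hedged sketch rather than the original code. The input column name `registers8`, the output column names, the high-byte-first ordering and the idea that register 0 holds tonnage per hour are all assumptions; the real register map is not given in this article.

```python
from typing import List, Optional


def combine_pairs(values: Optional[List[int]]) -> Optional[List[int]]:
    """Turn a list of 8-bit values into 16-bit registers (high byte first, assumed)."""
    if values is None:
        return None
    if len(values) % 2 != 0:
        raise ValueError("expected an even number of 8-bit values")
    return [(values[i] << 8) | values[i + 1] for i in range(0, len(values), 2)]


def transform_stream(df, byte_column: str = "registers8"):
    """Add 16-bit registers and one sample output column to the input dataframe.

    ASSUMPTIONS: byte_column is an array of integers, and register 0 holds
    tonnage per hour. Both are placeholders for the real device mapping.
    """
    # Imported here so the helper above can be used without Spark installed.
    from pyspark.sql import functions as F
    from pyspark.sql.types import ArrayType, IntegerType

    to_registers = F.udf(combine_pairs, ArrayType(IntegerType()))
    with_registers = df.withColumn("registers16", to_registers(F.col(byte_column)))
    return with_registers.withColumn("tonnage_per_hour", F.col("registers16")[0])
```

A Python UDF is the simplest way to express the recombination, at some cost in throughput; if volume becomes a concern, the same logic could be rewritten with built-in Spark array functions.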
5. Write the stream to the Delta Table
Use the Once or AvailableNow trigger to run the stream as a batch; see the Structured Streaming trigger documentation for more information on these options.
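The write step might look something like the sketch below. It is an illustration under assumptions rather than the original code: the function names, the table name and the checkpoint path are placeholders. `trigger(once=True)`, `trigger(availableNow=True)` and `toTable` are PySpark `DataStreamWriter` methods; `availableNow` needs a recent Spark version.

```python
from typing import Dict, Optional


def trigger_kwargs(mode: Optional[str]) -> Dict[str, bool]:
    """Map a run mode to the keyword arguments for DataStreamWriter.trigger."""
    if mode is None:
        return {}  # no explicit trigger: run as a continuous micro-batch stream
    if mode == "once":
        return {"once": True}
    if mode == "availableNow":
        return {"availableNow": True}
    raise ValueError(f"unsupported trigger mode: {mode!r}")


def write_to_delta(df, table_name: str, checkpoint_path: str,
                   mode: Optional[str] = "availableNow"):
    """Append the streaming dataframe to a Delta table and return the query."""
    writer = (
        df.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", checkpoint_path)  # tracks processed files
    )
    kwargs = trigger_kwargs(mode)
    if kwargs:
        writer = writer.trigger(**kwargs)
    return writer.toTable(table_name)


# Usage (table name and checkpoint path are hypothetical):
# query = write_to_delta(transformed_df, "bronze.wago_telemetry", "/mnt/checkpoints/wago")
# query.awaitTermination()
```

With either batch-style trigger, the query processes whatever files are waiting and then stops, which suits a scheduled job; passing `mode=None` leaves the stream running.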
6. Test :)
Run the process end-to-end to check the data has been inserted into the table
Conclusion
Auto Loader is an excellent tool that can be applied in any business, in any sector, to deliver a simple, time-efficient solution and real-time data.
I hope you have found this helpful, and that it saves your company time and money when getting started with Databricks Auto Loader and helps drive insights that add value to your business.
If you would like a copy of my code, please drop me a message on LinkedIn. And if you enjoyed reading this as much as I enjoyed writing it, please share it with your friends.