Today I’m going to share with you how to create an Upsert function using PySpark for Databricks Delta Lake. It is flexible and can be reused across any Databricks workflow with minimal effort.
Basic Upsert Logic
Data is loaded into a DataFrame
The data is joined on the key columns and, optionally, a watermark column to identify the matches
If the record in the staging DataFrame exists in the target table and is newer according to the optional watermark column, the record is updated in the target table
If the record in the staging table does not exist in the target table, it is inserted into the target table
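The steps above map closely onto a Delta Lake `MERGE INTO` statement. As a rough sketch, the helper below only builds the SQL text so the shape of the logic is visible. The function name `build_merge_sql`, the view name `staging` and the table name `customer` are assumptions for illustration, not names taken from the original function.

```python
from typing import List, Optional


def build_merge_sql(target_table: str, staging_view: str,
                    key_columns: List[str],
                    watermark_column: Optional[str] = None) -> str:
    """Build a Delta Lake MERGE statement for a basic upsert (illustrative only)."""
    # Match staging (s) and target (t) rows on every key column.
    on_clause = " AND ".join(f"t.{c} = s.{c}" for c in key_columns)
    # If a watermark is given, only update when the staging row is newer.
    matched = "WHEN MATCHED"
    if watermark_column:
        matched += f" AND s.{watermark_column} > t.{watermark_column}"
    return (
        f"MERGE INTO {target_table} AS t\n"
        f"USING {staging_view} AS s\n"
        f"ON {on_clause}\n"
        f"{matched} THEN UPDATE SET *\n"
        f"WHEN NOT MATCHED THEN INSERT *"
    )


print(build_merge_sql("customer", "staging", ["CustomerId"], "DataModified"))
```

In a notebook, the staging DataFrame could be exposed with `df.createOrReplaceTempView("staging")` and the generated text passed to `spark.sql(...)`.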
Databricks SQL Upsert PySpark Function
Functionality
An input Dataframe is created
If the table does not exist, it is created using the schema of the incoming DataFrame
The function accepts a parameter for one or more key columns and an optional watermark column (e.g. DataModified) to join the staging DataFrame and target table
If a watermark column is passed to the function, it will update the record in the target table provided the staging table record is newer than the target table record
The function dynamically reads the DataFrame columns to build the update and insert clauses of the Merge statement
The code is integrated with Azure Key Vault, which securely stores the Storage Account name and access keys
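To make the dynamic column handling concrete, here is a minimal sketch of how the join condition and the update and insert column mappings could be derived from the DataFrame's column list. The helper name `build_merge_parts`, the aliases `s` (staging) and `t` (target) and the sample columns are hypothetical. The returned strings and dictionaries have the shape that the Delta Lake Python API accepts in `merge(...)`, `whenMatchedUpdate(condition=..., set=...)` and `whenNotMatchedInsert(values=...)`.

```python
from typing import Dict, List, Optional, Tuple


def build_merge_parts(columns: List[str], key_columns: List[str],
                      watermark_column: Optional[str] = None
                      ) -> Tuple[str, Optional[str], Dict[str, str], Dict[str, str]]:
    """Return (join condition, update condition, update map, insert map)."""
    missing = [c for c in key_columns if c not in columns]
    if missing:
        raise ValueError(f"Key columns not in DataFrame: {missing}")
    join_condition = " AND ".join(f"t.`{c}` = s.`{c}`" for c in key_columns)
    # Without a watermark, every matched row is updated.
    update_condition = None
    if watermark_column:
        update_condition = f"s.`{watermark_column}` > t.`{watermark_column}`"
    # Key columns are left out of the update; every column is written on insert.
    update_map = {c: f"s.`{c}`" for c in columns if c not in key_columns}
    insert_map = {c: f"s.`{c}`" for c in columns}
    return join_condition, update_condition, update_map, insert_map


# In a notebook, `columns` would come from df.columns.
parts = build_merge_parts(["CustomerId", "FirstName", "DataModified"],
                          ["CustomerId"], "DataModified")
print(parts[0])  # t.`CustomerId` = s.`CustomerId`
```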
Prerequisite
Create an Azure Key Vault service and grant appropriate access for the Databricks Workspace.
Create the following Key Vault entries, which are used in the function to secure sensitive information
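The entry names themselves are not listed here, so the sketch below uses placeholder names throughout. It assumes the Key Vault is exposed to Databricks through a Key Vault-backed secret scope, reads the storage account name and access key with `dbutils.secrets.get`, and registers the key with Spark for ADLS Gen2 access. The function name `configure_storage_access` is hypothetical.

```python
def configure_storage_access(spark, dbutils, scope: str,
                             account_secret: str, key_secret: str) -> str:
    """Read the storage account name and access key from a secret scope,
    register the key with Spark, and return the storage account name."""
    account = dbutils.secrets.get(scope=scope, key=account_secret)
    access_key = dbutils.secrets.get(scope=scope, key=key_secret)
    # Account-key authentication for the ADLS Gen2 (abfss) endpoint.
    spark.conf.set(f"fs.azure.account.key.{account}.dfs.core.windows.net", access_key)
    return account


# Usage inside a Databricks notebook, where `spark` and `dbutils` are predefined.
# The scope and secret names below are placeholders, not the article's entries:
# account = configure_storage_access(spark, dbutils, "my-keyvault-scope",
#                                    "StorageAccountName", "StorageAccountKey")
```

Passing `spark` and `dbutils` in as arguments keeps the helper easy to exercise outside a notebook with simple stand-in objects.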
Inputs
df: Input Dataframe
TargetTable: Name of the Databricks Delta Target Table
Container: ADLS Gen2 Container
Folder: ADLS Gen2 Folder
KeyColumns: Pipe-separated columns that uniquely define a record in the input DataFrame, e.g. CustomerId or CustomerId|FirstName
WatermarkColumn: Name of watermark column in input dataframe
NOTE: You can remove the Container and Folder location parameters if you have one static location for Delta tables and use the TargetTable name as the unique location. Note that the container/folder must be a unique location for storing the table's data.
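As an illustration of how these inputs might be prepared before the merge, the sketch below splits the pipe-separated KeyColumns value and builds a table location from the Container and Folder values. Both helper names are hypothetical, and the standard ADLS Gen2 `abfss://` URI layout and the storage account name `mystorage` are assumptions; the original function may assemble the path differently.

```python
from typing import List


def parse_key_columns(key_columns: str) -> List[str]:
    """Split a pipe-separated string such as 'CustomerId|FirstName'."""
    columns = [c.strip() for c in key_columns.split("|") if c.strip()]
    if not columns:
        raise ValueError("At least one key column is required")
    return columns


def build_table_path(storage_account: str, container: str, folder: str) -> str:
    """Build the abfss:// location that holds the table's data."""
    return (f"abfss://{container}@{storage_account}.dfs.core.windows.net/"
            f"{folder.strip('/')}")


print(parse_key_columns("CustomerId|FirstName"))  # ['CustomerId', 'FirstName']
print(build_table_path("mystorage", "datalake", "delta/customer"))
```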
Code
Please see the comments on each block of code for an explanation.
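The listing itself is not reproduced here, so what follows is only a minimal sketch of what such a function could look like, written against the Delta Lake Python API (`DeltaTable.isDeltaTable`, `DeltaTable.forPath`, `merge`, `whenMatchedUpdate`, `whenNotMatchedInsert`). The function name `upsert_to_delta`, the `table_path` parameter and the return values are assumptions, not the author's code. The watermark check is placed in the update clause rather than in the join condition, so that an older staging record is skipped instead of being treated as unmatched and inserted a second time.

```python
def upsert_to_delta(spark, df, target_table, table_path,
                    key_columns, watermark_column=None):
    """Upsert `df` into a Delta table, creating the table on the first run."""
    from delta.tables import DeltaTable  # bundled with Databricks runtimes

    keys = [c.strip() for c in key_columns.split("|") if c.strip()]

    # First load: create the table from the incoming DataFrame's schema.
    if not DeltaTable.isDeltaTable(spark, table_path):
        (df.write.format("delta")
           .option("path", table_path)
           .saveAsTable(target_table))
        return "created"

    # Join staging (s) to target (t) on the key columns.
    join_condition = " AND ".join(f"t.`{c}` = s.`{c}`" for c in keys)
    # Only update when the staging row is newer, if a watermark is given.
    update_condition = (f"s.`{watermark_column}` > t.`{watermark_column}`"
                        if watermark_column else None)
    # Column mapping built dynamically from the DataFrame's columns.
    column_map = {c: f"s.`{c}`" for c in df.columns}

    (DeltaTable.forPath(spark, table_path).alias("t")
        .merge(df.alias("s"), join_condition)
        .whenMatchedUpdate(condition=update_condition, set=column_map)
        .whenNotMatchedInsert(values=column_map)
        .execute())
    return "merged"


# Example call from a notebook (all argument values are placeholders):
# upsert_to_delta(spark, df, "customer",
#                 "abfss://datalake@mystorage.dfs.core.windows.net/delta/customer",
#                 "CustomerId", "DataModified")
```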
Conclusion
If you would like a copy of the code, please drop me a message on LinkedIn.
I hope you have found this helpful and that it saves your company time and money when getting started on your Databricks journey. Any thoughts, questions, corrections and suggestions are very welcome :)
Please share on LinkedIn if you found this useful #DataEngineering #Databricks #Spark #PySpark #DeltaLake #Python #ELT