Rory McManus

Databricks DeltaLake: PySpark Upsert Function

Today I’m going to share how to create an Upsert function using PySpark for Databricks Delta Lake. It is flexible and can be reused across any Databricks workflow with minimal effort.



Basic Upsert Logic

  1. Data is loaded into a staging dataframe

  2. The staging data is joined to the target table on the key columns and an optional watermark column to identify the matches

  3. If the record in the staging dataframe exists in the target table and is newer according to the optional watermark column, the record is updated in the target table

  4. If the record in the staging table does not exist in the target table, it is inserted into the target table
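The four steps above can be illustrated without Spark at all. The following is a minimal pure-Python sketch of the same decision logic on lists of dictionaries; it is not the Delta Lake implementation, and the function name `upsert_rows` and the sample data are made up for illustration. It assumes that when no watermark column is given, every matched record is updated.

```python
from typing import Any, Dict, List, Optional, Sequence

Row = Dict[str, Any]


def upsert_rows(
    target: List[Row],
    staging: List[Row],
    key_columns: Sequence[str],
    watermark_column: Optional[str] = None,
) -> List[Row]:
    """Apply staging rows to target rows and return the merged rows."""
    # Index the target rows by their key values so matches are easy to find.
    merged = {tuple(row[k] for k in key_columns): dict(row) for row in target}

    for row in staging:
        key = tuple(row[k] for k in key_columns)
        existing = merged.get(key)
        if existing is None:
            # Step 4: no match in the target, so insert the staging row.
            merged[key] = dict(row)
        elif watermark_column is None or row[watermark_column] > existing[watermark_column]:
            # Step 3: match found and the staging row is newer, so update.
            # Assumption: with no watermark column, every match is updated.
            merged[key] = dict(row)
        # Otherwise the target row is at least as new and is left unchanged.

    return list(merged.values())


# Hypothetical sample data; ISO date strings compare correctly as text.
target = [
    {"CustomerId": 1, "FirstName": "Ann", "DataModified": "2024-01-01"},
    {"CustomerId": 2, "FirstName": "Bob", "DataModified": "2024-03-01"},
]
staging = [
    {"CustomerId": 1, "FirstName": "Anne", "DataModified": "2024-02-01"},    # newer: update
    {"CustomerId": 2, "FirstName": "Robert", "DataModified": "2024-01-15"},  # older: ignore
    {"CustomerId": 3, "FirstName": "Cy", "DataModified": "2024-02-10"},      # new: insert
]
result = upsert_rows(target, staging, ["CustomerId"], "DataModified")
```

Here customer 1 is updated, customer 2 keeps its target values because the staging record is older, and customer 3 is inserted.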


Databricks SQL Upsert PySpark Function

Functionality

  • An input Dataframe is created

  • If the table does not exist it is created using the schema of the incoming dataframe

  • The function accepts a parameter for one or more key columns and an optional watermark column (e.g. DataModified), which are used to join the staging dataframe and target table

  • If a watermark column is passed to the function, it will update the record in the target table provided the staging table record is newer than the target table record

  • The function will dynamically read the Dataframe columns to form part of the Merge update and insert statements

  • The code will be integrated with Azure Key Vault to securely store the Storage Account and AccessKeys
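To make the dynamic column handling concrete, here is a hedged sketch of how a Delta Lake `MERGE INTO` statement could be assembled from a list of dataframe columns. The helper name `build_merge_sql`, the aliases `t` and `s`, and the staging view name are assumptions for illustration, not the function from this post. One design choice to note: the sketch puts the watermark comparison in the `WHEN MATCHED` condition rather than the `ON` clause, so that an older staging record is skipped instead of being treated as unmatched and inserted.

```python
from typing import Optional, Sequence


def build_merge_sql(
    target_table: str,
    staging_view: str,
    columns: Sequence[str],
    key_columns: str,
    watermark_column: Optional[str] = None,
) -> str:
    """Build a Delta Lake MERGE statement from column names (illustrative helper)."""
    # KeyColumns arrive pipe separated, e.g. "CustomerId|FirstName".
    keys = [k.strip() for k in key_columns.split("|") if k.strip()]
    if not keys:
        raise ValueError("At least one key column is required")

    required = keys + ([watermark_column] if watermark_column else [])
    missing = [c for c in required if c not in columns]
    if missing:
        raise ValueError(f"Columns not found in dataframe: {missing}")

    on_clause = " AND ".join(f"t.`{k}` = s.`{k}`" for k in keys)
    lines = [
        f"MERGE INTO {target_table} AS t",
        f"USING {staging_view} AS s",
        f"ON {on_clause}",
    ]

    # Only non-key columns need updating; skip the clause if there are none.
    non_keys = [c for c in columns if c not in keys]
    if non_keys:
        condition = ""
        if watermark_column:
            # Update only when the staging record is newer than the target.
            condition = f" AND s.`{watermark_column}` > t.`{watermark_column}`"
        update_set = ", ".join(f"t.`{c}` = s.`{c}`" for c in non_keys)
        lines.append(f"WHEN MATCHED{condition} THEN UPDATE SET {update_set}")

    insert_cols = ", ".join(f"`{c}`" for c in columns)
    insert_vals = ", ".join(f"s.`{c}`" for c in columns)
    lines.append(f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})")
    return "\n".join(lines)


# Assumed usage in a notebook where `spark` and a dataframe `df` already exist:
#   df.createOrReplaceTempView("staging")
#   spark.sql(build_merge_sql("customers", "staging", df.columns, "CustomerId", "DataModified"))
```

Because the column list comes from `df.columns`, adding a column to the incoming dataframe changes the generated statement without any edit to the helper, provided the target table has the same column.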


Prerequisite

  • Create an Azure Key Vault service and grant appropriate access for the Databricks Workspace.

  • Create the following Key Vault entries, which are used in the function to secure sensitive information
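As a rough illustration of how such entries might be consumed, the sketch below reads a storage account name and access key through a secret lookup and derives the Spark configuration key used for ADLS Gen2 access-key authentication. The entry names are not given here, so `StorageAccountName`, `StorageAccountKey` and the scope name `kv-scope` are placeholders, and `storage_account_conf` is a hypothetical helper. The lookup is passed in as a function so the sketch runs outside Databricks; in a notebook it would wrap `dbutils.secrets.get`.

```python
from typing import Callable, Tuple


def storage_account_conf(
    get_secret: Callable[[str, str], str],
    scope: str,
    account_secret_name: str,
    key_secret_name: str,
) -> Tuple[str, str, str]:
    """Return (storage_account, spark_conf_key, access_key) from two secrets."""
    storage_account = get_secret(scope, account_secret_name)
    access_key = get_secret(scope, key_secret_name)
    # Spark setting for access-key authentication against ADLS Gen2.
    conf_key = f"fs.azure.account.key.{storage_account}.dfs.core.windows.net"
    return storage_account, conf_key, access_key


# Assumed usage in a Databricks notebook with a Key Vault-backed secret scope.
# The scope and secret names below are placeholders, not the post's values:
#   account, conf_key, access_key = storage_account_conf(
#       lambda scope, key: dbutils.secrets.get(scope=scope, key=key),
#       "kv-scope", "StorageAccountName", "StorageAccountKey",
#   )
#   spark.conf.set(conf_key, access_key)
```

Keeping the secret names as parameters means the same helper works whatever the Key Vault entries are actually called.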

Inputs

  • df: Input Dataframe

  • TargetTable: Name of the Databricks Delta Target Table

  • Container: ADLS Gen2 Container

  • Folder: ADLS Gen2 Folder

  • KeyColumns: Pipe-separated columns that uniquely define a record in the input dataframe, e.g. CustomerId or CustomerId|FirstName

  • WatermarkColumn: Name of watermark column in input dataframe

NOTE: You can remove the Container and Folder location parameters if you have one static location for Delta tables and use the TargetTable name as the unique location. The container/folder must be a unique location for storing the table's data.
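One way the Container, Folder and TargetTable inputs could combine into a unique storage location is sketched below. The `abfss://` URI form is the standard one for ADLS Gen2, but the folder-then-table layout and the helper name `delta_table_location` are assumptions, and the storage account name is passed in as a plain argument for the example.

```python
def delta_table_location(
    storage_account: str, container: str, folder: str, target_table: str
) -> str:
    """Build an abfss:// path for a Delta table (assumed folder/table layout)."""
    # Drop stray slashes so "/delta/" and "delta" give the same path.
    parts = [p for p in (folder.strip("/"), target_table.strip("/")) if p]
    root = f"abfss://{container}@{storage_account}.dfs.core.windows.net"
    return f"{root}/" + "/".join(parts)


# Hypothetical values for illustration only.
location = delta_table_location("mystorage", "lake", "/delta/", "Customers")

# Assumed usage in a notebook where `df` exists: the first load writes the
# dataframe to this location, which creates the Delta table with df's schema.
#   df.write.format("delta").save(location)
```

Appending the table name to the folder gives each table its own directory, which is what keeps the location unique when several tables share one container.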


Code


Please see the comments on each block of code for an explanation.





Conclusion

If you would like a copy please drop me a message on LinkedIn.


I hope you have found this helpful and that it saves your company time and money when getting started on your Databricks journey. Any thoughts, questions, corrections and suggestions are very welcome :)


Please share on LinkedIn if you found this useful #DataEngineering #Databricks #Spark #PySpark #DeltaLake #Python #ELT
