Today I’m going to share with you how to create an Azure SQL Upsert function using PySpark for Azure Synapse Spark pools. It can be reused across Azure Synapse workflows with minimal effort and maximum flexibility.
Basic Upsert Logic
Two tables are created: a staging table and a target table
Data is loaded into the staging table
The tables are joined on the lookup columns, optionally with a condition on a delta/watermark column, to identify matching records
If the record in the staging table exists in the target table, the record is updated in the target table
If the record in the staging table does not exist in the target table, it is inserted into the target table
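To make the steps above concrete, here is a minimal in-memory sketch of the same logic in plain Python. It is only an illustration: the real work happens in Azure SQL, and the function name, column names and sample rows are made up for this example.

```python
def upsert(target, staging, lookup_columns, delta_column=None):
    """Apply staging rows to target rows in memory and return the new target list."""
    # Index the target rows by their lookup-column values (this plays the role of the join).
    merged = {tuple(row[c] for c in lookup_columns): dict(row) for row in target}

    for row in staging:
        key = tuple(row[c] for c in lookup_columns)
        existing = merged.get(key)
        if existing is None:
            # No match in the target: insert the staging record.
            merged[key] = dict(row)
        elif delta_column is None or row[delta_column] > existing[delta_column]:
            # Match found (and staging is newer when a delta column is used): update.
            existing.update(row)
        # Otherwise the target already holds the newer record and is left alone.

    return list(merged.values())


# Hypothetical sample data: one existing customer, one changed row and one new row.
target = [{"CustomerId": 1, "FirstName": "Ann", "ModifiedDate": "2023-01-01"}]
staging = [
    {"CustomerId": 1, "FirstName": "Anna", "ModifiedDate": "2023-02-01"},  # newer: update
    {"CustomerId": 2, "FirstName": "Bob", "ModifiedDate": "2023-02-01"},   # new: insert
]
result = upsert(target, staging, ["CustomerId"], "ModifiedDate")
```

After this runs, customer 1 carries the newer first name and customer 2 has been added, while the original target list is left unchanged.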
Azure SQL Upsert PySpark Function
Functionality
An input data frame is written to a staging table on Azure SQL
The function accepts one or more lookup columns and an optional delta column, which are used to join the staging and target tables
If a delta column is passed to the function, it will update the record in the target table provided the staging table record is newer than the target table record
The function dynamically reads the DataFrame columns to build the UPDATE and INSERT clauses of the SQL MERGE statement
The code integrates with Azure Key Vault so that keys, passwords, server names and similar settings are stored securely
Before writing code, it is critical to understand the Spark Azure SQL Database connector. The connector does not support running preUpdate or postUpdate statements when it writes to a table. For this reason, we first write the DataFrame to the staging table and then pass a valid SQL MERGE statement to pyodbc, which executes the upsert.
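The two-step flow described above could look roughly like the sketch below. It rests on a few assumptions: it uses Spark's built-in "jdbc" format rather than a specific Azure SQL connector, the function and parameter names are my own, and `connect` is any callable that returns a DB-API connection (in Synapse that would typically wrap `pyodbc.connect`).

```python
def write_and_merge(df, jdbc_url, user, password, staging_table, merge_sql, connect,
                    staging_mode="append"):
    """Write df to the staging table over JDBC, then run merge_sql on a DB-API connection."""
    # Step 1: land the DataFrame in the staging table.
    # staging_mode="overwrite" would replace the staging contents on every run. By default
    # Spark's JDBC writer drops and recreates the table in that mode, unless the
    # "truncate" option is set to true.
    (
        df.write.format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", staging_table)
        .option("user", user)
        .option("password", password)
        .mode(staging_mode)
        .save()
    )

    # Step 2: the Spark write cannot run SQL afterwards, so the MERGE goes through a
    # separate connection, e.g. connect = lambda: pyodbc.connect(odbc_connection_string).
    connection = connect()
    try:
        cursor = connection.cursor()
        cursor.execute(merge_sql)
        connection.commit()  # pyodbc connections do not autocommit by default
    finally:
        connection.close()
```

Passing the connection factory in as a parameter keeps the sketch free of hard-coded credentials and makes it easy to try out with stand-in objects before pointing it at a real database.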
Prerequisite
Create an Azure Key Vault service and grant appropriate access for the Azure Synapse Workspace.
Create the following Key Vault secrets, which the function uses to keep sensitive information out of the code:
AzureSQL-ServerName: e.g. datamastery-sql-srv-prod-01
AzureSQL-DatabaseName: e.g. Datawarehouse
AzureSQL-Admin-Username: e.g. SqlAdmin
AzureSQL-Admin-Password: e.g. a very secure password :) — Password123
Create the Azure SQL target and staging tables with the correct data types, and with indexes to improve join performance
NOTE: If you want the staging table to be overwritten each time, check the code comments for how to implement this.
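As a rough sketch of how the four secrets listed above might be turned into connection settings, the helper below takes a `get_secret` callable and builds a JDBC URL and an ODBC connection string. The helper name, the `.database.windows.net` suffix, the port and the ODBC driver version are all assumptions on my part, so adjust them to your environment.

```python
def build_connection_settings(get_secret):
    """Read the four Key Vault secrets and return JDBC/ODBC settings as a dict."""
    server = get_secret("AzureSQL-ServerName")
    database = get_secret("AzureSQL-DatabaseName")
    user = get_secret("AzureSQL-Admin-Username")
    password = get_secret("AzureSQL-Admin-Password")

    # Assumption: the secret holds the short server name, so the Azure SQL suffix is appended.
    host = f"{server}.database.windows.net"
    jdbc_url = f"jdbc:sqlserver://{host}:1433;database={database};encrypt=true;loginTimeout=30;"

    # ODBC values with special characters are wrapped in braces; a closing brace is doubled.
    odbc_password = "{" + password.replace("}", "}}") + "}"
    # Assumption: "ODBC Driver 17 for SQL Server" is installed on the Spark pool.
    odbc = (
        "DRIVER={ODBC Driver 17 for SQL Server};"
        f"SERVER={host};DATABASE={database};UID={user};PWD={odbc_password}"
    )
    return {"jdbc_url": jdbc_url, "odbc": odbc, "user": user, "password": password}


# In a Synapse notebook, get_secret could wrap the workspace utilities, for example:
#   get_secret = lambda name: mssparkutils.credentials.getSecret("<your-key-vault>", name)
# Here a plain dict with the example values stands in for Key Vault.
example_secrets = {
    "AzureSQL-ServerName": "datamastery-sql-srv-prod-01",
    "AzureSQL-DatabaseName": "Datawarehouse",
    "AzureSQL-Admin-Username": "SqlAdmin",
    "AzureSQL-Admin-Password": "Password123",
}
settings = build_connection_settings(example_secrets.__getitem__)
```

Because the secrets are only read inside the helper, nothing sensitive needs to appear in the notebook itself.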
Input Parameters
df: Input Dataframe
azureSqlStagingTable: Name of the Azure SQL Staging Table
azureSqlDWTable: Name of the Azure SQL Target DW Table
lookupColumns: Pipe-separated columns that uniquely define a record in the input DataFrame, e.g. CustomerId or CustomerId|FirstName
deltaColumn: Name of the watermark column in the input DataFrame (optional)
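To show how the pipe-separated lookupColumns value might be handled, here is a small sketch that splits it into a list and checks the names against the DataFrame columns (in PySpark these come from `df.columns`). The helper name and the validation rules are my own assumptions, not necessarily what the full function does.

```python
def parse_lookup_columns(lookup_columns, df_columns, delta_column=None):
    """Split the pipe-separated lookupColumns value and check each name against df_columns."""
    names = [name.strip() for name in lookup_columns.split("|") if name.strip()]
    if not names:
        raise ValueError("lookupColumns must name at least one column")

    # The optional delta column must also exist in the DataFrame.
    required = names + ([delta_column] if delta_column else [])
    missing = [name for name in required if name not in df_columns]
    if missing:
        raise ValueError(f"Columns not found in the DataFrame: {', '.join(missing)}")
    return names


# Hypothetical column list, standing in for df.columns.
columns = ["CustomerId", "FirstName", "ModifiedDate"]
keys = parse_lookup_columns("CustomerId|FirstName", columns, "ModifiedDate")
```

Failing early on a misspelled column name is cheaper than finding out from a SQL error after the staging table has already been loaded.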
Code
Please see the comments on each block of code for an explanation.
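As a rough illustration of the dynamic MERGE generation described under Functionality, here is a minimal sketch of how the statement could be assembled from the DataFrame columns. It is not the full function: the helper name, the `t`/`s` aliases and the `stg`/`dbo` schema names in the example are assumptions made for illustration.

```python
def build_merge_sql(columns, staging_table, target_table, lookup_columns, delta_column=None):
    """Return a T-SQL MERGE statement that upserts staging_table into target_table."""
    def quote(name):
        # Bracket-quote column names; a closing bracket is escaped by doubling it.
        return "[" + name.replace("]", "]]") + "]"

    join_condition = " AND ".join(f"t.{quote(c)} = s.{quote(c)}" for c in lookup_columns)
    # Only non-key columns need updating; the lookup columns already match.
    update_columns = [c for c in columns if c not in lookup_columns]
    set_clause = ", ".join(f"t.{quote(c)} = s.{quote(c)}" for c in update_columns)
    insert_columns = ", ".join(quote(c) for c in columns)
    insert_values = ", ".join(f"s.{quote(c)}" for c in columns)

    matched = "WHEN MATCHED"
    if delta_column:
        # Update only when the staging record is newer than the target record.
        matched += f" AND s.{quote(delta_column)} > t.{quote(delta_column)}"

    # Table names are inserted as given, so they must come from trusted configuration.
    lines = [f"MERGE {target_table} AS t", f"USING {staging_table} AS s", f"ON {join_condition}"]
    if update_columns:
        lines.append(f"{matched} THEN UPDATE SET {set_clause}")
    lines.append(
        f"WHEN NOT MATCHED BY TARGET THEN INSERT ({insert_columns}) VALUES ({insert_values});"
    )
    return "\n".join(lines)


# In PySpark the first argument would be df.columns; a plain list is used here.
merge_sql = build_merge_sql(
    ["CustomerId", "FirstName", "ModifiedDate"],
    "stg.Customer", "dbo.Customer", ["CustomerId"], "ModifiedDate",
)
print(merge_sql)
```

The resulting string is what would be handed to pyodbc once the staging table has been loaded.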
Conclusion
If you would like a copy please drop me a message on LinkedIn.
I hope you have found this helpful and that it saves your company time and money as you get started on your Azure Analytics Journey. Any thoughts, questions, corrections and suggestions are very welcome :)
Please share on LinkedIn if you found this useful #DataEngineering #AzureSynapse #Spark #PySpark #AzureSQL #SQL #ELT