Azure Databricks Cookbook

By: Phani Raj, Vinod Jaiswal

Overview of this book

Azure Databricks is a unified collaborative platform for performing scalable analytics in an interactive environment. The Azure Databricks Cookbook provides recipes to get hands-on with the analytics process, including ingesting data from various batch and streaming sources and building a modern data warehouse. The book starts by teaching you how to create an Azure Databricks instance within the Azure portal, Azure CLI, and ARM templates. You’ll work through clusters in Databricks and explore recipes for ingesting data from sources, including files, databases, and streaming sources such as Apache Kafka and EventHub. The book will help you explore all the features supported by Azure Databricks for building powerful end-to-end data pipelines. You'll also find out how to build a modern data warehouse by using Delta tables and Azure Synapse Analytics. Later, you’ll learn how to write ad hoc queries and extract meaningful insights from the data lake by creating visualizations and dashboards with Databricks SQL. Finally, you'll deploy and productionize a data pipeline as well as deploy notebooks and Azure Databricks service using continuous integration and continuous delivery (CI/CD). By the end of this Azure book, you'll be able to use Azure Databricks to streamline different processes involved in building data-driven apps.

Reading and writing data from and to Azure Synapse SQL (dedicated SQL pool) using native connectors 

In this recipe, you will learn how to read and write data to Azure Synapse Analytics using Azure Databricks.

Azure Synapse Analytics is a data warehouse hosted in the cloud that leverages massively parallel processing (MPP) to run complex queries across large volumes of data.

Azure Synapse can be accessed from Azure Databricks using the Azure Synapse connector, which lets you load a DataFrame directly as a table in a Synapse dedicated SQL pool. The connector uses Azure Blob storage as temporary staging storage for the data exchanged between Azure Databricks and Azure Synapse.

Getting ready

You will need to ensure you have the following items before starting to work on this recipe:

  • A master key created in the Synapse dedicated SQL pool (run CREATE MASTER KEY on the database; see the How it works… section for the error you get without it)
  • Account Name and Storage keys for Azure Storage Blob —get these by following the steps mentioned in the Mounting ADLS Gen-2 and Azure Blob storage to Azure DBFS recipe of this chapter
  • Service principal details—get these by following the steps mentioned in the Mounting ADLS Gen-2 and Azure Blob storage to Azure DBFS recipe of this chapter
  • A mounted ADLS Gen-2 storage account—get this by following the steps mentioned in the Mounting ADLS Gen-2 and Azure Blob storage to Azure DBFS recipe of this chapter
  • The same Customer csvFiles folder that was used in the Reading and writing data from and to Azure Blob recipe
  • The JDBC connection string of the Synapse workspace. Go to the dedicated SQL pool that you created in the Synapse workspace and, on the Overview tab, click Show database connection strings. Under the Connection strings tab, click JDBC and copy the connection string under JDBC (SQL authentication).

You can follow along by running the steps in the 2_5.Reading and Writing Data from and to Azure Synapse.ipynb notebook in the Chapter02 folder of your locally cloned repository.

Upload the csvFiles folder from the Chapter02/Customer folder to the rawdata file system of the ADLS Gen2 account. We have mounted the rawdata container as /mnt/Gen2Source.

How to do it…

In this section, you will see the steps for reading data from an ADLS Gen2 storage account and writing data to Azure Synapse Analytics using the Azure Synapse connector:

  1. You need to mount the storage location to access the data files from the storage account. You can find detailed steps on how to mount ADLS Gen-2 in the Mounting ADLS Gen-2 and Azure Blob Storage to Azure Databricks File System recipe. Run the following to mount the ADLS Gen-2 storage account (if you re-run the notebook, see the note after these steps):
    storageAccount="teststoragegen2"
    mountpoint = "/mnt/Gen2Source"
    storageEndPoint ="abfss://rawdata@{}.dfs.core.windows.net/".format(storageAccount)
    print ('Mount Point ='+mountpoint)
    #ClientId, TenantId and Secret is for the Application(ADLSGen2App) was have created as part of this recipe
    clientID ="xxx-xxx-xx-xxx"
    tenantID ="xx-xxx-xxx-xxx"
    clientSecret ="xx-xxx-xxx-xxx"
    oauth2Endpoint = "https://login.microsoftonline.com/{}/oauth2/token".format(tenantID)
    configs = {"fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": clientID,
    "fs.azure.account.oauth2.client.secret": clientSecret,
    "fs.azure.account.oauth2.client.endpoint": oauth2Endpoint}
    dbutils.fs.mount(
    source = storageEndPoint,
    mount_point = mountpoint,
    extra_configs = configs)
  2. Copy and run the following code to see the csv files in the ADLS Gen-2 account:
    display(dbutils.fs.ls("/mnt/Gen2Source/Customer/csvFiles"))
  3. Provide the Azure Storage account configuration, such as the storage account name and storage key. This common Blob storage account is used by both the Azure Databricks cluster and the Azure Synapse dedicated SQL pool to exchange data with each other. The Azure Synapse connector triggers Spark jobs in the Azure Databricks cluster to read and write data from and to this common Blob storage account.
    blobStorage = "stgcccookbook.blob.core.windows.net"
    blobContainer = "synapse"
    blobAccessKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"
  4. The following code specifies the intermediate temporary folder required for moving data between Azure Databricks and Azure Synapse:
    tempDir = "wasbs://" + blobContainer + "@" + blobStorage +"/tempDirs"
  5. Execute the following code to store the Azure Blob storage access key in the notebook session configuration. This configuration does not impact other notebooks attached to the same cluster.
    acntInfo = "fs.azure.account.key."+ blobStorage
    spark.conf.set(
      acntInfo,
      blobAccessKey)
  6. Run the following code to load the DataFrame from the csv files in the ADLS Gen-2 Storage Account:
    customerDF = spark.read.format("csv") \
      .option("header", True) \
      .option("inferSchema", True) \
      .load("dbfs:/mnt/Gen2Source/Customer/csvFiles")
  7. Run the following command to store the JDBC URL for the Synapse dedicated SQL pool in a variable. Note that the connection string uses trustServerCertificate=true instead of the default trustServerCertificate=false; without this change, in certain cases you might get errors like the following:
    '''
    The driver could not establish a secure connection to SQL Server by using Secure Sockets Layer (SSL) encryption. Error: "Failed to validate the server name in a certificate during Secure Sockets Layer (SSL) initialization.". ClientConnectionId:sadasd-asds-asdasd [ErrorCode = 0] [SQLState = 08S '''

    sqlDwUrl = "jdbc:sqlserver://synapsetestdemoworkspace.sql.azuresynapse.net:1433;database=sqldwpool1;user=sqladminuser@synapsetestdemoworkspace;password=xxxxxxx;encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"
    db_table = "dbo.customer"
  8. Execute the following code to load the customerDF DataFrame as a table into the Azure Synapse dedicated SQL pool. This creates a table named customer in the dbo schema of the SQL database, which you can query using SQL Server Management Studio (SSMS) or from Synapse Studio. This uses the default save mode: when writing a DataFrame to the dedicated SQL pool, an exception is thrown if the table already exists; otherwise, the table is created and populated with the data:
    customerDF.write \
      .format("com.databricks.spark.sqldw")\
      .option("url", sqlDwUrl)\
      .option("forwardSparkAzureStorageCredentials", "true")\
      .option("dbTable", db_table)\
      .option("tempDir", tempDir)\
      .save()
  9. Connect to the SQL database using SSMS, and you can query data from the dbo.customer table:
    Figure 2.16 – Azure Synapse table verification

  10. Run the following code, which appends the data to the existing dbo.customer table:
    # Writing data to the SQL pool with the append save mode; in append mode, data is added to the existing table
    customerDF.write \
      .format("com.databricks.spark.sqldw")\
      .option("url", sqlDwUrl)\
      .option("forwardSparkAzureStorageCredentials", "true")\
      .option("dbTable", db_table)\
      .option("tempDir", tempDir)\
      .mode("append")\
      .save()
  11. Run the following code, which overwrites the data in the existing dbo.customer table:
    customerDF.write \
      .format("com.databricks.spark.sqldw")\
      .option("url", sqlDwUrl)\
      .option("forwardSparkAzureStorageCredentials", "true")\
      .option("dbTable", db_table)\
      .option("tempDir", tempDir)\
      .mode("overwrite")\
      .save()
  12. Run the following code to read data from the Azure Synapse dedicated SQL pool using the Azure Synapse connector:
    customerTabledf = spark.read \
      .format("com.databricks.spark.sqldw") \
      .option("url", sqlDwUrl) \
      .option("tempDir", tempDir) \
      .option("forwardSparkAzureStorageCredentials", "true") \
      .option("dbTable", db_table) \
      .load()
  13. You can see the result in the DataFrame by using the following code:
    customerTabledf.show()
  14. Instead of a table, you can use a query to read data from the dedicated SQL pool. As you can see in the following code, we use option("query", query) to run a query against the database:
    query = "select C_MKTSEGMENT, count(*) as Cnt from [dbo].[customer] group by C_MKTSEGMENT"
    df_query = spark.read \
      .format("com.databricks.spark.sqldw") \
      .option("url", sqlDwUrl) \
      .option("tempDir", tempDir) \
      .option("forwardSparkAzureStorageCredentials", "true") \
      .option("query", query) \
      .load()
  15. You can execute the following code to display the results.
    display(df_query.limit(5))
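
If you re-run the notebook, the dbutils.fs.mount call in step 1 will fail because /mnt/Gen2Source is already mounted. A minimal guard, reusing the storageEndPoint, mountpoint, and configs variables defined in step 1, is to check the existing mounts before mounting:

    # Mount only if the mount point does not already exist (reuses the variables from step 1)
    if not any(m.mountPoint == mountpoint for m in dbutils.fs.mounts()):
        dbutils.fs.mount(source=storageEndPoint,
                         mount_point=mountpoint,
                         extra_configs=configs)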

By the end of this recipe, you have learnt how to read and write data from and to Synapse Dedicated SQL Pool using the Azure Synapse Connector from Azure Databricks.

How it works…

In Azure Synapse, data loading and unloading operations are performed by PolyBase and are triggered by the Azure Synapse connector through JDBC.

For Databricks Runtime 7.0 and above, the Azure Synapse connector through JDBC uses COPY to load data into Azure Synapse.

The Azure Synapse connector triggers Apache Spark jobs to read and write data to the Blob storage container.

We use spark.conf.set(acntInfo, blobAccessKey) so that Spark connects to the Blob storage container using the built-in connectors. We can also use ADLS Gen-2 to store the data read from Synapse or written to the Synapse dedicated SQL pool.
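
As a minimal sketch of that alternative, you could point the intermediate storage at an ADLS Gen-2 account instead of a Blob storage account. The account name (stgadlsgen2), container name (synapsetmp), and access key below are placeholders, and this assumes the Azure Synapse connector in your Databricks runtime accepts an abfss:// URI for tempDir:

adlsStorage = "stgadlsgen2.dfs.core.windows.net"   # placeholder ADLS Gen-2 endpoint
adlsContainer = "synapsetmp"                       # placeholder container for temporary data
adlsAccessKey = "xxxxxxxxxxxxxxxxxxxxxxxxx"

# Store the ADLS Gen-2 access key in the session configuration, as done for Blob storage above
spark.conf.set("fs.azure.account.key." + adlsStorage, adlsAccessKey)

# Use an abfss:// URI as the intermediate temporary folder passed to the connector
tempDir = "abfss://" + adlsContainer + "@" + adlsStorage + "/tempDirs"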

Spark connects to Synapse using the JDBC driver with a username and password. Azure Synapse connects to the Azure Blob storage account using the account key, though it can also use a Managed Service Identity to connect.
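
Rather than keeping the SQL password and the storage access key in plain text in the notebook, you can read them from a Databricks secret scope with dbutils.secrets.get. The following is a minimal sketch; the scope name (synapse-scope) and key names are placeholders you would create yourself beforehand:

# Placeholder secret scope and key names; create them with the Databricks CLI or the Secrets API
sqlPassword = dbutils.secrets.get(scope="synapse-scope", key="sqladmin-password")
blobAccessKey = dbutils.secrets.get(scope="synapse-scope", key="blob-access-key")

sqlDwUrl = ("jdbc:sqlserver://synapsetestdemoworkspace.sql.azuresynapse.net:1433;"
            "database=sqldwpool1;user=sqladminuser@synapsetestdemoworkspace;"
            "password=" + sqlPassword + ";encrypt=true;trustServerCertificate=true;"
            "hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;")

If you prefer not to forward the storage account key at all, the connector also exposes a useAzureMSI option (.option("useAzureMSI", "true")), which makes Azure Synapse access the storage account with its managed identity instead of a forwarded key.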

The following is the JDBC connection string used in the notebook. In this case, we have changed the trustServerCertificate value from false (the default) to true; by doing so, the Microsoft JDBC Driver for SQL Server won't validate the SQL Server TLS certificate. This change should be made only in test environments, never in production.

sqlDwUrl="jdbc:sqlserver://synapsetestdemoworkspace.sql.azuresynapse.net:1433;database=sqldwpool1;user=sqladminuser@synapsetestdemoworkspace;password=xxxxxxx;encrypt=true;trustServerCertificate=true;hostNameInCertificate=*.sql.azuresynapse.net;loginTimeout=30;"

If you have not created a master key in the Synapse dedicated SQL pool (by running CREATE MASTER KEY on the database), you will get the following error when reading data from the dedicated SQL pool from an Azure Databricks notebook.

Underlying SQLException(s):
  - com.microsoft.sqlserver.jdbc.SQLServerException: Please create a master key in the database or open the master key in the session before performing this operation. [ErrorCode = 15581] [SQLState = S0006]

When you write data to dedicated SQL pools, you will see the following messages in the notebook while the code is being executed.

Waiting for Azure Synapse Analytics to load intermediate data from wasbs://synapse@stgcccookbook.blob.core.windows.net/tempDirs/2021-08-07/04-19-43-514/cb6dd9a2-da6b-4748-a34d-0961d2df981f/ into "dbo"."customer" using COPY.

Spark writes the csv data to the common Blob storage as Parquet files, and Synapse then uses the COPY statement to load the Parquet files into the final table. If you check the Blob storage account, you will find the Parquet files created there. When you look at the DAG for the execution (you will learn more about DAGs in upcoming recipes), you will see that Spark reads the CSV files and writes them to Blob storage in Parquet format.

When you read data from Synapse dedicated SQL pool tables, you will see the following messages in the notebook while the code is being executed.

Waiting for Azure Synapse Analytics to unload intermediate data under wasbs://synapse@stgcccookbook.blob.core.windows.net/tempDirs/2021-08-07/04-24-56-781/esds43c1-bc9e-4dc4-bc07-c4368d20c467/ using Polybase

Azure Synapse first writes the data from the dedicated SQL pool to the common Blob storage as Parquet files with snappy compression; Spark then reads those files and displays the result to the end user when customerTabledf.show() is executed.

We set up the account key and secret for the common Blob storage in the session configuration associated with the notebook using spark.conf.set, and we set forwardSparkAzureStorageCredentials to true. With this value set to true, the Azure Synapse connector forwards the storage account access key to the Azure Synapse dedicated SQL pool by creating a temporary database scoped credential.