Azure Databricks Cookbook

By: Phani Raj, Vinod Jaiswal

Overview of this book

Azure Databricks is a unified collaborative platform for performing scalable analytics in an interactive environment. The Azure Databricks Cookbook provides recipes to get hands-on with the analytics process, including ingesting data from various batch and streaming sources and building a modern data warehouse. The book starts by teaching you how to create an Azure Databricks instance within the Azure portal, Azure CLI, and ARM templates. You’ll work through clusters in Databricks and explore recipes for ingesting data from sources, including files, databases, and streaming sources such as Apache Kafka and EventHub. The book will help you explore all the features supported by Azure Databricks for building powerful end-to-end data pipelines. You'll also find out how to build a modern data warehouse by using Delta tables and Azure Synapse Analytics. Later, you’ll learn how to write ad hoc queries and extract meaningful insights from the data lake by creating visualizations and dashboards with Databricks SQL. Finally, you'll deploy and productionize a data pipeline as well as deploy notebooks and Azure Databricks service using continuous integration and continuous delivery (CI/CD). By the end of this Azure book, you'll be able to use Azure Databricks to streamline different processes involved in building data-driven apps.

Reading and writing data from and to an Azure SQL database using native connectors

Reading data from and writing data to an Azure SQL database is a common step in data ingestion pipelines. Typically, a pipeline either loads transformed data into Azure SQL or reads raw data from Azure SQL to perform some transformations.

In this recipe, you will learn how to read and write data using the SQL Server JDBC Driver and the Apache Spark connector for Azure SQL.

Getting ready

At the time of writing, the Apache Spark connector for Azure SQL supports only Spark 2.4.x and 3.0.x clusters; this might change in the future. The SQL Server JDBC Driver also supports both Spark 2.4.x and 3.0.x clusters. Before we start working on the recipe, we need to create a Spark 2.4.x or 3.0.x cluster. To create the cluster, you can follow the steps in the Creating a cluster from the UI recipe from Chapter 1, Creating an Azure Databricks Service.

We have used Databricks Runtime Version 7.3 LTS, which includes Spark 3.0.1 and Scala 2.12, for this recipe. The code has also been tested with Databricks Runtime Version 6.4, which includes Spark 2.4.5 and Scala 2.11.

You need to create an Azure SQL database. To do so, follow the steps at this link:

https://docs.microsoft.com/en-us/azure/azure-sql/database/single-database-create-quickstart?tabs=azure-portal

After your Azure SQL database is created, connect to the database and create the following table in the newly created database:

CREATE TABLE [dbo].[CUSTOMER](
     [C_CUSTKEY] [int] NULL,
     [C_NAME] [varchar](25) NULL,
     [C_ADDRESS] [varchar](40) NULL,
     [C_NATIONKEY] [smallint] NULL,
     [C_PHONE] [char](15) NULL,
     [C_ACCTBAL] [decimal](18, 0) NULL,
     [C_MKTSEGMENT] [char](10) NULL,
     [C_COMMENT] [varchar](117) NULL
) ON [PRIMARY]
GO

Once the table is created, you can proceed with the steps mentioned in the How to do it… section. You can follow along with the notebook 2_4.Reading and Writing from and to Azure SQL Database.ipynb.
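One detail is worth checking before you load data. The table above declares C_ACCTBAL as decimal(18, 0), while the DataFrame schema used later in this recipe declares it as DecimalType(18,2). A column with scale 0 cannot hold fractional digits, so the two decimal places in the DataFrame would not be preserved in the table. The following is a minimal sketch of such a check in plain Python. The helper find_scale_mismatches and the two dictionaries are hypothetical and are filled in by hand from the definitions in this recipe.

```python
# (precision, scale) pairs copied by hand from the two definitions in this recipe.
sql_table_decimals = {"C_ACCTBAL": (18, 0)}   # CREATE TABLE: [decimal](18, 0)
dataframe_decimals = {"C_ACCTBAL": (18, 2)}   # cust_schema: DecimalType(18,2)


def find_scale_mismatches(table_cols, df_cols):
    """Return columns whose DataFrame scale is larger than the table scale."""
    mismatches = {}
    for name, (_, df_scale) in df_cols.items():
        if name not in table_cols:
            continue  # column only exists in the DataFrame; nothing to compare
        _, table_scale = table_cols[name]
        if df_scale > table_scale:
            mismatches[name] = {
                "table_scale": table_scale,
                "dataframe_scale": df_scale,
            }
    return mismatches


print(find_scale_mismatches(sql_table_decimals, dataframe_decimals))
```

If you want to keep the fractional digits, one option is to declare the column as decimal(18, 2) when you create the table.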

How to do it…

You will learn how to use SQL Server JDBC Driver and the Apache Spark connector for Azure SQL to read and write data to and from an Azure SQL database. You will learn how to install the Spark connector for Azure SQL in a Databricks cluster.

Here are the steps to write data to and read data from an Azure SQL database using the SQL Server JDBC Driver:

  1. First, create variables for the connection string and for the table that we will be reading from and writing to. We will load the CSV files from ADLS Gen2 that we saw in the Reading and writing data from and to ADLS Gen2 recipe:
    # Details about connection string
    logicalServername = "demologicalserver.database.windows.net"
    databaseName = "demoDB"
    tableName = "CUSTOMER"
    userName = "sqladmin"
    password = "Password@Strong12345" # Please specify password here
    jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(logicalServername, 1433, databaseName)
    connectionProperties = {
      "user" : userName,
      "password" : password,
      "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    }
  2. As you can see from the preceding step, the driver we are using is called SQLServerDriver, which comes installed as part of Databricks Runtime.
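  3. Create a schema for the CSV files, which are stored in ADLS Gen2 with the storage mounted to DBFS. Follow the steps mentioned in the third recipe, Reading and writing data from and to ADLS Gen2, to learn how to mount storage to DBFS:
    # Import the Spark SQL types used in the schema
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ShortType, DecimalType
    # Creating a schema which can be passed while creating the DataFrame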
    cust_schema = StructType([
        StructField("C_CUSTKEY", IntegerType()),
        StructField("C_NAME", StringType()),
        StructField("C_ADDRESS", StringType()),
        StructField("C_NATIONKEY", ShortType()),
        StructField("C_PHONE", StringType()),
        StructField("C_ACCTBAL", DecimalType(18,2)),
        StructField("C_MKTSEGMENT", StringType()),
        StructField("C_COMMENT", StringType())
    ])
  4. Once a schema is created, we will read the CSV files into a DataFrame:
    # Read the customer CSV files into a DataFrame. This DataFrame will be written to the CUSTOMER table in Azure SQL DB
    df_cust = spark.read.format("csv") \
        .option("header", True) \
        .schema(cust_schema) \
        .load("dbfs:/mnt/Gen2/Customer/csvFiles")
  5. After the preceding step is executed, we will write the DataFrame to the dbo.CUSTOMER table that we have already created as part of the Getting ready section:
    df_cust.write.jdbc(jdbcUrl,
                       mode="append",
                       table=tableName,
                       properties=connectionProperties)
  6. After loading the data, we will read the table to count the number of records inserted in the table:
    df_jdbcRead = spark.read.jdbc(jdbcUrl,
                       table=tableName,
                       properties=connectionProperties)
    # Counting number of rows
    df_jdbcRead.count()
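The connection setup from step 1 can be wrapped in small helpers so that the password is not hard-coded in the notebook. The following is a minimal sketch rather than the authors' code: build_jdbc_url, build_connection_properties, and the environment variable name SQL_PASSWORD are hypothetical. In a Databricks notebook, you would more commonly read the password from a secret scope with dbutils.secrets.get; an environment variable is used here only so that the sketch runs anywhere.

```python
import os


def build_jdbc_url(server, database, port=1433):
    """Build a SQL Server JDBC URL in the same format as step 1."""
    return "jdbc:sqlserver://{0}:{1};database={2}".format(server, port, database)


def build_connection_properties(user, password):
    """Return the properties dictionary expected by spark.read.jdbc / df.write.jdbc."""
    return {
        "user": user,
        "password": password,
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
    }


# SQL_PASSWORD is a hypothetical variable name; it falls back to an empty string
# here so that the sketch runs even when the variable is not set.
password = os.environ.get("SQL_PASSWORD", "")

jdbc_url = build_jdbc_url("demologicalserver.database.windows.net", "demoDB")
connection_properties = build_connection_properties("sqladmin", password)

print(jdbc_url)
```

The resulting jdbc_url and connection_properties can be passed to df_cust.write.jdbc and spark.read.jdbc exactly as jdbcUrl and connectionProperties are in the steps above.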

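Here are the steps to read data from and write data to an Azure SQL database using the Apache Spark connector for Azure SQL Database. The connector version that you install depends on the Spark version of your cluster: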

Table 2.1 - Compatible connectors for Spark 2.4.x and Spark 3.0.x clusters

You can also download the connector from https://search.maven.org/search?q=spark-mssql-connector.
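The choice of connector can be expressed as a small lookup keyed on the Spark version. The following is a minimal sketch based on the artifact IDs and release versions named in the installation steps of this recipe. The function connector_coordinates is hypothetical, and the group ID com.microsoft.azure is an assumption that is not stated in the recipe text, so confirm it on Maven Central before relying on it.

```python
def connector_coordinates(spark_version):
    """Return assumed Maven coordinates for the Spark connector for Azure SQL.

    spark_version is a string such as "3.0.1" (for example, the value of
    spark.version in a notebook).
    """
    major_minor = ".".join(spark_version.split(".")[:2])
    if major_minor == "3.0":
        # Spark 3.0.x clusters use the Scala 2.12 artifact
        return "com.microsoft.azure:spark-mssql-connector_2.12:1.1.0"
    if major_minor == "2.4":
        # Spark 2.4.x clusters use the artifact without a Scala suffix
        return "com.microsoft.azure:spark-mssql-connector:1.0.2"
    raise ValueError("No connector listed in this recipe for Spark " + spark_version)


print(connector_coordinates("3.0.1"))
print(connector_coordinates("2.4.5"))
```

Raising an error for any other version is a deliberate choice in this sketch: the recipe only covers Spark 2.4.x and 3.0.x, so other versions should be checked against the connector's current documentation.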

  1. After a Spark 2.4.x or 3.0.x cluster is created, you need to install the Spark connector for Azure SQL DB from Maven. Make sure you use the coordinates mentioned in the preceding table. Go to the Databricks Clusters page and click on Libraries. Then, click on Install New and select Maven as the library source. Now, click on Search Packages and search for spark-mssql-connector:
    Figure 2.13 – Installing Spark connector on Azure SQL database

  2. Under Search Packages, select Maven Central and search for spark-mssql-connector. Because we are using a Spark 3.0.1 cluster, select the entry with Artifact Id spark-mssql-connector_2.12 and Releases 1.1.0, and then click on Select. You can use a later version if one is available when you are going through the recipe, provided it is compatible with your cluster's Spark version. If you are using a Spark 2.4.x cluster, then you must use the entry with Artifact Id spark-mssql-connector and Releases version 1.0.2.
    Figure 2.14 – Installing Spark connector on Azure SQL database (continued)

  3. After you select the package, it is installed and you will see its status as Installed:
    Figure 2.15 – Spark connector to Azure SQL database installed

  4. After the Spark connector for Azure SQL is installed, run the following code to set the connection string for Azure SQL. It reuses the logicalServername variable defined in the JDBC steps earlier in this recipe:
    server_name = f"jdbc:sqlserver://{logicalServername}" 
    database_name = "demoDB"
    url = server_name + ";" + "databaseName=" + database_name + ";"
    table_name = "dbo.Customer"
    username = "sqladmin"
    password = "xxxxxx" # Please specify password here
  5. After the Spark connector is installed, we will read the records from the dbo.CUSTOMER table using the newly installed Spark connector for Azure SQL:
    sparkconnectorDF = spark.read \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .option("url", url) \
            .option("dbtable", table_name) \
            .option("user", username) \
            .option("password", password).load()
  6. Run the following code to check the schema of the DataFrame created as part of the preceding step:
    # printSchema() prints the schema itself and returns None, so it is not wrapped in display()
    sparkconnectorDF.printSchema()
  7. To view a few records from the DataFrame, run the following code:
    display(sparkconnectorDF.limit(10))
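  8. Create a schema for the CSV files, which are stored in ADLS Gen2 with the storage mounted to DBFS. Follow the steps mentioned in the Reading and writing data from and to ADLS Gen2 recipe to learn how to mount an ADLS Gen2 storage account to DBFS:
    # Import the Spark SQL types used in the schema
    from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ShortType, DecimalType
    # Creating a schema which can be passed while creating the DataFrame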
    cust_schema = StructType([
        StructField("C_CUSTKEY", IntegerType()),
        StructField("C_NAME", StringType()),
        StructField("C_ADDRESS", StringType()),
        StructField("C_NATIONKEY", ShortType()),
        StructField("C_PHONE", StringType()),
        StructField("C_ACCTBAL", DecimalType(18,2)),
        StructField("C_MKTSEGMENT", StringType()),
        StructField("C_COMMENT", StringType())
    ])
  9. Once a schema is created, we will load the CSV files into a DataFrame by running the following code:
    df_cust = spark.read.format("csv") \
        .option("header", True) \
        .schema(cust_schema) \
        .load("dbfs:/mnt/Gen2/Customer/csvFiles")
  10. In the following step, we will learn how to write the DataFrame to an Azure SQL database table using append mode:
    # Appending records to the existing table
    try:
        df_cust.write \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .mode("append") \
            .option("url", url) \
            .option("dbtable", table_name) \
            .option("user", username) \
            .option("password", password) \
            .save()
    except Exception as error:
        # Failures raised on the JVM side are not ValueError, so catch Exception
        print("Connector write failed", error)
  11. The preceding code appends the data to the existing table; if the table does not exist, it throws an error. The following code overwrites the existing data in the table:
    try:
        df_cust.write \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .mode("overwrite") \
            .option("truncate", True) \
            .option("url", url) \
            .option("dbtable", table_name) \
            .option("user", username) \
            .option("password", password) \
            .save()
    except Exception as error:
        # Failures raised on the JVM side are not ValueError, so catch Exception
        print("Connector write failed", error)
  12. As the last step, we will read the data loaded in the customer table to ensure the data is loaded properly:
    #Read the data from the table
    sparkconnectorDF = spark.read \
            .format("com.microsoft.sqlserver.jdbc.spark") \
            .option("url", url) \
            .option("dbtable", table_name) \
            .option("user", username) \
            .option("password", password).load()
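The read and write calls in the preceding steps repeat the same four options. A minimal sketch of collecting them in one place is shown here. The helpers build_connector_url and connector_options are hypothetical, and the password is a placeholder; the format name and option keys are the ones used in the steps above. The Spark calls are shown only as comments, so the helpers can be tried without a cluster.

```python
CONNECTOR_FORMAT = "com.microsoft.sqlserver.jdbc.spark"


def build_connector_url(logical_server, database):
    """Build the connector URL in the same format as step 4."""
    return "jdbc:sqlserver://{0};databaseName={1};".format(logical_server, database)


def connector_options(url, table, user, password, truncate=False):
    """Collect the options shared by the connector read and write calls."""
    options = {"url": url, "dbtable": table, "user": user, "password": password}
    if truncate:
        # Only meaningful together with mode("overwrite")
        options["truncate"] = "true"
    return options


url = build_connector_url("demologicalserver.database.windows.net", "demoDB")
opts = connector_options(url, "dbo.Customer", "sqladmin", "xxxxxx")  # placeholder password

# In a notebook with the connector installed, the options can be passed in one call:
#   spark.read.format(CONNECTOR_FORMAT).options(**opts).load()
#   df_cust.write.format(CONNECTOR_FORMAT).mode("append").options(**opts).save()
print(opts["url"])
```

Using one set of names for both reading and writing avoids mixing the variables from the JDBC steps (tableName, userName) with those from the connector steps (table_name, username).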

How it works…

The Apache Spark connector works with the latest versions of Spark 2.4.x and Spark 3.0.x. It can be used with both SQL Server and Azure SQL Database and is optimized for them so that big data analytics can be performed efficiently. The following document outlines the benefits of using the Spark connector and provides a performance comparison between the JDBC connector and the Spark connector:

https://docs.microsoft.com/en-us/sql/connect/spark/connector?view=sql-server-ver15

To overwrite the data using the Spark connector, we use overwrite mode. By default, this mode drops and recreates the table with a schema based on the source DataFrame schema, and none of the indexes that were present on that table are added back after the table is recreated. To keep the indexes in overwrite mode, we need to set the truncate option to True. With this option, the existing table is truncated rather than dropped and recreated, so its definition and indexes stay in place.

To append data to an existing table, we use append mode, whereby the existing table is not dropped or recreated. If the table is not found, an error is thrown. This mode is used when we are just inserting data into a raw table. If it's a staging table that we want to truncate before each load, then we need to use overwrite mode with the truncate option.
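The choice between the two modes can be summarized as a small lookup. The following is a minimal sketch under the assumptions of this section: the labels "raw" and "staging" and the function write_settings are hypothetical, and the returned values are the mode and option used in the recipe's steps.

```python
def write_settings(table_kind):
    """Return the save mode and extra options for a given kind of target table."""
    if table_kind == "raw":
        # Only insert new rows; the existing table is left in place
        return {"mode": "append", "options": {}}
    if table_kind == "staging":
        # Empty the table before loading, without dropping and recreating it
        return {"mode": "overwrite", "options": {"truncate": "true"}}
    raise ValueError("Unknown table kind: " + table_kind)


for kind in ("raw", "staging"):
    print(kind, write_settings(kind))

# In a notebook, the settings could be applied along these lines:
#   settings = write_settings("staging")
#   df_cust.write.format("com.microsoft.sqlserver.jdbc.spark") \
#       .mode(settings["mode"]).options(**settings["options"]) \
#       .option("url", url).option("dbtable", table_name) \
#       .option("user", username).option("password", password).save()
```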