Book Image

Azure Databricks Cookbook

By : Phani Raj, Vinod Jaiswal
Book Image

Azure Databricks Cookbook

By: Phani Raj, Vinod Jaiswal

Overview of this book

Azure Databricks is a unified collaborative platform for performing scalable analytics in an interactive environment. The Azure Databricks Cookbook provides recipes to get hands-on with the analytics process, including ingesting data from various batch and streaming sources and building a modern data warehouse. The book starts by teaching you how to create an Azure Databricks instance within the Azure portal, Azure CLI, and ARM templates. You’ll work through clusters in Databricks and explore recipes for ingesting data from sources, including files, databases, and streaming sources such as Apache Kafka and EventHub. The book will help you explore all the features supported by Azure Databricks for building powerful end-to-end data pipelines. You'll also find out how to build a modern data warehouse by using Delta tables and Azure Synapse Analytics. Later, you’ll learn how to write ad hoc queries and extract meaningful insights from the data lake by creating visualizations and dashboards with Databricks SQL. Finally, you'll deploy and productionize a data pipeline as well as deploy notebooks and Azure Databricks service using continuous integration and continuous delivery (CI/CD). By the end of this Azure book, you'll be able to use Azure Databricks to streamline different processes involved in building data-driven apps.
Table of Contents (12 chapters)

Reading and writing data from and to ADLS Gen2

In this recipe, you will learn how to read and write data to ADLS Gen2 from Databricks. We can do this by following these two methods:

  • Mounting storage: Covered in the Mounting ADLS Gen2 and Azure Blob storage to Azure DBFS recipe of this chapter.
  • Directly accessing the ADLS Gen2 storage using a SAS token and a service principal: In this scenario, we will not mount the storage, but we will directly access the storage endpoint to read and write files using storage keys, service principals, and OAuth 2.0.

ADLS Gen2 provides file system semantics, which provides security to folders and files, and the hierarchical directory structure provides efficient access to the data in the storage.

By the end of this recipe, you will know multiple ways to read/write files from and to an ADLS Gen2 account.

Getting ready

You will need to ensure you have the following items before starting to work on this recipe:

  • An ADLS Gen2 account, mounted by following the steps in the first recipe of this chapter, Mounting ADLS Gen2 and Azure Blob to Azure Databricks File System.
  • Storage keys—you can get these by following the steps mentioned in the first recipe of this chapter, Mounting ADLS Gen2 and Azure Blob to Azure Databricks File System.

You can follow along by running the steps in the 2-3.Reading and Writing Data from and to ADLS Gen-2.ipynb notebook in your local cloned repository in the Chapter02 folder.

Upload the csvFiles folder in the Chapter02/Customer folder to the ADLS Gen2 account in the rawdata file system.

Note

We have tested the steps mentioned in this recipe on Azure Databricks Runtime version 6.4 which includes Spark 2.4.5 and on Runtime version 7.3 LTS which includes Spark 3.0.1

How to do it…

We will learn how to read CSV files from the mount point and the ADLS Gen2 storage directly. We will perform basic aggregation on the DataFrame, such as counting and storing the result in another csv file.

Working with the mount point, we'll proceed as follows:

  1. Let's list the CSV files we are trying to read from the mount point:
    display(dbutils.fs.ls("/mnt/Gen2/Customer/csvFiles/"))
  2. We will read the csv files directly from the mount point without specifying any schema options:
    df_cust= spark.read.format("csv").option("header",True).load("/mnt/Gen2/Customer/csvFiles/")
  3. When you run df_cust.printSchema(), you will find that the datatypes for all columns are strings.
  4. Next, we will run the same code as in the preceding step, but this time asking Spark to infer the schema from csv files by using option("header","true"):
    df_cust= spark.read.format("csv").option("header",True).option("inferSchema", True).load("/mnt/Gen2/Customer/csvFiles/")
  5. Run df_cust.printSchema(), and you will find the datatype has changed for a few columns such as CustKey, where the datatype now being shown is an integer instead of a string.
  6. We will now create a schema and explicitly provide it while reading the csv files using a DataFrame:
    cust_schema = StructType([
        StructField("C_CUSTKEY", IntegerType()),
        StructField("C_NAME", StringType()),
        StructField("C_ADDRESS", StringType()),
        StructField("C_NATIONKEY", ShortType()),
        StructField("C_PHONE", StringType()),
        StructField("C_ACCTBAL", DoubleType()),
        StructField("C_MKTSEGMENT", StringType()),
        StructField("C_COMMENT", StringType())
    ])
  7. Create a DataFrame by using the schema created in the preceding step:
    df_cust= spark.read.format("csv").option("header",True).schema(cust_schema).load("/mnt/Gen2/Customer/csvFiles/")
  8. In the following step, we will be performing basic aggregation on the DataFrame:
    df_cust_agg = df_cust.groupBy("C_MKTSEGMENT") .agg(sum("C_ACCTBAL").cast('decimal(20,3)').alias("sum_acctbal"), avg("C_ACCTBAL").alias("avg_acctbal"), max("C_ACCTBAL").alias("max_bonus")).orderBy("avg_acctbal",ascending=False)
  9. We will write the DataFrame we created in the preceding step to the mount point and save it in CSV format:
    df_cust_agg.write.mode("overwrite").option("header", "true").csv("/mnt/Gen-2/CustMarketSegmentAgg/"))
  10. To list the CSV file created, run the following code:
    (dbutils.fs.ls("/mnt/Gen-2/CustMarketSegmentAgg/"))

We'll now work with an ADLS Gen2 storage account without mounting it to DBFS:

  1. You can access an ADLS Gen2 storage account directly without mounting to DBFS using OAuth 2.0 and a service principal. You can access any ADLS Gen2 storage account that the service principal has permissions on. We need to set the credentials first in our notebook before we can directly access the file system. clientID and clientSecret are the variables defined in the notebook:
    spark.conf.set("fs.azure.account.auth.type.cookbookadlsgen2storage.dfs.core.windows.net", "OAuth")
    spark.conf.set("fs.azure.account.oauth.provider.type.cookobookadlsgen2storage.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
    spark.conf.set("fs.azure.account.oauth2.client.id.cookbookadlsgen2storage.dfs.core.windows.net", clientID)
    spark.conf.set("fs.azure.account.oauth2.client.secret.cookbookadlsgen2storage.dfs.core.windows.net", clientSecret)
    spark.conf.set("fs.azure.account.oauth2.client.endpoint.cookbookadlsgen2storage.dfs.core.windows.net", oauth2Endpoint)
  2. After the preceding step is executed, you can directly read the csv files from the ADLS Gen2 storage account without mounting it:
    df_direct = spark.read.format("csv").option("header",True).schema(cust_schema).load("abfss://[email protected]/Customer/csvFiles")
  3. You can view a few records of the DataFrame by executing the following code:
    display(df_direct.limit(10))
  4. We will now write a DataFrame in Parquet format in the ADLS Gen2 storage account directly, without using the mount point, by executing the following code. We are repartitioning the DataFrame to ensure we are creating 10 Parquet files:
    parquetCustomerDestDirect = "abfss://[email protected]/Customer/parquetFiles"
    df_direct_repart=df_direct.repartition(10)
    df_direct_repart.write.mode("overwrite").option("header", "true").parquet(parquetCustomerDestDirect)
  5. You can create a DataFrame on the Parquet files created in the preceding step to ensure we are able to read the data:
    df_parquet = spark.read.format("parquet").option("header",True).schema(cust_schema).load("abfss://[email protected]/Customer/parquetFiles")
  6. You can view the Parquet files created in the preceding step by executing the following code:
    display(dbutils.fs.ls(parquetCustomerDestDirect))

How it works…

The following code is set to directly access the ADL Gen2 storage account without mounting to DBFS. These settings are applicable when we are using DataFrame or dataset APIs:

spark.conf.set("fs.azure.account.auth.type.cookbookadlsgen2storage.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.cookbookadlsgen2storage.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.cookbookadlsgen2storage.dfs.core.windows.net", clientID)
spark.conf.set("fs.azure.account.oauth2.client.secret.cookbookadlsgen2storage.dfs.core.windows.net", clientSecret)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.cookbookadlsgen2storage.dfs.core.windows.net", oauth2Endpoint)

You should set the preceding values in your notebook session if you want the users to directly access the ADLS Gen2 storage account without mounting to DBFS. This method is useful when you are doing some ad hoc analysis and don't want users to create multiple mount points when you are trying to access data from various ADLS Gen2 storage accounts.