Optimizing Databricks Workloads

By: Anirudh Kala, Anshul Bhatnagar, Sarthak Sarbahi

Overview of this book

Databricks is an industry-leading, cloud-based platform for data analytics, data science, and data engineering that supports thousands of organizations across the world in their data journey. It is a fast, easy, and collaborative Apache Spark-based big data analytics platform for data science and data engineering in the cloud. Optimizing Databricks Workloads begins with a brief introduction to Azure Databricks and quickly moves on to the most important optimization techniques. The book covers how to select the optimal Spark cluster configuration for running big data processing workloads in Databricks, useful optimization techniques for Spark DataFrames, best practices for optimizing Delta Lake, and techniques to optimize Spark jobs through Spark Core. It also presents real-world scenarios where optimizing workloads in Databricks has helped organizations increase performance and save costs across various domains. By the end of this book, you will have the toolkit you need to speed up your Spark jobs and process your data more efficiently.
Table of Contents (13 chapters)

Section 1: Introduction to Azure Databricks
Section 2: Optimization Techniques
Section 3: Real-World Scenarios

Learning about Delta Lake

Delta Lake was launched by Databricks as an open source project, now owned by the Linux Foundation, that converts a traditional data lake into a lakehouse. The term lakehouse refers to a platform that combines the best of data lakes and data warehouses. Delta Lake offers the following features:

  • ACID transactions: Data readers never see inconsistent data, also called dirty reads.
  • Handling metadata: Spark's distributed processing power makes it easy to handle the metadata of tables with terabytes and petabytes of data in a scalable fashion.
  • Streaming and batch workloads: Delta Lake helps unify stream and batch processing on data.
  • Schema enforcement: The schema is always checked before data gets appended. This helps to prevent the corruption of data.
  • Time travel: View and roll back to previous versions of the data, which also enables auditing of changes.
  • Upserts and deletes: Perform transactions such as updates, inserts, and deletes on data lying in the lake (a brief upsert sketch follows this list).
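As a brief, illustrative sketch of the upsert capability (the table path, the flight_id key, and the sample rows below are assumptions for the example, not part of the book's dataset), the DeltaTable API can merge a DataFrame of changes into an existing delta table in a single ACID transaction:

    from delta.tables import DeltaTable

    # Hypothetical delta table of flights keyed by flight_id, assumed to have
    # the same two columns as the updates below
    target = DeltaTable.forPath(spark, "dbfs:/tmp/flights_delta/")

    # Hypothetical DataFrame holding changed and brand-new rows
    updates_df = spark.createDataFrame(
        [("AA100", "San Francisco"), ("DL200", "Boston")],
        ["flight_id", "Dest"])

    # Upsert: update rows that match on flight_id, insert the rest
    (target.alias("t")
     .merge(updates_df.alias("s"), "t.flight_id = s.flight_id")
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())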

Next, we'll look at big data file formats.

Big data file formats

Before we dive deeper into Delta Lake, let's first try to understand the file formats used to store big data. Traditional file formats such as CSV and TSV store data row by row and are not partitioned. A CSV file is essentially plain text with no data types and no embedded schema, so it always has to be scanned in its entirety, with no scope for skipping data. This makes processing and querying larger datasets difficult. File formats such as Parquet, ORC, and Avro help us overcome many of these challenges, as they can be stored and processed in a distributed fashion.
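As a minimal sketch of the difference (the paths and column names below are hypothetical, and spark refers to the notebook's SparkSession), a CSV needs an extra pass to infer types and must be scanned in its entirety, while a Parquet file carries its schema and lets Spark read only the columns a query needs:

    # CSV: plain text with no types and no embedded schema, so Spark makes
    # an extra pass to infer types and scans every column at query time
    csv_df = (spark.read
              .option("header", True)
              .option("inferSchema", True)
              .csv("dbfs:/tmp/flights.csv"))

    # Parquet: columnar and self-describing
    parquet_df = spark.read.parquet("dbfs:/tmp/flights.parquet")
    parquet_df.printSchema()                     # the schema travels with the file
    parquet_df.select("Origin", "Dest").show(5)  # only two columns are read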

Note

Row-based file formats store data by row, whereas columnar file formats store data by column. Row-based file formats work best for transactional writes of data, whereas columnar file formats are ideal for data querying and analytical workloads.

Let us look at row-based file formats versus columnar file formats in the following image:

Figure 1.17 – Row-based file formats versus columnar

The similarities between Parquet, ORC, and Avro are as follows:

  • All three file formats are machine-readable formats and not human-readable.
  • They can be partitioned across a cluster for parallel processing and distribution.
  • The formats carry the data schema as well. This helps newer machines or clusters to process the data independently.

The differences between Parquet, ORC, and Avro are as follows:

Figure 1.18 – Comparison of Avro, Parquet, and ORC

Coming back to Delta Lake, it can simply be treated as a file format. Tables that are created on top of this delta file format are simply called delta tables. The delta file format is mainly composed of two components:

  • Transactional logs: A _delta_log folder is created when data is written in the delta file format. This folder stores files that record all the transactions to data.
  • Versioned Parquet files: This is the actual data, written out as Parquet partition files (.parquet). These files can be compacted later using different functions, and for efficient querying they can be organized into partition folders (a partitioned write is sketched after the figure).

Figure 1.19 – Data written in delta format as viewed in Azure Blob Storage
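As a short sketch of a partitioned delta write (the DataFrame, path, and partition column below are hypothetical, not what the book writes later in this chapter), partition folders are created by calling partitionBy before save:

    # A tiny illustrative DataFrame
    df = spark.createDataFrame(
        [(2008, "SFO", 10), (2009, "JFK", 12)],
        ["Year", "Dest", "DepDelay"])

    # Write it in delta format, physically partitioned by Year; this creates
    # one folder per Year value alongside the _delta_log folder
    (df.write
     .format("delta")
     .mode("overwrite")
     .partitionBy("Year")
     .save("dbfs:/tmp/airlines_partitioned/"))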

It's also important for us to understand the use and value of the transactional log.

Understanding the transactional log

Having an understanding of the transactional log is imperative when working with Delta Lake. Let's take a peek at the contents of the _delta_log folder.

Figure 1.20 – Contents of the _delta_log folder as viewed in Azure Blob Storage

Whenever a transaction is carried out on a delta table, the changes are recorded in the _delta_log folder in the form of JSON files. These JSON files are named sequentially, starting with 000000.json; subsequent files are created as changes get committed (000001.json, 000002.json, and so on). With each fresh transaction, a new set of Parquet files may also be written, and the new JSON file in the _delta_log folder records which Parquet files to reference and which to omit. This happens because every transaction on a delta table results in a new version of the table.

Figure 1.21 – JSON files in _delta_log

Let's see how this works with an example. Suppose we have a delta table whose _delta_log folder contains 000000.json, and that this JSON file references two Parquet files, part-0000.parquet and part-0001.parquet.

Now suppose an UPDATE transaction is carried out on the delta table. This creates a new JSON file in the _delta_log folder named 000001.json, and a new Parquet file, part-0002.parquet, is added to the delta table's directory. Upon checking the new JSON file, we find that it references part-0001.parquet and part-0002.parquet but omits part-0000.parquet.
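To see these add and remove actions for yourself, the commit files can be read like any other JSON. This is only a sketch: it assumes the delta table written at dbfs:/airlines/ in the next section, and note that Delta zero-pads the real file names to 20 digits rather than the shortened names used in the example above:

    from pyspark.sql.functions import input_file_name

    # Read every commit file in the transaction log
    log = spark.read.json("dbfs:/airlines/_delta_log/*.json")

    # Each commit records actions such as add (Parquet files belonging to the
    # new version), remove (files dropped from it), and commitInfo (the
    # operation that produced the commit)
    log.select(input_file_name().alias("commit_file"),
               "commitInfo.operation",
               "add.path").show(truncate=False)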

Delta Lake in action

Let's start by creating a Spark DataFrame by reading a CSV file. Create a new Databricks Python notebook and spin up a Spark cluster of the Standard type with one driver, one worker, and autoscaling disabled. Every code block in the following section must be executed in a new notebook cell:

  1. We will be using the airlines dataset from the databricks-datasets repository. Databricks provides many sample datasets in every workspace. These are part of the databricks-datasets directory of the DBFS. The following code block creates a new Spark DataFrame by specifying the first row as the header, automatically inferring the schema, and reading from a CSV file. Once the DataFrame is created, we will display the first five rows:
    airlines = (spark.read
                .option("header", True)
                .option("inferSchema", True)
                .option("delimiter", ",")
                .csv("dbfs:/databricks-datasets/airlines/part-00000"))
    # View the first five rows of the DataFrame
    display(airlines.limit(5))
  2. Next, we will write the DataFrame as a delta file in the DBFS. Once the writing process is complete, we can look at the contents of the delta file. It contains four Parquet files and a _delta_log folder:
    airlines.write.mode("overwrite").format("delta").save("dbfs:/airlines/")

    We can view the location where the data is written in delta format:

    display(dbutils.fs.ls("dbfs:/airlines/"))
  3. Inside the _delta_log folder, we can find one JSON file:
    display(dbutils.fs.ls("dbfs:/airlines/_delta_log/"))
  4. Now, we will create a delta table using the delta file that is written to Azure Blob Storage. Here, we will switch from PySpark to Spark SQL syntax using the %sql magic command. The name of the delta table created is airlines_delta_table. A count operation on the newly created delta table returns the number of records in the table:
    %sql
    DROP TABLE IF EXISTS airlines_delta_table;
    CREATE TABLE airlines_delta_table USING DELTA LOCATION "dbfs:/airlines/";
    %sql
    SELECT COUNT(*) as count FROM airlines_delta_table
  5. Let's perform a DELETE operation on the delta table. This will delete all the rows where the Month column equals 10. This deletes 448,620 rows from the delta table:
    %sql
    DELETE FROM airlines_delta_table WHERE Month = '10'
  6. Next, we will perform an UPDATE operation on the delta table. This transaction will update the Dest column and replace all the SFO values with San Francisco. We can also see that 7,575 rows received updates in the table:
    %sql
    UPDATE airlines_delta_table SET Dest = 'San Francisco' WHERE Dest = 'SFO'
  7. Before we move forward, let's look at the Parquet files and transactional logs folder once again. Inside the delta file, we can see that more Parquet files have been added after two transactions (DELETE and UPDATE) were carried out:
    display(dbutils.fs.ls("dbfs:/airlines/"))
  8. Also, the _delta_log folder now contains two more JSON files, one for each transaction:
    display(dbutils.fs.ls("dbfs:/airlines/_delta_log/"))
  9. Finally, it is time for time travel! Running the DESCRIBE HISTORY command on the delta table returns a list of all the versions of the table:
    %sql
    -- Time travel
    DESCRIBE HISTORY airlines_delta_table
  10. Switching to a previous version is as easy as adding VERSION AS OF to the table reference in a query. First, we'll query the data based on the condition that got updated. For instance, after the update operation, no record should have the SFO value. Hence, we get a count of 0:
    %sql
    -- Return count of rows where Dest = 'SFO' in current version that is version 2
    SELECT COUNT(*) FROM airlines_delta_table WHERE Dest = 'SFO'
  11. But when the same query is run on the previous version of the delta table (version 1), we get a count of 7,575. This is because the query now runs against the data as it existed before the update operation (a PySpark equivalent is sketched after this list):
    %sql
    -- Return count of rows where Dest = 'SFO' in version 1
    SELECT COUNT(*) FROM airlines_delta_table VERSION AS OF 1 WHERE Dest = 'SFO'
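The same time travel is available from PySpark as a reader option; the following is a sketch using the table written earlier in this walkthrough:

    # Load version 1 of the table and repeat the count from the SQL query above
    airlines_v1 = (spark.read
                   .format("delta")
                   .option("versionAsOf", 1)
                   .load("dbfs:/airlines/"))
    print(airlines_v1.filter("Dest = 'SFO'").count())  # the count reported above: 7,575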

Let's recap what we've covered in this first chapter.