Essential PySpark for Scalable Data Analytics

By: Sreeram Nudurupati

Overview of this book

Apache Spark is a unified data analytics engine designed to process huge volumes of data quickly and efficiently. PySpark is Apache Spark's Python language API, which offers Python developers an easy-to-use scalable data analytics framework. Essential PySpark for Scalable Data Analytics starts by exploring the distributed computing paradigm and provides a high-level overview of Apache Spark. You'll begin your analytics journey with the data engineering process, learning how to perform data ingestion, cleansing, and integration at scale. This book helps you build real-time analytics pipelines that help you gain insights faster. You'll then discover methods for building cloud-based data lakes, and explore Delta Lake, which brings reliability to data lakes. The book also covers Data Lakehouse, an emerging paradigm, which combines the structure and performance of a data warehouse with the scalability of cloud-based data lakes. Later, you'll perform scalable data science and machine learning tasks using PySpark, such as data preparation, feature engineering, and model training and productionization. Finally, you'll learn ways to scale out standard Python ML libraries along with a new pandas API on top of PySpark called Koalas. By the end of this PySpark book, you'll be able to harness the power of PySpark to solve business problems.
Table of Contents (19 chapters)
Section 1: Data Engineering
Section 2: Data Science
Section 3: Data Analysis

Big data processing with Spark SQL and DataFrames

The Spark SQL engine supports two types of APIs, namely, DataFrame and Spark SQL. Being higher-level abstractions than RDDs, these are far more intuitive and even more expressive. They come with many more data transformation functions and utilities that you might already be familiar with as a data engineer, data analyst, or a data scientist.

Spark SQL and DataFrame APIs offer a low barrier to entry into big data processing. They let you apply your existing data analytics knowledge and skills to distributed computing, so you can start processing data at scale without having to deal with the complexities that typically come with distributed computing frameworks.

In this section, you will learn how to use both DataFrame and Spark SQL APIs to get started with your scalable data processing journey. The concepts covered here are used throughout the rest of this book.

Transforming data with Spark DataFrames

Starting with Apache Spark 1.3, the DataFrame API was added to the Spark SQL engine as a layer on top of the RDD API and has since expanded to every component of Spark, offering developers an easier-to-use and more familiar API. Over the years, the Spark SQL engine and its DataFrame and SQL APIs have grown more robust and have become the de facto, recommended standard for using Spark in general. Throughout this book, you will almost exclusively use either DataFrame operations or Spark SQL statements for your data processing needs, and you will rarely use the RDD API.

Think of a Spark DataFrame as a pandas DataFrame or a relational database table with rows and named columns. The key difference is that a Spark DataFrame resides in the memory of several machines instead of a single machine. The following diagram shows a Spark DataFrame with three columns distributed across three worker machines:

Figure 1.4 – A distributed DataFrame

Like an RDD, a Spark DataFrame is an immutable data structure. It consists of rows and named columns, where each individual column can be of any type. Additionally, DataFrames come with operations that allow you to manipulate data, and we generally refer to this set of operations as a domain-specific language (DSL). Spark DataFrame operations can be grouped into two main categories, namely, transformations and actions, which we will explore in the following sections.

One advantage of using DataFrames or Spark SQL over the RDD API is that the Spark SQL engine comes with a built-in query optimizer called Catalyst. The Catalyst optimizer analyzes user code, along with any available statistics on the data, to generate an efficient execution plan for the query. This query plan is then converted into Java bytecode, which runs natively inside the executor's JVM. This happens irrespective of the programming language used, so code processed via the Spark SQL engine performs comparably in most cases, whether it is written in Scala, Java, Python, R, or SQL.

Transformations

Transformations are operations performed on DataFrames that manipulate the data in the DataFrame and result in another DataFrame. Some examples of transformations are read, select, where, filter, join, and groupBy.

Actions

Actions are operations that actually cause a result to be calculated and either printed onto the console or, more practically, written back to a storage location. Some examples of actions include write, count, and show.

Lazy evaluation

Spark transformations are lazily evaluated, which means that transformations are not evaluated immediately as they are declared, and data is not manifested in memory until an action is called. This gives the Spark optimizer an opportunity to examine all of the transformations declared up to the point where an action is called and to generate an efficient plan of execution for your code.

The advantage of Lazy Evaluation coupled with Spark's Catalyst optimizer is that you can solely focus on expressing your data transformation logic and not worry too much about arranging your transformations in a specific order to get the best performance and efficiency out of your code. This helps you be more productive at your tasks and not become perplexed by the complexities of a new framework.
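As a rough analogy only, and not Spark code, plain Python generators show the same "declare now, run later" behavior. The function names and the `log` list below are made up for illustration. Unlike Spark, generators do not reorder or optimize the steps; the sketch only shows that nothing runs until the result is requested.

```python
log = []  # records when each step actually runs


def read_lines(lines):
    # Stands in for a data source; logs each line as it is produced
    for line in lines:
        log.append(f"read {line!r}")
        yield line


def to_upper(lines):
    # Stands in for a transformation; logs each line as it is processed
    for line in lines:
        log.append(f"upper {line!r}")
        yield line.upper()


# Declaring the pipeline runs nothing, because generators are lazy
pipeline = to_upper(read_lines(["a", "b"]))
steps_before = len(log)

# Consuming the pipeline plays the role of an action
result = list(pipeline)
steps_after = len(log)
```

Here `steps_before` is 0, because no step has run when the pipeline is declared, and the log fills up only once `list()` consumes the pipeline.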

Important note

Unlike pandas DataFrames, Spark DataFrames are not manifested in memory as soon as they are declared. They are only manifested in memory when an action is called. Similarly, DataFrame operations don't necessarily run in the order in which you specified them, as Spark's Catalyst optimizer generates an execution plan for you, sometimes even combining a few operations into a single unit.

Let's take the word count example that we previously implemented using the RDD API and try to implement it using the DataFrame DSL:

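from pyspark.sql.functions import split, explode

# Read the text file; each line becomes a row in a string column named "value"
linesDf = spark.read.text("/databricks-datasets/README.md")
# Split each line on spaces into an array column named "words"
wordListDf = linesDf.select(split("value", " ").alias("words"))
# Produce one row per array element in a column named "word"
wordsDf = wordListDf.select(explode("words").alias("word"))
# Group by word and count the rows in each group
wordCountDf = wordsDf.groupBy("word").count()
wordCountDf.show()  # action: prints a sample of the result
wordCountDf.write.csv("/tmp/wordcounts.csv")  # action: writes CSV output to storage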

In the previous code snippet, the following occurs:

  1. First, we import a few functions from the PySpark SQL function library, namely, split and explode.
  2. Then, we read text using the SparkSession read.text() method, which creates a DataFrame with one row per line in a single column, named value, of StringType.
  3. We then use the split() function to separate every line into its individual words; the result is a DataFrame with a single column, named words, in which each row holds a list of words.
  4. Then, we use the explode() function to separate the list of words in each row out to every word on a separate row; the result is a DataFrame with a column labeled word.
  5. Now we are finally ready to count our words, so we group our words by the word column and count individual occurrences of each word. The final result is a DataFrame of two columns, that is, the actual word and its count.
  6. We can view a sample of the result using the show() function and, finally, save our results in persistent storage using the DataFrame's write interface, in this case write.csv().

Can you guess which operations are actions? If you guessed show() or write.csv(), then you are correct. Every other function, including select() and groupBy(), is a transformation and will not trigger a Spark job.
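To make the intermediate shapes of the split, explode, and group-and-count steps concrete, here is a small trace in plain Python rather than Spark. The two input lines are invented for illustration, and the list and `Counter` operations only mimic what the corresponding DataFrame functions produce.

```python
from collections import Counter

# Hypothetical stand-in for the "value" column: one string per line
lines = ["to be or", "not to be"]

# split(): each row becomes a list of words (the "words" column)
word_lists = [line.split(" ") for line in lines]

# explode(): one row per list element (the "word" column)
words = [word for word_list in word_lists for word in word_list]

# groupBy("word").count(): one count per distinct word
word_counts = Counter(words)
```

After the split step there are still two rows, each holding a list; after the explode step there are six rows, one per word; the final step leaves one entry per distinct word.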

Note

Although the read() function is a transformation, you will sometimes notice that it actually executes a Spark job. The reason for this is that with certain structured and semi-structured data formats, Spark will try to infer the schema information from the underlying files and will process a small subset of the actual files to do this.

Using SQL on Spark

SQL is an expressive language for ad hoc data exploration and business intelligence types of queries. Because it is a very high-level declarative programming language, the user can simply focus on the input and output and what needs to be done to the data and not worry too much about the programming complexities of how to actually implement the logic. Apache Spark's SQL engine also has a SQL language API along with the DataFrame and Dataset APIs.

Spark 3.0 brings Spark SQL closer to the ANSI SQL standard, including an optional ANSI compliance mode, so if you are a data analyst who is familiar with another SQL-based platform, you should be able to get started with Spark SQL with minimal effort.

Since DataFrames and Spark SQL utilize the same underlying Spark SQL engine, they are completely interchangeable, and it is often the case that users intermix DataFrame DSL with Spark SQL statements for parts of the code that are expressed easily with SQL.

Now, let's rewrite our word count program using Spark SQL. First, we create a table that treats our text file as a CSV file with a white space as the delimiter, a neat trick intended to read each line of the text file and split each line into individual words all at once:

CREATE TABLE word_counts (word STRING)
USING csv
OPTIONS("delimiter"=" ")
LOCATION "/databricks-datasets/README.md"

Now that we have a table of a single column of words, we just need to GROUP BY the word column and do a COUNT() operation to get our word counts:

SELECT word, COUNT(word) AS count
FROM word_counts
GROUP BY word

Here, you can observe that solving the same business problem became progressively easier, from MapReduce to RDDs, and then to DataFrames and Spark SQL. With each new release, Apache Spark has been adding many higher-level programming abstractions, data transformation and utility functions, and other optimizations. The goal has been to enable data engineers, data scientists, and data analysts to focus their time and energy on solving the actual business problem at hand and not worry about complex programming abstractions or system architectures.

Apache Spark's major release, version 3, has many such enhancements that make the life of a data analytics professional much easier. We will discuss the most prominent of these enhancements in the following section.

What's new in Apache Spark 3.0?

There are many new and notable features in Apache Spark 3.0; however, only a few are mentioned here, which you will find very useful during the beginning phases of your data analytics journey:

  • Speed: Apache Spark 3.0 is significantly faster than its predecessors for many workloads. Third-party benchmarks have reported Spark 3.0 to be anywhere from 2 to 17 times faster for certain types of workloads.
  • Adaptive Query Execution: The Spark SQL engine generates a few logical and physical query execution plans based on user code and any previously collected statistics on the source data. Then, it tries to choose the best of these execution plans. However, Spark is sometimes unable to generate the best possible execution plan because the statistics are either stale or non-existent, leading to suboptimal performance. With adaptive query execution, Spark is able to dynamically adjust the execution plan at runtime, based on statistics gathered while the query runs, to improve query performance.
  • Dynamic Partition Pruning: Business intelligence systems and data warehouses follow a data modeling technique called dimensional modeling, where data is stored in a central fact table surrounded by a few dimension tables. Business intelligence queries against these dimensional models typically involve multiple joins between the dimension and fact tables, along with various filter conditions on the dimension tables. With dynamic partition pruning, Spark is able to skip fact table partitions based on the filters applied to these dimensions, resulting in less data being read into memory, which, in turn, results in better query performance.
  • Kubernetes Support: Earlier, we learned that Spark comes with its own Standalone Cluster Manager and can also work with other popular resource managers such as YARN and Mesos. Spark 3.0 also offers improved native support for Kubernetes, which is a popular open source platform for deploying and managing containerized applications.
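Adaptive query execution and dynamic partition pruning are both controlled through Spark SQL configuration properties. The sketch below collects the two relevant property names in a dictionary and applies them through a builder's config(key, value) method. The helper name apply_settings and the values chosen are illustrative assumptions; defaults differ between Spark versions (adaptive query execution, for instance, is disabled by default in Spark 3.0), so check the configuration reference for the version you run.

```python
# Spark SQL properties for two of the features listed above.
# The values are illustrative; defaults vary by Spark version.
spark3_settings = {
    # Adaptive Query Execution
    "spark.sql.adaptive.enabled": "true",
    # Dynamic Partition Pruning
    "spark.sql.optimizer.dynamicPartitionPruning.enabled": "true",
}


def apply_settings(builder, settings):
    """Apply each key/value pair through builder.config(key, value).

    Hypothetical helper: `builder` is expected to behave like
    SparkSession.builder, whose config() call returns the builder.
    """
    for key, value in settings.items():
        builder = builder.config(key, value)
    return builder


# Usage, assuming pyspark is installed:
#   from pyspark.sql import SparkSession
#   builder = SparkSession.builder.appName("demo")
#   spark = apply_settings(builder, spark3_settings).getOrCreate()
```

Setting the properties when the session is built keeps them in one place; they can also be changed on an existing session with spark.conf.set().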