Big Data Analysis with Python

By: Ivan Marin, Ankit Shukla, Sarang VK

Overview of this book

Processing big data in real time is challenging due to scalability, information inconsistency, and fault tolerance. Big Data Analysis with Python teaches you how to use tools that can control this data avalanche for you. With this book, you'll learn practical techniques to aggregate data into useful dimensions for posterior analysis, extract statistical measurements, and transform datasets into features for other systems. The book begins with an introduction to data manipulation in Python using pandas. You'll then get familiar with statistical analysis and plotting techniques. With multiple hands-on activities in store, you'll be able to analyze data that is distributed on several computers by using Dask. As you progress, you'll study how to aggregate data for plots when the entire data cannot be accommodated in memory. You'll also explore Hadoop (HDFS and YARN), which will help you tackle larger datasets. The book also covers Spark and explains how it interacts with other tools. By the end of this book, you'll be able to bootstrap your own Python environment, process large files, and manipulate data to generate statistics, metrics, and graphs.

Chapter 04: Diving Deeper with Spark


Activity 9: Getting Started with Spark DataFrames

If you are using Google Colab to run the Jupyter notebook, add these lines to set up the environment:

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://www-us.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
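
The archive that is downloaded and the SPARK_HOME path must refer to the same Spark release, so one option is to derive both from a single version string. The following is a minimal sketch and not part of the original activity; the helper name spark_paths is hypothetical, and the /content base directory is assumed to be the Colab working directory:

```python
import os


def spark_paths(spark_version, hadoop_version, base_dir="/content"):
    """Return (archive_name, spark_home) for a single Spark release."""
    stem = f"spark-{spark_version}-bin-hadoop{hadoop_version}"
    return f"{stem}.tgz", f"{base_dir}/{stem}"


# Both names come from the same version strings, so they cannot disagree
archive_name, spark_home = spark_paths("2.4.0", "2.7")
os.environ["SPARK_HOME"] = spark_home

print(archive_name)
print(spark_home)
```

Changing the version in one place then updates both the archive name and the environment variable.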

If findspark is not already installed, install it using the following command:

pip install -q findspark
  1. To create a sample DataFrame by manually specifying the schema, first import the findspark module to connect Jupyter with Spark:

    import findspark
    findspark.init()
    import pyspark
    import os
  2. Create the SparkContext and SQLContext, and then build the sample DataFrame from Row objects:

    sc = pyspark.SparkContext()
    from pyspark.sql import SQLContext
    sqlc = SQLContext(sc)
    
    from pyspark.sql import Row
    na_schema = Row("Name","Subject","Marks")
    row1 = na_schema("Ankit", "Science",95)
    row2 = na_schema("Ankit", "Maths", 86)
    row3 = na_schema("Preity", "Maths", 92)
    na_list = [row1, row2, row3]
    df_na = sqlc.createDataFrame(na_list)
    type(df_na)

    The output is as follows:

    pyspark.sql.dataframe.DataFrame
  3. Check the DataFrame using the following command:

    df_na.show()

    The output is as follows:

    Figure 4.29: Sample DataFrame

  4. Create a sample DataFrame from an existing RDD. First, create the RDD as illustrated here:

    data = [("Ankit","Science",95),("Preity","Maths",86),("Ankit","Maths",86)]
    data_rdd = sc.parallelize(data)
    type(data_rdd)

    The output is as follows:

    pyspark.rdd.RDD
  5. Convert the RDD to a DataFrame using the following command:

    data_df = sqlc.createDataFrame(data_rdd)
    data_df.show()

    The output is as follows:

    Figure 4.30: RDD to DataFrame

  6. Create a sample DataFrame by reading the data from a CSV file:

    df = sqlc.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('mtcars.csv')
    type(df)

    The output is as follows:

    pyspark.sql.dataframe.DataFrame
  7. Print the first seven rows of the DataFrame:

    df.show(7)

    The output is as follows:

    Figure 4.31: First seven rows of the DataFrame

  8. Print the schema of the DataFrame:

    df.printSchema()

    The output is as follows:

    Figure 4.32: Schema of the DataFrame

  9. Print the number of rows and columns in the DataFrame:

    print('number of rows:'+ str(df.count()))
    print('number of columns:'+ str(len(df.columns)))

    The output is as follows:

    number of rows:32
    number of columns:11
  10. Print the summary statistics of the DataFrame and of any two individual columns:

    df.describe().show()

    The output is as follows:

    Figure 4.33: Summary statistics of DataFrame

    Print the summary of any two columns:

    df.describe(['mpg','cyl']).show()

    The output is as follows:

    Figure 4.34: Summary statistics of mpg and cyl columns

  11. Write the first seven rows of the sample DataFrame to a CSV file:

    df_p = df.toPandas()
    df_p.head(7).to_csv("mtcars_head.csv")
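
To make the describe() output above easier to interpret, the sketch below recomputes the same five statistics (count, mean, stddev, min, max) for one numeric column in plain Python. It is an illustration only: the values list is a small hypothetical sample rather than data read from mtcars.csv, and the helper describe_column is not part of PySpark. The stddev that Spark reports is the sample standard deviation, which corresponds to statistics.stdev here.

```python
import statistics


def describe_column(values):
    """Return the five statistics that describe() reports for a numeric column."""
    return {
        "count": len(values),
        "mean": statistics.mean(values),
        "stddev": statistics.stdev(values),  # sample standard deviation (divides by n - 1)
        "min": min(values),
        "max": max(values),
    }


# Hypothetical sample values, not read from mtcars.csv
values = [21.0, 22.8, 18.7, 14.3, 24.4]
summary = describe_column(values)

for name, value in summary.items():
    print(f"{name}: {value}")
```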

Activity 10: Data Manipulation with Spark DataFrames

  1. Install the packages as illustrated here:

    !apt-get install openjdk-8-jdk-headless -qq > /dev/null
    !wget -q http://www-us.apache.org/dist/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
    !tar xf spark-2.4.0-bin-hadoop2.7.tgz
    !pip install -q findspark
    import os
    os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
    os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
  2. Then, import the findspark module to connect Jupyter with Spark using the following command:

    import findspark
    findspark.init()
    import pyspark
    import os
  3. Now, create the SparkContext and SQLContext as illustrated here:

    sc = pyspark.SparkContext()
    from pyspark.sql import SQLContext
    sqlc = SQLContext(sc)
  4. Create a DataFrame in Spark as illustrated here:

    df = sqlc.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('mtcars.csv')
    df.show(4)

    The output is as follows:

    Figure 4.35: DataFrame in Spark

  5. Rename any five columns of the DataFrame using the following command:

    data = df
    new_names = ['mpg_new', 'cyl_new', 'disp_new', 'hp_new', 'drat_new']
    for i,z in zip(data.columns[0:5],new_names):
        data = data.withColumnRenamed(str(i),str(z))
        
    data.columns

    The output is as follows:

    Figure 4.36: Columns of DataFrame

  6. Select any two numeric and one categorical column from the DataFrame:

    data = df.select(['cyl','mpg','hp'])
    data.show(5)

    The output is as follows:

    Figure 4.37: Two numeric and one categorical column from the DataFrame

  7. Count the number of distinct categories in the categorical variable:

    data.select('cyl').distinct().count() #3
  8. Create two new columns in the DataFrame by adding and multiplying the two numerical columns:

    data = data.withColumn('colsum',(df['mpg'] + df['hp']))
    data = data.withColumn('colproduct',(df['mpg'] * df['hp']))
    data.show(5)

    The output is as follows:

    Figure 4.38: New columns in DataFrame

  9. Drop both the original numerical columns:

    data = data.drop('mpg','hp')
    data.show(5)

    The output is as follows:

    Figure 4.39: New columns in DataFrame after dropping

  10. Sort the data by the categorical column:

    data = data.orderBy(data.cyl)
    data.show(5)

    The output is as follows:

    Figure 4.40: Sort data by categorical columns

  11. Calculate the mean of the summation column for each distinct category in the categorical variable:

    data.groupby('cyl').agg({'colsum':'mean'}).show()

    The output is as follows:

    Figure 4.41: Mean of the summation column

  12. Filter the rows whose colsum value is greater than the mean of the per-category means calculated in the previous step:

    data.count()  # 32 rows before filtering
    cyl_avg = data.groupby('cyl').agg({'colsum':'mean'})
    avg = cyl_avg.agg({'avg(colsum)':'mean'}).toPandas().iloc[0,0]
    data = data.filter(data.colsum > avg)
    data.count()  # 15 rows after filtering
    data.show(5)

    The output is as follows:

    Figure 4.42: Mean of all the mean values calculated of the summation column

  13. De-duplicate the resultant DataFrame to make sure it has all unique records:

    data = data.dropDuplicates()
    data.count()

    The output is 15.
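
The grouping and filtering logic at the end of this activity can be easier to follow on a tiny example. The sketch below uses plain Python and a handful of hypothetical (cyl, colsum) rows, not the mtcars data, to show that the threshold is the unweighted mean of the per-category means. When categories have different sizes, this generally differs from the overall mean of colsum.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical rows: (cyl, colsum)
rows = [(4, 100.0), (4, 120.0), (6, 140.0), (8, 200.0), (8, 220.0), (8, 240.0)]

# Mean of colsum per category, analogous to groupby('cyl').agg({'colsum': 'mean'})
groups = defaultdict(list)
for cyl, colsum in rows:
    groups[cyl].append(colsum)
group_means = {cyl: mean(vals) for cyl, vals in groups.items()}

# Mean of the per-category means: each category counts once, whatever its size
threshold = mean(list(group_means.values()))

# Overall mean of colsum, computed only for comparison
overall_mean = mean(colsum for _, colsum in rows)

# Keep only the rows whose colsum exceeds the threshold
kept = [row for row in rows if row[1] > threshold]

print(group_means)
print(threshold, overall_mean)
print(kept)
```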

Activity 11: Graphs in Spark

  1. Import the required Python libraries in the Jupyter Notebook:

    import pandas as pd
    import os
    import matplotlib.pyplot as plt
    import seaborn as sns
    %matplotlib inline
  2. Read and show the data from the CSV file using the following command:

    df = pd.read_csv('mtcars.csv')
    df.head()

    The output is as follows:

    Figure 4.43: Auto-mpg DataFrame

  3. Visualize the discrete frequency distribution of any continuous numeric variable from your dataset using a histogram:

    plt.hist(df['mpg'], bins=20)
    plt.ylabel('Frequency')
    plt.xlabel('Values')
    plt.title('Frequency distribution of mpg')
    plt.show()

    The output is as follows:

    Figure 4.44: Discrete frequency distribution histogram

  4. Visualize the percentage share of the categories in the dataset using a pie chart:

    ## Calculate count of records for each gear, taking labels and counts from the same result
    data = df['gear'].value_counts().sort_index().rename_axis('gear').reset_index(name='gear_counts')
    
    ## Visualising percentage contribution of each gear in data using pie chart
    plt.pie(data.gear_counts, labels=data.gear, startangle=90, autopct='%.1f%%')
    plt.title('Percentage contribution of each gear')
    plt.show()

    The output is as follows:

    Figure 4.45: Percentage share of the categories using pie chart

  5. Plot the distribution of a continuous variable across the categories of a categorical variable using a boxplot:

    sns.boxplot(x = 'gear', y = 'disp', data = df)
    plt.show()

    The output is as follows:

    Figure 4.46: Distribution of a continuous variable using a boxplot

  6. Visualize the values of a continuous numeric variable using a line chart:

    data = df[['hp']]
    data.plot(linestyle='-')
    plt.title('Line Chart for hp')
    plt.ylabel('Values')
    plt.xlabel('Row number')
    plt.show()

    The output is as follows:

    Figure 4.47: Continuous numeric variable using a line chart

  7. Plot the values of multiple continuous numeric variables on the same line chart:

    data = df[['hp','disp', 'mpg']]
    data.plot(linestyle='-')
    plt.title('Line Chart for hp, disp & mpg')
    plt.ylabel('Values')
    plt.xlabel('Row number')
    plt.show()

    The output is as follows:

    Figure 4.48: Multiple continuous numeric variables
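
The percentages that the pie chart prints through autopct are each category's share of the total row count. A minimal plain-Python sketch of that calculation follows; the gears list is hypothetical sample data, not the gear column of mtcars.csv. Labels and counts are taken from the same Counter, so they stay aligned regardless of how the counts are ordered.

```python
from collections import Counter

# Hypothetical gear values, one per row
gears = [3, 4, 4, 3, 5, 3, 4, 3]

counts = Counter(gears)
total = sum(counts.values())

# Share of each category in percent, ordered by label
shares = {gear: 100.0 * n / total for gear, n in sorted(counts.items())}

for gear, pct in shares.items():
    print(f"gear {gear}: {pct:.1f}%")
```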