Machine Learning Engineering with Python - Second Edition

By : Andrew P. McMahon

Overview of this book

The Second Edition of Machine Learning Engineering with Python is the practical guide that MLOps and ML engineers need to build solutions to real-world problems. It will provide you with the skills you need to stay ahead in this rapidly evolving field. The book takes an examples-based approach to help you develop your skills and covers the technical concepts, implementation patterns, and development methodologies you need. You'll explore the key steps of the ML development lifecycle and create your own standardized "model factory" for training and retraining of models. You'll learn to employ concepts like CI/CD and how to detect different types of drift. Get hands-on with the latest in deployment architectures and discover methods for scaling up your solutions. This edition goes deeper in all aspects of ML engineering and MLOps, with emphasis on the latest open-source and cloud-based technologies. This includes a completely revamped approach to advanced pipelining and orchestration techniques. With a new chapter on deep learning, generative AI, and LLMOps, you will learn to use tools like LangChain, PyTorch, and Hugging Face to leverage LLMs for supercharged analysis. You will explore AI assistants like GitHub Copilot to become more productive, then dive deep into the engineering considerations of working with deep learning.

Building general pipelines with Airflow

In Chapter 4, Packaging Up, we discussed the benefits of writing our ML code as pipelines and how to implement some basic ML pipelines using tools such as sklearn and Spark ML. Those pipelines were a convenient way of streamlining your code, making several processes available within a single object to simplify an application. However, everything we discussed then was focused on a single Python file and could not easily be extended beyond the confines of the package we were using. With those techniques it would be very difficult, for example, to create pipelines where each step used a different package, or where the steps were entirely different programs. They also did not allow us to build much sophistication into our data flows or application logic: if one of the steps failed, the pipeline failed, and that was that.

The tools we are about to discuss take these concepts to the next level. They allow you to manage the workflows of your ML solutions so that you can organize, coordinate, and orchestrate elements with the appropriate level of complexity to get the job done.


Apache Airflow is a workflow management tool that was initially developed by Airbnb in the 2010s and has been open source since its inception. It gives data scientists, data engineers, and ML engineers the ability to create complex pipelines programmatically through Python scripts. Airflow’s task management is based on defining and then executing a Directed Acyclic Graph (DAG) whose nodes are the tasks to be run. DAGs are also used in TensorFlow and Spark, so you may have come across them before.
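To make the DAG idea concrete, here is a minimal sketch that uses only the Python standard library (graphlib, available from Python 3.9) rather than Airflow itself. The task names are illustrative and simply mirror the kind of pipeline built later in this section. It shows how a valid run order falls out of the dependency graph, and why the graph must be acyclic.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# The task names are illustrative only.
dependencies = {
    "get_data": set(),
    "train_model": {"get_data"},
    "persist_model": {"train_model"},
}

# A topological sort yields an order in which every task runs after
# all of the tasks it depends on.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)

# Introducing a cycle makes it impossible to find a valid order.
cyclic = {**dependencies, "get_data": {"persist_model"}}
try:
    list(TopologicalSorter(cyclic).static_order())
    has_cycle = False
except CycleError:
    has_cycle = True
print(has_cycle)
```

This is only a model of the scheduling idea. Airflow adds scheduling, retries, logging, and distributed execution on top of the same graph structure.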

Airflow contains a variety of default operators to allow you to define DAGs that can call and use multiple components as tasks, without caring about the specific details of a task. It also provides functionality for scheduling your pipelines. As an example, let’s build an Apache Airflow pipeline that will get data, perform some feature engineering, train a model, and then persist the model. We won’t cover the detailed implementation of each command, but simply show you how your ML processes hang together in an Airflow DAG. In Chapter 9, Building an Extract, Transform, Machine Learning Use Case, we will build out a detailed end-to-end example discussing these lower-level details. This first example is more concerned with understanding the high level of how to write, deploy, and manage your DAGs in the cloud:

  1. First, in the Python file that will define the DAG, we import the relevant Airflow packages and any utility packages we need:
    import datetime
    from datetime import timedelta
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.dates import days_ago
  2. Next, Airflow allows you to define default arguments that are applied to all of the following tasks, with the option to override them at the task level:
    default_args = {
        'owner': 'Andrew McMahon',
        'depends_on_past': False,
        'start_date': days_ago(31),
        'email': ['[email protected]'],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=2)
    }
  3. We then have to instantiate our DAG and provide the relevant metadata, including our scheduling interval:
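    with DAG(
        dag_id='classification_pipeline',  # placeholder name; the original value is not shown
        default_args=default_args,
        start_date=datetime.datetime(2021, 10, 1),
        schedule_interval='@daily',  # placeholder schedule; choose one that suits your pipeline
    ) as dag: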
  4. Then, all that is required is to define your tasks within the DAG definition. First, we define an initial task that gets our dataset. This next piece of code assumes there is a Python callable, for example, a function or class method, called get_data that we can pass to the task. This could have been imported from any submodule or package we want. Note that steps 4-6 assume we are inside the code block of the DAG instantiation, so we assume another indent that we don’t show here to save space:
        # task_id is a placeholder label; the original value is not shown
        get_data_task = PythonOperator(task_id='get_data', python_callable=get_data)
  5. We then define a task that takes this data and performs our model training steps. This task could, for example, encapsulate one of the pipeline types we covered in Chapter 3, From Model to Model Factory, such as a Spark ML pipeline, scikit-learn pipeline, or any other ML training pipeline we looked at. Again, we assume there is a Python callable called train_model that can be used in this step:
        # task_id is a placeholder label; the original value is not shown
        train_model_task = PythonOperator(task_id='train_model', python_callable=train_model)
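  6. The final step of this process is a placeholder for taking the resultant trained model and persisting it to our storage layer. This means that other services or pipelines can use this model for prediction:
        # persist_model is a hypothetical callable and task_id a placeholder label
        persist_model_task = PythonOperator(task_id='persist_model', python_callable=persist_model)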
  7. Finally, we define the running order of the task nodes that we have defined in the DAG using the >> operator. The tasks above could have been defined in any order, but the following syntax stipulates how they must run:
    get_data_task >> train_model_task >> persist_model_task
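The steps above assume that callables such as get_data and train_model already exist. As a rough, standard-library-only sketch of what such placeholders might look like, the functions below hand data to one another through files in a shared working directory. Everything here is hypothetical: the file names, the toy dataset, the trivial threshold "model", and the persist_model function are illustrations, not the implementations used in the book.

```python
import csv
import json
import tempfile
from pathlib import Path

# Hypothetical shared working directory; a real pipeline would more likely
# use object storage such as an S3 bucket.
WORK_DIR = Path(tempfile.mkdtemp())
DATA_PATH = WORK_DIR / "data.csv"
CANDIDATE_PATH = WORK_DIR / "candidate_model.json"
MODEL_PATH = WORK_DIR / "model.json"


def get_data() -> None:
    """Write a tiny toy dataset of (feature, label) rows to disk."""
    rows = [(1.0, 0), (2.0, 0), (8.0, 1), (9.0, 1)]
    with DATA_PATH.open("w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(["feature", "label"])
        writer.writerows(rows)


def train_model() -> None:
    """Fit a one-parameter threshold classifier: the midpoint of the class means."""
    with DATA_PATH.open(newline="") as f:
        rows = [(float(r["feature"]), int(r["label"])) for r in csv.DictReader(f)]
    class_0 = [x for x, y in rows if y == 0]
    class_1 = [x for x, y in rows if y == 1]
    threshold = (sum(class_0) / len(class_0) + sum(class_1) / len(class_1)) / 2
    CANDIDATE_PATH.write_text(json.dumps({"threshold": threshold}))


def persist_model() -> None:
    """Promote the trained model to the location other services read from."""
    CANDIDATE_PATH.replace(MODEL_PATH)


# Run the callables in the order the DAG stipulates.
for task in (get_data, train_model, persist_model):
    task()

print(json.loads(MODEL_PATH.read_text()))
```

Passing data between tasks through storage rather than in memory is a design choice that suits orchestrators, because each task may run in a separate process or on a separate worker.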

In the next sections, we will briefly cover how to set up an Airflow pipeline on AWS using the Managed Workflows for Apache Airflow (MWAA) service. The section after will then show how you can use CI/CD principles to continuously develop and update your Airflow solutions. This will bring together some of the setup and work we have been doing in previous chapters of the book.

Airflow on AWS

AWS provides a cloud-hosted service called Managed Workflows for Apache Airflow (MWAA) that allows you to deploy and host your Airflow pipelines easily and robustly. Here, we will briefly cover how to do this.

Complete the following steps:

  1. Select Create an environment on the MWAA landing page. You can find this by searching for MWAA in the AWS Management Console.
  2. You will then be provided with a screen asking for the details of your new Airflow environment. Figure 5.24 shows the high-level steps that the website takes you through:
    Figure 5.24: The high-level steps for setting up an MWAA environment and associated managed Airflow runs.

    Environment details, as shown in Figure 5.25, is where we specify our environment name. Here, we have called it mlewp2-airflow-dev-env:

    Figure 5.25: Naming your MWAA environment.

  3. For MWAA to run, it needs to be able to access code defining the DAG and any associated requirements or plugin files. The system then asks for an AWS S3 bucket where these pieces of code and configuration reside. In this example, we create a bucket called mlewp2-ch5-airflow-example that will contain these pieces. Figure 5.26 shows the creation of the bucket:

    Figure 5.26: The successful creation of our AWS S3 bucket for storing our Airflow code and supporting configuration elements.

    Figure 5.27 shows how we point MWAA to the correct bucket, folders, and plugins or requirement files if we have them too:

    Figure 5.27: We reference the bucket we created in the previous step in the configuration of the MWAA instance.

  4. We then have to define the configuration of the network that the managed instance of Airflow will use, similar to the other AWS examples in this chapter. This can get a bit confusing if you are new to networking, so it might be good to read around the topics of subnets, IP addresses, and VPCs. Creating a new MWAA VPC is the easiest approach for getting started in terms of networking here, but your organization will have networking specialists who can help you use the appropriate settings for your situation. We will go with this simplest route and click Create MWAA VPC, which opens a new window where we can quickly spin up a new VPC and network setup based on a standard stack definition provided by AWS. You will be asked for a stack name. I have called mine MLEWP-2-MWAA-VPC. The networking information will be populated with something like that shown in Figure 5.28:

    Figure 5.28: An example stack template for creating your new VPC.

  5. We are then taken to a page where we are asked for more details on networking. We can select Public network (No additional setup) for this example as we will not be too concerned with creating an organizationally aligned security model. For deployments in an organization, work with your security team to understand what additional security you need to put in place. We can also select Create new security group. This is shown in Figure 5.29.

    Figure 5.29: Finalizing the networking for our MWAA setup.

  6. Next, we have to define the Environment class that we want to spin up. Currently, there are three options. Here, we’ll use the smallest, but you can choose the environment that best suits your needs (always ask the billpayer’s permission!). Figure 5.30 shows that we can select the mw1.small environment class with a min to max worker count of 1-10. MWAA does allow you to change the environment class after instantiating if you need to, so it can often be better to start small and scale up as needed from a cost point of view. You will also be asked about the number of schedulers you want for the environment. Let’s leave this as the default, 2, for now, but you can go up to 5.

    Figure 5.30: Selecting an environment class and worker sizes.

  7. Now, if desired, we confirm some optional configuration parameters (or leave these blank, as done here) and confirm that we are happy for AWS to create and use a new execution role. We can also just proceed with the default monitoring settings. Figure 5.31 shows an example of this (and don’t worry, the security group will have long been deleted by the time you are reading this page!):

    Figure 5.31: The creation of the execution role used by AWS for the MWAA environment.

  8. The next page will supply you with a final summary before allowing you to create your MWAA environment. Once you do this, you will be able to see your newly created environment in the MWAA service, as in Figure 5.32. This process can take some time, and for this example it took around 30 minutes:

    Figure 5.32: Our newly minted MWAA environment.

Now that you have this MWAA environment and you have supplied your DAG to the S3 bucket that it points to, you can open the Airflow UI and see the scheduled jobs defined by your DAG. You have now deployed a basic running service that we can build upon in later work.

Next, we want to see the DAGs in the Airflow UI so that we can orchestrate and monitor the jobs. To do this, you may need to configure access for your own account to the MWAA UI using the details outlined on the AWS documentation pages. As a quick summary, you need to go to the IAM service on AWS. You will need to be logged in as a root user, and then create a new policy titled AmazonMWAAWebServerAccess. Give this policy the following JSON body:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "airflow:CreateWebLoginToken",
                "Resource": [
                    "arn:aws:airflow:{your-region}:YOUR_ACCOUNT_ID:role/{your-environment-name}/{airflow-role}"
                ]
            }
        ]
    }

For this definition, the Airflow role refers to one of the five roles of Admin, Op, Viewer, User, or Public, as defined in the Airflow documentation. I have used the Admin role for this example. If you add this policy to the permissions of your account, you should be able to access the Airflow UI by clicking the Open Airflow UI button in the MWAA service. You will then be directed to the Airflow UI, as shown in Figure 5.33.

Figure 5.33: The Airflow UI accessed via the AWS MWAA service. This view shows the classification DAG that we wrote earlier in the example.
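If you need to create the web login policy for several environments or roles, it can be less error-prone to generate the JSON body than to edit the placeholders by hand. The following is a minimal sketch of that idea. The function name build_web_login_policy is hypothetical, and the region and account ID passed in are made-up placeholder values. Only the environment name and the Admin role come from the example above.

```python
import json

# The five Airflow roles named in the text.
AIRFLOW_ROLES = {"Admin", "Op", "Viewer", "User", "Public"}


def build_web_login_policy(region: str, account_id: str,
                           environment_name: str, airflow_role: str) -> dict:
    """Return the policy body with the placeholders filled in."""
    if airflow_role not in AIRFLOW_ROLES:
        raise ValueError(f"Unknown Airflow role: {airflow_role!r}")
    resource = (
        f"arn:aws:airflow:{region}:{account_id}:"
        f"role/{environment_name}/{airflow_role}"
    )
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": "airflow:CreateWebLoginToken",
                "Resource": [resource],
            }
        ],
    }


# Placeholder region and account ID, for illustration only.
policy = build_web_login_policy(
    region="eu-west-1",
    account_id="123456789012",
    environment_name="mlewp2-airflow-dev-env",
    airflow_role="Admin",
)
print(json.dumps(policy, indent=4))
```

The printed JSON can then be pasted into the policy editor in the IAM console.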

The Airflow UI allows you to trigger DAG runs, manage the jobs that you have scheduled, and monitor and troubleshoot your pipelines. As an example, upon a successful run, you can see summary information for the runs, as shown in Figure 5.34, and can use the different views to understand the time taken for each of the pipeline steps and diagnose where any issues have arisen if there are errors raised.

Figure 5.34: Example run summary for our simple classification DAG in the Airflow UI.

The pipeline we have built and run in this example is obviously very simple, with only core Python functionality being used. If you want to leverage other AWS services, for example, by submitting a Spark job to an EMR cluster, then you will need to configure further access policies like the one we did above for the UI access. This is covered in the MWAA documentation.


Once you have created this MWAA environment, it costs a small amount to run per hour (around 0.5 USD per hour for the environment configuration above). MWAA does not currently contain a feature for pausing and resuming an environment, so to stop incurring costs you will have to delete the environment and re-instantiate a new one with the same configuration when required. This can be automated using tools such as Terraform or AWS CloudFormation, which we will not cover here. So, a word of warning: DO NOT ACCIDENTALLY LEAVE YOUR ENVIRONMENT RUNNING. For example, definitely do not leave it running for a week, like I may or may not have done.
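To put that warning into numbers, here is a quick back-of-the-envelope calculation. It assumes the approximate rate of 0.5 USD per hour quoted above and ignores any other charges, so treat it as a rough estimate and check current AWS pricing for your own configuration.

```python
HOURLY_RATE_USD = 0.5  # approximate figure quoted in the text; check current pricing


def running_cost(hours: float, hourly_rate: float = HOURLY_RATE_USD) -> float:
    """Estimated cost in USD of leaving the environment up for a number of hours."""
    return hours * hourly_rate


week_cost = running_cost(24 * 7)    # one forgotten week
month_cost = running_cost(24 * 30)  # one 30-day month

print(f"One week: {week_cost:.2f} USD")
print(f"One month: {month_cost:.2f} USD")
```

At this rate, a forgotten week comes to roughly 84 USD and a full month to roughly 360 USD.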

Revisiting CI/CD for Airflow

We introduced the basics of CI/CD in Chapter 2, The Machine Learning Development Process, and discussed how this can be achieved by using GitHub Actions. We will now take this a step further and start to set up CI/CD pipelines that deploy code to the cloud.

First, we will start with an important example where we will push some code to an AWS S3 bucket. This can be done by creating a .yml file called aws-s3-deploy.yml in your GitHub repo under the .github/workflows directory. This will be the nucleus around which we will form our CI/CD pipeline.

The .yml file, in our case, will upload the Airflow DAG and contain the following pieces:

  1. We name the process using the syntax for name and express that we want the deployment process to be triggered on a push to the main branch or a pull request to the main branch:
    name: Upload DAGS to S3
    on:
      push:
        branches: [ main ]
      pull_request:
        branches: [ main ]
  2. We then define the jobs we want to occur during the deployment process. In this case, we want to upload our DAG files to an S3 bucket we have already created, and we want to use the appropriate AWS credentials we have configured in our GitHub secrets store:
    jobs:
      deploy:  # job id is a placeholder; the original value is not shown
        name: Upload DAGS to Amazon S3
        runs-on: ubuntu-latest
        steps:
          - name: Checkout
            uses: actions/checkout@v2
          - name: Configure AWS credentials from account
            uses: aws-actions/configure-aws-credentials@v1
            with:
              aws-access-key-id: ${{ secrets.AWS_ACCESS_KEY_ID }}
              aws-secret-access-key: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
              aws-region: us-east-1

    Then, as part of the job, we run the step that copies the relevant files to our specified AWS S3 bucket. In this case, we are also specifying some details about how to make the copy using the AWS CLI. Specifically, here we want to copy over all the Python files in the dags folder of the repo:

          - name: Copy files to bucket with the AWS CLI
            run: |
              # --include only takes effect after an --exclude filter
              aws s3 cp ./dags s3://github-actions-ci-cd-tests \
                --recursive --exclude "*" --include "*.py"
  3. Once we perform a git push command with updated code, this will execute the action and push the DAG Python code to the specified S3 bucket. In the GitHub UI, you will be able to see something like Figure 5.35 on a successful run:
    Figure 5.35: A successful CI/CD process run via GitHub Actions and using the AWS CLI.
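Before relying on the workflow, it can be useful to preview locally which files a copy of the Python files in the dags folder would place in the bucket. The sketch below is a dry run that uses only the standard library and does not call AWS at all. The function name plan_dag_upload and the file names in the demonstration are hypothetical. The bucket name is the one used in the workflow above. It assumes that a recursive copy of a folder keeps each file's path relative to that folder as its object key.

```python
import tempfile
from pathlib import Path


def plan_dag_upload(dags_dir, bucket):
    """Return (local_path, s3_uri) pairs for every .py file under dags_dir."""
    root = Path(dags_dir)
    return [
        (str(path), f"s3://{bucket}/{path.relative_to(root).as_posix()}")
        for path in sorted(root.rglob("*.py"))
    ]


# Demonstrate on a throwaway directory with hypothetical file names.
with tempfile.TemporaryDirectory() as tmp:
    dags = Path(tmp) / "dags"
    (dags / "utils").mkdir(parents=True)
    (dags / "example_dag.py").write_text("# DAG definition\n")
    (dags / "utils" / "helpers.py").write_text("# helper functions\n")
    (dags / "notes.txt").write_text("not a Python file, so not planned for upload\n")
    plan = plan_dag_upload(dags, "github-actions-ci-cd-tests")

for local_path, s3_uri in plan:
    print(f"{local_path} -> {s3_uri}")
```

A check like this could also run as an earlier step in the same workflow, so that an unexpectedly empty plan fails the build before anything is copied.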

This process then allows you to successfully push new updates to your Airflow service into AWS to be run by your MWAA instance. This is real CI/CD and allows you to continually update the service you are providing without downtime.