Learning Apache Flink

By : Tanmay Deshpande

Overview of this book

With the advent of massive computer systems, organizations in different domains generate large amounts of data on a real-time basis. The latest entrant to big data processing, Apache Flink, is designed to process continuous streams of data at a lightning-fast pace.

This book will be your definitive guide to batch and stream data processing with Apache Flink. The book begins by introducing the Apache Flink ecosystem, setting it up, and using the DataSet and DataStream APIs for processing batch and streaming datasets. Bringing the power of SQL to Flink, the book then explores the Table API for querying and manipulating data. In the latter half of the book, readers learn the rest of the Apache Flink ecosystem to achieve complex tasks such as event processing, machine learning, and graph processing. The final part of the book covers topics such as scaling Flink solutions, performance optimization, and integrating Flink with other tools such as ElasticSearch.

Whether you want to dive deeper into Apache Flink, or want to investigate how to get more out of this powerful technology, you’ll find everything you need inside.

Cluster setup


Setting up a Flink cluster is also very simple. Those who have installed a Hadoop cluster before will find these steps familiar. To set up the cluster, let's assume we have four Linux machines, each with a moderate configuration. Machines with at least two cores and 4 GB of RAM are a good starting point.

The very first thing we need to do is choose the cluster design. As we have four machines, we will use one machine as the Job Manager and the other three machines as the Task Managers.

SSH configurations

In order to set up the cluster, we first need passwordless SSH connections from the Job Manager machine to the Task Managers. Perform the following step on the Job Manager machine to create an SSH key pair; the public key is then copied to authorized_keys on each Task Manager:

$ ssh-keygen

This will generate the public and private keys in the /home/flinkuser/.ssh folder. Now copy the public key to each Task Manager machine and perform the following steps on the Task Manager to allow passwordless connections from the Job Manager:

sudo mkdir -p /home/flinkuser/.ssh

sudo touch /home/flinkuser/.ssh/authorized_keys

# Assumes id_rsa.pub was copied into /home/flinkuser/.ssh
cd /home/flinkuser/.ssh

sudo sh -c "cat id_rsa.pub >> /home/flinkuser/.ssh/authorized_keys"

Make sure the keys have restricted access by executing the following commands:

sudo chmod 700 /home/flinkuser/.ssh
sudo chmod 600 /home/flinkuser/.ssh/authorized_keys

Now you can test the passwordless SSH connection from the Job Manager machine:

ssh <task-manager-1>
ssh <task-manager-2>
ssh <task-manager-3>
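
As an alternative to copying the key by hand, the standard OpenSSH helper ssh-copy-id can append the public key to authorized_keys on each Task Manager. The following is only a sketch under assumptions: the host names task-manager-1 to task-manager-3 and the DRY_RUN variable are hypothetical, and the remote user is assumed to be flinkuser. By default it only prints the commands it would run.

```shell
#!/bin/bash
# Hypothetical host names; replace them with your own Task Manager hosts.
TASK_MANAGERS="task-manager-1 task-manager-2 task-manager-3"

# Print (DRY_RUN=1, the default) or run ssh-copy-id for every Task Manager.
# ssh-copy-id appends the local public key to the remote authorized_keys file.
copy_key_to_task_managers() {
    local host
    for host in $TASK_MANAGERS; do
        if [ "${DRY_RUN:-1}" = "1" ]; then
            echo "ssh-copy-id flinkuser@${host}"
        else
            ssh-copy-id "flinkuser@${host}"
        fi
    done
}

copy_key_to_task_managers
```

Running the script with DRY_RUN=0 performs the copy and prompts once for the password on each host.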

Tip

If you are using cloud service instances for the installation, please make sure that root login over SSH is enabled. To do this, log in to each machine, open the file /etc/ssh/sshd_config, and set PermitRootLogin to yes. Once you have saved the file, restart the SSH service by executing the command: sudo service sshd restart

Java installation

Next we need to install Java on each machine. The following commands will help you install Java on Red Hat/CentOS-based Linux machines.

wget --no-check-certificate --no-cookies \
    --header "Cookie: oraclelicense=accept-securebackup-cookie" \
    http://download.oracle.com/otn-pub/java/jdk/8u92-b14/jdk-8u92-linux-x64.rpm
sudo rpm -ivh jdk-8u92-linux-x64.rpm

Next we need to set up the JAVA_HOME environment variable so that Java is accessible from everywhere.

Create a java.sh file:

sudo vi /etc/profile.d/java.sh

Add the following content to it and save the file:

#!/bin/bash
JAVA_HOME=/usr/java/jdk1.8.0_92
PATH=$JAVA_HOME/bin:$PATH
export PATH JAVA_HOME
export CLASSPATH=.

Make the file executable and source it:

sudo chmod +x /etc/profile.d/java.sh
source /etc/profile.d/java.sh

You can now check if Java is installed properly:

$ java -version
java version "1.8.0_92"
Java(TM) SE Runtime Environment (build 1.8.0_92-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.92-b14, mixed mode)

Repeat these installation steps on the Job Manager and Task Manager machines.
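
Since the same steps are repeated on every machine, a small check can confirm that each node ended up with a usable JAVA_HOME. This is a minimal sketch; the function name check_java_home is hypothetical and it only tests that an executable exists at bin/java under the given directory.

```shell
#!/bin/bash
# Return 0 if the given directory looks like a usable Java home,
# that is, it contains an executable bin/java.
check_java_home() {
    local java_home="$1"
    if [ -z "$java_home" ]; then
        echo "JAVA_HOME is not set"
        return 1
    fi
    if [ ! -x "$java_home/bin/java" ]; then
        echo "No executable found at $java_home/bin/java"
        return 1
    fi
    echo "Java found at $java_home/bin/java"
}

# Check the current shell's JAVA_HOME; "|| true" keeps the script going
# so that the message is printed even when the check fails.
check_java_home "${JAVA_HOME:-}" || true
```

On a correctly configured node, this should report the path under /usr/java/jdk1.8.0_92 set in java.sh.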

Flink installation

Once the SSH and Java installations are done, we need to download the Flink binaries and extract them into a specific folder. Please note that the installation directory should be the same on all nodes.

So let's get started:

cd /usr/local
sudo wget http://www-eu.apache.org/dist/flink/flink-1.1.4/flink-1.1.4-bin-hadoop27-scala_2.11.tgz
sudo tar -xzf flink-1.1.4-bin-hadoop27-scala_2.11.tgz
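
Because the installation directory must match on every node, a quick check like the one below could be run on each machine after extraction. It is a sketch under assumptions: the function name check_flink_install is hypothetical, and the directory /usr/local/flink-1.1.4 assumes the archive unpacks into a folder named flink-1.1.4.

```shell
#!/bin/bash
# Return 0 if the directory contains the Flink files used in this section.
check_flink_install() {
    local flink_dir="$1" path
    for path in bin/start-cluster.sh conf/flink-conf.yaml; do
        if [ ! -f "$flink_dir/$path" ]; then
            echo "Missing $flink_dir/$path"
            return 1
        fi
    done
    echo "Flink installation found in $flink_dir"
}

# Assumed extraction directory; adjust if your archive unpacks elsewhere.
check_flink_install "/usr/local/flink-1.1.4" || true
```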

Now that the binary is ready, we need to do some configurations.

Configurations

Flink's configuration is simple. We need to tune a few parameters and we are all set. Most of the settings are the same for the Job Manager node and the Task Manager nodes. All configuration is done in the conf/flink-conf.yaml file.

The following is a configuration file for a Job Manager node:

jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 1

You may want to change memory configurations for the Job Manager and Task Manager based on your node configurations. For the Task Manager, jobmanager.rpc.address should be populated with the correct Job Manager hostname or IP address.

So for all Task Managers, the configuration file should look like the following:

jobmanager.rpc.address: <jobmanager-ip-or-host>
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 1
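
Since the two files differ only in jobmanager.rpc.address, the Task Manager version can be derived from the Job Manager one. The following is a minimal sketch: the function name set_jobmanager_address and the address 192.168.1.10 are hypothetical, and the demo works on a temporary file rather than the real conf/flink-conf.yaml.

```shell
#!/bin/bash
# Rewrite the jobmanager.rpc.address line of a flink-conf.yaml file.
# $1: path to the configuration file, $2: Job Manager hostname or IP
set_jobmanager_address() {
    local conf_file="$1" address="$2"
    # Write to a temporary file and move it into place, which avoids
    # the differences between GNU and BSD "sed -i".
    sed "s|^jobmanager\.rpc\.address:.*|jobmanager.rpc.address: ${address}|" \
        "$conf_file" > "${conf_file}.tmp" && mv "${conf_file}.tmp" "$conf_file"
}

# Demo on a temporary copy of the settings shown above.
demo_conf="$(mktemp)"
printf '%s\n' \
    'jobmanager.rpc.address: localhost' \
    'jobmanager.rpc.port: 6123' > "$demo_conf"
set_jobmanager_address "$demo_conf" "192.168.1.10"   # hypothetical address
cat "$demo_conf"
```

All other lines are left untouched, so the memory and slot settings stay identical across nodes.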

We also need to add the Java home to this file, using the env.java.home key, so that Flink knows exactly where to look for Java binaries:

env.java.home: /usr/java/jdk1.8.0_92

We also need to list the slave nodes (the Task Managers) in the conf/slaves file, one node per line.

Here is what a sample conf/slaves file should look like:

<task-manager-1>
<task-manager-2>
<task-manager-3>
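
The file can also be generated from a list of hosts, which helps when the cluster grows. This is a minimal sketch: the function name write_slaves_file and the host names are hypothetical, and the demo writes to a temporary file instead of conf/slaves.

```shell
#!/bin/bash
# Write one host per line to the given slaves file.
# $1: path to the slaves file, remaining arguments: Task Manager hosts
write_slaves_file() {
    if [ "$#" -lt 2 ]; then
        echo "usage: write_slaves_file <file> <host>..." >&2
        return 1
    fi
    local slaves_file="$1"
    shift
    # printf repeats the format for every remaining argument.
    printf '%s\n' "$@" > "$slaves_file"
}

# Demo with hypothetical host names, written to a temporary file.
demo_slaves="$(mktemp)"
write_slaves_file "$demo_slaves" task-manager-1 task-manager-2 task-manager-3
cat "$demo_slaves"
```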

Starting daemons

Now the only thing left is starting the Flink processes. We can start each process separately on individual nodes or we can execute the start-cluster.sh command to start the required processes on each node:

bin/start-cluster.sh

If all the configurations are correct, you will see that the cluster is up and running. You can check the web UI at http://<job-manager-ip>:8081/.

In the Flink Web UI, you can click on the Job Manager link to open the Job Manager view and, similarly, check out the Task Managers view.

Adding additional Job/Task Managers

Flink lets you add additional Job Manager and Task Manager instances to a running cluster.

Before we start the daemon, please make sure that you have followed the steps given previously.

To add an additional Job Manager to the existing cluster, execute the following command:

sudo bin/jobmanager.sh start cluster

Similarly, we need to execute the following command to add an additional Task Manager:

sudo bin/taskmanager.sh start cluster

Stopping daemons and cluster

Once the job execution is complete, you may want to shut down the cluster. The following commands are used for that.

To stop the complete cluster in one go:

sudo bin/stop-cluster.sh

To stop an individual Job Manager:

sudo bin/jobmanager.sh stop cluster

To stop an individual Task Manager:

sudo bin/taskmanager.sh stop cluster