
Creating a Storm cluster – provisioning the machines


Testing the cluster in local mode is useful for debugging and verifying the basic functional logic of the cluster. It doesn't, however, give you a realistic view of how the cluster operates. Moreover, any development effort is only complete once the system is running in a production environment. This is a key consideration for any developer and is the cornerstone of the entire DevOps movement; regardless of the methodology, you must be able to reliably deploy your code into an environment. This recipe demonstrates how to create and provision an entire cluster directly from version control. Several key principles are at work here:

  • The state of any given server must be known at all times. It isn't acceptable that people can log into a server and make changes to its settings or files without strict version control being in place.

  • Servers should be fundamentally immutable, with their state held in some kind of separate volume. This allows deterministic recovery times for a server.

  • If something causes problems in the delivery process, do it more often. In software development and IT operations, this applies heavily to disaster recovery and integration. Both tasks can only be performed often if they are automated.

  • This book assumes that your destination production environment is a cluster based on Amazon Web Services (AWS) EC2, which enables automatic scaling. Elastic auto-scaling is only possible where provisioning is automated.

The deployment of Storm topologies to an AWS cluster is the subject of a later chapter; however, the fundamentals are presented in this recipe in a development environment.

How to do it...

Let's start by creating a new project as follows:

  1. Create a new project named vagrant-storm-cluster with the following directory structure (these folders are populated in the steps that follow):

     vagrant-storm-cluster/
     ├── data/
     ├── manifests/
     └── scripts/

  2. Using your favorite editor, create a file called Vagrantfile in the project root. It begins with the file header and the configuration for the virtual machines that we want to create; we need at least one nimbus node, two supervisor nodes, and a zookeeper node:

     # -*- mode: ruby -*-
     # vi: set ft=ruby :
     boxes = [
       { :name => :nimbus,      :ip => '192.168.33.100', :cpus => 2, :memory => 512 },
       { :name => :supervisor1, :ip => '192.168.33.101', :cpus => 4, :memory => 1024 },
       { :name => :supervisor2, :ip => '192.168.33.102', :cpus => 4, :memory => 1024 },
       { :name => :zookeeper1,  :ip => '192.168.33.201', :cpus => 1, :memory => 512 }
     ]

    Tip

    Note that the use of a single zookeeper node is only for development environments, as this cluster is not highly available. The purpose of this cluster is to test your topology logic in a realistic setting and identify stability issues.

  3. You must then create the virtual machine provisioning for each machine, specialized at execution time by the preceding configuration. The first set of properties defines the hardware, networking, and operating system:

     # Open the Vagrant (v1) configuration block; it is closed
     # by the final "end" at the end of step 4.
     Vagrant::Config.run do |config|
       boxes.each do |opts|
         config.vm.define opts[:name] do |config|
           config.vm.box = "ubuntu12"
           config.vm.box_url = "http://dl.dropbox.com/u/1537815/precise64.box"
           config.vm.network :hostonly, opts[:ip]
           config.vm.host_name = "storm.%s" % opts[:name].to_s
           config.vm.share_folder "v-data", "/vagrant_data", "./data", :transient => false
           config.vm.customize ["modifyvm", :id, "--memory", opts[:memory]]
           config.vm.customize ["modifyvm", :id, "--cpus", opts[:cpus]] if opts[:cpus]
  4. The provisioning of the application is then configured using a combination of bash and Puppet scripts:

    config.vm.provision :shell, :inline => "cp -fv /vagrant_data/hosts /etc/hosts"
          
        config.vm.provision :shell, :inline => "apt-get update"
          # Check if the jdk has been provided
          if File.exist?("./data/jdk-6u35-linux-x64.bin") then
          config.vm.provision :puppet do |puppet|
            puppet.manifests_path = "manifests"
            puppet.manifest_file = "jdk.pp"
           end
          end
          
          config.vm.provision :puppet do |puppet|
          puppet.manifests_path = "manifests"
          puppet.manifest_file = "provisioningInit.pp"
          end
          
          # Ask puppet to do the provisioning now.
          config.vm.provision :shell, :inline => "puppet apply /tmp/storm-puppet/manifests/site.pp --verbose --modulepath=/tmp/storm-puppet/modules/ --debug"  
          
        end
      end
    end

     The Vagrantfile simply defines the hypervisor-level configuration and provisioning; the remaining provisioning is done through Puppet and is defined at two levels. The first level makes the base Ubuntu installation ready for application provisioning; the second level contains the actual application provisioning. In order to create the first level of provisioning, you need to create the JDK provisioning bash script and the provisioning initialization Puppet script.
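
     Once the files from the remaining steps are in place, you can bring the cluster up with the standard Vagrant workflow. The following is a minimal session to sanity-check the project (the box names match the boxes array defined earlier):

     $ vagrant up                 # create and provision all four VMs
     $ vagrant status             # each box should report "running"
     $ vagrant ssh nimbus         # open a shell on the nimbus box
     $ vagrant destroy -f         # tear the cluster down again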

  5. In the scripts folder of the project, create the installJdk.sh file and populate it with the following code:

     #!/bin/sh
     echo "Installing JDK!"
     # Run the self-extracting Oracle JDK installer from the shared folder
     chmod 775 /vagrant_data/jdk-6u35-linux-x64.bin
     cd /root
     yes | /vagrant_data/jdk-6u35-linux-x64.bin
     # Move the extracted JDK into /opt and repoint the java/javac symlinks
     /bin/mv /root/jdk1.6.0_35 /opt
     /bin/rm -rv /usr/bin/java
     /bin/rm -rv /usr/bin/javac
     /bin/ln -s /opt/jdk1.6.0_35/bin/java /usr/bin
     /bin/ln -s /opt/jdk1.6.0_35/bin/javac /usr/bin
     JAVA_HOME=/opt/jdk1.6.0_35
     export JAVA_HOME
     PATH=$PATH:$JAVA_HOME/bin
     export PATH

     This script is simply invoked by the Puppet manifest in a declarative manner.
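
     Should you want to test the script by hand, outside Puppet, you can run it directly inside a provisioned VM; /vagrant is Vagrant's default synced folder for the project root:

     $ vagrant ssh nimbus -c 'sudo sh /vagrant/scripts/installJdk.sh'
     $ vagrant ssh nimbus -c 'java -version'   # should report 1.6.0_35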

  6. In the manifests folder, create a file called jdk.pp:

     $JDK_VERSION = "1.6.0_35"
     package { "openjdk":
       ensure => absent,
     }
     exec { "installJdk":
       command   => "installJdk.sh",
       path      => "/vagrant/scripts",
       logoutput => true,
       creates   => "/opt/jdk${JDK_VERSION}",
     }
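
     The creates parameter makes this exec idempotent: Puppet skips the script once /opt/jdk1.6.0_35 exists. You can verify this with a no-op run from inside a VM, for example:

     $ vagrant ssh nimbus -c 'sudo puppet apply --noop /vagrant/manifests/jdk.pp'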
  7. In the manifests folder, create the provisioningInit.pp file and define the required packages and static variable values:

     $CLONE_URL = "https://bitbucket.org/qanderson/storm-puppet.git"
     $CHECKOUT_DIR = "/tmp/storm-puppet"

     package { git:      ensure => [latest, installed] }
     package { puppet:   ensure => [latest, installed] }
     package { ruby:     ensure => [latest, installed] }
     package { rubygems: ensure => [latest, installed] }
     package { unzip:    ensure => [latest, installed] }

     exec { "install_hiera":
       command => "gem install hiera hiera-puppet",
       path    => "/usr/bin",
       require => Package['rubygems'],
     }

    Note

    For more information on Hiera, please see the Puppet documentation page at http://docs.puppetlabs.com/hiera/1/index.html.
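
     Once provisioning has run, you can confirm that the Hiera gems were installed by querying the gem list inside a VM:

     $ vagrant ssh nimbus -c 'gem list hiera'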

  8. You must then clone the repository, which contains the second level of provisioning:

    exec { "clone_storm-puppet":
      command => "git clone ${CLONE_URL}",
      cwd => "/tmp",
        path => "/usr/bin",
        creates => "${CHECKOUT_DIR}",
        require => Package['git'],
    }
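
     As with the JDK installation, the creates parameter ensures that the clone happens only once. You can confirm the checkout from inside a VM:

     $ vagrant ssh nimbus -c 'ls /tmp/storm-puppet'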
  9. You must now configure a Puppet plugin called Hiera, which is used to externalize properties from the provisioning scripts in a hierarchical manner:

    exec {"/bin/ln -s /var/lib/gems/1.8/gems/hiera-puppet-1.0.0/ /tmp/storm-puppet/modules/hiera-puppet":
      creates => "/tmp/storm-puppet/modules/hiera-puppet",
      require => [Exec['clone_storm-puppet'],Exec['install_hiera']]
    }
    
    
    #install hiera and the storm configuration
    file { "/etc/puppet/hiera.yaml":
        source => "/vagrant_data/hiera.yaml",
        replace => true,
        require => Package['puppet']
    }
    
    file { "/etc/puppet/hieradata":
      ensure => directory,
      require => Package['puppet'] 
    }
    
    file {"/etc/puppet/hieradata/storm.yaml": 
      source => "${CHECKOUT_DIR}/modules/storm.yaml",
        replace => true,
        require => [Exec['clone_storm-puppet'],File['/etc/puppet/hieradata']]
    }
  10. Finally, you need to populate the data folder. Create the Hiera base configuration file, hiera.yaml:

     ---
     :hierarchy:
         - "%{operatingsystem}"
         - storm
     :backends:
         - yaml
     :yaml:
         :datadir: '/etc/puppet/hieradata'
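
     With this configuration, Hiera consults an operating-system-specific datafile first and then falls back to storm.yaml. Once the cluster is provisioned, you can test lookups from inside a VM using the hiera command-line tool; the key name below is hypothetical, but any key defined in storm.yaml will do:

     $ vagrant ssh nimbus -c 'hiera -c /etc/puppet/hiera.yaml storm_version operatingsystem=Ubuntu'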
  11. The final datafile required is the hosts file, which acts as DNS in our local cluster:

    127.0.0.1       localhost
    192.168.33.100  storm.nimbus
    192.168.33.101  storm.supervisor1
    192.168.33.102  storm.supervisor2
    192.168.33.103  storm.supervisor3
    192.168.33.104  storm.supervisor4
    192.168.33.105  storm.supervisor5
    192.168.33.201  storm.zookeeper1
    192.168.33.202  storm.zookeeper2
    192.168.33.203  storm.zookeeper3
    192.168.33.204  storm.zookeeper4

    Tip

     The hosts file is not required in properly configured environments; however, it works nicely in our local "host only" development network.
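
     Once the VMs are up, you can confirm that name resolution works between them, for example:

     $ vagrant ssh supervisor1 -c 'ping -c 1 storm.nimbus'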

     The project is now complete, in that it will provision the correct virtual machines and install the required base packages; however, we still need to create the Application layer provisioning, which is contained in a separate repository.

  12. Initialize your Git repository for this project and push it to bitbucket.org.
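
     A minimal sequence might look like the following; the remote URL is a placeholder for your own repository:

     $ cd vagrant-storm-cluster
     $ git init
     $ git add .
     $ git commit -m "Initial cluster provisioning project"
     $ git remote add origin git@bitbucket.org:your-user/vagrant-storm-cluster.git
     $ git push -u origin master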

How it works...

Provisioning is performed at three distinct layers: the hypervisor, the base operating system, and the application stack.

This recipe works only in the bottom two layers, with the Application layer presented in the next recipe. A key reason for the separation is that you will typically create different provisioning at these layers depending on the hypervisor you are using for deployment. Once the VMs are provisioned, however, the application stack provisioning should be consistent across all your environments. This is key, in that it allows us to test our deployments hundreds of times before we get to production and to ensure that they are in a repeatable and version-controlled state.

In the development environment, VirtualBox is the hypervisor, with Vagrant and Puppet providing the provisioning. Vagrant works by specializing a base VirtualBox image. This base image represents a version-controlled artifact. For each box defined in our Vagrantfile, the following parameters are specified:

  • The base box

  • The network settings

  • Shared folders

  • Memory and CPU settings for the VM

    Tip

    This base provisioning does not include any of the baseline controls you would expect in a production environment, such as security, access controls, housekeeping, and monitoring. You must provision these before proceeding beyond your development environment. You can find these kinds of recipes on Puppet Forge (http://forge.puppetlabs.com/).

Provisioning agents are then invoked to perform the remaining heavy lifting:

config.vm.provision :shell, :inline => "cp -fv /vagrant_data/hosts /etc/hosts"

The preceding command installs the hosts file, which provides name resolution for our cluster:

config.vm.provision :shell, :inline => "apt-get update"

This updates the apt package cache within the Ubuntu installation. Vagrant then proceeds to install the JDK and the base provisioning. Finally, it invokes the application provisioning.

Note

The base VM image could already contain the entire base provisioning, making this portion of the provisioning unnecessary. However, it is important to understand the process of creating an appropriate base image, and also to balance the amount of specialization in the base images you control; otherwise, they will proliferate.
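
If you do decide to bake the base provisioning into a box of your own, Vagrant can package a prepared VirtualBox VM into a reusable base box. A minimal sketch, assuming your prepared VM appears in VirtualBox under the hypothetical name storm-base:

$ vagrant package --base storm-base --output ubuntu12-storm-base.box
$ vagrant box add ubuntu12-storm-base ubuntu12-storm-base.box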