Storm Real-time Processing Cookbook

By : Quinton Anderson

Overview of this book

Storm is a free and open source distributed real-time computation system. Storm makes it easy to reliably process unbounded streams of data, doing for real-time processing what Hadoop did for batch processing. Storm is simple, can be used with any programming language, and is a lot of fun to use! Storm Real-time Processing Cookbook offers basic to advanced recipes on Storm for real-time computation.

The book begins with setting up the development environment and then teaches log stream processing. This is followed by a real-time payments workflow, distributed RPC, integration with other software such as Hadoop and Apache Camel, and more.

Implementing an integration test


Integration testing can mean many different things depending on the situation and audience. For the purposes of this book, integration testing means testing the topology end to end, with defined input and output points, within a local cluster. This allows the topology's functionality to be fully verified before it is deployed to an actual cluster.

How to do it…

  1. Create the IntegrationTestTopology class in the src/test/java folder in the storm.cookbook package. Set up a local topology by adding in a testing utility bolt:

    @BeforeClass
    public static void setup() {
        // We want all output tuples coming to the mock for testing purposes
        topology.getBuilder().setBolt("testBolt", testBolt, 1)
                .globalGrouping("geoStats")
                .globalGrouping("totalStats");
        // Run in local mode, but we will shut the cluster down when we are finished
        topology.runLocal(0);
        // Jedis is required for input and output of the cluster
        jedis = new Jedis("localhost", Integer.parseInt(ClickTopology.DEFAULT_JEDIS_PORT));
        jedis.connect();
        jedis.flushDB();
        // Give it some time to start up before running the tests
        Utils.sleep(5000);
    }
  2. Then, define the expected parameters as a set of arrays arranged in pairs:

    @Parameterized.Parameters
    public static Collection<Object[]> data() {
        // Each pair is { input, expectations }
        Object[][] data = new Object[][] {
            // first click from Client1
            { new Object[] { "165.228.250.178", "internal.com", "Client1" },
              new Object[] { "AUSTRALIA", 1L, "SYDNEY", 1L, 1L, 1L } },
            // same client, same location
            { new Object[] { "165.228.250.178", "internal.com", "Client1" },
              new Object[] { "AUSTRALIA", 2L, "SYDNEY", 2L, 2L, 1L } },
            // same client, different location
            { new Object[] { "4.17.136.0", "internal.com", "Client1" },
              new Object[] { "UNITED STATES", 1L, "DERRY, NH", 1L, 3L, 1L } },
            // different client, same location
            { new Object[] { "4.17.136.0", "internal.com", "Client2" },
              new Object[] { "UNITED STATES", 2L, "DERRY, NH", 2L, 4L, 2L } }
        };
        return Arrays.asList(data);
    }

    Object[] input;
    Object[] expected;

    public IntegrationTestTopology(Object[] input, Object[] expected) {
        this.input = input;
        this.expected = expected;
    }
  3. The test logic can then be based on these parameters:

    @Test
    public void inputOutputClusterTest() {
        JSONObject content = new JSONObject();
        content.put("ip", input[0]);
        content.put("url", input[1]);
        content.put("clientKey", input[2]);

        jedis.rpush("count", content.toJSONString());

        Utils.sleep(3000);

        int count = 0;
        String data = jedis.rpop("TestTuple");

        while (data != null) {
            JSONArray values = (JSONArray) JSONValue.parse(data);

            if (values.get(0).toString().contains("geoStats")) {
                count++;
                assertEquals(expected[0], values.get(1).toString().toUpperCase());
                assertEquals(expected[1], values.get(2));
                assertEquals(expected[2], values.get(3).toString().toUpperCase());
                assertEquals(expected[3], values.get(4));
            } else if (values.get(0).toString().contains("totalStats")) {
                count++;
                assertEquals(expected[4], values.get(1));
                assertEquals(expected[5], values.get(2));
            }
            data = jedis.rpop("TestTuple");
        }
        assertEquals(2, count);
    }
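The expectations in step 2 are cumulative: each row depends on the clicks injected by the rows before it. The following is a minimal sketch of that arithmetic in plain Java, with no Storm or Redis involved. The class `ClickStatsModel` and its `record` method are hypothetical names introduced only for illustration, and the reading of the last two values as "total clicks" and "unique clients" is an assumption inferred from the data rather than something the recipe states.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of the counters; not part of the recipe's topology.
class ClickStatsModel {
    private final Map<String, Long> countryCounts = new HashMap<>();
    private final Map<String, Long> cityCounts = new HashMap<>();
    private final Set<String> seenClients = new HashSet<>();
    private long totalCount = 0;

    // Records one click and returns six values in the same order as the
    // expectation arrays: country, country count, city, city count,
    // total clicks, unique clients (the last two are an assumed reading).
    List<Object> record(String country, String city, String clientKey) {
        long countryCount = countryCounts.merge(country, 1L, Long::sum);
        long cityCount = cityCounts.merge(city, 1L, Long::sum);
        seenClients.add(clientKey);
        totalCount++;
        return Arrays.<Object>asList(country, countryCount, city, cityCount,
                totalCount, (long) seenClients.size());
    }

    public static void main(String[] args) {
        ClickStatsModel model = new ClickStatsModel();
        System.out.println(model.record("AUSTRALIA", "SYDNEY", "Client1"));
        // [AUSTRALIA, 1, SYDNEY, 1, 1, 1]
        System.out.println(model.record("AUSTRALIA", "SYDNEY", "Client1"));
        // [AUSTRALIA, 2, SYDNEY, 2, 2, 1]
        System.out.println(model.record("UNITED STATES", "DERRY, NH", "Client1"));
        // [UNITED STATES, 1, DERRY, NH, 1, 3, 1]
        System.out.println(model.record("UNITED STATES", "DERRY, NH", "Client2"));
        // [UNITED STATES, 2, DERRY, NH, 2, 4, 2]
    }
}
```

Because the counts accumulate across rows, the parameterized cases are order-dependent: they only pass when run in the declared order against a store that was flushed once in setup.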

How it works…

The integration test works by creating a local cluster and injecting input values into it through Redis, in the same way that a real web server would for the given design. A dedicated testing bolt, added to the end of the topology, receives all the output tuples so that they can be tested against the expected values.

Once the TestBolt instance is submitted to the cluster, it is no longer accessible from the test; therefore, its outputs can only be accessed through persistence. TestBolt persists each received tuple to Redis, where the test case can read and validate it. The logic within TestBolt is as follows:

public void execute(Tuple input) {
    List objects = input.getValues();
    objects.add(0, input.getSourceComponent());
    jedis.rpush("TestTuple", JSONArray.toJSONString(objects));
}
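The bolt appends with RPUSH and the test removes with RPOP, and in Redis both commands operate on the tail of the list, so the test sees the most recently stored tuple first. The sketch below models that with an in-memory deque. `TestTupleList` and `withSource` are hypothetical names, the entries are kept as plain lists rather than JSON strings, and nothing here talks to Redis or Storm.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for the Redis list "TestTuple".
class TestTupleList {
    private final Deque<List<Object>> entries = new ArrayDeque<>();

    // Mirrors RPUSH: append to the tail of the list.
    void rpush(List<Object> entry) {
        entries.addLast(entry);
    }

    // Mirrors RPOP: remove from the tail, or return null when the list is empty.
    List<Object> rpop() {
        return entries.pollLast();
    }

    // Like TestBolt.execute, puts the source component first;
    // unlike it, this sketch works on a copy of the values.
    static List<Object> withSource(String sourceComponent, List<Object> values) {
        List<Object> copy = new ArrayList<>(values);
        copy.add(0, sourceComponent);
        return copy;
    }

    public static void main(String[] args) {
        TestTupleList list = new TestTupleList();
        list.rpush(withSource("geoStats", Arrays.<Object>asList("australia", 1L, "sydney", 1L)));
        list.rpush(withSource("totalStats", Arrays.<Object>asList(1L, 1L)));
        System.out.println(list.rpop()); // [totalStats, 1, 1]
        System.out.println(list.rpop()); // [geoStats, australia, 1, sydney, 1]
        System.out.println(list.rpop()); // null
    }
}
```

The retrieval order does not affect the test, because each entry carries its source component in position 0 and the test dispatches on that value.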

This is then read by the test and validated against the expected values:

String data = jedis.rpop("TestTuple");

while (data != null) {
    JSONArray values = (JSONArray) JSONValue.parse(data);

    if (values.get(0).toString().contains("geoStats")) {
        count++;
        assertEquals(expected[0], values.get(1).toString().toUpperCase());
        assertEquals(expected[1], values.get(2));
        assertEquals(expected[2], values.get(3).toString().toUpperCase());
        assertEquals(expected[3], values.get(4));
    } else if (values.get(0).toString().contains("totalStats")) {
        count++;
        assertEquals(expected[4], values.get(1));
        assertEquals(expected[5], values.get(2));
    }
    data = jedis.rpop("TestTuple");
}
assertEquals(2, count);
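The numeric expectations are declared as Long values for a reason: the counts are compared with `assertEquals(Object, Object)`, which relies on `equals`, and json-simple decodes whole numbers as `java.lang.Long`. The sketch below shows why an Integer expectation would fail even when the numeric value is the same. `BoxedEquality` and `matches` are hypothetical names, and the `parsed` variable merely stands in for a number decoded from JSON.

```java
// Hypothetical helper illustrating boxed-number equality; not part of the recipe.
class BoxedEquality {
    // The same null-safe comparison that assertEquals(Object, Object) relies on.
    static boolean matches(Object expected, Object actual) {
        return expected == null ? actual == null : expected.equals(actual);
    }

    public static void main(String[] args) {
        Object parsed = Long.valueOf(2); // stands in for a count decoded from JSON
        System.out.println(matches(Long.valueOf(2), parsed));    // true
        System.out.println(matches(Integer.valueOf(2), parsed)); // false
    }
}
```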