Instant Apache Camel Message Routing

By: Bilgin Ibryam

Overview of this book

With new APIs and technologies emerging every day, the need for integrating applications is greater than ever before. With the right tools, integrating applications is not hard. Apache Camel is the leading open source integration and message orchestration framework. With its variety of connectors and its support for numerous well-known integration patterns, Apache Camel has an enormous advantage over homegrown integration solutions. Instant Apache Camel Message Routing is a practical, step-by-step guide that helps you get started with the Camel routing engine and Enterprise Integration Patterns. It shows you how Camel works, how to create integration applications with it, and how it integrates disparate systems using Enterprise Integration Patterns for message routing. The book starts with a high-level overview of the Camel architecture before diving into message routing principles. Then it introduces a number of patterns, complete with diagrams, common use cases, and examples of how to use them with Camel. The book also shows you how to test and monitor Camel applications and cope with failure scenarios.

Splitting a message into many (Intermediate)


Messages passed to integration applications do not always have the right granularity to work with. In many cases they are composite messages consisting of multiple elements, each of which has to be processed individually. This is where the Splitter pattern helps: it splits an incoming message into a series of messages that are easier to work with. In this tutorial, we will create a route that splits an incoming XML message into multiple messages.

Getting ready

The complete source code for this tutorial is located under the project camel-message-routing-examples/splitting-message.

How to do it...

  1. In this tutorial, we assume that the incoming message is an XML message in the following format:

    <invoice>
        <item><product name="widget" quantity="2"/></item>
        <item><product name="gadget" quantity="1"/></item>
    </invoice>
  2. At the point in the Camel route where the message has to be split into sub messages, add the Splitter with an XPath expression:

    from("direct:start")
        .split(xpath("//invoice/item/product"))
            .to("mock:products")
        .end()
        .to("mock:result");
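
To see which parts the XPath expression in the route selects, we can evaluate the same expression against the sample invoice with the JDK's own XPath API. This is a minimal sketch in plain Java, not Camel code; the class name XPathSplitSketch and the method splitProducts are made up for illustration.

```java
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.InputSource;

// Illustrative only: shows the nodes matched by the expression used in the route.
class XPathSplitSketch {

    // Returns one "name x quantity" string per node matched by the expression.
    static List<String> splitProducts(String xml) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        NodeList nodes;
        try {
            nodes = (NodeList) xpath.evaluate(
                    "//invoice/item/product",
                    new InputSource(new StringReader(xml)),
                    XPathConstants.NODESET);
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate XPath", e);
        }
        List<String> parts = new ArrayList<>();
        for (int i = 0; i < nodes.getLength(); i++) {
            Element product = (Element) nodes.item(i);
            parts.add(product.getAttribute("name") + " x " + product.getAttribute("quantity"));
        }
        return parts;
    }

    public static void main(String[] args) {
        String xml = "<invoice>"
                + "<item><product name=\"widget\" quantity=\"2\"/></item>"
                + "<item><product name=\"gadget\" quantity=\"1\"/></item>"
                + "</invoice>";
        System.out.println(splitProducts(xml)); // prints [widget x 2, gadget x 1]
    }
}
```

Each matched product element corresponds to one sub message that the route above would send to mock:products.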

How it works...

The only required configuration for Splitter is the criterion used to split the messages. It can be written in any supported expression language, so we have the freedom to choose from Simple language, XPath, XQuery, and many others, depending mainly on the message type and ease of splitting. In our example, to split the incoming XML we chose an XPath expression that is applied to the main message, and the resulting sub messages are sent to the next endpoint(s).

There are a couple of options to customize the way newly created Exchanges are processed. With onPrepareRef it is possible to reference a Processor in the Registry that will be called just before the sub message is sent to the child endpoints. Setting parallelProcessing causes the new Exchanges to be processed in parallel instead of sequentially. Setting stopOnException stops the processing of the remaining sub messages as soon as an Exception is thrown and returns the error, whereas leaving it at the default value of false finishes processing all sub messages and returns the error at the end. Splitter also supports splitting messages using tokenizer expressions and splitting streamed messages, which is useful for chunking large files with a low memory footprint.
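
The difference between the two stopOnException settings is easier to see in a small model. The following is a plain-Java sketch of the behavior described above, not Camel's implementation; the names StopOnExceptionSketch, Outcome, and run are hypothetical, and keeping only the first error when several parts fail is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative model of sequential splitting with and without stopOnException.
class StopOnExceptionSketch {

    // Which parts were attempted, and the first error seen (null if none).
    static final class Outcome {
        final List<String> attempted = new ArrayList<>();
        RuntimeException firstError;
    }

    static Outcome run(List<String> parts, Consumer<String> processor, boolean stopOnException) {
        Outcome outcome = new Outcome();
        for (String part : parts) {
            outcome.attempted.add(part);
            try {
                processor.accept(part);
            } catch (RuntimeException e) {
                if (outcome.firstError == null) {
                    outcome.firstError = e; // assumption: report the first failure
                }
                if (stopOnException) {
                    break; // skip the remaining parts
                }
                // otherwise keep going and report the error at the end
            }
        }
        return outcome;
    }

    public static void main(String[] args) {
        List<String> parts = List.of("a", "bad", "c");
        Consumer<String> processor = part -> {
            if (part.equals("bad")) {
                throw new IllegalStateException("cannot process " + part);
            }
        };
        System.out.println(run(parts, processor, true).attempted);  // prints [a, bad]
        System.out.println(run(parts, processor, false).attempted); // prints [a, bad, c]
    }
}
```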

There's more...

In addition to splitting messages, the Splitter can also aggregate the messages back into one. This turns Splitter into a very powerful and compact construct used for splitting and then aggregating messages without a need for an additional Aggregator.

Aggregating results

When a message is passed to Splitter, it is split and the sub messages are sent to the child endpoints. However, the processing is not over yet. If the incoming message exchange pattern (MEP) is InOut, Splitter has to reply to the caller, or pass a message to the next processor in the route if there is one. This is where the strategyRef option comes into play. It allows us to reference, from the Registry, a custom implementation of the AggregationStrategy interface that is used for aggregating the results from the sub Exchanges. This interface has only one method, which accepts the Exchange aggregated so far (null on the first call) and the new Exchange returned from the sub Exchange that has just been processed:

Exchange aggregate(Exchange oldExchange, Exchange newExchange)

A typical implementation gets the result from newExchange, aggregates it into oldExchange, and returns it. This way the sub Exchange results accumulate in a single Exchange, which is finally returned as the result of the Splitter. If no custom AggregationStrategy is used, the default strategy simply returns the original incoming Exchange without performing any aggregation.

AggregationStrategy is the same interface used in the Aggregator pattern (which we will see next), and having it as part of Splitter turns Splitter into a pattern having an embedded lightweight Aggregator.
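
The accumulate-and-return idea behind the aggregate method can be sketched without any Camel dependency. In the following plain-Java model, the Strategy interface is a hypothetical stand-in that only mirrors the shape of the aggregate method shown above, with an ordinary result type in place of Exchange. The sketch assumes that nothing has been accumulated before the first sub result arrives, so the strategy has to handle a null first argument.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative model of folding sub results into one, no Camel types involved.
class AggregationSketch {

    // Hypothetical stand-in with the same shape as aggregate(oldExchange, newExchange).
    interface Strategy<T> {
        T aggregate(T oldResult, T newResult);
    }

    // Feeds each sub result to the strategy, carrying the accumulated value forward.
    static <T> T combine(List<T> subResults, Strategy<T> strategy) {
        T accumulated = null; // assumption: nothing accumulated before the first call
        for (T subResult : subResults) {
            accumulated = strategy.aggregate(accumulated, subResult);
        }
        return accumulated;
    }

    // Example strategy: append the new items to the items collected so far.
    static final Strategy<List<String>> APPEND = (oldResult, newResult) -> {
        if (oldResult == null) {
            return new ArrayList<>(newResult); // first sub result starts the accumulation
        }
        oldResult.addAll(newResult);
        return oldResult;
    };

    public static void main(String[] args) {
        List<List<String>> subResults = List.of(List.of("widget"), List.of("gadget"));
        System.out.println(combine(subResults, APPEND)); // prints [widget, gadget]
    }
}
```

A real implementation would do the same work on message bodies or headers of the two Exchanges instead of on plain lists.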

Threading model

When the parallelProcessing option is set, Splitter creates a new thread pool (java.util.concurrent.ExecutorService) and uses it to process the sub messages in parallel. Instead of creating a new thread pool, it is possible to share and reuse an existing one by referencing it from the Registry using the executorServiceRef option.
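
What processing sub messages on an ExecutorService looks like can be sketched with java.util.concurrent alone. This is an illustrative model, not Camel's implementation; the names ParallelSplitSketch and processAll are hypothetical. Passing the pool in as a parameter loosely corresponds to sharing an existing pool instead of creating a new one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Illustrative model: each part is submitted to a shared pool and processed concurrently.
class ParallelSplitSketch {

    // Submits every part to the pool, then collects the results in submission order.
    static List<String> processAll(List<String> parts,
                                   Function<String, String> processor,
                                   ExecutorService pool) {
        List<Future<String>> futures = new ArrayList<>();
        for (String part : parts) {
            Callable<String> task = () -> processor.apply(part);
            futures.add(pool.submit(task));
        }
        List<String> results = new ArrayList<>();
        for (Future<String> future : futures) {
            try {
                results.add(future.get()); // blocks until this part is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("Processing failed", e.getCause());
            }
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService sharedPool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(processAll(List.of("widget", "gadget"), String::toUpperCase, sharedPool));
            // prints [WIDGET, GADGET]
        } finally {
            sharedPool.shutdown(); // the owner of a shared pool is responsible for stopping it
        }
    }
}
```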

Parallel processing with thread pools is also used in other EIPs such as Aggregator, Multicast, Wire Tap, and Recipient List, and in some components. Camel creates a new thread pool based on the thread pool profiles. The default thread pool profile looks like the following:

<threadPoolProfile id="defaultThreadPoolProfile" defaultProfile="true" poolSize="10" maxPoolSize="20" maxQueueSize="1000" rejectedPolicy="CallerRuns"/>

In the Java DSL, thread pool profiles are set using the ExecutorServiceManager of CamelContext. If we use executorServiceRef, Camel will look in the Registry for a thread pool with the given ID. If there is no such thread pool, it will look in the Registry for a thread pool profile with the same ID. If there is a thread pool profile with that ID, Camel will use it to create the new thread pool. If there is no such thread pool profile, the default profile will be used. Also, all thread pools created by Camel are properly shut down when CamelContext is stopped, so we need not worry about thread leaks. More details about the Camel threading model can be found at http://camel.apache.org/threading-model.html.
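
The lookup order described above, together with the settings of the default profile, can be modeled in plain Java. This is a sketch under stated assumptions, not Camel's code: the maps stand in for the Registry, the Profile class and all method names are hypothetical, and the 60-second keep-alive is an assumed value that does not appear in the profile shown above.

```java
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Illustrative model of resolving a thread pool reference: pool, then profile, then default.
class ThreadPoolLookupSketch {

    // Hypothetical holder for the sizing attributes of a thread pool profile.
    static final class Profile {
        final int poolSize;
        final int maxPoolSize;
        final int maxQueueSize;

        Profile(int poolSize, int maxPoolSize, int maxQueueSize) {
            this.poolSize = poolSize;
            this.maxPoolSize = maxPoolSize;
            this.maxQueueSize = maxQueueSize;
        }
    }

    // Values taken from the default profile shown above.
    static final Profile DEFAULT_PROFILE = new Profile(10, 20, 1000);

    // Builds a pool from a profile; CallerRuns makes the submitting thread run rejected tasks.
    static ThreadPoolExecutor newPool(Profile profile) {
        return new ThreadPoolExecutor(
                profile.poolSize,
                profile.maxPoolSize,
                60L, TimeUnit.SECONDS, // assumed keep-alive, not part of the profile above
                new LinkedBlockingQueue<>(profile.maxQueueSize),
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    // Resolution order: existing pool by ID, then profile by ID, then the default profile.
    static ExecutorService resolve(String ref,
                                   Map<String, ExecutorService> pools,
                                   Map<String, Profile> profiles) {
        ExecutorService existing = pools.get(ref);
        if (existing != null) {
            return existing;
        }
        return newPool(profiles.getOrDefault(ref, DEFAULT_PROFILE));
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) resolve("unknownId", Map.of(), Map.of());
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize()); // prints 10/20
        pool.shutdown();
    }
}
```

Note that with a bounded queue, a ThreadPoolExecutor only grows beyond its core size once the queue is full, so in this model the second ten threads appear only after 1000 tasks are waiting.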