Instant Apache Camel Message Routing

By: Bilgin Ibryam

Overview of this book

With new APIs and technologies emerging every day, the need for integrating applications is greater than ever before. With the right tools, integrating applications is not hard. Apache Camel is the leading open source integration and message orchestration framework. Apache Camel, which has a variety of connectors and features numerous well-known integration patterns, has an enormous advantage over home-grown integration solutions. Instant Apache Camel Message Routing helps you to get started using the Camel routing engine and Enterprise Integration Patterns. This book will show you how to create integration applications using Apache Camel. You will learn how Camel works and how to leverage the Enterprise Integration Patterns for message routing.

Instant Apache Camel Message Routing is a practical, step-by-step guide to Apache Camel and integration patterns. This book will show you how Apache Camel works and how it integrates disparate systems using Enterprise Integration Patterns. The book starts with a high-level overview of the Camel architecture before diving into message routing principles. Then, it introduces a number of patterns, complete with diagrams, common use cases, and examples of how to use them with Camel. The book also shows you how to test and monitor Camel applications and cope with failure scenarios.
Table of Contents (7 chapters)

Aggregating multiple messages into one (Intermediate)


The Aggregator pattern is the opposite of the Splitter pattern; it combines multiple messages into one. In a message-driven application, it usually appears after the Splitter or Recipient List pattern, where a number of messages have been produced out of one, and the goal of the Aggregator is to merge the related messages back together. In this tutorial, we will create a route that merges messages belonging to the same group into one.

Getting ready

The complete source code for this tutorial is located under the project camel-message-routing-examples/aggregating-messages.

How to do it...

  1. When using Aggregator, the first thing we have to decide is how the messages will be merged together. This is very specific to the actual message content and business logic of the application. We provide that logic by implementing the AggregationStrategy interface:

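    package org.apache.camel.howto;

    import java.math.BigDecimal;
    import org.apache.camel.Exchange;
    // Camel 2.x package; from Camel 3 onwards the interface is org.apache.camel.AggregationStrategy.
    import org.apache.camel.processor.aggregate.AggregationStrategy;

    public class InvoiceTotalAggregator implements AggregationStrategy {
        @Override
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            // The first message of a group arrives with no previous aggregate.
            if (oldExchange == null) {
                return newExchange;
            }
            // Add the new item's total to the running total kept on the aggregated Exchange.
            BigDecimal currentTotal = oldExchange.getIn().getHeader("invoiceItemTotal", BigDecimal.class);
            BigDecimal itemTotal = newExchange.getIn().getHeader("invoiceItemTotal", BigDecimal.class);
            oldExchange.getIn().setHeader("invoiceItemTotal", currentTotal.add(itemTotal));
            return oldExchange;
        }
    }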
  2. Then, we add the Aggregator to the route by referencing our custom aggregation strategy:

    <bean id="invoiceTotalAggregator" class="org.apache.camel.howto.InvoiceTotalAggregator"/>
    
    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route>
            <from uri="direct:start"/>
            <aggregate strategyRef="invoiceTotalAggregator" completionTimeout="3000">
                <correlationExpression>
                    <simple>header.invoiceId</simple>
                </correlationExpression>
                <to uri="mock:aggregated"/>
            </aggregate>
        </route>
    </camelContext>

The Aggregator also has two other mandatory settings: the correlation expression and the completion condition. The first one is evaluated on each incoming message to generate a correlation key. The key is used to determine which messages should be grouped together. In our example, the key would be the invoiceId, so all invoice item messages belonging to the same invoice will be merged into one message. The completion condition is used to tell Camel when the aggregation is completed and the currently built message should be sent further down the route.

How it works...

When a new message comes in, the first thing the Aggregator does is evaluate the correlation expression to get the correlation key. The correlation key is then used to look up the internal repository for an existing message with the same key. The Aggregator has an internal (in-memory by default) repository that contains the current aggregated Exchange for each correlation key. The next step is to pass the old Exchange (if there is one) and the new Exchange to the AggregationStrategy to merge them. After the aggregation, the completion condition is checked, and if it is satisfied, the aggregated Exchange is sent to the next processor in the route. If the aggregation has not completed yet, the internal repository is updated with the aggregated Exchange, and the Aggregator continues waiting for the next Exchange to complete it or for the timeout to occur.
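The per-message steps described above can be sketched in plain Java. This is only a simplified model under stated assumptions, not Camel's implementation: `AggregatorSketch`, `Msg`, and `onMessage` are hypothetical names, a `HashMap` stands in for the repository, a list stands in for the next processor in the route, and a fixed completion size is assumed as the completion condition.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Framework-free model of the steps described above. Every name here is
// hypothetical; this is not how Camel implements its Aggregator.
class AggregatorSketch {

    // Stand-in for an Exchange: a correlation key, a running total, and a part count.
    static final class Msg {
        final String invoiceId;
        final BigDecimal total;
        final int parts;

        Msg(String invoiceId, BigDecimal total) {
            this(invoiceId, total, 1);
        }

        Msg(String invoiceId, BigDecimal total, int parts) {
            this.invoiceId = invoiceId;
            this.total = total;
            this.parts = parts;
        }
    }

    private final Map<String, Msg> repository = new HashMap<>(); // in-memory repository
    private final List<Msg> completed = new ArrayList<>();       // stands in for the next processor
    private final int completionSize;

    AggregatorSketch(int completionSize) {
        this.completionSize = completionSize;
    }

    // Same contract as the tutorial's strategy: the old message is null for the first arrival.
    private static Msg aggregate(Msg oldMsg, Msg newMsg) {
        if (oldMsg == null) {
            return newMsg;
        }
        return new Msg(oldMsg.invoiceId, oldMsg.total.add(newMsg.total), oldMsg.parts + newMsg.parts);
    }

    void onMessage(Msg incoming) {
        String key = incoming.invoiceId;            // 1. evaluate the correlation expression
        Msg current = repository.get(key);          // 2. look up the aggregate built so far
        Msg merged = aggregate(current, incoming);  // 3. merge the old and the new message
        if (merged.parts >= completionSize) {       // 4. check the completion condition
            repository.remove(key);
            completed.add(merged);                  //    complete: send it down the route
        } else {
            repository.put(key, merged);            //    incomplete: store it and keep waiting
        }
    }

    List<Msg> completed() {
        return completed;
    }
}
```

With a completion size of 2, two messages for invoice `A` produce one completed message carrying the sum of their totals, while a single message for invoice `B` stays in the repository.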

There are a number of ways to specify the completion condition, and at least one must be present. completionPredicate is a Predicate that has to evaluate to true in order to complete the aggregated Exchange. completionSize states how many messages should be aggregated to complete the aggregation; it can be a fixed number or retrieved dynamically from the Exchange with an expression. completionTimeout is based on an inactivity period, in milliseconds, of the aggregated message, and completes the aggregated Exchange if no further messages arrive for it within that period. completionInterval is similar, but completes all currently aggregated messages after each time interval, in milliseconds. The latter two are asynchronous conditions: they are not evaluated on each incoming message but run in a background thread, and whenever a condition is satisfied, the corresponding aggregated message starts its journey on the route.
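To make the inactivity semantics of a timeout-style condition concrete, here is a minimal plain-Java sketch. It is an illustration only: `InactivityTimeoutSketch` and its methods are hypothetical names, and the current time is passed in explicitly so the behavior is deterministic, whereas the text above describes a background thread doing this check.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical model of an inactivity timeout, in the spirit of completionTimeout.
// Time is supplied by the caller instead of being read by a background thread.
class InactivityTimeoutSketch {
    private final long timeoutMillis;
    private final Map<String, Long> lastSeen = new HashMap<>(); // correlation key -> time of last message

    InactivityTimeoutSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    // Every incoming message restarts the inactivity clock for its correlation key.
    void onMessage(String correlationKey, long nowMillis) {
        lastSeen.put(correlationKey, nowMillis);
    }

    // What a background checker would do: complete every group that has been idle long enough.
    List<String> completeIdleGroups(long nowMillis) {
        List<String> done = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() >= timeoutMillis) {
                done.add(entry.getKey());
                it.remove(); // a completed group leaves the repository
            }
        }
        return done;
    }
}
```

The point of the sketch is that the clock is per group and resets on every message: a group that keeps receiving messages never times out, which is why an interval-based condition, completing everything at fixed periods, behaves differently.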

There's more...

The Aggregator is usually used together with other patterns. Here are two common integration patterns that the Aggregator takes part in:

Composed message processor

This pattern is useful for maintaining the incoming message flow while processing each element of the composite messages in a separate flow.

In the preceding routing diagram, we can see how the incoming composite message is split up into sub messages and then each sub message is routed to an appropriate destination using a Content-Based Router. After each sub message has been processed differently based on its type, the results are aggregated back into a single message using Aggregator. Notice that the composed message processor pattern can also be achieved without using Aggregator explicitly. As you may remember from the previous tutorial, Splitter has an embedded Aggregator in the form of an AggregationStrategy. Using that strategy, Splitter can aggregate the results from each sub message and have the same effect as an explicit Aggregator.
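The split, route, and aggregate flow can be sketched in plain Java. This is a toy model, not Camel code: the class name, the comma-separated order format, and the `book:` and `ebook:` prefixes are all assumptions made up for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, framework-free model of a composed message processor.
class ComposedMessageProcessorSketch {

    // Content-based routing: pick the processing step from the item's type prefix.
    static String route(String item) {
        if (item.startsWith("book:")) {
            return "shipped " + item.substring("book:".length());
        }
        if (item.startsWith("ebook:")) {
            return "emailed " + item.substring("ebook:".length());
        }
        return "rejected " + item;
    }

    static String process(String order) {
        List<String> results = new ArrayList<>();
        for (String item : order.split(",")) {  // split the composite message into sub messages
            results.add(route(item.trim()));    // route and process each sub message by type
        }
        return String.join("; ", results);      // aggregate the results back into one message
    }
}
```

For example, `ComposedMessageProcessorSketch.process("book:Camel in Action, ebook:EIP")` returns `"shipped Camel in Action; emailed EIP"`: one message in, one message out, with each element handled in its own flow.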

Scatter-gather

Here, a message is broadcast to multiple channels using the Recipient List pattern or the publish-subscribe mechanism. The difference in this case is that there is no Splitter involved, so each recipient gets a copy of the same incoming message.

After the message has been processed by multiple recipients, Aggregator receives the results and produces one single result. In this scenario, Aggregator usually does not merge the results; instead, it picks only one of them, because they are all of the same kind. In the example in the preceding diagram, that is the "Best" Quote.
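A minimal plain-Java sketch of scatter-gather follows. It is illustrative only: the class and method names are hypothetical, each vendor is modeled as a function from a request to a price, and "best" is assumed to mean the lowest quote.

```java
import java.math.BigDecimal;
import java.util.List;
import java.util.function.Function;

// Hypothetical, framework-free model of scatter-gather.
class ScatterGatherSketch {

    // Aggregation that selects instead of merging; "best" is assumed to mean the lowest quote.
    static BigDecimal bestQuote(BigDecimal oldQuote, BigDecimal newQuote) {
        if (oldQuote == null) {
            return newQuote; // first reply: nothing to compare against yet
        }
        return newQuote.compareTo(oldQuote) < 0 ? newQuote : oldQuote;
    }

    static BigDecimal requestBestQuote(String request, List<Function<String, BigDecimal>> vendors) {
        BigDecimal best = null;
        for (Function<String, BigDecimal> vendor : vendors) {
            // Scatter: every vendor receives the same request.
            // Gather: each reply is folded into the single result.
            best = bestQuote(best, vendor.apply(request));
        }
        return best;
    }
}
```

Note that `bestQuote` follows the same shape as the tutorial's aggregation strategy (a null old value for the first reply), but it keeps one of its two inputs rather than combining them.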