Instant Apache Camel Message Routing

By: Bilgin Ibryam

Overview of this book

With new APIs and technologies emerging every day, the need for integrating applications is greater than ever before. With the right tools, integrating applications is not hard. Apache Camel is the leading open source integration and message orchestration framework. With its variety of connectors and its implementations of numerous well-known integration patterns, Apache Camel has an enormous advantage over home-grown integration solutions. Instant Apache Camel Message Routing is a practical, step-by-step guide that helps you get started with the Camel routing engine and Enterprise Integration Patterns. It shows you how Camel works, how it integrates disparate systems, and how to create integration applications that leverage the Enterprise Integration Patterns for message routing. The book starts with a high-level overview of the Camel architecture before diving into message routing principles. Then, it introduces a number of patterns, complete with diagrams, common use cases, and examples of how to use them with Camel. The book also shows you how to test and monitor Camel applications and cope with failure scenarios.

Reorganizing messages (Intermediate)


In an asynchronous distributed application, messages can easily get out of order. Sometimes this is due to a limitation of an endpoint; for example, the Amazon Simple Queue Service does not guarantee in-order delivery of messages. Sometimes it is the result of a conscious architectural decision, for example to increase throughput: a Splitter or Content-Based Router may hand long-running tasks to separate threads, which loses the original message order. Whatever the reason, when messages have to be (re)ordered, Resequencer is the pattern to use.

Getting ready

The complete source code for this tutorial is located under the project camel-message-routing-examples/reorganizing-messages.

How to do it...

Let's start with a route that supposedly receives messages in random order, which we want to put back in order. Here is how a Resequencer definition looks in the Java DSL:

from("direct:start")
    .resequence(header("message_index"))
    .batch().size(100).timeout(1000L)
    .to("mock:result");

The main piece of information the Resequencer needs is the criterion used to order the messages. This can be the message body itself, a header, or a property, and it is specified using an expression. In addition to the header used for ordering, our example sets two other options: the batch size and the batch timeout (size(100) and timeout(1000L) in the route above). These options control how many messages the Resequencer collects, and how long it waits, before sorting the messages and letting them go.
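To make the role of the ordering criterion and the batch size more concrete, here is a minimal plain-Java sketch of the batching idea. It is an illustration only and not Camel's implementation: the class BatchResequencerSketch, its Message type, and the offer and flush methods are hypothetical names, and the timeout is modeled as an explicit flush() call rather than a real timer.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Illustration only: a hypothetical, simplified model of batch resequencing.
// This is NOT how Camel implements the Resequencer internally.
final class BatchResequencerSketch {

    // Hypothetical message: an ordering key (like the message_index header) and a body.
    static final class Message {
        final int index;
        final String body;

        Message(int index, String body) {
            this.index = index;
            this.body = body;
        }

        @Override
        public String toString() {
            return index + ":" + body;
        }
    }

    private final int batchSize;
    private final List<Message> buffer = new ArrayList<>();

    BatchResequencerSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Buffers a message. Returns the sorted batch once batchSize messages
    // have been collected, otherwise an empty list.
    List<Message> offer(Message message) {
        buffer.add(message);
        return buffer.size() >= batchSize ? flush() : List.of();
    }

    // Stands in for the batch timeout firing: releases whatever has been
    // collected so far, sorted by the ordering key.
    List<Message> flush() {
        List<Message> batch = new ArrayList<>(buffer);
        buffer.clear();
        batch.sort(Comparator.comparingInt((Message m) -> m.index));
        return batch;
    }

    public static void main(String[] args) {
        BatchResequencerSketch resequencer = new BatchResequencerSketch(3);
        resequencer.offer(new Message(3, "c"));
        resequencer.offer(new Message(1, "a"));
        // The third message completes the batch, so the sorted batch is released.
        System.out.println(resequencer.offer(new Message(2, "b")));
    }
}
```

Nothing leaves the buffer until the batch is full or flush() is called, which is the waiting that the batch size and timeout options control.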

How it works...

In the previous example, the Resequencer collects up to 100 messages or waits up to one second, whichever happens first. Then, if any messages have been batched, it sorts them by the message_index header value and sends them further down the pipeline. When used in the default batching mode, Resequencer can also sort in reverse order, allow duplicate messages, and reject old messages or messages with an invalid sequence number. The downside of the batch processing algorithm is that it always collects messages up to a certain number or timeout, which reduces the overall throughput of the system.

There is also a stream-based algorithm, which reorders messages continuously based on the detection of gaps between messages rather than on a fixed batch size. To use the stream-based Resequencer, the messages must contain a unique sequence number for which the predecessor and successor are known. For example, the message sequence 1, 2, 4 has a gap: the Resequencer lets messages 1 and 2 go instantly without any batching, but retains message 4 until message 3 arrives or the timeout occurs. Configuring a stream-based Resequencer is very similar to configuring a batch-based one, except that it takes the capacity and timeout options:

from("direct:start")
    .resequence(header("message_index"))
    .stream().capacity(100).timeout(1000L)
    .to("mock:result");
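The gap detection described above can be sketched in plain Java as follows. This is a simplified illustration and not Camel's code: StreamResequencerSketch and its methods are hypothetical names, the first expected sequence number is assumed to be known up front, the capacity limit is not modeled, messages older than the last delivered one are simply ignored, and the timeout is an explicit method call rather than a timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Illustration only: a hypothetical, simplified model of stream resequencing
// for integer sequence numbers. This is NOT Camel's implementation.
final class StreamResequencerSketch {

    private final TreeSet<Integer> pending = new TreeSet<>();
    private int nextExpected;

    // Assumption of this sketch: the first sequence number is known in advance.
    StreamResequencerSketch(int firstSequenceNumber) {
        this.nextExpected = firstSequenceNumber;
    }

    // Accepts one sequence number and returns every number that can be
    // delivered now, in order. A number is held back while a gap precedes it.
    List<Integer> offer(int sequenceNumber) {
        List<Integer> released = new ArrayList<>();
        if (sequenceNumber < nextExpected) {
            return released; // older than what was already delivered: ignored here
        }
        pending.add(sequenceNumber);
        while (!pending.isEmpty() && pending.first() == nextExpected) {
            released.add(pending.pollFirst());
            nextExpected++;
        }
        return released;
    }

    // Stands in for the timeout: gives up waiting for missing numbers and
    // releases everything currently held, in order.
    List<Integer> timeout() {
        List<Integer> released = new ArrayList<>(pending);
        pending.clear();
        if (!released.isEmpty()) {
            nextExpected = released.get(released.size() - 1) + 1;
        }
        return released;
    }

    public static void main(String[] args) {
        StreamResequencerSketch resequencer = new StreamResequencerSketch(1);
        System.out.println(resequencer.offer(1)); // delivered immediately
        System.out.println(resequencer.offer(2)); // delivered immediately
        System.out.println(resequencer.offer(4)); // held back: 3 is missing
        System.out.println(resequencer.offer(3)); // closes the gap: 3 and 4 go out
    }
}
```

Unlike the batch variant, messages that arrive in order pass through without waiting; only messages behind a gap are held.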

More information about the Resequencer pattern and how to use it can be found at http://camel.apache.org/resequencer.html.

There's more...

There are two other patterns that work on the message flow without modifying the message content: Throttler and Delayer. Let's have a look at both.

Limiting flow rate with Throttler

Throttler is a simple pattern useful for organizing messages in time. It works by throttling down messages to a specified maximum rate in order to protect a target endpoint from getting overloaded. For example, some APIs do not allow frequent calls or incur extra charges when a certain rate is exceeded. A snippet like the following will prevent the mock:result endpoint from getting more than three requests per 10 seconds during peak load times:

<route>
    <from uri="seda:a"/>
    <throttle maximumRequestsPerPeriod="3" timePeriodMillis="10000">
        <to uri="mock:result"/>
    </throttle>
</route>

The maximumRequestsPerPeriod option doesn't have to be hardcoded in the route definition; it can also be calculated at runtime using an expression.

Throttler can also work in non-blocking mode, so when the number of messages exceeds the maximum rate, instead of blocking the caller thread, it will schedule a task to be executed in the future using a separate thread. This lets the caller process further incoming messages while still honoring the maximum rate after the Throttler.
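The rate-limiting rule itself can be illustrated with a small plain-Java sketch that computes when each message would be allowed through. It is an assumption-laden model, not Camel's algorithm: ThrottlerSketch and releaseTime are hypothetical names, time is passed in as plain millisecond values instead of being read from a clock, arrivals are assumed to come in non-decreasing time order, and the limit is enforced over a sliding window.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Illustration only: a hypothetical sliding-window model of throttling.
// This is NOT Camel's Throttler implementation.
final class ThrottlerSketch {

    private final int maxRequestsPerPeriod;
    private final long periodMillis;
    // Release times of the most recent messages, oldest first.
    private final Deque<Long> recentReleases = new ArrayDeque<>();

    ThrottlerSketch(int maxRequestsPerPeriod, long periodMillis) {
        this.maxRequestsPerPeriod = maxRequestsPerPeriod;
        this.periodMillis = periodMillis;
    }

    // Returns the time (in ms) at which a message arriving at arrivalMillis
    // may proceed. Assumes calls are made in non-decreasing arrival order.
    long releaseTime(long arrivalMillis) {
        long release = arrivalMillis;
        if (recentReleases.size() == maxRequestsPerPeriod) {
            // The window is full: wait until the oldest release in it
            // is at least one full period in the past.
            long oldest = recentReleases.pollFirst();
            release = Math.max(arrivalMillis, oldest + periodMillis);
        }
        recentReleases.addLast(release);
        return release;
    }

    public static void main(String[] args) {
        // Same limits as the XML route above: 3 requests per 10 seconds.
        ThrottlerSketch throttler = new ThrottlerSketch(3, 10_000);
        for (int i = 0; i < 5; i++) {
            System.out.println("message " + i + " released at " + throttler.releaseTime(0) + " ms");
        }
    }
}
```

In blocking mode the caller thread would sleep until the computed release time; in non-blocking mode the same delay would instead be handed to a scheduler so that the caller can continue.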

Delaying messages

The Delayer pattern works much like Throttler, but instead of introducing a delay only when the maximum rate is exceeded, it always delays messages by the specified amount of time. One good use case for this pattern is testing routes by simulating long-running processes:

<route>
    <from uri="direct:start"/>
    <delay asyncDelayed="true">
        <constant>1000</constant>
    </delay>
    <to uri="mock:result"/>
</route>

As with Throttler, the delay can be specified using an expression, and the Delayer can be made non-blocking with the asyncDelayed option.
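The difference between a blocking and a non-blocking delay can be sketched with the JDK alone. The example below is a hedged illustration of the general idea behind asyncDelayed, not of how Camel schedules delayed exchanges: AsyncDelaySketch and its delay method are hypothetical names, and the message is a plain String. It relies on CompletableFuture.delayedExecutor, which requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

// Illustration only: a hypothetical non-blocking delay built on the JDK.
// This is NOT how Camel implements the Delayer.
final class AsyncDelaySketch {

    // Returns immediately. The returned future completes with the message
    // after delayMillis, on a different thread than the caller's.
    static CompletableFuture<String> delay(String message, long delayMillis) {
        Executor later = CompletableFuture.delayedExecutor(delayMillis, TimeUnit.MILLISECONDS);
        return CompletableFuture.supplyAsync(() -> message, later);
    }

    public static void main(String[] args) {
        CompletableFuture<String> delayed = delay("hello", 1000);
        // The caller thread is not blocked and could accept more messages here.
        System.out.println("caller continues while the message is delayed");
        // join() waits for the delayed message only so the demo can print it.
        System.out.println("delivered: " + delayed.join());
    }
}
```

A blocking delay would instead put the caller thread to sleep for the whole period, so that thread could not pick up further messages in the meantime.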