Book Image

Instant Apache ActiveMQ Messaging Application Development How-to

By : Timothy A. Bish
Book Image

Instant Apache ActiveMQ Messaging Application Development How-to

By: Timothy A. Bish

Overview of this book

Apache ActiveMQ is a powerful and popular open source messaging and Integration Patterns server. ActiveMQ is a fully JMS 1.1 compliant Message Broker and supports many advanced features beyond the JMS specification.Instant ActiveMQ Application Development How-to shows you how to get started with the ActiveMQ Message Broker. You will learn how to develop message-based applications using ActiveMQ and the JMS specification. In this book you will learn all the basic skills you need to start writing Java Messaging applications with a firm grounding in the more advanced features of ActiveMQ, giving you the tools to continue to master application development using ActiveMQ. Starting by applying the messaging features of the JMS specification to write basic messaging applications, you will develop a basic JMS application using topics and queues to broadcast events as well as perform Request and Response operations over the JMS. Once you have mastered the simple tasks you will move onto using the advanced features in ActiveMQ to supercharge your messaging applications. You will get to grips with ActiveMQ's scheduler to delay messages. You will also learn how to leverage ActiveMQ's fault-tolerant capabilities to create robust client applications.
Table of Contents (7 chapters)

Selecting messages (Simple)


Building on what we learned in the preceding recipe, we will modify our stock price consumer to use JMS selectors to filter the events it receives from the update service.

Getting ready

For this recipe we will use two examples. The first is event-producer that we already saw in the last recipe, and the second is selective-event-consumer that implements a simple stock price ticker application that uses JMS selectors to limit which stock price updates it receives.

How to do it...

To run the sample for this recipe you will need to perform the following steps:

  1. Open a terminal and start a broker.

  2. Open a second terminal, change to the directory where the selective-event-consumer file is located, and run the consumer by typing mvn compile exec:java. (You can use Ctrl + C to terminate this application or it will stop on its own after five minutes.)

  3. Open a third terminal, change to the directory where event-producer is located, and run the producer by typing mvn compile exec:java.

In the terminal where you started the producer you will see an output like the following code snippet, indicating that the producer is sending events that our consumer is interested in:

Starting example Stock Ticker Producer now...
Producer sending price update(0)
AAPL: $ (113.95001)
GOOG: $ (645.9444)
MSFT: $ (514.129)
ORCL: $ (469.03384)
Producer sending price update(1)
AAPL: $ (163.04951)
GOOG: $ (560.21594)

And in the terminal window where you started the consumers, you should see that the consumer is receiving the stock price update events our producer is firing, but only the ones with the ticker symbol GOOG:

Starting example Stock Ticker Consumer now...
Price Update: GOOG[$330.88095]
Price Update: GOOG[$873.3398]
Price Update: GOOG[$423.3238]
Price Update: GOOG[$256.38521]

How it works...

There are times when you want the consumer messages from a topic or queue but you need only a subset of the messages that are being placed into the destination. An example of this would be an application that only wants to process orders for North America, or in the case of our sample application, we only want to read stock price updates for a specific company. To accomplish this we employ the JMS selector functionality in our topic subscription.

What is a JMS selector?

A selector is a way of attaching a filter to a given subscription, also known as content-based routing. In JMS, the selector uses message properties and headers compared against Boolean expressions to filter messages. The Boolean expressions are defined using SQL92 syntax. The following table summarizes the selector language used in JMS:

Element

Valid values

Identifiers

Property or header field reference (such as JMSCorrelationID, price, and date).

The following values are not possible: NULL, TRUE, FALSE, NOT, AND, OR, BETWEEN, LIKE, IN, IS.

Operators

AND, OR, LIKE, BETWEEN, =, <>, <, >, <=, >=, IS NULL, IS NOT NULL.

Literals

The two Boolean literals, TRUE and FALSE.

Exact number literals that have no decimal point; for example, +20 and -256, 42.

Approximate number literals. These literals can use scientific notation or decimals; for example, -21.4E4, 5E2 and +34.4928.

Adding selectors to subscriptions

The code for the sample stock price consumer application in this recipe is identical to the consumer code from the previous recipe except for a few small changes. Let's take a look at the code and see how we apply a selector to our consumer's subscription:

public class SelectiveTickerConsumer {

    private final String connectionUri = "tcp://localhost:61616";
    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private Destination destination;
    private String selector;

    public void before() throws Exception {
        connectionFactory = new 
            ActiveMQConnectionFactory(connectionUri);
        connection = connectionFactory.createConnection();
        connection.start();
        session = connection.createSession(
            false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createTopic("EVENTS.QUOTES");
        selector = System.getProperty(
            "QuoteSel", "symbol = 'GOOG'");
    }

    public void run() throws Exception {
        System.out.println(" Running example with selector: " + selector);

        MessageConsumer consumer = 
             session.createConsumer(destination, selector);
        consumer.setMessageListener(new EventListener());
        TimeUnit.MINUTES.sleep(5);
        connection.stop();
        consumer.close();
    }
}

As you can see in the sample code, we added a variable named selector to our consumer application and initialized it by reading a system property named QuoteSel, which defaults to symbol = 'GOOG'. We then pass in our set selector value to the Session object's createConsumer() method and that's it; the remainder of the code is unchanged.

By default our example stock price consumer will now only receive price updates for the ticker symbol GOOG. We can alter this by running the consumer application again and specifying a different selector and then running the price update producer application again. Try it yourself by running the demos again but using some different selector values.

To receive updates for both GOOG and AAPL run your consumer with:

$mvn exec:java –DQuoteSel="symbol='GOOG' OR symbol='AAPL'

To receiver updates for MSFT but only when the price is greater than $800 try:

$mvn exec:java –DquoteSel="symbol='MSFT' AND price >= 800"

Practice yourself by coming up with more selectors on your own and rerunning the consumer and producer applications.

There's more...

The JMS specification covers the various syntax rules governing its selector expression language quite well in the API documentation for the Message object; you can review the API documentation here: http://docs.oracle.com/javaee/1.4/api/javax/jms/Message.html.

Sparse matching selectors

ActiveMQ uses a paging architecture for messages that are stored on the broker. By default only a certain number of pages are brought into memory at any given time for a destination and this can have an impact on your consumers that use message selectors. If the destination contains a large number of messages and the selector doesn't match any of the messages that have been paged in, your consumer can stall until either the messages that are paged in are acknowledged or expired.

This sort of scenario happens most often when your selector is a sparse matching selector meaning it only matches a few messages out of the larger whole. If this happens, you can work around this issue by increasing the page size ActiveMQ uses for a destination. Refer to the documentation on setting destination policies for more information:

http://activemq.apache.org/per-destination-policies.html