Instant Apache ActiveMQ Messaging Application Development How-to

By: Timothy A. Bish

Overview of this book

Apache ActiveMQ is a powerful and popular open source messaging and Integration Patterns server. ActiveMQ is a fully JMS 1.1 compliant Message Broker and supports many advanced features beyond the JMS specification. Instant Apache ActiveMQ Messaging Application Development How-to shows you how to get started with the ActiveMQ Message Broker. You will learn how to develop message-based applications using ActiveMQ and the JMS specification. In this book you will learn all the basic skills you need to start writing Java messaging applications, with a firm grounding in the more advanced features of ActiveMQ, giving you the tools to continue to master application development using ActiveMQ. Starting by applying the messaging features of the JMS specification to write basic messaging applications, you will develop a basic JMS application using topics and queues to broadcast events as well as perform request and response operations over JMS. Once you have mastered the simple tasks, you will move on to using the advanced features in ActiveMQ to supercharge your messaging applications. You will get to grips with ActiveMQ's scheduler to delay messages. You will also learn how to leverage ActiveMQ's fault-tolerant capabilities to create robust client applications.
Table of Contents (7 chapters)

Event processing with topics (Simple)


In this recipe we will explore how to use JMS topics as event channels by looking at a sample stock price update service and a simple consumer of those events.

Getting ready

For this recipe we will use the examples named event-publisher and event-subscriber that implement a simple stock price ticker application.

How to do it...

To run the sample for this recipe you will need to perform the following steps:

  1. Open a terminal and start a broker.

  2. Open two more terminals, change to the directory where the event-subscriber file is located in each, and run the consumer by typing mvn compile exec:java. (You can use Ctrl + C to terminate this application or it will stop on its own after five minutes.)

  3. Open a fourth terminal, change to the directory where the event-publisher file is located, and run the producer by typing mvn compile exec:java.

In the terminal where you started the producer, you will see output like the following, indicating that the producer is sending events that our consumers are interested in:

Starting example Stock Ticker Producer now...
Producer sending price update(0)
AAPL: $ (113.95001)
GOOG: $ (645.9444)
MSFT: $ (514.129)
ORCL: $ (469.03384)
Producer sending price update(1)
AAPL: $ (163.04951)
GOOG: $ (560.21594)

And in the terminal windows where you started the consumers you should see that each consumer is receiving the stock price update events our producer is firing:

Starting example Stock Ticker Consumer now...
Price Update: AAPL[$160.8296]
Price Update: GOOG[$330.88095]
Price Update: MSFT[$301.13727]
Price Update: ORCL[$533.9956]
Price Update: AAPL[$591.5656]
Price Update: GOOG[$873.3398]

How it works...

This example leverages the JMS topic to send stock price updates to interested clients. A JMS topic represents a type of message domain known as publish-subscribe messaging. This domain differs from point-to-point messaging in two key ways. First, the messages sent to a topic are only dispatched to consumers that were listening before the message was sent. Second, every consumer listening on a topic will get a copy of the messages sent to a topic.

[Figure: JMS topics]

In the preceding figure we can see the typical JMS topic usage scenario. We have a single producer that's sending messages to a particular topic. There are two consumers that are interested in receiving messages sent to the topic and as our producer sends its messages the JMS provider dispatches a copy of the message to each consumer. If the producer were to send its messages to the topic and neither of our subscribers were currently registered, the JMS provider would simply discard those messages.
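
The delivery rules described above can be sketched with a small in-memory model. This is only an illustration under stated assumptions: it uses no JMS or ActiveMQ classes, and the names ToyTopic, ToyQueue, and TopicVersusQueueDemo are hypothetical. It shows a topic handing a copy to every current subscriber and discarding messages when nobody is listening, while a queue holds each message until a consumer asks for it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy in-memory stand-in for a topic; not JMS, names are hypothetical. */
class ToyTopic {
    private final List<Consumer<String>> subscribers = new ArrayList<>();

    void subscribe(Consumer<String> subscriber) {
        subscribers.add(subscriber);
    }

    /** Hands a copy to every current subscriber and returns how many received it. */
    int publish(String message) {
        for (Consumer<String> subscriber : subscribers) {
            subscriber.accept(message);
        }
        return subscribers.size(); // 0 means the message was simply discarded
    }
}

/** Toy stand-in for a queue: messages wait until a consumer takes them. */
class ToyQueue {
    private final Queue<String> pending = new ArrayDeque<>();

    void send(String message) {
        pending.add(message);
    }

    /** Returns the oldest waiting message, or null if nothing is waiting. */
    String receive() {
        return pending.poll();
    }
}

class TopicVersusQueueDemo {
    public static void main(String[] args) {
        ToyTopic topic = new ToyTopic();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();

        topic.publish("AAPL 113.95");   // nobody is listening yet, so it is dropped
        topic.subscribe(first::add);
        topic.subscribe(second::add);
        topic.publish("GOOG 645.94");   // both subscribers receive a copy

        ToyQueue queue = new ToyQueue();
        queue.send("AAPL 113.95");      // retained until a consumer asks for it

        System.out.println("first subscriber saw:  " + first);
        System.out.println("second subscriber saw: " + second);
        System.out.println("queue consumer got:    " + queue.receive());
    }
}
```

In the demo, the first price update reaches no one because it was published before either subscriber registered, which mirrors what happens when the producer runs before the consumers are started.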

The JMS topic is ideal when you want to send notifications to clients but it's not essential that offline clients be able to consume that data when they come online again. Our stock ticker application is a good example of a use case for such a messaging domain. The client has no need for the old stock prices; it only cares about the present pricing data.

Our simple stock ticker producer publishes a series of price updates for a set of stock ticker symbols to a JMS topic, pausing briefly between rounds (five milliseconds in the code shown later). Each stock ticker consumer that we run will receive the same price update messages as they are produced. If, however, we run the price update producer to completion and then start our consumers, they will not receive any data, since the JMS topic domain doesn't retain messages by default when there are no consumers.

Let's take a look at the producer and consumer applications now and see how the JMS topic messaging domain works in practice.

The stock price producer

The code for our simple stock price update producer is shown next. As you review the code, take note of how much it looks like the producer from the previous recipe:

import java.util.ArrayList;
import java.util.Random;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageProducer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TickerProducer {

    private final String connectionUri = "tcp://localhost:61616";
    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private Destination destination;

    private final Random pricePicker = new Random();
    private final ArrayList<String> symbols = new ArrayList<String>(3);

    // Connects to the broker and creates the topic we publish to.
    public void before() throws Exception {
        connectionFactory = new
            ActiveMQConnectionFactory(connectionUri);
        connection = connectionFactory.createConnection();
        session = connection.createSession(
            false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createTopic("EVENTS.QUOTES");
        symbols.add("AAPL"); symbols.add("GOOG");
        symbols.add("MSFT"); symbols.add("ORCL");
    }

    // Publishes one price update per symbol in each round.
    public void run() throws Exception {
        MessageProducer producer = 
            session.createProducer(destination);

        for (int i = 0; i < 10000; ++i) {
            System.out.println("Producer sending price update("+i+")");
            for (String symbol : symbols) {
                // The data travels in message properties; no body is needed.
                Message message = session.createMessage();
                message.setStringProperty("symbol", symbol);
                message.setFloatProperty(
                    "price", pricePicker.nextFloat() * 1000);
                producer.send(message);
            }
            // Thread.sleep() takes milliseconds, so this pauses for 5 ms.
            Thread.sleep(5);
        }

        producer.close();
    }
}

Comparing this code to our previous queue producer code, you may have noticed that the only difference is that we created a topic destination from our session in the before() method as opposed to creating a queue destination. The JMS API makes it a simple task to transition from one messaging domain to another. If we needed our stock price updates to be preserved until some application processes each and every one of them, we could simply create a queue destination and send them there.

The stock price consumer

Our stock price update consumer is also nearly identical to the previous recipe's consumer code. The consumer code is shown next. We will follow the same model as our previous consumer by creating a class to do the JMS setup and shutdown work and then delegate the message processing to a MessageListener object.

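import java.util.concurrent.TimeUnit;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.ActiveMQConnectionFactory;

public class TickerConsumer {

    private final String connectionUri = "tcp://localhost:61616";
    private ActiveMQConnectionFactory connectionFactory;
    private Connection connection;
    private Session session;
    private Destination destination;

    // Connects to the broker and looks up the topic the producer uses.
    public void before() throws Exception {
        connectionFactory = new 
            ActiveMQConnectionFactory(connectionUri);
        connection = connectionFactory.createConnection();
        // Message delivery to consumers begins only once the connection is started.
        connection.start();
        session = connection.createSession(
            false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createTopic("EVENTS.QUOTES");
    }

    // Listens for price updates for five minutes, then shuts down.
    public void run() throws Exception {
        MessageConsumer consumer = 
            session.createConsumer(destination);
        consumer.setMessageListener(new EventListener());
        TimeUnit.MINUTES.sleep(5);
        connection.stop();
        consumer.close();
    }
}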

The MessageListener code is also just like our previous recipe's MessageListener; we only change what information is gathered from the message we receive. Notice that we use the plain Message type as opposed to TextMessage. Since our stock pricing data is stored in the message's properties, we don't need a message body here:

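import javax.jms.Message;
import javax.jms.MessageListener;

public class EventListener implements MessageListener {
    public void onMessage(Message message) {
        try {
            float price = message.getFloatProperty("price");
            String symbol = message.getStringProperty("symbol");
            // Produces the "Price Update: AAPL[$160.8296]" lines shown earlier.
            System.out.println("Price Update: " + symbol + "[$" + price + "]");
        } catch (Exception e) {
            // Report the failure rather than silently swallowing it.
            e.printStackTrace();
        }
    }
}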

There's more...

While it's true that the default behavior of a JMS topic is to not retain any message that is sent to it when there are no active consumers, there are cases where you might want to have a consumer go offline and, when they come back online, receive any messages that were sent while they were offline. The JMS specification provides for this by defining a type of topic subscription known as a durable subscription.

To create a durable subscription, your code must call the createDurableSubscriber() method of a Session instance and give your connection a unique client ID that your application must reuse each time it runs. Let's take a look at the changes we would need to make to our stock price consumer to make it a durable subscriber:

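    public void before() throws Exception {
        connectionFactory = new 
            ActiveMQConnectionFactory(connectionUri);
        connection = connectionFactory.createConnection();
        // Set the client ID before the connection is used for anything else.
        connection.setClientID("PriceConsumer");
        connection.start();
        session = connection.createSession(
            false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createTopic("EVENTS.QUOTES");
    }

    public void run() throws Exception {
        // createDurableSubscriber() takes a javax.jms.Topic, hence the cast.
        MessageConsumer consumer = 
            session.createDurableSubscriber(
                (Topic) destination, "DurableConsumer");
        // The rest of run() is unchanged.
    }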

The changes are quite minor: we set a client ID for our connection in the before() method and call createDurableSubscriber() in the run() method to create the durable subscription. Because a durable subscription doesn't exist until it's created once, we won't see any messages if we run the consumer for the first time only after the stock price update producer has finished. However, if we run the consumer once, then run the producer, and then run the consumer a second time, it will receive all the stock price updates sent while it was offline.
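
The run order described above can be illustrated with a toy in-memory model. This is a sketch under stated assumptions, not how a JMS provider is implemented: it uses no JMS classes, the names ToyDurableTopic and DurableSubscriptionDemo are hypothetical, and delivery to an online subscriber is left out so that only the offline backlog is modeled. It also rejects a second simultaneous connection for the same client ID and subscription name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of durable topic subscriptions; not JMS, names are hypothetical. */
class ToyDurableTopic {
    // One backlog per subscription; an entry exists only after the first connect.
    private final Map<String, List<String>> backlog = new HashMap<>();
    private final Set<String> active = new HashSet<>();

    private static String key(String clientId, String subscriptionName) {
        return clientId + ":" + subscriptionName;
    }

    /** Activates the subscription and returns whatever was stored while it was offline. */
    List<String> connect(String clientId, String subscriptionName) {
        String k = key(clientId, subscriptionName);
        if (!active.add(k)) {
            throw new IllegalStateException("already active: " + k);
        }
        List<String> missed = backlog.put(k, new ArrayList<>());
        return missed == null ? new ArrayList<>() : missed;
    }

    void disconnect(String clientId, String subscriptionName) {
        active.remove(key(clientId, subscriptionName));
    }

    /** Stores the message for every known subscription that is currently offline. */
    void publish(String message) {
        for (Map.Entry<String, List<String>> entry : backlog.entrySet()) {
            if (!active.contains(entry.getKey())) {
                entry.getValue().add(message);
            }
        }
    }
}

class DurableSubscriptionDemo {
    public static void main(String[] args) {
        ToyDurableTopic topic = new ToyDurableTopic();

        topic.publish("update-0"); // lost: the subscription has never been created
        System.out.println(topic.connect("PriceConsumer", "DurableConsumer"));
        topic.disconnect("PriceConsumer", "DurableConsumer");

        topic.publish("update-1"); // stored for the offline subscription
        topic.publish("update-2");
        System.out.println(topic.connect("PriceConsumer", "DurableConsumer"));
    }
}
```

The first connect returns an empty list because nothing was stored before the subscription existed; the second returns the two updates published while the subscriber was offline.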

Things to keep in mind when using durable subscriptions

There are several limitations that you should keep in mind when using durable subscriptions. Let's take a brief look at them.

  • Each JMS connection that is used to create a durable subscription must be assigned a unique client ID in order for the provider to identify the client each time its durable subscription is activated. The subscription also needs its own name.

  • The client must create the durable subscription at least once before the JMS provider will begin storing messages for that subscription.

  • Only one JMS connection can be active at any given time for the same client ID and subscription name, meaning no load balancing is possible for multiple consumers on the same logical subscription.

These limitations make using durable subscriptions much more complicated than using a queue when message durability is a must. ActiveMQ provides a mechanism to help mitigate the limitations of durable subscriptions known as Virtual Destinations, which you should investigate any time you start to think that a durable subscription sounds like it might fit your use case. You can read about Virtual Destinations at http://activemq.apache.org/virtual-destinations.html.
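
As a rough preview of the idea, the following toy model sketches how a virtual topic can sidestep those limitations. It assumes ActiveMQ's default naming convention, in which producers publish to a topic named VirtualTopic.<name> and each consumer group reads from its own queue named Consumer.<group>.VirtualTopic.<name>. Everything else is hypothetical: the class ToyVirtualTopic, its methods, and the group names are made up for illustration and are not part of the ActiveMQ API.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

/** Toy model of the virtual topic idea; not the ActiveMQ API, names are hypothetical. */
class ToyVirtualTopic {
    private final String topicName;
    private final Map<String, Queue<String>> groupQueues = new HashMap<>();

    ToyVirtualTopic(String name) {
        this.topicName = "VirtualTopic." + name;
    }

    /** Queue name a consumer group would read from under the assumed naming convention. */
    String queueNameFor(String group) {
        return "Consumer." + group + "." + topicName;
    }

    /** Creates the group's queue if needed so that later messages are retained for it. */
    void registerGroup(String group) {
        groupQueues.computeIfAbsent(queueNameFor(group), k -> new ArrayDeque<>());
    }

    /** Copies the message into every registered group's queue. */
    void publish(String message) {
        for (Queue<String> queue : groupQueues.values()) {
            queue.add(message);
        }
    }

    /** Any worker in the group may call this; each message goes to only one of them. */
    String receive(String group) {
        Queue<String> queue = groupQueues.get(queueNameFor(group));
        return queue == null ? null : queue.poll();
    }
}

class VirtualTopicDemo {
    public static void main(String[] args) {
        ToyVirtualTopic quotes = new ToyVirtualTopic("QUOTES");
        quotes.registerGroup("Billing");
        quotes.registerGroup("Audit");

        quotes.publish("AAPL 113.95");
        quotes.publish("GOOG 645.94");

        // Two Billing workers share one queue, so they split the messages.
        System.out.println("Billing worker 1: " + quotes.receive("Billing"));
        System.out.println("Billing worker 2: " + quotes.receive("Billing"));
        // Audit has its own queue and still sees every message.
        System.out.println("Audit worker:     " + quotes.receive("Audit"));
    }
}
```

Because each group reads from an ordinary queue, several workers can share the load and no client ID is involved, while every group still receives its own copy of each message.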

In a later recipe, we will look at how we can use ActiveMQ's Virtual Destinations feature to overcome the limitations of JMS durable topic subscriptions in our applications.