Book Image

Instant Apache ActiveMQ Messaging Application Development How-to

By : Timothy A. Bish
Book Image

Instant Apache ActiveMQ Messaging Application Development How-to

By: Timothy A. Bish

Overview of this book

Apache ActiveMQ is a powerful and popular open source messaging and Integration Patterns server. ActiveMQ is a fully JMS 1.1 compliant Message Broker and supports many advanced features beyond the JMS specification.Instant ActiveMQ Application Development How-to shows you how to get started with the ActiveMQ Message Broker. You will learn how to develop message-based applications using ActiveMQ and the JMS specification. In this book you will learn all the basic skills you need to start writing Java Messaging applications with a firm grounding in the more advanced features of ActiveMQ, giving you the tools to continue to master application development using ActiveMQ. Starting by applying the messaging features of the JMS specification to write basic messaging applications, you will develop a basic JMS application using topics and queues to broadcast events as well as perform Request and Response operations over the JMS. Once you have mastered the simple tasks you will move onto using the advanced features in ActiveMQ to supercharge your messaging applications. You will get to grips with ActiveMQ's scheduler to delay messages. You will also learn how to leverage ActiveMQ's fault-tolerant capabilities to create robust client applications.
Table of Contents (7 chapters)

Using the JMS request/response pattern (Intermediate)


In this recipe we are going to take a look at a messaging pattern known as request/response messaging.

Getting ready

We will use two examples for this recipe: jms-requester and jms-responder. Together they implement a simple request/response application.

How to do it...

To run the samples for this recipe, you will need to perform the following steps:

  1. Open a terminal and start a broker.

  2. Open a second terminal, change to the directory where the jms-responder example is located, and run it by typing mvn compile exec:java. (You can shut down the example by pressing Ctrl + C when done or allow it to stop on its own after five minutes.)

  3. Open a third terminal, change to the directory where the jms-requester example is located, and run the example by typing mvn compile exec:java.

In the terminal where you started the jms-requester, you will see the following output indicating that the request application is sending request messages and receiving responses:

Starting Requester example now...
Job Request
Job Request
Job Finished
Job Request
Job Request
Job Finished
Job Request
Job Finished
Job Finished
Woohoo! Work's all done!
Finished running the Requester example.

And in the terminal window where you started the jms-responder example, you should see that the responder is receiving the request events from our requester example.

Starting Responder example now...
Job Request
Job Request
Job Request
Job Request
Job Request
Job Request

How it works...

While decoupling of messaging applications is a primary driver of the JMS specification, there are cases where an application needs to send a request and will not continue until it receives a response indicating its request was handled. This sort of messaging pattern is known as request/response messaging, and you can think of it as a sort of Remote Procedure Call (RPC) over JMS if that helps.

Traditionally, this type of architecture has been implemented using TCP client and server applications that operate synchronously across a network connection. There are several problems that arise in this implementation, the biggest of which can be scaling. Because the TCP client and server are tightly coupled, it's difficult to add new clients to handle an increasing workload. Using messaging based architecture we can reduce or eliminate this scaling issue along with other issues of fault tolerance and so on. In the messaging paradigm, a requester sends a request message to a queue located on a remote broker and one or more responders can take this message, process it, and return a response message to the requester. In the following diagram, we see a visual depiction of the typical implementation of the request/response pattern over JMS:

The JMS request/response pattern

As we can see in the previous diagram, each JMS request/response application follows a simple fixed pattern. But there are a lot of moving parts we need to understand. Let's now examine the source code for the examples we executed earlier and see how this messaging pattern is implemented in our JMS applications.

The JMS request application

Our simpler requester application places several requests onto a queue on our ActiveMQ Broker, and then it waits until all its outstanding requests are processed before finishing. Let's take a look at this in the following code:

public class RequesterExample implements MessageListener {

  private final String connectionUri = "tcp://localhost:61616";
  private ActiveMQConnectionFactory connectionFactory;
  private Connection connection;
  private Session session;
  private Destination destination;
  private static final int NUM_REQUESTS = 10;
  private final CountDownLatch done = new CountDownLatch(NUM_REQUESTS);

  public void before() throws Exception {
    connectionFactory = new
        ActiveMQConnectionFactory(connectionUri);
    connection = connectionFactory.createConnection();
    connection.start();
    session = connection.createSession(
        false, Session.AUTO_ACKNOWLEDGE);
    destination = session.createQueue("REQUEST.QUEUE");
  }

  public void run() throws Exception {
    TemporaryQueue responseQ = session.createTemporaryQueue();
    MessageProducer requester =
        session.createProducer(destination);
    MessageConsumer responseListener = 
        session.createConsumer(responseQ);
    responseListener.setMessageListener(this);

    for (int i = 0; i < NUM_REQUESTS; i++) {
        TextMessage request = 
            session.createTextMessage("Job Request");
        request.setJMSReplyTo(responseQ);
        request.setJMSCorrelationID("request: " + i);
        requester.send(request);
    }
    if (done.await(10, TimeUnit.MINUTES)) {
        System.out.println("Woohoo! Work's all done!");
    } 
    else {
        System.out.println("Doh!! Work didn't get done.");
    }
  }

  public void onMessage(Message message) {
    try {
        String jmsCorrelation = message.getJMSCorrelationID();
        if (!jmsCorrelation.startsWith("request")) {
            System.out.println("Received an unexpected response: " + jmsCorrelation);
        }
        TextMessage txtResponse = (TextMessage) message;
        System.out.println(txtResponse.getText());
        done.countDown();
    }
    catch (Exception ex) {
    }
  }
}

As you can see in the preceding code, the request application creates a JMS MessageProducer object for sending its request along with a MessageConsumer object to listen for the response. We've already seen code similar to this in other recipes; the question here is about how we told the responder where to send its response.

The JMSReplyTo property for a JMS message was added just for this sort of messaging pattern. The responder application doesn't have to know anything about preconfigured destinations for sending responses, it just needs to query the message for its JMSReplyTo address. This is yet another example of the benefits of the loose coupling that comes from using the JMS API.

In our sample application, we create a JMS temporary queue and assign that to JMSReplyTo for every request message we send. A JMS temporary destination works a lot like its non-temporary counterpart, however there are three key differences:

  • The lifetime of a temporary destination is tied to that of the connection object that created it. Once the connection is closed, the temporary destination is destroyed.

  • Only a MessageConsumer object created from the connection that created the temporary destination can consume from that temporary destination.

  • Temporary destinations don't offer the message persistence or reliability guarantees that normal destinations do.

Did you notice that we also assign a correlation ID to each request? Why do we do that? Since there could be multiple responses for requests we've sent, we might want to match up each response to ensure all our work gets done. For example, our application could have stored the correlation IDs in a map along with the job data and matched up the responses. Also, our application could have checked on a timeout if any outstanding requests hadn't arrived and could have resubmitted the unfinished job, or logged a warning to the administrator. The JMSCorrelationID field allows you to build this sort of book keeping into your request/response applications easily.

The JMS response application

In order to implement our simple responder application, we assemble many of the same JMS elements we did in the requester example; lets take a look at this in the following code:

public class ResponderExample implements MessageListener {

  private final String connectionUri = "tcp://localhost:61616";
  private ActiveMQConnectionFactory connectionFactory;
  private Connection connection;
  private Session session;
  private Destination destination;
  private MessageConsumer requestListener;
  private MessageProducer responder;

  public void before() throws Exception {
    connectionFactory = new 
        ActiveMQConnectionFactory(connectionUri);
    connection = connectionFactory.createConnection();
    connection.start();
    session = connection.createSession(
        false, Session.AUTO_ACKNOWLEDGE);
    destination = session.createQueue("REQUEST.QUEUE");
    responder = session.createProducer(null);
    requestListener = session.createConsumer(destination);
    requestListener.setMessageListener(this);
  }

  public void run() throws Exception {
    TimeUnit.MINUTES.sleep(5);
  }

  public void onMessage(Message message) {
    try {
        Destination replyTo = message.getJMSReplyTo();
        if (replyTo != null) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println(textMessage.getText());
            Message response = 
                session.createTextMessage("Job Finished");
            response.setJMSCorrelationID(
                message.getJMSCorrelationID());
            responder.send(replyTo, response);
        }
    }
    catch (Exception e) {
        System.out.println(
            "Encounted an error while responding: " + 
            e.getMessage());
    }
  }
}

As you can see, we create both a producer and consumer for our response example once again; however, we did something a bit different here. Did you spot it? The MessageProducer object we created was assigned a null destination; this is called an anonymous producer. We can use the anonymous producer to send messages to any destination, which is great since we don't know at startup which destination we are going to publish our responses to. We'd rather not create a new MessageProducer object every time a message arrives since that would add more network traffic and load to our broker.

When a request message is received by the responder application, it queries the message for the JMSReplyTo destination to which it should send the response. Once the responder knows where to send its answer, it constructs the appropriate response message and assigns it the JMSCorrelationID method that the requester will use to identify the response prior to sending it back to the responder.

Now we can start to imagine how our request/response applications can scale as the number of responders can grow with the increasing workload. If we need a new responder, we can spin up another instance on a different machine; it will share the load with all the others by taking one request at a time off the shared request queue and eventually sending its response back when its done. If, for instance, each request takes about a minute to complete, our sample would finish in about ten minutes. But if we add another responder application, we can cut that time in half.

There's more...

Every JMS request/response application we develop follows the basic pattern we saw in this recipe. Looking at the code, we start to see how JMS code can become a bit tedious with all the setup of connections, sessions, producers and consumers. Fortunately, there are ways to reduce the amount of work needed by using other software components that exist on top of the JMS API and provide simpler abstractions to problems such as the request/response pattern. One such solution is the Apache Camel project (http://camel.apache.org/). Apache Camel is what's known as an integration framework that implements well-known Enterprise Integration Patterns (EIPs) such as request/response.

Not only do Camel as well as other EIP frameworks provide a simpler way of implementing patterns such as request/response, they also build in error handling and recovery mechanics that make the developer's life simpler. It's a good idea to explore an EIP framework when you need to implement the more complex messaging patterns.

Some things to remember about temporary destinations

In an application that does a lot of request/response messaging, using temporary destinations for each response can create a large build up of resource utilization on the ActiveMQ Broker. Remember that temporary destinations don't go away until the connection that created them does. So, for a long running application that uses a connection, those destination resources created by the connection would never be recovered. In ActiveMQ, it's possible to configure a periodic cleanup of older inactive destinations to recover otherwise wasted resources. The documentation for this feature can be found at: http://activemq.apache.org/delete-inactive-destinations.html