In this recipe we will explore how to use JMS topics as event channels by looking at a sample stock price update service and a simple consumer of those events.
For this recipe we will use the examples named event-publisher
and event-subscriber
that implement a simple stock price ticker application.
To run the sample for this recipe you will need to perform the following steps:
Open a terminal and start a broker.
Open two more terminals, change to the directory where the
event-subscriber
file is located in each, and run the consumer by typingmvn compile exec:java
. (You can use Ctrl + C to terminate this application or it will stop on its own after five minutes.)Open a fourth terminal, change to the directory where the
event-publisher
file is located, and run the producer by typingmvn compile exec:java
.
In the terminal where you started the producer, you will see an output like the following code snippet indicating that the producer is sending events that our consumers are interested in:
Starting example Stock Ticker Producer now... Producer sending price update(0) AAPL: $ (113.95001) GOOG: $ (645.9444) MSFT: $ (514.129) ORCL: $ (469.03384) Producer sending price update(1) AAPL: $ (163.04951) GOOG: $ (560.21594)
And in the terminal windows where you started the consumers you should see that each consumer is receiving the stock price update events our producer is firing:
Starting example Stock Ticker Consumer now... Price Update: AAPL[$160.8296] Price Update: GOOG[$330.88095] Price Update: MSFT[$301.13727] Price Update: ORCL[$533.9956] Price Update: AAPL[$591.5656] Price Update: GOOG[$873.3398]
This example leverages the JMS topic to send stock price updates to interested clients. A JMS topic represents a type of message domain known as publish-subscribe messaging. This domain differs from point-to-point messaging in two key ways. First, the messages sent to a topic are only dispatched to consumers that were listening before the message was sent. Second, every consumer listening on a topic will get a copy of the messages sent to a topic.
In the preceding figure we can see the typical JMS topic usage scenario. We have a single producer that's sending messages to a particular topic. There are two consumers that are interested in receiving messages sent to the topic and as our producer sends its messages the JMS provider dispatches a copy of the message to each consumer. If the producer were to send its messages to the topic and neither of our subscribers were currently registered, the JMS provider would simply discard those messages.
The JMS topic is ideal when you want to send notifications to clients but it's not essential that offline clients be able to consume that data when they come online again. Our stock ticker application is a good example of a use case for such a messaging domain. The client has no need for the old stock prices it only cares about the present pricing data.
Our simple stock ticker producer publishes a series of price updates for a set of stock ticker symbols to a JMS topic every five seconds. Each stock ticker consumer that we run will receive the same price update messages as they are produced. If, however, we run the price update producer to completion and then start our consumer, they would not receive any data since the JMS topic domain doesn't retain messages by default when there are no consumers.
Let's take a look at the producer and consumer applications now and see how the JMS topic messaging domain works in practice.
The code for our simple stock price update producer is shown next, as you review the code take note of how much it looks like the producer from the previous recipe:
public class TickerProducer { private final String connectionUri = "tcp://localhost:61616"; private ActiveMQConnectionFactory connectionFactory; private Connection connection; private Session session; private Destination destination; private final Random pricePicker = new Random(); private final ArrayList<String> symbols = new ArrayList<String>(3); public void before() throws Exception { connectionFactory = new ActiveMQConnectionFactory(connectionUri); connection = connectionFactory.createConnection(); session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("EVENTS.QUOTES"); symbols.add("AAPL"); symbols.add("GOOG"); symbols.add("MSFT"); symbols.add("ORCL"); } public void run() throws Exception { MessageProducer producer = session.createProducer(destination); for (int i = 0; i < 10000; ++i) { System.out.println("Producer sending price update("+i+")"); for (String symbol : symbols) { Message message = session.createMessage(); message.setStringProperty("symbol", symbol); message.setFloatProperty( "price", pricePicker.nextFloat() * 1000); producer.send(message); } Thread.sleep(5); } producer.close(); } }
Comparing this code to our previous queue producer code, you may have noticed that the only difference is that we created a topic destination from our session in the before()
method as opposed to creating a queue destination. The JMS API makes it a simple task to transition from one messaging domain to another. If we needed our stock price updates to be preserved until some application processes each and every one of them, we could simply create a queue destination and send them there.
Our stock price update consumer is also nearly identical to the previous recipe's consumer code. The consumer code is shown next. We will follow the same model as our previous consumer by creating a class to do the JMS setup and shutdown work and then delegate the message processing to a MessageListener
object.
public class TickerConsumer { private final String connectionUri = "tcp://localhost:61616"; private ActiveMQConnectionFactory connectionFactory; private Connection connection; private Session session; private Destination destination; public void before() throws Exception { connectionFactory = new ActiveMQConnectionFactory(connectionUri); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE); destination = session.createTopic("EVENTS.QUOTES"); } public void run() throws Exception { MessageConsumer consumer = session.createConsumer(destination); consumer.setMessageListener(new EventListener()); TimeUnit.MINUTES.sleep(5); connection.stop(); consumer.close(); } }
The MessageListener
code is also just like our previous recipe's MessageListener
; we only change what information is gathered from the message we receive. Notice that we use the simple message type as opposed to TextMessage
since our stock pricing data is stored in the message's properties, we don't need a message body here:
public class EventListener implements MessageListener { public void onMessage(Message message) { try { float price = message.getFloatProperty("price"); String symbol = message.getStringProperty("symbol"); } catch (Exception e) { } } }
While it's true that the default behavior of a JMS topic is to not retain any message that is sent to it when there are no active consumers, there are cases where you might want to have a consumer go offline and, when they come back online, receive any messages that were sent while they were offline. The JMS specification provides for this by defining a type of topic subscription known as a durable subscription.
To create a durable subscription your code must call the createDurableSubscriber()
method of a Session
instance as well as give your connection a unique name that your application must reuse each time it runs. Let's take a look at the changes we would need to make to our stock price consumer to make it a durable subscriber:
public void before() throws Exception { connectionFactory = new ActiveMQConnectionFactory(connectionUri); connection = connectionFactory.createConnection(); connection.setClientId("PriceConsumer"); connection.start(); } public void run() throws Exception { MessageConsumer consumer = session.createDurableSubscription( destination, "DurableConsumer"); }
The changes are quite minor, we simply set a client ID for our connection in the before()
method and called durable subscription to create a method in the run()
method. Because a durable subscription doesn't exist until it's created once, we won't see any messages if we run the consumer just once after running the stock price update producer. However, if we run the consumer once, then run the producer and the consumer a second time afterwards, we will receive all the stock price updates sent while it was offline.
There are several limitations that you should keep in mind when using durable subscriptions, let's take a brief look at them.
Each JMS connection that is used to create a durable subscription must be assigned a unique client ID in order for the provider to identify the client each time its durable subscription is activated. Additionally the subscription needs its own ID value as well.
The client must create the durable subscription at least once before the JMS provider will begin storing messages for that subscription.
Only one JMS connection can be active at any given time for the same client ID and subscription ID meaning no load balancing is possible for multiple consumers on the same logical subscription.
These limitations make using durable subscription much more complicated than using a queue when message durability is a must. ActiveMQ provides a mechanism to help mitigate the limitations of durable subscriptions known as Virtual Destinations, which you should investigate any time you start to think that a durable subscription sounds like it might fit your use case. You can read about Virtual Destinations at http://activemq.apache.org/virtual-destinations.html.
In a later recipe, we will look at how we can use ActiveMQ's Virtual Destinations feature to overcome the limitations of JMS durable topic subscriptions in our applications.