Building on what we learned in the preceding recipe, we will modify our stock price consumer to use JMS selectors to filter the events it receives from the update service.
For this recipe we will use two examples. The first is event-producer, which we already saw in the last recipe, and the second is selective-event-consumer, which implements a simple stock price ticker application that uses JMS selectors to limit which stock price updates it receives.
To run the sample for this recipe you will need to perform the following steps:
Open a terminal and start a broker.
Open a second terminal, change to the directory where the selective-event-consumer example is located, and run the consumer by typing mvn compile exec:java. (You can use Ctrl + C to terminate this application or it will stop on its own after five minutes.)
Open a third terminal, change to the directory where the event-producer example is located, and run the producer by typing mvn compile exec:java.
In the terminal where you started the producer you will see an output like the following code snippet, indicating that the producer is sending events that our consumer is interested in:
Starting example Stock Ticker Producer now...
Producer sending price update(0)
AAPL: $ (113.95001)
GOOG: $ (645.9444)
MSFT: $ (514.129)
ORCL: $ (469.03384)
Producer sending price update(1)
AAPL: $ (163.04951)
GOOG: $ (560.21594)
And in the terminal window where you started the consumer, you should see that it is receiving the stock price update events our producer is firing, but only the ones with the ticker symbol GOOG:
Starting example Stock Ticker Consumer now...
Price Update: GOOG[$330.88095]
Price Update: GOOG[$873.3398]
Price Update: GOOG[$423.3238]
Price Update: GOOG[$256.38521]
There are times when you want to consume messages from a topic or queue but need only a subset of the messages being placed into the destination. An example would be an application that only wants to process orders for North America or, in the case of our sample application, one that only wants to read stock price updates for a specific company. To accomplish this we employ the JMS selector functionality in our topic subscription.
A selector is a way of attaching a filter to a given subscription, a technique also known as content-based routing. In JMS, a selector is a Boolean expression that is evaluated against each message's headers and properties (not the message body), and only messages for which the expression evaluates to true are delivered. The expression syntax is based on a subset of the SQL92 conditional expression syntax. The following table summarizes the selector language used in JMS:
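Element | Valid values |
---|---|
Identifiers | Property or header field reference (such as JMSCorrelationID, price, and date). The following values are not possible: NULL, TRUE, FALSE, NOT, AND, OR, BETWEEN, LIKE, IN, IS, and ESCAPE. |
Operators | Logical: AND, OR, NOT. Comparison: =, <>, >, >=, <, <=, LIKE, BETWEEN, IN, IS NULL, IS NOT NULL. Arithmetic: +, -, *, /. |
Literals | The two Boolean literals, TRUE and FALSE. String literals enclosed in single quotes; for example, 'GOOG'. Exact number literals that have no decimal point; for example, +20, -256, and 42. Approximate number literals, which can use scientific notation or decimals; for example, -21.4E4, 5E2, and +34.4928. |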
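To make the semantics concrete, the following sketch mimics in plain Java how selectors such as symbol = 'GOOG' and symbol = 'MSFT' AND price > 800 decide whether a message is delivered. It does not use the JMS API or parse selector strings. The class name SelectorSemanticsSketch, the update() helper, and the hand-written predicates are hypothetical stand-ins, and the property names symbol and price are assumed to be set by the producer. One detail worth noting is that a comparison against a missing property evaluates to "unknown" in JMS, so the message is not selected.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

public class SelectorSemanticsSketch {

    // Hand-written stand-in for the selector: symbol = 'GOOG'
    public static final Predicate<Map<String, Object>> GOOG_ONLY =
            props -> "GOOG".equals(props.get("symbol"));

    // Hand-written stand-in for the selector: symbol = 'MSFT' AND price > 800
    // A missing or non-numeric price is treated as "unknown", so the message is rejected.
    public static final Predicate<Map<String, Object>> MSFT_ABOVE_800 = props -> {
        Object price = props.get("price");
        return "MSFT".equals(props.get("symbol"))
                && price instanceof Number
                && ((Number) price).doubleValue() > 800;
    };

    // Builds the property map that one price update message would carry.
    // Passing null for price models a message without a price property.
    public static Map<String, Object> update(String symbol, Double price) {
        Map<String, Object> props = new HashMap<>();
        props.put("symbol", symbol);
        if (price != null) {
            props.put("price", price);
        }
        return props;
    }

    public static void main(String[] args) {
        System.out.println(GOOG_ONLY.test(update("GOOG", 645.94)));      // prints true
        System.out.println(GOOG_ONLY.test(update("AAPL", 113.95)));      // prints false
        System.out.println(MSFT_ABOVE_800.test(update("MSFT", 873.34))); // prints true
        System.out.println(MSFT_ABOVE_800.test(update("MSFT", 514.13))); // prints false
        System.out.println(MSFT_ABOVE_800.test(update("MSFT", null)));   // prints false
    }
}
```

In a real application the broker performs this evaluation for you, so the consumer only supplies the selector string.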
The code for the sample stock price consumer application in this recipe is identical to the consumer code from the previous recipe except for a few small changes. Let's take a look at the code and see how we apply a selector to our consumer's subscription:
    import java.util.concurrent.TimeUnit;

    import javax.jms.Connection;
    import javax.jms.Destination;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;

    import org.apache.activemq.ActiveMQConnectionFactory;

    public class SelectiveTickerConsumer {

        private final String connectionUri = "tcp://localhost:61616";
        private ActiveMQConnectionFactory connectionFactory;
        private Connection connection;
        private Session session;
        private Destination destination;
        private String selector;

        public void before() throws Exception {
            connectionFactory = new ActiveMQConnectionFactory(connectionUri);
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(
                false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic("EVENTS.QUOTES");
            // Read the selector from the QuoteSel system property,
            // falling back to GOOG-only updates.
            selector = System.getProperty(
                "QuoteSel", "symbol = 'GOOG'");
        }

        public void run() throws Exception {
            System.out.println(" Running example with selector: " + selector);
            // Passing the selector here is the only change needed to filter messages.
            MessageConsumer consumer =
                session.createConsumer(destination, selector);
            // EventListener is the listener class from the previous recipe.
            consumer.setMessageListener(new EventListener());
            TimeUnit.MINUTES.sleep(5);
            connection.stop();
            consumer.close();
        }
    }
As you can see in the sample code, we added a variable named selector to our consumer application and initialized it by reading a system property named QuoteSel, which defaults to symbol = 'GOOG'. We then pass the selector value to the Session object's createConsumer() method and that's it; the remainder of the code is unchanged.
By default our example stock price consumer will now only receive price updates for the ticker symbol GOOG. We can alter this by running the consumer application again with a different selector and then running the price update producer application again. Try it yourself by running the demos again with some different selector values.
To receive updates for both GOOG and AAPL, run your consumer with:
$ mvn exec:java -DQuoteSel="symbol='GOOG' OR symbol='AAPL'"
To receive updates for MSFT, but only when the price is greater than $800, try:
$ mvn exec:java -DQuoteSel="symbol='MSFT' AND price > 800"
Practice by coming up with more selectors of your own and rerunning the consumer and producer applications.
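When the list of symbols grows, chaining OR clauses by hand becomes tedious. JMS selectors also support the IN operator for string values, so one option is to generate the selector. The sketch below is only an illustration: SelectorBuilder and forSymbols are hypothetical names that are not part of the sample projects, and the property name symbol is taken from the selectors shown above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class SelectorBuilder {

    // Builds a selector such as: symbol IN ('GOOG', 'AAPL')
    public static String forSymbols(List<String> symbols) {
        if (symbols.isEmpty()) {
            throw new IllegalArgumentException("at least one symbol is required");
        }
        return symbols.stream()
                // Inside a selector string literal, a single quote is written as two single quotes.
                .map(s -> "'" + s.replace("'", "''") + "'")
                .collect(Collectors.joining(", ", "symbol IN (", ")"));
    }

    public static void main(String[] args) {
        // prints: symbol IN ('GOOG', 'AAPL')
        System.out.println(forSymbols(Arrays.asList("GOOG", "AAPL")));
    }
}
```

The resulting string can be passed to the consumer through the QuoteSel system property, or handed directly to createConsumer() in place of the default selector.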
The JMS specification covers the various syntax rules governing its selector expression language quite well in the API documentation for the Message interface; you can review the API documentation here: http://docs.oracle.com/javaee/1.4/api/javax/jms/Message.html.
ActiveMQ uses a paging architecture for messages that are stored on the broker. By default only a certain number of pages are brought into memory at any given time for a destination, and this can have an impact on consumers that use message selectors. If the destination contains a large number of messages and the selector doesn't match any of the messages that have been paged in, your consumer can stall until the paged-in messages are either acknowledged or expire.
This sort of scenario happens most often when your selector is a sparse matching selector, meaning it matches only a few messages out of the larger whole. If this happens, you can work around the issue by increasing the page size ActiveMQ uses for a destination. Refer to the ActiveMQ documentation on setting destination policies for more information.
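The stall is easier to reason about with a toy model. The sketch below is a deliberately simplified simulation, not ActiveMQ's implementation: it assumes the broker only considers the first pageSize pending messages for dispatch, and the names PagingStallSketch and dispatchable are hypothetical. It shows why a sparse selector can find nothing to consume with a small page and succeeds once the page is large enough to reach the matching message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PagingStallSketch {

    // Returns the messages a selective consumer could receive when only the
    // first pageSize pending messages are paged into memory (simplified model).
    public static List<String> dispatchable(List<String> pending, int pageSize, String wanted) {
        List<String> matches = new ArrayList<>();
        int window = Math.min(pageSize, pending.size());
        for (int i = 0; i < window; i++) {
            if (pending.get(i).equals(wanted)) {
                matches.add(pending.get(i));
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        // Pending messages identified by their symbol property; GOOG is last.
        List<String> pending = Arrays.asList("AAPL", "MSFT", "ORCL", "AAPL", "MSFT", "GOOG");

        // Page of 3: no GOOG message is paged in, so the consumer has nothing to read.
        System.out.println(dispatchable(pending, 3, "GOOG")); // prints []

        // Page of 6: the GOOG message is paged in and can be dispatched.
        System.out.println(dispatchable(pending, 6, "GOOG")); // prints [GOOG]
    }
}
```

In the first case the consumer stays idle until other consumers acknowledge the paged-in messages or those messages expire, which matches the behavior described above.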