Book Image

HornetQ Messaging Developer's Guide

By : Piero Giacomelli
Book Image

HornetQ Messaging Developer's Guide

By: Piero Giacomelli

Overview of this book

<p>Messages and information can be exchanged at exponential speed with JBoss HornetQ asynchronous messaging middleware. Learn how to use the JAVA open source Message Oriented Framework, to build a high-performance, multi-protocol, embeddable, clustered system and manage millions of messages per second.<br /><br />In the HornetQ Messaging Developer’s Guide you will find the most common applications of a message exchanger with example code, as part of real-world scenarios. This practical and applicable guide increases reader knowledge chapter by chapter, covering basics to the most advanced features.<br /><br />You will start from a clean installation of a HornetQ sever and, having progressively become a HornetQ master, will finish by being able to use the framework embedded in your software and sharing information in a cluster environment.<br /><br />Starting from writing and reading a single message, we will discover more advanced features like managing queues, clustering the server, and controlling the undelivered messages. The book deals with a real-world advanced medical scenario as the main example that will lead you from learning the basics to the advanced features of HornetQ.</p>
Table of Contents (18 chapters)
HornetQ Messaging Developer's Guide
Credits
About the Author
About the Reviewers
www.PacktPub.com
Preface
Index

Coding our first example


Now that we are ready for the coding phase, as mentioned previously, we only need to implement the various methods that we have listed. Moving from this, we need to code the following steps:

  1. Adding class fields.

  2. Initializing the HornetQ environment.

  3. Creating a connection, session, and looking for a JMS queue.

  4. Creating a message and sending it to the queue.

  5. Receiving the message sent in the previous step.

  6. Saving to MongoDB.

  7. Closing all connections and freeing up resources.

Every time you need to access a HornetQ queue, you need to create a connection as for a database, and create a session. If you access a queue as a message producer or as a message consumer, you need to create a connection and a session in it. All the configuration options that are needed to identify the queue or manage the connections will be described in Chapter 2, Setting Up HornetQ. For the moment, we will only describe how to create the connection and session. In our example class, both the session and connection are shared between the message producer and message consumer, so we will instantiate them only once. But in most of the cases, if you decide to write separate classes that access the queue from the consumer or the producer layer, you have to configure and open the session in both, separately.

Class fields

In our example, we will avoid the import directive and use objects identified by their fully qualified namespace, so that the reader can see where the objects are located in the namespace. The import statements do not affect runtime performance even if they affect compile-time ones, but using a wildcard, in our opinion, affects core readability.

So let us start with the fields needed through the various methods:

	javax.naming.Context ic = null;
	javax.jms.ConnectionFactory cf = null;
	javax.jms.Connection connection =  null;
	javax.jms.Queue queue = null;
	javax.jms.Session session = null;
	
	com.mongodb.Mongo m;
	com.mongodb.DB db;
	
	String destinationName = "queue/DLQ";

Note

Downloading the example code

You can download the example code files for all Packt books you have purchased from your account at http://www.PacktPub.com. If you purchased this book elsewhere, you can visit http://www.PacktPub.com/support and register to have the files e-mailed directly to you.

As you can see, we need to have a Context object that is responsible for mapping names to the objects, because the discovery of the queue name and other configuration options is done using JNDI in a .xml configuration file.

We also have a ConnectionFactory object that is able to create multiple connections to HornetQ.

The connection is an object that will be used from the message producer layer and message consumer layer to send/receive messages.

The queue is an object that maps the queue defined in HornetQ configuration files. The DLQ queue is a particular one that we will use in this example, only to simplify the code. We will see how to create our own queues in the following chapters.

Finally, we have the Mongo and db objects that are responsible for connecting and storing data in the MongoDB database.

The destinationName string variable stores the queue name. The name should be exactly the one above otherwise our example will not work.

You are probably asking yourself where all these configuration parameters are stored. We will describe them in depth in Chapter 2, Setting Up HornetQ; for now, all we can say is that you can mimic JBoss configuration files. Also, HornetQ comes with a set of XML-based configuration files that are stored in the HORNETQ_ROOT\config folder.

Initializing the context

Now, we are ready to code the GetInitialContext method. This method is implemented in the following way:

java.util.Properties p = new java.util.Properties();
	
p.put(javax.naming.Context.INITIAL_CONTEXT_FACTORY,
"org.jnp.interfaces.NamingContextFactory");

p.put(javax.naming.Context.URL_PKG_PREFIXES,
"org.jboss.naming:org.jnp.interfaces");
	
p.put(javax.naming.Context.PROVIDER_URL, "jnp://localhost:1099");
	
ic = new javax.naming.InitialContext(p);

We instantiate a Properties object to store three key/value properties that are needed by Context to identify where to find the .xml files that contain the information for the HornetQ instance that we are going to use. The most significant file, for the moment, is the third one that tells us where the HornetQ server is running. By default, when it is started without other configuration issues, the HornetQ server is running on localhost and starts a Java naming provider on port 1099. As you can see these values (hostname and port) are mapped as the value for the provider_url object.

When InitialContext is created as an object, it can throw a runtime javax.naming.NamingException, which should be managed in case something goes wrong; for example, the name/IP address of the Java naming provider server could be wrong or the port may not be accessible from other machines due to the firewall configuration. In our case, everything should work fine considering that we are running everything from the same machine.

Now that we have our InitialContext object correctly assigned and created, we are ready to open the real connection to the HornetQ server.

Creating and opening the connection

This is done by the connectAndCreateSession method. The following code shows how to arrange the connection:

cf = (javax.jms.ConnectionFactory)ic.lookup("/ConnectionFactory");
queue = (javax.jms.Queue)ic.lookup(destinationName);
connection = cf.createConnection();
session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
connection.start();

First, we use a lookup on the context that is mapped to the hornetq-jms.xml file inside the HORNETQ_ROOT\config folder containing the configuration properties to identify the entry/ConnectionFactory. How JNDI queries the naming context to find the value is out of the scope of this book, but from a higher level of abstraction, it uses a tree directory-like structure. The ConnectionFactory object uses Netty, an asynchronous, event-driven application framework that supports lots of protocols, such as FTP , SMTP, HTTP, and SSL.

After the ConnectionFactory object has been created, we can also create the queue to store the messages. Using a lookup, we search for the queue whose name is queue/DLQ as defined in the class fields. It is important to notice that we use such a name because it was one of the default queues created by the standalone HornetQ server. If you take a look at the HornetQ default log stored in the HORNETQ_ROOT\logs folder, created when launching the run.bat script, you should see some lines like the following ones:

[main] 06:51:25,013 INFO trying to deploy queue jms.queue.DLQ
[main] 06:51:25,053 INFO trying to deploy queue jms.queue.ExpiryQueue

So the server that you started created two queues—one named DLQ and the other one named ExpiryQueue.

Now, you are ready to create the connection by calling the createConnection() method of ConnectionFactory. Before opening the connection, we will create a single session on it. Creating the session is pretty easy, but we need to describe the two parameters needed. The first one is a boolean value that tells if the session is transacted or not. This means that if you choose the session to be transacted, the delivery of the messages within that specific transaction will be made once you call the Commit() method of that session; so, this could potentially cause a significant overhead on the HornetQ server. In this case, we choose "not transacted" messages for easiness. The second parameter tells us how the messages are delivered in that session.

The possible choices are:

  • Client_Acknowledge mode: This is not a feasible option (when you have the freedom to choose from the other two options) since the JMS server cannot send subsequent messages till it receives an acknowledgement from the client.

  • Auto_Acknowledge mode: This mode follows the policy of delivering the message once and only once, but it incurs an overhead on the server to maintain this policy.

  • Dups_Ok_Acknowledge: This mode has a different policy of sending the message more than once, thereby reducing the overhead on the server (imposed when using the Auto_Acknowledge mode), but imposing an overhead on the network traffic by sending the message more than once. The Auto_Acknowledge mode cleans up resources early from the persistent storage/memory, due to which the overhead is reduced.

  • Session_Transacted mode: This mode should be used when the session is transacted.

In summary, the Client_Acknowledge mode or Dups_Ok_Acknowledge mode give a better performance than the Auto_Acknowledge mode.

Apart from the specific parameter, it is always good to remember that the session is specific to the connection; so once opened and used, it is good practice to close it before closing the connection.

Finally, we are able to start the connection with the specific session inside it. So we are now ready to move on to the production of our first message. ECG observation can be stored using ANSI protocols like HL7. This is too specific to our purpose, so we code a simple string that contains the patient's unique identifier, which was the date when it was recorded and the three ECG signals that were recorded.

For example, propose that we use a string that is like a line of a CSV (Comma Separated Value), so one single measurement will be a string like the following one:

1;02/20/2012 14:01:59.010;1020,1021,1022

The string is pretty self explanatory, so we move on to the createMessage method.

Producing the message

We are now ready to create our first JMS message and store it on queue/DLQ.

String theECG = "1;02/20/2012 14:01:59.010;1020,1021,1022";
javax.jms.MessageProducer publisher = session.createProducer(queue);
javax.jms.TextMessage message = 
session.createTextMessage(theECG);
publisher.send(message);
System.out.println("Message sent!");
publisher.close();

Once we have instantiated the queue and the session, we create a MessageProducer object, and using a TextMessage object, we send it to the chosen queue. The pushing of the object on the "queue.DLQ" is done using the send method on the MessageProducer object. We recall for the moment that, following the JMS specifications, the message producer object is equipped with the following four main properties:

  • Destination: This is a JMS-administrated object

  • deliveryMode: This specifies how the message should be delivered

  • Priority: This specifies what the priority of the message is

  • timeToLive: This specifies how many milliseconds the message should be alive for, in the queue

At the moment, we won't specify any of the parameters; we will manage that when dealing with the expiration of messages in Chapter 5, Controlling Message Flow, but such parameters greatly impact the performance, so they need to be carefully configured in a high-performance environment.

As good programming practice, close the messageProducer object once it has done its job.

We are now ready to have a first run of our code. If everything is ok, you should see on the Console tab of the Eclipse IDE what is shown in the following screenshot:

So your first program, publishing a JMS Message on HornetQ, has been successfully coded! But there are two more steps to be implemented to complete the example. First, we need to read the message just stored on the queue, retrieve it, and save it to the MongoDB database. In the end, we will close all the resources involved in the correct way.

Consuming the message

Let us now start by introducing the code for the consumeMessage method. Inside this, we will call a private method called storeIntoMongo to perform the commit operation to MongoDB. How MongoDB manages and stores the document using the NoSQL paradigm is an extensive task, which does not affect this book. But we would like to demonstrate how to store information to a final destination. The reader should think that the consumer of the messages of a HornetQ server should do some other tasks apart from reading the messages. The obvious one would be to store the message for a second-stage analysis, such as storing log messages for parsing and using historical data.

Note

For the interested reader, I would like to suggest to you, only as an exercise, to think of how to code the message storing using your preferred RDBMS, or even modify the method we implemented to store the messages on a text file.

Before doing this exercise, you should take a look at the following consumeMessage code:

javax.jms.MessageConsumer messageConsumer = session.createConsumer(queue);
javax.jms.TextMessage messageReceived = (TextMessage)messageConsumer.receive(5000);
insertMongo(messageReceived);
System.out.println("Received message: " + messageReceived.getText());
messageConsumer.close();

As you can see, the code is somewhat similar to the produceMessage method. On the same session object and the same queue that we used to produce the message, we consume it. This time, we create a MessageConsumer object. The messageConsumer object can receive messages in a synchronous or asynchronous way, the main difference being that the synchronous way can be obtained by simply calling the receive method; while in the asynchronous way, when a new message is received, an event is triggered. So with asynchronous message consumption, we need to register with the messageConsumer object a new MessageListener event whose onMessage() method will be called. In this case, we have implemented a synchronous receive() method using a long number that gives the time in milliseconds. In this case, messageConsumer receives the next message in a time frame of 5 seconds. Once we have the TextMessage message, we pass it to the method that will insert it into the MongoDB database, print the message to the console, and close the messageConsumer object.

As we said before, the messageConsumer object can receive the messages in a synchronous/asynchronous way. In both the cases, however, when you call the close method, the action is blocked until a receive method or message listener is in progress.

Connecting to MongoDB

As the last part of our example program, we will show you how to get the messageConsumer text message and store it in MongoDB. Again, we will not focus on the details too much, as managing push storage into a NoSQL database is a little bit different from a common insert statement in a classical RDBMS and requires different concepts. We won't be looking at this in much depth, but I would like to show you how things work.

The code of the insertMongo method is as follows:

try {
		m = new com.mongodb.Mongo();
		db = m.getDB( "hornetqdb" );
	} catch (UnknownHostException | MongoException e) {
		e.printStackTrace();
	}
		
	com.mongodb.DBCollection coll = db.getCollection("testCollection");
	com.mongodb.BasicDBObject doc = new com.mongodb.BasicDBObject();

   doc.put("name", "MongoDB");
   doc.put("type", "database");

   com.mongodb.BasicDBObject info = new com.mongodb.BasicDBObject();

   info.put("textmessage", messageReceived.getText());

The first try/catch block creates a new instance of MongoDB and connects to the hornetqdb database. If the database does not exist the first time you launch the program, MongoDB will create it for you. Remember that if, at this time of your test, you have closed the command prompt that launched the MongoDB database, simply go into the main MongoDB root folder inside the bin folder, and relaunch it.

Storing to MongoDB

The next step is to create a document collection that somehow is like a schema on a RDBMS database, declare it as a database, and put it inside a document named textmessage, whose value is the string we received from the message consumer, using the getText method of the messageReceived object. If you have your MongoDB window open, you should see a text message similar to the following:

Mon Feb 20 14:51:24 [initandlisten] connection accepted from 127.0.0.1:49867 #2

Mon Feb 20 14:51:24 [conn2] end connection 127.0.0.1:49867

This tells you that you were able to successfully store your message in MongoDB.

Closing everything

This first example is nearly over. Consider that the message producer and message consumer layer insist on the same connection, and on the same session. So, it is important to close them to free up the resources. The garbage collector is not in charge of closing unreferenced objects. Here is the closing connection method that follows the best practices in closing the connection and session associated to a JMS queue:

if (session != null ) {
		try {
			session.close();
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
			
	if (connection != null) {
		try {
			connection.close();
		} catch (JMSException e) {
e.printStackTrace();
		}
	}

The first step tries to close the session object, and if it succeeds, closes the associated connection. In this case, the connection has been closed in the simplest way, but we have to observe that if we deal with a transactional message producer, then a better way to deal with the closing would be to commit all the transactions that are still pending, or your code will be exposed to the problem of losing messages from the production layer. Similarly, if you use an asynchronous message consumer, you should make sure to close all the connections and sessions while you don't have messages in transaction.

We will cover more advanced features such as this in Chapter 5, Some More Advance Features of HornetQ. Let's recap what we have covered:

  • First of all, we set up a fully working environment to code a simple JMS example using HornetQ

  • For the moment, we chose a standalone, non-clustered server for our developing purpose, and we have seen how to launch it

  • As an optional task, we will also install and configure a single instance of the MongoDB NoSQL database

  • Using Eclipse, we coded a class for connecting to the default queue launched by HornetQ, and for creating a session on it

  • Within the session, we coded a simple JMS messageProducer object to send a composite string to the default queue

  • From the same session object, we instantiated a synchronous messageConsumer object that reads the message stored and passes it for storing to the MongoDB instance we created