Book Image

RabbitMQ Cookbook

Book Image

RabbitMQ Cookbook

Overview of this book

RabbitMQ is an open source message broker software (sometimes called message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP). The RabbitMQ server is written in the Erlang programming language and is built on the Open Telecom Platform framework for clustering and failover. Messaging enables software applications to connect and scale. Applications can connect to each other as components of a larger application or to user devices and data. RabbitMQ Cookbook touches on all the aspects of RabbitMQ messaging. You will learn how to use this enabling technology for the solution of highly scalable problems dictated by the dynamic requirements of Web and mobile architectures, based for example on cloud computing platforms. This is a practical guide with several examples that will help you to understand the usefulness and the power of RabbitMQ. This book helps you learn the basic functionalities of RabbitMQ with simple examples which describe the use of RabbitMQ client APIs and how a RabbitMQ server works. You will find examples of RabbitMQ deployed in real-life use-cases, where its functionalities will be exploited combined with other technologies. This book helps you understand the advanced features of RabbitMQ that are useful for even the most demanding programmer. Over the course of the book, you will learn about the usage of basic AMQP functionalities and use RabbitMQ to let decoupled applications exchange messages as per enterprise integration applications. The same building blocks are used to implement the architecture of highly scalable applications like today's social networks, and they are presented in the book with some examples. You will also learn how to extend RabbitMQ functionalities by implementing Erlang plugins. This book combines information with detailed examples coupled with screenshots and diagrams to help you create a messaging application with ease.
Table of Contents (19 chapters)
RabbitMQ Cookbook
Credits
About the Authors
About the Reviewers
www.PacktPub.com
Preface
Index

Consuming messages


In this recipe we are closing the loop; we have already seen how to send messages to RabbitMQ—or to any AMQP broker—and now we are ready to learn how to retrieve them.

You can find the source code of the recipe at Chapter01/Recipe03/src/rmqexample/nonblocking.

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the introduction.

How to do it…

In order to consume the messages sent as seen in the previous recipe, perform the following steps:

  1. Declare the queue where we want to consume the messages from:

    String myQueue="myFirstQueue";
    channel.queueDeclare(myQueue, true, false, false, null);
  2. Define a specialized consumer class inherited from DefaultConsumer:

    public class ActualConsumer extends DefaultConsumer {
      public ActualConsumer(Channel channel) {
        super(channel);
      }
      @Override
      public void handleDelivery(
        String consumerTag, 
        Envelope envelope, 
        BasicProperties properties, 
        byte[] body) throws java.io.IOException {
          String message = new String(body);
          System.out.println("Received: " + message);
        }
    }
  3. Create a consumer object, which is an instance of this class, bound to our channel:

    ActualConsumer consumer = new ActualConsumer(channel);
  4. Start consuming messages:

    String consumerTag = channel.basicConsume(myQueue, true, consumer);
  5. Once done, stop the consumer:

    channel.basicCancel(consumerTag);

How it works…

After we have established the connection and the channel to the AMQP broker as seen in the Connecting to the broker recipe, we need to ensure that the queue from which we are going to consume the messages exists (step 1).

In fact it is possible that the consumer is started before any producer has sent a message to the queue and the queue itself may actually not exist at all. To avoid the failure of the subsequent operations on the queue, we need to declare the queue.

Tip

By allowing both producers and consumers to declare the same queue, we are decoupling their existence; the order in which we start them is not important.

The heart of this recipe is step 2. Here we have defined our specialized consumer that overrides handleDelivery() and instantiated it in step 3. In the Java client API the consumer callbacks are defined by the com.rabbitmq.client.Consumer interface. We have extended our consumer from DefaultConsumer, which provides a no-operation implementation for all the methods declared in the Consumer interface.

In step 3, by calling channel.basicConsume(), we let the consumer threads start consuming messages. The consumers of each channel are always executed on the same thread, independent of the calling one.

Now that we have activated a consumer for myQueue, the Java client library starts getting messages from that queue on the RabbitMQ broker, and invokes handleDelivery() for each one.

Then after the channel.basicConsume() method's invocation, we just sit idle waiting for a key press in the main thread. Messages are being consumed with nonblocking semantics respecting the event-driven paradigm, typical of messaging applications.

Only after we press Enter, the execution proceeds to step 5, cancelling the consumer. At this point the consumer threads stop invoking our consumer object, and we can release the resources and exit.

There's more…

In this section we will learn more about consumer threads and the use of blockage semantics.

More on consumer threads

At connection definition time, the RabbitMQ Java API allocates a thread pool from which it will allocate consumer threads on need.

All the consumers bound to one channel will be executed by one single thread in the pool; however, it is possible that consumers from different channels are handled by the same thread. That's why it is important to avoid long-lasting operations in the consumer methods, in order to avoid blocking other consumers.

It is also possible to handle the consumer thread pool by ourselves, as we have shown in our example; however, this not obligatory at all. We have defined a thread pool, java.util.concurrent.ExecutorService, and passed it at connection time:

ExecutorService eService = Executors.newFixedThreadPool(10);
Connection connection = factory.newConnection(eService);

As we were managing it, we were also in charge of terminating it:

eService.shutdown();

However, remember that if you don't define your own ExecutorService thread pool, the Java client library will create one during connection creation time, and destroy it as soon as we destroy the corresponding connections.

Blocking semantics

It is possible to use blocking semantics too, but we strongly discourage this approach if it's not being used for simple applications and test cases; the recipe to consume messages is non-blocking.

However, you can find the source code for the blocking approach at Chapter01/Recipe03/src/rmqexample/blocking.

See also

You can find all the available methods of the consumer interface in the official Javadoc at

http://www.rabbitmq.com/releases/rabbitmq-java-client/current-javadoc/com/rabbitmq/client/Consumer.html