RabbitMQ Cookbook

Overview of this book

RabbitMQ is open source message broker software (sometimes called message-oriented middleware) that implements the Advanced Message Queuing Protocol (AMQP). The RabbitMQ server is written in the Erlang programming language and is built on the Open Telecom Platform framework for clustering and failover. Messaging enables software applications to connect and scale. Applications can connect to each other as components of a larger application, or to user devices and data.

RabbitMQ Cookbook touches on all aspects of RabbitMQ messaging. You will learn how to use this enabling technology to solve the scalability problems dictated by the dynamic requirements of web and mobile architectures, based for example on cloud computing platforms. This is a practical guide with several examples that will help you understand the usefulness and power of RabbitMQ. The book teaches the basic functionalities of RabbitMQ with simple examples that describe the use of the RabbitMQ client APIs and how a RabbitMQ server works. You will find examples of RabbitMQ deployed in real-life use cases, where its functionalities are combined with other technologies. The book also covers the advanced features of RabbitMQ that are useful for even the most demanding programmer.

Over the course of the book, you will learn how to use basic AMQP functionalities and how to use RabbitMQ to let decoupled applications exchange messages, as in enterprise integration applications. The same building blocks are used to implement the architecture of highly scalable applications such as today's social networks, and they are presented in the book with some examples. You will also learn how to extend RabbitMQ functionalities by implementing Erlang plugins. This book combines information with detailed examples, screenshots, and diagrams to help you create a messaging application with ease.

Using RPC with messaging


Remote Procedure Calls (RPC) are commonly used with client-server architectures. The client asks the server to perform some action, and then waits for the server's reply.

The messaging paradigm tries to enforce a totally different approach with the fire-and-forget messaging style, but it is possible to use properly designed AMQP queues to perform and enhance RPC, as shown in the following figure:

In the figure, the request queue is associated with the responder, and the reply queues with the callers.

However, when we use RabbitMQ, all the involved peers (both the callers and the responders) are AMQP clients.

We are now going to describe the steps performed in the example in Chapter01/Recipe05/Java_5/src/rmqexample/rpc.

Getting ready

To use this recipe we need to set up the Java development environment as indicated in the Introduction section.

How to do it…

Let's perform the following steps to implement the RPC responder:

  1. Declare the request queue where the responder will be waiting for the RPC requests:

    channel.queueDeclare(requestQueue, false, false, false, null);
  2. Define our specialized consumer RpcResponderConsumer by overriding DefaultConsumer.handleDelivery() as already seen in the Consuming messages recipe. On the reception of each RPC request, this consumer will:

    • Perform the action required in the RPC request

    • Prepare the reply message

    • Set the correlation ID in the reply properties by using the following code:

        BasicProperties replyProperties = new BasicProperties.Builder().correlationId(properties.getCorrelationId()).build();
    • Publish the answer on the reply queue:

        getChannel().basicPublish("", properties.getReplyTo(), replyProperties, reply.getBytes());

    • Send the ack to the RPC request:

        getChannel().basicAck(envelope.getDeliveryTag(), false);
  3. Start consuming messages, until we stop it as already seen in the Consuming messages recipe.
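
The responder steps above can be sketched without a broker. The following is a minimal in-memory illustration, not the recipe's RpcResponderConsumer: the Message class, the queue() helper, and the queue names are hypothetical stand-ins for AMQP properties and RabbitMQ queues. It shows only the three things the responder does for each request: perform the action, copy the correlation ID into the reply, and publish the reply to the queue named in replyTo.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// In-memory stand-in for the broker (assumption: no RabbitMQ classes involved).
class ResponderSketch {

    // Hypothetical message type: the two properties RPC relies on, plus the body.
    static final class Message {
        final String correlationId;
        final String replyTo;
        final String body;

        Message(String correlationId, String replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    // Queue name -> queued messages.
    static final Map<String, Queue<Message>> QUEUES = new ConcurrentHashMap<>();

    static Queue<Message> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new ConcurrentLinkedQueue<>());
    }

    // What the responder does for each request.
    static void handleRequest(Message request) {
        // 1. Perform the requested action (here: build a greeting).
        String reply = "Hello, " + request.body;
        // 2. Copy the correlation ID into the reply, unchanged.
        Message response = new Message(request.correlationId, null, reply);
        // 3. Publish the reply to the queue named by the caller.
        queue(request.replyTo).add(response);
    }

    public static void main(String[] args) {
        queue("rpc_request").add(new Message("id-1", "reply-A", "world"));
        handleRequest(queue("rpc_request").poll());
        Message reply = queue("reply-A").poll();
        System.out.println(reply.correlationId + " -> " + reply.body);
    }
}
```

Running main prints `id-1 -> Hello, world`. In the real recipe the same three actions happen inside handleDelivery(), followed by the basicAck() call.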

Now let's perform the following steps to implement the RPC caller:

  1. Declare the request queue where the responder will be waiting for the RPC requests:

    channel.queueDeclare(requestQueue, false, false, false, null);
    
  2. Create a temporary, private (exclusive), auto-delete reply queue with a server-generated name:

    String replyQueue = channel.queueDeclare().getQueue();
  3. Define our specialized consumer RpcCallerConsumer, which will take care of receiving and handling RPC replies. It will:

    • Allow the caller to specify what to do when it gets the replies (in our example, by defining AddAction())

    • Override handleDelivery():

        public void handleDelivery(String consumerTag,
                Envelope envelope,
                AMQP.BasicProperties properties,
                byte[] body) throws java.io.IOException {

            String messageIdentifier = properties.getCorrelationId();
            String action = actions.get(messageIdentifier);
            actions.remove(messageIdentifier);

            String response = new String(body);
            OnReply(action, response);
        }
  4. Start consuming reply messages by invoking channel.basicConsume().

  5. Prepare and serialize the requests (messageRequest in our example).

  6. Initialize an arbitrary, unique message identifier (messageIdentifier).

  7. Define what to do when the consumer gets the corresponding reply, by binding the action with the messageIdentifier. In our example we do it by calling our custom method consumer.AddAction().

  8. Publish the message to requestQueue, setting its properties:

    BasicProperties props = new BasicProperties.Builder()
        .correlationId(messageIdentifier)
        .replyTo(replyQueue)
        .build();
    channel.basicPublish("", requestQueue, props, messageRequest.getBytes());

How it works…

In this example the RPC responder takes the role of an RPC server; the responder listens on the public requestQueue queue (step 1 of the responder), where the callers place their requests.

Each caller, on the other hand, consumes the responder's replies on its own private queue, created in step 2 of the caller.

When the caller sends a message (step 8 of the caller), it sets two properties: the name of the temporary reply queue (replyTo()) where it will be listening, and a message identifier (correlationId()), which the caller needs to identify the call when the reply comes back.

In fact, in our example we have implemented an asynchronous RPC caller. The action to be performed by the RpcCallerConsumer (step 3 of the caller) when the reply comes back is registered with this non-blocking consumer by calling AddAction() (step 7 of the caller).
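
The caller-side bookkeeping described here can be sketched on its own. The class below is a hypothetical illustration, not the recipe's RpcCallerConsumer: PendingCalls, register(), and onReply() are invented names. It assumes the action is a callback taking the reply body, generates the correlation ID with java.util.UUID, and looks up and removes the action when a reply with that ID arrives.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical sketch of the caller-side correlation map; all names are illustrative.
class PendingCalls {

    // Correlation ID -> what to do when the matching reply arrives.
    private final Map<String, Consumer<String>> actions = new ConcurrentHashMap<>();

    // Called before publishing a request: remembers the action and returns
    // the correlation ID to set in the request properties.
    String register(Consumer<String> onReply) {
        String correlationId = UUID.randomUUID().toString();
        actions.put(correlationId, onReply);
        return correlationId;
    }

    // Called by the reply consumer. Returns false when the correlation ID
    // is missing, unknown, or was already handled.
    boolean onReply(String correlationId, String body) {
        if (correlationId == null) {
            return false;
        }
        Consumer<String> action = actions.remove(correlationId);
        if (action == null) {
            return false;
        }
        action.accept(body);
        return true;
    }

    int pending() {
        return actions.size();
    }

    public static void main(String[] args) {
        PendingCalls calls = new PendingCalls();
        String id = calls.register(body -> System.out.println("reply: " + body));
        // The request would be published here with correlationId(id).
        calls.onReply(id, "42");
        // A second reply with the same ID is ignored.
        System.out.println(calls.onReply(id, "42"));
    }
}
```

Removing the entry in the same step as the lookup means a duplicated or late reply cannot trigger the action twice, and the map does not grow with completed calls.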

Coming back to the responder, the RPC logic is all in the RpcResponderConsumer. This is not different from a specialized non-blocking consumer, as we have seen in the Consuming messages recipe, except for two details:

  • The reply queue name is obtained from the message properties, properties.getReplyTo(). Its value has been set by the caller to its private, temporary reply queue.

  • The reply message must include in its properties the correlation ID sent in the incoming message.

Tip

The correlation ID is not used by the RPC responder; it is only used to let the caller receiving this message correlate this reply with its corresponding request.

There's more…

In this section we will discuss the use of blocking RPC and some scalability notes.

Using blocking RPC

Sometimes simplicity is more important than scalability. In this case it is possible to use the following helper classes, included in the Java RabbitMQ client library, that implement blocking RPC semantics:

com.rabbitmq.client.RpcClient
com.rabbitmq.client.StringRpcServer

The logic is identical, but there are no non-blocking consumers involved, and the handling of temporary queues and correlation IDs is transparent to the user.

You can find a working example at Chapter01/Recipe05/Java_5/src/rmqexample/simplerpc.
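
To give an idea of what blocking semantics involve, here is a minimal sketch of one way to layer a blocking call over correlation IDs, using only the JDK. It is not the implementation of RpcClient or StringRpcServer; BlockingRpcSketch, call(), onReply(), and the publish callback are hypothetical names, and the publish callback stands in for sending the request to the broker.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;

// Hypothetical sketch: a blocking call built on a correlation-ID map.
class BlockingRpcSketch {

    // Correlation ID -> future completed when the matching reply arrives.
    static final Map<String, CompletableFuture<String>> PENDING = new ConcurrentHashMap<>();

    // Called by the reply consumer for each incoming reply.
    static void onReply(String correlationId, String body) {
        CompletableFuture<String> future = PENDING.remove(correlationId);
        if (future != null) {
            future.complete(body);
        }
    }

    // Publishes the request through the supplied callback, then blocks until
    // the reply arrives or the timeout expires.
    static String call(String body, long timeoutMillis, BiConsumer<String, String> publish) {
        String correlationId = UUID.randomUUID().toString();
        CompletableFuture<String> future = new CompletableFuture<>();
        PENDING.put(correlationId, future);   // register before publishing
        publish.accept(correlationId, body);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for reply", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no reply for " + correlationId, e);
        } finally {
            PENDING.remove(correlationId);    // never leak entries on timeout
        }
    }

    public static void main(String[] args) {
        // The callback simulates a responder answering from another thread.
        String reply = call("ping", 1000, (id, request) ->
                new Thread(() -> onReply(id, "echo:" + request)).start());
        System.out.println(reply);
    }
}
```

The future is registered before the request is published, so a reply that arrives immediately still finds its entry. The timeout matters in practice: without it, a lost request or a stopped responder would block the caller forever.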

Scalability notes

What happens when there are multiple callers? It works much like a standard RPC client/server architecture. But what if we run many responders?

In this case all the responders consume messages from the same request queue, and each request is delivered to only one of them. Furthermore, the responders can be located on different hosts. We get load distribution for free. More on this topic is in the recipe Distributing messages to many consumers.
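
The effect of several responders sharing one request queue can be sketched in memory. This is a hypothetical illustration with threads and a JDK queue, not RabbitMQ code; CompetingRespondersSketch and distribute() are invented names. It demonstrates only that each request is handled by exactly one responder. How the requests are split between threads here depends on scheduling, whereas RabbitMQ by default dispatches to the consumers of a queue in round-robin fashion.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: several responder threads competing on one request queue.
class CompetingRespondersSketch {

    // Starts `responders` threads that drain one shared queue holding `requests`
    // items, and returns how many requests each responder handled.
    static int[] distribute(int responders, int requests) {
        BlockingQueue<Integer> requestQueue = new LinkedBlockingQueue<>();
        for (int i = 0; i < requests; i++) {
            requestQueue.add(i);
        }

        int[] handled = new int[responders];
        Thread[] threads = new Thread[responders];
        for (int r = 0; r < responders; r++) {
            final int index = r;
            threads[r] = new Thread(() -> {
                // poll() hands each queued request to exactly one thread.
                while (requestQueue.poll() != null) {
                    handled[index]++;   // only this thread writes this slot
                }
            });
            threads[r].start();
        }

        for (Thread t : threads) {
            try {
                t.join();   // join() also makes the counters visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] handled = distribute(3, 1000);
        int total = 0;
        for (int i = 0; i < handled.length; i++) {
            System.out.println("responder " + i + " handled " + handled[i]);
            total += handled[i];
        }
        System.out.println("total handled: " + total);
    }
}
```

Whatever the split between responders, the total always equals the number of requests, which is the property the callers rely on: adding responders changes throughput, not the RPC contract.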