Book Image

RabbitMQ Essentials - Second Edition

By : Lovisa Johansson, David Dossot
Book Image

RabbitMQ Essentials - Second Edition

By: Lovisa Johansson, David Dossot

Overview of this book

RabbitMQ is an open source message queuing software that acts as a message broker using the Advanced Message Queuing Protocol (AMQP). This book will help you to get to grips with RabbitMQ to build your own applications with a message queue architecture. You’ll learn from the experts from CloudAMQP as they share what they've learned while managing the largest fleet of RabbitMQ clusters in the world. Following the case study of Complete Car, you’ll discover how you can use RabbitMQ to provide exceptional customer service and user experience, and see how a message queue architecture makes it easy to upgrade the app and add features as the company grows. From implementing simple synchronous operations through to advanced message routing and tracking, you’ll explore how RabbitMQ streamlines scalable operations for fast distribution. This book will help you understand the advantages of message queue architecture, including application scalability, resource efficiency, and user reliability. Finally, you’ll learn best practices for working with RabbitMQ and be able to use this book as a reference guide for your future app development projects. By the end of this book, you’ll have learned how to use message queuing software to streamline the development of your distributed and scalable applications.
Table of Contents (8 chapters)

Adding topic messages

CC's application allows its taxis to organize themselves into groups by registering their topics of interest. The new feature to roll out will allow clients to send an order request to all taxis within a particular topic. It turns out that this feature matches a specific exchange routing rule, not surprisingly called topic! This type of exchange allows us to route the message to all the queues that have been bound with a routing key matching the routing key of the message. So, unlike the direct exchange that routes a message to one queue maximum, the topic exchange can route it to multiple queues. Two other examples of where topic-based routing could be applied are to location-specific data, such as traffic warning broadcasts, or to trip price updates.

A routing pattern consists of several words separated by dots. A best practice to follow is to structure routing keys from the most general element to the most specific one, such as news.economy.usa or europe.sweden.stockholm.
The topic exchange supports strict routing key matching and will also perform wildcard matching using * and # as placeholders for exactly one word and zero or more words, respectively.

The following diagram illustrates how the topic exchange will be used in CC's application. Notice how the single inbox queue remains unchanged and simply gets connected to the topic exchange via extra bindings, each of them reflecting the interest of a user:

Fig 2.17: The topic exchange sending thematic messages to eco queues

Because the same inbox is used for everything, the code that's already in place for fetching messages doesn't need to be changed. In fact, this whole feature can be implemented with only a few additions. The first of these additions takes care of declaring the topic exchange in the existing on_start method, as follows:

def on_start(channel)
# Declare and return the topic exchange, taxi-topic
channel.topic("taxi-topic", durable: true, auto_delete: true)
end

There's nothing really new or fancy here; the main difference is that this exchange is called taxi-topic and is a topic type of exchange. Sending a message is even simpler than with the client-to-taxi feature because there is no attempt to create the addressee's queue. It wouldn't make sense for the sender to iterate through all the users to create and bind their queues, as only users already subscribed to the target topic at the time of sending will get the message, which is exactly the expected functionality. The order_taxi method is listed here:

# Publishing an order to the exchange
def order_taxi(type, exchange)
payload = "example-message"
message_id = rand
exchange.publish(payload,
routing_key: type,
content_type: "application/json",
content_encoding: "UTF-8",
persistent: true,
message_id: message_id)
end

exchange = on_start(channel)
# Order will go to any eco taxi
order_taxi('taxi.eco', exchange)
# Order will go to any eco taxi
order_taxi('taxi.eco', exchange)
# Order will go to any taxi
order_taxi('taxi', exchange)
# Order will go to any taxi
order_taxi('taxi', exchange)

The difference is that, now, messages are published to the taxi-topic exchange. The rest of the code that creates and publishes the message is exactly the same as the client-to-taxi messaging. Lastly, information needs to be added when a new taxi subscribes or unsubscribes from certain topics:

# example_consumer.rb

def taxi_topic_subscribe(channel, taxi, type)
# Declare a queue for a given taxi
queue = channel.queue(taxi, durable: true)

# Declare a topic exchange
exchange = channel.topic('taxi-topic', durable: true, auto_delete: true)

# Bind the queue to the exchange
queue.bind(exchange, routing_key: type)

# Bind the queue to the exchange to make sure the taxi will get any order
queue.bind(exchange, routing_key: 'taxi')

# Subscribe from the queue
queue.subscribe(block:true,manual_ack: false) do |delivery_info, properties, payload|
process_order(payload)
end
end

taxi = "taxi.3"
taxi_topic_subscribe(channel, taxi, "taxi.eco.3")

taxi.3 is the new environmentally friendly taxi, now ready to receive orders from clients that want an environmentally friendly car.

The AMQP specification does not provide any means to introspect the current bindings of a queue, so it is not possible to iterate them and remove the ones not needed anymore in order to reflect a change in a taxi's topics of interest. This is not a terrible concern because the application is required to maintain this state anyway.

The RabbitMQ management console exposes a REST API that can be used to perform queue binding introspection, among many other features not covered by the AMQP specification. More about that in upcoming chapters.

With this new code in place, everything works as expected. No code changes are needed to retrieve the new client-to-taxi orders because they arrive in the same inbox queue as the previous messages. Topical messages are sent and received correctly by the taxi cars, and all this happens with a minimal change and no increase in the number of queues. When connected to the management console, click on the Exchanges tab; the only visible difference is the new exchange topic; that is, taxi-topic.