We can start writing some code after our introduction to message queuing and ZeroMQ and of course we will start with the famous "hello world" program.
Let's consider a scenario where we have a server and a client. The server replies world
whenever it receives a hello
message from the clients. The server runs on port 4040
and clients send messages to port 4040
.
The following is the server code, which sends the world
message to clients:
#include <string.h> #include <stdio.h> #include <unistd.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); void* respond = zmq_socket(context, ZMQ_REP); zmq_bind(respond, "tcp://*:4040"); printf("Starting…\n"); for(;;) { zmq_msg_t request; zmq_msg_init(&request); zmq_msg_recv(&request, respond, 0); printf("Received: hello\n"); zmq_msg_close(&request); sleep(1); // sleep one second zmq_msg_t reply; zmq_msg_init_size(&reply, strlen("world")); memcpy(zmq_msg_data(&reply), "world", 5); zmq_msg_send(&reply, respond, 0); zmq_msg_close(&reply); } zmq_close(respond); zmq_ctx_destroy(context); return 0; }
Tip
Downloading the example code
You can download the example code files for all Packt books you have purchased from your account at http://www.PacktPub.com. If you purchased this book elsewhere, you can visit http://www.PacktPub.com/support and register to have the files e-mailed directly to you.
The following is the client code that sends the hello
message to the server:
#include <string.h> #include <stdio.h> #include <unistd.h> #include "zmq.h" int main (int argc, char const *argv[]) { void* context = zmq_ctx_new(); printf("Client Starting….\n"); void* request = zmq_socket(context, ZMQ_REQ); zmq_connect(request, "tcp://localhost:4040"); int count = 0; for(;;) { zmq_msg_t req; zmq_msg_init_size(&req, strlen("hello")); memcpy(zmq_msg_data(&req), "hello", 5); printf("Sending: hello - %d\n", count); zmq_msg_send(&req, request, 0); zmq_msg_close(&req); zmq_msg_t reply; zmq_msg_init(&reply); zmq_msg_recv(&reply, request, 0); printf("Received: hello - %d\n", count); zmq_msg_close(&reply); count++; } // We never get here though. zmq_close(request); zmq_ctx_destroy(context); return 0; }
Note
Please note that the examples in this book are written for ZeroMQ 3.2. Bear in mind that some examples may not work properly when using ZeroMQ Version 2.2 or older. Methods that were deprecated in 2.x were removed in 3.x. Some methods have been deprecated from those versions.
We have our first basic request-reply architecture, as shown in the following diagram:
Let's have a closer look at the code to understand how it works.
First we create a context and a socket. The zmq_ctx_new()
method creates a new context. It is thread safe, so one context can be used from multiple threads.
zmq_socket(2)
creates a new socket in the defined context. ZeroMQ sockets are not thread safe, so it should be used only by the thread where it was created. Traditional sockets are synchronous whereas ZeroMQ sockets have a queue on the client side and another on the server side for managing the request-reply pattern asynchronously. ZeroMQ automatically arranges setting up the connection, reconnecting, disconnecting, and content delivery. We will cover the difference between traditional sockets and ZeroMQ sockets in depth in Chapter 3, Using Socket Topology.
The server binds the ZMQ_REP
socket to port 4040
and starts waiting for requests and replies back whenever it receives a message.
This basic "hello world" example introduces us to our first pattern, the request-reply pattern.
We use the request-reply pattern to send messages from a client to one or multiple services and receive a reply for each message sent. This is most likely the easiest way to use ZeroMQ. The replies to the requests have to be strictly in order.
The following is the reply part of the request-reply pattern:
void* context = zmq_ctx_new();
void* respond = zmq_socket(context, ZMQ_REP);
zmq_bind(respond, "tcp://*:4040");
A server uses the ZMQ_REP
socket to receive messages from and send replies to the clients. If the connection between a client and the server is lost then the replied message is thrown away without any notice. The incoming routing strategy of ZMQ_REP
is fair-queue and the outgoing strategy is last-peer.
This book is all about queues. You may wonder what we mean when we refer to a fair-queue strategy. It is a scheduling algorithm and allocates the resources fairly by its definition.
To understand how it works, let's say that the Flows in the preceding figure send 16, 2, 6, and 8 packets/second respectively, but the output can handle only 12 packets per second. In this case we could transmit 4 packets/second, but Flow 2 transmits only 2 packets/second. The rule of fair-queue is that there should not be any idle output unless all inputs are idle. Thus, we could allow Flow 2 to transmit its 2 packets/second and share the remaining 10 packets between the rest of the Flows.
This is the incoming routing strategy used by ZMQ_REP
. The round-robin scheduling is the simplest way of implementing the fair-queue strategy, which is used by ZeroMQ as well.
The following is the request part of the request-reply pattern:
void* context = zmq_ctx_new();
printf("Client Starting….\n");
void* request = zmq_socket(context, ZMQ_REQ);
zmq_connect(request, "tcp://localhost:4040");
A client uses ZMQ_REQ
for sending messages to and receiving replies from a server. All messages are sent with the round-robin routing strategy. The incoming routing strategy is last-peer.
ZMQ_REQ
does not throw away any messages. If there are no available services to send the message or if the all services are busy, all send operations—zmq_send(3)
—are blocked until a service becomes available to send the message.
ZMQ_REQ
is compatible with the ZMQ_REP
and ZMQ_ROUTER
types. We will cover ZMQ_ROUTER
in Chapter 4, Advanced Patterns.
This part combines the request and reply sections and shows how to request a message from somewhere and how to respond to them.
printf("Sending: hello - %d\n", count);
zmq_msg_send(&req, request, 0);
zmq_msg_close(&req);
The client sends the message to the server using zmq_msg_send(3)
. It queues the message and sends it to the socket.
int zmq_send_msg(zmq_msg_t *msg, void *socket, int flags)
zmq_msg_send
takes three parameters, namely, message, socket, and flags.
The message parameter is nullified during the request, so if you want to send the message to multiple sockets you need to copy it.
A successful
zmq_msg_send()
request does not point out if the message has been sent over the network.The flags parameter is either
ZMQ_DONTWAIT
orZMQ_SNDMORE
.ZMQ_DONTWAIT
indicates that the message should be sent asynchronously.ZMQ_SNDMORE
indicates that the message is a multipart message and the rest of the parts of the message are on the way.
After sending the message, the client waits to receive a response. This is done by using zmq_msg_recv(3)
.
zmq_msg_recv(&reply, request, 0);
printf("Received: hello - %d\n", count);
zmq_msg_close(&reply);
zmq_msg_recv(3)
receives a part of the message from the socket, as specified in the socket parameter, and stores the reply in the message parameter.
int zmq_msg_recv (zmq_msg_t *msg, void *socket, int flags)
zmq_msg_recv
takes three parameters, namely, message, socket, and flags.
The previously received message (if any) is nullified
The flags parameter could be
ZMQ_DONTWAIT
, which indicates that the operation should be done asynchronously