ZeroMQ Pub/Sub Hello World
Published: June 28, 2017
It's been a while since I posted some new content on here. This is mostly because I've been quite busy trying to be an academic at Purdue. This blog will get an update soon with a few publications!
Anyways, I've been working a LOT with C++ recently, side-by-side with (and guided by) my labmates Tomo and Stephen. (Expect a post about the perfect emacs setup for C++ development soon, but here's a hint: it involves the usual suspects rtags and ycmd) As part of our project, I am currently surveying a few methods for inter-thread and inter-process communication, which seems to be what ZeroMQ was created for.
After reading through a few chapters of the intro, I managed to piece together a very basic inter-thread pub-sub example, which simply publishes a few messages on one thread and consumes them on another. ZeroMQ seems like it's pretty powerful software and I hope that I get to play with it as part of our project a little more.
Here's what I came up with, compile as follows: (I do -O0
and -g
of course
only when learning a new library)
c++ -c -Wall -O0 -g -fPIC -std=gnu++14 -o zmq_test zmq_test.cc
#include <iostream> #include <sstream> #include <string.h> #include <string> #include <thread> #include <zmq.hpp> int main(int argc, char *argv[]) { // "You should create and use exactly one context in your process." zmq::context_t context(0); // the main thread runs the publisher and sends messages periodically zmq::socket_t publisher(context, ZMQ_PUB); // for the inproc transport, we MUST bind on the publisher before we can // connect any subscribers std::string transport("inproc://mytransport"); publisher.bind(transport); // in a seperate thread, poll the socket until a message is ready. when a // message is ready, receive it, and print it out. then, start over. // // 1. create the subscriber socket zmq::socket_t subscriber(context, ZMQ_SUB); // 2. we need to connect to the transport subscriber.connect(transport); // 3. set the socket options such that we receive all messages. we can set // filters here. this "filter" ("" and 0) subscribes to all messages. subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0); size_t nmsg = 10; size_t nrx = 0; // to use zmq_poll correctly, we construct this vector of pollitems std::vector<zmq::pollitem_t> p = {{subscriber, 0, ZMQ_POLLIN, 0}}; // the subscriber thread runs until it received all of the messages std::thread subs_thread([&subscriber, &p, &nrx, &nmsg]() { while (true) { zmq::message_t rx_msg; // when timeout (the third argument here) is -1, // then block until ready to receive zmq::poll(p.data(), 1, -1); if (p[0].revents & ZMQ_POLLIN) { // received something on the first (only) socket subscriber.recv(&rx_msg); std::string rx_str; rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size()); std::cout << "Received: " << rx_str << std::endl; if (++nrx == nmsg) break; } } }); // let's publish a few "Hello" messages for (size_t i = 0; i < nmsg; ++i) { // create a message std::stringstream s; s << "Hello " << i; auto msg = s.str(); zmq::message_t message(msg.length()); memcpy(message.data(), msg.c_str(), msg.length()); publisher.send(message); } subs_thread.join(); return 0; }
Diving in to C++ has been a lot of fun, and I have a lot more stuff to share, so stay tuned!