ZeroMQ Pub/Sub Hello World

It's been a while since I posted some new content on here. This is mostly because I've been quite busy trying to be an academic at Purdue. This blog will get an update soon with a few publications!

Anyways, I've been working a LOT with C++ recently, side-by-side with (and guided by) my labmates Tomo and Stephen. (Expect a post about the perfect emacs setup for C++ development soon, but here's a hint: it involves the usual suspects rtags and ycmd) As part of our project, I am currently surveying a few methods for inter-thread and inter-process communication, which seems to be what ZeroMQ was created for.

After reading through a few chapters of the intro, I managed to piece together a very basic inter-thread pub-sub example, which simply publishes a few messages on one thread and consumes them on another. ZeroMQ seems like it's pretty powerful software and I hope that I get to play with it as part of our project a little more.

Here's what I came up with, compile as follows: (I do -O0 and -g of course only when learning a new library)

c++ -c -Wall -O0 -g -fPIC -std=gnu++14 -o zmq_test zmq_test.cc
#include <iostream>
#include <sstream>
#include <string.h>
#include <string>
#include <thread>
#include <zmq.hpp>

int main(int argc, char *argv[]) {
  // "You should create and use exactly one context in your process."
  zmq::context_t context(0);

  // the main thread runs the publisher and sends messages periodically
  zmq::socket_t publisher(context, ZMQ_PUB);
  // for the inproc transport, we MUST bind on the publisher before we can
  // connect any subscribers
  std::string transport("inproc://mytransport");
  publisher.bind(transport);

  // in a seperate thread, poll the socket until a message is ready. when a
  // message is ready, receive it, and print it out. then, start over.
  //
  // 1. create the subscriber socket
  zmq::socket_t subscriber(context, ZMQ_SUB);
  // 2. we need to connect to the transport
  subscriber.connect(transport);
  // 3. set the socket options such that we receive all messages. we can set
  // filters here. this "filter" ("" and 0) subscribes to all messages.
  subscriber.setsockopt(ZMQ_SUBSCRIBE, "", 0);

  size_t nmsg = 10;
  size_t nrx = 0;

  // to use zmq_poll correctly, we construct this vector of pollitems
  std::vector<zmq::pollitem_t> p = {{subscriber, 0, ZMQ_POLLIN, 0}};
  // the subscriber thread runs until it received all of the messages
  std::thread subs_thread([&subscriber, &p, &nrx, &nmsg]() {
    while (true) {
      zmq::message_t rx_msg;
      // when timeout (the third argument here) is -1,
      // then block until ready to receive
      zmq::poll(p.data(), 1, -1);
      if (p[0].revents & ZMQ_POLLIN) {
        // received something on the first (only) socket
        subscriber.recv(&rx_msg);
        std::string rx_str;
        rx_str.assign(static_cast<char *>(rx_msg.data()), rx_msg.size());
        std::cout << "Received: " << rx_str << std::endl;
        if (++nrx == nmsg)
          break;
      }
    }
  });

  // let's publish a few "Hello" messages
  for (size_t i = 0; i < nmsg; ++i) {
    // create a message
    std::stringstream s;
    s << "Hello " << i;
    auto msg = s.str();
    zmq::message_t message(msg.length());
    memcpy(message.data(), msg.c_str(), msg.length());
    publisher.send(message);
  }

  subs_thread.join();
  return 0;
}

Diving in to C++ has been a lot of fun, and I have a lot more stuff to share, so stay tuned!

"
\n

RSS License: CC BY-SA 4.0

"