Skip to content

Behaviour of ZMQ_CONFLATE in PUB/SUB pattern #3323

@francesco-romano

Description

@francesco-romano

Issue description

Note: this might be the intended behaviour.

I am implementing a PUB/SUB pattern where there is one publisher (quite fast) and multiple subscribers (they have different rates).

I am interested in only the most recent message. So I thought to enable the ZMQ_CONFLATE options.

My understanding (just from the doc, I did not check the code) is that only the newest message is retained -- a queue of size 1 and with the most recent message. Something that we cannot do with the high water mark that discards the new messages.
I am hypothesising that the "user" code sends a message (zmq_send), the thread pool in the zmq_context will handle the message to be sent on the socket. If a newer message comes before the old one is sent, then (with ZMQ_CONFLATE = true), the old message is discarded and the new one is sent. The same on the receiver size. If the user code is slow in calling zmq_recv, only the last message is read.

Now, this works perfectly as long as I have 1 publisher and 1 subscriber.
If I add more subscribers, only the first one that connects get the messages (using tcp) or the last one (using ipc).
Note that is the publisher side that control the behaviour. Adding ZMQ_CONFLATE in the subscriber has no effect (in the sense that the subscribers are able to connect)

So, my question is: is this the intended behaviour?
Does ZMQ_CONFLATE set on the publisher side discards the message after it is sent to the first subscriber (and not because a new zmq_send has been called)?

Environment

I tested this on macOS 13.6, lib zmq (installed by homebrew) 4.2.5.
I also tested between two Debian (4.2.3-1 and 4.2.1-4)

The code I run, is the following:

Code

Publisher:

#include <zmq.h>

#include <cstdlib>
#include <chrono>
#include <iostream>
#include <thread>
#include <atomic>
#include <csignal>

static std::atomic<bool> s_shutdown(false);

void signal_handler(int) {
    s_shutdown = true;
}

int main(int argc, char** argv)
{
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    void* context = zmq_ctx_new();
    void* output = zmq_socket(context, ZMQ_PUB);


    // We set the internal queue to have only one (the most recent) message and throw away
    // everything else.
    int trueValue = 1;
    zmq_setsockopt(output, ZMQ_CONFLATE, &trueValue, sizeof(int));

    // We are ready. Create the endpoints.
//    zmq_bind(output, "tcp://*:10000");
    zmq_bind(output, "ipc:///tmp/zmq_test");


    unsigned i = 0;
    while(true) {

        if (s_shutdown) {
            break;
        }

        if (zmq_send(output, &i, sizeof(unsigned), 0) ==  -1) {
            std::cerr << "Failed to send message." << std::endl;
        } else {
            std::cerr << "Sending " << i << std::endl;
        }
        i++;

        // fake a sleep (1ms)
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    }

    zmq_close(output);
    zmq_ctx_term(context);
    return 0;
}

Subscriber

#include <zmq.h>

#include <cstdlib>
#include <atomic>
#include <csignal>
#include <iostream>
#include <thread>

static std::atomic<bool> s_shutdown(false);

void signal_handler(int) {
    s_shutdown = true;
}

int main(int argc, char** argv)
{
    signal(SIGINT, signal_handler);
    signal(SIGTERM, signal_handler);

    void* context = zmq_ctx_new();

    void* input = zmq_socket(context, ZMQ_SUB);

    // subscribe to everything
    zmq_setsockopt(input, ZMQ_SUBSCRIBE, "", 0);
    // Setting the buffer to accept a single message and to drop old messages.
    int trueValue = 1;
     zmq_setsockopt(input, ZMQ_CONFLATE, &trueValue, sizeof(int));

//    zmq_connect(input, "tcp://localhost:10000");
    zmq_connect(input, "ipc:///tmp/zmq_test");

    while(true) {
        // reading the state
        if (s_shutdown) {
            break;
        }

        unsigned readValue = 0;
        int readBytes = zmq_recv(input, &readValue, sizeof(unsigned), ZMQ_DONTWAIT);
        if (readBytes > 0) {
            std::cerr << "Read " << readValue << std::endl;
        }


        // fake a sleep (100ms)
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    zmq_close(input);
    zmq_ctx_term(context);
    return 0;
}

Commenting the zmq_setsockopt(output, ZMQ_CONFLATE, &trueValue, sizeof(int)); in the publisher allows multiple subscribers to receive the message.

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions