
Race condition when using ZMQ_IDENTITY

Description

There appears to be a race condition when opening, closing, and re-opening a ZMQ_REQ socket that has its identity explicitly set via zmq_setsockopt using ZMQ_IDENTITY. If the socket is re-opened immediately after being closed, any message sent on the new ZMQ_REQ socket isn't received by the ZMQ_REP socket. This also occurs if the ZMQ_REP socket is replaced with a ZMQ_ROUTER socket (which is where the problem was initially noticed).

The race condition doesn't occur if there's a 1 second sleep added between closing the socket and re-opening it.

The test program below demonstrates the problem.
It does the following:
1. The thread '_Server' is created in main() and performs the following:

  • Create a ZMQ_REP socket.

  • Bind to 'inproc://server'.

  • In a loop, poll for input on the ZMQ_REP socket and send/echo all
    messages received back through the socket.

2. main() performs the following in a loop:

  • Create a ZMQ_REQ socket.

  • Set the socket identity to 'test_id'.

  • Connect to 'inproc://server'.

  • Send 'test message'.

  • Receive and print the message that was sent back.

  • Close the socket.

// Test program compiled with: g++ test.c -lzmq -pthread
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <zmq.h>

void* context;

void* _Server(void*)
{
    void* sock = zmq_socket(context, ZMQ_REP);
    zmq_bind(sock, "inproc://server");

    zmq_pollitem_t items[1];
    items[0].socket = sock;
    items[0].events = ZMQ_POLLIN;
    items[0].revents = 0;

    while (1)
    {
        // Block until a request arrives on the ZMQ_REP socket.
        int rc = zmq_poll(items, 1, -1);
        assert(rc >= 0);

        int more;
        size_t size = sizeof(more);
        char buf[256];
        do
        {
            // Echo every message part straight back to the sender.
            int num = zmq_recv(sock, buf, sizeof buf, 0);
            rc = zmq_getsockopt(sock, ZMQ_RCVMORE, &more, &size);
            assert(rc == 0);
            zmq_send(sock, buf, num, more ? ZMQ_SNDMORE : 0);
        } while (more);
    }
    return 0;
}

int main (int argc, char *argv[])
{
    context = zmq_ctx_new();

    pthread_t server_thread;
    pthread_create(&server_thread, 0, &_Server, 0);

    // Give the server thread a chance to start...
    sleep(1);

    unsigned i = 0;
    const char ID[] = "test_id";
    while (1)
    {
        // Open a fresh ZMQ_REQ socket that reuses the same identity each time.
        void* req_socket = zmq_socket(context, ZMQ_REQ);
        zmq_setsockopt(req_socket, ZMQ_IDENTITY, ID, sizeof ID);
        zmq_connect(req_socket, "inproc://server");

        zmq_send(req_socket, "test message", sizeof "test message", 0);

        char buf[256];
        int num = zmq_recv(req_socket, buf, sizeof buf, 0);
        printf("Received: '%s'\n", buf);

        zmq_close(req_socket);

        // Uncommenting the following line makes the race condition go away.
        // sleep(1);
    }
    return 0;
}
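
One way to narrow this down might be to give each iteration of the loop a distinct identity and check whether the problem still appears. This assumes the failure is tied to reusing the same identity before the previous socket has finished closing, which has not been confirmed here. The helper make_identity below is hypothetical and not part of the test program above; it only builds the identity bytes and does not call libzmq.

```c
#include <stddef.h>
#include <stdio.h>

/* Hypothetical helper (an assumption, not part of the original test program):
 * writes "<base>_<counter>" into out and returns the number of identity
 * bytes, excluding the terminating NUL, or 0 if the buffer is too small. */
size_t make_identity(char *out, size_t cap, const char *base, unsigned counter)
{
    int n = snprintf(out, cap, "%s_%u", base, counter);
    if (n < 0 || (size_t)n >= cap)
        return 0; /* formatting error or truncated output */
    return (size_t)n;
}
```

In main(), the loop could call make_identity(id, sizeof id, "test_id", i++) with a local char id[32] and pass the result to zmq_setsockopt(req_socket, ZMQ_IDENTITY, id, len) in place of the fixed ID. If the messages then arrive reliably without the sleep, that would point at identity reuse rather than at the close/re-open sequence itself.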

Environment

Compiled/Run on:
Linux Bonobo-Extreme 3.8.0-26-generic #38-Ubuntu SMP Mon Jun 17 21:43:33 UTC 2013 x86_64 x86_64 x86_64 GNU/Linux

8 cores - Intel(R) Core(TM) i7-3630QM CPU @ 2.40GHz
gcc version: 4.7.3

Status

Assignee

Unassigned

Reporter

John Fardo

Components

Affects versions

3.2.3

Priority

Major