The sync mutex is actually used for thread-safe sockets, so I don't think it is a good idea to reuse it for the monitor.

We already have thread-safe sockets, so we can use Radio-Dish. Maybe we could add a socket option to use Radio-Dish instead of Pair: by default the monitor would use a Pair socket, and if the new option (ZMQ_MONITOR_USE_RADIO?) is enabled, the socket would create a Radio socket instead.

On Tue, Oct 11, 2016 at 2:03 PM, Auer, Jens <[email protected]> wrote:
> Hi,
>
> I've patched socket_base_t to protect the monitor socket with a mutex. There
> was an unused mutex "sync" already available in socket_base_t, so I've used
> this :-). If you want, I can upload a pull request.
>
> Cheers,
> Jens
>
> --
> Dr. Jens Auer | CGI | Software Engineer
> CGI Deutschland Ltd. & Co. KG
> Rheinstraße 95 | 64295 Darmstadt | Germany
>
> From: zeromq-dev [mailto:[email protected]] On Behalf Of Auer, Jens
> Sent: 11 October 2016 09:31
> To: ZeroMQ development list
> Subject: Re: [zeromq-dev] Assertion failure with socket monitor when clients disconnect
>
> Hi Doron,
>
> I'm not a big fan of monitoring and prefer other solutions. I'm actually thinking
> of deprecating it: the implementation is buggy and actually violates the
> "don't share a socket between threads" rule, which is probably what is causing
> your issue.
>
> That's quite a shock given that it is part of the stable API.
> I think in this case it should not only be deprecated but removed from the API
> completely. On the other hand, something to monitor the connections is probably
> needed. Our clients are very concerned with TCP connections and would always
> insist on logging capabilities for TCP connection state changes. It would also
> be nice to get some statistics from the connections, e.g. the number of received
> bytes or messages.
>
> Anyway, you have two options. I think you can manage without the
> monitoring, and I can try to help you find other solutions. Another option is
> to try not listening to all events; maybe this will avoid the sharing
> violation.
>
> I would like to replace the monitor with something else, but I am not sure
> how to do this given our requirements. Currently, we use the monitor to
>
> - log TCP connection events for connect, disconnect, accept, listen
>   and reconnection retry events
> - limit the number of reconnection attempts
>
> Unfortunately this includes disconnect events, which are exactly the events
> causing the crashes right now. The other events are probably rare enough
> that there is no problem.
>
> Is there another way to implement this without using the socket monitor? We
> use Router, Dealer, Sub, XPub and Stream sockets. I have full control over
> the protocol between the Router/Dealer, but I cannot change the protocol
> between Pub/Sub and external clients over Stream sockets, so I cannot add
> control messages here.
>
> I think an easy fix for my issues would be to add a mutex to protect the
> monitor socket in socket_base_t. I guess this was not done because it would
> block the thread and probably impact performance, but at least it will work
> correctly and not crash. It should be good enough for our use case.
>
> An idea for a non-blocking solution would be to have an independent monitor
> class, like the existing listener and accepter classes, which has an inproc SUB
> socket and the PAIR socket.
> Each ZeroMQ socket would then create a monitor
> when it is created, and each session object would have a PUB socket to
> broadcast events to the monitor. The monitor then forwards events received
> from individual clients/sessions on different IO threads to the PAIR socket
> where the application code can connect.
>
> Best wishes,
> Jens
>
> On Oct 10, 2016 17:45, "Auer, Jens" <[email protected]> wrote:
>
> Hi,
>
> I have an issue with the socket monitor on a ZMQ_STREAM socket. We have
> noticed that our software sometimes crashes with internal ZeroMQ
> assertions when non-ZeroMQ clients disconnect.
> I can reproduce this with a small test program that
> - starts a context with 6 threads
> - creates a ZMQ_SUB socket with a monitor
> - creates a ZMQ_PAIR socket and connects it to the monitor for the SUB socket
> - creates a ZMQ_STREAM socket and binds it
> - creates a socket monitor with a ZMQ_PAIR socket to get events
> - runs in a loop, polling the two sockets with zmq_poll
>
> When I start 500 instances of this program and connect 6 clients to each of
> them (I use netcat >/dev/null), and then kill the clients, I randomly get
> crashes with ZeroMQ assertion failures.
> Most of the time the failure is in the code processing the socket monitor
> event, which I copied from the man page:
> Program terminated with signal 6, Aborted.
> #0  0x00007fb5644435f7 in raise () from /lib64/libc.so.6
> Missing separate debuginfos, use: debuginfo-install apr-1.4.8-3.el7.x86_64
> apr-util-1.5.2-6.el7.x86_64 cyrus-sasl-lib-2.1.26-20.el7_2.x86_64
> expat-2.1.0-8.el7.x86_64 glibc-2.17-106.el7_2.6.x86_64
> libdb-5.3.21-19.el7.x86_64 libgcc-4.8.5-4.el7.x86_64
> libstdc++-4.8.5-4.el7.x86_64 libuuid-2.23.2-26.el7_2.2.x86_64
> log4cxx-0.10.0-16.el7.x86_64 nspr-4.11.0-1.el7_2.x86_64
> nss-3.21.0-9.el7_2.x86_64 nss-softokn-freebl-3.16.2.3-14.2.el7_2.x86_64
> nss-util-3.21.0-2.2.el7_2.x86_64 openldap-2.4.40-9.el7_2.x86_64
> openssl-libs-1.0.1e-51.el7_2.5.x86_64 zlib-1.2.7-15.el7.x86_64
> (gdb) bt
> #0  0x00007fb5644435f7 in raise () from /lib64/libc.so.6
> #1  0x00007fb564444ce8 in abort () from /lib64/libc.so.6
> #2  0x00007fb56443c566 in __assert_fail_base () from /lib64/libc.so.6
> #3  0x00007fb56443c612 in __assert_fail () from /lib64/libc.so.6
> #4  0x00000000004b16f9 in get_monitor_event (monitor=0x2512b90, value=0x0, address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:47
> #5  0x00000000004b423e in main (argc=3, argv=0x7fffe10a2668) at /home/auerj/MDAF/src/test/pepSim.cpp:268
>
> Sometimes, I get an internal zmq assertion:
> Program terminated with signal 6, Aborted.
> #0  0x00007f3e75f185f7 in raise () from /lib64/libc.so.6
> (gdb) bt
> #0  0x00007f3e75f185f7 in raise () from /lib64/libc.so.6
> #1  0x00007f3e75f19ce8 in abort () from /lib64/libc.so.6
> #2  0x00007f3e76cff769 in zmq::zmq_abort (errmsg_=errmsg_@entry=0x7f3e76d328ed "check ()") at src/err.cpp:83
> #3  0x00007f3e76d06232 in zmq::msg_t::size (this=this@entry=0x7ffd200e9a50) at src/msg.cpp:254
> #4  0x00007f3e76d2e035 in zmq_msg_size (msg_=msg_@entry=0x7ffd200e9a50) at src/zmq.cpp:627
> #5  0x00007f3e76d2e415 in s_recvmsg (s_=<optimized out>, msg_=0x7ffd200e9a50, flags_=<optimized out>) at src/zmq.cpp:459
> #6  0x00000000004b1654 in get_monitor_event (monitor=0x1d3fb90, value=0x0, address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:45
> #7  0x00000000004b423e in main (argc=3, argv=0x7ffd200e9d68) at /home/auerj/MDAF/src/test/pepSim.cpp:268
>
> I've added assertions to the internal function calls in ZeroMQ to better
> trace the error.
> It turns out that the message is already corrupted when
> it is read from the internal pipe:
> #0  0x00007fd3bc9285f7 in raise () from /lib64/libc.so.6
> (gdb) bt
> #0  0x00007fd3bc9285f7 in raise () from /lib64/libc.so.6
> #1  0x00007fd3bc929ce8 in abort () from /lib64/libc.so.6
> #2  0x00007fd3bd70f769 in zmq::zmq_abort (errmsg_=errmsg_@entry=0x7fd3bd742d08 "queue.front().check()") at src/err.cpp:83
> #3  0x00007fd3bd71eb1f in zmq::ypipe_t<zmq::msg_t, 256>::read (this=0x1f978f0, value_=0x7ffd760e8540) at src/ypipe.hpp:346
> #4  0x00007fd3bd71e259 in zmq::pipe_t::read (this=0x1f9f9a0, msg_=msg_@entry=0x7ffd760e8540) at src/pipe.cpp:162
> #5  0x00007fd3bd71c63e in zmq::pair_t::xrecv (this=0x1f96b90, msg_=0x7ffd760e8540) at src/pair.cpp:114
> #6  0x00007fd3bd728b5b in zmq::socket_base_t::recv (this=0x1f96b90, msg_=msg_@entry=0x7ffd760e8540, flags_=0) at src/socket_base.cpp:910
> #7  0x00007fd3bd73e809 in s_recvmsg (s_=<optimized out>, msg_=0x7ffd760e8540, flags_=<optimized out>) at src/zmq.cpp:456
> #8  0x00000000004b1654 in get_monitor_event (monitor=0x1f96b90, value=0x0, address=0x0) at /home/auerj/MDAF/src/test/pepSim.cpp:45
> #9  0x00000000004b423e in main (argc=3, argv=0x7ffd760e8858) at /home/auerj/MDAF/src/test/pepSim.cpp:268
>
> The assertion is triggered by additional checks in the ypipe_t class, which I
> specialized for zmq::msg_t to add the checks.
> The code is:
> 311         inline bool check_read ()
> 312         {
> 313             // Was the value prefetched already? If so, return.
> 314             if (&queue.front () != r && r)
> 315                  return true;
> 316
> 317             // There's no prefetched value, so let us prefetch more values.
> 318             // Prefetching is to simply retrieve the
> 319             // pointer from c in atomic fashion. If there are no
> 320             // items to prefetch, set c to NULL (using compare-and-swap).
> 321             r = c.cas (&queue.front (), NULL);
> 322
> 323             // If there are no elements prefetched, exit.
> 324             // During pipe's lifetime r should never be NULL, however,
> 325             // it can happen during pipe shutdown when items
> 326             // are being deallocated.
> 327             if (&queue.front () == r || !r)
> 328                 return false;
> 329
> 330             zmq_assert( queue.front().check() );
> 331
> 332             // There was at least one value prefetched.
> 333             return true;
> 334         }
> 335
> 336         // Reads an item from the pipe. Returns false if there is no value
> 337         // available.
> 338         inline bool read (T *value_)
> 339         {
> 340             // Try to prefetch a value.
> 341             if (!check_read ())
> 342                 return false;
> 343
> 344             // There was at least one value prefetched.
> 345             // Return it to the caller.
> 346             zmq_assert( queue.front().check() );
> 347             *value_ = queue.front ();
> 348             zmq_assert( value_->check() );
> 349             queue.pop ();
> 350             zmq_assert( value_->check() );
> 351
> 352             return true;
> 353         }
>
> I sometimes get another assertion when sending the monitor event, but here
> I am not sure if this is due to my modification
> adding the assertion.
> The stack trace is:
> #0  0x00007feaecc625f7 in raise () from /lib64/libc.so.6
> #1  0x00007feaecc63ce8 in abort () from /lib64/libc.so.6
> #2  0x00007feaeda49769 in zmq::zmq_abort (errmsg_=errmsg_@entry=0x7feaeda7ccf3 "queue.back().check()") at src/err.cpp:83
> #3  0x00007feaeda58856 in zmq::ypipe_t<zmq::msg_t, 256>::write (this=0xaa2670, value_=..., incomplete_=<optimized out>) at src/ypipe.hpp:259
> #4  0x00007feaeda57645 in zmq::pipe_t::write (this=0xaac1c0, msg_=msg_@entry=0x7feae7ce10f0) at src/pipe.cpp:215
> #5  0x00007feaeda56490 in zmq::pair_t::xsend (this=0xaa1ed0, msg_=0x7feae7ce10f0) at src/pair.cpp:90
> #6  0x00007feaeda62971 in zmq::socket_base_t::send (this=this@entry=0xaa1ed0, msg_=msg_@entry=0x7feae7ce10f0, flags_=flags_@entry=2) at src/socket_base.cpp:843
> #7  0x00007feaeda7846c in s_sendmsg (s_=0xaa1ed0, msg_=0x7feae7ce10f0, flags_=2) at src/zmq.cpp:346
> #8  0x00007feaeda62d93 in zmq::socket_base_t::monitor_event (this=0xaa16a0, event_=event_@entry=512, value_=25, addr_="tcp://192.168.120.1:9494") at src/socket_base.cpp:1357
> #9  0x00007feaeda62f1d in zmq::socket_base_t::event_disconnected (this=<optimized out>, addr_="tcp://192.168.120.1:9494", fd_=<optimized out>) at src/socket_base.cpp:1344
> #10 0x00007feaeda6b45a in zmq::stream_engine_t::error (this=this@entry=0x7feae00022c0, reason=reason@entry=zmq::stream_engine_t::connection_error) at src/stream_engine.cpp:936
> #11 0x00007feaeda6c5db in zmq::stream_engine_t::in_event (this=0x7feae00022c0) at src/stream_engine.cpp:300
> #12 0x00007feaeda493ee in zmq::epoll_t::loop (this=0xaa0830) at src/epoll.cpp:176
> #13 0x00007feaeda73150 in thread_routine (arg_=0xaa08b0) at src/thread.cpp:96
> #14 0x00007feaed816dc5 in start_thread () from /lib64/libpthread.so.0
> #15 0x00007feaecd23ced in clone () from /lib64/libc.so.6
>
> and the code in question is
> 253         inline void write (const T &value_, bool incomplete_)
> 254         {
> 255             zmq_assert(value_.check() );
> 256
> 257             // Place the value to the queue, add new terminator element.
> 258             queue.back () = value_;
> 259             queue.push ();
> 260             zmq_assert( queue.back().check() );
> 261
> 262             // Move the "flush up to here" pointer.
> 263             if (!incomplete_)
> 264                 f = &queue.back ();
> 265         }
>
> I can also reproduce the crashes with a simpler test program that only
> has the ZMQ_STREAM socket with its monitor and
> does not open a ZMQ_SUB socket. I could not reproduce it with fewer threads,
> i.e. the default of 2 IO threads.
>
> The code for my test program is
> #include <cassert>
> #include <cstdint>
> #include <cstdlib>
> #include <cstring>
> #include <iostream>
> #include <string>
> #include <vector>
>
> #include <zmq.hpp>
>
> static int
> get_monitor_event(void *monitor,
>                   int *value = nullptr,
>                   char **address = nullptr)
> {
>     // First frame in message contains event number and value
>     zmq_msg_t msg;
>     zmq_msg_init (&msg);
>     if (zmq_msg_recv (&msg, monitor, 0) == -1)
>         return -1; // Interrupted, presumably
>     assert (zmq_msg_more (&msg));
>
>     uint8_t *data = (uint8_t *) zmq_msg_data (&msg);
>     uint16_t event = *(uint16_t *) (data);
>     if (value)
>         *value = *(uint32_t *) (data + 2);
>
>     // Second frame in message contains event address
>     zmq_msg_init (&msg);
>     if (zmq_msg_recv (&msg, monitor, 0) == -1)
>         return -1; // Interrupted, presumably
>     assert (!zmq_msg_more (&msg));
>
>     if (address) {
>         data = (uint8_t *) zmq_msg_data (&msg);
>         size_t size = zmq_msg_size (&msg);
>         *address = (char *) malloc (size + 1);
>         memcpy (*address, data, size);
>         // Parenthesized: [] binds tighter than unary *
>         (*address) [size] = 0;
>     }
>     return event;
> }
>
> int main(int argc, char* argv[])
> {
>     std::string const address = argv[1];
>     std::string const port = argv[2];
>     std::string const stream_monitor_address("inproc://stream_monitor");
>     std::string const sub_monitor_address("inproc://sub_monitor");
>
>     zmq::context_t ctx(6);
>     zmq::socket_t stream(ctx, ZMQ_STREAM);
>     zmq::socket_t sub(ctx, ZMQ_SUB);
>
>     zmq_socket_monitor( static_cast<void*>(stream),
>                         stream_monitor_address.c_str(), ZMQ_EVENT_ALL);
>     zmq_socket_monitor( static_cast<void*>(sub),
>                         sub_monitor_address.c_str(),
>                         ZMQ_EVENT_ALL);
>
>     zmq::socket_t stream_monitor(ctx, ZMQ_PAIR);
>     zmq::socket_t sub_monitor(ctx, ZMQ_PAIR);
>
>     stream_monitor.connect( stream_monitor_address.c_str() );
>     sub_monitor.connect( sub_monitor_address.c_str() );
>
>     stream.bind("tcp://" + address + ":" + port);
>     //sub.connect("tcp://127.0.0.1:6000");
>
>     std::vector<zmq::message_t> clientIds;
>
>     while(1)
>     {
>         zmq::pollitem_t items[4] = {
>             {static_cast<void*>(stream_monitor), -1, ZMQ_POLLIN, 0},
>             {static_cast<void*>(stream), -1, ZMQ_POLLIN, 0},
>             {static_cast<void*>(sub_monitor), -1, ZMQ_POLLIN, 0},
>             {static_cast<void*>(sub), -1, ZMQ_POLLIN, 0},
>         };
>
>         auto rc = zmq::poll( items, 4 );
>
>         if (rc != -1)
>         {
>             if (0 != (items[1].revents & ZMQ_POLLIN) )
>             {
>                 zmq::message_t id;
>                 zmq::message_t m;
>
>                 stream.recv(&id);
>                 stream.recv(&m);
>             }
>             if (0 != (items[3].revents & ZMQ_POLLIN) )
>             {
>             }
>             if (0 != (items[0].revents & ZMQ_POLLIN))
>             {
>                 std::cout << "STREAM MONITOR " << get_monitor_event(
>                     static_cast<void*>(stream_monitor), nullptr, nullptr) << std::endl;
>             }
>             if (0 != (items[2].revents & ZMQ_POLLIN) )
>             {
>                 std::cout << "SUB MONITOR " << get_monitor_event(
>                     static_cast<void*>(sub_monitor), nullptr, nullptr) << std::endl;
>             }
>         }
>     }
>
>     return 0;
> }
>
> It uses the C++ interface from
> https://github.com/zeromq/cppzmq/blob/master/zmq.hpp.
>
> Best wishes,
> Jens Auer

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
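
For reference, the mutex idea discussed in this thread can be sketched without libzmq. This is only a minimal illustration under stated assumptions, not the actual patch to socket_base_t: MonitorSink, GuardedMonitor and run_io_threads are hypothetical names, and a plain std::vector stands in for the PAIR monitor socket, which, like any regular ZeroMQ socket, must not be used from several threads at once. The event value 512 and the address are simply the ones visible in the stack trace above.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical stand-in for the PAIR monitor socket: not thread-safe.
struct MonitorSink {
    std::vector<std::pair<int, std::string>> events;
    void send(int event, const std::string &addr) {
        events.emplace_back(event, addr);
    }
};

// Hypothetical wrapper: every access to the sink goes through one mutex,
// so I/O threads reporting events never touch it concurrently.
class GuardedMonitor {
public:
    void monitor_event(int event, const std::string &addr) {
        std::lock_guard<std::mutex> lock(sync_);
        sink_.send(event, addr);
    }
    std::size_t count() {
        std::lock_guard<std::mutex> lock(sync_);
        return sink_.events.size();
    }
private:
    std::mutex sync_;
    MonitorSink sink_;
};

// Simulates several I/O threads reporting disconnect events at the same
// time and returns how many events reached the sink.
std::size_t run_io_threads(std::size_t n_threads, std::size_t events_per_thread) {
    GuardedMonitor monitor;
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < n_threads; ++t) {
        workers.emplace_back([&monitor, events_per_thread] {
            for (std::size_t i = 0; i < events_per_thread; ++i)
                monitor.monitor_event(512, "tcp://192.168.120.1:9494");
        });
    }
    for (auto &w : workers)
        w.join();
    return monitor.count();
}
```

Calling run_io_threads(6, 1000) mirrors the 6 I/O threads of the test program. The trade-off is the one already mentioned in the thread: the lock can briefly block an I/O thread, in exchange for the sink never being written from two threads at once.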
