Hi Luca,
Unfortunately I'm not familiar with libzmq internals, so I can't decide
whether or not EAGAIN is appropriate. But as a library user I expect either:
sending a message on a mandatory router socket not to block when the queue is
below HWM
-or-
documentation explicitly stating that ZMQ_ROUTER_MANDATORY must always be
used with ZMQ_DONTWAIT because the socket may occasionally block
(independent of HWM).
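
To make the second option concrete, here is a minimal sketch of what calling
code would have to do if ZMQ_DONTWAIT were mandatory. It is the same workaround
I described in my earlier mail (quoted below). The names SendOutcome and
classify_mandatory_send are made up for illustration; only ZMQ_DONTWAIT,
EAGAIN and EHOSTUNREACH come from the library and this thread.

```cpp
#include <cerrno>

// Hypothetical helper, not part of libzmq or cppzmq.
enum class SendOutcome { sent, peer_gone, other_error };

// rc:  return value of a send such as zmq_msg_send(&msg, router, ZMQ_DONTWAIT)
//      (-1 on failure, otherwise the number of bytes queued)
// err: errno captured immediately after the call
SendOutcome classify_mandatory_send(int rc, int err)
{
    if (rc >= 0)
        return SendOutcome::sent;
    // EHOSTUNREACH: the identity is not routable.
    // EAGAIN: the pipe refused the write. In the case discussed here the
    // pipe is terminating, so this sketch assumes the peer is gone as well.
    if (err == EHOSTUNREACH || err == EAGAIN)
        return SendOutcome::peer_gone;
    return SendOutcome::other_error;
}
```

Note the trade-off: EAGAIN is also what a genuinely full queue reports, so this
treats a slow peer the same as a dead one. That is acceptable for my heartbeat
use case, but probably not in general.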
I attached a code example to demonstrate the problem. The router socket
send blocks, HWM is not reached (only 1 message in the queue), and the
socket never recovers as the pipe never returns to a state where it accepts
messages.
I agree that this is a corner case: the timeframe in which the socket may
block is really short (sending at exactly the moment the peer disconnects).
Still, the operation can't be called non-blocking. The attached example
triggers the issue with a high occurrence rate by sending the message when
the monitor reports the peer disconnect, but I could also reproduce the issue
without the monitor event (with a much lower occurrence rate, of course).
With my very limited knowledge of the library internals I would replace the
condition:
router.cpp:213 if (it != outpipes.end ())
with something like this:
router.cpp:213 if (it != outpipes.end () &&
it->second.pipe->check_active()) // (out_active && state == active)
but it's probably not that simple. :)
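
To illustrate the difference the extra condition would make, here is a toy
model. It is not libzmq code: ToyPipe, PipeState, current_behaviour and
proposed_behaviour are names invented for this sketch, and the logic only
mirrors what I described in this thread (pipe found in the map, check_write()
false, mandatory flag set).

```cpp
#include <cerrno>

// Toy model only; all names are hypothetical.
enum class PipeState { active, waiting_for_delimiter };

struct ToyPipe {
    PipeState state;
    bool hwm_reached;

    // Refuses the write both when HWM is reached and when the pipe is
    // no longer active, as observed in the debugger.
    bool check_write() const { return state == PipeState::active && !hwm_reached; }
    bool check_active() const { return state == PipeState::active; }
};

// Both functions return 0 if the message would be queued, otherwise the
// errno a mandatory router would report. A null pointer means "identity
// not found in the outpipes map".

int current_behaviour(const ToyPipe* pipe)
{
    if (pipe != nullptr) {
        if (!pipe->check_write())
            return EAGAIN;        // also hit while the pipe is terminating
        return 0;
    }
    return EHOSTUNREACH;
}

int proposed_behaviour(const ToyPipe* pipe)
{
    // A pipe that is being terminated is treated like a missing one.
    if (pipe != nullptr && pipe->check_active()) {
        if (!pipe->check_write())
            return EAGAIN;        // now only a genuinely full queue
        return 0;
    }
    return EHOSTUNREACH;
}
```

In this model a terminating pipe yields EAGAIN today and EHOSTUNREACH with the
extra condition, while a full but healthy pipe yields EAGAIN in both cases.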
Regards,
Gyorgy
On Sat, Feb 18, 2017 at 7:32 PM, Luca Boccassi <[email protected]>
wrote:
> On Fri, 2017-02-17 at 10:53 +0100, Gyorgy Szekely wrote:
> > Hi,
> > Sorry for spamming the list :( I will rate limit myself.
> >
> > I reviewed the docs for ZMQ_ROUTER_MANDATORY and it's clear now that
> > the
> > router socket may block if the message can be routed but HWM is
> > reached and
> > ZMQ_DONTWAIT is not specified. This is the exact code path my
> > application
> > blocks in.
> >
> > The problem is that HWM is not reached in my case.
> > zmq::router_t::xsend()
> > checks HWM with zmq::pipe_t::check_write(), which returns false, but
> > not
> > because HWM is reached, but because pipe state is
> > zmq::pipe_t::waiting_for_delimiter.
> >
> > Summary:
> > I don't think it's reasonable for zmq::router_t::xsend() to return -1
> > EAGAIN, when the corresponding pipe is being terminated. It's obvious
> > that
> > the message can't be sent in the future, there's no point in
> > retrying.
> >
> > (For the time being, as a workaround I specify ZMQ_DONTWAIT on the
> > send,
> > and I consider the worker dead with either EHOSTUNREACH or EAGAIN.)
> >
> > What's your opinion on this?
> >
> >
> > Regards,
> > Gyorgy
>
> Is the pipe terminated when the underlying socket is disconnected? I
> can't remember and I'd have to double check, but if that's the case
> then it could come back, so EAGAIN would be appropriate, right?
>
> Also the check_write just returns true/false, and given it's in the hot
> path I'd be wary of overloading it to cater for a single corner case.
>
> > On Thu, Feb 16, 2017 at 10:44 PM, Gyorgy Szekely <[email protected]
> > >
> > wrote:
> >
> > > Hi,
> > > I dug a bit deeper, here are my findings:
> > > - removing the on/off switching for the ZMQ_ROUTER_MANDATORY flag,
> > > and
> > > enabling it before the router socket bind: makes no difference
> > > - removing the monitor trigger and heartbeating the workers
> > > periodically
> > > (2.5 sec) drastically reduces the occurrence rate, the program
> > > hangs after
> > > 3-4 hours, instead of seconds. (in the background a worker
> > > connects/disconnects with 4 second period time)
> > >
> > > From this I suspect the issue appears in a small timeframe which is
> > > close
> > > to the monitor event, but otherwise hard to hit.
> > >
> > > With GDB I see the following:
> > > - in zmq::socket_base_t::send() the call to xsend() returns EAGAIN.
> > > This
> > > should not happen since the ZMQ_DONTWAIT is not specified.
> > > - ZMQ_DONTWAIT is not specified, so the function won't return -1,
> > > but
> > > block (see trace in prev mail).
> > >
> > > - inside zmq::router_t::xsend() the pipe is found in the outpipes
> > > map, but
> > > the check_write() on it returns false
> > > - the if(mandatory) check in this block (router.cpp:218) returns
> > > with -1,
> > > EAGAIN
> > > - a similar block 10 lines below returns with -1, EHOSTUNREACH
> > >
> > > Should both if(mandatory) checks return EHOSTUNREACH? There's also
> > > a
> > > comment in the header for bool mandatory, that it will report
> > > EAGAIN, but
> > > this contradicts the documentation.
> > >
> > > Can you help to clarify?
> > >
> > >
> > > Regards,
> > > Gyorgy
> > >
> > >
> > >
> > > On Thu, Feb 16, 2017 at 12:22 PM, Gyorgy Szekely <[email protected]
> > > om>
> > > wrote:
> > >
> > > > Hi,
> > > > Continuing my journey on detecting dead workers I reduced the
> > > > design to
> > > > the minimal, and eliminated the messy file descriptors.
> > > > I only have:
> > > > - a router socket, with some number of peers
> > > > - a monitor socket attached to the router socket
> > > >
> > > > When the monitor detects a disconnect on the router socket:
> > > > - do setsockopt(ZMQ_ROUTER_MANDATORY, 1);
> > > > - send heartbeat message to every known peer
> > > > - if EHOSTUNREACH returned: remove the peer
> > > > - do setsockopt(ZMQ_ROUTER_MANDATORY, 0);
> > > >
> > > > What happens: _my app regularly hangs_ in zmq_msg_send(). Roughly
> > > > 20% of
> > > > the invocations. The call never returns, I have to kill the
> > > > application.
> > > >
> > > > What am I doing wrong? According to the RFCs, router sockets
> > > > should
> > > > never block.
> > > > I attached a full stacktrace with info locals and args for each
> > > > relevant
> > > > frame (sorry for the machine readable format).
> > > >
> > > > Env:
> > > > libzmq 4.2.1 stable, debug build
> > > > Ubuntu 16.04 64bit (the same happens with ubuntu packaged lib)
> > > >
> > > > Regards,
> > > > Gyorgy
> > > >
> > > >
> >
> > _______________________________________________
> > zeromq-dev mailing list
> > [email protected]
> > https://lists.zeromq.org/mailman/listinfo/zeromq-dev
#include <chrono>
#include <future>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
#include <vector>
#include <zmq_addon.hpp>

using namespace std::chrono_literals;

// Render every byte of str as two uppercase hex digits.
inline std::string asHex(const std::string& str)
{
    std::ostringstream oss;
    oss << std::hex << std::uppercase << std::setfill('0');
    for (unsigned char c : str)
        oss << std::setw(2) << static_cast<unsigned int>(c); // setw only affects the next output
    return oss.str();
}

/* Client - Server
 * Dealer - Router(MANDATORY)
 *
 * Client is represented by a separate thread and zmq context, it repeats the following:
 * sends a single message to the server; sleeps 3 sec; dies
 *
 * The server runs on the main thread, and receives messages from the clients. It
 * sends a single reply to each client. The reply is triggered by the disconnect monitor event.
 *
 * Expected behavior: sending the reply fails with EHOSTUNREACH
 * Actual behavior: sending blocks, even though the HWM is not reached
 */
int main(void)
{
    auto client = std::async(std::launch::async, [](){
        while (true) {
            std::cout << "Client connecting and sending msg" << std::endl;
            // context and socket are destroyed at the end of each iteration,
            // which disconnects the client from the router
            zmq::context_t ctx;
            zmq::socket_t sock(ctx, zmq::socket_type::dealer);
            sock.setsockopt(ZMQ_LINGER, 0);
            sock.connect("tcp://127.0.0.1:7777");
            zmq::multipart_t msg;
            msg.addstr("blabla");
            msg.send(sock);
            std::this_thread::sleep_for(3s);
        }
    });

    // create router socket
    zmq::context_t ctx;
    zmq::socket_t server(ctx, zmq::socket_type::router);
    server.setsockopt(ZMQ_ROUTER_MANDATORY, 1);
    server.bind("tcp://*:7777");

    // create monitor on router socket with disconnect event
    zmq_socket_monitor(static_cast<void*>(server), "inproc://monitor", ZMQ_EVENT_DISCONNECTED);
    zmq::socket_t monitor(ctx, zmq::socket_type::pair);
    monitor.connect("inproc://monitor");

    std::string identity;
    std::vector<zmq::pollitem_t> pollset{ { static_cast<void*>(server), 0, ZMQ_POLLIN, 0 },
                                          { static_cast<void*>(monitor), 0, ZMQ_POLLIN, 0 } };
    while (true) {
        zmq::poll(pollset);
        if (pollset[0].revents & ZMQ_POLLIN) {
            zmq::multipart_t msg;
            msg.recv(server);
            identity = msg.popstr();
            std::cout << "Received client message, identity: " << asHex(identity) << std::endl;
        }
        if (pollset[1].revents & ZMQ_POLLIN) {
            std::cout << "Monitor event" << std::endl;
            zmq::multipart_t msgin, msgout;
            msgin.recv(monitor); // discard monitor msg

            // send a message on the server socket, should throw, instead it blocks
            msgout.addstr(identity);
            msgout.addstr("dummy");
            try {
                msgout.send(server);
            } catch (const zmq::error_t& e) {
                std::cout << "Got error: " << e.what() << std::endl;
            }
        }
    }
}
#if 0
Output:
Client connecting and sending msg
Received client message, identity: 006B8B4567
Client connecting and sending msg
Monitor event
Client connecting and sending msg
Client connecting and sending msg
<main thread blocked>
Env:
Ubuntu 16.04 64bit
libzmq 4.2.1 stable (debug build)
#endif