I suspect there is a bug within ZMQ, or else I'm doing something stupid.
Actually, the latter is probably more likely, as I'd like to think that this
would have surfaced elsewhere if it were a bug. Attached is a minimal test
program which reproduces the issue on my system. In my test, ZMQ is
statically linked to the test program. This is a Windows program compiled
in Visual Studio 2015. Once running, after roughly 2 mins 40 seconds it
crashes. There's an error message that says "abort() has been called".
Could someone please confirm that they also see the crash, and if I am
doing something stupid please feel free to tell me.
James
On 30 September 2016 at 11:49, James Chapman <[email protected]> wrote:
> Thanks, I'll move to a shared context that persists for the duration of
> the process.
>
> Sockets are one per thread, in fact, as the threads are re-used, each
> thread will create many sockets over its lifetime.
>
> -James
>
>
>
> On 30 September 2016 at 11:24, Luca Boccassi <[email protected]>
> wrote:
>
>> You can (and probably should as best practise) reuse the context,
>> which is thread safe.
>>
>> Do not use the same socket from multiple threads. There is a new
>> category of thread-safe sockets in libzmq master but the API is not
>> yet finalised.
>>
>
>
#include <iostream>
#include <string>
#include <sstream>
#include <thread>
#include <chrono>
#include <WinSock2.h>
#include <windows.h>
#include <stdio.h>
#include <Shlwapi.h>
#pragma comment(lib, "ws2_32.lib")
#pragma comment(lib, "Shlwapi.lib")
#include <zmq.hpp>
// Size, in bytes, of the request message the client allocates and fills before sending.
#define MSG_SIZE 1024
/**
 * Console control handler intended for registration with SetConsoleCtrlHandler().
 *
 * Declared WINAPI because that is the calling convention PHANDLER_ROUTINE
 * requires; without it, casting this function to PHANDLER_ROUTINE hides a
 * calling-convention mismatch on 32-bit builds.
 *
 * @param fdwCtrlType  The control signal being delivered.
 * @return Always FALSE, i.e. the signal is reported as not handled here so
 *         that the next handler in the chain deals with it.
 */
BOOL WINAPI CtrlHandler(DWORD fdwCtrlType)
{
    switch (fdwCtrlType)
    {
    // Every signal is treated the same way; the cases are listed only to
    // show which ones were considered.
    case CTRL_C_EVENT:
    case CTRL_CLOSE_EVENT:
    case CTRL_BREAK_EVENT:
    case CTRL_LOGOFF_EVENT:
    case CTRL_SHUTDOWN_EVENT:
    default:
        return FALSE;
    }
}
/**
 * SERVER: Bind and listen on tcp://127.0.0.5:9995
 *
 * Thread entry point. Polls a REP socket with a 3 second timeout and answers
 * every request with an empty reply. After five idle polls in total it prints
 * a single "retry timeout" notice and keeps serving.
 *
 * No exception is allowed to leave this function: an exception escaping a
 * std::thread entry point calls std::terminate(). Failures of recv() are
 * logged and the loop continues; any other failure (bind, poll, send) is
 * logged and ends the server thread.
 *
 * @param pContext  Context the socket is created from; must outlive this call.
 */
void startServer(zmq::context_t * pContext)
{
    try
    {
        // Automatic storage: the socket is closed on every exit path.
        zmq::socket_t socket(*pContext, ZMQ_REP);
        socket.bind("tcp://127.0.0.5:9995");
        zmq::pollitem_t pollItem = { socket, 0, ZMQ_POLLIN, 0 };
        int nRetries = 5;
        while (1)
        {
            zmq::poll(&pollItem, 1, 3 * 1000);
            if (pollItem.revents & ZMQ_POLLIN)
            {
                zmq::message_t msgIn;
                bool bReceived = false;
                try
                {
                    bReceived = socket.recv(&msgIn);
                }
                catch (const std::exception& e)
                {
                    printf("SERVER: Exception caught during reply message socket.recv() : %s\n", e.what());
                }
                if (bReceived)
                {
                    printf("SERVER: message received\n");
                    zmq::message_t msgOut;
                    socket.send(msgOut);
                }
            }
            // Stop counting once the notice has been printed so the counter
            // cannot run away below zero.
            else if (nRetries > 0 && --nRetries == 0)
            {
                printf("SERVER: retry timeout\n");
            }
        } // end while
    }
    catch (const std::exception& e)
    {
        printf("SERVER: unrecoverable error, server thread exiting : %s\n", e.what());
    }
}
/**
 * CLIENT: Connect to tcp://127.0.0.5:9995, send message, wait for reply, repeat
 *
 * Thread entry point. Each iteration creates a fresh REQ socket, sends one
 * MSG_SIZE-byte request, then polls for the reply with a 3 second timeout,
 * giving up after five idle polls. The socket is disconnected and closed
 * before the next iteration starts.
 *
 * No exception is allowed to leave this function: an exception escaping a
 * std::thread entry point calls std::terminate(). Failures of send() and
 * recv() are logged and the iteration continues; any other failure (socket
 * creation, connect, setsockopt, poll, disconnect) is logged and ends the
 * client thread.
 *
 * @param pContext  Context the sockets are created from; must outlive this call.
 */
void startClient(zmq::context_t * pContext)
{
    // Request payload: 's' followed by zero padding. Identical for every
    // request, so it is built once.
    const char buffer[MSG_SIZE] = { "s" };
    try
    {
        while (1)
        {
            // Automatic storage: socket and message are released even when
            // an exception unwinds out of this iteration.
            zmq::socket_t socket(*pContext, ZMQ_REQ);
            zmq::message_t msgOut(MSG_SIZE);
            socket.connect("tcp://127.0.0.5:9995");
            int linger = 1000;
            socket.setsockopt(ZMQ_LINGER, &linger, sizeof(int));
            zmq::pollitem_t pollItem = { socket, 0, ZMQ_POLLIN, 0 };
            memcpy(msgOut.data(), buffer, MSG_SIZE);
            try
            {
                if (socket.send(msgOut))
                    printf("CLIENT: message sent\n");
            }
            catch (const std::exception& e)
            {
                printf("CLIENT: send fail : %s\n", e.what());
            }
            int nRetries = 5;
            while (1)
            {
                zmq::poll(&pollItem, 1, 3 * 1000);
                if (pollItem.revents & ZMQ_POLLIN)
                {
                    zmq::message_t msgIn;
                    bool bReceived = false;
                    try
                    {
                        bReceived = socket.recv(&msgIn);
                    }
                    catch (const std::exception& e)
                    {
                        printf("CLIENT: Exception caught during reply message socket.recv() : %s\n", e.what());
                    }
                    if (bReceived)
                    {
                        printf("CLIENT: reply received\n");
                    }
                    break;
                }
                if (--nRetries == 0)
                {
                    printf("CLIENT: retry timeout\n");
                    break;
                }
            } // end while
            socket.disconnect("tcp://127.0.0.5:9995");
            socket.close();
        }
    }
    catch (const std::exception& e)
    {
        printf("CLIENT: unrecoverable error, client thread exiting : %s\n", e.what());
    }
}
int main()
{
zmq::context_t * pContext = new zmq::context_t(2);
std::thread t1 = std::thread(startServer, pContext);
std::thread t2 = std::thread(startClient, pContext);
if (SetConsoleCtrlHandler((PHANDLER_ROUTINE)CtrlHandler, TRUE))
{
while (1)
{
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
return 0;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev