Hi, I think you are setting the HWM to 0, making ZMQ queues of infinite size (limited by the RAM you have)... this may explain your issue? I mean: the queues get full and they start growing leading to 1) increased memory usage, 2) increasing latency...
HTH, Francesco 2017-03-11 13:07 GMT+01:00 LN <[email protected]>: > hi all ! > > I have written a program to imitate the pub/sub of the securities market > quotation data. and i found that when i subscribe a lot of topics, the > subscriber get data latency. when subscirbe 50 topics , some tick data slow > more than 100ms from the server, and i don't know why. Below is my code, and > i user the black box pattern. > > the problem is that: (Windows Server 2008, 8 cores, sizeof(TDF_MARKET_DATA) > = 300bytes) > A: when i subscribe one topic from console(s 0600008,), the latency from > server to client is 0 or 1 milliseconds > , it is ok!(log compare from client to server 0600008) > > B: when i subscribe 100 topics from console(like s 0600002, 0600004, > 0600006...), the latency sometimes 1ms and sometimes 10 ~ 60 milliseconds. > > C: when i subscribe 300 topics from console, the latency sometimes 1 ~ 20 > seconds!!! > And i found that the memory of client increase fast. > > and i don't know what's wrong with my program. > > Looking forward to your reply!! Thanks a lot! > > -------------------------Server(modify from > wuserver.cpp)--------------------------- > > // > // Weather update server in C++ > // Binds PUB socket to tcp://*:5556 > // Publishes random weather updates > // > // Olivier Chamoux <[email protected]> > // > > #include <stdio.h> > #include <stdlib.h> > #include <time.h> > #include <zmq.hpp> > > #include <boost/asio.hpp> > #include <boost/bind.hpp> > #include <boost/random.hpp> > #include <boost/thread.hpp> > #include <boost/lexical_cast.hpp> > #include <boost/tokenizer.hpp> > #include <boost/filesystem.hpp> > #include <boost/date_time/gregorian/gregorian.hpp> > #include <boost/date_time/posix_time/posix_time.hpp> > > #include <log4cpp/Category.hh> > #include <log4cpp/Appender.hh> > #include <log4cpp/Priority.hh> > #include <log4cpp/FileAppender.hh> > #include <log4cpp/PatternLayout.hh> > #include <log4cpp/OstreamAppender.hh> > > #include "../TDFPub/src/TDF/TDFAPIStruct.h" > > #if (defined (WIN32)) > //#include <zhelpers.hpp> > #endif > > #define within(num) (int) ((float) num * random () / (RAND_MAX + 1.0)) > > log4cpp::Category& glog = log4cpp::Category::getRoot(); > > int main () > { > boost::filesystem::path log_path("./log/"); > if (!boost::filesystem::exists(log_path)) > { > create_directory(log_path); > } > std::string local_day = > boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day()); > boost::filesystem::path path_local_day(""); > path_local_day += log_path; > path_local_day += local_day; > if (!boost::filesystem::exists(path_local_day)) > { > create_directory(path_local_day); > } > > log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout(); > layout_c->setConversionPattern("%d: %p %c %x: %m%n"); > log4cpp::Appender *console_appender = new > log4cpp::OstreamAppender("console", &std::cout); > console_appender->setLayout(layout_c); > > const std::string str_log(path_local_day.string() + "/wuserver.log"); > log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout(); > layout_f->setConversionPattern("%d: %p %c %x: %m%n"); > log4cpp::Appender* file_appender = new log4cpp::FileAppender("main", > str_log); > file_appender->setLayout(layout_f); > > glog.addAppender(file_appender); > glog.addAppender(console_appender); > glog.setPriority(log4cpp::Priority::DEBUG); > > // Prepare our context and publisher > zmq::context_t context (1); > zmq::socket_t publisher (context, ZMQ_PUB); > int hwm = 0; > publisher.setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); > publisher.bind("tcp://*:5556"); > > boost::uniform_int<> distribution(1, 3000); > boost::mt19937 engine; > boost::variate_generator<boost::mt19937, boost::uniform_int<> > > myrandom(engine, distribution); > while (1) > { > int code; > char prefix[8] = {0}; > TDF_MARKET_DATA tick; > > zmq::message_t message(7 + sizeof(TDF_MARKET_DATA)); > code = myrandom(); > code % 2 == 0 ? sprintf(prefix, "0600%03d", code) : sprintf(prefix, > "1000%03d", code); > //sprintf(data.szCode, "%07d", prefix); > //memcpy(tick.szCode, prefix, 7); > strcpy(tick.szCode, prefix); > memcpy((char*)message.data(), prefix, 7); > memcpy((char*)message.data() + 7, &tick, sizeof(TDF_MARKET_DATA)); > > publisher.send(message); > > //if (strncmp(prefix, "1001271", 7) == 0) > if (strncmp(prefix, "0600008", 7) == 0) > { > glog.info("send %s", prefix); > } > > boost::this_thread::sleep(boost::posix_time::microseconds(1000));// 1ms > //Sleep(1); > } > return 0; > } > > > -------------------------Client(Black box > pattern)--------------------------- > // > // Weather update client in C++ > // Connects SUB socket to tcp://localhost:5556 > // Collects weather updates and finds avg temp in zipcode > // > // Olivier Chamoux <[email protected]> > // > #include <zmq.hpp> > #include <string> > #include <iostream> > #include <sstream> > #include <boost/thread.hpp> > #include <boost/function.hpp> > #include "zhelpers.hpp" > #include <stdio.h> > #include <stdlib.h> > #include <time.h> > #include "zhelpers.hpp" > #include <boost/asio.hpp> > #include <boost/bind.hpp> > #include <boost/thread.hpp> > #include <boost/lexical_cast.hpp> > #include <boost/tokenizer.hpp> > #include <boost/filesystem.hpp> > #include <boost/date_time/gregorian/gregorian.hpp> > #include <log4cpp/Category.hh> > #include <log4cpp/Appender.hh> > #include <log4cpp/Priority.hh> > #include <log4cpp/FileAppender.hh> > #include <log4cpp/PatternLayout.hh> > #include <log4cpp/OstreamAppender.hh> > #include "../TDFPub/src/TDF/TDFAPIStruct.h" > log4cpp::Category& glog = log4cpp::Category::getRoot(); > > class client_task > { > public: > client_task() > : context_(NULL) > { > context_ = zmq_ctx_new(); > } > void subscribe() > { > void* sender = zmq_socket(context_, ZMQ_PAIR); > int rc = zmq_connect(sender, "inproc://input"); > assert(rc == 0); > std::string msg; > while (getline(std::cin, msg) != "") > { > int size = zmq_send(sender, msg.c_str(), msg.length(), 0); > assert(size != 0); > } > } > void poll() > { > void* receiver = zmq_socket(context_, ZMQ_PAIR); > int rc = zmq_bind(receiver, "inproc://input"); > assert(rc == 0); > void* subscriber = zmq_socket(context_, ZMQ_SUB); > int hwm = 0; > rc = zmq_setsockopt(subscriber, ZMQ_RCVHWM, &hwm, sizeof(hwm)); > assert(rc == 0); > rc = zmq_setsockopt(subscriber, ZMQ_SNDHWM, &hwm, sizeof(hwm)); > assert(rc == 0); > rc = zmq_connect(subscriber, "tcp://localhost:5556"); > void* task_sender = zmq_socket(context_, ZMQ_PUSH); > rc = zmq_bind(task_sender, "inproc://task_send"); > zmq_pollitem_t items [] = { > {receiver, 0, ZMQ_POLLIN, 0}, > {subscriber, 0, ZMQ_POLLIN, 0} > }; > while (true) > { > zmq_poll(&items[0], 2, -1); > if (items[0].revents & ZMQ_POLLIN) > { > zmq_msg_t msg; > zmq_msg_init(&msg); > zmq_recvmsg(receiver, &msg, 0); > char cmd = (static_cast<char*>(zmq_msg_data(&msg)))[0]; > std::string list(static_cast<char*>(zmq_msg_data(&msg)) + 2, > zmq_msg_size(&msg) - 2); > std::vector<std::string> vec_stocks; > const char* sep = ","; > const char* begin = list.c_str(); > char* p = NULL; > p = strtok((char*)begin, sep); > if (p) > { > vec_stocks.push_back(p); > } > else > { > continue; > } > while ((p = strtok(NULL, sep)) != NULL) > { > vec_stocks.push_back(p); > } > if (vec_stocks.size() == 0) > { > continue; > } > if (cmd == 's') > { > for (std::vector<std::string>::const_iterator it = vec_stocks.begin(); > it != vec_stocks.end(); it++) > { > rc = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, it->c_str(), > it->length()); > assert(rc == 0); > } > } > else if (cmd == 'u') > { > for (std::vector<std::string>::const_iterator it = vec_stocks.begin(); > it != vec_stocks.end(); it++) > { > rc = zmq_setsockopt(subscriber, ZMQ_UNSUBSCRIBE, it->c_str(), > it->length()); > assert(rc == 0); > } > } > zmq_msg_close(&msg); > } > if (items[1].revents & ZMQ_POLLIN) > { > zmq_msg_t msg; > zmq_msg_init(&msg); > zmq_recvmsg(subscriber, &msg, 0); > /*if (memcmp(static_cast<char*>(zmq_msg_data(&msg)), "0600008", 7) == 0) > { > glog.info("recv"); > }*/ > > //zmq_send(task_sender, &msg, zmq_msg_size(&msg), 0); > zmq_sendmsg(task_sender, &msg, 0); > zmq_msg_close(&msg); > } > } > } > void task_handle() > { > void* receiver = zmq_socket(context_, ZMQ_PULL); > int rc = zmq_connect(receiver, "inproc://task_send"); > assert(rc == 0); > while (true) > { > zmq_msg_t msg; > zmq_msg_init(&msg); > zmq_recvmsg(receiver, &msg, 0); > > char prefix[8] = {0}; > memcpy(prefix, static_cast<char*>(zmq_msg_data(&msg)), 7); > if (strncmp(prefix, "0600008", 7) == 0) > { > TDF_MARKET_DATA dst; > memset(&dst, 0, sizeof(dst)); > memcpy(&dst, static_cast<char*>(zmq_msg_data(&msg)) + 7, sizeof(dst)); > glog.info("[%d] recv %s, szCode = %s", > boost::this_thread::get_id(), prefix, dst.szCode); > } > zmq_msg_close(&msg); > } > } > > private: > void* context_; > }; > int main() > { > boost::filesystem::path log_path("./log/"); > if (!boost::filesystem::exists(log_path)) > { > create_directory(log_path); > } > std::string local_day = > boost::gregorian::to_iso_string(boost::gregorian::day_clock::local_day()); > boost::filesystem::path path_local_day(""); > path_local_day += log_path; > path_local_day += local_day; > if (!boost::filesystem::exists(path_local_day)) > { > create_directory(path_local_day); > } > log4cpp::PatternLayout* layout_c = new log4cpp::PatternLayout(); > layout_c->setConversionPattern("%d: %p %c %x: %m%n"); > log4cpp::Appender *console_appender = new > log4cpp::OstreamAppender("console", &std::cout); > console_appender->setLayout(layout_c); > const std::string str_log(path_local_day.string() + "/wuclient.log"); > log4cpp::PatternLayout* layout_f = new log4cpp::PatternLayout(); > layout_f->setConversionPattern("%d: %p %c %x: %m%n"); > log4cpp::Appender* file_appender = new log4cpp::FileAppender("main", > str_log); > file_appender->setLayout(layout_f); > glog.addAppender(file_appender); > glog.addAppender(console_appender); > glog.setPriority(log4cpp::Priority::DEBUG); > client_task ct; > for (int i = 0; i < 2; i++) > { > boost::thread t_handle(boost::bind(&client_task::task_handle, &ct)); > } > Sleep(2000); > boost::thread t_poll(boost::bind(&client_task::poll, &ct)); > Sleep(500); > boost::thread t_subscribe(boost::bind(&client_task::subscribe, &ct)); > t_poll.join(); > t_subscribe.join(); > return 0; > } > > _______________________________________________ > zeromq-dev mailing list > [email protected] > https://lists.zeromq.org/mailman/listinfo/zeromq-dev _______________________________________________ zeromq-dev mailing list [email protected] https://lists.zeromq.org/mailman/listinfo/zeromq-dev
