Hi Luca,
I added a new command to the ZMQ steerable proxy, and it appears to
be quite self-contained. However, this is my first patch to ZMQ, so
please have a look and let me know if I should change anything.
For now, no documentation changes are attached: first I would
like to know whether the way I coded the patch is fine and whether the
naming is OK (is "STATISTICS" a good name for the command?), etc.
To keep track of this I opened a github issue:
https://github.com/zeromq/libzmq/issues/2736
Anyway, I'm also attaching the patch file to this mail.
Thanks!
Francesco
2017-09-01 20:35 GMT+02:00 Luca Boccassi <[email protected]>:
> On Fri, 2017-09-01 at 20:18 +0200, Francesco wrote:
>> 2017-09-01 19:58 GMT+02:00 Luca Boccassi <[email protected]>:
>> > The third parameter to zmq_proxy:
>> > http://api.zeromq.org/4-2:zmq-proxy
>> >
>> > If you pass a socket, all messages will be duplicated and sent to
>> > it.
>> > Then you can do all the measurements you need.
>> > Note that they are shallow refcounted copies, so only the small
>> > metadata is actually copied, not the payloads, so it's reasonably
>> > fast.
>>
>> Ok thanks, I can try that!
>> Honestly, however, I think it would be nice to have an
>> easier/more-optimized way to retrieve this kind of information...
>> maybe a message that is sent over a steerable proxy control socket
>> (perhaps of type REQ/REP) or a socket option to retrieve via
>> zmq_getsockopt()
>>
>> Thanks!
>>
>> Francesco
>
> A new steerable message sounds like a good idea, feel free to send a
> PR!
>
> Note that it probably should have an ON/OFF capability, as to avoid
> performance penalties for users that are not interested in the feature.
>
> --
> Kind regards,
> Luca Boccassi
>
> _______________________________________________
> zeromq-dev mailing list
> [email protected]
> https://lists.zeromq.org/mailman/listinfo/zeromq-dev
diff --git a/src/proxy.cpp b/src/proxy.cpp
index aca80e1..600e72e 100644
--- a/src/proxy.cpp
+++ b/src/proxy.cpp
@@ -82,6 +82,21 @@
#endif // ZMQ_HAVE_POLLER
+
+// Control socket messages
+
+typedef struct // Per-socket traffic counters kept by the proxy and returned by the "STATISTICS" command
+{
+ uint64_t pkts_in; // messages received from this socket; a multipart message counts as one
+ uint64_t bytes_in; // sum of the frame sizes of the messages received from this socket
+ uint64_t pkts_out; // messages forwarded out through this socket; a multipart message counts as one
+ uint64_t bytes_out; // sum of the frame sizes of the messages forwarded out through this socket
+} zmq_socket_stats_t;
+
+
+
+// Utility functions
+
int capture (
class zmq::socket_base_t *capture_,
zmq::msg_t& msg_,
@@ -104,18 +119,21 @@ int capture (
}
int forward (
- class zmq::socket_base_t *from_,
- class zmq::socket_base_t *to_,
+ class zmq::socket_base_t *from_, zmq_socket_stats_t* from_stats,
+ class zmq::socket_base_t *to_, zmq_socket_stats_t* to_stats,
class zmq::socket_base_t *capture_,
zmq::msg_t& msg_)
{
int more;
size_t moresz;
+ size_t complete_msg_size = 0;
while (true) {
int rc = from_->recv (&msg_, 0);
if (unlikely (rc < 0))
return -1;
+ complete_msg_size += msg_.size();
+
moresz = sizeof more;
rc = from_->getsockopt (ZMQ_RCVMORE, &more, &moresz);
if (unlikely (rc < 0))
@@ -129,12 +147,45 @@ int forward (
rc = to_->send (&msg_, more ? ZMQ_SNDMORE : 0);
if (unlikely (rc < 0))
return -1;
+
if (more == 0)
break;
}
+
+ // A multipart message counts as 1 packet:
+ from_stats->pkts_in++;
+ from_stats->bytes_in+=complete_msg_size;
+ to_stats->pkts_out++;
+ to_stats->bytes_out+=complete_msg_size;
+
return 0;
}
+// Replies on 'control_' with one frame: frontend stats, then backend stats,
+// copied as raw zmq_socket_stats_t structs. Returns 0 on success, -1 on error.
+int reply_stats(
+    class zmq::socket_base_t *control_,
+    zmq_socket_stats_t* frontend_stats,
+    zmq_socket_stats_t* backend_stats)
+{
+    zmq::msg_t stats_msg;
+    int rc = stats_msg.init_size( sizeof(zmq_socket_stats_t)*2 );
+    if (unlikely (rc < 0))
+        return -1; // init failed: there is no valid message to close, so do not call close() on it
+
+    uint8_t* pdata = static_cast<uint8_t*> (stats_msg.data());
+    memcpy( pdata, frontend_stats, sizeof(zmq_socket_stats_t) );
+    memcpy( pdata + sizeof(zmq_socket_stats_t), backend_stats, sizeof(zmq_socket_stats_t) );
+
+    rc = control_->send (&stats_msg, 0);
+    if (unlikely (rc < 0)) {
+        // Release the message before reporting the send failure.
+        return close_and_return (&stats_msg, -1);
+    }
+
+    return stats_msg.close();
+}
+
#ifdef ZMQ_HAVE_POLLER
int zmq::proxy (
@@ -168,6 +219,10 @@ int zmq::proxy (
bool backend_out = false;
bool control_in = false;
zmq::socket_poller_t::event_t events [3];
+ zmq_socket_stats_t frontend_stats;
+ zmq_socket_stats_t backend_stats;
+ memset(&frontend_stats, 0, sizeof(frontend_stats));
+ memset(&backend_stats, 0, sizeof(backend_stats));
// Don't allocate these pollers from stack because they will take more than 900 kB of stack!
// On Windows this blows up default stack of 1 MB and aborts the program.
@@ -323,9 +378,16 @@ int zmq::proxy (
if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated;
else {
- // This is an API error, we assert
- puts ("E: invalid command sent to proxy");
- zmq_assert (false);
+ if (msg.size () == 10 && memcmp (msg.data (), "STATISTICS", 10) == 0)
+ {
+ rc = reply_stats(control_, &frontend_stats, &backend_stats);
+ CHECK_RC_EXIT_ON_FAILURE ();
+ }
+ else {
+ // This is an API error, we assert
+ puts ("E: invalid command sent to proxy");
+ zmq_assert (false);
+ }
}
}
control_in = false;
@@ -336,7 +398,7 @@ int zmq::proxy (
// Process a request, 'ZMQ_POLLIN' on 'frontend_' and 'ZMQ_POLLOUT' on 'backend_'.
// In case of frontend_==backend_ there's no 'ZMQ_POLLOUT' event.
if (frontend_in && (backend_out || frontend_equal_to_backend)) {
- rc = forward (frontend_, backend_, capture_, msg);
+ rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, capture_, msg);
CHECK_RC_EXIT_ON_FAILURE ();
request_processed = true;
frontend_in = backend_out = false;
@@ -347,7 +409,7 @@ int zmq::proxy (
// covers all of the cases. 'backend_in' is always false if frontend_==backend_ due to
// design in 'for' event processing loop.
if (backend_in && frontend_out) {
- rc = forward (backend_, frontend_, capture_, msg);
+ rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, capture_, msg);
CHECK_RC_EXIT_ON_FAILURE ();
reply_processed = true;
backend_in = frontend_out = false;
@@ -443,6 +505,11 @@ int zmq::proxy (
{ backend_, 0, ZMQ_POLLOUT, 0 }
};
+ zmq_socket_stats_t frontend_stats;
+ memset(&frontend_stats, 0, sizeof(frontend_stats));
+ zmq_socket_stats_t backend_stats;
+ memset(&backend_stats, 0, sizeof(backend_stats));
+
// Proxy can be in these three states
enum {
active,
@@ -491,16 +558,24 @@ int zmq::proxy (
if (msg.size () == 9 && memcmp (msg.data (), "TERMINATE", 9) == 0)
state = terminated;
else {
- // This is an API error, so we assert
- puts ("E: invalid command sent to proxy");
- zmq_assert (false);
+ if (msg.size () == 10 && memcmp (msg.data (), "STATISTICS", 10) == 0)
+ {
+ rc = reply_stats(control_, &frontend_stats, &backend_stats);
+ if (unlikely (rc < 0))
+ return -1;
+ }
+ else {
+ // This is an API error, we assert
+ puts ("E: invalid command sent to proxy");
+ zmq_assert (false);
+ }
}
}
// Process a request
if (state == active
&& items [0].revents & ZMQ_POLLIN
&& (frontend_ == backend_ || itemsout [1].revents & ZMQ_POLLOUT)) {
- rc = forward (frontend_, backend_, capture_, msg);
+ rc = forward (frontend_, &frontend_stats, backend_, &backend_stats, capture_, msg);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
}
@@ -509,7 +584,7 @@ int zmq::proxy (
&& frontend_ != backend_
&& items [1].revents & ZMQ_POLLIN
&& itemsout [0].revents & ZMQ_POLLOUT) {
- rc = forward (backend_, frontend_, capture_, msg);
+ rc = forward (backend_, &backend_stats, frontend_, &frontend_stats, capture_, msg);
if (unlikely (rc < 0))
return close_and_return (&msg, -1);
}
diff --git a/tests/test_proxy.cpp b/tests/test_proxy.cpp
index f00f502..8780c3f 100644
--- a/tests/test_proxy.cpp
+++ b/tests/test_proxy.cpp
@@ -48,13 +48,31 @@
#define ID_SIZE_MAX 32
#define QT_WORKERS 5
#define QT_CLIENTS 3
-#define is_verbose 0
+#define is_verbose 1
struct thread_data {
void *ctx;
int id;
};
+typedef struct // Test-side copy of the per-socket counters; main() casts the STATISTICS reply payload to it
+{
+ uint64_t pkts_in; // compared in main() against the atomic counters of sent requests/replies
+ uint64_t bytes_in; // only printed in verbose mode, not asserted
+ uint64_t pkts_out; // compared in main() against the atomic counters of sent requests/replies
+ uint64_t bytes_out; // only printed in verbose mode, not asserted
+} zmq_socket_stats_t;
+
+typedef struct // Layout main() expects for the whole STATISTICS reply (its size is asserted against the received length)
+{
+ zmq_socket_stats_t frontend; // counters for the proxy frontend socket
+ zmq_socket_stats_t backend; // counters for the proxy backend socket
+} zmq_proxy_stats_t;
+
+void *g_clients_pkts_out = NULL; // atomic counter created in main(); incremented once per request sent by client_task
+void *g_workers_pkts_out = NULL; // atomic counter created in main(); incremented once per reply sent by server_worker
+
+
static void
client_task (void *db)
{
@@ -100,6 +118,7 @@ client_task (void *db)
zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 }, { control, 0, ZMQ_POLLIN, 0 } };
int request_nbr = 0;
bool run = true;
+ bool keep_sending = true;
while (run) {
// Tick once per 200 ms, pulling in arriving messages
int centitick;
@@ -119,16 +138,32 @@ client_task (void *db)
}
if (items [1].revents & ZMQ_POLLIN) {
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, 0);
- if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content);
- if (memcmp (content, "TERMINATE", 9) == 0) {
- run = false;
- break;
+
+ if (rc > 0)
+ {
+ content[rc] = 0; // NULL-terminate the command string
+ if (is_verbose) printf("client receive - identity = %s command = %s\n", identity, content);
+ if (memcmp (content, "TERMINATE", 9) == 0) {
+ run = false;
+ break;
+ }
+ if (memcmp (content, "STOP", 4) == 0) {
+ keep_sending = false;
+ break;
+ }
}
}
}
- sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE
- rc = zmq_send (client, content, CONTENT_SIZE, 0);
- assert (rc == CONTENT_SIZE);
+
+ if (keep_sending)
+ {
+ sprintf(content, "request #%03d", ++request_nbr); // CONTENT_SIZE
+ if (is_verbose) printf("client send - identity = %s request #%03d\n", identity, request_nbr);
+ zmq_atomic_counter_inc(g_clients_pkts_out);
+
+ rc = zmq_send (client, content, CONTENT_SIZE, 0);
+ assert (rc == CONTENT_SIZE);
+ }
}
rc = zmq_close (client);
@@ -173,13 +208,13 @@ server_task (void *ctx)
assert (rc == 0);
// Control socket receives terminate command from main over inproc
- void *control = zmq_socket (ctx, ZMQ_SUB);
+ void *control = zmq_socket (ctx, ZMQ_REP);
assert (control);
- rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
- assert (rc == 0);
+ /*rc = zmq_setsockopt (control, ZMQ_SUBSCRIBE, "", 0);
+ assert (rc == 0);*/
rc = zmq_setsockopt (control, ZMQ_LINGER, &linger, sizeof (linger));
assert (rc == 0);
- rc = zmq_connect (control, "inproc://control");
+ rc = zmq_connect (control, "inproc://control_proxy");
assert (rc == 0);
// Launch pool of worker threads, precise number is not critical
@@ -255,13 +290,17 @@ server_worker (void *ctx)
char identity [ID_SIZE_MAX]; // the size received is the size sent
bool run = true;
+ bool keep_sending = true;
while (run) {
rc = zmq_recv (control, content, CONTENT_SIZE_MAX, ZMQ_DONTWAIT); // usually, rc == -1 (no message)
if (rc > 0) {
+ content[rc] = 0; // NULL-terminate the command string
if (is_verbose)
printf("server_worker receives command = %s\n", content);
if (memcmp (content, "TERMINATE", 9) == 0)
run = false;
+ if (memcmp (content, "STOP", 4) == 0)
+ keep_sending = false;
}
// The DEALER socket gives us the reply envelope and message
// if we don't poll, we have to use ZMQ_DONTWAIT, if we poll, we can block-receive with 0
@@ -273,15 +312,22 @@ server_worker (void *ctx)
printf ("server receive - identity = %s content = %s\n", identity, content);
// Send 0..4 replies back
- int reply, replies = rand() % 5;
- for (reply = 0; reply < replies; reply++) {
- // Sleep for some fraction of a second
- msleep (rand () % 10 + 1);
- // Send message from server to client
- rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE);
- assert (rc == ID_SIZE);
- rc = zmq_send (worker, content, CONTENT_SIZE, 0);
- assert (rc == CONTENT_SIZE);
+ if (keep_sending)
+ {
+ int reply, replies = rand() % 5;
+ for (reply = 0; reply < replies; reply++) {
+ // Sleep for some fraction of a second
+ msleep (rand () % 10 + 1);
+
+ // Send message from server to client
+ if (is_verbose) printf("server send - identity = %s reply\n", identity);
+ zmq_atomic_counter_inc(g_workers_pkts_out);
+
+ rc = zmq_send (worker, identity, ID_SIZE, ZMQ_SNDMORE);
+ assert (rc == ID_SIZE);
+ rc = zmq_send (worker, content, CONTENT_SIZE, 0);
+ assert (rc == CONTENT_SIZE);
+ }
}
}
}
@@ -300,6 +346,11 @@ int main (void)
void *ctx = zmq_ctx_new ();
assert (ctx);
+
+ g_clients_pkts_out = zmq_atomic_counter_new ();
+ g_workers_pkts_out = zmq_atomic_counter_new ();
+
+
// Control socket receives terminate command from main over inproc
void *control = zmq_socket (ctx, ZMQ_PUB);
assert (control);
@@ -309,6 +360,14 @@ int main (void)
rc = zmq_bind (control, "inproc://control");
assert (rc == 0);
+ // Control socket receives terminate command from main over inproc
+ void *control_proxy = zmq_socket (ctx, ZMQ_REQ);
+ assert (control_proxy);
+ rc = zmq_setsockopt (control_proxy, ZMQ_LINGER, &linger, sizeof (linger));
+ assert (rc == 0);
+ rc = zmq_bind (control_proxy, "inproc://control_proxy");
+ assert (rc == 0);
+
void *threads [QT_CLIENTS + 1];
struct thread_data databags [QT_CLIENTS + 1];
for (int i = 0; i < QT_CLIENTS; i++) {
@@ -319,11 +378,64 @@ int main (void)
threads[QT_CLIENTS] = zmq_threadstart (&server_task, ctx);
msleep (500); // Run for 500 ms then quit
+
+ if (is_verbose)
+ printf ("stopping all clients and server workers\n");
+ rc = zmq_send (control, "STOP", 4, 0);
+ assert (rc == 4);
+
+ msleep(500); // Wait for all clients and workers to STOP
+
+
+ if (is_verbose)
+ printf ("retrieving stats from the proxy\n");
+
+ rc = zmq_send (control_proxy, "STATISTICS", 10, 0);
+ assert (rc == 10);
+
+ zmq_msg_t stats_msg;
+ rc = zmq_msg_init (&stats_msg);
+ assert (rc == 0);
+
+ rc = zmq_recvmsg (control_proxy, &stats_msg, 0);
+ assert (rc == sizeof(zmq_proxy_stats_t));
+
+ zmq_proxy_stats_t* stats = (zmq_proxy_stats_t*)zmq_msg_data(&stats_msg);
+ if (is_verbose)
+ {
+ printf ("frontend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n",
+ stats->frontend.pkts_in, stats->frontend.bytes_in,
+ stats->frontend.pkts_out, stats->frontend.bytes_out);
+ printf ("backend: pkts_in=%lu bytes_in=%lu pkts_out=%lu bytes_out=%lu\n",
+ stats->backend.pkts_in, stats->backend.bytes_in,
+ stats->backend.pkts_out, stats->backend.bytes_out);
+
+ printf ("clients sent out %d requests\n", zmq_atomic_counter_value(g_clients_pkts_out));
+ printf ("workers sent out %d replies\n", zmq_atomic_counter_value(g_workers_pkts_out));
+ }
+ assert( stats->frontend.pkts_in == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) );
+ assert( stats->frontend.pkts_out == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
+ assert( stats->backend.pkts_in == (unsigned)zmq_atomic_counter_value(g_workers_pkts_out) );
+ assert( stats->backend.pkts_out == (unsigned)zmq_atomic_counter_value(g_clients_pkts_out) );
+
+ rc = zmq_msg_close (&stats_msg);
+ assert (rc == 0);
+
+ if (is_verbose)
+ printf ("shutting down all clients and server workers\n");
rc = zmq_send (control, "TERMINATE", 9, 0);
assert (rc == 9);
+ if (is_verbose)
+ printf ("shutting down the proxy now\n");
+ rc = zmq_send (control_proxy, "TERMINATE", 9, 0);
+ assert (rc == 9);
+
+
rc = zmq_close (control);
assert (rc == 0);
+ rc = zmq_close (control_proxy);
+ assert (rc == 0);
for (int i = 0; i < QT_CLIENTS + 1; i++)
zmq_threadclose (threads[i]);
_______________________________________________
zeromq-dev mailing list
[email protected]
https://lists.zeromq.org/mailman/listinfo/zeromq-dev