Hi again,

On 25/07/12 03:26, Ming Ji wrote:
Could you share your interface, or C++ API patch? I can probably do a
test run before you release it.

I haven't written the C++ API patch yet, so that'll have to wait for the weekend or so, when I have a minute. I've attached two files that contain my so called "interface".

Please note that this is a first try / prototype, so it will need to be modified quite a bit (very few socket options are supported) in order to respect everything ZeroQ has to offer, and to actually be usable in a generic way. I'm only using the minimum needed for my project, and will add to it when I reach the need for more features. This interface also assumes the use of Google's protobufs; but it would be easy to adapt to any other kind of serialisation library.

The way I use this in my project is as follows, components that "offer" a service, implement their "own socket":

class InstanceManagerSocket {
    private:
    firestarter::sockets::ZMQPublisherSocket publisher;
    firestarter::sockets::ZMQResponseSocket responder;

    public:
    InstanceManagerSocket(zmq::context_t & context) :
            publisher(context, MODULE_ORDERS_SOCKET_URI),
            responder(context, MANAGER_SOCKET_URI) { };
    inline bool send(google::protobuf::Message const & pb_message) {
        return this->publisher.send(pb_message);
    };
    inline bool reply(google::protobuf::Message const & pb_message) {
        return this->responder.send(pb_message);
    };
    inline bool receive(google::protobuf::Message & pb_message,
            bool blocking = false) {
        if (this->responder.receive(pb_message, blocking))
            return this->ack();
        return false;
     };
    inline bool ack() { return this->responder.send(); };
};

Clients who then want to use this service use a client class, for example (usually provided by the service):

class InstanceManagerClientSocket {
    private:
    firestarter::sockets::ZMQSubscriberSocket subscriber;
    firestarter::sockets::ZMQRequestSocket requester;

    public:
    InstanceManagerClientSocket(zmq::context_t & context) :
            subscriber(context, MODULE_ORDERS_SOCKET_URI),
            requester(context, MANAGER_SOCKET_URI) { };
    inline bool send(google::protobuf::Message const & pb_message) {
        if (this->requester.send(pb_message))
            return this->receive_ack();
        return false;
    };
    inline bool receive(google::protobuf::Message & pb_message,
            bool blocking = false) {
        return this->subscriber.receive(pb_message, blocking);
    };
    inline bool receive_ack() { return this->requester.receive(true); };
};


Again, please do take into account that this is in the very early stages of prototyping, and will probably be modified a lot before getting my own stamp of approval. What I'm aiming for, in the end, is to fully abstract the use of Protobufs for serialisation from the perspective of any one using the, for example, "InstanceManager sockets".

Regarding the actual ZeroMQ part, I'll probably change the design of the classes, most probably through the use of templates, in order to remove the small artifacts that exist today (connect() or bind() end up in the wrong sockets, due to the use of virtual multiple inheritance, which I'm not happy with).

@Everyone: I welcome any comments regarding my current use of ZeroMQ---my understanding of it is basic to say the least, which is why I haven't attacked more consuming bits like devices and such.

-S.
#ifndef __ZMQHELPER_H
#define __ZMQHELPER_H

#include "zmq/zmq.hpp"

#define MANAGER_SOCKET_URI "inproc://fs.modules.manager"
#define MODULE_ORDERS_SOCKET_URI "inproc://fs.module.orders"

#endif
#include "zmq/zmqsocket.h"

namespace firestarter { namespace sockets {
	DECLARE_LOG(logger, "firestarter.sockets");
} }

using namespace firestarter::sockets;

bool ZMQSendingSocket::send() {
	LOG_INFO(logger, "Sending empty message on socket (" << &(this->socket) << ").");
	zmq::message_t message(0);
	bool sent = socket->send(message, 0);
	if (sent) {
		LOG_DEBUG(logger, "Empty message sent succesfully.");
	}
	else {
		LOG_ERROR(logger, "Empty message couldn't be sent.");
	}
	return sent;
}

bool ZMQSendingSocket::send(google::protobuf::Message const & pb_message, bool send_more) {
	LOG_INFO(logger, "Sending message (" << pb_message.GetTypeName() << ") on socket (" << &(this->socket) << ").");

	LOG_DEBUG(logger, "Serialising message.");
	std::string pb_serialised;
	pb_message.SerializeToString(&pb_serialised);

	LOG_DEBUG(logger, "Copying message data to socket (" << &(this->socket) << ").");
	zmq::message_t message(pb_serialised.size());
	memcpy(static_cast<void *>(message.data()), pb_serialised.c_str(), pb_serialised.size());

	int flags = 0;
	if (send_more) {
		LOG_DEBUG(logger, "Added ZMQ_SNDMORE flag to socket.");
		flags = ZMQ_SNDMORE;
	}

	else {
		LOG_DEBUG(logger, "No flags to add.");
	}

	LOG_DEBUG(logger, "Sending message.");
	return this->socket->send(message, flags);
}

bool ZMQReceivingSocket::receive(bool blocking) {
	LOG_INFO(logger, "Listening for an empty message on socket (" << &(this->socket) << ").");
	zmq::message_t message;

	int flags = 0;
	if (not blocking) {
		LOG_DEBUG(logger, "Added ZMQ_DONTWAIT flag to socket.");
		flags = ZMQ_DONTWAIT;
	}

	else {
		LOG_DEBUG(logger, "No flags to add.");
	}

	return this->socket->recv(&message, flags);
}

bool ZMQReceivingSocket::receive(google::protobuf::Message & pb_message, bool blocking) {
	LOG_INFO(logger, "Listening for a " << pb_message.GetTypeName() << " on socket (" << &(this->socket) << ").");
	zmq::message_t message;

	int flags = 0;
	if (not blocking) {
		LOG_DEBUG(logger, "Added ZMQ_DONTWAIT flag to socket.");
		flags = ZMQ_DONTWAIT;
	}

	else {
		LOG_DEBUG(logger, "No flags to add.");
	}

	if (this->socket->recv(&message, flags))
		if (pb_message.ParseFromArray(message.data(), message.size()))
			return pb_message.IsInitialized();

	return false;
}

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to