Yeah, I've seen that pattern before. I was just curious whether there is a way to send the messages asynchronously, in batches, to a load balancer and then wait until all the replies come back, or something along those lines.
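
A minimal sketch of that idea, under some loudly stated assumptions: the single REQ client is swapped for a DEALER socket, which can send a whole batch without waiting for each reply, and the broker is left out, so DEALER's built-in round-robin stands in for real load balancing. The names `square_batch` and `square_worker`, the `inproc://batch-demo` endpoint, and the use of threads instead of processes are all made up for illustration. It also assumes the batch is small enough to fit under the sockets' high-water marks; a larger batch would need to interleave sends and receives.

```python
import threading
import zmq


def square_worker(context, endpoint, ready):
    """REP worker (illustrative): replies to each number with its square."""
    sock = context.socket(zmq.REP)
    sock.connect(endpoint)
    ready.wait()  # tell the client this worker is connected
    try:
        while True:
            n = int(sock.recv())
            sock.send(str(n * n).encode("ascii"))
    except zmq.ContextTerminated:
        pass  # the client terminated the context, so exit
    finally:
        sock.close(linger=0)


def square_batch(numbers, num_workers=4, endpoint="inproc://batch-demo"):
    """Send every request up front, then wait until all replies are back."""
    context = zmq.Context()
    client = context.socket(zmq.DEALER)
    client.bind(endpoint)  # inproc: bind before the workers connect

    ready = threading.Barrier(num_workers + 1)
    threads = [
        threading.Thread(target=square_worker, args=(context, endpoint, ready))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()
    ready.wait()  # every worker has connected

    # Fire off the whole batch without waiting for any reply. The first
    # frame is a request index; REP treats everything up to the empty
    # delimiter as an envelope and echoes it back with the reply.
    for index, n in enumerate(numbers):
        client.send_multipart(
            [str(index).encode("ascii"), b"", str(n).encode("ascii")]
        )

    # Collect replies in whatever order they arrive.
    results = {}
    while len(results) < len(numbers):
        index, _empty, reply = client.recv_multipart()
        results[int(index)] = int(reply)

    client.close(linger=0)
    context.term()  # unblocks the workers' recv() calls
    for t in threads:
        t.join()
    return [results[i] for i in range(len(numbers))]
```

Calling `square_batch([1, 2, 3])` returns `[1, 4, 9]`. The request index in the envelope is what lets the client put out-of-order replies back in the original order.
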
cheers,
joe

On Wed, Oct 5, 2016 at 12:21 PM, Andrew Hume <[email protected]> wrote:

> joe,
>
> i couldn’t see all your code, but the simple answer is you have an
> overseer process (or task) which has a bunch of work to do, and waits
> for workers to ask for work.
>
> each worker does a REQ/REP to the overseer asking for a (few?) pieces
> of work; the reply contains the work.
> when the worker is done, it asks for more.
>
> in this way, each worker spends most of its time working, and the
> overseer spends most of its time waiting for someone to request work.
>
> is that what you wanted?
>
> andrew
>
> On Oct 5, 2016, at 8:06 AM, joe meiring <[email protected]> wrote:
>
> Suppose I have one master process that divides up data to be processed
> in parallel. Let's say there are 1000 chunks of data and 100 nodes on
> which to run the computations. Say something simple, like an array of
> numbers and we want to square them.
>
> Is there some way to do REQ/REP to keep all the workers busy? I've tried
> to use the load balancer pattern in the guide but with a single client,
> sock.recv() is going to block until it receives its response from the
> worker.
>
> Is there a different pattern to use in this kind of situation?
>
> Here is the code, slightly modified from the zmq guide for a load
> balancer. It starts up one client, 10 workers, and a load balancer/broker
> in the middle. How can I get all those workers working at the same time???
>
> from __future__ import print_function
> from multiprocessing import Process
> import zmq
> import time
> import uuid
> import random
>
>
> def client_task():
>     """Basic request-reply client using REQ socket."""
>     socket = zmq.Context().socket(zmq.REQ)
>     # Identities and message frames must be bytes
>     socket.identity = str(uuid.uuid4()).encode("ascii")
>     socket.connect("ipc://frontend.ipc")
>     # Send request, get reply
>     for i in range(100):
>         print("SENDING: ", i)
>         socket.send(b"WORK")
>         msg = socket.recv()
>         print(msg)
>
>
> def worker_task():
>     """Worker task, using a REQ socket to do load-balancing."""
>     socket = zmq.Context().socket(zmq.REQ)
>     socket.identity = str(uuid.uuid4()).encode("ascii")
>     socket.connect("ipc://backend.ipc")
>     # Tell broker we're ready for work
>     socket.send(b"READY")
>     while True:
>         address, empty, request = socket.recv_multipart()
>         time.sleep(random.randint(1, 4))
>         socket.send_multipart([address, b"", b"OK : " + socket.identity])
>
>
> def broker():
>     context = zmq.Context()
>     frontend = context.socket(zmq.ROUTER)
>     frontend.bind("ipc://frontend.ipc")
>     backend = context.socket(zmq.ROUTER)
>     backend.bind("ipc://backend.ipc")
>     # Initialize main loop state
>     workers = []
>     poller = zmq.Poller()
>     # Only poll for requests from backend until workers are available
>     poller.register(backend, zmq.POLLIN)
>
>     while True:
>         sockets = dict(poller.poll())
>         if backend in sockets:
>             # Handle worker activity on the backend
>             request = backend.recv_multipart()
>             worker, empty, client = request[:3]
>             if not workers:
>                 # Poll for clients now that a worker is available
>                 poller.register(frontend, zmq.POLLIN)
>             workers.append(worker)
>             if client != b"READY" and len(request) > 3:
>                 # If client reply, send rest back to frontend
>                 empty, reply = request[3:]
>                 frontend.send_multipart([client, b"", reply])
>
>         if frontend in sockets:
>             # Get next client request, route to least-recently-used worker
>             client, empty, request = frontend.recv_multipart()
>             worker = workers.pop(0)
>             backend.send_multipart([worker, b"", client, b"", request])
>             if not workers:
>                 # Don't poll clients if no workers are available
>                 poller.unregister(frontend)
>
>     # Clean up
>     backend.close()
>     frontend.close()
>     context.term()
>
>
> def main():
>     NUM_CLIENTS = 1
>     NUM_WORKERS = 10
>     # Start background tasks
>     def start(task, *args):
>         process = Process(target=task, args=args)
>         process.start()
>     start(broker)
>
>     for i in range(NUM_CLIENTS):
>         start(client_task)
>
>     for i in range(NUM_WORKERS):
>         start(worker_task)
>
>     # Process(target=broker).start()
>
>
> if __name__ == "__main__":
>     main()
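
For comparison, the overseer scheme described in the quoted reply could be sketched roughly as follows. This is only an illustration under assumptions, not Andrew's actual code: the overseer is a single REP socket, each worker is a REQ socket that hands back its previous result in the same message that asks for more work, and each request yields one piece of work rather than a few. The function names, the `inproc://overseer-demo` endpoint, the `WORK`/`RESULT`/`DONE` message tags, and the use of threads instead of processes are all hypothetical.

```python
import threading
import zmq


def worker(context, endpoint):
    """REQ worker: asks for work, squares it, hands the result back."""
    sock = context.socket(zmq.REQ)
    sock.connect(endpoint)
    request = [b"READY"]  # first request carries no result
    while True:
        sock.send_multipart(request)
        reply = sock.recv_multipart()
        if reply[0] == b"DONE":
            break
        _tag, index, value = reply  # [b"WORK", index, value]
        n = int(value)
        # The next request for work also delivers this result.
        request = [b"RESULT", index, str(n * n).encode("ascii")]
    sock.close()


def overseer_square(numbers, num_workers=4, endpoint="inproc://overseer-demo"):
    """Overseer: waits for workers to ask, replies with a piece of work."""
    context = zmq.Context()
    sock = context.socket(zmq.REP)
    sock.bind(endpoint)  # inproc: bind before the workers connect
    threads = [
        threading.Thread(target=worker, args=(context, endpoint))
        for _ in range(num_workers)
    ]
    for t in threads:
        t.start()

    pending = list(enumerate(numbers))  # work not yet handed out
    results = {}
    active = num_workers  # workers that have not been told to stop
    while active:
        message = sock.recv_multipart()
        if message[0] == b"RESULT":
            results[int(message[1])] = int(message[2])
        if pending:
            index, n = pending.pop()
            sock.send_multipart(
                [b"WORK", str(index).encode("ascii"), str(n).encode("ascii")]
            )
        else:
            sock.send_multipart([b"DONE"])
            active -= 1

    for t in threads:
        t.join()
    sock.close()
    context.term()
    return [results[i] for i in range(len(numbers))]
```

Because a worker only asks when it is idle, a slow worker simply asks less often, and no broker is needed to track who is free. Handing out a few pieces per request, as the reply suggests, would mainly cut down the number of round trips.
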
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev
