I could not find any guide or managed to get any of the examples working. 
The only example I got to work was 
machinekit/src/machinetalk/tutorial/zeroconf/resolve.py but didn't 
understand what was going on - nor was it clear if I have to understand 
what's happening or not.

Attached are two simple examples I came up with - if there are better ways 
of doing it please let me know. Both examples use the packages python-zmq 
and python-zeroconf which are available on all major distributions and for 
python2 and python3. The reason I used zeroconf instead of the avahi/dbus 
approach of resolve.py is because it's a lot simpler - at least from my 
perspective.

mk-hello-world.py scans for all machinekit services and prints each one of 
them to the screen. Once no more services are discovered the process is 
terminated by pressing Ctrl+C.

I consider this the hello-world for Machinetalk because it is the first 
step to connecting to machinetalk. Because Machinetalk uses random ports 
one must first find a service's endpoint before we can connect to it..

The simple services (from a code effort to usefulness standpoint) are the 
publish/subscriber services. mk-hello-service.py subsribes to the status 
and error service. Each published service has a list of topics one can 
subscribe to (I could not find any documentation about the details). One 
must subscribe to a topic in order to trigger the initial full update 
message. An empty string can be used as the topic which serves as a 
wildcard for all topics - although in this case no initial full update is 
triggered, but you'll get all subsequent incremental updates.

It turns out to be important that the reverse lookups are setup correctly, 
otherwise there are significant delays in establishing a connection.


-- 
website: http://www.machinekit.io blog: http://blog.machinekit.io github: 
https://github.com/machinekit
--- 
You received this message because you are subscribed to the Google Groups 
"Machinekit" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to [email protected].
Visit this group at https://groups.google.com/group/machinekit.
To view this discussion on the web visit 
https://groups.google.com/d/msgid/machinekit/ac2c20ee-3a8e-48a3-a9e4-b83dfedded24%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
#!/usr/bin/python
#
# Simple example to discover all Machinetalk services and their end points.
import sys
import time
import zeroconf
import zmq

class MKServiceMonitor(object):

    def __init__(self, zc):
        self.browser = zeroconf.ServiceBrowser(zc, "_machinekit._tcp.local.", self)

    def remove_service(self, zc, typ, name):
        print("%s terminated" % name)

    def add_service(self, zc, typ, name):
        info = zc.get_service_info(typ, name)
        if info.properties.get(b'service'):
            sname = info.properties[b'service'].decode()
            dsn   = info.properties[b'dsn'].decode()
            sys.stdout.write("\n%-13s %s " % (sname, dsn))
        else:
            sys.stdout.write("\n%s: '%s'" % (name, info))
        sys.stdout.flush()

    def stop(self):
        self.browser.done = True

zc = zeroconf.Zeroconf()
serviceMonitor = MKServiceMonitor(zc)
try:
    while True:
        sys.stdout.write('.')
        sys.stdout.flush()
        time.sleep(1)

except KeyboardInterrupt as e:
    pass
except Exception as e:
    print(e)
finally:
    serviceMonitor.stop()
    zc.close()
    print("---- we're done ---")
#!/usr/bin/python
#
# Example of how to connect to a (publish/subscribe) service, get an initial full
# update and subsequent incremental updates until the service terminates, or the
# loop is interrupted by Ctrl-C

import sys
import time
import zeroconf
import zmq

from machinetalk.protobuf.message_pb2 import Container
from machinetalk.protobuf.types_pb2 import MT_EMC_OPERATOR_ERROR, MT_PING

class MKService(object):
    def __init__(self, name, properties):
        self.service = properties[b'service']
        self.uuid    = properties[b'uuid']
        self.dsn     = properties[b'dsn']
        self.name    = self.service.decode()
        self.serviceName = name

    def process(self, container):
        pass

class MKLogMonitor(MKService):

    def __init__(self, ctx, name, properties):
        MKService.__init__(self, name, properties)
        self.socket = ctx.socket(zmq.SUB)
        if 'status' == self.name:
            #self.socket.setsockopt(zmq.SUBSCRIBE, b'task')
            self.socket.setsockopt(zmq.SUBSCRIBE, b'config')
        elif 'error' == self.name:
            self.socket.setsockopt(zmq.SUBSCRIBE, b'text')
            self.socket.setsockopt(zmq.SUBSCRIBE, b'error')
            self.socket.setsockopt(zmq.SUBSCRIBE, b'display')
        else:
            self.socket.setsockopt(zmq.SUBSCRIBE, b'')
        self.socket.connect(self.dsn)

    def process(self, msg, container):
        #print("%s: %s" % (self.name, [f[0].name for f in container.ListFields()]))
        if container.type == MT_EMC_OPERATOR_ERROR:
            print("%s[%s]: '%s'" % (self.name, container.type, container.note))
        elif container.type != MT_PING:
            print("%s: -|%s|-" % (self.name, str(container)))


class MKServiceMonitor(object):

    def __init__(self, zc):
        self.rx = Container()
        self.zc = zc
        self.poller = zmq.Poller()
        self.context = zmq.Context()
        self.services = []
        self.browser = zeroconf.ServiceBrowser(zc, "_machinekit._tcp.local.", self)

    def remove_service(self, zc, typ, name):
        for service in self.services:
            if service.serviceName == name:
                self.poller.unregister(service.socket)
                self.services = [s for s in self.services if s != service]
                print("%s terminated" % service.name)
                break

    def add_service(self, zc, typ, name):
        info = zc.get_service_info(typ, name)
        if info.properties.get(b'service'):
            sname = info.properties[b'service'].decode()
            dsn = ''
            ch = '-'
            #if sname in ['log', 'error', 'status', 'preview', 'previewstatus', 'halrcomp]:
            if sname in ['status', 'error']:
                service = MKLogMonitor(self.context, name, info.properties)
                self.services.append(service)
                self.poller.register(service.socket, zmq.POLLIN)
                dsn = service.dsn.decode()
                ch = '+'
            sys.stdout.write("\n%s %-13s %s " % (ch, sname, dsn))
        else:
            sys.stdout.write("\n%s: '%s'" % (name, info))
        sys.stdout.flush()


    def poll(self, timeout):
        s = dict(self.poller.poll(timeout))
        for service in self.services:
            if service.socket in s:
                try:
                    (origin, msg) = service.socket.recv_multipart()
                    self.rx.ParseFromString(msg)
                except Exception as e:
                    print("%s exception: %s" % (service.name, e))
                else:
                    service.process(msg, self.rx);
                continue

    def stop(self):
        self.browser.done = True

zc = zeroconf.Zeroconf()
serviceMonitor = MKServiceMonitor(zc)
try:
    first = True
    while True:
        if serviceMonitor.services:
            if not first:
                print('')
            first = True
            serviceMonitor.poll(1000)
        else:
            if first:
                sys.stdout.write('waiting ')
                first = False
            else:
                sys.stdout.write('.')
            sys.stdout.flush()
            time.sleep(1)

except KeyboardInterrupt as e:
    pass
except Exception as e:
    print(e)
finally:
    serviceMonitor.stop()
    zc.close()
    print("---- we're done ---")

Reply via email to