I could not find any guide, nor did I manage to get any of the examples working.
The only example I got to work was
machinekit/src/machinetalk/tutorial/zeroconf/resolve.py but didn't
understand what was going on - nor was it clear if I have to understand
what's happening or not.
Attached are two simple examples I came up with - if there are better ways
of doing it please let me know. Both examples use the packages python-zmq
and python-zeroconf which are available on all major distributions and for
python2 and python3. The reason I used zeroconf instead of the avahi/dbus
approach of resolve.py is because it's a lot simpler - at least from my
perspective.
mk-hello-world.py scans for all machinekit services and prints each one of
them to the screen. Once no more services are discovered the process is
terminated by pressing Ctrl+C.
I consider this the hello-world for Machinetalk because it is the first
step to connecting to machinetalk. Because Machinetalk uses random ports,
one must first find a service's endpoint before one can connect to it.
The simplest services (from a code effort to usefulness standpoint) are the
publish/subscribe services. mk-hello-service.py subscribes to the status
and error services. Each published service has a list of topics one can
subscribe to (I could not find any documentation about the details). One
must subscribe to a topic in order to trigger the initial full update
message. An empty string can be used as the topic which serves as a
wildcard for all topics - although in this case no initial full update is
triggered, but you'll get all subsequent incremental updates.
It turns out to be important that the reverse lookups are set up correctly;
otherwise there are significant delays in establishing a connection.
--
website: http://www.machinekit.io blog: http://blog.machinekit.io github:
https://github.com/machinekit
---
You received this message because you are subscribed to the Google Groups
"Machinekit" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to [email protected].
Visit this group at https://groups.google.com/group/machinekit.
To view this discussion on the web visit
https://groups.google.com/d/msgid/machinekit/ac2c20ee-3a8e-48a3-a9e4-b83dfedded24%40googlegroups.com.
For more options, visit https://groups.google.com/d/optout.
#!/usr/bin/python
#
# Simple example to discover all Machinetalk services and their end points.
import sys
import time
import zeroconf
import zmq
class MKServiceMonitor(object):
    """Zeroconf listener that prints every Machinetalk service it sees.

    The instance hands itself to a ``zeroconf.ServiceBrowser`` for the
    ``_machinekit._tcp.local.`` service type, so the ``add_service`` /
    ``remove_service`` / ``update_service`` methods form the listener
    interface used by that browser.
    """

    def __init__(self, zc):
        """Start browsing for Machinetalk services on the Zeroconf object *zc*."""
        self.browser = zeroconf.ServiceBrowser(zc, "_machinekit._tcp.local.", self)

    def remove_service(self, zc, typ, name):
        """Report that the service announced as *name* has gone away."""
        print("%s terminated" % name)

    def add_service(self, zc, typ, name):
        """Print the service name and endpoint (dsn) of a discovered service.

        When the announcement carries a ``service`` TXT property, one line
        with the service name and its ``dsn`` is written.  Otherwise (no
        such property, or the lookup returned nothing) the raw announcement
        name and the info object are written instead.
        """
        info = zc.get_service_info(typ, name)
        # get_service_info() can come back with None (e.g. the lookup timed
        # out); treat that like an announcement without any properties
        # instead of crashing on ``None.properties``.
        properties = info.properties if info is not None else {}
        service = properties.get(b'service')
        if service:
            # A missing dsn is printed as an empty endpoint rather than
            # raising KeyError.
            dsn = properties.get(b'dsn') or b''
            sys.stdout.write("\n%-13s %s " % (service.decode(), dsn.decode()))
        else:
            sys.stdout.write("\n%s: '%s'" % (name, info))
        sys.stdout.flush()

    def update_service(self, zc, typ, name):
        """Ignore service updates.

        Present only so the object satisfies listener interfaces that
        expect an ``update_service`` callback; nothing is printed.
        """

    def stop(self):
        """Stop browsing for services."""
        # cancel() is the browser's public way to stop; the original poked
        # the ``done`` attribute directly.
        self.browser.cancel()
# Script body: browse for services until interrupted, printing a heartbeat
# dot once per second so the user can see the scan is still alive.
zc = zeroconf.Zeroconf()
serviceMonitor = MKServiceMonitor(zc)
try:
    while True:
        # One dot per second; discovered services are printed by the
        # monitor's callbacks in between.
        sys.stdout.write('.')
        sys.stdout.flush()
        time.sleep(1)
except KeyboardInterrupt:
    # Ctrl+C is the expected way to end the scan.
    pass
except Exception as err:
    print(err)
finally:
    # Always stop browsing and release the Zeroconf instance.
    serviceMonitor.stop()
    zc.close()
print("---- we're done ---")
#!/usr/bin/python
#
# Example of how to connect to a (publish/subscribe) service, get an initial full
# update and subsequent incremental updates until the service terminates, or the
# loop is interrupted by Ctrl-C
import sys
import time
import zeroconf
import zmq
from machinetalk.protobuf.message_pb2 import Container
from machinetalk.protobuf.types_pb2 import MT_EMC_OPERATOR_ERROR, MT_PING
class MKService(object):
    """Base record describing one announced Machinetalk service.

    Attributes (all taken from the announcement's TXT *properties*):
        service      raw bytes value of the ``service`` property
        uuid         raw bytes value of the ``uuid`` property
        dsn          raw bytes value of the ``dsn`` property (the endpoint)
        name         ``service`` decoded to text
        serviceName  the announcement name passed in as *name*
    """

    def __init__(self, name, properties):
        """Copy the identifying properties of the announcement.

        Raises KeyError when *properties* lacks ``service``, ``uuid`` or
        ``dsn``.
        """
        self.service = properties[b'service']
        self.uuid = properties[b'uuid']
        self.dsn = properties[b'dsn']
        self.name = self.service.decode()
        self.serviceName = name

    def process(self, msg=None, container=None):
        """Handle one received message; the base class ignores it.

        The signature matches how subclasses and the polling loop call it
        (``process(msg, container)``).  Both arguments are optional so the
        older single-argument call shape keeps working as well.
        """
        pass
class MKLogMonitor(MKService):
    """Subscriber for one publish/subscribe service that prints its messages.

    A SUB socket is created on the given zmq context, subscribed to the
    topics chosen for the service name, and connected to the service's dsn.
    """

    def __init__(self, ctx, name, properties):
        """Create, subscribe and connect the SUB socket for this service."""
        super(MKLogMonitor, self).__init__(name, properties)
        self.socket = ctx.socket(zmq.SUB)

        # Topic selection per service name.  (The original also carried a
        # disabled subscription to b'task' for the status service.)
        if self.name == 'status':
            topics = (b'config',)
        elif self.name == 'error':
            topics = (b'text', b'error', b'display')
        else:
            # Empty prefix: subscribe to everything.
            topics = (b'',)

        for topic in topics:
            self.socket.setsockopt(zmq.SUBSCRIBE, topic)
        self.socket.connect(self.dsn)

    def process(self, msg, container):
        """Print the parsed *container*, skipping ping messages.

        Operator errors are printed with their type and note; any other
        non-ping container is printed in full.  *msg* (the raw payload) is
        not used here.
        """
        kind = container.type
        if kind == MT_EMC_OPERATOR_ERROR:
            print("%s[%s]: '%s'" % (self.name, kind, container.note))
            return
        if kind == MT_PING:
            return
        print("%s: -|%s|-" % (self.name, str(container)))
class MKServiceMonitor(object):
    """Discover Machinetalk services and poll the ones we subscribe to.

    The instance is the listener of a ``zeroconf.ServiceBrowser`` for
    ``_machinekit._tcp.local.``.  For ``status`` and ``error`` services an
    ``MKLogMonitor`` is created and its socket registered with a zmq poller;
    ``poll`` then reads and dispatches whatever those sockets received.

    NOTE(review): the listener callbacks are presumably invoked from the
    browser's own thread while ``poll`` runs in the caller's thread --
    confirm before adding anything that depends on that.
    """

    def __init__(self, zc):
        """Set up the poller/context and start browsing on *zc*."""
        self.rx = Container()       # reused for parsing every message
        self.zc = zc
        self.poller = zmq.Poller()
        self.context = zmq.Context()
        self.services = []          # currently monitored MKLogMonitor objects
        self.browser = zeroconf.ServiceBrowser(zc, "_machinekit._tcp.local.", self)

    def remove_service(self, zc, typ, name):
        """Stop monitoring the service that was announced as *name*, if any."""
        for service in self.services:
            if service.serviceName == name:
                self.poller.unregister(service.socket)
                # Rebind instead of mutating so a loop over the old list is
                # not disturbed.
                self.services = [s for s in self.services if s is not service]
                print("%s terminated" % service.name)
                break

    def add_service(self, zc, typ, name):
        """Print a discovered service and start monitoring status/error.

        Monitored services are flagged with ``+`` and printed with their
        dsn; other Machinetalk services are flagged with ``-``.
        Announcements without a ``service`` property (or whose lookup
        returned nothing) are printed raw.
        """
        info = zc.get_service_info(typ, name)
        # get_service_info() can come back with None (e.g. the lookup timed
        # out); treat that like an announcement without any properties
        # instead of crashing on ``None.properties``.
        properties = info.properties if info is not None else {}
        if not properties.get(b'service'):
            sys.stdout.write("\n%s: '%s'" % (name, info))
            sys.stdout.flush()
            return

        sname = properties[b'service'].decode()
        dsn = ''
        ch = '-'
        # Only these two are monitored; the original author also listed
        # log, preview, previewstatus and halrcomp as candidates.
        if sname in ('status', 'error'):
            service = MKLogMonitor(self.context, name, properties)
            self.services.append(service)
            self.poller.register(service.socket, zmq.POLLIN)
            dsn = service.dsn.decode()
            ch = '+'
        sys.stdout.write("\n%s %-13s %s " % (ch, sname, dsn))
        sys.stdout.flush()

    def update_service(self, zc, typ, name):
        """Ignore service updates.

        Present only so the object satisfies listener interfaces that
        expect an ``update_service`` callback.
        """

    def poll(self, timeout):
        """Wait up to *timeout* (passed to the poller) and dispatch messages.

        Every monitored service whose socket is readable gets one
        two-frame message read, parsed into the shared container and
        handed to ``service.process``.  A failure while receiving or
        parsing is printed and that service is skipped for this round.
        """
        ready = dict(self.poller.poll(timeout))
        for service in self.services:
            if service.socket not in ready:
                continue
            try:
                (origin, msg) = service.socket.recv_multipart()
                self.rx.ParseFromString(msg)
            except Exception as e:
                print("%s exception: %s" % (service.name, e))
            else:
                service.process(msg, self.rx)

    def stop(self):
        """Stop browsing for services."""
        # cancel() is the browser's public way to stop; the original poked
        # the ``done`` attribute directly.
        self.browser.cancel()
# Script body: wait for monitored services to appear, then poll them until
# interrupted.  While nothing is monitored a "waiting ...." progress line is
# printed, one dot per second.
zc = zeroconf.Zeroconf()
serviceMonitor = MKServiceMonitor(zc)
try:
    # ``first`` is True whenever no progress line is currently open.
    first = True
    while True:
        if not serviceMonitor.services:
            # Nothing to poll yet: open or extend the progress line.
            sys.stdout.write('waiting ' if first else '.')
            first = False
            sys.stdout.flush()
            time.sleep(1)
            continue

        if not first:
            # Terminate the open progress line before messages are printed.
            print('')
            first = True
        serviceMonitor.poll(1000)
except KeyboardInterrupt:
    # Ctrl-C is the expected way to leave the loop.
    pass
except Exception as err:
    print(err)
finally:
    # Always stop browsing and release the Zeroconf instance.
    serviceMonitor.stop()
    zc.close()
print("---- we're done ---")