diff --git a/meta-digi-dey/recipes-digi/dey-examples/dey-examples-hdp.bb b/meta-digi-dey/recipes-digi/dey-examples/dey-examples-hdp.bb index b306b3a9f..e5642292f 100644 --- a/meta-digi-dey/recipes-digi/dey-examples/dey-examples-hdp.bb +++ b/meta-digi-dey/recipes-digi/dey-examples/dey-examples-hdp.bb @@ -14,6 +14,6 @@ do_install() { install -m 0755 hdp-test.py ${D}${bindir} } -RDEPENDS_${PN} = "python python-argparse python-crypt python-dbus python-pygobject" +RDEPENDS_${PN} = "python3 python3-argparse python3-crypt python3-dbus python3-pygobject" COMPATIBLE_MACHINE = "(ccardimx28|ccimx6$|ccimx6ul)" diff --git a/meta-digi-dey/recipes-digi/dey-examples/files/hdp_test/hdp-test.py b/meta-digi-dey/recipes-digi/dey-examples/files/hdp_test/hdp-test.py index 38930676a..4f417cd09 100644 --- a/meta-digi-dey/recipes-digi/dey-examples/files/hdp_test/hdp-test.py +++ b/meta-digi-dey/recipes-digi/dey-examples/files/hdp_test/hdp-test.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 # -*- coding: utf-8 -*- # @@ -9,12 +9,13 @@ import sys import os -import glib +from gi.repository import GLib import dbus import socket import dbus.service -import gobject +from gi.repository import GObject from dbus.mainloop.glib import DBusGMainLoop +from dbus.exceptions import DBusException import argparse # from hdp_utils import * @@ -31,18 +32,8 @@ class MessageType: Release_Confirmation, Data, Unknown) = range(0, 6) class HdpMessage: - def s2b(self, msg): - if msg is None: - return None - return [ord(x) for x in msg] - - def b2s(self, msg): - if msg is None: - return None - return "".join([chr(int(x)) for x in msg]) - def getAssociationResponse(self, invokeId): - return self.b2s(( + return bytes(( 0xe3, 0x00, #APDU CHOICE Type(AareApdu) 0x00, 0x2c, #CHOICE.length = 44 0x00, 0x00, #result=accept (known config) @@ -62,7 +53,7 @@ class HdpMessage: )) def getConfigurationResp(self, invokeId): - return self.b2s(( + return bytes(( 0xe7, 0x00, #APDU CHOICE Type(PrstApdu) 0x00, 0x16, #CHOICE.length = 22 0x00, 0x14, #OCTET STRING.length = 20 @@ -78,24 +69,24 @@ class HdpMessage: )) def getReleaseRequest(self, invokeId): - return self.b2s((0xe4, 0x00, 0x00, 0x02, 0x00, 0x00)) + return bytes((0xe4, 0x00, 0x00, 0x02, 0x00, 0x00)) def getReleaseResponse(self, invokeId): - return self.b2s((0xe5, 0x00, 0x00, 0x02, 0x00, 0x00)) + return bytes((0xe5, 0x00, 0x00, 0x02, 0x00, 0x00)) def getDataResponse(self, invokeId): - return self.b2s(( - 0xe7, 0x00, #APDU CHOICE Type(PrstApdu) - 0x00, 0x12, #CHOICE.length = 18 - 0x00, 0x10, #OCTET STRING.length = 16 - invokeId[0], invokeId[1], #invoke-id (mirrored from invocation) - 0x02, 0x01, #CHOICE(Remote Operation Response | Confirmed Event Report) - 0x00, 0x0a, #CHOICE.length = 10 - 0x00, 0x00, #obj-handle = 0 (MDS object) - 0x00, 0x00, 0x00, 0x00, #currentTime = 0 - 0x0d, 0x1d, #event-type = MDC_NOTI_SCAN_REPORT_FIXED - 0x00, 0x00, #event-reply-info.length = 0 - )) + return bytes(( + 0xe7, 0x00, #APDU CHOICE Type(PrstApdu) + 0x00, 0x12, #CHOICE.length = 18 + 0x00, 0x10, #OCTET STRING.length = 16 + invokeId[0], invokeId[1], #invoke-id (mirrored from invocation) + 0x02, 0x01, #CHOICE(Remote Operation Response | Confirmed Event Report) + 0x00, 0x0a, #CHOICE.length = 10 + 0x00, 0x00, #obj-handle = 0 (MDS object) + 0x00, 0x00, 0x00, 0x00, #currentTime = 0 + 0x0d, 0x1d, #event-type = MDC_NOTI_SCAN_REPORT_FIXED + 0x00, 0x00, #event-reply-info.length = 0 + )) def parse(self, string_msg): # @@ -113,28 +104,27 @@ class HdpMessage: invokeId = (0, 0) sp02 = 0 pulse = 0 - msg = self.s2b(string_msg) if debugOn: - print "IEEE opcode received: %x, length = %d" % (int(msg[0]), len(msg)) - for i in range(len(msg)): - if ((i & 15) == 0): - print - print '%2.2X' % int(msg[i]), - print + print("IEEE opcode received: %x, length = %d" % (int(string_msg[0]), len(string_msg))) + for i in range(len(string_msg)): + if ((i & 15) == 0): + print + print('%2.2X' % int(string_msg[i]),) + print - if int(msg[0]) == 0xe2: + if int(string_msg[0]) == 0xe2: msg_type = MessageType.Association - elif int(msg[0]) == 0xe7: - invokeId = int(msg[6]), int(msg[7]) - if int(msg[18]) == 0x0d and int(msg[19]) == 0x1c: + elif int(string_msg[0]) == 0xe7: + invokeId = int(string_msg[6]), int(string_msg[7]) + if int(string_msg[18]) == 0x0d and int(string_msg[19]) == 0x1c: msg_type = MessageType.Configuration else: msg_type = MessageType.Data - sp02 = int(msg[35]) - pulse = int(msg[49]) - elif int(msg[0]) == 0xe4: + sp02 = int(string_msg[35]) + pulse = int(string_msg[49]) + elif int(string_msg[0]) == 0xe4: msg_type = MessageType.Release_Request - elif int(msg[0]) == 0xe5: + elif int(string_msg[0]) == 0xe5: msg_type = MessageType.Release_Confirmation else: msg_type = MessageType.Unknown @@ -152,7 +142,7 @@ def receive_data(sk, evt): data = None disconnecting = False hdp = HdpMessage() - if evt & glib.IO_IN: + if evt & GLib.IO_IN: try: data = sk.recv(1024) except IOError: @@ -163,44 +153,44 @@ def receive_data(sk, evt): invokeId = result[1] if msgType == MessageType.Association: if debugOn: - print "Oximeter has associated" + print("Oximeter has associated") sk.send(hdp.getAssociationResponse(invokeId)) elif msgType == MessageType.Configuration: if debugOn: - print "Received configuration data" + print("Received configuration data") sk.send(hdp.getConfigurationResponse(invokeId)) elif msgType == MessageType.Release_Request: if debugOn: - print "Received release request" + print("Received release request") sk.send(hdp.getReleaseResponse(invokeId)) disconnecting = True elif msgType == MessageType.Release_Confirmation: if debugOn: - print "Received release confirmation" + print("Received release confirmation") disconnecting = True elif msgType == MessageType.Data: sk.send(hdp.getDataResponse(invokeId)) sp02 = result[2] pulse = result[3] if debugOn: - print "Received data from oximeter" - print "SpO2 Level: %d, Beats/second: %d" % \ - (result[2], result[3]) + print("Received data from oximeter") + print("SpO2 Level: %d, Beats/second: %d" % \ + (result[2], result[3])) if debugOn: - print "Sending disconnect" + print("Sending disconnect") sk.send(hdp.getReleaseRequest(invokeId)) else: - print "Received unknown message, disconnecting" + print("Received unknown message, disconnecting") sk.send(hdp.getReleaseRequest(invokeId)) disconnecting = True - if disconnecting or evt != glib.IO_IN or not data: + if disconnecting or evt != GLib.IO_IN or not data: try: sk.shutdown(2) except IOError: pass sk.close() - print "Disconnected from oximeter" + print("Disconnected from oximeter") return False else: return True @@ -223,9 +213,9 @@ class SignalHandler(object): dbus_interface=HEALTH_DEVICE_INTERFACE) def ChannelConnected(self, channel, interface, device): - print "%s has connected" % device + print("%s has connected" % device) if debugOn: - print "Channel: %s" % channel + print("Channel: %s" % channel) # # The oximeter has connected to us. Let's get # a socket for the connection. @@ -241,14 +231,14 @@ class SignalHandler(object): # Now set up our receiver function to be called # when interesting events are detected on that # socket - watch_bitmap = glib.IO_IN | glib.IO_ERR| glib.IO_HUP | glib.IO_NVAL - glib.io_add_watch(sk, watch_bitmap, receive_data) + watch_bitmap = GLib.IO_IN | GLib.IO_ERR| GLib.IO_HUP | GLib.IO_NVAL + GLib.io_add_watch(sk, watch_bitmap, receive_data) except DBusException: - print "Error communicating with Oximeter." - print "Please make sure the Oximeter has fresh batteries." + print("Error communicating with Oximeter.") + print("Please make sure the Oximeter has fresh batteries.") def ChannelDeleted(self, channel, interface, device): - print "Device %s channel %s deleted" % (device, channel) + print("Device %s channel %s deleted" % (device, channel)) parser = argparse.ArgumentParser() parser.add_argument("-d", "--debug", help="supply debug output", @@ -257,7 +247,7 @@ args = parser.parse_args() debugOn = args.debug DBusGMainLoop(set_as_default=True) -loop = gobject.MainLoop() +loop = GObject.MainLoop() bus = dbus.SystemBus() signal_handler = SignalHandler() @@ -275,16 +265,16 @@ config = dbus.Dictionary({"Role": "Sink", "DataType": dbus.types.UInt16(0x1004), manager = dbus.Interface(bus.get_object(BUS_NAME, PATH), HEALTH_MANAGER_INTERFACE) app = manager.CreateApplication(config) -print "HDP application created, waiting for connection from" -print "a pulse oximeter. Press control-c to terminate." +print("HDP application created, waiting for connection from") +print("a pulse oximeter. Press control-c to terminate.") try: - loop = glib.MainLoop() + loop = GLib.MainLoop() loop.run() except KeyboardInterrupt: pass finally: manager.DestroyApplication(app) print - print "Application stopped" + print("Application stopped") print