dey-examples-hdp: update sample to python3

- Some Python modules are no longer available in DEY-2.2 since
  the official supported Python version is now 3. This caused the
  HDP application to fail importing some of the old modules. For this
  reason the sample application has been updated to use Python3
  syntax and modules.

https://jira.digi.com/browse/DEL-3996

Signed-off-by: David Escalona <david.escalona@digi.com>
This commit is contained in:
David Escalona 2017-03-28 18:55:41 +02:00
parent c1709e4fd3
commit 98d514c1c2
2 changed files with 59 additions and 69 deletions

View File

@ -14,6 +14,6 @@ do_install() {
install -m 0755 hdp-test.py ${D}${bindir}
}
RDEPENDS_${PN} = "python python-argparse python-crypt python-dbus python-pygobject"
RDEPENDS_${PN} = "python3 python3-argparse python3-crypt python3-dbus python3-pygobject"
COMPATIBLE_MACHINE = "(ccardimx28|ccimx6$|ccimx6ul)"

View File

@ -1,4 +1,4 @@
#!/usr/bin/env python
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
@ -9,12 +9,13 @@
import sys
import os
import glib
from gi.repository import GLib
import dbus
import socket
import dbus.service
import gobject
from gi.repository import GObject
from dbus.mainloop.glib import DBusGMainLoop
from dbus.exceptions import DBusException
import argparse
# from hdp_utils import *
@ -31,18 +32,8 @@ class MessageType:
Release_Confirmation, Data, Unknown) = range(0, 6)
class HdpMessage:
def s2b(self, msg):
if msg is None:
return None
return [ord(x) for x in msg]
def b2s(self, msg):
if msg is None:
return None
return "".join([chr(int(x)) for x in msg])
def getAssociationResponse(self, invokeId):
return self.b2s((
return bytes((
0xe3, 0x00, #APDU CHOICE Type(AareApdu)
0x00, 0x2c, #CHOICE.length = 44
0x00, 0x00, #result=accept (known config)
@ -62,7 +53,7 @@ class HdpMessage:
))
def getConfigurationResp(self, invokeId):
return self.b2s((
return bytes((
0xe7, 0x00, #APDU CHOICE Type(PrstApdu)
0x00, 0x16, #CHOICE.length = 22
0x00, 0x14, #OCTET STRING.length = 20
@ -78,13 +69,13 @@ class HdpMessage:
))
def getReleaseRequest(self, invokeId):
return self.b2s((0xe4, 0x00, 0x00, 0x02, 0x00, 0x00))
return bytes((0xe4, 0x00, 0x00, 0x02, 0x00, 0x00))
def getReleaseResponse(self, invokeId):
return self.b2s((0xe5, 0x00, 0x00, 0x02, 0x00, 0x00))
return bytes((0xe5, 0x00, 0x00, 0x02, 0x00, 0x00))
def getDataResponse(self, invokeId):
return self.b2s((
return bytes((
0xe7, 0x00, #APDU CHOICE Type(PrstApdu)
0x00, 0x12, #CHOICE.length = 18
0x00, 0x10, #OCTET STRING.length = 16
@ -113,28 +104,27 @@ class HdpMessage:
invokeId = (0, 0)
sp02 = 0
pulse = 0
msg = self.s2b(string_msg)
if debugOn:
print "IEEE opcode received: %x, length = %d" % (int(msg[0]), len(msg))
for i in range(len(msg)):
print("IEEE opcode received: %x, length = %d" % (int(string_msg[0]), len(string_msg)))
for i in range(len(string_msg)):
if ((i & 15) == 0):
print
print '%2.2X' % int(msg[i]),
print('%2.2X' % int(string_msg[i]),)
print
if int(msg[0]) == 0xe2:
if int(string_msg[0]) == 0xe2:
msg_type = MessageType.Association
elif int(msg[0]) == 0xe7:
invokeId = int(msg[6]), int(msg[7])
if int(msg[18]) == 0x0d and int(msg[19]) == 0x1c:
elif int(string_msg[0]) == 0xe7:
invokeId = int(string_msg[6]), int(string_msg[7])
if int(string_msg[18]) == 0x0d and int(string_msg[19]) == 0x1c:
msg_type = MessageType.Configuration
else:
msg_type = MessageType.Data
sp02 = int(msg[35])
pulse = int(msg[49])
elif int(msg[0]) == 0xe4:
sp02 = int(string_msg[35])
pulse = int(string_msg[49])
elif int(string_msg[0]) == 0xe4:
msg_type = MessageType.Release_Request
elif int(msg[0]) == 0xe5:
elif int(string_msg[0]) == 0xe5:
msg_type = MessageType.Release_Confirmation
else:
msg_type = MessageType.Unknown
@ -152,7 +142,7 @@ def receive_data(sk, evt):
data = None
disconnecting = False
hdp = HdpMessage()
if evt & glib.IO_IN:
if evt & GLib.IO_IN:
try:
data = sk.recv(1024)
except IOError:
@ -163,44 +153,44 @@ def receive_data(sk, evt):
invokeId = result[1]
if msgType == MessageType.Association:
if debugOn:
print "Oximeter has associated"
print("Oximeter has associated")
sk.send(hdp.getAssociationResponse(invokeId))
elif msgType == MessageType.Configuration:
if debugOn:
print "Received configuration data"
print("Received configuration data")
sk.send(hdp.getConfigurationResponse(invokeId))
elif msgType == MessageType.Release_Request:
if debugOn:
print "Received release request"
print("Received release request")
sk.send(hdp.getReleaseResponse(invokeId))
disconnecting = True
elif msgType == MessageType.Release_Confirmation:
if debugOn:
print "Received release confirmation"
print("Received release confirmation")
disconnecting = True
elif msgType == MessageType.Data:
sk.send(hdp.getDataResponse(invokeId))
sp02 = result[2]
pulse = result[3]
if debugOn:
print "Received data from oximeter"
print "SpO2 Level: %d, Beats/second: %d" % \
(result[2], result[3])
print("Received data from oximeter")
print("SpO2 Level: %d, Beats/second: %d" % \
(result[2], result[3]))
if debugOn:
print "Sending disconnect"
print("Sending disconnect")
sk.send(hdp.getReleaseRequest(invokeId))
else:
print "Received unknown message, disconnecting"
print("Received unknown message, disconnecting")
sk.send(hdp.getReleaseRequest(invokeId))
disconnecting = True
if disconnecting or evt != glib.IO_IN or not data:
if disconnecting or evt != GLib.IO_IN or not data:
try:
sk.shutdown(2)
except IOError:
pass
sk.close()
print "Disconnected from oximeter"
print("Disconnected from oximeter")
return False
else:
return True
@ -223,9 +213,9 @@ class SignalHandler(object):
dbus_interface=HEALTH_DEVICE_INTERFACE)
def ChannelConnected(self, channel, interface, device):
print "%s has connected" % device
print("%s has connected" % device)
if debugOn:
print "Channel: %s" % channel
print("Channel: %s" % channel)
#
# The oximeter has connected to us. Let's get
# a socket for the connection.
@ -241,14 +231,14 @@ class SignalHandler(object):
# Now set up our receiver function to be called
# when interesting events are detected on that
# socket
watch_bitmap = glib.IO_IN | glib.IO_ERR| glib.IO_HUP | glib.IO_NVAL
glib.io_add_watch(sk, watch_bitmap, receive_data)
watch_bitmap = GLib.IO_IN | GLib.IO_ERR| GLib.IO_HUP | GLib.IO_NVAL
GLib.io_add_watch(sk, watch_bitmap, receive_data)
except DBusException:
print "Error communicating with Oximeter."
print "Please make sure the Oximeter has fresh batteries."
print("Error communicating with Oximeter.")
print("Please make sure the Oximeter has fresh batteries.")
def ChannelDeleted(self, channel, interface, device):
print "Device %s channel %s deleted" % (device, channel)
print("Device %s channel %s deleted" % (device, channel))
parser = argparse.ArgumentParser()
parser.add_argument("-d", "--debug", help="supply debug output",
@ -257,7 +247,7 @@ args = parser.parse_args()
debugOn = args.debug
DBusGMainLoop(set_as_default=True)
loop = gobject.MainLoop()
loop = GObject.MainLoop()
bus = dbus.SystemBus()
signal_handler = SignalHandler()
@ -275,16 +265,16 @@ config = dbus.Dictionary({"Role": "Sink", "DataType": dbus.types.UInt16(0x1004),
manager = dbus.Interface(bus.get_object(BUS_NAME, PATH),
HEALTH_MANAGER_INTERFACE)
app = manager.CreateApplication(config)
print "HDP application created, waiting for connection from"
print "a pulse oximeter. Press control-c to terminate."
print("HDP application created, waiting for connection from")
print("a pulse oximeter. Press control-c to terminate.")
try:
loop = glib.MainLoop()
loop = GLib.MainLoop()
loop.run()
except KeyboardInterrupt:
pass
finally:
manager.DestroyApplication(app)
print
print "Application stopped"
print("Application stopped")
print