From 14f849fb8d8a22e7d1f8e2b4cea9dbfb3b831768 Mon Sep 17 00:00:00 2001 From: David Escalona Date: Tue, 11 Oct 2022 13:56:25 +0200 Subject: [PATCH] connectcore-demo-example: use 'libdigiapix' API for network and bluetooth Signed-off-by: David Escalona --- connectcore-demo-example/demoserver.py | 281 +++++++++++-------------- 1 file changed, 122 insertions(+), 159 deletions(-) diff --git a/connectcore-demo-example/demoserver.py b/connectcore-demo-example/demoserver.py index 713f243..0d74695 100755 --- a/connectcore-demo-example/demoserver.py +++ b/connectcore-demo-example/demoserver.py @@ -29,6 +29,10 @@ import socketserver import stat import subprocess +from digi.apix.bluetooth import BluetoothException, BluetoothDevice +from digi.apix.exceptions import DigiAPIXException +from digi.apix.network import NetworkException, NetStatus, NetworkInterface +from digi.apix.wifi import WifiException, WifiInterface from logging.handlers import SysLogHandler from subprocess import call, TimeoutExpired from threading import Thread @@ -112,7 +116,11 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): mca_versions = get_mca_version() info = { "uboot_version": read_proc_file("/proc/device-tree/digi,uboot,version"), - "kernel_version": "%s %s %s %s %s GNU/Linux" % (platform.system(), platform.node(), platform.release(), platform.version(), platform.machine()), + "kernel_version": "%s %s %s %s %s GNU/Linux" % (platform.system(), + platform.node(), + platform.release(), + platform.version(), + platform.machine()), "dey_version": "DEY-%s-%s" % (get_dey_version(), read_file("/etc/version")), "serial_number": read_proc_file("/proc/device-tree/digi,hwid,sn"), "device_type": read_proc_file("/proc/device-tree/digi,machine,name"), @@ -125,14 +133,57 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): "flash_size": get_storage_size(), "video_resolution": get_video_resolution(), "fw_store_path": get_fw_store_path(), - "bluetooth_mac": get_bt_mac("hci0"), - "wifi_mac": read_file("/sys/class/net/wlan0/address").strip().upper() if "wlan0" in list_net_ifaces() else ZERO_MAC, - "wifi_ip": get_iface_ip("wlan0") if "wlan0" in list_net_ifaces() else ZERO_IP, - "ethernet0_mac": read_file("/sys/class/net/eth0/address").strip().upper() if "eth0" in list_net_ifaces() else ZERO_MAC, - "ethernet0_ip": get_iface_ip("eth0") if "eth0" in list_net_ifaces() else ZERO_IP, - "ethernet1_mac": read_file("/sys/class/net/eth1/address").strip().upper() if "eth1" in list_net_ifaces() else ZERO_MAC, - "ethernet1_ip": get_iface_ip("eth1") if "eth1" in list_net_ifaces() else ZERO_IP, + "bluetooth_mac": ZERO_MAC, + "wifi_mac": ZERO_MAC, + "wifi_ip": ZERO_IP, + "ethernet0_mac": ZERO_MAC, + "ethernet0_ip": ZERO_IP, + "ethernet1_mac": ZERO_MAC, + "ethernet1_ip": ZERO_IP, } + # Fill bluetooth data. + try: + bt_devices = BluetoothDevice.list_devices() + if bt_devices and "hci0" in bt_devices: + try: + bt_device = BluetoothDevice.get("hci0") + info["bluetooth_mac"] = mac_to_human_string(bt_device.mac) + except BluetoothException as exc2: + log.error("Error reading interface 'hci0' data: %s", str(exc2)) + except DigiAPIXException as exc: + log.error("Error listing bluetooth devices: %s", str(exc)) + # Fill ethernet interfaces data. + try: + interfaces = NetworkInterface.list_interfaces() + if interfaces and "eth0" in interfaces: + try: + net_iface = NetworkInterface.get("eth0") + info["ethernet0_mac"] = str(net_iface.mac) + info["ethernet0_ip"] = str(net_iface.ipv4) + except NetworkException as exc2: + log.error("Error reading interface 'eth0' data: %s", str(exc2)) + if interfaces and "eth1" in interfaces: + try: + net_iface = NetworkInterface.get("eth1") + info["ethernet1_mac"] = mac_to_human_string(net_iface.mac) + info["ethernet1_ip"] = str(net_iface.ipv4) + except NetworkException as exc2: + log.error("Error reading interface 'eth1' data: %s", str(exc2)) + except DigiAPIXException as exc: + log.error("Error listing network interfaces: %s", str(exc)) + + # Fill WiFi interfaces data. + try: + interfaces = WifiInterface.list_interfaces() + if interfaces and "wlan0" in interfaces: + try: + net_iface = WifiInterface.get("wlan0") + info["wifi_mac"] = mac_to_human_string(net_iface.mac) + info["wifi_ip"] = str(net_iface.ipv4) + except WifiException as exc2: + log.error("Error reading interface 'wlan0' data: %s", str(exc2)) + except DigiAPIXException as exc: + log.error("Error listing network interfaces: %s", str(exc)) # Send the JSON value. self.wfile.write(json.dumps(info).encode(encoding="utf_8")) @@ -146,7 +197,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): mem_total = int(mem_info.get("MemTotal", "-1")) if mem_info else -1 mem_free = int(mem_info.get("MemFree", "-1")) if mem_info else -1 mem_used = (mem_total - mem_free) if (mem_total > 0 and mem_free > 0) else -1 - bt_data = get_bt_data("hci0") + status = { "system_monitor/cpu_load": get_cpu_load(), "system_monitor/cpu_temperature": get_cpu_temp(), @@ -154,18 +205,56 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): "system_monitor/uptime": get_uptime(), "system_monitor/free_memory": mem_free, "system_monitor/used_memory": mem_used, - "system_monitor/led_status": get_led_status("USER_LED"), - "system_monitor/hci0/state": bt_data.get("state", 0), - "system_monitor/hci0/rx_bytes": bt_data.get("rx_bytes", 0), - "system_monitor/hci0/tx_bytes": bt_data.get("tx_bytes", 0), + "system_monitor/led_status": get_led_status("USER_LED") } - list_ifaces = list_net_ifaces() - for name in list_ifaces: - data = get_iface_data(name) - status["system_monitor/%s/state" % name] = data["state"] - status["system_monitor/%s/rx_bytes" % name] = data["rx_bytes"] - status["system_monitor/%s/tx_bytes" % name] = data["tx_bytes"] + # Fill bluetooth devices data. + try: + bt_devices = BluetoothDevice.list_devices() + for device in bt_devices: + try: + bt_device = BluetoothDevice.get(device) + statistics = bt_device.get_statistics() + status[f"system_monitor/{device}/state"] = 1 if \ + bt_device.is_enabled else 0 + status[f"system_monitor/{device}/rx_bytes"] = statistics.rx_bytes + status[f"system_monitor/{device}/tx_bytes"] = statistics.tx_bytes + except NetworkException as exc2: + log.error("Error reading bluetooth device '%s' data: %s", device, str(exc2)) + except DigiAPIXException as exc: + log.error("Error listing bluetooth devices: %s", str(exc)) + + # Fill ethernet interfaces data. + try: + interfaces = NetworkInterface.list_interfaces() + for iface in interfaces: + try: + net_iface = NetworkInterface.get(iface) + statistics = net_iface.get_statistics() + status[f"system_monitor/{iface}/state"] = 1 if \ + net_iface.status == NetStatus.CONNECTED else 0 + status[f"system_monitor/{iface}/rx_bytes"] = statistics.rx_bytes + status[f"system_monitor/{iface}/tx_bytes"] = statistics.tx_bytes + except NetworkException as exc2: + log.error("Error reading interface '%s' data: %s", iface, str(exc2)) + except DigiAPIXException as exc: + log.error("Error listing network interfaces: %s", str(exc)) + + # Fill WiFi interfaces data. + try: + interfaces = WifiInterface.list_interfaces() + for iface in interfaces: + try: + wifi_iface = WifiInterface.get(iface) + statistics = wifi_iface.get_statistics() + status[f"system_monitor/{iface}/state"] = 1 if \ + wifi_iface.status == NetStatus.CONNECTED else 0 + status[f"system_monitor/{iface}/rx_bytes"] = statistics.rx_bytes + status[f"system_monitor/{iface}/tx_bytes"] = statistics.tx_bytes + except WifiException as exc2: + log.error("Error reading interface '%s' data: %s", iface, str(exc2)) + except DigiAPIXException as exc: + log.error("Error listing WiFi interfaces: %s", str(exc)) # Send the JSON value. self.wfile.write(json.dumps(status).encode(encoding="utf_8")) @@ -753,146 +842,6 @@ def get_mca_version(): return NOT_AVAILABLE, NOT_AVAILABLE -def list_net_ifaces(): - """ - Returs a list with the names of the network interfaces. - - Returns: - List: List of network inteface names. - """ - return os.listdir("/sys/class/net") - - -def get_iface_ip(iface_name): - """ - Gets the IP address of the provided interface. - - Args: - iface_name (String): Name of the network interface to get its IP. - - Returns: - String: The IP of the interface. - """ - res = exec_cmd("ip addr show %s" % iface_name) - if res[0] == 0: - tmp = res[1].split("inet ") - if len(tmp) > 1: - return tmp[1].split("/")[0] - - log.error("Error getting IP of interface '%s': %s", iface_name, res[1]) - - return ZERO_IP - - -def get_iface_data(iface_name): - """ - Gets network interface state and statistics. - - Args: - iface_name (String): Name of the interface to get data. - - Returns: - Dictionary: The network interface state and statistics. - """ - data = { - "state": 0, - "rx_bytes": 0, - "tx_bytes": 0 - } - state = read_file("/sys/class/net/%s/operstate" % iface_name) - if state != NOT_AVAILABLE: - data["state"] = 1 if state.strip() == "up" else 0 - - stats = read_file("/proc/net/dev") - if stats == NOT_AVAILABLE: - return data - - for line in stats.splitlines(): - if not line.strip().startswith("%s: " % iface_name): - continue - fields = line.split() - data["rx_bytes"] = fields[1] - data["tx_bytes"] = fields[9] - break - - return data - - -def is_bt_available(): - """ - Checks if Bluetooth is available on the device. - - Returns: - Boolean: True if available, False otherwise. - """ - return os.path.isdir("/proc/device-tree/bluetooth") - - -def get_bt_mac(iface_name): - """ - Gets Bluetooth MAC address. - - Args: - iface_name (String): Name of the interface to get the MAC. - - Returns: - String: The Bluetooth MAC. - """ - if not is_bt_available(): - return ZERO_MAC - - res = exec_cmd("hciconfig %s" % iface_name) - if res[0] == 0: - return res[1].split("BD Address:")[1].split("ACL")[0].strip() - else: - log.error("Error getting MAC of Bluetooth interface '%s': %s", iface_name, res[1]) - return ZERO_MAC - - -def get_bt_data(iface_name): - """ - Gets Bluetooth interface state and statistics. - - Args: - iface_name (String): Name of the interface to get data. - - Returns: - Dictionary: The Bluetooth interface state and statistics. - """ - data = { - "state": 0, - "rx_bytes": 0, - "tx_bytes": 0 - } - - if not is_bt_available(): - return data - - res = exec_cmd("hciconfig %s" % iface_name) - if res[0] == 0: - info = res[1] - else: - log.error("Error getting status of Bluetooth interface '%s': %s", iface_name, res[1]) - return data - - lines = info.splitlines() - if not lines: - return data - - if len(lines) > 2 and re.fullmatch("(UP) .*", lines[2].strip()): - data["state"] = 1 - - if len(lines) > 3: - m = re.fullmatch("RX bytes:([0-9]+) .*", lines[3].strip()) - data["rx_bytes"] = m.group(1) if m else 0 - - if len(lines) > 4: - m = re.fullmatch("TX bytes:([0-9]+) .*", lines[4].strip()) - data["tx_bytes"] = m.group(1) if m else 0 - - return data - - def get_led_status(name): """ Get the LED value. @@ -996,6 +945,20 @@ def set_audio_volume(value): return None +def mac_to_human_string(mac, num_bytes=6): + """ + Transforms the given MAC address into a human readable string. + + Args: + mac (:class:`.MacAddress`): The MAC address to transform. + num_bytes (Integer): The number of bytes to use in the MAC Address. + + Return: + String: The given MAC as human readable string. + """ + return ":".join(["%02X" % i for i in mac][8-num_bytes:]).upper() + + def read_proc_file(path): """ Gets contents of a proc file.