connectcore-demo-example: use 'libdigiapix' API for network and bluetooth

Signed-off-by: David Escalona <david.escalona@digi.com>
This commit is contained in:
David Escalona 2022-10-11 13:56:25 +02:00
parent fe23aafc92
commit 14f849fb8d
1 changed files with 122 additions and 159 deletions

View File

@ -29,6 +29,10 @@ import socketserver
import stat
import subprocess
from digi.apix.bluetooth import BluetoothException, BluetoothDevice
from digi.apix.exceptions import DigiAPIXException
from digi.apix.network import NetworkException, NetStatus, NetworkInterface
from digi.apix.wifi import WifiException, WifiInterface
from logging.handlers import SysLogHandler
from subprocess import call, TimeoutExpired
from threading import Thread
@ -112,7 +116,11 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
mca_versions = get_mca_version()
info = {
"uboot_version": read_proc_file("/proc/device-tree/digi,uboot,version"),
"kernel_version": "%s %s %s %s %s GNU/Linux" % (platform.system(), platform.node(), platform.release(), platform.version(), platform.machine()),
"kernel_version": "%s %s %s %s %s GNU/Linux" % (platform.system(),
platform.node(),
platform.release(),
platform.version(),
platform.machine()),
"dey_version": "DEY-%s-%s" % (get_dey_version(), read_file("/etc/version")),
"serial_number": read_proc_file("/proc/device-tree/digi,hwid,sn"),
"device_type": read_proc_file("/proc/device-tree/digi,machine,name"),
@ -125,14 +133,57 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
"flash_size": get_storage_size(),
"video_resolution": get_video_resolution(),
"fw_store_path": get_fw_store_path(),
"bluetooth_mac": get_bt_mac("hci0"),
"wifi_mac": read_file("/sys/class/net/wlan0/address").strip().upper() if "wlan0" in list_net_ifaces() else ZERO_MAC,
"wifi_ip": get_iface_ip("wlan0") if "wlan0" in list_net_ifaces() else ZERO_IP,
"ethernet0_mac": read_file("/sys/class/net/eth0/address").strip().upper() if "eth0" in list_net_ifaces() else ZERO_MAC,
"ethernet0_ip": get_iface_ip("eth0") if "eth0" in list_net_ifaces() else ZERO_IP,
"ethernet1_mac": read_file("/sys/class/net/eth1/address").strip().upper() if "eth1" in list_net_ifaces() else ZERO_MAC,
"ethernet1_ip": get_iface_ip("eth1") if "eth1" in list_net_ifaces() else ZERO_IP,
"bluetooth_mac": ZERO_MAC,
"wifi_mac": ZERO_MAC,
"wifi_ip": ZERO_IP,
"ethernet0_mac": ZERO_MAC,
"ethernet0_ip": ZERO_IP,
"ethernet1_mac": ZERO_MAC,
"ethernet1_ip": ZERO_IP,
}
# Fill bluetooth data.
try:
bt_devices = BluetoothDevice.list_devices()
if bt_devices and "hci0" in bt_devices:
try:
bt_device = BluetoothDevice.get("hci0")
info["bluetooth_mac"] = mac_to_human_string(bt_device.mac)
except BluetoothException as exc2:
log.error("Error reading interface 'hci0' data: %s", str(exc2))
except DigiAPIXException as exc:
log.error("Error listing bluetooth devices: %s", str(exc))
# Fill ethernet interfaces data.
try:
interfaces = NetworkInterface.list_interfaces()
if interfaces and "eth0" in interfaces:
try:
net_iface = NetworkInterface.get("eth0")
info["ethernet0_mac"] = str(net_iface.mac)
info["ethernet0_ip"] = str(net_iface.ipv4)
except NetworkException as exc2:
log.error("Error reading interface 'eth0' data: %s", str(exc2))
if interfaces and "eth1" in interfaces:
try:
net_iface = NetworkInterface.get("eth1")
info["ethernet1_mac"] = mac_to_human_string(net_iface.mac)
info["ethernet1_ip"] = str(net_iface.ipv4)
except NetworkException as exc2:
log.error("Error reading interface 'eth1' data: %s", str(exc2))
except DigiAPIXException as exc:
log.error("Error listing network interfaces: %s", str(exc))
# Fill WiFi interfaces data.
try:
interfaces = WifiInterface.list_interfaces()
if interfaces and "wlan0" in interfaces:
try:
net_iface = WifiInterface.get("wlan0")
info["wifi_mac"] = mac_to_human_string(net_iface.mac)
info["wifi_ip"] = str(net_iface.ipv4)
except WifiException as exc2:
log.error("Error reading interface 'wlan0' data: %s", str(exc2))
except DigiAPIXException as exc:
log.error("Error listing network interfaces: %s", str(exc))
# Send the JSON value.
self.wfile.write(json.dumps(info).encode(encoding="utf_8"))
@ -146,7 +197,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
mem_total = int(mem_info.get("MemTotal", "-1")) if mem_info else -1
mem_free = int(mem_info.get("MemFree", "-1")) if mem_info else -1
mem_used = (mem_total - mem_free) if (mem_total > 0 and mem_free > 0) else -1
bt_data = get_bt_data("hci0")
status = {
"system_monitor/cpu_load": get_cpu_load(),
"system_monitor/cpu_temperature": get_cpu_temp(),
@ -154,18 +205,56 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
"system_monitor/uptime": get_uptime(),
"system_monitor/free_memory": mem_free,
"system_monitor/used_memory": mem_used,
"system_monitor/led_status": get_led_status("USER_LED"),
"system_monitor/hci0/state": bt_data.get("state", 0),
"system_monitor/hci0/rx_bytes": bt_data.get("rx_bytes", 0),
"system_monitor/hci0/tx_bytes": bt_data.get("tx_bytes", 0),
"system_monitor/led_status": get_led_status("USER_LED")
}
list_ifaces = list_net_ifaces()
for name in list_ifaces:
data = get_iface_data(name)
status["system_monitor/%s/state" % name] = data["state"]
status["system_monitor/%s/rx_bytes" % name] = data["rx_bytes"]
status["system_monitor/%s/tx_bytes" % name] = data["tx_bytes"]
# Fill bluetooth devices data.
try:
bt_devices = BluetoothDevice.list_devices()
for device in bt_devices:
try:
bt_device = BluetoothDevice.get(device)
statistics = bt_device.get_statistics()
status[f"system_monitor/{device}/state"] = 1 if \
bt_device.is_enabled else 0
status[f"system_monitor/{device}/rx_bytes"] = statistics.rx_bytes
status[f"system_monitor/{device}/tx_bytes"] = statistics.tx_bytes
except NetworkException as exc2:
log.error("Error reading bluetooth device '%s' data: %s", device, str(exc2))
except DigiAPIXException as exc:
log.error("Error listing bluetooth devices: %s", str(exc))
# Fill ethernet interfaces data.
try:
interfaces = NetworkInterface.list_interfaces()
for iface in interfaces:
try:
net_iface = NetworkInterface.get(iface)
statistics = net_iface.get_statistics()
status[f"system_monitor/{iface}/state"] = 1 if \
net_iface.status == NetStatus.CONNECTED else 0
status[f"system_monitor/{iface}/rx_bytes"] = statistics.rx_bytes
status[f"system_monitor/{iface}/tx_bytes"] = statistics.tx_bytes
except NetworkException as exc2:
log.error("Error reading interface '%s' data: %s", iface, str(exc2))
except DigiAPIXException as exc:
log.error("Error listing network interfaces: %s", str(exc))
# Fill WiFi interfaces data.
try:
interfaces = WifiInterface.list_interfaces()
for iface in interfaces:
try:
wifi_iface = WifiInterface.get(iface)
statistics = wifi_iface.get_statistics()
status[f"system_monitor/{iface}/state"] = 1 if \
wifi_iface.status == NetStatus.CONNECTED else 0
status[f"system_monitor/{iface}/rx_bytes"] = statistics.rx_bytes
status[f"system_monitor/{iface}/tx_bytes"] = statistics.tx_bytes
except WifiException as exc2:
log.error("Error reading interface '%s' data: %s", iface, str(exc2))
except DigiAPIXException as exc:
log.error("Error listing WiFi interfaces: %s", str(exc))
# Send the JSON value.
self.wfile.write(json.dumps(status).encode(encoding="utf_8"))
@ -753,146 +842,6 @@ def get_mca_version():
return NOT_AVAILABLE, NOT_AVAILABLE
def list_net_ifaces():
"""
Returs a list with the names of the network interfaces.
Returns:
List: List of network inteface names.
"""
return os.listdir("/sys/class/net")
def get_iface_ip(iface_name):
"""
Gets the IP address of the provided interface.
Args:
iface_name (String): Name of the network interface to get its IP.
Returns:
String: The IP of the interface.
"""
res = exec_cmd("ip addr show %s" % iface_name)
if res[0] == 0:
tmp = res[1].split("inet ")
if len(tmp) > 1:
return tmp[1].split("/")[0]
log.error("Error getting IP of interface '%s': %s", iface_name, res[1])
return ZERO_IP
def get_iface_data(iface_name):
"""
Gets network interface state and statistics.
Args:
iface_name (String): Name of the interface to get data.
Returns:
Dictionary: The network interface state and statistics.
"""
data = {
"state": 0,
"rx_bytes": 0,
"tx_bytes": 0
}
state = read_file("/sys/class/net/%s/operstate" % iface_name)
if state != NOT_AVAILABLE:
data["state"] = 1 if state.strip() == "up" else 0
stats = read_file("/proc/net/dev")
if stats == NOT_AVAILABLE:
return data
for line in stats.splitlines():
if not line.strip().startswith("%s: " % iface_name):
continue
fields = line.split()
data["rx_bytes"] = fields[1]
data["tx_bytes"] = fields[9]
break
return data
def is_bt_available():
"""
Checks if Bluetooth is available on the device.
Returns:
Boolean: True if available, False otherwise.
"""
return os.path.isdir("/proc/device-tree/bluetooth")
def get_bt_mac(iface_name):
"""
Gets Bluetooth MAC address.
Args:
iface_name (String): Name of the interface to get the MAC.
Returns:
String: The Bluetooth MAC.
"""
if not is_bt_available():
return ZERO_MAC
res = exec_cmd("hciconfig %s" % iface_name)
if res[0] == 0:
return res[1].split("BD Address:")[1].split("ACL")[0].strip()
else:
log.error("Error getting MAC of Bluetooth interface '%s': %s", iface_name, res[1])
return ZERO_MAC
def get_bt_data(iface_name):
"""
Gets Bluetooth interface state and statistics.
Args:
iface_name (String): Name of the interface to get data.
Returns:
Dictionary: The Bluetooth interface state and statistics.
"""
data = {
"state": 0,
"rx_bytes": 0,
"tx_bytes": 0
}
if not is_bt_available():
return data
res = exec_cmd("hciconfig %s" % iface_name)
if res[0] == 0:
info = res[1]
else:
log.error("Error getting status of Bluetooth interface '%s': %s", iface_name, res[1])
return data
lines = info.splitlines()
if not lines:
return data
if len(lines) > 2 and re.fullmatch("(UP) .*", lines[2].strip()):
data["state"] = 1
if len(lines) > 3:
m = re.fullmatch("RX bytes:([0-9]+) .*", lines[3].strip())
data["rx_bytes"] = m.group(1) if m else 0
if len(lines) > 4:
m = re.fullmatch("TX bytes:([0-9]+) .*", lines[4].strip())
data["tx_bytes"] = m.group(1) if m else 0
return data
def get_led_status(name):
"""
Get the LED value.
@ -996,6 +945,20 @@ def set_audio_volume(value):
return None
def mac_to_human_string(mac, num_bytes=6):
"""
Transforms the given MAC address into a human readable string.
Args:
mac (:class:`.MacAddress`): The MAC address to transform.
num_bytes (Integer): The number of bytes to use in the MAC Address.
Return:
String: The given MAC as human readable string.
"""
return ":".join(["%02X" % i for i in mac][8-num_bytes:]).upper()
def read_proc_file(path):
"""
Gets contents of a proc file.