connectcore-demo-example: add BLE support to the demo
Signed-off-by: Tatiana Leon <Tatiana.Leon@digi.com> Signed-off-by: David Escalona <david.escalona@digi.com>
This commit is contained in:
parent
2b39907ddf
commit
8a81829e7b
|
|
@ -1,6 +1,6 @@
|
|||
#!/usr/bin/python3
|
||||
|
||||
# Copyright (c) 2022, Digi International, Inc.
|
||||
# Copyright (c) 2022, 2023, Digi International, Inc.
|
||||
#
|
||||
# Permission to use, copy, modify, and/or distribute this software for any
|
||||
# purpose with or without fee is hereby granted, provided that the above
|
||||
|
|
@ -15,8 +15,10 @@
|
|||
# PERFORMANCE OF THIS SOFTWARE.
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import cgi
|
||||
import errno
|
||||
import hashlib
|
||||
import http.server
|
||||
import json
|
||||
import logging
|
||||
|
|
@ -28,15 +30,18 @@ import signal
|
|||
import socketserver
|
||||
import stat
|
||||
import subprocess
|
||||
import time
|
||||
from json import JSONDecodeError
|
||||
|
||||
from digi.apix.bluetooth import BluetoothException, BluetoothDevice, BluetoothProfile
|
||||
from digi.apix.exceptions import DigiAPIXException
|
||||
from digi.apix.network import IPMode, NetworkException, NetStatus, NetworkInterface, NetworkProfile
|
||||
from digi.apix.wifi import SecurityMode, WifiException, WifiInterface, WifiProfile
|
||||
from logging.handlers import SysLogHandler
|
||||
from subprocess import call, TimeoutExpired
|
||||
from threading import Thread
|
||||
from threading import Thread, Lock
|
||||
|
||||
from digi.ccble.exceptions import BluetoothNotSupportedException, ConnectCoreBLEException
|
||||
from digi.ccble.service import BLEInterface, ConnectCoreBLEService
|
||||
|
||||
# Constants.
|
||||
APP_NAME = "Demo server"
|
||||
|
|
@ -50,7 +55,7 @@ SIZE_KB = "KB"
|
|||
SIZE_MB = "MB"
|
||||
SIZE_GB = "GB"
|
||||
|
||||
DIVIDERS = {SIZE_KB : 1, SIZE_MB: 2, SIZE_GB: 3}
|
||||
DIVIDERS = {SIZE_KB: 1, SIZE_MB: 2, SIZE_GB: 3}
|
||||
|
||||
ZERO_MAC = "00:00:00:00:00:00"
|
||||
ZERO_IP = "0.0.0.0"
|
||||
|
|
@ -58,11 +63,45 @@ NOT_AVAILABLE = "N/A"
|
|||
|
||||
ELEMENT_BLUETOOTH = "bluetooth"
|
||||
ELEMENT_ETHERNET = "ethernet"
|
||||
ELEMENT_INFO = "info"
|
||||
ELEMENT_PING = "ping"
|
||||
ELEMENT_RESTART_BLUETOOTH = "restart-bluetooth"
|
||||
ELEMENT_TEST_CONNECTIVITY = "test-connectivity"
|
||||
ELEMENT_WIFI = "wifi"
|
||||
|
||||
PREFIX_ETHERNET = "eth"
|
||||
PREFIX_WIFI = "wlan"
|
||||
|
||||
BT_PASSWORD_FILE = "/etc/bt.pwd"
|
||||
BT_PASSWORD_DEFAULT = "1234"
|
||||
BT_PASSWORD_LENGTH = 16
|
||||
BT_PASSWORD_SERIAL_LENGTH = 6
|
||||
|
||||
BT_ADVERT_NAME = "CONNECTCORE-%s"
|
||||
|
||||
BT_TAG_DATA = "data"
|
||||
BT_TAG_DESC = "desc"
|
||||
BT_TAG_ELEMENT = "element"
|
||||
BT_TAG_IFACE = "iface"
|
||||
BT_TAG_INDEX = "index"
|
||||
BT_TAG_NAME = "name"
|
||||
BT_TAG_OP = "operation"
|
||||
BT_TAG_PASSWORD = "password"
|
||||
BT_TAG_SETTINGS = "settings"
|
||||
BT_TAG_STATUS = "status"
|
||||
BT_TAG_TOTAL = "total"
|
||||
BT_TAG_VALUE = "value"
|
||||
|
||||
BT_OP_READ = "R"
|
||||
BT_OP_WRITE = "W"
|
||||
|
||||
BT_PAYLOAD_LIMIT = 200
|
||||
|
||||
BT_REQUEST_TIMEOUT = 5000
|
||||
|
||||
COMMAND_PING = "ping -q -w 1 -c 1 -I %s 8.8.8.8"
|
||||
COMMAND_READ_SN = "fw_printenv -n serial#"
|
||||
|
||||
# Variables.
|
||||
log = logging.getLogger(APP_NAME)
|
||||
last_cpu_work = 0
|
||||
|
|
@ -129,7 +168,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
|
|||
platform.version(),
|
||||
platform.machine()),
|
||||
"dey_version": "DEY-%s-%s" % (get_dey_version(), read_file("/etc/version")),
|
||||
"serial_number": read_proc_file("/proc/device-tree/digi,hwid,sn"),
|
||||
"serial_number": get_serial_number(),
|
||||
"device_type": read_proc_file("/proc/device-tree/digi,machine,name"),
|
||||
"module_variant": read_proc_file("/proc/device-tree/digi,hwid,variant"),
|
||||
"board_variant": read_proc_file("/proc/device-tree/digi,carrierboard,version"),
|
||||
|
|
@ -140,7 +179,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
|
|||
"flash_size": get_storage_size(),
|
||||
"video_resolution": get_video_resolution(),
|
||||
"fw_store_path": get_fw_store_path(),
|
||||
"bluetooth_mac": ZERO_MAC,
|
||||
"bluetooth_mac": get_bluetooth_mac(),
|
||||
"wifi_mac": ZERO_MAC,
|
||||
"wifi_ip": ZERO_IP,
|
||||
"ethernet0_mac": ZERO_MAC,
|
||||
|
|
@ -148,17 +187,6 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
|
|||
"ethernet1_mac": ZERO_MAC,
|
||||
"ethernet1_ip": ZERO_IP,
|
||||
}
|
||||
# Fill bluetooth data.
|
||||
try:
|
||||
bt_devices = BluetoothDevice.list_devices()
|
||||
if bt_devices and "hci0" in bt_devices:
|
||||
try:
|
||||
bt_device = BluetoothDevice.get("hci0")
|
||||
info["bluetooth_mac"] = mac_to_human_string(bt_device.mac)
|
||||
except BluetoothException as exc2:
|
||||
log.error("Error reading interface 'hci0' data: %s", str(exc2))
|
||||
except DigiAPIXException as exc:
|
||||
log.error("Error listing bluetooth devices: %s", str(exc))
|
||||
# Fill ethernet interfaces data.
|
||||
try:
|
||||
interfaces = NetworkInterface.list_interfaces()
|
||||
|
|
@ -442,7 +470,7 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
|
|||
|
||||
work_path = os.path.realpath(path)
|
||||
try:
|
||||
with open(work_path, "rb") as f:
|
||||
with open(work_path, "rb") as f:
|
||||
contents = f.read()
|
||||
except OSError as e:
|
||||
self._set_headers(200)
|
||||
|
|
@ -624,6 +652,390 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler):
|
|||
self.end_headers()
|
||||
|
||||
|
||||
class BluetoothService:
|
||||
def __init__(self):
|
||||
self._last_request_chunk_time = 0
|
||||
self._request_chunks = []
|
||||
self._ble_service = None
|
||||
self._ble_thread = None
|
||||
self._lock = Lock()
|
||||
|
||||
def start(self):
|
||||
"""
|
||||
Start Bluetooth service.
|
||||
"""
|
||||
with self._lock:
|
||||
try:
|
||||
self._ble_service = ConnectCoreBLEService.get_instance()
|
||||
self._ble_service.add_connect_callback(self._connection_cb)
|
||||
self._ble_service.add_data_received_callback(self._data_received_cb)
|
||||
self._ble_service.set_password(get_bt_password())
|
||||
self._ble_service.set_advertising_name(get_bt_advertising_name())
|
||||
self._ble_thread = Thread(target=self._ble_service.start, daemon=True)
|
||||
self._ble_thread.start()
|
||||
return True
|
||||
except BluetoothNotSupportedException:
|
||||
log.warning("The system does not support Bluetooth")
|
||||
return False
|
||||
|
||||
def stop(self):
|
||||
"""
|
||||
Stop Bluetooth service.
|
||||
"""
|
||||
with self._lock:
|
||||
if self._ble_service:
|
||||
self._ble_service.remove_data_received_callback(self._data_received_cb)
|
||||
self._ble_service.remove_connect_callback(self._connection_cb)
|
||||
self._ble_service.stop()
|
||||
if self._ble_thread:
|
||||
self._ble_thread.join()
|
||||
self._ble_thread = None
|
||||
|
||||
def restart(self):
|
||||
"""
|
||||
Restarts the Bluetooth service.
|
||||
"""
|
||||
time.sleep(0.5)
|
||||
self.stop()
|
||||
time.sleep(1)
|
||||
self.start()
|
||||
|
||||
def _connection_cb(self):
|
||||
"""
|
||||
Callback to be notified when a new Bluetooth connection is established.
|
||||
"""
|
||||
log.info("New connection (type: '%s')", self._ble_service.get_interface_type().title)
|
||||
|
||||
def _data_received_cb(self, data):
|
||||
"""
|
||||
Callback to be notified when new Bluetooth data is received.
|
||||
- For a read request, send back the configuration data.
|
||||
- For a write request, update the configuration.
|
||||
|
||||
Args:
|
||||
data (Bytearray): received (JSON) data.
|
||||
"""
|
||||
# Check if request chunks list must be reset.
|
||||
if time.time() * 1000 > self._last_request_chunk_time + BT_REQUEST_TIMEOUT:
|
||||
self._request_chunks = []
|
||||
self._last_request_chunk_time = time.time() * 1000
|
||||
# Process request chunk.
|
||||
try:
|
||||
request_chunk = json.loads(data.decode("utf-8"))
|
||||
except JSONDecodeError as exc:
|
||||
self._send_error(None, f"Invalid JSON data from Bluetooth: {str(exc)}")
|
||||
return
|
||||
if request_chunk[BT_TAG_INDEX] is None or request_chunk[BT_TAG_TOTAL] is None \
|
||||
or request_chunk[BT_TAG_DATA] is None:
|
||||
self._send_error(None, "Invalid JSON data struct from Bluetooth")
|
||||
return
|
||||
# Extract request chunk data.
|
||||
self._request_chunks.insert(request_chunk[BT_TAG_INDEX] - 1, request_chunk[BT_TAG_DATA])
|
||||
# Check if request is complete.
|
||||
if len(self._request_chunks) == request_chunk[BT_TAG_TOTAL]:
|
||||
# Build request.
|
||||
encoded_data = "".join(chunk for chunk in self._request_chunks)
|
||||
self._request_chunks = []
|
||||
request = self._get_request(base64.b64decode(encoded_data))
|
||||
if not request:
|
||||
return
|
||||
self._process_request(request)
|
||||
|
||||
def _get_request(self, data):
|
||||
"""
|
||||
Checks if a request follows the format:
|
||||
|
||||
{
|
||||
"element" : "<info, wifi, ethernet>", required: optional, default=info
|
||||
"iface" : "<iface_name>", required: element=wifi or element=ethernet
|
||||
"operation": "R" / "W", required: optional, default=R
|
||||
"settings" required: always
|
||||
[
|
||||
{
|
||||
"name" : "<setting_name>", required: always
|
||||
"value" : "<setting_value>" required: Operation=W
|
||||
},
|
||||
. . .
|
||||
]
|
||||
}
|
||||
|
||||
Args:
|
||||
data (Bytearray): Received (JSON) data.
|
||||
|
||||
Returns:
|
||||
Dictionary: A dictionary, `None` if request is invalid.
|
||||
"""
|
||||
try:
|
||||
req = json.loads(data.decode("utf-8"))
|
||||
except JSONDecodeError as exc:
|
||||
self._send_error(None, f"Invalid JSON data from Bluetooth: {str(exc)}")
|
||||
return None
|
||||
|
||||
if (req.get(BT_TAG_ELEMENT, ELEMENT_INFO) == ELEMENT_INFO
|
||||
and req.get(BT_TAG_OP, BT_OP_READ) != BT_OP_READ):
|
||||
self._send_error(req,
|
||||
f"Invalid request format: '{req.get(BT_TAG_OP, BT_OP_READ)}' "
|
||||
f"not allowed for '{ELEMENT_INFO}'")
|
||||
return None
|
||||
|
||||
invalid_tag = None
|
||||
if (req.get(BT_TAG_ELEMENT, ELEMENT_INFO) in (ELEMENT_ETHERNET, ELEMENT_WIFI)
|
||||
and not req.get(BT_TAG_IFACE, None)):
|
||||
invalid_tag = BT_TAG_IFACE
|
||||
elif (req.get(BT_TAG_ELEMENT, ELEMENT_INFO) not in (ELEMENT_TEST_CONNECTIVITY,
|
||||
ELEMENT_RESTART_BLUETOOTH,
|
||||
ELEMENT_PING)
|
||||
and BT_TAG_SETTINGS not in req):
|
||||
invalid_tag = BT_TAG_SETTINGS
|
||||
elif (req.get(BT_TAG_OP, BT_OP_READ) == BT_OP_WRITE
|
||||
and not all(BT_TAG_VALUE in setting for setting in req.get(BT_TAG_SETTINGS, None))):
|
||||
invalid_tag = BT_TAG_VALUE
|
||||
|
||||
if invalid_tag:
|
||||
self._send_error(req, f"Invalid request format: '{invalid_tag}' not specified")
|
||||
req = None
|
||||
|
||||
return req
|
||||
|
||||
def _process_request(self, request):
|
||||
"""
|
||||
Processes the request.
|
||||
|
||||
Args:
|
||||
request (Dictionary): JSON string to be processed.
|
||||
"""
|
||||
print(f"REQUEST: {request}")
|
||||
element = request.get(BT_TAG_ELEMENT, ELEMENT_INFO)
|
||||
|
||||
if element == ELEMENT_INFO:
|
||||
self._process_info_request(request)
|
||||
elif element in (ELEMENT_ETHERNET, ELEMENT_WIFI):
|
||||
self._process_net_request(request)
|
||||
elif element == ELEMENT_BLUETOOTH:
|
||||
self._process_bluetooth_request(request)
|
||||
elif element == ELEMENT_TEST_CONNECTIVITY:
|
||||
self._process_test_connection_request(request)
|
||||
elif element == ELEMENT_PING:
|
||||
self._process_ping_request()
|
||||
elif element == ELEMENT_RESTART_BLUETOOTH:
|
||||
self._process_restart_bluetooth()
|
||||
else:
|
||||
self._send_error(request, f"Element '{element}' not found")
|
||||
return
|
||||
|
||||
def _process_info_request(self, request):
|
||||
"""
|
||||
Processes an 'info' request.
|
||||
|
||||
Args:
|
||||
request (Dictionary): The info request.
|
||||
"""
|
||||
settings = request.get(BT_TAG_SETTINGS, None)
|
||||
for setting in settings:
|
||||
setting_name = setting[BT_TAG_NAME]
|
||||
if setting_name == "uboot_version":
|
||||
setting_value = read_proc_file("/proc/device-tree/digi,uboot,version")
|
||||
elif setting_name == "kernel_version":
|
||||
setting_value = f"{platform.system()} " \
|
||||
f"{platform.node()} " \
|
||||
f"{platform.release()} " \
|
||||
f"{platform.version()} " \
|
||||
f"{platform.machine()} GNU/Linux"
|
||||
elif setting_name == "dey_version":
|
||||
setting_value = f"DEY-{get_dey_version()}-{read_file('/etc/version')}"
|
||||
elif setting_name == "device_type":
|
||||
setting_value = read_proc_file("/proc/device-tree/digi,machine,name")
|
||||
elif setting_name == "ethernet0_mac":
|
||||
setting_value = get_ethernet_mac()
|
||||
elif setting_name == "n_eth":
|
||||
setting_value = 0
|
||||
net_ifaces = NetworkInterface.list_interfaces()
|
||||
for net_iface in net_ifaces:
|
||||
setting_value = setting_value + 1 \
|
||||
if net_iface.startswith("eth")else setting_value
|
||||
elif setting_name == "variant":
|
||||
setting_value = read_proc_file("/proc/device-tree/digi,hwid,variant")
|
||||
elif setting_name == "serial":
|
||||
setting_value = get_serial_number()
|
||||
else:
|
||||
self._send_error(request, f"Invalid '{ELEMENT_INFO}' setting: '{setting_name}'")
|
||||
return
|
||||
setting[BT_TAG_VALUE] = setting_value
|
||||
|
||||
resp = {BT_TAG_STATUS: 0,
|
||||
BT_TAG_ELEMENT: ELEMENT_INFO,
|
||||
BT_TAG_SETTINGS: settings}
|
||||
|
||||
self._send_data(json.dumps(resp).encode('utf-8'))
|
||||
|
||||
def _process_net_request(self, request):
|
||||
"""
|
||||
Processes a 'ethernet' or 'wifi' request.
|
||||
|
||||
Args:
|
||||
request (Dictionary): The request.
|
||||
"""
|
||||
element = request.get(BT_TAG_ELEMENT, ELEMENT_ETHERNET)
|
||||
iface_name = request.get(BT_TAG_IFACE, None)
|
||||
settings = request.get(BT_TAG_SETTINGS, None)
|
||||
operation = request.get(BT_TAG_OP, BT_OP_READ)
|
||||
|
||||
if operation == BT_OP_READ:
|
||||
try:
|
||||
res = get_network_iface_configuration(iface_name)
|
||||
except DigiAPIXException as exc:
|
||||
self._send_error(request, f"Error getting interface configuration: {str(exc)}")
|
||||
return
|
||||
|
||||
for setting in settings:
|
||||
setting[BT_TAG_VALUE] = res.get(setting[BT_TAG_NAME], None)
|
||||
if setting[BT_TAG_VALUE] is None:
|
||||
self._send_error(request, f"Invalid '{iface_name}' setting: "
|
||||
f"'{setting[BT_TAG_NAME]}'")
|
||||
return
|
||||
|
||||
resp = {BT_TAG_STATUS: 0,
|
||||
BT_TAG_ELEMENT: element,
|
||||
BT_TAG_IFACE: iface_name,
|
||||
BT_TAG_SETTINGS: settings}
|
||||
else:
|
||||
cfg = {}
|
||||
for setting in settings:
|
||||
cfg[setting[BT_TAG_NAME]] = setting[BT_TAG_VALUE]
|
||||
res = set_network_iface_configuration(iface_name, cfg)
|
||||
if res.get("status", None) == 1:
|
||||
self._send_error(request, f"Error setting '{iface_name}' "
|
||||
f"configuration: {res.get('desc', '')}")
|
||||
return
|
||||
resp = {BT_TAG_STATUS: 0,
|
||||
BT_TAG_ELEMENT: element,
|
||||
BT_TAG_IFACE: iface_name,
|
||||
BT_TAG_SETTINGS: settings}
|
||||
|
||||
self._send_data(json.dumps(resp).encode('utf-8'))
|
||||
|
||||
def _process_bluetooth_request(self, request):
|
||||
"""
|
||||
Processes a 'bluetooth' request.
|
||||
|
||||
Args:
|
||||
request (Dictionary): The request.
|
||||
"""
|
||||
settings = request.get(BT_TAG_SETTINGS, None)
|
||||
operation = request.get(BT_TAG_OP, BT_OP_READ)
|
||||
|
||||
if operation == BT_OP_READ:
|
||||
try:
|
||||
res = get_bluetooth_device_configuration(0)
|
||||
except DigiAPIXException as exc:
|
||||
self._send_error(request, f"Error getting bluetooth configuration: {str(exc)}")
|
||||
return
|
||||
|
||||
for setting in settings:
|
||||
setting[BT_TAG_VALUE] = res.get(setting[BT_TAG_NAME], None)
|
||||
if setting[BT_TAG_VALUE] is None:
|
||||
self._send_error(request, f"Invalid 'bluetooth' setting: "
|
||||
f"'{setting[BT_TAG_NAME]}'")
|
||||
return
|
||||
|
||||
resp = {BT_TAG_STATUS: 0,
|
||||
BT_TAG_ELEMENT: ELEMENT_BLUETOOTH,
|
||||
BT_TAG_SETTINGS: settings}
|
||||
else:
|
||||
# For the moment, attend only 'password' setting.
|
||||
for setting in settings:
|
||||
if setting[BT_TAG_NAME] == BT_TAG_PASSWORD:
|
||||
if not save_bt_password(setting[BT_TAG_VALUE]):
|
||||
self._send_error(request, "Error saving Bluetooth password")
|
||||
return
|
||||
|
||||
resp = {BT_TAG_STATUS: 0,
|
||||
BT_TAG_ELEMENT: ELEMENT_BLUETOOTH,
|
||||
BT_TAG_SETTINGS: settings}
|
||||
|
||||
self._send_data(json.dumps(resp).encode('utf-8'))
|
||||
|
||||
def _process_test_connection_request(self, request):
|
||||
"""
|
||||
Processes a 'test-connectivity' request.
|
||||
|
||||
Args:
|
||||
request (Dictionary): The request.
|
||||
"""
|
||||
interface = request.get(BT_TAG_IFACE, None)
|
||||
|
||||
if not test_interface_connection(interface):
|
||||
self._send_error(request, f"Connectivity error in '{interface}'")
|
||||
return
|
||||
|
||||
resp = {BT_TAG_STATUS: 0,
|
||||
BT_TAG_ELEMENT: ELEMENT_TEST_CONNECTIVITY}
|
||||
|
||||
self._send_data(json.dumps(resp).encode('utf-8'))
|
||||
|
||||
def _process_restart_bluetooth(self):
|
||||
"""
|
||||
Processes a 'restart-bluetooth' request.
|
||||
"""
|
||||
# No answer can be sent as connection will be closed.
|
||||
restart_thread = Thread(target=self.restart)
|
||||
restart_thread.start()
|
||||
|
||||
def _process_ping_request(self):
|
||||
"""
|
||||
Processes a 'ping' request.
|
||||
"""
|
||||
resp = {BT_TAG_STATUS: 0, BT_TAG_ELEMENT: ELEMENT_PING}
|
||||
|
||||
self._send_data(json.dumps(resp).encode('utf-8'))
|
||||
|
||||
def _send_data(self, data):
|
||||
"""
|
||||
Sends data to the Bluetooth connected device.
|
||||
|
||||
Args:
|
||||
data (Bytearray): Data to be sent.
|
||||
"""
|
||||
print(f"REQUEST ANSWER: {data}")
|
||||
try:
|
||||
encoded_data = base64.b64encode(data).decode('utf-8')
|
||||
total_chunks = (int(len(encoded_data) / BT_PAYLOAD_LIMIT)) + \
|
||||
(1 if len(encoded_data) % BT_PAYLOAD_LIMIT != 0 else 0)
|
||||
chunk_index = 0
|
||||
while chunk_index < total_chunks:
|
||||
chunk_data = {
|
||||
BT_TAG_INDEX: chunk_index + 1,
|
||||
BT_TAG_TOTAL: total_chunks,
|
||||
BT_TAG_DATA: encoded_data[chunk_index * BT_PAYLOAD_LIMIT: chunk_index *
|
||||
BT_PAYLOAD_LIMIT + BT_PAYLOAD_LIMIT]
|
||||
}
|
||||
self._ble_service.send_data(json.dumps(chunk_data).encode('utf-8'))
|
||||
time.sleep(0.2)
|
||||
chunk_index += 1
|
||||
except ConnectCoreBLEException as exc:
|
||||
log.error("Unable send data to connected device: %s", str(exc))
|
||||
|
||||
def _send_error(self, request, msg):
|
||||
"""
|
||||
Sends an error message to the client.
|
||||
|
||||
Args:
|
||||
request (Dictionary): Original request.
|
||||
msg (String): Error message to send.
|
||||
"""
|
||||
log.error(msg)
|
||||
resp = {BT_TAG_STATUS: 1}
|
||||
if request:
|
||||
element = request.get(BT_TAG_ELEMENT, ELEMENT_INFO)
|
||||
iface = request.get(BT_TAG_IFACE, None)
|
||||
resp.update({BT_TAG_ELEMENT: element})
|
||||
if iface is not None:
|
||||
resp.update({BT_TAG_IFACE: iface})
|
||||
|
||||
resp.update({BT_TAG_DESC: msg})
|
||||
self._send_data(json.dumps(resp).encode('utf-8'))
|
||||
|
||||
|
||||
def filter_by_extension(name, filters):
|
||||
"""
|
||||
Returns whether the provided name ends with one of the provided filters.
|
||||
|
|
@ -872,6 +1284,134 @@ def get_mca_version():
|
|||
return NOT_AVAILABLE, NOT_AVAILABLE
|
||||
|
||||
|
||||
def get_serial_number():
|
||||
"""
|
||||
Gets the device serial number.
|
||||
|
||||
Returns:
|
||||
String: The device serial number.
|
||||
"""
|
||||
code, serial = exec_cmd(COMMAND_READ_SN)
|
||||
if code == 0:
|
||||
return serial.strip()
|
||||
|
||||
return read_proc_file("/proc/device-tree/digi,hwid,sn")
|
||||
|
||||
|
||||
def get_bluetooth_mac():
|
||||
"""
|
||||
Gets the device Bluetooth MAC address.
|
||||
|
||||
Returns:
|
||||
String: The Bluetooth MAC address.
|
||||
"""
|
||||
bt_mac = ZERO_MAC
|
||||
try:
|
||||
bt_devices = BluetoothDevice.list_devices()
|
||||
if bt_devices and "hci0" in bt_devices:
|
||||
try:
|
||||
bt_device = BluetoothDevice.get("hci0")
|
||||
bt_mac = mac_to_human_string(bt_device.mac)
|
||||
except BluetoothException as exc2:
|
||||
log.error("Error reading interface 'hci0' data: %s", str(exc2))
|
||||
except DigiAPIXException as exc:
|
||||
log.error("Error listing bluetooth devices: %s", str(exc))
|
||||
|
||||
return bt_mac
|
||||
|
||||
|
||||
def get_ethernet_mac():
|
||||
"""
|
||||
Gets the device Bluetooth MAC address.
|
||||
|
||||
Returns:
|
||||
String: The Bluetooth MAC address.
|
||||
"""
|
||||
eth_mac = ZERO_MAC
|
||||
try:
|
||||
net_iface = NetworkInterface.get("eth0")
|
||||
eth_mac = str(net_iface.mac)
|
||||
except NetworkException as exc:
|
||||
log.error("Error getting eth0 MAC: %s", str(exc))
|
||||
|
||||
return eth_mac
|
||||
|
||||
|
||||
def get_bt_advertising_name():
|
||||
"""
|
||||
Gets the Bluetooth advertising name.
|
||||
|
||||
Returns:
|
||||
String: The Bluetooth advertising name.
|
||||
"""
|
||||
# The advertising name must be a value well known by both sides: the device and the mobile app.
|
||||
# The mobile app only has access to the scanned label values, which includes the ETH MAC and
|
||||
# uses it to calculate the device BT MAC address. For devices with only one ethernet interface,
|
||||
# the BT MAC is the ETH MAC + 2, but for devices with 2 ethernet interfaces, the BT MAC is the
|
||||
# ETH MAC + 3. Since with the scanned label it is impossible to determine for the mobile app
|
||||
# whether the device has 1 or 2 ethernet interfaces, we will be using the ETH0 MAC address as
|
||||
# the advertised name, which is well known in both sides instead of trying to determine the
|
||||
# real BT MAC.
|
||||
mac = get_ethernet_mac().replace(":", "")
|
||||
mac_tail = f"{mac[-4:]}".upper()
|
||||
|
||||
return BT_ADVERT_NAME % mac_tail
|
||||
|
||||
|
||||
def get_bt_password():
|
||||
"""
|
||||
Gets the Bluetooth password.
|
||||
|
||||
Returns:
|
||||
String: The Bluetooth password.
|
||||
"""
|
||||
# Check first if password is saved in file.
|
||||
password = read_file(BT_PASSWORD_FILE)
|
||||
if not password or password == NOT_AVAILABLE:
|
||||
# Generate password using Serial number.
|
||||
password = get_serial_number()
|
||||
if not password or password == NOT_AVAILABLE:
|
||||
# Use default password
|
||||
password = BT_PASSWORD_DEFAULT
|
||||
else:
|
||||
# Use only the last 6 positions of the serial.
|
||||
password = password[-1 * BT_PASSWORD_SERIAL_LENGTH:]
|
||||
# Hash the plain password and return 16 last characters.
|
||||
password_hash = hashlib.sha256(password.encode())
|
||||
password = password_hash.hexdigest().lower()
|
||||
|
||||
return password[-1 * BT_PASSWORD_LENGTH:]
|
||||
|
||||
|
||||
def save_bt_password(password):
|
||||
"""
|
||||
Saves the Bluetooth password.
|
||||
|
||||
Params:
|
||||
password (String): The password to save.
|
||||
|
||||
Returns:
|
||||
Boolean: 'True' if success, 'False' otherwise.
|
||||
"""
|
||||
# Hash the plain password and save the 16 last characters.
|
||||
password_hash = hashlib.sha256(password.encode())
|
||||
password = password_hash.hexdigest().lower()
|
||||
|
||||
return write_file(BT_PASSWORD_FILE, password)
|
||||
|
||||
|
||||
def test_interface_connection(interface):
|
||||
"""
|
||||
Tests the given interface connectivity.
|
||||
|
||||
Returns:
|
||||
Boolean: 'True' if success, 'False' otherwise.
|
||||
"""
|
||||
code, _ = exec_cmd(COMMAND_PING % interface)
|
||||
|
||||
return code == 0
|
||||
|
||||
|
||||
def get_led_status(name):
|
||||
"""
|
||||
Get the LED value.
|
||||
|
|
@ -1309,17 +1849,38 @@ def read_file(path):
|
|||
String: Contents of the file, 'N/A' if any error occurs.
|
||||
"""
|
||||
try:
|
||||
with open(path) as f:
|
||||
return f.read()
|
||||
except OSError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
log.error("File '%s' does not exist (errno: %d)", path, e.errno)
|
||||
with open(path, encoding="utf-8") as file:
|
||||
return file.read()
|
||||
except OSError as exc:
|
||||
if exc.errno == errno.ENOENT:
|
||||
log.error("File '%s' does not exist (errno: %d)", path, exc.errno)
|
||||
else:
|
||||
log.error("Cannot open file '%s' (errno: %d)", path, e.errno)
|
||||
log.error("Cannot open file '%s' (errno: %d)", path, exc.errno)
|
||||
|
||||
return NOT_AVAILABLE
|
||||
|
||||
|
||||
def write_file(path, value):
|
||||
"""
|
||||
Writes the provided file path with the given value.
|
||||
|
||||
Args:
|
||||
path (String): Absolute path of the file to write.
|
||||
value (String): Value to write.
|
||||
|
||||
Returns:
|
||||
Boolean: 'True' if success, 'False' otherwise.
|
||||
"""
|
||||
try:
|
||||
with open(path, 'w', encoding="utf-8") as file:
|
||||
file.write(value)
|
||||
except OSError as exc:
|
||||
log.error("Cannot write file '%s' (errno: %d)", path, exc.errno)
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def resize_to(value, to, divider=1024):
|
||||
"""
|
||||
Resizes the given value.
|
||||
|
|
@ -1432,9 +1993,13 @@ def main():
|
|||
server_thread.deamon = True
|
||||
server_thread.start()
|
||||
|
||||
bt_service = BluetoothService()
|
||||
bt_service.start()
|
||||
|
||||
# Wait for termination/interrupt signal.
|
||||
signal.sigwait([signal.SIGTERM, signal.SIGINT])
|
||||
|
||||
bt_service.stop()
|
||||
server.shutdown()
|
||||
server.server_close()
|
||||
log.info("Sever stopped")
|
||||
|
|
@ -1443,4 +2008,3 @@ def main():
|
|||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue