diff --git a/connectcore-demo-example/demoserver.py b/connectcore-demo-example/demoserver.py index 554a0a7..2d62fa2 100755 --- a/connectcore-demo-example/demoserver.py +++ b/connectcore-demo-example/demoserver.py @@ -102,12 +102,15 @@ BT_REQUEST_TIMEOUT = 5000 COMMAND_PING = "ping -q -w 1 -c 1 -I %s 8.8.8.8" COMMAND_READ_SN = "fw_printenv -n serial#" +NPU_DEMOS_FILE = "static/assets/npu_demos.json" + # Variables. log = logging.getLogger(APP_NAME) last_cpu_work = 0 last_cpu_total = 0 led_status = {} fw_process = None +npu_demos = None class RequestHandler(http.server.SimpleHTTPRequestHandler): @@ -624,6 +627,46 @@ class RequestHandler(http.server.SimpleHTTPRequestHandler): self.wfile.write(json.dumps(result).encode(encoding="utf_8")) else: self.wfile.write(json.dumps({"data": json.dumps(result)}).encode(encoding="utf_8")) + elif re.search("/ajax/get_npu_info", self.path) is not None: + # Set the response headers. + self._set_headers(200) + # Fill NPU info. + info = { + "has-npu-demos": str(has_npu_demos()).lower() + } + # Send the JSON value. + self.wfile.write(json.dumps(info).encode(encoding="utf_8")) + elif re.search("/ajax/get_npu_demos", self.path) is not None: + # Set the response headers. + self._set_headers(200) + # Get NPU demos data. + platform_id = get_platform_id() + demos = get_npu_demos_for_platform(platform_id) + self.wfile.write(json.dumps({"data": json.dumps(demos)}).encode(encoding="utf_8")) + elif re.search("/ajax/run_npu_demo", self.path) is not None: + # Set the response headers. + self._set_headers(200) + # Get the JSON data. + data = self.rfile.read(int(self.headers["Content-Length"])) + demo_id = json.loads(data.decode("utf-8")).get("demo_id", None) + # Get the demo with the given ID. + demo = get_npu_demo(demo_id) + if demo: + # Get the command to execute. + script = None + platform_id = get_platform_id() + for comp_platform in demo.get("compatible_platforms", []): + if comp_platform.get("platform", None) == platform_id: + script = comp_platform.get("launch_script", None) + break + if script: + # Execute the demo. + exec_cmd_nowait(script) + self.wfile.write("{}".encode(encoding="utf_8")) + else: + self.wfile.write(json.dumps({"error": "NPU demo launch script not found."}).encode(encoding="utf_8")) + else: + self.wfile.write(json.dumps({"error": "NPU demo not found."}).encode(encoding="utf_8")) else: # Forbidden. self._set_headers(403) @@ -1766,6 +1809,93 @@ def set_bluetooth_device_configuration(device, config_data): return data +def has_npu_demos(): + """ + Returns whether the device has NPU demos available or not. + + Returns: + Boolean: True if device has NPU demos available, False otherwise. + """ + return len(get_npu_demos_for_platform(get_platform_id())) > 0 + + +def load_npu_demos(): + """ + Returns the loaded NPU demos or parses the JSON file with NPU demos + metadata. + + Returns: + list: A list of dictionaries, each representing an NPU demo, + `None` if error + """ + global npu_demos + + # Return the demos if they are already loaded. + if npu_demos: + return npu_demos + + # Initialize the demos var. + npu_demos = None + + # Build demos file path. + npu_demos_file_path = os.path.join(os.path.dirname(__file__), NPU_DEMOS_FILE) + try: + with open(npu_demos_file_path, 'r') as file: + content = json.load(file) + if content: + npu_demos = content.get("npu_demos", []) + except FileNotFoundError: + log.error("Error reading NPU demos: File '%s' not found." % NPU_DEMOS_FILE) + except json.JSONDecodeError: + log.error("Error reading NPU demos: File '%s' contains invalid JSON." % NPU_DEMOS_FILE) + except OSError as exc: + log.error("Error reading NPU demos: %s" % str(exc)) + + return npu_demos + + +def get_npu_demo(demo_id): + """ + Returns the NPU demo matching the given ID. + + Args: + demo_id (str): ID of the NPU demo to get. + + Returns: + Dictionary: The NPU demo for the given ID, None if the demo is not + found. + """ + demos = load_npu_demos() + if not demos: + return None + + return next((demo for demo in demos if demo.get("id", None) == demo_id), None) + + +def get_npu_demos_for_platform(platform_name): + """ + Returns a list of NPU demos that are compatible with the given platform. + + Args: + platform_name (str): The name of the platform to filter NPU demos by. + + Returns: + list: A list of dictionaries, each representing an NPU demo compatible + with the given platform. + """ + compatible_demos = [] + demos = load_npu_demos() + + if demos: + for demo in demos: + for platform in demo.get('compatible_platforms', []): + if platform.get('platform') == platform_name and file_exists(platform.get('launch_script')): + compatible_demos.append(demo) + break + + return compatible_demos + + def get_platform_id(): """ Returns the running platform ID. @@ -1891,6 +2021,19 @@ def write_file(path, value): return True +def file_exists(file_path): + """ + Determines whether the given file path exists or not. + + Args: + file_path (String): Absolute path of the file to check. + + Returns: + Boolean: 'True' if the file exists, 'False' otherwise. + """ + return os.path.isfile(file_path) + + def resize_to(value, to, divider=1024): """ Resizes the given value. diff --git a/connectcore-demo-example/index.html b/connectcore-demo-example/index.html index 3128396..ac502b3 100644 --- a/connectcore-demo-example/index.html +++ b/connectcore-demo-example/index.html @@ -68,6 +68,14 @@ Digi Demo - Dashboard +
+
+