From 9090c13d62ef0509ab8f3e1dcd4e29282ec9ed4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Florian=20N=C3=BCcke?= Date: Mon, 17 Jan 2022 21:14:16 +0100 Subject: [PATCH] Add python util modules for device interop and blocking robot movement. --- src/main/scripts/init.d/S20export_python_path | 43 +++++ src/main/scripts/lib/micropython/devices.py | 157 ++++++++++++++++++ src/main/scripts/lib/micropython/robot.py | 63 +++++++ 3 files changed, 263 insertions(+) create mode 100644 src/main/scripts/init.d/S20export_python_path create mode 100644 src/main/scripts/lib/micropython/devices.py create mode 100644 src/main/scripts/lib/micropython/robot.py diff --git a/src/main/scripts/init.d/S20export_python_path b/src/main/scripts/init.d/S20export_python_path new file mode 100644 index 00000000..9448342f --- /dev/null +++ b/src/main/scripts/init.d/S20export_python_path @@ -0,0 +1,43 @@ +#!/bin/sh + +PROFILE_D_PATH="/etc/profile.d" +INIT_SCRIPT_NAME="python_path.sh" + +start() { + INIT_SCRIPT_PATH="$PROFILE_D_PATH/$INIT_SCRIPT_NAME" + if [ ! -f "$INIT_SCRIPT_PATH" ]; then + THIS_SCRIPT_PATH=$(readlink -f "$0") + THIS_SCRIPT_DIR=$(dirname "$THIS_SCRIPT_PATH") + PYTHON_LIB_PATH=$(readlink -f "$THIS_SCRIPT_DIR/../lib/micropython") + + echo 'if [ -z "$MICROPYPATH" ]; then' >>"$INIT_SCRIPT_PATH" + echo ' export MICROPYPATH="'$PYTHON_LIB_PATH'"' >>"$INIT_SCRIPT_PATH" + echo 'else' >>"$INIT_SCRIPT_PATH" + echo ' export MICROPYPATH="$MICROPYPATH:'$PYTHON_LIB_PATH'"' >>"$INIT_SCRIPT_PATH" + echo 'fi' >>"$INIT_SCRIPT_PATH" + fi + return 0 +} + +stop() { + return 0 +} + +reload() { + return 0 +} + +case "$1" in +start | stop | reload) + "$1" + ;; +restart) + reload + ;; +*) + echo "Usage: $0 {start|stop|reload|restart}" + exit 1 + ;; +esac + +exit $? diff --git a/src/main/scripts/lib/micropython/devices.py b/src/main/scripts/lib/micropython/devices.py new file mode 100644 index 00000000..49fb1c4a --- /dev/null +++ b/src/main/scripts/lib/micropython/devices.py @@ -0,0 +1,157 @@ +import io +import os +import select +import json + + +class Device: + def __init__(self, device_bus, device_id): + self.bus = device_bus + self.device_id = device_id + self.methods = None + + def __getattr__(self, item): + return lambda *args: self.bus.invoke(self.device_id, item, *args) + + def __str__(self): + if self.methods is None: + self.methods = self.bus.methods(self.device_id) + doc = "" + for method in self.methods: + doc += method["name"] + "(" + if "parameters" in method: + i = 0 + for p in method["parameters"]: + if i > 0: + doc += ", " + if "name" in p: + doc += p["name"] + else: + doc += "arg" + str(i) + doc += ": " + p["type"] + i += 1 + doc += "): " + method["returnType"] + "\n" + + if "description" in method and method["description"]: + doc += method["description"] + "\n" + + if "parameters" in method: + i = 0 + for p in method["parameters"]: + if "description" in p: + doc += " " + if "name" in p: + doc += p["name"] + else: + doc += "args" + str(i) + doc += " " + p["description"] + "\n" + i += 1 + return doc + + +class DeviceBus: + MESSAGE_DELIMITER = "\0" + + def __init__(self, path): + self.file = io.open(path, "+b") + os.system("stty -F %s raw -echo" % path) + self.poll = select.poll() + self.poll.register(self.file.fileno(), select.POLLIN) + self.buffer = None + self.buffer_pos = 0 + + def close(self): + self.file.close() + self.buffer = None + + def flush(self): + self._clear_buffer() + self._skip_input() + + def list(self): + self.flush() + self._write_message({'type': "list"}) + return self._read_message("list") + + def get(self, device_id): + for device in self.list(): + if device["deviceId"] == device_id: + return Device(self, device["deviceId"]) + return None + + def find(self, type_name): + for device in self.list(): + if "typeNames" in device and type_name in device["typeNames"]: + return Device(self, device["deviceId"]) + return None + + def methods(self, device_id): + self.flush() + self._write_message({"type": "methods", "data": device_id}) + return self._read_message("methods") + + def invoke(self, device_id, method_name, *args): + self.flush() + self._write_message({"type": "invoke", "data": { + "deviceId": device_id, + "name": method_name, + "parameters": args + }}) + return self._read_message("result") + + def _write_message(self, data): + self.file.write(self.MESSAGE_DELIMITER + json.dumps(data) + self.MESSAGE_DELIMITER) + + def _read_message(self, expected_type): + message = "" + while True: + value = chr(self._read_one()) + if value == self.MESSAGE_DELIMITER: + if message: + data = json.loads(message) + if data["type"] != expected_type: + raise Exception("unexpected message type: %s" % data["type"]) + if "data" in data: + return data["data"] + else: + return + else: + message = "" + else: + message += value + + def _read_one(self): + if self.buffer is None: + self._fill_buffer() + result = self.buffer[self.buffer_pos] + self.buffer_pos += 1 + if self.buffer_pos >= len(self.buffer): + self._clear_buffer() + return result + + def _clear_buffer(self): + self.buffer = None + self.buffer_pos = 0 + + def _fill_buffer(self): + self.poll.poll() # Blocking wait until we have some data. + self.buffer = self._read(1024) + self.buffer_pos = 0 + + def _read(self, limit): + # This is horrible, but don't know how to know how many bytes are available, + # so reading one by one is necessary to avoid blocking. + data = bytearray() + while len(data) < limit and len(self.poll.poll(0)) > 0: + data.extend(self.file.read(1)) + return data + + def _skip_input(self): + # This is horrible, but don't know how to know how many bytes are available, + # so reading one by one is necessary to avoid blocking. + while len(self.poll.poll(0)) > 0: + self.file.read(1) + + +def bus(): + return DeviceBus("/dev/hvc0") diff --git a/src/main/scripts/lib/micropython/robot.py b/src/main/scripts/lib/micropython/robot.py new file mode 100644 index 00000000..d8c4a59e --- /dev/null +++ b/src/main/scripts/lib/micropython/robot.py @@ -0,0 +1,63 @@ +import time + +direction = { + "forward": "forward", + "backward": "backward", + "upward": "upward", + "downward": "downward", + "left": "left", + "right": "right", +} + + +class Robot: + def __init__(self, device): + if device is None: + raise Exception("robot device not found") + self.device = device + + def energy(self): + return self.device.getEnergyStored() + + def capacity(self): + return self.device.getEnergyCapacity() + + def slot(self, value=None): + if value is not None: + self.device.setSelectedSlot(value) + return self.device.getSelectedSlot() + + def stack(self, slot=None): + return self.device.getStackInSlot(slot or self.slot()) + + def move(self, direction): + self.move_async(direction) + return self._wait_for_last_action() + + def move_async(self, direction): + if not direction: + raise Exception("no direction specified") + while not self.device.move(direction): + time.sleep(1) + + def turn(self, direction): + self.turn_async(direction) + return self._wait_for_last_action() + + def turn_async(self, direction): + if not direction: + raise Exception("no direction specified") + while not self.device.turn(direction): + time.sleep(1) + + def _wait_for_last_action(self): + id = self.device.getLastActionId() + result = self.device.getActionResult(id) + while result and result == "INCOMPLETE": + time.sleep(1) + result = self.device.getActionResult(id) + return result == "SUCCESS" + + +def robot(bus): + return Robot(bus.find("robot"))