Add python util modules for device interop and blocking robot movement.

This commit is contained in:
Florian Nücke
2022-01-17 21:14:16 +01:00
parent fc319b6d8d
commit 9090c13d62
3 changed files with 263 additions and 0 deletions

View File

@@ -0,0 +1,43 @@
#!/bin/sh
PROFILE_D_PATH="/etc/profile.d"
INIT_SCRIPT_NAME="python_path.sh"
start() {
INIT_SCRIPT_PATH="$PROFILE_D_PATH/$INIT_SCRIPT_NAME"
if [ ! -f "$INIT_SCRIPT_PATH" ]; then
THIS_SCRIPT_PATH=$(readlink -f "$0")
THIS_SCRIPT_DIR=$(dirname "$THIS_SCRIPT_PATH")
PYTHON_LIB_PATH=$(readlink -f "$THIS_SCRIPT_DIR/../lib/micropython")
echo 'if [ -z "$MICROPYPATH" ]; then' >>"$INIT_SCRIPT_PATH"
echo ' export MICROPYPATH="'$PYTHON_LIB_PATH'"' >>"$INIT_SCRIPT_PATH"
echo 'else' >>"$INIT_SCRIPT_PATH"
echo ' export MICROPYPATH="$MICROPYPATH:'$PYTHON_LIB_PATH'"' >>"$INIT_SCRIPT_PATH"
echo 'fi' >>"$INIT_SCRIPT_PATH"
fi
return 0
}
stop() {
return 0
}
reload() {
return 0
}
case "$1" in
start | stop | reload)
"$1"
;;
restart)
reload
;;
*)
echo "Usage: $0 {start|stop|reload|restart}"
exit 1
;;
esac
exit $?

View File

@@ -0,0 +1,157 @@
import io
import os
import select
import json
class Device:
def __init__(self, device_bus, device_id):
self.bus = device_bus
self.device_id = device_id
self.methods = None
def __getattr__(self, item):
return lambda *args: self.bus.invoke(self.device_id, item, *args)
def __str__(self):
if self.methods is None:
self.methods = self.bus.methods(self.device_id)
doc = ""
for method in self.methods:
doc += method["name"] + "("
if "parameters" in method:
i = 0
for p in method["parameters"]:
if i > 0:
doc += ", "
if "name" in p:
doc += p["name"]
else:
doc += "arg" + str(i)
doc += ": " + p["type"]
i += 1
doc += "): " + method["returnType"] + "\n"
if "description" in method and method["description"]:
doc += method["description"] + "\n"
if "parameters" in method:
i = 0
for p in method["parameters"]:
if "description" in p:
doc += " "
if "name" in p:
doc += p["name"]
else:
doc += "args" + str(i)
doc += " " + p["description"] + "\n"
i += 1
return doc
class DeviceBus:
MESSAGE_DELIMITER = "\0"
def __init__(self, path):
self.file = io.open(path, "+b")
os.system("stty -F %s raw -echo" % path)
self.poll = select.poll()
self.poll.register(self.file.fileno(), select.POLLIN)
self.buffer = None
self.buffer_pos = 0
def close(self):
self.file.close()
self.buffer = None
def flush(self):
self._clear_buffer()
self._skip_input()
def list(self):
self.flush()
self._write_message({'type': "list"})
return self._read_message("list")
def get(self, device_id):
for device in self.list():
if device["deviceId"] == device_id:
return Device(self, device["deviceId"])
return None
def find(self, type_name):
for device in self.list():
if "typeNames" in device and type_name in device["typeNames"]:
return Device(self, device["deviceId"])
return None
def methods(self, device_id):
self.flush()
self._write_message({"type": "methods", "data": device_id})
return self._read_message("methods")
def invoke(self, device_id, method_name, *args):
self.flush()
self._write_message({"type": "invoke", "data": {
"deviceId": device_id,
"name": method_name,
"parameters": args
}})
return self._read_message("result")
def _write_message(self, data):
self.file.write(self.MESSAGE_DELIMITER + json.dumps(data) + self.MESSAGE_DELIMITER)
def _read_message(self, expected_type):
message = ""
while True:
value = chr(self._read_one())
if value == self.MESSAGE_DELIMITER:
if message:
data = json.loads(message)
if data["type"] != expected_type:
raise Exception("unexpected message type: %s" % data["type"])
if "data" in data:
return data["data"]
else:
return
else:
message = ""
else:
message += value
def _read_one(self):
if self.buffer is None:
self._fill_buffer()
result = self.buffer[self.buffer_pos]
self.buffer_pos += 1
if self.buffer_pos >= len(self.buffer):
self._clear_buffer()
return result
def _clear_buffer(self):
self.buffer = None
self.buffer_pos = 0
def _fill_buffer(self):
self.poll.poll() # Blocking wait until we have some data.
self.buffer = self._read(1024)
self.buffer_pos = 0
def _read(self, limit):
# This is horrible, but don't know how to know how many bytes are available,
# so reading one by one is necessary to avoid blocking.
data = bytearray()
while len(data) < limit and len(self.poll.poll(0)) > 0:
data.extend(self.file.read(1))
return data
def _skip_input(self):
# This is horrible, but don't know how to know how many bytes are available,
# so reading one by one is necessary to avoid blocking.
while len(self.poll.poll(0)) > 0:
self.file.read(1)
def bus():
return DeviceBus("/dev/hvc0")

View File

@@ -0,0 +1,63 @@
import time
direction = {
"forward": "forward",
"backward": "backward",
"upward": "upward",
"downward": "downward",
"left": "left",
"right": "right",
}
class Robot:
def __init__(self, device):
if device is None:
raise Exception("robot device not found")
self.device = device
def energy(self):
return self.device.getEnergyStored()
def capacity(self):
return self.device.getEnergyCapacity()
def slot(self, value=None):
if value is not None:
self.device.setSelectedSlot(value)
return self.device.getSelectedSlot()
def stack(self, slot=None):
return self.device.getStackInSlot(slot or self.slot())
def move(self, direction):
self.move_async(direction)
return self._wait_for_last_action()
def move_async(self, direction):
if not direction:
raise Exception("no direction specified")
while not self.device.move(direction):
time.sleep(1)
def turn(self, direction):
self.turn_async(direction)
return self._wait_for_last_action()
def turn_async(self, direction):
if not direction:
raise Exception("no direction specified")
while not self.device.turn(direction):
time.sleep(1)
def _wait_for_last_action(self):
id = self.device.getLastActionId()
result = self.device.getActionResult(id)
while result and result == "INCOMPLETE":
time.sleep(1)
result = self.device.getActionResult(id)
return result == "SUCCESS"
def robot(bus):
return Robot(bus.find("robot"))