From 6c10ce9ad650f0e69aea9f5118680592e06b46f6 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Thu, 27 Feb 2025 20:22:29 -0600 Subject: [PATCH 01/18] Remove pycache files --- .../python/__pycache__/msgs_pb2.cpython-313.pyc | Bin 2937 -> 0 bytes ws_urc/launch/__pycache__/rover.cpython-310.pyc | Bin 1174 -> 0 bytes ws_urc/launch/__pycache__/rover.cpython-311.pyc | Bin 726 -> 0 bytes 3 files changed, 0 insertions(+), 0 deletions(-) delete mode 100644 basestation/rover-msgs/python/__pycache__/msgs_pb2.cpython-313.pyc delete mode 100644 ws_urc/launch/__pycache__/rover.cpython-310.pyc delete mode 100644 ws_urc/launch/__pycache__/rover.cpython-311.pyc diff --git a/basestation/rover-msgs/python/__pycache__/msgs_pb2.cpython-313.pyc b/basestation/rover-msgs/python/__pycache__/msgs_pb2.cpython-313.pyc deleted file mode 100644 index e2454332714727765fe806fc3ee9c48bb2e7a1d4..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 2937 zcmai0&2Jk;6rWvxtUZqNv2lXiq^avRPST_?Y6)tf1v!an()1(sCaHuXt=8U&yJ)>@ zX4fsr0p(c16$Dpqa07`WhYE>5z-cO*MLi%+IUogY2z`kpM3KZv{5p4&r-2lc10V-!$d!X2hpFJo zA&?_9>dN7Dd^1M-QfNmgoNk-$yweOH@91r*FwvGeZ%L3MQdGiHY$4o2!D_US>M+_; z8ZCGt9oh-Ko>7==uj;&|z6GwO(czs$m$i|dzQSCG*OuTbhId95AWrN7ulfq}ExCJS zEGZ7WKnn$4pzvllqa{y!kopZmQvc3~G!OzrKh7cZvd9;;hpla4(~KzV?DfaAM5bW05;LXJL_W|UJ9^tJ-oD!b$I6iyU@d% z^f8}T?l1SxWcJVAAO@j|P1IDCnzmUo^;EI8yGv-QWU6HTn8j>{bFfOR64k1Lb0alDxt^B!Q(S*U^#ctKCW10 zYu@$2`U)`I1&ASPg4wi%h1S#;Ri&4%zMF$dm&$SK{114Ce?^xx7=#QZ^+}^tHH-n1N?;?jCS$B;fD&<6r)5S zlL3g&G#>PF0ClLFCC%P*W7MgUcDByrv0i4kxsYpa77w>lE-R%H(b+*^QDc$(x5W&e zcm=uJsKAV-uVUQwMN!Os;uQdhdv57)B8dBLak}Co_ZztMMJ%-4|G#DFw{h5uZ43u2 z^0B)|T#Vz8Wh<4cyHAwW+%8q%)M~1`U36*m!-RX7F0u2NEt^^Pt`>`7_h?>mB21U7 z#mjdbypore3u_yjcMH}ioZ?rnJRV^OJR;xDOH%Ge{`fIF^q(;-EN|XllQxeJ8S<>f zV&@~bnbUrTYexG823W~zh+)6NHKP3ngA0w;HMwUNI|* zW;o%Et)*LQ%Z{)@c9oiL!zaO>vRZV8a;hp3I1swFPt*@IC|LOK0x>A-W>L{C*sLv0 zEU`xHb<>=o=8alq#c%PJ)0*Ejt6d<_yakpc%3D%iUY0v0AUOlFZo&~)WwXkTiRBE- zmhX)Z4t0Z2i`0gNT|8o_-=P=aHgfJ~w_w7`mP`XKDN3q3%UD(=53^S30o*&6peR3r z_G@Lg3d>n`$ACe$cebmBt9$kXK-zUXU)^&e*Pu7nbn+g33-*C3L)ND-o&-3K`>PLe zndfNXIeNeSyV^v94U|1Z*=8`&2#(c*V~t?C9!xibp=L1L9tZ!93mkuvK(Vm~P9Ng* zQ+(k-IKilYq7k1wj88s|Pc`C;_4wimLhr7R!$?Ce}+9Uvge zq9}ZiTYrN99+1q-$q6Bjry0##CoSY#;Q!{G_|QJ~`oSCY^!mhZJ?7IQdG<)?M9Ip? zvOuMORDwvH$N3`2^l%^I)~^wO{OlRe5TNc{XhR?gtkk#)YshzC9kJ{9v<j6C8aBU!nEZSsNp6!yVig zBGQ##33mDQ(V{y%*b9fFaX1V|W4omU&$KAC*kcLLh2nPSe=1gB_cmzki)$*4m%d?A zW~E}4Dkm}%wlms~=KJGlem7d&08yD|EH7c2iLepQ{nkan<4iDpS|m?wFXmdXiNfqm zf?cb{OcpN|+dPwL!Rp|!iRt@xrQ*p8o?<#N7s=GPqpV+Gxb0Nj9b<&j)=w)vx2uRv z`D;m;=d3D~S(pKc*+2^=c{YeocO?v_I^I2&nM}%pJ?61yhd=BMqrD^c^!On&C42hd z$0zKel8_2^SQbW=Stb-4jz{s`;XxF|@kqcBf7*NdkVJxyN27zuJ=m3#DB5*NBipG9 z(3KD%fBDw)Y)j@-yNXa3*l($I93$0r;3ms%%p{1CX@eJnH)E-+YzLH_38T0y?B+lG zvPJWg)Q08Lxeai7g(w!6v7w1f2T2ZWCbIC9=UKn~-%LR{HPzIp4VQM^U4S;+jlR8f zL1@aP$~`XCLg*825c&b-wMO>e)6Thte*Bj{X!Ml$#Q*!;KcLPl_*31%bVvQR u4VrT5M(p;oHdX47C$4E-H&=Vtja`7-c!FL1u9@ol?ms{W2wksBy7Uv=NJv`% diff --git a/ws_urc/launch/__pycache__/rover.cpython-311.pyc b/ws_urc/launch/__pycache__/rover.cpython-311.pyc deleted file mode 100644 index 9530566794dc3f54ec324511b0f94ee3bea20938..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 726 zcmZWmJ8u&~5T3o4eZ~*Yv2YO{1(Dz+lt|Qw0#PD26i9AP$L(NS+)I0RfpjVO5BUc{ zDEyd|$4ZfqsOTnZ75GegZ^9~#=C84c#3C;jSxQc8l5vc5%Qd*9 zP1?&j!3mUX?I%?#|5`ZX1~<9&-9EJd&SBgf$_-rmlc8K>0s?0;us=}x34FpLXK z3w$0#VS*x*_qJR?N_=JK#xETj2Ql?6*~-J{V>qKQe#WRO!}l>tE2UP_$*MB1N;%}1 zm-AqeP+D-fz)vXT-P=ChzI!|F8$yjO)P&z)siwRh#QG@@M0SW`KR=SYGfataiNPuo z@S9f_BPvnV#XEvyxVUrwqTQ`WPwMeO?LDY>b{p4rJC{IkrQlB&++IC;T95tOd$b*I z2K|T5CG@+kD+LYo?Uoo)?}=|qXGK7Bmcun0krn%4q`th+b7)G!qY_;wcF4MJzeqA( k#rRT;>8L)Zcty~djIkQ_&No*LkIy$(V+|OQ*#>>bU-=@p)&Kwi From 2aac74dbf1a3fe7a81aa8b169891124ad7da0046 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Thu, 27 Feb 2025 20:29:38 -0600 Subject: [PATCH 02/18] Initial radiodriver implementation --- radiodriver-proxy/.gitignore | 2 + radiodriver-proxy/README.md | 5 ++ radiodriver-proxy/proxy.py | 97 ++++++++++++++++++++++++++++++++++++ 3 files changed, 104 insertions(+) create mode 100644 radiodriver-proxy/.gitignore create mode 100644 radiodriver-proxy/README.md create mode 100644 radiodriver-proxy/proxy.py diff --git a/radiodriver-proxy/.gitignore b/radiodriver-proxy/.gitignore new file mode 100644 index 00000000..408d9c5d --- /dev/null +++ b/radiodriver-proxy/.gitignore @@ -0,0 +1,2 @@ +.venv +radiodriver-proxy.log \ No newline at end of file diff --git a/radiodriver-proxy/README.md b/radiodriver-proxy/README.md new file mode 100644 index 00000000..74f41b8d --- /dev/null +++ b/radiodriver-proxy/README.md @@ -0,0 +1,5 @@ +# radiodriver-proxy + +Service which will be used by the base station and ROS nodes to handle logic for sending messages to the [radio driver](http://github.com/comet-robotics/urc-radiodriver). basestation and ROS nodes send messages to this process via ____ (TBD... websockets? tcp? unix pipes? unix sockets?), then this process handles message queueing and sending to the radio driver via serial. + +There is additional logic needed to wrap messages before they are sent to the radio driver which this process handles, reducing redundant code in each component. diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py new file mode 100644 index 00000000..b578ea9b --- /dev/null +++ b/radiodriver-proxy/proxy.py @@ -0,0 +1,97 @@ +import threading +import enum +import logging +import time +logger = logging.getLogger(__name__) + +RADIODRIVER_SERIAL_PORT = '/dev/tty.debug-console' +LOG_PATH = 'radiodriver-proxy.log' + +messages: list[str] = [] +kiss_message_queue: list[bytes] = [] + +class KISSChars(enum.Enum): + FEND = 0xC0 # Frame End + FESC = 0xDB # Frame Escape + TFEND = 0xDC # Transposed Frame End + TFESC = 0xDD # Transposed Frame Escape + + +def format_kiss_message(chars: str, tnc_port: int = 0) -> bytes: + tnc_port_valid = tnc_port >= 0 and tnc_port <= 9 + if not tnc_port_valid: + raise ValueError("TNC Port must be between 0 and 9") + + packet = bytearray() + packet.append(KISSChars.FEND.value) + + dataframe = tnc_port << 4 + packet.append(dataframe) + + for char in chars: + if char == KISSChars.FEND.value: + packet.append(KISSChars.FESC.value) + packet.append(KISSChars.TFEND.value) + elif char == KISSChars.FESC.value: + packet.append(KISSChars.FESC.value) + packet.append(KISSChars.TFESC.value) + else: + packet.append(ord(char)) + + packet.append(KISSChars.FEND.value) + return packet + +def serial_manager(): + logger = logging.getLogger("SerialManager") + logger.info("Starting serial manager...") + with open(RADIODRIVER_SERIAL_PORT, "wb") as ser: + logger.info(f"Serial port opened: {ser.name}") + while True: + time.sleep(0.001) + while len(kiss_message_queue) > 0: + m = kiss_message_queue.pop(0) + logger.info("Writing message to serial") + ser.write(m) + logger.info("Message written to serial") + +def message_formatter(): + logger = logging.getLogger("MessageFormatter") + logger.info("Starting message formatter...") + while True: + time.sleep(0.001) + while len(messages) > 0: + logger.info("Formatting message...") + formatted_message = format_kiss_message(messages.pop(0)) + logger.info("Formatted message") + kiss_message_queue.append(formatted_message) + logger.info("Message added to queue") + + +def mock_message_receiver(): + logger = logging.getLogger("MessageReceiver") + logger.info("Starting message receiver...") + # TODO: receive messages from ros/basestation + import json, datetime, random + while True: + fake_msg = json.dumps({"random": random.random(), "time": datetime.datetime.now().isoformat()}) + logger.info(f"Received message: {fake_msg}") + messages.append(fake_msg) + time.sleep(1) + +def main(): + print(f"Logging to {LOG_PATH}.") + + logging.basicConfig(level=logging.INFO, filename=LOG_PATH) + logger.info("--- STARTING RADIODRIVER PROXY ---") + + message_receiver_thread = threading.Thread(target=mock_message_receiver) + message_receiver_thread.start() + + serial_thread = threading.Thread(target=serial_manager) + serial_thread.start() + + message_formatter_thread = threading.Thread(target=message_formatter) + message_formatter_thread.start() + +if __name__ == "__main__": + main() \ No newline at end of file From 336aae8cf9cd8cc931a1133ea63242ec1caf40e2 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Fri, 28 Feb 2025 00:27:04 -0600 Subject: [PATCH 03/18] Receive messages over unix socket --- radiodriver-proxy/proxy.py | 83 ++++++++++++++++++++++++++++++++------ 1 file changed, 71 insertions(+), 12 deletions(-) diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index b578ea9b..e1b55122 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -2,12 +2,14 @@ import enum import logging import time +import socket logger = logging.getLogger(__name__) RADIODRIVER_SERIAL_PORT = '/dev/tty.debug-console' -LOG_PATH = 'radiodriver-proxy.log' +UNIX_SOCKET_PATH = '/var/run/radiodriver-proxy.sock' +SLEEP_TIME = 0.001 -messages: list[str] = [] +messages: list[bytes] = [] kiss_message_queue: list[bytes] = [] class KISSChars(enum.Enum): @@ -17,7 +19,7 @@ class KISSChars(enum.Enum): TFESC = 0xDD # Transposed Frame Escape -def format_kiss_message(chars: str, tnc_port: int = 0) -> bytes: +def format_kiss_message(data: bytes, tnc_port: int = 0) -> bytes: tnc_port_valid = tnc_port >= 0 and tnc_port <= 9 if not tnc_port_valid: raise ValueError("TNC Port must be between 0 and 9") @@ -28,7 +30,7 @@ def format_kiss_message(chars: str, tnc_port: int = 0) -> bytes: dataframe = tnc_port << 4 packet.append(dataframe) - for char in chars: + for char in data: if char == KISSChars.FEND.value: packet.append(KISSChars.FESC.value) packet.append(KISSChars.TFEND.value) @@ -36,7 +38,7 @@ def format_kiss_message(chars: str, tnc_port: int = 0) -> bytes: packet.append(KISSChars.FESC.value) packet.append(KISSChars.TFESC.value) else: - packet.append(ord(char)) + packet.append(char) packet.append(KISSChars.FEND.value) return packet @@ -68,23 +70,80 @@ def message_formatter(): def mock_message_receiver(): - logger = logging.getLogger("MessageReceiver") + logger = logging.getLogger("MockMessageReceiver") logger.info("Starting message receiver...") - # TODO: receive messages from ros/basestation import json, datetime, random while True: fake_msg = json.dumps({"random": random.random(), "time": datetime.datetime.now().isoformat()}) logger.info(f"Received message: {fake_msg}") messages.append(fake_msg) - time.sleep(1) -def main(): - print(f"Logging to {LOG_PATH}.") +class ListeningFor(enum.IntEnum): + PAYLOAD_SIZE = 0 + PAYLOAD = 1 + +DEFAULT_CHUNK_SIZE = 1024 +def receive_all(conn: socket.socket, size: int) -> bytes: + """ + Receive an exact number of bytes from a socket connection. + Implements chunked reading with a reasonable buffer size. + """ + chunks = [] + bytes_recd = 0 + while bytes_recd < size: + remaining_bytes = size - bytes_recd + + chunk = conn.recv(min(remaining_bytes, DEFAULT_CHUNK_SIZE)) + if not chunk: # Connection was closed + raise ConnectionError("Socket connection closed while receiving data") + chunks.append(chunk) + bytes_recd += len(chunk) + return b''.join(chunks) + +def unix_socket_message_receiver(): + import socket, struct, os + + logger = logging.getLogger("UnixSocketMessageReceiver") + logger.info("Starting Unix socket message receiver...") + os.unlink(UNIX_SOCKET_PATH) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(UNIX_SOCKET_PATH) + sock.listen(1) + logger.info(f"Listening on {UNIX_SOCKET_PATH}") + + listening_for = ListeningFor.PAYLOAD_SIZE + expected_payload_size = -1 + + conn, addr = sock.accept() + while True: + logger.debug(f"Listening for next {listening_for.name}...") + try: + if listening_for == ListeningFor.PAYLOAD_SIZE: + buf = receive_all(conn, 4) # Always read exactly 4 bytes for size + size = struct.unpack('!I', buf)[0] + logger.info(f"Expected payload size: {size}") + expected_payload_size = size + listening_for = ListeningFor.PAYLOAD + elif listening_for == ListeningFor.PAYLOAD: + buf = receive_all(conn, expected_payload_size) + messages.append(buf) + logger.debug("Received message") + listening_for = ListeningFor.PAYLOAD_SIZE + except ConnectionError as e: + logger.error(f"Connection error: {e}") + break # Exit loop on connection error + except Exception as e: + logger.error(f"Error receiving data: {e}") + listening_for = ListeningFor.PAYLOAD_SIZE # Reset state on error + continue + + time.sleep(SLEEP_TIME) - logging.basicConfig(level=logging.INFO, filename=LOG_PATH) +def main(): + logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] %(asctime)s: %(message)s',) logger.info("--- STARTING RADIODRIVER PROXY ---") - message_receiver_thread = threading.Thread(target=mock_message_receiver) + message_receiver_thread = threading.Thread(target=unix_socket_message_receiver) message_receiver_thread.start() serial_thread = threading.Thread(target=serial_manager) From b5a39be660868bffd4702395e324dd10622abd36 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Fri, 28 Feb 2025 00:39:39 -0600 Subject: [PATCH 04/18] Standardize sleep time, readd source to proxy logs --- radiodriver-proxy/proxy.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index e1b55122..be8243f5 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -49,7 +49,7 @@ def serial_manager(): with open(RADIODRIVER_SERIAL_PORT, "wb") as ser: logger.info(f"Serial port opened: {ser.name}") while True: - time.sleep(0.001) + time.sleep(SLEEP_TIME) while len(kiss_message_queue) > 0: m = kiss_message_queue.pop(0) logger.info("Writing message to serial") @@ -60,7 +60,7 @@ def message_formatter(): logger = logging.getLogger("MessageFormatter") logger.info("Starting message formatter...") while True: - time.sleep(0.001) + time.sleep(SLEEP_TIME) while len(messages) > 0: logger.info("Formatting message...") formatted_message = format_kiss_message(messages.pop(0)) @@ -77,6 +77,7 @@ def mock_message_receiver(): fake_msg = json.dumps({"random": random.random(), "time": datetime.datetime.now().isoformat()}) logger.info(f"Received message: {fake_msg}") messages.append(fake_msg) + time.sleep(SLEEP_TIME) class ListeningFor(enum.IntEnum): PAYLOAD_SIZE = 0 @@ -140,7 +141,7 @@ def unix_socket_message_receiver(): time.sleep(SLEEP_TIME) def main(): - logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] %(asctime)s: %(message)s',) + logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] %(asctime)s | %(name)s: %(message)s',) logger.info("--- STARTING RADIODRIVER PROXY ---") message_receiver_thread = threading.Thread(target=unix_socket_message_receiver) From 60384400f1fbe7a33f40dcd702335fc9a1511f60 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Fri, 28 Feb 2025 00:53:39 -0600 Subject: [PATCH 05/18] Add ham link to fake rover --- basestation/fake-rover/rover.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/basestation/fake-rover/rover.py b/basestation/fake-rover/rover.py index 3b552a2a..978ffb60 100644 --- a/basestation/fake-rover/rover.py +++ b/basestation/fake-rover/rover.py @@ -7,6 +7,7 @@ import random import struct import os +from enum import IntEnum current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -21,6 +22,12 @@ from google.protobuf.message import Message import msgs_pb2 as rover +class BaseStationConnectionMethod(IntEnum): + TCP = 0 + HAM_LINK = 1 + +CONNECT_MODE = BaseStationConnectionMethod.HAM_LINK + # Constants RETRY_DELAY = 1 # seconds STARTING_LAT = 41.745161 @@ -199,8 +206,12 @@ def signal_handler(sig, frame): # Connect to TCP server with retry logic while running: try: - stream = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - stream.connect(("localhost", 8000)) + if CONNECT_MODE == BaseStationConnectionMethod.TCP: + stream = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + stream.connect(("localhost", 8000)) + elif CONNECT_MODE == BaseStationConnectionMethod.HAM_LINK: + stream = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + stream.connect("/var/run/radiodriver-proxy.sock") stream.setblocking(False) break # Connection successful, exit the loop except socket.error as e: From 64da9814038486ae07dbcca1046c0a844f79630b Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Sat, 1 Mar 2025 01:28:50 -0600 Subject: [PATCH 06/18] Update radiodriver-proxy README --- radiodriver-proxy/README.md | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/radiodriver-proxy/README.md b/radiodriver-proxy/README.md index 74f41b8d..e4aa0398 100644 --- a/radiodriver-proxy/README.md +++ b/radiodriver-proxy/README.md @@ -1,5 +1,8 @@ # radiodriver-proxy -Service which will be used by the base station and ROS nodes to handle logic for sending messages to the [radio driver](http://github.com/comet-robotics/urc-radiodriver). basestation and ROS nodes send messages to this process via ____ (TBD... websockets? tcp? unix pipes? unix sockets?), then this process handles message queueing and sending to the radio driver via serial. +This service is for the the base station and ROS nodes to handle logic for sending and receiving messages with the [radio driver](http://github.com/comet-robotics/urc-radiodriver), as an alternative/backup option for the Wi-Fi TCP uplink. There is additional logic needed to [un-]wrap messages before they are sent/received to the radio driver which this process handles, removing the need to implement serial support and KISS framing support in both the base station and ROS nodes. Both the base station and rover will have local instances of this service running. + +To send a message via radio, other processes simply write an integer representing the size of the message in bytes, followed by the message data, to the Unix socket at `/var/run/radiodriver-proxy.sock`. The proxy will receive the message, KISS frame it, and send it to the radio driver via serial for transmission. + +When the radio driver receives a message from the radio, it sends the message back to this proxy over serial. Then, the proxy will unframe the message, and send it to the receiving process via Unix socket. The receiving process will listen for data on the Unix socket at `/var/run/radiodriver-proxy.sock`, again consisting of an integer, representing the size of the message in bytes, followed by the message data. -There is additional logic needed to wrap messages before they are sent to the radio driver which this process handles, reducing redundant code in each component. From a79154c06c7b2cff2be193fb2e46441b565d40ad Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Sat, 1 Mar 2025 01:31:39 -0600 Subject: [PATCH 07/18] Begin radio transmit stuff --- radiodriver-proxy/proxy.py | 64 ++++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 20 deletions(-) diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index be8243f5..b04ad91c 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -3,14 +3,19 @@ import logging import time import socket +import os logger = logging.getLogger(__name__) RADIODRIVER_SERIAL_PORT = '/dev/tty.debug-console' UNIX_SOCKET_PATH = '/var/run/radiodriver-proxy.sock' SLEEP_TIME = 0.001 -messages: list[bytes] = [] -kiss_message_queue: list[bytes] = [] +unframed_messages_to_tx: list[bytes] = [] +framed_messages_to_tx: list[bytes] = [] + +serial_read_buffer = b'' +framed_messages_from_rx: list[bytes] = [] +unframed_messages_from_rx: list[bytes] = [] class KISSChars(enum.Enum): FEND = 0xC0 # Frame End @@ -45,38 +50,57 @@ def format_kiss_message(data: bytes, tnc_port: int = 0) -> bytes: def serial_manager(): logger = logging.getLogger("SerialManager") + global framed_messages_to_tx, serial_read_buffer logger.info("Starting serial manager...") with open(RADIODRIVER_SERIAL_PORT, "wb") as ser: + # switching to nonblocking file reads so we can quickly switch over to tx if there is nothing to read - https://stackoverflow.com/a/66410605 + os.set_blocking(ser.fileno(), False) + logger.info(f"Serial port opened: {ser.name}") while True: time.sleep(SLEEP_TIME) - while len(kiss_message_queue) > 0: - m = kiss_message_queue.pop(0) + + while len(framed_messages_to_tx) > 0: + m = framed_messages_to_tx.pop(0) logger.info("Writing message to serial") ser.write(m) logger.info("Message written to serial") + char = ser.read(1) + if len(char) > 0: + serial_read_buffer += char + + def message_formatter(): logger = logging.getLogger("MessageFormatter") logger.info("Starting message formatter...") while True: time.sleep(SLEEP_TIME) - while len(messages) > 0: + while len(unframed_messages_to_tx) > 0: logger.info("Formatting message...") - formatted_message = format_kiss_message(messages.pop(0)) + formatted_message = format_kiss_message(unframed_messages_to_tx.pop(0)) logger.info("Formatted message") - kiss_message_queue.append(formatted_message) + framed_messages_to_tx.append(formatted_message) logger.info("Message added to queue") - -def mock_message_receiver(): - logger = logging.getLogger("MockMessageReceiver") - logger.info("Starting message receiver...") + while len(serial_read_buffer) > 0: + pass + # TODO: Handle serial read buffer + + +def mock_message_transport(): + logger = logging.getLogger("MockMessageTransport") + logger.info("Starting message transport...") import json, datetime, random while True: fake_msg = json.dumps({"random": random.random(), "time": datetime.datetime.now().isoformat()}) - logger.info(f"Received message: {fake_msg}") - messages.append(fake_msg) + logger.info(f"Message from process: {fake_msg}") + unframed_messages_to_tx.append(fake_msg.encode()) + + while len(unframed_messages_from_rx) > 0: + msg = unframed_messages_from_rx.pop(0) + logger.info(f"Message from radio: {msg}") + time.sleep(SLEEP_TIME) class ListeningFor(enum.IntEnum): @@ -101,11 +125,11 @@ def receive_all(conn: socket.socket, size: int) -> bytes: bytes_recd += len(chunk) return b''.join(chunks) -def unix_socket_message_receiver(): - import socket, struct, os +def unix_socket_message_transport(): + import socket, struct - logger = logging.getLogger("UnixSocketMessageReceiver") - logger.info("Starting Unix socket message receiver...") + logger = logging.getLogger("UnixSocketMessageTransport") + logger.info("Starting Unix socket message transport...") os.unlink(UNIX_SOCKET_PATH) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(UNIX_SOCKET_PATH) @@ -127,7 +151,7 @@ def unix_socket_message_receiver(): listening_for = ListeningFor.PAYLOAD elif listening_for == ListeningFor.PAYLOAD: buf = receive_all(conn, expected_payload_size) - messages.append(buf) + unframed_messages_to_tx.append(buf) logger.debug("Received message") listening_for = ListeningFor.PAYLOAD_SIZE except ConnectionError as e: @@ -144,8 +168,8 @@ def main(): logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] %(asctime)s | %(name)s: %(message)s',) logger.info("--- STARTING RADIODRIVER PROXY ---") - message_receiver_thread = threading.Thread(target=unix_socket_message_receiver) - message_receiver_thread.start() + message_transport_thread = threading.Thread(target=unix_socket_message_transport) + message_transport_thread.start() serial_thread = threading.Thread(target=serial_manager) serial_thread.start() From 1fd41e4a4c3b2124db86bbd9666c247f9abb452e Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Tue, 18 Mar 2025 21:40:32 -0500 Subject: [PATCH 08/18] Add design breakdown and sequence diagram to README --- radiodriver-proxy/README.md | 64 +++++++++++++++++++++++++++++++++++-- radiodriver-proxy/proxy.py | 16 ++++++---- 2 files changed, 71 insertions(+), 9 deletions(-) diff --git a/radiodriver-proxy/README.md b/radiodriver-proxy/README.md index e4aa0398..7febf30e 100644 --- a/radiodriver-proxy/README.md +++ b/radiodriver-proxy/README.md @@ -1,8 +1,68 @@ # radiodriver-proxy -This service is for the the base station and ROS nodes to handle logic for sending and receiving messages with the [radio driver](http://github.com/comet-robotics/urc-radiodriver), as an alternative/backup option for the Wi-Fi TCP uplink. There is additional logic needed to [un-]wrap messages before they are sent/received to the radio driver which this process handles, removing the need to implement serial support and KISS framing support in both the base station and ROS nodes. Both the base station and rover will have local instances of this service running. +## Overview +This service is for the the base station and ROS nodes to handle logic for sending and receiving messages with the [radio driver](http://github.com/comet-robotics/urc-radiodriver), as an alternative/backup option for the Wi-Fi TCP uplink. There is additional logic needed to \[un-\]wrap messages before they are sent/received to the radio driver which this process handles, removing the need to implement serial support and KISS framing support in both the base station and ROS nodes. Both the base station and rover will have local instances of this service running. -To send a message via radio, other processes simply write an integer representing the size of the message in bytes, followed by the message data, to the Unix socket at `/var/run/radiodriver-proxy.sock`. The proxy will receive the message, KISS frame it, and send it to the radio driver via serial for transmission. +To send a message via radio, other processes simply write an integer representing the size of the upcoming data in bytes, followed by the message data, to the Unix socket at `/var/run/radiodriver-proxy.sock`. The proxy will receive the message, frame it, and send it to the radio driver via serial for transmission. When the radio driver receives a message from the radio, it sends the message back to this proxy over serial. Then, the proxy will unframe the message, and send it to the receiving process via Unix socket. The receiving process will listen for data on the Unix socket at `/var/run/radiodriver-proxy.sock`, again consisting of an integer, representing the size of the message in bytes, followed by the message data. +## Design +The proxy runs 3 threads which each take care of 1-2 tasks in order to prevent issues with blocking. The threads pass data on to each other using through a number of shared queues. + +The message transport thread handles sending and receiving messages (bytes) over the Unix socket used to communicate with upstream applications like the base station and ROS nodes. The serial manager thread handles sending and receiving bytes over the serial port used to communicate with the radio driver. The message formatter thread handles framing messages before they are passed to the serial manager thread, and unframing messages when they are received by the serial manager thread. + +This diagram shows the flow of a 2-byte message, 'Hi', being sent from the base station to the rover via ham link. + + + + +```mermaid +--- +title: Base Station sending "Hi", a 2-byte message to the rover +--- + +sequenceDiagram + autonumber + participant Base as Base Station server + box Green radiodriver-proxy running on Base Station + participant BSocket as Unix Socket Message Transport thread + participant BMsg as Message Formatter thread + participant BSerial as Serial Manager thread + end + participant BDriver as Base Station radio driver + participant BRadio as Base Station transceiver + Base ->> BSocket: Sends `2Hi` via Unix Socket + BSocket ->> BMsg: Pushes `Hi` into shared queue of
messages that need to be framed + BMsg ->> BSerial: Pushes `\xc0\x00Hi\xc0` into shared queue of
messages that have been framed + BSerial ->> BDriver: Sends `\xc0\x00Hi\xc0`
over serial + BDriver ->> BRadio: Sends `\xc0\x00Hi\xc0` to radio over SPI/UART
(idk which lol) + + BRadio ->> RRadio: Transmits `\xc0\x00Hi\xc0` via
magic airwave stuff + + RDriver ->> RRadio: Receives `\xc0\x00Hi\xc0` over SPI/UART (again idk which lol) + RSerial ->> RDriver: Receives `\xc0\x00Hi\xc0` over serial + RMsg ->> RSerial: Pulls `\xc0\x00Hi\xc0` off shared queue of
messages that need to be unframed + RSocket ->> RMsg: Pulls `Hi` off shared queue of
messages that have been unframed + Rover ->> RSocket: Receives `Hi` via Unix Socket + + + participant RRadio as Rover transceiver + participant RDriver as Rover radio driver + box Green radiodriver-proxy running on Rover + participant RSerial as Serial Manager thread + participant RMsg as Message Formatter thread + participant RSocket as Unix Socket Message Transport thread + end + participant Rover +``` + +--- +TODOs +- [x] Receive messages from other processes to transmit with radiodriver via Unix socket +- [ ] Send received messages from radiodriver to other processes via Unix socket +- [x] Implement message TX over Unix socket for fake rover +- [ ] Implement message RX over Unix socket for base station +- [ ] Implement message TX over Unix socket for base station +- [ ] Handle exit signals on proxy +- [ ] Test everything \ No newline at end of file diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index b04ad91c..bc9ee1e9 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -4,6 +4,11 @@ import time import socket import os +import struct +import json +import datetime +import random + logger = logging.getLogger(__name__) RADIODRIVER_SERIAL_PORT = '/dev/tty.debug-console' @@ -11,7 +16,7 @@ SLEEP_TIME = 0.001 unframed_messages_to_tx: list[bytes] = [] -framed_messages_to_tx: list[bytes] = [] +framed_messages_to_tx: list[bytearray] = [] serial_read_buffer = b'' framed_messages_from_rx: list[bytes] = [] @@ -24,7 +29,7 @@ class KISSChars(enum.Enum): TFESC = 0xDD # Transposed Frame Escape -def format_kiss_message(data: bytes, tnc_port: int = 0) -> bytes: +def format_kiss_message(data: bytes, tnc_port: int = 0) -> bytearray: tnc_port_valid = tnc_port >= 0 and tnc_port <= 9 if not tnc_port_valid: raise ValueError("TNC Port must be between 0 and 9") @@ -91,7 +96,6 @@ def message_formatter(): def mock_message_transport(): logger = logging.getLogger("MockMessageTransport") logger.info("Starting message transport...") - import json, datetime, random while True: fake_msg = json.dumps({"random": random.random(), "time": datetime.datetime.now().isoformat()}) logger.info(f"Message from process: {fake_msg}") @@ -126,8 +130,6 @@ def receive_all(conn: socket.socket, size: int) -> bytes: return b''.join(chunks) def unix_socket_message_transport(): - import socket, struct - logger = logging.getLogger("UnixSocketMessageTransport") logger.info("Starting Unix socket message transport...") os.unlink(UNIX_SOCKET_PATH) @@ -171,8 +173,8 @@ def main(): message_transport_thread = threading.Thread(target=unix_socket_message_transport) message_transport_thread.start() - serial_thread = threading.Thread(target=serial_manager) - serial_thread.start() + serial_manager_thread = threading.Thread(target=serial_manager) + serial_manager_thread.start() message_formatter_thread = threading.Thread(target=message_formatter) message_formatter_thread.start() From 648d62e3960656120604e597b266d340a1b5e4eb Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Tue, 18 Mar 2025 22:05:45 -0500 Subject: [PATCH 09/18] Explain purpose of shared queues --- radiodriver-proxy/proxy.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index bc9ee1e9..c732c824 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -15,11 +15,19 @@ UNIX_SOCKET_PATH = '/var/run/radiodriver-proxy.sock' SLEEP_TIME = 0.001 +# Queue of messages that need to be framed before being sent to the radio driver via serial. Queue is populated by the message transport thread and consumed by the message formatter thread. unframed_messages_to_tx: list[bytes] = [] + +# Queue of framed messages that are ready to be sent to the radio driver via serial. Queue is populated by the message formatter thread, consumed by the serial manager thread. framed_messages_to_tx: list[bytearray] = [] +# Buffer used to store bytes as they are received from the serial port. Populated by the serial manager thread, consumed by the message formatter thread. Once a full message has been received, it is added to the framed_messages_from_rx queue. serial_read_buffer = b'' + +# Queue of framed messages that have been received from the radio driver via serial. Queue is populated and consumed by the message formatter thread. framed_messages_from_rx: list[bytes] = [] + +# Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message transport thread and consumed by the message transport thread. unframed_messages_from_rx: list[bytes] = [] class KISSChars(enum.Enum): From 859be322b059698cb6ddd1e32a2eed48e8037114 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Tue, 18 Mar 2025 22:31:23 -0500 Subject: [PATCH 10/18] Send received messages from radiodriver to other processes via Unix socket --- radiodriver-proxy/README.md | 3 ++- radiodriver-proxy/proxy.py | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/radiodriver-proxy/README.md b/radiodriver-proxy/README.md index 7febf30e..383b433e 100644 --- a/radiodriver-proxy/README.md +++ b/radiodriver-proxy/README.md @@ -60,8 +60,9 @@ sequenceDiagram --- TODOs - [x] Receive messages from other processes to transmit with radiodriver via Unix socket -- [ ] Send received messages from radiodriver to other processes via Unix socket +- [x] Send received messages from radiodriver to other processes via Unix socket - [x] Implement message TX over Unix socket for fake rover +- [ ] Handle unframing of messages received from radiodriver - [ ] Implement message RX over Unix socket for base station - [ ] Implement message TX over Unix socket for base station - [ ] Handle exit signals on proxy diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index c732c824..a916436d 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -27,7 +27,7 @@ # Queue of framed messages that have been received from the radio driver via serial. Queue is populated and consumed by the message formatter thread. framed_messages_from_rx: list[bytes] = [] -# Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message transport thread and consumed by the message transport thread. +# Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message transport thread and consumed by the message transport thread. unframed_messages_from_rx: list[bytes] = [] class KISSChars(enum.Enum): @@ -164,6 +164,11 @@ def unix_socket_message_transport(): unframed_messages_to_tx.append(buf) logger.debug("Received message") listening_for = ListeningFor.PAYLOAD_SIZE + + if len(unframed_messages_from_rx) > 0: + msg = unframed_messages_from_rx.pop(0) + conn.sendall(msg) + logger.debug("Message sent") except ConnectionError as e: logger.error(f"Connection error: {e}") break # Exit loop on connection error From d587936a3b6cd76c82e32d56619b246c1ee0c3c5 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Tue, 18 Mar 2025 23:53:43 -0500 Subject: [PATCH 11/18] Begin serial line parsing method --- radiodriver-proxy/README.md | 1 + radiodriver-proxy/proxy.py | 55 +++++++++++++++++++++++++++++-------- 2 files changed, 44 insertions(+), 12 deletions(-) diff --git a/radiodriver-proxy/README.md b/radiodriver-proxy/README.md index 383b433e..5f3de968 100644 --- a/radiodriver-proxy/README.md +++ b/radiodriver-proxy/README.md @@ -63,6 +63,7 @@ TODOs - [x] Send received messages from radiodriver to other processes via Unix socket - [x] Implement message TX over Unix socket for fake rover - [ ] Handle unframing of messages received from radiodriver +- [ ] Use queue.SimpleQueue instead of arrays as queues :skull: - [ ] Implement message RX over Unix socket for base station - [ ] Implement message TX over Unix socket for base station - [ ] Handle exit signals on proxy diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index a916436d..5dfdea24 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -8,6 +8,7 @@ import json import datetime import random +from typing import NamedTuple logger = logging.getLogger(__name__) @@ -21,11 +22,9 @@ # Queue of framed messages that are ready to be sent to the radio driver via serial. Queue is populated by the message formatter thread, consumed by the serial manager thread. framed_messages_to_tx: list[bytearray] = [] -# Buffer used to store bytes as they are received from the serial port. Populated by the serial manager thread, consumed by the message formatter thread. Once a full message has been received, it is added to the framed_messages_from_rx queue. -serial_read_buffer = b'' - -# Queue of framed messages that have been received from the radio driver via serial. Queue is populated and consumed by the message formatter thread. -framed_messages_from_rx: list[bytes] = [] +# Queue of lines that have been received from the radio driver via serial. Each line should begin with '[PACKET RX]', then the bytes of the framed message, and ending with a number indicating the number of bytes in the message before it was framed. Queue is populated by the serial manager thread and consumed by the message formatter thread. +lines_read_from_driver: list[bytes] = [] +LINE_PREFIX = b'[PACKET RX]' # Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message transport thread and consumed by the message transport thread. unframed_messages_from_rx: list[bytes] = [] @@ -63,7 +62,7 @@ def format_kiss_message(data: bytes, tnc_port: int = 0) -> bytearray: def serial_manager(): logger = logging.getLogger("SerialManager") - global framed_messages_to_tx, serial_read_buffer + global framed_messages_to_tx logger.info("Starting serial manager...") with open(RADIODRIVER_SERIAL_PORT, "wb") as ser: # switching to nonblocking file reads so we can quickly switch over to tx if there is nothing to read - https://stackoverflow.com/a/66410605 @@ -79,9 +78,13 @@ def serial_manager(): ser.write(m) logger.info("Message written to serial") - char = ser.read(1) - if len(char) > 0: - serial_read_buffer += char + logger.info("Reading line") + line = ser.readline() + if len(line) > 0: + logger.debug(f"Line read: {line}") + lines_read_from_driver.append(line) + else: + logger.debug("No line read") def message_formatter(): @@ -96,10 +99,38 @@ def message_formatter(): framed_messages_to_tx.append(formatted_message) logger.info("Message added to queue") - while len(serial_read_buffer) > 0: - pass - # TODO: Handle serial read buffer + + line = lines_read_from_driver.pop(0) + try: + parsed_line = parse_data_and_length_from_line(line) + unframed_messages_from_rx.append(parsed_line.data) + except ValueError as e: + logger.error(f"Error parsing line: {e}") + + +class ParsedLine(NamedTuple): + data: bytes + length: int + +def parse_data_and_length_from_line(line: bytes) -> ParsedLine: + if line.startswith(LINE_PREFIX): + stripped_line = line[len(LINE_PREFIX):] + else: + raise ValueError("Line does not start with expected prefix", LINE_PREFIX, line) + + unframed_data = b'' + while len(stripped_line) > 0: + # TODO: unframing. raise an exception if the data is badly formed. break out out the loop when we reach the end of the data. + pass + + expected_data_length = int(stripped_line) + if len(unframed_data) != expected_data_length: + raise ValueError("Expected data length does not match actual data length", expected_data_length, len(unframed_data)) + + return ParsedLine(data=unframed_data, length=expected_data_length) + + def mock_message_transport(): logger = logging.getLogger("MockMessageTransport") From 0611ecc4759371030d40fea76f0de385204e0b66 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Wed, 19 Mar 2025 01:03:38 -0500 Subject: [PATCH 12/18] Handle empty frames gracefully --- radiodriver-proxy/README.md | 2 - radiodriver-proxy/proxy.py | 84 +++++++++++++++++-------------------- 2 files changed, 39 insertions(+), 47 deletions(-) diff --git a/radiodriver-proxy/README.md b/radiodriver-proxy/README.md index 5f3de968..4342ea86 100644 --- a/radiodriver-proxy/README.md +++ b/radiodriver-proxy/README.md @@ -64,7 +64,5 @@ TODOs - [x] Implement message TX over Unix socket for fake rover - [ ] Handle unframing of messages received from radiodriver - [ ] Use queue.SimpleQueue instead of arrays as queues :skull: -- [ ] Implement message RX over Unix socket for base station -- [ ] Implement message TX over Unix socket for base station - [ ] Handle exit signals on proxy - [ ] Test everything \ No newline at end of file diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index 5dfdea24..00bc06fd 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -8,7 +8,6 @@ import json import datetime import random -from typing import NamedTuple logger = logging.getLogger(__name__) @@ -22,13 +21,15 @@ # Queue of framed messages that are ready to be sent to the radio driver via serial. Queue is populated by the message formatter thread, consumed by the serial manager thread. framed_messages_to_tx: list[bytearray] = [] -# Queue of lines that have been received from the radio driver via serial. Each line should begin with '[PACKET RX]', then the bytes of the framed message, and ending with a number indicating the number of bytes in the message before it was framed. Queue is populated by the serial manager thread and consumed by the message formatter thread. -lines_read_from_driver: list[bytes] = [] -LINE_PREFIX = b'[PACKET RX]' - -# Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message transport thread and consumed by the message transport thread. +# Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message formatter thread and consumed by the message transport thread. unframed_messages_from_rx: list[bytes] = [] +# Buffer used to store bytes as they are received from the serial port. Populated by the serial manager thread, consumed by the message formatter thread. Once a full message has been received, it is added to the framed_messages_from_rx queue. +serial_read_buffer = b'' + +# Lock used to ensure that only one thread is accessing the serial_read_buffer at a time. +serial_read_buffer_lock = threading.Lock() + class KISSChars(enum.Enum): FEND = 0xC0 # Frame End FESC = 0xDB # Frame Escape @@ -62,7 +63,7 @@ def format_kiss_message(data: bytes, tnc_port: int = 0) -> bytearray: def serial_manager(): logger = logging.getLogger("SerialManager") - global framed_messages_to_tx + global framed_messages_to_tx, serial_read_buffer, serial_read_buffer_lock logger.info("Starting serial manager...") with open(RADIODRIVER_SERIAL_PORT, "wb") as ser: # switching to nonblocking file reads so we can quickly switch over to tx if there is nothing to read - https://stackoverflow.com/a/66410605 @@ -78,18 +79,33 @@ def serial_manager(): ser.write(m) logger.info("Message written to serial") - logger.info("Reading line") - line = ser.readline() - if len(line) > 0: - logger.debug(f"Line read: {line}") - lines_read_from_driver.append(line) - else: - logger.debug("No line read") + chars = ser.read() + if len(chars) > 0: + with serial_read_buffer_lock: + serial_read_buffer += chars def message_formatter(): + global serial_read_buffer, serial_read_buffer_lock logger = logging.getLogger("MessageFormatter") logger.info("Starting message formatter...") + + def parse_serial_read_buffer(): + global serial_read_buffer, serial_read_buffer_lock + logger.info("Parsing serial read buffer...") + fend_indexes: list[int] = [] + for char_index, char in enumerate(serial_read_buffer): + match char: + case KISSChars.FEND.value: + fend_indexes.append(char_index) + back_to_back_fends_exist = len(fend_indexes) > 1 and fend_indexes[-1] - fend_indexes[-2] == 1 + if back_to_back_fends_exist: + serial_read_buffer = serial_read_buffer[fend_indexes[-1]:] + logger.debug("Removed empty frame from serial read buffer.") + parse_serial_read_buffer() + # TODO: handle the other chars correctly + + while True: time.sleep(SLEEP_TIME) while len(unframed_messages_to_tx) > 0: @@ -99,38 +115,16 @@ def message_formatter(): framed_messages_to_tx.append(formatted_message) logger.info("Message added to queue") + logger.debug("Taking serial buffer lock") + with serial_read_buffer_lock: + logger.debug("Serial buffer lock taken") + if len(serial_read_buffer) == 0: + logger.debug("No data in serial buffer") + break + parse_serial_read_buffer() + logger.debug("Lock released") + - line = lines_read_from_driver.pop(0) - - try: - parsed_line = parse_data_and_length_from_line(line) - unframed_messages_from_rx.append(parsed_line.data) - except ValueError as e: - logger.error(f"Error parsing line: {e}") - - -class ParsedLine(NamedTuple): - data: bytes - length: int - -def parse_data_and_length_from_line(line: bytes) -> ParsedLine: - if line.startswith(LINE_PREFIX): - stripped_line = line[len(LINE_PREFIX):] - else: - raise ValueError("Line does not start with expected prefix", LINE_PREFIX, line) - - unframed_data = b'' - while len(stripped_line) > 0: - # TODO: unframing. raise an exception if the data is badly formed. break out out the loop when we reach the end of the data. - pass - - expected_data_length = int(stripped_line) - if len(unframed_data) != expected_data_length: - raise ValueError("Expected data length does not match actual data length", expected_data_length, len(unframed_data)) - - return ParsedLine(data=unframed_data, length=expected_data_length) - - def mock_message_transport(): logger = logging.getLogger("MockMessageTransport") From deeb3d6071a3244aea7cff0bee3e7990c1829bd6 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Wed, 19 Mar 2025 12:02:44 -0500 Subject: [PATCH 13/18] Message unframing --- radiodriver-proxy/proxy.py | 78 ++++++++++++++++++++++++++++---------- 1 file changed, 57 insertions(+), 21 deletions(-) diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index 00bc06fd..10d9ae9b 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -11,9 +11,12 @@ logger = logging.getLogger(__name__) +DEV = True RADIODRIVER_SERIAL_PORT = '/dev/tty.debug-console' UNIX_SOCKET_PATH = '/var/run/radiodriver-proxy.sock' -SLEEP_TIME = 0.001 + +# slowing down so I can read the logs ;) +SLEEP_TIME = 1 if DEV else 0.001 # Queue of messages that need to be framed before being sent to the radio driver via serial. Queue is populated by the message transport thread and consumed by the message formatter thread. unframed_messages_to_tx: list[bytes] = [] @@ -22,7 +25,7 @@ framed_messages_to_tx: list[bytearray] = [] # Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message formatter thread and consumed by the message transport thread. -unframed_messages_from_rx: list[bytes] = [] +unframed_messages_from_rx: list[bytearray] = [] # Buffer used to store bytes as they are received from the serial port. Populated by the serial manager thread, consumed by the message formatter thread. Once a full message has been received, it is added to the framed_messages_from_rx queue. serial_read_buffer = b'' @@ -65,7 +68,7 @@ def serial_manager(): logger = logging.getLogger("SerialManager") global framed_messages_to_tx, serial_read_buffer, serial_read_buffer_lock logger.info("Starting serial manager...") - with open(RADIODRIVER_SERIAL_PORT, "wb") as ser: + with open(RADIODRIVER_SERIAL_PORT, "w+b") as ser: # switching to nonblocking file reads so we can quickly switch over to tx if there is nothing to read - https://stackoverflow.com/a/66410605 os.set_blocking(ser.fileno(), False) @@ -80,32 +83,68 @@ def serial_manager(): logger.info("Message written to serial") chars = ser.read() - if len(chars) > 0: + if chars and len(chars) > 0: with serial_read_buffer_lock: serial_read_buffer += chars + +class KISSPacketParserStates(enum.Enum): + EXPECTING_INITIAL_FEND = 0 + EXPECTING_COMMAND = 1 + EXPECTING_DATA_OR_FINAL_FEND = 2 + EXPECTING_ESCAPED_CHAR = 3 + def message_formatter(): - global serial_read_buffer, serial_read_buffer_lock + global serial_read_buffer_lock logger = logging.getLogger("MessageFormatter") logger.info("Starting message formatter...") def parse_serial_read_buffer(): - global serial_read_buffer, serial_read_buffer_lock + global serial_read_buffer + EXPECTED_COMMAND = 0x00 logger.info("Parsing serial read buffer...") - fend_indexes: list[int] = [] + unframed_message = bytearray() + state = KISSPacketParserStates.EXPECTING_INITIAL_FEND + for char_index, char in enumerate(serial_read_buffer): - match char: - case KISSChars.FEND.value: - fend_indexes.append(char_index) - back_to_back_fends_exist = len(fend_indexes) > 1 and fend_indexes[-1] - fend_indexes[-2] == 1 - if back_to_back_fends_exist: - serial_read_buffer = serial_read_buffer[fend_indexes[-1]:] - logger.debug("Removed empty frame from serial read buffer.") - parse_serial_read_buffer() - # TODO: handle the other chars correctly - - + match state: + case KISSPacketParserStates.EXPECTING_INITIAL_FEND: + if char == KISSChars.FEND.value: + state = KISSPacketParserStates.EXPECTING_COMMAND + else: + logger.error("Unexpected character in EXPECTING_INITIAL_FEND state") + serial_read_buffer = serial_read_buffer[char_index+1:] + break + case KISSPacketParserStates.EXPECTING_COMMAND: + if char == EXPECTED_COMMAND: + state = KISSPacketParserStates.EXPECTING_DATA_OR_FINAL_FEND + else: + logger.error("Unexpected character in EXPECTING_COMMAND state") + serial_read_buffer = serial_read_buffer[char_index+1:] + break + case KISSPacketParserStates.EXPECTING_DATA_OR_FINAL_FEND: + match char: + case KISSChars.FESC.value: + state = KISSPacketParserStates.EXPECTING_ESCAPED_CHAR + case KISSChars.FEND.value: + if len(unframed_message) > 0: + logger.debug("Adding message to unframed_messages_from_rx") + unframed_messages_from_rx.append(unframed_message) + else: + logger.debug("Message is empty, ignoring") + serial_read_buffer = serial_read_buffer[char_index+1:] + break + case _: + unframed_message.append(char) + case KISSPacketParserStates.EXPECTING_ESCAPED_CHAR: + match char: + case KISSChars.TFEND.value: + unframed_message.append(KISSChars.FEND.value) + case KISSChars.TFESC.value: + unframed_message.append(KISSChars.FESC.value) + case _: + logger.error("Unexpected character in EXPECTING_ESCAPED_CHAR state") while True: time.sleep(SLEEP_TIME) while len(unframed_messages_to_tx) > 0: @@ -118,9 +157,6 @@ def parse_serial_read_buffer(): logger.debug("Taking serial buffer lock") with serial_read_buffer_lock: logger.debug("Serial buffer lock taken") - if len(serial_read_buffer) == 0: - logger.debug("No data in serial buffer") - break parse_serial_read_buffer() logger.debug("Lock released") From 2bee1cff5aecfbb0b8188db059f3fc28661f18ef Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Wed, 19 Mar 2025 12:41:47 -0500 Subject: [PATCH 14/18] Switch to SimpleQueue for thread safety --- basestation/main.go | 16 ++++----- radiodriver-proxy/README.md | 17 ++++++++-- radiodriver-proxy/proxy.py | 67 +++++++++++++++++++++---------------- 3 files changed, 61 insertions(+), 39 deletions(-) diff --git a/basestation/main.go b/basestation/main.go index adc06f62..a69fbaeb 100644 --- a/basestation/main.go +++ b/basestation/main.go @@ -72,14 +72,14 @@ func main() { videoStreams[depth_video.ID()] = depth_video // Initialize serial connection (add this before starting the HTTP server) - serial, err := serial.NewSerialConnection("/dev/tty.usbmodem3401", 115200) - serialConn = serial - if err != nil { - fmt.Printf("Error opening serial port: %v\n", err) - } else { - defer serialConn.Close() - } - serial.SetReadTimeout(10 * time.Millisecond) // Set a read timeout of 10ms + // serial, err := serial.NewSerialConnection("/dev/tty.debug-console", 115200) + // serialConn = serial + // if err != nil { + // fmt.Printf("Error opening serial port: %v\n", err) + // } else { + // defer serialConn.Close() + // } + // serial.SetReadTimeout(10 * time.Millisecond) // Set a read timeout of 10ms // After initializing serial connection, start the serial handler if serialConn != nil { diff --git a/radiodriver-proxy/README.md b/radiodriver-proxy/README.md index 4342ea86..1e347399 100644 --- a/radiodriver-proxy/README.md +++ b/radiodriver-proxy/README.md @@ -7,6 +7,16 @@ To send a message via radio, other processes simply write an integer representin When the radio driver receives a message from the radio, it sends the message back to this proxy over serial. Then, the proxy will unframe the message, and send it to the receiving process via Unix socket. The receiving process will listen for data on the Unix socket at `/var/run/radiodriver-proxy.sock`, again consisting of an integer, representing the size of the message in bytes, followed by the message data. +## Testing +To test, you'll want to be on a Unix-based system due to the proxy's dependence on Unix sockets, so the steps here will assume you're on a Unix-based system. Windows 11 and some Windows 10 versions [should support Unix sockets](https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/), though I haven't tested this so your mileage may vary. You might need to edit the socket path to point to a location that would exist on a Windows system. + +The proxy requires elevated permissions to run due to the location of the Unix socket, so start the proxy like so: +``` +sudo python proxy.py +``` +then run the base station +then run the fake rover as sudo so it can connect to the socket + ## Design The proxy runs 3 threads which each take care of 1-2 tasks in order to prevent issues with blocking. The threads pass data on to each other using through a number of shared queues. @@ -62,7 +72,8 @@ TODOs - [x] Receive messages from other processes to transmit with radiodriver via Unix socket - [x] Send received messages from radiodriver to other processes via Unix socket - [x] Implement message TX over Unix socket for fake rover -- [ ] Handle unframing of messages received from radiodriver -- [ ] Use queue.SimpleQueue instead of arrays as queues :skull: -- [ ] Handle exit signals on proxy +- [x] Handle unframing of messages received from radiodriver +- [x] Use queue.SimpleQueue instead of arrays as queues :skull: +- [ ] Use bytearray instead of bytes for buffer +- [ ] Handle exit signals on proxy (should close serial connection, delete unix socket) - [ ] Test everything \ No newline at end of file diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index 10d9ae9b..8a3a2c08 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -8,24 +8,25 @@ import json import datetime import random +import queue logger = logging.getLogger(__name__) -DEV = True -RADIODRIVER_SERIAL_PORT = '/dev/tty.debug-console' +DEV = False +RADIODRIVER_SERIAL_PORT = '/dev/tty.Bluetooth-Incoming-Port' UNIX_SOCKET_PATH = '/var/run/radiodriver-proxy.sock' # slowing down so I can read the logs ;) SLEEP_TIME = 1 if DEV else 0.001 # Queue of messages that need to be framed before being sent to the radio driver via serial. Queue is populated by the message transport thread and consumed by the message formatter thread. -unframed_messages_to_tx: list[bytes] = [] +unframed_messages_to_tx: queue.SimpleQueue[bytes] = queue.SimpleQueue() # Queue of framed messages that are ready to be sent to the radio driver via serial. Queue is populated by the message formatter thread, consumed by the serial manager thread. -framed_messages_to_tx: list[bytearray] = [] +framed_messages_to_tx: queue.SimpleQueue[bytearray] = queue.SimpleQueue() # Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message formatter thread and consumed by the message transport thread. -unframed_messages_from_rx: list[bytearray] = [] +unframed_messages_from_rx: queue.SimpleQueue[bytearray] = queue.SimpleQueue() # Buffer used to store bytes as they are received from the serial port. Populated by the serial manager thread, consumed by the message formatter thread. Once a full message has been received, it is added to the framed_messages_from_rx queue. serial_read_buffer = b'' @@ -75,12 +76,14 @@ def serial_manager(): logger.info(f"Serial port opened: {ser.name}") while True: time.sleep(SLEEP_TIME) - - while len(framed_messages_to_tx) > 0: - m = framed_messages_to_tx.pop(0) + + try: + while m := framed_messages_to_tx.get_nowait(): logger.info("Writing message to serial") ser.write(m) logger.info("Message written to serial") + except queue.Empty: + logger.debug("Queue empty") chars = ser.read() if chars and len(chars) > 0: @@ -130,7 +133,7 @@ def parse_serial_read_buffer(): case KISSChars.FEND.value: if len(unframed_message) > 0: logger.debug("Adding message to unframed_messages_from_rx") - unframed_messages_from_rx.append(unframed_message) + unframed_messages_from_rx.put(unframed_message) else: logger.debug("Message is empty, ignoring") serial_read_buffer = serial_read_buffer[char_index+1:] @@ -145,15 +148,20 @@ def parse_serial_read_buffer(): unframed_message.append(KISSChars.FESC.value) case _: logger.error("Unexpected character in EXPECTING_ESCAPED_CHAR state") + while True: time.sleep(SLEEP_TIME) - while len(unframed_messages_to_tx) > 0: - logger.info("Formatting message...") - formatted_message = format_kiss_message(unframed_messages_to_tx.pop(0)) - logger.info("Formatted message") - framed_messages_to_tx.append(formatted_message) - logger.info("Message added to queue") - + + try: + while m := unframed_messages_to_tx.get_nowait(): + logger.info("Formatting message...") + formatted_message = format_kiss_message(m) + logger.info("Formatted message") + framed_messages_to_tx.put(formatted_message) + logger.info("Message added to queue") + except queue.Empty: + logger.debug("Queue empty") + logger.debug("Taking serial buffer lock") with serial_read_buffer_lock: logger.debug("Serial buffer lock taken") @@ -168,12 +176,13 @@ def mock_message_transport(): while True: fake_msg = json.dumps({"random": random.random(), "time": datetime.datetime.now().isoformat()}) logger.info(f"Message from process: {fake_msg}") - unframed_messages_to_tx.append(fake_msg.encode()) - - while len(unframed_messages_from_rx) > 0: - msg = unframed_messages_from_rx.pop(0) - logger.info(f"Message from radio: {msg}") - + unframed_messages_to_tx.put(fake_msg.encode()) + + try: + while msg := unframed_messages_from_rx.get_nowait(): + logger.info(f"Message from radio: {msg}") + except queue.Empty: + logger.debug("Queue empty") time.sleep(SLEEP_TIME) class ListeningFor(enum.IntEnum): @@ -222,14 +231,16 @@ def unix_socket_message_transport(): listening_for = ListeningFor.PAYLOAD elif listening_for == ListeningFor.PAYLOAD: buf = receive_all(conn, expected_payload_size) - unframed_messages_to_tx.append(buf) + unframed_messages_to_tx.put(buf) logger.debug("Received message") listening_for = ListeningFor.PAYLOAD_SIZE - - if len(unframed_messages_from_rx) > 0: - msg = unframed_messages_from_rx.pop(0) - conn.sendall(msg) - logger.debug("Message sent") + + try: + if msg := unframed_messages_from_rx.get_nowait(): + conn.sendall(msg) + logger.debug("Message sent") + except queue.Empty: + logger.debug("Queue empty") except ConnectionError as e: logger.error(f"Connection error: {e}") break # Exit loop on connection error From 2432f92300449f1ea021236c44ba2582150e0eb2 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Wed, 19 Mar 2025 12:56:11 -0500 Subject: [PATCH 15/18] Switch serial buffer to bytearray --- radiodriver-proxy/README.md | 2 +- radiodriver-proxy/proxy.py | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/radiodriver-proxy/README.md b/radiodriver-proxy/README.md index 1e347399..ef7a03ee 100644 --- a/radiodriver-proxy/README.md +++ b/radiodriver-proxy/README.md @@ -74,6 +74,6 @@ TODOs - [x] Implement message TX over Unix socket for fake rover - [x] Handle unframing of messages received from radiodriver - [x] Use queue.SimpleQueue instead of arrays as queues :skull: -- [ ] Use bytearray instead of bytes for buffer +- [x] Use bytearray instead of bytes for buffer - [ ] Handle exit signals on proxy (should close serial connection, delete unix socket) - [ ] Test everything \ No newline at end of file diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index 8a3a2c08..713d1829 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -23,13 +23,13 @@ unframed_messages_to_tx: queue.SimpleQueue[bytes] = queue.SimpleQueue() # Queue of framed messages that are ready to be sent to the radio driver via serial. Queue is populated by the message formatter thread, consumed by the serial manager thread. -framed_messages_to_tx: queue.SimpleQueue[bytearray] = queue.SimpleQueue() +framed_messages_to_tx: queue.SimpleQueue[bytes] = queue.SimpleQueue() # Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message formatter thread and consumed by the message transport thread. unframed_messages_from_rx: queue.SimpleQueue[bytearray] = queue.SimpleQueue() # Buffer used to store bytes as they are received from the serial port. Populated by the serial manager thread, consumed by the message formatter thread. Once a full message has been received, it is added to the framed_messages_from_rx queue. -serial_read_buffer = b'' +serial_read_buffer = bytearray() # Lock used to ensure that only one thread is accessing the serial_read_buffer at a time. serial_read_buffer_lock = threading.Lock() @@ -88,7 +88,7 @@ def serial_manager(): chars = ser.read() if chars and len(chars) > 0: with serial_read_buffer_lock: - serial_read_buffer += chars + serial_read_buffer.extend(chars) class KISSPacketParserStates(enum.Enum): From 97df96693712f3e6ce71ca9e6a0500b6483af975 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Wed, 19 Mar 2025 13:30:43 -0500 Subject: [PATCH 16/18] Fix type --- radiodriver-proxy/proxy.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py index 713d1829..829f9435 100644 --- a/radiodriver-proxy/proxy.py +++ b/radiodriver-proxy/proxy.py @@ -23,7 +23,7 @@ unframed_messages_to_tx: queue.SimpleQueue[bytes] = queue.SimpleQueue() # Queue of framed messages that are ready to be sent to the radio driver via serial. Queue is populated by the message formatter thread, consumed by the serial manager thread. -framed_messages_to_tx: queue.SimpleQueue[bytes] = queue.SimpleQueue() +framed_messages_to_tx: queue.SimpleQueue[bytearray] = queue.SimpleQueue() # Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message formatter thread and consumed by the message transport thread. unframed_messages_from_rx: queue.SimpleQueue[bytearray] = queue.SimpleQueue() From f264593c7acf83600e9aee95826617470a9b9b2b Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Wed, 19 Mar 2025 13:44:00 -0500 Subject: [PATCH 17/18] Remove TODOs, update sequence diagram --- radiodriver-proxy/README.md | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/radiodriver-proxy/README.md b/radiodriver-proxy/README.md index ef7a03ee..2a8b077b 100644 --- a/radiodriver-proxy/README.md +++ b/radiodriver-proxy/README.md @@ -50,7 +50,7 @@ sequenceDiagram BRadio ->> RRadio: Transmits `\xc0\x00Hi\xc0` via
magic airwave stuff - RDriver ->> RRadio: Receives `\xc0\x00Hi\xc0` over SPI/UART (again idk which lol) + RDriver ->> RRadio: Receives `\xc0\x00Hi\xc0` over
SPI/UART (again idk which lol) RSerial ->> RDriver: Receives `\xc0\x00Hi\xc0` over serial RMsg ->> RSerial: Pulls `\xc0\x00Hi\xc0` off shared queue of
messages that need to be unframed RSocket ->> RMsg: Pulls `Hi` off shared queue of
messages that have been unframed @@ -66,14 +66,3 @@ sequenceDiagram end participant Rover ``` - ---- -TODOs -- [x] Receive messages from other processes to transmit with radiodriver via Unix socket -- [x] Send received messages from radiodriver to other processes via Unix socket -- [x] Implement message TX over Unix socket for fake rover -- [x] Handle unframing of messages received from radiodriver -- [x] Use queue.SimpleQueue instead of arrays as queues :skull: -- [x] Use bytearray instead of bytes for buffer -- [ ] Handle exit signals on proxy (should close serial connection, delete unix socket) -- [ ] Test everything \ No newline at end of file From e6deac7f5e5d4a672ec56be3c3bac4504963ca98 Mon Sep 17 00:00:00 2001 From: Jason Antwi-Appah Date: Wed, 19 Mar 2025 13:49:01 -0500 Subject: [PATCH 18/18] Revert unintended base station change --- basestation/main.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/basestation/main.go b/basestation/main.go index a69fbaeb..adc06f62 100644 --- a/basestation/main.go +++ b/basestation/main.go @@ -72,14 +72,14 @@ func main() { videoStreams[depth_video.ID()] = depth_video // Initialize serial connection (add this before starting the HTTP server) - // serial, err := serial.NewSerialConnection("/dev/tty.debug-console", 115200) - // serialConn = serial - // if err != nil { - // fmt.Printf("Error opening serial port: %v\n", err) - // } else { - // defer serialConn.Close() - // } - // serial.SetReadTimeout(10 * time.Millisecond) // Set a read timeout of 10ms + serial, err := serial.NewSerialConnection("/dev/tty.usbmodem3401", 115200) + serialConn = serial + if err != nil { + fmt.Printf("Error opening serial port: %v\n", err) + } else { + defer serialConn.Close() + } + serial.SetReadTimeout(10 * time.Millisecond) // Set a read timeout of 10ms // After initializing serial connection, start the serial handler if serialConn != nil {