diff --git a/basestation/fake-rover/rover.py b/basestation/fake-rover/rover.py index 3b552a2a..978ffb60 100644 --- a/basestation/fake-rover/rover.py +++ b/basestation/fake-rover/rover.py @@ -7,6 +7,7 @@ import random import struct import os +from enum import IntEnum current_dir = os.path.dirname(os.path.abspath(__file__)) @@ -21,6 +22,12 @@ from google.protobuf.message import Message import msgs_pb2 as rover +class BaseStationConnectionMethod(IntEnum): + TCP = 0 + HAM_LINK = 1 + +CONNECT_MODE = BaseStationConnectionMethod.HAM_LINK + # Constants RETRY_DELAY = 1 # seconds STARTING_LAT = 41.745161 @@ -199,8 +206,12 @@ def signal_handler(sig, frame): # Connect to TCP server with retry logic while running: try: - stream = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - stream.connect(("localhost", 8000)) + if CONNECT_MODE == BaseStationConnectionMethod.TCP: + stream = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + stream.connect(("localhost", 8000)) + elif CONNECT_MODE == BaseStationConnectionMethod.HAM_LINK: + stream = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + stream.connect("/var/run/radiodriver-proxy.sock") stream.setblocking(False) break # Connection successful, exit the loop except socket.error as e: diff --git a/basestation/rover-msgs/python/__pycache__/msgs_pb2.cpython-313.pyc b/basestation/rover-msgs/python/__pycache__/msgs_pb2.cpython-313.pyc deleted file mode 100644 index e2454332..00000000 Binary files a/basestation/rover-msgs/python/__pycache__/msgs_pb2.cpython-313.pyc and /dev/null differ diff --git a/radiodriver-proxy/.gitignore b/radiodriver-proxy/.gitignore new file mode 100644 index 00000000..408d9c5d --- /dev/null +++ b/radiodriver-proxy/.gitignore @@ -0,0 +1,2 @@ +.venv +radiodriver-proxy.log \ No newline at end of file diff --git a/radiodriver-proxy/README.md b/radiodriver-proxy/README.md new file mode 100644 index 00000000..2a8b077b --- /dev/null +++ b/radiodriver-proxy/README.md @@ -0,0 +1,68 @@ +# radiodriver-proxy + +## Overview +This service is for the the base station and ROS nodes to handle logic for sending and receiving messages with the [radio driver](http://github.com/comet-robotics/urc-radiodriver), as an alternative/backup option for the Wi-Fi TCP uplink. There is additional logic needed to \[un-\]wrap messages before they are sent/received to the radio driver which this process handles, removing the need to implement serial support and KISS framing support in both the base station and ROS nodes. Both the base station and rover will have local instances of this service running. + +To send a message via radio, other processes simply write an integer representing the size of the upcoming data in bytes, followed by the message data, to the Unix socket at `/var/run/radiodriver-proxy.sock`. The proxy will receive the message, frame it, and send it to the radio driver via serial for transmission. + +When the radio driver receives a message from the radio, it sends the message back to this proxy over serial. Then, the proxy will unframe the message, and send it to the receiving process via Unix socket. The receiving process will listen for data on the Unix socket at `/var/run/radiodriver-proxy.sock`, again consisting of an integer, representing the size of the message in bytes, followed by the message data. + +## Testing +To test, you'll want to be on a Unix-based system due to the proxy's dependence on Unix sockets, so the steps here will assume you're on a Unix-based system. Windows 11 and some Windows 10 versions [should support Unix sockets](https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/), though I haven't tested this so your mileage may vary. You might need to edit the socket path to point to a location that would exist on a Windows system. + +The proxy requires elevated permissions to run due to the location of the Unix socket, so start the proxy like so: +``` +sudo python proxy.py +``` +then run the base station +then run the fake rover as sudo so it can connect to the socket + +## Design +The proxy runs 3 threads which each take care of 1-2 tasks in order to prevent issues with blocking. The threads pass data on to each other using through a number of shared queues. + +The message transport thread handles sending and receiving messages (bytes) over the Unix socket used to communicate with upstream applications like the base station and ROS nodes. The serial manager thread handles sending and receiving bytes over the serial port used to communicate with the radio driver. The message formatter thread handles framing messages before they are passed to the serial manager thread, and unframing messages when they are received by the serial manager thread. + +This diagram shows the flow of a 2-byte message, 'Hi', being sent from the base station to the rover via ham link. + + + + +```mermaid +--- +title: Base Station sending "Hi", a 2-byte message to the rover +--- + +sequenceDiagram + autonumber + participant Base as Base Station server + box Green radiodriver-proxy running on Base Station + participant BSocket as Unix Socket Message Transport thread + participant BMsg as Message Formatter thread + participant BSerial as Serial Manager thread + end + participant BDriver as Base Station radio driver + participant BRadio as Base Station transceiver + Base ->> BSocket: Sends `2Hi` via Unix Socket + BSocket ->> BMsg: Pushes `Hi` into shared queue of
messages that need to be framed + BMsg ->> BSerial: Pushes `\xc0\x00Hi\xc0` into shared queue of
messages that have been framed + BSerial ->> BDriver: Sends `\xc0\x00Hi\xc0`
over serial + BDriver ->> BRadio: Sends `\xc0\x00Hi\xc0` to radio over SPI/UART
(idk which lol) + + BRadio ->> RRadio: Transmits `\xc0\x00Hi\xc0` via
magic airwave stuff + + RDriver ->> RRadio: Receives `\xc0\x00Hi\xc0` over
SPI/UART (again idk which lol) + RSerial ->> RDriver: Receives `\xc0\x00Hi\xc0` over serial + RMsg ->> RSerial: Pulls `\xc0\x00Hi\xc0` off shared queue of
messages that need to be unframed + RSocket ->> RMsg: Pulls `Hi` off shared queue of
messages that have been unframed + Rover ->> RSocket: Receives `Hi` via Unix Socket + + + participant RRadio as Rover transceiver + participant RDriver as Rover radio driver + box Green radiodriver-proxy running on Rover + participant RSerial as Serial Manager thread + participant RMsg as Message Formatter thread + participant RSocket as Unix Socket Message Transport thread + end + participant Rover +``` diff --git a/radiodriver-proxy/proxy.py b/radiodriver-proxy/proxy.py new file mode 100644 index 00000000..829f9435 --- /dev/null +++ b/radiodriver-proxy/proxy.py @@ -0,0 +1,268 @@ +import threading +import enum +import logging +import time +import socket +import os +import struct +import json +import datetime +import random +import queue + +logger = logging.getLogger(__name__) + +DEV = False +RADIODRIVER_SERIAL_PORT = '/dev/tty.Bluetooth-Incoming-Port' +UNIX_SOCKET_PATH = '/var/run/radiodriver-proxy.sock' + +# slowing down so I can read the logs ;) +SLEEP_TIME = 1 if DEV else 0.001 + +# Queue of messages that need to be framed before being sent to the radio driver via serial. Queue is populated by the message transport thread and consumed by the message formatter thread. +unframed_messages_to_tx: queue.SimpleQueue[bytes] = queue.SimpleQueue() + +# Queue of framed messages that are ready to be sent to the radio driver via serial. Queue is populated by the message formatter thread, consumed by the serial manager thread. +framed_messages_to_tx: queue.SimpleQueue[bytearray] = queue.SimpleQueue() + +# Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message formatter thread and consumed by the message transport thread. +unframed_messages_from_rx: queue.SimpleQueue[bytearray] = queue.SimpleQueue() + +# Buffer used to store bytes as they are received from the serial port. Populated by the serial manager thread, consumed by the message formatter thread. Once a full message has been received, it is added to the framed_messages_from_rx queue. +serial_read_buffer = bytearray() + +# Lock used to ensure that only one thread is accessing the serial_read_buffer at a time. +serial_read_buffer_lock = threading.Lock() + +class KISSChars(enum.Enum): + FEND = 0xC0 # Frame End + FESC = 0xDB # Frame Escape + TFEND = 0xDC # Transposed Frame End + TFESC = 0xDD # Transposed Frame Escape + + +def format_kiss_message(data: bytes, tnc_port: int = 0) -> bytearray: + tnc_port_valid = tnc_port >= 0 and tnc_port <= 9 + if not tnc_port_valid: + raise ValueError("TNC Port must be between 0 and 9") + + packet = bytearray() + packet.append(KISSChars.FEND.value) + + dataframe = tnc_port << 4 + packet.append(dataframe) + + for char in data: + if char == KISSChars.FEND.value: + packet.append(KISSChars.FESC.value) + packet.append(KISSChars.TFEND.value) + elif char == KISSChars.FESC.value: + packet.append(KISSChars.FESC.value) + packet.append(KISSChars.TFESC.value) + else: + packet.append(char) + + packet.append(KISSChars.FEND.value) + return packet + +def serial_manager(): + logger = logging.getLogger("SerialManager") + global framed_messages_to_tx, serial_read_buffer, serial_read_buffer_lock + logger.info("Starting serial manager...") + with open(RADIODRIVER_SERIAL_PORT, "w+b") as ser: + # switching to nonblocking file reads so we can quickly switch over to tx if there is nothing to read - https://stackoverflow.com/a/66410605 + os.set_blocking(ser.fileno(), False) + + logger.info(f"Serial port opened: {ser.name}") + while True: + time.sleep(SLEEP_TIME) + + try: + while m := framed_messages_to_tx.get_nowait(): + logger.info("Writing message to serial") + ser.write(m) + logger.info("Message written to serial") + except queue.Empty: + logger.debug("Queue empty") + + chars = ser.read() + if chars and len(chars) > 0: + with serial_read_buffer_lock: + serial_read_buffer.extend(chars) + + +class KISSPacketParserStates(enum.Enum): + EXPECTING_INITIAL_FEND = 0 + EXPECTING_COMMAND = 1 + EXPECTING_DATA_OR_FINAL_FEND = 2 + EXPECTING_ESCAPED_CHAR = 3 + + +def message_formatter(): + global serial_read_buffer_lock + logger = logging.getLogger("MessageFormatter") + logger.info("Starting message formatter...") + + def parse_serial_read_buffer(): + global serial_read_buffer + EXPECTED_COMMAND = 0x00 + logger.info("Parsing serial read buffer...") + unframed_message = bytearray() + state = KISSPacketParserStates.EXPECTING_INITIAL_FEND + + for char_index, char in enumerate(serial_read_buffer): + match state: + case KISSPacketParserStates.EXPECTING_INITIAL_FEND: + if char == KISSChars.FEND.value: + state = KISSPacketParserStates.EXPECTING_COMMAND + else: + logger.error("Unexpected character in EXPECTING_INITIAL_FEND state") + serial_read_buffer = serial_read_buffer[char_index+1:] + break + case KISSPacketParserStates.EXPECTING_COMMAND: + if char == EXPECTED_COMMAND: + state = KISSPacketParserStates.EXPECTING_DATA_OR_FINAL_FEND + else: + logger.error("Unexpected character in EXPECTING_COMMAND state") + serial_read_buffer = serial_read_buffer[char_index+1:] + break + case KISSPacketParserStates.EXPECTING_DATA_OR_FINAL_FEND: + match char: + case KISSChars.FESC.value: + state = KISSPacketParserStates.EXPECTING_ESCAPED_CHAR + case KISSChars.FEND.value: + if len(unframed_message) > 0: + logger.debug("Adding message to unframed_messages_from_rx") + unframed_messages_from_rx.put(unframed_message) + else: + logger.debug("Message is empty, ignoring") + serial_read_buffer = serial_read_buffer[char_index+1:] + break + case _: + unframed_message.append(char) + case KISSPacketParserStates.EXPECTING_ESCAPED_CHAR: + match char: + case KISSChars.TFEND.value: + unframed_message.append(KISSChars.FEND.value) + case KISSChars.TFESC.value: + unframed_message.append(KISSChars.FESC.value) + case _: + logger.error("Unexpected character in EXPECTING_ESCAPED_CHAR state") + + while True: + time.sleep(SLEEP_TIME) + + try: + while m := unframed_messages_to_tx.get_nowait(): + logger.info("Formatting message...") + formatted_message = format_kiss_message(m) + logger.info("Formatted message") + framed_messages_to_tx.put(formatted_message) + logger.info("Message added to queue") + except queue.Empty: + logger.debug("Queue empty") + + logger.debug("Taking serial buffer lock") + with serial_read_buffer_lock: + logger.debug("Serial buffer lock taken") + parse_serial_read_buffer() + logger.debug("Lock released") + + + +def mock_message_transport(): + logger = logging.getLogger("MockMessageTransport") + logger.info("Starting message transport...") + while True: + fake_msg = json.dumps({"random": random.random(), "time": datetime.datetime.now().isoformat()}) + logger.info(f"Message from process: {fake_msg}") + unframed_messages_to_tx.put(fake_msg.encode()) + + try: + while msg := unframed_messages_from_rx.get_nowait(): + logger.info(f"Message from radio: {msg}") + except queue.Empty: + logger.debug("Queue empty") + time.sleep(SLEEP_TIME) + +class ListeningFor(enum.IntEnum): + PAYLOAD_SIZE = 0 + PAYLOAD = 1 + +DEFAULT_CHUNK_SIZE = 1024 +def receive_all(conn: socket.socket, size: int) -> bytes: + """ + Receive an exact number of bytes from a socket connection. + Implements chunked reading with a reasonable buffer size. + """ + chunks = [] + bytes_recd = 0 + while bytes_recd < size: + remaining_bytes = size - bytes_recd + + chunk = conn.recv(min(remaining_bytes, DEFAULT_CHUNK_SIZE)) + if not chunk: # Connection was closed + raise ConnectionError("Socket connection closed while receiving data") + chunks.append(chunk) + bytes_recd += len(chunk) + return b''.join(chunks) + +def unix_socket_message_transport(): + logger = logging.getLogger("UnixSocketMessageTransport") + logger.info("Starting Unix socket message transport...") + os.unlink(UNIX_SOCKET_PATH) + sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + sock.bind(UNIX_SOCKET_PATH) + sock.listen(1) + logger.info(f"Listening on {UNIX_SOCKET_PATH}") + + listening_for = ListeningFor.PAYLOAD_SIZE + expected_payload_size = -1 + + conn, addr = sock.accept() + while True: + logger.debug(f"Listening for next {listening_for.name}...") + try: + if listening_for == ListeningFor.PAYLOAD_SIZE: + buf = receive_all(conn, 4) # Always read exactly 4 bytes for size + size = struct.unpack('!I', buf)[0] + logger.info(f"Expected payload size: {size}") + expected_payload_size = size + listening_for = ListeningFor.PAYLOAD + elif listening_for == ListeningFor.PAYLOAD: + buf = receive_all(conn, expected_payload_size) + unframed_messages_to_tx.put(buf) + logger.debug("Received message") + listening_for = ListeningFor.PAYLOAD_SIZE + + try: + if msg := unframed_messages_from_rx.get_nowait(): + conn.sendall(msg) + logger.debug("Message sent") + except queue.Empty: + logger.debug("Queue empty") + except ConnectionError as e: + logger.error(f"Connection error: {e}") + break # Exit loop on connection error + except Exception as e: + logger.error(f"Error receiving data: {e}") + listening_for = ListeningFor.PAYLOAD_SIZE # Reset state on error + continue + + time.sleep(SLEEP_TIME) + +def main(): + logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] %(asctime)s | %(name)s: %(message)s',) + logger.info("--- STARTING RADIODRIVER PROXY ---") + + message_transport_thread = threading.Thread(target=unix_socket_message_transport) + message_transport_thread.start() + + serial_manager_thread = threading.Thread(target=serial_manager) + serial_manager_thread.start() + + message_formatter_thread = threading.Thread(target=message_formatter) + message_formatter_thread.start() + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/ws_urc/launch/__pycache__/rover.cpython-310.pyc b/ws_urc/launch/__pycache__/rover.cpython-310.pyc deleted file mode 100644 index 1950e25d..00000000 Binary files a/ws_urc/launch/__pycache__/rover.cpython-310.pyc and /dev/null differ diff --git a/ws_urc/launch/__pycache__/rover.cpython-311.pyc b/ws_urc/launch/__pycache__/rover.cpython-311.pyc deleted file mode 100644 index 95305667..00000000 Binary files a/ws_urc/launch/__pycache__/rover.cpython-311.pyc and /dev/null differ