Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions basestation/fake-rover/rover.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import random
import struct
import os
from enum import IntEnum

current_dir = os.path.dirname(os.path.abspath(__file__))

Expand All @@ -21,6 +22,12 @@
from google.protobuf.message import Message
import msgs_pb2 as rover

class BaseStationConnectionMethod(IntEnum):
TCP = 0
HAM_LINK = 1

CONNECT_MODE = BaseStationConnectionMethod.HAM_LINK

# Constants
RETRY_DELAY = 1 # seconds
STARTING_LAT = 41.745161
Expand Down Expand Up @@ -199,8 +206,12 @@ def signal_handler(sig, frame):
# Connect to TCP server with retry logic
while running:
try:
stream = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
stream.connect(("localhost", 8000))
if CONNECT_MODE == BaseStationConnectionMethod.TCP:
stream = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
stream.connect(("localhost", 8000))
elif CONNECT_MODE == BaseStationConnectionMethod.HAM_LINK:
stream = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
stream.connect("/var/run/radiodriver-proxy.sock")
stream.setblocking(False)
break # Connection successful, exit the loop
except socket.error as e:
Expand Down
Binary file not shown.
2 changes: 2 additions & 0 deletions radiodriver-proxy/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
.venv
radiodriver-proxy.log
68 changes: 68 additions & 0 deletions radiodriver-proxy/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# radiodriver-proxy

## Overview
This service is for the the base station and ROS nodes to handle logic for sending and receiving messages with the [radio driver](http://github.com/comet-robotics/urc-radiodriver), as an alternative/backup option for the Wi-Fi TCP uplink. There is additional logic needed to \[un-\]wrap messages before they are sent/received to the radio driver which this process handles, removing the need to implement serial support and KISS framing support in both the base station and ROS nodes. Both the base station and rover will have local instances of this service running.

To send a message via radio, other processes simply write an integer representing the size of the upcoming data in bytes, followed by the message data, to the Unix socket at `/var/run/radiodriver-proxy.sock`. The proxy will receive the message, frame it, and send it to the radio driver via serial for transmission.

When the radio driver receives a message from the radio, it sends the message back to this proxy over serial. Then, the proxy will unframe the message, and send it to the receiving process via Unix socket. The receiving process will listen for data on the Unix socket at `/var/run/radiodriver-proxy.sock`, again consisting of an integer, representing the size of the message in bytes, followed by the message data.

## Testing
To test, you'll want to be on a Unix-based system due to the proxy's dependence on Unix sockets, so the steps here will assume you're on a Unix-based system. Windows 11 and some Windows 10 versions [should support Unix sockets](https://devblogs.microsoft.com/commandline/af_unix-comes-to-windows/), though I haven't tested this so your mileage may vary. You might need to edit the socket path to point to a location that would exist on a Windows system.

The proxy requires elevated permissions to run due to the location of the Unix socket, so start the proxy like so:
```
sudo python proxy.py
```
then run the base station
then run the fake rover as sudo so it can connect to the socket

## Design
The proxy runs 3 threads which each take care of 1-2 tasks in order to prevent issues with blocking. The threads pass data on to each other using through a number of shared queues.

The message transport thread handles sending and receiving messages (bytes) over the Unix socket used to communicate with upstream applications like the base station and ROS nodes. The serial manager thread handles sending and receiving bytes over the serial port used to communicate with the radio driver. The message formatter thread handles framing messages before they are passed to the serial manager thread, and unframing messages when they are received by the serial manager thread.

This diagram shows the flow of a 2-byte message, 'Hi', being sent from the base station to the rover via ham link.

<!-- NOTE: This is a diagram written using Mermaid syntax. A rendered version of this graph should be displayed when previewing this Markdown file on GitHub. -->
<!-- TODO: I could probably separate this into 3 separate sequence diagrams: one that goes base -> base proxy -> rover proxy -> rover, one that goes into the steps on the base proxy to TX, and one that goes into the the steps on the rover to RX, making 3 smaller/less complex diagrams as opposed to one giant one that is annoying to look at -->

```mermaid
---
title: Base Station sending "Hi", a 2-byte message to the rover
---

sequenceDiagram
autonumber
participant Base as Base Station server
box Green radiodriver-proxy running on Base Station
participant BSocket as Unix Socket Message Transport thread
participant BMsg as Message Formatter thread
participant BSerial as Serial Manager thread
end
participant BDriver as Base Station radio driver
participant BRadio as Base Station transceiver
Base ->> BSocket: Sends `2Hi` via Unix Socket
BSocket ->> BMsg: Pushes `Hi` into shared queue of<br/> messages that need to be framed
BMsg ->> BSerial: Pushes `\xc0\x00Hi\xc0` into shared queue of<br/> messages that have been framed
BSerial ->> BDriver: Sends `\xc0\x00Hi\xc0` <br/> over serial
BDriver ->> BRadio: Sends `\xc0\x00Hi\xc0` to radio over SPI/UART<br/>(idk which lol)

BRadio ->> RRadio: Transmits `\xc0\x00Hi\xc0` via <br/> magic airwave stuff

RDriver ->> RRadio: Receives `\xc0\x00Hi\xc0` over<br/>SPI/UART (again idk which lol)
RSerial ->> RDriver: Receives `\xc0\x00Hi\xc0` over serial
RMsg ->> RSerial: Pulls `\xc0\x00Hi\xc0` off shared queue of<br/> messages that need to be unframed
RSocket ->> RMsg: Pulls `Hi` off shared queue of <br/> messages that have been unframed
Rover ->> RSocket: Receives `Hi` via Unix Socket


participant RRadio as Rover transceiver
participant RDriver as Rover radio driver
box Green radiodriver-proxy running on Rover
participant RSerial as Serial Manager thread
participant RMsg as Message Formatter thread
participant RSocket as Unix Socket Message Transport thread
end
participant Rover
```
268 changes: 268 additions & 0 deletions radiodriver-proxy/proxy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,268 @@
import threading
import enum
import logging
import time
import socket
import os
import struct
import json
import datetime
import random
import queue

logger = logging.getLogger(__name__)

DEV = False
RADIODRIVER_SERIAL_PORT = '/dev/tty.Bluetooth-Incoming-Port'
UNIX_SOCKET_PATH = '/var/run/radiodriver-proxy.sock'

# slowing down so I can read the logs ;)
SLEEP_TIME = 1 if DEV else 0.001

# Queue of messages that need to be framed before being sent to the radio driver via serial. Queue is populated by the message transport thread and consumed by the message formatter thread.
unframed_messages_to_tx: queue.SimpleQueue[bytes] = queue.SimpleQueue()

# Queue of framed messages that are ready to be sent to the radio driver via serial. Queue is populated by the message formatter thread, consumed by the serial manager thread.
framed_messages_to_tx: queue.SimpleQueue[bytearray] = queue.SimpleQueue()

# Queue of messages that have been unframed by the message formatter thread. Queue is populated by the message formatter thread and consumed by the message transport thread.
unframed_messages_from_rx: queue.SimpleQueue[bytearray] = queue.SimpleQueue()

# Buffer used to store bytes as they are received from the serial port. Populated by the serial manager thread, consumed by the message formatter thread. Once a full message has been received, it is added to the framed_messages_from_rx queue.
serial_read_buffer = bytearray()

# Lock used to ensure that only one thread is accessing the serial_read_buffer at a time.
serial_read_buffer_lock = threading.Lock()

class KISSChars(enum.Enum):
FEND = 0xC0 # Frame End
FESC = 0xDB # Frame Escape
TFEND = 0xDC # Transposed Frame End
TFESC = 0xDD # Transposed Frame Escape


def format_kiss_message(data: bytes, tnc_port: int = 0) -> bytearray:
tnc_port_valid = tnc_port >= 0 and tnc_port <= 9
if not tnc_port_valid:
raise ValueError("TNC Port must be between 0 and 9")

packet = bytearray()
packet.append(KISSChars.FEND.value)

dataframe = tnc_port << 4
packet.append(dataframe)

for char in data:
if char == KISSChars.FEND.value:
packet.append(KISSChars.FESC.value)
packet.append(KISSChars.TFEND.value)
elif char == KISSChars.FESC.value:
packet.append(KISSChars.FESC.value)
packet.append(KISSChars.TFESC.value)
else:
packet.append(char)

packet.append(KISSChars.FEND.value)
return packet

def serial_manager():
logger = logging.getLogger("SerialManager")
global framed_messages_to_tx, serial_read_buffer, serial_read_buffer_lock
logger.info("Starting serial manager...")
with open(RADIODRIVER_SERIAL_PORT, "w+b") as ser:
# switching to nonblocking file reads so we can quickly switch over to tx if there is nothing to read - https://stackoverflow.com/a/66410605
os.set_blocking(ser.fileno(), False)

logger.info(f"Serial port opened: {ser.name}")
while True:
time.sleep(SLEEP_TIME)

try:
while m := framed_messages_to_tx.get_nowait():
logger.info("Writing message to serial")
ser.write(m)
logger.info("Message written to serial")
except queue.Empty:
logger.debug("Queue empty")

chars = ser.read()
if chars and len(chars) > 0:
with serial_read_buffer_lock:
serial_read_buffer.extend(chars)


class KISSPacketParserStates(enum.Enum):
EXPECTING_INITIAL_FEND = 0
EXPECTING_COMMAND = 1
EXPECTING_DATA_OR_FINAL_FEND = 2
EXPECTING_ESCAPED_CHAR = 3


def message_formatter():
global serial_read_buffer_lock
logger = logging.getLogger("MessageFormatter")
logger.info("Starting message formatter...")

def parse_serial_read_buffer():
global serial_read_buffer
EXPECTED_COMMAND = 0x00
logger.info("Parsing serial read buffer...")
unframed_message = bytearray()
state = KISSPacketParserStates.EXPECTING_INITIAL_FEND

for char_index, char in enumerate(serial_read_buffer):
match state:
case KISSPacketParserStates.EXPECTING_INITIAL_FEND:
if char == KISSChars.FEND.value:
state = KISSPacketParserStates.EXPECTING_COMMAND
else:
logger.error("Unexpected character in EXPECTING_INITIAL_FEND state")
serial_read_buffer = serial_read_buffer[char_index+1:]
break
case KISSPacketParserStates.EXPECTING_COMMAND:
if char == EXPECTED_COMMAND:
state = KISSPacketParserStates.EXPECTING_DATA_OR_FINAL_FEND
else:
logger.error("Unexpected character in EXPECTING_COMMAND state")
serial_read_buffer = serial_read_buffer[char_index+1:]
break
case KISSPacketParserStates.EXPECTING_DATA_OR_FINAL_FEND:
match char:
case KISSChars.FESC.value:
state = KISSPacketParserStates.EXPECTING_ESCAPED_CHAR
case KISSChars.FEND.value:
if len(unframed_message) > 0:
logger.debug("Adding message to unframed_messages_from_rx")
unframed_messages_from_rx.put(unframed_message)
else:
logger.debug("Message is empty, ignoring")
serial_read_buffer = serial_read_buffer[char_index+1:]
break
case _:
unframed_message.append(char)
case KISSPacketParserStates.EXPECTING_ESCAPED_CHAR:
match char:
case KISSChars.TFEND.value:
unframed_message.append(KISSChars.FEND.value)
case KISSChars.TFESC.value:
unframed_message.append(KISSChars.FESC.value)
case _:
logger.error("Unexpected character in EXPECTING_ESCAPED_CHAR state")

while True:
time.sleep(SLEEP_TIME)

try:
while m := unframed_messages_to_tx.get_nowait():
logger.info("Formatting message...")
formatted_message = format_kiss_message(m)
logger.info("Formatted message")
framed_messages_to_tx.put(formatted_message)
logger.info("Message added to queue")
except queue.Empty:
logger.debug("Queue empty")

logger.debug("Taking serial buffer lock")
with serial_read_buffer_lock:
logger.debug("Serial buffer lock taken")
parse_serial_read_buffer()
logger.debug("Lock released")



def mock_message_transport():
logger = logging.getLogger("MockMessageTransport")
logger.info("Starting message transport...")
while True:
fake_msg = json.dumps({"random": random.random(), "time": datetime.datetime.now().isoformat()})
logger.info(f"Message from process: {fake_msg}")
unframed_messages_to_tx.put(fake_msg.encode())

try:
while msg := unframed_messages_from_rx.get_nowait():
logger.info(f"Message from radio: {msg}")
except queue.Empty:
logger.debug("Queue empty")
time.sleep(SLEEP_TIME)

class ListeningFor(enum.IntEnum):
PAYLOAD_SIZE = 0
PAYLOAD = 1

DEFAULT_CHUNK_SIZE = 1024
def receive_all(conn: socket.socket, size: int) -> bytes:
"""
Receive an exact number of bytes from a socket connection.
Implements chunked reading with a reasonable buffer size.
"""
chunks = []
bytes_recd = 0
while bytes_recd < size:
remaining_bytes = size - bytes_recd

chunk = conn.recv(min(remaining_bytes, DEFAULT_CHUNK_SIZE))
if not chunk: # Connection was closed
raise ConnectionError("Socket connection closed while receiving data")
chunks.append(chunk)
bytes_recd += len(chunk)
return b''.join(chunks)

def unix_socket_message_transport():
logger = logging.getLogger("UnixSocketMessageTransport")
logger.info("Starting Unix socket message transport...")
os.unlink(UNIX_SOCKET_PATH)
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
sock.bind(UNIX_SOCKET_PATH)
sock.listen(1)
logger.info(f"Listening on {UNIX_SOCKET_PATH}")

listening_for = ListeningFor.PAYLOAD_SIZE
expected_payload_size = -1

conn, addr = sock.accept()
while True:
logger.debug(f"Listening for next {listening_for.name}...")
try:
if listening_for == ListeningFor.PAYLOAD_SIZE:
buf = receive_all(conn, 4) # Always read exactly 4 bytes for size
size = struct.unpack('!I', buf)[0]
logger.info(f"Expected payload size: {size}")
expected_payload_size = size
listening_for = ListeningFor.PAYLOAD
elif listening_for == ListeningFor.PAYLOAD:
buf = receive_all(conn, expected_payload_size)
unframed_messages_to_tx.put(buf)
logger.debug("Received message")
listening_for = ListeningFor.PAYLOAD_SIZE

try:
if msg := unframed_messages_from_rx.get_nowait():
conn.sendall(msg)
logger.debug("Message sent")
except queue.Empty:
logger.debug("Queue empty")
except ConnectionError as e:
logger.error(f"Connection error: {e}")
break # Exit loop on connection error
except Exception as e:
logger.error(f"Error receiving data: {e}")
listening_for = ListeningFor.PAYLOAD_SIZE # Reset state on error
continue

time.sleep(SLEEP_TIME)

def main():
logging.basicConfig(level=logging.DEBUG, format='[%(levelname)s] %(asctime)s | %(name)s: %(message)s',)
logger.info("--- STARTING RADIODRIVER PROXY ---")

message_transport_thread = threading.Thread(target=unix_socket_message_transport)
message_transport_thread.start()

serial_manager_thread = threading.Thread(target=serial_manager)
serial_manager_thread.start()

message_formatter_thread = threading.Thread(target=message_formatter)
message_formatter_thread.start()

if __name__ == "__main__":
main()
Binary file removed ws_urc/launch/__pycache__/rover.cpython-310.pyc
Binary file not shown.
Binary file removed ws_urc/launch/__pycache__/rover.cpython-311.pyc
Binary file not shown.