diff --git a/LICENSE b/LICENSE deleted file mode 100644 index d7c8722..0000000 --- a/LICENSE +++ /dev/null @@ -1,21 +0,0 @@ -MIT License - -Copyright (c) 2018 Ed Davisson - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. diff --git a/README.md b/README.md index 5cc357c..35f1509 100644 --- a/README.md +++ b/README.md @@ -53,7 +53,7 @@ These are the devices and sensors I tested and used with MudPi successfully. Man Let me know if you are able to confirm tests on any other devices ## License -This project is licensed under the MIT License - see the [LICENSE.md](LICENSE.md) file for details +This project is licensed under the BSD-4-Clause License - see the [LICENSE.md](LICENSE.md) file for details MudPi Smart Garden diff --git a/action.py b/action.py index 357e9ef..865855a 100644 --- a/action.py +++ b/action.py @@ -4,7 +4,6 @@ import subprocess import sys sys.path.append('..') -import variables class Action(): @@ -15,6 +14,10 @@ def __init__(self, config): self.key = config.get("key", None).replace(" ", "_").lower() if config.get("key") is not None else self.name.replace(" ", "_").lower() # Actions will be either objects to publish for events or a command string to execute self.action = config.get("action") + try: + self.r = config["redis"] if config["redis"] is not None else redis.Redis(host='127.0.0.1', port=6379) + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) return def init_action(self): @@ -31,7 +34,7 @@ def trigger(self, value=None): return def emitEvent(self): - variables.r.publish(self.topic, json.dumps(self.action)) + self.r.publish(self.topic, json.dumps(self.action)) return def runCommand(self, value=None): diff --git a/controls/arduino/button_control.py b/controls/arduino/button_control.py index 622de30..838a7c1 100644 --- a/controls/arduino/button_control.py +++ b/controls/arduino/button_control.py @@ -10,8 +10,8 @@ class ButtonControl(Control): - def __init__(self, pin, name='ButtonControl', key=None, connection=default_connection, analog_pin_mode=False, topic=None): - super().__init__(pin, name=name, key=key, connection=connection, analog_pin_mode=analog_pin_mode) + def __init__(self, pin, name='ButtonControl', key=None, connection=default_connection, analog_pin_mode=False, topic=None, redis_conn=None): + super().__init__(pin, name=name, key=key, connection=connection, analog_pin_mode=analog_pin_mode, redis_conn=redis_conn) self.topic = topic.replace(" ", "/").lower() if topic is not None else 'mudpi/relay/' self.state_counter = 3 self.previous_state = 0 diff --git a/controls/arduino/control.py b/controls/arduino/control.py index 6731ace..90a1341 100644 --- a/controls/arduino/control.py +++ b/controls/arduino/control.py @@ -4,20 +4,23 @@ from nanpy import (ArduinoApi, SerialManager) import sys sys.path.append('..') -import variables default_connection = SerialManager() # Base sensor class to extend all other arduino sensors from. class Control(): - def __init__(self, pin, name='Control', connection=default_connection, analog_pin_mode=False, key=None): + def __init__(self, pin, name='Control', connection=default_connection, analog_pin_mode=False, key=None, redis_conn=None): self.pin = pin self.name = name self.key = key.replace(" ", "_").lower() if key is not None else self.name.replace(" ", "_").lower() self.analog_pin_mode = analog_pin_mode self.connection = connection self.api = ArduinoApi(connection) + try: + self.r = redis_conn if redis_conn is not None else redis.Redis(host='127.0.0.1', port=6379) + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) return def init_control(self): @@ -46,4 +49,4 @@ def emitEvent(self, data): } } print(message["data"]) - variables.r.publish('controls', json.dumps(message)) \ No newline at end of file + self.r.publish('controls', json.dumps(message)) \ No newline at end of file diff --git a/controls/arduino/potentiometer_control.py b/controls/arduino/potentiometer_control.py index dfe2f7b..29831ba 100644 --- a/controls/arduino/potentiometer_control.py +++ b/controls/arduino/potentiometer_control.py @@ -10,8 +10,8 @@ class PotentiometerControl(Control): - def __init__(self, pin, name='PotentiometerControl', key=None, connection=default_connection, analog_pin_mode=True, topic=None, reading_buffer=3): - super().__init__(pin, name=name, key=key, connection=connection, analog_pin_mode=analog_pin_mode) + def __init__(self, pin, name='PotentiometerControl', key=None, connection=default_connection, analog_pin_mode=True, topic=None, reading_buffer=3, redis_conn=None): + super().__init__(pin, name=name, key=key, connection=connection, analog_pin_mode=analog_pin_mode, redis_conn=redis_conn) self.previous_state = 0 # Reading buffer helps prevent multiple events when values are floating between small amounts self.reading_buffer = reading_buffer diff --git a/controls/arduino/switch_control.py b/controls/arduino/switch_control.py index b6ce966..0de49b4 100644 --- a/controls/arduino/switch_control.py +++ b/controls/arduino/switch_control.py @@ -10,8 +10,8 @@ class SwitchControl(Control): - def __init__(self, pin, name='SwitchControl', key=None, connection=default_connection, analog_pin_mode=False, topic=None): - super().__init__(pin, name=name, key=key, connection=connection, analog_pin_mode=analog_pin_mode) + def __init__(self, pin, name='SwitchControl', key=None, connection=default_connection, analog_pin_mode=False, topic=None, redis_conn=None): + super().__init__(pin, name=name, key=key, connection=connection, analog_pin_mode=analog_pin_mode, redis_conn=redis_conn) self.topic = topic.replace(" ", "/").lower() if topic is not None else 'mudpi/relay/' self.state_counter = 3 self.previous_state = 0 diff --git a/controls/pi/button_control.py b/controls/pi/button_control.py index eaa04d3..36471ea 100644 --- a/controls/pi/button_control.py +++ b/controls/pi/button_control.py @@ -9,8 +9,8 @@ class ButtonControl(Control): - def __init__(self, pin, name='ButtonControl', key=None, resistor=None, edge_detection=None, debounce=None, topic=None): - super().__init__(pin, name=name, key=key, resistor=resistor, edge_detection=edge_detection, debounce=debounce) + def __init__(self, pin, name='ButtonControl', key=None, resistor=None, edge_detection=None, debounce=None, topic=None, redis_conn=None): + super().__init__(pin, name=name, key=key, resistor=resistor, edge_detection=edge_detection, debounce=debounce, redis_conn=redis_conn) self.topic = topic.replace(" ", "/").lower() if topic is not None else 'mudpi/relay/' return diff --git a/controls/pi/control.py b/controls/pi/control.py index a9d0665..9a919bb 100644 --- a/controls/pi/control.py +++ b/controls/pi/control.py @@ -4,17 +4,20 @@ import RPi.GPIO as GPIO import sys sys.path.append('..') -import variables # Base sensor class to extend all other arduino sensors from. class Control(): - def __init__(self, pin, name='Control',key=None, resistor=None, edge_detection=None, debounce=None): + def __init__(self, pin, name='Control',key=None, resistor=None, edge_detection=None, debounce=None, redis_conn=None): self.pin = pin self.name = name self.key = key.replace(" ", "_").lower() if key is not None else self.name.replace(" ", "_").lower() self.gpio = GPIO self.debounce = debounce if debounce is not None else None + try: + self.r = redis_conn if redis_conn is not None else redis.Redis(host='127.0.0.1', port=6379) + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) if resistor is not None: if resistor == "up" or resistor == GPIO.PUD_UP: @@ -67,4 +70,4 @@ def emitEvent(self, data): } } print(message["data"]) - variables.r.publish('controls', json.dumps(message)) \ No newline at end of file + self.r.publish('controls', json.dumps(message)) \ No newline at end of file diff --git a/controls/pi/switch_control.py b/controls/pi/switch_control.py index 968565a..180ed6b 100644 --- a/controls/pi/switch_control.py +++ b/controls/pi/switch_control.py @@ -9,8 +9,8 @@ class SwitchControl(Control): - def __init__(self, pin, name='SwitchControl', key=None, resistor=None, edge_detection=None, debounce=None, topic=None): - super().__init__(pin, name=name, key=key, resistor=resistor, edge_detection=edge_detection, debounce=debounce) + def __init__(self, pin, name='SwitchControl', key=None, resistor=None, edge_detection=None, debounce=None, topic=None, redis_conn=None): + super().__init__(pin, name=name, key=key, resistor=resistor, edge_detection=edge_detection, debounce=debounce, redis_conn=redis_conn) self.topic = topic.replace(" ", "/").lower() if topic is not None else 'mudpi/relay/' # Keep counter 1 above delay to avoid event on boot self.state_counter = 3 diff --git a/mudpi.py b/mudpi.py index 143e7ab..18ac9da 100755 --- a/mudpi.py +++ b/mudpi.py @@ -2,58 +2,63 @@ import threading import datetime import socket +import redis import time import json import sys -import traceback sys.path.append('..') from action import Action from config_load import loadConfigJson from server.mudpi_server import MudpiServer -from workers.lcd_worker import LCDWorker -from workers.relay_worker import RelayWorker -from workers.camera_worker import CameraWorker +from workers.pi.lcd_worker import LcdWorker +from workers.pi.i2c_worker import PiI2CWorker +from workers.pi.relay_worker import RelayWorker +from workers.pi.camera_worker import CameraWorker +from workers.pi.sensor_worker import PiSensorWorker +from workers.pi.control_worker import PiControlWorker from workers.trigger_worker import TriggerWorker -from workers.pi_sensor_worker import PiSensorWorker -from workers.pi_control_worker import PiControlWorker try: - # Does this prevent the need to install the module if you dont use it? - from workers.arduino_worker import ArduinoWorker + from workers.arduino.arduino_worker import ArduinoWorker NANPY_ENABLED = True except ImportError: NANPY_ENABLED = False try: - # Does this prevent the need to install the module if you dont use it? from workers.adc_worker import ADCMCP3008Worker MCP_ENABLED = True except ImportError: MCP_ENABLED = False - import variables -# __ __ _ _____ _ -#| \/ | | | __ (_) -#| \ / |_ _ __| | |__) | -#| |\/| | | | |/ _` | ___/ | -#| | | | |_| | (_| | | | | -#|_| |_|\__,_|\__,_|_| |_| -# https://mudpi.app - +############################## +# MudPi Core +# Author: Eric Davisson (@theDavisson) +# https://mudpi.app +# MudPi Core is a python library to gather sensor readings, control components, +# and manage devices using a Raspberry Pi on an event based system using redis. +# CONFIGS = {} PROGRAM_RUNNING = True +threads = [] +actions = {} +relays = [] +relayEvents = {} +relay_index = 0 +workers = [] +nodes = [] print(chr(27) + "[2J") print('Loading MudPi Configs...\r', end="", flush=True) -#load the configuration CONFIGS = loadConfigJson() -#Waiting for redis and services to be running +# Singleton redis to prevent connection conflicts +try: + r = redis.Redis(host=CONFIGS['redis'].get('host', '127.0.0.1'), port=int(CONFIGS['redis'].get('port', 6379))) +except KeyError: + r = redis.Redis(host='127.0.0.1', port=6379) +# Waiting for redis and services to be running time.sleep(5) -print('Loading MudPi Configs...\t\033[1;32m Complete\033[0;0m') -time.sleep(1) - -#Clear the console if its open for debugging +print('Loading MudPi Configs...\t\033[1;32m Complete\033[0;0m') print(chr(27) + "[2J") -#Print a display logo for startup +# Print a display logo for startup print("\033[1;32m") print(' __ __ _ _____ _ ') print('| \/ | | | __ (_)') @@ -64,7 +69,8 @@ print('_________________________________________________') print('') print('Eric Davisson @theDavisson') -print('Version: ', CONFIGS.get('version', '0.8.11')) +print('https://mudpi.app') +print('Version: ', CONFIGS.get('version', '0.9.0')) print('\033[0;0m') if CONFIGS['debug'] is True: @@ -80,156 +86,159 @@ GPIO.setwarnings(False) GPIO.setmode(GPIO.BCM) GPIO.cleanup() - #Pause for GPIO to finish + # Pause for GPIO to finish time.sleep(0.1) print('Initializing Garden Control...\t\t\033[1;32m Complete\033[0;0m') print('Preparing Threads for Workers\r', end="", flush=True) - threads = [] - actions = {} - relays = [] - relayEvents = {} - relay_index = 0 - variables.lcd_message = {'line_1': 'Mudpi Control', 'line_2': 'Is Now Running'} - - new_messages_waiting = threading.Event() #Event to signal LCD to pull new messages - main_thread_running = threading.Event() #Event to signal workers to close - system_ready = threading.Event() #Event to tell workers to begin working - camera_available = threading.Event() #Event to signal if camera can be used - main_thread_running.set() #Main event to tell workers to run/shutdown + new_messages_waiting = threading.Event() # Event to signal LCD to pull new messages + main_thread_running = threading.Event() # Event to signal workers to close + system_ready = threading.Event() # Event to tell workers to begin working + camera_available = threading.Event() # Event to signal if camera can be used + lcd_available = threading.Event() # Event to signal if lcd displays can be used + main_thread_running.set() # Main event to tell workers to run/shutdown time.sleep(0.1) print('Preparing Threads for Workers...\t\033[1;32m Complete\033[0;0m') - #l = LCDWorker(new_messages_waiting,main_thread_running,system_ready) - #print('Loading LCD Worker') - #l = l.run() - #threads.append(l) - # Worker for Camera try: - c = CameraWorker(CONFIGS['camera'], main_thread_running, system_ready, camera_available) - print('Loading Pi Camera Worker') - c = c.run() - threads.append(c) - camera_available.set() + if len(CONFIGS["camera"]) > 0: + CONFIGS["camera"]["redis"] = r + c = CameraWorker(CONFIGS['camera'], main_thread_running, system_ready, camera_available) + print('MudPi Camera...\t\t\t\033[1;32m Initializing\033[0;0m') + workers.append(c) + camera_available.set() except KeyError: - print('No Camera Found to Load') + print('MudPi Pi Camera...\t\t\t\033[1;31m Disabled\033[0;0m') - # Workers for pi (Sensors, Controls, Relays) + # Workers for pi (Sensors, Controls, Relays, I2C) try: - for worker in CONFIGS['workers']: - # Create worker for worker - if worker['type'] == "sensor": - pw = PiSensorWorker(worker, main_thread_running, system_ready) - print('Loading Pi Sensor Worker...') - elif worker['type'] == "control": - pw = PiControlWorker(worker, main_thread_running, system_ready) - print('Loading Pi Control Worker...') - elif worker['type'] == "relay": - # Add Relay Worker Here for Better Config Control - print('Loading Pi Relay Worker...') - else: - raise Exception("Unknown Worker Type: " + worker['type']) - pw = pw.run() - if pw is not None: - threads.append(pw) - except KeyError: - print('No Pi Workers Found to Load or Invalid Type') - traceback.print_exc() - + if len(CONFIGS["workers"]) > 0: + for worker in CONFIGS['workers']: + # Create worker for worker + worker["redis"] = r + if worker['type'] == "sensor": + pw = PiSensorWorker(worker, main_thread_running, system_ready) + print('MudPi Sensors...\t\t\t\033[1;32m Initializing\033[0;0m') + elif worker['type'] == "control": + pw = PiControlWorker(worker, main_thread_running, system_ready) + print('MudPi Controls...\t\t\t\033[1;32m Initializing\033[0;0m') + elif worker['type'] == "i2c": + pw = PiI2CWorker(worker, main_thread_running, system_ready) + print('MudPi I2C...\t\t\t\t\033[1;32m Initializing\033[0;0m') + elif worker['type'] == "lcd": + for lcd in worker['lcds']: + lcd["redis"] = r + pw = LcdWorker(lcd, main_thread_running, system_ready, lcd_available) + lcd_available.set() + print('MudPi LCD Displays...\t\t\t\033[1;32m Initializing\033[0;0m') + elif worker['type'] == "relay": + # Add Relay Worker Here for Better Config Control + print('MudPi Relay...\t\t\t\033[1;32m Initializing\033[0;0m') + else: + raise Exception("Unknown Worker Type: " + worker['type']) + workers.append(pw) + except KeyError as e: + print('MudPi Pi Workers...\t\t\t\033[1;31m Disabled\033[0;0m') + print(e) # Worker for relays attached to pi try: - for relay in CONFIGS['relays']: - #Create a threading event for each relay to check status - relayState = { - "available": threading.Event(), #Event to allow relay to activate - "active": threading.Event() #Event to signal relay to open/close - } - #Store the relays under the key or index if no key is found, this way we can reference the right relays - relayEvents[relay.get("key", relay_index)] = relayState - #Create sensor worker for a relay - r = RelayWorker(relay, main_thread_running, system_ready, relayState['available'], relayState['active']) - r = r.run() - #Make the relays available, this event is toggled off elsewhere if we need to disable relays - relayState['available'].set() - relay_index +=1 - if r is not None: - threads.append(r) + if len(CONFIGS["relays"]) > 0: + for relay in CONFIGS['relays']: + relay["redis"] = r + relayState = { + "available": threading.Event(), # Event to allow relay to activate + "active": threading.Event() # Event to signal relay to open/close + } + relayEvents[relay.get("key", relay_index)] = relayState + rw = RelayWorker(relay, main_thread_running, system_ready, relayState['available'], relayState['active']) + workers.append(rw) + # Make the relays available, this event is toggled off elsewhere if we need to disable relays + relayState['available'].set() + relay_index +=1 except KeyError: - print('No Relays Found to Load') - traceback.print_exc() - - # Worker for nodes attached to pi via serial or wifi[esp8266] - # Supported nodes: arduinos, esp8266, ADC-MCP3xxx, probably others - try: - for node in CONFIGS['nodes']: - # Create worker for node - if node['type'] == "arduino": - if NANPY_ENABLED: - t = ArduinoWorker(node, main_thread_running, system_ready) - else: - print('Error Loading Nanpy library. Did you pip3 install -r requirements.txt?') - elif node['type'] == "ADC-MCP3008": - if MCP_ENABLED: - t = ADCMCP3008Worker(node, main_thread_running, system_ready) - else: - print('Error Loading MCP3xxx library. Did you pip3 install -r requirements.txt;?') - else: - raise Exception("Unknown Node Type: " + node['type']) - t = t.run() - if t is not None: - threads.append(t) - except KeyError as e: - print('Invalid or no Nodes found to Load') - traceback.print_exc() - + print('MudPi Relays Workers...\t\t\033[1;31m Disabled\033[0;0m') # Load in Actions try: - for action in CONFIGS["actions"]: - a = Action(action) - a.init_action() - actions[a.key] = a + if len(CONFIGS["actions"]) > 0: + for action in CONFIGS["actions"]: + print('MudPi Actions...\t\t\t\033[1;32m Initializing\033[0;0m') + action["redis"] = r + a = Action(action) + a.init_action() + actions[a.key] = a except KeyError: - print('No Actions Found to Load') - traceback.print_exc() + print('MudPi Actions...\t\t\t\033[1;31m Disabled\033[0;0m') # Worker for Triggers - try: - t = TriggerWorker(CONFIGS['triggers'], main_thread_running, system_ready, actions) - print('Loading Triggers...') - t = t.run() - threads.append(t) + try: + if len(CONFIGS["triggers"]) > 0: + CONFIGS["triggers"]["redis"] = r + t = TriggerWorker(CONFIGS['triggers'], main_thread_running, system_ready, actions) + print('MudPi Triggers...\t\t\t\033[1;32m Initializing\033[0;0m') + workers.append(t) except KeyError: - print('No Triggers Found to Load') - traceback.print_exc() + print('MudPi Triggers...\t\t\t\033[1;31m Disabled\033[0;0m') + + # Worker for nodes attached to pi via serial or wifi[esp8266, esp32] + # Supported nodes: arduinos, esp8266, ADC-MCP3xxx, probably others (esp32 with custom nanpy fork) + try: + if len(CONFIGS["nodes"]) > 0: + for node in CONFIGS['nodes']: + node["redis"] = r + if node['type'] == "arduino": + if NANPY_ENABLED: + print('MudPi Arduino Workers...\t\t\033[1;32m Initializing\033[0;0m') + t = ArduinoWorker(node, main_thread_running, system_ready) + else: + print('Error Loading Nanpy library. Did you pip3 install -r requirements.txt?') + elif node['type'] == "ADC-MCP3008": + if MCP_ENABLED: + print('MudPi ADC Workers...\t\t\033[1;32m Initializing\033[0;0m') + t = ADCMCP3008Worker(node, main_thread_running, system_ready) + else: + print('Error Loading MCP3xxx library. Did you pip3 install -r requirements.txt;?') + else: + raise Exception("Unknown Node Type: " + node['type']) + nodes.append(t) + except KeyError as e: + print('MudPi Node Workers...\t\t\t\033[1;31m Disabled\033[0;0m') + try: + if (CONFIGS['server'] is not None): + print('MudPi Server...\t\t\t\t\033[1;33m Starting\033[0;0m', end='\r', flush=True) + time.sleep(1) + server = MudpiServer(main_thread_running, CONFIGS['server']['host'], CONFIGS['server']['port']) + s = threading.Thread(target=server_worker) + threads.append(s) + s.start() + except KeyError: + print('MudPi Socket Server...\t\t\t\033[1;31m Disabled\033[0;0m') - #Decided not to build server worker (this is replaced with nodejs, expressjs) - #Maybe use this for internal communication across devices if using wireless - def server_worker(): - server.listen() - print('MudPi Server...\t\t\t\t\033[1;33m Starting\033[0;0m', end='\r', flush=True) - time.sleep(1) - server = MudpiServer(main_thread_running, CONFIGS['server']['host'], CONFIGS['server']['port']) - s = threading.Thread(target=server_worker) - threads.append(s) - s.start() + print('MudPi Garden Controls...\t\t\033[1;32m Initialized\033[0;0m') + print('Engaging MudPi Workers...\t\t\033[1;32m \033[0;0m') + for worker in workers: + t = worker.run() + threads.append(t) + time.sleep(.5) + for node in nodes: + t = node.run() + threads.append(t) + time.sleep(.5) time.sleep(.5) print('MudPi Garden Control...\t\t\t\033[1;32m Online\033[0;0m') print('_________________________________________________') system_ready.set() #Workers will not process until system is ready - variables.r.set('started_at', str(datetime.datetime.now())) #Store current time to track uptime + r.set('started_at', str(datetime.datetime.now())) #Store current time to track uptime system_message = {'event':'SystemStarted', 'data':1} - variables.r.publish('mudpi', json.dumps(system_message)) + r.publish('mudpi', json.dumps(system_message)) - - #Hold the program here until its time to graceful shutdown - #This is our pump cycle check, Using redis to determine if pump should activate + # Hold the program here until its time to graceful shutdown while PROGRAM_RUNNING: # Main program loop # add logging or other system operations here... @@ -239,22 +248,20 @@ def server_worker(): PROGRAM_RUNNING = False finally: print('MudPi Shutting Down...') - #Perform any cleanup tasks here... + # Perform any cleanup tasks here... - #load a client on the server to clear it from waiting - # sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - #sock.connect((CONFIGS['SERVER_HOST'], int(CONFIGS['SERVER_PORT']))) - server.sock.shutdown(socket.SHUT_RDWR) - # time.sleep(1) - # sock.close() + try: + server.sock.shutdown(socket.SHUT_RDWR) + except: + pass - #Clear main running event to signal threads to close + # Clear main running event to signal threads to close main_thread_running.clear() - #Shutdown the camera loop + # Shutdown the camera loop camera_available.clear() - #Join all our threads for shutdown + # Join all our threads for shutdown for thread in threads: thread.join() diff --git a/package.json b/package.json index 6f4dfd0..58ca662 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "mudpi-core", - "version": "0.8.8", + "version": "0.9.0", "description": "Configurable automated smart garden for raspberry pi", "bugs": "https://github.com/mudpi/mudpi-core/issues", "contributors": [ diff --git a/requirements.txt b/requirements.txt index 222e642..77afc98 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,9 @@ -https://github.com/olixr/nanpy/archive/master.zip +pycron +redis +picamera Adafruit_DHT adafruit-circuitpython-mcp3xxx adafruit-blinka -pycron -redis -picamera \ No newline at end of file +adafruit-circuitpython-bme680 +https://github.com/olixr/nanpy/archive/master.zip +https://github.com/Tim-Jackins/Adafruit_CircuitPython_CharLCD/archive/master.zip \ No newline at end of file diff --git a/sensors/MCP3xxx/sensor.py b/sensors/MCP3xxx/sensor.py index e5eee87..3da291a 100644 --- a/sensors/MCP3xxx/sensor.py +++ b/sensors/MCP3xxx/sensor.py @@ -1,4 +1,5 @@ import adafruit_mcp3xxx.mcp3008 as MCP +import redis # Base sensor class to extend all other mcp3xxx sensors from. @@ -15,14 +16,18 @@ class Sensor: 7: MCP.P7, } - def __init__(self, pin: int, mcp, name='Sensor', key=None): + def __init__(self, pin: int, mcp, name='Sensor', key=None, redis_conn=None): self.pin = pin self.mcp = mcp - self.channel = None + self.topic = None self.name = name self.key = key.replace(" ", "_").lower() if key is not None else self.name\ .replace(" ", str(pin)) + try: + self.r = redis_conn if redis_conn is not None else redis.Redis(host='127.0.0.1', port=6379) + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) def init_sensor(self): """ @@ -43,8 +48,8 @@ def readRaw(self): Read the sensor(s) but return the raw voltage, useful for debugging :return: """ - return self.channel.voltage + return self.topic.voltage def readPin(self): """ Read the pin from the MCP3xxx as unaltered digital value""" - return self.channel.value + return self.topic.value diff --git a/sensors/MCP3xxx/soil_sensor.py b/sensors/MCP3xxx/soil_sensor.py index 01c760c..7b5e67e 100644 --- a/sensors/MCP3xxx/soil_sensor.py +++ b/sensors/MCP3xxx/soil_sensor.py @@ -8,8 +8,6 @@ sys.path.append('..') -import variables - # Tested using Sun3Drucker Model SX239 # Wet Water = 287 # Dry Air = 584 @@ -20,12 +18,12 @@ class SoilSensor(Sensor): - def __init__(self, pin, mcp, name='SoilSensor', key=None): - super().__init__(pin, name=name, key=key, mcp=mcp) + def __init__(self, pin, mcp, name='SoilSensor', key=None, redis_conn=None): + super().__init__(pin, name=name, key=key, mcp=mcp, redis_conn=redis_conn) return def init_sensor(self): - self.channel = AnalogIn(self.mcp, Sensor.PINS[self.pin]) + self.topic = AnalogIn(self.mcp, Sensor.PINS[self.pin]) def read(self): resistance = self.readPin() @@ -40,7 +38,7 @@ def read(self): moisture = 'Very Wet - ' + str(int(moistpercent)) # print("Resistance: %d" % resistance) # TODO: Put redis store into sensor worker - variables.r.set(self.key, + self.r.set(self.key, resistance) # TODO: CHANGE BACK TO 'moistpercent' (PERSONAL CONFIG) print("moisture: {0}".format(moisture)) diff --git a/sensors/arduino/float_sensor.py b/sensors/arduino/float_sensor.py index 6f98959..3311912 100644 --- a/sensors/arduino/float_sensor.py +++ b/sensors/arduino/float_sensor.py @@ -12,11 +12,10 @@ default_connection = SerialManager(device='/dev/ttyUSB0') #r = redis.Redis(host='127.0.0.1', port=6379) - class FloatSensor(Sensor): - def __init__(self, pin, name='FloatSensor', key=None, connection=default_connection): - super().__init__(pin, name=name, key=key, connection=connection) + def __init__(self, pin, name='FloatSensor', key=None, connection=default_connection, redis_conn=None): + super().__init__(pin, name=name, key=key, connection=connection, redis_conn=redis_conn) return def init_sensor(self): @@ -25,23 +24,8 @@ def init_sensor(self): def read(self): value = self.api.digitalRead(self.pin) - variables.r.set(self.key, value) + self.r.set(self.key, value) return value def readRaw(self): - return self.read() - - -if __name__ == '__main__': - try: - loop_count = 10 - while (loop_count > 0): - sensor = FloatSensor(9) - rainread = sensor.read() - print('Float: ', rainread) - loop_count += 1 - time.sleep(3) - except KeyboardInterrupt: - pass - finally: - print('Float Sensor Closing...') \ No newline at end of file + return self.read() \ No newline at end of file diff --git a/sensors/arduino/humidity_sensor.py b/sensors/arduino/humidity_sensor.py index 697bc61..9bf5291 100644 --- a/sensors/arduino/humidity_sensor.py +++ b/sensors/arduino/humidity_sensor.py @@ -15,8 +15,8 @@ class HumiditySensor(Sensor): - def __init__(self, pin, name='HumiditySensor', key=None, connection=default_connection, model='11'): - super().__init__(pin, name=name, key=key, connection=connection) + def __init__(self, pin, name='HumiditySensor', key=None, connection=default_connection, model='11', api=None, redis_conn=None): + super().__init__(pin, name=name, key=key, connection=connection, redis_conn=redis_conn) self.type = model #DHT11 or DHT22 maybe AM2302 return @@ -25,7 +25,7 @@ def init_sensor(self): sensor_types = { '11': DHT.DHT11, '22': DHT.DHT22, '2301': DHT.AM2301 } - if len(self.type) == 3 and self.type in sensor_types: + if self.type in sensor_types: self.sensor = sensor_types[self.type] else: # print('Sensor Type Error: Defaulting to DHT11') @@ -37,9 +37,9 @@ def read(self): temperature = self.dht.readTemperature(True) humidity = self.dht.readHumidity() data = {'temperature': round(temperature, 2), 'humidity': round(humidity, 2)} - variables.r.set(self.key + '_temperature', temperature) - variables.r.set(self.key + '_humidity', humidity) - variables.r.set(self.key, json.dumps(data)) + self.r.set(self.key + '_temperature', temperature) + self.r.set(self.key + '_humidity', humidity) + self.r.set(self.key, json.dumps(data)) return data def readRaw(self): diff --git a/sensors/arduino/light_sensor.py b/sensors/arduino/light_sensor.py index 79c260c..96cc8ff 100644 --- a/sensors/arduino/light_sensor.py +++ b/sensors/arduino/light_sensor.py @@ -4,14 +4,18 @@ import redis from .sensor import Sensor from nanpy import (ArduinoApi, SerialManager) +import sys +sys.path.append('..') + +import variables default_connection = SerialManager(device='/dev/ttyUSB0') -r = redis.Redis(host='127.0.0.1', port=6379) +#r = redis.Redis(host='127.0.0.1', port=6379) class LightSensor(Sensor): - def __init__(self, pin, name='LightSensor', key=None, connection=default_connection): - super().__init__(pin, name=name, key=key, connection=connection) + def __init__(self, pin, name='LightSensor', key=None, connection=default_connection, redis_conn=None): + super().__init__(pin, name=name, key=key, connection=connection, redis_conn=redis_conn) return def init_sensor(self): @@ -19,19 +23,9 @@ def init_sensor(self): self.api.pinMode(self.pin, self.api.INPUT) def read(self): - ldr_resistance = self.api.analogRead(self.pin) - resistor1 = 10 #1k Resistor in the divider - - Vout = ldr_resistance * 0.0048828125 #Some frequency clock thing related to amps - #lux = 500 / ( resistor1 * ( (5 - Vout) / Vout )) #calculate lux using voltage divider formula with LDR to lux conversion - lux = ( 2500 / Vout - 500 ) / resistor1 - - #print("ldr_resistance: %d" % ldr_resistance) - r.set(self.key, lux) - return lux + light_intesity = self.api.analogRead(self.pin) + self.r.set(self.key, light_intesity) + return light_intesity def readRaw(self): - resistance = self.api.analogRead(self.pin) - #print("Resistance: %d" % resistance) - r.set(self.key+'_raw', resistance) - return resistance \ No newline at end of file + return self.read() \ No newline at end of file diff --git a/sensors/arduino/rain_sensor.py b/sensors/arduino/rain_sensor.py index 73f04c3..2257758 100644 --- a/sensors/arduino/rain_sensor.py +++ b/sensors/arduino/rain_sensor.py @@ -21,8 +21,8 @@ class RainSensor(Sensor): - def __init__(self, pin, name='RainSensor', key=None, connection=default_connection): - super().__init__(pin, name=name, key=key, connection=connection) + def __init__(self, pin, name='RainSensor', key=None, connection=default_connection, redis_conn=None): + super().__init__(pin, name=name, key=key, connection=connection, redis_conn=redis_conn) return def init_sensor(self): @@ -32,13 +32,13 @@ def init_sensor(self): def read(self): rain = self.api.analogRead(self.pin) #TODO: REMOVE (PERSONAL CONFIG) #rain = self.parseSensorReading(self.api.analogRead(self.pin)) - variables.r.set(self.key, rain) + self.r.set(self.key, rain) return rain def readRaw(self): resistance = self.api.analogRead(self.pin) #print("Resistance: %d" % resistance) - variables.r.set(self.key+'_raw', resistance) + self.r.set(self.key+'_raw', resistance) return resistance def parseSensorReading(self, raw_data): diff --git a/sensors/arduino/sensor.py b/sensors/arduino/sensor.py index 3c5313b..e6fd3d4 100644 --- a/sensors/arduino/sensor.py +++ b/sensors/arduino/sensor.py @@ -8,13 +8,17 @@ # Base sensor class to extend all other arduino sensors from. class Sensor(): - def __init__(self, pin, name='Sensor', connection=default_connection, analog_pin_mode=False, key=None): + def __init__(self, pin, name='Sensor', connection=default_connection, analog_pin_mode=False, key=None, api=None, redis_conn=None): self.pin = pin self.name = name self.key = key.replace(" ", "_").lower() if key is not None else self.name.replace(" ", "_").lower() self.analog_pin_mode = analog_pin_mode self.connection = connection - self.api = ArduinoApi(connection) + self.api = api if api is not None else ArduinoApi(connection) + try: + self.r = redis_conn if redis_conn is not None else redis.Redis(host='127.0.0.1', port=6379) + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) return def init_sensor(self): diff --git a/sensors/arduino/soil_sensor.py b/sensors/arduino/soil_sensor.py index 4b1be2b..dcd6f40 100644 --- a/sensors/arduino/soil_sensor.py +++ b/sensors/arduino/soil_sensor.py @@ -19,8 +19,8 @@ intervals = int((AirBounds - WaterBounds)/3); class SoilSensor(Sensor): - def __init__(self, pin, name='SoilSensor', key=None, connection=default_connection): - super().__init__(pin, name=name, key=key, connection=connection) + def __init__(self, pin, name='SoilSensor', key=None, connection=default_connection, redis_conn=None): + super().__init__(pin, name=name, key=key, connection=connection, redis_conn=redis_conn) return def init_sensor(self): @@ -40,11 +40,11 @@ def read(self): moisture = 'Very Wet - ' + str(int(moistpercent)) #print("Resistance: %d" % resistance) #TODO: Put redis store into sensor worker - variables.r.set(self.key, resistance) #TODO: CHANGE BACK TO 'moistpercent' (PERSONAL CONFIG) + self.r.set(self.key, resistance) #TODO: CHANGE BACK TO 'moistpercent' (PERSONAL CONFIG) return resistance def readRaw(self): resistance = self.api.analogRead(self.pin) #print("Resistance: %d" % resistance) - variables.r.set(self.key+'_raw', resistance) + self.r.set(self.key+'_raw', resistance) return resistance \ No newline at end of file diff --git a/sensors/arduino/temperature_sensor.py b/sensors/arduino/temperature_sensor.py index ff16537..2175264 100644 --- a/sensors/arduino/temperature_sensor.py +++ b/sensors/arduino/temperature_sensor.py @@ -14,8 +14,8 @@ class TemperatureSensor(Sensor): - def __init__(self, pin, name='TemperatureHumiditySensor', key=None, connection=default_connection): - super().__init__(pin, name=name, key=key, connection=connection) + def __init__(self, pin, name='TemperatureHumiditySensor', key=None, connection=default_connection, redis_conn=None): + super().__init__(pin, name=name, key=key, connection=connection, redis_conn=redis_conn) return def init_sensor(self): @@ -35,7 +35,7 @@ def init_sensor(self): #sensor = id of sensor you want in addresses[] def read(self): #temp = self.sensors.getTempF(sensor) - #variables.r.set('temp_'+str(sensor), temp) + #self.r.set('temp_'+str(sensor), temp) #return temp return self.readAll() @@ -45,10 +45,10 @@ def readAll(self): for i in range(self.sensor_bus): temp = self.sensors.getTempC(i) temps['temp_'+str(i)] = temp - #variables.r.set(self.key+'_'+str(i), temp) + #self.r.set(self.key+'_'+str(i), temp) #print("Device %d (%s) " % (i, self.addresses[i])) #print("Let's convert it in Fahrenheit degrees: %0.2f" % DallasTemperature.toFahrenheit(temp)) - variables.r.set(self.key, temps) + self.r.set(self.key, temps) return temps if __name__ == '__main__': diff --git a/sensors/pi/float_sensor.py b/sensors/pi/float_sensor.py index b05bea4..6762a6a 100644 --- a/sensors/pi/float_sensor.py +++ b/sensors/pi/float_sensor.py @@ -8,8 +8,8 @@ class FloatSensor(Sensor): - def __init__(self, pin, name='FloatSensor', key=None): - super().__init__(pin, name=name, key=key) + def __init__(self, pin, name='FloatSensor', key=None, redis_conn=None): + super().__init__(pin, name=name, key=key, redis_conn=redis_conn) return def init_sensor(self): diff --git a/sensors/pi/humidity_sensor.py b/sensors/pi/humidity_sensor.py index 78a8df0..05a95ec 100644 --- a/sensors/pi/humidity_sensor.py +++ b/sensors/pi/humidity_sensor.py @@ -7,15 +7,13 @@ import sys sys.path.append('..') -import variables - #r = redis.Redis(host='127.0.0.1', port=6379) #PIN MODE : OUT | IN class HumiditySensor(Sensor): - def __init__(self, pin, name='HumdityTempSensor', key=None, model='11'): - super().__init__(pin, name=name, key=key) + def __init__(self, pin, name='HumdityTempSensor', key=None, model='11', redis_conn=None): + super().__init__(pin, name=name, key=key, redis_conn=redis_conn) self.type = model return @@ -37,14 +35,13 @@ def read(self): humidity, temperature = Adafruit_DHT.read_retry(self.sensor, self.pin) if humidity is not None and temperature is not None: - variables.r.set(self.key + '_temperature', round(temperature * 1.8 + 32, 2)) - variables.r.set(self.key + '_humidity', humidity) - readings = {'temperature': round(temperature * 1.8 + 32, 2), 'humidity': humidity} - variables.r.set(self.key, json.dumps(readings)) - print('Pi Temp:', readings) + self.r.set(self.key + '_temperature', round(temperature * 1.8 + 32, 2)) + self.r.set(self.key + '_humidity', humidity) + readings = {'temperature': round(temperature * 1.8 + 32, 2), 'humidity': round(humidity, 2)} + self.r.set(self.key, json.dumps(readings)) return readings else: - print('Failed to get reading. Try again!') + print('Failed to get DHT reading. Try again!') def readRaw(self): diff --git a/sensors/pi/i2c/__init__.py b/sensors/pi/i2c/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/sensors/pi/i2c/bme680_sensor.py b/sensors/pi/i2c/bme680_sensor.py new file mode 100644 index 0000000..1af9be6 --- /dev/null +++ b/sensors/pi/i2c/bme680_sensor.py @@ -0,0 +1,49 @@ +import time +import json +import redis +from .sensor import Sensor +import board +from busio import I2C +import adafruit_bme680 + +import sys +sys.path.append('..') + +import variables + + +class Bme680Sensor(Sensor): + + def __init__(self, address = None, name='PressureSensor', key=None, redis_conn=None): + super().__init__(address, name=name, key=key, redis_conn=redis_conn) + return + + def init_sensor(self): + self.sensor = adafruit_bme680.Adafruit_BME680_I2C(self.i2c, debug=False) + # change this to match the location's pressure (hPa) at sea level + self.sensor.sea_level_pressure = 1013.25 + return + + def read(self): + temperature = round((self.sensor.temperature - 5) * 1.8 + 32, 2) + gas = self.sensor.gas + humidity = round(self.sensor.humidity, 1) + pressure = round(self.sensor.pressure, 2) + altitude = round(self.sensor.altitude, 3) + + if humidity is not None and temperature is not None: + self.r.set(self.key + '_temperature', temperature) + self.r.set(self.key + '_humidity', humidity) + self.r.set(self.key + '_gas', gas) + self.r.set(self.key + '_pressure', pressure) + self.r.set(self.key + '_altitude', altitude) + readings = {'temperature': temperature, 'humidity': humidity, 'pressure': pressure, 'gas': gas, 'altitude': altitude} + self.r.set(self.key, json.dumps(readings)) + # print('BME680:', readings) + return readings + else: + print('Failed to get reading [BME680]. Try again!') + + def readRaw(self): + #Read the sensor(s) but return the raw data, useful for debugging + return self.read() diff --git a/sensors/pi/i2c/sensor.py b/sensors/pi/i2c/sensor.py new file mode 100644 index 0000000..7275fc2 --- /dev/null +++ b/sensors/pi/i2c/sensor.py @@ -0,0 +1,42 @@ +import time +import json +import redis +import board +from busio import I2C +import RPi.GPIO as GPIO + +#PIN MODE : OUT | IN + +class Sensor(): + + def __init__(self, address, name='Sensor', key=None, redis_conn=None): + self.address = address + self.name = name + self.key = key.replace(" ", "_").lower() if key is not None else self.name.replace(" ", "_").lower() + self.gpio = GPIO + self.i2c = I2C(board.SCL, board.SDA) + try: + self.r = redis_conn if redis_conn is not None else redis.Redis(host='127.0.0.1', port=6379) + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) + return + + def init_sensor(self): + #Initialize the sensor here (i.e. set pin mode, get addresses, etc) + #GPIO.setmode(GPIO.BCM) + #GPIO.setup(pin, GPIO.IN) + pass + + def read(self): + #Read the sensor(s), parse the data and store it in redis if redis is configured + #GPIO.input(pin) + pass + + def readRaw(self): + #Read the sensor(s) but return the raw data, useful for debugging + pass + + def readPin(self): + #Read the pin from the ardiuno. Can be analog or digital based on "analog_pin_mode" + data = self.gpio.input(self.pin) + return data \ No newline at end of file diff --git a/sensors/pi/sensor.py b/sensors/pi/sensor.py index 15b0da7..5cc813e 100644 --- a/sensors/pi/sensor.py +++ b/sensors/pi/sensor.py @@ -7,11 +7,15 @@ class Sensor(): - def __init__(self, pin, name='Sensor', key=None): + def __init__(self, pin, name='Sensor', key=None, redis_conn=None): self.pin = pin self.name = name self.key = key.replace(" ", "_").lower() if key is not None else self.name.replace(" ", "_").lower() self.gpio = GPIO + try: + self.r = redis_conn if redis_conn is not None else redis.Redis(host='127.0.0.1', port=6379) + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) return def init_sensor(self): diff --git a/server/mudpi_server.py b/server/mudpi_server.py index 64111ec..f9d0a21 100644 --- a/server/mudpi_server.py +++ b/server/mudpi_server.py @@ -9,7 +9,7 @@ class MudpiServer(object): - def __init__(self, system_running, host='127.0.0.1', port=6601): + def __init__(self, system_running, host='127.0.0.1', port=7002): self.port = int(port) self.host = host self.system_running = system_running @@ -24,7 +24,7 @@ def __init__(self, system_running, host='127.0.0.1', port=6601): def listen(self): self.sock.listen(10) #number of clients to listen for - print('MudPi Server...\t\t\t\t\033[1;32m Running\033[0;0m ') + print('MudPi Server...\t\t\t\t\033[1;32m Online\033[0;0m ') while self.system_running.is_set(): try: client, address = self.sock.accept() @@ -57,7 +57,7 @@ def listenToClient(self, client, address): if __name__ == "__main__": host = '127.0.0.1' - port = 6002 + port = 7002 server = MudpiServer(host, port) server.listen(); while True: diff --git a/tools/event_send_tool.py b/tools/event_send_tool.py index 2d0ad5e..c9b7ee0 100644 --- a/tools/event_send_tool.py +++ b/tools/event_send_tool.py @@ -15,7 +15,7 @@ def timedMessage(message, delay=3): message = {} r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True) publisher = r - channel = None + topic = None while option != 0: #Clear the screen command print(chr(27) + "[2J") @@ -53,22 +53,22 @@ def timedMessage(message, delay=3): 'data': "/home/pi/Desktop/mudpi/img/mudpi-0039-2019-04-14-02-21.jpg", 'source': "camera_1" } - channel = 'garden/pi/camera' + topic = 'garden/pi/camera' else: timedMessage('Option not recognized') print(chr(27) + "[2J") continue - if channel is None: - channel = str(input('Enter Channel to Broadcast: ')) + if topic is None: + topic = str(input('Enter Topic to Broadcast: ')) - if channel is not None and channel != '': + if topic is not None and topic != '': #Publish the message - publisher.publish(channel, json.dumps(message)) + publisher.publish(topic, json.dumps(message)) print(message) timedMessage('Message Successfully Published!') else: - timedMessage('Channel Input Invalid') + timedMessage('Topic Input Invalid') time.sleep(2) print('Exit') diff --git a/tools/lcd_message_tool.py b/tools/lcd_message_tool.py new file mode 100644 index 0000000..9e63f1c --- /dev/null +++ b/tools/lcd_message_tool.py @@ -0,0 +1,96 @@ +import redis +import threading +import json +import time + +def timedMessage(message, delay=3): + for s in range(1,delay): + remainingTime = delay - s + print(message + '...{0}s \r'.format(remainingTime), end="", flush=True) + time.sleep(s) + +if __name__ == "__main__": + try: + option = True + message = {} + r = redis.Redis(host='127.0.0.1', port=6379, decode_responses=True) + publisher = r + topic = None + while option != 0: + #Clear the screen command + print(chr(27) + "[2J") + print('--------- LCD MudPi ---------') + print('|4. Clear Message Queue |') + print('|3. Clear Display |') + print('|2. Test Message |') + print('|1. Add Message |') + print('|0. Shutdown |') + print('-------------------------------') + try: + option = int(input('Enter Option: ')) + except: + #Catch string input error + option = 9 + if option != 0: + if option == 1: + try: + msg = { + "message":"", + "duration":10 + } + msg["message"] = str(input('Enter Message to Display: ')) + msg["duration"] = int(input('Enter Duration to Display (seconds): ')) + + except: + msg = { + "message":"Error Test", + "duration":10 + } + message = { + 'event': 'Message', + 'data': msg + } + elif option == 2: + msg = { + "message":"Test Message\nMudPi Test", + "duration":15 + } + message = { + 'event': 'Message', + 'data': msg + } + elif option == 3: + message = { + 'event': 'Clear', + 'data': 1 + } + elif option == 4: + message = { + 'event': 'ClearQueue', + 'data': 1 + } + else: + timedMessage('Option not recognized') + print(chr(27) + "[2J") + continue + + if topic is None: + topic = str(input('Enter the LCD Topic to Broadcast: ')) + + if topic is not None and topic != '': + #Publish the message + publisher.publish(topic, json.dumps(message)) + print(message) + timedMessage('Message Successfully Queued!') + else: + timedMessage('Topic Input Invalid') + time.sleep(2) + + print('Exit') + except KeyboardInterrupt: + #Kill The Server + #r.publish('test', json.dumps({'EXIT':True})) + print('LCD Message Program Terminated...') + finally: + pass + diff --git a/triggers/control_trigger.py b/triggers/control_trigger.py index ee883a0..95acdfc 100644 --- a/triggers/control_trigger.py +++ b/triggers/control_trigger.py @@ -4,20 +4,23 @@ import sys from .trigger import Trigger sys.path.append('..') -import variables class ControlTrigger(Trigger): - def __init__(self, main_thread_running, system_ready, name='ControlTrigger',key=None, source=None, thresholds=None, channel="controls", trigger_active=None, frequency='once', actions=[], group=None): + def __init__(self, main_thread_running, system_ready, name='ControlTrigger',key=None, source=None, thresholds=None, topic="controls", trigger_active=None, frequency='once', actions=[], group=None, redis_conn=None): super().__init__(main_thread_running, system_ready, name=name, key=key, source=source, thresholds=thresholds, trigger_active=trigger_active, frequency=frequency, actions=actions, trigger_interval=0.5, group=group) - self.channel = channel.replace(" ", "_").lower() if channel is not None else "controls" + self.topic = topic.replace(" ", "_").lower() if topic is not None else "controls" + try: + self.r = redis_conn if redis_conn is not None else redis.Redis(host='127.0.0.1', port=6379) + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) return def init_trigger(self): #Initialize the trigger here (i.e. set listeners or create cron jobs) #Pubsub Listeners - self.pubsub = variables.r.pubsub() - self.pubsub.subscribe(**{self.channel: self.handleEvent}) + self.pubsub = self.r.pubsub() + self.pubsub.subscribe(**{self.topic: self.handleEvent}) pass def check(self): diff --git a/triggers/sensor_trigger.py b/triggers/sensor_trigger.py index 73cbcc2..21a7d38 100644 --- a/triggers/sensor_trigger.py +++ b/triggers/sensor_trigger.py @@ -4,21 +4,24 @@ import sys from .trigger import Trigger sys.path.append('..') -import variables class SensorTrigger(Trigger): - def __init__(self, main_thread_running, system_ready, name='SensorTrigger',key=None, source=None, nested_source=None, thresholds=None, channel="sensors", trigger_active=None, frequency='once', actions=[], group=None): + def __init__(self, main_thread_running, system_ready, name='SensorTrigger',key=None, source=None, nested_source=None, thresholds=None, topic="sensors", trigger_active=None, frequency='once', actions=[], group=None, redis_conn=None): super().__init__(main_thread_running, system_ready, name=name, key=key, source=source, thresholds=thresholds, trigger_active=trigger_active, frequency=frequency, actions=actions, trigger_interval=0.5, group=group) - self.channel = channel.replace(" ", "_").lower() if channel is not None else "sensors" + self.topic = topic.replace(" ", "_").lower() if topic is not None else "sensors" self.nested_source = nested_source.lower() if nested_source is not None else nested_source + try: + self.r = redis_conn if redis_conn is not None else redis.Redis(host='127.0.0.1', port=6379) + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) return def init_trigger(self): #Initialize the trigger here (i.e. set listeners or create cron jobs) #Pubsub Listeners - self.pubsub = variables.r.pubsub() - self.pubsub.subscribe(**{self.channel: self.handleEvent}) + self.pubsub = self.r.pubsub() + self.pubsub.subscribe(**{self.topic: self.handleEvent}) pass def check(self): diff --git a/triggers/time_trigger.py b/triggers/time_trigger.py index 48629df..0a50c1c 100644 --- a/triggers/time_trigger.py +++ b/triggers/time_trigger.py @@ -9,7 +9,6 @@ except ImportError: CRON_ENABLED = False sys.path.append('..') -import variables class TimeTrigger(Trigger): diff --git a/triggers/trigger.py b/triggers/trigger.py index 9196ba3..2f9f1af 100644 --- a/triggers/trigger.py +++ b/triggers/trigger.py @@ -4,7 +4,6 @@ import threading import sys sys.path.append('..') -import variables class Trigger(): diff --git a/triggers/trigger_group.py b/triggers/trigger_group.py index 83ec9e7..b8a48e3 100644 --- a/triggers/trigger_group.py +++ b/triggers/trigger_group.py @@ -4,7 +4,6 @@ import threading import sys sys.path.append('..') -import variables class TriggerGroup(): diff --git a/variables.py b/variables.py index 3d2bc3f..1388c2d 100644 --- a/variables.py +++ b/variables.py @@ -1,11 +1,5 @@ -import redis - -lcd_message = {'line_1': 'Temperature: ', 'line_2': 'Humidity: '} PREVIOUS_LINE="\x1b[1F" RED_BACK="\x1b[41;37m" GREEN_BACK="\x1b[42;30m" YELLOW_BACK="\x1b[43;30m" RESET="\x1b[0m" - -#Singleton redis to prevent connection conflicts -r = redis.Redis(host='127.0.0.1', port=6379) \ No newline at end of file diff --git a/workers/adc_worker.py b/workers/adc_worker.py index 58aaa51..736f83c 100644 --- a/workers/adc_worker.py +++ b/workers/adc_worker.py @@ -3,11 +3,10 @@ import board import adafruit_mcp3xxx.mcp3008 as MCP from adafruit_mcp3xxx.analog_in import AnalogIn - import threading -import variables import time import json +import redis import importlib @@ -40,6 +39,10 @@ def __init__(self, config: dict, main_thread_running, system_ready): self.main_thread_running = main_thread_running self.system_ready = system_ready self.node_ready = False + try: + self.r = redis_conn if redis_conn is not None else redis.Redis(host='127.0.0.1', port=6379) + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) spi = busio.SPI(clock=board.SCK, MISO=board.MISO, MOSI=board.MOSI) cs = digitalio.DigitalInOut(ADCMCP3008Worker.PINS[config['pin']]) @@ -85,7 +88,7 @@ def run(self): t = threading.Thread(target=self.work, args=()) t.start() print(str(self.config['name']) + ' Node Worker [' + str( - len(self.config['sensors'])) + ' Sensors]...\t\033[1;32m Running\033[0;0m') + len(self.config['sensors'])) + ' Sensors]...\t\033[1;32m Online\033[0;0m') return t else: print("Node Connection...\t\t\t\033[1;31m Failed\033[0;0m") @@ -104,7 +107,7 @@ def work(self): print(readings) message['data'] = readings - variables.r.publish('sensors', json.dumps(message)) + self.r.publish('sensors', json.dumps(message)) time.sleep(15) # This is only ran after the main thread is shut down diff --git a/workers/arduino/__init__.py b/workers/arduino/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/workers/arduino_control_worker.py b/workers/arduino/arduino_control_worker.py similarity index 81% rename from workers/arduino_control_worker.py rename to workers/arduino/arduino_control_worker.py index 4b39ebd..e6aec4a 100644 --- a/workers/arduino_control_worker.py +++ b/workers/arduino/arduino_control_worker.py @@ -6,6 +6,7 @@ from nanpy import (SerialManager) from nanpy.serialmanager import SerialManagerError from nanpy.sockconnection import (SocketManager, SocketManagerError) +from .worker import Worker import sys sys.path.append('..') @@ -14,35 +15,22 @@ #r = redis.Redis(host='127.0.0.1', port=6379) -class ArduinoControlWorker(): +class ArduinoControlWorker(Worker): def __init__(self, config, main_thread_running, system_ready, node_connected, connection=None): - #self.config = {**config, **self.config} - self.config = config - self.main_thread_running = main_thread_running - self.system_ready = system_ready + super().__init__(config, main_thread_running, system_ready) self.controls_ready = False self.node_connected = node_connected self.connection = connection self.controls = [] + if node_connected.is_set(): - self.init_controls() - self.controls_ready = True + self.init() + self.controls_ready = True return - def dynamic_import(self, path): - components = path.split('.') - - s = '' - for component in components[:-1]: - s += component + '.' - - parent = importlib.import_module(s[:-1]) - sensor = getattr(parent, components[-1]) - return sensor - - def init_controls(self): + def init(self): try: for control in self.config['controls']: if control.get('type', None) is not None: @@ -52,7 +40,6 @@ def init_controls(self): analog_pin_mode = False if control.get('is_digital', False) else True imported_control = self.dynamic_import(control_type) - #new_control = imported_control(control.get('pin'), name=control.get('name', control.get('type')), connection=self.connection, key=control.get('key', None)) # Define default kwargs for all control types, conditionally include optional variables below if they exist control_kwargs = { @@ -71,6 +58,7 @@ def init_controls(self): new_control.init_control() self.controls.append(new_control) + self.controls_ready = True print('{type} Control {pin}...\t\t\t\033[1;32m Ready\033[0;0m'.format(**control)) except (SerialManagerError, SocketManagerError, BrokenPipeError, ConnectionResetError, OSError, socket.timeout) as e: # Connection error. Reset everything for reconnect @@ -85,7 +73,6 @@ def run(self): return t def work(self): - while self.main_thread_running.is_set(): if self.system_ready.is_set(): if self.node_connected.is_set(): @@ -103,8 +90,7 @@ def work(self): time.sleep(15) else: # Worker connected but controls not initialized - self.init_controls() - self.controls_ready = True + self.init() else: # Node not connected. Wait for reconnect self.controls_ready = False diff --git a/workers/arduino/arduino_relay_worker.py b/workers/arduino/arduino_relay_worker.py new file mode 100644 index 0000000..c5c07eb --- /dev/null +++ b/workers/arduino/arduino_relay_worker.py @@ -0,0 +1,174 @@ +import time +import datetime +import json +import redis +import threading +import sys +import socket +from nanpy import (SerialManager, ArduinoApi) +from nanpy.serialmanager import SerialManagerError +from nanpy.sockconnection import (SocketManager, SocketManagerError) +from .worker import Worker +sys.path.append('..') + +import variables + +class ArduinoRelayWorker(Worker): + def __init__(self, config, main_thread_running, system_ready, relay_available, relay_active, node_connected, connection=None, api=None): + super().__init__(config, main_thread_running, system_ready) + self.config['pin'] = int(self.config['pin']) # parse possbile strings to avoid errors + + # Events + self.main_thread_running = main_thread_running + self.system_ready = system_ready + self.relay_available = relay_available + self.relay_active = relay_active + self.node_connected = node_connected + + # Dynamic Properties based on config + self.active = False + self.relay_ready = False + self.topic = self.config['topic'].replace(" ", "/").lower() if self.config['topic'] is not None else 'mudpi/relay/*' + + # Pubsub Listeners + self.pubsub = self.r.pubsub() + self.pubsub.subscribe(**{self.topic: self.handleMessage}) + self.api = api + + if self.node_connected.is_set(): + self.init() + return + + def init(self): + print('{name} Relay Worker {key}...\t\t\033[1;32m Initializing\033[0;0m'.format(**self.config)) + self.api = self.api if self.api is not None else ArduinoApi(connection) + self.pin_state_off = self.api.HIGH if self.config['normally_open'] is not None and self.config['normally_open'] else self.api.LOW + self.pin_state_on = self.api.LOW if self.config['normally_open'] is not None and self.config['normally_open'] else self.api.HIGH + self.api.pinMode(self.config['pin'], self.api.OUTPUT) + #Close the relay by default, we use the pin state we determined based on the config at init + self.api.digitalWrite(self.config['pin'], self.pin_state_off) + time.sleep(0.1) + + #Feature to restore relay state in case of crash or unexpected shutdown. This will check for last state stored in redis and set relay accordingly + if(self.config.get('restore_last_known_state', None) is not None and self.config.get('restore_last_known_state', False) is True): + if(self.r.get(self.config['key']+'_state')): + self.api.digitalWrite(self.config['pin'], self.pin_state_on) + print('Restoring Relay \033[1;36m{0} On\033[0;0m'.format(self.config['key'])) + + self.relay_ready = True + return + + def run(self): + t = threading.Thread(target=self.work, args=()) + t.start() + print('Node Relay {key} Worker...\t\t\033[1;32m Online\033[0;0m'.format(**self.config)) + return t + + def decodeMessageData(self, message): + if isinstance(message, dict): + #print('Dict Found') + return message + elif isinstance(message.decode('utf-8'), str): + try: + temp = json.loads(message.decode('utf-8')) + #print('Json Found') + return temp + except: + #print('Json Error. Str Found') + return {'event':'Unknown', 'data':message} + else: + #print('Failed to detect type') + return {'event':'Unknown', 'data':message} + + def handleMessage(self, message): + data = message['data'] + if data is not None: + decoded_message = self.decodeMessageData(data) + try: + if decoded_message['event'] == 'Switch': + if decoded_message.get('data', None): + self.relay_active.set() + elif decoded_message.get('data', None) == 0: + self.relay_active.clear() + print('Switch Relay \033[1;36m{0}\033[0;0m state to \033[1;36m{1}\033[0;0m'.format(self.config['key'], decoded_message['data'])) + elif decoded_message['event'] == 'Toggle': + state = 'Off' if self.active else 'On' + if self.relay_active.is_set(): + self.relay_active.clear() + else: + self.relay_active.set() + print('Toggle Relay \033[1;36m{0} {1} \033[0;0m'.format(self.config['key'], state)) + except: + print('Error Decoding Message for Relay {0}'.format(self.config['key'])) + + def elapsedTime(self): + self.time_elapsed = time.perf_counter() - self.time_start + return self.time_elapsed + + def resetElapsedTime(self): + self.time_start = time.perf_counter() + pass + + def turnOn(self): + #Turn on relay if its available + if self.relay_available.is_set(): + if not self.active: + self.api.digitalWrite(self.config['pin'], self.pin_state_on) + message = {'event':'StateChanged', 'data':1} + self.r.set(self.config['key']+'_state', 1) + self.r.publish(self.topic, json.dumps(message)) + self.active = True + #self.relay_active.set() This is handled by the redis listener now + self.resetElapsedTime() + + def turnOff(self): + #Turn off volkeye to flip off relay + if self.relay_available.is_set(): + if self.active: + self.api.digitalWrite(self.config['pin'], self.pin_state_off) + message = {'event':'StateChanged', 'data':0} + self.r.delete(self.config['key']+'_state') + self.r.publish(self.topic, json.dumps(message)) + #self.relay_active.clear() This is handled by the redis listener now + self.active = False + self.resetElapsedTime() + + def work(self): + self.resetElapsedTime() + while self.main_thread_running.is_set(): + if self.system_ready.is_set(): + if self.node_connected.is_set(): + if self.relay_ready: + try: + self.pubsub.get_message() + if self.relay_available.is_set(): + if self.relay_active.is_set(): + self.turnOn() + else: + self.turnOff() + else: + self.turnOff() + time.sleep(1) + except e: + print("Node Relay Worker \033[1;36m{key}\033[0;0m \t\033[1;31m Unexpected Error\033[0;0m".format(**self.config)) + print(e) + else: + self.init() + else: + # Node offline + self.relay_ready = False + time.sleep(5) + + else: + #System not ready relay should be off + self.turnOff() + time.sleep(1) + self.resetElapsedTime() + + time.sleep(0.1) + + + #This is only ran after the main thread is shut down + #Close the pubsub connection + self.pubsub.close() + print("Node Relay {key} Shutting Down...\t\033[1;32m Complete\033[0;0m".format(**self.config)) \ No newline at end of file diff --git a/workers/arduino_sensor_worker.py b/workers/arduino/arduino_sensor_worker.py similarity index 72% rename from workers/arduino_sensor_worker.py rename to workers/arduino/arduino_sensor_worker.py index bb5a7bf..9e23630 100644 --- a/workers/arduino_sensor_worker.py +++ b/workers/arduino/arduino_sensor_worker.py @@ -3,47 +3,39 @@ import threading import random import socket +from .worker import Worker from nanpy import (SerialManager) from nanpy.serialmanager import SerialManagerError from nanpy.sockconnection import (SocketManager, SocketManagerError) +from sensors.arduino.rain_sensor import (RainSensor) +from sensors.arduino.soil_sensor import (SoilSensor) +from sensors.arduino.float_sensor import (FloatSensor) +from sensors.arduino.light_sensor import (LightSensor) +from sensors.arduino.humidity_sensor import (HumiditySensor) +from sensors.arduino.temperature_sensor import (TemperatureSensor) import sys sys.path.append('..') -import variables import importlib #r = redis.Redis(host='127.0.0.1', port=6379) -class ArduinoSensorWorker(): - def __init__(self, config, main_thread_running, system_ready, node_connected, connection=None): - #self.config = {**config, **self.config} - self.config = config - self.main_thread_running = main_thread_running - self.system_ready = system_ready - self.sleep_duration = config.get('sleep_duration', 15) - self.channel = config.get('channel', 'sensors').replace(" ", "_").lower() +class ArduinoSensorWorker(Worker): + def __init__(self, config, main_thread_running, system_ready, node_connected, connection=None, api=None): + super().__init__(config, main_thread_running, system_ready) + self.topic = config.get('topic', 'sensors').replace(" ", "_").lower() self.sensors_ready = False self.node_connected = node_connected self.connection = connection + self.api = api self.sensors = [] if node_connected.is_set(): - self.init_sensors() - self.sensors_ready = True + self.init() + self.sensors_ready = True return - def dynamic_import(self, path): - components = path.split('.') - - s = '' - for component in components[:-1]: - s += component + '.' - - parent = importlib.import_module(s[:-1]) - sensor = getattr(parent, components[-1]) - - return sensor - - def init_sensors(self, connection=None): + def init(self, connection=None): + print('{name} Sensor Worker...\t\t\033[1;32m Preparing\033[0;0m'.format(**self.config)) try: for sensor in self.config['sensors']: if sensor.get('type', None) is not None: @@ -66,13 +58,16 @@ def init_sensors(self, connection=None): # optional sensor variables # Model is specific to DHT modules to specify DHT11(11) DHT22(22) or DHT2301(21) if sensor.get('model'): - sensor_kwargs['model'] = sensor.get('model') + sensor_kwargs['model'] = str(sensor.get('model')) + sensor_kwargs['api'] = self.api new_sensor = imported_sensor(**sensor_kwargs) + # print('{type} Sensor {pin}...\t\t\t\033[1;32m Preparing\033[0;0m'.format(**sensor)) new_sensor.init_sensor() self.sensors.append(new_sensor) - print('{type} Sensor {pin}...\t\t\t\033[1;32m Ready\033[0;0m'.format(**sensor)) + # print('{type} Sensor {pin}...\t\t\t\033[1;32m Ready\033[0;0m'.format(**sensor)) + self.sensors_ready = True except (SerialManagerError, SocketManagerError, BrokenPipeError, ConnectionResetError, OSError, socket.timeout) as e: # Connection error. Reset everything for reconnect self.sensors_ready = False @@ -83,6 +78,7 @@ def init_sensors(self, connection=None): def run(self): t = threading.Thread(target=self.work, args=()) t.start() + print('Node {name} Sensor Worker...\t\t\033[1;32m Online\033[0;0m'.format(**self.config)) return t def work(self): @@ -98,12 +94,11 @@ def work(self): readings[sensor.key] = result #r.set(sensor.get('key', sensor.get('type')), value) - print(readings) + print("Node Readings: ", readings) message['data'] = readings - variables.r.publish(self.channel, json.dumps(message)) + self.r.publish(self.topic, json.dumps(message)) except (SerialManagerError, SocketManagerError, BrokenPipeError, ConnectionResetError, OSError, socket.timeout) as e: print('\033[1;36m{name}\033[0;0m -> \033[1;33mSensors Timeout!\033[0;0m'.format(**self.config)) - self.sensors_ready = False self.sensors = [] self.node_connected.clear() time.sleep(15) @@ -113,8 +108,8 @@ def work(self): self.sensors_ready = True else: #Node not connected, sensors not ready. Wait for reconnect - self.sensors_ready = False self.sensors = [] + self.sensors_ready = False # Main loop delay between cycles time.sleep(self.sleep_duration) diff --git a/workers/arduino_worker.py b/workers/arduino/arduino_worker.py similarity index 64% rename from workers/arduino_worker.py rename to workers/arduino/arduino_worker.py index 7270c8a..72f350a 100644 --- a/workers/arduino_worker.py +++ b/workers/arduino/arduino_worker.py @@ -6,44 +6,71 @@ from nanpy import (SerialManager, ArduinoApi) from nanpy.serialmanager import SerialManagerError from nanpy.sockconnection import (SocketManager, SocketManagerError) -from workers.arduino_control_worker import ArduinoControlWorker -from workers.arduino_sensor_worker import ArduinoSensorWorker +from workers.arduino.arduino_control_worker import ArduinoControlWorker +from workers.arduino.arduino_sensor_worker import ArduinoSensorWorker +from workers.arduino.arduino_relay_worker import ArduinoRelayWorker +from .worker import Worker import sys sys.path.append('..') -import variables import importlib #r = redis.Redis(host='127.0.0.1', port=6379) -class ArduinoWorker(): +class ArduinoWorker(Worker): def __init__(self, config, main_thread_running, system_ready, connection=None): - #self.config = {**config, **self.config} - self.config = config - self.main_thread_running = main_thread_running - self.system_ready = system_ready - self.sleep_duration = config.get('sleep_duration', 15) + super().__init__(config, main_thread_running, system_ready) self.connection = connection self.threads = [] + + # Events self.node_ready = threading.Event() - self.node_connected = threading.Event() #Event to signal if camera can be used + self.node_connected = threading.Event() # Event to signal if node can be used + self.workers = [] + self.relays = [] + self.relayEvents = {} + self.relay_index = 0 + if connection is None: self.connection = self.connect() - if self.config['controls'] is not None: - acw = ArduinoControlWorker(self.config, main_thread_running, system_ready, self.node_connected, self.connection) - self.workers.append(acw) - acw = acw.run() - if acw is not None: - self.threads.append(acw) + try: + if self.config['controls'] is not None: + acw = ArduinoControlWorker(self.config, main_thread_running, system_ready, self.node_connected, self.connection) + self.workers.append(acw) + time.sleep(3) + except KeyError: + print('{name} Node Controls...\t\t\033[1;31m Disabled\033[0;0m'.format(**self.config)) + + try: + if self.config['relays'] is not None: + for relay in self.config['relays']: + #Create a threading event for each relay to check status + relayState = { + "available": threading.Event(), #Event to allow relay to activate + "active": threading.Event() #Event to signal relay to open/close + } + #Store the relays under the key or index if no key is found, this way we can reference the right relays + self.relayEvents[relay.get("key", self.relay_index)] = relayState + #Create sensor worker for a relay + arw = ArduinoRelayWorker(relay, main_thread_running, system_ready, relayState['available'], relayState['active'], self.node_connected, self.connection, self.api) + #Make the relays available, this event is toggled off elsewhere if we need to disable relays + relayState['available'].set() + self.relay_index +=1 + self.workers.append(arw) + time.sleep(3) + except KeyError: + print('{name} Node Relays...\t\t\033[1;31m Disabled\033[0;0m'.format(**self.config)) + + try: + if self.config['sensors'] is not None: + asw = ArduinoSensorWorker(self.config, main_thread_running, system_ready, self.node_connected, self.connection, self.api) + self.workers.append(asw) + time.sleep(3) + except KeyError: + print('{name} Node Sensors...\t\t\033[1;31m Disabled\033[0;0m'.format(**self.config)) - if self.config['sensors'] is not None: - asw = ArduinoSensorWorker(self.config, main_thread_running, system_ready, self.node_connected, self.connection) - self.workers.append(asw) - asw = asw.run() - if asw is not None: - self.threads.append(asw) return def connect(self): @@ -52,7 +79,7 @@ def connect(self): if self.config.get('use_wifi', False): while attempts > 0 and self.main_thread_running.is_set(): try: - print('\033[1;36m{0}\033[0;0m -> Connecting... \t'.format(self.config["name"], (3-attempts)), end='\r', flush=True) + print('\033[1;36m{0}\033[0;0m -> Connecting... \t'.format(self.config["name"], (3-attempts))) attempts-= 1 conn = SocketManager(host=str(self.config.get('address', 'mudpi.local'))) # Test the connection with api @@ -60,7 +87,7 @@ def connect(self): except (SocketManagerError, BrokenPipeError, ConnectionResetError, socket.timeout) as e: print('{name} -> Connecting...\t\t\033[1;33m Timeout\033[0;0m '.format(**self.config)) if attempts > 0: - print('{name} -> Preparing Reconnect... \t'.format(**self.config), end='\r', flush=True) + print('{name} -> Preparing Reconnect... \t'.format(**self.config)) else: print('{name} -> Connection Attempts...\t\033[1;31m Failed\033[0;0m '.format(**self.config)) conn = None @@ -109,10 +136,15 @@ def resetConnection(self): def run(self): + for worker in self.workers: + t = worker.run() + self.threads.append(t) + time.sleep(1) + t = threading.Thread(target=self.work, args=()) t.start() if self.node_ready.is_set(): - print(str(self.config['name']) +' Node Worker '+ '[S: ' + str(len(self.config['sensors'])) + ']' + '[C: ' + str(len(self.config['controls'])) + ']...\t\033[1;32m Running\033[0;0m') + print(str(self.config['name']) +' Node Worker '+ '[S: ' + str(len(self.config['sensors'])) + ']' + '[C: ' + str(len(self.config['controls'])) + ']...\t\033[1;32m Online\033[0;0m') else: print(str(self.config['name']) +'...\t\t\t\t\033[1;33m Pending Reconnect\033[0;0m ') return t @@ -130,7 +162,7 @@ def work(self): # Node reconnection cycle if not self.node_connected.is_set(): # Random delay before connections to offset multiple attempts (1-5 min delay) - random_delay = random.randrange(60,300) * delay_multiplier + random_delay = (random.randrange(30, self.config.get("max_reconnect_delay", 300)) * delay_multiplier) / 2 time.sleep(10) print('\033[1;36m'+str(self.config['name']) +'\033[0;0m -> Retrying in '+ '{0}s...'.format(random_delay)+'\t\033[1;33m Pending Reconnect\033[0;0m ') # Two separate checks for main thread event to prevent re-connections during shutdown @@ -147,8 +179,8 @@ def work(self): # Main loop delay between cycles time.sleep(self.sleep_duration) - #This is only ran after the main thread is shut down - #Join all our sub threads for shutdown + # This is only ran after the main thread is shut down + # Join all our sub threads for shutdown for thread in self.threads: thread.join() print("{name} Shutting Down...\t\t\033[1;32m Complete\033[0;0m".format(**self.config)) \ No newline at end of file diff --git a/workers/arduino/worker.py b/workers/arduino/worker.py new file mode 100644 index 0000000..8ce513c --- /dev/null +++ b/workers/arduino/worker.py @@ -0,0 +1,70 @@ +import time +import datetime +import json +import redis +import threading +import sys +sys.path.append('..') + +# Base Worker Class +# A worker is responsible for handling its set of operations and running on a thread +class Worker(): + def __init__(self, config, main_thread_running, system_ready): + self.config = config + try: + self.r = config["redis"] + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) + self.topic = config.get('topic', 'mudpi').replace(" ", "_").lower() + self.sleep_duration = config.get('sleep_duration', 15) + + # Threading Events to Keep Everything in Sync + self.main_thread_running = main_thread_running + self.system_ready = system_ready + self.worker_available = threading.Event() + + self.api = None + self.components = [] + return + + def init(self): + # print('Worker...\t\t\t\033[1;32m Initializing\033[0;0m'.format(**control)) + return + + def run(self): + t = threading.Thread(target=self.work, args=()) + t.start() + return t + + def work(self): + while self.main_thread_running.is_set(): + if self.system_ready.is_set(): + time.sleep(self.sleep_duration) + #This is only ran after the main thread is shut down + print("Worker Shutting Down...\t\033[1;32m Complete\033[0;0m") + + def dynamic_import(self, name): + # Split path of the class folder structure: {sensor name}_sensor . {SensorName}Sensor + components = name.split('.') + # Dynamically import root of component path + module = __import__(components[0]) + # Get component attributes + for component in components[1:]: + module = getattr(module, component) + return module + + def decodeMessageData(self, message): + if isinstance(message, dict): + #print('Dict Found') + return message + elif isinstance(message.decode('utf-8'), str): + try: + temp = json.loads(message.decode('utf-8')) + #print('Json Found') + return temp + except: + #print('Json Error. Str Found') + return {'event':'Unknown', 'data':message} + else: + #print('Failed to detect type') + return {'event':'Unknown', 'data':message} \ No newline at end of file diff --git a/workers/lcd_worker.py b/workers/lcd_worker.py deleted file mode 100644 index c377019..0000000 --- a/workers/lcd_worker.py +++ /dev/null @@ -1,207 +0,0 @@ - - -# The wiring for the LCD is as follows: -# 1 : GND -# 2 : 5V -# 3 : Contrast (0-5V)* -# 4 : RS (Register Select) -# 5 : R/W (Read Write) - GROUND THIS PIN -# 6 : Enable or Strobe -# 7 : Data Bit 0 - NOT USED -# 8 : Data Bit 1 - NOT USED -# 9 : Data Bit 2 - NOT USED -# 10: Data Bit 3 - NOT USED -# 11: Data Bit 4 -# 12: Data Bit 5 -# 13: Data Bit 6 -# 14: Data Bit 7 -# 15: LCD Backlight +5V** -# 16: LCD Backlight GND - -import RPi.GPIO as GPIO -import time -import redis -import json -import threading -import logging - -import sys -sys.path.append('..') -import variables - -# Define GPIO to LCD mapping -LCD_RS = 7 -LCD_E = 8 -LCD_D4 = 25 -LCD_D5 = 24 -LCD_D6 = 23 -LCD_D7 = 18 - -# Define some device constants -LCD_WIDTH = 16 # Maximum characters per line -LCD_CHR = True -LCD_CMD = False - -LCD_LINE_1 = 0x80 # LCD RAM address for the 1st line -LCD_LINE_2 = 0xC0 # LCD RAM address for the 2nd line - -# Timing constants -E_PULSE = 0.0005 -E_DELAY = 0.0005 - -MESSAGE_QUEUE = [] - -#r = redis.Redis(host='127.0.0.1', port=6379) -r = variables.r - -class LCDWorker(): - - def __init__(self, new_messages_waiting, main_thread_running, system_ready): - self.new_messages_waiting = new_messages_waiting - self.main_thread_running = main_thread_running - self.system_ready = system_ready - return - - def run(self): - t = threading.Thread(target=self.process_loop, args=()) - t.start() - print('LCD Worker...\t\t\t\t\033[1;32m Running\033[0;0m') - return t - - def process_loop(self): - self.prepare_gpio() - self.prepare_messages() - - try: - while self.main_thread_running.is_set(): - if(self.system_ready.is_set()): - global MESSAGE_QUEUE - - #print('Message Queue Begin:') - for msg in MESSAGE_QUEUE: - if not (self.main_thread_running.is_set()): - return - - #print('LCD MESSAGE\nLine 1: %s \nLine 2: %s' % (msg['line_1'],msg['line_2'])) - # Send some test - self.lcd_string(msg['line_1'],LCD_LINE_1) - self.lcd_string(msg['line_2'],LCD_LINE_2) - - time.sleep(3) # 3 second delay - - #Display Control Messages - #print('Main Messages Begin:') - self.lcd_string(variables.lcd_message['line_1'],LCD_LINE_1) - self.lcd_string(variables.lcd_message['line_2'],LCD_LINE_2) - time.sleep(3) - - if ((not MESSAGE_QUEUE) or (self.new_messages_waiting.is_set()) and (self.main_thread_running.is_set())): - #print('|| LCD Loading New Messages ||') - time.sleep(5) - self.prepare_messages() - #Clear the event that tells us we should download messages - self.new_messages_waiting.clear() - - #This is after the main thread has ended, clear out the lcd display - print('LCD Worker Shutting Down...\t\t\033[1;32m Complete\033[0;0m') - self.lcd_byte(0x01, LCD_CMD) - self.lcd_string("Garden Control",LCD_LINE_1) - self.lcd_string("Shutting Down...",LCD_LINE_2) - GPIO.cleanup() - except KeyboardInterrupt: - self.lcd_byte(0x01, LCD_CMD) - self.lcd_string("Garden Control",LCD_LINE_1) - self.lcd_string("Shutting Down...",LCD_LINE_2) - GPIO.cleanup() - - def prepare_messages(self): - global MESSAGE_QUEUE - if r.exists('lcdmessages'): - MESSAGE_QUEUE = json.loads(r.get('lcdmessages').decode('utf-8')) - #print('Message Pulled:', MESSAGE_QUEUE) - - - def prepare_gpio(self): - # Main program block - GPIO.setwarnings(False) - GPIO.setmode(GPIO.BCM) # Use BCM GPIO numbers - GPIO.setup(LCD_E, GPIO.OUT) # E - GPIO.setup(LCD_RS, GPIO.OUT) # RS - GPIO.setup(LCD_D4, GPIO.OUT) # DB4 - GPIO.setup(LCD_D5, GPIO.OUT) # DB5 - GPIO.setup(LCD_D6, GPIO.OUT) # DB6 - GPIO.setup(LCD_D7, GPIO.OUT) # DB7 - # Initialise display - self.lcd_init() - - - def lcd_init(self): - # Initialise display - self.lcd_byte(0x33,LCD_CMD) # 110011 Initialise - self.lcd_byte(0x32,LCD_CMD) # 110010 Initialise - self.lcd_byte(0x06,LCD_CMD) # 000110 Cursor move direction - self.lcd_byte(0x0C,LCD_CMD) # 001100 Display On,Cursor Off, Blink Off - self.lcd_byte(0x28,LCD_CMD) # 101000 Data length, number of lines, font size - self.lcd_byte(0x01,LCD_CMD) # 000001 Clear display - time.sleep(E_DELAY) - - def lcd_byte(self, bits, mode): - # Send byte to data pins - # bits = data - # mode = True for character - # False for command - - GPIO.output(LCD_RS, mode) # RS - - # High bits - GPIO.output(LCD_D4, False) - GPIO.output(LCD_D5, False) - GPIO.output(LCD_D6, False) - GPIO.output(LCD_D7, False) - if bits&0x10==0x10: - GPIO.output(LCD_D4, True) - if bits&0x20==0x20: - GPIO.output(LCD_D5, True) - if bits&0x40==0x40: - GPIO.output(LCD_D6, True) - if bits&0x80==0x80: - GPIO.output(LCD_D7, True) - - # Toggle 'Enable' pin - self.lcd_toggle_enable() - - # Low bits - GPIO.output(LCD_D4, False) - GPIO.output(LCD_D5, False) - GPIO.output(LCD_D6, False) - GPIO.output(LCD_D7, False) - if bits&0x01==0x01: - GPIO.output(LCD_D4, True) - if bits&0x02==0x02: - GPIO.output(LCD_D5, True) - if bits&0x04==0x04: - GPIO.output(LCD_D6, True) - if bits&0x08==0x08: - GPIO.output(LCD_D7, True) - - # Toggle 'Enable' pin - self.lcd_toggle_enable() - - def lcd_toggle_enable(self): - # Toggle enable - time.sleep(E_DELAY) - GPIO.output(LCD_E, True) - time.sleep(E_PULSE) - GPIO.output(LCD_E, False) - time.sleep(E_DELAY) - - def lcd_string(self,message,line): - # Send string to display - - message = message.ljust(LCD_WIDTH," ") - - self.lcd_byte(line, LCD_CMD) - - for i in range(LCD_WIDTH): - self.lcd_byte(ord(message[i]),LCD_CHR) - \ No newline at end of file diff --git a/workers/pi/__init__.py b/workers/pi/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/workers/camera_worker.py b/workers/pi/camera_worker.py similarity index 85% rename from workers/camera_worker.py rename to workers/pi/camera_worker.py index 3a6341b..b0a6f43 100644 --- a/workers/camera_worker.py +++ b/workers/pi/camera_worker.py @@ -7,25 +7,19 @@ import os import RPi.GPIO as GPIO from picamera import PiCamera +from .worker import Worker sys.path.append('..') -import variables -#r = redis.Redis(host='127.0.0.1', port=6379) -GPIO.setmode(GPIO.BCM) - -class CameraWorker(): +class CameraWorker(Worker): def __init__(self, config, main_thread_running, system_ready, camera_available): - #self.config = {**config, **self.config} - self.config = config + super().__init__(config, main_thread_running, system_ready) self.pending_reset = False - #Events - self.main_thread_running = main_thread_running - self.system_ready = system_ready + # Events self.camera_available = camera_available - #Dynamic Properties based on config + # Dynamic Properties based on config self.path = self.config['path'].replace(" ", "-") if self.config['path'] is not None else '/etc/mudpi/img/' self.topic = self.config['topic'].replace(" ", "/").lower() if self.config['topic'] is not None else 'mudpi/camera/' if self.config['resolution'] is not None: @@ -56,7 +50,7 @@ def init(self): self.camera = PiCamera() #Pubsub Listeners - self.pubsub = variables.r.pubsub() + self.pubsub = self.r.pubsub() self.pubsub.subscribe(**{self.topic: self.handleEvent}) print('Camera Worker...\t\t\t\033[1;32m Ready\033[0;0m') @@ -67,7 +61,7 @@ def run(self): t.start() self.listener = threading.Thread(target=self.listen, args=()) self.listener.start() - print('Camera Worker...\t\t\t\033[1;32m Running\033[0;0m') + print('Camera Worker...\t\t\t\033[1;32m Online\033[0;0m') return t def wait(self): @@ -75,19 +69,11 @@ def wait(self): try: self.next_time = (datetime.datetime.now() + datetime.timedelta(hours=self.hours, minutes=self.minutes, seconds=self.seconds)).replace(microsecond=0) except: - #Default every hour + # Default every hour self.next_time = (datetime.datetime.now() + datetime.timedelta(hours=1)).replace(minute=0, second=0, microsecond=0) delay = (self.next_time - datetime.datetime.now()).seconds time.sleep(delay) - def elapsedTime(self): - self.time_elapsed = time.perf_counter() - self.time_start - return self.time_elapsed - - def resetElapsedTime(self): - self.time_start = time.perf_counter() - pass - def handleEvent(self, message): data = message['data'] decoded_message = None @@ -135,8 +121,8 @@ def work(self): print("Error During Camera Reset Cleanup") break; message = {'event':'StateChanged', 'data':filename} - variables.r.set('last_camera_image', filename) - variables.r.publish(self.topic, json.dumps(message)) + self.r.set('last_camera_image', filename) + self.r.publish(self.topic, json.dumps(message)) print('Image Captured \033[1;36m%s\033[0;0m' % filename) self.wait() # except: diff --git a/workers/pi_control_worker.py b/workers/pi/control_worker.py similarity index 68% rename from workers/pi_control_worker.py rename to workers/pi/control_worker.py index 6bf0663..d725eed 100644 --- a/workers/pi_control_worker.py +++ b/workers/pi/control_worker.py @@ -3,40 +3,23 @@ import json import redis import threading +from .worker import Worker import sys sys.path.append('..') from controls.pi.button_control import (ButtonControl) from controls.pi.switch_control import (SwitchControl) -import variables - -#r = redis.Redis(host='127.0.0.1', port=6379) -# def clamp(n, smallest, largest): return max(smallest, min(n, largest)) - -class PiControlWorker(): +class PiControlWorker(Worker): def __init__(self, config, main_thread_running, system_ready): - #self.config = {**config, **self.config} - self.config = config - self.channel = config.get('channel', 'controls').replace(" ", "_").lower() + super().__init__(config, main_thread_running, system_ready) + self.topic = config.get('topic', 'controls').replace(" ", "_").lower() self.sleep_duration = config.get('sleep_duration', 0.5) - self.main_thread_running = main_thread_running - self.system_ready = system_ready - #Store pump event so we can shutdown pump with float readings + self.controls = [] - self.init_controls() + self.init() return - def dynamic_import(self, name): - #Split path of the class folder structure: {sensor name}_sensor . {SensorName}Sensor - components = name.split('.') - #Dynamically import root of component path - module = __import__(components[0]) - #Get component attributes - for component in components[1:]: - module = getattr(module, component) - return module - - def init_controls(self): + def init(self): for control in self.config['controls']: if control.get('type', None) is not None: #Get the control from the controls folder {control name}_control.{ControlName}Control @@ -67,13 +50,10 @@ def init_controls(self): return def run(self): - t = threading.Thread(target=self.work, args=()) - t.start() - print('Pi Control Worker [' + str(len(self.config['controls'])) + ' Controls]...\t\033[1;32m Running\033[0;0m') - return t + print('Pi Control Worker [' + str(len(self.config['controls'])) + ' Controls]...\t\033[1;32m Online\033[0;0m') + return super().run() def work(self): - while self.main_thread_running.is_set(): if self.system_ready.is_set(): readings = {} diff --git a/workers/pi/i2c_worker.py b/workers/pi/i2c_worker.py new file mode 100644 index 0000000..46b4c1d --- /dev/null +++ b/workers/pi/i2c_worker.py @@ -0,0 +1,75 @@ +import time +import datetime +import json +import redis +import threading +import sys +sys.path.append('..') +from .worker import Worker +from sensors.pi.i2c.bme680_sensor import (Bme680Sensor) + + +class PiI2CWorker(Worker): + def __init__(self, config, main_thread_running, system_ready): + super().__init__(config, main_thread_running, system_ready) + self.topic = config.get('topic', 'i2c').replace(" ", "_").lower() + self.sleep_duration = config.get('sleep_duration', 30) + + self.sensors = [] + self.init() + return + + def init(self): + for sensor in self.config['sensors']: + if sensor.get('type', None) is not None: + #Get the sensor from the sensors folder {sensor name}_sensor.{SensorName}Sensor + sensor_type = 'sensors.pi.i2c.' + sensor.get('type').lower() + '_sensor.' + sensor.get('type').capitalize() + 'Sensor' + + imported_sensor = self.dynamic_import(sensor_type) + + # Define default kwargs for all sensor types, conditionally include optional variables below if they exist + sensor_kwargs = { + 'name' : sensor.get('name', sensor.get('type')), + 'address' : int(sensor.get('address', 00)), + 'key' : sensor.get('key', None) + } + + # Optional sensor variables + # Model is specific to DHT modules to specify DHT11 DHT22 or DHT2302 + if sensor.get('model'): + sensor_kwargs['model'] = str(sensor.get('model')) + + new_sensor = imported_sensor(**sensor_kwargs) + new_sensor.init_sensor() + + #Set the sensor type and determine if the readings are critical to operations + new_sensor.type = sensor.get('type').lower() + + self.sensors.append(new_sensor) + # print('{type} Sensor (Pi) {address}...\t\t\033[1;32m Ready\033[0;0m'.format(**sensor)) + return + + def run(self): + print('Pi I2C Sensor Worker [' + str(len(self.sensors)) + ' Sensors]...\t\033[1;32m Online\033[0;0m') + return super().run() + + def work(self): + while self.main_thread_running.is_set(): + if self.system_ready.is_set(): + + message = {'event':'PiSensorUpdate'} + readings = {} + + for sensor in self.sensors: + result = sensor.read() + readings[sensor.key] = result + self.r.set(sensor.key, json.dumps(result)) + + message['data'] = readings + print(readings); + self.r.publish(self.topic, json.dumps(message)) + time.sleep(self.sleep_duration) + + time.sleep(2) + #This is only ran after the main thread is shut down + print("Pi I2C Sensor Worker Shutting Down...\t\033[1;32m Complete\033[0;0m") \ No newline at end of file diff --git a/workers/pi/lcd_worker.py b/workers/pi/lcd_worker.py new file mode 100644 index 0000000..e7923c6 --- /dev/null +++ b/workers/pi/lcd_worker.py @@ -0,0 +1,159 @@ +import time +import datetime +import json +import redis +import threading +import board +import busio +import adafruit_character_lcd.character_lcd_rgb_i2c as character_rgb_lcd +import adafruit_character_lcd.character_lcd_i2c as character_lcd +from .worker import Worker +import sys +sys.path.append('..') + +class LcdWorker(Worker): + def __init__(self, config, main_thread_running, system_ready, lcd_available): + super().__init__(config, main_thread_running, system_ready) + try: + self.address = int(self.config['address']) if self.config['address'] is not None else None + except KeyError: + self.address = None + try: + self.model = str(self.config['model']) if self.config['model'] is not None else '' + except KeyError: + self.model = '' + try: + self.columns = int(self.config['columns']) if self.config['columns'] is not None else 16 + except KeyError: + self.columns = 16 + try: + self.rows = int(self.config['rows']) if self.config['rows'] is not None else 2 + except KeyError: + self.rows = 2 + try: + self.address = int(self.config['address']) if self.config['address'] is not None else None + except KeyError: + self.address = None + try: + self.default_duration = int(self.config['default_duration']) if self.config['default_duration'] is not None else 5 + except KeyError: + self.default_duration = 5 + + self.current_message = "" + self.cached_message = {'message':''} + self.need_new_message = True + self.message_queue = [] + + # Events + self.lcd_available = lcd_available + + # Dynamic Properties based on config + try: + self.topic = self.config['topic'].replace(" ", "/").lower() if self.config['topic'] is not None else 'mudpi/lcd' + except KeyError: + self.topic = 'mudpi/lcd' + + # Pubsub Listeners + self.pubsub = self.r.pubsub() + self.pubsub.subscribe(**{self.topic: self.handleMessage}) + + self.init() + return + + def init(self): + print('LCD Display Worker...\t\t\t\033[1;32m Initializing\033[0;0m'.format(**self.config)) + # prepare sensor on specified pin + self.i2c = busio.I2C(board.SCL, board.SDA) + if(self.model): + if (self.model.lower() == 'rgb'): + self.lcd = character_lcd.Character_LCD_RGB_I2C(self.i2c, self.columns, self.rows, self.address) + elif (self.model.lower() == 'pcf'): + self.lcd = character_lcd.Character_LCD_I2C(self.i2c, self.columns, self.rows, address=self.address, usingPCF=True) + else: + self.lcd = character_lcd.Character_LCD_I2C(self.i2c, self.columns, self.rows, self.address) + else: + self.lcd = character_lcd.Character_LCD_I2C(self.i2c, self.columns, self.rows, self.address) + self.lcd.clear() + self.lcd.message = "MudPi\nGarden Online" + return + + def run(self): + print('LCD Display Worker ...\t\t\t\033[1;32m Online\033[0;0m'.format(**self.config)) + return super().run() + + def handleMessage(self, message): + data = message['data'] + if data is not None: + decoded_message = self.decodeMessageData(data) + try: + if decoded_message['event'] == 'Message': + if decoded_message.get('data', None): + self.addMessageToQueue(decoded_message['data'].get('message', ''), int(decoded_message['data'].get('duration', self.default_duration))) + print('LCD Message Queued: \033[1;36m{0}\033[0;0m'.format(decoded_message['data'])) + elif decoded_message['event'] == 'Clear': + self.lcd.clear() + print('Cleared the LCD Screen') + elif decoded_message['event'] == 'ClearQueue': + self.message_queue = [] + print('Cleared the LCD Message Queue') + except: + print('Error Decoding Message for LCD') + + def addMessageToQueue(self, message, duration = 3): + #Add message to queue if LCD available + if self.lcd_available.is_set(): + + new_message = { + "message": message, + "duration": duration + } + self.message_queue.append(new_message) + + msg = { 'event':'MessageQueued', 'data': new_message } + self.r.publish(self.topic, json.dumps(msg)) + return + + def nextMessageFromQueue(self): + if len(self.message_queue) > 0: + self.need_new_message = False + self.resetElapsedTime() + return self.message_queue.pop(0) + else: + return None + + def work(self): + self.resetElapsedTime() + self.lcd.clear() + self.need_new_message = True + while self.main_thread_running.is_set(): + if self.system_ready.is_set(): + try: + self.pubsub.get_message() + if self.lcd_available.is_set(): + if self.cached_message and not self.need_new_message: + if self.current_message != self.cached_message['message']: + self.lcd.clear() + self.lcd.message = self.cached_message['message'] + self.current_message = self.cached_message['message'] # store message to only display once and prevent flickers + if self.elapsedTime() > self.cached_message['duration'] + 1: + self.need_new_message = True + else: + if self.need_new_message: + # Get first time message after clear + self.cached_message = self.nextMessageFromQueue() + else: + time.sleep(1) + except Exception as e: + print("LCD Worker \t\033[1;31m Unexpected Error\033[0;0m".format(**self.config)) + print(e) + else: + #System not ready + time.sleep(1) + self.resetElapsedTime() + + time.sleep(0.1) + + #This is only ran after the main thread is shut down + #Close the pubsub connection + self.pubsub.close() + print("LCD Worker Shutting Down...\t\t\033[1;32m Complete\033[0;0m".format(**self.config)) \ No newline at end of file diff --git a/workers/relay_worker.py b/workers/pi/relay_worker.py similarity index 72% rename from workers/relay_worker.py rename to workers/pi/relay_worker.py index c70f28f..1a4227a 100644 --- a/workers/relay_worker.py +++ b/workers/pi/relay_worker.py @@ -5,35 +5,26 @@ import threading import sys import RPi.GPIO as GPIO +from .worker import Worker sys.path.append('..') -import variables - -#r = redis.Redis(host='127.0.0.1', port=6379) -GPIO.setmode(GPIO.BCM) - -# ToDO Update relay to make a key if one is not set in config - -class RelayWorker(): +class RelayWorker(Worker): def __init__(self, config, main_thread_running, system_ready, relay_available, relay_active): - #self.config = {**config, **self.config} - self.config = config + super().__init__(config, main_thread_running, system_ready) self.config['pin'] = int(self.config['pin']) #parse possbile strings to avoid errors - #Events - self.main_thread_running = main_thread_running - self.system_ready = system_ready + # Events self.relay_available = relay_available self.relay_active = relay_active - #Dynamic Properties based on config + # Dynamic Properties based on config self.active = False self.topic = self.config['topic'].replace(" ", "/").lower() if self.config['topic'] is not None else 'mudpi/relay/' self.pin_state_off = GPIO.HIGH if self.config['normally_open'] is not None and self.config['normally_open'] else GPIO.LOW self.pin_state_on = GPIO.LOW if self.config['normally_open'] is not None and self.config['normally_open'] else GPIO.HIGH - #Pubsub Listeners - self.pubsub = variables.r.pubsub() + # Pubsub Listeners + self.pubsub = self.r.pubsub() self.pubsub.subscribe(**{self.topic: self.handleMessage}) self.init() @@ -47,7 +38,7 @@ def init(self): #Feature to restore relay state in case of crash or unexpected shutdown. This will check for last state stored in redis and set relay accordingly if(self.config.get('restore_last_known_state', None) is not None and self.config.get('restore_last_known_state', False) is True): - if(variables.r.get(self.config['key']+'_state')): + if(self.r.get(self.config['key']+'_state')): GPIO.output(self.config['pin'], self.pin_state_on) print('Restoring Relay \033[1;36m{0} On\033[0;0m'.format(self.config['key'])) @@ -56,26 +47,8 @@ def init(self): return def run(self): - t = threading.Thread(target=self.work, args=()) - t.start() - print('Relay Worker {key}...\t\t\t\033[1;32m Running\033[0;0m'.format(**self.config)) - return t - - def decodeMessageData(self, message): - if isinstance(message, dict): - #print('Dict Found') - return message - elif isinstance(message.decode('utf-8'), str): - try: - temp = json.loads(message.decode('utf-8')) - #print('Json Found') - return temp - except: - #print('Json Error. Str Found') - return {'event':'Unknown', 'data':message} - else: - #print('Failed to detect type') - return {'event':'Unknown', 'data':message} + print('Relay Worker {key}...\t\t\t\033[1;32m Online\033[0;0m'.format(**self.config)) + return super().run() def handleMessage(self, message): data = message['data'] @@ -97,14 +70,6 @@ def handleMessage(self, message): print('Toggle Relay \033[1;36m{0} {1} \033[0;0m'.format(self.config['key'], state)) except: print('Error Decoding Message for Relay {0}'.format(self.config['key'])) - - def elapsedTime(self): - self.time_elapsed = time.perf_counter() - self.time_start - return self.time_elapsed - - def resetElapsedTime(self): - self.time_start = time.perf_counter() - pass def turnOn(self): #Turn on relay if its available @@ -112,8 +77,8 @@ def turnOn(self): if not self.active: GPIO.output(self.config['pin'], self.pin_state_on) message = {'event':'StateChanged', 'data':1} - variables.r.set(self.config['key']+'_state', 1) - variables.r.publish(self.topic, json.dumps(message)) + self.r.set(self.config['key']+'_state', 1) + self.r.publish(self.topic, json.dumps(message)) self.active = True #self.relay_active.set() This is handled by the redis listener now self.resetElapsedTime() @@ -124,8 +89,8 @@ def turnOff(self): if self.active: GPIO.output(self.config['pin'], self.pin_state_off) message = {'event':'StateChanged', 'data':0} - variables.r.delete(self.config['key']+'_state') - variables.r.publish(self.topic, json.dumps(message)) + self.r.delete(self.config['key']+'_state') + self.r.publish(self.topic, json.dumps(message)) #self.relay_active.clear() This is handled by the redis listener now self.active = False self.resetElapsedTime() @@ -156,7 +121,6 @@ def work(self): time.sleep(0.1) - #This is only ran after the main thread is shut down #Close the pubsub connection self.pubsub.close() diff --git a/workers/pi_sensor_worker.py b/workers/pi/sensor_worker.py similarity index 64% rename from workers/pi_sensor_worker.py rename to workers/pi/sensor_worker.py index df2be6d..ea6c2a3 100644 --- a/workers/pi_sensor_worker.py +++ b/workers/pi/sensor_worker.py @@ -3,40 +3,23 @@ import json import redis import threading +from .worker import Worker import sys sys.path.append('..') from sensors.pi.float_sensor import (FloatSensor) from sensors.pi.humidity_sensor import (HumiditySensor) -import variables - -#r = redis.Redis(host='127.0.0.1', port=6379) -# def clamp(n, smallest, largest): return max(smallest, min(n, largest)) - -class PiSensorWorker(): +class PiSensorWorker(Worker): def __init__(self, config, main_thread_running, system_ready): - #self.config = {**config, **self.config} - self.config = config - self.channel = config.get('channel', 'sensors').replace(" ", "_").lower() + super().__init__(config, main_thread_running, system_ready) + self.topic = config.get('topic', 'sensors').replace(" ", "_").lower() self.sleep_duration = config.get('sleep_duration', 30) - self.main_thread_running = main_thread_running - self.system_ready = system_ready - #Store pump event so we can shutdown pump with float readings + self.sensors = [] - self.init_sensors() + self.init() return - def dynamic_import(self, name): - #Split path of the class folder structure: {sensor name}_sensor . {SensorName}Sensor - components = name.split('.') - #Dynamically import root of component path - module = __import__(components[0]) - #Get component attributes - for component in components[1:]: - module = getattr(module, component) - return module - - def init_sensors(self): + def init(self): for sensor in self.config['sensors']: if sensor.get('type', None) is not None: #Get the sensor from the sensors folder {sensor name}_sensor.{SensorName}Sensor @@ -47,14 +30,14 @@ def init_sensors(self): # Define default kwargs for all sensor types, conditionally include optional variables below if they exist sensor_kwargs = { 'name' : sensor.get('name', sensor.get('type')), - 'pin' : int(sensor.get('pin')), + 'pin' : int(sensor.get('pin', 0)), 'key' : sensor.get('key', None) } # optional sensor variables # Model is specific to DHT modules to specify DHT11 DHT22 or DHT2302 if sensor.get('model'): - sensor_kwargs['model'] = sensor.get('model') + sensor_kwargs['model'] = str(sensor.get('model')) new_sensor = imported_sensor(**sensor_kwargs) new_sensor.init_sensor() @@ -67,17 +50,14 @@ def init_sensors(self): new_sensor.critical = False self.sensors.append(new_sensor) - print('{type} Sensor (Pi) {pin}...\t\t\033[1;32m Ready\033[0;0m'.format(**sensor)) + # print('{type} Sensor (Pi) {pin}...\t\t\033[1;32m Ready\033[0;0m'.format(**sensor)) return def run(self): - t = threading.Thread(target=self.work, args=()) - t.start() - print('Pi Sensor Worker [' + str(len(self.sensors)) + ' Sensors]...\t\t\033[1;32m Running\033[0;0m') - return t + print('Pi Sensor Worker [' + str(len(self.sensors)) + ' Sensors]...\t\t\033[1;32m Online\033[0;0m') + return super().run() def work(self): - while self.main_thread_running.is_set(): if self.system_ready.is_set(): message = {'event':'PiSensorUpdate'} @@ -85,7 +65,7 @@ def work(self): for sensor in self.sensors: result = sensor.read() readings[sensor.key] = result - variables.r.set(sensor.key, json.dumps(result)) + self.r.set(sensor.key, json.dumps(result)) #print(sensor.name, result) #Check for a critical water level from any float sensors @@ -97,11 +77,10 @@ def work(self): else: pass #self.pump_ready.clear() - - - #print(readings) + + print(readings) message['data'] = readings - variables.r.publish(self.channel, json.dumps(message)) + self.r.publish(self.topic, json.dumps(message)) time.sleep(self.sleep_duration) time.sleep(2) diff --git a/workers/pi/worker.py b/workers/pi/worker.py new file mode 100644 index 0000000..aaaf517 --- /dev/null +++ b/workers/pi/worker.py @@ -0,0 +1,85 @@ +import time +import datetime +import json +import redis +import threading +import sys +sys.path.append('..') + +import variables + +def log(func): + def wrapper(*args, **kwargs): + print("MudPi Debug Log: " + " ".join([str(arg) for arg in args]) + " at " + str(datetime.datetime.now())) + value = func(*args, **kwargs) + return value + +# Base Worker Class +# A worker is responsible for handling its set of operations and running on a thread +class Worker(): + def __init__(self, config, main_thread_running, system_ready): + self.config = config + try: + self.r = config["redis"] + except KeyError: + self.r = redis.Redis(host='127.0.0.1', port=6379) + self.topic = config.get('topic', 'mudpi').replace(" ", "_").lower() + self.sleep_duration = config.get('sleep_duration', 15) + + # Threading Events to Keep Everything in Sync + self.main_thread_running = main_thread_running + self.system_ready = system_ready + self.worker_available = threading.Event() + + self.components = [] + return + + def init(self): + # print('Worker...\t\t\t\033[1;32m Initializing\033[0;0m'.format(**control)) + return + + def run(self): + t = threading.Thread(target=self.work, args=()) + t.start() + return t + + def work(self): + while self.main_thread_running.is_set(): + if self.system_ready.is_set(): + time.sleep(self.sleep_duration) + #This is only ran after the main thread is shut down + print("Worker Shutting Down...\t\033[1;32m Complete\033[0;0m") + + def elapsedTime(self): + self.time_elapsed = time.perf_counter() - self.time_start + return self.time_elapsed + + def resetElapsedTime(self): + self.time_start = time.perf_counter() + pass + + def dynamic_import(self, name): + # Split path of the class folder structure: {sensor name}_sensor . {SensorName}Sensor + components = name.split('.') + # Dynamically import root of component path + module = __import__(components[0]) + # Get component attributes + for component in components[1:]: + module = getattr(module, component) + return module + + def decodeMessageData(self, message): + if isinstance(message, dict): + #print('Dict Found') + return message + elif isinstance(message.decode('utf-8'), str): + try: + temp = json.loads(message.decode('utf-8')) + #print('Json Found') + return temp + except: + #print('Json Error. Str Found') + return {'event':'Unknown', 'data':message} + else: + #print('Failed to detect type') + return {'event':'Unknown', 'data':message} \ No newline at end of file diff --git a/workers/pump_worker.py b/workers/pump_worker.py deleted file mode 100644 index d4bcaa0..0000000 --- a/workers/pump_worker.py +++ /dev/null @@ -1,109 +0,0 @@ -import time -import datetime -import json -import redis -import threading -import sys -import RPi.GPIO as GPIO -sys.path.append('..') - -import variables - -#r = redis.Redis(host='127.0.0.1', port=6379) -GPIO.setmode(GPIO.BCM) - -class PumpWorker(): - def __init__(self, config, main_thread_running, system_ready, pump_ready, pump_should_be_running): - #self.config = {**config, **self.config} - self.config = config - self.config['pin'] = int(self.config['pin']) #parse possbile strings to avoid errors - self.main_thread_running = main_thread_running - self.system_ready = system_ready - self.pump_ready = pump_ready - self.pump_should_be_running = pump_should_be_running - self.pump_running = False - self.needs_first_water_cycle = True - self.init() - return - - def init(self): - GPIO.setup(self.config['pin'], GPIO.OUT) - #Close the relay by default - GPIO.output(self.config['pin'], GPIO.HIGH) - print('Pump Worker...\t\t\t\t\033[1;32m Ready\033[0;0m') - return - - def run(self): - t = threading.Thread(target=self.work, args=()) - t.start() - print('Pump Worker...\t\t\t\t\033[1;32m Running\033[0;0m') - return t - - def elapsedTime(self): - self.time_elapsed = time.perf_counter() - self.time_start - return self.time_elapsed - - def resetElapsedTime(self): - self.time_start = time.perf_counter() - pass - - def checkFirstWaterCycle(self): - if self.needs_first_water_cycle: - self.turnPumpOn() - self.needs_first_water_cycle = False - else: - if self.pump_ready.is_set(): - if self.pump_should_be_running.is_set() and not self.pump_running: - self.needs_first_water_cycle = True - else: - self.turnPumpOff() - - def turnPumpOn(self): - #Turn off voltage to flip on relay - if not self.pump_running: - message = {'event':'PumpTurnedOn', 'data':1} - GPIO.output(self.config['pin'], GPIO.LOW) - variables.r.set('pump_running', True) - variables.r.publish('pump', json.dumps(message)) - variables.r.set('last_watered_at', datetime.datetime.now()) #Store current time to track watering times - self.pump_running = True - self.resetElapsedTime() - print('Pump Turning On!') - - def turnPumpOff(self): - #Turn off voltage to flip on relay - if self.pump_running: - message = {'event':'PumpTurnedOff', 'data':1} - GPIO.output(self.config['pin'], GPIO.HIGH) - self.pump_should_be_running.clear() - variables.r.delete('pump_running', False) - variables.r.publish('pump', json.dumps(message)) - self.pump_running = False - print('Pump Turning Off!') - - def work(self): - self.resetElapsedTime() - while self.main_thread_running.is_set(): - if self.system_ready.is_set(): - while self.pump_should_be_running.is_set() and self.pump_ready.is_set(): - - self.checkFirstWaterCycle() - - #Calculate elapsed time and check against system limit - if (self.elapsedTime() >= self.config['max_duration']): - self.turnPumpOff() - - time.sleep(1) - #Waiting for next pump cycle - self.turnPumpOff() - time.sleep(5) - self.resetElapsedTime() - else: - #System not ready pump should be off - self.turnPumpOff() - time.sleep(5) - self.resetElapsedTime() - - time.sleep(5) - #This is only ran after the main thread is shut down - print("Pump Worker Shutting Down...\t\t\033[1;32m Complete\033[0;0m") \ No newline at end of file diff --git a/workers/sensor_worker.py b/workers/sensor_worker.py deleted file mode 100644 index 0580479..0000000 --- a/workers/sensor_worker.py +++ /dev/null @@ -1,127 +0,0 @@ -import time -import json -import threading -from nanpy import (SerialManager) -from nanpy.serialmanager import SerialManagerError -from nanpy.sockconnection import (SocketManager, SocketManagerError) -import sys -sys.path.append('..') - -import variables -import importlib - -#r = redis.Redis(host='127.0.0.1', port=6379) - -class SensorWorker(): - def __init__(self, config, main_thread_running, system_ready): - #self.config = {**config, **self.config} - self.config = config - self.main_thread_running = main_thread_running - self.system_ready = system_ready - self.node_ready = False - - attempts = 3 - if self.config.get('use_wifi', False): - while attempts > 0: - try: - attempts-= 1 - self.connection = SocketManager(host=str(self.config.get('address', 'mudpi.local'))) - self.sensors = [] - self.init_sensors() - except SocketManagerError: - print('[{name}] \033[1;33m Node Timeout\033[0;0m ['.format(**self.config), attempts, ' tries left]...') - time.sleep(15) - print('Retrying Connection...') - else: - print('[{name}] Wifi Connection \t\033[1;32m Success\033[0;0m'.format(**self.config)) - self.node_ready = True - break - else: - while attempts > 0: - try: - attempts-= 1 - self.connection = SerialManager(device=str(self.config.get('address', '/dev/ttyUSB1'))) - self.sensors = [] - self.init_sensors() - except SerialManagerError: - print('[{name}] \033[1;33m Node Timeout\033[0;0m ['.format(**self.config), attempts, ' tries left]...') - time.sleep(15) - print('Retrying Connection...') - else: - print('[{name}] Serial Connection \t\033[1;32m Success\033[0;0m'.format(**self.config)) - self.node_ready = True - break - - return - - def dynamic_sensor_import(self, path): - components = path.split('.') - - s = '' - for component in components[:-1]: - s += component + '.' - - parent = importlib.import_module(s[:-1]) - sensor = getattr(parent, components[-1]) - - return sensor - - def init_sensors(self): - for sensor in self.config['sensors']: - if sensor.get('type', None) is not None: - #Get the sensor from the sensors folder {sensor name}_sensor.{SensorName}Sensor - sensor_type = 'sensors.arduino.' + sensor.get('type').lower() + '_sensor.' + sensor.get('type').capitalize() + 'Sensor' - - #analog_pin_mode = False if sensor.get('is_digital', False) else True - - imported_sensor = self.dynamic_sensor_import(sensor_type) - #new_sensor = imported_sensor(sensor.get('pin'), name=sensor.get('name', sensor.get('type')), connection=self.connection, key=sensor.get('key', None)) - - # Define default kwargs for all sensor types, conditionally include optional variables below if they exist - sensor_kwargs = { - 'name' : sensor.get('name', sensor.get('type')), - 'pin' : int(sensor.get('pin')), - 'connection': self.connection, - 'key' : sensor.get('key', None) - } - - # optional sensor variables - # Model is specific to DHT modules to specify DHT11(11) DHT22(22) or DHT2301(21) - if sensor.get('model'): - sensor_kwargs['model'] = sensor.get('model') - - new_sensor = imported_sensor(**sensor_kwargs) - - new_sensor.init_sensor() - self.sensors.append(new_sensor) - print('{type} Sensor {pin}...\t\t\t\033[1;32m Ready\033[0;0m'.format(**sensor)) - return - - def run(self): - if self.node_ready: - t = threading.Thread(target=self.work, args=()) - t.start() - print(str(self.config['name']) +' Node Worker [' + str(len(self.config['sensors'])) + ' Sensors]...\t\033[1;32m Running\033[0;0m') - return t - else: - print("Node Connection...\t\t\t\033[1;31m Failed\033[0;0m") - return None - - def work(self): - - while self.main_thread_running.is_set(): - if self.system_ready.is_set() and self.node_ready: - message = {'event':'SensorUpdate'} - readings = {} - for sensor in self.sensors: - result = sensor.read() - readings[sensor.key] = result - #r.set(sensor.get('key', sensor.get('type')), value) - - print(readings) - message['data'] = readings - variables.r.publish('sensors', json.dumps(message)) - - time.sleep(15) - #This is only ran after the main thread is shut down - print("{name} Node Worker Shutting Down...\t\t\033[1;32m Complete\033[0;0m".format(**self.config)) \ No newline at end of file diff --git a/workers/trigger_worker.py b/workers/trigger_worker.py index a24cf7e..de355d2 100644 --- a/workers/trigger_worker.py +++ b/workers/trigger_worker.py @@ -3,6 +3,7 @@ import json import redis import threading +from .worker import Worker import sys sys.path.append('..') from triggers.trigger_group import TriggerGroup @@ -10,32 +11,17 @@ import variables import importlib -class TriggerWorker(): +class TriggerWorker(Worker): def __init__(self, config, main_thread_running, system_ready, actions): - #self.config = {**config, **self.config} - self.config = config - self.main_thread_running = main_thread_running - self.system_ready = system_ready + super().__init__(config, main_thread_running, system_ready) self.actions = actions self.triggers = [] self.trigger_threads = [] self.trigger_events = {} - self.init_triggers() + self.init() return - def dynamic_import(self, path): - components = path.split('.') - - s = '' - for component in components[:-1]: - s += component + '.' - - parent = importlib.import_module(s[:-1]) - sensor = getattr(parent, components[-1]) - - return sensor - - def init_triggers(self): + def init(self): trigger_index = 0 for trigger in self.config: if trigger.get("triggers", False): @@ -107,8 +93,8 @@ def init_trigger(self, config, trigger_index, group=None): if config.get('nested_source'): trigger_kwargs['nested_source'] = config.get('nested_source') - if config.get('channel'): - trigger_kwargs['channel'] = config.get('channel') + if config.get('topic'): + trigger_kwargs['topic'] = config.get('topic') if config.get('thresholds'): trigger_kwargs['thresholds'] = config.get('thresholds') @@ -124,23 +110,20 @@ def init_trigger(self, config, trigger_index, group=None): return new_trigger def run(self): - t = threading.Thread(target=self.work, args=()) - t.start() - print('Trigger Worker [' + str(len(self.config)) + ' Triggers]...\t\t\033[1;32m Running\033[0;0m') - return t + print('Trigger Worker [' + str(len(self.config)) + ' Triggers]...\t\t\033[1;32m Online\033[0;0m') + return super().run() def work(self): - while self.main_thread_running.is_set(): if self.system_ready.is_set(): - #Main Loop + # Main Loop time.sleep(1) time.sleep(2) - #This is only ran after the main thread is shut down + # This is only ran after the main thread is shut down for trigger in self.triggers: trigger.shutdown() - #Join all our sub threads for shutdown + # Join all our sub threads for shutdown for thread in self.trigger_threads: thread.join() print("Trigger Worker Shutting Down...\t\t\033[1;32m Complete\033[0;0m") \ No newline at end of file diff --git a/workers/worker.py b/workers/worker.py new file mode 100644 index 0000000..90a3ae9 --- /dev/null +++ b/workers/worker.py @@ -0,0 +1,78 @@ +import time +import datetime +import json +import redis +import threading +import sys +sys.path.append('..') + +import variables + +def log(func): + def wrapper(*args, **kwargs): + print("MudPi Debug Log: " + " ".join([str(arg) for arg in args]) + " at " + str(datetime.datetime.now())) + value = func(*args, **kwargs) + return value + +# Base Worker Class +# A worker is responsible for handling its set of operations and running on a thread +class Worker(): + def __init__(self, config, main_thread_running, system_ready): + self.config = config + # Threading Events to Keep Everything in Sync + self.main_thread_running = main_thread_running + self.system_ready = system_ready + self.worker_available = threading.Event() + + self.components = [] + return + + def init(self): + # print('Worker...\t\t\t\033[1;32m Initializing\033[0;0m'.format(**control)) + return + + def run(self): + t = threading.Thread(target=self.work, args=()) + t.start() + return t + + def work(self): + while self.main_thread_running.is_set(): + if self.system_ready.is_set(): + time.sleep(self.sleep_duration) + #This is only ran after the main thread is shut down + print("Worker Shutting Down...\t\033[1;32m Complete\033[0;0m") + + def elapsedTime(self): + self.time_elapsed = time.perf_counter() - self.time_start + return self.time_elapsed + + def resetElapsedTime(self): + self.time_start = time.perf_counter() + pass + + def dynamic_import(self, name): + # Split path of the class folder structure: {sensor name}_sensor . {SensorName}Sensor + components = name.split('.') + # Dynamically import root of component path + module = __import__(components[0]) + # Get component attributes + for component in components[1:]: + module = getattr(module, component) + return module + + def decodeMessageData(self, message): + if isinstance(message, dict): + #print('Dict Found') + return message + elif isinstance(message.decode('utf-8'), str): + try: + temp = json.loads(message.decode('utf-8')) + #print('Json Found') + return temp + except: + #print('Json Error. Str Found') + return {'event':'Unknown', 'data':message} + else: + #print('Failed to detect type') + return {'event':'Unknown', 'data':message} \ No newline at end of file