11from dataclasses import dataclass
2+ from time import sleep
23
34import serial
45
89s8_abc_status = b"\xfe \x03 \x00 \x1f \x00 \x01 \xa1 \xc3 "
910s8_abc_enable = b"\xfe \x06 \x00 \x1f \x00 \xb4 \xac \x74 "
1011
12+ s8_clear_hr1 = b"\xfe \x06 \x00 \x00 \x00 \x00 \x9D \xC5 "
13+ s8_read_hr1 = b"\xfe \x03 \x00 \x00 \x00 \x01 \x90 \x05 "
14+ s8_start_calibration = b"\xFE \x06 \x00 \x01 \x7c \x06 \x6C \xC7 "
15+
1116
1217@dataclass
1318class ModbusResponse :
@@ -25,14 +30,8 @@ def __init__(self, response: bytes):
2530 self .data = response [3 :- 2 ]
2631 self .crc = response [- 2 :]
2732 self .message_without_crc = response [:- 2 ]
28-
29- print (f"Address: { self .address } " )
30- print (f"Function: { self .function } " )
31- print (f"Byte Count: { self .byte_count } " )
32- print (f"Data: { self .data } " )
33- print (f"CRC: { self .crc } " )
34- print (f"Generated CRC: { self .generate_crc ()} " )
35- print (f"Data as int: { self .data_as_int ()} " )
33+ if self .crc != self .generate_crc ():
34+ raise ValueError ("CRC Mismatch" )
3635
3736
3837 def data_as_int (self ):
@@ -56,23 +55,30 @@ def __init__(self):
5655 super ().__init__ ()
5756 self .serial = serial .Serial ("/dev/ttyUSB0" , 9600 , 8 , "N" , 1 , timeout = 1 )
5857
59- abc_status = self .send_command (s8_abc_enable , 8 )
60- print (f"ABC Status: { abc_status .data_as_int ()} " )
58+ self .send_command (s8_abc_enable )
6159
6260 def read_response (self , size ) -> ModbusResponse :
6361 output = None
6462 while not output :
6563 output = self .serial .read (size )
6664 return ModbusResponse (output )
6765
68- def send_command (self , command : bytes , expected_response_size :int ) -> ModbusResponse :
66+ def force_calibration (self ):
67+ self .send_command (s8_clear_hr1 )
68+ calibration_ack = self .send_command (s8_start_calibration )
69+ print ({"starting calibration" : calibration_ack })
70+ sleep (4 )
71+ calibration_result = self .send_command (s8_read_hr1 )
72+ print ({"calibration finished" : calibration_result })
73+
74+
75+ def send_command (self , command : bytes , expected_response_size :int = 100 ) -> ModbusResponse :
6976 self .serial .flush ()
7077 self .serial .write (command )
7178 return self .read_response (expected_response_size )
7279
7380 def read_data (self ):
74- response = self .send_command (s8_co2 , 7 )
75- print ({"carbon_dioxide" : response .data_as_int ()})
81+ response = self .send_command (s8_co2 ,7 )
7682 return {"carbon_dioxide" : response .data_as_int ()}
7783
7884 def home_assistant_auto_discovery (self ) -> [str , dict ]:
0 commit comments