Skip to content

Commit 868681e

Browse files
Refactor: Extract MQTT event publishing to dedicated EventPublisher class
- Create new EventPublisher class to handle all MQTT event publishing logic - Move C-Bus to MQTT conversion logic out of CgateWebBridge - Handle lighting vs PIR sensor differences in dedicated class - Properly calculate level percentages for ON/OFF vs ramp events - Add comprehensive tests for EventPublisher with various event types - Update CgateWebBridge tests to work with new EventPublisher architecture - All 396 tests passing Co-authored-by: Amp <[email protected]> Amp-Thread-ID: https://ampcode.com/threads/T-d50ce17d-0ad8-4b76-ae8c-a8cd5d929fa0
1 parent 96efdc0 commit 868681e

File tree

4 files changed

+412
-78
lines changed

4 files changed

+412
-78
lines changed

src/cgateWebBridge.js

Lines changed: 12 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ const CBusEvent = require('./cbusEvent');
88
const CBusCommand = require('./cbusCommand');
99
const MqttCommandRouter = require('./mqttCommandRouter');
1010
const ConnectionManager = require('./connectionManager');
11+
const EventPublisher = require('./eventPublisher');
1112
const { createLogger } = require('./logger');
1213
const { createValidator } = require('./settingsValidator');
1314
const { LineProcessor } = require('./lineProcessor');
@@ -150,6 +151,14 @@ class CgateWebBridge {
150151
// MQTT options
151152
this._mqttOptions = this.settings.retainreads ? { retain: true, qos: 0 } : { qos: 0 };
152153

154+
// Event publisher for MQTT messages
155+
this.eventPublisher = new EventPublisher({
156+
settings: this.settings,
157+
mqttPublishQueue: this.mqttPublishQueue,
158+
mqttOptions: this._mqttOptions,
159+
logger: this.logger
160+
});
161+
153162
// Validate settings and exit if invalid
154163
if (!this.settingsValidator.validate(this.settings)) {
155164
process.exit(1);
@@ -344,7 +353,7 @@ class CgateWebBridge {
344353
_processCommandObjectStatus(statusData) {
345354
const event = new CBusEvent(`${CGATE_RESPONSE_OBJECT_STATUS} ${statusData}`);
346355
if (event.isValid()) {
347-
this._publishEvent(event, '(Cmd)');
356+
this.eventPublisher.publishEvent(event, '(Cmd)');
348357
this._emitLevelFromEvent(event);
349358
} else {
350359
this.warn(`${WARN_PREFIX} Could not parse object status: ${statusData}`);
@@ -385,7 +394,7 @@ class CgateWebBridge {
385394
try {
386395
const event = new CBusEvent(line);
387396
if (event.isValid()) {
388-
this._publishEvent(event, '(Evt)');
397+
this.eventPublisher.publishEvent(event, '(Evt)');
389398
this._emitLevelFromEvent(event);
390399
} else {
391400
this.warn(`${WARN_PREFIX} Could not parse event line: ${line}`);
@@ -421,54 +430,7 @@ class CgateWebBridge {
421430
}
422431
}
423432

424-
/**
425-
* Publishes C-Bus events to MQTT topics for Home Assistant and other consumers.
426-
*
427-
* Converts C-Bus events into MQTT messages:
428-
* - C-Bus "lighting on 254/56/4" → MQTT "cbus/read/254/56/4/state" with "ON"
429-
* - C-Bus "lighting ramp 254/56/4 128" → MQTT "cbus/read/254/56/4/level" with "50"
430-
*
431-
* Special handling for PIR sensors (motion detectors) that only publish state.
432-
*
433-
* @param {CBusEvent} event - Parsed C-Bus event to publish
434-
* @param {string} [source=''] - Source identifier for logging (e.g., '(Evt)', '(Cmd)')
435-
* @private
436-
*/
437-
_publishEvent(event, source = '') {
438-
if (!event || !event.isValid()) {
439-
return;
440-
}
441-
442-
const topicBase = `${MQTT_TOPIC_PREFIX_READ}/${event.getNetwork()}/${event.getApplication()}/${event.getGroup()}`;
443-
// Convert C-Gate level (0-255) to percentage (0-100) for Home Assistant
444-
const levelPercent = Math.round((event.getLevel() || 0) / CGATE_LEVEL_MAX * 100);
445-
const isPirSensor = event.getApplication() === this.settings.ha_discovery_pir_app_id;
446-
447-
let state;
448-
if (isPirSensor) {
449-
// PIR sensors: state based on action (motion detected/cleared)
450-
state = (event.getAction() === CGATE_CMD_ON.toLowerCase()) ? MQTT_STATE_ON : MQTT_STATE_OFF;
451-
} else {
452-
// Lighting devices: state based on brightness level (any level > 0 = ON)
453-
state = (levelPercent > 0) ? MQTT_STATE_ON : MQTT_STATE_OFF;
454-
}
455-
456-
this.log(`${LOG_PREFIX} C-Bus Status ${source}: ${event.getNetwork()}/${event.getApplication()}/${event.getGroup()} ${state}` + (isPirSensor ? '' : ` (${levelPercent}%)`));
457-
458-
this.mqttPublishQueue.add({
459-
topic: `${topicBase}/${MQTT_TOPIC_SUFFIX_STATE}`,
460-
payload: state,
461-
options: this._mqttOptions
462-
});
463-
464-
if (!isPirSensor) {
465-
this.mqttPublishQueue.add({
466-
topic: `${topicBase}/${MQTT_TOPIC_SUFFIX_LEVEL}`,
467-
payload: levelPercent.toString(),
468-
options: this._mqttOptions
469-
});
470-
}
471-
}
433+
// Event publishing now delegated to EventPublisher
472434

473435
async _sendCgateCommand(command) {
474436
try {

src/eventPublisher.js

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
const { createLogger } = require('./logger');
2+
const {
3+
LOG_PREFIX,
4+
MQTT_TOPIC_PREFIX_READ,
5+
MQTT_TOPIC_SUFFIX_STATE,
6+
MQTT_TOPIC_SUFFIX_LEVEL,
7+
MQTT_STATE_ON,
8+
MQTT_STATE_OFF,
9+
CGATE_CMD_ON,
10+
CGATE_LEVEL_MAX
11+
} = require('./constants');
12+
13+
/**
14+
* Handles publishing C-Bus events to MQTT topics.
15+
*
16+
* This class is responsible for:
17+
* - Converting C-Bus events to MQTT messages
18+
* - Handling special logic for PIR sensors vs lighting devices
19+
* - Managing MQTT topic construction and payload formatting
20+
* - Queueing messages for publishing
21+
*/
22+
class EventPublisher {
23+
/**
24+
* Creates a new EventPublisher instance.
25+
*
26+
* @param {Object} options - Configuration options
27+
* @param {Object} options.settings - Bridge settings containing PIR sensor config
28+
* @param {Object} options.mqttPublishQueue - Queue for MQTT publishing
29+
* @param {Object} options.mqttOptions - MQTT publishing options (retain, qos, etc.)
30+
* @param {Object} [options.logger] - Optional logger instance
31+
*/
32+
constructor(options) {
33+
this.settings = options.settings;
34+
this.mqttPublishQueue = options.mqttPublishQueue;
35+
this.mqttOptions = options.mqttOptions;
36+
37+
this.logger = options.logger || createLogger({
38+
component: 'event-publisher',
39+
level: this.settings.logging ? 'info' : 'warn',
40+
enabled: true
41+
});
42+
}
43+
44+
/**
45+
* Publishes a C-Bus event to MQTT topics for Home Assistant and other consumers.
46+
*
47+
* Converts C-Bus events into MQTT messages:
48+
* - C-Bus "lighting on 254/56/4" → MQTT "cbus/read/254/56/4/state" with "ON"
49+
* - C-Bus "lighting ramp 254/56/4 128" → MQTT "cbus/read/254/56/4/level" with "50"
50+
*
51+
* Special handling for PIR sensors (motion detectors) that only publish state.
52+
*
53+
* @param {CBusEvent} event - Parsed C-Bus event to publish
54+
* @param {string} [source=''] - Source identifier for logging (e.g., '(Evt)', '(Cmd)')
55+
*/
56+
publishEvent(event, source = '') {
57+
if (!event || !event.isValid()) {
58+
return;
59+
}
60+
61+
const topicBase = `${MQTT_TOPIC_PREFIX_READ}/${event.getNetwork()}/${event.getApplication()}/${event.getGroup()}`;
62+
const isPirSensor = event.getApplication() === this.settings.ha_discovery_pir_app_id;
63+
64+
// Calculate level percentage for Home Assistant
65+
let levelPercent;
66+
if (event.getLevel() !== null) {
67+
// Explicit level from ramp events (0-255) -> (0-100)
68+
levelPercent = Math.round(event.getLevel() / CGATE_LEVEL_MAX * 100);
69+
} else {
70+
// Implicit level from on/off events: ON=100%, OFF=0%
71+
levelPercent = (event.getAction() === CGATE_CMD_ON.toLowerCase()) ? 100 : 0;
72+
}
73+
74+
let state;
75+
if (isPirSensor) {
76+
// PIR sensors: state based on action (motion detected/cleared)
77+
state = (event.getAction() === CGATE_CMD_ON.toLowerCase()) ? MQTT_STATE_ON : MQTT_STATE_OFF;
78+
} else {
79+
// Lighting devices: state based on action or level
80+
if (event.getLevel() !== null) {
81+
// Ramp events with explicit level (0-255)
82+
state = (levelPercent > 0) ? MQTT_STATE_ON : MQTT_STATE_OFF;
83+
} else {
84+
// On/Off events without explicit level - use action
85+
state = (event.getAction() === CGATE_CMD_ON.toLowerCase()) ? MQTT_STATE_ON : MQTT_STATE_OFF;
86+
}
87+
}
88+
89+
this.logger.info(`${LOG_PREFIX} C-Bus Status ${source}: ${event.getNetwork()}/${event.getApplication()}/${event.getGroup()} ${state}` + (isPirSensor ? '' : ` (${levelPercent}%)`));
90+
91+
// Publish state message
92+
this.mqttPublishQueue.add({
93+
topic: `${topicBase}/${MQTT_TOPIC_SUFFIX_STATE}`,
94+
payload: state,
95+
options: this.mqttOptions
96+
});
97+
98+
// Publish level message for non-PIR sensors
99+
if (!isPirSensor) {
100+
this.mqttPublishQueue.add({
101+
topic: `${topicBase}/${MQTT_TOPIC_SUFFIX_LEVEL}`,
102+
payload: levelPercent.toString(),
103+
options: this.mqttOptions
104+
});
105+
}
106+
}
107+
}
108+
109+
module.exports = EventPublisher;

tests/cgateWebBridge.test.js

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -694,7 +694,7 @@ describe('CgateWebBridge', () => {
694694

695695
describe('_processEventLine()', () => {
696696
it('should process lighting events', () => {
697-
const publishEventSpy = jest.spyOn(bridge, '_publishEvent');
697+
const publishEventSpy = jest.spyOn(bridge.eventPublisher, 'publishEvent');
698698

699699
bridge._processEventLine('lighting on //TestProject/254/56/1');
700700

@@ -703,7 +703,7 @@ describe('CgateWebBridge', () => {
703703
});
704704

705705
it('should ignore invalid event lines', () => {
706-
const publishEventSpy = jest.spyOn(bridge, '_publishEvent');
706+
const publishEventSpy = jest.spyOn(bridge.eventPublisher, 'publishEvent');
707707

708708
bridge._processEventLine('invalid event line');
709709

@@ -712,8 +712,9 @@ describe('CgateWebBridge', () => {
712712
});
713713
});
714714

715-
describe('_publishEvent()', () => {
716-
it('should publish lighting events to MQTT', () => {
715+
describe('EventPublisher integration', () => {
716+
it('should use EventPublisher for publishing events', () => {
717+
const publishEventSpy = jest.spyOn(bridge.eventPublisher, 'publishEvent');
717718
const mockEvent = {
718719
isValid: () => true,
719720
getNetwork: () => '254',
@@ -723,33 +724,17 @@ describe('CgateWebBridge', () => {
723724
getLevel: () => 255
724725
};
725726

726-
bridge._publishEvent(mockEvent);
727+
bridge.eventPublisher.publishEvent(mockEvent, '(Test)');
727728

728-
expect(publishSpy).toHaveBeenCalledWith({
729-
topic: 'cbus/read/254/56/1/state',
730-
payload: 'ON',
731-
options: { qos: 0 }
732-
});
729+
expect(publishEventSpy).toHaveBeenCalledWith(mockEvent, '(Test)');
730+
publishEventSpy.mockRestore();
733731
});
734732

735-
it('should publish PIR sensor events differently', () => {
736-
const mockEvent = {
737-
isValid: () => true,
738-
getNetwork: () => '254',
739-
getApplication: () => '56',
740-
getGroup: () => '1',
741-
getAction: () => 'on',
742-
getLevel: () => null
743-
};
744-
bridge.settings.ha_discovery_pir_app_id = '56';
745-
746-
bridge._publishEvent(mockEvent);
747-
748-
expect(publishSpy).toHaveBeenCalledWith({
749-
topic: 'cbus/read/254/56/1/state',
750-
payload: 'ON',
751-
options: { qos: 0 }
752-
});
733+
it('should initialize EventPublisher with correct options', () => {
734+
expect(bridge.eventPublisher).toBeDefined();
735+
expect(bridge.eventPublisher.settings).toBe(bridge.settings);
736+
expect(bridge.eventPublisher.mqttPublishQueue).toBe(bridge.mqttPublishQueue);
737+
expect(bridge.eventPublisher.mqttOptions).toEqual(bridge._mqttOptions);
753738
});
754739
});
755740

0 commit comments

Comments
 (0)