[Feature] [Connector-V2] Add MQTT Sink Connector #10575
Issue 1: Exception handling does not comply with project specifications
Location: throw new RuntimeException("Failed to connect MQTT client [" + clientId + "]", e);
Related Context:
Issue Description:
Potential Risks:
Scope of Impact:
Severity: MAJOR
Improvement Suggestions:
// seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorErrorCode.java
package org.apache.seatunnel.connectors.seatunnel.mqtt.exception;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
public enum MqttConnectorErrorCode implements SeaTunnelErrorCode {
    CONNECTION_FAILED("MQTT-01", "MQTT connection failed"),
    PUBLISH_FAILED("MQTT-02", "MQTT message publish failed"),
    INVALID_CONFIG("MQTT-03", "Invalid MQTT configuration");
    private final String code;
    private final String description;
    MqttConnectorErrorCode(String code, String description) {
        this.code = code;
        this.description = description;
    }
    @Override
    public String getCode() { return code; }
    @Override
    public String getDescription() { return description; }
}
// seatunnel-connectors-v2/connector-mqtt/src/main/java/org/apache/seatunnel/connectors/seatunnel/mqtt/exception/MqttConnectorException.java
package org.apache.seatunnel.connectors.seatunnel.mqtt.exception;
import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
public class MqttConnectorException extends SeaTunnelRuntimeException {
public MqttConnectorException(SeaTunnelErrorCode errorCode, String errorMessage) {
super(errorCode, errorMessage);
}
public MqttConnectorException(SeaTunnelErrorCode errorCode, String errorMessage, Throwable cause) {
super(errorCode, errorMessage, cause);
}
}
// Import new exception class
import org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorException;
import org.apache.seatunnel.connectors.seatunnel.mqtt.exception.MqttConnectorErrorCode;
// In constructor
} catch (MqttException e) {
throw new MqttConnectorException(
MqttConnectorErrorCode.CONNECTION_FAILED,
"Failed to connect MQTT client [" + clientId + "]",
e);
}
// In write method
throw new IOException(
new MqttConnectorException(
MqttConnectorErrorCode.PUBLISH_FAILED,
"Failed to publish MQTT message after " + retryTimeoutMs + "ms")
.getMessage(),
lastException);
Rationale: Follow project specifications to improve consistency and maintainability of error handling.
Issue 2: Missing QoS parameter validation
Location:
// MqttSinkOptions.java
public static final Option<Integer> QOS =
Options.key("qos")
.intType()
.defaultValue(1)
.withDescription("MQTT QoS level: 0 (at-most-once), 1 (at-least-once)");
// MqttSinkWriter.java - No validation
this.qos = pluginConfig.get(MqttSinkOptions.QOS);
Related Context:
Issue Description:
Potential Risks:
Scope of Impact:
Severity: MAJOR
Improvement Suggestions:
// Add validation in MqttSinkWriter constructor
public MqttSinkWriter(
SinkWriter.Context context, SeaTunnelRowType rowType, ReadonlyConfig pluginConfig) {
this.topic = pluginConfig.get(MqttSinkOptions.TOPIC);
this.qos = pluginConfig.get(MqttSinkOptions.QOS);
// Add QoS validation
if (qos < 0 || qos > 1) {
throw new IllegalArgumentException(
"MQTT QoS must be 0 (at-most-once) or 1 (at-least-once), got: " + qos);
}
this.retryTimeoutMs = pluginConfig.get(MqttSinkOptions.RETRY_TIMEOUT);
// ...
}
Or add validation in MqttSinkOptions:
// Assumes the Option builder exposes a withValidator hook; check the Option API before relying on it
public static final Option<Integer> QOS =
Options.key("qos")
.intType()
.defaultValue(1)
.withValidator(value -> {
int qos = (Integer) value;
if (qos < 0 || qos > 1) {
throw new IllegalArgumentException("QoS must be 0 or 1");
}
})
.withDescription("MQTT QoS level: 0 (at-most-once), 1 (at-least-once)");
Rationale: Provide clear error messages and detect errors during the configuration phase rather than at runtime.
Issue 3: CleanSession=true contradicts at-least-once semantics
Location: options.setCleanSession(true);
Related Context:
Issue Description:
Potential Risks:
Scope of Impact:
Severity: CRITICAL
Improvement Suggestions:
## Key features
- [ ] [exactly-once](../../introduction/concepts/connector-v2-features.md)
**Delivery Semantics Notice**:
This connector provides **at-most-once** delivery when QoS=0, and **best-effort at-least-once** when QoS=1.
Due to `cleanSession=true` (required for stateless operation), unacknowledged messages may be lost during
client disconnections. For stronger guarantees, consider enabling Source replay capabilities in SeaTunnel.
// MqttSinkOptions.java
public static final Option<Boolean> CLEAN_SESSION =
Options.key("clean_session")
.booleanType()
.defaultValue(true)
.withDescription("Whether to use clean session. false enables persistent sessions but may cause broker-side state accumulation");
// MqttSinkWriter.java
options.setCleanSession(config.get(MqttSinkOptions.CLEAN_SESSION));
if (!config.get(MqttSinkOptions.CLEAN_SESSION)) {
log.warn("clean_session=false may cause broker-side state accumulation. Ensure proper clientId management.");
}
Rationale: Honestly inform users of limitations to avoid misleading promises. CleanSession=true is reasonable for a stateless design, but the connector should not claim to provide complete at-least-once guarantees.
Issue 4: Missing unit tests
Location:
Related Context:
Issue Description:
Potential Risks:
Scope of Impact:
Severity: MAJOR
Improvement Suggestions: Create unit test classes:
// seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkWriterTest.java
package org.apache.seatunnel.connectors.seatunnel.mqtt.sink;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.util.Map;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.Mockito.*;
@ExtendWith(MockitoExtension.class)
class MqttSinkWriterTest {
@Mock
private SinkWriter.Context context;
@Mock
private SeaTunnelRowType rowType;
@Test
void testInvalidQosThrowsException() {
ReadonlyConfig config = ReadonlyConfig.fromMap(Map.of(
"url", "tcp://localhost:1883",
"topic", "test",
"qos", 2 // Invalid value
));
IllegalArgumentException ex = assertThrows(
IllegalArgumentException.class,
() -> new MqttSinkWriter(context, rowType, config)
);
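// Match the prefix shared by both suggested messages ("MQTT QoS must be 0 (at-most-once) or 1 ..." and "QoS must be 0 or 1")
assertTrue(ex.getMessage().contains("QoS must be 0"));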
}
@Test
void testInvalidFormatThrowsException() {
ReadonlyConfig config = ReadonlyConfig.fromMap(Map.of(
"url", "tcp://localhost:1883",
"topic", "test",
"format", "xml" // Invalid format
));
assertThrows(
IllegalArgumentException.class,
() -> new MqttSinkWriter(context, rowType, config)
);
}
@Test
void testConnectionFailureThrowsWrappedException() {
// Mock Paho client to throw MqttException
// Validation exception is properly wrapped
}
@Test
void testWriteWithRetrySuccess() {
// Simulate first failure, second success
}
@Test
void testWriteTimeoutAfterRetries() {
// Simulate retry timeout
}
}
// seatunnel-connectors-v2/connector-mqtt/src/test/java/org/apache/seatunnel/connectors/seatunnel/mqtt/sink/MqttSinkFactoryTest.java
@Test
void testOptionRule() {
MqttSinkFactory factory = new MqttSinkFactory();
OptionRule rule = factory.optionRule();
Set<Option<?>> required = rule.getRequiredOptions();
assertTrue(required.contains(MqttSinkOptions.URL));
assertTrue(required.contains(MqttSinkOptions.TOPIC));
Set<Option<?>> optional = rule.getOptionalOptions();
assertTrue(optional.contains(MqttSinkOptions.QOS));
}
Rationale: Improve code quality, ensure safe refactoring, meet Apache project standards.
Issue 5: Text format delimiter hardcoded
Location:
case "text":
return TextSerializationSchema.builder()
.seaTunnelRowType(rowType)
.delimiter(",")
.build();
Related Context:
Issue Description:
Potential Risks:
Scope of Impact:
Severity: MINOR
Improvement Suggestions:
// MqttSinkOptions.java
public static final Option<String> FIELD_DELIMITER =
Options.key("field_delimiter")
.stringType()
.defaultValue(",")
.withDescription("Field delimiter for text format. Only used when format=text");
// MqttSinkFactory.java, inside optionRule()
.optional(
MqttSinkOptions.USERNAME,
MqttSinkOptions.PASSWORD,
MqttSinkOptions.QOS,
MqttSinkOptions.FORMAT,
MqttSinkOptions.FIELD_DELIMITER, // Add
MqttSinkOptions.RETRY_TIMEOUT,
MqttSinkOptions.CONNECTION_TIMEOUT)
// Serializer selection
case "text":
String delimiter = config.get(MqttSinkOptions.FIELD_DELIMITER);
return TextSerializationSchema.builder()
.seaTunnelRowType(rowType)
.delimiter(delimiter)
.build();
Rationale: Improve flexibility, stay consistent with Kafka Sink.
Issue 6: Changelog placeholder not updated
Location: - Add MQTT Sink Connector ([#XXXX](https://github.com/apache/seatunnel/pull/XXXX))
Related Context:
Issue Description:
Potential Risks:
Scope of Impact:
Severity: MINOR
Improvement Suggestions:
## next version
### Sink
- Add MQTT Sink Connector ([#10575](https://github.com/apache/seatunnel/pull/10575))
Also recommend linking Issue #9566:
- Add MQTT Sink Connector ([#10575](https://github.com/apache/seatunnel/pull/10575))
Resolves [#9566](https://github.com/apache/seatunnel/issues/9566)
Rationale: Maintain documentation integrity, facilitate user tracing.
Issue 7: Performance bottleneck - synchronous blocking send
Location:
public void write(SeaTunnelRow element) throws IOException {
byte[] payload = serializationSchema.serialize(element);
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
// Synchronous retry loop
while (System.currentTimeMillis() < deadline) {
if (mqttClient.isConnected()) {
mqttClient.publish(topic, message); // Blocking call
return;
}
Thread.sleep(RETRY_BACKOFF_MS);
}
}
Related Context:
Issue Description:
Potential Risks:
Scope of Impact:
Severity: MAJOR (if positioned as high-performance Connector)
Improvement Suggestions:
Solution 1: Batch send (recommended)
// Add batch configuration
public static final Option<Integer> BATCH_SIZE =
Options.key("batch_size")
.intType()
.defaultValue(1)
.withDescription("Number of messages to batch before sending");
// Implement batch sending logic
private final List<MqttMessage> messageBuffer = new ArrayList<>(batchSize);
@Override
public void write(SeaTunnelRow element) throws IOException {
byte[] payload = serializationSchema.serialize(element);
MqttMessage message = new MqttMessage(payload);
message.setQos(qos);
synchronized (messageBuffer) {
messageBuffer.add(message);
if (messageBuffer.size() >= batchSize) {
flushBatch();
}
}
}
private void flushBatch() throws IOException {
// Paho's MqttClient has no array-based publish call, so send the buffered
// messages in a loop and clear the buffer; this still reduces synchronization overhead
}
@Override
public Optional<Void> prepareCommit() throws IOException {
flushBatch(); // Flush before checkpoint
return Optional.empty();
}
Solution 2: Async send
// Use MqttAsyncClient (but requires major refactoring)
// Or use thread pool for asynchronous sending
Solution 3: Document in docs (simplest)
## Performance Considerations
The MQTT Sink sends messages synchronously to guarantee delivery. Typical throughput:
- QoS 0: ~10,000 messages/sec (local network)
- QoS 1: ~5,000 messages/sec (requires broker ACK)
For higher throughput requirements, consider:
- Using Kafka Sink instead
- Reducing QoS to 0
- Increasing SeaTunnel parallelism
Rationale: Current design is sufficient for IoT scenarios (low-frequency messages), but performance limitations should be clearly documented to avoid user misunderstanding.
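The batching idea from Solution 1 can be sketched without any MQTT dependency. This is only an illustration under stated assumptions: `BatchingPublisher` and its `Publisher` interface are hypothetical names, and `Publisher` stands in for whatever call actually sends one message (for example a wrapper around the Paho client). The sketch buffers payloads, flushes when the batch is full, and keeps unsent payloads if a publish fails so that a later flush can retry them.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper: buffers payloads and publishes them in order on flush. */
public class BatchingPublisher {

    /** Stand-in for the real single-message publish call (an assumption, not a Paho type). */
    @FunctionalInterface
    public interface Publisher {
        void publish(byte[] payload) throws IOException;
    }

    private final Publisher publisher;
    private final int batchSize;
    private final List<byte[]> buffer;

    public BatchingPublisher(Publisher publisher, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1, got: " + batchSize);
        }
        this.publisher = publisher;
        this.batchSize = batchSize;
        this.buffer = new ArrayList<>(batchSize);
    }

    /** Buffers one payload and flushes once the batch is full. */
    public synchronized void write(byte[] payload) throws IOException {
        buffer.add(payload);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Publishes buffered payloads in order; payloads not yet sent stay buffered on failure. */
    public synchronized void flush() throws IOException {
        int sent = 0;
        try {
            for (byte[] payload : buffer) {
                publisher.publish(payload);
                sent++;
            }
        } finally {
            // Remove only what was actually published so a retry resends the rest.
            buffer.subList(0, sent).clear();
        }
    }

    /** Number of payloads waiting to be published. */
    public synchronized int pending() {
        return buffer.size();
    }
}
```

In a writer, `flush()` would be called from `prepareCommit()` and `close()` so that nothing stays buffered across a checkpoint. Note that batching this way only amortizes locking and call overhead; each message is still published individually.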
|
Help: is there no Source connector, only a Sink connector? |
|
Hi @gitfortian, the scope of this PR is limited to the MQTT Sink Connector. |
|
@DanielCarter-stack @davidzollo I've resolved the issues listed above:
However, I don't know why the CI is not passing. On my local machine, both the MQTT connector build and the global build (install -DskipTests) pass. According to the GitHub Actions logs, the failures seem to come from 3-hour runner timeouts on unrelated modules and from "Hazelcast instance is not active" errors, leading to memory crashes. Could anyone verify this underlying issue and perhaps re-trigger the build? |
|
If it's not caused by your own PR, it might be a network problem or tight GitHub CI resources. First check the specific error message. If it's a CI timeout, you can modify the corresponding timeout period in your PR in the file |
|
Please add the related Chinese doc. You can write the content in English within the Chinese doc. |
davidzollo
left a comment
Overall, it is close to being ready for merging. Keep up the good work! Note that the CI must pass successfully.
|
I've increased the CI timeouts for it-4 and it-2 as discussed, and all checks are now passing. |
Co-authored-by: dy102 <[email protected]>
davidzollo
left a comment
+1
Good job. Considering that this is your first time contributing to the community, I give a +1 (of course, this also depends on the attitudes of other reviewers).
The current implementation is more like a first-version sink with clean_session=true, rather than the "stronger guarantee" described in the documentation.
clean_session=false is inconsistent with the actual recovery path. The documentation in Mqtt.md (line 17) clearly implies that using a stable clientId can achieve stronger delivery semantics. In the implementation, however, a new random clientId is generated each time a writer is created (MqttSinkWriter.java#L80), and only in-memory persistence is used (MqttSinkWriter.java#L91). The sink also has no custom recovery state: both SeaTunnelSink.java (line 85) and SinkWriter.java (line 82) fall back to the default empty state. During failure recovery, the engine recreates the writer (SinkFlowLifeCycle.java#L342), so a new clientId is always used after recovery and the persistent session on the broker side cannot actually be reused.
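To make the clientId point concrete, here is a minimal sketch of a deterministic client ID, assuming a user-supplied prefix and the subtask index the engine gives each writer. `StableClientId` and the idea of a configurable prefix are hypothetical and not part of this PR. Because the same inputs always yield the same ID, a recreated writer would reconnect under the identity the broker already knows, which is a precondition for reusing a persistent session.

```java
/** Hypothetical helper: derives a client ID that survives writer re-creation. */
public final class StableClientId {

    private StableClientId() {}

    /**
     * Builds "prefix-subtaskIndex". The result depends only on its inputs,
     * unlike a random UUID, so a restarted writer gets the same ID again.
     */
    public static String of(String prefix, int subtaskIndex) {
        if (prefix == null || prefix.isEmpty()) {
            throw new IllegalArgumentException("prefix must not be empty");
        }
        if (subtaskIndex < 0) {
            throw new IllegalArgumentException("subtaskIndex must be >= 0, got: " + subtaskIndex);
        }
        return prefix + "-" + subtaskIndex;
    }
}
```

A stable ID alone would not close the gap described above: with in-memory persistence the client still loses its own in-flight state on restart, so this only addresses the broker-side half of the problem.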
Purpose of this pull request
Resolves #9566
This pull request introduces a new MQTT Sink Connector for Apache SeaTunnel V2. It enables high-performance, distributed data integration with IoT endpoints and message brokers using the MQTT 3.1.1 protocol.
Core Engineering Highlights
- clientId values by appending the engine's subtask index. This prevents broker-side connection hijacking during parallel execution.
- MemoryPersistence from the Eclipse Paho client to maximize throughput and avoid disk I/O overhead in containerized environments.
- MqttCallback triggers to handle transient network interruptions gracefully.
Registry & Integration Updates
The following project configuration files were updated to properly register the new connector:
- seatunnel-connectors-v2/pom.xml: Registered the connector-mqtt module.
- seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml: Registered the E2E test module.
- plugin-mapping.properties: Added seatunnel.sink.MQTT mapping.
- seatunnel-dist/pom.xml: Added dependency with provided scope for the distribution build.
- .github/workflows/labeler/label-scope-conf.yml: Configured automated GitHub labeling.
- config/plugin_config: Registered the MQTT sink for startup scripts and plugin loading.
Does this PR introduce any user-facing change?
Yes. This PR introduces a new MQTT Sink Connector that allows users to publish SeaTunnel pipeline output to MQTT brokers.
Example configuration: