Skip to content
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
8e66ba7
Add source-layer shuffle to iceberg-source for correct and scalable C…
lawofcycles Mar 29, 2026
ce5de14
Skip updating lastSnapshotId when shuffle fails
lawofcycles Mar 29, 2026
77d1a5f
spotless apply
lawofcycles Mar 29, 2026
22bf746
Add S3 certificate path support and ssl_insecure_disable_verification…
lawofcycles Mar 29, 2026
0310bee
Fix coalesce to collect index from all nodes and extract ShuffleNodeC…
lawofcycles Mar 30, 2026
7482b55
Add remote shuffle file cleanup via HTTP DELETE endpoint on all nodes
lawofcycles Mar 30, 2026
7c04330
Fix shuffle write completion key race condition by creating GlobalSta…
lawofcycles Mar 30, 2026
107ac2d
Fix shuffle Avro serialization to use Iceberg DataWriter/PlannedDataR…
lawofcycles Mar 30, 2026
84858c6
Use common HTTP server infrastructure for shuffle
lawofcycles Apr 2, 2026
1d1ede2
Replace fully qualified ByteBuffer with import in ShuffleHttpService
lawofcycles Apr 10, 2026
18dc31b
Replace magic numbers with named constants in shuffle writer and reader
lawofcycles Apr 11, 2026
bb2b4ca
Add path traversal protection to LocalDiskShuffleStorage
lawofcycles Apr 11, 2026
390fcfb
Add input validation to shuffle HTTP endpoints with tests
lawofcycles Apr 11, 2026
73e6118
Improve error handling in shuffle HTTP endpoints
lawofcycles Apr 11, 2026
963fef6
Support authentication plugin for shuffle HTTP server
lawofcycles Apr 11, 2026
66c83b2
Make shuffle storage path configurable and fix flaky test
lawofcycles Apr 11, 2026
291c8ed
Add retry limit to DynamoDB write loops in ChangelogWorker
lawofcycles Apr 11, 2026
67ad064
Use toString-based hash for deterministic shuffle partitioning across…
lawofcycles Apr 11, 2026
171b0b2
Isolate shuffle storage per node and preserve base directory on cleanup
lawofcycles Apr 11, 2026
b2c486c
fix import
lawofcycles Apr 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,45 @@ public Server createHTTPServer(final Buffer<Record<Log>> buffer,
return sb.build();
}

/**
 * Creates a lightweight HTTP server with TLS support and optional authentication.
 * Intended for internal node-to-node communication (e.g. shuffle data transfer)
 * where throttling, health checks, and buffer integration are not needed.
 *
 * @param certificateProvider TLS certificate provider; required when SSL is enabled in
 *                            the server configuration, may be null only when SSL is disabled
 * @param authenticationProvider authentication decorator, or null to skip authentication
 * @param annotatedService Armeria annotated service to register
 * @param path base path for the annotated service
 * @return configured Armeria Server
 * @throws IllegalStateException if SSL is enabled but certificateProvider is null
 */
public Server createHTTPServer(
final CertificateProvider certificateProvider,
final ArmeriaHttpAuthenticationProvider authenticationProvider,
final Object annotatedService,
final String path) {
final ServerBuilder sb = Server.builder();
sb.disableServerHeader();

if (serverConfiguration.isSsl()) {
// Fail fast with a clear message instead of an NPE deep inside TLS setup.
if (certificateProvider == null) {
throw new IllegalStateException(
"SSL is enabled for " + sourceName + " but no CertificateProvider was supplied.");
}
LOG.info("Creating {} with SSL/TLS enabled.", sourceName);
final Certificate certificate = certificateProvider.getCertificate();
// Armeria consumes the PEM-encoded certificate and private key as input streams.
sb.https(serverConfiguration.getPort()).tls(
new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)),
new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8)));
} else {
LOG.warn("Creating {} without SSL/TLS. This is not secure.", sourceName);
sb.http(serverConfiguration.getPort());
}

if (authenticationProvider != null) {
// The provider may legitimately supply no decorator (e.g. an "unauthenticated" plugin).
authenticationProvider.getAuthenticationDecorator().ifPresent(sb::decorator);
}

sb.annotatedService(path, annotatedService);

return sb.build();
}

private GrpcExceptionHandlerFunction createGrpExceptionHandler() {
RetryInfoConfig retryInfo = serverConfiguration.getRetryInfo() != null
? serverConfiguration.getRetryInfo()
Expand Down
8 changes: 8 additions & 0 deletions data-prepper-plugins/iceberg-source/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,10 @@ dependencies {
implementation project(':data-prepper-api')
implementation project(':data-prepper-plugins:buffer-common')
implementation project(':data-prepper-plugins:aws-plugin-api')
implementation project(':data-prepper-plugins:common')
implementation project(':data-prepper-plugins:armeria-common')
implementation project(':data-prepper-plugins:http-common')
implementation project(':data-prepper-plugins:http-source-common')

implementation 'org.apache.iceberg:iceberg-core:1.10.1'
implementation 'org.apache.iceberg:iceberg-data:1.10.1'
Expand All @@ -40,6 +44,10 @@ dependencies {
implementation libs.hadoop.common
implementation 'org.apache.orc:orc-core:1.9.5'

implementation libs.armeria.core

implementation 'org.lz4:lz4-java:1.8.0'

implementation 'software.amazon.awssdk:glue'
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:sts'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,85 @@ void cdc_delete_produces_delete_event() throws Exception {
}
}

/**
 * When a partition column is updated (e.g. region US -> EU), Iceberg produces
 * a DELETE in the old partition and an INSERT in the new partition. The shuffle
 * routes both to the same node by identifier_columns hash, enabling correct
 * UPDATE merge across partitions.
 */
@Test
void cdc_partition_column_update_correctly_handled_by_shuffle() throws Exception {
// 'region' serves as both a data column and (below) the identity partition column.
final Schema partitionedSchema = new Schema(
Types.NestedField.required(1, "id", Types.IntegerType.get()),
Types.NestedField.required(2, "name", Types.StringType.get()),
Types.NestedField.required(3, "region", Types.StringType.get())
);

// Drop first so a leftover table from a previous failed run cannot skew results.
final String partitionedTable = "partitioned_users";
helper.dropTable(TEST_NAMESPACE, partitionedTable);
final Table table = helper.createPartitionedTable(TEST_NAMESPACE, partitionedTable,
partitionedSchema, org.apache.iceberg.PartitionSpec.builderFor(partitionedSchema).identity("region").build());

// Insert initial data: id=1 in US, id=2 in EU
// Two separate appends so the US and EU data land in distinct data files,
// letting the overwrite below delete them independently.
final DataFile usFile = helper.appendRows(table, List.of(
helper.newRecord(partitionedSchema, 1, "Alice", "US")
));
final DataFile euFile = helper.appendRows(table, List.of(
helper.newRecord(partitionedSchema, 2, "Bob", "EU")
));

// 'id' is the identifier column, so the shuffle hashes on id — not on region.
final String fullTableName = TEST_NAMESPACE + "." + partitionedTable;
final IcebergService service = createServiceForTable(fullTableName, List.of("id"), false);
final Buffer<org.opensearch.dataprepper.model.record.Record<Event>> buffer = createMockBuffer();
service.start(buffer);

try {
// Wait for initial load (2 rows from 2 separate appends)
await().atMost(30, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2))));
final int afterInitialLoad = receivedRecords.size();

// UPDATE: move id=1 from US to EU (partition column change)
// Simulate CoW: delete old US file, add new EU file with id=1 in single overwrite
table.refresh();
final DataFile newEuFile = helper.writeDataFile(table, List.of(
helper.newRecord(partitionedSchema, 1, "Alice", "EU"),
helper.newRecord(partitionedSchema, 2, "Bob", "EU")
));
// Single atomic commit: both old files removed, rewritten EU file added.
table.newOverwrite()
.deleteFile(usFile)
.deleteFile(euFile)
.addFile(newEuFile)
.commit();

// Wait for CDC events
await().atMost(60, TimeUnit.SECONDS)
.untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(afterInitialLoad + 1))));

// Only inspect records emitted after the initial load snapshot.
final List<org.opensearch.dataprepper.model.record.Record<Event>> cdcEvents =
receivedRecords.subList(afterInitialLoad, receivedRecords.size());

// With shuffle: the DELETE(id=1, US) and INSERT(id=1, EU) are routed to the same
// node by identifier_columns hash. UPDATE merge detects the pair and drops the DELETE.
// Only the INSERT (INDEX action) should remain.
boolean foundAliceEU = false;
for (final org.opensearch.dataprepper.model.record.Record<Event> record : cdcEvents) {
final Event event = record.getData();
if ("Alice".equals(event.get("name", String.class))
&& "EU".equals(event.get("region", String.class))) {
assertThat(event.getMetadata().getAttribute("bulk_action"), equalTo("index"));
foundAliceEU = true;
}
}
assertThat("Expected INSERT event for Alice in EU after partition column update",
foundAliceEU, equalTo(true));

// Always stop the service and drop the table, even on assertion failure.
} finally {
service.shutdown();
helper.dropTable(TEST_NAMESPACE, partitionedTable);
}
}

/**
* Common setup for CDC tests: creates a table with 3 rows (Alice, Bob, Carol),
* starts IcebergService, and waits for initial load to complete.
Expand Down Expand Up @@ -284,20 +363,26 @@ private Buffer<org.opensearch.dataprepper.model.record.Record<Event>> createMock
}

/**
 * Creates an {@code IcebergService} against the default test table with the
 * default identifier column ({@code id}).
 *
 * @param disableExport whether the initial-load export should be disabled
 * @return configured service under test
 */
private IcebergService createService(final boolean disableExport) throws Exception {
// Delegates to createServiceForTable; the unused fullTableName local was removed.
return createServiceForTable(TEST_NAMESPACE + "." + TEST_TABLE, List.of("id"), disableExport);
}

private IcebergService createServiceForTable(final String fullTableName,
final List<String> identifierColumns,
final boolean disableExport) throws Exception {

// Build config via reflection since fields are private
final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class);
final TableConfig tableConfig = mock(TableConfig.class);

when(tableConfig.getTableName()).thenReturn(fullTableName);
when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties());
when(tableConfig.getIdentifierColumns()).thenReturn(List.of("id"));
when(tableConfig.getIdentifierColumns()).thenReturn(identifierColumns);
when(tableConfig.isDisableExport()).thenReturn(disableExport);

when(sourceConfig.getTables()).thenReturn(List.of(tableConfig));
when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5));
lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false);
when(sourceConfig.getShuffleConfig()).thenReturn(createTestShuffleConfig());

final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator();
coordinator.createPartition(new LeaderPartition());
Expand All @@ -306,6 +391,16 @@ private IcebergService createService(final boolean disableExport) throws Excepti
org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory());
}

/**
 * Builds a minimal shuffle configuration for tests: SSL disabled, fixed port 4995.
 * Deserialized through Jackson because the config class has no public setters.
 */
private org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig createTestShuffleConfig() {
final String configJson = "{\"ssl\": false, \"port\": 4995}";
final com.fasterxml.jackson.databind.ObjectMapper objectMapper =
new com.fasterxml.jackson.databind.ObjectMapper();
try {
return objectMapper.readValue(configJson,
org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig.class);
} catch (final Exception e) {
throw new RuntimeException(e);
}
}

private EnhancedSourceCoordinator createInMemoryCoordinator() {
final InMemorySourceCoordinationStore store = new InMemorySourceCoordinationStore(
new org.opensearch.dataprepper.model.configuration.PluginSetting("in_memory", Collections.emptyMap()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,11 @@ public Table createTable(final String namespace, final String tableName, final S
return catalog.createTable(TableIdentifier.of(namespace, tableName), schema, PartitionSpec.unpartitioned());
}

/**
 * Creates a table in the test catalog with an explicit partition spec.
 *
 * @param namespace catalog namespace
 * @param tableName table name within the namespace
 * @param schema table schema
 * @param spec partition spec to apply
 * @return the newly created table
 */
public Table createPartitionedTable(final String namespace, final String tableName,
final Schema schema, final PartitionSpec spec) {
final TableIdentifier identifier = TableIdentifier.of(namespace, tableName);
return catalog.createTable(identifier, schema, spec);
}

public void dropTable(final String namespace, final String tableName) {
try {
catalog.dropTable(TableIdentifier.of(namespace, tableName), true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,15 @@
import org.opensearch.dataprepper.model.record.Record;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator;
import org.opensearch.dataprepper.plugins.source.iceberg.leader.LeaderScheduler;
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.LocalDiskShuffleStorage;
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleHttpServer;
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleHttpService;
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleStorage;
import org.opensearch.dataprepper.plugins.source.iceberg.worker.ChangelogWorker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand All @@ -43,6 +48,8 @@ public class IcebergService {
private final PluginMetrics pluginMetrics;
private final AcknowledgementSetManager acknowledgementSetManager;
private final EventFactory eventFactory;
private final ShuffleStorage shuffleStorage;
private ShuffleHttpServer shuffleHttpServer;
private ExecutorService executor;

public IcebergService(final EnhancedSourceCoordinator sourceCoordinator,
Expand All @@ -55,11 +62,19 @@ public IcebergService(final EnhancedSourceCoordinator sourceCoordinator,
this.pluginMetrics = pluginMetrics;
this.acknowledgementSetManager = acknowledgementSetManager;
this.eventFactory = eventFactory;
final Path shuffleBaseDir = Path.of(System.getProperty("java.io.tmpdir"), "data-prepper-shuffle");
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should make this configurable. Also I wonder if we should have this nested in the data-prepper directory to start. I think in our Docker deployment and perhaps for other installations via tar.gz, the user that Data Prepper runs under might not have access to this directory. We hit this once with service map, though I don't recall all the details.

Did you test this using the Docker container built by Data Prepper?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made the shuffle storage path configurable via storage_path under shuffle. The default resolution order is: explicit storage_path setting, then ${data-prepper.dir}/data/shuffle (following the same pattern as the GeoIP processor), then ${java.io.tmpdir}/data-prepper-shuffle as a fallback for test environments where data-prepper.dir is not set.

I have not tested with the official Docker image built by Data Prepper's release process. My multi-node verification used locally built Docker images. With data-prepper.dir/data/shuffle as the default, the path is under Data Prepper's home directory where the running user has write access.

this.shuffleStorage = new LocalDiskShuffleStorage(shuffleBaseDir);
this.shuffleStorage.cleanupAll();
}

public void start(final Buffer<Record<Event>> buffer) {
LOG.info("Starting Iceberg service");

// Start shuffle HTTP server
final ShuffleHttpService shuffleHttpService = new ShuffleHttpService(shuffleStorage);
shuffleHttpServer = new ShuffleHttpServer(sourceConfig.getShuffleConfig(), shuffleHttpService);
shuffleHttpServer.start();

// Load all tables upfront. Single point of Table lifecycle management.
final Map<String, Table> tables = new HashMap<>();
final Map<String, TableConfig> tableConfigs = new HashMap<>();
Expand Down Expand Up @@ -100,9 +115,10 @@ public void start(final Buffer<Record<Event>> buffer) {
// Start schedulers with shared table references
final List<Runnable> runnableList = new ArrayList<>();

runnableList.add(new LeaderScheduler(sourceCoordinator, tableConfigs, sourceConfig.getPollingInterval(), tables));
runnableList.add(new LeaderScheduler(sourceCoordinator, tableConfigs,
sourceConfig.getPollingInterval(), tables, shuffleStorage, sourceConfig.getShuffleConfig()));
runnableList.add(new ChangelogWorker(
sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager, eventFactory));
sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager, eventFactory, shuffleStorage));

executor = Executors.newFixedThreadPool(runnableList.size());
runnableList.forEach(executor::submit);
Expand All @@ -113,6 +129,9 @@ public void shutdown() {
if (executor != null) {
executor.shutdownNow();
}
if (shuffleHttpServer != null) {
shuffleHttpServer.stop();
}
}

private void validateCoWTable(final Table table, final String tableName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig;

import java.time.Duration;
import java.util.List;
Expand All @@ -32,6 +33,10 @@ public class IcebergSourceConfig {
@JsonProperty("acknowledgments")
private boolean acknowledgments = true;

@JsonProperty("shuffle")
@Valid
private ShuffleConfig shuffleConfig = new ShuffleConfig();

// Tables this source reads from; populated by Jackson from the 'tables' key.
public List<TableConfig> getTables() {
return tables;
}
Expand All @@ -43,4 +48,8 @@ public Duration getPollingInterval() {
// End-to-end acknowledgments flag; defaults to true per the field initializer.
public boolean isAcknowledgmentsEnabled() {
return acknowledgments;
}

// Shuffle settings; never null — a default ShuffleConfig is used when 'shuffle' is absent.
public ShuffleConfig getShuffleConfig() {
return shuffleConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.GlobalState;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.InitialLoadTaskPartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ShuffleReadPartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ShuffleWritePartition;

import java.util.function.Function;

Expand All @@ -26,14 +28,19 @@ public EnhancedSourcePartition apply(final SourcePartitionStoreItem partitionSto
final String sourceIdentifier = partitionStoreItem.getSourceIdentifier();
final String partitionType = sourceIdentifier.substring(sourceIdentifier.lastIndexOf('|') + 1);

if (LeaderPartition.PARTITION_TYPE.equals(partitionType)) {
return new LeaderPartition(partitionStoreItem);
} else if (ChangelogTaskPartition.PARTITION_TYPE.equals(partitionType)) {
return new ChangelogTaskPartition(partitionStoreItem);
} else if (InitialLoadTaskPartition.PARTITION_TYPE.equals(partitionType)) {
return new InitialLoadTaskPartition(partitionStoreItem);
} else {
return new GlobalState(partitionStoreItem);
switch (partitionType) {
case LeaderPartition.PARTITION_TYPE:
return new LeaderPartition(partitionStoreItem);
case ChangelogTaskPartition.PARTITION_TYPE:
return new ChangelogTaskPartition(partitionStoreItem);
case InitialLoadTaskPartition.PARTITION_TYPE:
return new InitialLoadTaskPartition(partitionStoreItem);
case ShuffleWritePartition.PARTITION_TYPE:
return new ShuffleWritePartition(partitionStoreItem);
case ShuffleReadPartition.PARTITION_TYPE:
return new ShuffleReadPartition(partitionStoreItem);
default:
return new GlobalState(partitionStoreItem);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*
*/

package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition;

import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem;
import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition;
import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ShuffleReadProgressState;

import java.util.Optional;

/**
 * Source partition representing a shuffle-read work item. The partition key identifies
 * the shuffle data a node should read; progress is tracked via
 * {@link ShuffleReadProgressState}.
 */
public class ShuffleReadPartition extends EnhancedSourcePartition<ShuffleReadProgressState> {

public static final String PARTITION_TYPE = "SHUFFLE_READ";

private final String partitionKey;
private final ShuffleReadProgressState state;

/**
 * Creates a new shuffle-read partition with an explicit key and progress state.
 *
 * @param partitionKey key identifying the shuffle data to read
 * @param state initial progress state
 */
public ShuffleReadPartition(final String partitionKey, final ShuffleReadProgressState state) {
this.partitionKey = partitionKey;
this.state = state;
}

/**
 * Restores a shuffle-read partition from a coordination-store item.
 *
 * @param sourcePartitionStoreItem persisted partition record
 */
public ShuffleReadPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) {
setSourcePartitionStoreItem(sourcePartitionStoreItem);
this.partitionKey = sourcePartitionStoreItem.getSourcePartitionKey();
this.state = convertStringToPartitionProgressState(ShuffleReadProgressState.class,
sourcePartitionStoreItem.getPartitionProgressState());
}

@Override
public String getPartitionType() {
return PARTITION_TYPE;
}

@Override
public String getPartitionKey() {
return partitionKey;
}

@Override
public Optional<ShuffleReadProgressState> getProgressState() {
// ofNullable guards against a null state (e.g. a store item persisted without
// progress state), where Optional.of would throw a NullPointerException.
return Optional.ofNullable(state);
}
}
Loading
Loading