diff --git a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java index f148c362ed..6d09b0e6fc 100644 --- a/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java +++ b/data-prepper-plugins/http-common/src/main/java/org/opensearch/dataprepper/plugins/server/CreateServer.java @@ -296,6 +296,45 @@ public Server createHTTPServer(final Buffer> buffer, return sb.build(); } + /** + * Creates a lightweight HTTP server with TLS support and optional authentication. + * Intended for internal node-to-node communication (e.g. shuffle data transfer) + * where throttling, health checks, and buffer integration are not needed. + * + * @param certificateProvider TLS certificate provider, or null if SSL is disabled + * @param authenticationProvider authentication decorator, or null to skip authentication + * @param annotatedService Armeria annotated service to register + * @param path base path for the annotated service + * @return configured Armeria Server + */ + public Server createHTTPServer( + final CertificateProvider certificateProvider, + final ArmeriaHttpAuthenticationProvider authenticationProvider, + final Object annotatedService, + final String path) { + final ServerBuilder sb = Server.builder(); + sb.disableServerHeader(); + + if (serverConfiguration.isSsl()) { + LOG.info("Creating {} with SSL/TLS enabled.", sourceName); + final Certificate certificate = certificateProvider.getCertificate(); + sb.https(serverConfiguration.getPort()).tls( + new ByteArrayInputStream(certificate.getCertificate().getBytes(StandardCharsets.UTF_8)), + new ByteArrayInputStream(certificate.getPrivateKey().getBytes(StandardCharsets.UTF_8))); + } else { + LOG.warn("Creating {} without SSL/TLS. 
This is not secure.", sourceName); + sb.http(serverConfiguration.getPort()); + } + + if (authenticationProvider != null) { + authenticationProvider.getAuthenticationDecorator().ifPresent(sb::decorator); + } + + sb.annotatedService(path, annotatedService); + + return sb.build(); + } + private GrpcExceptionHandlerFunction createGrpExceptionHandler() { RetryInfoConfig retryInfo = serverConfiguration.getRetryInfo() != null ? serverConfiguration.getRetryInfo() diff --git a/data-prepper-plugins/iceberg-source/build.gradle b/data-prepper-plugins/iceberg-source/build.gradle index bd59f11b00..ef9d1e6795 100644 --- a/data-prepper-plugins/iceberg-source/build.gradle +++ b/data-prepper-plugins/iceberg-source/build.gradle @@ -28,6 +28,10 @@ dependencies { implementation project(':data-prepper-api') implementation project(':data-prepper-plugins:buffer-common') implementation project(':data-prepper-plugins:aws-plugin-api') + implementation project(':data-prepper-plugins:common') + implementation project(':data-prepper-plugins:armeria-common') + implementation project(':data-prepper-plugins:http-common') + implementation project(':data-prepper-plugins:http-source-common') implementation 'org.apache.iceberg:iceberg-core:1.10.1' implementation 'org.apache.iceberg:iceberg-data:1.10.1' @@ -40,6 +44,10 @@ dependencies { implementation libs.hadoop.common implementation 'org.apache.orc:orc-core:1.9.5' + implementation libs.armeria.core + + implementation 'org.lz4:lz4-java:1.8.0' + implementation 'software.amazon.awssdk:glue' implementation 'software.amazon.awssdk:s3' implementation 'software.amazon.awssdk:sts' diff --git a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java index ccfb178e67..221b607a31 100644 --- 
a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java +++ b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceIT.java @@ -30,6 +30,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.configuration.PluginModel; import org.opensearch.dataprepper.model.event.Event; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.PartitionFactory; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition; @@ -85,6 +86,9 @@ public class IcebergSourceIT { @Mock private AcknowledgementSetManager acknowledgementSetManager; + @Mock + private PluginFactory pluginFactory; + private final List> receivedRecords = Collections.synchronizedList(new ArrayList<>()); @@ -227,6 +231,85 @@ void cdc_delete_produces_delete_event() throws Exception { } } + /** + * When a partition column is updated (e.g. region US -> EU), Iceberg produces + * a DELETE in the old partition and an INSERT in the new partition. The shuffle + * routes both to the same node by identifier_columns hash, enabling correct + * UPDATE merge across partitions. 
+ */ + @Test + void cdc_partition_column_update_correctly_handled_by_shuffle() throws Exception { + final Schema partitionedSchema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.required(2, "name", Types.StringType.get()), + Types.NestedField.required(3, "region", Types.StringType.get()) + ); + + final String partitionedTable = "partitioned_users"; + helper.dropTable(TEST_NAMESPACE, partitionedTable); + final Table table = helper.createPartitionedTable(TEST_NAMESPACE, partitionedTable, + partitionedSchema, org.apache.iceberg.PartitionSpec.builderFor(partitionedSchema).identity("region").build()); + + // Insert initial data: id=1 in US, id=2 in EU + final DataFile usFile = helper.appendRows(table, List.of( + helper.newRecord(partitionedSchema, 1, "Alice", "US") + )); + final DataFile euFile = helper.appendRows(table, List.of( + helper.newRecord(partitionedSchema, 2, "Bob", "EU") + )); + + final String fullTableName = TEST_NAMESPACE + "." + partitionedTable; + final IcebergService service = createServiceForTable(fullTableName, List.of("id"), false); + final Buffer> buffer = createMockBuffer(); + service.start(buffer); + + try { + // Wait for initial load (2 rows from 2 separate appends) + await().atMost(30, TimeUnit.SECONDS) + .untilAsserted(() -> assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(2)))); + final int afterInitialLoad = receivedRecords.size(); + + // UPDATE: move id=1 from US to EU (partition column change) + // Simulate CoW: delete old US file, add new EU file with id=1 in single overwrite + table.refresh(); + final DataFile newEuFile = helper.writeDataFile(table, List.of( + helper.newRecord(partitionedSchema, 1, "Alice", "EU"), + helper.newRecord(partitionedSchema, 2, "Bob", "EU") + )); + table.newOverwrite() + .deleteFile(usFile) + .deleteFile(euFile) + .addFile(newEuFile) + .commit(); + + // Wait for CDC events + await().atMost(60, TimeUnit.SECONDS) + .untilAsserted(() -> 
assertThat(receivedRecords, hasSize(greaterThanOrEqualTo(afterInitialLoad + 1)))); + + final List> cdcEvents = + receivedRecords.subList(afterInitialLoad, receivedRecords.size()); + + // With shuffle: the DELETE(id=1, US) and INSERT(id=1, EU) are routed to the same + // node by identifier_columns hash. UPDATE merge detects the pair and drops the DELETE. + // Only the INSERT (INDEX action) should remain. + boolean foundAliceEU = false; + for (final org.opensearch.dataprepper.model.record.Record record : cdcEvents) { + final Event event = record.getData(); + if ("Alice".equals(event.get("name", String.class)) + && "EU".equals(event.get("region", String.class))) { + assertThat(event.getMetadata().getAttribute("bulk_action"), equalTo("index")); + foundAliceEU = true; + } + } + assertThat("Expected INSERT event for Alice in EU after partition column update", + foundAliceEU, equalTo(true)); + + } finally { + service.shutdown(); + helper.dropTable(TEST_NAMESPACE, partitionedTable); + } + } + /** * Common setup for CDC tests: creates a table with 3 rows (Alice, Bob, Carol), * starts IcebergService, and waits for initial load to complete. @@ -284,7 +367,12 @@ private Buffer> createMock } private IcebergService createService(final boolean disableExport) throws Exception { - final String fullTableName = TEST_NAMESPACE + "." + TEST_TABLE; + return createServiceForTable(TEST_NAMESPACE + "." 
+ TEST_TABLE, List.of("id"), disableExport); + } + + private IcebergService createServiceForTable(final String fullTableName, + final List identifierColumns, + final boolean disableExport) throws Exception { // Build config via reflection since fields are private final IcebergSourceConfig sourceConfig = mock(IcebergSourceConfig.class); @@ -292,18 +380,29 @@ private IcebergService createService(final boolean disableExport) throws Excepti when(tableConfig.getTableName()).thenReturn(fullTableName); when(tableConfig.getCatalog()).thenReturn(helper.catalogProperties()); - when(tableConfig.getIdentifierColumns()).thenReturn(List.of("id")); + when(tableConfig.getIdentifierColumns()).thenReturn(identifierColumns); when(tableConfig.isDisableExport()).thenReturn(disableExport); when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); when(sourceConfig.getPollingInterval()).thenReturn(Duration.ofSeconds(5)); lenient().when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); + when(sourceConfig.getShuffleConfig()).thenReturn(createTestShuffleConfig()); final EnhancedSourceCoordinator coordinator = createInMemoryCoordinator(); coordinator.createPartition(new LeaderPartition()); return new IcebergService(coordinator, sourceConfig, pluginMetrics, acknowledgementSetManager, - org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory()); + org.opensearch.dataprepper.event.TestEventFactory.getTestEventFactory(), pluginFactory); + } + + private org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig createTestShuffleConfig() { + try { + final com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); + return mapper.readValue("{\"ssl\": false, \"port\": 4995}", + org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig.class); + } catch (final Exception e) { + throw new RuntimeException(e); + } } private EnhancedSourceCoordinator createInMemoryCoordinator() { diff --git 
a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergTestHelper.java b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergTestHelper.java index 000e0e20ed..3734d22282 100644 --- a/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergTestHelper.java +++ b/data-prepper-plugins/iceberg-source/src/integrationTest/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergTestHelper.java @@ -102,6 +102,11 @@ public Table createTable(final String namespace, final String tableName, final S return catalog.createTable(TableIdentifier.of(namespace, tableName), schema, PartitionSpec.unpartitioned()); } + public Table createPartitionedTable(final String namespace, final String tableName, + final Schema schema, final PartitionSpec spec) { + return catalog.createTable(TableIdentifier.of(namespace, tableName), schema, spec); + } + public void dropTable(final String namespace, final String tableName) { try { catalog.dropTable(TableIdentifier.of(namespace, tableName), true); diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java index 2288cef9fe..d2bfe8799b 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergService.java @@ -14,19 +14,30 @@ import org.apache.iceberg.Table; import org.apache.iceberg.catalog.Catalog; import org.apache.iceberg.catalog.TableIdentifier; +import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; import 
org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.buffer.Buffer; +import org.opensearch.dataprepper.model.configuration.PluginModel; +import org.opensearch.dataprepper.model.configuration.PluginSetting; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import org.opensearch.dataprepper.plugins.source.iceberg.leader.LeaderScheduler; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.LocalDiskShuffleStorage; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleHttpServer; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleHttpService; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleStorage; import org.opensearch.dataprepper.plugins.source.iceberg.worker.ChangelogWorker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -43,23 +54,37 @@ public class IcebergService { private final PluginMetrics pluginMetrics; private final AcknowledgementSetManager acknowledgementSetManager; private final EventFactory eventFactory; + private final PluginFactory pluginFactory; + private final ShuffleStorage shuffleStorage; + private ShuffleHttpServer shuffleHttpServer; private ExecutorService executor; public IcebergService(final EnhancedSourceCoordinator sourceCoordinator, final IcebergSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final 
AcknowledgementSetManager acknowledgementSetManager, - final EventFactory eventFactory) { + final EventFactory eventFactory, + final PluginFactory pluginFactory) { this.sourceCoordinator = sourceCoordinator; this.sourceConfig = sourceConfig; this.pluginMetrics = pluginMetrics; this.acknowledgementSetManager = acknowledgementSetManager; this.eventFactory = eventFactory; + this.pluginFactory = pluginFactory; + final Path shuffleBaseDir = resolveShuffleBaseDir(sourceConfig.getShuffleConfig()); + this.shuffleStorage = new LocalDiskShuffleStorage(shuffleBaseDir); + this.shuffleStorage.cleanupAll(); } public void start(final Buffer> buffer) { LOG.info("Starting Iceberg service"); + // Start shuffle HTTP server + final ShuffleHttpService shuffleHttpService = new ShuffleHttpService(shuffleStorage); + final ArmeriaHttpAuthenticationProvider authenticationProvider = loadAuthenticationProvider(); + shuffleHttpServer = new ShuffleHttpServer(sourceConfig.getShuffleConfig(), shuffleHttpService, authenticationProvider); + shuffleHttpServer.start(); + // Load all tables upfront. Single point of Table lifecycle management. 
final Map tables = new HashMap<>(); final Map tableConfigs = new HashMap<>(); @@ -100,19 +125,53 @@ public void start(final Buffer> buffer) { // Start schedulers with shared table references final List runnableList = new ArrayList<>(); - runnableList.add(new LeaderScheduler(sourceCoordinator, tableConfigs, sourceConfig.getPollingInterval(), tables)); + runnableList.add(new LeaderScheduler(sourceCoordinator, tableConfigs, + sourceConfig.getPollingInterval(), tables, shuffleStorage, sourceConfig.getShuffleConfig())); runnableList.add(new ChangelogWorker( - sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager, eventFactory)); + sourceCoordinator, sourceConfig, tables, tableConfigs, buffer, acknowledgementSetManager, eventFactory, shuffleStorage)); executor = Executors.newFixedThreadPool(runnableList.size()); runnableList.forEach(executor::submit); } + private static Path resolveShuffleBaseDir(final ShuffleConfig shuffleConfig) { + final Path baseDir; + if (shuffleConfig.getStoragePath() != null) { + baseDir = Path.of(shuffleConfig.getStoragePath()); + } else { + final String dataPrepperDir = System.getProperty("data-prepper.dir"); + if (dataPrepperDir != null) { + baseDir = Path.of(dataPrepperDir, "data", "shuffle"); + } else { + baseDir = Path.of(System.getProperty("java.io.tmpdir"), "data-prepper-shuffle"); + } + } + return baseDir.resolve(String.valueOf(shuffleConfig.getServerPort())); + } + + private ArmeriaHttpAuthenticationProvider loadAuthenticationProvider() { + final PluginModel authConfig = sourceConfig.getShuffleConfig().getAuthentication(); + final PluginSetting pluginSetting; + if (authConfig != null) { + pluginSetting = new PluginSetting(authConfig.getPluginName(), authConfig.getPluginSettings()); + } else { + LOG.warn("Creating shuffle HTTP server without authentication. 
This is not secure."); + LOG.warn("To set up authentication for the shuffle server, configure the 'authentication' option under 'shuffle' in the iceberg source configuration."); + pluginSetting = new PluginSetting(ArmeriaHttpAuthenticationProvider.UNAUTHENTICATED_PLUGIN_NAME, + Collections.emptyMap()); + } + pluginSetting.setPipelineName("iceberg-source"); + return pluginFactory.loadPlugin(ArmeriaHttpAuthenticationProvider.class, pluginSetting); + } + public void shutdown() { LOG.info("Shutting down Iceberg service"); if (executor != null) { executor.shutdownNow(); } + if (shuffleHttpServer != null) { + shuffleHttpServer.stop(); + } } private void validateCoWTable(final Table table, final String tableName) { diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java index 6338d1f996..285beebf0b 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSource.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.model.buffer.Buffer; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.model.source.Source; import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; @@ -42,6 +43,7 @@ public class IcebergSource implements Source>, UsesEnhancedSourceC private final PluginMetrics pluginMetrics; private final AcknowledgementSetManager acknowledgementSetManager; private final EventFactory eventFactory; + private final PluginFactory pluginFactory; private 
EnhancedSourceCoordinator sourceCoordinator; private IcebergService icebergService; @@ -49,11 +51,13 @@ public class IcebergSource implements Source>, UsesEnhancedSourceC public IcebergSource(final IcebergSourceConfig sourceConfig, final PluginMetrics pluginMetrics, final AcknowledgementSetManager acknowledgementSetManager, - final EventFactory eventFactory) { + final EventFactory eventFactory, + final PluginFactory pluginFactory) { this.sourceConfig = sourceConfig; this.pluginMetrics = pluginMetrics; this.acknowledgementSetManager = acknowledgementSetManager; this.eventFactory = eventFactory; + this.pluginFactory = pluginFactory; LOG.info("Creating Iceberg Source for {} table(s)", sourceConfig.getTables().size()); } @@ -64,7 +68,7 @@ public void start(final Buffer> buffer) { sourceCoordinator.createPartition(new LeaderPartition()); - icebergService = new IcebergService(sourceCoordinator, sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory); + icebergService = new IcebergService(sourceCoordinator, sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory, pluginFactory); icebergService.start(buffer); } diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java index 0dac830aed..2354dac22b 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceConfig.java @@ -13,6 +13,7 @@ import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; import jakarta.validation.constraints.NotEmpty; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig; import java.time.Duration; import java.util.List; @@ 
-32,6 +33,10 @@ public class IcebergSourceConfig { @JsonProperty("acknowledgments") private boolean acknowledgments = true; + @JsonProperty("shuffle") + @Valid + private ShuffleConfig shuffleConfig = new ShuffleConfig(); + public List getTables() { return tables; } @@ -43,4 +48,8 @@ public Duration getPollingInterval() { public boolean isAcknowledgmentsEnabled() { return acknowledgments; } + + public ShuffleConfig getShuffleConfig() { + return shuffleConfig; + } } diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/PartitionFactory.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/PartitionFactory.java index 4484b90acc..fece047f03 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/PartitionFactory.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/PartitionFactory.java @@ -16,6 +16,8 @@ import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.InitialLoadTaskPartition; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ShuffleReadPartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ShuffleWritePartition; import java.util.function.Function; @@ -26,14 +28,19 @@ public EnhancedSourcePartition apply(final SourcePartitionStoreItem partitionSto final String sourceIdentifier = partitionStoreItem.getSourceIdentifier(); final String partitionType = sourceIdentifier.substring(sourceIdentifier.lastIndexOf('|') + 1); - if (LeaderPartition.PARTITION_TYPE.equals(partitionType)) { - return new 
LeaderPartition(partitionStoreItem); - } else if (ChangelogTaskPartition.PARTITION_TYPE.equals(partitionType)) { - return new ChangelogTaskPartition(partitionStoreItem); - } else if (InitialLoadTaskPartition.PARTITION_TYPE.equals(partitionType)) { - return new InitialLoadTaskPartition(partitionStoreItem); - } else { - return new GlobalState(partitionStoreItem); + switch (partitionType) { + case LeaderPartition.PARTITION_TYPE: + return new LeaderPartition(partitionStoreItem); + case ChangelogTaskPartition.PARTITION_TYPE: + return new ChangelogTaskPartition(partitionStoreItem); + case InitialLoadTaskPartition.PARTITION_TYPE: + return new InitialLoadTaskPartition(partitionStoreItem); + case ShuffleWritePartition.PARTITION_TYPE: + return new ShuffleWritePartition(partitionStoreItem); + case ShuffleReadPartition.PARTITION_TYPE: + return new ShuffleReadPartition(partitionStoreItem); + default: + return new GlobalState(partitionStoreItem); } } } diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/ShuffleReadPartition.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/ShuffleReadPartition.java new file mode 100644 index 0000000000..d73a78fe48 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/ShuffleReadPartition.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ShuffleReadProgressState; + +import java.util.Optional; + +public class ShuffleReadPartition extends EnhancedSourcePartition { + + public static final String PARTITION_TYPE = "SHUFFLE_READ"; + + private final String partitionKey; + private final ShuffleReadProgressState state; + + public ShuffleReadPartition(final String partitionKey, final ShuffleReadProgressState state) { + this.partitionKey = partitionKey; + this.state = state; + } + + public ShuffleReadPartition(final SourcePartitionStoreItem sourcePartitionStoreItem) { + setSourcePartitionStoreItem(sourcePartitionStoreItem); + this.partitionKey = sourcePartitionStoreItem.getSourcePartitionKey(); + this.state = convertStringToPartitionProgressState(ShuffleReadProgressState.class, + sourcePartitionStoreItem.getPartitionProgressState()); + } + + @Override + public String getPartitionType() { return PARTITION_TYPE; } + + @Override + public String getPartitionKey() { return partitionKey; } + + @Override + public Optional getProgressState() { return Optional.of(state); } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/ShuffleWritePartition.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/ShuffleWritePartition.java new file mode 100644 index 0000000000..4df8eb9b1a --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/partition/ShuffleWritePartition.java @@ -0,0 +1,46 @@ +/* + * Copyright OpenSearch Contributors + * 
SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition; + +import org.opensearch.dataprepper.model.source.coordinator.SourcePartitionStoreItem; +import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourcePartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ShuffleWriteProgressState; + +import java.util.Optional; + +public class ShuffleWritePartition extends EnhancedSourcePartition { + + public static final String PARTITION_TYPE = "SHUFFLE_WRITE"; + + private final String partitionKey; + private final ShuffleWriteProgressState state; + + public ShuffleWritePartition(final String partitionKey, final ShuffleWriteProgressState state) { + this.partitionKey = partitionKey; + this.state = state; + } + + public ShuffleWritePartition(final SourcePartitionStoreItem sourcePartitionStoreItem) { + setSourcePartitionStoreItem(sourcePartitionStoreItem); + this.partitionKey = sourcePartitionStoreItem.getSourcePartitionKey(); + this.state = convertStringToPartitionProgressState(ShuffleWriteProgressState.class, + sourcePartitionStoreItem.getPartitionProgressState()); + } + + @Override + public String getPartitionType() { return PARTITION_TYPE; } + + @Override + public String getPartitionKey() { return partitionKey; } + + @Override + public Optional getProgressState() { return Optional.of(state); } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/state/ShuffleReadProgressState.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/state/ShuffleReadProgressState.java new file mode 100644 index 0000000000..397f0b3be5 --- /dev/null +++ 
b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/state/ShuffleReadProgressState.java @@ -0,0 +1,54 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.List; + +public class ShuffleReadProgressState { + + @JsonProperty("snapshotId") + private long snapshotId; + + @JsonProperty("tableName") + private String tableName; + + @JsonProperty("partitionRangeStart") + private int partitionRangeStart; + + @JsonProperty("partitionRangeEnd") + private int partitionRangeEnd; + + @JsonProperty("shuffleWriteTaskIds") + private List shuffleWriteTaskIds; + + @JsonProperty("nodeAddresses") + private List nodeAddresses; + + public long getSnapshotId() { return snapshotId; } + public void setSnapshotId(final long snapshotId) { this.snapshotId = snapshotId; } + + public String getTableName() { return tableName; } + public void setTableName(final String tableName) { this.tableName = tableName; } + + public int getPartitionRangeStart() { return partitionRangeStart; } + public void setPartitionRangeStart(final int partitionRangeStart) { this.partitionRangeStart = partitionRangeStart; } + + public int getPartitionRangeEnd() { return partitionRangeEnd; } + public void setPartitionRangeEnd(final int partitionRangeEnd) { this.partitionRangeEnd = partitionRangeEnd; } + + public List getShuffleWriteTaskIds() { return shuffleWriteTaskIds; } + public void setShuffleWriteTaskIds(final List shuffleWriteTaskIds) { this.shuffleWriteTaskIds = shuffleWriteTaskIds; } + + public List getNodeAddresses() { return nodeAddresses; } + public void setNodeAddresses(final List nodeAddresses) { 
this.nodeAddresses = nodeAddresses; } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/state/ShuffleWriteProgressState.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/state/ShuffleWriteProgressState.java new file mode 100644 index 0000000000..7c52998e8a --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/coordination/state/ShuffleWriteProgressState.java @@ -0,0 +1,58 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.coordination.state; + +import com.fasterxml.jackson.annotation.JsonProperty; + +public class ShuffleWriteProgressState { + + @JsonProperty("snapshotId") + private long snapshotId; + + @JsonProperty("tableName") + private String tableName; + + @JsonProperty("dataFilePath") + private String dataFilePath; + + @JsonProperty("taskType") + private String taskType; + + @JsonProperty("shuffleTaskId") + private String shuffleTaskId; + + @JsonProperty("nodeAddress") + private String nodeAddress; + + @JsonProperty("changeOrdinal") + private int changeOrdinal; + + public long getSnapshotId() { return snapshotId; } + public void setSnapshotId(final long snapshotId) { this.snapshotId = snapshotId; } + + public String getTableName() { return tableName; } + public void setTableName(final String tableName) { this.tableName = tableName; } + + public String getDataFilePath() { return dataFilePath; } + public void setDataFilePath(final String dataFilePath) { this.dataFilePath = dataFilePath; } + + public String getTaskType() { return taskType; } + public void setTaskType(final String taskType) { 
this.taskType = taskType; } + + public String getShuffleTaskId() { return shuffleTaskId; } + public void setShuffleTaskId(final String shuffleTaskId) { this.shuffleTaskId = shuffleTaskId; } + + public String getNodeAddress() { return nodeAddress; } + public void setNodeAddress(final String nodeAddress) { this.nodeAddress = nodeAddress; } + + public int getChangeOrdinal() { return changeOrdinal; } + public void setChangeOrdinal(final int changeOrdinal) { this.changeOrdinal = changeOrdinal; } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/LeaderScheduler.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/LeaderScheduler.java index b432386832..1a19b03a4d 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/LeaderScheduler.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/LeaderScheduler.java @@ -10,7 +10,9 @@ package org.opensearch.dataprepper.plugins.source.iceberg.leader; +import org.apache.iceberg.ChangelogScanTask; import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.IncrementalChangelogScan; import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.TableScan; @@ -22,18 +24,30 @@ import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.InitialLoadTaskPartition; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.LeaderPartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ShuffleReadPartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ShuffleWritePartition; import 
org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ChangelogTaskProgressState; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.InitialLoadTaskProgressState; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.LeaderProgressState; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ShuffleReadProgressState; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ShuffleWriteProgressState; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleConfig; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleNodeClient; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShufflePartitionCoalescer; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleStorage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.UUID; public class LeaderScheduler implements Runnable { @@ -41,22 +55,29 @@ public class LeaderScheduler implements Runnable { private static final Duration DEFAULT_EXTEND_LEASE_DURATION = Duration.ofMinutes(3); private static final Duration COMPLETION_CHECK_INTERVAL = Duration.ofSeconds(2); static final String SNAPSHOT_COMPLETION_PREFIX = "snapshot-completion-"; + public static final String SHUFFLE_FAILED_PREFIX = "shuffle-failed-"; private final EnhancedSourceCoordinator sourceCoordinator; private final Map tableConfigs; private final Duration pollingInterval; private final Map tables; + private final ShuffleStorage shuffleStorage; + private final ShuffleConfig shuffleConfig; private final TaskGrouper taskGrouper = new TaskGrouper(); private 
LeaderPartition leaderPartition; public LeaderScheduler(final EnhancedSourceCoordinator sourceCoordinator, final Map tableConfigs, final Duration pollingInterval, - final Map tables) { + final Map tables, + final ShuffleStorage shuffleStorage, + final ShuffleConfig shuffleConfig) { this.sourceCoordinator = sourceCoordinator; this.tableConfigs = tableConfigs; this.pollingInterval = pollingInterval; this.tables = tables; + this.shuffleStorage = shuffleStorage; + this.shuffleConfig = shuffleConfig; } @Override @@ -143,7 +164,7 @@ private void performInitialLoad() { taskState.setDataFilePath(task.file().location()); taskState.setTotalRecords(task.file().recordCount()); - final String partitionKey = tableName + "|initial|" + UUID.randomUUID(); + final String partitionKey = tableName + "|initial|" + snapshotId + "|" + task.file().location(); sourceCoordinator.createPartition(new InitialLoadTaskPartition(partitionKey, taskState)); taskCount++; } @@ -222,31 +243,30 @@ private void pollAndPlan() { LOG.info("Planning snapshot {} for table {} (operation: {})", snapshot.snapshotId(), tableName, snapshot.operation()); - final List taskGroups = - taskGrouper.planAndGroup(table, tableName, parentId, snapshot.snapshotId()); - - if (taskGroups.isEmpty()) { - progressState.setLastProcessedSnapshotId(snapshot.snapshotId()); - continue; - } - - // Create a completion tracker in GlobalState - final String completionKey = SNAPSHOT_COMPLETION_PREFIX + snapshot.snapshotId(); - sourceCoordinator.createPartition(new GlobalState(completionKey, - Map.of("total", taskGroups.size(), "completed", 0))); - - for (final ChangelogTaskProgressState taskState : taskGroups) { - final String partitionKey = tableName + "|" + snapshot.snapshotId() - + "|" + UUID.randomUUID(); - sourceCoordinator.createPartition( - new ChangelogTaskPartition(partitionKey, taskState)); + // Scan once and decide path based on DELETED file presence + final List taskInfos = scanChangelogTasks( + table, tableName, parentId, 
snapshot.snapshotId()); + final boolean hasDeleted = taskInfos.stream() + .anyMatch(t -> "DELETED".equals(t.taskType)); + + if (hasDeleted) { + final TableConfig tableConfig = tableConfigs.get(tableName); + if (tableConfig.getIdentifierColumns().isEmpty()) { + throw new IllegalStateException( + "Snapshot " + snapshot.snapshotId() + " for table " + tableName + + " contains UPDATE/DELETE operations but identifier_columns is not configured. " + + "identifier_columns is required for correct CDC processing of UPDATE/DELETE."); + } + final List shuffleTasks = + taskGrouper.planShuffleWriteTasks(taskInfos, tableName, snapshot.snapshotId()); + if (!processShuffleSnapshot(tableName, snapshot.snapshotId(), shuffleTasks)) { + LOG.warn("Shuffle failed for snapshot {}, will retry on next poll", snapshot.snapshotId()); + break; + } + } else { + processInsertOnlySnapshot(tableName, snapshot.snapshotId(), taskInfos); } - LOG.info("Created {} partition(s) for snapshot {}, waiting for completion", - taskGroups.size(), snapshot.snapshotId()); - - waitForSnapshotComplete(completionKey, taskGroups.size()); - progressState.setLastProcessedSnapshotId(snapshot.snapshotId()); sourceCoordinator.saveProgressStateForPartition( leaderPartition, DEFAULT_EXTEND_LEASE_DURATION); @@ -304,4 +324,217 @@ private void waitForSnapshotComplete(final String completionKey, final int total } } } + + private List scanChangelogTasks( + final Table table, final String tableName, + final long fromSnapshotIdExclusive, final long toSnapshotId) { + final IncrementalChangelogScan scan = table.newIncrementalChangelogScan() + .fromSnapshotExclusive(fromSnapshotIdExclusive) + .toSnapshot(toSnapshotId); + + final List taskInfos = new ArrayList<>(); + try (CloseableIterable tasks = scan.planFiles()) { + for (final ChangelogScanTask task : tasks) { + taskInfos.add(TaskGrouper.TaskInfo.from(task)); + } + } catch (final IOException e) { + throw new RuntimeException("Failed to plan changelog scan for " + tableName, e); + } 
+ return taskInfos; + } + + private void processInsertOnlySnapshot(final String tableName, final long snapshotId, + final List taskInfos) { + final List taskGroups = + taskGrouper.planInsertOnlyTasks(taskInfos, tableName, snapshotId); + + if (taskGroups.isEmpty()) { + return; + } + + final String completionKey = SNAPSHOT_COMPLETION_PREFIX + snapshotId; + sourceCoordinator.createPartition(new GlobalState(completionKey, + Map.of("total", taskGroups.size(), "completed", 0))); + + for (final ChangelogTaskProgressState taskState : taskGroups) { + final String filesHash = deterministicHash(taskState.getDataFilePaths()); + final String partitionKey = tableName + "|" + snapshotId + "|" + filesHash; + sourceCoordinator.createPartition(new ChangelogTaskPartition(partitionKey, taskState)); + } + + LOG.info("Created {} INSERT-only partition(s) for snapshot {}", taskGroups.size(), snapshotId); + waitForSnapshotComplete(completionKey, taskGroups.size()); + } + + private boolean processShuffleSnapshot(final String tableName, final long snapshotId, + final List shuffleTasks) { + final String snapshotIdStr = String.valueOf(snapshotId); + + // Phase 1: Create SHUFFLE_WRITE tasks + // Create location tracker GlobalState (workers will write their address here on completion) + final String locationKey = "shuffle-locations-" + snapshotIdStr; + sourceCoordinator.createPartition(new GlobalState(locationKey, new java.util.HashMap<>())); + + // Create completion key before partitions to avoid race condition where workers + // complete and try to increment before the key exists + final String writeCompletionKey = SNAPSHOT_COMPLETION_PREFIX + "sw-" + snapshotId; + sourceCoordinator.createPartition(new GlobalState(writeCompletionKey, + Map.of("total", shuffleTasks.size(), "completed", 0))); + + for (final ShuffleWriteProgressState taskState : shuffleTasks) { + final String shuffleTaskId = deterministicHash(List.of(taskState.getDataFilePath())); + taskState.setShuffleTaskId(shuffleTaskId); + + 
final String partitionKey = tableName + "|sw|" + snapshotId + "|" + shuffleTaskId; + sourceCoordinator.createPartition(new ShuffleWritePartition(partitionKey, taskState)); + } + + LOG.info("Created {} SHUFFLE_WRITE task(s) for snapshot {}", shuffleTasks.size(), snapshotId); + waitForShuffleComplete(writeCompletionKey, shuffleTasks.size(), snapshotIdStr); + + // Check if shuffle failed + if (isShuffleFailed(snapshotIdStr)) { + LOG.warn("Shuffle failed for snapshot {}, skipping", snapshotId); + cleanupAllNodes(snapshotIdStr, locationKey); + return false; + } + + // Barrier: collect index data from all nodes and coalesce + // Read shuffle write locations from GlobalState first (need node addresses to fetch remote indexes) + final Optional locationPartition = sourceCoordinator.getPartition(locationKey); + final Map locationMap = locationPartition.map(enhancedSourcePartition -> ((GlobalState) enhancedSourcePartition).getProgressState().orElse(Map.of())).orElseGet(Map::of); + + final List completedTaskIds = new ArrayList<>(); + final List completedNodeAddresses = new ArrayList<>(); + for (final Map.Entry entry : locationMap.entrySet()) { + completedTaskIds.add(entry.getKey()); + completedNodeAddresses.add(String.valueOf(entry.getValue())); + } + LOG.info("Collected {} shuffle write locations for snapshot {}", completedTaskIds.size(), snapshotId); + + final int numPartitions = shuffleConfig.getPartitions(); + final ShuffleNodeClient client = new ShuffleNodeClient(shuffleConfig); + final long[] partitionSizes = client.collectPartitionSizes( + shuffleStorage, snapshotIdStr, completedTaskIds, completedNodeAddresses, numPartitions); + + final ShufflePartitionCoalescer coalescer = + new ShufflePartitionCoalescer(shuffleConfig.getTargetPartitionSizeBytes()); + final List ranges = coalescer.coalesce(partitionSizes); + + if (ranges.isEmpty()) { + LOG.info("No data after shuffle for snapshot {}", snapshotId); + cleanupAllNodes(snapshotIdStr, locationKey); + return true; + } + + // 
Phase 2: Create SHUFFLE_READ tasks + final String readCompletionKey = SNAPSHOT_COMPLETION_PREFIX + "sr-" + snapshotId; + sourceCoordinator.createPartition(new GlobalState(readCompletionKey, + Map.of("total", ranges.size(), "completed", 0))); + + for (final ShufflePartitionCoalescer.PartitionRange range : ranges) { + final ShuffleReadProgressState readState = new ShuffleReadProgressState(); + readState.setSnapshotId(snapshotId); + readState.setTableName(tableName); + readState.setPartitionRangeStart(range.getStartPartition()); + readState.setPartitionRangeEnd(range.getEndPartitionInclusive()); + readState.setShuffleWriteTaskIds(completedTaskIds); + readState.setNodeAddresses(completedNodeAddresses); + + final String partitionKey = tableName + "|sr|" + snapshotId + "|" + range.getStartPartition() + "-" + range.getEndPartitionInclusive(); + sourceCoordinator.createPartition(new ShuffleReadPartition(partitionKey, readState)); + } + + LOG.info("Created {} SHUFFLE_READ task(s) for snapshot {} (coalesced from {} partitions)", + ranges.size(), snapshotId, numPartitions); + waitForShuffleComplete(readCompletionKey, ranges.size(), snapshotIdStr); + + if (isShuffleFailed(snapshotIdStr)) { + LOG.warn("Shuffle read failed for snapshot {}, skipping", snapshotId); + cleanupAllNodes(snapshotIdStr, locationKey); + return false; + } + + cleanupAllNodes(snapshotIdStr, locationKey); + return true; + } + + private void waitForShuffleComplete(final String completionKey, final int totalPartitions, + final String snapshotIdStr) { + while (!Thread.currentThread().isInterrupted()) { + if (isShuffleFailed(snapshotIdStr)) { + return; + } + + final Optional state = + sourceCoordinator.getPartition(completionKey); + + if (state.isPresent()) { + final GlobalState gs = (GlobalState) state.get(); + final Map progress = gs.getProgressState().orElse(Map.of()); + final int completed = ((Number) progress.getOrDefault("completed", 0)).intValue(); + if (completed >= totalPartitions) { + return; + } + } 
+ + try { + sourceCoordinator.saveProgressStateForPartition( + leaderPartition, DEFAULT_EXTEND_LEASE_DURATION); + } catch (final Exception e) { + LOG.warn("Failed to extend lease while waiting for shuffle", e); + } + + try { + Thread.sleep(COMPLETION_CHECK_INTERVAL.toMillis()); + } catch (final InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } + } + } + + private boolean isShuffleFailed(final String snapshotIdStr) { + final Optional failedState = + sourceCoordinator.getPartition(SHUFFLE_FAILED_PREFIX + snapshotIdStr); + if (failedState.isPresent()) { + final GlobalState gs = (GlobalState) failedState.get(); + final Map progress = gs.getProgressState().orElse(Map.of()); + return Boolean.TRUE.equals(progress.get("failed")); + } + return false; + } + + private void cleanupAllNodes(final String snapshotId, final String locationKey) { + shuffleStorage.cleanup(snapshotId); + final Optional locationPartition = sourceCoordinator.getPartition(locationKey); + locationPartition.ifPresent(p -> { + final Map locations = ((GlobalState) p).getProgressState().orElse(Map.of()); + final ShuffleNodeClient client = new ShuffleNodeClient(shuffleConfig); + locations.values().stream() + .map(String::valueOf) + .distinct() + .filter(addr -> !ShuffleNodeClient.isLocalAddress(addr)) + .forEach(addr -> client.requestCleanup(addr, snapshotId)); + }); + } + + private static String deterministicHash(final List values) { + final List sorted = new ArrayList<>(values); + Collections.sort(sorted); + try { + final MessageDigest md = MessageDigest.getInstance("SHA-256"); + for (final String v : sorted) { + md.update(v.getBytes(StandardCharsets.UTF_8)); + } + final byte[] digest = md.digest(); + final StringBuilder sb = new StringBuilder(); + for (int i = 0; i < 8; i++) { + sb.append(String.format("%02x", digest[i])); + } + return sb.toString(); + } catch (final NoSuchAlgorithmException e) { + throw new RuntimeException(e); + } + } } diff --git 
a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/TaskGrouper.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/TaskGrouper.java index 56fe5eb1f0..e9e272e013 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/TaskGrouper.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/TaskGrouper.java @@ -12,234 +12,98 @@ import org.apache.iceberg.AddedRowsScanTask; import org.apache.iceberg.ChangelogScanTask; -import org.apache.iceberg.ContentScanTask; import org.apache.iceberg.DeletedDataFileScanTask; -import org.apache.iceberg.IncrementalChangelogScan; -import org.apache.iceberg.StructLike; -import org.apache.iceberg.Table; -import org.apache.iceberg.io.CloseableIterable; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ChangelogTaskProgressState; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ShuffleWriteProgressState; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; /** - * Groups ChangelogScanTasks for distributed processing. + * Converts scan results into task states for distributed processing. *

- * Grouping stages: - * 1. Iceberg partition isolation: tasks grouped by partition value - * 2. Bounds-based pairing: within a partition, DELETED-ADDED file pairs with matching - * bounds are split into individual groups for maximum distribution + * For INSERT-only snapshots, each data file becomes an independent ChangelogTask. + * For snapshots containing DELETED files, each data file becomes a SHUFFLE_WRITE task. */ public class TaskGrouper { private static final Logger LOG = LoggerFactory.getLogger(TaskGrouper.class); - private static final String UNPARTITIONED_KEY = "__unpartitioned__"; - public List planAndGroup( - final Table table, + /** + * Converts scan results into independent ChangelogTask states for INSERT-only snapshots. + */ + public List planInsertOnlyTasks( + final List taskInfos, final String tableName, - final long fromSnapshotIdExclusive, - final long toSnapshotId) { - - final IncrementalChangelogScan scan = table.newIncrementalChangelogScan() - .fromSnapshotExclusive(fromSnapshotIdExclusive) - .toSnapshot(toSnapshotId) - .includeColumnStats(); + final long snapshotId) { - // Stage 1: Group by Iceberg partition - final Map> tasksByPartition = new HashMap<>(); - - try (CloseableIterable tasks = scan.planFiles()) { - for (final ChangelogScanTask task : tasks) { - final String partitionKey = extractPartitionKey(task); - tasksByPartition - .computeIfAbsent(partitionKey, k -> new ArrayList<>()) - .add(TaskInfo.from(task)); - } - } catch (final IOException e) { - throw new RuntimeException("Failed to plan changelog scan for " + tableName, e); - } - - // Stage 2: Within each partition, attempt bounds-based pairing final List result = new ArrayList<>(); - - for (final Map.Entry> entry : tasksByPartition.entrySet()) { - final List partitionTasks = entry.getValue(); - final List> groups = pairByBounds(partitionTasks); - - for (final List group : groups) { - final ChangelogTaskProgressState state = new ChangelogTaskProgressState(); - 
state.setSnapshotId(toSnapshotId); - state.setTableName(tableName); - state.setDataFilePaths(group.stream().map(t -> t.filePath).collect(Collectors.toList())); - state.setTaskTypes(group.stream().map(t -> t.taskType).collect(Collectors.toList())); - state.setTotalRecords(group.stream().mapToLong(t -> t.recordCount).sum()); - result.add(state); - } + for (final TaskInfo info : taskInfos) { + final ChangelogTaskProgressState state = new ChangelogTaskProgressState(); + state.setSnapshotId(snapshotId); + state.setTableName(tableName); + state.setDataFilePaths(List.of(info.filePath)); + state.setTaskTypes(List.of(info.taskType)); + state.setTotalRecords(info.recordCount); + result.add(state); } - LOG.info("Planned {} task group(s) for table {} (snapshot {} -> {})", - result.size(), tableName, fromSnapshotIdExclusive, toSnapshotId); + LOG.info("Planned {} CHANGELOG task(s) for table {} (snapshot {})", + result.size(), tableName, snapshotId); return result; } /** - * Pairs DELETED and ADDED tasks by matching lower/upper bounds. - * Returns a list of groups, where each group is a list of tasks to process together. + * Converts scan results into SHUFFLE_WRITE task states for snapshots containing DELETED files. 
*/ - List> pairByBounds(final List tasks) { - final List deleted = new ArrayList<>(); - final List added = new ArrayList<>(); - - for (final TaskInfo task : tasks) { - if ("DELETED".equals(task.taskType)) { - deleted.add(task); - } else { - added.add(task); - } - } - - // If no deleted files, each added file is an independent group (INSERT only) - if (deleted.isEmpty()) { - final List> result = new ArrayList<>(); - for (final TaskInfo task : added) { - result.add(List.of(task)); - } - return result; - } - - // If no added files, each deleted file is an independent group (full-file delete) - if (added.isEmpty()) { - final List> result = new ArrayList<>(); - for (final TaskInfo task : deleted) { - result.add(List.of(task)); - } - return result; - } - - // Try to pair DELETED-ADDED by bounds - final List> paired = new ArrayList<>(); - final List unpairedDeleted = new ArrayList<>(deleted); - final List unpairedAdded = new ArrayList<>(added); - - final Iterator delIter = unpairedDeleted.iterator(); - while (delIter.hasNext()) { - final TaskInfo del = delIter.next(); - TaskInfo matchedAdd = null; - int matchCount = 0; - - for (final TaskInfo add : unpairedAdded) { - if (boundsMatch(del, add)) { - matchedAdd = add; - matchCount++; - } - } - - // Only pair if exactly one match (unambiguous) - if (matchCount == 1) { - paired.add(List.of(del, matchedAdd)); - delIter.remove(); - unpairedAdded.remove(matchedAdd); - } - } - - // Unpaired DELETED-only or ADDED-only -> individual groups - for (final TaskInfo task : unpairedDeleted) { - if (unpairedAdded.isEmpty()) { - paired.add(List.of(task)); - } - } - for (final TaskInfo task : unpairedAdded) { - if (unpairedDeleted.isEmpty()) { - paired.add(List.of(task)); - } - } - - // If both unpaired DELETED and ADDED remain -> single fallback group. - // These must stay together so that carryover removal can match DELETE-INSERT - // pairs from different files on the same worker node. 
- // TODO: A Source-layer shuffle via PeerForwarder extension could distribute - // these rows by hash of all columns, enabling parallel carryover removal. - // See RFC #6552 Section 5.2.4 for details. - if (!unpairedDeleted.isEmpty() && !unpairedAdded.isEmpty()) { - final List fallback = new ArrayList<>(); - fallback.addAll(unpairedDeleted); - fallback.addAll(unpairedAdded); - paired.add(fallback); - } - - return paired; - } - - private boolean boundsMatch(final TaskInfo a, final TaskInfo b) { - if (a.boundsKey == null || b.boundsKey == null) { - return false; + public List planShuffleWriteTasks( + final List taskInfos, + final String tableName, + final long snapshotId) { + + final List tasks = new ArrayList<>(); + for (final TaskInfo info : taskInfos) { + final ShuffleWriteProgressState state = new ShuffleWriteProgressState(); + state.setSnapshotId(snapshotId); + state.setTableName(tableName); + state.setDataFilePath(info.filePath); + state.setTaskType(info.taskType); + state.setChangeOrdinal(info.changeOrdinal); + tasks.add(state); } - return a.boundsKey.equals(b.boundsKey); - } - private String extractPartitionKey(final ChangelogScanTask task) { - if (task instanceof ContentScanTask) { - final StructLike partition = ((ContentScanTask) task).file().partition(); - if (partition.size() == 0) { - return UNPARTITIONED_KEY; - } - final StringBuilder sb = new StringBuilder(); - for (int i = 0; i < partition.size(); i++) { - if (i > 0) { - sb.append("|"); - } - sb.append(partition.get(i, Object.class)); - } - return sb.toString(); - } - return UNPARTITIONED_KEY; + LOG.info("Planned {} SHUFFLE_WRITE task(s) for table {} (snapshot {})", + tasks.size(), tableName, snapshotId); + return tasks; } - static class TaskInfo { + public static class TaskInfo { final String filePath; final String taskType; final long recordCount; - final String boundsKey; // serialized lower+upper bounds for pairing + final int changeOrdinal; - TaskInfo(final String filePath, final String 
taskType, - final long recordCount, final String boundsKey) { + public TaskInfo(final String filePath, final String taskType, + final long recordCount, final int changeOrdinal) { this.filePath = filePath; this.taskType = taskType; this.recordCount = recordCount; - this.boundsKey = boundsKey; + this.changeOrdinal = changeOrdinal; } - static TaskInfo from(final ChangelogScanTask task) { + public static TaskInfo from(final ChangelogScanTask task) { if (task instanceof AddedRowsScanTask) { final AddedRowsScanTask t = (AddedRowsScanTask) task; - return new TaskInfo( - t.file().location(), "ADDED", - t.file().recordCount(), extractBoundsKey(t)); + return new TaskInfo(t.file().location(), "ADDED", + t.file().recordCount(), task.changeOrdinal()); } else if (task instanceof DeletedDataFileScanTask) { final DeletedDataFileScanTask t = (DeletedDataFileScanTask) task; - return new TaskInfo( - t.file().location(), "DELETED", - t.file().recordCount(), extractBoundsKey(t)); + return new TaskInfo(t.file().location(), "DELETED", + t.file().recordCount(), task.changeOrdinal()); } throw new IllegalArgumentException("Unsupported ChangelogScanTask type: " + task.getClass().getName()); } - - private static String extractBoundsKey(final ContentScanTask task) { - final var lower = task.file().lowerBounds(); - final var upper = task.file().upperBounds(); - if (lower == null || upper == null || lower.isEmpty() || upper.isEmpty()) { - return null; - } - return lower + "|" + upper; - } } } diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleReader.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleReader.java new file mode 100644 index 0000000000..59f59d683d --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleReader.java @@ -0,0 +1,120 @@ 
+/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import net.jpountz.lz4.LZ4Factory; +import net.jpountz.lz4.LZ4FastDecompressor; + +import java.io.IOException; +import java.io.RandomAccessFile; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** + * Reads LZ4-compressed shuffle data from a data file + index file written by {@link LocalDiskShuffleWriter}. + */ +public class LocalDiskShuffleReader implements ShuffleReader { + + private static final LZ4FastDecompressor DECOMPRESSOR = LZ4Factory.fastestInstance().fastDecompressor(); + private static final int BLOCK_HEADER_SIZE = Integer.BYTES + Integer.BYTES; + + private final Path dataFilePath; + private final Path indexFilePath; + + LocalDiskShuffleReader(final Path dataFilePath, final Path indexFilePath) { + this.dataFilePath = dataFilePath; + this.indexFilePath = indexFilePath; + } + + @Override + public long[] readIndex() { + try { + final byte[] indexBytes = Files.readAllBytes(indexFilePath); + final ByteBuffer buf = ByteBuffer.wrap(indexBytes); + final long[] offsets = new long[indexBytes.length / Long.BYTES]; + for (int i = 0; i < offsets.length; i++) { + offsets[i] = buf.getLong(); + } + return offsets; + } catch (final IOException e) { + throw new UncheckedIOException("Failed to read shuffle index: " + indexFilePath, e); + } + } + + @Override + public List readPartitions(final int startPartition, final int endPartitionInclusive) { + final long[] offsets = readIndex(); + final List allRecords = new ArrayList<>(); + + for (int p = startPartition; p <= endPartitionInclusive; p++) { + final long start = 
offsets[p]; + final long end = offsets[p + 1]; + if (start == end) { + continue; + } + final byte[] compressedBlock = readBytes(start, (int) (end - start)); + final byte[] uncompressed = decompressBlock(compressedBlock); + allRecords.addAll(parseRecords(uncompressed)); + } + + return allRecords; + } + + @Override + public byte[] readBytes(final long offset, final int length) { + if (length == 0) { + return new byte[0]; + } + try (RandomAccessFile raf = new RandomAccessFile(dataFilePath.toFile(), "r")) { + raf.seek(offset); + final byte[] data = new byte[length]; + raf.readFully(data); + return data; + } catch (final IOException e) { + throw new UncheckedIOException("Failed to read shuffle data: " + dataFilePath, e); + } + } + + @Override + public void close() { + // No persistent resources to close + } + + /** + * Decompresses a block: [4 bytes: uncompressed size][4 bytes: compressed size][compressed data] + */ + public static byte[] decompressBlock(final byte[] block) { + final ByteBuffer buf = ByteBuffer.wrap(block); + final int uncompressedSize = buf.getInt(); + buf.getInt(); // skip stored compressed size; the fast decompressor only needs the uncompressed length + final byte[] uncompressed = new byte[uncompressedSize]; + DECOMPRESSOR.decompress(block, BLOCK_HEADER_SIZE, uncompressed, 0, uncompressedSize); + return uncompressed; + } + + public static List parseRecords(final byte[] data) { + final List records = new ArrayList<>(); + final ByteBuffer buf = ByteBuffer.wrap(data); + while (buf.hasRemaining()) { + final int recordLength = buf.getInt(); + final byte operation = buf.get(); + final int changeOrdinal = buf.getInt(); + final byte[] serialized = new byte[recordLength - ShuffleRecord.OPERATION_SIZE - ShuffleRecord.CHANGE_ORDINAL_SIZE]; + buf.get(serialized); + records.add(new ShuffleRecord(operation, changeOrdinal, serialized)); + } + return records; + } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleStorage.java 
b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleStorage.java new file mode 100644 index 0000000000..dd7283a190 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleStorage.java @@ -0,0 +1,154 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.file.DirectoryStream; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; + +/** + * Local disk implementation of {@link ShuffleStorage}. + * Files are stored under {baseDir}/{snapshotId}/shuffle-{taskId}.data and .index. 
+ */ +public class LocalDiskShuffleStorage implements ShuffleStorage { + + private static final Logger LOG = LoggerFactory.getLogger(LocalDiskShuffleStorage.class); + private static final String DATA_SUFFIX = ".data"; + private static final String INDEX_SUFFIX = ".index"; + + private final Path baseDir; + + public LocalDiskShuffleStorage(final Path baseDir) { + this.baseDir = baseDir; + } + + Path dataFilePath(final String snapshotId, final String taskId) { + final Path snapshotDir = validateSubdirectory(baseDir.resolve(snapshotId)); + return validateSubdirectory(snapshotDir.resolve("shuffle-" + taskId + DATA_SUFFIX)); + } + + Path indexFilePath(final String snapshotId, final String taskId) { + final Path snapshotDir = validateSubdirectory(baseDir.resolve(snapshotId)); + return validateSubdirectory(snapshotDir.resolve("shuffle-" + taskId + INDEX_SUFFIX)); + } + + @Override + public ShuffleWriter createWriter(final String snapshotId, final String taskId, final int numPartitions) { + final Path snapshotDir = validateSubdirectory(baseDir.resolve(snapshotId)); + try { + Files.createDirectories(snapshotDir); + } catch (final IOException e) { + throw new UncheckedIOException("Failed to create shuffle directory: " + snapshotDir, e); + } + return new LocalDiskShuffleWriter( + dataFilePath(snapshotId, taskId), + indexFilePath(snapshotId, taskId), + numPartitions); + } + + @Override + public ShuffleReader createReader(final String snapshotId, final String taskId) { + return new LocalDiskShuffleReader( + dataFilePath(snapshotId, taskId), + indexFilePath(snapshotId, taskId)); + } + + @Override + public long[] getPartitionSizes(final String snapshotId, final int numPartitions) { + final long[] sizes = new long[numPartitions]; + for (final String taskId : getTaskIds(snapshotId)) { + try (ShuffleReader reader = createReader(snapshotId, taskId)) { + final long[] offsets = reader.readIndex(); + for (int i = 0; i < numPartitions && i + 1 < offsets.length; i++) { + sizes[i] += 
offsets[i + 1] - offsets[i]; + } + } catch (final IOException e) { + throw new UncheckedIOException("Failed to read index for task " + taskId, e); + } + } + return sizes; + } + + @Override + public List getTaskIds(final String snapshotId) { + final Path snapshotDir = validateSubdirectory(baseDir.resolve(snapshotId)); + final List taskIds = new ArrayList<>(); + if (!Files.exists(snapshotDir)) { + return taskIds; + } + try (DirectoryStream stream = Files.newDirectoryStream(snapshotDir, "*" + INDEX_SUFFIX)) { + for (final Path path : stream) { + final String fileName = path.getFileName().toString(); + // shuffle-{taskId}.index -> taskId + taskIds.add(fileName.substring("shuffle-".length(), fileName.length() - INDEX_SUFFIX.length())); + } + } catch (final IOException e) { + throw new UncheckedIOException("Failed to list shuffle files in " + snapshotDir, e); + } + return taskIds; + } + + @Override + public void cleanup(final String snapshotId) { + final Path snapshotDir = validateSubdirectory(baseDir.resolve(snapshotId)); + deleteDirectory(snapshotDir); + } + + @Override + public void cleanupAll() { + if (!Files.exists(baseDir)) { + return; + } + try (DirectoryStream stream = Files.newDirectoryStream(baseDir)) { + for (final Path entry : stream) { + if (Files.isDirectory(entry)) { + deleteDirectory(entry); + } else { + Files.deleteIfExists(entry); + } + } + } catch (final IOException e) { + LOG.warn("Failed to clean up shuffle base directory: {}", baseDir, e); + } + } + + private Path validateSubdirectory(final Path path) { + final Path normalized = path.normalize(); + if (!normalized.startsWith(baseDir)) { + throw new IllegalArgumentException("Invalid path: resolved path is outside the base directory"); + } + return normalized; + } + private void deleteDirectory(final Path dir) { + if (!Files.exists(dir)) { + return; + } + try (DirectoryStream stream = Files.newDirectoryStream(dir)) { + for (final Path entry : stream) { + if (Files.isDirectory(entry)) { + 
deleteDirectory(entry); + } else { + Files.deleteIfExists(entry); + } + } + Files.deleteIfExists(dir); + } catch (final IOException e) { + LOG.warn("Failed to delete shuffle directory: {}", dir, e); + } + } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleWriter.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleWriter.java new file mode 100644 index 0000000000..44da2809cd --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleWriter.java @@ -0,0 +1,164 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import net.jpountz.lz4.LZ4Compressor; +import net.jpountz.lz4.LZ4Factory; + +import java.io.BufferedOutputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; + +/** + * Buffers records in memory, sorts by partition number, and writes a single data file + index file. + * Each partition's data block is LZ4 compressed. + *

+ * Data file format: for each non-empty partition, a compressed block: + * [4 bytes: uncompressed size][4 bytes: compressed size][compressed data] + *

+ * Within each uncompressed block, records are: + * [4 bytes: record length][1 byte: operation][4 bytes: changeOrdinal][N bytes: serialized data] + *

+ * Index file format: (numPartitions + 1) long values (8 bytes each) representing byte offsets + * into the data file. offset[i] to offset[i+1] is the byte range for partition i. + */ +class LocalDiskShuffleWriter implements ShuffleWriter { + + private static final LZ4Factory LZ4 = LZ4Factory.fastestInstance(); + + /** [4 bytes: uncompressed size][4 bytes: compressed size] */ + private static final int BLOCK_HEADER_SIZE = Integer.BYTES + Integer.BYTES; + + private final Path dataFilePath; + private final Path indexFilePath; + private final int numPartitions; + private final List buffer = new ArrayList<>(); + + LocalDiskShuffleWriter(final Path dataFilePath, final Path indexFilePath, final int numPartitions) { + this.dataFilePath = dataFilePath; + this.indexFilePath = indexFilePath; + this.numPartitions = numPartitions; + } + + @Override + public void addRecord(final int partitionNumber, final byte operation, final int changeOrdinal, + final byte[] serializedRecord) { + buffer.add(new BufferedRecord(partitionNumber, operation, changeOrdinal, serializedRecord)); + } + + @Override + public void finish() { + buffer.sort(Comparator.comparingInt(r -> r.partitionNumber)); + + final long[] offsets = new long[numPartitions + 1]; + long currentOffset = 0; + + // Group records by partition + final List> partitionGroups = new ArrayList<>(); + for (int i = 0; i < numPartitions; i++) { + partitionGroups.add(new ArrayList<>()); + } + for (final BufferedRecord record : buffer) { + partitionGroups.get(record.partitionNumber).add(record); + } + + try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(dataFilePath))) { + final LZ4Compressor compressor = LZ4.fastCompressor(); + + for (int p = 0; p < numPartitions; p++) { + offsets[p] = currentOffset; + final List records = partitionGroups.get(p); + if (records.isEmpty()) { + continue; + } + + // Serialize records to uncompressed bytes + final byte[] uncompressed = serializeRecords(records); + + // Compress + final int 
maxCompressedLength = compressor.maxCompressedLength(uncompressed.length); + final byte[] compressed = new byte[maxCompressedLength]; + final int compressedLength = compressor.compress(uncompressed, 0, uncompressed.length, + compressed, 0, maxCompressedLength); + + // Write: [uncompressed size][compressed size][compressed data] + final byte[] header = new byte[BLOCK_HEADER_SIZE]; + ByteBuffer.wrap(header).putInt(uncompressed.length).putInt(compressedLength); + out.write(header); + out.write(compressed, 0, compressedLength); + currentOffset += BLOCK_HEADER_SIZE + compressedLength; + } + offsets[numPartitions] = currentOffset; + } catch (final IOException e) { + throw new UncheckedIOException("Failed to write shuffle data file: " + dataFilePath, e); + } + + // Write index file + try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(indexFilePath))) { + final ByteBuffer indexBuffer = ByteBuffer.allocate((numPartitions + 1) * Long.BYTES); + for (final long offset : offsets) { + indexBuffer.putLong(offset); + } + out.write(indexBuffer.array()); + } catch (final IOException e) { + throw new UncheckedIOException("Failed to write shuffle index file: " + indexFilePath, e); + } + + buffer.clear(); + } + + @Override + public void close() { + buffer.clear(); + } + + private static byte[] serializeRecords(final List records) { + final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + for (final BufferedRecord record : records) { + final int recordLength = ShuffleRecord.OPERATION_SIZE + ShuffleRecord.CHANGE_ORDINAL_SIZE + record.serializedRecord.length; + final byte[] header = new byte[Integer.BYTES + ShuffleRecord.OPERATION_SIZE + ShuffleRecord.CHANGE_ORDINAL_SIZE]; + ByteBuffer.wrap(header) + .putInt(recordLength) + .put(record.operation) + .putInt(record.changeOrdinal); + try { + baos.write(header); + baos.write(record.serializedRecord); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + } + return baos.toByteArray(); + } + 
+ private static class BufferedRecord { + final int partitionNumber; + final byte operation; + final int changeOrdinal; + final byte[] serializedRecord; + + BufferedRecord(final int partitionNumber, final byte operation, final int changeOrdinal, + final byte[] serializedRecord) { + this.partitionNumber = partitionNumber; + this.operation = operation; + this.changeOrdinal = changeOrdinal; + this.serializedRecord = serializedRecord; + } + } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleConfig.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleConfig.java new file mode 100644 index 0000000000..5193b372d2 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleConfig.java @@ -0,0 +1,94 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import org.opensearch.dataprepper.http.BaseHttpServerConfig; +import org.opensearch.dataprepper.model.types.ByteCount; + +@JsonIgnoreProperties({"path", "compression", "health_check_service", + "unauthenticated_health_check", "request_timeout", "thread_count", + "max_connection_count", "max_pending_requests", "max_request_length"}) +public class ShuffleConfig extends BaseHttpServerConfig { + + static final int DEFAULT_PARTITIONS = 64; + static final String DEFAULT_TARGET_PARTITION_SIZE = "64mb"; + static final int DEFAULT_SERVER_PORT = 4995; + + @JsonProperty("partitions") + @Min(1) + @Max(10000) + private int partitions = DEFAULT_PARTITIONS; + + @JsonProperty("target_partition_size") + private ByteCount targetPartitionSize = ByteCount.parse(DEFAULT_TARGET_PARTITION_SIZE); + + @JsonProperty("storage_path") + private String storagePath; + + @JsonProperty("ssl") + private boolean ssl = true; + + @JsonProperty("ssl_insecure_disable_verification") + private boolean sslInsecureDisableVerification = false; + + @Override + public int getDefaultPort() { + return DEFAULT_SERVER_PORT; + } + + @Override + public String getDefaultPath() { + return "/shuffle"; + } + + @Override + public boolean isSsl() { + return ssl; + } + + @Override + public boolean isSslCertAndKeyFileInS3() { + return ssl && getSslCertificateFile() != null + && getSslCertificateFile().toLowerCase().startsWith("s3://") + && getSslKeyFile() != null + && getSslKeyFile().toLowerCase().startsWith("s3://"); + } + + @Override + public boolean isSslCertificateFileValid() { + if (ssl && !isUseAcmCertificateForSsl()) { + return getSslCertificateFile() != null && !getSslCertificateFile().isEmpty(); + } + return true; + } + + @Override 
+ public boolean isSslKeyFileValid() { + if (ssl && !isUseAcmCertificateForSsl()) { + return getSslKeyFile() != null && !getSslKeyFile().isEmpty(); + } + return true; + } + + public int getPartitions() { return partitions; } + + public long getTargetPartitionSizeBytes() { return targetPartitionSize.getBytes(); } + + public String getStoragePath() { return storagePath; } + + public int getServerPort() { return getPort(); } + + public boolean isSslInsecureDisableVerification() { return sslInsecureDisableVerification; } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleHttpServer.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleHttpServer.java new file mode 100644 index 0000000000..422ed94040 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleHttpServer.java @@ -0,0 +1,73 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import com.linecorp.armeria.server.Server; +import org.opensearch.dataprepper.armeria.authentication.ArmeriaHttpAuthenticationProvider; +import org.opensearch.dataprepper.http.certificate.CertificateProviderFactory; +import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.plugins.certificate.CertificateProvider; +import org.opensearch.dataprepper.plugins.server.CreateServer; +import org.opensearch.dataprepper.plugins.server.ServerConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.CompletableFuture; + +/** + * HTTP server for serving shuffle data to other Data Prepper nodes. + */ +public class ShuffleHttpServer { + + private static final Logger LOG = LoggerFactory.getLogger(ShuffleHttpServer.class); + + private final ShuffleConfig config; + private final ShuffleHttpService service; + private final ArmeriaHttpAuthenticationProvider authenticationProvider; + private Server server; + + public ShuffleHttpServer(final ShuffleConfig config, final ShuffleHttpService service, + final ArmeriaHttpAuthenticationProvider authenticationProvider) { + this.config = config; + this.service = service; + this.authenticationProvider = authenticationProvider; + } + + public void start() { + final ServerConfiguration serverConfig = new ServerConfiguration(); + serverConfig.setPort(config.getServerPort()); + serverConfig.setSsl(config.isSsl()); + + final CertificateProvider certificateProvider = config.isSsl() + ? 
new CertificateProviderFactory(config).getCertificateProvider() + : null; + + final CreateServer createServer = new CreateServer( + serverConfig, LOG, PluginMetrics.fromNames("shuffle", "iceberg-source"), + "shuffle-server", "iceberg-cdc-pipeline"); + + server = createServer.createHTTPServer(certificateProvider, authenticationProvider, service, "/shuffle"); + final CompletableFuture future = server.start(); + future.join(); + LOG.info("Shuffle HTTP server started on port {}", config.getServerPort()); + } + + public void stop() { + if (server != null) { + server.stop().join(); + LOG.info("Shuffle HTTP server stopped"); + } + } + + public int getPort() { + return config.getServerPort(); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleHttpService.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleHttpService.java new file mode 100644 index 0000000000..b6cb5969ed --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleHttpService.java @@ -0,0 +1,120 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import com.linecorp.armeria.common.HttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.server.annotation.Delete; +import com.linecorp.armeria.server.annotation.Get; +import com.linecorp.armeria.server.annotation.Param; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.UncheckedIOException; +import java.nio.ByteBuffer; +import java.nio.file.NoSuchFileException; +import java.util.regex.Pattern; + +/** + * HTTP service that serves shuffle data files to remote SHUFFLE_READ workers. + */ +public class ShuffleHttpService { + + private static final Logger LOG = LoggerFactory.getLogger(ShuffleHttpService.class); + private static final Pattern SNAPSHOT_ID_PATTERN = Pattern.compile("\\d+"); + private static final Pattern TASK_ID_PATTERN = Pattern.compile("[0-9a-f]+"); + + private final ShuffleStorage shuffleStorage; + + public ShuffleHttpService(final ShuffleStorage shuffleStorage) { + this.shuffleStorage = shuffleStorage; + } + + @Get("/{snapshotId}/{taskId}/index") + public HttpResponse getIndex(@Param("snapshotId") final String snapshotId, + @Param("taskId") final String taskId) { + if (isInvalidSnapshotId(snapshotId) || isInvalidTaskId(taskId)) { + return HttpResponse.of(HttpStatus.BAD_REQUEST); + } + try { + final ShuffleReader reader = shuffleStorage.createReader(snapshotId, taskId); + final long[] offsets = reader.readIndex(); + reader.close(); + + final byte[] bytes = new byte[offsets.length * Long.BYTES]; + ByteBuffer.wrap(bytes).asLongBuffer().put(offsets); + return HttpResponse.of(HttpStatus.OK, MediaType.OCTET_STREAM, bytes); + } catch (final UncheckedIOException e) { + if (e.getCause() instanceof NoSuchFileException) { + return HttpResponse.of(HttpStatus.NOT_FOUND); + } + LOG.error("Failed to serve index for snapshot={} task={}", snapshotId, taskId, e); + return 
HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR); + } catch (final Exception e) { + LOG.error("Failed to serve index for snapshot={} task={}", snapshotId, taskId, e); + return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR); + } + } + + @Get("/{snapshotId}/{taskId}/data") + public HttpResponse getData(@Param("snapshotId") final String snapshotId, + @Param("taskId") final String taskId, + @Param("offset") final long offset, + @Param("length") final int length) { + if (isInvalidSnapshotId(snapshotId) || isInvalidTaskId(taskId) || offset < 0 || length < 0) { + return HttpResponse.of(HttpStatus.BAD_REQUEST); + } + if (length == 0) { + return HttpResponse.of(HttpStatus.OK, MediaType.OCTET_STREAM, new byte[0]); + } + try { + final ShuffleReader reader = shuffleStorage.createReader(snapshotId, taskId); + final byte[] data = reader.readBytes(offset, length); + reader.close(); + return HttpResponse.of(HttpStatus.OK, MediaType.OCTET_STREAM, data); + } catch (final UncheckedIOException e) { + if (e.getCause() instanceof NoSuchFileException) { + return HttpResponse.of(HttpStatus.NOT_FOUND); + } + LOG.error("Failed to serve data for snapshot={} task={} offset={} length={}", + snapshotId, taskId, offset, length, e); + return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR); + } catch (final Exception e) { + LOG.error("Failed to serve data for snapshot={} task={} offset={} length={}", + snapshotId, taskId, offset, length, e); + return HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR); + } + } + + @Delete("/{snapshotId}") + public HttpResponse cleanup(@Param("snapshotId") final String snapshotId) { + if (isInvalidSnapshotId(snapshotId)) { + return HttpResponse.of(HttpStatus.BAD_REQUEST); + } + try { + shuffleStorage.cleanup(snapshotId); + LOG.info("Cleaned up shuffle files for snapshot {}", snapshotId); + return HttpResponse.of(HttpStatus.OK); + } catch (final Exception e) { + LOG.warn("Failed to clean up shuffle files for snapshot {}", snapshotId, e); + return 
HttpResponse.of(HttpStatus.INTERNAL_SERVER_ERROR); + } + } + + private static boolean isInvalidSnapshotId(final String snapshotId) { + return snapshotId == null || !SNAPSHOT_ID_PATTERN.matcher(snapshotId).matches(); + } + + private static boolean isInvalidTaskId(final String taskId) { + return taskId == null || !TASK_ID_PATTERN.matcher(taskId).matches(); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleNodeClient.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleNodeClient.java new file mode 100644 index 0000000000..1c6fb64875 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleNodeClient.java @@ -0,0 +1,176 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.common.RequestHeaders; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.NetworkInterface; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * HTTP client for pulling shuffle index and data from remote nodes. 
+ */ +public class ShuffleNodeClient { + + private static final Logger LOG = LoggerFactory.getLogger(ShuffleNodeClient.class); + private static final int MAX_RETRIES = 3; + private static final Duration RESPONSE_TIMEOUT = Duration.ofSeconds(30); + + private final String scheme; + private final int port; + private final ClientFactory clientFactory; + private final Map clientCache = new ConcurrentHashMap<>(); + + public ShuffleNodeClient(final ShuffleConfig config) { + this.scheme = config.isSsl() ? "https" : "http"; + this.port = config.getServerPort(); + this.clientFactory = buildClientFactory(config); + } + + public long[] pullIndex(final String nodeAddress, final String snapshotId, final String taskId) throws Exception { + final byte[] body = executeWithRetry(nodeAddress, + String.format("/shuffle/%s/%s/index", snapshotId, taskId), + Duration.ofSeconds(10), + "index from " + nodeAddress); + final ByteBuffer buf = ByteBuffer.wrap(body); + final long[] offsets = new long[body.length / Long.BYTES]; + for (int i = 0; i < offsets.length; i++) { + offsets[i] = buf.getLong(); + } + return offsets; + } + + public byte[] pullData(final String nodeAddress, final String snapshotId, final String taskId, + final long offset, final int length) throws Exception { + return executeWithRetry(nodeAddress, + String.format("/shuffle/%s/%s/data?offset=%d&length=%d", snapshotId, taskId, offset, length), + RESPONSE_TIMEOUT, + "data from " + nodeAddress); + } + + public void requestCleanup(final String nodeAddress, final String snapshotId) { + try { + final WebClient client = getClient(nodeAddress); + client.execute(RequestHeaders.of(HttpMethod.DELETE, String.format("/shuffle/%s", snapshotId))) + .aggregate() + .thenAccept(response -> { + if (!response.status().equals(HttpStatus.OK)) { + LOG.warn("Remote cleanup failed for snapshot {} on {}: status={}", snapshotId, nodeAddress, response.status()); + } + }) + .exceptionally(error -> { + LOG.warn("Remote cleanup failed for snapshot {} on 
{}", snapshotId, nodeAddress, error); + return null; + }); + } catch (final Exception e) { + LOG.warn("Remote cleanup failed for snapshot {} on {}", snapshotId, nodeAddress, e); + } + } + + public long[] collectPartitionSizes(final ShuffleStorage shuffleStorage, + final String snapshotId, + final List taskIds, + final List nodeAddresses, + final int numPartitions) { + final long[] sizes = new long[numPartitions]; + for (int i = 0; i < taskIds.size(); i++) { + final String taskId = taskIds.get(i); + final String nodeAddress = nodeAddresses.get(i); + try { + final long[] offsets; + if (isLocalAddress(nodeAddress)) { + try (var reader = shuffleStorage.createReader(snapshotId, taskId)) { + offsets = reader.readIndex(); + } + } else { + offsets = pullIndex(nodeAddress, snapshotId, taskId); + } + for (int p = 0; p < numPartitions && p + 1 < offsets.length; p++) { + sizes[p] += offsets[p + 1] - offsets[p]; + } + } catch (final Exception e) { + LOG.warn("Failed to read index for task {} from node {}, skipping for coalesce", taskId, nodeAddress, e); + } + } + return sizes; + } + + public static boolean isLocalAddress(final String address) { + try { + final InetAddress inetAddress = InetAddress.getByName(address); + if (inetAddress.isAnyLocalAddress() || inetAddress.isLoopbackAddress()) { + return true; + } + return NetworkInterface.getByInetAddress(inetAddress) != null; + } catch (final Exception e) { + return false; + } + } + + public static String resolveLocalAddress() { + try { + return InetAddress.getLocalHost().getHostName(); + } catch (final Exception e) { + throw new RuntimeException("Failed to resolve local host name", e); + } + } + + private WebClient getClient(final String nodeAddress) { + return clientCache.computeIfAbsent(nodeAddress, addr -> + WebClient.builder(String.format("%s://%s:%d", scheme, addr, port)) + .factory(clientFactory) + .responseTimeout(RESPONSE_TIMEOUT) + .build()); + } + + private byte[] executeWithRetry(final String nodeAddress, final String 
path, + final Duration timeout, final String description) throws Exception { + final WebClient client = getClient(nodeAddress); + for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) { + try { + final AggregatedHttpResponse response = client.get(path).aggregate().join(); + if (response.status().equals(HttpStatus.OK)) { + try (var content = response.content()) { + return content.array(); + } + } + LOG.warn("HTTP pull failed for {}: status={} attempt={}/{}", description, response.status(), attempt, MAX_RETRIES); + } catch (final Exception e) { + LOG.warn("HTTP pull failed for {}: attempt={}/{}", description, attempt, MAX_RETRIES, e); + } + if (attempt < MAX_RETRIES) { + Thread.sleep(1000L * attempt); + } + } + throw new RuntimeException("Failed to pull " + description + " after " + MAX_RETRIES + " retries"); + } + + private static ClientFactory buildClientFactory(final ShuffleConfig config) { + if (config.isSsl() && config.isSslInsecureDisableVerification()) { + return ClientFactory.builder() + .tlsNoVerify() + .build(); + } + return ClientFactory.ofDefault(); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShufflePartitionCoalescer.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShufflePartitionCoalescer.java new file mode 100644 index 0000000000..73f82a8c9e --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShufflePartitionCoalescer.java @@ -0,0 +1,78 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import java.util.ArrayList; +import java.util.List; + +/** + * Coalesces shuffle partitions based on data size, similar to Spark's AQE + * (Adaptive Query Execution) coalescing of post-shuffle partitions. + *

/**
 * Merges adjacent shuffle partitions into size-bounded ranges, mirroring how
 * Spark's Adaptive Query Execution coalesces small post-shuffle partitions.
 *
 * <p>Empty partitions never open a range on their own; consecutive non-empty
 * partitions accumulate into the current range until adding the next one would
 * push it past the target size.
 *
 * @see <a href="https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ShufflePartitionsUtil.scala">
 * Spark ShufflePartitionsUtil</a>
 */
public class ShufflePartitionCoalescer {

    private final long targetSizeBytes;

    public ShufflePartitionCoalescer(final long targetSizeBytes) {
        this.targetSizeBytes = targetSizeBytes;
    }

    /**
     * @param partitionSizes size in bytes for each partition (index = partition number)
     * @return list of partition ranges, each representing one SHUFFLE_READ task
     */
    public List<PartitionRange> coalesce(final long[] partitionSizes) {
        final List<PartitionRange> ranges = new ArrayList<>();
        int openStart = -1;          // start index of the range being built; -1 when none is open
        long accumulatedBytes = 0L;  // bytes gathered into the open range so far

        for (int partition = 0; partition < partitionSizes.length; partition++) {
            final long size = partitionSizes[partition];
            if (size == 0L) {
                // Empty partitions are simply spanned by whichever range covers them.
                continue;
            }
            if (openStart < 0) {
                openStart = partition;
                accumulatedBytes = size;
                continue;
            }
            if (accumulatedBytes + size > targetSizeBytes) {
                // Close the open range just before this partition, then start fresh.
                ranges.add(new PartitionRange(openStart, partition - 1));
                openStart = partition;
                accumulatedBytes = size;
            } else {
                accumulatedBytes += size;
            }
        }

        if (openStart >= 0) {
            // The final range extends through the last partition index,
            // trailing empty partitions included.
            ranges.add(new PartitionRange(openStart, partitionSizes.length - 1));
        }

        return ranges;
    }

    /** Inclusive range of hash partitions handled by a single SHUFFLE_READ task. */
    public static class PartitionRange {
        private final int startPartition;
        private final int endPartitionInclusive;

        public PartitionRange(final int startPartition, final int endPartitionInclusive) {
            this.startPartition = startPartition;
            this.endPartitionInclusive = endPartitionInclusive;
        }

        public int getStartPartition() { return startPartition; }
        public int getEndPartitionInclusive() { return endPartitionInclusive; }
    }
}
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.shuffle;

import java.io.Closeable;
import java.util.List;

/**
 * Reads shuffle records for a specific partition range from a data file + index file.
 * Counterpart of {@link ShuffleWriter}; implementations are expected to be
 * short-lived (open, read, close).
 */
public interface ShuffleReader extends Closeable {

    /**
     * Returns the partition offset array from the index file.
     * As written by the shuffle writer, this holds (numPartitions + 1) values where
     * offset[i]..offset[i+1] is partition i's byte range in the data file.
     */
    long[] readIndex();

    /**
     * Reads all records belonging to the specified partition range.
     * Both bounds are partition numbers; the end bound is inclusive.
     */
    List<ShuffleRecord> readPartitions(int startPartition, int endPartitionInclusive);

    /**
     * Returns raw bytes for the specified byte range of the data file.
     * Used by the HTTP endpoint to serve data to remote readers.
     */
    byte[] readBytes(long offset, int length);
}
/**
 * A single record in the shuffle intermediate data: an operation tag
 * (insert or delete), the ordinal of the change it belongs to, and the
 * serialized payload bytes.
 */
public class ShuffleRecord {
    public static final byte OP_DELETE = 0;
    public static final byte OP_INSERT = 1;

    // On-disk sizes of the operation and changeOrdinal fields (see the writer's framing).
    static final int OPERATION_SIZE = Byte.BYTES;
    static final int CHANGE_ORDINAL_SIZE = Integer.BYTES;

    private final byte op;
    private final int ordinal;
    private final byte[] payload;

    public ShuffleRecord(final byte operation, final int changeOrdinal, final byte[] serializedRecord) {
        // The payload array is stored as-is (no defensive copy); callers share ownership.
        this.op = operation;
        this.ordinal = changeOrdinal;
        this.payload = serializedRecord;
    }

    /** Operation tag: {@link #OP_INSERT} or {@link #OP_DELETE}. */
    public byte getOperation() {
        return op;
    }

    /** Ordinal of the change this record belongs to. */
    public int getChangeOrdinal() {
        return ordinal;
    }

    /** The serialized record bytes; the internal array is returned without copying. */
    public byte[] getSerializedRecord() {
        return payload;
    }
}
+ */ +public interface ShuffleStorage { + + ShuffleWriter createWriter(String snapshotId, String taskId, int numPartitions); + + ShuffleReader createReader(String snapshotId, String taskId); + + /** + * Returns the total byte size of each hash partition for a given snapshot, + * summed across all shuffle write tasks. + * + * @param snapshotId the snapshot to query + * @param numPartitions the number of hash partitions (e.g. 64) + * @return an array of length {@code numPartitions} where element at index {@code p} + * is the total bytes written to hash partition {@code p} across all tasks + */ + long[] getPartitionSizes(String snapshotId, int numPartitions); + + /** + * Returns all task IDs that have shuffle files for a given snapshot. + */ + java.util.List<String> getTaskIds(String snapshotId); + + void cleanup(String snapshotId); + + void cleanupAll(); +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleWriter.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleWriter.java new file mode 100644 index 0000000000..f486275cac --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleWriter.java @@ -0,0 +1,33 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import java.io.Closeable; + +/** + * Writes shuffle records to a single data file + index file, sorted by partition number. + *

+ * Usage:
+ * <pre>
+ *   writer.addRecord(partitionNumber, operation, changeOrdinal, avroBytes);
+ *   // ... add all records ...
+ *   writer.finish();  // sorts and writes to disk
+ * </pre>
+ */ +public interface ShuffleWriter extends Closeable { + + void addRecord(int partitionNumber, byte operation, int changeOrdinal, byte[] serializedRecord); + + /** + * Sorts buffered records by partition number and writes data file + index file. + */ + void finish(); +} diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java index db044fe0c6..428c01bfba 100644 --- a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/ChangelogWorker.java @@ -14,6 +14,7 @@ import org.apache.iceberg.Schema; import org.apache.iceberg.Table; import org.apache.iceberg.avro.Avro; +import org.apache.iceberg.avro.AvroSchemaUtil; import org.apache.iceberg.data.Record; import org.apache.iceberg.data.avro.DataReader; import org.apache.iceberg.data.orc.GenericOrcReader; @@ -36,8 +37,18 @@ import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ChangelogTaskPartition; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.GlobalState; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.InitialLoadTaskPartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ShuffleReadPartition; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.partition.ShuffleWritePartition; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ChangelogTaskProgressState; import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.InitialLoadTaskProgressState; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ShuffleReadProgressState; 
+import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ShuffleWriteProgressState; +import org.opensearch.dataprepper.plugins.source.iceberg.leader.LeaderScheduler; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.LocalDiskShuffleReader; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleNodeClient; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleRecord; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleStorage; +import org.opensearch.dataprepper.plugins.source.iceberg.shuffle.ShuffleWriter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +75,9 @@ public class ChangelogWorker implements Runnable { private final Buffer> buffer; private final AcknowledgementSetManager acknowledgementSetManager; private final EventFactory eventFactory; + private final ShuffleStorage shuffleStorage; + private final ShuffleNodeClient shuffleNodeClient; + private final String localNodeAddress; public ChangelogWorker(final EnhancedSourceCoordinator sourceCoordinator, final IcebergSourceConfig sourceConfig, @@ -71,7 +85,8 @@ public ChangelogWorker(final EnhancedSourceCoordinator sourceCoordinator, final Map tableConfigs, final Buffer> buffer, final AcknowledgementSetManager acknowledgementSetManager, - final EventFactory eventFactory) { + final EventFactory eventFactory, + final ShuffleStorage shuffleStorage) { this.sourceCoordinator = sourceCoordinator; this.sourceConfig = sourceConfig; this.tables = tables; @@ -79,6 +94,9 @@ public ChangelogWorker(final EnhancedSourceCoordinator sourceCoordinator, this.buffer = buffer; this.acknowledgementSetManager = acknowledgementSetManager; this.eventFactory = eventFactory; + this.shuffleStorage = shuffleStorage; + this.shuffleNodeClient = new ShuffleNodeClient(sourceConfig.getShuffleConfig()); + this.localNodeAddress = ShuffleNodeClient.resolveLocalAddress(); } @Override @@ -97,14 +115,30 @@ public void run() { processed = 
true; } else { partition.ifPresent(sourceCoordinator::giveUpPartition); - // Try initial load tasks - partition = sourceCoordinator.acquireAvailablePartition( - InitialLoadTaskPartition.PARTITION_TYPE); - if (partition.isPresent() && partition.get() instanceof InitialLoadTaskPartition) { - processInitialLoadPartition((InitialLoadTaskPartition) partition.get()); + // Try shuffle write tasks + partition = sourceCoordinator.acquireAvailablePartition(ShuffleWritePartition.PARTITION_TYPE); + if (partition.isPresent() && partition.get() instanceof ShuffleWritePartition) { + processShuffleWrite((ShuffleWritePartition) partition.get()); processed = true; } else { partition.ifPresent(sourceCoordinator::giveUpPartition); + // Try shuffle read tasks + partition = sourceCoordinator.acquireAvailablePartition(ShuffleReadPartition.PARTITION_TYPE); + if (partition.isPresent() && partition.get() instanceof ShuffleReadPartition) { + processShuffleRead((ShuffleReadPartition) partition.get()); + processed = true; + } else { + partition.ifPresent(sourceCoordinator::giveUpPartition); + // Try initial load tasks + partition = sourceCoordinator.acquireAvailablePartition( + InitialLoadTaskPartition.PARTITION_TYPE); + if (partition.isPresent() && partition.get() instanceof InitialLoadTaskPartition) { + processInitialLoadPartition((InitialLoadTaskPartition) partition.get()); + processed = true; + } else { + partition.ifPresent(sourceCoordinator::giveUpPartition); + } + } } } } catch (final Exception e) { @@ -352,7 +386,8 @@ private String buildDocumentId(final Record record, final Schema schema, final L private synchronized void incrementSnapshotCompletionCount(final String snapshotKey) { final String completionKey = "snapshot-completion-" + snapshotKey; - while (true) { + final int maxRetries = 10; + for (int attempt = 0; attempt < maxRetries; attempt++) { final Optional partitionOpt = sourceCoordinator.getPartition(completionKey); if (partitionOpt.isEmpty()) { LOG.error("Failed to get 
completion status for {}", completionKey); @@ -366,11 +401,12 @@ private synchronized void incrementSnapshotCompletionCount(final String snapshot gs.setProgressState(progress); try { sourceCoordinator.saveProgressStateForPartition(gs, Duration.ZERO); - break; + return; } catch (final Exception e) { - LOG.warn("Completion count update conflict for {}, retrying", completionKey); + LOG.warn("Completion count update conflict for {}, attempt {}/{}", completionKey, attempt + 1, maxRetries); } } + throw new RuntimeException("Failed to update completion count for " + completionKey + " after " + maxRetries + " attempts"); } // TODO: Replace format switch with FormatModelRegistry when available (Iceberg 1.11+). @@ -413,4 +449,233 @@ private static class RowWithMeta { this.operation = operation; } } + + private void processShuffleWrite(final ShuffleWritePartition partition) throws Exception { + final ShuffleWriteProgressState state = partition.getProgressState().orElseThrow(); + final String tableName = state.getTableName(); + final Table table = tables.get(tableName); + + if (table == null) { + LOG.error("Table {} not found for shuffle write, giving up", tableName); + sourceCoordinator.giveUpPartition(partition); + return; + } + + final Schema schema = table.schema(); + final TableConfig tableConfig = tableConfigs.get(tableName); + final List identifierColumns = tableConfig.getIdentifierColumns(); + final int numPartitions = sourceConfig.getShuffleConfig().getPartitions(); + final String snapshotIdStr = String.valueOf(state.getSnapshotId()); + final String operation = "DELETED".equals(state.getTaskType()) ? 
"DELETE" : "INSERT"; + + LOG.info("SHUFFLE_WRITE: table={} file={} type={}", tableName, state.getDataFilePath(), state.getTaskType()); + + // Record node address for SHUFFLE_READ to know where to pull from + state.setNodeAddress(localNodeAddress); + + final InputFile inputFile = table.io().newInputFile(state.getDataFilePath()); + try (ShuffleWriter writer = shuffleStorage.createWriter(snapshotIdStr, state.getShuffleTaskId(), numPartitions); + CloseableIterable reader = openDataFile(inputFile, schema, state.getDataFilePath())) { + + final org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema, tableName); + + for (final Record record : reader) { + final int partitionNum = computeShufflePartition(record, identifierColumns, numPartitions); + final byte op = "DELETE".equals(operation) ? ShuffleRecord.OP_DELETE : ShuffleRecord.OP_INSERT; + final byte[] serialized = RecordAvroSerializer.serialize(record, avroSchema); + writer.addRecord(partitionNum, op, state.getChangeOrdinal(), serialized); + } + writer.finish(); + } + + // Register location in GlobalState (Spark MapStatus pattern) + registerShuffleWriteLocation(state.getSnapshotId(), state.getShuffleTaskId(), localNodeAddress); + + sourceCoordinator.completePartition(partition); + incrementSnapshotCompletionCount("sw-" + state.getSnapshotId()); + LOG.info("SHUFFLE_WRITE completed: task={}", state.getShuffleTaskId()); + } + + private void processShuffleRead(final ShuffleReadPartition partition) { + final ShuffleReadProgressState state = partition.getProgressState().orElseThrow(); + final String tableName = state.getTableName(); + final Table table = tables.get(tableName); + final TableConfig tableConfig = tableConfigs.get(tableName); + + if (table == null || tableConfig == null) { + LOG.error("Table {} not found for shuffle read, giving up", tableName); + sourceCoordinator.giveUpPartition(partition); + return; + } + + final String snapshotIdStr = String.valueOf(state.getSnapshotId()); + final int 
startPartition = state.getPartitionRangeStart(); + final int endPartition = state.getPartitionRangeEnd(); + + LOG.info("SHUFFLE_READ: table={} partitions={}-{}", tableName, startPartition, endPartition); + + try { + // Collect records from all nodes for our partition range + final List allRecords = new ArrayList<>(); + + LOG.debug("SHUFFLE_READ taskIds={} nodeAddresses={}", state.getShuffleWriteTaskIds(), state.getNodeAddresses()); + for (int i = 0; i < state.getShuffleWriteTaskIds().size(); i++) { + final String taskId = state.getShuffleWriteTaskIds().get(i); + final String nodeAddress = state.getNodeAddresses().get( + i < state.getNodeAddresses().size() ? i : 0); + + final List records = pullShuffleData( + snapshotIdStr, taskId, nodeAddress, startPartition, endPartition); + allRecords.addAll(records); + } + + if (allRecords.isEmpty()) { + sourceCoordinator.completePartition(partition); + incrementSnapshotCompletionCount("sr-" + state.getSnapshotId()); + return; + } + + // Deserialize, carryover removal, UPDATE merge, write to buffer + final Schema schema = table.schema(); + final org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema, tableName); + final ChangelogRecordConverter converter = new ChangelogRecordConverter(tableName, tableConfig.getIdentifierColumns(), eventFactory); + final CarryoverRemover carryoverRemover = new CarryoverRemover(); + + // Convert ShuffleRecords to RowWithMeta + final List rows = new ArrayList<>(); + for (final ShuffleRecord sr : allRecords) { + final Record record = RecordAvroSerializer.deserialize(sr.getSerializedRecord(), schema, avroSchema); + final String op = sr.getOperation() == ShuffleRecord.OP_DELETE ? 
"DELETE" : "INSERT"; + rows.add(new RowWithMeta(record, op)); + } + + // Carryover removal + final List changelogRows = new ArrayList<>(); + for (int i = 0; i < rows.size(); i++) { + final RowWithMeta row = rows.get(i); + final List dataColumns = new ArrayList<>(); + for (final Types.NestedField field : schema.columns()) { + dataColumns.add(row.record.getField(field.name())); + } + changelogRows.add(new CarryoverRemover.ChangelogRow(dataColumns, row.operation, i)); + } + final List survivingIndices = carryoverRemover.removeCarryover(changelogRows); + + // UPDATE merge + final Set deletesToSkip = new HashSet<>(); + if (!tableConfig.getIdentifierColumns().isEmpty()) { + final Map deleteByDocId = new LinkedHashMap<>(); + for (final int idx : survivingIndices) { + final RowWithMeta row = rows.get(idx); + if ("DELETE".equals(row.operation)) { + deleteByDocId.put(buildDocumentId(row.record, schema, tableConfig.getIdentifierColumns()), idx); + } + } + for (final int idx : survivingIndices) { + final RowWithMeta row = rows.get(idx); + if ("INSERT".equals(row.operation)) { + final String docId = buildDocumentId(row.record, schema, tableConfig.getIdentifierColumns()); + if (deleteByDocId.containsKey(docId)) { + deletesToSkip.add(deleteByDocId.remove(docId)); + } + } + } + } + + // Write to buffer + final BufferAccumulator> accumulator = + BufferAccumulator.create(buffer, BUFFER_ACCUMULATOR_SIZE, BUFFER_TIMEOUT); + + for (final int idx : survivingIndices) { + if (deletesToSkip.contains(idx)) continue; + final RowWithMeta row = rows.get(idx); + final Event event = converter.convert(row.record, schema, row.operation, state.getSnapshotId()); + accumulator.add(new org.opensearch.dataprepper.model.record.Record<>(event)); + } + accumulator.flush(); + + sourceCoordinator.completePartition(partition); + incrementSnapshotCompletionCount("sr-" + state.getSnapshotId()); + LOG.info("SHUFFLE_READ completed: partitions={}-{}, {} events", startPartition, endPartition, + 
survivingIndices.size() - deletesToSkip.size()); + + } catch (final Exception e) { + LOG.error("SHUFFLE_READ failed for partitions {}-{}", startPartition, endPartition, e); + markShuffleFailed(snapshotIdStr); + sourceCoordinator.giveUpPartition(partition); + } + } + + private List pullShuffleData(final String snapshotId, final String taskId, + final String nodeAddress, final int startPartition, + final int endPartition) throws Exception { + LOG.debug("Pulling shuffle data: snapshot={} task={} node={} partitions={}-{} isLocal={}", + snapshotId, taskId, nodeAddress, startPartition, endPartition, ShuffleNodeClient.isLocalAddress(nodeAddress)); + // Local node: read directly from disk + if (ShuffleNodeClient.isLocalAddress(nodeAddress)) { + try (var reader = shuffleStorage.createReader(snapshotId, taskId)) { + return reader.readPartitions(startPartition, endPartition); + } + } + + // Remote node: get index, then pull each partition's compressed block + final long[] offsets = shuffleNodeClient.pullIndex(nodeAddress, snapshotId, taskId); + + final List allRecords = new ArrayList<>(); + for (int p = startPartition; p <= endPartition; p++) { + final long offset = offsets[p]; + final long end = offsets[p + 1]; + final int length = (int) (end - offset); + if (length == 0) { + continue; + } + + final byte[] compressedBlock = shuffleNodeClient.pullData(nodeAddress, snapshotId, taskId, offset, length); + final byte[] uncompressed = LocalDiskShuffleReader.decompressBlock(compressedBlock); + allRecords.addAll(LocalDiskShuffleReader.parseRecords(uncompressed)); + } + return allRecords; + } + + private int computeShufflePartition(final Record record, + final List identifierColumns, final int numPartitions) { + int hash = 0; + for (final String col : identifierColumns) { + final Object val = record.getField(col); + hash = 31 * hash + (val != null ? 
val.toString().hashCode() : 0); + } + return Math.floorMod(hash, numPartitions); + } + + private synchronized void registerShuffleWriteLocation(final long snapshotId, final String taskId, final String nodeAddress) { + final String locationKey = "shuffle-locations-" + snapshotId; + final int maxRetries = 10; + for (int attempt = 0; attempt < maxRetries; attempt++) { + final Optional partitionOpt = sourceCoordinator.getPartition(locationKey); + if (partitionOpt.isEmpty()) { + LOG.error("Failed to get shuffle location state for {}", locationKey); + return; + } + final GlobalState gs = (GlobalState) partitionOpt.get(); + final Map locations = new java.util.HashMap<>(gs.getProgressState().orElse(Map.of())); + locations.put(taskId, nodeAddress); + gs.setProgressState(locations); + try { + sourceCoordinator.saveProgressStateForPartition(gs, Duration.ZERO); + return; + } catch (final Exception e) { + LOG.warn("Location update conflict for {}, attempt {}/{}", locationKey, attempt + 1, maxRetries); + } + } + throw new RuntimeException("Failed to register shuffle write location for " + locationKey + " after " + maxRetries + " attempts"); + } + + private void markShuffleFailed(final String snapshotIdStr) { + final String key = LeaderScheduler.SHUFFLE_FAILED_PREFIX + snapshotIdStr; + try { + sourceCoordinator.createPartition(new GlobalState(key, Map.of("failed", true))); + } catch (final Exception e) { + LOG.warn("Failed to mark shuffle as failed for snapshot {}", snapshotIdStr, e); + } + } } diff --git a/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/RecordAvroSerializer.java b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/RecordAvroSerializer.java new file mode 100644 index 0000000000..888c52984b --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/main/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/RecordAvroSerializer.java @@ -0,0 
+1,50 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.worker; + +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.BinaryEncoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.io.EncoderFactory; +import org.apache.iceberg.Schema; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.data.avro.DataWriter; +import org.apache.iceberg.data.avro.PlannedDataReader; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; + +/** + * Serializes and deserializes Iceberg {@link Record}s using Avro binary encoding. + * Uses Iceberg's {@link DataWriter} and {@link PlannedDataReader} which handle + * Iceberg-specific types (OffsetDateTime, LocalDate, etc.) correctly. + */ +public class RecordAvroSerializer { + + private RecordAvroSerializer() {} + + public static byte[] serialize(final Record record, final org.apache.avro.Schema avroSchema) throws IOException { + final ByteArrayOutputStream out = new ByteArrayOutputStream(); + final BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + final DataWriter<Record> writer = DataWriter.create(avroSchema); + writer.write(record, encoder); + encoder.flush(); + return out.toByteArray(); + } + + public static Record deserialize(final byte[] data, final Schema icebergSchema, + final org.apache.avro.Schema avroSchema) throws IOException { + final BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(data, null); + final PlannedDataReader<Record> reader = PlannedDataReader.create(icebergSchema); + reader.setSchema(avroSchema); + return reader.read(null, decoder); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java 
b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java index 5ec4bacf2b..f36b54ec8b 100644 --- a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/IcebergSourceTest.java @@ -17,6 +17,7 @@ import org.opensearch.dataprepper.metrics.PluginMetrics; import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSetManager; import org.opensearch.dataprepper.model.event.EventFactory; +import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.source.coordinator.enhanced.EnhancedSourceCoordinator; import java.util.List; @@ -40,6 +41,9 @@ class IcebergSourceTest { @Mock private EventFactory eventFactory; + @Mock + private PluginFactory pluginFactory; + @Mock private EnhancedSourceCoordinator sourceCoordinator; @@ -51,7 +55,7 @@ void areAcknowledgementsEnabled_returnsConfigValue_whenTrue() { when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(true); when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); - final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory); + final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory, pluginFactory); assertThat(source.areAcknowledgementsEnabled(), equalTo(true)); } @@ -60,7 +64,7 @@ void areAcknowledgementsEnabled_returnsConfigValue_whenFalse() { when(sourceConfig.isAcknowledgmentsEnabled()).thenReturn(false); when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); - final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory); + final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory, 
pluginFactory); assertThat(source.areAcknowledgementsEnabled(), equalTo(false)); } @@ -68,7 +72,7 @@ void areAcknowledgementsEnabled_returnsConfigValue_whenFalse() { void getPartitionFactory_returnsNonNull() { when(sourceConfig.getTables()).thenReturn(List.of(tableConfig)); - final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory); + final IcebergSource source = new IcebergSource(sourceConfig, pluginMetrics, acknowledgementSetManager, eventFactory, pluginFactory); assertThat(source.getPartitionFactory() != null, equalTo(true)); } } diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/TaskGrouperTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/TaskGrouperTest.java index 305954c602..65dbe293b8 100644 --- a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/TaskGrouperTest.java +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/leader/TaskGrouperTest.java @@ -11,10 +11,13 @@ package org.opensearch.dataprepper.plugins.source.iceberg.leader; import org.junit.jupiter.api.Test; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ChangelogTaskProgressState; +import org.opensearch.dataprepper.plugins.source.iceberg.coordination.state.ShuffleWriteProgressState; import java.util.List; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; class TaskGrouperTest { @@ -22,85 +25,63 @@ class TaskGrouperTest { private final TaskGrouper taskGrouper = new TaskGrouper(); @Test - void pairByBounds_insertOnly_eachFileIsIndependentGroup() { - final List tasks = List.of( - new TaskGrouper.TaskInfo("file1", "ADDED", 100, "bounds1"), - new TaskGrouper.TaskInfo("file2", 
"ADDED", 200, "bounds2") + void planInsertOnlyTasks_creates_one_task_per_file() { + final List taskInfos = List.of( + new TaskGrouper.TaskInfo("file1.parquet", "ADDED", 100, 0), + new TaskGrouper.TaskInfo("file2.parquet", "ADDED", 200, 0) ); - final List> groups = taskGrouper.pairByBounds(tasks); - assertThat(groups, hasSize(2)); - } - @Test - void pairByBounds_deleteOnly_eachFileIsIndependentGroup() { - final List tasks = List.of( - new TaskGrouper.TaskInfo("file1", "DELETED", 100, "bounds1") - ); - final List> groups = taskGrouper.pairByBounds(tasks); - assertThat(groups, hasSize(1)); - } + final List result = + taskGrouper.planInsertOnlyTasks(taskInfos, "db.table", 12345L); - @Test - void pairByBounds_matchingBounds_pairedTogether() { - final List tasks = List.of( - new TaskGrouper.TaskInfo("old_file", "DELETED", 100, "{1=abc}|{1=xyz}"), - new TaskGrouper.TaskInfo("new_file", "ADDED", 100, "{1=abc}|{1=xyz}") - ); - final List> groups = taskGrouper.pairByBounds(tasks); - assertThat(groups, hasSize(1)); - assertThat(groups.get(0), hasSize(2)); + assertThat(result, hasSize(2)); + assertThat(result.get(0).getDataFilePaths(), equalTo(List.of("file1.parquet"))); + assertThat(result.get(0).getTotalRecords(), equalTo(100L)); + assertThat(result.get(1).getDataFilePaths(), equalTo(List.of("file2.parquet"))); + assertThat(result.get(1).getTotalRecords(), equalTo(200L)); } @Test - void pairByBounds_differentBounds_fallbackGroup() { - final List tasks = List.of( - new TaskGrouper.TaskInfo("old_file", "DELETED", 100, "{1=abc}|{1=xyz}"), - new TaskGrouper.TaskInfo("new_file", "ADDED", 100, "{1=abc}|{1=zzz}") + void planShuffleWriteTasks_creates_one_task_per_file_with_change_ordinal() { + final List taskInfos = List.of( + new TaskGrouper.TaskInfo("deleted.parquet", "DELETED", 100, 0), + new TaskGrouper.TaskInfo("added.parquet", "ADDED", 100, 0) ); - final List> groups = taskGrouper.pairByBounds(tasks); - // Bounds don't match -> fallback group with both - assertThat(groups, 
hasSize(1)); - assertThat(groups.get(0), hasSize(2)); - } - @Test - void pairByBounds_multiplePairs_eachPairedSeparately() { - final List tasks = List.of( - new TaskGrouper.TaskInfo("old_us", "DELETED", 100, "bounds_us"), - new TaskGrouper.TaskInfo("old_eu", "DELETED", 100, "bounds_eu"), - new TaskGrouper.TaskInfo("new_us", "ADDED", 100, "bounds_us"), - new TaskGrouper.TaskInfo("new_eu", "ADDED", 100, "bounds_eu") - ); - final List> groups = taskGrouper.pairByBounds(tasks); - assertThat(groups, hasSize(2)); - assertThat(groups.get(0), hasSize(2)); - assertThat(groups.get(1), hasSize(2)); + final List result = + taskGrouper.planShuffleWriteTasks(taskInfos, "db.table", 12345L); + + assertThat(result, hasSize(2)); + assertThat(result.get(0).getDataFilePath(), equalTo("deleted.parquet")); + assertThat(result.get(0).getTaskType(), equalTo("DELETED")); + assertThat(result.get(0).getChangeOrdinal(), equalTo(0)); + assertThat(result.get(1).getDataFilePath(), equalTo("added.parquet")); + assertThat(result.get(1).getTaskType(), equalTo("ADDED")); } @Test - void pairByBounds_ambiguousBounds_fallbackGroup() { - // Two DELETED and two ADDED with same bounds -> can't pair uniquely - final List tasks = List.of( - new TaskGrouper.TaskInfo("old1", "DELETED", 100, "same_bounds"), - new TaskGrouper.TaskInfo("old2", "DELETED", 100, "same_bounds"), - new TaskGrouper.TaskInfo("new1", "ADDED", 100, "same_bounds"), - new TaskGrouper.TaskInfo("new2", "ADDED", 100, "same_bounds") + void hasDeleted_is_true_when_deleted_task_exists() { + final List taskInfos = List.of( + new TaskGrouper.TaskInfo("deleted.parquet", "DELETED", 100, 0), + new TaskGrouper.TaskInfo("added.parquet", "ADDED", 100, 0) ); - final List> groups = taskGrouper.pairByBounds(tasks); - // Ambiguous -> all in one fallback group - assertThat(groups, hasSize(1)); - assertThat(groups.get(0), hasSize(4)); + + final boolean hasDeleted = taskInfos.stream() + .anyMatch(t -> "DELETED".equals(t.taskType)); + + assertThat(hasDeleted, 
equalTo(true)); } @Test - void pairByBounds_nullBounds_fallbackGroup() { - final List tasks = List.of( - new TaskGrouper.TaskInfo("old_file", "DELETED", 100, null), - new TaskGrouper.TaskInfo("new_file", "ADDED", 100, null) + void hasDeleted_is_false_for_insert_only() { + final List taskInfos = List.of( + new TaskGrouper.TaskInfo("file1.parquet", "ADDED", 100, 0), + new TaskGrouper.TaskInfo("file2.parquet", "ADDED", 200, 0) ); - final List> groups = taskGrouper.pairByBounds(tasks); - // Null bounds -> can't pair -> fallback - assertThat(groups, hasSize(1)); - assertThat(groups.get(0), hasSize(2)); + + final boolean hasDeleted = taskInfos.stream() + .anyMatch(t -> "DELETED".equals(t.taskType)); + + assertThat(hasDeleted, equalTo(false)); } } diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleStorageTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleStorageTest.java new file mode 100644 index 0000000000..9bf78fdb52 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleStorageTest.java @@ -0,0 +1,109 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsInAnyOrder; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +class LocalDiskShuffleStorageTest { + + @TempDir + Path tempDir; + + private LocalDiskShuffleStorage storage; + + @BeforeEach + void setUp() { + storage = new LocalDiskShuffleStorage(tempDir); + } + + @Test + void getTaskIds_returnsAllTasks() throws Exception { + writeTask("snap1", "taskA", 4); + writeTask("snap1", "taskB", 4); + + final List taskIds = storage.getTaskIds("snap1"); + assertThat(taskIds, hasSize(2)); + assertThat(taskIds, containsInAnyOrder("taskA", "taskB")); + } + + @Test + void getTaskIds_nonExistentSnapshot_returnsEmpty() { + assertThat(storage.getTaskIds("nonexistent"), is(empty())); + } + + @Test + void getPartitionSizes_aggregatesAcrossTasks() throws Exception { + // taskA: partition 0 has data, partition 1 empty + try (ShuffleWriter writer = storage.createWriter("snap1", "taskA", 2)) { + writer.addRecord(0, ShuffleRecord.OP_INSERT, 0, new byte[]{1, 2, 3}); + writer.finish(); + } + // taskB: partition 0 has data, partition 1 has data + try (ShuffleWriter writer = storage.createWriter("snap1", "taskB", 2)) { + writer.addRecord(0, ShuffleRecord.OP_INSERT, 0, new byte[]{4, 5}); + writer.addRecord(1, ShuffleRecord.OP_DELETE, 0, new byte[]{6, 7, 8, 9}); + writer.finish(); + } + + final long[] sizes = storage.getPartitionSizes("snap1", 2); + assertThat(sizes.length, is(2)); + // Partition 0: data from both tasks + assertThat(sizes[0] > 0, is(true)); + // Partition 1: data from taskB only + assertThat(sizes[1] > 0, is(true)); + 
// Partition 0 should be larger (more records) + assertThat(sizes[0] > sizes[1], is(true)); + } + + @Test + void cleanup_removesSnapshotDirectory() throws Exception { + writeTask("snap1", "task1", 2); + assertThat(Files.exists(tempDir.resolve("snap1")), is(true)); + + storage.cleanup("snap1"); + assertThat(Files.exists(tempDir.resolve("snap1")), is(false)); + } + + @Test + void cleanupAll_removesContentsButKeepsBaseDirectory() throws Exception { + writeTask("snap1", "task1", 2); + writeTask("snap2", "task1", 2); + + storage.cleanupAll(); + assertThat(Files.exists(tempDir), is(true)); + assertThat(storage.getTaskIds("snap1"), hasSize(0)); + assertThat(storage.getTaskIds("snap2"), hasSize(0)); + } + + @Test + void cleanup_nonExistentSnapshot_doesNotThrow() { + storage.cleanup("nonexistent"); + } + + private void writeTask(final String snapshotId, final String taskId, final int numPartitions) throws Exception { + try (ShuffleWriter writer = storage.createWriter(snapshotId, taskId, numPartitions)) { + writer.addRecord(0, ShuffleRecord.OP_INSERT, 0, new byte[]{1}); + writer.finish(); + } + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleWriterReaderTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleWriterReaderTest.java new file mode 100644 index 0000000000..bcb869111d --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/LocalDiskShuffleWriterReaderTest.java @@ -0,0 +1,171 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +class LocalDiskShuffleWriterReaderTest { + + @TempDir + Path tempDir; + + private LocalDiskShuffleStorage storage; + + @BeforeEach + void setUp() { + storage = new LocalDiskShuffleStorage(tempDir); + } + + @Test + void roundTrip_singlePartition() throws Exception { + final byte[] data1 = {1, 2, 3}; + final byte[] data2 = {4, 5, 6, 7}; + + try (ShuffleWriter writer = storage.createWriter("snap1", "task1", 1)) { + writer.addRecord(0, ShuffleRecord.OP_INSERT, 0, data1); + writer.addRecord(0, ShuffleRecord.OP_DELETE, 0, data2); + writer.finish(); + } + + try (ShuffleReader reader = storage.createReader("snap1", "task1")) { + final List records = reader.readPartitions(0, 0); + assertThat(records, hasSize(2)); + assertThat(records.get(0).getOperation(), is(ShuffleRecord.OP_INSERT)); + assertThat(records.get(0).getSerializedRecord(), equalTo(data1)); + assertThat(records.get(1).getOperation(), is(ShuffleRecord.OP_DELETE)); + assertThat(records.get(1).getSerializedRecord(), equalTo(data2)); + assertThat(records.get(1).getChangeOrdinal(), is(0)); + } + } + + @Test + void roundTrip_multiplePartitions_recordsSortedByPartition() throws Exception { + try (ShuffleWriter writer = storage.createWriter("snap1", "task1", 4)) { + // Add records out of partition order + writer.addRecord(2, ShuffleRecord.OP_INSERT, 0, new byte[]{20}); + writer.addRecord(0, ShuffleRecord.OP_INSERT, 0, new byte[]{1}); + writer.addRecord(3, ShuffleRecord.OP_DELETE, 1, new byte[]{30}); + writer.addRecord(0, 
ShuffleRecord.OP_DELETE, 0, new byte[]{2}); + writer.addRecord(2, ShuffleRecord.OP_DELETE, 0, new byte[]{21}); + writer.finish(); + } + + try (ShuffleReader reader = storage.createReader("snap1", "task1")) { + // Partition 0: two records + final List p0 = reader.readPartitions(0, 0); + assertThat(p0, hasSize(2)); + assertThat(p0.get(0).getSerializedRecord(), equalTo(new byte[]{1})); + assertThat(p0.get(1).getSerializedRecord(), equalTo(new byte[]{2})); + + // Partition 1: empty + final List p1 = reader.readPartitions(1, 1); + assertThat(p1, is(empty())); + + // Partition 2: two records + final List p2 = reader.readPartitions(2, 2); + assertThat(p2, hasSize(2)); + assertThat(p2.get(0).getSerializedRecord(), equalTo(new byte[]{20})); + + // Partition 3: one record + final List p3 = reader.readPartitions(3, 3); + assertThat(p3, hasSize(1)); + assertThat(p3.get(0).getChangeOrdinal(), is(1)); + } + } + + @Test + void roundTrip_readPartitionRange() throws Exception { + try (ShuffleWriter writer = storage.createWriter("snap1", "task1", 4)) { + writer.addRecord(0, ShuffleRecord.OP_INSERT, 0, new byte[]{1}); + writer.addRecord(1, ShuffleRecord.OP_INSERT, 0, new byte[]{10}); + writer.addRecord(2, ShuffleRecord.OP_INSERT, 0, new byte[]{20}); + writer.addRecord(3, ShuffleRecord.OP_INSERT, 0, new byte[]{30}); + writer.finish(); + } + + try (ShuffleReader reader = storage.createReader("snap1", "task1")) { + // Read partitions 1-2 as a range + final List range = reader.readPartitions(1, 2); + assertThat(range, hasSize(2)); + assertThat(range.get(0).getSerializedRecord(), equalTo(new byte[]{10})); + assertThat(range.get(1).getSerializedRecord(), equalTo(new byte[]{20})); + } + } + + @Test + void emptyPartitions_indexOffsetsCorrect() throws Exception { + try (ShuffleWriter writer = storage.createWriter("snap1", "task1", 5)) { + // Only partition 2 has data + writer.addRecord(2, ShuffleRecord.OP_INSERT, 0, new byte[]{99}); + writer.finish(); + } + + try (ShuffleReader reader = 
storage.createReader("snap1", "task1")) { + final long[] offsets = reader.readIndex(); + assertThat(offsets.length, is(6)); // 5 partitions + 1 + + // Partitions 0, 1 are empty + assertThat(offsets[0], is(0L)); + assertThat(offsets[1], is(0L)); + assertThat(offsets[2], is(0L)); + + // Partition 2 has data + assertThat(offsets[3] > offsets[2], is(true)); + + // Partitions 3, 4 are empty + assertThat(offsets[4], is(offsets[3])); + assertThat(offsets[5], is(offsets[3])); + + // Reading empty partitions returns empty list + assertThat(reader.readPartitions(0, 0), is(empty())); + assertThat(reader.readPartitions(3, 4), is(empty())); + + // Reading partition 2 returns the record + final List p2 = reader.readPartitions(2, 2); + assertThat(p2, hasSize(1)); + assertThat(p2.get(0).getSerializedRecord(), equalTo(new byte[]{99})); + } + } + + @Test + void readBytes_returnsCorrectRange() throws Exception { + try (ShuffleWriter writer = storage.createWriter("snap1", "task1", 2)) { + writer.addRecord(0, ShuffleRecord.OP_INSERT, 0, new byte[]{1, 2, 3}); + writer.addRecord(1, ShuffleRecord.OP_INSERT, 0, new byte[]{4, 5, 6}); + writer.finish(); + } + + try (ShuffleReader reader = storage.createReader("snap1", "task1")) { + final long[] offsets = reader.readIndex(); + // Read only partition 1's compressed block + final long offset = offsets[1]; + final int length = (int) (offsets[2] - offsets[1]); + final byte[] compressedBlock = reader.readBytes(offset, length); + + // Decompress and parse + final byte[] uncompressed = LocalDiskShuffleReader.decompressBlock(compressedBlock); + final List records = LocalDiskShuffleReader.parseRecords(uncompressed); + assertThat(records, hasSize(1)); + assertThat(records.get(0).getSerializedRecord(), equalTo(new byte[]{4, 5, 6})); + } + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleConfigTest.java 
b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleConfigTest.java
new file mode 100644
index 0000000000..70ee678384
--- /dev/null
+++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleConfigTest.java
@@ -0,0 +1,104 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 *
 * The OpenSearch Contributors require contributions made to
 * this file be licensed under the Apache-2.0 license or a
 * compatible open source license.
 *
 */

package org.opensearch.dataprepper.plugins.source.iceberg.shuffle;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.util.Map;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;

/**
 * Deserialization and validation tests for {@link ShuffleConfig}:
 * defaults, SSL certificate/key validation, authentication, and storage path.
 */
class ShuffleConfigTest {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Round-trips a settings map through JSON into a {@link ShuffleConfig},
     * mimicking how pipeline configuration is bound to the config class.
     *
     * @param settings raw configuration keys and values
     * @return the deserialized config
     * @throws Exception on JSON (de)serialization failure
     */
    private ShuffleConfig deserialize(final Map<String, Object> settings) throws Exception {
        final String json = MAPPER.writeValueAsString(settings);
        return MAPPER.readValue(json, ShuffleConfig.class);
    }

    @Test
    void defaults() throws Exception {
        final ShuffleConfig config = deserialize(Map.of());
        assertThat(config.getPartitions(), is(ShuffleConfig.DEFAULT_PARTITIONS));
        assertThat(config.getServerPort(), is(ShuffleConfig.DEFAULT_SERVER_PORT));
        // SSL is on unless explicitly disabled.
        assertThat(config.isSsl(), is(true));
        assertThat(config.isSslInsecureDisableVerification(), is(false));
    }

    @Test
    void ssl_disabled_does_not_require_cert_files() throws Exception {
        final ShuffleConfig config = deserialize(Map.of("ssl", false));
        assertThat(config.isSslCertificateFileValid(), is(true));
        assertThat(config.isSslKeyFileValid(), is(true));
    }

    @Test
    void ssl_enabled_without_cert_files_fails_validation() throws Exception {
        final ShuffleConfig config = deserialize(Map.of("ssl", true));
        assertThat(config.isSslCertificateFileValid(), is(false));
        assertThat(config.isSslKeyFileValid(), is(false));
    }

    @Test
    void ssl_enabled_with_cert_files_passes_validation() throws Exception {
        final ShuffleConfig config = deserialize(Map.of(
                "ssl", true,
                "ssl_certificate_file", "/path/to/cert.pem",
                "ssl_key_file", "/path/to/key.pem"
        ));
        assertThat(config.isSslCertificateFileValid(), is(true));
        assertThat(config.isSslKeyFileValid(), is(true));
    }

    // @ValueSource(booleans = ...) is the direct form for a boolean parameter;
    // @CsvSource({"true", "false"}) relied on implicit String-to-boolean conversion.
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void ssl_insecure_disable_verification_deserialized(final boolean value) throws Exception {
        final ShuffleConfig config = deserialize(Map.of(
                "ssl", false,
                "ssl_insecure_disable_verification", value
        ));
        assertThat(config.isSslInsecureDisableVerification(), is(value));
    }

    @Test
    void authentication_is_deserialized() throws Exception {
        final ShuffleConfig config = deserialize(Map.of(
                "ssl", false,
                "authentication", Map.of("http_basic", Map.of("username", "admin", "password", "secret"))
        ));
        assertThat(config.getAuthentication().getPluginName(), equalTo("http_basic"));
    }

    @Test
    void authentication_defaults_to_null() throws Exception {
        final ShuffleConfig config = deserialize(Map.of("ssl", false));
        assertThat(config.getAuthentication(), is(nullValue()));
    }

    @Test
    void storage_path_is_deserialized() throws Exception {
        final ShuffleConfig config = deserialize(Map.of("ssl", false, "storage_path", "/custom/shuffle"));
        assertThat(config.getStoragePath(), equalTo("/custom/shuffle"));
    }

    @Test
    void storage_path_defaults_to_null() throws Exception {
        final ShuffleConfig config = deserialize(Map.of("ssl", false));
        assertThat(config.getStoragePath(), is(nullValue()));
    }
}
diff --git
a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleHttpServiceTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleHttpServiceTest.java new file mode 100644 index 0000000000..11a1af14e9 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleHttpServiceTest.java @@ -0,0 +1,111 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import com.linecorp.armeria.client.ClientFactory; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpStatus; +import com.linecorp.armeria.server.Server; +import com.linecorp.armeria.server.ServerBuilder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +class ShuffleHttpServiceTest { + + private Path tempDir; + private Server server; + private ClientFactory clientFactory; + private WebClient client; + + @BeforeEach + void setUp() throws IOException { + tempDir = Files.createTempDirectory("shuffle-http-test"); + final LocalDiskShuffleStorage storage = new LocalDiskShuffleStorage(tempDir); + final ShuffleHttpService service = new ShuffleHttpService(storage); + + final ServerBuilder sb = 
Server.builder(); + sb.http(0); + sb.annotatedService("/shuffle", service); + server = sb.build(); + server.start().join(); + + clientFactory = ClientFactory.builder().build(); + client = WebClient.builder("http://127.0.0.1:" + server.activeLocalPort()) + .factory(clientFactory) + .build(); + } + + @AfterEach + void tearDown() { + if (clientFactory != null) { + clientFactory.close(); + } + if (server != null) { + server.stop().join(); + } + } + + @ParameterizedTest + @ValueSource(strings = {"../etc/passwd", "abc", "12.34", "-1"}) + void getIndex_returns_bad_request_for_invalid_snapshotId(final String snapshotId) { + final AggregatedHttpResponse response = client.get("/shuffle/" + snapshotId + "/abcd1234/index").aggregate().join(); + assertThat(response.status(), equalTo(HttpStatus.BAD_REQUEST)); + } + + @ParameterizedTest + @ValueSource(strings = {"../etc/passwd", "ABCD", "task-id"}) + void getIndex_returns_bad_request_for_invalid_taskId(final String taskId) { + final AggregatedHttpResponse response = client.get("/shuffle/12345/" + taskId + "/index").aggregate().join(); + assertThat(response.status(), equalTo(HttpStatus.BAD_REQUEST)); + } + + @Test + void getData_returns_bad_request_for_negative_offset() { + final AggregatedHttpResponse response = client.get("/shuffle/12345/abcd1234/data?offset=-1&length=10").aggregate().join(); + assertThat(response.status(), equalTo(HttpStatus.BAD_REQUEST)); + } + + @Test + void getData_returns_bad_request_for_negative_length() { + final AggregatedHttpResponse response = client.get("/shuffle/12345/abcd1234/data?offset=0&length=-1").aggregate().join(); + assertThat(response.status(), equalTo(HttpStatus.BAD_REQUEST)); + } + + @ParameterizedTest + @ValueSource(strings = {"../etc", "abc", "12.34"}) + void cleanup_returns_bad_request_for_invalid_snapshotId(final String snapshotId) { + final AggregatedHttpResponse response = client.delete("/shuffle/" + snapshotId).aggregate().join(); + assertThat(response.status(), 
equalTo(HttpStatus.BAD_REQUEST)); + } + + @Test + void getIndex_returns_not_found_for_nonexistent_file() { + final AggregatedHttpResponse response = client.get("/shuffle/99999/abcd1234/index").aggregate().join(); + assertThat(response.status(), equalTo(HttpStatus.NOT_FOUND)); + } + + @Test + void getData_returns_ok_for_zero_length() { + final AggregatedHttpResponse response = client.get("/shuffle/12345/abcd1234/data?offset=0&length=0").aggregate().join(); + assertThat(response.status(), equalTo(HttpStatus.OK)); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleNodeClientTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleNodeClientTest.java new file mode 100644 index 0000000000..a67f1d2d79 --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShuffleNodeClientTest.java @@ -0,0 +1,148 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import com.linecorp.armeria.server.Server; +import com.linecorp.armeria.server.ServerBuilder; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.nio.file.Path; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.notNullValue; + +class ShuffleNodeClientTest { + + @TempDir + Path tempDir; + + private LocalDiskShuffleStorage storage; + private Server server; + private int port; + private ShuffleNodeClient client; + + @BeforeEach + void setUp() throws Exception { + storage = new LocalDiskShuffleStorage(tempDir); + + // Write test shuffle data + try (ShuffleWriter writer = storage.createWriter("12345", "abcd1234", 4)) { + writer.addRecord(0, ShuffleRecord.OP_INSERT, 0, new byte[]{1, 2, 3}); + writer.addRecord(2, ShuffleRecord.OP_DELETE, 0, new byte[]{4, 5}); + writer.finish(); + } + + // Start a real ShuffleHttpServer on random port + final ShuffleHttpService service = new ShuffleHttpService(storage); + final ServerBuilder sb = Server.builder(); + sb.http(0); + sb.annotatedService("/shuffle", service); + server = sb.build(); + server.start().join(); + port = server.activeLocalPort(); + + final ShuffleConfig config = createConfig(port); + + client = new ShuffleNodeClient(config); + } + + private static ShuffleConfig createConfig(final int port) throws Exception { + final com.fasterxml.jackson.databind.ObjectMapper mapper = new com.fasterxml.jackson.databind.ObjectMapper(); + return mapper.readValue( + mapper.writeValueAsString(java.util.Map.of("port", port, "ssl", false)), + ShuffleConfig.class); + } + + @AfterEach + void tearDown() { + if (server != null) { + server.stop().join(); + } + } + + 
@Test + void pullIndex_returnsOffsets() throws Exception { + final long[] offsets = client.pullIndex("localhost", "12345", "abcd1234"); + + // 4 partitions + 1 = 5 offsets + assertThat(offsets.length, is(5)); + assertThat(offsets[0], is(0L)); + // Partition 0 has data + assertThat(offsets[1], greaterThan(offsets[0])); + // Partition 1 is empty + assertThat(offsets[2], is(offsets[1])); + // Partition 2 has data + assertThat(offsets[3], greaterThan(offsets[2])); + // Partition 3 is empty + assertThat(offsets[4], is(offsets[3])); + } + + @Test + void pullData_returnsCompressedBlock() throws Exception { + final long[] offsets = client.pullIndex("localhost", "12345", "abcd1234"); + final long offset = offsets[0]; + final int length = (int) (offsets[1] - offsets[0]); + + final byte[] data = client.pullData("localhost", "12345", "abcd1234", offset, length); + + assertThat(data, notNullValue()); + assertThat(data.length, is(length)); + + // Verify it can be decompressed and parsed + final byte[] uncompressed = LocalDiskShuffleReader.decompressBlock(data); + final var records = LocalDiskShuffleReader.parseRecords(uncompressed); + assertThat(records.size(), is(1)); + assertThat(records.get(0).getOperation(), is(ShuffleRecord.OP_INSERT)); + } + + @Test + void isLocalAddress_localhost_returnsTrue() { + assertThat(ShuffleNodeClient.isLocalAddress("localhost"), is(true)); + assertThat(ShuffleNodeClient.isLocalAddress("127.0.0.1"), is(true)); + } + + @Test + void isLocalAddress_remoteAddress_returnsFalse() { + assertThat(ShuffleNodeClient.isLocalAddress("192.0.2.1"), is(false)); + } + + @Test + void resolveLocalAddress_returnsNonNull() { + assertThat(ShuffleNodeClient.resolveLocalAddress(), notNullValue()); + } + + @Test + void requestCleanup_deletesShuffleFiles() throws Exception { + assertThat(storage.getTaskIds("12345"), hasSize(1)); + + // Test the DELETE endpoint directly with a synchronous call to avoid flakiness + final java.net.http.HttpClient httpClient = 
java.net.http.HttpClient.newHttpClient(); + final java.net.http.HttpResponse response = httpClient.send( + java.net.http.HttpRequest.newBuilder( + java.net.URI.create("http://localhost:" + port + "/shuffle/12345")) + .DELETE().build(), + java.net.http.HttpResponse.BodyHandlers.discarding()); + assertThat(response.statusCode(), is(200)); + assertThat(storage.getTaskIds("12345"), hasSize(0)); + } + + @Test + void requestCleanup_nonExistentSnapshot_doesNotThrow() { + // Should not throw, just log a warning at most + client.requestCleanup("localhost", "nonexistent"); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShufflePartitionCoalescerTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShufflePartitionCoalescerTest.java new file mode 100644 index 0000000000..991594ccea --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/shuffle/ShufflePartitionCoalescerTest.java @@ -0,0 +1,83 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.shuffle; + +import org.junit.jupiter.api.Test; + +import java.util.List; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.empty; +import static org.hamcrest.Matchers.hasSize; +import static org.hamcrest.Matchers.is; + +class ShufflePartitionCoalescerTest { + + @Test + void allEmpty_returnsNoRanges() { + final var coalescer = new ShufflePartitionCoalescer(64); + final List result = + coalescer.coalesce(new long[]{0, 0, 0, 0}); + assertThat(result, is(empty())); + } + + @Test + void singleNonEmpty_returnsSingleRange() { + final var coalescer = new ShufflePartitionCoalescer(100); + final List result = + coalescer.coalesce(new long[]{0, 0, 50, 0}); + assertThat(result, hasSize(1)); + assertThat(result.get(0).getStartPartition(), is(2)); + assertThat(result.get(0).getEndPartitionInclusive(), is(3)); + } + + @Test + void smallPartitions_coalescedTogether() { + final var coalescer = new ShufflePartitionCoalescer(100); + // 10 + 20 + 30 = 60 < 100, all fit in one range + final List result = + coalescer.coalesce(new long[]{10, 20, 30, 0}); + assertThat(result, hasSize(1)); + assertThat(result.get(0).getStartPartition(), is(0)); + } + + @Test + void largePartitions_splitIntoSeparateRanges() { + final var coalescer = new ShufflePartitionCoalescer(100); + // 80 + 90 > 100, must split + final List result = + coalescer.coalesce(new long[]{80, 90, 10}); + assertThat(result, hasSize(2)); + assertThat(result.get(0).getStartPartition(), is(0)); + assertThat(result.get(0).getEndPartitionInclusive(), is(0)); + assertThat(result.get(1).getStartPartition(), is(1)); + assertThat(result.get(1).getEndPartitionInclusive(), is(2)); + } + + @Test + void emptyPartitions_skippedInCoalescing() { + final var coalescer = new ShufflePartitionCoalescer(100); + // partitions: 10, 0, 0, 20, 0, 30 -> coalesced as 10+20+30=60 + final List result = + coalescer.coalesce(new long[]{10, 0, 0, 
20, 0, 30}); + assertThat(result, hasSize(1)); + assertThat(result.get(0).getStartPartition(), is(0)); + assertThat(result.get(0).getEndPartitionInclusive(), is(5)); + } + + @Test + void eachPartitionExceedsTarget_oneRangePerPartition() { + final var coalescer = new ShufflePartitionCoalescer(50); + final List result = + coalescer.coalesce(new long[]{60, 70, 80}); + assertThat(result, hasSize(3)); + } +} diff --git a/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/RecordAvroSerializerTest.java b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/RecordAvroSerializerTest.java new file mode 100644 index 0000000000..86a061173e --- /dev/null +++ b/data-prepper-plugins/iceberg-source/src/test/java/org/opensearch/dataprepper/plugins/source/iceberg/worker/RecordAvroSerializerTest.java @@ -0,0 +1,92 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. 
+ * + */ + +package org.opensearch.dataprepper.plugins.source.iceberg.worker; + +import org.apache.iceberg.Schema; +import org.apache.iceberg.avro.AvroSchemaUtil; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.types.Types; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.time.LocalDate; +import java.time.LocalTime; +import java.time.LocalDateTime; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +class RecordAvroSerializerTest { + + @Test + void roundtrip_with_temporal_types() throws IOException { + final Schema icebergSchema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "name", Types.StringType.get()), + Types.NestedField.optional(3, "event_date", Types.DateType.get()), + Types.NestedField.optional(4, "event_time", Types.TimeType.get()), + Types.NestedField.optional(5, "created_at", Types.TimestampType.withoutZone()), + Types.NestedField.optional(6, "updated_at", Types.TimestampType.withZone()), + Types.NestedField.optional(7, "score", Types.DoubleType.get()), + Types.NestedField.optional(8, "active", Types.BooleanType.get()) + ); + + final org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, "test_table"); + + final GenericRecord original = GenericRecord.create(icebergSchema); + original.setField("id", 42); + original.setField("name", "Alice"); + original.setField("event_date", LocalDate.of(2024, 3, 15)); + original.setField("event_time", LocalTime.of(14, 30, 0)); + original.setField("created_at", LocalDateTime.of(2024, 3, 15, 14, 30, 0)); + original.setField("updated_at", OffsetDateTime.of(2024, 3, 15, 14, 30, 0, 0, ZoneOffset.UTC)); + original.setField("score", 99.5); + original.setField("active", true); + + final byte[] serialized = 
RecordAvroSerializer.serialize(original, avroSchema); + final Record deserialized = RecordAvroSerializer.deserialize(serialized, icebergSchema, avroSchema); + + assertThat(deserialized.getField("id"), equalTo(42)); + assertThat(deserialized.getField("name"), equalTo("Alice")); + assertThat(deserialized.getField("event_date"), equalTo(LocalDate.of(2024, 3, 15))); + assertThat(deserialized.getField("event_time"), equalTo(LocalTime.of(14, 30, 0))); + assertThat(deserialized.getField("created_at"), equalTo(LocalDateTime.of(2024, 3, 15, 14, 30, 0))); + assertThat(deserialized.getField("updated_at"), equalTo(OffsetDateTime.of(2024, 3, 15, 14, 30, 0, 0, ZoneOffset.UTC))); + assertThat(deserialized.getField("score"), equalTo(99.5)); + assertThat(deserialized.getField("active"), equalTo(true)); + } + + @Test + void roundtrip_with_null_values() throws IOException { + final Schema icebergSchema = new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "updated_at", Types.TimestampType.withZone()), + Types.NestedField.optional(3, "name", Types.StringType.get()) + ); + + final org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(icebergSchema, "test_table"); + + final GenericRecord original = GenericRecord.create(icebergSchema); + original.setField("id", 1); + original.setField("updated_at", null); + original.setField("name", null); + + final byte[] serialized = RecordAvroSerializer.serialize(original, avroSchema); + final Record deserialized = RecordAvroSerializer.deserialize(serialized, icebergSchema, avroSchema); + + assertThat(deserialized.getField("id"), equalTo(1)); + assertThat(deserialized.getField("updated_at"), equalTo(null)); + assertThat(deserialized.getField("name"), equalTo(null)); + } +}