Add source-layer shuffle to iceberg-source for correct and scalable C…#6682
Add source-layer shuffle to iceberg-source for correct and scalable C…#6682lawofcycles wants to merge 20 commits intoopensearch-project:mainfrom
Conversation
|
@dlvenable I would appreciate your review on this PR. This implements the source-layer shuffle discussed in #6554 (comment), addressing both the correctness bug (cross-partition UPDATE data loss) and the scalability limitation (bounds-based pairing fallback to single-node processing) described in #6666. Once this is merged, I plan to update the CDC RFC (#6552) to reflect the shuffle design. |
|
I have opened a documentation PR for Iceberg source plugin: opensearch-project/documentation-website#12164. It reflects the current state of the implementation and would be useful as a reference during review. |
|
The shuffle HTTP server supports TLS for encryption in transit. Request-level authentication (e.g. mutual TLS) is not included in this PR but can be added as a follow-up. If authentication should be included in the initial implementation, please let me know. |
|
Fixed a bug where |
|
Fixed another issue: Previously, |
|
During multi-node performance testing, I found a race condition where |
✅ License Header Check PassedAll newly added files have proper license headers. Great work! 🎉 |
2d1d6f1 to
bd4e2c5
Compare
|
I addressed bugs and improvement points that I found during the e2e performance test. |
dlvenable
left a comment
There was a problem hiding this comment.
Thanks @lawofcycles for this improvement!
| * Standalone Armeria HTTP server for serving shuffle data. | ||
| * Runs independently from PeerForwarder to avoid core dependencies. | ||
| */ | ||
| public class ShuffleHttpServer { |
There was a problem hiding this comment.
We should use the common HTTP server code. This is important for providing consistent experience with ports that Data Prepper may open. You can see how this is used in the http source for example.
There was a problem hiding this comment.
Migrated to use CreateServer from http-common. Added a lightweight createHTTPServer overload that takes a CertificateProvider and optional ArmeriaHttpAuthenticationProvider, without the buffer, throttling, and health check dependencies. ShuffleConfig now extends BaseHttpServerConfig so it inherits the standard TLS configuration (file, S3, ACM) via CertificateProviderFactory.
| @JsonProperty("target_partition_size") | ||
| private ByteCount targetPartitionSize = ByteCount.parse(DEFAULT_TARGET_PARTITION_SIZE); | ||
|
|
||
| @JsonProperty("server_port") |
There was a problem hiding this comment.
The approach of using S3 versus local disk should probably be handled via a plugin. This would allow us to extend it more. This is how we support different source coordination mechanisms or different authentication mechanisms in plugins like the http source.
Since this source is @Experimental we can break the config between versions so we don't need to do this now.
There was a problem hiding this comment.
Acknowledged. The current ShuffleStorage interface already abstracts the storage backend, so extending it to a plugin model in a future version should be straightforward.
| private static final Logger LOG = LoggerFactory.getLogger(ShuffleNodeClient.class); | ||
| private static final int MAX_RETRIES = 3; | ||
|
|
||
| private final HttpClient httpClient; |
There was a problem hiding this comment.
Let's use Armeria's web client for web requests.
There was a problem hiding this comment.
Replaced java.net.http.HttpClient with Armeria WebClient. TLS verification bypass now uses ClientFactory.builder().tlsNoVerify() instead of a custom TrustAllCerts implementation.
…DC processing Introduce a pull-based shuffle mechanism for processing snapshots that contain DELETE operations (UPDATE/DELETE in Copy-on-Write tables). When a snapshot contains DeletedDataFileScanTasks, records are shuffled by identifier_columns hash across nodes so that carryover removal and UPDATE merge operate on complete data, including cross-partition updates. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
… to shuffle server Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
…lient utility Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
…te before partitions Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
…eader for correct type handling The shuffle record serialization used GenericDatumWriter/GenericDatumReader which only handle Avro native types. Iceberg Records contain Java types like OffsetDateTime for timestamptz columns, causing AvroRuntimeException during SHUFFLE_WRITE. Replace with Iceberg's DataWriter and PlannedDataReader which handle the Iceberg-to-Avro type conversion internally. Extract serialization logic into RecordAvroSerializer utility class with roundtrip tests covering temporal types. Also fix shuffle write completion key race condition by creating GlobalState before partitions in processShuffleSnapshot, matching the order used in processInsertOnlySnapshot. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Migrate ShuffleHttpServer to use CreateServer from http-common and ShuffleNodeClient to use Armeria WebClient. ShuffleConfig now extends BaseHttpServerConfig for consistent TLS configuration including ACM and S3 certificate support. Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
bd4e2c5 to
84858c6
Compare
| @Override | ||
| public void addRecord(final int partitionNumber, final byte operation, final int changeOrdinal, | ||
| final byte[] serializedRecord) { | ||
| buffer.add(new BufferedRecord(partitionNumber, operation, changeOrdinal, serializedRecord)); |
There was a problem hiding this comment.
The writer buffers all records in memory before sorting. With multiple concurrent SHUFFLE_WRITE tasks on the same node, combined heap usage could be significant. Consider adding a configurable memory limit that spills to disk when exceeded.
There was a problem hiding this comment.
Thank you for the review. In the current implementation, each node runs a single ChangelogWorker thread that processes tasks sequentially via acquireAvailablePartition, so multiple SHUFFLE_WRITE tasks do not run concurrently on the same node. The memory usage per task is bounded by one Iceberg data file (default 512MB).
Parallelizing worker threads is something I am planning to work on for performance improvement. When implementing that, adding a configurable memory limit with spill to disk will be necessary to keep the combined heap usage under control. I will create a follow up issue to track both together.
There was a problem hiding this comment.
Thanks for the clarification and for tracking it.
Performance test resultsTested with an Iceberg table (NYC Yellow Taxi 2024, 41 million rows, 19 columns, partitioned by day) on ECS Fargate (2 vCPU / 16 GiB per node, ARM64). Source coordination backed by DynamoDB (provisioned 5,000 RCU / 5,000 WCU). OpenSearch cluster: 6 x r7g.4xlarge. Measurements represent the iceberg-source plugin's processing time from snapshot detection to pipeline buffer submission ( UPDATE/DELETE operations target randomly sampled rows, so the affected rows are scattered across many data files. Due to Copy on Write, each affected data file is fully rewritten even if only one row changes, so the shuffle phase reads all rows in the rewritten files.
INSERT operations complete in under 10 seconds regardless of data volume (100K vs 1M rows), as the bottleneck is task coordination overhead rather than data reading. Small UPDATE/DELETE (50K rows) with shuffle completes in 21 to 37 seconds, scaling well with node count. UPDATE/DELETE is significantly slower than INSERT even for smaller row counts (50K UPDATE: 28 to 35s vs 100K INSERT: 5 to 8s). This is because Iceberg's Copy on Write rewrites entire data files for each affected row. The shuffle phase must read all rows in the rewritten files, not just the changed rows. A 50K row UPDATE scattered across many data files results in reading millions of rows. Large UPDATE/DELETE (500K+ rows) creates thousands of SHUFFLE_WRITE tasks (one per data file). Source coordination stores all tasks under the same DynamoDB partition key, which has a per partition write throughput limit of 1,000 WCU/s regardless of provisioned capacity. This causes write throttling that degrades performance. Grouping multiple files into a single SHUFFLE_WRITE task based on (*) The 16 node UPDATE created more SHUFFLE_WRITE tasks than the 8 node run (2,349 vs 2,068), which amplified the DynamoDB throttling impact. Task count varies between runs because it depends on the number of data files affected by Copy on Write. |
dlvenable
left a comment
There was a problem hiding this comment.
Thank you @lawofcycles for this great contribution! Overall this looks good. I do want to be sure we address some of the security issues before merging though.
| serverConfig, LOG, PluginMetrics.fromNames("shuffle", "iceberg-source"), | ||
| "shuffle-server", "iceberg-cdc-pipeline"); | ||
|
|
||
| server = createServer.createHTTPServer(certificateProvider, null, service, "/shuffle"); |
There was a problem hiding this comment.
This is passing null for authentication. We will need to support authentication plugins to merge this. You can see how other configs do this.
There was a problem hiding this comment.
Added authentication plugin support. IcebergService now loads an ArmeriaHttpAuthenticationProvider via PluginFactory using the same pattern as HTTPSource. When authentication is not configured, it falls back to UnauthenticatedArmeriaHttpAuthenticationProvider with a warning log.
There was a problem hiding this comment.
After further consideration, I realized that ArmeriaHttpAuthenticationProvider (e.g. http_basic) is designed for external client-facing endpoints. It provides server-side request validation but has no client-side counterpart for sending credentials. Since shuffle is node-to-node communication where both sides are Data Prepper nodes, the client would also need to authenticate, which ArmeriaHttpAuthenticationProvider does not support.
PeerForwarder addresses node-to-node authentication using mutual TLS. I am thinking of taking the same approach for shuffle. Does this approach sound reasonable?
| import org.opensearch.dataprepper.http.BaseHttpServerConfig; | ||
| import org.opensearch.dataprepper.model.types.ByteCount; | ||
|
|
||
| @JsonIgnoreProperties({"path", "compression", "authentication", "health_check_service", |
There was a problem hiding this comment.
What does this do? Does this make these non-configurable? We should definitely support authentication. Others can come later.
There was a problem hiding this comment.
Apologies for the oversight. Removed authentication from @JsonIgnoreProperties. The field is inherited from BaseHttpServerConfig and is now deserialized from YAML. Added tests in ShuffleConfigTest to verify deserialization and the null default.
| this.pluginMetrics = pluginMetrics; | ||
| this.acknowledgementSetManager = acknowledgementSetManager; | ||
| this.eventFactory = eventFactory; | ||
| final Path shuffleBaseDir = Path.of(System.getProperty("java.io.tmpdir"), "data-prepper-shuffle"); |
There was a problem hiding this comment.
We should make this configurable. Also I wonder if we should have this nested in the data-prepper directory to start. I think in our Docker deployment and perhaps for other installations via tar.gz, the user that Data Prepper runs under might not have access to this directory. We hit this once with service map, though I don't recall all the details.
Did you test this using the Docker container built by Data Prepper?
There was a problem hiding this comment.
Made the shuffle storage path configurable via storage_path under shuffle. The default resolution order is: explicit storage_path setting, then ${data-prepper.dir}/data/shuffle (following the same pattern as the GeoIP processor), then ${java.io.tmpdir}/data-prepper-shuffle as a fallback for test environments where data-prepper.dir is not set.
I have not tested with the official Docker image built by Data Prepper's release process. My multi-node verification used locally built Docker images. With data-prepper.dir/data/shuffle as the default, the path is under Data Prepper's home directory where the running user has write access.
|
|
||
| @Override | ||
| public ShuffleWriter createWriter(final String snapshotId, final String taskId, final int numPartitions) { | ||
| final Path snapshotDir = baseDir.resolve(snapshotId); |
There was a problem hiding this comment.
Calling resolve can allow for .. which could allow invalid input to resolve to files in other directories. I think we want a couple things - disallow ... Also it would be extra secure to confirm the resolved path is a child of the base path before reading any content. This would give extra assurance.
e.g.
final Path snapshotDir = baseDir.resolve(snapshotId);
if(!isSubdirectory(snapshotDir)) {
throw ...
}
There was a problem hiding this comment.
Added validateSubdirectory() to LocalDiskShuffleStorage. All paths produced by resolve() with external input (snapshotId, taskId) are now normalized and verified to be children of the base directory before any file operation. Covers dataFilePath, indexFilePath, createWriter, getTaskIds, and cleanup.
| } | ||
|
|
||
| @Get("/{snapshotId}/{taskId}/index") | ||
| public HttpResponse getIndex(@Param("snapshotId") final String snapshotId, |
There was a problem hiding this comment.
We should have input validation on all the input parameters. Using JSR-303 bean validations would be ideal.
We should have some tests to verify this as well.
As I noted in another comment, invalid input could possibly lead to reading from other files.
There was a problem hiding this comment.
Added input validation to all three endpoints. snapshotId is validated as numeric (Iceberg snapshot IDs are long values), taskId as lowercase hex (SHA-256 hash prefix), and offset/length as non-negative. Invalid input returns 400 Bad Request.
Added ShuffleHttpServiceTest with parameterized tests covering invalid snapshotId, invalid taskId, negative offset/length, and valid edge cases (nonexistent file returns 404, zero length returns 200).
|
|
||
| private synchronized void registerShuffleWriteLocation(final long snapshotId, final String taskId, final String nodeAddress) { | ||
| final String locationKey = "shuffle-locations-" + snapshotId; | ||
| while (true) { |
There was a problem hiding this comment.
This will keep trying to write until it succeeds. Should this have some form of timeout at which to give up the location?
Also this loop will hold the whole thread.
There was a problem hiding this comment.
Replaced the unbounded while(true) loops with a retry limit of 10 attempts in both registerShuffleWriteLocation and incrementSnapshotCompletionCount. If all retries are exhausted, a RuntimeException is thrown which causes the task to fail. The leader will then detect the incomplete shuffle and retry the snapshot from Phase 1.
| int hash = 0; | ||
| for (final String col : identifierColumns) { | ||
| final Object val = record.getField(col); | ||
| hash = 31 * hash + (val != null ? val.hashCode() : 0); |
There was a problem hiding this comment.
Could this hash code lead to inconsistent results across nodes or even across server restarts?
I'm wondering about two things:
- Different JVMs may have different hash codes
- This is an
Object. Are we sure that each possible class has a hash code implementation that will work? By default the hash is based on the object in memory.
There was a problem hiding this comment.
Good catch. The shuffle requires all nodes to compute the same hash for the same identifier_columns values. Most Iceberg field types (Integer, Long, String, etc.) have deterministic hashCode() implementations. However, Iceberg's FixedType returns byte[], whose hashCode() inherits from Object and is based on memory address, producing different values across JVMs.
Changed to val.toString().hashCode(). toString() produces deterministic output for all Iceberg field types, and String.hashCode() to be consistent across JVMs.
| private static byte[] serializeRecords(final List<BufferedRecord> records) { | ||
| final ByteArrayOutputStream baos = new ByteArrayOutputStream(); | ||
| for (final BufferedRecord record : records) { | ||
| final int recordLength = 1 + 4 + record.serializedRecord.length; |
There was a problem hiding this comment.
What are these magic numbers? Please make static final with a good name for it.
There was a problem hiding this comment.
Replacd magic numbers with named constants in shuffle writer and reader.
| ByteBuffer.wrap(header).putInt(uncompressed.length).putInt(compressedLength); | ||
| out.write(header); | ||
| out.write(compressed, 0, compressedLength); | ||
| currentOffset += 8 + compressedLength; |
There was a problem hiding this comment.
Replacd magic numbers with named constants in shuffle writer and reader.
|
|
||
| @Override | ||
| public void cleanupAll() { | ||
| deleteDirectory(baseDir); |
There was a problem hiding this comment.
Do we want to delete this whole directory? This won't store data between restarts. Also, what if two Data Prepper nodes are running on the same machine, say for testing?
There was a problem hiding this comment.
Two changes to address this. First, cleanupAll() now deletes only the contents of the base directory, preserving the directory itself across restarts. Second, each node's shuffle files are isolated under a port-specific subdirectory (e.g. data/shuffle/4995/, data/shuffle/4996/), so multiple Data Prepper nodes on the same machine will not interfere with each other without requiring separate storage_path configuration.
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
… JVMs Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
Signed-off-by: Sotaro Hikita <bering1814@gmail.com>
|
@dlvenable Addressed all review comments. The one open question is on authentication. I added |
Description
This PR adds source-layer shuffle to the iceberg-source plugin for correct and scalable CDC processing of snapshots containing DELETE operations (UPDATE/DELETE in Copy-on-Write tables).
Problems solved
Correctness: When a partition column is updated (e.g.
regionchanges fromUStoEU), Iceberg produces a DELETE in the old partition and an INSERT in the new partition. The current implementation groups tasks by Iceberg partition, so these end up in separate tasks and the UPDATE merge cannot detect the cross-partition update. If the DELETE arrives at the sink after the INSERT, the document is lost. Shuffle routes all records with the sameidentifier_columnsto the same node, enabling correct cross-partition UPDATE detection.Scalability: The previous bounds-based pairing heuristic attempts to match DELETED and ADDED files by column statistics. When pairing fails (e.g. an UPDATE changes a column's min/max bounds), all files fall back to a single task processed by one node. This is especially problematic for unpartitioned tables or large partitions where the fallback can include hundreds of files. Shuffle distributes the work evenly across all nodes by hash partitioning, regardless of Iceberg partition structure or file bounds.
Shuffle overview
When a snapshot contains
DeletedDataFileScanTasks, all records in the snapshot are redistributed byidentifier_columnshash so that records sharing the sameidentifier_columnsvalue are guaranteed to land on the same node. Once co-located, each node can independently perform carryover removal and UPDATE merge, because it has both the DELETE and INSERT for any given document.This redistribution uses a pull-based two-phase approach based on Spark's shuffle architecture (
SortShuffleManager,IndexShuffleBlockResolver). All workers must finish Phase 1 before Phase 2 can begin, because a reader needs the complete set of records for its hash partitions across all nodes. Between the two phases, the leader collects index metadata from all shuffle files and coalesces adjacent small hash partitions into SHUFFLE_READ tasks (same approach as Spark's Adaptive Query Execution), so the number of Phase 2 tasks is determined by actual data distribution rather than the fixed partition count.Phase 1 (SHUFFLE_WRITE): Each worker reads its assigned data files, computes
hash(identifier_columns) % Pfor each record (P defaults to 64, configurable viashuffle.partitions), and writes records to local disk sorted by hash partition number. Each task produces one data file and one index file.Phase 2 (SHUFFLE_READ): Each worker pulls its assigned hash partition range from all nodes, then performs carryover removal and UPDATE merge on the collected records. Since all records with the same
identifier_columnsvalue are guaranteed to be on the same node, cross-partition updates are correctly detected.Shuffle storage is abstracted behind a
ShuffleStorageinterface. The current implementation uses local disk (LocalDiskShuffleStorage), but the interface allows alternative storage backends (e.g. S3) to be plugged in without changes to the shuffle orchestration or worker logic.The pull-based design decouples writing from network transfer. Phase 1 is purely local I/O with no network dependency, so it is unaffected by slow or failing remote nodes. Phase 2 readers pull data independently and can retry failed transfers without requiring the writer to resend.
Write-side partitioning uses a fixed count (P=64 by default) because the number of nodes may vary and a sufficiently large P ensures even distribution regardless of cluster size. When the actual data volume is small relative to P, many partitions will be empty or contain very little data. After all writes complete, the leader reads the index files to learn the actual size of each partition, then coalesces adjacent small or empty partitions into larger SHUFFLE_READ tasks targeting a configurable size (default 64MB), avoiding unnecessary tasks.
For INSERT-only snapshots (no
DeletedDataFileScanTask), the shuffle is skipped entirely and each file is processed as an independent task.flowchart TD subgraph LeaderScheduler A[Detect snapshot] --> B{DELETED files?} B -->|No| C[INSERT-only: create one<br/>CHANGELOG_TASK per file] B -->|Yes| D[Phase 1: create SHUFFLE_WRITE tasks<br/>1 data file = 1 task] D --> E[Wait for all SHUFFLE_WRITE<br/>to complete] E --> F{shuffle-failed?} F -->|Yes| G[Clean up shuffle files<br/>on all nodes] G --> A F -->|No| H[Collect index files from all nodes<br/>via local disk and HTTP] H --> I[Coalesce:<br/>skip empty partitions<br/>merge small partitions<br/>target 64MB per task] I --> J[Phase 2: create SHUFFLE_READ tasks<br/>1 task = 1 partition range] J --> K[Wait for all SHUFFLE_READ<br/>to complete] K --> K2[Clean up shuffle files<br/>on all nodes] K2 --> L[Update lastProcessedSnapshotId] L --> A C --> M[Wait for all CHANGELOG_TASKs<br/>to complete] M --> L endsequenceDiagram participant LS as LeaderScheduler participant SC as SourceCoordinator participant W1 as Worker Node 1 participant W2 as Worker Node 2 participant H1 as Node 1 HTTP Server participant H2 as Node 2 HTTP Server Note over LS: Snapshot S5 detected (contains DELETED files) rect rgb(230, 245, 255) Note over LS,H2: Phase 1: SHUFFLE_WRITE LS->>SC: createPartition(SHUFFLE_WRITE, file-A) LS->>SC: createPartition(SHUFFLE_WRITE, file-B) LS->>SC: createPartition(SHUFFLE_WRITE, file-C) W1->>SC: acquirePartition(SHUFFLE_WRITE) SC-->>W1: file-A task W2->>SC: acquirePartition(SHUFFLE_WRITE) SC-->>W2: file-B task W1->>W1: Read file-A, hash(id_cols) % 64, sort by partition# W1->>W1: Write data + index files to local disk W1->>SC: Register nodeAddress in GlobalState W1->>SC: completePartition(file-A) W2->>W2: Read file-B, hash(id_cols) % 64, sort by partition# W2->>W2: Write data + index files to local disk W2->>SC: Register nodeAddress in GlobalState W2->>SC: completePartition(file-B) W1->>SC: acquirePartition(SHUFFLE_WRITE) SC-->>W1: file-C task W1->>W1: Write data + index files to local disk W1->>SC: Register nodeAddress in GlobalState W1->>SC: completePartition(file-C) end Note over LS: All SHUFFLE_WRITE complete rect rgb(255, 245, 230) Note over LS,H2: Barrier: index collection + coalesce LS->>SC: Read shuffle-locations GlobalState LS->>LS: Read local index files (taskA, taskC) LS->>H2: GET /shuffle/{snapshotId}/{taskId}/index (taskB) H2-->>LS: index offsets LS->>LS: Compute per-partition sizes, coalesce (target 64MB) end rect rgb(230, 255, 230) Note over LS,H2: Phase 2: SHUFFLE_READ LS->>SC: createPartition(SHUFFLE_READ, partitions 0-20) LS->>SC: createPartition(SHUFFLE_READ, partitions 21-63) W1->>SC: acquirePartition(SHUFFLE_READ) SC-->>W1: partitions 0-20 W2->>SC: acquirePartition(SHUFFLE_READ) SC-->>W2: partitions 21-63 W1->>W1: Read partitions 0-20 from local disk (taskA, taskC) W1->>H2: GET /shuffle/.../data (taskB, partitions 0-20) H2-->>W1: compressed blocks W1->>W1: Carryover removal + UPDATE merge W1->>W1: Write to Buffer W1->>SC: completePartition W2->>W2: Read partitions 21-63 from local disk (taskB) W2->>H1: GET /shuffle/.../data (taskA, taskC, partitions 21-63) H1-->>W2: compressed blocks W2->>W2: Carryover removal + UPDATE merge W2->>W2: Write to Buffer W2->>SC: completePartition end Note over LS: All SHUFFLE_READ complete LS->>LS: Delete local shuffle files LS->>H1: DELETE /shuffle/{snapshotId} LS->>H2: DELETE /shuffle/{snapshotId} LS->>SC: Update lastProcessedSnapshotIdA SHUFFLE_READ worker retries HTTP pulls up to 3 times with exponential backoff. If all retries fail, it writes a
shuffle-failedGlobalState entry. LeaderScheduler detects this during its completion polling, cleans up shuffle files, and aborts without updatinglastProcessedSnapshotId. The same snapshot is retried from Phase 1 on the next polling cycle.Shuffle data format
Each SHUFFLE_WRITE task produces one data file and one index file.
Index file:
(numPartitions + 1)long offset values (8 bytes each). Partitioni's data occupies the byte rangeoffset[i]tooffset[i+1]in the data file. Empty partitions haveoffset[i] == offset[i+1].Data file: LZ4-compressed blocks in partition order. Each block contains serialized records.
Records are serialized using Avro binary encoding derived from the Iceberg table schema. The schema is not stored in the shuffle files since both write and read sides derive it from the same table.
Writer buffers all records in memory, sorts by partition number, compresses per partition, and writes in a single pass. Memory usage per SHUFFLE_WRITE task is bounded by one Iceberg data file (default 512MB).
Shuffle orchestration
LeaderScheduler coordinates the phases using barrier synchronization via SourceCoordinator.
IncrementalChangelogScanand checks for DELETED filestarget_partition_size; similar to Spark's Adaptive Query Execution). This avoids creating excessive tasks when most hash partitions are smallDELETE /shuffle/{snapshotId}and cleans up local shuffle files. Each node'scleanupAll()at startup serves as a safety net for any missed cleanupsPartition keys are deterministic (based on file paths and partition ranges) to ensure idempotency if LeaderScheduler crashes and replans the same snapshot.
Node-to-node data transfer
Each Data Prepper node runs an Armeria HTTP server (default port 4995) to serve shuffle data.
GET /shuffle/{snapshotId}/{taskId}/indexGET /shuffle/{snapshotId}/{taskId}/data?offset={offset}&length={length}DELETE /shuffle/{snapshotId}SHUFFLE_READ workers pull data from each SHUFFLE_WRITE task. For same-node tasks, data is read directly from disk without HTTP. For remote tasks, the worker first fetches the index to compute offsets, then fetches each partition's compressed block individually.
The shuffle HTTP server supports TLS for encryption in transit. Request-level authentication (e.g. mutual TLS) is not included in this PR but can be added as a follow-up.
Shuffle key and identifier_columns
identifier_columnshash is sufficient as the shuffle key for both carryover removal and UPDATE merge. Carryover pairs (DELETE+INSERT with all data columns identical) necessarily have the sameidentifier_columnsvalues, so they are routed to the same node.identifier_columnsis now required when processing snapshots that contain DELETE operations. Without it, OpenSearch document IDs cannot be determined and correct UPDATE/DELETE processing is impossible. The plugin throwsIllegalStateExceptionif DELETED files are detected withoutidentifier_columnsconfigured. For INSERT-only tables,identifier_columnsis not required.Configuration
shuffle.partitionsshuffle.target_partition_sizeshuffle.storage_path${data-prepper.dir}/data/shuffle${java.io.tmpdir}/data-prepper-shuffleifdata-prepper.diris not setshuffle.server_portshuffle.sslshuffle.ssl_certificate_files3://bucket/cert.pem)shuffle.ssl_key_fileshuffle.use_acm_certificate_for_sslshuffle.acm_certificate_arnshuffle.aws_regionshuffle.authenticationhttp_basic)shuffle.ssl_insecure_disable_verificationOther changes
TaskGrouper(replaced entirely by shuffle)TaskGroupertoLeaderSchedulerto avoid redundantIncrementalChangelogScancalls for INSERT-only snapshotsIssues Resolved
Resolves #6666
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.