[RFC] Iceberg CDC Source Plugin
This RFC proposes a new iceberg source plugin for Data Prepper that captures row-level changes from Apache Iceberg tables and ingests them into OpenSearch as CDC events.
1. Context
Apache Iceberg is an open table format widely used for lakehouse architectures on storage like S3, GCS, and HDFS. Iceberg tables are written by engines like Spark, Flink, and Trino, and are commonly used as the central store for analytical data.
Iceberg tracks all table changes as immutable snapshots. Each write operation (insert, update, delete) creates a new snapshot that records which data files are added or removed. Iceberg provides a core Java API called IncrementalChangelogScan that computes the row-level diff between snapshots, returning which rows were added, deleted, or updated. This API is part of the Iceberg core library and can be called from any Java process, including Data Prepper, without depending on a distributed compute engine like Spark or Flink.
2. Motivation
A common pattern in data platforms is to use an Iceberg-based lakehouse as the single source of truth, and keep OpenSearch synchronized for search and dashboarding over a subset of that data. Iceberg tables may hold raw ingested data, but they also frequently hold curated, transformed, or aggregated data produced through multi-stage processing (e.g., medallion architecture). In either case, the Iceberg table represents the authoritative state that needs to be reflected in OpenSearch.
For example, an e-commerce platform might run Spark jobs that join order streams with product catalogs and write enriched results to a gold-layer Iceberg table. Syncing this table to OpenSearch enables full-text search and real-time dashboards over the processed data.
One approach is dual-writing from the data source to both Iceberg and OpenSearch simultaneously. However, this has several drawbacks. It requires every writer (Spark jobs, Flink pipelines, ETL processes) to know about and write to OpenSearch, coupling the search infrastructure to all upstream producers. It also cannot guarantee consistency between the two stores when writes to one succeed and the other fails. Furthermore, it cannot handle derived or transformed data that only exists after processing in the lakehouse, such as aggregations, joins across tables, or ML feature outputs written back to Iceberg.
The alternative is to treat Iceberg as the source of truth and propagate changes downstream. Today, this requires building custom Spark or Flink jobs to read Iceberg's changelog, transform the data, and write to OpenSearch. This adds operational complexity (maintaining a compute cluster, implementing changelog reading and transformation logic, handling failures and checkpointing) for what is essentially a data movement task.
There is currently no way to capture changes from Iceberg tables and ingest them into OpenSearch through Data Prepper.
3. Proposal
A new iceberg source plugin for Data Prepper that detects changes in Iceberg tables and emits them as CDC events (INSERT, DELETE, UPDATE) into the pipeline.
The plugin supports two modes.
- Initial load + CDC (default): Performs a full table scan of the current snapshot to load all existing data into OpenSearch, then continuously polls for new snapshots and emits incremental changes. This is the typical setup when first connecting an Iceberg table to OpenSearch.
- CDC only (`disable_export: true`): Skips the full scan and starts capturing changes from the current snapshot onward. Useful when OpenSearch already has the data or when only future changes are needed.
Example pipeline configuration:
```yaml
iceberg-cdc-pipeline:
  source:
    iceberg:
      tables:
        - table_name: "my_database.my_table"
      catalog:
        type: rest
        uri: "https://example.com/api/catalog"
        warehouse: "my_warehouse"
      disable_export: false
      polling_interval: "PT30S"
      identifier_columns: ["region", "category"]
  sink:
    - opensearch:
        hosts: ["https://opensearch:9200"]
        document_id_field: "document_id"
        action: "${getMetadata(\"bulk_action\")}"
```
The catalog configuration is passed directly to Iceberg's CatalogUtil.loadCatalog(). Any catalog implementation that Iceberg supports (REST, Glue, Hive, JDBC, Nessie, etc.) can be used by specifying the appropriate type and properties.
Supported Iceberg versions
The plugin supports Iceberg format version 2 and 3 tables.
The initial implementation targets version 2 (CoW) tables. Version 3 features (Row Lineage, Deletion Vectors) are planned as future enhancements (see Section 6).
Processing overview
The plugin works as follows:
- Periodically poll the Iceberg table for new snapshots
- When a new snapshot is detected, run `IncrementalChangelogScan` to get the list of changed data files
- Distribute the work across Data Prepper nodes using `EnhancedSourceCoordinator`
- Each worker reads the changed data files, removes carryover rows, and emits events to the buffer
- Checkpoint the last processed snapshot ID for resumability
For initial load, step 1 is replaced by a full TableScan. The resulting file scan tasks are grouped into batches (e.g., by total file size or count) and registered as partitions. After the full scan completes, the plugin transitions to the CDC flow above starting from the snapshot at which the scan was initiated.
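The initial-load batching step could be sketched as follows, assuming a simple greedy grouping by cumulative file size. `FileTask` is an illustrative stand-in for Iceberg's `FileScanTask`, and the names and size target are assumptions, not the plugin's finalized API.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: group file scan tasks into partitions whose cumulative size
// stays under a target. "FileTask" is a stand-in for Iceberg's
// FileScanTask; names and thresholds here are illustrative.
public class InitialLoadBatcher {
    record FileTask(String path, long sizeBytes) {}

    static List<List<FileTask>> batchBySize(List<FileTask> tasks, long targetBytes) {
        List<List<FileTask>> batches = new ArrayList<>();
        List<FileTask> current = new ArrayList<>();
        long currentSize = 0;
        for (FileTask task : tasks) {
            // Close the current batch when adding this file would exceed the target.
            if (!current.isEmpty() && currentSize + task.sizeBytes() > targetBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentSize = 0;
            }
            current.add(task);
            currentSize += task.sizeBytes();
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<FileTask> tasks = List.of(
                new FileTask("a.parquet", 60), new FileTask("b.parquet", 50),
                new FileTask("c.parquet", 30), new FileTask("d.parquet", 80));
        // Target 100 bytes per batch -> [a], [b, c], [d]
        System.out.println(batchBySize(tasks, 100).size()); // prints 3
    }
}
```

Each resulting batch would be registered as one coordinator partition, so the full scan spreads evenly across workers regardless of individual file sizes.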
4. Architecture
4.1 Iceberg snapshots and data files
An Iceberg table stores its actual data in data files (Parquet, ORC, or Avro) on object storage such as S3. Separately from the data files, Iceberg maintains a metadata layer that tracks which data files belong to the current table state. This separation of data and metadata is a key design principle of Iceberg.
Every write operation creates a new immutable snapshot. A snapshot does not copy or move data files. Instead, it records a new set of manifest files that reference the data files making up the table at that point in time. Each snapshot knows which data files were added and which were removed compared to the previous snapshot.
Tables can optionally be partitioned (e.g., by date or region). Partitioning affects how data files are organized, and is relevant to how this plugin distributes work.
Iceberg supports two strategies for handling updates and deletes:
- Copy-on-Write (CoW): When any row in a data file is updated or deleted, the entire file is rewritten. This produces a new data file containing all original rows with the modifications applied. The snapshot records the old file as removed and the new file as added.
- Merge-on-Read (MoR): Instead of rewriting the data file, a separate delete file records which rows are deleted or updated. Reads merge the data file and delete file at query time. MoR avoids the rewrite cost but adds read-time overhead.
The following example illustrates CoW behavior:
```
Snapshot S1: [file_A, file_B]
  -- INSERT 2 rows into partition 'US' -->
Snapshot S2: [file_A, file_B, file_C]
  -- UPDATE 1 row in file_A (CoW rewrites entire file) -->
Snapshot S3: [file_A', file_B, file_C]   (file_A removed, file_A' added)
```
file_A' contains all rows from file_A with the one updated row modified. The unchanged rows are carried over as-is. This has implications for change detection, discussed below.
4.2 IncrementalChangelogScan
Iceberg provides IncrementalChangelogScan, a core Java API that computes the row-level diff between two snapshots. It returns a list of ChangelogScanTask objects, each pointing to a data file and indicating whether the file was added or removed.
```java
IncrementalChangelogScan scan = table.newIncrementalChangelogScan()
    .fromSnapshotExclusive(lastProcessedSnapshotId)
    .toSnapshot(currentSnapshotId);

for (ChangelogScanTask task : scan.planFiles()) {
    // AddedRowsScanTask       -> rows in a newly added data file (INSERT)
    // DeletedDataFileScanTask -> rows in a removed data file (DELETE)
}
```
For the S2 -> S3 transition above, the scan returns:
- `DeletedDataFileScanTask(file_A)`: all rows in the removed file, interpreted as DELETEs
- `AddedRowsScanTask(file_A')`: all rows in the new file, interpreted as INSERTs
The actual change was a single row update, but because CoW rewrites the entire file, unchanged rows appear as both DELETE and INSERT. These redundant pairs are called "carryover" and must be removed to correctly reflect the actual changes in downstream systems.
This API is part of iceberg-core and can be called from any Java process. It only reads metadata (manifest files) during planning. The actual data file reading happens separately when processing each task.
For reference, Apache Spark provides a higher-level interface for the same functionality through the create_changelog_view stored procedure:
```sql
CALL catalog.system.create_changelog_view(
  table => 'my_database.my_table',
  changelog_view => 'my_changelog'
);

SELECT * FROM my_changelog;
-- +----+-------+-----+--------------+
-- | id | name  | age | _change_type |
-- +----+-------+-----+--------------+
-- | 2  | Bob   | 25  | DELETE       |
-- | 2  | Bobby | 25  | INSERT       |
-- +----+-------+-----+--------------+
```
Under the hood, Spark uses the same IncrementalChangelogScan API, then applies carryover removal via RemoveCarryoverIterator using a repartition (shuffle) to group rows by their data values.
4.3 Plugin overview
Data Prepper can run as a cluster of multiple nodes. To distribute work across nodes, source plugins use the EnhancedSourceCoordinator, which provides a shared work queue backed by a persistent store. The coordinator manages "partitions" (units of work) that any node can acquire, process, and mark as complete.
This plugin uses a leader-worker pattern.
LeaderScheduler runs on a single node (elected via the coordinator's leader lease). It is responsible for detecting new Iceberg snapshots and planning the work. When a new snapshot is found, the leader runs IncrementalChangelogScan.planFiles() to get the list of changed data files, groups related tasks together, and registers each group as a partition in the coordinator. The leader does not read any data files itself.
ChangelogWorker runs on every node (including the leader node). Each worker repeatedly acquires an available partition from the coordinator, reads the data files referenced by that partition, converts the rows to Data Prepper events, writes them to the pipeline buffer, and marks the partition as complete. Multiple workers process different partitions in parallel.
4.4 Event format and type mapping
Each row read from Iceberg data files is converted to a Data Prepper event (JSON document) with the row's columns as top-level fields. The plugin adds metadata attributes that downstream processors and the OpenSearch sink can reference.
| Attribute | Example value | Description |
|---|---|---|
| `iceberg_operation` | `INSERT`, `DELETE`, `UPDATE_BEFORE`, `UPDATE_AFTER` | The type of change this row represents. INSERT and UPDATE_AFTER map to an OpenSearch INDEX action. DELETE maps to an OpenSearch DELETE action. UPDATE_BEFORE is the previous value of an updated row (available when identifier columns are configured). |
| `iceberg_table_name` | `my_database.my_table` | The fully qualified Iceberg table name. Useful for routing to different OpenSearch indices when multiple tables are configured. |
| `iceberg_snapshot_id` | `4859623548962345` | The snapshot ID that produced this change. Useful for debugging and auditing. |
| `document_id` | `us-electronics` | Generated from the configured `identifier_columns` by concatenating their values. Used as the OpenSearch document `_id` to enable idempotent writes. Required for CDC use cases with UPDATE/DELETE (see Section 5.4 for details). Not set when `identifier_columns` is not configured. |
| `bulk_action` | `INDEX` or `DELETE` | The OpenSearch bulk action to use. The sink can reference this via `${getMetadata("bulk_action")}`. |
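As an illustration, deriving `document_id` from `identifier_columns` could look like the sketch below. The `-` separator matches the `us-electronics` example above; the empty-string handling of null values is an assumption, not a finalized format.

```java
import java.util.List;
import java.util.stream.Collectors;

// Sketch: build a deterministic document_id by concatenating the values
// of the configured identifier columns. Separator and null handling are
// illustrative assumptions.
public class DocumentIdGenerator {
    static String documentId(List<String> identifierValues) {
        return identifierValues.stream()
                .map(v -> v == null ? "" : v)   // assumed null handling
                .collect(Collectors.joining("-"));
    }

    public static void main(String[] args) {
        // identifier_columns: ["region", "category"], row: region=us, category=electronics
        System.out.println(documentId(List.of("us", "electronics"))); // prints us-electronics
    }
}
```

Because the same row always produces the same `_id`, re-processing a partition overwrites documents instead of duplicating them (see Section 5.4).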
Iceberg column types are mapped to OpenSearch field types as follows. The mapping is fixed and designed to work with OpenSearch's dynamic mapping.
| Iceberg type | OpenSearch type (dynamic mapping) | Notes |
|---|---|---|
| `boolean` | `boolean` | |
| `int`, `long` | `long` | |
| `float`, `double` | `float` | |
| `string`, `uuid` | `text` + `keyword` | |
| `date` | `date` | |
| `timestamp` / `timestamptz` | `date` | ISO 8601 format |
| `decimal` | `keyword` | String output preserves full precision |
| `binary` / `fixed` | `text` | Base64 encoded; a template is needed for the `binary` type |
| `list`, `map`, `struct` | auto (array / object) | |
For precise control over OpenSearch field types, users can provide an index template via the OpenSearch sink's existing template_file / template_content configuration. For type conversions (e.g., decimal string to number), the convert_type processor can be used downstream.
4.5 Schema evolution
Iceberg tracks columns by field ID rather than by name, and its reader handles schema evolution transparently. When the plugin reads data files written under an older schema using the current table schema as projection, the following cases are handled automatically.
- Column added: Old data files return null for the new column.
- Column removed: The removed column is simply not read.
- Column renamed: Old data files still contain the same field ID, so the reader returns the value under the current column name.
- Type widened (e.g., int to long): The reader casts automatically.
The plugin always reads using the current table schema, so no special handling is needed in the plugin code itself.
One caveat applies to column renames. Documents already ingested into OpenSearch under the old column name will retain the old field name. New documents will use the new name. This means the same logical column may appear as two separate fields in OpenSearch. This is an inherent characteristic of CDC-based synchronization, where previously ingested documents are not retroactively updated. Users can address this through OpenSearch reindexing or field aliases if needed.
5. Processing flow
5.1 Basic flow
Consider the simplest case: an INSERT-only snapshot on a partitioned table. The leader detects a new snapshot, runs IncrementalChangelogScan, and gets back a list of AddedRowsScanTask objects, each pointing to a newly added data file. Since there are no deleted files, there is no carryover to worry about. The leader registers each task as an independent partition, and workers pick them up in parallel.
Snapshot S2 adds 3 new data files (INSERT only):
```
LeaderScheduler:
  scan.planFiles() returns:
    AddedRowsScanTask(file_C, partition='US')
    AddedRowsScanTask(file_D, partition='EU')
    AddedRowsScanTask(file_E, partition='AP')
  Registers 3 partitions -> workers process them in parallel

ChangelogWorker (processing file_C):
  1. Read all rows from file_C
  2. For each row, create an event with the row's columns as fields
  3. Set metadata: iceberg_operation=INSERT, bulk_action=INDEX, document_id=<from identifier columns>
  4. Write events to pipeline buffer
  5. Mark partition complete
```
Each event flows through the pipeline to the OpenSearch sink, which uses bulk_action and document_id from the metadata to construct the appropriate bulk request.
5.2 Carryover removal
5.2.1 The problem
When a snapshot contains UPDATE or DELETE operations on a CoW table, the situation becomes more complex. CoW rewrites entire data files, so the changelog scan reports unchanged rows as both DELETE and INSERT.
Example: updating row id=2 in a 3-row data file.
Changelog scan raw output:
```
(id=1, name='Alice', op=DELETE)  <- from deleted file A  (carryover)
(id=2, name='Bob',   op=DELETE)  <- from deleted file A  (actual change)
(id=3, name='Carol', op=DELETE)  <- from deleted file A  (carryover)
(id=1, name='Alice', op=INSERT)  <- from added file A'   (carryover)
(id=2, name='Bobby', op=INSERT)  <- from added file A'   (actual change)
(id=3, name='Carol', op=INSERT)  <- from added file A'   (carryover)
```
If these events were sent to OpenSearch without carryover removal, the DELETE for id=1 and id=3 would remove documents that should still exist.
5.2.2 Removal algorithm
The plugin removes carryover using full-column equality, matching the approach used by Spark's RemoveCarryoverIterator. No primary key is needed because carryover rows are by definition identical across all columns.
- Sort all rows by all data columns, with DELETE ordered before INSERT for equal rows
- Walk the sorted list. If adjacent DELETE and INSERT have identical data columns, skip both (they are carryover)
After sorting and removal:
```
(id=2, name='Bob',   op=DELETE)  <- actual change (UPDATE before image)
(id=2, name='Bobby', op=INSERT)  <- actual change (UPDATE after image)
```
After carryover removal, the remaining rows are converted to events and written to the pipeline buffer. When identifier_columns is configured, a DELETE and INSERT pair sharing the same document_id represents an UPDATE. In this case, only the INSERT is emitted as an INDEX event, because OpenSearch INDEX is an upsert that overwrites the existing document. The DELETE is unnecessary. Pure deletes (a DELETE with no matching INSERT for the same document_id) are emitted as DELETE events. This ensures that each document_id produces at most one event per snapshot, which is safe for parallel consumption by multiple ProcessWorker threads in the pipeline.
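The sort-and-walk removal described above can be sketched as follows. `Row` is a simplified model (column values as strings plus an op flag), and the null-byte join used as a sort key is an illustrative assumption; a real implementation would compare typed column values directly.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Sketch of carryover removal by full-column equality, mirroring the
// approach of Spark's RemoveCarryoverIterator. The Row model and sort
// key are simplified illustrations.
public class CarryoverRemover {
    record Row(List<String> columns, String op) {} // op: "DELETE" or "INSERT"

    static List<Row> removeCarryover(List<Row> rows) {
        List<Row> sorted = new ArrayList<>(rows);
        // Sort by all data columns; for equal rows, DELETE sorts before INSERT.
        sorted.sort(Comparator
                .comparing((Row r) -> String.join("\u0000", r.columns()))
                .thenComparing(r -> r.op().equals("DELETE") ? 0 : 1));
        List<Row> result = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i++) {
            Row current = sorted.get(i);
            Row next = i + 1 < sorted.size() ? sorted.get(i + 1) : null;
            // Adjacent DELETE/INSERT with identical columns -> carryover, skip both.
            if (next != null && current.op().equals("DELETE") && next.op().equals("INSERT")
                    && current.columns().equals(next.columns())) {
                i++; // also skip the matching INSERT
                continue;
            }
            result.add(current);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Row> raw = List.of(
                new Row(List.of("1", "Alice"), "DELETE"),
                new Row(List.of("2", "Bob"), "DELETE"),
                new Row(List.of("3", "Carol"), "DELETE"),
                new Row(List.of("1", "Alice"), "INSERT"),
                new Row(List.of("2", "Bobby"), "INSERT"),
                new Row(List.of("3", "Carol"), "INSERT"));
        // Only the real change survives: DELETE(2, Bob) and INSERT(2, Bobby).
        System.out.println(removeCarryover(raw).size()); // prints 2
    }
}
```

No primary key is involved anywhere in this step, which is what allows it to run before `identifier_columns`-based UPDATE pairing.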
5.2.3 Distributed processing
Carryover removal requires that the DELETE (from the old file) and the INSERT (from the new file) of the same row are processed on the same node. If they end up on different nodes, neither node can detect the carryover pair.
The plugin solves this by grouping related tasks at the planning stage, so that a DELETED file and its corresponding ADDED file are always in the same partition. This is done entirely by the LeaderScheduler using metadata, without reading any data files and without any network shuffle between workers.
Grouping is done in three stages.
Stage 1: Snapshot isolation. The leader processes one snapshot at a time. All partitions belonging to snapshot N must be completed before the leader registers partitions for snapshot N+1. This ensures that if the same row is modified in consecutive snapshots, the changes are applied in order.
Stage 2: Iceberg partition isolation. Within a snapshot, tasks are grouped by Iceberg partition value. CoW rewrites are confined to a single Iceberg partition, so carryover pairs always share the same partition value. Tasks from different Iceberg partitions can be processed in parallel without risk.
Stage 3: Bounds-based pairing. Within an Iceberg partition, the plugin attempts to pair each DELETED file with its corresponding ADDED file using column statistics (lower_bounds / upper_bounds). CoW rewrites produce a new file that is nearly identical to the old file, so their per-column min/max bounds typically match. The IncrementalChangelogScan API supports includeColumnStats() which provides these statistics from metadata without reading data files.
Pairing algorithm within a single snapshot + Iceberg partition:
```
1. List DELETED tasks and ADDED tasks
2. Find DELETED-ADDED pairs where bounds match 1:1 uniquely
3. Group by result:
   Paired DELETED + ADDED                      -> individual group (distributable)
   Unpaired DELETED only (no remaining ADDED)  -> individual group (full-file delete, no carryover)
   Unpaired ADDED only (no remaining DELETED)  -> individual group (pure insert, no carryover)
   Unpaired DELETED + ADDED both remain        -> single fallback group (carryover possible)
```
Pairing is a distribution optimization only. If pairing fails (e.g., bounds changed due to an UPDATE affecting the min/max, or multiple files share the same bounds), all remaining tasks fall into a single fallback group where carryover removal works correctly. The algorithm never places a DELETED file and its corresponding ADDED file into separate groups.
When pairing succeeds, or when a snapshot contains only INSERTs, work is distributed at the file level across all available workers. When pairing fails, the fallback is Iceberg partition level for partitioned tables, or snapshot level for unpartitioned tables.
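The pairing step can be sketched as follows, assuming bounds are already extracted into a comparable form. `FileBounds` and the `min..max` string encoding are illustrative stand-ins; a real implementation would compare the `lower_bounds` / `upper_bounds` maps from each task's column statistics.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of Stage 3 pairing within one snapshot + Iceberg partition.
// Bounds are modeled as a column-name -> "min..max" map; all names here
// are illustrative, not the plugin's actual API.
public class BoundsPairing {
    record FileBounds(String path, Map<String, String> bounds) {}

    static Map<Map<String, String>, List<FileBounds>> indexByBounds(List<FileBounds> files) {
        Map<Map<String, String>, List<FileBounds>> index = new HashMap<>();
        for (FileBounds f : files) {
            index.computeIfAbsent(f.bounds(), k -> new ArrayList<>()).add(f);
        }
        return index;
    }

    // Returns groups of file paths; each group becomes one coordinator partition.
    static List<List<String>> group(List<FileBounds> deleted, List<FileBounds> added) {
        Map<Map<String, String>, List<FileBounds>> delIndex = indexByBounds(deleted);
        Map<Map<String, String>, List<FileBounds>> addIndex = indexByBounds(added);
        List<List<String>> groups = new ArrayList<>();
        List<String> unpairedDeleted = new ArrayList<>();
        for (Map.Entry<Map<String, String>, List<FileBounds>> e : delIndex.entrySet()) {
            List<FileBounds> adds = addIndex.get(e.getKey());
            if (e.getValue().size() == 1 && adds != null && adds.size() == 1) {
                // Unique 1:1 bounds match: a DELETED file and its CoW rewrite.
                groups.add(List.of(e.getValue().get(0).path(), adds.get(0).path()));
                addIndex.remove(e.getKey());
            } else {
                e.getValue().forEach(f -> unpairedDeleted.add(f.path()));
            }
        }
        List<String> unpairedAdded = new ArrayList<>();
        addIndex.values().forEach(fs -> fs.forEach(f -> unpairedAdded.add(f.path())));
        if (unpairedDeleted.isEmpty()) {
            unpairedAdded.forEach(p -> groups.add(List.of(p))); // pure inserts
        } else if (unpairedAdded.isEmpty()) {
            unpairedDeleted.forEach(p -> groups.add(List.of(p))); // full-file deletes
        } else {
            // Both remain: one fallback group keeps possible carryover pairs together.
            List<String> fallback = new ArrayList<>(unpairedDeleted);
            fallback.addAll(unpairedAdded);
            groups.add(fallback);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<FileBounds> deleted = List.of(new FileBounds("file_A", Map.of("id", "1..3")));
        List<FileBounds> added = List.of(
                new FileBounds("file_A_rewrite", Map.of("id", "1..3")),
                new FileBounds("file_C", Map.of("id", "10..20")));
        // file_A pairs with its rewrite; file_C becomes its own insert group.
        System.out.println(group(deleted, added)); // 2 groups
    }
}
```

Note that the conservative fallback is what preserves correctness: any file that cannot be paired unambiguously stays with every other unpaired file, so a carryover pair can never be split across groups.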
5.2.4 Future improvement: Source-layer shuffle via PeerForwarder extension
The grouping strategy described above distributes work at three levels: snapshot isolation, Iceberg partition isolation, and bounds-based file pairing. The first two levels are reliable and cover most cases well. Bounds-based pairing is a best-effort optimization for the remaining case where multiple DELETED and ADDED files exist within the same Iceberg partition. When pairing fails (e.g., an UPDATE changes a column's min/max bounds), these files fall back to a single group processed on one node.
A more scalable approach for this fallback case would be to perform a shuffle similar to Spark's repartition: distribute rows across workers by a hash of all data columns, so that carryover pairs (identical DELETE and INSERT rows) land on the same worker regardless of which data file they came from. Data Prepper's PeerForwarder infrastructure provides useful building blocks for this (consistent hashing via HashRing, node discovery, Armeria HTTP transport). However, the PeerForwarder's orchestration layer is tightly coupled to the Processor execution model (synchronous send/receive within a batch processing cycle, ReceiveBuffer integration with the Pipeline, pluginId-based routing), so Source-layer shuffling would need its own send/receive flow built on top of these lower-level components, or the PeerForwarder architecture would need to be generalized to support non-Processor use cases.
RFC #700 (Core Peer Forwarding) notes that the initial implementation targets Processor plugins only, and suggests opening a new issue to expand the functionality to Source or Sink plugins. No existing Source plugin currently performs inter-node data exchange; all rely on EnhancedSourceCoordinator for work distribution with each worker reading directly from storage. Iceberg CDC's carryover removal is the first Source-layer use case that requires matching rows across different data files, making it a natural candidate for this extension.
This is not implemented in the initial version. I plan to open a separate issue proposing the PeerForwarder extension for Source plugins, with Iceberg CDC as the first use case.
5.3 Compaction handling
Iceberg tables are periodically compacted to merge small data files into larger ones or to apply accumulated delete files. Compaction creates a snapshot with the REPLACE operation type, which reorganizes data files without changing the logical contents of the table.
IncrementalChangelogScan skips REPLACE snapshots automatically, since they do not represent logical data changes. The plugin does not need to handle compaction as a special case.
5.4 Failure recovery and data guarantees
The plugin supports Data Prepper's end-to-end acknowledgements. When enabled, the plugin does not advance the checkpoint until the OpenSearch sink confirms that all events in a partition have been written successfully. If the sink reports a failure, the plugin releases the partition back to the coordinator, and another node re-processes it from the beginning.
With identifier_columns configured (recommended for CDC):
All operations are idempotent when identifier_columns is configured, because each event carries a deterministic document_id derived from the configured columns.
| Scenario | Behavior |
|---|---|
| Partition re-processed after sink failure | Same `_id` values are written again. INDEX overwrites the existing document. DELETE on a non-existent document is a no-op. Final state is correct. |
| Data Prepper node crashes mid-partition | Partition lease times out. Another node acquires and re-processes it. Same idempotency applies. |
| Sink partially succeeds before failure | Some documents are written twice. Since `_id` is deterministic, duplicates overwrite rather than accumulate. |
The guarantee is at-least-once delivery with effectively-once semantics: the same event may be written to OpenSearch more than once, but the final state of each document is correct.
Without identifier_columns:
When identifier_columns is not configured, OpenSearch auto-generates _id for each document. This has several consequences:
- INSERT duplication: If a partition is re-processed, the same rows are inserted again with new auto-generated `_id` values, producing duplicate documents.
- DELETE not possible: DELETE events cannot target a specific document because there is no known `_id` to delete.
- UPDATE not possible: UPDATE requires DELETE of the old document followed by INDEX of the new document. Without a deterministic `_id`, the old document cannot be identified.
This mode is only suitable for append-only tables (INSERT only) where some duplication is acceptable, such as log or event data. The plugin logs a warning at startup when identifier_columns is not configured.
A content-based hash (e.g., SHA-256 of all column values) was evaluated as a potential fallback for generating deterministic _id without identifier_columns. While this works correctly when the table schema is static, it fails when the schema evolves. For example, if a column is added between the initial INSERT and a subsequent UPDATE, the INSERT was hashed with N columns but the UPDATE's DELETE event is hashed with N+1 columns (the new column being null), producing a different _id. The old document remains in OpenSearch and a new document is created, resulting in duplicates. Since Iceberg's schema evolution is a core feature that users are expected to use, this approach was not adopted. Iceberg v3's Row Lineage feature (_row_id) is expected to resolve this limitation by providing a stable, schema-independent row identifier (see Section 6).
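The failure mode above can be demonstrated in a few lines: hashing the same logical row before and after a column is added yields different identifiers. The encoding below (key=value concatenation over an insertion-ordered map) is a simplified stand-in for whatever canonical encoding a real implementation would use.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustration: a content-based hash is not stable under schema evolution,
// because adding a column changes the hash input for the same logical row.
// The encoding scheme here is a simplified assumption.
public class ContentHashProblem {
    static String contentId(Map<String, String> row) {
        try {
            StringBuilder sb = new StringBuilder();
            row.forEach((k, v) -> sb.append(k).append('=').append(v).append(';'));
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(sb.toString().getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e); // SHA-256 is always available
        }
    }

    public static void main(String[] args) {
        // Row as originally inserted (schema: id, name)
        Map<String, String> insertTime = new LinkedHashMap<>();
        insertTime.put("id", "2");
        insertTime.put("name", "Bob");
        // Same logical row read after a column was added (schema: id, name, email)
        Map<String, String> updateTime = new LinkedHashMap<>(insertTime);
        updateTime.put("email", null);
        // Different hashes: the DELETE cannot target the original document.
        System.out.println(contentId(insertTime).equals(contentId(updateTime))); // prints false
    }
}
```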
Ordering guarantees:
- Snapshots are processed sequentially. All partitions for snapshot N must complete before snapshot N+1 begins.
- Within a snapshot, partitions are processed in parallel, but the grouping algorithm ensures that events for the same `document_id` are always in the same partition.
- Within a partition, when `identifier_columns` is configured, UPDATE pairs (DELETE + INSERT with the same `document_id`) are merged into a single INDEX event. This eliminates ordering dependencies between events for the same document within a partition.
6. Limitations and future work
Data guarantees
With identifier_columns configured and end-to-end acknowledgements enabled, the plugin provides at-least-once delivery with idempotent writes. See Section 5.4 for detailed failure scenarios.
Current limitations
- Only Copy-on-Write (CoW) tables are supported. The plugin rejects Merge-on-Read (MoR) tables at startup by checking the `write.delete.mode` / `write.update.mode` table properties. If `UnsupportedOperationException` occurs at runtime (e.g., table properties changed), the pipeline stops and the checkpoint is not updated, allowing safe restart after the issue is resolved.
- Unpartitioned CoW tables with large UPDATE/DELETE operations may concentrate work on a single node when bounds-based pairing fails.
- `identifier_columns` must be configured by the user for CDC correctness. Iceberg's `identifier-field-ids` table property is optional and rarely set, so the plugin does not rely on it.
Future work
- MoR support: Iceberg's `IncrementalChangelogScan` currently throws `UnsupportedOperationException` for MoR tables. Support is being actively worked on in apache/iceberg#14264. Once merged, MoR row deltas will not produce carryover, so each task can be distributed independently.
- Iceberg v3 Row Lineage: Version 3 tables assign a globally unique `_row_id` to every row. This enables three improvements:
  - `_row_id` can serve as `document_id`, eliminating the need for user-configured `identifier_columns`. Tables without a natural primary key (e.g., log data) would get idempotent writes automatically.
  - Carryover removal can use `_row_id` as a hash key for O(n) matching instead of full-column sort O(n log n).
  - Tables using Deletion Vectors (v3 MoR) do not produce carryover, so the removal step can be skipped entirely.

  Version 2 tables would continue to use `identifier_columns` as a fallback.
7. References
[RFC] Iceberg CDC Source Plugin
This RFC proposes a new
icebergsource plugin for Data Prepper that captures row-level changes from Apache Iceberg tables and ingests them into OpenSearch as CDC events.1. Context
Apache Iceberg is an open table format widely used for lakehouse architectures on storage like S3, GCS, and HDFS. Iceberg tables are written by engines like Spark, Flink, and Trino, and are commonly used as the central store for analytical data.
Iceberg tracks all table changes as immutable snapshots. Each write operation (insert, update, delete) creates a new snapshot that records which data files are added or removed. Iceberg provides a core Java API called
IncrementalChangelogScanthat computes the row-level diff between snapshots, returning which rows were added, deleted, or updated. This API is part of the Iceberg core library and can be called from any Java process, including Data Prepper, without depending on a distributed compute engine like Spark or Flink.2. Motivation
A common pattern in data platforms is to use an Iceberg-based lakehouse as the single source of truth, and keep OpenSearch synchronized for search and dashboarding over a subset of that data. Iceberg tables may hold raw ingested data, but they also frequently hold curated, transformed, or aggregated data produced through multi-stage processing (e.g., medallion architecture). In either case, the Iceberg table represents the authoritative state that needs to be reflected in OpenSearch.
For example, an e-commerce platform might run Spark jobs that join order streams with product catalogs and write enriched results to a gold-layer Iceberg table. Syncing this table to OpenSearch enables full-text search and real-time dashboards over the processed data.
One approach is dual-writing from the data source to both Iceberg and OpenSearch simultaneously. However, this has several drawbacks. It requires every writer (Spark jobs, Flink pipelines, ETL processes) to know about and write to OpenSearch, coupling the search infrastructure to all upstream producers. It also cannot guarantee consistency between the two stores when writes to one succeed and the other fails. Furthermore, it cannot handle derived or transformed data that only exists after processing in the lakehouse, such as aggregations, joins across tables, or ML feature outputs written back to Iceberg.
The alternative is to treat Iceberg as the source of truth and propagate changes downstream. Today, this requires building custom Spark or Flink jobs to read Iceberg's changelog, transform the data, and write to OpenSearch. This adds operational complexity (maintaining a compute cluster, implementing changelog reading and transformation logic, handling failures and checkpointing) for what is essentially a data movement task.
There is currently no way to capture changes from Iceberg tables and ingest them into OpenSearch through Data Prepper.
3. Proposal
A new
icebergsource plugin for Data Prepper that detects changes in Iceberg tables and emits them as CDC events (INSERT, DELETE, UPDATE) into the pipeline.The plugin supports two modes.
disable_export: true): Skips the full scan and starts capturing changes from the current snapshot onward. Useful when OpenSearch already has the data or when only future changes are needed.Example pipeline configuration:
The catalog configuration is passed directly to Iceberg's
CatalogUtil.loadCatalog(). Any catalog implementation that Iceberg supports (REST, Glue, Hive, JDBC, Nessie, etc.) can be used by specifying the appropriatetypeand properties.Supported Iceberg versions
The plugin supports Iceberg format version 2 and 3 tables.
The initial implementation targets version 2 (CoW) tables. Version 3 features (Row Lineage, Deletion Vectors) are planned as future enhancements (see Section 6).
Processing overview
The plugin works as follows:

1. The leader detects a new snapshot and runs `IncrementalChangelogScan` to get the list of changed data files.
2. The leader groups related file scan tasks and registers each group as a partition in the `EnhancedSourceCoordinator`.
3. Workers acquire partitions, read the referenced data files, convert the rows to events, and write them to the pipeline buffer.

For initial load, step 1 is replaced by a full `TableScan`. The resulting file scan tasks are grouped into batches (e.g., by total file size or count) and registered as partitions. After the full scan completes, the plugin transitions to the CDC flow above, starting from the snapshot at which the scan was initiated.

4. Architecture
4.1 Iceberg snapshots and data files
An Iceberg table stores its actual data in data files (Parquet, ORC, or Avro) on object storage such as S3. Separately from the data files, Iceberg maintains a metadata layer that tracks which data files belong to the current table state. This separation of data and metadata is a key design principle of Iceberg.
Every write operation creates a new immutable snapshot. A snapshot does not copy or move data files. Instead, it records a new set of manifest files that reference the data files making up the table at that point in time. Each snapshot knows which data files were added and which were removed compared to the previous snapshot.
Tables can optionally be partitioned (e.g., by date or region). Partitioning affects how data files are organized, and is relevant to how this plugin distributes work.
Iceberg supports two strategies for handling updates and deletes:

- Copy-on-Write (CoW): the affected data files are rewritten in full; the new snapshot removes the old files and adds the rewritten ones.
- Merge-on-Read (MoR): changes are recorded in separate delete files, which are merged with the data files at read time.

The following example illustrates CoW behavior. Suppose snapshot S2 contains `file_A` and a single row in it is updated. The resulting snapshot S3 removes `file_A` and adds `file_A'`, where `file_A'` contains all rows from `file_A` with the one updated row modified. The unchanged rows are carried over as-is. This has implications for change detection, discussed below.
4.2 IncrementalChangelogScan
Iceberg provides `IncrementalChangelogScan`, a core Java API that computes the row-level diff between two snapshots. It returns a list of `ChangelogScanTask` objects, each pointing to a data file and indicating whether the file was added or removed.

For the S2 -> S3 transition above, the scan returns:

- `DeletedDataFileScanTask(file_A)`: all rows in the removed file, interpreted as DELETEs
- `AddedRowsScanTask(file_A')`: all rows in the new file, interpreted as INSERTs

The actual change was a single row update, but because CoW rewrites the entire file, unchanged rows appear as both DELETE and INSERT. These redundant pairs are called "carryover" and must be removed to correctly reflect the actual changes in downstream systems.
This API is part of `iceberg-core` and can be called from any Java process. It only reads metadata (manifest files) during planning. The actual data file reading happens separately when processing each task.

For reference, Apache Spark provides a higher-level interface to the same functionality through the `create_changelog_view` stored procedure. Under the hood, Spark uses the same `IncrementalChangelogScan` API, then applies carryover removal via `RemoveCarryoverIterator`, using a repartition (shuffle) to group rows by their data values.

4.3 Plugin overview
Data Prepper can run as a cluster of multiple nodes. To distribute work across nodes, source plugins use the `EnhancedSourceCoordinator`, which provides a shared work queue backed by a persistent store. The coordinator manages "partitions" (units of work) that any node can acquire, process, and mark as complete.

This plugin uses a leader-worker pattern.

LeaderScheduler runs on a single node (elected via the coordinator's leader lease). It is responsible for detecting new Iceberg snapshots and planning the work. When a new snapshot is found, the leader runs `IncrementalChangelogScan.planFiles()` to get the list of changed data files, groups related tasks together, and registers each group as a partition in the coordinator. The leader does not read any data files itself.

ChangelogWorker runs on every node (including the leader node). Each worker repeatedly acquires an available partition from the coordinator, reads the data files referenced by that partition, converts the rows to Data Prepper events, writes them to the pipeline buffer, and marks the partition as complete. Multiple workers process different partitions in parallel.

The catalog configuration in the pipeline YAML is passed directly to Iceberg's `CatalogUtil.loadCatalog()`. Any catalog implementation that Iceberg supports (REST, Glue, Hive, JDBC, Nessie, etc.) can be used.

4.4 Event format and type mapping
Each row read from Iceberg data files is converted to a Data Prepper event (JSON document) with the row's columns as top-level fields. The plugin adds metadata attributes that downstream processors and the OpenSearch sink can reference.
| Metadata attribute | Example values |
| --- | --- |
| `iceberg_operation` | `INSERT`, `DELETE`, `UPDATE_BEFORE`, `UPDATE_AFTER` |
| `iceberg_table_name` | `my_database.my_table` |
| `iceberg_snapshot_id` | `4859623548962345` |
| `bulk_action` | `INDEX` or `DELETE` |

Downstream components can reference these attributes with expressions such as `${getMetadata("bulk_action")}`.

Iceberg column types are mapped to OpenSearch field types as follows. The mapping is fixed and designed to work with OpenSearch's dynamic mapping.
For precise control over OpenSearch field types, users can provide an index template via the OpenSearch sink's existing `template_file` / `template_content` configuration. For type conversions (e.g., decimal string to number), the `convert_type` processor can be used downstream.

4.5 Schema evolution
Iceberg tracks columns by field ID rather than by name, and its reader handles schema evolution transparently. When the plugin reads data files written under an older schema using the current table schema as projection, cases such as added columns (read as null from old files), dropped columns (skipped), renamed columns (resolved by field ID), and type promotion (e.g., int to long) are handled automatically.
The plugin always reads using the current table schema, so no special handling is needed in the plugin code itself.
One caveat applies to column renames. Documents already ingested into OpenSearch under the old column name will retain the old field name. New documents will use the new name. This means the same logical column may appear as two separate fields in OpenSearch. This is an inherent characteristic of CDC-based synchronization, where previously ingested documents are not retroactively updated. Users can address this through OpenSearch reindexing or field aliases if needed.
5. Processing flow
5.1 Basic flow
Consider the simplest case: an INSERT-only snapshot on a partitioned table. The leader detects a new snapshot, runs `IncrementalChangelogScan`, and gets back a list of `AddedRowsScanTask` objects, each pointing to a newly added data file. Since there are no deleted files, there is no carryover to worry about. The leader registers each task as an independent partition, and workers pick them up in parallel.

Each event flows through the pipeline to the OpenSearch sink, which uses `bulk_action` and `document_id` from the metadata to construct the appropriate bulk request.

5.2 Carryover removal
5.2.1 The problem
When a snapshot contains UPDATE or DELETE operations on a CoW table, the situation becomes more complex. CoW rewrites entire data files, so the changelog scan reports unchanged rows as both DELETE and INSERT.
Example: updating row id=2 in a 3-row data file. The changelog scan reports a DELETE for each of id=1, id=2, and id=3 (all rows of the removed file) and an INSERT for each of id=1, id=2 (with the new values), and id=3 (all rows of the new file). Only the id=2 pair reflects a real change; the id=1 and id=3 pairs are carryover.

If these events were sent to OpenSearch without carryover removal, the DELETE for id=1 and id=3 would remove documents that should still exist.
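One way to cancel such carryover pairs is multiset matching on full row values. The following is a hypothetical, simplified sketch (class and method names are invented; this is not the plugin's actual implementation):

```java
import java.util.*;

public class CarryoverRemover {

    // Rows are represented as ordered lists of column values. A row emitted as
    // both DELETE (from the removed file) and INSERT (from the added file) with
    // identical values is carryover and is dropped from the output.
    public static List<Map.Entry<String, List<Object>>> removeCarryover(
            List<List<Object>> deletes, List<List<Object>> inserts) {
        // Count identical rows on the DELETE side (a multiset).
        Map<List<Object>, Integer> pendingDeletes = new HashMap<>();
        for (List<Object> row : deletes) {
            pendingDeletes.merge(row, 1, Integer::sum);
        }
        List<Map.Entry<String, List<Object>>> out = new ArrayList<>();
        for (List<Object> row : inserts) {
            Integer n = pendingDeletes.get(row);
            if (n != null && n > 0) {
                pendingDeletes.put(row, n - 1); // carryover pair: drop both sides
            } else {
                out.add(Map.entry("INSERT", row)); // genuinely new or changed row
            }
        }
        // DELETEs with no matching INSERT are real deletes.
        pendingDeletes.forEach((row, n) -> {
            for (int i = 0; i < n; i++) {
                out.add(Map.entry("DELETE", row));
            }
        });
        return out;
    }
}
```

For the id=2 example above, the id=1 and id=3 pairs cancel, leaving only one DELETE (the old id=2 row) and one INSERT (the new id=2 row).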
5.2.2 Removal algorithm
The plugin removes carryover using full-column equality, matching the approach used by Spark's `RemoveCarryoverIterator`. No primary key is needed because carryover rows are by definition identical across all columns.

After carryover removal, the remaining rows are converted to events and written to the pipeline buffer. When `identifier_columns` is configured, a DELETE and INSERT pair sharing the same `document_id` represents an UPDATE. In this case, only the INSERT is emitted, as an INDEX event, because OpenSearch INDEX is an upsert that overwrites the existing document; the DELETE is unnecessary. Pure deletes (a DELETE with no matching INSERT for the same `document_id`) are emitted as DELETE events. This ensures that each `document_id` produces at most one event per snapshot, which is safe for parallel consumption by multiple ProcessWorker threads in the pipeline.

5.2.3 Distributed processing
Carryover removal requires that the DELETE (from the old file) and the INSERT (from the new file) of the same row are processed on the same node. If they end up on different nodes, neither node can detect the carryover pair.
The plugin solves this by grouping related tasks at the planning stage, so that a DELETED file and its corresponding ADDED file are always in the same partition. This is done entirely by the LeaderScheduler using metadata, without reading any data files and without any network shuffle between workers.
Grouping is done in three stages.
Stage 1: Snapshot isolation. The leader processes one snapshot at a time. All partitions belonging to snapshot N must be completed before the leader registers partitions for snapshot N+1. This ensures that if the same row is modified in consecutive snapshots, the changes are applied in order.
Stage 2: Iceberg partition isolation. Within a snapshot, tasks are grouped by Iceberg partition value. CoW rewrites are confined to a single Iceberg partition, so carryover pairs always share the same partition value. Tasks from different Iceberg partitions can be processed in parallel without risk.
Stage 3: Bounds-based pairing. Within an Iceberg partition, the plugin attempts to pair each DELETED file with its corresponding ADDED file using column statistics (lower_bounds / upper_bounds). CoW rewrites produce a new file that is nearly identical to the old file, so their per-column min/max bounds typically match. The `IncrementalChangelogScan` API supports `includeColumnStats()`, which provides these statistics from metadata without reading data files.

Pairing algorithm within a single snapshot + Iceberg partition:
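As an illustrative sketch of that pairing idea (all types and names here are invented, and the real algorithm's exact steps are not reproduced): a DELETED file and an ADDED file are paired only when they are the unique files sharing identical per-column bounds; everything else falls into a single fallback group, so a true pair is never split.

```java
import java.util.*;

public class BoundsPairing {

    // Minimal stand-in for a changelog file task: a file path plus per-column
    // lower/upper bounds from Iceberg column statistics (assumed already
    // decoded to plain comparable values).
    public record FileTask(String path, Map<Integer, Object> lowerBounds,
                           Map<Integer, Object> upperBounds) {}

    // A group of tasks processed together on one worker.
    public record Group(List<FileTask> deleted, List<FileTask> added) {}

    public static List<Group> pair(List<FileTask> deleted, List<FileTask> added) {
        Map<List<Object>, List<FileTask>> delByBounds = groupByBounds(deleted);
        Map<List<Object>, List<FileTask>> addByBounds = groupByBounds(added);
        List<Group> groups = new ArrayList<>();
        Group fallback = new Group(new ArrayList<>(), new ArrayList<>());
        for (Map.Entry<List<Object>, List<FileTask>> e : delByBounds.entrySet()) {
            List<FileTask> dels = e.getValue();
            List<FileTask> adds = addByBounds.remove(e.getKey());
            if (dels.size() == 1 && adds != null && adds.size() == 1) {
                groups.add(new Group(dels, adds)); // unambiguous pair
            } else {
                // Ambiguous or missing match: keep potential pairs together.
                fallback.deleted().addAll(dels);
                if (adds != null) fallback.added().addAll(adds);
            }
        }
        // Remaining ADDED files (e.g., pure inserts) also go to the fallback
        // group in this simplified sketch.
        addByBounds.values().forEach(fallback.added()::addAll);
        if (!fallback.deleted().isEmpty() || !fallback.added().isEmpty()) {
            groups.add(fallback);
        }
        return groups;
    }

    private static Map<List<Object>, List<FileTask>> groupByBounds(List<FileTask> tasks) {
        Map<List<Object>, List<FileTask>> grouped = new HashMap<>();
        for (FileTask t : tasks) {
            grouped.computeIfAbsent(List.of(t.lowerBounds(), t.upperBounds()),
                    k -> new ArrayList<>()).add(t);
        }
        return grouped;
    }
}
```

Note that in the actual design, unmatched pure-insert files would be distributed per file rather than lumped into the fallback group; the sketch simplifies this.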
Pairing is a distribution optimization only. If pairing fails (e.g., bounds changed due to an UPDATE affecting the min/max, or multiple files share the same bounds), all remaining tasks fall into a single fallback group where carryover removal works correctly. The algorithm never places a DELETED file and its corresponding ADDED file into separate groups.
When pairing succeeds, or when a snapshot contains only INSERTs, work is distributed at the file level across all available workers. When pairing fails, the fallback is Iceberg partition level for partitioned tables, or snapshot level for unpartitioned tables.
5.2.4 Future improvement: Source-layer shuffle via PeerForwarder extension
The grouping strategy described above distributes work at three levels: snapshot isolation, Iceberg partition isolation, and bounds-based file pairing. The first two levels are reliable and cover most cases well. Bounds-based pairing is a best-effort optimization for the remaining case where multiple DELETED and ADDED files exist within the same Iceberg partition. When pairing fails (e.g., an UPDATE changes a column's min/max bounds), these files fall back to a single group processed on one node.
A more scalable approach for this fallback case would be to perform a shuffle similar to Spark's repartition: distribute rows across workers by a hash of all data columns, so that carryover pairs (identical DELETE and INSERT rows) land on the same worker regardless of which data file they came from. Data Prepper's PeerForwarder infrastructure provides useful building blocks for this (consistent hashing via HashRing, node discovery, Armeria HTTP transport). However, the PeerForwarder's orchestration layer is tightly coupled to the Processor execution model (synchronous send/receive within a batch processing cycle, ReceiveBuffer integration with the Pipeline, pluginId-based routing), so Source-layer shuffling would need its own send/receive flow built on top of these lower-level components, or the PeerForwarder architecture would need to be generalized to support non-Processor use cases.
RFC #700 (Core Peer Forwarding) notes that the initial implementation targets Processor plugins only, and suggests opening a new issue to expand the functionality to Source or Sink plugins. No existing Source plugin currently performs inter-node data exchange; all rely on the `EnhancedSourceCoordinator` for work distribution, with each worker reading directly from storage. Iceberg CDC's carryover removal is the first Source-layer use case that requires matching rows across different data files, making it a natural candidate for this extension.

This is not implemented in the initial version. I plan to open a separate issue proposing the PeerForwarder extension for Source plugins, with Iceberg CDC as the first use case.
5.3 Compaction handling
Iceberg tables are periodically compacted to merge small data files into larger ones or to apply accumulated delete files. Compaction creates a snapshot with the `REPLACE` operation type, which reorganizes data files without changing the logical contents of the table. `IncrementalChangelogScan` skips `REPLACE` snapshots automatically, since they do not represent logical data changes. The plugin does not need to handle compaction as a special case.

5.4 Failure recovery and data guarantees
The plugin supports Data Prepper's end-to-end acknowledgements. When enabled, the plugin does not advance the checkpoint until the OpenSearch sink confirms that all events in a partition have been written successfully. If the sink reports a failure, the plugin releases the partition back to the coordinator, and another node re-processes it from the beginning.
With `identifier_columns` configured (recommended for CDC):

- All operations are idempotent, because each event carries a deterministic `document_id` derived from the configured columns.
- If a partition is re-processed after a failure, the same `_id` values are written again. INDEX overwrites the existing document. DELETE on a non-existent document is a no-op. Final state is correct.
- Because `_id` is deterministic, duplicates overwrite rather than accumulate.

The guarantee is at-least-once delivery with effectively-once semantics: the same event may be written to OpenSearch more than once, but the final state of each document is correct.
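A deterministic `document_id` might be derived along these lines (hypothetical sketch; the column separator and hash choice are assumptions, not the plugin's actual scheme):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;

public class DocumentIdFactory {

    // Build a stable _id by hashing only the configured identifier columns,
    // so re-processing the same row always yields the same document id.
    public static String documentId(Map<String, Object> row, List<String> identifierColumns) {
        StringJoiner joined = new StringJoiner("|");
        for (String column : identifierColumns) {
            joined.add(String.valueOf(row.get(column))); // missing values become "null"
        }
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256")
                    .digest(joined.toString().getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }
}
```

Because only the identifier columns participate in the hash, schema evolution in non-key columns does not change the id.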
Without `identifier_columns`:

When `identifier_columns` is not configured, OpenSearch auto-generates an `_id` for each document. This has several consequences:

- Re-processing a partition after a failure writes new auto-generated `_id` values, producing duplicate documents.
- DELETE events cannot be applied, because there is no known `_id` to delete.
- UPDATE events cannot replace the prior version of a document; without a deterministic `_id`, the old document cannot be identified.

This mode is only suitable for append-only tables (INSERT only) where some duplication is acceptable, such as log or event data. The plugin logs a warning at startup when `identifier_columns` is not configured.

A content-based hash (e.g., SHA-256 of all column values) was evaluated as a potential fallback for generating a deterministic `_id` without `identifier_columns`. While this works correctly when the table schema is static, it fails when the schema evolves. For example, if a column is added between the initial INSERT and a subsequent UPDATE, the INSERT was hashed with N columns but the UPDATE's DELETE event is hashed with N+1 columns (the new column being null), producing a different `_id`. The old document remains in OpenSearch and a new document is created, resulting in duplicates. Since Iceberg's schema evolution is a core feature that users are expected to use, this approach was not adopted. Iceberg v3's Row Lineage feature (`_row_id`) is expected to resolve this limitation by providing a stable, schema-independent row identifier (see Section 6).
Ordering guarantees:

- Events for the same `document_id` are always in the same partition.
- When `identifier_columns` is configured, UPDATE pairs (DELETE + INSERT with the same `document_id`) are merged into a single INDEX event. This eliminates ordering dependencies between events for the same document within a partition.

6. Limitations and future work
Data guarantees
With `identifier_columns` configured and end-to-end acknowledgements enabled, the plugin provides at-least-once delivery with idempotent writes. See Section 5.4 for detailed failure scenarios.

Current limitations

- MoR tables are not supported in the initial version; the write mode is detected via the `write.delete.mode` / `write.update.mode` table properties. If `UnsupportedOperationException` occurs at runtime (e.g., table properties changed), the pipeline stops and the checkpoint is not updated, allowing safe restart after the issue is resolved.
- `identifier_columns` must be configured by the user for CDC correctness. Iceberg's `identifier-field-ids` table property is optional and rarely set, so the plugin does not rely on it.

Future work
MoR support: Iceberg's `IncrementalChangelogScan` currently throws `UnsupportedOperationException` for MoR tables. Support is being actively worked on in apache/iceberg#14264. Once merged, MoR row deltas will not produce carryover, so each task can be distributed independently.

Iceberg v3 Row Lineage: Version 3 tables assign a globally unique `_row_id` to every row. This enables several improvements:

- `_row_id` can serve as `document_id`, eliminating the need for user-configured `identifier_columns`. Tables without a natural primary key (e.g., log data) would get idempotent writes automatically.
- Carryover removal can use `_row_id` as a hash key for O(n) matching instead of a full-column sort at O(n log n).

Version 2 tables would continue to use `identifier_columns` as a fallback.

7. References