This RFC proposes a new iceberg sink plugin for Data Prepper that writes events into Apache Iceberg tables. The plugin supports INSERT, UPDATE, and DELETE operations, enabling use cases from log ingestion to change data replication.
Apache Iceberg is an open table format widely used for lakehouse architectures on storage like S3, GCS, and HDFS. Iceberg tables are written by engines like Spark, Flink, and Trino, and are commonly used as the central store for analytical data.
Data Prepper already has an Iceberg CDC Source plugin (#6552) that reads changes from Iceberg tables. An Iceberg Sink plugin completes the picture by enabling Data Prepper to write data into Iceberg tables from any source, including HTTP, S3, Kafka, RDS, and the Iceberg Source itself.
2. Motivation
Data Prepper collects data from diverse sources (logs, metrics, traces, change streams) and delivers them to sinks such as OpenSearch, S3, and Kafka. Adding an Iceberg sink enables the following use cases.
Log and event analytics in the lakehouse. Organizations using Iceberg-based lakehouses can ingest logs, metrics, and traces directly from Data Prepper into Iceberg tables, making them queryable by Spark, Trino, Athena, and other engines without an intermediate ETL step.
CDC replication to Iceberg. Data Prepper's RDS Source captures row-level changes from relational databases. An Iceberg sink enables replicating these changes into Iceberg tables for analytical processing, replacing custom Spark/Flink jobs.
Iceberg-to-Iceberg synchronization. Combined with the Iceberg CDC Source (#6552), the sink enables replicating changes between Iceberg tables across different catalogs or environments (e.g., production to analytics).
3. Proposal
A new iceberg sink plugin that writes Data Prepper events into Iceberg tables using the Iceberg Java API.
Features
Streaming ingestion from any Data Prepper source to Iceberg tables
Dynamic multi-table routing using Data Prepper expression language
INSERT, UPDATE, and DELETE support from RDS and Iceberg sources
Automatic table creation from pipeline configuration
Coordinated commits across multi-node Data Prepper clusters
At-least-once delivery with end-to-end acknowledgements
Automatic schema evolution
Deletion Vector support for in-batch deduplication on v3 tables
Pipeline examples
Append-only: HTTP source to Iceberg
Events received via HTTP are written as new rows to an existing Iceberg table.
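A minimal pipeline for this case might look as follows (a sketch: the port, catalog endpoint, warehouse path, and table name are illustrative, and the catalog block follows the configuration reference at the end of this document):

```yaml
append-pipeline:
  source:
    http:
      port: 2021
  sink:
    - iceberg:
        catalog:
          type: rest
          uri: http://localhost:8181            # hypothetical REST catalog endpoint
          warehouse: s3://example-bucket/warehouse
        table_identifier: logs.http_events      # existing table; events appended as INSERTs
```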
Row-level changes: RDS Source to Iceberg (multi-table)
Replicate row-level changes from a relational database to Iceberg tables. The table_identifier uses the expression language to route each event to the corresponding Iceberg table based on the source table name. The operation setting reads the operation type from the event metadata set by the RDS Source.
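A sketch of such a pipeline (the catalog type, metadata keys, and key column are illustrative; the RDS Source configuration is elided):

```yaml
cdc-pipeline:
  source:
    rds:
      # RDS Source configuration omitted for brevity; it emits events with
      # change_event_type and table_name metadata attributes
  sink:
    - iceberg:
        catalog:
          type: glue
        table_identifier: 'analytics.${getMetadata("table_name")}'
        operation: '${getMetadata("change_event_type")}'
        identifier_columns: [id]                # hypothetical primary key column
```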
To write different events to different tables with static table names, use multiple sink entries. Combined with Data Prepper's route feature, each sink receives only the events matching its route condition.
When the destination table does not exist yet, the plugin can create it from a schema definition in the pipeline YAML. Column types use Iceberg type names directly. Event fields are matched to table columns by name.
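For example, a hypothetical schema definition using the schema.columns and schema.partition_spec settings from the configuration reference (column names are illustrative):

```yaml
sink:
  - iceberg:
      catalog:
        type: rest
      table_identifier: logs.app_events
      auto_create: true
      schema:
        columns:
          - name: event_time
            type: timestamptz
            required: true
          - name: level
            type: string
          - name: message
            type: string
        partition_spec:
          - column: event_time
            transform: day
```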
When auto_create is true and no schema definition is provided, the plugin infers the schema from the first batch of events. Nested JSON objects are inferred as Iceberg StructType, arrays as ListType (element type inferred from the first element), and scalar types are mapped based on Java type (Boolean to BooleanType, Integer/Long to LongType, Float/Double to DoubleType, else StringType).
Schema inference has limitations: it cannot distinguish between int and long, float and double, or string and timestamp. Empty arrays and objects are inferred as StringType. For precise type control, use a schema definition or create the table in advance.
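The inference rules above can be sketched in a few lines (illustrative Python, not the plugin's Java implementation; type names stand in for Iceberg's type objects):

```python
def infer_type(value):
    """Infer an Iceberg type name from a JSON value, mirroring the
    inference rules described above (simplified sketch)."""
    if isinstance(value, bool):          # check bool before int: bool subclasses int
        return "boolean"
    if isinstance(value, int):
        return "long"                    # cannot distinguish int from long
    if isinstance(value, float):
        return "double"                  # cannot distinguish float from double
    if isinstance(value, dict):
        if not value:
            return "string"              # empty object: no fields to infer
        return {k: infer_type(v) for k, v in value.items()}   # struct
    if isinstance(value, list):
        if not value:
            return "string"              # empty array: element type unknown
        return ["list", infer_type(value[0])]                 # element from first item
    return "string"                      # strings, timestamps, anything else
```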
The full configuration reference is at the end of this document.
4. Background: Iceberg write model
This section explains the Iceberg concepts relevant to the sink's design.
An Iceberg table stores its actual data in data files (Parquet, ORC, or Avro) on object storage such as S3. Separately from the data files, Iceberg maintains a metadata layer that tracks which data files belong to the current table state. This separation of data and metadata is a key design principle of Iceberg.
The metadata layer consists of a metadata file, manifest lists, and manifest files, forming a tree structure that ultimately points to the data files. A catalog (such as REST, Glue, or Hive Metastore) stores the location of the current metadata file for each table. The catalog is the entry point for all table operations.
Every write operation creates a new immutable snapshot. A snapshot does not copy or move data files. Instead, it records a new set of manifest files that reference the data files making up the table at that point in time. Each snapshot knows which data files were added and which were removed compared to the previous snapshot.
A commit is the atomic operation that publishes a new snapshot. The writer creates new metadata and manifest files on storage, then asks the catalog to atomically update the metadata pointer. Iceberg uses Optimistic Concurrency Control (OCC) to handle concurrent commits: if another writer committed in the meantime, the operation is retried against the updated metadata.
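The OCC commit loop can be illustrated with a toy in-memory catalog (a sketch under stated assumptions: ToyCatalog stands in for a real Iceberg catalog, and the "metadata" is just an opaque value):

```python
import threading

class ToyCatalog:
    """Minimal stand-in for an Iceberg catalog: one metadata pointer per
    table, swapped atomically (illustrative only)."""
    def __init__(self):
        self._lock = threading.Lock()
        self._pointer = {}

    def current(self, table):
        with self._lock:
            return self._pointer.get(table)

    def swap(self, table, expected, new):
        # Atomic compare-and-swap on the metadata pointer.
        with self._lock:
            if self._pointer.get(table) != expected:
                return False            # another writer committed in the meantime
            self._pointer[table] = new
            return True

def commit(catalog, table, make_metadata, retries=3):
    """Optimistic commit: re-read the pointer and rebuild the new
    metadata on each conflict, as Iceberg's OCC retry does."""
    for _ in range(retries):
        base = catalog.current(table)
        new = make_metadata(base)       # write new metadata/manifest files to storage
        if catalog.swap(table, base, new):
            return new
    raise RuntimeError("commit retries exhausted")
```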
4.2 Write modes for DELETE and UPDATE
Iceberg supports two approaches for handling DELETE and UPDATE operations. The approach is not fixed per table. Different engines, or the same engine at different times, can write to the same table using different approaches.
Copy-on-Write (CoW). When a row in a data file is updated or deleted, the entire data file is rewritten with the modification applied. The snapshot records the old file as removed and the new file as added. This produces the best read performance because there are no delete files to reconcile at query time, but write cost is high because unchanged rows must also be rewritten.
Merge-on-Read (MoR). Instead of rewriting data files, a separate delete file records which rows should be considered deleted. Readers merge the data files and delete files at query time. This is cheaper to write because existing data files are not touched, but adds read-time overhead. Periodic compaction merges delete files into data files, eliminating the read-time cost and reclaiming storage.
MoR uses delete files, which come in three forms:
| Delete file type | Mechanism | Write cost | Read cost |
| --- | --- | --- | --- |
| Equality delete | Specifies rows to delete by column values (e.g., user_id = 123) | Low. No existing data read needed | High. Every read must match against delete files |
| Positional delete | Specifies rows to delete by data file path + row position | Low if the writer knows the row position. Otherwise requires a table scan to locate the row | Low. Direct row lookup |
| Deletion Vector (v3) | A bitmap indicating deleted row positions within a data file, stored in Puffin format | Higher than positional delete. Writers must merge existing DVs to maintain at most one DV per data file | Low. Direct row lookup with compact representation |
Note: some query engines do not support reading equality deletes (e.g., Snowflake, PyIceberg). Tables containing equality delete files may not be readable by these engines until the delete files are compacted away.
5. Architecture
5.1 Overview
The plugin's processing is split into two paths.
Write path. Data Prepper's pipeline framework calls Sink.output(records) concurrently from multiple threads on each node. Since Iceberg's TaskWriter is not thread-safe, each thread maintains its own instance. When table_identifier is dynamic, the plugin resolves the target table for each event and maintains a separate TaskWriter per table (similar to the OpenSearch Sink's dynamic index routing). In a multi-node cluster, every node writes data files independently to storage. The TaskWriter handles file size management automatically (rolling to a new file when targetFileSize is reached).
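The per-thread, per-table writer bookkeeping can be sketched as follows (illustrative Python; ToyWriter stands in for Iceberg's TaskWriter, and resolve_table stands in for dynamic table_identifier resolution):

```python
import threading

class SinkThreadState(threading.local):
    """Each output() thread keeps its own writers, because Iceberg's
    TaskWriter is not thread-safe (sketch)."""
    def __init__(self):
        self.writers = {}               # table identifier -> writer

class ToyWriter:
    def __init__(self, table):
        self.table = table
        self.rows = []
    def write(self, record):
        self.rows.append(record)        # a real TaskWriter also rolls files by size

STATE = SinkThreadState()

def output(records, resolve_table):
    # Route each event to the writer for its resolved table,
    # creating a writer lazily per (thread, table) pair.
    for record in records:
        table = resolve_table(record)
        writer = STATE.writers.get(table)
        if writer is None:
            writer = STATE.writers[table] = ToyWriter(table)
        writer.write(record)
```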
Commit path. A CommitScheduler runs on a single leader node. Leader election uses Data Prepper's existing lease-based coordination infrastructure (EnhancedSourceCoordinator), which is currently used by Source plugins such as the Iceberg CDC Source. This RFC proposes extending it to support Sink plugins as well. At each commit_interval (default 5 minutes), the scheduler collects pending ManifestFile metadata from all nodes, reads the delta manifest files from storage to reconstruct the write results, and commits them to Iceberg.
```mermaid
sequenceDiagram
    participant N1 as Node 1
    participant N2 as Node 2
    participant ST as Storage (S3/GCS)
    participant CS as Coordination Store
    participant L as Leader Node
    participant CAT as Iceberg Catalog
    Note over N1,N2: Write path (flush on flush_interval)
    N1->>ST: TaskWriter writes data/delete files
    N1->>ST: Write delta manifest file to table location
    N1->>CS: Register ManifestFile metadata
    N2->>ST: TaskWriter writes data/delete files
    N2->>ST: Write delta manifest file to table location
    N2->>CS: Register ManifestFile metadata
    Note over L: Commit path (every commit_interval)
    L->>CS: Collect pending ManifestFile metadata
    L->>ST: Read delta manifest files
    ST-->>L: DataFile and DeleteFile metadata
    L->>CAT: Commit snapshot (newAppend / newRowDelta)
    CAT-->>L: Commit success
    L->>ST: Delete committed delta manifest files
    L->>CS: Mark partitions complete
    L->>L: EventHandle.release(true) for all committed events
```
5.2 Write mode
When handling DELETE and UPDATE operations, the plugin uses equality delete with in-batch positional delete optimization, the same approach as the Flink Iceberg sink. Equality delete writes a small delete file instead of rewriting existing data files, making it suitable for high-frequency streaming writes. For rows INSERTed and then DELETEd within the same batch, the sink already knows the file path and row position, so it uses the cheaper positional delete (or Deletion Vectors on v3 tables) instead of equality delete.
This is a pragmatic choice for streaming. Depending on the query engines used to read the table, different write strategies may be preferable (e.g., CoW for engines that do not support equality deletes, or positional delete only for better read performance). The plugin is designed with extensibility in mind so that alternative strategies can be offered as configuration options in the future.
5.3 Commit coordination
```mermaid
sequenceDiagram
    participant N1 as Node 1
    participant N2 as Node 2
    participant ST as Storage (S3/GCS)
    participant CS as Coordination Store
    participant L as Leader Node
    participant CAT as Iceberg Catalog
    Note over N1,N2: flush triggered (flush_interval reached)
    N1->>ST: Write delta manifest file to table location
    N1->>CS: Register WriteResultPartition (ManifestFile metadata)
    N2->>ST: Write delta manifest file to table location
    N2->>CS: Register WriteResultPartition (ManifestFile metadata)
    Note over L: CommitScheduler wakes (every commit_interval)
    L->>CS: Acquire pending WriteResultPartitions
    CS-->>L: ManifestFile metadata from Node 1 and Node 2
    L->>ST: Read delta manifest files, reconstruct DataFiles
    alt Append-only (no delete files)
        L->>CAT: table.newAppend() with all data files
    else With delete files
        L->>CAT: table.newRowDelta() per batch in order
    end
    CAT-->>L: Commit success
    L->>ST: Delete committed delta manifest files
    L->>CS: Mark partitions complete
    L->>L: EventHandle.release(true)
    Note over L: On commit failure
    L->>L: Iceberg auto-retry (OCC)
    L-->>L: If exhausted, retry next cycle
```
Each Iceberg commit creates a new snapshot with associated metadata and manifest files. If every node committed independently at high frequency, the resulting volume of snapshots and small data files would degrade read performance and increase OCC conflicts. Centralizing commits into a single leader at a controlled interval (default 5 minutes) keeps snapshot and file counts manageable. The Flink Iceberg sink and the Kafka Connect Iceberg sink both use this single-committer pattern.
The plugin follows the same approach by extending Data Prepper's EnhancedSourceCoordinator to support Sink plugins.
Framework changes required. The EnhancedSourceCoordinator itself is a general-purpose lease-based partition manager with no Source-specific logic. Extending it to Sink plugins requires the following changes, all additive with no modification to existing code:
A new UsesEnhancedSinkCoordination interface in data-prepper-api, mirroring UsesEnhancedSourceCoordination (two methods: setEnhancedSourceCoordinator and getPartitionFactory).
In Pipeline.execute(), after the existing Source coordinator injection block, an additional block that checks each Sink for UsesEnhancedSinkCoordination and injects a coordinator instance. This is approximately 5 lines of code following the same pattern as the existing Source block.
The EnhancedSourceCoordinator implementation, the coordination store, and all existing Source plugins are not modified. Each Sink that uses coordination gets its own coordinator instance with its own partition namespace, so there is no interaction with Source partitions.
The coordinator is a general-purpose lease-based partition manager backed by Data Prepper's coordination store.
Partition types used by the plugin:
| Partition type | Purpose |
| --- | --- |
| LeaderPartition | Leader election. Single global partition. |
| WriteResultPartition | One per flush per table. Contains the table identifier and ManifestFile metadata (serialized pointers to the delta manifest files on storage). The delta manifest files contain the actual data file and delete file metadata. EventHandles are held in memory on the originating node, keyed by partition ID. |
WriteResults are committed individually, in order, via table.newRowDelta(). Merging them would break equality delete semantics, because Iceberg applies an equality delete only to data files with a strictly lower sequence number.
When table_identifier is dynamic, each WriteResultPartition includes the resolved table identifier. The CommitScheduler groups pending partitions by table and commits each table independently.
Event ordering in CDC pipelines. When the pipeline handles DELETE or UPDATE operations, the workers setting (which controls the number of ProcessWorkers reading from the buffer) must remain at 1 (the default). With workers > 1, multiple ProcessWorkers drain events from the buffer concurrently, which can reorder causally related events for the same key (e.g., an INSERT followed by a DELETE). If a DELETE is committed before its corresponding INSERT, the equality delete applies only to data files with a lower sequence number and will not affect the later-committed INSERT, leaving stale data in the table. This ordering constraint is not specific to the Iceberg sink. It applies to any sink that processes CDC events with DELETE semantics, including the OpenSearch sink. CDC sources (RDS Source, Iceberg Source) produce events on a single node, so workers: 1 is sufficient to preserve causal order.
5.4 Leader failover
When the leader node fails, the coordinator's lease expires and another node becomes leader. Uncommitted WriteResultPartition entries remain in the coordinator for the new leader to process. Because the delta manifest files are persisted on storage (S3/GCS) independently of the coordination store, the new leader can read them using the registered ManifestFile metadata and proceed with the commit. No write results are lost due to leader failover.
To prevent duplicate commits when the leader fails mid-commit, the CommitScheduler maintains a monotonically increasing commitSequence (long). Each commit records this sequence in the Iceberg snapshot's summary properties (data-prepper.commit-sequence). Before committing, the CommitScheduler writes the commitSequence into each pending partition's progress state in the coordination store.
When a new leader starts, it scans the snapshot history (walking parent pointers) to find the maximum committed sequence. Pending partitions whose commitSequence is at or below this maximum are already committed and are simply marked complete without re-committing. Partitions with a higher commitSequence are committed normally. This is the same approach used by the Flink Iceberg sink (flink.job-id + flink.max-committed-checkpoint-id in snapshot summary).
When delete files are present and WriteResults are committed individually, each commit records commitSequence-subIndex (e.g., 5-0, 5-1). On failover recovery, the new leader determines which sub-commits have already succeeded and resumes from the next one.
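The recovery check can be sketched as follows (illustrative Python; sub-commit indices are omitted for brevity, and snapshot summaries are modeled as plain dicts):

```python
def recover_pending(snapshot_summaries, pending_partitions):
    """Decide which pending partitions were already committed before a
    leader failover, using the commitSequence recorded in snapshot
    summary properties (sketch of the check described above).

    snapshot_summaries: snapshot summary dicts, walked from the history.
    pending_partitions: list of (partition_id, commit_sequence) tuples."""
    committed = [int(s["data-prepper.commit-sequence"])
                 for s in snapshot_summaries
                 if "data-prepper.commit-sequence" in s]
    max_committed = max(committed, default=-1)

    already_done, to_commit = [], []
    for pid, seq in pending_partitions:
        if seq <= max_committed:
            already_done.append(pid)    # mark complete without re-committing
        else:
            to_commit.append(pid)       # commit normally
    return already_done, to_commit
```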
6. Processing flow
6.1 Append-only flow
By default, the plugin writes all events as new rows (INSERT). This is the behavior when the operation setting (described in the next section) is not configured.
Each event received in output() is converted to an Iceberg GenericRecord using the table schema (see Event conversion below). The record is then written to the TaskWriter, which handles partitioning and file management internally: it computes the partition value from the record using the table's partition spec, routes the record to the appropriate per-partition file writer, and rolls to a new file when targetFileSize is reached.
Each thread maintains a long-lived TaskWriter across multiple output() calls. A flush is triggered when flush_interval has elapsed since the last flush. On flush, the thread closes the TaskWriter, writes the resulting WriteResult as a delta manifest file to the table's storage location, registers the ManifestFile metadata in the coordination store, and creates a new TaskWriter. This keeps the number of WriteResults per commit cycle to approximately (number of nodes) x (number of threads), rather than one per output() call. Each thread flushes independently; the CommitScheduler commits whatever ManifestFile metadata is available in the coordinator at that point. Delta manifests from threads that have not yet flushed are committed in the next cycle. Because the flush and commit cycles are independent, the maximum commit latency is commit_interval + flush_interval.
After writing, the TaskWriter's WriteResult (containing the paths of data files written) is registered in the coordination store. The EventHandles for the written events are not stored in the coordinator (they are not serializable). Instead, each node holds them in memory, keyed by the WriteResultPartition ID. When the CommitScheduler marks a partition as COMPLETED in the coordinator, the originating node detects this via polling and calls EventHandle.release(true) for the corresponding events.
Acknowledgement flow. Each thread collects EventHandles during output() and associates them with the WriteResultPartition ID on flush. Because the CommitScheduler runs on a single leader node and EventHandles are JVM-local objects that cannot be accessed from other nodes, each node runs a polling thread that queries the coordination store for completed WriteResultPartitions at a configurable interval (ack_poll_interval, default 5 seconds). When a completed partition matches a locally held EventHandle set, the handles are released with release(true), signaling the source that the events have been durably committed to Iceberg. On shutdown, all unreleased EventHandles are released with release(false).
6.2 DELETE and UPDATE flow
When operation is configured, the plugin resolves the operation type for each event using the configured expression (e.g., ${getMetadata("change_event_type")}). The resolved value is mapped to an Iceberg operation (case-insensitive):
| Value | Operation | Notes |
| --- | --- | --- |
| index, insert, create, c, i, r | INSERT | r is Debezium's snapshot (read) operation |
| update, u | UPDATE (equality DELETE + INSERT) |  |
| delete, d | DELETE (equality delete) |  |
| Unrecognized | DLQ |  |
RDS Source sets two metadata attributes: opensearch_action (values: index, delete) and change_event_type (values: insert, update, delete). For Iceberg CDC pipelines, use change_event_type because opensearch_action maps both INSERT and UPDATE to index, which would prevent equality deletes for UPDATE events.
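The mapping above can be sketched as follows (illustrative Python, not the plugin's Java code; None means the event goes to the DLQ):

```python
INSERT, UPDATE, DELETE = "INSERT", "UPDATE", "DELETE"

# Case-insensitive value -> operation table from the mapping above.
_OP_MAP = {
    **dict.fromkeys(["index", "insert", "create", "c", "i", "r"], INSERT),
    **dict.fromkeys(["update", "u"], UPDATE),
    **dict.fromkeys(["delete", "d"], DELETE),
}

def resolve_operation(value):
    """Map a resolved operation string to an Iceberg operation;
    unrecognized values return None (DLQ)."""
    return _OP_MAP.get(str(value).strip().lower())
```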
INSERT: The record is written to a data file. The writer records the record's key and file position in an in-memory map (insertedRowMap) for potential in-batch deduplication.
DELETE: The writer first checks insertedRowMap. If the key was inserted earlier in the same batch, the writer emits a positional delete (or Deletion Vector on v3 tables), which is cheaper to read than an equality delete. If the key is not found (the row was written in a previous commit), the writer emits an equality delete file using the identifier_columns.
UPDATE: Processed as a DELETE followed by an INSERT.
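The in-batch deduplication logic for DELETEs can be sketched as follows (illustrative Python; insertedRowMap keys are identifier-column value tuples, and the two output lists stand in for delete file writers):

```python
def apply_delete(key, inserted_row_map, positional_deletes, equality_deletes):
    """Route a DELETE: if the key was INSERTed earlier in this batch,
    the writer knows its file path and row position and emits the
    cheaper positional delete (or a Deletion Vector on v3 tables);
    otherwise it falls back to an equality delete (sketch)."""
    pos = inserted_row_map.pop(key, None)
    if pos is not None:
        positional_deletes.append(pos)   # (file_path, row_position)
    else:
        equality_deletes.append(key)     # identifier-column values
```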
identifier_columns specifies which columns uniquely identify a row in the table (e.g., a primary key). The equality delete file contains only these columns' values, and at read time, any row matching those values is treated as deleted. This setting is required when operation is configured, because without it the plugin cannot construct equality delete files. If the table already has identifier-field-ids set in its schema, those are used by default. The YAML identifier_columns setting overrides the table definition, which is useful when the table does not have identifier-field-ids set (the more common case in practice).
For partitioned tables, the plugin maintains a separate BaseEqualityDeltaWriter per partition, so that each partition has its own insertedRowMap.
The plugin selects the TaskWriter implementation automatically based on the table's partition spec and whether identifier_columns is configured.
6.3 Event conversion

Events (JacksonEvent) are schema-less JSON. The plugin converts them to Iceberg GenericRecord using the table schema as the reference, applying type coercion where possible:
| Iceberg type | Accepted event values |
| --- | --- |
| int, long | Number (intValue()/longValue()) or String (parseInt()/parseLong()) |
| float, double | Number or String |
| decimal(P,S) | Number or String -> BigDecimal |
| boolean | Boolean or String (parseBoolean()) |
| string | String, Number (toString()), Boolean (toString()) |
| timestamp, timestamptz | String (ISO 8601) or Number (epoch millis) |
| date | String (ISO 8601) or Number (epoch days) |
| time | String (ISO 8601) or Number (millis) |
| binary | String (Base64 decode) |
| uuid | String (UUID.fromString()) |
| struct | Map (recursive conversion) |
| list | List (recursive conversion) |
| map | Map |
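The coercion rules can be sketched for a few representative types (illustrative Python; the plugin performs the equivalent in Java against GenericRecord, and a ValueError here corresponds to the mismatch handling below):

```python
import base64, datetime, decimal, uuid

def coerce(iceberg_type, value):
    """Coerce a JSON event value to an Iceberg column type
    (simplified sketch; raises on non-coercible values)."""
    if iceberg_type == "long":
        return int(value)                       # Number or numeric String
    if iceberg_type == "double":
        return float(value)
    if iceberg_type == "decimal":               # stands in for decimal(P,S)
        return decimal.Decimal(str(value))
    if iceberg_type == "boolean":
        return value if isinstance(value, bool) else str(value).lower() == "true"
    if iceberg_type == "string":
        return str(value)
    if iceberg_type == "timestamptz":
        if isinstance(value, (int, float)):     # epoch millis
            return datetime.datetime.fromtimestamp(value / 1000, datetime.timezone.utc)
        return datetime.datetime.fromisoformat(value)   # ISO 8601 string
    if iceberg_type == "binary":
        return base64.b64decode(value)
    if iceberg_type == "uuid":
        return uuid.UUID(value)
    raise ValueError(f"unsupported type {iceberg_type}")
```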
Mismatch handling is per-event. One bad event does not fail the entire batch.
| Scenario | Behavior |
| --- | --- |
| Exact match | Write |
| Extra fields in event | Ignore |
| Missing optional field | NULL |
| Missing required field | DLQ |
| Wrong type, coercible | Coerce and write |
| Wrong type, not coercible, optional column | NULL |
| Wrong type, not coercible, required column | DLQ |
| No fields match schema | DLQ |
6.4 Schema management
Table exists (default). The plugin loads the table and uses table.schema() for conversion. No schema configuration needed.
Auto-create. When auto_create is true and the table does not exist, the output() thread creates it at the point where it first attempts to write to the table. This is the natural place for table creation because the thread needs the table to exist before writing data files. In a multi-node cluster, concurrent creation attempts are handled by catching AlreadyExistsException and falling back to loadTable(). If schema is provided, the table is created from the definition. If schema is not provided, the schema is inferred from the first batch of events. After creation, the table schema is used for conversion.
When table_identifier is dynamic, auto-create applies independently to each resolved table. In this case, schema cannot be specified (the plugin rejects this combination at startup) because different tables typically have different schemas. Each table's schema is inferred from the first batch of events routed to that table.
Schema evolution. When schema_evolution is true and an event contains fields not present in the current table schema, the plugin adds the missing columns automatically. The process follows the same approach as the Kafka Connect Iceberg Sink: flush the current file, call table.updateSchema().addColumn() to commit the new columns, reinitialize the TaskWriter with the updated schema, and re-convert the event. Column types are inferred from the event value using the same logic as schema inference in auto-create. Concurrent schema updates from multiple nodes are handled by refreshing the table metadata and skipping columns that have already been added by another node.
Schema evolution supports column addition only. Column deletion, type changes (e.g., int to long), and renames require precise type information that is not available from schemaless JSON events. Supporting these operations would require an external schema source such as a schema registry.
When multiple nodes infer different types for the same new field, the first committed type wins. Subsequent nodes refresh the table metadata, discover the existing column, and convert values using the established type. If conversion fails, the standard mismatch handling applies (NULL for optional columns, DLQ for required columns).
If auto_create is false (the default) and the table does not exist, the plugin fails at startup. When table_identifier is dynamic, the table name is resolved at runtime. If a resolved table does not exist, the event is sent to DLQ.
7. Data guarantees
7.1 At-least-once delivery
With end-to-end acknowledgements enabled. Events are acknowledged back to the source only after the Iceberg commit succeeds and the originating node detects the commit via coordination store polling. This provides at-least-once delivery.
The maximum acknowledgement latency under normal operation is flush_interval + commit_interval + ack_poll_interval (default: 5m + 5m + 5s = 10m 5s). The source's acknowledgement timeout must be set longer than this value. When a leader failover occurs, an additional delay of up to LEASE_DURATION (3m) + commit_interval (5m) is added while the new leader takes over and completes pending partitions, bringing the worst case to approximately 18 minutes with default settings.
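The latency arithmetic with default settings, in seconds:

```python
# Worst-case acknowledgement latency with default settings, as derived above.
flush_interval = 5 * 60        # 5m
commit_interval = 5 * 60       # 5m
ack_poll_interval = 5          # 5s
lease_duration = 3 * 60        # 3m coordinator lease expiry on leader failure

normal_max = flush_interval + commit_interval + ack_poll_interval
failover_max = normal_max + lease_duration + commit_interval

print(normal_max // 60, normal_max % 60)      # -> 10 5  (10m 5s)
print(failover_max // 60, failover_max % 60)  # -> 18 5  (~18 minutes)
```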
EventHandles are held in memory on each node, not in the coordination store. If a node crashes, its in-memory EventHandles are lost and the corresponding acknowledgements are never sent. The source treats this as a timeout and re-sends the events.
Duplicate events can occur in the following scenarios.
Worker node crash after partition registration. If a worker node crashes after successfully registering a WriteResultPartition in the coordination store but before the EventHandles are released, the leader will commit the data to Iceberg. However, the EventHandles are lost with the crashed node, so the source re-sends the same events, resulting in duplicate writes. If the worker crashes before partition registration completes, no data is committed for those events, and the source re-send produces the first (and only) write.
Leader node crash after Iceberg commit. If the leader crashes after a successful Iceberg commit but before marking the partitions as COMPLETED in the coordination store, the new leader recovers the commit state from Iceberg snapshot properties (commitSequence) and marks the partitions as complete. If this recovery completes before the source's acknowledgement timeout, the originating nodes detect the completion via polling and release the EventHandles normally, avoiding duplicates. If the recovery takes longer than the acknowledgement timeout, the source re-sends the events, resulting in duplicates.
Without end-to-end acknowledgements. The source does not wait for confirmation from the sink. Events may be lost if the sink fails before committing. This provides at-most-once delivery. Throughput is higher because the source does not block on acknowledgements.
Choosing between the two. Whether acknowledgements are enabled depends on the source configuration (e.g., RDS Source enables them by default, HTTP Source does not). Use at-least-once (acknowledgements enabled) when data completeness matters more than avoiding duplicates, such as CDC replication where missing a DELETE would leave stale rows. Use at-most-once (acknowledgements disabled) when occasional data loss is acceptable and throughput is the priority, such as high-volume log ingestion where duplicates or gaps do not affect analytical results.
7.2 Duplicate impact by write mode
| Scenario | Impact |
| --- | --- |
| Append-only, duplicate INSERT | Duplicate rows exist. Addressable with aggregation queries (DISTINCT, GROUP BY). |
| Duplicate DELETE | No impact on data correctness. Equality delete is idempotent. Extra delete files accumulate until compaction. |
| Duplicate INSERT + subsequent DELETE | No impact. Equality delete removes all matching rows. |
| Duplicate UPDATE (DELETE + INSERT pair) | Temporary duplicate rows until the next UPDATE/DELETE. Addressable with ROW_NUMBER() deduplication in queries. |
7.3 Internal deduplication
Duplicate commits from the same WriteResult (e.g., during leader failover) are prevented by the commit ID recorded in snapshot properties (see Leader failover above).
8. Error handling
Errors are handled differently based on their cause.
Event conversion errors (caused by event content):
| Error | Scope | Action |
| --- | --- | --- |
| Type conversion failure, required column | Per-event | DLQ |
| Required field missing | Per-event | DLQ |
| No fields match schema | Per-event | DLQ |
| Unrecognized operation type | Per-event | DLQ |
| Type conversion failure, optional column | Per-event | Write NULL, continue |
File write errors (caused by infrastructure):
| Error | Scope | Action |
| --- | --- | --- |
| Transient storage failure (network, S3 503) | Per-batch | Retry, then throw exception. Source re-sends. |
| Permanent storage failure (permissions, capacity) | Per-batch | Retry, then throw exception. Source re-sends. |
| OOM (excessive partition count) | Per-node | Crash. Source re-sends after restart. |
Commit errors (caused by infrastructure):
| Error | Scope | Action |
| --- | --- | --- |
| OCC conflict | Per-commit | Iceberg auto-retry (configurable via commit.retry.* table properties) |
| Retry limit exceeded | Per-commit | Retry in next commit cycle |
| Persistent commit failure | Per-commit | EventHandle.release(false). Source re-sends. |
Startup errors:
| Error | Scope | Action |
| --- | --- | --- |
| Table does not exist (auto_create is false) | Pipeline | Pipeline fails to start |
| Catalog unreachable | Pipeline | Pipeline fails to start |
DLQ entries include the original event and the error reason. The plugin uses Data Prepper's standard DlqProvider.
Orphan data files (written but not committed) do not affect data correctness. Cleanup is handled by external maintenance operations (removeOrphanFiles), which is the standard practice across the Iceberg ecosystem.
9. Operational considerations
Compaction. Streaming writes produce many small data files and, when DELETE/UPDATE operations are used, equality delete files. Both degrade read performance over time. Periodic compaction is required. Iceberg provides the rewriteDataFiles action for this purpose, and it can be run from Spark or any engine that supports Iceberg's maintenance operations.
Graceful shutdown. On shutdown, each node closes its TaskWriters and registers the resulting WriteResults in the coordinator. The CommitScheduler attempts a final commit. WriteResults that are not committed remain in the coordinator and are processed after restart. Events whose EventHandles are not released are re-sent by the source.
10. Configuration reference
| Setting | Type | Required | Default | Description |
| --- | --- | --- | --- | --- |
| catalog | Map<String, String> | Yes | (none) | Iceberg catalog properties, passed to CatalogUtil.buildIcebergCatalog(). The type property determines the catalog implementation (rest, glue, hive, jdbc, nessie, hadoop). Any catalog supported by Iceberg can be used. Authentication and storage configuration are part of these properties. |
| table_identifier | String | Yes | (none) | Table identifier, resolved per event. Supports Data Prepper expression language for dynamic routing (e.g., ${getMetadata("table_name")}). When the value is static, all events are written to a single table. |
| operation | String | No | None (all INSERT) | Operation type for each event. Supports Data Prepper expression language (e.g., ${getMetadata("opensearch_action")}). See the DELETE and UPDATE flow section for the default value mapping. |
| identifier_columns | List | No | Table's identifier-field-ids | Columns used for equality delete. The YAML setting takes precedence over the table definition. Required when operation is configured and the table does not have identifier-field-ids. |
| auto_create | Boolean | No | false | Create the table if it does not exist. If schema is provided, uses the definition. Otherwise, infers the schema from the first batch of events. |
| schema_evolution | Boolean | No | false | When true, automatically add new columns to the table schema when events contain fields not in the current schema. Type is inferred from the event value. |
| schema.columns | List | No | (none) | Column definitions for auto-create. Each entry has name (String), type (Iceberg type name), and optional required (Boolean, default false). |
| schema.partition_spec | List | No | (none) | Partition spec for auto-create. Each entry has column (String) and transform (String, e.g., day, hour, bucket[16], truncate[4]). |
| table_properties | Map<String, String> | No | (none) | Iceberg table properties to set when creating a table with auto_create. For example, format-version, write.format.default, write.parquet.compression-codec. |
| table_location | String | No | (none) | Custom storage location for the table when creating with auto_create. If not set, the catalog determines the location. |
| commit_interval | Duration | No | 5m | Interval between Iceberg commits by the CommitScheduler. |
| flush_interval | Duration | No | 5m | Interval between TaskWriter flushes on each thread. When elapsed, the thread closes the TaskWriter, writes a delta manifest file, and registers the ManifestFile metadata in the coordination store. |
| ack_poll_interval | Duration | No | 5s | Interval for polling the coordination store to detect committed partitions and release EventHandles. Shorter intervals reduce acknowledgement latency but increase coordination store request volume. With 3 nodes at the default 5s interval, the coordination store receives approximately 36 requests per minute. |
| dlq | Object | No | (none) | Dead letter queue configuration. Uses Data Prepper's standard DlqProvider (e.g., dlq.s3.bucket, dlq.s3.key_path_prefix). |
Acknowledgement timeout guidance. When using end-to-end acknowledgements, the source's acknowledgement timeout must be greater than flush_interval + commit_interval + ack_poll_interval. With default settings, this is approximately 10 minutes. To account for leader failover scenarios, a timeout of at least 20 minutes is recommended.
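Assembling the settings above, a hypothetical sink entry for an append-only pipeline with auto-create (the catalog type, bucket names, and column names are all illustrative; the setting names follow the reference table above):

```yaml
sink:
  - iceberg:
      catalog:
        type: glue                                # any Iceberg-supported catalog type
        warehouse: s3://example-bucket/warehouse  # illustrative location
      table_identifier: analytics.app_logs        # static: all events go to one table
      auto_create: true
      schema:
        columns:
          - name: id
            type: long
            required: true
          - name: message
            type: string
          - name: created_at
            type: timestamp
        partition_spec:
          - column: created_at
            transform: day
      table_properties:
        format-version: "2"
      commit_interval: 5m
      flush_interval: 5m
      ack_poll_interval: 5s
      dlq:
        s3:
          bucket: example-dlq-bucket
          key_path_prefix: iceberg-sink-dlq/
```

For CDC-style pipelines, table_identifier and operation would instead use expressions such as ${getMetadata("table_name")} and ${getMetadata("change_event_type")}, with identifier_columns set as described in the DELETE and UPDATE flow section.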
11. Limitations and future work
Limitations
- CoW write mode is not supported. The plugin uses equality delete (MoR) only.
- Exactly-once delivery is not supported. The plugin provides at-least-once delivery. In most scenarios the current design preserves consistency, but there is an edge case in which source events can be processed more than once after a node failure.
- Schema evolution supports column addition only. Column deletion, type changes, and renames are not supported.
- Writing to many partitions simultaneously increases memory consumption, because PartitionedFanoutWriter keeps a file writer open for each active partition.
- When table_identifier is dynamic, each resolved table maintains its own set of TaskWriters (one per thread). With many distinct tables, the total number of open writers grows as (number of tables) x (number of threads) x (partitions per table). There is currently no limit on the number of concurrent writers. A maximum writer count with eviction of idle writers may be needed for high-cardinality dynamic routing.
Future enhancements
| Feature | Description |
| --- | --- |
| Schema registry | Obtain schema from an external schema registry. |
| Write properties passthrough | Override Iceberg writer properties from sink configuration. |
| Commit branch | Write to a specific Iceberg branch for staging/validation workflows. |
| Case-insensitive field matching | Match event fields to table columns ignoring case. |
| CoW write mode | Copy-on-Write mode for maximum read performance. |
| Exactly-once delivery | Record source offsets in snapshot properties for duplicate detection on recovery. |
8. Error handling
Errors are handled differently based on their cause.
Event conversion errors (caused by event content): retrying cannot fix the event itself, so it is sent to the DLQ.
File write errors (caused by infrastructure): transient failures while writing data files; the event content is not at fault.
Commit errors (caused by infrastructure): retried by Iceberg according to the `commit.retry.*` table properties. If retries are exhausted, the plugin calls `EventHandle.release(false)` and the source re-sends the events.
Startup errors (e.g., the table does not exist and `auto_create` is false): reported when the pipeline starts.
DLQ entries include the original event and the error reason. The plugin uses Data Prepper's standard `DlqProvider`.
Orphan data files (written but not committed) do not affect data correctness. Cleanup is handled by external maintenance operations (`removeOrphanFiles`), which is the standard practice across the Iceberg ecosystem.
9. Operational considerations
Compaction. Streaming writes produce many small data files and, when DELETE/UPDATE operations are used, equality delete files. Both degrade read performance over time. Periodic compaction is required. Iceberg provides the `rewriteDataFiles` action for this purpose, and it can be run from Spark or any engine that supports Iceberg's maintenance operations.
Graceful shutdown. On shutdown, each node closes its TaskWriters and registers the resulting WriteResults in the coordinator. The CommitScheduler attempts a final commit. WriteResults that are not committed remain in the coordinator and are processed after restart. Events whose EventHandles are not released are re-sent by the source.
10. Configuration reference
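Before the option-by-option reference, here is a sketch of how the options might combine in a pipeline definition. This is illustrative only: the sink option names follow this section, but all values (catalog settings, table and bucket names) and the source configuration are hypothetical.

```yaml
iceberg-pipeline:
  source:
    http:
  sink:
    - iceberg:
        catalog:
          type: glue
          warehouse: s3://example-bucket/warehouse
        table_identifier: analytics.app_logs
        auto_create: true
        table_properties:
          format-version: "2"
          write.format.default: parquet
        commit_interval: 5m
        flush_interval: 5m
        dlq:
          s3:
            bucket: example-dlq-bucket
            key_path_prefix: iceberg-dlq/
```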
| Option | Default | Description |
| --- | --- | --- |
| `catalog` | — | Catalog properties used to build the catalog via `CatalogUtil.buildIcebergCatalog()`. The `type` property determines the catalog implementation (`rest`, `glue`, `hive`, `jdbc`, `nessie`, `hadoop`). Any catalog supported by Iceberg can be used. Authentication and storage configuration are part of these properties. |
| `table_identifier` | — | Target table. Supports expressions (e.g., `${getMetadata("table_name")}`). When the value is static, all events are written to a single table. |
| `operation` | — | Write operation. Supports expressions (e.g., `${getMetadata("opensearch_action")}`). See the DELETE and UPDATE flow section for the default value mapping. |
| `identifier_columns` | — | Columns used as the table's `identifier-field-ids`. Required when `operation` is configured and the table does not have `identifier-field-ids`. |
| `auto_create` | `false` | Create the table if it does not exist. When `schema` is provided, uses the definition. Otherwise, infers the schema from the first batch of events. |
| `schema_evolution` | `false` | Whether the sink evolves the table schema when events contain fields not present in the table. |
| `schema.columns` | — | Column definitions: `name` (String), `type` (Iceberg type name), and optional `required` (Boolean, default false). |
| `schema.partition_spec` | — | Partition fields: `column` (String) and `transform` (String, e.g., `day`, `hour`, `bucket[16]`, `truncate[4]`). |
| `table_properties` | — | Table properties applied on `auto_create`. For example, `format-version`, `write.format.default`, `write.parquet.compression-codec`. |
| `table_location` | — | Table location used on `auto_create`. If not set, the catalog determines the location. |
| `commit_interval` | `5m` | Interval between Iceberg commits. |
| `flush_interval` | `5m` | Interval at which open data files are flushed. |
| `ack_poll_interval` | `5s` | Interval at which nodes poll for commit completion. |
| `dlq` | — | DLQ settings passed to the `DlqProvider` (e.g., `dlq.s3.bucket`, `dlq.s3.key_path_prefix`). |

Acknowledgement timeout guidance. When using end-to-end acknowledgements, the source's acknowledgement timeout must be greater than `flush_interval + commit_interval + ack_poll_interval`. With default settings, this is approximately 10 minutes. To account for leader failover scenarios, a timeout of at least 20 minutes is recommended.
11. Limitations and future work
Limitations
`PartitionedFanoutWriter` keeps a file writer open for each active partition. When `table_identifier` is dynamic, each resolved table maintains its own set of TaskWriters (one per thread). With many distinct tables, the total number of open writers grows as (number of tables) x (number of threads) x (partitions per table). There is currently no limit on the number of concurrent writers. A maximum writer count with eviction of idle writers may be needed for high-cardinality dynamic routing.
Future enhancements
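One possible shape for the idle-writer eviction mentioned above is a small LRU cache, keyed by table or partition, that closes the least-recently-used writer once a cap is exceeded. This is a hedged sketch, not part of the proposal's design; the writer type is reduced to `AutoCloseable` for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

public final class WriterCache<K, W extends AutoCloseable> {
    private final LinkedHashMap<K, W> writers;

    public WriterCache(int maxWriters) {
        // Access-ordered map: iteration order runs from least- to most-recently used.
        this.writers = new LinkedHashMap<K, W>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<K, W> eldest) {
                if (size() <= maxWriters) {
                    return false;
                }
                try {
                    // Close the idle writer so its data file is finished and its
                    // WriteResult can be registered before the entry is dropped.
                    eldest.getValue().close();
                } catch (Exception e) {
                    throw new IllegalStateException("failed to close evicted writer", e);
                }
                return true;
            }
        };
    }

    /** Returns the cached writer for key, creating it (and possibly evicting) if absent. */
    public W get(K key, Function<K, W> factory) {
        W writer = writers.get(key);
        if (writer == null) {
            writer = factory.apply(key);
            writers.put(key, writer); // put() may trigger removeEldestEntry above
        }
        return writer;
    }

    public int size() {
        return writers.size();
    }
}
```

Bounding the writer count trades write amplification (more, smaller files per evicted writer) for predictable memory use, which is usually the right trade under high-cardinality routing.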
12. References