
Support reporting backlog information to Flink #2509

@xishuaidelin

Description

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

In FLIP-309, Flink introduced the concept of ProcessingBacklog to indicate whether a record should be processed in a low-latency or high-throughput manner. ProcessingBacklog can be used to dynamically adjust a job’s checkpoint interval at runtime.

The Fluss source can set ProcessingBacklog by marking the offsets at the starting position as a backlog boundary — records before that boundary are backlog data — and reporting this signal to Flink, enabling Flink to perform relevant optimizations based on it.

Proposed Changes

  • Add backlog boundary tracking in FlinkSourceEnumerator

    • Introduce backlogOffsets: Map<TableBucket, Long> to store the backlog boundary offset per bucket.
    • Add/extend resetBacklog(...) to initialize splits and persist the backlog boundary offsets.
    • Update initPartitionedSplits() to pass initializeAtBeginning / backlog boundary offset information into the created splits.
    • Update handleSourceEvent(...) to handle a new BacklogFinishEvent and update the enumerator’s backlog tracking state.
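The enumerator-side state above can be sketched as follows. Note this is a minimal illustration, not the actual Fluss implementation: `TableBucket`, `BacklogFinishEvent`, and `BacklogTracker` below are simplified stand-ins for the real classes, and the real enumerator would also forward `isProcessingBacklog()` to Flink (e.g. via FLIP-309's `setIsProcessingBacklog`).

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Simplified stand-in for Fluss's TableBucket (table id + bucket id). */
record TableBucket(long tableId, int bucket) {}

/** Simplified stand-in for the proposed BacklogFinishEvent. */
record BacklogFinishEvent(TableBucket bucket) {}

/** Sketch of the proposed enumerator-side backlog tracking state. */
class BacklogTracker {
    // Backlog boundary offset per bucket, as in the proposed backlogOffsets map.
    private final Map<TableBucket, Long> backlogOffsets = new HashMap<>();
    // Buckets whose readers have reported backlog completion.
    private final Set<TableBucket> finished = new HashSet<>();

    /** Plays the role of resetBacklog(...): record the boundary per bucket. */
    void resetBacklog(TableBucket tb, long boundaryOffset) {
        backlogOffsets.put(tb, boundaryOffset);
    }

    /** Plays the role of handleSourceEvent(...) receiving a BacklogFinishEvent. */
    void handleBacklogFinish(BacklogFinishEvent event) {
        if (backlogOffsets.containsKey(event.bucket())) {
            finished.add(event.bucket());
        }
    }

    /** True while at least one tracked bucket still has backlog to drain. */
    boolean isProcessingBacklog() {
        return finished.size() < backlogOffsets.size();
    }
}
```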
  • Propagate backlog boundary offset via split metadata

    • Extend split types to carry backlog boundary offset:
      • LogSplit: add backlogOffset
      • HybridSnapshotLogSplit: add backlogOffset
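Carrying the boundary in the split could look roughly like this. The field and accessor names are illustrative assumptions (the real `LogSplit` / `HybridSnapshotLogSplit` carry much more state and have their own serializers); the sketch only shows that a split may or may not have a backlog boundary attached.

```java
import java.util.OptionalLong;

/** Simplified stand-in for Fluss's TableBucket. */
record TableBucket(long tableId, int bucket) {}

/** Sketch of a LogSplit extended with an optional backlog boundary offset. */
class LogSplit {
    private final TableBucket bucket;
    private final long startingOffset;
    // Negative value means "no backlog boundary known for this split"
    // (an illustrative sentinel convention, not necessarily Fluss's).
    private final long backlogOffset;

    LogSplit(TableBucket bucket, long startingOffset, long backlogOffset) {
        this.bucket = bucket;
        this.startingOffset = startingOffset;
        this.backlogOffset = backlogOffset;
    }

    OptionalLong getBacklogOffset() {
        return backlogOffset < 0 ? OptionalLong.empty() : OptionalLong.of(backlogOffset);
    }
}
```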
  • Introduce a new source event for backlog completion

    • Add BacklogFinishEvent extends SourceEvent to signal that a specific TableBucket has finished consuming backlog (i.e., reader has reached/passed the backlog boundary offset).
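The event itself can be a small serializable payload, for example (with a stand-in for Flink's `SourceEvent` marker interface, which in Flink extends `Serializable`):

```java
import java.io.Serializable;

/** Stand-in for Flink's SourceEvent marker interface. */
interface SourceEvent extends Serializable {}

/** Simplified stand-in for Fluss's TableBucket. */
record TableBucket(long tableId, int bucket) implements Serializable {}

/**
 * Sketch of the proposed BacklogFinishEvent: it only needs to carry the
 * bucket whose reader has reached/passed its backlog boundary offset.
 */
record BacklogFinishEvent(TableBucket bucket) implements SourceEvent {}
```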
  • Report backlog completion from FlinkSourceSplitReader

    • Add new fields:
      • backlogMarkedOffsets: Map<TableBucket, Long>: backlog boundary offsets per bucket (from assigned splits).
      • onlySnapshotBuckets: Set<TableBucket>: buckets with snapshot-only data (no log backlog to track).
      • backlogEventSentTbls: Set<TableBucket>: deduplication set ensuring BacklogFinishEvent is sent at most once per bucket.
      • context: SourceReaderContext: used to send BacklogFinishEvent to the coordinator.
    • Update reader logic to detect backlog completion and emit events:
      • fetch(...): hook backlog completion reporting when a bucket completes backlog.
      • subscribeLog(...) and forLogRecords(...): check current log offsets against backlogMarkedOffsets; once the boundary is reached, send BacklogFinishEvent (deduplicated).
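The reader-side completion check and deduplication can be sketched as below. This is a simplified model, not the actual `FlinkSourceSplitReader` code: `onOffsetAdvanced` plays the role of the offset check hooked into `fetch(...)` / `forLogRecords(...)`, and the `sendEvent` callback stands in for sending the event to the coordinator via `SourceReaderContext`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

/** Simplified stand-ins for the real Fluss classes. */
record TableBucket(long tableId, int bucket) {}
record BacklogFinishEvent(TableBucket bucket) {}

/** Sketch of the proposed reader-side backlog completion reporting. */
class BacklogReporter {
    // Backlog boundary offsets per bucket, from the assigned splits.
    private final Map<TableBucket, Long> backlogMarkedOffsets = new HashMap<>();
    // Dedup set: BacklogFinishEvent is sent at most once per bucket.
    private final Set<TableBucket> backlogEventSentTbls = new HashSet<>();
    // Stands in for SourceReaderContext sending events to the coordinator.
    private final Consumer<BacklogFinishEvent> sendEvent;

    BacklogReporter(Map<TableBucket, Long> boundaries,
                    Consumer<BacklogFinishEvent> sendEvent) {
        this.backlogMarkedOffsets.putAll(boundaries);
        this.sendEvent = sendEvent;
    }

    /** Called with a bucket's current log offset after each fetched batch. */
    void onOffsetAdvanced(TableBucket tb, long currentOffset) {
        Long boundary = backlogMarkedOffsets.get(tb);
        // Set#add returns false on repeats, giving the once-per-bucket dedup.
        if (boundary != null && currentOffset >= boundary
                && backlogEventSentTbls.add(tb)) {
            sendEvent.accept(new BacklogFinishEvent(tb));
        }
    }
}
```

A snapshot-only bucket (the `onlySnapshotBuckets` case) would simply never appear in `backlogMarkedOffsets`, so no event is emitted for it by this check.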

Anything else?

No response

Willingness to contribute

  • I'm willing to submit a PR!
