Skip to content

[FLINK-39637][state] Add filter pushdown support for savepoint connector#28142

Open
soin08 wants to merge 6 commits into
apache:masterfrom
soin08:FLINK-39637
Open

[FLINK-39637][state] Add filter pushdown support for savepoint connector#28142
soin08 wants to merge 6 commits into
apache:masterfrom
soin08:FLINK-39637

Conversation

@soin08
Copy link
Copy Markdown

@soin08 soin08 commented May 11, 2026

What is the purpose of the change

This pull request adds filter pushdown support (SupportsFilterPushDown) for the savepoint SQL connector. When a SQL query filters on the primary key column (e.g., WHERE k = 5, WHERE k IN (3, 7), WHERE k BETWEEN 3 AND 6), the filter is pushed into the savepoint source, enabling two levels of pruning:

  1. Key-group pruning — for exact key filters, only the input splits whose key-group ranges contain matching keys are created, skipping irrelevant splits entirely.
  2. Key-iteration pruning — within each split, only keys that pass the filter are iterated, avoiding deserialization of non-matching key state.

Brief change log

  • SavepointDynamicTableSource now implements SupportsFilterPushDown
  • New SavepointKeyFilter interface with three implementations: ExactKeyFilter, RangeKeyFilter, EmptyKeyFilter
  • New SavepointFilters utility that converts Flink ResolvedExpression predicates (=, OR, AND, BETWEEN, >, >=, <, <=) into SavepointKeyFilter instances
  • KeyedStateInputFormat.createInputSplits() prunes key-group ranges for exact key filters
  • KeyedStateInputFormat.open() wraps the key iterator with a filtering iterator when a key filter is present
  • SavepointReader.readKeyedState() gains a new overload accepting an optional SavepointKeyFilter
  • SavepointDataStreamScanProvider passes the filter through to readKeyedState()

Verifying this change

This change added tests and can be verified as follows:

  • SavepointFiltersTest — unit tests for filter expression extraction
  • SavepointDynamicTableSourceTest — end-to-end correctness tests against real savepoint data:

Does this pull request potentially affect one of the following parts:

Area Affected?
Dependencies (does it add or upgrade a dependency) no
The public API, i.e., is any changed class annotated with @Public(Evolving) no
The serializers no
The runtime per-record code paths (performance sensitive) no
Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper no
The S3 file system connector no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? JavaDocs

Was generative AI tooling used to co-author this PR?
  • [ X] Yes
    Co-authored by GitHub Copilot

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented May 11, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@soin08 soin08 marked this pull request as ready for review May 12, 2026 21:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants