[FLINK-39261][table] Add FROM_CHANGELOG built-in process table function #27901

Merged: twalthr merged 1 commit into apache:master from raminqaf:FLINK-39261 on Apr 20, 2026

@raminqaf raminqaf commented Apr 7, 2026

What is the purpose of the change

Implement the FROM_CHANGELOG built-in process table function as specified in FLIP-564 (append-only stream to retract stream, flat mode).

FROM_CHANGELOG converts an append-only table with an explicit operation code column into a (potentially updating) dynamic table. Each input record is mapped to one output record with the appropriate RowKind. The operation column is interpreted by the engine and removed from the output. The implementation is stateless and uses row semantics.

SELECT * FROM FROM_CHANGELOG(
    input => TABLE cdc_stream,
    op => DESCRIPTOR(op),
    op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE']
)
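
To make the per-record behavior concrete, here is a minimal plain-Python model of the mapping described above. It is not the Flink implementation. The `RowKind` enum, `expand_op_mapping`, and `from_changelog` are hypothetical names used only for illustration, and trimming whitespace around comma-separated keys (so that `'c, r'` matches both `c` and `r`) is an assumption.

```python
from enum import Enum
from typing import Any, Dict, Iterable, Iterator, Tuple


class RowKind(Enum):
    """Hypothetical stand-in for Flink's RowKind (illustration only)."""
    INSERT = "+I"
    UPDATE_BEFORE = "-U"
    UPDATE_AFTER = "+U"
    DELETE = "-D"


def expand_op_mapping(op_mapping: Dict[str, str]) -> Dict[str, RowKind]:
    """Split comma-separated keys so that every op code maps to one RowKind.

    Assumption: whitespace around each code is trimmed.
    """
    expanded: Dict[str, RowKind] = {}
    for codes, kind_name in op_mapping.items():
        for code in codes.split(","):
            expanded[code.strip()] = RowKind[kind_name]
    return expanded


def from_changelog(
    rows: Iterable[Dict[str, Any]],
    op_column: str,
    op_mapping: Dict[str, str],
) -> Iterator[Tuple[RowKind, Dict[str, Any]]]:
    """Stateless model: one input row yields at most one output row."""
    codes = expand_op_mapping(op_mapping)
    for row in rows:
        kind = codes.get(row[op_column])
        if kind is None:
            continue  # unmapped op codes are dropped
        # The op column is consumed and removed from the output row.
        payload = {name: value for name, value in row.items() if name != op_column}
        yield kind, payload


if __name__ == "__main__":
    mapping = {
        "c, r": "INSERT",
        "ub": "UPDATE_BEFORE",
        "ua": "UPDATE_AFTER",
        "d": "DELETE",
    }
    cdc_stream = [
        {"op": "c", "id": 1, "name": "Alice"},
        {"op": "ub", "id": 1, "name": "Alice"},
        {"op": "ua", "id": 1, "name": "Alicia"},
        {"op": "x", "id": 2, "name": "ignored"},
        {"op": "d", "id": 1, "name": "Alicia"},
    ]
    for kind, payload in from_changelog(cdc_stream, "op", mapping):
        print(kind.value, payload)
```

Because no state is kept between records, the model never checks that an `UPDATE_AFTER` was preceded by a matching `UPDATE_BEFORE`; it only relabels each row.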

Brief change log

  • Add changelogModeResolver to BuiltInFunctionDefinition so built-in PTFs can declare their output changelog mode without implementing ChangelogFunction
  • Update FlinkChangelogModeInferenceProgram to check for the resolver on built-in PTFs
  • Allow retract mode (with UPDATE_BEFORE) for row-semantic PTFs in verifyPtfTableArgsForUpdates
  • Add FromChangelogTypeStrategy with input validation (op column existence, STRING type, op_mapping value validation, UPDATE_BEFORE required) and output type inference (removes op column from output)
  • Introduce ValidationOnlyInputTypeStrategy base class for PTFs that use static arguments
  • Add FROM_CHANGELOG built-in function definition with ROW_SEMANTIC_TABLE and ChangelogMode.all() output
  • Add FromChangelogFunction runtime implementation using ProjectedRowData for zero-copy projection
  • Add fromChangelog() convenience method to Table API
  • Add ChangelogContext.getArgumentValue() for inspecting function arguments during changelog inference
  • Add documentation for FROM_CHANGELOG in changelog.md
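
The validation and output type inference listed for FromChangelogTypeStrategy can be sketched as a plain-Python model. This is one reading of the bullet points, not the actual Java code: `validate_and_infer` is a hypothetical helper, the schema is modeled as a list of `(name, type)` pairs, and the exact rules for "UPDATE_BEFORE required" and duplicate RowKinds are assumptions (here, `UPDATE_AFTER` without `UPDATE_BEFORE` is rejected, and each RowKind may appear only once as a mapping value).

```python
from typing import Dict, List, Tuple

VALID_ROW_KINDS = {"INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE"}

Schema = List[Tuple[str, str]]  # (column name, type name)


def validate_and_infer(schema: Schema, op_column: str, op_mapping: Dict[str, str]) -> Schema:
    """Validate the arguments and return the output schema without the op column."""
    columns = dict(schema)
    if op_column not in columns:
        raise ValueError(f"op column '{op_column}' not found in input")
    if columns[op_column] != "STRING":
        raise ValueError(f"op column '{op_column}' must be of type STRING")

    kinds = list(op_mapping.values())
    for kind in kinds:
        if kind not in VALID_ROW_KINDS:
            raise ValueError(f"invalid RowKind '{kind}' in op_mapping")
    if len(set(kinds)) != len(kinds):
        # Assumption: several codes for one RowKind go into one comma-separated key.
        raise ValueError("each RowKind may appear only once in op_mapping")
    if "UPDATE_AFTER" in kinds and "UPDATE_BEFORE" not in kinds:
        # Assumption: an upsert-style mapping (no UPDATE_BEFORE) is rejected.
        raise ValueError("UPDATE_BEFORE is required when UPDATE_AFTER is mapped")

    # Output type inference: every input column except the op column.
    return [(name, type_name) for name, type_name in schema if name != op_column]


if __name__ == "__main__":
    schema = [("id", "INT"), ("op", "STRING"), ("name", "STRING")]
    mapping = {
        "c, r": "INSERT",
        "ub": "UPDATE_BEFORE",
        "ua": "UPDATE_AFTER",
        "d": "DELETE",
    }
    print(validate_and_infer(schema, "op", mapping))
```

Running validation once at planning time, as this model does, keeps the per-record path free of checks.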

Verifying this change

This change added tests and can be verified as follows:

  • Added unit tests for FromChangelogTypeStrategy input validation (valid mapping, op column not found, wrong type, invalid descriptor, invalid RowKind, UPDATE_BEFORE required, duplicate RowKind, upsert mapping rejected)
  • Added semantic tests covering: default op_mapping with all four change operations, custom op_mapping with comma-separated keys, unmapped codes dropped, custom op column name, Table API convenience method
  • Added plan tests verifying changelog mode propagation (changelogMode=[I,UB,UA,D] for retract output, changelogMode=[I] for source input) and custom op_mapping
  • Added plan test for retract mode with row semantics in ProcessTableFunctionTest
  • Updated ProcessTableFunctionTest to verify upsert mode with row semantics is rejected
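
The changelog mode checks exercised by the plan tests can be modeled in a few lines of Python. This is a simplified sketch, not the planner code: `format_mode`, `is_upsert`, and `verify_row_semantic_input` are hypothetical helpers, and "upsert" is assumed to mean a mode that contains `UA` without `UB`.

```python
from typing import Iterable, Set

# Display order used in plan strings such as changelogMode=[I,UB,UA,D].
ORDER = ["I", "UB", "UA", "D"]


def format_mode(kinds: Iterable[str]) -> str:
    """Render a set of row kinds the way the plan tests above print them."""
    present: Set[str] = set(kinds)
    return "changelogMode=[" + ",".join(k for k in ORDER if k in present) + "]"


def is_upsert(kinds: Iterable[str]) -> bool:
    """Assumption: upsert means UPDATE_AFTER is present but UPDATE_BEFORE is not."""
    present = set(kinds)
    return "UA" in present and "UB" not in present


def verify_row_semantic_input(kinds: Iterable[str]) -> None:
    """Accept insert-only and retract inputs, reject upsert inputs."""
    if is_upsert(kinds):
        raise ValueError(
            "upsert input is not supported for row semantics: " + format_mode(kinds)
        )


if __name__ == "__main__":
    print(format_mode({"I"}))
    print(format_mode({"D", "UA", "UB", "I"}))
    verify_row_semantic_input({"I", "UB", "UA", "D"})  # retract input passes
```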

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): yes (Table.fromChangelog(), ChangelogContext.getArgumentValue())
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): no
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? yes
  • If yes, how is the feature documented? docs / JavaDocs

@gustavodemorais gustavodemorais left a comment

Thanks for the PR, @raminqaf! In general, it looks very consistent with to_changelog.

Here is my first set of reviews. In this one, I think there are some minor details we have to decide on and two bigger things:

  • Going with retract by default as the output mode
  • Finding a nice way of expressing the changelog mode with the BuiltInFunctionsDefinition

See below for more details

Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
@github-actions github-actions bot added the community-reviewed label (PR has been reviewed by the community.) on Apr 7, 2026
@gustavodemorais gustavodemorais left a comment

Thanks for the updates, Ramin! Added some comments, take a look

Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
@raminqaf raminqaf force-pushed the FLINK-39261 branch 5 times, most recently from d6be0aa to 9d75cbb, on April 14, 2026 08:35
@twalthr twalthr left a comment

Some initial comments.

Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
Comment thread docs/content/docs/sql/reference/queries/changelog.md Outdated
@raminqaf raminqaf force-pushed the FLINK-39261 branch 6 times, most recently from df99efe to 71394ba, on April 15, 2026 10:19
@gustavodemorais gustavodemorais left a comment

Thanks for all the updates and the replies, @raminqaf. I think we clarified most of the major things and we're almost there!

Comment thread docs/content/docs/sql/reference/queries/changelog.md
@raminqaf raminqaf force-pushed the FLINK-39261 branch 3 times, most recently from ce6aee2 to 614d33e, on April 16, 2026 12:22
@gustavodemorais gustavodemorais left a comment

Looks great, @raminqaf! Thanks for all the updates. LGTM! We can continue interacting in the next PRs.

Comment thread flink-python/pyflink/table/table.py
@raminqaf raminqaf force-pushed the FLINK-39261 branch 3 times, most recently from 9d2cdcb to f25c489, on April 17, 2026 09:08
@twalthr twalthr left a comment

Excellent PR @raminqaf. I just had a couple of minor improvement comments. Should be good to merge in the next iteration.

Comment thread docs/content/docs/sql/reference/queries/changelog.md
@twalthr twalthr left a comment

LGTM. Thanks @raminqaf.

@raminqaf raminqaf force-pushed the FLINK-39261 branch 3 times, most recently from 7fe7e1c to 917d73a, on April 18, 2026 13:55
@gustavodemorais

Hey @raminqaf, I'd suggest changing this in the next PR so we can close this one and adjust it for both TO_CHANGELOG and FROM_CHANGELOG 🙂

@twalthr twalthr merged commit 4997ca4 into apache:master Apr 20, 2026
twalthr pushed a commit that referenced this pull request Apr 22, 2026