[FLINK-39261][table] Add FROM_CHANGELOG built-in process table function#27901
[FLINK-39261][table] Add FROM_CHANGELOG built-in process table function#27901twalthr merged 1 commit intoapache:masterfrom
Conversation
There was a problem hiding this comment.
Thanks for the PR, @raminqaf! In general, it looks very consistent with to_changelog.
Here is my first set of reviews. In this one, I think there are some minor details we have to decide on and two bigger things:
- Going with retract by default as the output mode
- Finding a nice way of expressing the changelog mode with the BuiltInFunctionsDefinition
See below for more details
gustavodemorais
left a comment
There was a problem hiding this comment.
Thanks for the updates, Ramin! Added some comments, take a look
d6be0aa to
9d75cbb
Compare
df99efe to
71394ba
Compare
gustavodemorais
left a comment
There was a problem hiding this comment.
Thanks for all the updates and the replies, @raminqaf. I think we clarified most of the major things and we're almost there!
ce6aee2 to
614d33e
Compare
gustavodemorais
left a comment
There was a problem hiding this comment.
Looks great, @raminqaf! Thanks for all the updates. LGTM! We can continue interacting in the next PRs.
9d2cdcb to
f25c489
Compare
7fe7e1c to
917d73a
Compare
|
Hey @raminqaf, I'd suggest doing changing this in the next PR so we close this and we can adjust it for both to and from changelog 🙂 |
What is the purpose of the change
Implement the FROM_CHANGELOG built-in process table function as specified in FLIP-564 (append-only stream to retract stream, flat mode).
FROM_CHANGELOG converts an append-only table with an explicit operation code column into a (potentially updating) dynamic table. Each input record is mapped to one output record with the appropriate RowKind. The operation column is interpreted by the engine and removed from the output. The implementation is stateless and uses row semantics.
Brief change log
changelogModeResolvertoBuiltInFunctionDefinitionso built-in PTFs can declare their output changelog mode without implementingChangelogFunctionFlinkChangelogModeInferenceProgramto check for the resolver on built-in PTFsverifyPtfTableArgsForUpdatesFromChangelogTypeStrategywith input validation (op column existence, STRING type, op_mapping value validation, UPDATE_BEFORE required) and output type inference (removes op column from output)ValidationOnlyInputTypeStrategybase class for PTFs that use static argumentsFROM_CHANGELOGbuilt-in function definition withROW_SEMANTIC_TABLEandChangelogMode.all()outputFromChangelogFunctionruntime implementation usingProjectedRowDatafor zero-copy projectionfromChangelog()convenience method toTableAPIChangelogContext.getArgumentValue()for inspecting function arguments during changelog inferenceVerifying this change
This change added tests and can be verified as follows:
FromChangelogTypeStrategyinput validation (valid mapping, op column not found, wrong type, invalid descriptor, invalid RowKind, UPDATE_BEFORE required, duplicate RowKind, upsert mapping rejected)changelogMode=[I,UB,UA,D]for retract output,changelogMode=[I]for source input) and custom op_mappingProcessTableFunctionTestProcessTableFunctionTestto verify upsert mode with row semantics is rejectedDoes this pull request potentially affect one of the following parts:
@Public(Evolving): yes (Table.fromChangelog(),ChangelogContext.getArgumentValue())Documentation