Commit a02799d
[FLINK-39261][table] Add FROM_CHANGELOG built-in process table function
1 parent 548ea5b commit a02799d

21 files changed

Lines changed: 1324 additions & 11 deletions

File tree

docs/content/docs/sql/reference/queries/changelog.md

Lines changed: 111 additions & 1 deletion

@@ -30,9 +30,119 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan

| Function | Description |
|:---------|:------------|
| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a (potentially updating) dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes |

## FROM_CHANGELOG

The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a (potentially updating) dynamic table. Each input row is expected to have a string column that indicates the change operation. The operation column is interpreted by the engine and removed from the output.

This is useful when consuming Change Data Capture (CDC) streams from systems like Debezium, where events arrive as flat append-only records with an explicit operation field. It can also be used in combination with the TO_CHANGELOG function to convert an append-only table back into an updating table after applying transformations to the change events.

Note: This version requires that your CDC data encodes updates as a full image, i.e. with separate events for the state before and after the update. Double-check whether your source provides both UPDATE_BEFORE and UPDATE_AFTER events. FROM_CHANGELOG is a powerful function, but if configured incorrectly it can produce incorrect results in subsequent operations and tables.
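To see why full-image updates matter, here is a small Python sketch (illustrative only, not Flink code) of a downstream SUM aggregate consuming change events. Without the UPDATE_BEFORE event, the old value is never retracted and the aggregate drifts:

```python
def running_sum(events):
    """Materialize a SUM over change events.

    Each event is (op, value); INSERT/UPDATE_AFTER add to the sum,
    UPDATE_BEFORE/DELETE retract from it.
    """
    total = 0
    for op, value in events:
        if op in ("INSERT", "UPDATE_AFTER"):
            total += value
        elif op in ("UPDATE_BEFORE", "DELETE"):
            total -= value
    return total

# Full image: an update of 10 -> 12 is encoded as a retraction plus an addition.
full = [("INSERT", 10), ("UPDATE_BEFORE", 10), ("UPDATE_AFTER", 12)]
# Partial image: the UPDATE_BEFORE event is missing.
partial = [("INSERT", 10), ("UPDATE_AFTER", 12)]

print(running_sum(full))     # 12 -- the correct current value
print(running_sum(partial))  # 22 -- the old value was never retracted
```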
### Syntax

```sql
SELECT * FROM FROM_CHANGELOG(
  input => TABLE source_table,
  [op => DESCRIPTOR(op_column_name),]
  [op_mapping => MAP[
    'c, r', 'INSERT',
    'ub', 'UPDATE_BEFORE',
    'ua', 'UPDATE_AFTER',
    'd', 'DELETE'
  ]]
)
```

### Parameters

| Parameter | Required | Description |
|:-------------|:---------|:------------|
| `input` | Yes | The input table. Must be append-only. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded; unmapped codes are dropped. Each change operation may appear at most once across all entries. |

#### Default op_mapping

When `op_mapping` is omitted, the following standard names are used. These defaults allow reversing a TO_CHANGELOG conversion without further configuration.

| Input code | Change operation |
|:-------------------|:------------------|
| `'INSERT'` | INSERT |
| `'UPDATE_BEFORE'` | UPDATE_BEFORE |
| `'UPDATE_AFTER'` | UPDATE_AFTER |
| `'DELETE'` | DELETE |
### Output Schema

The output contains all input columns except the operation code column (e.g., `op`), which is interpreted by Flink's SQL engine and removed. Each output row carries the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).

```
[all_input_columns_without_op]
```

### Examples

#### Basic usage with standard op names

```sql
-- Input (append-only):
-- +I[id:1, op:'INSERT', name:'Alice']
-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
-- +I[id:2, op:'DELETE', name:'Bob']

SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream
)

-- Output (updating table):
-- +I[id:1, name:'Alice']
-- -U[id:1, name:'Alice']
-- +U[id:1, name:'Alice2']
-- -D[id:2, name:'Bob']

-- Table state after all events:
-- | id | name   |
-- |----|--------|
-- | 1  | Alice2 |
```
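The materialization in the example above can be mimicked with a keyed dict. This is a sketch of changelog semantics for intuition, not the actual Flink runtime:

```python
def materialize(changelog):
    """Apply +I/-U/+U/-D change events to a keyed table state."""
    state = {}
    for kind, key, value in changelog:
        if kind in ("+I", "+U"):
            state[key] = value    # insert, or the new image of an update
        elif kind == "-D":
            state.pop(key, None)  # delete the row
        # "-U" (the old update image) carries no new state for keyed upserts
    return state

events = [
    ("+I", 1, "Alice"),
    ("-U", 1, "Alice"),
    ("+U", 1, "Alice2"),
    ("-D", 2, "Bob"),
]
print(materialize(events))  # {1: 'Alice2'}
```

This reproduces the final table state shown in the SQL example: only `id=1` remains, with the updated name.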
#### Custom operation column name

```sql
-- Source schema: id INT, operation STRING, name STRING
SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream,
  op => DESCRIPTOR(operation)
)
-- The operation column named 'operation' is used instead of 'op'
```

#### Table API

```java
Table cdcStream = ...;

// Default: reads 'op' column with standard change operation names
Table result = cdcStream.fromChangelog();

// With custom op column name
Table result = cdcStream.fromChangelog(
    descriptor("operation").asArgument("op")
);

// With custom op_mapping
Table result = cdcStream.fromChangelog(
    descriptor("op").asArgument("op"),
    map("c, r", "INSERT",
        "ub", "UPDATE_BEFORE",
        "ua", "UPDATE_AFTER",
        "d", "DELETE").asArgument("op_mapping")
);
```

## TO_CHANGELOG
flink-python/pyflink/table/tests/test_table_completeness.py

Lines changed: 1 addition & 0 deletions

```diff
@@ -42,6 +42,7 @@ def excluded_methods(cls):
     'asArgument',
     'process',
     'partitionBy',
+    'fromChangelog',
 }
```

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java

Lines changed: 27 additions & 0 deletions

```diff
@@ -1454,4 +1454,31 @@ default TableResult executeInsert(
      * @return an append-only {@link Table} with an {@code op} column prepended to the input columns
      */
     Table toChangelog(Expression... arguments);
+
+    /**
+     * Converts this append-only table with an explicit operation code column into a dynamic table
+     * using the built-in {@code FROM_CHANGELOG} process table function.
+     *
+     * <p>Each input row is expected to have a string operation code column (default: {@code "op"})
+     * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The
+     * output table is a dynamic table backed by a changelog stream.
+     *
+     * <p>Optional arguments can be passed using named expressions:
+     *
+     * <pre>{@code
+     * // Default: reads 'op' column with standard change operation names
+     * table.fromChangelog();
+     *
+     * // Custom op column name and mapping (Debezium-style codes)
+     * table.fromChangelog(
+     *     descriptor("__op").asArgument("op"),
+     *     map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
+     * );
+     * }</pre>
+     *
+     * @param arguments optional named arguments for {@code op} and {@code op_mapping}
+     * @return a dynamic {@link Table} with the op column removed and proper change operation
+     *     semantics
+     */
+    Table fromChangelog(Expression... arguments);
 }
```
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java

Lines changed: 5 additions & 0 deletions

```diff
@@ -497,6 +497,11 @@ public PartitionedTable partitionBy(Expression... fields) {
         return new PartitionedTableImpl(this, Arrays.asList(fields));
     }

+    @Override
+    public Table fromChangelog(Expression... arguments) {
+        return process(BuiltInFunctionDefinitions.FROM_CHANGELOG.getName(), (Object[]) arguments);
+    }
+
     @Override
     public ApiExpression asArgument(String name) {
         return createArgumentExpression(operationTree, tableEnvironment, name);
```

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java

Lines changed: 26 additions & 2 deletions

```diff
@@ -72,6 +72,8 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction {

     private final SqlCallSyntax sqlCallSyntax;

+    private final @Nullable ChangelogModeStrategy changelogModeStrategy;
+
     private final String sqlName;

     private BuiltInFunctionDefinition(
@@ -84,7 +86,8 @@ private BuiltInFunctionDefinition(
             boolean isDeterministic,
             boolean isRuntimeProvided,
             String runtimeClass,
-            boolean isInternal) {
+            boolean isInternal,
+            @Nullable ChangelogModeStrategy changelogModeStrategy) {
         this.name = checkNotNull(name, "Name must not be null.");
         this.sqlName = sqlName;
         this.version = isInternal ? null : version;
@@ -95,6 +98,7 @@ private BuiltInFunctionDefinition(
         this.runtimeClass = runtimeClass;
         this.isInternal = isInternal;
         this.sqlCallSyntax = sqlCallSyntax;
+        this.changelogModeStrategy = changelogModeStrategy;
         validateFunction(this.name, this.version, this.isInternal);
     }

@@ -131,6 +135,14 @@ public boolean isInternal() {
         return isInternal;
     }

+    /**
+     * Returns the optional {@link ChangelogModeStrategy} for built-in PTFs that emit updates (e.g.,
+     * FROM_CHANGELOG). The planner uses this to determine the output changelog mode.
+     */
+    public Optional<ChangelogModeStrategy> getChangelogModeStrategy() {
+        return Optional.ofNullable(changelogModeStrategy);
+    }
+
     public String getQualifiedName() {
         if (isInternal) {
             return name;
@@ -253,6 +265,8 @@ public static final class Builder {

         private SqlCallSyntax sqlCallSyntax = SqlCallSyntax.FUNCTION;

+        private @Nullable ChangelogModeStrategy changelogModeStrategy;
+
         public Builder() {
             // default constructor to allow a fluent definition
         }
@@ -399,6 +413,15 @@ public Builder sqlName(String name) {
             return this;
         }

+        /**
+         * Sets the {@link ChangelogModeStrategy} that determines the output changelog mode for this
+         * built-in PTF. Only needed for PTFs that emit updates (e.g., FROM_CHANGELOG).
+         */
+        public Builder changelogModeStrategy(ChangelogModeStrategy changelogModeStrategy) {
+            this.changelogModeStrategy = changelogModeStrategy;
+            return this;
+        }
+
         public BuiltInFunctionDefinition build() {
             return new BuiltInFunctionDefinition(
                     name,
@@ -410,7 +433,8 @@ private BuiltInFunctionDefinition(
                     isDeterministic,
                     isRuntimeProvided,
                     runtimeClass,
-                    isInternal);
+                    isInternal,
+                    changelogModeStrategy);
         }
     }
 }
```

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java

Lines changed: 27 additions & 0 deletions

```diff
@@ -27,6 +27,7 @@
 import org.apache.flink.table.api.JsonType;
 import org.apache.flink.table.api.JsonValueOnEmptyOrError;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.expressions.TimePointUnit;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -106,6 +107,7 @@
 import static org.apache.flink.table.types.inference.TypeStrategies.varyingString;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_ELEMENT_ARG;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE;
+import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.INDEX;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ML_PREDICT_INPUT_TYPE_STRATEGY;
@@ -115,6 +117,7 @@
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentage;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentageArray;
 import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND;
+import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY;
 import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ML_PREDICT_OUTPUT_TYPE_STRATEGY;
 import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY;

@@ -809,6 +812,30 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
                             "org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction")
                     .build();

+    public static final BuiltInFunctionDefinition FROM_CHANGELOG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("FROM_CHANGELOG")
+                    .kind(PROCESS_TABLE)
+                    .staticArguments(
+                            StaticArgument.table(
+                                    "input",
+                                    Row.class,
+                                    false,
+                                    EnumSet.of(
+                                            StaticArgumentTrait.TABLE,
+                                            StaticArgumentTrait.ROW_SEMANTIC_TABLE)),
+                            StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
+                            StaticArgument.scalar(
+                                    "op_mapping",
+                                    DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+                                    true))
+                    .changelogModeStrategy(ctx -> ChangelogMode.all())
+                    .inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                    .outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                    .runtimeClass(
+                            "org.apache.flink.table.runtime.functions.ptf.FromChangelogFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition GREATEST =
             BuiltInFunctionDefinition.newBuilder()
                     .name("GREATEST")
```
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogModeStrategy.java

Lines changed: 37 additions & 0 deletions

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.functions;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
import org.apache.flink.table.types.inference.TypeStrategy;

/**
 * Strategy for determining the output {@link ChangelogMode} of a built-in process table function.
 *
 * <p>Similar to {@link TypeStrategy}, this is used to declare changelog semantics in the function
 * definition rather than implementing the {@link ChangelogFunction} interface.
 */
@Internal
public interface ChangelogModeStrategy {

    /** Infers the output {@link ChangelogMode} based on the given {@link ChangelogContext}. */
    ChangelogMode inferChangelogMode(ChangelogContext changelogContext);
}
```
