
Commit fb58909

[FLINK-39261][table] Add FROM_CHANGELOG built-in process table function
1 parent 09efcb4 commit fb58909

20 files changed

Lines changed: 1226 additions & 11 deletions

File tree

docs/content/docs/sql/reference/queries/changelog.md

Lines changed: 95 additions & 1 deletion
@@ -30,9 +30,103 @@ Flink SQL provides built-in process table functions (PTFs) for working with chan

| Function | Description |
|:---------|:------------|
| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with operation codes into a (potentially updating) dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only table with explicit operation codes |

## FROM_CHANGELOG

The `FROM_CHANGELOG` PTF converts an append-only table with an explicit operation code column into a (potentially updating) dynamic table. Each input row is expected to have a string column that indicates the change operation. The operation column is interpreted by the engine and removed from the output.
This is useful when consuming Change Data Capture (CDC) streams from systems like Debezium, where events arrive as flat append-only records with an explicit operation field. It can also be combined with the TO_CHANGELOG function to convert an append-only table back into an updating table after applying transformations to the individual change events.

Note: This version requires that your CDC data encodes updates as full images, i.e. with separate events for the state before and after an update. Double-check whether your source provides both UPDATE_BEFORE and UPDATE_AFTER events. FROM_CHANGELOG is a very powerful function but can produce incorrect results in subsequent operations and tables if not configured correctly.
### Syntax

```sql
SELECT * FROM FROM_CHANGELOG(
  input => TABLE source_table,
  [op => DESCRIPTOR(op_column_name),]
  [op_mapping => MAP[
    'c, r', 'INSERT',
    'ub', 'UPDATE_BEFORE',
    'ua', 'UPDATE_AFTER',
    'd', 'DELETE'
  ]]
)
```

### Parameters

| Parameter | Required | Description |
|:-------------|:---------|:------------|
| `input` | Yes | The input table. Must be append-only. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. |
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded - unmapped codes are dropped. Each change operation may appear at most once across all entries. |

#### Default op_mapping

When `op_mapping` is omitted, the following standard names are used. This makes `FROM_CHANGELOG` the default inverse of a `TO_CHANGELOG` conversion.

| Input code | Change operation |
|:------------------|:-----------------|
| `'INSERT'` | INSERT |
| `'UPDATE_BEFORE'` | UPDATE_BEFORE |
| `'UPDATE_AFTER'` | UPDATE_AFTER |
| `'DELETE'` | DELETE |

### Output Schema

The output contains all input columns except the operation code column (e.g., `op`), which is interpreted by Flink's SQL engine and removed. Each output row carries the appropriate change operation (INSERT, UPDATE_BEFORE, UPDATE_AFTER, or DELETE).

```
[all_input_columns_without_op]
```
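The mapping semantics described above (default codes, comma-separated keys, dropped unmapped codes, removal of the op column) can be sketched outside of Flink as a small simulation. This is an illustrative Python model only, not Flink code; the function and helper names are made up:

```python
# Illustrative simulation of FROM_CHANGELOG's op handling (NOT Flink code).

# Default mapping: standard change operation names map to themselves.
DEFAULT_MAPPING = {
    "INSERT": "INSERT",
    "UPDATE_BEFORE": "UPDATE_BEFORE",
    "UPDATE_AFTER": "UPDATE_AFTER",
    "DELETE": "DELETE",
}

def expand_mapping(op_mapping):
    """Expand comma-separated keys: {'c, r': 'INSERT'} -> {'c': 'INSERT', 'r': 'INSERT'}."""
    expanded = {}
    for codes, op in op_mapping.items():
        for code in (c.strip() for c in codes.split(",")):
            expanded[code] = op
    return expanded

def from_changelog(rows, op_column="op", op_mapping=None):
    """Yield (change_operation, row_without_op_column) for each mapped input row."""
    mapping = expand_mapping(op_mapping) if op_mapping else DEFAULT_MAPPING
    for row in rows:
        code = row[op_column]
        if code not in mapping:
            continue  # unmapped codes are dropped, matching the op_mapping contract
        yield mapping[code], {k: v for k, v in row.items() if k != op_column}
```

For example, with a Debezium-style mapping `{'c, r': 'INSERT', 'd': 'DELETE'}`, a row with `op = 'c'` becomes an INSERT without the `op` column, while a row with an unmapped code is silently skipped.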
### Examples

#### Basic usage with standard op names

```sql
-- Input (append-only):
-- +I[id:1, op:'INSERT', name:'Alice']
-- +I[id:1, op:'UPDATE_BEFORE', name:'Alice']
-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
-- +I[id:2, op:'DELETE', name:'Bob']

SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream
)

-- Output (updating table):
-- +I[id:1, name:'Alice']
-- -U[id:1, name:'Alice']
-- +U[id:1, name:'Alice2']
-- -D[id:2, name:'Bob']

-- Table state after all events:
-- | id | name   |
-- |----|--------|
-- | 1  | Alice2 |
```

#### Custom operation column name

```sql
-- Source schema: id INT, operation STRING, name STRING
SELECT * FROM FROM_CHANGELOG(
  input => TABLE cdc_stream,
  op => DESCRIPTOR(operation)
)
-- The operation column named 'operation' is used instead of 'op'
```

#### Table API

```java
// Default: reads 'op' column with standard change operation names
Table result = cdcStream.fromChangelog();
```
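The basic-usage example's output rows can be replayed to confirm the final table state. Below is a hypothetical simulation keyed on `id` (which the example implicitly treats as the primary key), again plain Python rather than Flink code:

```python
# Replays +I/-U/+U/-D change events keyed by 'id' and returns the final state.

def materialize(changelog):
    state = {}
    for kind, row in changelog:
        if kind in ("+I", "+U"):
            state[row["id"]] = row["name"]
        elif kind == "-D":
            # pop with a default tolerates a delete for a key never inserted,
            # like the -D for id 2 in the example above
            state.pop(row["id"], None)
        # "-U" retracts the previous value of a key; in a keyed store the
        # paired "+U" overwrites it anyway, so the retraction is a no-op here
    return state

events = [
    ("+I", {"id": 1, "name": "Alice"}),
    ("-U", {"id": 1, "name": "Alice"}),
    ("+U", {"id": 1, "name": "Alice2"}),
    ("-D", {"id": 2, "name": "Bob"}),
]
```

Replaying `events` leaves only `id 1` with name `Alice2`, matching the "table state after all events" shown in the example.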

## TO_CHANGELOG

flink-python/pyflink/table/tests/test_table_completeness.py

Lines changed: 1 addition & 0 deletions
@@ -42,6 +42,7 @@ def excluded_methods(cls):
     'asArgument',
     'process',
     'partitionBy',
+    'fromChangelog',
 }

 @classmethod

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java

Lines changed: 27 additions & 0 deletions
@@ -1454,4 +1454,31 @@ default TableResult executeInsert(
      * @return an append-only {@link Table} with an {@code op} column prepended to the input columns
      */
     Table toChangelog(Expression... arguments);
+
+    /**
+     * Converts this append-only table with an explicit operation code column into a dynamic table
+     * using the built-in {@code FROM_CHANGELOG} process table function.
+     *
+     * <p>Each input row is expected to have a string operation code column (default: {@code "op"})
+     * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The
+     * output table is a dynamic table backed by a changelog stream.
+     *
+     * <p>Optional arguments can be passed using named expressions:
+     *
+     * <pre>{@code
+     * // Default: reads 'op' column with standard change operation names
+     * table.fromChangelog();
+     *
+     * // Custom op column name and mapping (Debezium-style codes)
+     * table.fromChangelog(
+     *     descriptor("__op").asArgument("op"),
+     *     map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
+     * );
+     * }</pre>
+     *
+     * @param arguments optional named arguments for {@code op} and {@code op_mapping}
+     * @return a dynamic {@link Table} with the op column removed and proper change operation
+     *     semantics
+     */
+    Table fromChangelog(Expression... arguments);
 }

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java

Lines changed: 5 additions & 0 deletions
@@ -497,6 +497,11 @@ public PartitionedTable partitionBy(Expression... fields) {
         return new PartitionedTableImpl(this, Arrays.asList(fields));
     }

+    @Override
+    public Table fromChangelog(Expression... arguments) {
+        return process(BuiltInFunctionDefinitions.FROM_CHANGELOG.getName(), (Object[]) arguments);
+    }
+
     @Override
     public ApiExpression asArgument(String name) {
         return createArgumentExpression(operationTree, tableEnvironment, name);

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java

Lines changed: 30 additions & 2 deletions
@@ -21,6 +21,8 @@
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.catalog.DataTypeFactory;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
 import org.apache.flink.table.types.inference.StaticArgument;
@@ -33,6 +35,7 @@
 import java.util.Arrays;
 import java.util.Locale;
 import java.util.Optional;
+import java.util.function.Function;
 import java.util.regex.Pattern;

 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -72,6 +75,8 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction {

     private final SqlCallSyntax sqlCallSyntax;

+    private final @Nullable Function<ChangelogContext, ChangelogMode> changelogModeResolver;
+
     private final String sqlName;

     private BuiltInFunctionDefinition(
@@ -84,7 +89,8 @@ private BuiltInFunctionDefinition(
             boolean isDeterministic,
             boolean isRuntimeProvided,
             String runtimeClass,
-            boolean isInternal) {
+            boolean isInternal,
+            @Nullable Function<ChangelogContext, ChangelogMode> changelogModeResolver) {
         this.name = checkNotNull(name, "Name must not be null.");
         this.sqlName = sqlName;
         this.version = isInternal ? null : version;
@@ -95,6 +101,7 @@ private BuiltInFunctionDefinition(
         this.runtimeClass = runtimeClass;
         this.isInternal = isInternal;
         this.sqlCallSyntax = sqlCallSyntax;
+        this.changelogModeResolver = changelogModeResolver;
         validateFunction(this.name, this.version, this.isInternal);
     }

@@ -131,6 +138,14 @@ public boolean isInternal() {
         return isInternal;
     }

+    /**
+     * Returns the optional changelog mode resolver for built-in PTFs that emit updates (e.g.,
+     * FROM_CHANGELOG). The planner uses this to determine the output changelog mode.
+     */
+    public Optional<Function<ChangelogContext, ChangelogMode>> getChangelogModeResolver() {
+        return Optional.ofNullable(changelogModeResolver);
+    }
+
     public String getQualifiedName() {
         if (isInternal) {
             return name;
@@ -253,6 +268,8 @@ public static final class Builder {

         private SqlCallSyntax sqlCallSyntax = SqlCallSyntax.FUNCTION;

+        private @Nullable Function<ChangelogContext, ChangelogMode> changelogModeResolver;
+
         public Builder() {
             // default constructor to allow a fluent definition
         }
@@ -399,6 +416,16 @@ public Builder sqlName(String name) {
             return this;
         }

+        /**
+         * Sets a resolver that determines the output {@link ChangelogMode} for this built-in PTF.
+         * Only needed for PTFs that emit updates (e.g., FROM_CHANGELOG).
+         */
+        public Builder changelogModeResolver(
+                Function<ChangelogContext, ChangelogMode> changelogModeResolver) {
+            this.changelogModeResolver = changelogModeResolver;
+            return this;
+        }
+
         public BuiltInFunctionDefinition build() {
             return new BuiltInFunctionDefinition(
                     name,
@@ -410,7 +437,8 @@ public BuiltInFunctionDefinition build() {
                     isDeterministic,
                     isRuntimeProvided,
                     runtimeClass,
-                    isInternal);
+                    isInternal,
+                    changelogModeResolver);
         }
     }
 }

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java

Lines changed: 27 additions & 0 deletions
@@ -27,6 +27,7 @@
 import org.apache.flink.table.api.JsonType;
 import org.apache.flink.table.api.JsonValueOnEmptyOrError;
 import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.expressions.TimeIntervalUnit;
 import org.apache.flink.table.expressions.TimePointUnit;
 import org.apache.flink.table.expressions.ValueLiteralExpression;
@@ -106,6 +107,7 @@
 import static org.apache.flink.table.types.inference.TypeStrategies.varyingString;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_ELEMENT_ARG;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ARRAY_FULLY_COMPARABLE;
+import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.FROM_CHANGELOG_INPUT_TYPE_STRATEGY;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.INDEX;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.JSON_ARGUMENT;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.ML_PREDICT_INPUT_TYPE_STRATEGY;
@@ -115,6 +117,7 @@
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentage;
 import static org.apache.flink.table.types.inference.strategies.SpecificInputTypeStrategies.percentageArray;
 import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ARRAY_APPEND_PREPEND;
+import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY;
 import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.ML_PREDICT_OUTPUT_TYPE_STRATEGY;
 import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY;

@@ -809,6 +812,30 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL
                             "org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction")
                     .build();

+    public static final BuiltInFunctionDefinition FROM_CHANGELOG =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("FROM_CHANGELOG")
+                    .kind(PROCESS_TABLE)
+                    .staticArguments(
+                            StaticArgument.table(
+                                    "input",
+                                    Row.class,
+                                    false,
+                                    EnumSet.of(
+                                            StaticArgumentTrait.TABLE,
+                                            StaticArgumentTrait.ROW_SEMANTIC_TABLE)),
+                            StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
+                            StaticArgument.scalar(
+                                    "op_mapping",
+                                    DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+                                    true))
+                    .changelogModeResolver(ctx -> ChangelogMode.all())
+                    .inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
+                    .outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                    .runtimeClass(
+                            "org.apache.flink.table.runtime.functions.ptf.FromChangelogFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition GREATEST =
             BuiltInFunctionDefinition.newBuilder()
                     .name("GREATEST")
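The `changelogModeResolver` added to `BuiltInFunctionDefinition` above is an optional-callback slot on a builder: most built-in functions leave it unset, and only update-emitting PTFs like `FROM_CHANGELOG` configure it. A language-agnostic sketch of that pattern (hypothetical names, Python for brevity; not the actual Flink classes):

```python
# Hypothetical sketch of the optional changelog-mode-resolver builder pattern.
# The definition stores a nullable callback; the planner queries it through an
# accessor and treats an absent resolver as "no special changelog handling".

class FunctionDefinition:
    def __init__(self, name, changelog_mode_resolver=None):
        self.name = name
        self._resolver = changelog_mode_resolver

    def get_changelog_mode_resolver(self):
        return self._resolver  # None for ordinary (insert-only) functions

class Builder:
    def __init__(self):
        self._name = None
        self._resolver = None

    def name(self, name):
        self._name = name
        return self  # fluent style, like BuiltInFunctionDefinition.Builder

    def changelog_mode_resolver(self, resolver):
        self._resolver = resolver
        return self

    def build(self):
        return FunctionDefinition(self._name, self._resolver)

# FROM_CHANGELOG-style definition: resolver always reports all change kinds
FROM_CHANGELOG_SKETCH = (
    Builder().name("FROM_CHANGELOG")
    .changelog_mode_resolver(lambda ctx: "ALL")
    .build()
)
```

The design choice mirrors the diff: by keeping the resolver nullable and exposing it via an `Optional`-style accessor, existing function definitions need no changes, and the planner can special-case the few PTFs that emit updates.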
