Skip to content

Commit bcb3682

Browse files
[FLINK-39419][table] Switch TO_CHANGELOG to row semantics with full deletes + require update before
This closes #27911.
1 parent 5c2a4cc commit bcb3682

13 files changed

Lines changed: 187 additions & 174 deletions

File tree

docs/content/docs/sql/reference/queries/changelog.md

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ This is useful when you need to materialize changelog events into a downstream s
4444

4545
```sql
4646
SELECT * FROM TO_CHANGELOG(
47-
input => TABLE source_table PARTITION BY key_col,
47+
input => TABLE source_table,
4848
[op => DESCRIPTOR(op_column_name),]
4949
[op_mapping => MAP['INSERT', 'I', 'DELETE', 'D', ...]]
5050
)
@@ -54,7 +54,7 @@ SELECT * FROM TO_CHANGELOG(
5454

5555
| Parameter | Required | Description |
5656
|:-------------|:---------|:------------|
57-
| `input` | Yes | The input table. Must include `PARTITION BY` for parallel execution. Accepts insert-only, retract, and upsert tables. |
57+
| `input` | Yes | The input table. Accepts insert-only, retract, and upsert tables. |
5858
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. |
5959
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping change operation names to custom output codes. Keys can contain comma-separated names to map multiple operations to the same code (e.g., `'INSERT, UPDATE_AFTER'`). When provided, only mapped operations are forwarded - unmapped events are dropped. Each change operation may appear at most once across all entries. |
6060

@@ -74,7 +74,7 @@ When `op_mapping` is omitted, all four change operations are mapped to their sta
7474
The output columns are ordered as:
7575

7676
```
77-
[partition_key_columns, op_column, remaining_columns]
77+
[op_column, all_input_columns]
7878
```
7979

8080
All output rows have `INSERT` - the table is always append-only.
@@ -85,25 +85,25 @@ All output rows have `INSERT` - the table is always append-only.
8585

8686
```sql
8787
-- Input: retract table from an aggregation
88-
-- +I[id:1, name:'Alice', cnt:1]
89-
-- +U[id:1, name:'Alice', cnt:2]
90-
-- -D[id:2, name:'Bob', cnt:1]
88+
-- +I[name:'Alice', cnt:1]
89+
-- +U[name:'Alice', cnt:2]
90+
-- -D[name:'Bob', cnt:1]
9191

9292
SELECT * FROM TO_CHANGELOG(
93-
input => TABLE my_aggregation PARTITION BY id
93+
input => TABLE my_aggregation
9494
)
9595

9696
-- Output (append-only):
97-
-- +I[id:1, op:'INSERT', name:'Alice', cnt:1]
98-
-- +I[id:1, op:'UPDATE_AFTER', name:'Alice', cnt:2]
99-
-- +I[id:2, op:'DELETE', name:'Bob', cnt:1]
97+
-- +I[op:'INSERT', name:'Alice', cnt:1]
98+
-- +I[op:'UPDATE_AFTER', name:'Alice', cnt:2]
99+
-- +I[op:'DELETE', name:'Bob', cnt:1]
100100
```
101101

102102
#### Custom operation column name
103103

104104
```sql
105105
SELECT * FROM TO_CHANGELOG(
106-
input => TABLE my_aggregation PARTITION BY id,
106+
input => TABLE my_aggregation,
107107
op => DESCRIPTOR(operation)
108108
)
109109
-- The op column is now named 'operation' instead of 'op'
@@ -113,7 +113,7 @@ SELECT * FROM TO_CHANGELOG(
113113

114114
```sql
115115
SELECT * FROM TO_CHANGELOG(
116-
input => TABLE my_aggregation PARTITION BY id,
116+
input => TABLE my_aggregation,
117117
op => DESCRIPTOR(op_code),
118118
op_mapping => MAP['INSERT', 'I', 'UPDATE_AFTER', 'U']
119119
)
@@ -126,7 +126,7 @@ SELECT * FROM TO_CHANGELOG(
126126

127127
```sql
128128
SELECT * FROM TO_CHANGELOG(
129-
input => TABLE my_aggregation PARTITION BY id,
129+
input => TABLE my_aggregation,
130130
op => DESCRIPTOR(deleted),
131131
op_mapping => MAP['INSERT, UPDATE_AFTER', 'false', 'DELETE', 'true']
132132
)
@@ -139,16 +139,16 @@ SELECT * FROM TO_CHANGELOG(
139139

140140
```java
141141
// Default: adds 'op' column and supports all changelog modes
142-
Table result = myTable.partitionBy($("id")).toChangelog();
142+
Table result = myTable.toChangelog();
143143

144144
// With custom parameters
145-
Table result = myTable.partitionBy($("id")).toChangelog(
145+
Table result = myTable.toChangelog(
146146
descriptor("op_code").asArgument("op"),
147147
map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
148148
);
149149

150150
// Deletion flag pattern: comma-separated keys map multiple change operations to the same code
151-
Table result = myTable.partitionBy($("id")).toChangelog(
151+
Table result = myTable.toChangelog(
152152
descriptor("deleted").asArgument("op"),
153153
map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
154154
);

flink-python/pyflink/table/tests/test_table_completeness.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ def excluded_methods(cls):
4242
'asArgument',
4343
'process',
4444
'partitionBy',
45+
'toChangelog',
4546
}
4647

4748
@classmethod

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java

Lines changed: 0 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import org.apache.flink.annotation.PublicEvolving;
2222
import org.apache.flink.table.annotation.ArgumentTrait;
23-
import org.apache.flink.table.expressions.Expression;
2423
import org.apache.flink.table.functions.ProcessTableFunction;
2524
import org.apache.flink.table.functions.UserDefinedFunction;
2625

@@ -139,37 +138,4 @@ public interface PartitionedTable {
139138
* @see ProcessTableFunction
140139
*/
141140
Table process(Class<? extends UserDefinedFunction> function, Object... arguments);
142-
143-
/**
144-
* Converts this dynamic table into an append-only table with an explicit operation code column
145-
* using the built-in {@code TO_CHANGELOG} process table function.
146-
*
147-
* <p>Each input row - regardless of its original change operation - is emitted as an
148-
* INSERT-only row with a string {@code "op"} column indicating the original operation (INSERT,
149-
* UPDATE_AFTER, DELETE, etc.).
150-
*
151-
* <p>Optional arguments can be passed using named expressions:
152-
*
153-
* <pre>{@code
154-
* // Default: adds 'op' column and supports all changelog modes
155-
* table.partitionBy($("id")).toChangelog();
156-
*
157-
* // Custom op column name and mapping
158-
* table.partitionBy($("id")).toChangelog(
159-
* descriptor("op_code").asArgument("op"),
160-
* map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
161-
* );
162-
*
163-
* // Deletion flag pattern: comma-separated keys map multiple change operations to the same code
164-
* table.partitionBy($("id")).toChangelog(
165-
* descriptor("deleted").asArgument("op"),
166-
* map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
167-
* );
168-
* }</pre>
169-
*
170-
* @param arguments optional named arguments for {@code op} and {@code op_mapping}
171-
* @return an append-only {@link Table} with an {@code op} column prepended to the non-partition
172-
* columns
173-
*/
174-
Table toChangelog(Expression... arguments);
175141
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1422,4 +1422,36 @@ default TableResult executeInsert(
14221422
* @see ProcessTableFunction
14231423
*/
14241424
Table process(Class<? extends UserDefinedFunction> function, Object... arguments);
1425+
1426+
/**
1427+
* Converts this dynamic table into an append-only table with an explicit operation code column
1428+
* using the built-in {@code TO_CHANGELOG} process table function.
1429+
*
1430+
* <p>Each input row - regardless of its original change operation - is emitted as an
1431+
* INSERT-only row with a string {@code "op"} column indicating the original operation (INSERT,
1432+
* UPDATE_AFTER, DELETE, etc.).
1433+
*
1434+
* <p>Optional arguments can be passed using named expressions:
1435+
*
1436+
* <pre>{@code
1437+
* // Default: adds 'op' column and supports all changelog modes
1438+
* table.toChangelog();
1439+
*
1440+
* // Custom op column name and mapping
1441+
* table.toChangelog(
1442+
* descriptor("op_code").asArgument("op"),
1443+
* map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
1444+
* );
1445+
*
1446+
* // Deletion flag pattern: comma-separated keys map multiple change operations to the same code
1447+
* table.toChangelog(
1448+
* descriptor("deleted").asArgument("op"),
1449+
* map("INSERT, UPDATE_AFTER", "false", "DELETE", "true").asArgument("op_mapping")
1450+
* );
1451+
* }</pre>
1452+
*
1453+
* @param arguments optional named arguments for {@code op} and {@code op_mapping}
1454+
* @return an append-only {@link Table} with an {@code op} column prepended to the input columns
1455+
*/
1456+
Table toChangelog(Expression... arguments);
14251457
}

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,11 @@ public Table process(Class<? extends UserDefinedFunction> function, Object... ar
514514
function, unionTableAndArguments(operationTree, tableEnvironment, arguments));
515515
}
516516

517+
@Override
518+
public Table toChangelog(Expression... arguments) {
519+
return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(), (Object[]) arguments);
520+
}
521+
517522
private TablePipeline insertInto(
518523
ContextResolvedTable contextResolvedTable,
519524
@Nullable InsertConflictStrategy conflictStrategy,
@@ -901,11 +906,6 @@ public Table process(Class<? extends UserDefinedFunction> function, Object... ar
901906
createPartitionQueryOperation(), table.tableEnvironment, arguments));
902907
}
903908

904-
@Override
905-
public Table toChangelog(Expression... arguments) {
906-
return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(), (Object[]) arguments);
907-
}
908-
909909
private QueryOperation createPartitionQueryOperation() {
910910
return table.operationTreeBuilder.partition(partitionKeys, table.operationTree);
911911
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -782,15 +782,22 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
782782
.name("TO_CHANGELOG")
783783
.kind(PROCESS_TABLE)
784784
.staticArguments(
785+
// Row semantics (no PARTITION BY). Accepts updating
786+
// inputs. The planner inserts ChangelogNormalize for
787+
// upsert sources to produce UPDATE_BEFORE and full
788+
// DELETE rows.
785789
StaticArgument.table(
786790
"input",
787791
Row.class,
788792
false,
789793
EnumSet.of(
790794
StaticArgumentTrait.TABLE,
791-
StaticArgumentTrait.SET_SEMANTIC_TABLE,
795+
StaticArgumentTrait.ROW_SEMANTIC_TABLE,
792796
StaticArgumentTrait.SUPPORT_UPDATES,
793-
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE)),
797+
StaticArgumentTrait.REQUIRE_UPDATE_BEFORE,
798+
// Not strictly necessary but explicitly state that
799+
// we require full deletes.
800+
StaticArgumentTrait.REQUIRE_FULL_DELETE)),
794801
StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
795802
StaticArgument.scalar(
796803
"op_mapping",

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,6 @@
4040
import java.util.Map;
4141
import java.util.Optional;
4242
import java.util.Set;
43-
import java.util.stream.Collectors;
44-
import java.util.stream.IntStream;
4543

4644
/** Type strategies for the {@code TO_CHANGELOG} process table function. */
4745
@Internal
@@ -98,15 +96,10 @@ public List<Signature> getExpectedSignatures(final FunctionDefinition definition
9896

9997
final String opColumnName = resolveOpColumnName(callContext);
10098
final List<Field> inputFields = DataType.getFields(semantics.dataType());
101-
final Set<Integer> partitionKeys = intArrayToSet(semantics.partitionByColumns());
10299

103100
final List<Field> outputFields = new ArrayList<>();
104101
outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING()));
105-
for (int i = 0; i < inputFields.size(); i++) {
106-
if (!partitionKeys.contains(i)) {
107-
outputFields.add(inputFields.get(i));
108-
}
109-
}
102+
outputFields.addAll(inputFields);
110103

111104
return Optional.of(DataTypes.ROW(outputFields).notNull());
112105
};
@@ -199,9 +192,5 @@ private static String resolveOpColumnName(final CallContext callContext) {
199192
.orElse(DEFAULT_OP_COLUMN_NAME);
200193
}
201194

202-
private static Set<Integer> intArrayToSet(final int[] array) {
203-
return IntStream.of(array).boxed().collect(Collectors.toSet());
204-
}
205-
206195
private ToChangelogTypeStrategy() {}
207196
}

flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -368,9 +368,11 @@ private static ColumnList convertColumnList(List<SqlNode> operands) {
368368
final Map<String, String> map = new LinkedHashMap<>();
369369
try {
370370
for (int i = 0; i < operands.size(); i += 2) {
371-
final String key = SqlLiteral.unchain(operands.get(i)).getValueAs(String.class);
371+
final String key =
372+
SqlLiteral.unchain(unwrapCast(operands.get(i))).getValueAs(String.class);
372373
final String value =
373-
SqlLiteral.unchain(operands.get(i + 1)).getValueAs(String.class);
374+
SqlLiteral.unchain(unwrapCast(operands.get(i + 1)))
375+
.getValueAs(String.class);
374376
map.put(key, value);
375377
}
376378
} catch (Exception e) {
@@ -380,6 +382,18 @@ private static ColumnList convertColumnList(List<SqlNode> operands) {
380382
return map;
381383
}
382384

385+
/** Unwraps implicit CHAR-type CASTs added by Calcite for length normalization. */
386+
private static SqlNode unwrapCast(final SqlNode node) {
387+
if (node.getKind() == SqlKind.CAST && node instanceof SqlCall) {
388+
final SqlNode inner = ((SqlCall) node).operand(0);
389+
if (inner instanceof SqlLiteral
390+
&& SqlTypeName.CHAR_TYPES.contains(((SqlLiteral) inner).getTypeName())) {
391+
return inner;
392+
}
393+
}
394+
return node;
395+
}
396+
383397
/** A MAP constructor is a string literal if all its key-value children are string literals. */
384398
private static boolean isLiteralMap(SqlNode sqlNode) {
385399
if (sqlNode.getKind() != SqlKind.MAP_VALUE_CONSTRUCTOR) {

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ public List<TableTestProgram> programs() {
4141
return List.of(
4242
ToChangelogTestPrograms.INSERT_ONLY_INPUT,
4343
ToChangelogTestPrograms.UPDATING_INPUT,
44+
ToChangelogTestPrograms.UPSERT_INPUT,
4445
ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
4546
ToChangelogTestPrograms.CUSTOM_OP_NAME,
4647
ToChangelogTestPrograms.TABLE_API_DEFAULT,
4748
ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG,
4849
ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
4950
ToChangelogTestPrograms.DELETION_FLAG,
50-
ToChangelogTestPrograms.MISSING_PARTITION_BY,
5151
ToChangelogTestPrograms.INVALID_DESCRIPTOR,
5252
ToChangelogTestPrograms.INVALID_OP_MAPPING,
5353
ToChangelogTestPrograms.DUPLICATE_ROW_KIND);

0 commit comments

Comments
 (0)