Skip to content

Commit f75a879

Browse files
committed
[FLINK-39261][table] Address Feedback
1 parent fb58909 commit f75a879

7 files changed

Lines changed: 101 additions & 26 deletions

File tree

docs/content/docs/sql/reference/queries/changelog.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,20 @@ SELECT * FROM FROM_CHANGELOG(
126126
```java
127127
// Default: reads 'op' column with standard change operation names
128128
Table result = cdcStream.fromChangelog();
129+
130+
// With custom op column name
131+
Table result = cdcStream.fromChangelog(
132+
descriptor("operation").asArgument("op")
133+
);
134+
135+
// With custom op_mapping
136+
Table result = cdcStream.fromChangelog(
137+
descriptor("op").asArgument("op"),
138+
map("c, r", "INSERT",
139+
"ub", "UPDATE_BEFORE",
140+
"ua", "UPDATE_AFTER",
141+
"d", "DELETE").asArgument("op_mapping")
142+
);
129143
```
130144

131145
## TO_CHANGELOG

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -417,8 +417,8 @@ public Builder sqlName(String name) {
417417
}
418418

419419
/**
420-
* Sets a resolver that determines the output {@link ChangelogMode} for this built-in PTF.
421-
* Only needed for PTFs that emit updates (e.g., FROM_CHANGELOG).
420+
* Sets a resolver that determines the output {@link ChangelogMode} for this built-in
421+
* function. Only needed for functions that emit updates (e.g., FROM_CHANGELOG).
422422
*/
423423
public Builder changelogModeResolver(
424424
Function<ChangelogContext, ChangelogMode> changelogModeResolver) {

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java

Lines changed: 55 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.flink.table.types.inference.TypeStrategy;
3030
import org.apache.flink.table.types.logical.LogicalTypeFamily;
3131
import org.apache.flink.types.ColumnList;
32+
import org.apache.flink.types.RowKind;
3233

3334
import java.util.HashSet;
3435
import java.util.List;
@@ -79,6 +80,8 @@ public Optional<List<DataType>> inferInputTypes(
7980

8081
return Optional.of(DataTypes.ROW(outputFields).notNull());
8182
};
83+
private static final String UPDATE_BEFORE = RowKind.UPDATE_BEFORE.name();
84+
private static final String UPDATE_AFTER = RowKind.UPDATE_AFTER.name();
8285

8386
// --------------------------------------------------------------------------------------------
8487
// Helpers
@@ -87,22 +90,55 @@ public Optional<List<DataType>> inferInputTypes(
8790
@SuppressWarnings("rawtypes")
8891
private static Optional<List<DataType>> validateInputs(
8992
final CallContext callContext, final boolean throwOnFailure) {
90-
final boolean isMissingTableArg = callContext.getTableSemantics(0).isEmpty();
91-
if (isMissingTableArg) {
93+
Optional<List<DataType>> error;
94+
95+
error = validateTableArg(callContext, throwOnFailure);
96+
if (error.isPresent()) {
97+
return error;
98+
}
99+
100+
error = validateOpDescriptor(callContext, throwOnFailure);
101+
if (error.isPresent()) {
102+
return error;
103+
}
104+
105+
error = validateOpColumn(callContext, throwOnFailure);
106+
if (error.isPresent()) {
107+
return error;
108+
}
109+
110+
error = validateOpMapping(callContext, throwOnFailure);
111+
if (error.isPresent()) {
112+
return error;
113+
}
114+
115+
return Optional.of(callContext.getArgumentDataTypes());
116+
}
117+
118+
private static Optional<List<DataType>> validateTableArg(
119+
final CallContext callContext, final boolean throwOnFailure) {
120+
if (callContext.getTableSemantics(0).isEmpty()) {
92121
return callContext.fail(
93122
throwOnFailure, "First argument must be a table for FROM_CHANGELOG.");
94123
}
124+
return Optional.empty();
125+
}
95126

127+
private static Optional<List<DataType>> validateOpDescriptor(
128+
final CallContext callContext, final boolean throwOnFailure) {
96129
final Optional<ColumnList> opDescriptor = callContext.getArgumentValue(1, ColumnList.class);
97-
final boolean hasInvalidOpDescriptor =
98-
opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1;
99-
if (hasInvalidOpDescriptor) {
130+
if (opDescriptor.isPresent() && opDescriptor.get().getNames().size() != 1) {
100131
return callContext.fail(
101132
throwOnFailure,
102133
"The descriptor for argument 'op' must contain exactly one column name.");
103134
}
135+
return Optional.empty();
136+
}
137+
138+
/** Validates that the op column exists in the input schema and is of STRING type. */
139+
private static Optional<List<DataType>> validateOpColumn(
140+
final CallContext callContext, final boolean throwOnFailure) {
104141

105-
// Validate that the op column exists in the input schema and is of STRING type
106142
final TableSemantics tableSemantics = callContext.getTableSemantics(0).get();
107143
final String opColumnName = resolveOpColumnName(callContext);
108144
final List<Field> inputFields = DataType.getFields(tableSemantics.dataType());
@@ -122,7 +158,13 @@ private static Optional<List<DataType>> validateInputs(
122158
"The op column '%s' must be of STRING type, but was '%s'.",
123159
opColumnName, opField.get().getDataType().getLogicalType()));
124160
}
161+
return Optional.empty();
162+
}
125163

164+
/** Validates op_mapping is a literal and its values are valid change operation names. */
165+
@SuppressWarnings("unchecked")
166+
private static Optional<List<DataType>> validateOpMapping(
167+
final CallContext callContext, final boolean throwOnFailure) {
126168
final boolean hasMappingArgProvided = !callContext.isArgumentNull(2);
127169
final boolean isMappingArgLiteral = callContext.isArgumentLiteral(2);
128170
if (hasMappingArgProvided && !isMappingArgLiteral) {
@@ -139,18 +181,19 @@ private static Optional<List<DataType>> validateInputs(
139181
return validationError;
140182
}
141183

142-
// Retract mode requires UPDATE_BEFORE in the mapping
143184
final boolean hasUpdateBefore =
144-
mapping.values().stream().anyMatch(v -> "UPDATE_BEFORE".equals(v.trim()));
145-
if (!hasUpdateBefore) {
185+
mapping.values().stream().anyMatch(v -> UPDATE_BEFORE.equals(v.trim()));
186+
final boolean hasUpdateAfter =
187+
mapping.values().stream().anyMatch(v -> UPDATE_AFTER.equals(v.trim()));
188+
if (hasUpdateAfter && !hasUpdateBefore) {
146189
return callContext.fail(
147190
throwOnFailure,
148191
"The 'op_mapping' must include UPDATE_BEFORE for retract mode. "
149-
+ "Upsert mode (without UPDATE_BEFORE) is not supported in this version.");
192+
+ "Upsert mode (without UPDATE_BEFORE) is not supported "
193+
+ "in this version.");
150194
}
151195
}
152-
153-
return Optional.of(callContext.getArgumentDataTypes());
196+
return Optional.empty();
154197
}
155198

156199
/**

flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,9 +130,29 @@ protected Stream<TestSpec> testData() {
130130
.calledWithLiteralAt(2, Map.of("c", "INSERT", "r", "INSERT"))
131131
.expectErrorMessage("Duplicate change operation: 'INSERT'"),
132132

133-
// Error: upsert mapping (no UPDATE_BEFORE) not supported
133+
// Valid: INSERT-only mapping (append mode, no updates)
134134
TestSpec.forStrategy(
135-
"Upsert mapping not supported", FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
135+
"Valid INSERT-only mapping", FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
136+
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE)
137+
.calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
138+
.calledWithLiteralAt(1, ColumnList.of(List.of("op")))
139+
.calledWithLiteralAt(2, Map.of("c, r", "INSERT"))
140+
.expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE),
141+
142+
// Valid: INSERT + DELETE mapping (no updates)
143+
TestSpec.forStrategy(
144+
"Valid INSERT and DELETE mapping",
145+
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
146+
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE)
147+
.calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
148+
.calledWithLiteralAt(1, ColumnList.of(List.of("op")))
149+
.calledWithLiteralAt(2, Map.of("c", "INSERT", "d", "DELETE"))
150+
.expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE),
151+
152+
// Error: UPDATE_AFTER without UPDATE_BEFORE not supported
153+
TestSpec.forStrategy(
154+
"UPDATE_AFTER requires UPDATE_BEFORE",
155+
FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
136156
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE)
137157
.calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
138158
.calledWithLiteralAt(1, ColumnList.of(List.of("op")))

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ protected void applyDefaultEnvironmentOptions(TableConfig config) {
4040
public List<TableTestProgram> programs() {
4141
return List.of(
4242
FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
43-
FromChangelogTestPrograms.DEBEZIUM_MAPPING,
43+
FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
4444
FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED,
4545
FromChangelogTestPrograms.CUSTOM_OP_NAME,
4646
FromChangelogTestPrograms.TABLE_API_DEFAULT,

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -70,13 +70,13 @@ public class FromChangelogTestPrograms {
7070
+ "input => TABLE cdc_stream)")
7171
.build();
7272

73-
public static final TableTestProgram DEBEZIUM_MAPPING =
73+
public static final TableTestProgram CUSTOM_OP_MAPPING =
7474
TableTestProgram.of(
75-
"from-changelog-custom-mapping",
75+
"from-changelog-custom-op-mapping",
7676
"custom op_mapping with comma-separated keys")
7777
.setupTableSource(
7878
SourceTestStep.newBuilder("cdc_stream")
79-
.addSchema("id INT", "__op STRING", "name STRING")
79+
.addSchema("id INT", "op STRING", "name STRING")
8080
.producedValues(
8181
Row.of(1, "c", "Alice"),
8282
Row.of(2, "r", "Bob"),
@@ -100,7 +100,6 @@ public class FromChangelogTestPrograms {
100100
.runSql(
101101
"INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
102102
+ "input => TABLE cdc_stream, "
103-
+ "op => DESCRIPTOR(__op), "
104103
+ "op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])")
105104
.build();
106105

@@ -194,10 +193,7 @@ public class FromChangelogTestPrograms {
194193
// Round-trip test: FROM_CHANGELOG(TO_CHANGELOG(table))
195194
// --------------------------------------------------------------------------------------------
196195

197-
/**
198-
* Verifies that FROM_CHANGELOG(TO_CHANGELOG(table)) recovers the original dynamic table.
199-
* Requires TO_CHANGELOG with row semantics (PR #27911).
200-
*/
196+
/** Verifies that FROM_CHANGELOG(TO_CHANGELOG(table)) recovers the original dynamic table. */
201197
public static final TableTestProgram ROUND_TRIP =
202198
TableTestProgram.of(
203199
"from-changelog-round-trip",

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ProcessTableFunctionTestUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -927,7 +927,9 @@ public ChangelogMode getChangelogMode(ChangelogContext changelogContext) {
927927
/** Testing function that uses row semantics with retract mode (valid). */
928928
public static class UpdatingRetractRowSemanticFunction
929929
extends ChangelogProcessTableFunctionBase {
930-
public void eval(Context ctx, @ArgumentHint({ROW_SEMANTIC_TABLE, SUPPORT_UPDATES}) Row r) {
930+
public void eval(
931+
Context ctx,
932+
@ArgumentHint({ROW_SEMANTIC_TABLE, SUPPORT_UPDATES, REQUIRE_UPDATE_BEFORE}) Row r) {
931933
collectUpdate(ctx, r);
932934
}
933935

0 commit comments

Comments (0)