Skip to content

Commit 917d73a

Browse files
committed
[FLINK-39261][table] Throw ValidationException when invalid op mapping is received
The default invalid_op_handling behavior is FAIL. During, runtime if rowKind is not present in the op_map we should throw an ValidationException
1 parent a02799d commit 917d73a

4 files changed

Lines changed: 5 additions & 42 deletions

File tree

flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/FromChangelogInputTypeStrategyTest.java

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -61,20 +61,6 @@ protected Stream<TestSpec> testData() {
6161
"d", "DELETE"))
6262
.expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE),
6363

64-
// Valid: retract-style mapping with UPDATE_BEFORE
65-
TestSpec.forStrategy("Valid with UPDATE_BEFORE", FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
66-
.calledWithArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE)
67-
.calledWithTableSemanticsAt(0, new TableSemanticsMock(TABLE_TYPE))
68-
.calledWithLiteralAt(1, ColumnList.of(List.of("op")))
69-
.calledWithLiteralAt(
70-
2,
71-
Map.of(
72-
"c", "INSERT",
73-
"ub", "UPDATE_BEFORE",
74-
"ua", "UPDATE_AFTER",
75-
"d", "DELETE"))
76-
.expectArgumentTypes(TABLE_TYPE, DESCRIPTOR_TYPE, MAP_TYPE),
77-
7864
// Error: op column not found
7965
TestSpec.forStrategy(
8066
"Op column not found in schema", FROM_CHANGELOG_INPUT_TYPE_STRATEGY)

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@ public List<TableTestProgram> programs() {
4141
return List.of(
4242
FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
4343
FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
44-
FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED,
4544
FromChangelogTestPrograms.CUSTOM_OP_NAME,
4645
FromChangelogTestPrograms.TABLE_API_DEFAULT,
4746
FromChangelogTestPrograms.ROUND_TRIP);

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java

Lines changed: 0 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -93,32 +93,6 @@ public class FromChangelogTestPrograms {
9393
+ "op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'])")
9494
.build();
9595

96-
public static final TableTestProgram UNMAPPED_CODES_DROPPED =
97-
TableTestProgram.of(
98-
"from-changelog-unmapped-codes-dropped",
99-
"unmapped op codes are silently dropped")
100-
.setupTableSource(
101-
SourceTestStep.newBuilder("cdc_stream")
102-
.addSchema(SIMPLE_CDC_SCHEMA)
103-
.producedValues(
104-
Row.of(1, "INSERT", "Alice"),
105-
Row.of(2, "INSERT", "Bob"),
106-
Row.of(1, "UNKNOWN", "Alice2"),
107-
Row.of(2, "DELETE", "Bob"))
108-
.build())
109-
.setupTableSink(
110-
SinkTestStep.newBuilder("sink")
111-
.addSchema("id INT", "name STRING")
112-
.consumedValues(
113-
Row.ofKind(RowKind.INSERT, 1, "Alice"),
114-
Row.ofKind(RowKind.INSERT, 2, "Bob"),
115-
Row.ofKind(RowKind.DELETE, 2, "Bob"))
116-
.build())
117-
.runSql(
118-
"INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
119-
+ "input => TABLE cdc_stream)")
120-
.build();
121-
12296
/** Custom op column name via DESCRIPTOR. */
12397
public static final TableTestProgram CUSTOM_OP_NAME =
12498
TableTestProgram.of(

flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.table.runtime.functions.ptf;
2020

2121
import org.apache.flink.annotation.Internal;
22+
import org.apache.flink.table.api.ValidationException;
2223
import org.apache.flink.table.data.MapData;
2324
import org.apache.flink.table.data.RowData;
2425
import org.apache.flink.table.data.StringData;
@@ -137,7 +138,10 @@ public void eval(
137138
final StringData opCode = input.getString(opColumnIndex);
138139
final RowKind rowKind = opMap.get(opCode);
139140
if (rowKind == null) {
140-
return;
141+
throw new ValidationException(
142+
String.format(
143+
"Received invalid op code '%s'. Defined op codes are: %s. Failing as configured.",
144+
opCode, opMap.keySet()));
141145
}
142146

143147
projectedOutput.replaceRow(input);

0 commit comments

Comments
 (0)