Skip to content

Commit 3e9e416

Browse files
committed
[FLINK-39261][table] Add more semantic tests
1 parent 01803f8 commit 3e9e416

2 files changed

Lines changed: 82 additions & 1 deletion

File tree

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,11 @@ public List<TableTestProgram> programs() {
4242
FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
4343
FromChangelogTestPrograms.DEBEZIUM_MAPPING,
4444
FromChangelogTestPrograms.UNMAPPED_CODES_DROPPED,
45+
FromChangelogTestPrograms.CUSTOM_OP_NAME,
4546
FromChangelogTestPrograms.TABLE_API_DEFAULT,
46-
FromChangelogTestPrograms.MISSING_PARTITION_BY);
47+
FromChangelogTestPrograms.MISSING_PARTITION_BY
48+
// TODO: enable after TO_CHANGELOG switches to row semantics (PR #27911)
49+
// FromChangelogTestPrograms.ROUND_TRIP
50+
);
4751
}
4852
}

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
package org.apache.flink.table.planner.plan.nodes.exec.stream;
2020

2121
import org.apache.flink.table.api.ValidationException;
22+
import org.apache.flink.table.connector.ChangelogMode;
2223
import org.apache.flink.table.test.program.SinkTestStep;
2324
import org.apache.flink.table.test.program.SourceTestStep;
2425
import org.apache.flink.table.test.program.TableTestProgram;
@@ -133,6 +134,35 @@ public class FromChangelogTestPrograms {
133134
+ "input => TABLE cdc_stream PARTITION BY id)")
134135
.build();
135136

137+
/** Custom op column name via DESCRIPTOR. */
138+
public static final TableTestProgram CUSTOM_OP_NAME =
139+
TableTestProgram.of(
140+
"from-changelog-custom-op-name", "custom op column name via DESCRIPTOR")
141+
.setupTableSource(
142+
SourceTestStep.newBuilder("cdc_stream")
143+
.addSchema("id INT", "operation STRING", "name STRING")
144+
.producedValues(
145+
Row.of(1, "INSERT", "Alice"),
146+
Row.of(1, "UPDATE_BEFORE", "Alice"),
147+
Row.of(1, "UPDATE_AFTER", "Alice2"))
148+
.build())
149+
.setupTableSink(
150+
SinkTestStep.newBuilder("sink")
151+
.addSchema(
152+
"id INT NOT NULL",
153+
"name STRING",
154+
"PRIMARY KEY (id) NOT ENFORCED")
155+
.consumedValues(
156+
Row.ofKind(RowKind.INSERT, 1, "Alice"),
157+
Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"),
158+
Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"))
159+
.build())
160+
.runSql(
161+
"INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
162+
+ "input => TABLE cdc_stream PARTITION BY id, "
163+
+ "op => DESCRIPTOR(operation))")
164+
.build();
165+
136166
// --------------------------------------------------------------------------------------------
137167
// Table API test
138168
// --------------------------------------------------------------------------------------------
@@ -163,6 +193,53 @@ public class FromChangelogTestPrograms {
163193
"sink")
164194
.build();
165195

196+
// --------------------------------------------------------------------------------------------
197+
// Round-trip test: FROM_CHANGELOG(TO_CHANGELOG(table))
198+
// --------------------------------------------------------------------------------------------
199+
200+
/**
201+
* Verifies that FROM_CHANGELOG(TO_CHANGELOG(table)) recovers the original dynamic table.
202+
* Requires TO_CHANGELOG with row semantics (PR #27911).
203+
*/
204+
public static final TableTestProgram ROUND_TRIP =
205+
TableTestProgram.of(
206+
"from-changelog-round-trip",
207+
"FROM_CHANGELOG(TO_CHANGELOG(table)) recovers original table")
208+
.setupTableSource(
209+
SourceTestStep.newBuilder("orders")
210+
.addSchema(
211+
"id INT NOT NULL",
212+
"name STRING",
213+
"PRIMARY KEY (id) NOT ENFORCED")
214+
.addMode(ChangelogMode.all())
215+
.producedValues(
216+
Row.ofKind(RowKind.INSERT, 1, "Alice"),
217+
Row.ofKind(RowKind.INSERT, 2, "Bob"),
218+
Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"),
219+
Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"),
220+
Row.ofKind(RowKind.DELETE, 2, "Bob"))
221+
.build())
222+
.setupTableSink(
223+
SinkTestStep.newBuilder("sink")
224+
.addSchema(
225+
"id INT NOT NULL",
226+
"name STRING",
227+
"PRIMARY KEY (id) NOT ENFORCED")
228+
.consumedValues(
229+
Row.ofKind(RowKind.INSERT, 1, "Alice"),
230+
Row.ofKind(RowKind.INSERT, 2, "Bob"),
231+
Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"),
232+
Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"),
233+
Row.ofKind(RowKind.DELETE, 2, "Bob"))
234+
.build())
235+
.setupSql(
236+
"CREATE VIEW changelog_view AS "
237+
+ "SELECT * FROM TO_CHANGELOG(input => TABLE orders)")
238+
.runSql(
239+
"INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
240+
+ "input => TABLE changelog_view PARTITION BY id)")
241+
.build();
242+
166243
// --------------------------------------------------------------------------------------------
167244
// Error validation tests
168245
// --------------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)