Skip to content

Commit df99efe

Browse files
committed
[FLINK-39261][table] Address improvement feedbacks
1 parent 20f42b7 commit df99efe

8 files changed

Lines changed: 117 additions & 115 deletions

File tree

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1323,33 +1323,6 @@ default TableResult executeInsert(
13231323
*/
13241324
PartitionedTable partitionBy(Expression... fields);
13251325

1326-
/**
1327-
* Converts this append-only table with an explicit operation code column into a dynamic table
1328-
* using the built-in {@code FROM_CHANGELOG} process table function.
1329-
*
1330-
* <p>Each input row is expected to have a string operation code column (default: {@code "op"})
1331-
* that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The
1332-
* output table is a dynamic table backed by a changelog stream.
1333-
*
1334-
* <p>Optional arguments can be passed using named expressions:
1335-
*
1336-
* <pre>{@code
1337-
* // Default: reads 'op' column with standard change operation names
1338-
* table.fromChangelog();
1339-
*
1340-
* // Custom op column name and mapping (Debezium-style codes)
1341-
* table.fromChangelog(
1342-
* descriptor("__op").asArgument("op"),
1343-
* map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
1344-
* );
1345-
* }</pre>
1346-
*
1347-
* @param arguments optional named arguments for {@code op} and {@code op_mapping}
1348-
* @return a dynamic {@link Table} with the op column removed and proper change operation
1349-
* semantics
1350-
*/
1351-
Table fromChangelog(Expression... arguments);
1352-
13531326
/**
13541327
* Converts this table object into a named argument.
13551328
*
@@ -1481,4 +1454,31 @@ default TableResult executeInsert(
14811454
* @return an append-only {@link Table} with an {@code op} column prepended to the input columns
14821455
*/
14831456
Table toChangelog(Expression... arguments);
1457+
1458+
/**
1459+
* Converts this append-only table with an explicit operation code column into a dynamic table
1460+
* using the built-in {@code FROM_CHANGELOG} process table function.
1461+
*
1462+
* <p>Each input row is expected to have a string operation code column (default: {@code "op"})
1463+
* that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The
1464+
* output table is a dynamic table backed by a changelog stream.
1465+
*
1466+
* <p>Optional arguments can be passed using named expressions:
1467+
*
1468+
* <pre>{@code
1469+
* // Default: reads 'op' column with standard change operation names
1470+
* table.fromChangelog();
1471+
*
1472+
* // Custom op column name and mapping (Debezium-style codes)
1473+
* table.fromChangelog(
1474+
* descriptor("__op").asArgument("op"),
1475+
* map("c, r", "INSERT", "u", "UPDATE_AFTER", "d", "DELETE").asArgument("op_mapping")
1476+
* );
1477+
* }</pre>
1478+
*
1479+
* @param arguments optional named arguments for {@code op} and {@code op_mapping}
1480+
* @return a dynamic {@link Table} with the op column removed and proper change operation
1481+
* semantics
1482+
*/
1483+
Table fromChangelog(Expression... arguments);
14841484
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.flink.table.api.TableException;
2323
import org.apache.flink.table.catalog.DataTypeFactory;
2424
import org.apache.flink.table.connector.ChangelogMode;
25+
import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
2526
import org.apache.flink.table.types.DataType;
2627
import org.apache.flink.table.types.inference.InputTypeStrategy;
2728
import org.apache.flink.table.types.inference.StaticArgument;
@@ -54,7 +55,7 @@
5455
* <p>Equality is defined by reference equality.
5556
*/
5657
@Internal
57-
public final class BuiltInFunctionDefinition implements SpecializedFunction, ChangelogFunction {
58+
public final class BuiltInFunctionDefinition implements SpecializedFunction {
5859

5960
private final String name;
6061

@@ -137,6 +138,14 @@ public boolean isInternal() {
137138
return isInternal;
138139
}
139140

141+
/**
142+
* Returns the optional changelog mode resolver for built-in PTFs that emit updates (e.g.,
143+
* FROM_CHANGELOG). The planner uses this to determine the output changelog mode.
144+
*/
145+
public Optional<Function<ChangelogContext, ChangelogMode>> getChangelogModeResolver() {
146+
return Optional.ofNullable(changelogModeResolver);
147+
}
148+
140149
public String getQualifiedName() {
141150
if (isInternal) {
142151
return name;
@@ -231,14 +240,6 @@ public static String qualifyFunctionName(String name, int version) {
231240
return String.format(INTERNAL_NAME_FORMAT, name.toUpperCase(Locale.ROOT), version);
232241
}
233242

234-
@Override
235-
public ChangelogMode getChangelogMode(final ChangelogContext changelogContext) {
236-
if (changelogModeResolver != null) {
237-
return changelogModeResolver.apply(changelogContext);
238-
}
239-
return ChangelogMode.insertOnly();
240-
}
241-
242243
// --------------------------------------------------------------------------------------------
243244
// Builder
244245
// --------------------------------------------------------------------------------------------
@@ -416,10 +417,8 @@ public Builder sqlName(String name) {
416417
}
417418

418419
/**
419-
* Sets a resolver that dynamically determines the output {@link ChangelogMode} for this
420-
* built-in PTF. The resolver receives the {@link ChangelogContext} and can inspect function
421-
* arguments (e.g., op_mapping) to adapt the changelog mode. Only needed for PTFs that emit
422-
* updates (e.g., FROM_CHANGELOG).
420+
* Sets a resolver that determines the output {@link ChangelogMode} for this built-in PTF.
421+
* Only needed for PTFs that emit updates (e.g., FROM_CHANGELOG).
423422
*/
424423
public Builder changelogModeResolver(
425424
Function<ChangelogContext, ChangelogMode> changelogModeResolver) {

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -829,9 +829,9 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
829829
"op_mapping",
830830
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
831831
true))
832+
.changelogModeResolver(ctx -> ChangelogMode.all())
832833
.inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
833834
.outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
834-
.changelogModeResolver(ctx -> ChangelogMode.all())
835835
.runtimeClass(
836836
"org.apache.flink.table.runtime.functions.ptf.FromChangelogFunction")
837837
.build();

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/ChangelogFunction.java

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,6 @@
2323
import org.apache.flink.table.connector.ChangelogMode;
2424
import org.apache.flink.types.RowKind;
2525

26-
import java.util.Optional;
27-
2826
/**
2927
* An extension that allows a process table function (PTF) to emit results with changelog semantics.
3028
*
@@ -63,10 +61,8 @@
6361
* </ol>
6462
*
6563
* <p>Emitting changelogs is only valid for PTFs that take table arguments with set semantics (see
66-
* {@link ArgumentTrait#SET_SEMANTIC_TABLE}). When using {@code OPTIONAL_PARTITION_BY}, the
67-
* PARTITION BY clause can be omitted for retract mode (with {@link RowKind#UPDATE_BEFORE}), since
68-
* the stream is self-describing. In case of upserts, the upsert key must be equal to the PARTITION
69-
* BY key.
64+
* {@link ArgumentTrait#SET_SEMANTIC_TABLE}). In case of upserts, the upsert key must be equal to
65+
* the PARTITION BY key.
7066
*
7167
* <p>It is perfectly valid for a {@link ChangelogFunction} implementation to return a fixed {@link
7268
* ChangelogMode}, regardless of the {@link ChangelogContext}. This approach may be appropriate when
@@ -109,14 +105,5 @@ interface ChangelogContext {
109105
* are required and {@link ChangelogMode#keyOnlyDeletes()} are supported.
110106
*/
111107
ChangelogMode getRequiredChangelogMode();
112-
113-
/**
114-
* Returns the value of a scalar argument at the given position.
115-
*
116-
* @param pos the argument position
117-
* @param clazz the expected class of the argument value
118-
* @return the argument value, or empty if the argument is null or not available
119-
*/
120-
<T> Optional<T> getArgumentValue(int pos, Class<T> clazz);
121108
}
122109
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java

Lines changed: 9 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -22,23 +22,17 @@
2222
import org.apache.flink.table.api.DataTypes;
2323
import org.apache.flink.table.api.DataTypes.Field;
2424
import org.apache.flink.table.api.ValidationException;
25-
import org.apache.flink.table.functions.FunctionDefinition;
2625
import org.apache.flink.table.functions.TableSemantics;
2726
import org.apache.flink.table.types.DataType;
28-
import org.apache.flink.table.types.inference.ArgumentCount;
2927
import org.apache.flink.table.types.inference.CallContext;
30-
import org.apache.flink.table.types.inference.ConstantArgumentCount;
3128
import org.apache.flink.table.types.inference.InputTypeStrategy;
32-
import org.apache.flink.table.types.inference.Signature;
33-
import org.apache.flink.table.types.inference.Signature.Argument;
3429
import org.apache.flink.table.types.inference.TypeStrategy;
3530
import org.apache.flink.table.types.logical.LogicalTypeFamily;
3631
import org.apache.flink.types.ColumnList;
3732

3833
import java.util.HashSet;
3934
import java.util.List;
4035
import java.util.Map;
41-
import java.util.Map.Entry;
4236
import java.util.Optional;
4337
import java.util.Set;
4438
import java.util.stream.Collectors;
@@ -57,29 +51,12 @@ public final class FromChangelogTypeStrategy {
5751
// --------------------------------------------------------------------------------------------
5852

5953
public static final InputTypeStrategy INPUT_TYPE_STRATEGY =
60-
new InputTypeStrategy() {
61-
@Override
62-
public ArgumentCount getArgumentCount() {
63-
return ConstantArgumentCount.between(1, 3);
64-
}
65-
54+
new ValidationOnlyInputTypeStrategy() {
6655
@Override
6756
public Optional<List<DataType>> inferInputTypes(
6857
final CallContext callContext, final boolean throwOnFailure) {
6958
return validateInputs(callContext, throwOnFailure);
7059
}
71-
72-
@Override
73-
public List<Signature> getExpectedSignatures(final FunctionDefinition definition) {
74-
return List.of(
75-
Signature.of(Argument.of("input", "TABLE")),
76-
Signature.of(
77-
Argument.of("input", "TABLE"), Argument.of("op", "DESCRIPTOR")),
78-
Signature.of(
79-
Argument.of("input", "TABLE"),
80-
Argument.of("op", "DESCRIPTOR"),
81-
Argument.of("op_mapping", "MAP<STRING, STRING>")));
82-
}
8360
};
8461

8562
// --------------------------------------------------------------------------------------------
@@ -155,19 +132,16 @@ private static Optional<List<DataType>> validateInputs(
155132

156133
final Optional<Map> opMapping = callContext.getArgumentValue(2, Map.class);
157134
if (opMapping.isPresent()) {
135+
final Map<String, String> mapping = opMapping.get();
158136
final Optional<List<DataType>> validationError =
159-
validateOpMappingValues(callContext, opMapping.get(), throwOnFailure);
137+
validateOpMappingValues(callContext, mapping, throwOnFailure);
160138
if (validationError.isPresent()) {
161139
return validationError;
162140
}
163141

164142
// Retract mode requires UPDATE_BEFORE in the mapping
165143
final boolean hasUpdateBefore =
166-
opMapping.get().values().stream()
167-
.anyMatch(
168-
v ->
169-
v instanceof String
170-
&& "UPDATE_BEFORE".equals(((String) v).trim()));
144+
mapping.values().stream().anyMatch(v -> "UPDATE_BEFORE".equals(v.trim()));
171145
if (!hasUpdateBefore) {
172146
return callContext.fail(
173147
throwOnFailure,
@@ -180,28 +154,17 @@ private static Optional<List<DataType>> validateInputs(
180154
}
181155

182156
/**
183-
* Validates op_mapping values. Values must be valid RowKind names from {INSERT, UPDATE_AFTER,
184-
* DELETE}. Keys are arbitrary user strings (e.g., 'c', 'u', 'd') and may be comma-separated to
185-
* map multiple user codes to the same RowKind. Each RowKind name must appear at most once
186-
* across all entries.
157+
* Validates op_mapping values. Values must be valid Flink change operation names. Each name
158+
* must appear at most once across all entries.
187159
*/
188160
private static Optional<List<DataType>> validateOpMappingValues(
189161
final CallContext callContext,
190-
final Map<?, ?> opMapping,
162+
final Map<String, String> opMapping,
191163
final boolean throwOnFailure) {
192164
final Set<String> allRowKindsSeen = new HashSet<>();
193165

194-
for (final Entry<?, ?> entry : opMapping.entrySet()) {
195-
if (!(entry.getKey() instanceof String)) {
196-
return callContext.fail(
197-
throwOnFailure, "Invalid target mapping for argument 'op_mapping'.");
198-
}
199-
final Object value = entry.getValue();
200-
if (!(value instanceof String)) {
201-
return callContext.fail(
202-
throwOnFailure, "Invalid target mapping for argument 'op_mapping'.");
203-
}
204-
final String rowKindName = ((String) value).trim();
166+
for (final String value : opMapping.values()) {
167+
final String rowKindName = value.trim();
205168
if (!VALID_ROW_KIND_NAMES.contains(rowKindName)) {
206169
return callContext.fail(
207170
throwOnFailure,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.flink.table.types.inference.strategies;

import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.FunctionDefinition;
import org.apache.flink.table.types.inference.ArgumentCount;
import org.apache.flink.table.types.inference.ConstantArgumentCount;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.Signature;
import org.apache.flink.table.types.inference.StaticArgument;

import java.util.List;

/**
 * Base class for input type strategies that only perform validation. Argument count and signatures
 * are handled by {@link StaticArgument}s in the function definition.
 *
 * <p>Subclasses only need to implement {@link #inferInputTypes} for custom validation logic.
 */
@Internal
public abstract class ValidationOnlyInputTypeStrategy implements InputTypeStrategy {

    @Override
    public ArgumentCount getArgumentCount() {
        // Any count is accepted here; the actual arity is enforced by the
        // StaticArguments declared on the function definition itself.
        return ConstantArgumentCount.any();
    }

    @Override
    public List<Signature> getExpectedSignatures(FunctionDefinition definition) {
        // An empty signature placeholder; real signatures come from StaticArguments.
        return List.of(Signature.of());
    }
}

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import org.apache.flink.table.api.InsertConflictStrategy.ConflictBehavior
2323
import org.apache.flink.table.api.config.ExecutionConfigOptions
2424
import org.apache.flink.table.api.config.ExecutionConfigOptions.UpsertMaterialize
2525
import org.apache.flink.table.connector.ChangelogMode
26-
import org.apache.flink.table.functions.ChangelogFunction
26+
import org.apache.flink.table.functions.{BuiltInFunctionDefinition, ChangelogFunction}
2727
import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext
2828
import org.apache.flink.table.planner.calcite.{FlinkTypeFactory, RexTableArgCall}
2929
import org.apache.flink.table.planner.plan.`trait`._
@@ -1692,10 +1692,6 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
16921692
override def getRequiredChangelogMode: ChangelogMode = {
16931693
callContext.getOutputChangelogMode.orElse(null)
16941694
}
1695-
1696-
override def getArgumentValue[T](pos: Int, clazz: Class[T]): java.util.Optional[T] = {
1697-
callContext.getArgumentValue(pos, clazz)
1698-
}
16991695
}
17001696
}
17011697

@@ -1713,12 +1709,14 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
17131709
val changelogContext =
17141710
toPtfChangelogContext(process, inputChangelogModes, requiredChangelogMode)
17151711
val changelogMode = changelogFunction.getChangelogMode(changelogContext)
1716-
if (
1717-
!changelogMode.containsOnly(RowKind.INSERT) &&
1718-
!changelogMode.contains(RowKind.UPDATE_BEFORE)
1719-
) {
1720-
verifyPtfTableArgsForUpdates(call)
1721-
}
1712+
verifyPtfTableArgsForUpdates(call, changelogMode)
1713+
toTraitSet(changelogMode)
1714+
case builtIn: BuiltInFunctionDefinition if builtIn.getChangelogModeResolver.isPresent =>
1715+
val inputChangelogModes = children.map(toChangelogMode(_, None, None))
1716+
val changelogContext =
1717+
toPtfChangelogContext(process, inputChangelogModes, requiredChangelogMode)
1718+
val changelogMode = builtIn.getChangelogModeResolver.get().apply(changelogContext)
1719+
verifyPtfTableArgsForUpdates(call, changelogMode)
17221720
toTraitSet(changelogMode)
17231721
case _ =>
17241722
defaultTraitSet
@@ -1734,7 +1732,12 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
17341732
* Upsert mode (without UPDATE_BEFORE) requires a key to look up previous values, so set semantics
17351733
* with PARTITION BY is required.
17361734
*/
1737-
private def verifyPtfTableArgsForUpdates(call: RexCall): Unit = {
1735+
private def verifyPtfTableArgsForUpdates(call: RexCall, changelogMode: ChangelogMode): Unit = {
1736+
if (
1737+
changelogMode.containsOnly(RowKind.INSERT) || changelogMode.contains(RowKind.UPDATE_BEFORE)
1738+
) {
1739+
return
1740+
}
17381741
StreamPhysicalProcessTableFunction
17391742
.getProvidedInputArgs(call)
17401743
.map(_.e)

0 commit comments

Comments
 (0)