Skip to content

Commit 9d2cdcb

Browse files
committed
[FLINK-39261][table] Introduce ChangelogModeStrategy interface for BuiltInFunctionDefinition
1 parent c4a3391 commit 9d2cdcb

4 files changed

Lines changed: 54 additions & 19 deletions

File tree

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java

Lines changed: 13 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,8 @@
2121
import org.apache.flink.annotation.Internal;
2222
import org.apache.flink.table.api.TableException;
2323
import org.apache.flink.table.catalog.DataTypeFactory;
24-
import org.apache.flink.table.connector.ChangelogMode;
25-
import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
2624
import org.apache.flink.table.types.DataType;
25+
import org.apache.flink.table.types.inference.ChangelogModeStrategy;
2726
import org.apache.flink.table.types.inference.InputTypeStrategy;
2827
import org.apache.flink.table.types.inference.StaticArgument;
2928
import org.apache.flink.table.types.inference.TypeInference;
@@ -35,7 +34,6 @@
3534
import java.util.Arrays;
3635
import java.util.Locale;
3736
import java.util.Optional;
38-
import java.util.function.Function;
3937
import java.util.regex.Pattern;
4038

4139
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -75,7 +73,7 @@ public final class BuiltInFunctionDefinition implements SpecializedFunction {
7573

7674
private final SqlCallSyntax sqlCallSyntax;
7775

78-
private final @Nullable Function<ChangelogContext, ChangelogMode> changelogModeResolver;
76+
private final @Nullable ChangelogModeStrategy changelogModeStrategy;
7977

8078
private final String sqlName;
8179

@@ -90,7 +88,7 @@ private BuiltInFunctionDefinition(
9088
boolean isRuntimeProvided,
9189
String runtimeClass,
9290
boolean isInternal,
93-
@Nullable Function<ChangelogContext, ChangelogMode> changelogModeResolver) {
91+
@Nullable ChangelogModeStrategy changelogModeStrategy) {
9492
this.name = checkNotNull(name, "Name must not be null.");
9593
this.sqlName = sqlName;
9694
this.version = isInternal ? null : version;
@@ -101,7 +99,7 @@ private BuiltInFunctionDefinition(
10199
this.runtimeClass = runtimeClass;
102100
this.isInternal = isInternal;
103101
this.sqlCallSyntax = sqlCallSyntax;
104-
this.changelogModeResolver = changelogModeResolver;
102+
this.changelogModeStrategy = changelogModeStrategy;
105103
validateFunction(this.name, this.version, this.isInternal);
106104
}
107105

@@ -139,11 +137,11 @@ public boolean isInternal() {
139137
}
140138

141139
/**
142-
* Returns the optional changelog mode resolver for built-in PTFs that emit updates (e.g.,
140+
* Returns the optional {@link ChangelogModeStrategy} for built-in PTFs that emit updates (e.g.,
143141
* FROM_CHANGELOG). The planner uses this to determine the output changelog mode.
144142
*/
145-
public Optional<Function<ChangelogContext, ChangelogMode>> getChangelogModeResolver() {
146-
return Optional.ofNullable(changelogModeResolver);
143+
public Optional<ChangelogModeStrategy> getChangelogModeStrategy() {
144+
return Optional.ofNullable(changelogModeStrategy);
147145
}
148146

149147
public String getQualifiedName() {
@@ -268,7 +266,7 @@ public static final class Builder {
268266

269267
private SqlCallSyntax sqlCallSyntax = SqlCallSyntax.FUNCTION;
270268

271-
private @Nullable Function<ChangelogContext, ChangelogMode> changelogModeResolver;
269+
private @Nullable ChangelogModeStrategy changelogModeStrategy;
272270

273271
public Builder() {
274272
// default constructor to allow a fluent definition
@@ -417,12 +415,11 @@ public Builder sqlName(String name) {
417415
}
418416

419417
/**
420-
* Sets a resolver that determines the output {@link ChangelogMode} for this built-in
421-
* function. Only needed for functions that emit updates (e.g., FROM_CHANGELOG).
418+
* Sets the {@link ChangelogModeStrategy} that determines the output changelog mode for this
419+
* built-in PTF. Only needed for PTFs that emit updates (e.g., FROM_CHANGELOG).
422420
*/
423-
public Builder changelogModeResolver(
424-
Function<ChangelogContext, ChangelogMode> changelogModeResolver) {
425-
this.changelogModeResolver = changelogModeResolver;
421+
public Builder changelogModeStrategy(ChangelogModeStrategy changelogModeStrategy) {
422+
this.changelogModeStrategy = changelogModeStrategy;
426423
return this;
427424
}
428425

@@ -438,7 +435,7 @@ public BuiltInFunctionDefinition build() {
438435
isRuntimeProvided,
439436
runtimeClass,
440437
isInternal,
441-
changelogModeResolver);
438+
changelogModeStrategy);
442439
}
443440
}
444441
}

flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -829,7 +829,7 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
829829
"op_mapping",
830830
DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
831831
true))
832-
.changelogModeResolver(ctx -> ChangelogMode.all())
832+
.changelogModeStrategy(ctx -> ChangelogMode.all())
833833
.inputTypeStrategy(FROM_CHANGELOG_INPUT_TYPE_STRATEGY)
834834
.outputTypeStrategy(FROM_CHANGELOG_OUTPUT_TYPE_STRATEGY)
835835
.runtimeClass(
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.table.types.inference;
20+
21+
import org.apache.flink.annotation.Experimental;
22+
import org.apache.flink.table.connector.ChangelogMode;
23+
import org.apache.flink.table.functions.ChangelogFunction.ChangelogContext;
24+
25+
/**
26+
* Strategy for determining the output {@link ChangelogMode} of a built-in process table function.
27+
*
28+
* <p>Similar to {@link TypeStrategy}, this is used to declare changelog semantics in the function
29+
* definition rather than implementing the {@link
30+
* org.apache.flink.table.functions.ChangelogFunction} interface directly.
31+
*/
32+
@Experimental
33+
public interface ChangelogModeStrategy {
34+
35+
/** Infers the output {@link ChangelogMode} based on the given {@link ChangelogContext}. */
36+
ChangelogMode inferChangelogMode(ChangelogContext changelogContext);
37+
}

flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1713,11 +1713,12 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti
17131713
val changelogMode = changelogFunction.getChangelogMode(changelogContext)
17141714
verifyPtfTableArgsForUpdates(call, changelogMode)
17151715
toTraitSet(changelogMode)
1716-
case builtIn: BuiltInFunctionDefinition if builtIn.getChangelogModeResolver.isPresent =>
1716+
case builtIn: BuiltInFunctionDefinition if builtIn.getChangelogModeStrategy.isPresent =>
17171717
val inputChangelogModes = children.map(toChangelogMode(_, None, None))
17181718
val changelogContext =
17191719
toPtfChangelogContext(process, inputChangelogModes, requiredChangelogMode)
1720-
val changelogMode = builtIn.getChangelogModeResolver.get().apply(changelogContext)
1720+
val changelogMode =
1721+
builtIn.getChangelogModeStrategy.get().inferChangelogMode(changelogContext)
17211722
verifyPtfTableArgsForUpdates(call, changelogMode)
17221723
toTraitSet(changelogMode)
17231724
case _ =>

0 commit comments

Comments
 (0)