Skip to content

Commit ee827df

Browse files
authored
[FLINK-39253][table] Preserve field names in ROW function from AS aliases
1 parent 2379582 commit ee827df

5 files changed

Lines changed: 51 additions & 2 deletions

File tree

flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -681,6 +681,23 @@ public List<DataType> getArgumentDataTypes() {
681681
.collect(Collectors.toList());
682682
}
683683

684+
@Override
685+
public Optional<String> getArgumentName(int pos) {
686+
final ResolvedExpression arg = getArgument(pos);
687+
688+
if (arg instanceof CallExpression) {
689+
final CallExpression call = (CallExpression) arg;
690+
if (call.getFunctionDefinition() == BuiltInFunctionDefinitions.AS) {
691+
final List<ResolvedExpression> children = call.getResolvedChildren();
692+
if (children.size() >= 2 && children.get(1) instanceof ValueLiteralExpression) {
693+
return ((ValueLiteralExpression) children.get(1)).getValueAs(String.class);
694+
}
695+
}
696+
}
697+
698+
return Optional.empty();
699+
}
700+
684701
@Override
685702
public Optional<DataType> getOutputDataType() {
686703
return Optional.empty();

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/CallContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@ default Optional<ChangelogMode> getOutputChangelogMode() {
113113
*/
114114
List<DataType> getArgumentDataTypes();
115115

116+
/** Returns the name/alias of the argument at the given position if one is available. */
117+
default Optional<String> getArgumentName(int pos) {
118+
return Optional.empty();
119+
}
120+
116121
/**
117122
* Returns the inferred output data type of the function call.
118123
*

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/RowTypeStrategy.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,12 @@ public Optional<DataType> inferType(CallContext callContext) {
3737
List<DataType> argumentDataTypes = callContext.getArgumentDataTypes();
3838
DataTypes.Field[] fields =
3939
IntStream.range(0, argumentDataTypes.size())
40-
.mapToObj(idx -> DataTypes.FIELD("f" + idx, argumentDataTypes.get(idx)))
40+
.mapToObj(
41+
idx -> {
42+
String fieldName =
43+
callContext.getArgumentName(idx).orElse("f" + idx);
44+
return DataTypes.FIELD(fieldName, argumentDataTypes.get(idx));
45+
})
4146
.toArray(DataTypes.Field[]::new);
4247

4348
return Optional.of(DataTypes.ROW(fields).notNull());

flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/utils/CastCallContext.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,12 @@ public List<DataType> getArgumentDataTypes() {
115115
return expectedArguments;
116116
}
117117

118+
@Override
119+
public Optional<String> getArgumentName(int pos) {
120+
// argument names remain regardless of casting
121+
return originalContext.getArgumentName(pos);
122+
}
123+
118124
@Override
119125
public Optional<DataType> getOutputDataType() {
120126
return Optional.ofNullable(outputDataType);

flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RowFunctionITCase.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,23 @@ Stream<TestSetSpec> getTestSetSpecs() {
115115
DataTypes.FIELD("b", DataTypes.TINYINT()),
116116
DataTypes.FIELD("c", DataTypes.BIGINT()),
117117
DataTypes.FIELD("d", DataTypes.BOOLEAN()))
118-
.notNull()));
118+
.notNull()),
119+
TestSetSpec.forFunction(
120+
BuiltInFunctionDefinitions.ROW, "with aliased fields using .as()")
121+
.onFieldsWithData(100, "abc", 75.50)
122+
.andDataTypes(DataTypes.INT(), DataTypes.STRING(), DataTypes.DOUBLE())
123+
.testTableApiResult(
124+
row($("f0").as("a"), $("f1").as("b"), $("f2").as("c")),
125+
Row.of(100, "abc", 75.50),
126+
DataTypes.ROW(
127+
DataTypes.FIELD("a", DataTypes.INT()),
128+
DataTypes.FIELD("b", DataTypes.STRING()),
129+
DataTypes.FIELD("c", DataTypes.DOUBLE()))
130+
.notNull())
131+
.testTableApiResult(
132+
row($("f0").as("a"), $("f1").as("b"), $("f2").as("c")).get("a"),
133+
100,
134+
DataTypes.INT()));
119135
}
120136

121137
// --------------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)