Skip to content

Commit 3afaa8c

Browse files
authored
[FLINK-39256][table] Support ORDER BY clause in Process Table Functions
This closes #27929.
1 parent aedc817 commit 3afaa8c

50 files changed

Lines changed: 3989 additions & 132 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

docs/content.zh/docs/dev/table/functions/ptfs.md

Lines changed: 168 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -932,22 +932,37 @@ table.
932932

933933
The timestamp of the firing timer currently being processed within the `onTimer()` method.
934934

935+
#### Table Watermark
936+
937+
`TimeContext#tableWatermark()` returns the event-time watermark of the input table currently being processed.
938+
939+
Watermarks are generated in sources and sent through the topology for advancing the logical clock. The current watermark
940+
of an input table is the minimum watermark of all upstream Flink subtasks producing the table.
941+
942+
In multi-input scenarios, each input table can have its own independent watermark. This method returns the watermark
943+
specific to the input table that is currently being processed in the `eval()` method, rather than the global minimum
944+
watermark across all input tables (which is returned by `currentWatermark()`).
945+
946+
This is particularly useful for late event detection on a per-input basis.
947+
948+
It returns the current watermark of the input table being processed. `null` if called within the `onTimer()` method or a
949+
watermark has not yet been received from all upstream Flink subtasks producing the table.
950+
935951
#### Current Watermark
936952

937-
`TimeContext#currentWatermark()` returns the current event-time watermark.
953+
`TimeContext#currentWatermark()` returns the current event-time watermark at this PTF instance.
938954

939-
Watermarks are generated in sources and sent through the topology for advancing the logical clock in each Flink subtask.
940-
The current watermark of a Flink subtask is the global minimum watermark of all inputs (i.e. across all parallel inputs
941-
and table partitions).
955+
Watermarks are generated in sources and sent through the topology for advancing the logical clock. The current watermark
956+
of a PTF instance is the global minimum watermark of all input tables (i.e., across all upstream Flink subtasks and
957+
table partitions).
942958

943959
This method returns the current watermark of the Flink subtask that evaluates the PTF. Thus, the returned timestamp
944-
represents the entire Flink subtask, independent of the currently processed partition. This behavior is similar to a call
945-
to `SELECT CURRENT_WATERMARK(...)` in SQL.
960+
represents the entire Flink subtask, independent of the currently processed input table and partition. This behavior is
961+
similar to a call to `SELECT CURRENT_WATERMARK(...)` in SQL.
946962

947-
If a watermark was not received from all inputs, the method returns `null`.
948-
949-
In case this method is called within the `onTimer()` method, the returned watermark is the triggering watermark
950-
that currently fires the timer.
963+
It returns the current watermark at the PTF instance across all upstream Flink subtasks and table partitions. A `null`
964+
value is returned if no minimum logical time could be calculated across all inputs; this happens during startup
965+
or recovery when one or more active (i.e. not idle) inputs haven't sent a watermark yet.
951966

952967
### Timers
953968

@@ -1015,6 +1030,149 @@ not needed anymore via `Context#clearAllTimers()` or `TimeContext#clearTimer(Str
10151030

10161031
{{< top >}}
10171032

1033+
Ordering
1034+
--------
1035+
1036+
A PTF that takes a table with set semantics can optionally specify an ORDER BY clause in the
1037+
function call to define the order in which rows are processed within each partition. The ORDER BY
1038+
clause guarantees that rows are delivered to the eval() method in the specified order.
1039+
1040+
The ORDER BY clause requires that the first column is a time attribute column (i.e., a
1041+
TIMESTAMP or TIMESTAMP_LTZ column with a watermark declaration). The first ORDER BY column must
1042+
be specified in ascending order. This ensures that rows are processed in event-time order.
1043+
Additional columns can be specified as secondary sort keys to define the ordering of
1044+
rows with the same timestamp.
1045+
1046+
{{< tabs "2137eeed-order-by-syntax" >}}
1047+
{{< tab "SQL" >}}
1048+
```sql
1049+
SELECT * FROM my_ptf(
1050+
input_table => TABLE source_table
1051+
PARTITION BY user_id
1052+
ORDER BY (event_time ASC, priority DESC NULLS FIRST)
1053+
)
1054+
```
1055+
{{< /tab >}}
1056+
{{< tab "Java" >}}
1057+
```java
1058+
env.from("source_table")
1059+
.partitionBy($("user_id"))
1060+
.orderBy($("event_time").asc(), $("priority").desc())
1061+
.process(MyPTF.class, descriptor("event_time").asArgument("on_time"))
1062+
```
1063+
{{< /tab >}}
1064+
{{< /tabs >}}
1065+
1066+
### Difference Between ORDER BY and on_time Argument
1067+
1068+
While both ORDER BY and the `on_time` argument relate to time attributes, they serve
1069+
different purposes:
1070+
1071+
- **on_time**: Declares which time attribute column powers the time context (`TimeContext#time()`) and
1072+
output timestamp. It does NOT affect the processing order of rows.
1073+
- **ORDER BY**: Physically buffers and sorts rows within each partition to guarantee ordered delivery
1074+
to the eval() method. If both ORDER BY and `on_time` are specified for the same table argument, they
1075+
must reference the same time attribute column.
1076+
1077+
### Ordering Guarantees and Late Events
1078+
1079+
When ORDER BY is specified on a time attribute column, the framework maintains a sort buffer
1080+
per partition and input table to reorder out-of-order events. The sort buffer is flushed when the
1081+
watermark for the given input table advances, at which point all buffered rows with timestamps
1082+
less than or equal to the watermark are delivered to the eval() method in sorted order. Late
1083+
events (arriving after the watermark) are dropped to maintain the ordering guarantee.
1084+
1085+
The following example demonstrates ordered processing with secondary sorting. First, the function implementation:
1086+
1087+
```java
1088+
// Function that processes events in order and captures the ordering
1089+
public static class OrderedProcessor extends ProcessTableFunction<List<Event>> {
1090+
public record Event(Integer score, Instant ts) {}
1091+
1092+
public static class BufferState {
1093+
// Stores all input events that enter the eval() after sorting
1094+
public List<Event> events = new ArrayList<>();
1095+
}
1096+
1097+
public void eval(
1098+
Context ctx,
1099+
@StateHint BufferState state,
1100+
@ArgumentHint(SET_SEMANTIC_TABLE) Row input
1101+
) {
1102+
// Optional: Access ordering information at runtime
1103+
TableSemantics semantics = ctx.tableSemanticsFor("input");
1104+
int[] orderColumns = semantics.orderByColumns();
1105+
SortDirection[] directions = semantics.orderByDirections();
1106+
1107+
// Buffer the incoming row
1108+
state.events.add(new Event(input.getFieldAs("score"), input.getFieldAs("ts")));
1109+
1110+
// Emit current buffer state
1111+
collect(state.events);
1112+
}
1113+
}
1114+
```
1115+
1116+
The function can be called using either SQL or Table API:
1117+
1118+
{{< tabs "2237eeed-order-by-example" >}}
1119+
{{< tab "SQL" >}}
1120+
```sql
1121+
-- Create a watermarked table
1122+
CREATE TABLE Events (
1123+
name STRING,
1124+
score INT,
1125+
ts TIMESTAMP_LTZ(3),
1126+
WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
1127+
) WITH (
1128+
'connector' = 'datagen'
1129+
);
1130+
1131+
-- Register the function
1132+
CREATE FUNCTION OrderedProcessor AS 'org.example.OrderedProcessor';
1133+
1134+
-- Use ORDER BY with primary and secondary sort columns
1135+
SELECT * FROM OrderedProcessor(
1136+
input => TABLE Events
1137+
PARTITION BY name
1138+
ORDER BY (ts ASC, score DESC)
1139+
);
1140+
```
1141+
{{< /tab >}}
1142+
{{< tab "Java" >}}
1143+
```java
1144+
TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
1145+
1146+
// Create a watermarked table
1147+
env.executeSql(
1148+
"CREATE TABLE Events (" +
1149+
"name STRING, " +
1150+
"score INT, " +
1151+
"ts TIMESTAMP_LTZ(3), " +
1152+
"WATERMARK FOR ts AS ts - INTERVAL '1' SECOND" +
1153+
") WITH ('connector' = 'datagen')"
1154+
);
1155+
1156+
// Use orderBy() with primary and secondary sort columns
1157+
env.from("Events")
1158+
.partitionBy($("name"))
1159+
.orderBy($("ts").asc(), $("score").desc())
1160+
.process(OrderedProcessor.class)
1161+
.execute()
1162+
.print();
1163+
```
1164+
{{< /tab >}}
1165+
{{< /tabs >}}
1166+
1167+
In this example:
1168+
- Events are first sorted by `ts` (ascending), ensuring event-time order
1169+
- Events with the same timestamp are then sorted by `score`
1170+
- Late events (with timestamp less than the watermark) are automatically dropped
1171+
- The `TableSemantics` API provides runtime access to the ordering configuration
1172+
- The output is an ever-growing list of sorted input events
1173+
1174+
{{< top >}}
1175+
10181176
Multiple Tables
10191177
---------------
10201178

0 commit comments

Comments
 (0)