Skip to content

Commit 09efcb4

Browse files
[FLINK-39442][python] Add descriptor() and to_changelog() to Python Table API
This closes #27934.
1 parent ee827df commit 09efcb4

3 files changed

Lines changed: 54 additions & 2 deletions

File tree

flink-python/pyflink/table/expressions.py

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
'concat_ws', 'uuid', 'null_of', 'log', 'with_columns', 'without_columns', 'json',
3535
'json_string', 'json_object', 'json_object_agg', 'json_array', 'json_array_agg',
3636
'call', 'call_sql', 'source_watermark', 'to_timestamp_ltz', 'from_unixtime', 'to_date',
37-
'to_timestamp', 'convert_tz', 'unix_timestamp']
37+
'to_timestamp', 'convert_tz', 'unix_timestamp', 'descriptor']
3838

3939

4040
def _leaf_op(op_name: str) -> Expression:
@@ -578,6 +578,26 @@ def map_from_arrays(key, value) -> Expression:
578578
return _binary_op("mapFromArrays", key, value)
579579

580580

581+
@PublicEvolving()
def descriptor(*column_names: str) -> Expression:
    """
    Builds a literal expression that carries a list of column names. The names are taken
    as given; the list is arbitrary and unvalidated.

    A column list is a convenient way to parameterize a function. Most notably, it is how
    the ``on_time`` argument of a process table function is declared.

    Example:
    ::

        >>> descriptor("ts_column")
        >>> descriptor("col1", "col2")

    :param column_names: One or more column names.
    :return: A descriptor expression.
    """
    # Forward the names unchanged to the "descriptor" operation via the shared varargs helper.
    descriptor_expr = _varargs_op("descriptor", *column_names)
    return descriptor_expr
599+
600+
581601
@PublicEvolving()
582602
def object_of(class_name: str, *args) -> Expression:
583603
"""

flink-python/pyflink/table/table.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1187,6 +1187,39 @@ def insert_into(
11871187
t_env=self._t_env,
11881188
)
11891189

1190+
def to_changelog(self, *arguments: Expression) -> 'Table':
    """
    Turns this table into an append-only table that carries the change operation of every
    row in an explicit column, by applying the built-in ``TO_CHANGELOG`` process table
    function.

    Whatever the original change operation of an input row was, the row is emitted as an
    INSERT-only row. A string ``op`` column records the original operation
    (INSERT, UPDATE_BEFORE, UPDATE_AFTER, DELETE).

    Example:
    ::

        >>> from pyflink.table.expressions import descriptor, map_
        >>> # Default: adds 'op' column with standard change operation names
        >>> table.to_changelog()
        >>> # Custom op column name and mapping
        >>> table.to_changelog(
        ...     descriptor("op_code").as_argument("op"),
        ...     map_("INSERT", "I", "UPDATE_AFTER", "U").as_argument("op_mapping")
        ... )
        >>> # Deletion flag pattern
        >>> table.to_changelog(
        ...     descriptor("deleted").as_argument("op"),
        ...     map_("INSERT, UPDATE_AFTER", "false",
        ...          "DELETE", "true").as_argument("op_mapping")
        ... )

    :param arguments: Optional named arguments for ``op`` and ``op_mapping``.
    :return: An append-only :class:`~pyflink.table.Table` with an ``op`` column prepended
             to the input columns.
    """
    # The Java method takes varargs, so the Python expressions are packed into a Java array.
    j_changelog_table = self._j_table.toChangelog(to_expression_jarray(arguments))
    return Table(j_changelog_table, self._t_env)
1222+
11901223

11911224
@PublicEvolving()
11921225
class GroupedTable(object):

flink-python/pyflink/table/tests/test_table_completeness.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@ def excluded_methods(cls):
4242
'asArgument',
4343
'process',
4444
'partitionBy',
45-
'toChangelog',
4645
}
4746

4847
@classmethod

0 commit comments

Comments
 (0)