Skip to content

Commit ce6aee2

Browse files
committed
[FLINK-39261][table] add from_changelog to python api
1 parent 7d1a7cd commit ce6aee2

2 files changed

Lines changed: 36 additions & 1 deletion

File tree

flink-python/pyflink/table/table.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1188,6 +1188,42 @@ def insert_into(
11881188
)
11891189

11901190

1191+
def from_changelog(self, *arguments: Expression) -> 'Table':
1192+
"""
1193+
Converts this append-only table with an explicit operation code column into a
1194+
(potentially updating) dynamic table using the built-in ``FROM_CHANGELOG``
1195+
process table function.
1196+
1197+
Each input row is expected to have a string operation code column (default: ``op``)
1198+
that indicates the change operation. The operation column is interpreted by the
1199+
engine and removed from the output.
1200+
1201+
Example:
1202+
::
1203+
1204+
>>> from pyflink.table.expressions import descriptor, map_
1205+
>>> # Default: reads 'op' column with standard change operation names
1206+
>>> table.from_changelog()
1207+
>>> # With custom op column name
1208+
>>> table.from_changelog(
1209+
... descriptor("operation").as_argument("op")
1210+
... )
1211+
>>> # With custom op_mapping
1212+
>>> table.from_changelog(
1213+
... descriptor("op").as_argument("op"),
1214+
... map_("c, r", "INSERT",
1215+
... "ub", "UPDATE_BEFORE",
1216+
... "ua", "UPDATE_AFTER",
1217+
... "d", "DELETE").as_argument("op_mapping")
1218+
... )
1219+
1220+
:param arguments: Optional named arguments for ``op`` and ``op_mapping``.
1221+
:return: A (potentially updating) :class:`~pyflink.table.Table` with the op column
1222+
removed and proper change operation semantics.
1223+
"""
1224+
return Table(self._j_table.fromChangelog(to_expression_jarray(arguments)), self._t_env)
1225+
1226+
11911227
@PublicEvolving()
11921228
class GroupedTable(object):
11931229
"""

flink-python/pyflink/table/tests/test_table_completeness.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,6 @@ def excluded_methods(cls):
4444
'process',
4545
'partitionBy',
4646
'toChangelog',
47-
'fromChangelog',
4847
}
4948

5049
@classmethod

0 commit comments

Comments
 (0)