Commit e1d0c81

ShreyeshArangath committed: first round of reviews
1 parent 0a57da6

6 files changed: 296 additions & 64 deletions

Lines changed: 164 additions & 0 deletions
@@ -0,0 +1,164 @@
.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements.  See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership.  The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License.  You may obtain a copy of the License at

..   http://www.apache.org/licenses/LICENSE-2.0

.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied.  See the License for the
.. specific language governing permissions and limitations
.. under the License.

.. _execution_metrics:

Execution Metrics
=================

Overview
--------

When DataFusion executes a query it compiles the logical plan into a tree of
*physical plan operators* (e.g. ``FilterExec``, ``ProjectionExec``,
``HashAggregateExec``). Each operator can record runtime statistics while it
runs. These statistics are called **execution metrics**.

Typical metrics include:

- **output_rows** – number of rows produced by the operator
- **elapsed_compute** – total CPU time (nanoseconds) spent inside the operator
- **spill_count** – number of times the operator spilled data to disk
- **spilled_bytes** – total bytes written to disk during spills
- **spilled_rows** – total rows written to disk during spills

Metrics are collected *per partition*: DataFusion may execute each operator
in parallel across several partitions. The convenience properties on
:py:class:`~datafusion.MetricsSet` (e.g. ``output_rows``, ``elapsed_compute``)
automatically sum the named metric across **all** partitions, giving a single
aggregate value for the operator as a whole. You can also access the raw
per-partition :py:class:`~datafusion.Metric` objects via
:py:meth:`~datafusion.MetricsSet.metrics`.
When Are Metrics Available?
---------------------------

Metrics are populated only **after** the DataFrame has been executed.
Execution is triggered by any of the terminal operations:

- :py:meth:`~datafusion.DataFrame.collect`
- :py:meth:`~datafusion.DataFrame.collect_partitioned`
- :py:meth:`~datafusion.DataFrame.execute_stream`
- :py:meth:`~datafusion.DataFrame.execute_stream_partitioned`

Calling :py:meth:`~datafusion.ExecutionPlan.collect_metrics` before execution
will return entries with empty (or ``None``) metric sets because the operators
have not run yet.
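Calling code can make the "execute first" rule explicit with a small guard. The ``metrics_available`` helper below is illustrative only, not part of the DataFusion API; it models the ``(operator_name, metrics)`` pairs with plain tuples and lists:

```python
def metrics_available(entries) -> bool:
    """True once at least one operator has recorded at least one metric.

    `entries` mimics the (operator_name, metrics_list) pairs returned by
    collect_metrics(); before execution the list is empty, or every
    per-operator metrics list is empty."""
    return any(metrics for _, metrics in entries)


print(metrics_available([]))                                   # False
print(metrics_available([("FilterExec: column1@0 > 1", [])]))  # False
print(metrics_available([("FilterExec: column1@0 > 1",
                          [("output_rows", 2)])]))             # True
```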
Reading the Physical Plan Tree
------------------------------

:py:meth:`~datafusion.DataFrame.execution_plan` returns the root
:py:class:`~datafusion.ExecutionPlan` node of the physical plan tree. The tree
mirrors the operator pipeline: the root is typically a projection or
coalescing node; its children are filters, aggregates, scans, etc.

The ``operator_name`` string returned by
:py:meth:`~datafusion.ExecutionPlan.collect_metrics` is the *display* name of
the node, for example ``"FilterExec: column1@0 > 1"``. This is the same string
you would see when calling ``plan.display()``.
Available Metrics
-----------------

The following metrics are directly accessible as properties on
:py:class:`~datafusion.MetricsSet`:

.. list-table::
   :header-rows: 1
   :widths: 25 75

   * - Property
     - Description
   * - ``output_rows``
     - Number of rows emitted by the operator (summed across partitions).
   * - ``elapsed_compute``
     - CPU time in nanoseconds spent inside the operator's execute loop
       (summed across partitions).
   * - ``spill_count``
     - Number of spill-to-disk events due to memory pressure (summed across
       partitions).
   * - ``spilled_bytes``
     - Total bytes written to disk during spills (summed across partitions).
   * - ``spilled_rows``
     - Total rows written to disk during spills (summed across partitions).

Any metric not listed above can be accessed via
:py:meth:`~datafusion.MetricsSet.sum_by_name`, or by iterating over the raw
:py:class:`~datafusion.Metric` objects returned by
:py:meth:`~datafusion.MetricsSet.metrics`.
Labels
------

A :py:class:`~datafusion.Metric` may carry *labels*: key/value pairs that
provide additional context. For example, some operators tag their output
metrics with an ``output_type`` label to distinguish between intermediate and
final output:

.. code-block:: python

    for metric in metrics_set.metrics():
        print(metric.name, metric.labels())
        # output_rows {'output_type': 'final'}

Labels are operator-specific; most metrics have no labels.
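Filtering on a label is ordinary dictionary work. A minimal sketch using plain dicts as stand-ins for :py:class:`~datafusion.Metric` objects (the ``output_type``/``final`` label comes from the example above; other label names are operator-specific):

```python
def final_output_metrics(metrics: list[dict]) -> list[dict]:
    """Keep only metrics labelled output_type == 'final'.

    Each dict mimics one Metric: a name, a value, and a labels mapping."""
    return [m for m in metrics if m.get("labels", {}).get("output_type") == "final"]


recorded = [
    {"name": "output_rows", "value": 3, "labels": {"output_type": "final"}},
    {"name": "output_rows", "value": 7, "labels": {}},
]
print(final_output_metrics(recorded))
# [{'name': 'output_rows', 'value': 3, 'labels': {'output_type': 'final'}}]
```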
End-to-End Example
------------------

.. code-block:: python

    from datafusion import SessionContext

    ctx = SessionContext()
    ctx.sql("CREATE TABLE sales AS VALUES (1, 100), (2, 200), (3, 50)")

    df = ctx.sql("SELECT * FROM sales WHERE column1 > 1")

    # Execute the query — this populates the metrics
    results = df.collect()

    # Retrieve the physical plan with metrics
    plan = df.execution_plan()

    # Walk every operator and print its metrics
    for operator_name, ms in plan.collect_metrics():
        if ms.output_rows is not None:
            print(f"{operator_name}")
            print(f"  output_rows     = {ms.output_rows}")
            print(f"  elapsed_compute = {ms.elapsed_compute} ns")

    # Access raw per-partition metrics
    for operator_name, ms in plan.collect_metrics():
        for metric in ms.metrics():
            print(
                f"  partition={metric.partition} "
                f"{metric.name}={metric.value} "
                f"labels={metric.labels()}"
            )
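Since ``elapsed_compute`` is reported in nanoseconds, a tiny formatter makes the printed report easier to read. This is an illustrative utility, not part of the DataFusion API:

```python
def format_ns(nanos: int) -> str:
    """Render a nanosecond duration at a human-readable scale."""
    if nanos < 1_000:
        return f"{nanos} ns"
    if nanos < 1_000_000:
        return f"{nanos / 1_000:.1f} µs"
    if nanos < 1_000_000_000:
        return f"{nanos / 1_000_000:.1f} ms"
    return f"{nanos / 1_000_000_000:.2f} s"


print(format_ns(850))        # 850 ns
print(format_ns(42_000))     # 42.0 µs
print(format_ns(3_500_000))  # 3.5 ms
```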
API Reference
-------------

- :py:class:`datafusion.ExecutionPlan` — physical plan node
- :py:meth:`datafusion.ExecutionPlan.collect_metrics` — walk the tree and
  return ``(operator_name, MetricsSet)`` pairs
- :py:meth:`datafusion.ExecutionPlan.metrics` — return the
  :py:class:`~datafusion.MetricsSet` for a single node
- :py:class:`datafusion.MetricsSet` — aggregated metrics for one operator
- :py:class:`datafusion.Metric` — a single per-partition metric value

docs/source/user-guide/dataframe/index.rst

Lines changed: 9 additions & 0 deletions
@@ -365,7 +365,16 @@ DataFusion provides many built-in functions for data manipulation:
 For a complete list of available functions, see the :py:mod:`datafusion.functions` module documentation.
 
+Execution Metrics
+-----------------
+
+After executing a DataFrame (via ``collect()``, ``execute_stream()``, etc.),
+DataFusion populates per-operator runtime statistics such as row counts and
+compute time. See :doc:`execution-metrics` for a full explanation and
+worked example.
+
 .. toctree::
    :maxdepth: 1
 
    rendering
+   execution-metrics

python/datafusion/plan.py

Lines changed: 66 additions & 11 deletions
@@ -162,9 +162,29 @@ def metrics(self) -> MetricsSet | None:
         return MetricsSet(raw)
 
     def collect_metrics(self) -> list[tuple[str, MetricsSet]]:
-        """Walk the plan tree and collect metrics from all operators.
-
-        Returns a list of (operator_name, MetricsSet) tuples.
+        """Return runtime statistics for each step of the query execution.
+
+        DataFusion executes a query as a pipeline of operators — for example a
+        data source scan, followed by a filter, followed by a projection. After
+        the DataFrame has been executed (via
+        :py:meth:`~datafusion.DataFrame.collect`,
+        :py:meth:`~datafusion.DataFrame.execute_stream`, etc.), each operator
+        records statistics such as how many rows it produced and how much CPU
+        time it consumed.
+
+        Each entry in the returned list corresponds to one operator that
+        recorded metrics. The first element of the tuple is the operator's
+        description string — the same text shown by
+        :py:meth:`display_indent` — which identifies both the operator type
+        and its key parameters, for example ``"FilterExec: column1@0 > 1"``
+        or ``"DataSourceExec: partitions=1"``.
+
+        Returns:
+            A list of ``(description, MetricsSet)`` tuples ordered from the
+            outermost operator (top of the execution tree) down to the
+            data-source leaves. Only operators that recorded at least one
+            metric are included. Returns an empty list if called before the
+            DataFrame has been executed.
         """
         result: list[tuple[str, MetricsSet]] = []
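The ordering promised by the new docstring (outermost operator first, data-source leaves last) falls out of a preorder traversal like the ``_walk`` helper visible in the diff context. A self-contained sketch with a hypothetical ``FakeNode`` standing in for ``ExecutionPlan``:

```python
from dataclasses import dataclass, field


@dataclass
class FakeNode:
    # Stand-in for an ExecutionPlan node: a display name, the metrics it
    # recorded, and its child operators.
    name: str
    metrics: dict = field(default_factory=dict)
    children: list = field(default_factory=list)


def collect_metrics(root: FakeNode) -> list[tuple[str, dict]]:
    """Preorder walk: visit the parent before its children and skip
    operators that recorded no metrics."""
    out: list[tuple[str, dict]] = []

    def _walk(node: FakeNode) -> None:
        if node.metrics:
            out.append((node.name, node.metrics))
        for child in node.children:
            _walk(child)

    _walk(root)
    return out


scan = FakeNode("DataSourceExec: partitions=1", {"output_rows": 3})
filt = FakeNode("FilterExec: column1@0 > 1", {"output_rows": 2}, [scan])
print([name for name, _ in collect_metrics(filt)])
# ['FilterExec: column1@0 > 1', 'DataSourceExec: partitions=1']
```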

@@ -182,8 +202,11 @@ def _walk(node: ExecutionPlan) -> None:
 class MetricsSet:
     """A set of metrics for a single execution plan operator.
 
-    Provides both individual metric access and convenience aggregations
-    across partitions.
+    A physical plan operator runs independently across one or more partitions.
+    :py:meth:`metrics` returns the raw per-partition :py:class:`Metric` objects.
+    The convenience properties (:py:attr:`output_rows`, :py:attr:`elapsed_compute`,
+    etc.) automatically sum the named metric across *all* partitions, giving a
+    single aggregate value for the operator as a whole.
     """
 
     def __init__(self, raw: df_internal.MetricsSet) -> None:
@@ -201,12 +224,20 @@ def output_rows(self) -> int | None:
 
     @property
     def elapsed_compute(self) -> int | None:
-        """Sum of elapsed_compute across all partitions, in nanoseconds."""
+        """Total CPU time (in nanoseconds) spent inside this operator's execute loop.
+
+        Summed across all partitions. Returns ``None`` if no ``elapsed_compute``
+        metric was recorded.
+        """
         return self._raw.elapsed_compute()
 
     @property
     def spill_count(self) -> int | None:
-        """Sum of spill_count across all partitions."""
+        """Number of times this operator spilled data to disk due to memory pressure.
+
+        This is a count of spill events, not a byte count. Summed across all
+        partitions. Returns ``None`` if no ``spill_count`` metric was recorded.
+        """
         return self._raw.spill_count()
 
     @property
@@ -220,7 +251,14 @@ def spilled_rows(self) -> int | None:
         return self._raw.spilled_rows()
 
     def sum_by_name(self, name: str) -> int | None:
-        """Return the sum of metrics matching the given name."""
+        """Sum the named metric across all partitions.
+
+        Useful for accessing any metric not exposed as a first-class property.
+        Returns ``None`` if no metric with the given name was recorded.
+
+        Args:
+            name: The metric name, e.g. ``"output_rows"`` or ``"elapsed_compute"``.
+        """
         return self._raw.sum_by_name(name)
 
     def __repr__(self) -> str:
@@ -242,16 +280,33 @@ def name(self) -> str:
 
     @property
     def value(self) -> int | None:
-        """The numeric value of this metric, or None for non-numeric types."""
+        """The numeric value of this metric, or ``None`` when not representable.
+
+        ``None`` is returned for metric types whose value has not yet been set
+        (e.g. ``StartTimestamp`` / ``EndTimestamp`` before the operator runs)
+        and for any metric variant whose value cannot be expressed as an integer.
+        Timestamp metrics, when available, are returned as nanoseconds since the
+        Unix epoch.
+        """
         return self._raw.value
 
     @property
     def partition(self) -> int | None:
-        """The partition this metric applies to, or None if global."""
+        """The 0-based partition index this metric applies to.
+
+        Returns ``None`` for metrics that are not partition-specific (i.e. they
+        apply globally across all partitions of the operator).
+        """
         return self._raw.partition
 
     def labels(self) -> dict[str, str]:
-        """Return the labels associated with this metric."""
+        """Return the labels associated with this metric.
+
+        Labels provide additional context for a metric. For example::
+
+            >>> metric.labels()
+            {'output_type': 'final'}
+        """
         return self._raw.labels()
 
     def __repr__(self) -> str:
