Skip to content

Commit d2b6c9f

Browse files
author
ShreyeshArangath
committed
address some concerns
1 parent 7200857 commit d2b6c9f

4 files changed

Lines changed: 889 additions & 749 deletions

File tree

python/datafusion/plan.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,13 @@ def to_proto(self) -> bytes:
157157
return self._raw_plan.to_proto()
158158

159159
def metrics(self) -> MetricsSet | None:
160-
"""Return metrics for this plan node after execution, or None if unavailable."""
160+
"""Return metrics for this plan node, or None if this node has no MetricsSet.
161+
162+
Some operators (e.g. DataSourceExec) eagerly initialize a MetricsSet
163+
when the plan is created, so this may return a set even before
164+
execution. Metric *values* (such as ``output_rows``) are only
165+
meaningful after the DataFrame has been executed.
166+
"""
161167
raw = self._raw_plan.metrics()
162168
if raw is None:
163169
return None

python/tests/test_plans.py

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -95,14 +95,21 @@ def test_metric_properties() -> None:
9595
assert found_any_metric, "Expected at least one metric after execution"
9696

9797

98-
def test_no_metrics_before_execution() -> None:
98+
def test_no_meaningful_metrics_before_execution() -> None:
9999
ctx = SessionContext()
100100
ctx.sql("CREATE TABLE t AS VALUES (1), (2), (3)")
101101
df = ctx.sql("SELECT * FROM t")
102102
plan = df.execution_plan()
103-
ms = plan.metrics()
104-
# Before execution, the root plan node has no MetricsSet
105-
assert ms is None
103+
104+
# Some plan nodes (e.g. DataSourceExec) eagerly initialize a MetricsSet,
105+
# so metrics() may return a set even before execution. However, no rows
106+
# should have been processed yet.
107+
output_rows_before = [
108+
ms.output_rows
109+
for _, ms in plan.collect_metrics()
110+
if ms.output_rows is not None and ms.output_rows > 0
111+
]
112+
assert len(output_rows_before) == 0, "Expected no output rows before execution"
106113

107114

108115
def test_collect_partitioned_metrics() -> None:
@@ -174,7 +181,7 @@ def test_value_as_datetime() -> None:
174181
assert metric.value_as_datetime is None
175182

176183

177-
def test_collect_twice_reuses_plan() -> None:
184+
def test_collect_twice_has_metrics() -> None:
178185
ctx = SessionContext()
179186
ctx.sql("CREATE TABLE t AS VALUES (1, 'a'), (2, 'b'), (3, 'c')")
180187
df = ctx.sql("SELECT * FROM t WHERE column1 > 1")

src/dataframe.rs

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -387,21 +387,11 @@ impl PyDataFrame {
387387
&self,
388388
py: Python,
389389
) -> PyDataFusionResult<(Arc<dyn DFExecutionPlan>, Arc<TaskContext>)> {
390-
let plan = {
391-
let cached = self.last_plan.lock();
392-
cached.as_ref().map(Arc::clone)
393-
};
394-
let plan = match plan {
395-
Some(p) => p,
396-
None => {
397-
let df = self.df.as_ref().clone();
398-
let new_plan = wait_for_future(py, df.create_physical_plan())??;
399-
*self.last_plan.lock() = Some(Arc::clone(&new_plan));
400-
new_plan
401-
}
402-
};
390+
let df = self.df.as_ref().clone();
391+
let new_plan = wait_for_future(py, df.create_physical_plan())??;
392+
*self.last_plan.lock() = Some(Arc::clone(&new_plan));
403393
let task_ctx = Arc::new(self.df.as_ref().task_ctx());
404-
Ok((plan, task_ctx))
394+
Ok((new_plan, task_ctx))
405395
}
406396

407397
async fn collect_column_inner(&self, column: &str) -> Result<ArrayRef, DataFusionError> {

0 commit comments

Comments
 (0)