
Commit b32f841 (parent: ea300b8)

Typo and method ordering

2 files changed: 48 additions & 48 deletions

tests/test_dataset.py (1 addition & 1 deletion)
@@ -244,7 +244,7 @@ def test_dataset_records_data_structure_is_idempotent(timdex_dataset_with_runs):
     assert start_file_count == end_file_count
 
 
-def test_dataset_duckdb_context_crated_on_init(timdex_dataset):
+def test_dataset_duckdb_context_created_on_init(timdex_dataset):
     assert isinstance(timdex_dataset.conn, DuckDBPyConnection)
 

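For context on what the renamed test asserts: duckdb.connect() returns a DuckDBPyConnection, so the check passes if the dataset opens a connection during init. A minimal illustration of the invariant in isolation (the bare connect() call is an assumption, not the library's actual init code):

import duckdb
from duckdb import DuckDBPyConnection

# Assumption: TIMDEXDataset.__init__ does something similar to this.
conn = duckdb.connect()  # in-memory database
assert isinstance(conn, DuckDBPyConnection)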
timdex_dataset_api/dataset.py (47 additions & 47 deletions)
@@ -350,53 +350,6 @@ def log_write_statistics(
             f"total size: {total_size}"
         )
 
-    def _iter_meta_chunks(self, meta_df: pd.DataFrame) -> Iterator[pd.DataFrame]:
-        """Utility method to yield chunks of metadata query results."""
-        for start in range(0, len(meta_df), self.config.duckdb_join_batch_size):
-            yield meta_df.iloc[start : start + self.config.duckdb_join_batch_size]
-
-    def _build_parquet_file_list(self, meta_chunk_df: pd.DataFrame) -> str:
-        """Build SQL list of parquet filepaths."""
-        filenames = meta_chunk_df["filename"].unique().tolist()
-        if self.location_scheme == "s3":
-            filenames = [f"s3://{f.removeprefix('s3://')}" for f in filenames]
-        return "[" + ",".join((f"'{f}'") for f in filenames) + "]"
-
-    def _build_data_query_for_chunk(
-        self,
-        columns: list[str] | None,
-        meta_chunk_df: pd.DataFrame,
-        registered_metadata_chunk: str = "meta_chunk",
-    ) -> str:
-        """Build SQL query used for data retrieval, joining on metadata data."""
-        parquet_list_sql = self._build_parquet_file_list(meta_chunk_df)
-        rro_list_sql = ",".join(
-            str(rro) for rro in meta_chunk_df["run_record_offset"].unique()
-        )
-        select_cols = ",".join(
-            [f"ds.{col}" for col in (columns or TIMDEX_DATASET_SCHEMA.names)]
-        )
-        return f"""
-        select
-            {select_cols}
-        from read_parquet(
-            {parquet_list_sql},
-            hive_partitioning=true,
-            filename=true
-        ) as ds
-        inner join {registered_metadata_chunk} mc using (
-            timdex_record_id, run_id, run_record_offset
-        )
-        where ds.run_record_offset in ({rro_list_sql});
-        """
-
-    def _stream_data_query_batches(self, data_query: str) -> Iterator[pa.RecordBatch]:
-        """Yield pyarrow RecordBatches from a SQL query."""
-        self.conn.execute("set enable_progress_bar = false;")
-        cursor = self.conn.execute(data_query)
-        yield from cursor.fetch_record_batch(rows_per_batch=self.config.read_batch_size)
-        self.conn.execute("set enable_progress_bar = true;")
-
     def read_batches_iter(
         self,
         table: str = "records",
@@ -457,6 +410,53 @@ def read_batches_iter(
             f"@ {batch_rps} records/second, total yielded: {total_yield_count}"
         )
 
+    def _iter_meta_chunks(self, meta_df: pd.DataFrame) -> Iterator[pd.DataFrame]:
+        """Utility method to yield chunks of metadata query results."""
+        for start in range(0, len(meta_df), self.config.duckdb_join_batch_size):
+            yield meta_df.iloc[start : start + self.config.duckdb_join_batch_size]
+
+    def _build_parquet_file_list(self, meta_chunk_df: pd.DataFrame) -> str:
+        """Build SQL list of parquet filepaths."""
+        filenames = meta_chunk_df["filename"].unique().tolist()
+        if self.location_scheme == "s3":
+            filenames = [f"s3://{f.removeprefix('s3://')}" for f in filenames]
+        return "[" + ",".join((f"'{f}'") for f in filenames) + "]"
+
+    def _build_data_query_for_chunk(
+        self,
+        columns: list[str] | None,
+        meta_chunk_df: pd.DataFrame,
+        registered_metadata_chunk: str = "meta_chunk",
+    ) -> str:
+        """Build SQL query used for data retrieval, joining on metadata data."""
+        parquet_list_sql = self._build_parquet_file_list(meta_chunk_df)
+        rro_list_sql = ",".join(
+            str(rro) for rro in meta_chunk_df["run_record_offset"].unique()
+        )
+        select_cols = ",".join(
+            [f"ds.{col}" for col in (columns or TIMDEX_DATASET_SCHEMA.names)]
+        )
+        return f"""
+        select
+            {select_cols}
+        from read_parquet(
+            {parquet_list_sql},
+            hive_partitioning=true,
+            filename=true
+        ) as ds
+        inner join {registered_metadata_chunk} mc using (
+            timdex_record_id, run_id, run_record_offset
+        )
+        where ds.run_record_offset in ({rro_list_sql});
+        """
+
+    def _stream_data_query_batches(self, data_query: str) -> Iterator[pa.RecordBatch]:
+        """Yield pyarrow RecordBatches from a SQL query."""
+        self.conn.execute("set enable_progress_bar = false;")
+        cursor = self.conn.execute(data_query)
+        yield from cursor.fetch_record_batch(rows_per_batch=self.config.read_batch_size)
+        self.conn.execute("set enable_progress_bar = true;")
+
     def read_dataframes_iter(
         self,
         table: str = "records",
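Taken together, the relocated helpers imply a chunked read path: split the metadata matches into chunks, register each chunk with DuckDB, join it against the parquet files, and stream Arrow batches. A minimal sketch of that composition (the method name, the meta_df/columns parameters, and the register/unregister calls are assumptions; the diff does not show the body of read_batches_iter):

from typing import Iterator
import pandas as pd
import pyarrow as pa

# Hypothetical glue, not from the commit: how the four helpers above
# plausibly compose inside a read method on the same class.
def _read_batches_for_meta(
    self, meta_df: pd.DataFrame, columns: list[str] | None = None
) -> Iterator[pa.RecordBatch]:
    for meta_chunk_df in self._iter_meta_chunks(meta_df):
        # Register the chunk so the generated SQL can join it as "meta_chunk".
        self.conn.register("meta_chunk", meta_chunk_df)
        try:
            data_query = self._build_data_query_for_chunk(columns, meta_chunk_df)
            yield from self._stream_data_query_batches(data_query)
        finally:
            self.conn.unregister("meta_chunk")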

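The streaming in _stream_data_query_batches relies on DuckDB's Arrow integration: fetch_record_batch(rows_per_batch=...) returns a pyarrow.RecordBatchReader that yields batches lazily rather than materializing the whole result. A self-contained example of that pattern (the table and batch size here are illustrative, not from the commit):

import duckdb
import pyarrow as pa

conn = duckdb.connect()
conn.execute("create table records as select range as run_record_offset from range(10000)")

# Iterating the RecordBatchReader yields pyarrow.RecordBatch objects
# incrementally, keeping memory usage bounded by rows_per_batch.
reader = conn.execute("select * from records").fetch_record_batch(rows_per_batch=1000)
for batch in reader:
    assert isinstance(batch, pa.RecordBatch)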