4141 pa .field ("year" , pa .string ()),
4242 pa .field ("month" , pa .string ()),
4343 pa .field ("day" , pa .string ()),
44+ pa .field ("run_timestamp" , pa .timestamp ("us" , tz = "UTC" )),
4445 )
4546)
4647
@@ -62,6 +63,7 @@ class DatasetFilters(TypedDict, total=False):
6263 year : str | None
6364 month : str | None
6465 day : str | None
66+ run_timestamp : str | datetime | None
6567
6668
6769@dataclass
@@ -112,15 +114,19 @@ def __init__(
112114 location (str | list[str]): Local filesystem path or an S3 URI to
113115 a parquet dataset. For partitioned datasets, set to the base directory.
114116 """
115- self .location = location
116117 self .config = config or TIMDEXDatasetConfig ()
118+ self .location = location
117119
120+ # pyarrow dataset
118121 self .filesystem , self .paths = self .parse_location (self .location )
119122 self .dataset : ds .Dataset = None # type: ignore[assignment]
120123 self .schema = TIMDEX_DATASET_SCHEMA
121124 self .partition_columns = TIMDEX_DATASET_PARTITION_COLUMNS
125+
126+ # writing
122127 self ._written_files : list [ds .WrittenFile ] = None # type: ignore[assignment]
123128
129+ # reading
124130 self ._current_records : bool = False
125131 self ._current_records_dataset : ds .Dataset = None # type: ignore[assignment]
126132
@@ -405,26 +411,31 @@ def write(
405411 return self ._written_files # type: ignore[return-value]
406412
def create_record_batches(
    self, records_iter: Iterator["DatasetRecord"]
) -> Iterator[pa.RecordBatch]:
    """Yield pyarrow.RecordBatches built from DatasetRecords, ready for writing.

    Records are consumed from the iterator in groups of at most
    self.config.write_batch_size. Every record in a group is serialized with
    its to_dict() method, the column values shared by all rows (currently only
    "run_timestamp") are merged in, and the resulting dictionaries are
    converted into a single pyarrow.RecordBatch.

    Because this is a generator, the shared "run_timestamp" value is captured
    when iteration begins, and that same value is applied to every row of
    every batch yielded by this call.

    Args:
        - records_iter: Iterator of DatasetRecord instances
    """
    # captured once so that all rows produced by this call share the value
    run_timestamp = datetime.now(UTC)

    grouped_records = itertools.batched(records_iter, self.config.write_batch_size)
    for batch_number, record_group in enumerate(grouped_records, start=1):
        rows = []
        for record in record_group:
            # dict(...) builds a new dictionary, leaving to_dict() output untouched
            rows.append(dict(record.to_dict(), run_timestamp=run_timestamp))

        batch = pa.RecordBatch.from_pylist(rows)
        logger.debug(f"Yielding batch {batch_number} for dataset writing.")
        yield batch
429440
430441 def log_write_statistics (self , start_time : float ) -> None :
0 commit comments