@@ -480,13 +480,13 @@ def read_batches_iter(
480480 )
481481
482482 if self ._current_records :
483- yield from self ._yield_current_record_deduped_batches (batches )
483+ yield from self ._yield_current_record_batches (batches )
484484 else :
485485 for batch in batches :
486486 if len (batch ) > 0 :
487487 yield batch
488488
489- def _yield_current_record_deduped_batches (
489+ def _yield_current_record_batches (
490490 self ,
491491 batches : Iterator [pa .RecordBatch ],
492492 ) -> Iterator [pa .RecordBatch ]:
@@ -503,55 +503,51 @@ def _yield_current_record_deduped_batches(
503503 contains the actual records and columns we are interested in, and may contain
504504 filtering
505505
506- 2. "id_batches" - a lightweight RecordBatch iterator that only contains the
507- 'timdex_record_id' column from a pre-filtering dataset saved during .load()
506+ 2. "unfiltered_batches" - a lightweight RecordBatch iterator that only
507+ contains the 'timdex_record_id' column from a pre-filtering dataset saved
508+ during .load()
508509
509510 These two iterators are guaranteed to have the same number of total batches based
510511 on how pyarrow.Dataset.to_batches() reads from parquet files. Even if dataset
511512 filtering is applied, this does not affect the batch count; you may just end up
512513 with smaller or empty batches.
513514
514- As such, as we move through the batches we use batches from the "ids_iterator" to
515- keep a list of seen timdex_record_id's . Even if a timdex_record_is not in the
516- "records_iterator", likely due to filtering, we will mark the truly most current
517- version as "seen" and not yield it from any future batches.
515+ As we move through the record batches we use unfiltered batches to keep a list of
516+ seen timdex_record_ids . Even if a timdex_record_is not in the record batch --
517+ likely due to filtering -- we will mark that timdex_record_id as "seen" and not
518+ yield it from any future batches.
518519
519520 Args:
520521 - batches: batches of records to actually yield from
521522 - current_record_id_batches: batches of timdex_record_id's that inform when
522523 to yield or skip a record for a batch
523524 """
524- # create a RecordBatch iterator from self._current_records_dataset, which was
525- # saved during .load() before any filtering was applied
526- id_batches = self ._current_records_dataset .to_batches (
525+ unfiltered_batches = self ._current_records_dataset .to_batches (
527526 columns = ["timdex_record_id" ],
528527 batch_size = self .config .read_batch_size ,
529528 batch_readahead = self .config .batch_read_ahead ,
530529 fragment_readahead = self .config .fragment_read_ahead ,
531530 )
532531
533532 seen_records = set ()
534- for id_batch , batch in zip (id_batches , batches , strict = True ):
535- dedupe_ids = id_batch .column ("timdex_record_id" ).to_pylist ()
536- batch_ids = batch .column ("timdex_record_id" ).to_pylist ()
537-
533+ for unfiltered_batch , batch in zip (unfiltered_batches , batches , strict = True ):
538534 # init list of indices from the batch for records we have never yielded
539535 unseen_batch_indices = []
540536
541537 # check each record id and track unseen ones
542- for i , record_id in enumerate (batch_ids ):
538+ for i , record_id in enumerate (batch . column ( "timdex_record_id" ). to_pylist () ):
543539 if record_id not in seen_records :
544540 unseen_batch_indices .append (i )
545541
546542 # even if not a record to yield, update our list of seen records from all
547- # records in the id_batch
548- seen_records .update (dedupe_ids )
543+ # records in the unfiltered batch
544+ seen_records .update (unfiltered_batch . column ( "timdex_record_id" ). to_pylist () )
549545
550- # if no records unseen this batch, skip yielding
546+ # if no unseen records from this batch, skip yielding entirely
551547 if not unseen_batch_indices :
552548 continue
553549
554- # use the unseen indices to create a new, subset of the batch and yield it
550+ # create a new RecordBatch using the unseen indices of the batch
555551 _batch = batch .take (pa .array (unseen_batch_indices )) # type: ignore[arg-type]
556552 if len (_batch ) > 0 :
557553 yield _batch
0 commit comments