Skip to content

Commit 14c110d

Browse files
committed
Move dataset location parsing to init
Why these changes are being introduced:

When loading a dataset for reading, or preparing to write, the appropriate pyarrow filesystem must be set and the location normalized (e.g. removing the s3:// prefix).

How this addresses that need:
* Move the call to self.parse_location into __init__, since it is always required
* An invalid location now fails early, before any attempt to read or write
* Add the previously missing filesystem argument to the write method to support S3 writes

Side effects of this change:
* None

Relevant ticket(s):
* https://mitlibraries.atlassian.net/browse/TIMX-415
1 parent 45c5d2f commit 14c110d

1 file changed

Lines changed: 6 additions & 5 deletions

File tree

timdex_dataset_api/dataset.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ class TIMDEXDataset:
5050

5151
def __init__(self, location: str | list[str]):
5252
self.location = location
53+
self.filesystem, self.source = self.parse_location(self.location)
5354
self.dataset: ds.Dataset = None # type: ignore[assignment]
5455
self.schema = TIMDEX_DATASET_SCHEMA
5556
self.partition_columns = TIMDEX_DATASET_PARTITION_COLUMNS
@@ -132,13 +133,12 @@ def load_dataset(self) -> ds.Dataset:
132133
raised when reading or writing data.
133134
"""
134135
start_time = time.perf_counter()
135-
filesystem, source = self.parse_location(self.location)
136136
dataset = ds.dataset(
137-
source,
137+
self.source,
138138
schema=self.schema,
139139
format="parquet",
140140
partitioning="hive",
141-
filesystem=filesystem,
141+
filesystem=self.filesystem,
142142
)
143143
logger.info(
144144
f"Dataset successfully loaded: '{self.location}', "
@@ -189,7 +189,7 @@ def write(
189189
"""
190190
start_time = time.perf_counter()
191191

192-
if isinstance(self.location, list):
192+
if isinstance(self.source, list):
193193
raise TypeError(
194194
"Dataset location must be the root of a single dataset for writing"
195195
)
@@ -203,9 +203,10 @@ def write(
203203
written_files = []
204204
ds.write_dataset(
205205
record_batches_iter,
206-
base_dir=self.location,
206+
base_dir=self.source,
207207
basename_template="%s-{i}.parquet" % (str(uuid.uuid4())), # noqa: UP031
208208
existing_data_behavior="delete_matching",
209+
filesystem=self.filesystem,
209210
file_visitor=lambda written_file: written_files.append(written_file),
210211
format="parquet",
211212
max_rows_per_file=MAX_ROWS_PER_FILE,

0 commit comments

Comments (0)