Skip to content

Commit 0a80a24

Browse files
authored
Merge pull request #159 from MITLibraries/TIMX-526-projected-views
TIMX 526 - projected views
2 parents 184cddc + 43e5350 commit 0a80a24

8 files changed

Lines changed: 806 additions & 380 deletions

File tree

tests/conftest.py

Lines changed: 192 additions & 143 deletions
Large diffs are not rendered by default.

tests/test_dataset.py

Lines changed: 187 additions & 133 deletions
Large diffs are not rendered by default.

tests/test_metadata.py

Lines changed: 205 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
from timdex_dataset_api import TIMDEXDatasetMetadata
88

99

10-
def test_tdm_init_no_metadata_file_warning_success(caplog, dataset_with_runs_location):
11-
TIMDEXDatasetMetadata(dataset_with_runs_location)
10+
def test_tdm_init_no_metadata_file_warning_success(caplog, timdex_dataset_with_runs):
11+
TIMDEXDatasetMetadata(timdex_dataset_with_runs.location)
1212

1313
assert "Static metadata database not found" in caplog.text
1414

@@ -20,41 +20,229 @@ def test_tdm_local_dataset_structure_properties(tmp_path):
2020
assert tdm_local.location_scheme == "file"
2121

2222

23-
def test_tdm_s3_dataset_structure_properties(mocked_timdex_bucket):
23+
def test_tdm_s3_dataset_structure_properties(s3_bucket_mocked):
2424
s3_root = "s3://timdex/dataset"
2525
tdm_s3 = TIMDEXDatasetMetadata(s3_root)
2626
assert tdm_s3.location == s3_root
2727
assert tdm_s3.location_scheme == "s3"
2828

2929

30-
def test_tdm_create_metadata_database_file_success(caplog, timdex_dataset_metadata_empty):
30+
def test_tdm_create_metadata_database_file_success(caplog, timdex_metadata_empty):
3131
caplog.set_level("DEBUG")
32-
timdex_dataset_metadata_empty.recreate_static_database_file()
32+
timdex_metadata_empty.recreate_static_database_file()
3333

3434

35-
def test_tdm_init_metadata_file_found_success(timdex_dataset_metadata):
36-
assert isinstance(timdex_dataset_metadata.conn, DuckDBPyConnection)
35+
def test_tdm_init_metadata_file_found_success(timdex_metadata):
36+
assert isinstance(timdex_metadata.conn, DuckDBPyConnection)
3737

3838

39-
def test_tdm_connection_has_static_database_attached(timdex_dataset_metadata):
39+
def test_tdm_connection_has_static_database_attached(timdex_metadata):
4040
assert set(
41-
timdex_dataset_metadata.conn.query("""show databases;""").to_df().database_name
41+
timdex_metadata.conn.query("""show databases;""").to_df().database_name
4242
) == {"memory", "static_db"}
4343

4444

45-
def test_tdm_connection_static_database_records_table_exists(timdex_dataset_metadata):
46-
records_df = timdex_dataset_metadata.conn.query(
45+
def test_tdm_connection_static_database_records_table_exists(timdex_metadata):
46+
records_df = timdex_metadata.conn.query(
4747
"""select * from static_db.records;"""
4848
).to_df()
4949
assert len(records_df) > 0
5050

5151

52-
def test_dataset_metadata_structure_is_idempotent(timdex_dataset_metadata):
53-
assert os.path.exists(timdex_dataset_metadata.metadata_root)
54-
start_file_count = glob.glob(f"{timdex_dataset_metadata.metadata_root}/**/*")
52+
def test_dataset_metadata_structure_is_idempotent(timdex_metadata):
53+
assert os.path.exists(timdex_metadata.metadata_root)
54+
start_file_count = glob.glob(f"{timdex_metadata.metadata_root}/**/*")
5555

56-
timdex_dataset_metadata.create_metadata_structure()
56+
timdex_metadata.create_metadata_structure()
5757

58-
assert os.path.exists(timdex_dataset_metadata.metadata_root)
59-
end_file_count = glob.glob(f"{timdex_dataset_metadata.metadata_root}/**/*")
58+
assert os.path.exists(timdex_metadata.metadata_root)
59+
end_file_count = glob.glob(f"{timdex_metadata.metadata_root}/**/*")
6060
assert start_file_count == end_file_count
61+
62+
63+
def test_tdm_views_created_on_init(timdex_metadata):
64+
views = timdex_metadata.conn.query(
65+
"""select table_name from information_schema.tables where table_type = 'VIEW';"""
66+
).to_df()
67+
68+
expected_views = {"append_deltas", "records", "current_records"}
69+
actual_views = set(views.table_name)
70+
assert expected_views <= actual_views
71+
72+
73+
def test_tdm_records_view_structure(timdex_metadata):
74+
records_df = timdex_metadata.conn.query("""select * from records limit 1;""").to_df()
75+
expected_columns = {
76+
"timdex_record_id",
77+
"source",
78+
"run_date",
79+
"run_type",
80+
"action",
81+
"run_id",
82+
"run_record_offset",
83+
"run_timestamp",
84+
"filename",
85+
}
86+
assert set(records_df.columns) == expected_columns
87+
88+
89+
def test_tdm_current_records_view_structure(timdex_metadata):
90+
current_records_df = timdex_metadata.conn.query(
91+
"""select * from current_records limit 1;"""
92+
).to_df()
93+
expected_columns = {
94+
"timdex_record_id",
95+
"source",
96+
"run_date",
97+
"run_type",
98+
"action",
99+
"run_id",
100+
"run_record_offset",
101+
"run_timestamp",
102+
"filename",
103+
}
104+
assert set(current_records_df.columns) == expected_columns
105+
106+
107+
def test_tdm_append_deltas_view_empty_structure(timdex_metadata):
108+
append_deltas_df = timdex_metadata.conn.query(
109+
"""select * from append_deltas;"""
110+
).to_df()
111+
expected_columns = {
112+
"timdex_record_id",
113+
"source",
114+
"run_date",
115+
"run_type",
116+
"action",
117+
"run_id",
118+
"run_record_offset",
119+
"run_timestamp",
120+
"filename",
121+
}
122+
assert set(append_deltas_df.columns) == expected_columns
123+
assert len(append_deltas_df) == 0
124+
125+
126+
def test_tdm_records_count_property(timdex_metadata):
127+
assert timdex_metadata.records_count > 0
128+
129+
manual_count = timdex_metadata.conn.query(
130+
"""select count(*) from records;"""
131+
).fetchone()[0]
132+
assert timdex_metadata.records_count == manual_count
133+
134+
135+
def test_tdm_current_records_count_property(timdex_metadata):
136+
assert timdex_metadata.current_records_count > 0
137+
138+
manual_count = timdex_metadata.conn.query(
139+
"""select count(*) from current_records;"""
140+
).fetchone()[0]
141+
assert timdex_metadata.current_records_count == manual_count
142+
143+
144+
def test_tdm_append_deltas_count_property_empty(timdex_metadata):
145+
assert timdex_metadata.append_deltas_count == 0
146+
147+
148+
def test_tdm_records_equals_static_without_deltas(timdex_metadata):
149+
static_count = timdex_metadata.conn.query(
150+
"""select count(*) from static_db.records;"""
151+
).fetchone()[0]
152+
records_count = timdex_metadata.conn.query(
153+
"""select count(*) from records;"""
154+
).fetchone()[0]
155+
assert static_count == records_count
156+
157+
158+
def test_tdm_current_records_filtering_logic(timdex_metadata):
159+
current_count = timdex_metadata.current_records_count
160+
total_count = timdex_metadata.records_count
161+
162+
assert current_count <= total_count
163+
assert current_count > 0
164+
165+
166+
def test_tdm_views_with_append_deltas(timdex_metadata_with_deltas):
167+
views = timdex_metadata_with_deltas.conn.query(
168+
"""select table_name from information_schema.tables where table_type = 'VIEW';"""
169+
).to_df()
170+
171+
expected_views = {"append_deltas", "records", "current_records"}
172+
actual_views = set(views.table_name)
173+
assert expected_views.issubset(actual_views)
174+
175+
176+
def test_tdm_append_deltas_view_has_data(timdex_metadata_with_deltas):
177+
append_deltas_count = timdex_metadata_with_deltas.append_deltas_count
178+
assert append_deltas_count > 0
179+
180+
181+
def test_tdm_records_includes_deltas(timdex_metadata_with_deltas):
182+
static_count = timdex_metadata_with_deltas.conn.query(
183+
"""select count(*) from static_db.records;"""
184+
).fetchone()[0]
185+
deltas_count = timdex_metadata_with_deltas.append_deltas_count
186+
records_count = timdex_metadata_with_deltas.records_count
187+
188+
assert records_count == static_count + deltas_count
189+
assert records_count > static_count
190+
191+
192+
def test_tdm_current_records_with_deltas_logic(timdex_metadata_with_deltas):
193+
current_count = timdex_metadata_with_deltas.current_records_count
194+
total_count = timdex_metadata_with_deltas.records_count
195+
196+
assert current_count <= total_count
197+
assert current_count > 0
198+
199+
# verify current records view returns unique timdex_record_id values
200+
current_records_df = timdex_metadata_with_deltas.conn.query(
201+
"""select timdex_record_id from current_records;"""
202+
).to_df()
203+
204+
unique_count = len(current_records_df.timdex_record_id.unique())
205+
assert unique_count == current_count
206+
207+
208+
def test_tdm_current_records_most_recent_version(timdex_metadata_with_deltas):
209+
# check that for records with multiple versions, only the most recent is returned
210+
multi_version_records = timdex_metadata_with_deltas.conn.query(
211+
"""
212+
select timdex_record_id, count(*) as version_count
213+
from records
214+
group by timdex_record_id
215+
having count(*) > 1
216+
limit 1;
217+
"""
218+
).to_df()
219+
220+
if len(multi_version_records) > 0:
221+
record_id = multi_version_records.iloc[0]["timdex_record_id"]
222+
223+
# get most recent timestamp for this record
224+
most_recent = timdex_metadata_with_deltas.conn.query(
225+
f"""
226+
select run_timestamp, run_id
227+
from records
228+
where timdex_record_id = '{record_id}'
229+
order by run_timestamp desc
230+
limit 1;
231+
"""
232+
).to_df()
233+
234+
# verify current_records contains this version
235+
current_version = timdex_metadata_with_deltas.conn.query(
236+
f"""
237+
select run_timestamp, run_id
238+
from current_records
239+
where timdex_record_id = '{record_id}';
240+
"""
241+
).to_df()
242+
243+
assert len(current_version) == 1
244+
assert (
245+
current_version.iloc[0]["run_timestamp"]
246+
== most_recent.iloc[0]["run_timestamp"]
247+
)
248+
assert current_version.iloc[0]["run_id"] == most_recent.iloc[0]["run_id"]

tests/test_read.py

Lines changed: 30 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -9,33 +9,33 @@
99
DATASET_COLUMNS_SET = set(TIMDEX_DATASET_SCHEMA.names)
1010

1111

12-
def test_read_batches_yields_pyarrow_record_batches(fixed_local_dataset):
13-
batches = fixed_local_dataset.read_batches_iter()
12+
def test_read_batches_yields_pyarrow_record_batches(timdex_dataset_multi_source):
13+
batches = timdex_dataset_multi_source.read_batches_iter()
1414
batch = next(batches)
1515
assert isinstance(batch, pa.RecordBatch)
1616

1717

18-
def test_read_batches_all_columns_by_default(fixed_local_dataset):
19-
batches = fixed_local_dataset.read_batches_iter()
18+
def test_read_batches_all_columns_by_default(timdex_dataset_multi_source):
19+
batches = timdex_dataset_multi_source.read_batches_iter()
2020
batch = next(batches)
2121
assert set(batch.column_names) == DATASET_COLUMNS_SET
2222

2323

24-
def test_read_batches_filter_columns(fixed_local_dataset):
24+
def test_read_batches_filter_columns(timdex_dataset_multi_source):
2525
columns_subset = ["source", "transformed_record"]
26-
batches = fixed_local_dataset.read_batches_iter(columns=columns_subset)
26+
batches = timdex_dataset_multi_source.read_batches_iter(columns=columns_subset)
2727
batch = next(batches)
2828
assert set(batch.column_names) == set(columns_subset)
2929

3030

31-
def test_read_batches_no_filters_gets_full_dataset(fixed_local_dataset):
32-
batches = fixed_local_dataset.read_batches_iter()
31+
def test_read_batches_no_filters_gets_full_dataset(timdex_dataset_multi_source):
32+
batches = timdex_dataset_multi_source.read_batches_iter()
3333
table = pa.Table.from_batches(batches)
34-
assert len(table) == fixed_local_dataset.row_count
34+
assert len(table) == timdex_dataset_multi_source.row_count
3535

3636

37-
def test_read_batches_with_filters_gets_subset_of_dataset(fixed_local_dataset):
38-
batches = fixed_local_dataset.read_batches_iter(
37+
def test_read_batches_with_filters_gets_subset_of_dataset(timdex_dataset_multi_source):
38+
batches = timdex_dataset_multi_source.read_batches_iter(
3939
source="libguides",
4040
run_date="2024-12-01",
4141
run_type="daily",
@@ -44,45 +44,49 @@ def test_read_batches_with_filters_gets_subset_of_dataset(fixed_local_dataset):
4444

4545
table = pa.Table.from_batches(batches)
4646
assert len(table) == 1_000
47-
assert len(table) < fixed_local_dataset.row_count
47+
assert len(table) < timdex_dataset_multi_source.row_count
4848

4949
# assert loaded dataset is unchanged by filtering for a read method
50-
assert fixed_local_dataset.row_count == 5_000
50+
assert timdex_dataset_multi_source.row_count == 5_000
5151

5252

53-
def test_read_dataframe_batches_yields_dataframes(fixed_local_dataset):
54-
df_iter = fixed_local_dataset.read_dataframes_iter()
53+
def test_read_dataframe_batches_yields_dataframes(timdex_dataset_multi_source):
54+
df_iter = timdex_dataset_multi_source.read_dataframes_iter()
5555
df_batch = next(df_iter)
5656
assert isinstance(df_batch, pd.DataFrame)
5757
assert len(df_batch) == 1_000
5858

5959

60-
def test_read_dataframe_reads_all_dataset_rows_after_filtering(fixed_local_dataset):
61-
df = fixed_local_dataset.read_dataframe()
60+
def test_read_dataframe_reads_all_dataset_rows_after_filtering(
61+
timdex_dataset_multi_source,
62+
):
63+
df = timdex_dataset_multi_source.read_dataframe()
6264
assert isinstance(df, pd.DataFrame)
63-
assert len(df) == fixed_local_dataset.row_count
65+
assert len(df) == timdex_dataset_multi_source.row_count
6466

6567

66-
def test_read_dicts_yields_dictionary_for_each_dataset_record(fixed_local_dataset):
67-
records = fixed_local_dataset.read_dicts_iter()
68+
def test_read_dicts_yields_dictionary_for_each_dataset_record(
69+
timdex_dataset_multi_source,
70+
):
71+
records = timdex_dataset_multi_source.read_dicts_iter()
6872
record = next(records)
6973
assert isinstance(record, dict)
7074
assert set(record.keys()) == DATASET_COLUMNS_SET
7175

7276

73-
def test_read_batches_filter_to_none_returns_empty_list(fixed_local_dataset):
74-
batches = fixed_local_dataset.read_batches_iter(source="not-gonna-find-me")
77+
def test_read_batches_filter_to_none_returns_empty_list(timdex_dataset_multi_source):
78+
batches = timdex_dataset_multi_source.read_batches_iter(source="not-gonna-find-me")
7579
assert list(batches) == []
7680

7781

78-
def test_read_dicts_filter_to_none_stopiteration_immediately(fixed_local_dataset):
79-
batches = fixed_local_dataset.read_dicts_iter(source="not-gonna-find-me")
82+
def test_read_dicts_filter_to_none_stopiteration_immediately(timdex_dataset_multi_source):
83+
batches = timdex_dataset_multi_source.read_dicts_iter(source="not-gonna-find-me")
8084
with pytest.raises(StopIteration):
8185
next(batches)
8286

8387

84-
def test_read_transformed_records_yields_parsed_dictionary(fixed_local_dataset):
85-
batches = fixed_local_dataset.read_transformed_records_iter()
88+
def test_read_transformed_records_yields_parsed_dictionary(timdex_dataset_multi_source):
89+
batches = timdex_dataset_multi_source.read_transformed_records_iter()
8690
transformed_record = next(batches)
8791
assert isinstance(transformed_record, dict)
8892
assert transformed_record == {"title": ["Hello World."]}

0 commit comments

Comments
 (0)