Skip to content

Commit 3311978

Browse files
authored
Merge pull request #156 from MITLibraries/TIMX-530-prep-work-and-s3-client
Timx 530 prep work and s3 client
2 parents 9bf9b1b + b907a15 commit 3311978

9 files changed

Lines changed: 841 additions & 559 deletions

File tree

Makefile

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -63,9 +63,10 @@ ruff-apply: # Resolve 'fixable errors' with 'ruff'
6363
######################
6464
# Start MinIO in a detached Docker container (admin console on :9001)
minio-start:
	docker run \
	-d \
	-p 9000:9000 \
	-p 9001:9001 \
	-v $(MINIO_DATA):/data \
	-e "MINIO_ROOT_USER=$(MINIO_USERNAME)" \
	-e "MINIO_ROOT_PASSWORD=$(MINIO_PASSWORD)" \
	quay.io/minio/minio server /data --console-address ":9001"

Pipfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ pyarrow = "*"
1212

1313
[dev-packages]
1414
black = "*"
15-
boto3-stubs = {version = "*", extras = ["s3"]}
15+
boto3-stubs = {extras = ["essential"], version = "*"}
1616
coveralls = "*"
1717
ipython = "*"
1818
moto = "*"

Pipfile.lock

Lines changed: 625 additions & 526 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

tests/conftest.py

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
"""tests/conftest.py"""
22

3-
# ruff: noqa: D205, D209
4-
53
import os
64

5+
import boto3
6+
import moto
77
import pytest
88

99
from tests.utils import (
@@ -62,7 +62,6 @@ def fixed_local_dataset(tmp_path) -> TIMDEXDataset:
6262
timdex_dataset.write(
6363
generate_sample_records(
6464
num_records=1_000,
65-
timdex_record_id_prefix=source,
6665
source=source,
6766
run_date="2024-12-01",
6867
run_id=run_id,
@@ -82,19 +81,6 @@ def _records_iter(num_records):
8281
return _records_iter
8382

8483

85-
@pytest.fixture
86-
def sample_records_iter_without_partitions():
87-
"""Simulates an iterator of X number of DatasetRecord instances WITHOUT partition
88-
values included."""
89-
90-
def _records_iter(num_records):
91-
return generate_sample_records(
92-
num_records, run_date="invalid run-date", year=None, month=None, day=None
93-
)
94-
95-
return _records_iter
96-
97-
9884
@pytest.fixture
9985
def dataset_with_runs_location(tmp_path) -> str:
10086
"""Fixture to simulate a dataset with multiple full and daily ETL runs."""
@@ -139,7 +125,6 @@ def dataset_with_runs_location(tmp_path) -> str:
139125
num_records, source, run_date, run_type, action, run_id = params
140126
records = generate_sample_records(
141127
num_records,
142-
timdex_record_id_prefix=source,
143128
source=source,
144129
run_date=run_date,
145130
run_type=run_type,
@@ -195,7 +180,6 @@ def dataset_with_same_day_runs(tmp_path) -> TIMDEXDataset:
195180
num_records, source, run_date, run_type, action, run_id, run_timestamp = params
196181
records = generate_sample_records(
197182
num_records,
198-
timdex_record_id_prefix=source,
199183
source=source,
200184
run_date=run_date,
201185
run_type=run_type,
@@ -214,3 +198,16 @@ def dataset_with_same_day_runs(tmp_path) -> TIMDEXDataset:
214198
@pytest.fixture
215199
def timdex_dataset_metadata(dataset_with_same_day_runs):
216200
return TIMDEXDatasetMetadata(timdex_dataset=dataset_with_same_day_runs)
201+
202+
203+
@pytest.fixture
def timdex_bucket():
    """Name of the S3 bucket used by the mocked S3 resource."""
    return "timdex"
206+
207+
208+
@pytest.fixture
def mock_s3_resource(timdex_bucket):
    """Yield a moto-mocked boto3 S3 resource with the test bucket pre-created."""
    with moto.mock_aws():
        s3_resource = boto3.resource("s3", region_name="us-east-1")
        s3_resource.create_bucket(Bucket=timdex_bucket)
        yield s3_resource

tests/test_s3client.py

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
"""tests/test_s3client.py"""
2+
3+
# ruff: noqa: PLR2004, SLF001
4+
5+
import pytest
6+
7+
from timdex_dataset_api.utils import S3Client
8+
9+
10+
def test_s3client_init():
    """S3Client instantiates with a usable boto3 resource."""
    s3_client = S3Client()
    assert s3_client.resource is not None
15+
16+
def test_s3client_init_with_minio_env(caplog, monkeypatch):
    """S3Client picks up MinIO configuration from environment variables."""
    caplog.set_level("DEBUG")

    minio_env = {
        "MINIO_S3_ENDPOINT_URL": "http://localhost:9000",
        "MINIO_USERNAME": "minioadmin",
        "MINIO_PASSWORD": "minioadmin",
        "MINIO_REGION": "us-east-1",
    }
    for env_name, env_value in minio_env.items():
        monkeypatch.setenv(env_name, env_value)

    s3_client = S3Client()
    assert s3_client.resource is not None
    assert "MinIO env vars detected, using for S3Client" in caplog.text
29+
30+
def test_split_s3_uri():
    """_split_s3_uri returns (bucket, key) for a valid S3 URI."""
    s3_client = S3Client()
    parts = s3_client._split_s3_uri("s3://timdex/path/to/file.txt")
    assert parts == ("timdex", "path/to/file.txt")
37+
38+
def test_split_s3_uri_invalid():
    """_split_s3_uri raises ValueError for a URI without the s3:// scheme."""
    s3_client = S3Client()
    with pytest.raises(ValueError, match="Invalid S3 URI"):
        s3_client._split_s3_uri("timdex/path/to/file.txt")
44+
45+
def test_upload_download_file(mock_s3_resource, tmp_path):
    """A file round-trips through upload_file and download_file unchanged."""
    s3_client = S3Client()
    s3_uri = "s3://timdex/test.txt"

    # source file on local disk
    source_path = tmp_path / "test.txt"
    source_path.write_text("test content")
    s3_client.upload_file(source_path, s3_uri)

    # pull the object back down to a different local path
    target_path = tmp_path / "downloaded.txt"
    s3_client.download_file(s3_uri, target_path)

    assert target_path.read_text() == "test content"
64+
65+
def test_delete_file(mock_s3_resource, tmp_path):
    """delete_file removes an uploaded object from the bucket."""
    s3_client = S3Client()
    s3_uri = "s3://timdex/test.txt"

    # stage an object in the mocked bucket
    source_path = tmp_path / "test.txt"
    source_path.write_text("test content")
    s3_client.upload_file(source_path, s3_uri)

    s3_client.delete_file(s3_uri)

    # the bucket should now be empty
    remaining = list(mock_s3_resource.Bucket("timdex").objects.all())
    assert remaining == []
83+
84+
def test_delete_folder(mock_s3_resource, tmp_path):
    """delete_folder removes only the objects under the given prefix."""
    s3_client = S3Client()

    # three objects under the "folder/" prefix
    for index in range(3):
        folder_file = tmp_path / f"test{index}.txt"
        folder_file.write_text(f"test content {index}")
        s3_client.upload_file(folder_file, f"s3://timdex/folder/test{index}.txt")

    # one object outside the prefix that must survive the delete
    outside_file = tmp_path / "other.txt"
    outside_file.write_text("other content")
    s3_client.upload_file(outside_file, "s3://timdex/other.txt")

    deleted_keys = s3_client.delete_folder("s3://timdex/folder/")

    # only the prefixed keys were deleted
    assert len(deleted_keys) == 3
    assert all(key.startswith("folder/") for key in deleted_keys)

    remaining = list(mock_s3_resource.Bucket("timdex").objects.all())
    assert len(remaining) == 1
    assert remaining[0].key == "other.txt"

tests/test_write.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -98,9 +98,7 @@ def test_dataset_write_partition_for_multiple_sources(
9898

9999
# perform write for source="libguides" and run_date="2024-12-01"
100100
written_files_source_b = new_local_dataset.write(
101-
generate_sample_records(
102-
num_records=7, timdex_record_id_prefix="libguides", source="libguides"
103-
)
101+
generate_sample_records(num_records=7, source="libguides")
104102
)
105103
new_local_dataset.load()
106104

tests/utils.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
def generate_sample_records(
1313
num_records: int,
14-
timdex_record_id_prefix: str = "alma",
1514
source: str | None = "alma",
1615
run_date: str | None = "2024-12-01",
1716
run_type: str | None = "daily",
@@ -25,7 +24,7 @@ def generate_sample_records(
2524

2625
for x in range(num_records):
2726
yield DatasetRecord(
28-
timdex_record_id=f"{timdex_record_id_prefix}:{x}",
27+
timdex_record_id=f"{source}:{x}",
2928
source_record=b"<record><title>Hello World.</title></record>",
3029
transformed_record=b"""{"title":["Hello World."]}""",
3130
source=source,
@@ -53,7 +52,6 @@ def generate_sample_records_with_simulated_partitions(
5352
source = random.choice(sources)
5453
yield from generate_sample_records(
5554
num_records=batch_size,
56-
timdex_record_id_prefix=source,
5755
source=source,
5856
run_date=random.choice(run_dates),
5957
run_type=random.choice(run_types),

timdex_dataset_api/config.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,5 +33,8 @@ def configure_logger(
3333

3434
def configure_dev_logger() -> logging.Logger:
    """Invoke to setup DEBUG level console logging for development work."""
    root_logger = logging.getLogger()
    # only install a default console handler if nothing is configured yet
    if not root_logger.handlers:
        logging.basicConfig(level=logging.WARNING)
    dev_logger = logging.getLogger("timdex_dataset_api")
    dev_logger.setLevel(logging.DEBUG)
    return dev_logger

timdex_dataset_api/utils.py

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
"""timdex_dataset_api/utils.py"""
2+
3+
import logging
4+
import os
5+
import pathlib
6+
from urllib.parse import urlparse
7+
8+
import boto3
9+
from mypy_boto3_s3.service_resource import S3ServiceResource
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class S3Client:
    """Thin wrapper around a boto3 S3 resource for file-level S3 operations.

    Supports a local MinIO deployment when MinIO env vars are set (see
    _create_resource); otherwise uses the default boto3 credential chain.
    """

    def __init__(
        self,
    ) -> None:
        # boto3 S3 ServiceResource used by all file operations
        self.resource = self._create_resource()

    def _create_resource(self) -> S3ServiceResource:
        """Instantiate a boto3 S3 resource.

        If env var MINIO_S3_ENDPOINT_URL is set, assume using local set of MinIO env vars.
        """
        endpoint_url = os.getenv("MINIO_S3_ENDPOINT_URL")
        if endpoint_url:
            logger.debug("MinIO env vars detected, using for S3Client.")
            return boto3.resource(
                "s3",
                endpoint_url=endpoint_url,
                aws_access_key_id=os.getenv("MINIO_USERNAME"),
                aws_secret_access_key=os.getenv("MINIO_PASSWORD"),
                region_name=os.getenv("MINIO_REGION", "us-east-1"),
            )
        return boto3.resource("s3")

    def download_file(self, s3_uri: str, local_path: str | pathlib.Path) -> None:
        """Download the object at s3_uri to local_path, creating parent dirs."""
        bucket, key = self._split_s3_uri(s3_uri)
        local_path = pathlib.Path(local_path)
        local_path.parent.mkdir(parents=True, exist_ok=True)
        self.resource.Bucket(bucket).download_file(key, str(local_path))
        # lazy %-style args: formatting is skipped when INFO is not enabled
        logger.info("Downloaded %s to %s", s3_uri, local_path)

    def upload_file(self, local_path: str | pathlib.Path, s3_uri: str) -> None:
        """Upload the file at local_path to the object named by s3_uri."""
        bucket, key = self._split_s3_uri(s3_uri)
        local_path = pathlib.Path(local_path)
        self.resource.Bucket(bucket).upload_file(str(local_path), key)
        logger.info("Uploaded %s to %s", local_path, s3_uri)

    def delete_file(self, s3_uri: str) -> None:
        """Delete the single object named by s3_uri."""
        bucket, key = self._split_s3_uri(s3_uri)
        self.resource.Object(bucket, key).delete()
        logger.info("Deleted %s", s3_uri)

    def delete_folder(self, s3_uri: str) -> list[str]:
        """Delete all objects whose keys start with the given prefix.

        Returns the list of deleted object keys.
        """
        bucket, prefix = self._split_s3_uri(s3_uri)
        bucket_obj = self.resource.Bucket(bucket)
        receipt = bucket_obj.objects.filter(Prefix=prefix).delete()

        deleted_keys: list[str] = []
        for request in receipt:
            # a delete_objects response may omit "Deleted" (e.g. carry only
            # "Errors"); .get avoids a KeyError in that case
            deleted_keys.extend([item["Key"] for item in request.get("Deleted", [])])
        logger.info("Deleted %s", deleted_keys)
        return deleted_keys

    @staticmethod
    def _split_s3_uri(s3_uri: str) -> tuple[str, str]:
        """Validate and split an S3 URI into (bucket, key).

        Raises ValueError if the URI lacks an s3:// scheme, bucket, or path.
        """
        parsed = urlparse(s3_uri)
        if parsed.scheme != "s3" or not parsed.netloc or not parsed.path:
            raise ValueError(f"Invalid S3 URI: {s3_uri!r}")

        bucket = parsed.netloc
        key = parsed.path.lstrip("/")  # strip leading slash from /key
        return bucket, key

0 commit comments

Comments
 (0)