forked from databricks/databricks-sql-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstreaming_put_tests.py
More file actions
51 lines (39 loc) · 1.91 KB
/
streaming_put_tests.py
File metadata and controls
51 lines (39 loc) · 1.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
#!/usr/bin/env python3
"""
E2E tests for streaming PUT operations.
"""
import io
import pytest
from datetime import datetime
class PySQLStreamingPutTestSuiteMixin:
    """E2E test mixin for streaming PUT operations into a ``/Volumes/...`` path.

    The class that mixes this in must provide ``connection()``, which is used
    here as a context manager yielding an object with a ``cursor()`` context
    manager. ``catalog`` and ``schema`` are received as test arguments
    (presumably pytest fixtures supplied by the host suite; confirm there).
    """

    @staticmethod
    def _streaming_put_volume_path(catalog, schema, filename=""):
        """Return the ``e2etests`` volume path, optionally extended by *filename*.

        With the default empty *filename* the result is the directory path
        including its trailing slash, which is the form the LIST statement
        below uses.
        """
        return f"/Volumes/{catalog}/{schema}/e2etests/{filename}"

    def test_streaming_put_basic(self, catalog, schema):
        """Upload an in-memory payload with a streaming PUT and check it is listed."""
        test_data = b"Hello, streaming world! This is test data."

        # A timestamped name keeps separate runs from targeting the same file
        # (resolution is one second).
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"stream_test_{timestamp}.txt"

        # Build both paths from one helper so the PUT target, the LIST
        # directory and the expected listing entry always agree.
        volume_dir = self._streaming_put_volume_path(catalog, schema)
        expected_path = self._streaming_put_volume_path(catalog, schema, filename)

        with self.connection() as conn, conn.cursor() as cursor:
            # The statement names '__input_stream__' as its source while the
            # actual bytes are handed over through the input_stream argument.
            with io.BytesIO(test_data) as stream:
                cursor.execute(
                    f"PUT '__input_stream__' INTO '{expected_path}'",
                    input_stream=stream,
                )

            # NOTE(review): the uploaded file is never removed afterwards, so
            # files accumulate in the volume; consider adding cleanup once the
            # connection requirements for removal are confirmed.
            cursor.execute(f"LIST '{volume_dir}'")
            files = cursor.fetchall()

            # The first column of each LIST row is compared against the full
            # path of the file that was just uploaded.
            file_paths = [row[0] for row in files]
            assert expected_path in file_paths, f"File {expected_path} not found in {file_paths}"

    def test_streaming_put_missing_stream(self, catalog, schema):
        """Check that a streaming PUT issued without ``input_stream`` raises."""
        target_path = self._streaming_put_volume_path(catalog, schema, "test.txt")

        with self.connection() as conn, conn.cursor() as cursor:
            # NOTE(review): Exception is deliberately broad here; narrow it to
            # the connector's specific error type once that is confirmed.
            with pytest.raises(Exception):
                # Intentionally no input_stream argument.
                cursor.execute(f"PUT '__input_stream__' INTO '{target_path}'")