-
Notifications
You must be signed in to change notification settings - Fork 140
Expand file tree
/
Copy pathlarge_queries_mixin.py
More file actions
155 lines (131 loc) · 5.32 KB
/
large_queries_mixin.py
File metadata and controls
155 lines (131 loc) · 5.32 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import logging
import math
import time
import pytest
# Module-level logger, named after this module per the stdlib logging convention.
log = logging.getLogger(__name__)
class LargeQueriesFetchMixin:
    """Shared fetch helper for large query test classes."""

    def fetch_rows(self, cursor, row_count, fetchmany_size):
        """
        A generator for rows. Fetches until the end or up to 5 minutes.

        Yields every row from successive ``get_some_rows`` batches. If the
        result set is fully consumed (i.e. the loop ended because a batch came
        back empty, not because of the timeout), asserts that exactly
        ``row_count`` rows were seen. Finally prints the average latency per
        fetch, assuming a 10K-row fetch size.
        """
        # TODO: Remove fetchmany_size when we have fixed the performance issues with fetchone
        # in the Python client
        max_fetch_time = 5 * 60  # Fetch for at most 5 minutes
        rows = self.get_some_rows(cursor, fetchmany_size)
        start_time = time.time()
        n = 0
        while rows:
            for row in rows:
                n += 1
                yield row
            if time.time() - start_time >= max_fetch_time:
                log.warning("Fetching rows timed out")
                break
            rows = self.get_some_rows(cursor, fetchmany_size)
        if not rows:
            # Read all the rows, row_count should match
            self.assertEqual(n, row_count)
        num_fetches = max(math.ceil(n / 10000), 1)
        # FIX: the original had a stray trailing ", 1" here, which made
        # latency_ms a tuple (int, 1) instead of a plain integer millisecond
        # value, garbling the printed report.
        latency_ms = int((time.time() - start_time) * 1000 / num_fetches)
        print(
            "Fetched {} rows with an avg latency of {} per fetch, ".format(
                n, latency_ms
            )
            + "assuming 10K fetch size."
        )
class LargeWideResultSetMixin(LargeQueriesFetchMixin):
    """Test mixin for large wide result set queries."""

    @pytest.mark.parametrize(
        "extra_params",
        [
            {},
            {"use_sea": True},
        ],
    )
    def test_query_with_large_wide_result_set(self, extra_params):
        """Stream a ~100 MB result set of wide uuid rows, with and without
        lz4 compression, and check that no row is dropped or reordered.
        """
        total_bytes = 100 * 1000 * 1000  # 100 MB
        row_width = 8192  # B
        row_total = total_bytes // row_width
        uuid_cols = row_width // 36  # a uuid() string is 36 characters wide
        # Set the fetchmany_size to get 10MB of data a go
        batch_rows = 10 * 1024 * 1024 // row_width
        # This is used by PyHive tests to determine the buffer size
        self.arraysize = 1000
        with self.cursor(extra_params) as cursor:
            for compressed in (False, True):
                cursor.connection.lz4_compression = compressed
                select_list = ", ".join(
                    "uuid() uuid{}".format(col) for col in range(uuid_cols)
                )
                cursor.execute(
                    "SELECT id, {uuids} FROM RANGE({rows})".format(
                        uuids=select_list, rows=row_total
                    )
                )
                # The server must honor the requested compression setting.
                assert compressed == cursor.active_result_set.lz4_compressed
                for expected_id, row in enumerate(
                    self.fetch_rows(cursor, row_total, batch_rows)
                ):
                    # Verify no rows are dropped in the middle.
                    assert row[0] == expected_id
                    assert len(row[1]) == 36
class LargeNarrowResultSetMixin(LargeQueriesFetchMixin):
    """Test mixin for large narrow result set queries."""

    @pytest.mark.parametrize(
        "extra_params",
        [
            {},
            {"use_sea": True},
        ],
    )
    def test_query_with_large_narrow_result_set(self, extra_params):
        """Stream a ~100 MB result set of single-long rows and verify that
        every row id arrives in order.
        """
        resultSize = 100 * 1000 * 1000  # 100 MB
        width = 8  # sizeof(long)
        # FIX: use floor division (as the wide-result sibling test does).
        # True division produced a float, which was interpolated into the SQL
        # as "RANGE(12500000.0)" and passed as a float row_count to fetch_rows.
        rows = resultSize // width
        # Set the fetchmany_size to get 10MB of data a go
        fetchmany_size = 10 * 1024 * 1024 // width
        # This is used by PyHive tests to determine the buffer size
        self.arraysize = 10000000
        with self.cursor(extra_params) as cursor:
            cursor.execute("SELECT * FROM RANGE({rows})".format(rows=rows))
            for row_id, row in enumerate(self.fetch_rows(cursor, rows, fetchmany_size)):
                assert row[0] == row_id
class LongRunningQueryMixin:
    """Test mixin for long running queries."""

    @pytest.mark.parametrize(
        "extra_params",
        [
            {},
            {"use_sea": True},
        ],
    )
    def test_long_running_query(self, extra_params):
        """Incrementally increase query size until it takes at least 3 minutes,
        and asserts that the query completes successfully.
        """
        minutes = 60
        min_duration = 3 * minutes  # target runtime: at least 3 minutes
        duration = -1  # sentinel so the loop body always runs at least once
        scale0 = 10000
        scale_factor = 50
        with self.cursor(extra_params) as cursor:
            while duration < min_duration:
                # The extrapolation below should converge in a few iterations;
                # a scale factor this large means it never reached the target.
                assert scale_factor < 4096, "Detected infinite loop"
                start = time.time()
                # Cross join sized by scale_factor; the LIKE pattern can never
                # match a formatted date, so count(*) is expected to be 0.
                cursor.execute(
                    """SELECT count(*)
                FROM RANGE({scale}) x
                JOIN RANGE({scale0}) y
                ON from_unixtime(x.id * y.id, "yyyy-MM-dd") LIKE "%not%a%date%"
                """.format(
                        scale=scale_factor * scale0, scale0=scale0
                    )
                )
                (n,) = cursor.fetchone()
                # Query must complete and return the expected (empty-match) count.
                assert n == 0
                duration = time.time() - start
                current_fraction = duration / min_duration
                print("Took {} s with scale factor={}".format(duration, scale_factor))
                # Extrapolate linearly to reach 3 min and add 50% padding to push over the limit
                scale_factor = math.ceil(1.5 * scale_factor / current_fraction)
# Keep backward-compatible alias that combines all three
class LargeQueriesMixin(LargeWideResultSetMixin, LargeNarrowResultSetMixin, LongRunningQueryMixin):
    """Aggregate mixin exposing all three large-query test suites under the
    original name, for callers that subclassed ``LargeQueriesMixin`` before
    the split into per-scenario mixins."""
    pass