-
Notifications
You must be signed in to change notification settings - Fork 141
Expand file tree
/
Copy pathlatency_logger.py
More file actions
231 lines (180 loc) · 7.63 KB
/
latency_logger.py
File metadata and controls
231 lines (180 loc) · 7.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
import functools
import logging
import time
from typing import Optional
from uuid import UUID

from databricks.sql.telemetry.telemetry_client import TelemetryClientFactory
from databricks.sql.telemetry.models.event import (
    SqlExecutionEvent,
)
from databricks.sql.telemetry.models.enums import ExecutionResultFormat, StatementType
from databricks.sql.utils import ColumnQueue, CloudFetchQueue, ArrowQueue
class TelemetryExtractor:
    """
    Base class for extracting telemetry information from various object types.

    This class serves as a proxy that delegates attribute access to the wrapped
    object while providing a common interface for extracting telemetry-related
    data. The ``get_*`` methods defined here are placeholders that return
    ``None``; subclasses override them for a specific wrapped type.
    """

    def __init__(self, obj):
        """
        Initialize the extractor with an object to wrap.

        Args:
            obj: The object to extract telemetry information from.
        """
        self._obj = obj

    def __getattr__(self, name):
        """
        Delegate attribute access to the wrapped object.

        Args:
            name (str): The name of the attribute to access.

        Returns:
            The attribute value from the wrapped object.

        Raises:
            AttributeError: If the wrapped object has no such attribute, or if
                this extractor has no wrapped object yet.
        """
        # __getattr__ only runs after normal lookup has failed. If "_obj"
        # itself is missing (instance built without __init__, e.g. by
        # copy/pickle), reading self._obj below would re-enter this method
        # until RecursionError; fail with a plain AttributeError instead.
        if name == "_obj":
            raise AttributeError(name)
        return getattr(self._obj, name)

    def get_session_id_hex(self):
        """Placeholder; returns None unless overridden by a subclass."""
        return None

    def get_statement_id(self):
        """Placeholder; returns None unless overridden by a subclass."""
        return None

    def get_is_compressed(self):
        """Placeholder; returns None unless overridden by a subclass."""
        return None

    def get_execution_result(self):
        """Placeholder; returns None unless overridden by a subclass."""
        return None

    def get_retry_count(self):
        """Placeholder; returns None unless overridden by a subclass."""
        return None
class CursorExtractor(TelemetryExtractor):
    """
    Telemetry extractor for Cursor objects.

    Reads the statement ID, session ID, compression flag, result format and
    retry count from the wrapped cursor. Attribute reads on ``self`` are
    forwarded to the wrapped cursor by the base class.
    """

    def get_statement_id(self) -> Optional[str]:
        """Return the wrapped cursor's ``query_id``."""
        statement_id = self.query_id
        return statement_id

    def get_session_id_hex(self) -> Optional[str]:
        """Return the session ID reported by the cursor's connection."""
        connection = self.connection
        return connection.get_session_id_hex()

    def get_is_compressed(self) -> bool:
        """Return the ``lz4_compression`` setting of the cursor's connection."""
        connection = self.connection
        return connection.lz4_compression

    def get_execution_result(self) -> ExecutionResultFormat:
        """
        Map the active result set's queue type to an ExecutionResultFormat.

        Returns FORMAT_UNSPECIFIED when there is no active result set or the
        queue is not one of the recognised types.
        """
        result_set = self.active_result_set
        if result_set is None:
            return ExecutionResultFormat.FORMAT_UNSPECIFIED

        queue = result_set.results
        # Checked in order; the first matching queue type wins.
        format_by_queue_type = (
            (ColumnQueue, ExecutionResultFormat.COLUMNAR_INLINE),
            (CloudFetchQueue, ExecutionResultFormat.EXTERNAL_LINKS),
            (ArrowQueue, ExecutionResultFormat.INLINE_ARROW),
        )
        for queue_type, result_format in format_by_queue_type:
            if isinstance(queue, queue_type):
                return result_format
        return ExecutionResultFormat.FORMAT_UNSPECIFIED

    def get_retry_count(self) -> int:
        """
        Return the length of the backend retry policy's ``history``.

        Returns 0 when the backend has no ``retry_policy`` attribute or the
        policy is falsy.
        """
        policy = getattr(self.thrift_backend, "retry_policy", None)
        if not policy:
            return 0
        return len(policy.history)
class ResultSetExtractor(TelemetryExtractor):
    """
    Telemetry extractor for ResultSet objects.

    Reads the statement ID, session ID, compression flag, result format and
    retry count from the wrapped result set. Attribute reads on ``self`` are
    forwarded to the wrapped result set by the base class.
    """

    def get_statement_id(self) -> Optional[str]:
        """
        Return the operation GUID of ``command_id`` formatted as a UUID string.

        Returns None when ``command_id`` is falsy.
        """
        command_id = self.command_id
        if not command_id:
            return None
        guid_bytes = command_id.operationId.guid
        return str(UUID(bytes=guid_bytes))

    def get_session_id_hex(self) -> Optional[str]:
        """Return the session ID reported by the result set's connection."""
        connection = self.connection
        return connection.get_session_id_hex()

    def get_is_compressed(self) -> bool:
        """Return the result set's ``lz4_compressed`` flag."""
        compressed = self.lz4_compressed
        return compressed

    def get_execution_result(self) -> ExecutionResultFormat:
        """
        Map the result queue's type to an ExecutionResultFormat.

        Returns FORMAT_UNSPECIFIED when the queue is not a recognised type.
        """
        queue = self.results
        # Checked in order; the first matching queue type wins.
        format_by_queue_type = (
            (ColumnQueue, ExecutionResultFormat.COLUMNAR_INLINE),
            (CloudFetchQueue, ExecutionResultFormat.EXTERNAL_LINKS),
            (ArrowQueue, ExecutionResultFormat.INLINE_ARROW),
        )
        for queue_type, result_format in format_by_queue_type:
            if isinstance(queue, queue_type):
                return result_format
        return ExecutionResultFormat.FORMAT_UNSPECIFIED

    def get_retry_count(self) -> int:
        """
        Return the length of the backend retry policy's ``history``.

        Returns 0 when the backend has no ``retry_policy`` attribute or the
        policy is falsy.
        """
        policy = getattr(self.thrift_backend, "retry_policy", None)
        if not policy:
            return 0
        return len(policy.history)
def get_extractor(obj):
    """
    Build the telemetry extractor that matches an object's class name.

    The choice is made on ``obj.__class__.__name__`` alone, so only classes
    named exactly "Cursor" or "ResultSet" are recognised.

    Args:
        obj: The object to wrap.

    Returns:
        TelemetryExtractor: A CursorExtractor for "Cursor" objects, or a
        ResultSetExtractor for "ResultSet" objects.

    Raises:
        NotImplementedError: If the object's class name is anything else.
    """
    class_name = obj.__class__.__name__
    if class_name == "Cursor":
        return CursorExtractor(obj)
    if class_name == "ResultSet":
        return ResultSetExtractor(obj)
    raise NotImplementedError(f"No extractor found for {class_name}")
def log_latency(statement_type: StatementType = StatementType.NONE):
    """
    Decorator factory for logging execution latency and telemetry information.

    The returned decorator times the wrapped method with ``time.perf_counter``
    and, whether the method returns or raises, builds a SqlExecutionEvent from
    the method's object (``self``) and passes it, together with the latency in
    whole milliseconds and the statement ID, to the telemetry client obtained
    from TelemetryClientFactory for the object's session.

    Telemetry is best-effort: any exception raised while collecting or
    exporting it is logged at debug level and suppressed, so it can never
    replace the wrapped method's return value or mask the exception the
    method itself raised.

    Args:
        statement_type (StatementType): The type of SQL statement being executed.

    Usage:
        @log_latency(StatementType.SQL)
        def execute(self, query):
            # Method implementation
            pass

    Returns:
        function: A decorator that wraps methods to add latency logging.

    Note:
        The wrapped method's object (self) must be supported by
        ``get_extractor`` (e.g., Cursor or ResultSet objects); for any other
        object no telemetry is exported.
    """
    logger = logging.getLogger(__name__)

    def _safe_call(getter):
        """Call ``getter`` and return its result, or None if it raises."""
        try:
            return getter()
        except Exception:
            return None

    def _export_latency(target, duration_ms):
        """Build the execution event for ``target`` and export it with the latency."""
        extractor = get_extractor(target)
        session_id_hex = _safe_call(extractor.get_session_id_hex)
        statement_id = _safe_call(extractor.get_statement_id)

        sql_exec_event = SqlExecutionEvent(
            statement_type=statement_type,
            is_compressed=_safe_call(extractor.get_is_compressed),
            execution_result=_safe_call(extractor.get_execution_result),
            retry_count=_safe_call(extractor.get_retry_count),
        )

        telemetry_client = TelemetryClientFactory.get_telemetry_client(
            session_id_hex
        )
        telemetry_client.export_latency_log(
            latency_ms=duration_ms,
            sql_execution_event=sql_exec_event,
            sql_statement_id=statement_id,
        )

    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            start_time = time.perf_counter()
            try:
                return func(self, *args, **kwargs)
            finally:
                duration_ms = int((time.perf_counter() - start_time) * 1000)
                # An exception escaping a finally clause discards the pending
                # return value / in-flight exception, so telemetry failures
                # must be contained here.
                try:
                    _export_latency(self, duration_ms)
                except Exception:
                    logger.debug(
                        "Failed to export latency telemetry", exc_info=True
                    )

        return wrapper

    return decorator