-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmetric_service.py
More file actions
69 lines (57 loc) · 2.54 KB
/
metric_service.py
File metadata and controls
69 lines (57 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import logging
from spaceone.core.service import *
from spaceone.monitoring.manager.google_cloud_manager import GoogleCloudManager
from spaceone.monitoring.manager.metric_manager import MetricManager
# Credential schema name substituted when a request carries no 'schema' key.
DEFAULT_SCHEMA = 'google_oauth2_credentials'

# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
@authentication_handler
@authorization_handler
@event_handler
class MetricService(BaseService):
    """Service entry points for Google Cloud Monitoring metrics.

    Each public method fetches raw results through ``GoogleCloudManager``
    and passes them to ``MetricManager`` to build the response object.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Managers are looked up by name through the service locator.
        self.google_mgr: GoogleCloudManager = self.locator.get_manager('GoogleCloudManager')
        self.metric_mgr: MetricManager = self.locator.get_manager('MetricManager')

    @transaction
    @check_required(['options', 'secret_data', 'query'])
    def list(self, params):
        """Get Google Cloud Monitoring metrics

        Args:
            params (dict): {
                'schema': 'str',        # optional, defaults to DEFAULT_SCHEMA
                'options': 'dict',      # required
                'secret_data': 'dict',  # required
                'query': 'dict'         # required
            }

        Returns:
            plugin_metrics_response (dict)
        """
        schema = params.get('schema', DEFAULT_SCHEMA)
        metrics_info = self.google_mgr.list_metrics(
            schema,
            params['options'],
            params['secret_data'],
            params['query'],
        )
        return self.metric_mgr.make_metrics_response(metrics_info)

    @transaction
    # 'metric' is subscripted unconditionally below, so it is validated here
    # together with the other mandatory keys instead of surfacing as a bare
    # KeyError from inside the method body.
    @check_required(['options', 'secret_data', 'metric_query', 'metric', 'start', 'end'])
    @change_timestamp_value(['start', 'end'], timestamp_format='iso8601')
    def get_data(self, params):
        """Get Google StackDriver metric data

        Args:
            params (dict): {
                'schema': 'str',          # optional, defaults to DEFAULT_SCHEMA
                'options': 'dict',        # required
                'secret_data': 'dict',    # required
                'metric_query': 'dict',   # required
                'metric': 'str',          # required
                'start': 'timestamp',     # required
                'end': 'timestamp',       # required
                'period': 'int',          # optional, forwarded as None if absent
                'stat': 'str'             # optional, forwarded as None if absent
            }

        Returns:
            plugin_metric_data_response (dict)

        Note:
            Missing required keys are rejected by ``check_required``; the
            exact error type is defined by ``spaceone.core`` (not visible
            in this module).
        """
        schema = params.get('schema', DEFAULT_SCHEMA)
        metric_data_info = self.google_mgr.get_metric_data(
            schema,
            params['options'],
            params['secret_data'],
            params['metric_query'],
            params['metric'],
            params['start'],
            params['end'],
            params.get('period'),
            params.get('stat'),
        )
        return self.metric_mgr.make_metric_data_response(metric_data_info)