-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathobserve.py
More file actions
122 lines (95 loc) · 5.38 KB
/
observe.py
File metadata and controls
122 lines (95 loc) · 5.38 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import requests
from learning_orchestra_client._util._response_treat import ResponseTreat
class Observer:
debug = True
__TIMEOUT_TIME_MULTIPLICATION = 1000
__INPUT_NAME = "filename"
__FILENAME_REQUEST_FIELD = 'filename'
__OBSERVER_TYPE_REQUEST_FIELD = 'observe_type'
__TIMEOUT_REQUEST_FIELD = 'timeout'
__OBSERVER_NAME_REQUEST_FIELD = 'observer_name'
__OBSERVER_PIPELINE_REQUEST_FIELD = 'pipeline'
__MICROSERVICE_PORT = '5010'
def __init__(self, cluster_ip: str):
self.__api_path = "/api/learningOrchestra/v1/observer"
self.__service_base_url = f'{cluster_ip}:{self.__MICROSERVICE_PORT}'
self.__service_url = f'{self.__service_base_url}{self.__api_path}'
self.cluster_ip = cluster_ip.replace("http://", "")
self.__response_treat = ResponseTreat()
def wait(self, name: str, timeout: int=0,
observer_name:str='', pretty_response: bool = False) -> dict:
"""
Observe the end of a pipe for a timeout seconds or
until the pipe finishes its execution.
:param name: Represents the pipe name. Any tune, train, predict service can wait its finish with a wait method call.
:param timeout: the maximum time to wait the observed step, in seconds. If set to 0, there will be no timeout
:param observer_name: the name of the observer (default: observer_)
:return: Returns a dictionary with the content of a mongo collection, representing any pipe result
"""
return self.watch(name=name,
timeout=timeout,
type="wait",
observer_name=observer_name,
pretty_response=pretty_response)
def watch(self, name: str, timeout: int=0, type:str="wait",
observer_name:str='',pipeline:[]=None,
pretty_response: bool = False) -> dict:
"""
Observe the pipe for a timeout seconds or
until the pipe finishes its execution. It is a more complete method,
you can use it to configure your own pipelines if you wish. For more
simplistic uses, try the methods "wait" and "start_observing_pipe"
:param name: the name of the pipe to be observed. A train, predict, explore, transform or any other pipe can be observed.
:param timeout: the maximum time to wait the observed step, in seconds. If set to 0, there will be no timeout
:param type: type of the observation, it can be "wait" to observe the end of the pipe, "observe" to observe until the pipe change it's content or "custom" if you wish to provide your own mongo pipeline
:param observer_name: the name of the observer (default observer_)
:param pipeline: the custom pipeline that you wish to use on the observer. It is only used if type is set to "custom"
:return: Returns a dictionary with the content of a mongo collection, representing any pipe result
"""
if type == "all" or type == "wait" or type == '1':
type = "wait"
elif type == "finish" or type == "observe" or type == '2':
type = "observe"
else:
raise NameError("Invalid type parameter: " + type)
request_url = f'{self.__service_url}'
request_body = {
self.__FILENAME_REQUEST_FIELD: name,
self.__OBSERVER_TYPE_REQUEST_FIELD: type,
self.__TIMEOUT_REQUEST_FIELD: timeout,
self.__OBSERVER_NAME_REQUEST_FIELD: observer_name,
self.__OBSERVER_PIPELINE_REQUEST_FIELD: pipeline
}
observer_uri = requests.post(url=f'{request_url}',
json=request_body)
if(observer_uri.status_code >= 200 and observer_uri.status_code < 400):
url = f"{self.__service_base_url}{observer_uri.json()['result']}"
response = requests.get(url=url)
else:
raise Exception(observer_uri.json()['result'])
if response.status_code >= 200 and response.status_code < 400:
response = self.__response_treat.treatment(response,pretty_response)
else:
if response.status_code == 408:
raise TimeoutError(response.json()['result'])
raise Exception(response.json()['result'])
delete_resp = requests.delete(url=url)
return response
def start_observing_pipe(self, name: str, timeout: int=0,
observer_name:str='',
pretty_response: bool = False) -> dict:
"""
It waits until a pipe change its content
(replace, insert, update and delete mongoDB collection operation
types), so it is a bit different
from wait method with a timeout and a finish explicit condition.
:param name: the name of the pipe to be observed. A train, predict, explore, transform or any other pipe can be observed.
:param timeout: the maximum time to wait the observed step, in seconds. If set to 0, there will be no timeout
:param observer_name: the name of the observer (default observer_)
:returns: a dictionary with the content of a mongo collection, representing any pipe result
"""
return self.watch(name=name,
timeout=timeout,
type="observe",
observer_name=observer_name,
pretty_response=pretty_response)