-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathtest_interactive_serial.py
More file actions
117 lines (97 loc) · 4.21 KB
/
test_interactive_serial.py
File metadata and controls
117 lines (97 loc) · 4.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
from threading import Thread
import unittest
import cloudpickle
import zmq
from executorlib.backend.interactive_serial import evaluate_cmd
def calc(i, j):
    """Return the sum of ``i`` and ``j``.

    Used as the task function in the messages sent by the ``submit*``
    helpers of this module.
    """
    return i + j
def set_global_variable():
    """Return the mapping ``{"j": 5}``.

    Sent as the ``fn`` of the ``"init"`` message by the ``submit*`` helpers
    of this module. A new dict is built on every call.
    """
    return {"j": 5}
def raise_error():
    """Always raise ``ValueError("interface error")``.

    Sent as a failing ``"init"`` function by ``submit_init_error``.

    Raises:
        ValueError: unconditionally.
    """
    raise ValueError("interface error")
def submit(socket):
    """Send a complete, well-formed session over ``socket``.

    Three cloudpickle-serialised dictionaries are sent, in this order:

    1. an ``"init"`` message whose function is ``set_global_variable``,
    2. a task calling ``calc`` with only ``i=2`` given as keyword argument,
    3. a ``"shutdown"`` request with ``"wait": True``.

    Args:
        socket: object exposing ``send(bytes)``; the tests pass a bound
            ``zmq.PAIR`` socket.
    """
    messages = (
        {"init": True, "fn": set_global_variable, "args": (), "kwargs": {}},
        {"fn": calc, "args": (), "kwargs": {"i": 2}},
        {"shutdown": True, "wait": True},
    )
    for message in messages:
        socket.send(cloudpickle.dumps(message))
def submit_error(socket):
    """Send a session whose task is called without any arguments.

    Three cloudpickle-serialised dictionaries are sent, in this order:

    1. an ``"init"`` message whose function is ``set_global_variable``,
    2. a task calling ``calc`` with empty ``args`` and empty ``kwargs``,
    3. a ``"shutdown"`` request with ``"wait": True``.

    Args:
        socket: object exposing ``send(bytes)``; the tests pass a bound
            ``zmq.PAIR`` socket.
    """
    messages = (
        {"init": True, "fn": set_global_variable, "args": (), "kwargs": {}},
        {"fn": calc, "args": (), "kwargs": {}},
        {"shutdown": True, "wait": True},
    )
    for message in messages:
        socket.send(cloudpickle.dumps(message))
def submit_init_error(socket):
    """Send a session whose init function raises.

    Three cloudpickle-serialised dictionaries are sent, in this order:

    1. an ``"init"`` message whose function is ``raise_error``,
    2. a task calling ``calc`` with only ``i=2`` given as keyword argument,
    3. a ``"shutdown"`` request with ``"wait": True``.

    Args:
        socket: object exposing ``send(bytes)``; the tests pass a bound
            ``zmq.PAIR`` socket.
    """
    messages = (
        {"init": True, "fn": raise_error, "args": (), "kwargs": {}},
        {"fn": calc, "args": (), "kwargs": {"i": 2}},
        {"shutdown": True, "wait": True},
    )
    for message in messages:
        socket.send(cloudpickle.dumps(message))
class TestSerial(unittest.TestCase):
    """Drive ``evaluate_cmd`` through a ZeroMQ PAIR socket and check its replies.

    Every test binds a fresh PAIR socket to a random TCP port, hands that
    port to ``evaluate_cmd`` via ``--zmqport`` and compares the unpickled
    replies with the expected ones. Either ``evaluate_cmd`` or the
    ``submit*`` helper runs in a background thread, the other one in the
    test thread.
    """

    def setUp(self):
        """Create the context and bound socket and register their teardown."""
        self.context = zmq.Context()
        # Cleanups run in reverse registration order, so the socket is closed
        # before the context is terminated - also when an assertion fails.
        self.addCleanup(self.context.term)
        self.socket = self.context.socket(zmq.PAIR)
        # linger=0 keeps teardown from blocking on undelivered messages after
        # a failed test.
        self.addCleanup(self.socket.close, linger=0)
        self.port = self.socket.bind_to_random_port("tcp://*")

    def _backend_arguments(self):
        """Return the command line argument list passed to ``evaluate_cmd``."""
        return ["--zmqport", str(self.port)]

    def _start_backend_thread(self):
        """Start ``evaluate_cmd`` in a background thread and return the thread."""
        worker = Thread(
            target=evaluate_cmd,
            kwargs={"argument_lst": self._backend_arguments()},
        )
        worker.start()
        return worker

    def _run_backend_with_sender(self, sender_function):
        """Run ``evaluate_cmd`` here while ``sender_function`` sends in a thread."""
        sender = Thread(target=sender_function, kwargs={"socket": self.socket})
        sender.start()
        evaluate_cmd(argument_lst=self._backend_arguments())
        # The socket is shared with the sender thread; make sure the sender is
        # finished before this thread starts receiving on it.
        sender.join()

    def _receive(self):
        """Block for the next reply and return it unpickled."""
        return cloudpickle.loads(self.socket.recv())

    def _assert_error_reply(self, expected_type):
        """Assert the next reply has an ``"error"`` of exactly ``expected_type``."""
        reply = self._receive()
        self.assertIn("error", reply)
        # Exact type identity, not isinstance: subclasses do not count.
        self.assertIs(type(reply["error"]), expected_type)

    def test_main_as_thread(self):
        worker = self._start_backend_thread()
        submit(socket=self.socket)
        self.assertEqual(self._receive(), {"result": True})
        # 7 is presumably i=2 plus the j=5 returned by the init function.
        self.assertEqual(self._receive(), {"result": 7})
        self.assertEqual(self._receive(), {"result": True})
        worker.join()

    def test_main_as_thread_error(self):
        worker = self._start_backend_thread()
        submit_error(socket=self.socket)
        self.assertEqual(self._receive(), {"result": True})
        self._assert_error_reply(TypeError)
        self.assertEqual(self._receive(), {"result": True})
        worker.join()

    def test_main_as_thread_init_error(self):
        worker = self._start_backend_thread()
        submit_init_error(socket=self.socket)
        self._assert_error_reply(ValueError)
        self._assert_error_reply(TypeError)
        self.assertEqual(self._receive(), {"result": True})
        worker.join()

    def test_submit_as_thread(self):
        self._run_backend_with_sender(submit)
        self.assertEqual(self._receive(), {"result": True})
        self.assertEqual(self._receive(), {"result": 7})
        self.assertEqual(self._receive(), {"result": True})

    def test_submit_as_thread_error(self):
        self._run_backend_with_sender(submit_error)
        self.assertEqual(self._receive(), {"result": True})
        self._assert_error_reply(TypeError)
        self.assertEqual(self._receive(), {"result": True})