Skip to content

Commit 09a61b7

Browse files
[Feature] Log Communication overhead (#671)
* [Debug] Log Communication overhead * Implement log_size_of_communicated_objects parameter * Rename parameter to log_obj_size * Format black * fix test --------- Co-authored-by: pyiron-runner <pyiron@mpie.de>
1 parent dc5c003 commit 09a61b7

7 files changed

Lines changed: 82 additions & 6 deletions

File tree

executorlib/executor/flux.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ class FluxJobExecutor(BaseExecutor):
6161
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
6262
debugging purposes and to get an overview of the specified dependencies.
6363
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
64+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
6465
6566
Examples:
6667
```
@@ -100,6 +101,7 @@ def __init__(
100101
refresh_rate: float = 0.01,
101102
plot_dependency_graph: bool = False,
102103
plot_dependency_graph_filename: Optional[str] = None,
104+
log_obj_size: bool = False,
103105
):
104106
"""
105107
The executorlib.FluxJobExecutor leverages either the message passing interface (MPI), the SLURM workload manager
@@ -144,6 +146,7 @@ def __init__(
144146
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
145147
debugging purposes and to get an overview of the specified dependencies.
146148
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
149+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
147150
148151
"""
149152
default_resource_dict: dict = {
@@ -174,6 +177,7 @@ def __init__(
174177
hostname_localhost=hostname_localhost,
175178
block_allocation=block_allocation,
176179
init_function=init_function,
180+
log_obj_size=log_obj_size,
177181
),
178182
max_cores=max_cores,
179183
refresh_rate=refresh_rate,
@@ -197,6 +201,7 @@ def __init__(
197201
hostname_localhost=hostname_localhost,
198202
block_allocation=block_allocation,
199203
init_function=init_function,
204+
log_obj_size=log_obj_size,
200205
)
201206
)
202207

@@ -392,6 +397,7 @@ def create_flux_executor(
392397
hostname_localhost: Optional[bool] = None,
393398
block_allocation: bool = False,
394399
init_function: Optional[Callable] = None,
400+
log_obj_size: bool = False,
395401
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
396402
"""
397403
Create a flux executor
@@ -427,6 +433,7 @@ def create_flux_executor(
427433
resources have to be defined on the executor, rather than during the submission
428434
of the individual function.
429435
init_function (None): optional function to preset arguments for functions which are submitted later
436+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
430437
431438
Returns:
432439
InteractiveStepExecutor/ InteractiveExecutor
@@ -441,6 +448,7 @@ def create_flux_executor(
441448
cores_per_worker = resource_dict.get("cores", 1)
442449
resource_dict["cache_directory"] = cache_directory
443450
resource_dict["hostname_localhost"] = hostname_localhost
451+
resource_dict["log_obj_size"] = log_obj_size
444452
check_init_function(block_allocation=block_allocation, init_function=init_function)
445453
check_pmi(backend="flux_allocation", pmi=flux_executor_pmi_mode)
446454
check_oversubscribe(oversubscribe=resource_dict.get("openmpi_oversubscribe", False))

executorlib/executor/single.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ def __init__(
9191
refresh_rate: float = 0.01,
9292
plot_dependency_graph: bool = False,
9393
plot_dependency_graph_filename: Optional[str] = None,
94+
log_obj_size: bool = False,
9495
):
9596
"""
9697
The executorlib.SingleNodeExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -131,6 +132,7 @@ def __init__(
131132
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
132133
debugging purposes and to get an overview of the specified dependencies.
133134
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
135+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
134136
135137
"""
136138
default_resource_dict: dict = {
@@ -157,6 +159,7 @@ def __init__(
157159
hostname_localhost=hostname_localhost,
158160
block_allocation=block_allocation,
159161
init_function=init_function,
162+
log_obj_size=log_obj_size,
160163
),
161164
max_cores=max_cores,
162165
refresh_rate=refresh_rate,
@@ -176,6 +179,7 @@ def __init__(
176179
hostname_localhost=hostname_localhost,
177180
block_allocation=block_allocation,
178181
init_function=init_function,
182+
log_obj_size=log_obj_size,
179183
)
180184
)
181185

@@ -188,6 +192,7 @@ def create_single_node_executor(
188192
hostname_localhost: Optional[bool] = None,
189193
block_allocation: bool = False,
190194
init_function: Optional[Callable] = None,
195+
log_obj_size: bool = False,
191196
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
192197
"""
193198
Create a single node executor
@@ -219,6 +224,7 @@ def create_single_node_executor(
219224
resources have to be defined on the executor, rather than during the submission
220225
of the individual function.
221226
init_function (None): optional function to preset arguments for functions which are submitted later
227+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
222228
223229
Returns:
224230
InteractiveStepExecutor/ InteractiveExecutor
@@ -228,6 +234,7 @@ def create_single_node_executor(
228234
cores_per_worker = resource_dict.get("cores", 1)
229235
resource_dict["cache_directory"] = cache_directory
230236
resource_dict["hostname_localhost"] = hostname_localhost
237+
resource_dict["log_obj_size"] = log_obj_size
231238

232239
check_init_function(block_allocation=block_allocation, init_function=init_function)
233240
check_gpus_per_worker(gpus_per_worker=resource_dict.get("gpus_per_core", 0))

executorlib/executor/slurm.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ class SlurmJobExecutor(BaseExecutor):
236236
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
237237
debugging purposes and to get an overview of the specified dependencies.
238238
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
239+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
239240
240241
Examples:
241242
```
@@ -271,6 +272,7 @@ def __init__(
271272
refresh_rate: float = 0.01,
272273
plot_dependency_graph: bool = False,
273274
plot_dependency_graph_filename: Optional[str] = None,
275+
log_obj_size: bool = False,
274276
):
275277
"""
276278
The executorlib.SlurmJobExecutor leverages either the message passing interface (MPI), the SLURM workload
@@ -315,6 +317,7 @@ def __init__(
315317
plot_dependency_graph (bool): Plot the dependencies of multiple future objects without executing them. For
316318
debugging purposes and to get an overview of the specified dependencies.
317319
plot_dependency_graph_filename (str): Name of the file to store the plotted graph in.
320+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
318321
319322
"""
320323
default_resource_dict: dict = {
@@ -341,6 +344,7 @@ def __init__(
341344
hostname_localhost=hostname_localhost,
342345
block_allocation=block_allocation,
343346
init_function=init_function,
347+
log_obj_size=log_obj_size,
344348
),
345349
max_cores=max_cores,
346350
refresh_rate=refresh_rate,
@@ -360,6 +364,7 @@ def __init__(
360364
hostname_localhost=hostname_localhost,
361365
block_allocation=block_allocation,
362366
init_function=init_function,
367+
log_obj_size=log_obj_size,
363368
)
364369
)
365370

@@ -372,6 +377,7 @@ def create_slurm_executor(
372377
hostname_localhost: Optional[bool] = None,
373378
block_allocation: bool = False,
374379
init_function: Optional[Callable] = None,
380+
log_obj_size: bool = False,
375381
) -> Union[OneProcessTaskScheduler, BlockAllocationTaskScheduler]:
376382
"""
377383
Create a SLURM executor
@@ -407,6 +413,7 @@ def create_slurm_executor(
407413
resources have to be defined on the executor, rather than during the submission
408414
of the individual function.
409415
init_function (None): optional function to preset arguments for functions which are submitted later
416+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
410417
411418
Returns:
412419
InteractiveStepExecutor/ InteractiveExecutor
@@ -416,6 +423,7 @@ def create_slurm_executor(
416423
cores_per_worker = resource_dict.get("cores", 1)
417424
resource_dict["cache_directory"] = cache_directory
418425
resource_dict["hostname_localhost"] = hostname_localhost
426+
resource_dict["log_obj_size"] = log_obj_size
419427
check_init_function(block_allocation=block_allocation, init_function=init_function)
420428
if block_allocation:
421429
resource_dict["init_function"] = init_function

executorlib/standalone/interactive/communication.py

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import logging
12
import sys
23
from socket import gethostname
34
from typing import Optional
@@ -12,9 +13,10 @@ class SocketInterface:
1213
1314
Args:
1415
spawner (executorlib.shared.spawner.BaseSpawner): Interface for starting the parallel process
16+
log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects.
1517
"""
1618

17-
def __init__(self, spawner=None):
19+
def __init__(self, spawner=None, log_obj_size=False):
1820
"""
1921
Initialize the SocketInterface.
2022
@@ -24,6 +26,10 @@ def __init__(self, spawner=None):
2426
self._context = zmq.Context()
2527
self._socket = self._context.socket(zmq.PAIR)
2628
self._process = None
29+
if log_obj_size:
30+
self._logger = logging.getLogger("executorlib")
31+
else:
32+
self._logger = None
2733
self._spawner = spawner
2834

2935
def send_dict(self, input_dict: dict):
@@ -34,7 +40,10 @@ def send_dict(self, input_dict: dict):
3440
input_dict (dict): dictionary of commands to be communicated. The key "shutdown" is reserved to stop the
3541
connected client from listening.
3642
"""
37-
self._socket.send(cloudpickle.dumps(input_dict))
43+
data = cloudpickle.dumps(input_dict)
44+
if self._logger is not None:
45+
self._logger.warning("Send dictionary of size: " + str(sys.getsizeof(data)))
46+
self._socket.send(data)
3847

3948
def receive_dict(self) -> dict:
4049
"""
@@ -43,7 +52,12 @@ def receive_dict(self) -> dict:
4352
Returns:
4453
dict: dictionary with response received from the connected client
4554
"""
46-
output = cloudpickle.loads(self._socket.recv())
55+
data = self._socket.recv()
56+
if self._logger is not None:
57+
self._logger.warning(
58+
"Received dictionary of size: " + str(sys.getsizeof(data))
59+
)
60+
output = cloudpickle.loads(data)
4761
if "result" in output:
4862
return output["result"]
4963
else:
@@ -121,6 +135,7 @@ def interface_bootup(
121135
command_lst: list[str],
122136
connections,
123137
hostname_localhost: Optional[bool] = None,
138+
log_obj_size: bool = False,
124139
) -> SocketInterface:
125140
"""
126141
Start interface for ZMQ communication
@@ -136,6 +151,7 @@ def interface_bootup(
136151
points to the same address as localhost. Still MacOS >= 12 seems to disable
137152
this look up for security reasons. So on MacOS it is required to set this
138153
option to true
154+
log_obj_size (boolean): Enable debug mode which reports the size of the communicated objects.
139155
140156
Returns:
141157
executorlib.shared.communication.SocketInterface: socket interface for zmq communication
@@ -149,7 +165,10 @@ def interface_bootup(
149165
"--host",
150166
gethostname(),
151167
]
152-
interface = SocketInterface(spawner=connections)
168+
interface = SocketInterface(
169+
spawner=connections,
170+
log_obj_size=log_obj_size,
171+
)
153172
command_lst += [
154173
"--zmqport",
155174
str(interface.bind_to_random_port()),

executorlib/task_scheduler/interactive/shared.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ def execute_tasks(
2323
init_function: Optional[Callable] = None,
2424
cache_directory: Optional[str] = None,
2525
queue_join_on_shutdown: bool = True,
26+
log_obj_size: bool = False,
2627
**kwargs,
2728
) -> None:
2829
"""
@@ -42,13 +43,15 @@ def execute_tasks(
4243
init_function (Callable): optional function to preset arguments for functions which are submitted later
4344
cache_directory (str, optional): The directory to store cache files. Defaults to "cache".
4445
queue_join_on_shutdown (bool): Join communication queue when thread is closed. Defaults to True.
46+
log_obj_size (bool): Enable debug mode which reports the size of the communicated objects.
4547
"""
4648
interface = interface_bootup(
4749
command_lst=_get_backend_path(
4850
cores=cores,
4951
),
5052
connections=spawner(cores=cores, **kwargs),
5153
hostname_localhost=hostname_localhost,
54+
log_obj_size=log_obj_size,
5255
)
5356
if init_function is not None:
5457
interface.send_dict(

tests/test_singlenodeexecutor_dependencies.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -341,6 +341,7 @@ def setUp(self):
341341
'openmpi_oversubscribe': False,
342342
'cache_directory': None,
343343
'hostname_localhost': None,
344+
'log_obj_size': False,
344345
'spawner': MpiExecSpawner,
345346
'max_cores': None,
346347
'max_workers': None,

tests/test_standalone_interactive_communication.py

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,11 +56,41 @@ def test_interface_mpi(self):
5656
)
5757
interface.shutdown(wait=True)
5858

59-
def test_interface_serial(self):
59+
def test_interface_serial_without_debug(self):
6060
cloudpickle_register(ind=1)
6161
task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}}
6262
interface = SocketInterface(
63-
spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False)
63+
spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False),
64+
log_obj_size=False,
65+
)
66+
interface.bootup(
67+
command_lst=[
68+
sys.executable,
69+
os.path.abspath(
70+
os.path.join(
71+
__file__,
72+
"..",
73+
"..",
74+
"executorlib",
75+
"backend",
76+
"interactive_serial.py",
77+
)
78+
),
79+
"--zmqport",
80+
str(interface.bind_to_random_port()),
81+
]
82+
)
83+
self.assertEqual(
84+
interface.send_and_receive_dict(input_dict=task_dict), np.array(4)
85+
)
86+
interface.shutdown(wait=True)
87+
88+
def test_interface_serial_with_debug(self):
89+
cloudpickle_register(ind=1)
90+
task_dict = {"fn": calc, "args": (), "kwargs": {"i": 2}}
91+
interface = SocketInterface(
92+
spawner=MpiExecSpawner(cwd=None, cores=1, openmpi_oversubscribe=False),
93+
log_obj_size=True,
6494
)
6595
interface.bootup(
6696
command_lst=[

0 commit comments

Comments
 (0)