Skip to content

Commit b59efc1

Browse files
[Feature] Implement flux job scheduling priorities (#592)
* [Feature] Implement flux job scheduling priorities * Format black * Add test for priority in flux --------- Co-authored-by: pyiron-runner <pyiron@mpie.de>
1 parent 8860291 commit b59efc1

2 files changed

Lines changed: 13 additions & 3 deletions

File tree

executorlib/interactive/fluxspawner.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,11 @@ class FluxPythonSpawner(BaseSpawner):
3030
threads_per_core (int, optional): The number of threads per base. Defaults to 1.
3131
gpus_per_core (int, optional): The number of GPUs per base. Defaults to 0.
3232
num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
33-
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to False.
33+
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute notes. Defaults to
34+
False.
3435
openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False.
36+
priority (int, optional): job urgency 0 (lowest) through 31 (highest) (default is 16). Priorities 0 through 15
37+
are restricted to the instance owner.
3538
flux_executor (flux.job.FluxExecutor, optional): The FluxExecutor instance. Defaults to None.
3639
flux_executor_pmi_mode (str, optional): The PMI option. Defaults to None.
3740
flux_executor_nesting (bool, optional): Whether to use nested FluxExecutor. Defaults to False.
@@ -46,6 +49,7 @@ def __init__(
4649
gpus_per_core: int = 0,
4750
num_nodes: Optional[int] = None,
4851
exclusive: bool = False,
52+
priority: Optional[int] = None,
4953
openmpi_oversubscribe: bool = False,
5054
flux_executor: Optional[flux.job.FluxExecutor] = None,
5155
flux_executor_pmi_mode: Optional[str] = None,
@@ -65,6 +69,7 @@ def __init__(
6569
self._flux_executor_pmi_mode = flux_executor_pmi_mode
6670
self._flux_executor_nesting = flux_executor_nesting
6771
self._flux_log_files = flux_log_files
72+
self._priority = priority
6873
self._future = None
6974

7075
def bootup(
@@ -114,7 +119,12 @@ def bootup(
114119
elif self._flux_log_files:
115120
jobspec.stderr = os.path.abspath("flux.err")
116121
jobspec.stdout = os.path.abspath("flux.out")
117-
self._future = self._flux_executor.submit(jobspec)
122+
if self._priority is not None:
123+
self._future = self._flux_executor.submit(
124+
jobspec=jobspec, urgency=self._priority
125+
)
126+
else:
127+
self._future = self._flux_executor.submit(jobspec=jobspec)
118128

119129
def shutdown(self, wait: bool = True):
120130
"""

tests/test_flux_executor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ def setUp(self):
5050
def test_flux_executor_serial(self):
5151
with BlockAllocationExecutor(
5252
max_workers=2,
53-
executor_kwargs={"flux_executor": self.flux_executor},
53+
executor_kwargs={"flux_executor": self.flux_executor, "priority": 20},
5454
spawner=FluxPythonSpawner,
5555
) as exe:
5656
fs_1 = exe.submit(calc, 1)

0 commit comments

Comments
 (0)