Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions docs/source/examples/basic_task_label.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Basic task label example"""

import ipyparallel as ipp

# start up ipp cluster with 2 engines
cluster = ipp.Cluster(n=2)
cluster.start_cluster_sync()

rc = cluster.connect_client_sync()
rc.wait_for_engines(n=2)


def wait(t):
import time

tic = time.time()
time.sleep(t)
return time.time() - tic


# send tasks to cluster
balanced_view = True
if balanced_view:
# use load balanced view
dview = rc.load_balanced_view()
ar_list = [
dview.map_async(wait, [2], task_label=f"task_label_{i:02}") for i in range(10)
]
dview.wait(ar_list)
else:
# use direct view
dview = rc[:]
ar_list = [
dview.apply_async(wait, 2, task_label=f"task_label_{i:02}") for i in range(10)
]
dview.wait(ar_list)

# query database
data = rc.db_query(
{'task_label': {"$nin": ""}}, keys=['msg_id', 'task_label', 'engine_uuid']
)
for d in data:
print(
f"msg_id={d['msg_id']}; task_label={d['task_label']}; engine_uuid={d['engine_uuid']}"
)

cluster.stop_cluster_sync()
6 changes: 5 additions & 1 deletion ipyparallel/client/remotefunction.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,12 +195,14 @@ def __init__(
chunksize=None,
ordered=True,
return_exceptions=False,
task_label=None,
**flags,
):
super().__init__(view, f, block=block, **flags)
self.chunksize = chunksize
self.ordered = ordered
self.return_exceptions = return_exceptions
self.task_label = task_label

mapClass = Map.dists[dist]
self.mapObject = mapClass()
Expand Down Expand Up @@ -293,7 +295,9 @@ def __call__(self, *sequences, **kwargs):

view = self.view if balanced else client[t]
with view.temp_flags(block=False, **self.flags):
ar = view.apply(f, *args)
ar = view.apply(
f, *args, task_label=self.task_label
) # is this the right place to insert the task_label?
ar.owner = False

msg_id = ar.msg_ids[0]
Expand Down
46 changes: 42 additions & 4 deletions ipyparallel/client/view.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,13 +566,24 @@ def _really_apply(
_idents, _targets = self.client._build_targets(targets)
futures = []

task_label = (
kwargs.pop("task_label") if "task_label" in kwargs else None
) # is this the correct/best way of retieving task_label?
metadata = dict(task_label=task_label)

pf = PrePickled(f)
pargs = [PrePickled(arg) for arg in args]
pkwargs = {k: PrePickled(v) for k, v in kwargs.items()}

for ident in _idents:
future = self.client.send_apply_request(
self._socket, pf, pargs, pkwargs, track=track, ident=ident
self._socket,
pf,
pargs,
pkwargs,
track=track,
ident=ident,
metadata=metadata,
)
futures.append(future)
if track:
Expand All @@ -592,7 +603,15 @@ def _really_apply(
return ar

@sync_results
def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
def map(
self,
f,
*sequences,
block=None,
track=False,
return_exceptions=False,
task_label=None,
):
"""Parallel version of builtin `map`, using this View's `targets`.

There will be one task per target, so work will be chunked
Expand Down Expand Up @@ -1036,7 +1055,15 @@ def _broadcast_map(f, *sequence_names):
return list(map(f, *sequences))

@_not_coalescing
def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
def map(
self,
f,
*sequences,
block=None,
track=False,
return_exceptions=False,
task_label=None,
):
"""Parallel version of builtin `map`, using this View's `targets`.

There will be one task per engine, so work will be chunked
Expand Down Expand Up @@ -1355,10 +1382,19 @@ def _really_apply(
# ensure *not* bytes
idents = [ident.decode() for ident in idents]

task_label = (
kwargs.pop("task_label") if "task_label" in kwargs else None
) # is this the correct/best way of retieving task_label?

after = self._render_dependency(after)
follow = self._render_dependency(follow)
metadata = dict(
after=after, follow=follow, timeout=timeout, targets=idents, retries=retries
after=after,
follow=follow,
timeout=timeout,
targets=idents,
retries=retries,
task_label=task_label,
)

future = self.client.send_apply_request(
Expand Down Expand Up @@ -1389,6 +1425,7 @@ def map(
chunksize=1,
ordered=True,
return_exceptions=False,
task_label=None,
):
"""Parallel version of builtin `map`, load-balanced by this View.

Expand Down Expand Up @@ -1443,6 +1480,7 @@ def map(
chunksize=chunksize,
ordered=ordered,
return_exceptions=return_exceptions,
task_label=task_label,
)
return pf.map(*sequences)

Expand Down
2 changes: 2 additions & 0 deletions ipyparallel/controller/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def empty_record():
'error': None,
'stdout': '',
'stderr': '',
'task_label': None,
}


Expand Down Expand Up @@ -111,6 +112,7 @@ def init_record(msg):
'error': None,
'stdout': '',
'stderr': '',
'task_label': msg['metadata'].get('task_label', None),
}


Expand Down
1 change: 1 addition & 0 deletions ipyparallel/engine/kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ def init_metadata(self, parent):
'is_broadcast': parent_metadata.get('is_broadcast', False),
'is_coalescing': parent_metadata.get('is_coalescing', False),
'original_msg_id': parent_metadata.get('original_msg_id', ''),
'task_label': parent_metadata.get('task_label', None),
}

def finish_metadata(self, parent, metadata, reply_content):
Expand Down