diff --git a/docs/source/examples/basic_task_label.py b/docs/source/examples/basic_task_label.py
new file mode 100644
index 00000000..afd78846
--- /dev/null
+++ b/docs/source/examples/basic_task_label.py
@@ -0,0 +1,51 @@
+"""Basic task label example"""
+
+import ipyparallel as ipp
+
+# start up ipp cluster with 2 engines
+cluster = ipp.Cluster(n=2)
+cluster.start_cluster_sync()
+
+rc = cluster.connect_client_sync()
+rc.wait_for_engines(n=2)
+
+
+def wait(t):
+    import time
+
+    tic = time.time()
+    time.sleep(t)
+    return time.time() - tic
+
+
+# use load balanced view
+bview = rc.load_balanced_view()
+ar_list_b1 = [
+    bview.set_flags(label=f"mylabel_map_{i:02}").map_async(wait, [2]) for i in range(10)
+]
+ar_list_b2 = [
+    bview.set_flags(label=f"mylabel_map_{i:02}").apply_async(wait, 2) for i in range(10)
+]
+bview.wait(ar_list_b1)
+bview.wait(ar_list_b2)
+
+
+# use direct view
+dview = rc[:]
+ar_list_d1 = [
+    dview.set_flags(label=f"mylabel_map_{i + 10:02}").apply_async(wait, 2)
+    for i in range(10)
+]
+ar_list_d2 = [
+    dview.set_flags(label=f"mylabel_map_{i + 10:02}").map_async(wait, [2])
+    for i in range(10)
+]
+dview.wait(ar_list_d1)
+dview.wait(ar_list_d2)
+
+# query database
+data = rc.db_query({'label': {"$nin": ""}}, keys=['msg_id', 'label', 'engine_uuid'])
+for d in data:
+    print(f"msg_id={d['msg_id']}; label={d['label']}; engine_uuid={d['engine_uuid']}")
+
+cluster.stop_cluster_sync()
diff --git a/ipyparallel/client/client.py b/ipyparallel/client/client.py
index 8fec6f11..5a64fa73 100644
--- a/ipyparallel/client/client.py
+++ b/ipyparallel/client/client.py
@@ -220,6 +220,7 @@ def __init__(self, *args, **kwargs):
             'stderr': '',
             'outputs': [],
             'data': {},
+            'label': None,
         }
         self.update(md)
         self.update(dict(*args, **kwargs))
diff --git a/ipyparallel/client/view.py b/ipyparallel/client/view.py
index 53712c39..15ad4b69 100644
--- a/ipyparallel/client/view.py
+++ b/ipyparallel/client/view.py
@@ -98,6 +98,7 @@ class View(HasTraits):
     block = Bool(False)
     track = Bool(False)
     targets = Any()
+    label = Any()
 
     history = List()
     outstanding = Set()
@@ -105,7 +106,7 @@ class View(HasTraits):
     client = Instance('ipyparallel.Client', allow_none=True)
 
     _socket = Any()
-    _flag_names = List(['targets', 'block', 'track'])
+    _flag_names = List(['targets', 'block', 'track', 'label'])
     _in_sync_results = Bool(False)
     _targets = Any()
     _idents = Any()
@@ -155,6 +156,8 @@ def set_flags(self, **kwargs):
             else:
                 setattr(self, name, value)
 
+        return self  # returning self would allow direct calling of map/apply in one command (no context manager)
+
     @contextmanager
     def temp_flags(self, **kwargs):
         """temporarily set flags, for use in `with` statements.
@@ -530,7 +533,14 @@ def use_pickle(self):
     @sync_results
     @save_ids
     def _really_apply(
-        self, f, args=None, kwargs=None, targets=None, block=None, track=None
+        self,
+        f,
+        args=None,
+        kwargs=None,
+        targets=None,
+        block=None,
+        track=None,
+        label=None,
     ):
         """calls f(*args, **kwargs) on remote engines, returning the result.
 
@@ -562,6 +572,8 @@ def _really_apply(
         block = self.block if block is None else block
         track = self.track if track is None else track
         targets = self.targets if targets is None else targets
+        label = self.label if label is None else label
+        metadata = dict(label=label)
 
         _idents, _targets = self.client._build_targets(targets)
         futures = []
@@ -572,7 +584,13 @@ def _really_apply(
 
         for ident in _idents:
             future = self.client.send_apply_request(
-                self._socket, pf, pargs, pkwargs, track=track, ident=ident
+                self._socket,
+                pf,
+                pargs,
+                pkwargs,
+                track=track,
+                ident=ident,
+                metadata=metadata,
             )
             futures.append(future)
             if track:
@@ -592,7 +610,15 @@ def _really_apply(
         return ar
 
     @sync_results
-    def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
+    def map(
+        self,
+        f,
+        *sequences,
+        block=None,
+        track=False,
+        return_exceptions=False,
+        label=None,
+    ):
         """Parallel version of builtin `map`, using this View's `targets`.
 
         There will be one task per target, so work will be chunked
@@ -630,10 +656,17 @@ def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
         if block is None:
             block = self.block
+        if label is None:
+            label = self.label
 
         assert len(sequences) > 0, "must have some sequences to map onto!"
 
         pf = ParallelFunction(
-            self, f, block=block, track=track, return_exceptions=return_exceptions
+            self,
+            f,
+            block=block,
+            track=track,
+            return_exceptions=return_exceptions,
+            label=label,
         )
         return pf.map(*sequences)
 
@@ -1036,7 +1069,15 @@ def _broadcast_map(f, *sequence_names):
         return list(map(f, *sequences))
 
     @_not_coalescing
-    def map(self, f, *sequences, block=None, track=False, return_exceptions=False):
+    def map(
+        self,
+        f,
+        *sequences,
+        block=None,
+        track=False,
+        return_exceptions=False,
+        label=None,
+    ):
         """Parallel version of builtin `map`, using this View's `targets`.
 
         There will be one task per engine, so work will be chunked
@@ -1176,10 +1217,11 @@ class LoadBalancedView(View):
     after = Any()
     timeout = CFloat()
     retries = Integer(0)
+    label = Any()
 
     _task_scheme = Any()
     _flag_names = List(
-        ['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries']
+        ['targets', 'block', 'track', 'follow', 'after', 'timeout', 'retries', 'label']
     )
 
     _outstanding_maps = Set()
@@ -1275,6 +1317,8 @@ def set_flags(self, **kwargs):
 
             self.timeout = t
 
+        return self  # returning self would allow direct calling of map/apply in one command (no context manager)
+
     @sync_results
     @save_ids
     def _really_apply(
@@ -1289,6 +1333,7 @@ def _really_apply(
         self,
         f,
         args=None,
         kwargs=None,
         block=None,
         track=None,
         after=None,
         follow=None,
         timeout=None,
         targets=None,
         retries=None,
+        label=None,
     ):
         """calls f(*args, **kwargs) on a remote engine, returning the result.
 
@@ -1344,6 +1389,7 @@ def _really_apply(
         follow = self.follow if follow is None else follow
         timeout = self.timeout if timeout is None else timeout
         targets = self.targets if targets is None else targets
+        label = self.label if label is None else label
 
         if not isinstance(retries, int):
             raise TypeError(f'retries must be int, not {type(retries)!r}')
@@ -1358,7 +1404,12 @@ def _really_apply(
         after = self._render_dependency(after)
         follow = self._render_dependency(follow)
         metadata = dict(
-            after=after, follow=follow, timeout=timeout, targets=idents, retries=retries
+            after=after,
+            follow=follow,
+            timeout=timeout,
+            targets=idents,
+            retries=retries,
+            label=label,
         )
 
         future = self.client.send_apply_request(
@@ -1389,6 +1440,7 @@ def map(
         chunksize=1,
         ordered=True,
         return_exceptions=False,
+        label=None,
     ):
         """Parallel version of builtin `map`, load-balanced by this View.
 
@@ -1433,6 +1485,8 @@ def map(
         # default
         if block is None:
             block = self.block
+        if label is None:
+            label = self.label
 
         assert len(sequences) > 0, "must have some sequences to map onto!"
 
@@ -1443,6 +1497,7 @@ def map(
             chunksize=chunksize,
             ordered=ordered,
             return_exceptions=return_exceptions,
+            label=label,
         )
         return pf.map(*sequences)
 
diff --git a/ipyparallel/controller/hub.py b/ipyparallel/controller/hub.py
index b166fbce..282ffa10 100644
--- a/ipyparallel/controller/hub.py
+++ b/ipyparallel/controller/hub.py
@@ -75,6 +75,7 @@ def empty_record():
         'error': None,
         'stdout': '',
         'stderr': '',
+        'label': None,
     }
 
 
@@ -111,6 +112,7 @@ def init_record(msg):
         'error': None,
         'stdout': '',
         'stderr': '',
+        'label': msg['metadata'].get('label', None),
     }
 
 
diff --git a/ipyparallel/engine/kernel.py b/ipyparallel/engine/kernel.py
index 8c900167..f346371c 100644
--- a/ipyparallel/engine/kernel.py
+++ b/ipyparallel/engine/kernel.py
@@ -76,6 +76,7 @@ def init_metadata(self, parent):
             'is_broadcast': parent_metadata.get('is_broadcast', False),
             'is_coalescing': parent_metadata.get('is_coalescing', False),
             'original_msg_id': parent_metadata.get('original_msg_id', ''),
+            'label': parent_metadata.get('label', None),
         }
 
     def finish_metadata(self, parent, metadata, reply_content):
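Usage sketch (not part of the patch above; it assumes the diff is applied): because 'label' is registered in View._flag_names, it should also be settable through the existing temp_flags context manager, and a specific label can then be matched directly with Client.db_query. The function name `answer`, the label string "one_off", and the single-engine cluster size are arbitrary choices for illustration; the default DictDB task database is assumed for the query.

import ipyparallel as ipp

cluster = ipp.Cluster(n=1)
cluster.start_cluster_sync()
rc = cluster.connect_client_sync()
rc.wait_for_engines(n=1)


def answer():
    return 42


view = rc.load_balanced_view()

# temporarily attach a label to everything submitted inside the with-block
with view.temp_flags(label="one_off"):
    ar = view.apply_async(answer)
ar.get()

# fetch the task record(s) tagged with exactly that label
for rec in rc.db_query({'label': "one_off"}, keys=['msg_id', 'label']):
    print(rec['msg_id'], rec['label'])

cluster.stop_cluster_sync()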