Development documentation
(stub)
Noodles
-
noodles.
schedule
(f, **hints)¶ Decorator; schedule calls to function
f
into a workflow instead of running them at once. The decorated function returns a PromisedObject
.
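The following toy model in plain Python makes the idea of deferred scheduling concrete. It is not how Noodles implements schedule(). The Promise class and the evaluate() function are hypothetical stand-ins that only record calls and evaluate them later, which is the role a runner such as run_single() plays in the library.

```python
import functools


class Promise:
    """Hypothetical stand-in for a PromisedObject: a recorded call, not yet run."""

    def __init__(self, func, args, kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs


def schedule(f):
    """Toy decorator: calling the result records the call instead of running it."""
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        return Promise(f, args, kwargs)
    return wrapper


def evaluate(obj):
    """Evaluate a Promise, resolving promised arguments first (depth-first)."""
    if not isinstance(obj, Promise):
        return obj
    args = [evaluate(a) for a in obj.args]
    kwargs = {k: evaluate(v) for k, v in obj.kwargs.items()}
    return obj.func(*args, **kwargs)


@schedule
def add(a, b):
    return a + b


@schedule
def mul(a, b):
    return a * b


workflow = mul(add(1, 2), 4)   # builds a graph; nothing is computed yet
result = evaluate(workflow)    # (1 + 2) * 4
```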
-
noodles.
schedule_hint
(**hints)¶ Decorator; same as
schedule()
, with added hints. These hints can be arbitrary keyword arguments.
-
noodles.
run_single
(workflow)¶ Run workflow in a single thread (the same thread as the scheduler).
Parameters: workflow – Workflow or PromisedObject to be evaluated.
Returns: Evaluated result.
-
noodles.
run_process
(workflow, *, n_processes, registry, verbose=False, jobdirs=False, init=None, finish=None, deref=False)¶ Run the workflow using a number of new Python processes. Use this runner to test the workflow in a situation where data serialisation is needed.
Parameters: - workflow (
Workflow
or PromisedObject
) – The workflow.
- n_processes – Number of processes to start.
- registry – The serial registry.
- verbose – Request verbose output on worker side
- jobdirs – Create a new directory for each job to prevent filename collisions (not yet implemented).
- init – An init function that needs to be run in each process before other jobs can be run. This should be a scheduled function returning True on success.
- finish – A function that wraps up when the worker closes down.
- deref (bool) – Set this to True to pass the result through one more encoding and decoding step with object dereferencing turned on.
Returns: the result of evaluating the workflow
Return type: any
-
class
noodles.
Scheduler
(verbose=False, error_handler=None, job_keeper=None)¶ Schedules jobs, receives results, then schedules more jobs as they become ready to compute. This class communicates with a pool of workers by means of coroutines.
-
run
(connection: noodles.lib.connection.Connection, master: noodles.workflow.model.Workflow)¶ Run a workflow.
Parameters: - connection (Connection) – A connection giving a sink to the job-queue and a source yielding results.
- master (Workflow) – The workflow.
-
-
noodles.
has_scheduled_methods
(cls)¶ Decorator; use this on a class for which some methods have been decorated with
schedule()
or schedule_hint()
. Those methods are then tagged with the attribute __member_of__
, so that we may serialise and retrieve the correct method. This should be considered a patch to a flaw in the Python object model.
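Here is a rough sketch of the tagging idea. It assumes a hypothetical scheduled marker in place of schedule(). In Python 3, Counter.add is a plain function that holds no reference to its class. The class decorator therefore records the class in __member_of__, so that, given only the function, the owning class and the method can be looked up again.

```python
def scheduled(f):
    """Hypothetical marker standing in for @schedule."""
    f._is_scheduled = True
    return f


def has_scheduled_methods(cls):
    """Toy version: tag every marked function in the class body with its class."""
    for member in vars(cls).values():
        if getattr(member, "_is_scheduled", False):
            member.__member_of__ = cls
    return cls


@has_scheduled_methods
class Counter:
    def __init__(self, start):
        self.start = start

    @scheduled
    def add(self, n):
        return self.start + n


# Given only the function object, recover the owning class and the method.
func = Counter.add
owner = func.__member_of__
method = getattr(owner, func.__name__)
```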
-
class
noodles.
Fail
(func, fails=None, exception=None)¶ Signifies a failure in a computation that was wrapped by a
@maybe
decorator. Because Noodles runs all functions from the same context, it is not possible to use Python stack traces to find out where an error happened. Instead, we use a Fail
object to store information about exceptions and the subsequent continuation of the failure.
-
add_call
(func)¶ Add a call to the trace.
-
is_root_cause
¶ If the field
exception
is set in this object, it means that we are looking at the root cause of the failure.
-
-
noodles.
failed
(obj)¶ Returns True if
obj
is an instance of Fail
.
-
noodles.
run_logging
(wf, n_threads, display)¶ Adds a display to the parallel runner. Because messages arrive asynchronously, an extra thread is started just for the display routine.
-
noodles.
run_parallel
(workflow, n_threads)¶ Run a workflow in parallel threads.
Parameters: - workflow – Workflow or PromisedObject to evaluate.
- n_threads – number of threads to use (in addition to the scheduler).
Returns: evaluated workflow.
-
noodles.
unwrap
(f)¶ Safely obtain the inner function of a previously wrapped (or decorated) function. This either returns
f.__wrapped__
or just f
if f has no such attribute.
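The sketch below reproduces the behaviour described above. It relies on functools.wraps, which stores the original function in __wrapped__. The logged decorator is hypothetical. Unlike inspect.unwrap from the standard library, this version strips only a single layer.

```python
import functools


def unwrap(f):
    """Return f.__wrapped__ if it exists, otherwise f itself."""
    try:
        return f.__wrapped__
    except AttributeError:
        return f


def logged(f):
    @functools.wraps(f)   # functools.wraps sets wrapper.__wrapped__ = f
    def wrapper(*args, **kwargs):
        return f(*args, **kwargs)
    return wrapper


def square(x):
    return x * x


wrapped = logged(square)
```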
-
noodles.
gather
(*a)¶ (scheduled) Converts a list of promises (i.e.
PromisedObject
) to a promised list of values.
-
noodles.
gather_all
(a)¶ Converts an iterator of promises into a promise of a list.
-
noodles.
gather_dict
(**kwargs)¶ (scheduled) Creates a promise of a dictionary.
-
noodles.
lift
(obj, memo=None)¶ Make a promise out of object
obj
, where obj
may contain promises internally.
Parameters: - obj – Any object.
- memo – used for internal caching (similar to
deepcopy()
).
If the object is a
PromisedObject
, or pass-by-value (str
, int
, float
, complex
), it is returned as is.
If the object’s
id
has an entry in memo
, the value from memo
is returned.
If the object has a method
__lift__
, it is used to get the promise. __lift__
should take one additional argument for the memo
dictionary, entirely analogous to deepcopy()
.
If the object is an instance of one of the basic container types (list, dictionary, tuple and set), we use the analogous function (
make_list()
, make_dict()
, make_tuple()
, and make_set()
) to promise their counterparts, should these objects contain any promises. First, we map all items in the container through lift()
, then check the result for any promises. Note that in the case of dictionaries, we lift all the items (i.e. the list of key/value tuples) and then construct a new dictionary.
If the object is an instance of a subclass of any of the basic container types, the
__dict__
of the object is lifted as well as the object cast to its base type. We then use set_dict()
to set the __dict__
of the new promise. Again, if the object did not contain any promises, we return it without change.
Otherwise, we lift the
__dict__
and create a promise of a new object of the same class as the input, using create_object()
. This works fine for what we call reasonable objects. Since calling lift()
is an explicit action, we do not require reasonable objects to be derived from Reasonable
as we do with serialisation, where such a default behaviour could lead to inexplicable bugs.
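The container rule can be sketched in a few lines. This is a simplified toy, not Noodles' lift(). Promise and PromisedContainer are hypothetical placeholders, and the __lift__, container-subclass and __dict__ cases are left out. The memo dictionary is keyed by id, as with deepcopy().

```python
class Promise:
    """Hypothetical placeholder for a PromisedObject."""

    def __init__(self, value):
        self.value = value


class PromisedContainer(Promise):
    """Hypothetical promise of a container rebuilt from lifted items."""

    def __init__(self, kind, items):
        super().__init__(None)
        self.kind = kind
        self.items = items


def has_promises(items):
    return any(isinstance(x, Promise) for x in items)


def lift(obj, memo=None):
    """Toy lift(): wrap a container in a promise only if it holds promises."""
    if memo is None:
        memo = {}
    if isinstance(obj, (Promise, str, int, float, complex)):
        return obj                      # promises and pass-by-value objects
    if id(obj) in memo:
        return memo[id(obj)]            # already lifted
    if isinstance(obj, dict):
        # Lift the key/value tuples; a dict would be rebuilt from them later.
        items = [lift(kv, memo) for kv in obj.items()]
        kind = dict
    elif isinstance(obj, (list, tuple, set)):
        items = [lift(x, memo) for x in obj]
        kind = type(obj)
    else:
        return obj                      # other cases omitted in this toy
    result = PromisedContainer(kind, items) if has_promises(items) else obj
    memo[id(obj)] = result
    return result


p = Promise(5)
plain = [1, 2, (3, 4)]
shared = [p]
outer = lift([shared, shared])          # the same inner list appears twice
```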
-
noodles.
unpack
(t, n)¶ Iterates over a promised sequence. The sequence should support random access by
object.__getitem__()
. The length of the sequence must also be known beforehand.
Parameters: - t – a sequence.
- n – the length of the sequence.
Returns: an unpackable generator for the elements in the sequence.
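The extra argument n is needed because a promise cannot report its length before it has been evaluated. Tuple unpacking must therefore be told how many elements to produce. The sketch below uses a hypothetical LazySequence class in place of a promise. Each item it hands out is a deferred computation.

```python
class LazySequence:
    """Hypothetical promise of a sequence: items are handed out by index on demand."""

    def __init__(self, compute):
        self._compute = compute   # zero-argument function producing the sequence
        self.requested = []       # indices asked for so far, for illustration

    def __getitem__(self, i):
        self.requested.append(i)
        return lambda: self._compute()[i]   # deferred element i


def unpack(t, n):
    """Yield t[0] .. t[n-1]; n must be given because len(t) is not known yet."""
    return (t[i] for i in range(n))


pair = LazySequence(lambda: (3, 4))
a, b = unpack(pair, 2)
```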
-
noodles.
maybe
(func)¶ Calls
func
in a try/except block, returning a Fail
object if the call fails in any way. If any of the arguments to the call are Fail objects, the call is not attempted.
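This compact sketch of the maybe/Fail pattern is based on the description above, not on the library source. The names Fail, failed and maybe are reused for readability, but these are toy versions. The inverse and double functions are hypothetical.

```python
import functools


class Fail:
    """Toy failure record: the failing function, upstream failures, the exception."""

    def __init__(self, func, fails=None, exception=None):
        self.func = func
        self.fails = fails or []
        self.exception = exception

    @property
    def is_root_cause(self):
        return self.exception is not None


def failed(obj):
    return isinstance(obj, Fail)


def maybe(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        fails = [a for a in (*args, *kwargs.values()) if failed(a)]
        if fails:
            # Do not attempt the call; pass the upstream failure(s) along.
            return Fail(func, fails=fails)
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            return Fail(func, exception=exc)
    return wrapper


@maybe
def inverse(x):
    return 1 / x


@maybe
def double(x):
    return 2 * x


ok = double(inverse(4))      # 2 * 0.25
bad = double(inverse(0))     # a Fail wrapping the ZeroDivisionError upstream
```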
-
noodles.
delay
(value)¶ (scheduled) Creates a promise of a given value. TODO: this function should have a different name.
-
noodles.
update_hints
(obj, data)¶ Update the hints on the root-node of a workflow. Usually, schedule hints are fixed per function. Sometimes a user may want to set hints manually on a specific promised object.
update_hints()
uses the update
method on the hints dictionary with data
as its argument.
Parameters: - obj – a
PromisedObject
.
- data – a
dict
containing additional hints.
The hints are modified, in place, on the node. All workflows that contain the node are affected.
-
noodles.
quote
(promise)¶ Quote a promise.
-
noodles.
unquote
(quoted)¶ Unquote a quoted promise.
-
noodles.
result
(obj)¶ Results are stored on the nodes in the workflow at run time. This function can be used to get at a result of a node in a workflow after run time. This is not a recommended way of getting at results, but can help with debugging.
-
noodles.
fold
(fun: Callable, state: Any, xs: Iterable)¶ (scheduled) Traverse an iterable object while performing stateful computations with the elements. It returns a
PromisedObject
containing the result of the stateful computations.
For a general definition of folding see: https://en.wikipedia.org/wiki/Fold_(higher-order_function)
Parameters: - fun – stateful function.
- state – initial state.
- xs – iterable object.
Returns: PromisedObject
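For reference, here is the textbook left fold from the linked article in plain Python. It only illustrates the general concept. Whether noodles.fold threads its state in exactly the same way is an assumption, and the exact calling convention it expects of fun is not specified here.

```python
import functools
from typing import Any, Callable, Iterable


def fold_left(fun: Callable[[Any, Any], Any], state: Any, xs: Iterable) -> Any:
    """Textbook left fold: thread `state` through `fun` for every element."""
    for x in xs:
        state = fun(state, x)
    return state


total = fold_left(lambda acc, x: acc + x, 0, [1, 2, 3, 4])
digits = fold_left(lambda acc, d: 10 * acc + d, 0, [4, 2])
same = functools.reduce(lambda acc, x: acc + x, [1, 2, 3, 4], 0)
```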
-
noodles.
find_first
(pred, lst)¶ Find the first result of a list of promises
lst
that satisfies a predicate pred
.
Parameters: - pred – a function of one argument returning
True
or False
.
- lst – a list of promises or values.
Returns: a promise of a value or
None
.
This is a wrapper around
s_find_first()
. The first item on the list is passed as is, forcing evaluation. The tail of the list is quoted, and only unquoted if the predicate fails on the result of the first promise.
If the input list is empty,
None
is returned.
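The short-circuiting can be illustrated without Noodles by modelling each quoted promise as a zero-argument function (a thunk) that is only called when needed. This is an analogy, not the library's s_find_first(). The deferred helper is hypothetical.

```python
from typing import Callable, Iterable, Optional


def find_first(pred: Callable[[object], bool],
               thunks: Iterable[Callable[[], object]]) -> Optional[object]:
    """Evaluate deferred values one at a time; stop at the first that satisfies pred."""
    for thunk in thunks:
        value = thunk()          # force evaluation of this element only
        if pred(value):
            return value         # the remaining thunks are never called
    return None                  # empty input or no match


evaluated = []


def deferred(x):
    """Hypothetical stand-in for a quoted promise; records when it is evaluated."""
    def thunk():
        evaluated.append(x)
        return x
    return thunk


hit = find_first(lambda v: v > 2, [deferred(x) for x in [1, 3, 5]])
```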
-
noodles.
conditional
(b: bool, branch_true: Any, branch_false: Any = None) → Any¶ Control statement to follow a branch in a workflow. Equivalent to the
if
statement in standard Python.
The quote function delays the evaluation of the branches until the boolean has been evaluated.
Parameters: - b – promised boolean value.
- branch_true – statement to execute in case of a true predicate.
- branch_false – default operation to execute in case of a false predicate.
Returns: PromisedObject
-
noodles.
simple_lift
(obj)¶ (scheduled) Create a promise from a plain object.
Internal Specs
-
noodles.workflow.
invert_links
(links)¶ Inverts the call-graph to get a dependency graph. Possibly slow, short version.
Parameters: links (Mapping[NodeId, Set[(NodeId, ArgumentType, [int|str])]]) – forward links of a call-graph.
Returns: inverted graph, giving the dependencies of jobs.
Return type: Mapping[NodeId, Mapping[(ArgumentType, [int|str]), NodeId]]
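Here is a small sketch of the inversion. Hypothetical integer node ids and (kind, name) tuples stand in for argument addresses. Each forward link source → (target, address) becomes an entry address → source in the dependency map of the target.

```python
def invert_links(links):
    """Turn call-graph links {source: {(target, address), ...}}
    into dependencies {target: {address: source}}."""
    inverted = {node: {} for node in links}
    for source, targets in links.items():
        for target, address in targets:
            inverted.setdefault(target, {})[address] = source
    return inverted


# Node 1 feeds argument 'a' of node 3; node 2 feeds argument 'b' of node 3.
links = {
    1: {(3, ("regular", "a"))},
    2: {(3, ("regular", "b"))},
    3: set(),
}
deps = invert_links(links)
```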
-
noodles.workflow.
from_call
(foo, args, kwargs, hints, call_by_value=True)¶ Takes a function and a set of arguments it needs to run on. Returns a newly constructed workflow representing the promised value from the evaluation of the function with said arguments.
These arguments are stored in a BoundArguments object matching the signature of the given function
foo
. That is, bound_args was constructed by doing: inspect.signature(foo).bind(*args, **kwargs)
The arguments stored in the
bound_args
object are classified as either plain or promised. If an argument is promised, the value it represents is not yet available and needs to be computed by evaluating a workflow.
If an argument is a promised value, the workflow representing the value is added to the new workflow. First, all the nodes in the original workflow that are not already present in the new workflow (from an earlier argument) are copied to the new workflow, and a new entry is made in the link dictionary. Then the links in the old workflow are also added to the link dictionary. Since the link dictionary points from nodes to a
set
of ArgumentAddress
es, no links are duplicated.
In the
bound_args
object, the promised value is replaced by the Empty
object, so that we can see which arguments still have to be evaluated.
Doing this for all promised arguments in the bound_args object results in a new workflow with all the correct dependencies represented as links in the graph.
Parameters: - foo (Callable) – Function (or object) being called.
- args – Normal arguments to call
- kwargs – Keyword arguments to call
- hints – Hints that can be passed to the scheduler on where or how to schedule this job.
Returns: New workflow.
Return type:
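The BoundArguments bookkeeping can be tried out with the standard library alone. Empty is documented below as an alias of inspect._empty, which is the same object as the public inspect.Parameter.empty. The Promised marker and foo are hypothetical. The is_node_ready function here is a toy that follows the description of noodles.workflow.is_node_ready, not its code.

```python
import inspect

Empty = inspect.Parameter.empty   # same object as inspect._empty


class Promised:
    """Hypothetical marker for a value that still has to be computed."""


def foo(a, b, c=10):
    return a + b + c


bound_args = inspect.signature(foo).bind(1, Promised(), c=3)

# Replace promised values by Empty so we can see what still has to be evaluated.
pending = []
for name, value in list(bound_args.arguments.items()):
    if isinstance(value, Promised):
        bound_args.arguments[name] = Empty
        pending.append(name)


def is_node_ready(bound):
    """Ready when no argument holder contains Empty."""
    return all(v is not Empty for v in bound.arguments.values())


ready_before = is_node_ready(bound_args)
bound_args.arguments["b"] = 2                          # the promised value arrives
ready_after = is_node_ready(bound_args)
value = foo(*bound_args.args, **bound_args.kwargs)     # 1 + 2 + 3
```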
-
class
noodles.workflow.
Workflow
(root, nodes, links)¶ The workflow data container.
-
root
¶ A reference to the root node in the graph.
-
nodes
¶ A
dict
listing the nodes in the graph. We use a dict
only to have a persistent object reference.
-
links
¶ A
dict
giving a set
of links from each node.
-
-
class
noodles.workflow.
FunctionNode
(foo, bound_args, hints, result)¶ Captures a function call as a combination of function and arguments. Some of these arguments may be set to
Empty
; these need to be filled in by the workflow before the function can be applied.
-
foo
¶
The function (or object) that is being called.
-
bound_args
¶
A
BoundArguments
object storing the arguments to the function.
-
-
class
noodles.workflow.
NodeData
(function, arguments, hints)¶ -
arguments
¶ Alias for field number 1
-
function
¶ Alias for field number 0
-
hints
¶ Alias for field number 2
-
-
noodles.workflow.
insert_result
(node, address, value)¶ Runs
set_argument
, but first checks whether the data location is not already filled with some data. Under normal circumstances this check is redundant, but if no error were raised here the program would continue with unexpected results.
-
noodles.workflow.
Empty
¶ alias of
inspect._empty
-
noodles.workflow.
is_node_ready
(node)¶ Returns True if none of the argument holders contain any
Empty
object.
-
class
noodles.workflow.
Argument
(address, value)¶ -
address
¶ Alias for field number 0
-
value
¶ Alias for field number 1
-
-
class
noodles.workflow.
ArgumentAddress
(kind, name, key)¶ Codifies a value given for some argument.
-
key
¶ Alias for field number 2
-
kind
¶ Alias for field number 0
-
name
¶ Alias for field number 1
-
-
class
noodles.workflow.
ArgumentKind
¶ Codifies the location to a unique argument of a function.
Promised object
-
noodles.interface.
delay
(value)¶ (scheduled) Creates a promise of a given value. TODO: this function should have a different name.
-
noodles.interface.
gather
(*a)¶ (scheduled) Converts a list of promises (i.e.
PromisedObject
) to a promised list of values.
-
noodles.interface.
gather_all
(a)¶ Converts an iterator of promises into a promise of a list.
-
noodles.interface.
gather_dict
(**kwargs)¶ (scheduled) Creates a promise of a dictionary.
-
noodles.interface.
schedule_hint
(**hints)¶ Decorator; same as
schedule()
, with added hints. These hints can be arbitrary keyword arguments.
-
noodles.interface.
schedule
(f, **hints)¶ Decorator; schedule calls to function
f
into a workflow instead of running them at once. The decorated function returns a PromisedObject
.
-
noodles.interface.
unpack
(t, n)¶ Iterates over a promised sequence. The sequence should support random access by
object.__getitem__()
. The length of the sequence must also be known beforehand.
Parameters: - t – a sequence.
- n – the length of the sequence.
Returns: an unpackable generator for the elements in the sequence.
-
noodles.interface.
has_scheduled_methods
(cls)¶ Decorator; use this on a class for which some methods have been decorated with
schedule()
or schedule_hint()
. Those methods are then tagged with the attribute __member_of__
, so that we may serialise and retrieve the correct method. This should be considered a patch to a flaw in the Python object model.
-
noodles.interface.
unwrap
(f)¶ Safely obtain the inner function of a previously wrapped (or decorated) function. This either returns
f.__wrapped__
or just f
if f has no such attribute.
-
noodles.interface.
update_hints
(obj, data)¶ Update the hints on the root-node of a workflow. Usually, schedule hints are fixed per function. Sometimes a user may want to set hints manually on a specific promised object.
update_hints()
uses the update
method on the hints dictionary with data
as its argument.
Parameters: - obj – a
PromisedObject
.
- data – a
dict
containing additional hints.
The hints are modified, in place, on the node. All workflows that contain the node are affected.
-
noodles.interface.
lift
(obj, memo=None)¶ Make a promise out of object
obj
, where obj
may contain promises internally.
Parameters: - obj – Any object.
- memo – used for internal caching (similar to
deepcopy()
).
If the object is a
PromisedObject
, or pass-by-value (str
, int
, float
, complex
), it is returned as is.
If the object’s
id
has an entry in memo
, the value from memo
is returned.
If the object has a method
__lift__
, it is used to get the promise. __lift__
should take one additional argument for the memo
dictionary, entirely analogous to deepcopy()
.
If the object is an instance of one of the basic container types (list, dictionary, tuple and set), we use the analogous function (
make_list()
, make_dict()
, make_tuple()
, and make_set()
) to promise their counterparts, should these objects contain any promises. First, we map all items in the container through lift()
, then check the result for any promises. Note that in the case of dictionaries, we lift all the items (i.e. the list of key/value tuples) and then construct a new dictionary.
If the object is an instance of a subclass of any of the basic container types, the
__dict__
of the object is lifted as well as the object cast to its base type. We then use set_dict()
to set the __dict__
of the new promise. Again, if the object did not contain any promises, we return it without change.
Otherwise, we lift the
__dict__
and create a promise of a new object of the same class as the input, using create_object()
. This works fine for what we call reasonable objects. Since calling lift()
is an explicit action, we do not require reasonable objects to be derived from Reasonable
as we do with serialisation, where such a default behaviour could lead to inexplicable bugs.
-
noodles.interface.
failed
(obj)¶ Returns True if
obj
is an instance of Fail
.
-
class
noodles.interface.
PromisedObject
(workflow)¶ Wraps a
Workflow
. The workflow represents the future promise of a Python object. By wrapping the workflow, we can mock the behaviour of this future object and schedule methods that the user calls on it as if the object already existed.
-
class
noodles.interface.
Quote
(promise)¶ Quote objects store the contents of a workflow, allowing the workflow to be passed as an argument to a higher order function without its contents being evaluated. Don’t use this object, rather use the functions
quote()
and unquote()
.
-
noodles.interface.
quote
(promise)¶ Quote a promise.
-
noodles.interface.
unquote
(quoted)¶ Unquote a quoted promise.
-
noodles.interface.
result
(obj)¶ Results are stored on the nodes in the workflow at run time. This function can be used to get at a result of a node in a workflow after run time. This is not a recommended way of getting at results, but can help with debugging.
-
noodles.interface.
maybe
(func)¶ Calls
func
in a try/except block, returning a Fail
object if the call fails in any way. If any of the arguments to the call are Fail objects, the call is not attempted.
-
class
noodles.interface.
Fail
(func, fails=None, exception=None)¶ Signifies a failure in a computation that was wrapped by a
@maybe
decorator. Because Noodles runs all functions from the same context, it is not possible to use Python stack traces to find out where an error happened. Instead, we use a Fail
object to store information about exceptions and the subsequent continuation of the failure.
-
add_call
(func)¶ Add a call to the trace.
-
is_root_cause
¶ If the field
exception
is set in this object, it means that we are looking at the root cause of the failure.
-
-
noodles.interface.
simple_lift
(obj)¶ (scheduled) Create a promise from a plain object.
Runners
-
class
noodles.run.scheduler.
Scheduler
(verbose=False, error_handler=None, job_keeper=None)¶ Schedules jobs, receives results, then schedules more jobs as they become ready to compute. This class communicates with a pool of workers by means of coroutines.
-
run
(connection: noodles.lib.connection.Connection, master: noodles.workflow.model.Workflow)¶ Run a workflow.
Parameters: - connection (Connection) – A connection giving a sink to the job-queue and a source yielding results.
- master (Workflow) – The workflow.
-
-
noodles.run.hybrid.
hybrid_coroutine_worker
(selector, workers)¶ Runs a set of workers, all of them in the main thread. This runner is here for testing purposes.
Parameters: - selector (function) – A function returning a worker key, given a job.
- workers (dict) – A dict of workers.
-
noodles.run.hybrid.
hybrid_threaded_worker
(selector, workers)¶ Runs a set of workers, each in a separate thread.
Parameters: - selector – A function that takes a hints-tuple and returns a key
indexing a worker in the
workers
dictionary.
- workers – A dictionary of workers.
Returns: A connection for the scheduler.
Return type:
The hybrid worker dispatches jobs to the different workers based on the information contained in the hints. If no hints were given, the job is run in the main thread.
Dispatching is done in the main thread. Retrieving results is done in a separate thread for each worker. In this design it is assumed that dispatching a job takes little time, while waiting for one to return a result may take a long time.
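The following synchronous sketch shows the dispatch rule without the threads that the real runner uses. Here the hints are a dict with a hypothetical target key, although the text above describes a hints-tuple. The selector maps the hints to a key in the workers dictionary, and jobs without hints run in place.

```python
from typing import Callable, Dict, Optional

Job = Callable[[], object]


def make_dispatcher(selector: Callable[[dict], str],
                    workers: Dict[str, Callable[[Job], object]]):
    """Route each job to the worker that `selector` picks from the job's hints;
    jobs without hints are run directly."""
    def dispatch(job: Job, hints: Optional[dict] = None):
        if not hints:
            return job()
        return workers[selector(hints)](job)
    return dispatch


def recording_worker(name, log):
    """Hypothetical worker that notes its name and then runs the job."""
    def run(job):
        log.append(name)
        return job()
    return run


log = []
workers = {"gpu": recording_worker("gpu", log), "io": recording_worker("io", log)}
dispatch = make_dispatcher(lambda hints: hints["target"], workers)

a = dispatch(lambda: 1 + 1, {"target": "gpu"})
b = dispatch(lambda: "local")          # no hints: run in place
```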
-
noodles.run.hybrid.
run_hybrid
(wf, selector, workers)¶ Returns the result of evaluating the workflow; runs through several supplied workers in as many threads.
Parameters: - wf (
Workflow
or PromisedObject
) – Workflow to compute.
- selector – A function selecting the worker that should be run, given a hint.
- workers – A dictionary of workers
Returns: result of running the workflow
Serialisation
-
noodles.serial.
pickle
()¶ Returns a serialisation registry that “just pickles everything”.
This registry can be used to bolt-on other registries and keep the pickle as the default. The objects are first pickled to a byte-array, which is subsequently encoded with base64.
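The pickle-then-base64 step amounts to the standard-library round trip below. The helper names are hypothetical. Base64 turns the pickled bytes into ASCII text that can be embedded in a JSON record. As with any use of pickle, only decode data that comes from a trusted source.

```python
import base64
import pickle


def encode_pickled(obj) -> str:
    """Pickle to bytes, then base64-encode so the result is plain ASCII text."""
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")


def decode_pickled(text: str):
    """Reverse the two steps: base64-decode, then unpickle."""
    return pickle.loads(base64.b64decode(text))


original = {"values": [1, 2, 3], "shape": (3,)}
text = encode_pickled(original)
restored = decode_pickled(text)
```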
-
noodles.serial.
base
()¶ Returns the Noodles base serialisation registry.
-
class
noodles.serial.
Registry
(parent=None, types=None, hooks=None, hook_fn=None, default=None)¶ Serialisation registry, keeps a record of
Serialiser
objects.
The Registry keeps a dictionary mapping (qualified) class names to
Serialiser
objects. Given an object, the __getitem__
method looks for the highest base class that it has a serialiser for. As a fall-back we install a Serialiser matching the Python object
class.
Detection by object type is not always meaningful or even possible. Before scanning for known base classes, the look-up function passes the object through the
hook
function, which should return a string or None
. If a string is returned, that string is used to look up the serialiser.
Registries can be combined using the ‘+’ operator. The left-hand argument is then used as
parent
to the new Registry, while the right-hand argument overrides and augments the Serialisers present. The hook
functions are chained, such that the right-hand registry takes precedence. The default serialiser is inherited from the left-hand argument.
-
decode
(rec, deref=False)¶ Decode a record to return an object that could be considered equivalent to the original.
The record is not touched if
_noodles
is not an item in the record.
Parameters: - rec (dict) – A dictionary record to be decoded.
- deref (bool) – Whether to decode a RefObject. If the encoder wrote files on a remote host, reading these files will be slow, and will result in an error if a file is not present.
-
dereference
(data, host=None)¶ Dereferences RefObjects stuck in the hierarchy. This is a bit of an ugly hack.
-
encode
(obj, host=None)¶ Encode an object using the serialisers available in this registry. Objects whose type is one of [dict, list, str, int, float, bool, tuple] are returned unchanged.
A host name can be given as an additional argument to identify the host in the resulting record if the encoder yields any filenames.
This function only encodes the object one level deep.
Parameters: - obj – The object that needs encoding.
- host (str) – The name of the encoding host.
-
from_json
(data, deref=False)¶ Decode the string from JSON to return the original object (if
deref
is true). Uses the json.loads
function with self.decode
as object_hook.
Parameters: - data (str) – JSON-encoded string.
- deref (bool) – Whether to decode records that gave
ref=True
at encoding.
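The json module shows the object_hook mechanism mentioned above in isolation. The record layout below, with type and data keys next to a _noodles marker, is hypothetical. It only mimics the rule that records without _noodles are left untouched. The encoding side uses the default hook of json.dumps.

```python
import json


class Point:
    def __init__(self, x, y):
        self.x, self.y = x, y


def encode(obj):
    """json.dumps `default` hook: turn a Point into a tagged record."""
    if isinstance(obj, Point):
        return {"_noodles": True, "type": "Point", "data": [obj.x, obj.y]}
    raise TypeError(f"cannot encode {type(obj).__name__}")


def decode(rec):
    """json.loads `object_hook`: records without the marker are returned as is."""
    if "_noodles" not in rec:
        return rec
    if rec["type"] == "Point":
        return Point(*rec["data"])
    return rec


text = json.dumps({"origin": Point(0, 0), "label": "o"}, default=encode)
restored = json.loads(text, object_hook=decode)
```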
-
to_json
(obj, host=None, indent=None)¶ Recursively encode
obj
and convert it to a JSON string.
Parameters: - obj – Object to encode.
- host (str) – hostname where this object is being encoded.
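The toy model below covers the look-up rules described for this class. ToyRegistry is hypothetical, and plain strings stand in for Serialiser objects. The phrase "highest base class" is interpreted here, as an assumption, as the first match when walking the method resolution order, with object as the fall-back.

```python
def qualname(cls):
    """Qualified class name, e.g. 'builtins.int'."""
    return f"{cls.__module__}.{cls.__qualname__}"


class ToyRegistry:
    """Hypothetical miniature of the Registry look-up rules."""

    def __init__(self, types=None, hook=None, parent=None):
        self.types = dict(parent.types) if parent else {}
        self.types.update(types or {})
        self.hook = hook

    def lookup(self, obj):
        # 1. The hook may name a serialiser explicitly.
        if self.hook is not None:
            name = self.hook(obj)
            if name is not None:
                return self.types[name]
        # 2. Otherwise walk the MRO from the most derived class upwards;
        #    `object` comes last and acts as the fall-back.
        for cls in type(obj).__mro__:
            if qualname(cls) in self.types:
                return self.types[qualname(cls)]
        raise KeyError(qualname(type(obj)))

    def __add__(self, other):
        def chained(obj):
            # The right-hand hook is consulted first, so it takes precedence.
            name = other.hook(obj) if other.hook else None
            if name is None and self.hook:
                name = self.hook(obj)
            return name
        # Left-hand registry becomes the parent; right-hand types override it.
        return ToyRegistry(types=other.types, hook=chained, parent=self)


base_reg = ToyRegistry({"builtins.object": "generic", "builtins.int": "int"})
extra = ToyRegistry(
    {"builtins.bool": "bool", "path": "path"},
    hook=lambda o: "path" if isinstance(o, str) and o.startswith("/") else None,
)
combined = base_reg + extra
```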
-
-
class
noodles.serial.
Serialiser
(name='<unknown>')¶ Serialiser base class.
Serialisation classes should derive from
Serialiser
and overload the encode
and decode
methods.
Parameters: base (type) – The type that this class is supposed to serialise. This may differ from the type of the object actually being serialised if its class was derived from base
. The supposed base class is kept here for reference but serves no immediate purpose.
-
decode
(cls, data)¶ Should decode the data to an object of type ‘cls’.
Parameters: - cls (type) – The class is retrieved by the qualified name of the type of the object that was encoded; restored by importing it.
- data – The data is the record that was passed to
make_rec
by the encoder.
-
encode
(obj, make_rec)¶ Should encode an object of type
self.base
(or derived).
This method receives the object and a function
make_rec
. This function has signature: def make_rec(rec, ref=False, files=None): ...
If encoding and decoding are resource-intensive, the encoder may call make_rec with
ref=True
. The resulting record is then not decoded until it is needed by the next job. This is certainly the case when an external file has been written. In that case, the filename(s) should be passed as a list via files=[...]
.
The
files
list is not passed back to the decoder. Rather, it is used by Noodles to keep track of written files and to copy them between hosts if needed. It is the responsibility of the encoder to include the filename information in the passed record as well.
Parameters: - obj – Object to be encoded.
- make_rec – Function used to pack the encoded data with some meta-data.
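The sketch below shows a serialiser that follows this encode/decode contract. The minimal Serialiser base class, SerPathLike and make_rec are hypothetical local stand-ins, not imports from noodles.serial. Here make_rec simply bundles the data with its meta-data. Because the files list never reaches the decoder, the path is also stored in the record itself.

```python
from pathlib import Path


class Serialiser:
    """Hypothetical minimal base class mirroring the encode/decode contract."""

    def __init__(self, base):
        self.base = base

    def encode(self, obj, make_rec):
        raise NotImplementedError

    def decode(self, cls, data):
        raise NotImplementedError


class SerPathLike(Serialiser):
    """Encodes a path as a string and declares it as a written file."""

    def __init__(self):
        super().__init__(Path)

    def encode(self, obj, make_rec):
        # ref=True: decode lazily; files=[...]: let the runtime track the file.
        return make_rec(str(obj), ref=True, files=[str(obj)])

    def decode(self, cls, data):
        return cls(data)


def make_rec(rec, ref=False, files=None):
    """Hypothetical make_rec: wrap the data together with some meta-data."""
    return {"data": rec, "ref": ref, "files": files or []}


ser = SerPathLike()
record = ser.encode(Path("output.txt"), make_rec)
restored = ser.decode(Path, record["data"])
```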
-
-
class
noodles.serial.
SerPath
¶ -
decode
(cls, data)¶ Should decode the data to an object of type ‘cls’.
Parameters: - cls (type) – The class is retrieved by the qualified name of the type of the object that was encoded; restored by importing it.
- data – The data is the record that was passed to
make_rec
by the encoder.
-
encode
(obj, make_rec)¶ Should encode an object of type
self.base
(or derived).
This method receives the object and a function
make_rec
. This function has signature: def make_rec(rec, ref=False, files=None): ...
If encoding and decoding are resource-intensive, the encoder may call make_rec with
ref=True
. The resulting record is then not decoded until it is needed by the next job. This is certainly the case when an external file has been written. In that case, the filename(s) should be passed as a list via files=[...]
.
The
files
list is not passed back to the decoder. Rather, it is used by Noodles to keep track of written files and to copy them between hosts if needed. It is the responsibility of the encoder to include the filename information in the passed record as well.
Parameters: - obj – Object to be encoded.
- make_rec – Function used to pack the encoded data with some meta-data.
-
-
class
noodles.serial.
RefObject
(rec)¶ Placeholder object to delay decoding a serialised object until needed by a worker.
-
class
noodles.serial.
AsDict
(cls)¶ -
decode
(cls, data)¶ Should decode the data to an object of type ‘cls’.
Parameters: - cls (type) – The class is retrieved by the qualified name of the type of the object that was encoded; restored by importing it.
- data – The data is the record that was passed to
make_rec
by the encoder.
-
encode
(obj, make_rec)¶ Should encode an object of type
self.base
(or derived).
This method receives the object and a function
make_rec
. This function has signature: def make_rec(rec, ref=False, files=None): ...
If encoding and decoding are resource-intensive, the encoder may call make_rec with
ref=True
. The resulting record is then not decoded until it is needed by the next job. This is certainly the case when an external file has been written. In that case, the filename(s) should be passed as a list via files=[...]
.
The
files
list is not passed back to the decoder. Rather, it is used by Noodles to keep track of written files and to copy them between hosts if needed. It is the responsibility of the encoder to include the filename information in the passed record as well.
Parameters: - obj – Object to be encoded.
- make_rec – Function used to pack the encoded data with some meta-data.
-
-
class
noodles.serial.
Reasonable
¶ A Reasonable object is an object which is most reasonably serialised using its
__dict__
property. To deserialise the object, we first create an instance using the __new__
method, then set the __dict__
property manually. This class is empty; it is used as a tag to designate other objects as reasonable.
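The __new__ plus __dict__ strategy can be shown in plain Python. Reasonable is redefined locally as an empty tag class, and Measurement is hypothetical. The counter shows that __init__ does not run again during decoding.

```python
class Reasonable:
    """Local stand-in for the empty tag class described above."""


class Measurement(Reasonable):
    init_calls = 0   # counts how often __init__ runs

    def __init__(self, value, unit):
        Measurement.init_calls += 1
        self.value = value
        self.unit = unit


def encode_reasonable(obj):
    """Serialise by copying the instance __dict__."""
    return dict(obj.__dict__)


def decode_reasonable(cls, data):
    """Create a bare instance with __new__ (skipping __init__), then restore __dict__."""
    obj = cls.__new__(cls)
    obj.__dict__.update(data)
    return obj


m = Measurement(3.5, "m")
data = encode_reasonable(m)
restored = decode_reasonable(Measurement, data)
```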
-
class
noodles.serial.registry.
RefObject
(rec)¶ Placeholder object to delay decoding a serialised object until needed by a worker.
-
class
noodles.serial.registry.
Registry
(parent=None, types=None, hooks=None, hook_fn=None, default=None)¶ Serialisation registry, keeps a record of
Serialiser
objects.
The Registry keeps a dictionary mapping (qualified) class names to
Serialiser
objects. Given an object, the __getitem__
method looks for the highest base class that it has a serialiser for. As a fall-back we install a Serialiser matching the Python object
class.
Detection by object type is not always meaningful or even possible. Before scanning for known base classes, the look-up function passes the object through the
hook
function, which should return a string or None
. If a string is returned, that string is used to look up the serialiser.
Registries can be combined using the ‘+’ operator. The left-hand argument is then used as
parent
to the new Registry, while the right-hand argument overrides and augments the Serialisers present. The hook
functions are chained, such that the right-hand registry takes precedence. The default serialiser is inherited from the left-hand argument.
-
decode
(rec, deref=False)¶ Decode a record to return an object that could be considered equivalent to the original.
The record is not touched if
_noodles
is not an item in the record.
Parameters: - rec (dict) – A dictionary record to be decoded.
- deref (bool) – Whether to decode a RefObject. If the encoder wrote files on a remote host, reading these files will be slow, and will result in an error if a file is not present.
-
dereference
(data, host=None)¶ Dereferences RefObjects stuck in the hierarchy. This is a bit of an ugly hack.
-
encode
(obj, host=None)¶ Encode an object using the serialisers available in this registry. Objects whose type is one of [dict, list, str, int, float, bool, tuple] are returned unchanged.
A host name can be given as an additional argument to identify the host in the resulting record if the encoder yields any filenames.
This function only encodes the object one level deep.
Parameters: - obj – The object that needs encoding.
- host (str) – The name of the encoding host.
-
from_json
(data, deref=False)¶ Decode the string from JSON to return the original object (if
deref
is true). Uses the json.loads
function with self.decode
as object_hook.
Parameters: - data (str) – JSON-encoded string.
- deref (bool) – Whether to decode records that gave
ref=True
at encoding.
-
to_json
(obj, host=None, indent=None)¶ Recursively encode
obj
and convert it to a JSON string.
Parameters: - obj – Object to encode.
- host (str) – hostname where this object is being encoded.
-
-
class
noodles.serial.registry.
SerUnknown
(name='<unknown>')¶ -
decode
(cls, data)¶ Should decode the data to an object of type ‘cls’.
Parameters: - cls (type) – The class is retrieved by the qualified name of the type of the object that was encoded; restored by importing it.
- data – The data is the record that was passed to
make_rec
by the encoder.
-
encode
(obj, make_rec)¶ Should encode an object of type
self.base
(or derived).
This method receives the object and a function
make_rec
. This function has signature: def make_rec(rec, ref=False, files=None): ...
If encoding and decoding are resource-intensive, the encoder may call make_rec with
ref=True
. The resulting record is then not decoded until it is needed by the next job. This is certainly the case when an external file has been written. In that case, the filename(s) should be passed as a list via files=[...]
.
The
files
list is not passed back to the decoder. Rather, it is used by Noodles to keep track of written files and to copy them between hosts if needed. It is the responsibility of the encoder to include the filename information in the passed record as well.
Parameters: - obj – Object to be encoded.
- make_rec – Function used to pack the encoded data with some meta-data.
-
-
class
noodles.serial.registry.
Serialiser
(name='<unknown>')¶ Serialiser base class.
Serialisation classes should derive from
Serialiser
and overload the encode
and decode
methods.
Parameters: base (type) – The type that this class is supposed to serialise. This may differ from the type of the object actually being serialised if its class was derived from base
. The supposed base class is kept here for reference but serves no immediate purpose.
-
decode
(cls, data)¶ Should decode the data to an object of type ‘cls’.
Parameters: - cls (type) – The class is retrieved by the qualified name of the type of the object that was encoded; restored by importing it.
- data – The data is the record that was passed to
make_rec
by the encoder.
encode(obj, make_rec)
Should encode an object of type self.base (or derived).
This method receives the object and a function make_rec. This function has signature:
```python
def make_rec(rec, ref=False, files=None): ...
```
If encoding and decoding consume significant resources, the encoder may call make_rec with ref=True. The resulting record then won’t be decoded until it is needed by the next job. This is especially true when an external file was written. In that case the filename(s) should be passed as a list with files=[...].
The files list is not passed back to the decoder. Rather, noodles uses it to keep track of written files and to copy them between hosts if needed. It is the responsibility of the encoder to also include the filename information in the record itself.
Parameters:
- obj – Object to be encoded.
- make_rec – Function used to pack the encoded data with some meta-data.
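Here is a minimal sketch of a serialiser written against the encode/decode contract above. It does not derive from the real Serialiser class, so it runs without noodles installed. The SerFraction class and the stand-in make_rec are hypothetical.

```python
from fractions import Fraction


def make_rec(rec, ref=False, files=None):
    # Hypothetical stand-in for the make_rec function that noodles passes
    # to encode(); here it just bundles the record with its meta-data.
    return {"data": rec, "ref": ref, "files": files or []}


class SerFraction:
    """Hypothetical serialiser following the encode/decode interface."""

    def __init__(self):
        self.base = Fraction

    def encode(self, obj, make_rec):
        # Pack the object into JSON-compatible data.
        return make_rec([obj.numerator, obj.denominator])

    def decode(self, cls, data):
        # `data` is the record that encode() passed to make_rec.
        return cls(*data)


ser = SerFraction()
rec = ser.encode(Fraction(3, 4), make_rec)
restored = ser.decode(Fraction, rec["data"])
print(rec)       # {'data': [3, 4], 'ref': False, 'files': []}
print(restored)  # 3/4
```

An encoder that writes an external file would instead call something like make_rec({'filename': path}, ref=True, files=[path]). This keeps the filename in the record itself, as required above.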
Worker executable
Streams
Coroutine streaming module
Note
In a break with tradition, some classes in this module have lower case names because they tend to be used as function decorators.
We use coroutines to communicate messages between different components in the Noodles runtime. Coroutines can have input or output in two ways: passive and active. An example:
```python
def f_pulls(coroutine):
    for msg in coroutine:
        print(msg)


def g_produces(lines):
    for l in lines:
        yield l  # yield each word, not the whole list


lines = ['aap', 'noot', 'mies']
f_pulls(g_produces(lines))
```
This prints the words ‘aap’, ‘noot’ and ‘mies’. The same program can also be written so that the coroutine is the one receiving messages:
```python
def f_receives():
    while True:
        msg = yield
        print(msg)


def g_pushes(coroutine, lines):
    for l in lines:
        coroutine.send(l)


sink = f_receives()
sink.send(None)  # the coroutine needs to be initialised
# alternatively, next(sink) does the same as sink.send(None)
g_pushes(sink, lines)
```
Creating a coroutine and advancing it to its first yield statement can be done by a small decorator:
```python
from functools import wraps


def coroutine(f):
    @wraps(f)
    def g(*args, **kwargs):
        sink = f(*args, **kwargs)
        sink.send(None)  # advance to the first yield
        return sink
    return g
```
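With the decorator in place, a sink no longer needs to be primed by hand. The collect coroutine below is a hypothetical example. The decorator is repeated so that the snippet runs on its own.

```python
from functools import wraps


def coroutine(f):
    @wraps(f)
    def g(*args, **kwargs):
        sink = f(*args, **kwargs)
        sink.send(None)  # advance to the first `yield`
        return sink
    return g


@coroutine
def collect(out):
    # Append every message sent into this coroutine to the list `out`.
    while True:
        msg = yield
        out.append(msg)


received = []
sink = collect(received)  # already primed, no sink.send(None) needed
for word in ['aap', 'noot', 'mies']:
    sink.send(word)
print(received)  # ['aap', 'noot', 'mies']
```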
Pull and push
The pull and push classes capture the idea of pulling and pushing coroutines, wrapping them in an object. These objects can then be chained using the >> operator. Example:
```python
>>> from noodles.lib import (pull_map, pull_from)
>>> @pull_map
... def square(x):
...     return x*x
...
>>> squares = pull_from(range(10)) >> square
>>> list(squares)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```
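The chaining can be mimicked in a few lines. The MiniPull class and the mini_pull_from and mini_pull_map helpers are hypothetical simplifications of pull, pull_from and pull_map. In this sketch, a stage is a function that takes a zero-argument thunk returning an iterable, and a >> b feeds the output of a into b.

```python
class MiniPull:
    """Hypothetical, simplified stand-in for noodles' pull objects."""

    def __init__(self, fn):
        # fn takes a thunk `source` and returns an iterable.
        self.fn = fn

    def __call__(self, source):
        return self.fn(source)

    def __rshift__(self, other):
        # a >> b: b pulls from whatever a produces.
        return MiniPull(lambda source: other(lambda: self(source)))

    def __iter__(self):
        # Run the chain with an empty upstream source.
        return iter(self(lambda: iter(())))


def mini_pull_from(iterable):
    # A pure source: ignores its upstream and yields from `iterable`.
    return MiniPull(lambda source: iter(iterable))


def mini_pull_map(f):
    return MiniPull(lambda source: map(f, source()))


square = mini_pull_map(lambda x: x * x)
squares = mini_pull_from(range(10)) >> square
print(list(squares))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```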
Queues
Queues in Python are thread-safe objects. We can define a new Queue object that uses Python's queue.Queue to buffer and distribute messages over several threads:
```python
import queue


class Queue(object):
    def __init__(self):
        self._q = queue.Queue()

    def source(self):
        while True:
            msg = self._q.get()
            yield msg
            self._q.task_done()

    @coroutine  # the decorator defined above
    def sink(self):
        while True:
            msg = yield
            self._q.put(msg)

    def wait(self):
        self._q.join()
```
Note that both ends of the queue are, as we call it, passive. We could make an active source (it would become a normal function) that takes a callback as an argument. However, the Noodles runtime is designed to make it easy to interleave functionality. Moreover, the Queue object is only concerned with the state of its own queue. The outside universe is only represented by the yield statements, thus preserving the principle of encapsulation.
noodles.lib.decorator(f)
Creates a parametric decorator from a function. The resulting decorator optionally takes keyword arguments.
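The sketch below shows one way such a decorator factory could work. It is an assumption about the behaviour described above, not the noodles source. In this version, the wrapped function receives the decorated function as its first argument plus any keyword arguments. The result can be used either bare (@tag) or with keywords (@tag(label=...)). The tag decorator is hypothetical.

```python
from functools import wraps


def decorator(f):
    """Sketch: make f(func, **kwargs) usable as @deco or @deco(**kwargs)."""
    @wraps(f)
    def wrapper(*args, **kwargs):
        if len(args) == 1 and callable(args[0]) and not kwargs:
            # Used bare: @deco
            return f(args[0])
        if args:
            raise TypeError("only keyword arguments are accepted")
        # Used with keywords: @deco(key=value)
        return lambda func: f(func, **kwargs)
    return wrapper


@decorator
def tag(func, label="default"):
    # Hypothetical decorator that attaches a label to a function.
    func.label = label
    return func


@tag
def a():
    pass


@tag(label="custom")
def b():
    pass


print(a.label, b.label)  # default custom
```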
noodles.lib.coroutine(f)
A sink should first be sent None, so that the coroutine arrives at the yield position. This wrapper ensures that this happens automatically when the coroutine is started.
class noodles.lib.stream
Base class for pull and push coroutines.
class noodles.lib.pull(fn)
A pull coroutine pulls from a source, yielding values. pull objects can be chained using the >> operator.
A pull object acts as a function of one argument, being the source that the coroutine will pull from. This source argument must always be a thunk (function of zero arguments) returning an iterable.
class noodles.lib.push(fn, dont_wrap=False)
A push coroutine pushes to a sink, receiving values through yield statements. push objects can be chained using the >> operator.
A push object acts as a function of one argument, being the sink that the coroutine will send to. This sink argument must always be a thunk (function of zero arguments) returning a coroutine.
class noodles.lib.pull_map(f)
A pull_map decorates a function of a single argument, turning it into a pull object. The resulting pull object pulls objects from a source, yielding values mapped through the given function. This is equivalent to:
```python
@pull
def g(source):
    yield from map(f, source())
```
where f is the function being decorated.
The >> operator on this class is optimised such that only a single loop is created when chained with another pull_map. Also, a pull_map may be chained to a function directly, including the given function in the loop.
class noodles.lib.push_map(f)
A push_map decorates a function of a single argument, turning it into a push object. The resulting push object receives values through yield and sends them on after mapping them through the given function. This is equivalent to:
```python
@push
def g(sink):
    sink = sink()
    while True:
        x = yield
        sink.send(f(x))
```
where f is the function being decorated.
The >> operator on this class is optimised such that only a single loop is created when chained with another push_map. Also, a push_map may be chained to a function directly, including the given function in the loop.
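The push side can be sketched with plain generator-based coroutines. Here mapping_sink is a hypothetical stand-in for a push_map stage: it receives a value through yield and sends f(value) on. Chaining two stages gives the same result as a single stage with the composed function, which is roughly what the single-loop optimisation above achieves.

```python
from functools import wraps


def coroutine(f):
    @wraps(f)
    def g(*args, **kwargs):
        sink = f(*args, **kwargs)
        sink.send(None)
        return sink
    return g


@coroutine
def mapping_sink(f, sink):
    # Receive x through `yield`, then send f(x) to the next sink.
    while True:
        x = yield
        sink.send(f(x))


@coroutine
def collector(out):
    # End point of the chain: store everything it receives.
    while True:
        out.append((yield))


def inc(x):
    return x + 1


def double(x):
    return x * 2


chained, fused = [], []
two_stages = mapping_sink(inc, mapping_sink(double, collector(chained)))
one_stage = mapping_sink(lambda x: double(inc(x)), collector(fused))
for x in range(3):
    two_stages.send(x)
    one_stage.send(x)
print(chained, fused)  # [2, 4, 6] [2, 4, 6]
```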
noodles.lib.sink_map(f)
The sink_map() decorator creates a push object from a function that returns no values. The resulting sink can only be used as an end point of a chain. Equivalent code:
```python
@push
def sink():
    while True:
        x = yield
        f(x)
```
noodles.lib.broadcast(*sinks_)
The broadcast() decorator creates a push object that receives a message by yield and then sends this message on to all the given sinks.
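The following is a minimal sketch of the broadcast pattern using generator coroutines. broadcast_sketch and collector are hypothetical. The real broadcast() creates a push object; this version takes already-primed coroutines as sinks, which may differ from what the noodles function expects.

```python
from functools import wraps


def coroutine(f):
    @wraps(f)
    def g(*args, **kwargs):
        sink = f(*args, **kwargs)
        sink.send(None)
        return sink
    return g


@coroutine
def collector(out):
    while True:
        out.append((yield))


@coroutine
def broadcast_sketch(*sinks):
    # Receive one message, then forward it to every sink.
    while True:
        msg = yield
        for s in sinks:
            s.send(msg)


left, right = [], []
b = broadcast_sketch(collector(left), collector(right))
for word in ['aap', 'noot']:
    b.send(word)
print(left, right)  # ['aap', 'noot'] ['aap', 'noot']
```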
noodles.lib.branch(*sinks_)
The branch() decorator creates a pull object that pulls from a single source and then sends to all the given sinks. After all the sinks have received the message, it is yielded.
noodles.lib.patch(source, sink)
Create a direct link between a source and a sink.
Implementation:
```python
sink = sink()
for value in source():
    sink.send(value)
```
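Below is a runnable version of the implementation shown above, with a hypothetical collector sink. Both arguments are thunks. Here, source() returns an iterable and sink() returns a primed coroutine, since patch itself does not prime the sink.

```python
from functools import wraps


def coroutine(f):
    @wraps(f)
    def g(*args, **kwargs):
        sink = f(*args, **kwargs)
        sink.send(None)
        return sink
    return g


def patch(source, sink):
    # Call both thunks, then push every value from the source into the sink.
    sink = sink()
    for value in source():
        sink.send(value)


@coroutine
def collector(out):
    while True:
        out.append((yield))


received = []
patch(lambda: iter(range(3)), lambda: collector(received))
print(received)  # [0, 1, 2]
```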
noodles.lib.pull_from(iterable)
Creates a pull object from an iterable.
Parameters: iterable (Iterable) – an iterable object.
Return type: pull
Equivalent to:
```python
pull(lambda: iter(iterable))
```
noodles.lib.push_from(iterable)
Creates a push object from an iterable. The resulting function is not a coroutine, but can be chained to another push.
Parameters: iterable (Iterable) – an iterable object.
Return type: push
class noodles.lib.Connection(source, sink, aux=None)
Combine a source and a sink. These should represent the IO of some object, probably a worker. In this case the source is a coroutine generating results, while the sink needs to be fed jobs.

setup()
Activate the source and sink functions and return them in that order.
Returns: source, sink
Return type: tuple
class noodles.lib.Queue(end_of_queue=<class 'noodles.lib.queue.EndOfQueue'>)
A Queue object hides a queue.Queue object behind a source and sink interface.
sink
Receives items that are put on the queue. Pushing the end-of-queue message through the sink will put it on the queue, and will also result in a StopIteration exception being raised.
source
Pull items from the queue. When end-of-queue is encountered, the generator returns after re-inserting the end-of-queue message on the queue for other sources to pick up. This way, if many threads are pulling from this queue, they all get the end-of-queue message.
close()
Sends the end_of_queue message to the queue. Doesn’t stop running sinks.
flush()
Erases the queue and sets the end-of-queue message.
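The end-of-queue behaviour of the source can be illustrated with the standard queue module. EoqQueue and its put method are hypothetical simplifications; the real class exposes a sink coroutine instead of put. The key point is that each source re-inserts the end-of-queue marker, so every thread pulling from the queue stops.

```python
import queue
import threading


class EndOfQueue:
    """Sentinel class, standing in for noodles.lib.queue.EndOfQueue."""


class EoqQueue:
    def __init__(self, end_of_queue=EndOfQueue):
        self._q = queue.Queue()
        self._eoq = end_of_queue

    def put(self, item):
        self._q.put(item)

    def close(self):
        # Put the end-of-queue marker on the queue.
        self._q.put(self._eoq)

    def source(self):
        while True:
            item = self._q.get()
            if item is self._eoq:
                # Re-insert the marker so other sources also stop.
                self._q.put(self._eoq)
                return
            yield item


q = EoqQueue()
for i in range(10):
    q.put(i)
q.close()

results = []
lock = threading.Lock()


def consume():
    for item in q.source():
        with lock:
            results.append(item)


threads = [threading.Thread(target=consume) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(sorted(results))  # every item 0..9 exactly once
```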
noodles.lib.thread_pool(*workers, results=None, end_of_queue=<class 'noodles.lib.queue.EndOfQueue'>)
Returns a pull object, call it r, starting a thread for each given worker. Each thread pulls from the source that r is connected to, and the returned results are pushed to a Queue. r yields from the other end of the same Queue.
The target function for each thread is patch(), which can be stopped by exhausting the source.
If all threads have ended, the result queue receives end-of-queue.
Parameters:
- results (Connection) – If results should go somewhere other than a newly constructed Queue, a different Connection object can be given.
- end_of_queue – end-of-queue signal object passed on to the creation of the Queue object.
Return type: pull
noodles.lib.thread_counter(finalize)
Modifies a thread target function such that the number of active threads is counted. If the count reaches zero, a finalizer is called.
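The sketch below builds such a counter from a lock and a closure. The structure, a function that returns a target modifier, is an assumption and is not taken from the noodles source. The example uses a threading.Barrier so that all threads are running before any of them finishes. Without the barrier, the count could drop to zero, and finalize could run, more than once.

```python
import threading
from functools import wraps


def thread_counter(finalize):
    """Sketch: count threads running a wrapped target; call `finalize`
    when the count drops back to zero."""
    n_threads = 0
    lock = threading.Lock()

    def target_modifier(target):
        @wraps(target)
        def modified_target(*args, **kwargs):
            nonlocal n_threads
            with lock:
                n_threads += 1
            try:
                return target(*args, **kwargs)
            finally:
                with lock:
                    n_threads -= 1
                    last = n_threads == 0
                if last:
                    finalize()
        return modified_target

    return target_modifier


calls = []
counted = thread_counter(lambda: calls.append("finalized"))
barrier = threading.Barrier(3)


@counted
def work():
    barrier.wait()  # make sure all three threads are counted first


threads = [threading.Thread(target=work) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(calls)  # ['finalized']
```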
noodles.lib.object_name(obj)
Get the qualified name of an object. This will obtain both the module name from __module__ and the object name from __name__, and concatenate those with a ‘.’. Examples:
```python
>>> from math import sin
>>> object_name(sin)
'math.sin'
```
```python
>>> def f(x):
...     return x*x
...
>>> object_name(f)
'__main__.f'
```
To have a qualified name, an object must be defined as a class or function in a module (__main__ is also a module). A normal instantiated object does not have a qualified name, even if it is defined and importable from a module. Calling object_name() on such an object will raise AttributeError.
noodles.lib.look_up(name)
Obtain an object from a qualified name. Example:
```python
>>> look_up('math.sin')
<built-in function sin>
```
This function should be considered the reverse of object_name().
noodles.lib.importable(obj)
Check if an object can be serialised as a qualified name. This is done by checking that look_up(object_name(obj)) gives back the same object.
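The three functions above can be approximated in a few lines with importlib. This is a sketch, not the noodles code. This look_up only handles names of the form module.attribute, with no nested attributes. This importable treats any lookup error as "not importable".

```python
import importlib
import math


def object_name(obj):
    # Raises AttributeError for objects without __module__ or __name__.
    return obj.__module__ + '.' + obj.__name__


def look_up(name):
    # Split 'package.module.attr' into the module path and the attribute.
    module_name, _, attr = name.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, attr)


def importable(obj):
    try:
        return look_up(object_name(obj)) is obj
    except (AttributeError, TypeError, ImportError, ValueError):
        return False


print(object_name(math.sin))                  # math.sin
print(look_up('math.sin') is math.sin)        # True
print(importable(math.sin), importable(3.0))  # True False
```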
noodles.lib.deep_map(f, root)
Sibling to inverse_deep_map(). As map() maps over an iterable, deep_map() maps over a structure of nested dicts and lists. Every object is passed through f recursively. That is, first root is mapped, next any object contained in its result, and so on.
No distinction is made between tuples and lists. This function was created with encoding to JSON compatible data in mind.
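Here is a minimal sketch of deep_map that follows the description above. It applies f to a node first and then recurses into the result, so tuples come out as lists. The encode function and the "_type" tagging scheme are hypothetical.

```python
import json


def deep_map(f, root):
    # Map the node first, then recurse into whatever f returned.
    result = f(root)
    if isinstance(result, dict):
        return {k: deep_map(f, v) for k, v in result.items()}
    if isinstance(result, (list, tuple)):
        return [deep_map(f, v) for v in result]
    return result


def encode(obj):
    # Hypothetical encoder for two types JSON cannot represent.
    if isinstance(obj, complex):
        return {"_type": "complex", "re": obj.real, "im": obj.imag}
    if isinstance(obj, set):
        return sorted(obj)
    return obj


data = {"z": 1 + 2j, "s": {3, 1, 2}, "t": (4, 5)}
encoded = deep_map(encode, data)
print(json.dumps(encoded))
# {"z": {"_type": "complex", "re": 1.0, "im": 2.0}, "s": [1, 2, 3], "t": [4, 5]}
```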
noodles.lib.inverse_deep_map(f, root)
Sibling to deep_map(). Recursively maps objects in a nested structure of list and dict objects. Where deep_map() starts at the top, inverse_deep_map() starts at the bottom. First, if root is a list or dict, inverse_deep_map() is applied to its contents. Then, at the end, the entire object is passed through f.
This function was created with decoding from JSON compatible data in mind.
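Here is the matching bottom-up sketch. Children are mapped before their parent, so a decoder only ever sees containers whose contents are already decoded. The decode function and the "_type" record layout are hypothetical.

```python
import json


def inverse_deep_map(f, root):
    # Map the children first (bottom-up), then the node itself.
    if isinstance(root, dict):
        root = {k: inverse_deep_map(f, v) for k, v in root.items()}
    elif isinstance(root, list):
        root = [inverse_deep_map(f, v) for v in root]
    return f(root)


def decode(obj):
    # Hypothetical decoder matching a {"_type": "complex", ...} record.
    if isinstance(obj, dict) and obj.get("_type") == "complex":
        return complex(obj["re"], obj["im"])
    return obj


data = json.loads('{"z": {"_type": "complex", "re": 1.0, "im": 2.0}, "xs": [1, 2]}')
decoded = inverse_deep_map(decode, data)
print(decoded)  # {'z': (1+2j), 'xs': [1, 2]}
```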
noodles.lib.unwrap(f)
Safely obtain the inner function of a previously wrapped (or decorated) function. This either returns f.__wrapped__ or just f if the former fails.
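The following sketch implements the behaviour described above using the __wrapped__ attribute that functools.wraps sets on a wrapper. The logged decorator is hypothetical. The standard library's inspect.unwrap follows the whole chain of __wrapped__ attributes, whereas this version removes a single layer, matching the description.

```python
from functools import wraps


def unwrap(f):
    # One level of unwrapping; fall back to f itself.
    try:
        return f.__wrapped__
    except AttributeError:
        return f


def logged(func):
    @wraps(func)  # also sets wrapper.__wrapped__ = func
    def wrapper(*args, **kwargs):
        print('calling', func.__name__)
        return func(*args, **kwargs)
    return wrapper


@logged
def square(x):
    return x * x


inner = unwrap(square)
print(inner is square, inner(3))  # False 9
print(unwrap(len) is len)         # True
```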
noodles.lib.is_unwrapped(f)
If f was imported and then unwrapped, this function might return True.