Development documentation

(stub)

Noodles

noodles.schedule(f, **hints)

Decorator; schedule calls to function f into a workflow, instead of running them immediately. The decorated function returns a PromisedObject.
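The idea of deferring calls can be shown with a toy model in plain Python. This is not the Noodles implementation. The names `Promise`, `toy_schedule` and `toy_run` are hypothetical, and the real PromisedObject wraps a full Workflow graph rather than a single recorded call.

```python
from typing import Any, Callable


class Promise:
    """Hypothetical stand-in for a PromisedObject: records a call without running it."""

    def __init__(self, func: Callable, args: tuple, kwargs: dict):
        self.func = func
        self.args = args
        self.kwargs = kwargs


def toy_schedule(func: Callable) -> Callable:
    """Hypothetical decorator: calling the result returns a Promise instead of a value."""
    def wrapper(*args, **kwargs):
        return Promise(func, args, kwargs)
    wrapper.__wrapped__ = func
    return wrapper


def toy_run(obj: Any) -> Any:
    """Evaluate a Promise depth-first, resolving promised arguments first."""
    if not isinstance(obj, Promise):
        return obj
    args = [toy_run(a) for a in obj.args]
    kwargs = {k: toy_run(v) for k, v in obj.kwargs.items()}
    return obj.func(*args, **kwargs)


calls = []


@toy_schedule
def add(a, b):
    calls.append((a, b))  # records when the body actually runs
    return a + b


p = add(add(1, 2), 3)   # builds the call graph only
assert calls == []      # nothing has run yet
result = toy_run(p)     # now both calls are evaluated, inner one first
```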

noodles.schedule_hint(**hints)

Decorator; same as schedule(), with added hints. These hints can be anything.

noodles.run_single(workflow)

Run the workflow in a single thread (the same thread as the scheduler).

Parameters: workflow – Workflow or PromisedObject to be evaluated.
Returns: Evaluated result.

noodles.run_process(workflow, *, n_processes, registry, verbose=False, jobdirs=False, init=None, finish=None, deref=False)

Run the workflow using a number of new Python processes. Use this runner to test the workflow in a situation where data serialisation is needed.

Parameters:
  • workflow (Workflow or PromisedObject) – The workflow.
  • n_processes – Number of processes to start.
  • registry – The serial registry.
  • verbose – Request verbose output on the worker side.
  • jobdirs – Create a new directory for each job to prevent filename collisions (not yet implemented).
  • init – An init function that needs to be run in each process before other jobs can be run. This should be a scheduled function returning True on success.
  • finish – A function that wraps up when the worker closes down.
  • deref (bool) – Set this to True to pass the result through one more encoding and decoding step with object dereferencing turned on.
Returns:

the result of evaluating the workflow

Return type:

any

class noodles.Scheduler(verbose=False, error_handler=None, job_keeper=None)

Schedules jobs, receives results, then schedules more jobs as they become ready to compute. This class communicates with a pool of workers by means of coroutines.

run(connection: noodles.lib.connection.Connection, master: noodles.workflow.model.Workflow)

Run a workflow.

Parameters:
  • connection (Connection) – A connection giving a sink to the job-queue and a source yielding results.
  • master (Workflow) – The workflow.

noodles.has_scheduled_methods(cls)

Decorator; use this on a class for which some methods have been decorated with schedule() or schedule_hint(). Those methods are then tagged with the attribute __member_of__, so that we may serialise and retrieve the correct method. This should be considered a patch to a flaw in the Python object model.

class noodles.Fail(func, fails=None, exception=None)

Signifies a failure in a computation that was wrapped by a @maybe decorator. Because Noodles runs all functions from the same context, it is not possible to use Python stack traces to find out where an error happened. Instead we use a Fail object to store information about exceptions and the subsequent continuation of the failure.

add_call(func)

Add a call to the trace.

is_root_cause

If the field exception is set in this object, it means that we are looking at the root cause of the failure.

noodles.failed(obj)

Returns True if obj is an instance of Fail.

noodles.run_logging(wf, n_threads, display)

Adds a display to the parallel runner. Because messages come in asynchronously now, we start an extra thread just for the display routine.

noodles.run_parallel(workflow, n_threads)

Run a workflow in parallel threads.

Parameters:
  • workflow – Workflow or PromisedObject to evaluate.
  • n_threads – number of threads to use (in addition to the scheduler).
Returns:

evaluated workflow.

noodles.unwrap(f)

Safely obtain the inner function of a previously wrapped (or decorated) function. This returns f.__wrapped__, or f itself if f has no __wrapped__ attribute.
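The behaviour described above amounts to an attribute look-up with a fall-back. A minimal sketch, assuming the wrapper sets `__wrapped__` the way `functools.wraps` does; `my_unwrap` is a hypothetical name and not necessarily how Noodles writes it:

```python
import functools


def my_unwrap(f):
    """Hypothetical helper: return f.__wrapped__ if present, otherwise f itself."""
    try:
        return f.__wrapped__
    except AttributeError:
        return f


def inner(x):
    return x * 2


@functools.wraps(inner)   # sets outer.__wrapped__ = inner
def outer(x):
    return inner(x)
```

Unlike `inspect.unwrap`, which follows the whole chain of `__wrapped__` attributes, this goes only one level deep.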

noodles.gather(*a)

(scheduled) Converts a list of promises (i.e. PromisedObject) to a promised list of values.

noodles.gather_all(a)

Converts an iterator of promises into a promise of a list.

noodles.gather_dict(**kwargs)

(scheduled) Creates a promise of a dictionary.

noodles.lift(obj, memo=None)

Make a promise out of object obj, where obj may contain promises internally.

Parameters:
  • obj – Any object.
  • memo – used for internal caching (similar to deepcopy()).

If the object is a PromisedObject, or pass-by-value (str, int, float, complex) it is returned as is.

If the object’s id has an entry in memo, the value from memo is returned.

If the object has a method __lift__, it is used to get the promise. __lift__ should take one additional argument for the memo dictionary, entirely analogous to deepcopy().

If the object is an instance of one of the basic container types (list, dictionary, tuple and set), we use the analogous function (make_list(), make_dict(), make_tuple(), and make_set()) to promise their counterparts should these objects contain any promises. First, we map all items in the container through lift(), then check the result for any promises. Note that in the case of dictionaries, we lift all the items (i.e. the list of key/value tuples) and then construct a new dictionary.

If the object is an instance of a subclass of any of the basic container types, the __dict__ of the object is lifted as well as the object cast to its base type. We then use set_dict() to set the __dict__ of the new promise. Again, if the object did not contain any promises, we return it without change.

Otherwise, we lift the __dict__ and create a promise of a new object of the same class as the input, using create_object(). This works fine for what we call reasonable objects. Since calling lift() is an explicit action, we do not require reasonable objects to be derived from Reasonable as we do with serialisation, where such a default behaviour could lead to inexplicable bugs.

noodles.unpack(t, n)

Iterates over a promised sequence. The sequence should support random access through object.__getitem__(), and its length must be known beforehand.

Parameters:
  • t – a sequence.
  • n – the length of the sequence.
Returns:

an unpackable generator for the elements in the sequence.
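The length must be given because Python's tuple unpacking needs a finite iterable, while a promised sequence has no length until it is evaluated. A plain-Python sketch of the idea, assuming a hypothetical `LazySeq` whose `__getitem__` returns deferred element accessors instead of values (`unpack_sketch` is also hypothetical):

```python
class LazySeq:
    """Hypothetical stand-in for a promised sequence."""

    def __init__(self, compute):
        self._compute = compute   # zero-argument function producing the sequence

    def __getitem__(self, i):
        # Return a deferred accessor for element i instead of a value.
        return lambda: self._compute()[i]


def unpack_sketch(t, n):
    """Yield n deferred elements t[0], ..., t[n - 1]."""
    return (t[i] for i in range(n))


pair = LazySeq(lambda: ("aap", "noot"))
first, second = unpack_sketch(pair, 2)   # works although nothing is computed yet
```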

noodles.maybe(func)

Calls func in a try/except block, returning a Fail object if the call fails in any way. If any of the arguments to the call are Fail objects, the call is not attempted.
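A toy model of this behaviour, assuming a simplified `ToyFail` class that stores only the function name, the upstream failures and the exception; the real Fail class records more. Both `ToyFail` and `toy_maybe` are hypothetical names:

```python
import functools


class ToyFail:
    """Hypothetical, simplified stand-in for noodles.Fail."""

    def __init__(self, func_name, fails=None, exception=None):
        self.func_name = func_name
        self.fails = fails or []      # upstream failures that caused this one
        self.exception = exception    # set only at the root cause

    @property
    def is_root_cause(self):
        return self.exception is not None


def toy_maybe(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        fails = [a for a in list(args) + list(kwargs.values())
                 if isinstance(a, ToyFail)]
        if fails:
            # Do not attempt the call; propagate the failure instead.
            return ToyFail(func.__name__, fails=fails)
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            return ToyFail(func.__name__, exception=exc)
    return wrapper


@toy_maybe
def inverse(x):
    return 1 / x


@toy_maybe
def double(x):
    return 2 * x


bad = double(inverse(0))    # inverse raises; double is never executed
good = double(inverse(4))
```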

noodles.delay(value)

(scheduled) Creates a promise of a given value. TODO: this function should have a different name.

noodles.update_hints(obj, data)

Update the hints on the root-node of a workflow. Usually, schedule hints are fixed per function. Sometimes a user may want to set hints manually on a specific promised object. update_hints() uses the update method on the hints dictionary with data as its argument.

Parameters:
  • obj – a PromisedObject.
  • data – a dict containing additional hints.

The hints are modified, in place, on the node. All workflows that contain the node are affected.
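Because the hints dictionary is updated in place, every workflow holding a reference to the node sees the change. A plain-Python illustration, assuming a hypothetical `ToyNode` with a `hints` dict; it only demonstrates the `dict.update` semantics on a shared object, not the Noodles data structures:

```python
class ToyNode:
    """Hypothetical node holding a hints dictionary."""

    def __init__(self, hints):
        self.hints = hints


node = ToyNode({"display": "computing"})
workflow_a = {"root": node}     # two "workflows" sharing the same node object
workflow_b = {"nodes": [node]}

# Analogous to update_hints(obj, data): dict.update on the node's hints.
node.hints.update({"display": "recomputing", "priority": 1})
```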

noodles.quote(promise)

Quote a promise.

noodles.unquote(quoted)

Unquote a quoted promise.

noodles.result(obj)

Results are stored on the nodes in the workflow at run time. This function can be used to get at a result of a node in a workflow after run time. This is not a recommended way of getting at results, but can help with debugging.

noodles.fold(fun: Callable, state: Any, xs: Iterable)

(scheduled) Traverse an iterable object while performing stateful computations with the elements. It returns a PromisedObject containing the result of the stateful computations.

For a general definition of folding see: https://en.wikipedia.org/wiki/Fold_(higher-order_function)

Parameters:
  • fun – stateful function.
  • state – initial state.
  • xs – iterable object.
Returns:

PromisedObject
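For comparison with the general definition linked above, a plain left fold in Python can be written with `functools.reduce` or an explicit loop. This is not a Noodles call: it evaluates eagerly, whereas noodles.fold returns a PromisedObject, and the calling convention Noodles expects for `fun` may differ from the `(state, x) -> state` form assumed here.

```python
from functools import reduce


def running_sum(state, x):
    """A stateful step: combine the accumulated state with the next element."""
    return state + x


xs = [1, 2, 3, 4]
total = reduce(running_sum, xs, 0)   # ((((0 + 1) + 2) + 3) + 4)


def explicit_fold(fun, state, xs):
    """The same left fold written as an explicit loop."""
    for x in xs:
        state = fun(state, x)
    return state
```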

noodles.find_first(pred, lst)

Find the first result of a list of promises lst that satisfies a predicate pred.

Parameters:
  • pred – a function of one argument returning True or False.
  • lst – a list of promises or values.
Returns:

a promise of a value or None.

This is a wrapper around s_find_first(). The first item on the list is passed as is, forcing evaluation. The tail of the list is quoted, and only unquoted if the predicate fails on the result of the first promise.

If the input list is empty, None is returned.
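The quoting strategy can be mimicked with plain thunks: the head is evaluated, and the tail stays unevaluated until the predicate rejects the head. A minimal sketch in which zero-argument lambdas stand in for quoted promises, written as a loop rather than recursively; `lazy_find_first` and `make_thunk` are hypothetical:

```python
def lazy_find_first(pred, thunks):
    """Return the first evaluated value satisfying pred, or None.

    Each element of `thunks` is a zero-argument callable; elements after the
    match are never called.
    """
    for thunk in thunks:
        value = thunk()
        if pred(value):
            return value
    return None


evaluated = []


def make_thunk(n):
    def thunk():
        evaluated.append(n)   # record which elements were actually computed
        return n * n
    return thunk


found = lazy_find_first(lambda v: v > 5, [make_thunk(n) for n in range(10)])
```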

noodles.conditional(b: bool, branch_true: Any, branch_false: Any = None) → Any

Control statement to follow a branch in a workflow. Equivalent to the if statement in standard Python.

The quote function delays the evaluation of the branches until the boolean is evaluated.

Parameters:
  • b – promised boolean value.
  • branch_true – statement to execute in case of a true predicate.
  • branch_false – default operation to execute in case of a false predicate.
Returns:

PromisedObject
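The effect of quoting the branches is that only the selected branch is ever evaluated. A plain-Python analogue in which branches are zero-argument callables; `lazy_conditional` and `expensive` are hypothetical names:

```python
def lazy_conditional(b, branch_true, branch_false=None):
    """Evaluate only the branch selected by b; branches are zero-argument callables."""
    if b:
        return branch_true()
    return branch_false() if branch_false is not None else None


log = []


def expensive(tag):
    def run():
        log.append(tag)   # record that this branch was evaluated
        return tag
    return run


picked = lazy_conditional(3 > 2, expensive("yes"), expensive("no"))
```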

noodles.simple_lift(obj)

(scheduled) Create a promise from a plain object.

Internal Specs

Inverts the call-graph to get a dependency graph. Possibly slow, short version.

Parameters: links (Mapping[NodeId, Set[(NodeId, ArgumentType, [int|str]])]) – forward links of a call-graph.
Returns: inverted graph, giving dependency of jobs.
Return type: Mapping[NodeId, Mapping[(ArgumentType, [int|str]), NodeId]]

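Inverting the links can be sketched with plain dictionaries. Assumptions: node ids are strings, and each forward link is a `(target_node, kind, name)` tuple meaning "the output of this node feeds argument `(kind, name)` of `target_node`". The function name `invert_links_sketch` is hypothetical, and in this sketch nodes without dependencies do not appear in the result.

```python
from collections import defaultdict


def invert_links_sketch(links):
    """Turn forward links {source: {(target, kind, name), ...}} into
    dependencies {target: {(kind, name): source}}."""
    inverted = defaultdict(dict)
    for source, targets in links.items():
        for target, kind, name in targets:
            inverted[target][(kind, name)] = source
    return dict(inverted)


# Node "a" feeds argument x of "c"; node "b" feeds argument y of "c".
links = {
    "a": {("c", "regular", "x")},
    "b": {("c", "regular", "y")},
    "c": set(),
}
deps = invert_links_sketch(links)
```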
noodles.workflow.from_call(foo, args, kwargs, hints, call_by_value=True)

Takes a function and a set of arguments it needs to run on. Returns a newly constructed workflow representing the promised value from the evaluation of the function with said arguments.

These arguments are stored in a BoundArguments object matching the signature of the given function foo. That is, bound_args was constructed by doing:

inspect.signature(foo).bind(*args, **kwargs)

The arguments stored in the bound_args object are filtered on being either plain, or promised. If an argument is promised, the value it represents is not actually available and needs to be computed by evaluating a workflow.

If an argument is a promised value, the workflow representing the value is added to the new workflow. First, all the nodes in the original workflow, if not already present in the new workflow from an earlier argument, are copied to the new workflow, and a new entry is made into the link dictionary. Then the links in the old workflow are also added to the link dictionary. Since the link dictionary points from nodes to a set of ArgumentAddresses, no links are duplicated.

In the bound_args object the promised value is replaced by the Empty object, so that we can see which arguments still have to be evaluated.

Doing this for all promised value arguments in the bound_args object results in a new workflow with all the correct dependencies represented as links in the graph.
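The binding and Empty-substitution steps described above can be reproduced with the standard library alone. In this sketch, `Promised` is a hypothetical marker class standing in for a promised argument; `inspect.Parameter.empty` is the same object as `inspect._empty`, which noodles.workflow.Empty aliases.

```python
import inspect


class Promised:
    """Hypothetical marker for an argument whose value is not yet computed."""


def foo(a, b, *, c=1):
    return a + b + c


args, kwargs = (Promised(), 2), {"c": Promised()}
bound_args = inspect.signature(foo).bind(*args, **kwargs)

# Replace every promised argument by Empty and remember which ones they were.
pending = []
for name, value in list(bound_args.arguments.items()):
    if isinstance(value, Promised):
        bound_args.arguments[name] = inspect.Parameter.empty
        pending.append(name)

# Analogous to the is_node_ready() check: no argument holder may be Empty.
ready = all(v is not inspect.Parameter.empty
            for v in bound_args.arguments.values())
```

Once the `pending` slots are filled with computed values, the readiness check becomes true and the call can be made as `foo(*bound_args.args, **bound_args.kwargs)`.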

Parameters:
  • foo (Callable) – Function (or object) being called.
  • args – Normal arguments to call
  • kwargs – Keyword arguments to call
  • hints – Hints that can be passed to the scheduler on where or how to schedule this job.
Returns:

New workflow.

Return type:

Workflow

class noodles.workflow.Workflow(root, nodes, links)

The workflow data container.

root

A reference to the root node in the graph.

nodes

A dict listing the nodes in the graph. We use a dict only to have a persistent object reference.

links

A dict giving a set of links from each node.

class noodles.workflow.FunctionNode(foo, bound_args, hints, result)

Captures a function call as a combination of function and arguments. Some of these arguments may be set to Empty, these need to be filled in by the workflow before the function can be applied.

foo

The function (or object) that is being called.

bound_args

A BoundArguments object storing the arguments to the function.

data

Convert to a NodeData for subsequent serialisation.

class noodles.workflow.NodeData(function, arguments, hints)

arguments

Alias for field number 1

function

Alias for field number 0

hints

Alias for field number 2

noodles.workflow.insert_result(node, address, value)

Runs set_argument, but first checks that the data location is not already filled with some data. In any normal circumstance this check is redundant, but if we did not raise an error here the program would continue with unexpected results.

noodles.workflow.Empty

alias of inspect._empty

noodles.workflow.is_node_ready(node)

Returns True if none of the argument holders contain any Empty object.

class noodles.workflow.Argument(address, value)

address

Alias for field number 0

value

Alias for field number 1

class noodles.workflow.ArgumentAddress(kind, name, key)

Codifies a value given for some argument.

key

Alias for field number 2

kind

Alias for field number 0

name

Alias for field number 1

class noodles.workflow.ArgumentKind

Codifies the location of a unique argument of a function.

Promised object

noodles.interface.delay(value)

(scheduled) Creates a promise of a given value. TODO: this function should have a different name.

noodles.interface.gather(*a)

(scheduled) Converts a list of promises (i.e. PromisedObject) to a promised list of values.

noodles.interface.gather_all(a)

Converts an iterator of promises into a promise of a list.

noodles.interface.gather_dict(**kwargs)

(scheduled) Creates a promise of a dictionary.

noodles.interface.schedule_hint(**hints)

Decorator; same as schedule(), with added hints. These hints can be anything.

noodles.interface.schedule(f, **hints)

Decorator; schedule calls to function f into a workflow, instead of running them immediately. The decorated function returns a PromisedObject.

noodles.interface.unpack(t, n)

Iterates over a promised sequence. The sequence should support random access through object.__getitem__(), and its length must be known beforehand.

Parameters:
  • t – a sequence.
  • n – the length of the sequence.
Returns:

an unpackable generator for the elements in the sequence.

noodles.interface.has_scheduled_methods(cls)

Decorator; use this on a class for which some methods have been decorated with schedule() or schedule_hint(). Those methods are then tagged with the attribute __member_of__, so that we may serialise and retrieve the correct method. This should be considered a patch to a flaw in the Python object model.

noodles.interface.unwrap(f)

Safely obtain the inner function of a previously wrapped (or decorated) function. This returns f.__wrapped__, or f itself if f has no __wrapped__ attribute.

noodles.interface.update_hints(obj, data)

Update the hints on the root-node of a workflow. Usually, schedule hints are fixed per function. Sometimes a user may want to set hints manually on a specific promised object. update_hints() uses the update method on the hints dictionary with data as its argument.

Parameters:
  • obj – a PromisedObject.
  • data – a dict containing additional hints.

The hints are modified, in place, on the node. All workflows that contain the node are affected.

noodles.interface.lift(obj, memo=None)

Make a promise out of object obj, where obj may contain promises internally.

Parameters:
  • obj – Any object.
  • memo – used for internal caching (similar to deepcopy()).

If the object is a PromisedObject, or pass-by-value (str, int, float, complex) it is returned as is.

If the object’s id has an entry in memo, the value from memo is returned.

If the object has a method __lift__, it is used to get the promise. __lift__ should take one additional argument for the memo dictionary, entirely analogous to deepcopy().

If the object is an instance of one of the basic container types (list, dictionary, tuple and set), we use the analogous function (make_list(), make_dict(), make_tuple(), and make_set()) to promise their counterparts should these objects contain any promises. First, we map all items in the container through lift(), then check the result for any promises. Note that in the case of dictionaries, we lift all the items (i.e. the list of key/value tuples) and then construct a new dictionary.

If the object is an instance of a subclass of any of the basic container types, the __dict__ of the object is lifted as well as the object cast to its base type. We then use set_dict() to set the __dict__ of the new promise. Again, if the object did not contain any promises, we return it without change.

Otherwise, we lift the __dict__ and create a promise of a new object of the same class as the input, using create_object(). This works fine for what we call reasonable objects. Since calling lift() is an explicit action, we do not require reasonable objects to be derived from Reasonable as we do with serialisation, where such a default behaviour could lead to inexplicable bugs.

noodles.interface.failed(obj)

Returns True if obj is an instance of Fail.

class noodles.interface.PromisedObject(workflow)

Wraps a Workflow. The workflow represents the future promise of a Python object. By wrapping the workflow, we can mock the behaviour of this future object and schedule methods that were called by the user as if nothing weird is going on.

class noodles.interface.Quote(promise)

Quote objects store the contents of a workflow, allowing the workflow to be passed as an argument to a higher order function without its contents being evaluated. Don’t use this object, rather use the functions quote() and unquote().

noodles.interface.quote(promise)

Quote a promise.

noodles.interface.unquote(quoted)

Unquote a quoted promise.

noodles.interface.result(obj)

Results are stored on the nodes in the workflow at run time. This function can be used to get at a result of a node in a workflow after run time. This is not a recommended way of getting at results, but can help with debugging.

noodles.interface.maybe(func)

Calls func in a try/except block, returning a Fail object if the call fails in any way. If any of the arguments to the call are Fail objects, the call is not attempted.

class noodles.interface.Fail(func, fails=None, exception=None)

Signifies a failure in a computation that was wrapped by a @maybe decorator. Because Noodles runs all functions from the same context, it is not possible to use Python stack traces to find out where an error happened. Instead we use a Fail object to store information about exceptions and the subsequent continuation of the failure.

add_call(func)

Add a call to the trace.

is_root_cause

If the field exception is set in this object, it means that we are looking at the root cause of the failure.

noodles.interface.simple_lift(obj)

(scheduled) Create a promise from a plain object.

Runners

class noodles.run.scheduler.Scheduler(verbose=False, error_handler=None, job_keeper=None)

Schedules jobs, receives results, then schedules more jobs as they become ready to compute. This class communicates with a pool of workers by means of coroutines.

run(connection: noodles.lib.connection.Connection, master: noodles.workflow.model.Workflow)

Run a workflow.

Parameters:
  • connection (Connection) – A connection giving a sink to the job-queue and a source yielding results.
  • master (Workflow) – The workflow.

noodles.run.hybrid.hybrid_coroutine_worker(selector, workers)

Runs a set of workers, all of them in the main thread. This runner is here for testing purposes.

Parameters:
  • selector (function) – A function returning a worker key, given a job.
  • workers (dict) – A dict of workers.

noodles.run.hybrid.hybrid_threaded_worker(selector, workers)

Runs a set of workers, each in a separate thread.

Parameters:
  • selector – A function that takes a hints-tuple and returns a key indexing a worker in the workers dictionary.
  • workers – A dictionary of workers.
Returns:

A connection for the scheduler.

Return type:

Connection

The hybrid worker dispatches jobs to the different workers based on the information contained in the hints. If no hints were given, the job is run in the main thread.

Dispatching is done in the main thread. Retrieving results is done in a separate thread for each worker. In this design it is assumed that dispatching a job takes little time, while waiting for one to return a result may take a long time.
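The dispatching rule can be sketched as a selector that maps a job's hints to a key in the workers dictionary, with a fall-back to the main thread when no hints were given. In this sketch jobs are plain `(hints, func, args)` tuples and workers are plain callables; `selector`, `dispatch`, `main_thread_worker` and `gpu_worker` are hypothetical and far simpler than Noodles' workers and Connection objects. The per-worker result threads are omitted.

```python
def selector(hints):
    """Hypothetical selector: pick a worker key from the job's hints."""
    return hints.get("worker")


def main_thread_worker(func, args):
    return ("main", func(*args))


def gpu_worker(func, args):
    return ("gpu", func(*args))


workers = {"gpu": gpu_worker}


def dispatch(job, selector, workers):
    hints, func, args = job
    if not hints:
        # No hints given: run the job in the main thread.
        return main_thread_worker(func, args)
    return workers[selector(hints)](func, args)
```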

noodles.run.hybrid.run_hybrid(wf, selector, workers)

Returns the result of evaluating the workflow; runs through several supplied workers in as many threads.

Parameters:
  • wf (Workflow or PromisedObject) – Workflow to compute
  • selector – A function selecting the worker that should be run, given a hint.
  • workers – A dictionary of workers
Returns:

result of running the workflow

Serialisation

noodles.serial.pickle()

Returns a serialisation registry that “just pickles everything”.

This registry can be used to bolt-on other registries and keep the pickle as the default. The objects are first pickled to a byte-array, which is subsequently encoded with base64.
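The pickle-then-base64 encoding can be reproduced with the standard library. The record layout below (a dict with a `"pickle"` field) is a hypothetical choice for illustration, not the format Noodles actually writes.

```python
import base64
import pickle


def encode_pickled(obj):
    """Pickle obj to bytes, then base64-encode so it fits in a JSON string."""
    data = base64.b64encode(pickle.dumps(obj)).decode("ascii")
    return {"pickle": data}   # hypothetical record layout


def decode_pickled(rec):
    return pickle.loads(base64.b64decode(rec["pickle"]))


original = {"values": [1, 2.5, 3j], "name": "noot"}
record = encode_pickled(original)
```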

noodles.serial.base()

Returns the Noodles base serialisation registry.

class noodles.serial.Registry(parent=None, types=None, hooks=None, hook_fn=None, default=None)

Serialisation registry, keeps a record of Serialiser objects.

The Registry keeps a dictionary mapping (qualified) class names to Serialiser objects. Given an object, the __getitem__ method looks for the highest base class that it has a serialiser for. As a fall-back we install a Serialiser matching the Python object class.

Detection by object type is not always meaningful or even possible. Before scanning for known base classes, the look-up function passes the object through the hook function, which should return a string or None. If a string is returned, that string is used to look up the serialiser.

Registries can be combined using the ‘+’ operator. The left-hand argument is then used as parent to the new Registry, while the right-hand argument overrides and augments the Serialisers present. The hook functions are chained, such that the right-hand registry takes precedence. The default serialiser is inherited from the left-hand argument.
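The combination rules can be modelled with a toy registry. `ToyRegistry` and its fields are hypothetical, not the Noodles API; the sketch keeps only the three rules stated above (right-hand serialisers override left-hand ones, right-hand hooks are tried first, the default comes from the left-hand side). The base-class search order, walking the MRO from the object's own class, is also an assumption.

```python
class ToyRegistry:
    """Hypothetical model of Registry combination rules."""

    def __init__(self, types=None, hooks=None, default=None):
        self.types = dict(types or {})   # qualified class name -> serialiser
        self.hooks = list(hooks or [])   # functions obj -> str or None
        self.default = default

    def __add__(self, other):
        return ToyRegistry(
            types={**self.types, **other.types},   # right-hand side overrides
            hooks=other.hooks + self.hooks,        # right-hand hooks tried first
            default=self.default,                  # default from the left
        )

    def lookup(self, obj):
        for hook in self.hooks:
            name = hook(obj)
            if name is not None:
                return self.types.get(name, self.default)
        for cls in type(obj).__mro__:              # assumed search order
            name = f"{cls.__module__}.{cls.__qualname__}"
            if name in self.types:
                return self.types[name]
        return self.default


base = ToyRegistry(types={"builtins.int": "int-ser"}, default="pickle-ser")
extra = ToyRegistry(types={"builtins.int": "custom-int"}, default="ignored")
combined = base + extra
```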

decode(rec, deref=False)

Decode a record to return an object that could be considered equivalent to the original.

The record is not touched if _noodles is not an item in the record.

Parameters:
  • rec (dict) – A dictionary record to be decoded.
  • deref (bool) – Whether to decode a RefObject. If the encoder wrote files on a remote host, reading this file will be slow and result in an error if the file is not present.

dereference(data, host=None)

Dereferences RefObjects stuck in the hierarchy. This is a bit of an ugly hack.

encode(obj, host=None)

Encode an object using the serialisers available in this registry. Objects that have a type that is one of [dict, list, str, int, float, bool, tuple] are sent back unchanged.

A host-name can be given as an additional argument to identify the host in the resulting record if the encoder yields any filenames.

This function only encodes the object one layer deep.

Parameters:
  • obj – The object that needs encoding.
  • host (str) – The name of the encoding host.

from_json(data, deref=False)

Decode the string from JSON to return the original object (if deref is true). Uses the json.loads function with self.decode as object_hook.

Parameters:
  • data (str) – JSON encoded string.
  • deref (bool) – Whether to decode records that gave ref=True at encoding.
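The round trip through JSON relies on `json.loads` calling a decoding function on every decoded dict (its `object_hook` argument), and on `json.dumps` calling a `default` function for objects it cannot encode. A minimal sketch for complex numbers; the `_noodles` marker follows the text above, but the other field names are hypothetical and not the exact record format Noodles writes.

```python
import json


def encode(obj):
    """json.dumps `default` hook: turn a complex number into a tagged record."""
    if isinstance(obj, complex):
        return {"_noodles": "complex", "data": [obj.real, obj.imag]}
    raise TypeError(f"cannot encode {type(obj).__name__}")


def decode(rec):
    """object_hook: leave records without a `_noodles` item untouched."""
    if "_noodles" not in rec:
        return rec
    if rec["_noodles"] == "complex":
        return complex(*rec["data"])
    return rec


text = json.dumps({"z": 1 + 2j, "plain": {"a": 1}}, default=encode)
obj = json.loads(text, object_hook=decode)
```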

to_json(obj, host=None, indent=None)

Recursively encode obj and convert it to a JSON string.

Parameters:
  • obj – Object to encode.
  • host (str) – hostname where this object is being encoded.

class noodles.serial.Serialiser(name='<unknown>')

Serialiser base class.

Serialisation classes should derive from Serialiser and overload the encode and decode methods.

Parameters: base (type) – The type that this class is supposed to serialise. This may differ from the type of the object actually being serialised if its class was derived from base. The supposed base-class is kept here for reference but serves no immediate purpose.

decode(cls, data)

Should decode the data to an object of type ‘cls’.

Parameters:
  • cls (type) – The class is retrieved by the qualified name of the type of the object that was encoded; restored by importing it.
  • data – The data is the record that was passed to make_rec by the encoder.

encode(obj, make_rec)

Should encode an object of type self.base (or derived).

This method receives the object and a function make_rec. This function has signature:

def make_rec(rec, ref=False, files=None):
    ...

If encoding and decoding are resource-intensive, the encoder may call make_rec with ref=True. The resulting record will then not be decoded until it is needed by the next job. This is most certainly the case when an external file was written. In this case the filename(s) should be passed as a list by files=[...].

The files list is not passed back to the decoder. Rather, it is used by Noodles to keep track of written files and copy them between hosts if needed. It is the responsibility of the encoder to include the filename information in the passed record as well.

Parameters:
  • obj – Object to be encoded.
  • make_rec – Function used to pack the encoded data with some meta-data.
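An encoder written against this protocol only decides what goes into the record; make_rec adds the meta-data. In this sketch `make_rec` is a hypothetical stand-in that simply bundles its arguments, and `SerComplex` does not derive from the real Serialiser class so that the example runs without Noodles installed.

```python
def make_rec(rec, ref=False, files=None):
    """Hypothetical stand-in: wrap the payload with some meta-data."""
    return {"_noodles": True, "data": rec, "ref": ref, "files": files or []}


class SerComplex:
    """Serialiser-like class for complex numbers (sketch, hypothetical)."""

    def encode(self, obj, make_rec):
        # Cheap to encode, so no ref=True and no files.
        return make_rec([obj.real, obj.imag])

    def decode(self, cls, data):
        # `data` is the payload that was passed to make_rec.
        return cls(*data)


ser = SerComplex()
record = ser.encode(3 - 4j, make_rec)
restored = ser.decode(complex, record["data"])
```

An encoder that writes a file would instead call something like `make_rec({"filename": path}, ref=True, files=[path])`, keeping the filename inside the record itself because the files list is not passed back to the decoder.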

class noodles.serial.SerPath

decode(cls, data)

Should decode the data to an object of type ‘cls’.

Parameters:
  • cls (type) – The class is retrieved by the qualified name of the type of the object that was encoded; restored by importing it.
  • data – The data is the record that was passed to make_rec by the encoder.

encode(obj, make_rec)

Should encode an object of type self.base (or derived).

This method receives the object and a function make_rec. This function has signature:

def make_rec(rec, ref=False, files=None):
    ...

If encoding and decoding are resource-intensive, the encoder may call make_rec with ref=True. The resulting record will then not be decoded until it is needed by the next job. This is most certainly the case when an external file was written. In this case the filename(s) should be passed as a list by files=[...].

The files list is not passed back to the decoder. Rather, it is used by Noodles to keep track of written files and copy them between hosts if needed. It is the responsibility of the encoder to include the filename information in the passed record as well.

Parameters:
  • obj – Object to be encoded.
  • make_rec – Function used to pack the encoded data with some meta-data.

class noodles.serial.RefObject(rec)

Placeholder object to delay decoding a serialised object until needed by a worker.

class noodles.serial.AsDict(cls)

decode(cls, data)

Should decode the data to an object of type ‘cls’.

Parameters:
  • cls (type) – The class is retrieved by the qualified name of the type of the object that was encoded; restored by importing it.
  • data – The data is the record that was passed to make_rec by the encoder.

encode(obj, make_rec)

Should encode an object of type self.base (or derived).

This method receives the object and a function make_rec. This function has signature:

def make_rec(rec, ref=False, files=None):
    ...

If encoding and decoding are resource-intensive, the encoder may call make_rec with ref=True. The resulting record will then not be decoded until it is needed by the next job. This is most certainly the case when an external file was written. In this case the filename(s) should be passed as a list by files=[...].

The files list is not passed back to the decoder. Rather, it is used by Noodles to keep track of written files and copy them between hosts if needed. It is the responsibility of the encoder to include the filename information in the passed record as well.

Parameters:
  • obj – Object to be encoded.
  • make_rec – Function used to pack the encoded data with some meta-data.

class noodles.serial.Reasonable

A Reasonable object is an object which is most reasonably serialised using its __dict__ property. To deserialise the object, we first create an instance using the __new__ method, then set the __dict__ property manually. This class is empty; it is used as a tag to designate other objects as reasonable.
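The deserialisation step described above can be shown in plain Python: create a bare instance with `__new__` (so `__init__` is not run) and then assign its `__dict__`. The `Point` class and the two helper functions are hypothetical, and the class does not derive from Reasonable here so the example runs without Noodles installed.

```python
class Point:
    def __init__(self, x, y):
        self.x = x
        self.y = y


def encode_reasonable(obj):
    """Serialise using a shallow copy of the instance __dict__."""
    return dict(obj.__dict__)


def decode_reasonable(cls, data):
    obj = cls.__new__(cls)      # bypasses __init__
    obj.__dict__ = dict(data)   # restore the attributes
    return obj


p = Point(1, 2)
q = decode_reasonable(Point, encode_reasonable(p))
```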

class noodles.serial.registry.RefObject(rec)

Placeholder object to delay decoding a serialised object until needed by a worker.

class noodles.serial.registry.Registry(parent=None, types=None, hooks=None, hook_fn=None, default=None)

Serialisation registry, keeps a record of Serialiser objects.

The Registry keeps a dictionary mapping (qualified) class names to Serialiser objects. Given an object, the __getitem__ method looks for the highest base class that it has a serialiser for. As a fall-back we install a Serialiser matching the Python object class.

Detection by object type is not always meaningful or even possible. Before scanning for known base classes, the look-up function passes the object through the hook function, which should return a string or None. If a string is returned, that string is used to look up the serialiser.

Registries can be combined using the ‘+’ operator. The left-hand argument is then used as parent to the new Registry, while the right-hand argument overrides and augments the Serialisers present. The hook functions are chained, such that the right-hand registry takes precedence. The default serialiser is inherited from the left-hand argument.

decode(rec, deref=False)

Decode a record to return an object that could be considered equivalent to the original.

The record is not touched if _noodles is not an item in the record.

Parameters:
  • rec (dict) – A dictionary record to be decoded.
  • deref (bool) – Whether to decode a RefObject. If the encoder wrote files on a remote host, reading this file will be slow and result in an error if the file is not present.

dereference(data, host=None)

Dereferences RefObjects stuck in the hierarchy. This is a bit of an ugly hack.

encode(obj, host=None)

Encode an object using the serialisers available in this registry. Objects that have a type that is one of [dict, list, str, int, float, bool, tuple] are sent back unchanged.

A host-name can be given as an additional argument to identify the host in the resulting record if the encoder yields any filenames.

This function only encodes the object one layer deep.

Parameters:
  • obj – The object that needs encoding.
  • host (str) – The name of the encoding host.

from_json(data, deref=False)

Decode the string from JSON to return the original object (if deref is true). Uses the json.loads function with self.decode as object_hook.

Parameters:
  • data (str) – JSON encoded string.
  • deref (bool) – Whether to decode records that gave ref=True at encoding.

to_json(obj, host=None, indent=None)

Recursively encode obj and convert it to a JSON string.

Parameters:
  • obj – Object to encode.
  • host (str) – hostname where this object is being encoded.

class noodles.serial.registry.SerUnknown(name='<unknown>')

decode(cls, data)

Should decode the data to an object of type ‘cls’.

Parameters:
  • cls (type) – The class is retrieved by the qualified name of the type of the object that was encoded; restored by importing it.
  • data – The data is the record that was passed to make_rec by the encoder.

encode(obj, make_rec)

Should encode an object of type self.base (or derived).

This method receives the object and a function make_rec. This function has signature:

def make_rec(rec, ref=False, files=None):
    ...

If encoding and decoding are resource-intensive, the encoder may call make_rec with ref=True. The resulting record will then not be decoded until it is needed by the next job. This is most certainly the case when an external file was written. In this case the filename(s) should be passed as a list by files=[...].

The files list is not passed back to the decoder. Rather, it is used by Noodles to keep track of written files and copy them between hosts if needed. It is the responsibility of the encoder to include the filename information in the passed record as well.

Parameters:
  • obj – Object to be encoded.
  • make_rec – Function used to pack the encoded data with some meta-data.

class noodles.serial.registry.Serialiser(name='<unknown>')

Serialiser base class.

Serialisation classes should derive from Serialiser and overload the encode and decode methods.

Parameters: base (type) – The type that this class is supposed to serialise. This may differ from the type of the object actually being serialised if its class was derived from base. The supposed base-class is kept here for reference but serves no immediate purpose.

decode(cls, data)

Should decode the data to an object of type ‘cls’.

Parameters:
  • cls (type) – The class of the encoded object, retrieved by its qualified name and restored by importing it.
  • data – The data is the record that was passed to make_rec by the encoder.
encode(obj, make_rec)

Should encode an object of type self.base (or derived).

This method receives the object and a function make_rec. This function has signature:

def make_rec(rec, ref=False, files=None):
    ...

If encoding and decoding are resource-intensive, the encoder may call make_rec with ref=True. The resulting record is then not decoded until the next job needs it. This is almost always appropriate when an external file was written. In that case the filename(s) should be passed as a list via files=[...].

The files list is not passed back to the decoder. Rather, noodles uses it to keep track of written files and to copy them between hosts if needed. It is the responsibility of the encoder to include the filename information in the passed record as well.

Parameters:
  • obj – Object to be encoded.
  • make_rec – Function used to pack the encoded data with some meta-data.
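To make the encode/decode contract concrete, here is a minimal sketch of a serialiser that writes its payload to an external file, passes ref=True and files=[...] to make_rec, and repeats the filename inside the record so the decoder can find it. Everything here is an assumption for illustration: Serialiser is a stand-in base class so the sketch runs without noodles installed, and TextBlob, SerTextBlob and the make_rec stand-in (including the record layout it returns) are hypothetical, not part of the noodles API.

```python
import os
import tempfile


class Serialiser:
    """Stand-in for noodles.serial.Serialiser, so this sketch runs alone."""

    def __init__(self, name='<unknown>'):
        self.name = name


class TextBlob:
    """Hypothetical class holding a large piece of text."""

    def __init__(self, text):
        self.text = text


class SerTextBlob(Serialiser):
    """Hypothetical serialiser storing TextBlob contents in an external file."""

    def __init__(self, directory):
        super().__init__('TextBlob')
        self.directory = directory

    def encode(self, obj, make_rec):
        # Write the (potentially large) payload to its own file.
        fd, path = tempfile.mkstemp(suffix='.txt', dir=self.directory)
        with os.fdopen(fd, 'w') as f:
            f.write(obj.text)
        # The decoder never sees `files`, so the filename also goes into
        # the record itself; ref=True defers decoding until a job needs it.
        return make_rec({'filename': path}, ref=True, files=[path])

    def decode(self, cls, data):
        with open(data['filename']) as f:
            return cls(f.read())


def make_rec(rec, ref=False, files=None):
    """Hypothetical stand-in for the make_rec function noodles passes in."""
    return {'data': rec, 'ref': ref, 'files': files or []}


with tempfile.TemporaryDirectory() as tmp:
    ser = SerTextBlob(tmp)
    rec = ser.encode(TextBlob('aap noot mies'), make_rec)
    blob = ser.decode(TextBlob, rec['data'])

print(blob.text)  # aap noot mies
```

In a real serialiser the record returned by make_rec is handled by noodles itself; the stand-in only mimics the signature documented above.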

Worker executable

Streams

Coroutine streaming module

Note

In a break with tradition, some classes in this module have lower case names because they tend to be used as function decorators.

We use coroutines to communicate messages between different components in the Noodles runtime. Coroutines can have input or output in two ways: passive and active. An example:

def f_pulls(coroutine):
    for msg in coroutine:
        print(msg)

def g_produces(lines):
    for line in lines:
        yield line

lines = ['aap', 'noot', 'mies']

f_pulls(g_produces(lines))

This prints the words ‘aap’, ‘noot’ and ‘mies’. The same program can be written so that the coroutine is the one receiving messages:

def f_receives():
    while True:
        msg = yield
        print(msg)

def g_pushes(coroutine, lines):
    for l in lines:
        coroutine.send(l)

sink = f_receives()
sink.send(None)  # the coroutine needs to be initialised;
                 # next(sink) does the same as sink.send(None)
g_pushes(sink, lines)

Creating a coroutine and advancing it to the first yield statement can be done by a small decorator:

from functools import wraps

def coroutine(f):
    @wraps(f)
    def g(*args, **kwargs):
        sink = f(*args, **kwargs)
        sink.send(None)
        return sink

    return g
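As a usage sketch, the decorator removes the explicit priming step: the sink below can be sent messages right after it is created. The collect() sink is a hypothetical example, not part of Noodles.

```python
from functools import wraps


def coroutine(f):
    @wraps(f)
    def g(*args, **kwargs):
        sink = f(*args, **kwargs)
        sink.send(None)  # advance to the first yield
        return sink
    return g


@coroutine
def collect(target):
    """Hypothetical sink appending every received message to `target`."""
    while True:
        msg = yield
        target.append(msg)


received = []
sink = collect(received)  # already primed, no send(None) needed
for word in ['aap', 'noot', 'mies']:
    sink.send(word)
print(received)  # ['aap', 'noot', 'mies']
```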

Pull and push

The pull and push classes capture the idea of pushing and pulling coroutines, wrapping them in an object. These objects can then be chained using the >> operator. Example:

>>> from noodles.lib import (pull_map, pull_from)
>>> @pull_map
... def square(x):
...     return x*x
...
>>> squares = pull_from(range(10)) >> square
>>> list(squares)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
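One way to picture how >> chains pull objects is the simplified sketch below. It is an assumption about the mechanism, not the noodles implementation: a pull wraps a function of one argument (a thunk returning an iterable), and a >> b yields a new pull that feeds b a thunk which runs a. A complete chain starts at a source taking no arguments, so it can be iterated directly.

```python
class pull:
    """Simplified sketch of a pull object; not the noodles implementation."""

    def __init__(self, fn):
        self.fn = fn

    def __call__(self, *args):
        return self.fn(*args)

    def __rshift__(self, other):
        # `other` pulls from a thunk that runs `self`.
        return pull(lambda *args: other(lambda: self(*args)))

    def __iter__(self):
        # A complete chain starts at a source, so it takes no arguments.
        return iter(self.fn())


def pull_map(f):
    @pull
    def g(source):
        yield from map(f, source())
    return g


def pull_from(iterable):
    return pull(lambda: iter(iterable))


@pull_map
def square(x):
    return x*x


squares = pull_from(range(10)) >> square
print(list(squares))  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Because every iteration calls the chain afresh, list(squares) can be evaluated more than once in this sketch.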

Queues

Python's queue.Queue objects are thread-safe. We can define a new Queue class that uses queue.Queue to buffer and distribute messages over several threads:

import queue

class Queue(object):
    def __init__(self):
        self._q = queue.Queue()

    def source(self):
        while True:
            msg = self._q.get()
            yield msg
            self._q.task_done()

    @coroutine
    def sink(self):
        while True:
            msg = yield
            self._q.put(msg)

    def wait(self):
        self._q.join()

Note that both ends of the queue are, as we call it, passive. We could make an active source (it would become a normal function) taking a callback as an argument. However, we are designing the Noodles runtime so that it is easy to interleave functionality. Moreover, the Queue object is only concerned with the state of its own queue. The outside world is only represented by the yield statements, thus preserving the principle of encapsulation.
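The sketch below shows who drives each passive end: a worker thread iterates over source(), while the main thread pushes messages into sink(). The Queue class is repeated so the example runs on its own; the consume() worker and the upper-casing it does are hypothetical. Since task_done() only runs when the consumer asks for the next message, wait() returns only after every message has been fully processed.

```python
import queue
import threading
from functools import wraps


def coroutine(f):
    @wraps(f)
    def g(*args, **kwargs):
        sink = f(*args, **kwargs)
        sink.send(None)
        return sink
    return g


class Queue(object):
    def __init__(self):
        self._q = queue.Queue()

    def source(self):
        while True:
            msg = self._q.get()
            yield msg
            self._q.task_done()  # runs once the consumer asks for more

    @coroutine
    def sink(self):
        while True:
            msg = yield
            self._q.put(msg)

    def wait(self):
        self._q.join()


q = Queue()
results = []


def consume():
    # The thread drives the passive source; each message is processed
    # before the next one is requested.
    for msg in q.source():
        results.append(msg.upper())


threading.Thread(target=consume, daemon=True).start()

sink = q.sink()  # the main thread drives the passive sink
for word in ['aap', 'noot', 'mies']:
    sink.send(word)
q.wait()  # returns once every message has been processed
print(results)  # ['AAP', 'NOOT', 'MIES']
```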

noodles.lib.decorator(f)

Creates a parametric decorator from a function. The resulting decorator will optionally take keyword arguments.
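A minimal sketch of how such a helper can work, assuming the wrapped function takes the decorated function as its first argument followed by keyword arguments: called with a single positional argument it decorates directly (@d), and called with keywords only it returns the actual decorator (@d(key=value)). The tag() decorator and its label parameter are hypothetical.

```python
import functools


def decorator(f):
    """Sketch: turn f(func, **kwargs) into a decorator usable as @d or @d(...)."""
    @functools.wraps(f)
    def parametric(*args, **kwargs):
        if len(args) == 1:
            # Used bare, as @d: the single argument is the decorated function.
            return f(*args, **kwargs)
        if args:
            raise TypeError("this decorator only accepts keyword arguments")
        # Used as @d(key=value): return the decorator that receives the function.
        return lambda g: f(g, **kwargs)
    return parametric


@decorator
def tag(f, label='default'):
    """Hypothetical example decorator attaching a label to a function."""
    f.label = label
    return f


@tag
def a():
    pass


@tag(label='special')
def b():
    pass


print(a.label, b.label)  # default special
```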

noodles.lib.coroutine(f)

A sink should be sent None first, so that the coroutine arrives at the yield position. This wrapper takes care that this is done automatically when the coroutine is started.

class noodles.lib.stream

Base class for pull and push coroutines.

class noodles.lib.pull(fn)

A pull coroutine pulls from a source, yielding values. pull objects can be chained using the >> operator.

A pull object acts as a function of one argument, being the source that the coroutine will pull from. This source argument must always be a thunk (function of zero arguments) returning an iterable.

class noodles.lib.push(fn, dont_wrap=False)

A push coroutine pushes to a sink, receiving values through yield statements. push objects can be chained using the >> operator.

A push object acts as a function of one argument, being the sink that the coroutine will send to. This sink argument must always be a thunk (function of zero arguments) returning a coroutine.

class noodles.lib.pull_map(f)

A pull_map decorates a function of a single argument, to become a pull object. The resulting pull object pulls objects from a source, yielding values mapped through the given function.

This is equivalent to:

@pull
def g(source):
    yield from map(f, source())

where f is the function being decorated.

The >> operator on this class is optimised such that only a single loop will be created when chained with another pull_map.

Also, a pull_map may be chained to a function directly, including the given function in the loop.
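The loop fusion described above can be sketched as function composition: chaining two mapped functions produces one map over their composition instead of two nested generators. This standalone PullMap class is an illustration, not the noodles pull_map; reading "chained to a function directly" as folding a plain callable into the same map is our assumption.

```python
class PullMap:
    """Sketch of a pull_map whose >> fuses mapped functions into one loop."""

    def __init__(self, f):
        self.f = f

    def __call__(self, source):
        # A single map() applies the (possibly composed) function.
        return map(self.f, source())

    def __rshift__(self, other):
        if isinstance(other, PullMap):
            g = other.f
        elif callable(other):
            g = other  # plain function: fold it into the same loop
        else:
            return NotImplemented
        f = self.f
        return PullMap(lambda x: g(f(x)))


inc = PullMap(lambda x: x + 1)
double = PullMap(lambda x: 2 * x)
chain = inc >> double >> str  # one map() over the composed function
print(list(chain(lambda: iter(range(4)))))  # ['2', '4', '6', '8']
```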

class noodles.lib.push_map(f)

A push_map decorates a function of a single argument, to become a push object. The resulting push object receives values through yield and sends them on after mapping through the given function.

This is equivalent to:

@push
def g(sink):
    sink = sink()
    while True:
        x = yield
        sink.send(f(x))

where f is the function being decorated.

The >> operator on this class is optimised such that only a single loop will be created when chained with another push_map.

Also, a push_map may be chained to a function directly, including the given function in the loop.

noodles.lib.sink_map(f)

The sink_map() decorator creates a push object from a function that returns no values. The resulting sink can only be used as an end point of a chain.

Equivalent code:

@push
def sink():
    while True:
        x = yield
        f(x)
noodles.lib.broadcast(*sinks_)

The broadcast() decorator creates a push object that receives a message by yield and then sends this message on to all the given sinks.

noodles.lib.branch(*sinks_)

The branch() decorator creates a pull object that pulls from a single source and sends each message to all of the given sinks. After all sinks have received the message, it is yielded.
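The two fan-out patterns can be sketched with plain generators. In this sketch, which is not the noodles code, broadcast() returns a function producing a primed sink that forwards every message to each given sink, and branch() returns a generator function that copies each pulled message to the sinks before yielding it. Sinks are passed as thunks returning coroutines, following the push convention above; collector() is a hypothetical helper.

```python
from functools import wraps


def coroutine(f):
    @wraps(f)
    def g(*args, **kwargs):
        sink = f(*args, **kwargs)
        sink.send(None)
        return sink
    return g


def collector(target):
    """Hypothetical helper: a sink thunk appending messages to `target`."""
    @coroutine
    def sink():
        while True:
            target.append((yield))
    return sink


def broadcast(*sinks_):
    # Sketch of broadcast(): a sink forwarding each message to all sinks.
    @coroutine
    def bc():
        sinks = [s() for s in sinks_]
        while True:
            msg = yield
            for s in sinks:
                s.send(msg)
    return bc


def branch(*sinks_):
    # Sketch of branch(): pull from `source`, copy to all sinks, then yield.
    def br(source):
        sinks = [s() for s in sinks_]
        for msg in source():
            for s in sinks:
                s.send(msg)
            yield msg
    return br


a, b = [], []
fan_out = broadcast(collector(a), collector(b))()
for word in ['aap', 'noot']:
    fan_out.send(word)
print(a, b)  # ['aap', 'noot'] ['aap', 'noot']

log = []
passed = list(branch(collector(log))(lambda: iter([1, 2, 3])))
print(passed, log)  # [1, 2, 3] [1, 2, 3]
```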

noodles.lib.patch(source, sink)

Create a direct link between a source and a sink.

Implementation:

sink = sink()
for value in source():
    sink.send(value)
noodles.lib.pull_from(iterable)

Creates a pull object from an iterable.

Parameters:iterable (Iterable) – an iterable object.
Return type:pull

Equivalent to:

pull(lambda: iter(iterable))
noodles.lib.push_from(iterable)

Creates a push object from an iterable. The resulting function is not a coroutine, but can be chained to another push.

Parameters:iterable (Iterable) – an iterable object.
Return type:push
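The push side can be sketched end to end: push_from actively sends items, push_map transforms them and sink_map consumes them. In this simplified push class (an assumption about the mechanism, not the noodles implementation), a >> b gives a push that hands a a thunk building b's coroutine. The composite is already primed by its parts, which is where a flag like dont_wrap comes in.

```python
from functools import wraps


def coroutine(f):
    @wraps(f)
    def g(*args, **kwargs):
        sink = f(*args, **kwargs)
        sink.send(None)
        return sink
    return g


class push:
    """Simplified sketch of a push object; not the noodles implementation."""

    def __init__(self, fn, dont_wrap=False):
        # Plain generator functions are primed automatically.
        self.fn = fn if dont_wrap else coroutine(fn)

    def __call__(self, *args):
        return self.fn(*args)

    def __rshift__(self, other):
        # `self` sends into whatever `other` builds; the parts are already
        # primed, so the composite must not be wrapped again.
        return push(lambda *args: self(lambda: other(*args)), dont_wrap=True)


def push_map(f):
    @push
    def g(sink):
        sink = sink()
        while True:
            x = yield
            sink.send(f(x))
    return g


def sink_map(f):
    @push
    def sink():
        while True:
            x = yield
            f(x)
    return sink


def push_from(iterable):
    # Not a coroutine: it actively sends every item to its sink.
    def p(sink):
        sink = sink()
        for x in iterable:
            sink.send(x)
    return push(p, dont_wrap=True)


results = []
chain = push_from(range(5)) >> push_map(lambda x: x * x) >> sink_map(results.append)
chain()  # runs the whole pipeline
print(results)  # [0, 1, 4, 9, 16]
```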
class noodles.lib.Connection(source, sink, aux=None)

Combine a source and a sink. These should represent the IO of some object, probably a worker. In this case the source is a coroutine generating results, while the sink needs to be fed jobs.

setup()

Activate the source and sink functions and return them in that order.

Returns:source, sink
Return type:tuple
class noodles.lib.Queue(end_of_queue=<class 'noodles.lib.queue.EndOfQueue'>)

A Queue object hides a queue.Queue object behind a source and sink interface.

sink

Receives items that are put on the queue. Pushing the end-of-queue message through the sink will put it on the queue, and will also result in a StopIteration exception being raised.

source

Pull items from the queue. When end-of-queue is encountered the generator returns after re-inserting the end-of-queue message on the queue for other sources to pick up. This way, if many threads are pulling from this queue, they all get the end-of-queue message.

close()

Sends end_of_queue message to the queue. Doesn’t stop running sinks.

flush()

Erases the queue and sets the end-of-queue message.
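The end-of-queue behaviour can be sketched as follows. This is a simplified stand-in, not the noodles Queue: pushing the sentinel through the sink stores it and ends the coroutine (so send() raises StopIteration), and every source that meets the sentinel puts it back before returning, so all consumer threads terminate. SketchQueue, EndOfQueue and consume() are names we introduce for the illustration.

```python
import queue
import threading


class EndOfQueue:
    """Stand-in for the end-of-queue sentinel class."""


class SketchQueue:
    """Simplified sketch of the documented Queue; not the noodles code."""

    def __init__(self, end_of_queue=EndOfQueue):
        self._q = queue.Queue()
        self._eoq = end_of_queue

    def sink(self):
        while True:
            msg = yield
            self._q.put(msg)
            if msg is self._eoq:
                return  # makes the caller's send() raise StopIteration

    def source(self):
        while True:
            msg = self._q.get()
            if msg is self._eoq:
                self._q.put(msg)  # leave it for the other sources
                return
            yield msg


q = SketchQueue()
collected = [[], []]


def consume(out):
    for msg in q.source():
        out.append(msg)


threads = [threading.Thread(target=consume, args=(out,)) for out in collected]
for t in threads:
    t.start()

sink = q.sink()
next(sink)  # prime the coroutine
for i in range(6):
    sink.send(i)
try:
    sink.send(EndOfQueue)
except StopIteration:
    pass  # expected when pushing end-of-queue
for t in threads:
    t.join()  # both consumers have seen end-of-queue
print(sorted(collected[0] + collected[1]))  # [0, 1, 2, 3, 4, 5]
```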

noodles.lib.thread_pool(*workers, results=None, end_of_queue=<class 'noodles.lib.queue.EndOfQueue'>)

Returns a pull object, call it r, starting a thread for each given worker. Each thread pulls from the source that r is connected to, and the returned results are pushed to a Queue. r yields from the other end of the same Queue.

The target function for each thread is patch(), which can be stopped by exhausting the source.

If all threads have ended, the result queue receives end-of-queue.

Parameters:
  • results (Connection) – If results should go somewhere other than a newly constructed Queue, a different Connection object can be given.
  • end_of_queue – end-of-queue signal object passed on to the creation of the Queue object.
Return type:

pull

noodles.lib.thread_counter(finalize)

Modifies a thread target function, such that the number of active threads is counted. If the count reaches zero, a finalizer is called.
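A minimal sketch of such a counter, assuming a lock-protected count shared by all wrapped targets; it is an illustration, not the noodles implementation. Note that if one thread finishes before another has started, the count can reach zero early and the finalizer runs more than once. The demo uses a threading.Barrier so that all three workers are alive at the same time and the finalizer runs exactly once.

```python
import threading
from functools import wraps


def thread_counter(finalize):
    """Sketch: wrap thread targets so `finalize` runs when the count hits zero."""
    count = 0
    lock = threading.Lock()

    def modifier(target):
        @wraps(target)
        def counted(*args, **kwargs):
            nonlocal count
            with lock:
                count += 1
            try:
                return target(*args, **kwargs)
            finally:
                with lock:
                    count -= 1
                    if count == 0:
                        finalize()
        return counted

    return modifier


finalized = []
counted = thread_counter(lambda: finalized.append('done'))
barrier = threading.Barrier(3)  # keeps all workers alive at the same time


@counted
def work():
    barrier.wait()


threads = [threading.Thread(target=work) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(finalized)  # ['done']
```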

noodles.lib.object_name(obj)

Get the qualified name of an object. This will obtain both the module name from __module__ and object name from __name__, and concatenate those with a ‘.’. Examples:

>>> from math import sin
>>> object_name(sin)
'math.sin'
>>> def f(x):
...     return x*x
...
>>> object_name(f)
'__main__.f'

To have a qualified name, an object must be defined as a class or function in a module (__main__ is also a module). A normal instantiated object does not have a qualified name, even if it is defined and importable from a module. Calling object_name() on such an object will raise AttributeError.

noodles.lib.look_up(name)

Obtain an object from a qualified name. Example:

>>> look_up('math.sin')
<built-in function sin>

This function should be considered the reverse of object_name().

noodles.lib.importable(obj)

Check if an object can be serialised as a qualified name. This is done by checking that look_up(object_name(obj)) returns the same object.
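The three functions above fit together as a round trip, sketched here with importlib. This is a simplified illustration, not the noodles code: look_up() assumes the part after the last '.' is an attribute of an importable module, so nested names such as methods of a class are not handled.

```python
import importlib
import math


def object_name(obj):
    # Raises AttributeError for objects without __module__ or __name__.
    return obj.__module__ + '.' + obj.__name__


def look_up(name):
    # Simplified: the part after the last '.' must be a module attribute.
    module_name, _, obj_name = name.rpartition('.')
    return getattr(importlib.import_module(module_name), obj_name)


def importable(obj):
    try:
        return look_up(object_name(obj)) is obj
    except (AttributeError, ImportError, ValueError):
        return False


print(object_name(math.sin))                 # math.sin
print(look_up('math.sin') is math.sin)       # True
print(importable(math.sin), importable(42))  # True False
```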

noodles.lib.deep_map(f, root)

Sibling to inverse_deep_map(). As map() maps over an iterable, deep_map() maps over a structure of nested dicts and lists. Every object is passed through f recursively. That is, first root is mapped, next any object contained in its result, and so on.

No distinction is made between tuples and lists. This function was created with encoding to JSON compatible data in mind.
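A minimal sketch of the top-down traversal described above (not the noodles code): f is applied to a node first, and the function then recurses into whatever f returned. Tuples come out as lists. The encode_complex() function is a hypothetical encoder that makes complex numbers JSON-compatible.

```python
import json


def deep_map(f, root):
    """Sketch: map f over a nested structure, top-down."""
    result = f(root)
    if isinstance(result, dict):
        return {k: deep_map(f, v) for k, v in result.items()}
    if isinstance(result, (list, tuple)):
        # tuples and lists are treated alike
        return [deep_map(f, v) for v in result]
    return result


def encode_complex(obj):
    """Hypothetical encoder: complex numbers become JSON-friendly dicts."""
    if isinstance(obj, complex):
        return {'re': obj.real, 'im': obj.imag}
    return obj


data = {'roots': (1+2j, 3-4j), 'n': 2}
encoded = deep_map(encode_complex, data)
print(json.dumps(encoded))
# {"roots": [{"re": 1.0, "im": 2.0}, {"re": 3.0, "im": -4.0}], "n": 2}
```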

noodles.lib.inverse_deep_map(f, root)

Sibling to deep_map(). Recursively maps objects in a nested structure of list and dict objects. Where deep_map() starts at the top, inverse_deep_map() starts at the bottom. First, if root is a list or dict, its contents are passed through inverse_deep_map(). Then, at the end, the entire object is passed through f.

This function was created with decoding from JSON compatible data in mind.
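The bottom-up order matters for decoding: the children of a dict are restored before f sees the dict itself. A minimal sketch (not the noodles code), with a hypothetical decode_complex() that turns {'re': ..., 'im': ...} records back into complex numbers:

```python
import json


def inverse_deep_map(f, root):
    """Sketch: map f over a nested structure, bottom-up."""
    if isinstance(root, dict):
        root = {k: inverse_deep_map(f, v) for k, v in root.items()}
    elif isinstance(root, list):
        root = [inverse_deep_map(f, v) for v in root]
    return f(root)


def decode_complex(obj):
    """Hypothetical decoder: turn {'re': ..., 'im': ...} back into complex."""
    if isinstance(obj, dict) and set(obj) == {'re', 'im'}:
        return complex(obj['re'], obj['im'])
    return obj


encoded = json.loads('{"roots": [{"re": 1.0, "im": 2.0}], "n": 2}')
decoded = inverse_deep_map(decode_complex, encoded)
print(decoded)  # {'roots': [(1+2j)], 'n': 2}
```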

noodles.lib.unwrap(f)

Safely obtain the inner function of a previously wrapped (or decorated) function. This returns f.__wrapped__, or f itself if f has no __wrapped__ attribute.
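A minimal sketch of this behaviour, relying on functools.wraps, which sets __wrapped__ on the wrapper. Unlike inspect.unwrap() from the standard library, this sketch peels off only a single layer. The square() and logged_square() functions are hypothetical examples.

```python
from functools import wraps


def unwrap(f):
    # Sketch: fall back to f itself when it has no __wrapped__ attribute.
    try:
        return f.__wrapped__
    except AttributeError:
        return f


def square(x):
    return x * x


@wraps(square)
def logged_square(x):
    print('calling square')
    return square(x)


print(unwrap(logged_square) is square)  # True
print(unwrap(square) is square)         # True
```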

noodles.lib.is_unwrapped(f)

If f was imported and then unwrapped, this function might return True.