Real World Tutorial 1: Translating Poetry

First example

We build workflows by calling functions. The simplest example of this is the “diamond workflow”:

[1]:
from noodles import run_single
from noodles.tutorial import (add, sub, mul)

u = add(5, 4)
v = sub(u, 3)
w = sub(u, 2)
x = mul(v, w)

answer = run_single(x)

print("The answer is {0}.".format(answer))
The answer is 42.
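
To get a feeling for why this snippet builds a workflow rather than computing numbers straight away, here is a toy model in plain Python. It is only a sketch of the idea and not how Noodles is implemented: the names Deferred, defer and evaluate are made up for this illustration, and unlike a real workflow engine it recomputes the shared node u once for each branch.

```python
class Deferred:
    """Toy stand-in for a promise: remembers a function and its arguments."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args


def defer(func):
    """Hypothetical decorator: calling the function only records the call."""
    def wrapper(*args):
        return Deferred(func, *args)
    return wrapper


def evaluate(node):
    """Walk the recorded calls depth-first and compute the final value."""
    if isinstance(node, Deferred):
        return node.func(*(evaluate(arg) for arg in node.args))
    return node


@defer
def add(a, b):
    return a + b


@defer
def sub(a, b):
    return a - b


@defer
def mul(a, b):
    return a * b


u = add(5, 4)      # nothing is computed yet
v = sub(u, 3)
w = sub(u, 2)
x = mul(v, w)      # x describes the whole diamond

answer = evaluate(x)
print("The answer is {0}.".format(answer))
```

Until evaluate is called, x is just a description of the computation. That description is the part a scheduler can inspect to decide what may run in parallel.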

That looks like any other Python code! But this example is a bit silly. How do we leverage Noodles to earn an honest living? Here’s a slightly less silly example (but only just!). We will build a small translation engine that translates sentences by submitting each word to an online dictionary over a REST API. To do this we make loops (“For thou shalt make loops of blue”). First we build the program as you would do in Python, then we sprinkle some Noodles magic and make it work in parallel! Furthermore, we’ll see how to:

  • make more loops
  • cache results for reuse

Making loops

That’s all swell, but how do we make a parallel loop? Let’s look at a map operation; in Python there are several ways to apply a function to all elements of a sequence. For this example, we will translate some words using the Glosbe service, which has a nice REST interface. We first build some functionality to use this interface.

[2]:
import urllib.request
import json
import re


class Translate:
    """Translate words and sentences in the worst possible way. The Glosbe dictionary
    has a nice REST interface that we query for a phrase. We then take the first result.
    To translate a sentence, we cut it in pieces, translate it and paste it back into
    a Frankenstein monster."""
    def __init__(self, src_lang='en', tgt_lang='fy'):
        self.src = src_lang
        self.tgt = tgt_lang
        self.url = 'https://glosbe.com/gapi/translate?' \
                   'from={src}&dest={tgt}&' \
                   'phrase={{phrase}}&format=json'.format(
                        src=src_lang, tgt=tgt_lang)

    def query_phrase(self, phrase):
        with urllib.request.urlopen(self.url.format(phrase=phrase.lower())) as response:
            translation = json.loads(response.read().decode())
        return translation

    def word(self, phrase):
        translation = self.query_phrase(phrase)
        #translation = {'tuc': [{'phrase': {'text': phrase.lower()[::-1]}}]}
        if len(translation['tuc']) > 0 and 'phrase' in translation['tuc'][0]:
            result = translation['tuc'][0]['phrase']['text']
            if phrase[0].isupper():
                return result.title()
            else:
                return result
        else:
            return "<" + phrase + ">"

    def sentence(self, phrase):
        words = re.sub(r"[^\w]", " ", phrase).split()
        space = re.sub(r"[\w]+", "{}", phrase)
        return space.format(*map(self.word, words))
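
The sentence method relies on a small trick: one regular expression extracts the words, a second one turns the phrase into a template with a {} slot per word, so punctuation and spacing survive the round trip. The following offline sketch shows the trick in isolation. The helper fake_word is a made-up stand-in for the dictionary lookup (it just reverses the word), so no network access is needed.

```python
import re


def fake_word(word):
    """Hypothetical stand-in for a dictionary lookup: reverse the word."""
    return word.lower()[::-1]


def sentence(phrase, translate_word):
    # All runs of word characters, in order of appearance.
    words = re.sub(r"[^\w]", " ", phrase).split()
    # The same phrase with every word replaced by a format slot.
    space = re.sub(r"[\w]+", "{}", phrase)
    return space.format(*map(translate_word, words))


template = re.sub(r"[\w]+", "{}", "If music be, play on")
result = sentence("If music be, play on", fake_word)
print(template)
print(result)
```

One limitation of this approach: a phrase that already contains literal curly braces would confuse str.format.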

We start with a list of strings that desperately need translation, and add a little routine to print it in a gracious manner.

[3]:
shakespeare = [
    "If music be the food of love, play on,",
    "Give me excess of it; that surfeiting,",
    "The appetite may sicken, and so die."]

def print_poem(intro, poem):
    print(intro)
    for line in poem:
        print("     ", line)
    print()

print_poem("Original:", shakespeare)
Original:
      If music be the food of love, play on,
      Give me excess of it; that surfeiting,
      The appetite may sicken, and so die.

Beginning Python programmers like to append things; this is not how you are supposed to program in Python; if you do, please go and read Jeff Knupp’s Writing Idiomatic Python.

[4]:
shakespeare_auf_deutsch = []
for line in shakespeare:
    shakespeare_auf_deutsch.append(
        Translate('en', 'de').sentence(line))
print_poem("Auf Deutsch:", shakespeare_auf_deutsch)
Auf Deutsch:
      Wenn Musik sein der Essen von Minne, spielen an,
      Geben ich Übermaß von es; das übersättigend,
      Der Appetit dürfen erkranken, und so sterben.

Rather, use a comprehension (here in its lazy form, a generator expression) like so:

[5]:
shakespeare_ynt_frysk = \
    (Translate('en', 'fy').sentence(line) for line in shakespeare)
print_poem("Yn it Frysk:", shakespeare_ynt_frysk)
Yn it Frysk:
      At muzyk wêze de fiedsel fan leafde, boartsje oan,
      Jaan <me> by fersin fan it; dat <surfeiting>,
      De <appetite> maaie <sicken>, en dus deagean.

Or use map:

[6]:
shakespeare_pa_dansk = \
    map(Translate('en', 'da').sentence, shakespeare)
print_poem("På Dansk:", shakespeare_pa_dansk)
På Dansk:
      Hvis musik være de mad af kærlighed, spil på,
      Give mig udskejelser af det; som <surfeiting>,
      De appetit må <sicken>, og så dø.
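
The three styles produce the same items, but the generator expression and map are lazy: they yield their results once and are then exhausted. A minimal offline sketch, with a made-up function shout standing in for the translator:

```python
def shout(line):
    """Hypothetical stand-in for Translate(...).sentence."""
    return line.upper()


lines = ["play on", "so die"]

# Style 1: append in a loop.
appended = []
for line in lines:
    appended.append(shout(line))

# Style 2: list comprehension (eager, can be traversed many times).
comprehended = [shout(line) for line in lines]

# Style 3: map (lazy, can be traversed only once).
lazy = map(shout, lines)
first_pass = list(lazy)
second_pass = list(lazy)   # the iterator is already used up

print(appended, comprehended, first_pass, second_pass)
```

This is why print_poem works with the lazy variants above: it iterates over the poem exactly once.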

Noodlify!

If your connection is a bit slow, you may find that the translations take a while to process. Wouldn’t it be nice to do it in parallel? How much code would we have to change to get there in Noodles? Let’s take the slow part of the program and add a @schedule decorator, and run! Sadly, it is not that simple. We can add @schedule to the word method. This means that it will return a promise.

  • Rule: Functions that take promises need to be scheduled functions, or refer to a scheduled function at some level.
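
The reason behind this rule can be illustrated with the standard library, using a concurrent.futures.Future as a rough analogue of a promise (this is an analogy only, not the Noodles promise type). An ordinary function does not know what to do with a value that is not there yet:

```python
from concurrent.futures import ThreadPoolExecutor


def add_one(n):
    """An ordinary function that expects a real number."""
    return n + 1


with ThreadPoolExecutor(max_workers=1) as pool:
    promise = pool.submit(len, "oote")   # a placeholder for a future value

    try:
        add_one(promise)                 # the placeholder is not a number
        misuse = "worked"
    except TypeError:
        misuse = "TypeError"

    # Something has to unwrap the placeholder before the function runs.
    value = add_one(promise.result())

print(misuse, value)
```

In Noodles, scheduling the consuming function is what takes care of this unwrapping step.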

We could write

return schedule(space.format)(*(self.word(w) for w in words))

in the last line of the sentence method, but the string format method doesn’t support wrapping. We rely on getting the signature of a function by calling inspect.signature. For some built-in functions this raises an exception. We may find a workaround for these cases in future versions of Noodles. For the moment we’ll have to define a little wrapper function.
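
To see the signature problem in isolation, the sketch below asks inspect.signature about the built-in format method and about a plain Python wrapper. Whether the built-in lookup fails depends on the Python version, so the helper describe (a name made up here) catches the exception instead of assuming it.

```python
import inspect


def describe(func):
    """Return the signature as a string, or None if it cannot be determined."""
    try:
        return str(inspect.signature(func))
    except (ValueError, TypeError):
        return None


def format_string(s, *args, **kwargs):
    """Plain Python wrapper around str.format; always has a signature."""
    return s.format(*args, **kwargs)


builtin_sig = describe("{}".format)     # may be None for this built-in
wrapper_sig = describe(format_string)

print("built-in:", builtin_sig)
print("wrapper: ", wrapper_sig)
print(format_string("{} {}", "play", "on"))
```

The wrapper is a regular Python function, so its signature is always available.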

[7]:
from noodles import schedule


@schedule
def format_string(s, *args, **kwargs):
    return s.format(*args, **kwargs)


import urllib.request
import json
import re


class Translate:
    """Translate words and sentences in the worst possible way. The Glosbe dictionary
    has a nice REST interface that we query for a phrase. We then take the first result.
    To translate a sentence, we cut it in pieces, translate it and paste it back into
    a Frankenstein monster."""
    def __init__(self, src_lang='en', tgt_lang='fy'):
        self.src = src_lang
        self.tgt = tgt_lang
        self.url = 'https://glosbe.com/gapi/translate?' \
                   'from={src}&dest={tgt}&' \
                   'phrase={{phrase}}&format=json'.format(
                        src=src_lang, tgt=tgt_lang)

    def query_phrase(self, phrase):
        with urllib.request.urlopen(self.url.format(phrase=phrase.lower())) as response:
            translation = json.loads(response.read().decode())
        return translation

    @schedule
    def word(self, phrase):
        #translation = {'tuc': [{'phrase': {'text': phrase.lower()[::-1]}}]}
        translation = self.query_phrase(phrase)

        if len(translation['tuc']) > 0 and 'phrase' in translation['tuc'][0]:
            result = translation['tuc'][0]['phrase']['text']
            if phrase[0].isupper():
                return result.title()
            else:
                return result
        else:
            return "<" + phrase + ">"

    def sentence(self, phrase):
        words = re.sub(r"[^\w]", " ", phrase).split()
        space = re.sub(r"[\w]+", "{}", phrase)
        return format_string(space, *map(self.word, words))

    def __str__(self):
        return "[{} -> {}]".format(self.src, self.tgt)

    def __serialize__(self, pack):
        return pack({'src_lang': self.src,
                     'tgt_lang': self.tgt})

    @classmethod
    def __construct__(cls, msg):
        return cls(**msg)

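Let’s take stock of the mutations to the original. We’ve added a @schedule decorator to word, and changed a function call in sentence. We also added the __str__ method; this is only needed to plot the workflow graph. Finally, the __serialize__ and __construct__ methods describe how a Translate object is packed into a plain dictionary and rebuilt from it. Let’s run the new script.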
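
The __serialize__ and __construct__ methods in the class above look like a matched pair: one reduces the object to its constructor arguments, the other rebuilds it. The sketch below checks that round trip on a stripped-down copy of the class. How and when Noodles calls these hooks is not shown in this tutorial, so the pack function used here is a hypothetical stand-in that simply encodes the dictionary as JSON.

```python
import json


class Translate:
    """Stripped-down copy of the tutorial class, without the network code."""

    def __init__(self, src_lang='en', tgt_lang='fy'):
        self.src = src_lang
        self.tgt = tgt_lang

    def __str__(self):
        return "[{} -> {}]".format(self.src, self.tgt)

    def __serialize__(self, pack):
        return pack({'src_lang': self.src,
                     'tgt_lang': self.tgt})

    @classmethod
    def __construct__(cls, msg):
        return cls(**msg)


def pack(data):
    """Hypothetical stand-in for the packer: encode the data as JSON."""
    return json.dumps(data)


original = Translate('en', 'eo')
message = original.__serialize__(pack)              # a JSON string
clone = Translate.__construct__(json.loads(message))

print(message)
print(clone)
```

Because only the two language codes are stored, the clone is rebuilt by running __init__ again.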

[8]:
from noodles import gather, run_parallel
from noodles.tutorial import get_workflow_graph

shakespeare_en_esperanto = \
    map(Translate('en', 'eo').sentence, shakespeare)

wf = gather(*shakespeare_en_esperanto)
workflow_graph = get_workflow_graph(wf._workflow)
result = run_parallel(wf, n_threads=8)
print_poem("Shakespeare en Esperanto:", result)
Shakespeare en Esperanto:
      Se muziko esti la manĝaĵo de ami, ludi sur,
      Doni mi eksceso de ĝi; ke <surfeiting>,
      La apetito povi naŭzi, kaj tiel morti.

The last peculiar thing you may notice is the gather function. It collects the promises that map generates and creates a single new promise. The definition of gather is very simple:

@schedule
def gather(*lst):
    return lst
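
For comparison, the same fan-out and gather pattern can be written with the standard library alone. This is an analogy rather than a description of what run_parallel does internally: the function slow_upper is a made-up placeholder for a slow network call, and the thread pool plays the role of the eight worker threads.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_upper(line):
    """Hypothetical slow job, standing in for a translation request."""
    time.sleep(0.05)
    return line.upper()


lines = [
    "If music be the food of love, play on,",
    "Give me excess of it; that surfeiting,",
    "The appetite may sicken, and so die."]

with ThreadPoolExecutor(max_workers=8) as pool:
    # Fan out: one future per line, all submitted before any is awaited.
    futures = [pool.submit(slow_upper, line) for line in lines]
    # Gather: collect the individual results into one tuple, in order.
    result = tuple(future.result() for future in futures)

for line in result:
    print("     ", line)
```

The difference with Noodles is that here the parallelism is spelled out by hand, while in the tutorial it follows from the structure of the workflow.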

The workflow graph of the Esperanto translator script looks like this:

[9]:
workflow_graph.attr(size='10')
workflow_graph
[9]:
[Figure: workflow graph of the Esperanto translator script]

Dealing with repetition

In the following example we have a line with some repetition.

[1]:
from noodles import (schedule, gather_all)
import re

@schedule
def word_size(word):
    return len(word)

@schedule
def format_string(s, *args, **kwargs):
    return s.format(*args, **kwargs)

def word_size_phrase(phrase):
    words = re.sub(r"[^\w]", " ", phrase).split()
    space = re.sub(r"[\w]+", "{}", phrase)
    word_lengths = map(word_size, words)
    return format_string(space, *word_lengths)
[2]:
from noodles.tutorial import display_workflows, run_and_print_log

display_workflows(
    prefix='poetry',
    sizes=word_size_phrase("Oote oote oote, Boe"))
[Figure: workflow graph “sizes”]

Let’s run the example workflows now, but focus on the actions taken, looking at the logs. The function run_and_print_log in the tutorial module runs our workflow with four parallel threads and caches results in an SQLite3 database.

To see how this program is being run, we monitor the job submission, retrieval and result storage. First, should you have run this tutorial before, remove the database file.

[3]:
# remove the database if it already exists
!rm -f tutorial.db

Running the workflow, we can now see that at the second occurrence of the word ‘oote’, the function call is attached to the first job that asked for the same result. The job word_size('oote') is run only once.

[4]:
run_and_print_log(word_size_phrase("Oote oote oote, Boe"), highlight=range(4, 8))
2018-02-08 11:27:49,767 - job             1: word_size('Oote')
2018-02-08 11:27:49,767 - job             2: word_size('oote')
2018-02-08 11:27:49,768 - job             3: word_size('oote')
2018-02-08 11:27:49,768 - job             4: word_size('Boe')
2018-02-08 11:27:49,772 - result          1 [word_size('Oote')]: done -> 4
2018-02-08 11:27:49,772 - result          2 [word_size('oote')]: done -> 4
2018-02-08 11:27:49,773 - result          3 [word_size('oote')]: attached -> 4
2018-02-08 11:27:49,773 - result          4 [word_size('Boe')]: done -> 3
2018-02-08 11:27:49,773 - job            … at_string('{} {} {}, {}', 4, 4, 4, 3)
2018-02-08 11:27:49,775 - result         … {}', 4, 4, 4, 3)]: done -> '4 4 4, 3'
2018-02-08 11:27:49,775 - -end-of-queue-
[4]:
'4 4 4, 3'
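
The attaching behaviour in this log can be mimicked with a dictionary of jobs already seen. The sketch below is a deliberately sequential toy (the name run_with_attach is made up, and a real scheduler has to deal with jobs that are still running), but it reproduces the done and attached pattern shown above.

```python
import re


def run_with_attach(func, arguments):
    """Run func once per distinct argument; attach repeats to the first job."""
    seen = {}        # argument -> result of the first job
    statuses = []
    for arg in arguments:
        if arg in seen:
            statuses.append("attached")
        else:
            seen[arg] = func(arg)
            statuses.append("done")
    return [seen[arg] for arg in arguments], statuses


words = re.sub(r"[^\w]", " ", "Oote oote oote, Boe").split()
sizes, statuses = run_with_attach(len, words)

for word, size, status in zip(words, sizes, statuses):
    print("word_size({!r}): {} -> {}".format(word, status, size))
```

Note that 'Oote' and 'oote' count as different arguments, which is why only the third word is attached.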

Now, running a similar workflow again, notice that previous results are retrieved from the database.

[5]:
run_and_print_log(word_size_phrase("Oe oe oote oote oote"), highlight=range(5, 10))
2018-02-08 11:28:02,608 - job             6: word_size('Oe')
2018-02-08 11:28:02,609 - job             7: word_size('oe')
2018-02-08 11:28:02,609 - job             8: word_size('oote')
2018-02-08 11:28:02,609 - job             9: word_size('oote')
2018-02-08 11:28:02,609 - job            10: word_size('oote')
2018-02-08 11:28:02,618 - result          6 [word_size('Oe')]: done -> 2
2018-02-08 11:28:02,619 - result          7 [word_size('oe')]: done -> 2
2018-02-08 11:28:02,620 - result          8 [word_size('oote')]: retrieved -> 4
2018-02-08 11:28:02,620 - result          9 [word_size('oote')]: retrieved -> 4
2018-02-08 11:28:02,621 - result         10 [word_size('oote')]: retrieved -> 4
2018-02-08 11:28:02,621 - job            … ring('{} {} {} {} {}', 2, 2, 4, 4, 4)
2018-02-08 11:28:02,624 - result         …  2, 2, 4, 4, 4)]: done -> '2 2 4 4 4'
2018-02-08 11:28:02,624 - -end-of-queue-
[5]:
'2 2 4 4 4'
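
The retrieved entries can be imitated with a small result table in SQLite. This is a sketch under simplifying assumptions, not the schema that Noodles uses: the table layout, the key format and the function cached_word_size are invented for the illustration, and the database lives in memory so nothing is left on disk.

```python
import sqlite3

db = sqlite3.connect(":memory:")
db.execute("CREATE TABLE results (key TEXT PRIMARY KEY, value INTEGER)")


def cached_word_size(word):
    """Return (size, status); status tells whether the job actually ran."""
    key = "word_size:" + word
    row = db.execute(
        "SELECT value FROM results WHERE key = ?", (key,)).fetchone()
    if row is not None:
        return row[0], "retrieved"
    value = len(word)
    db.execute("INSERT INTO results VALUES (?, ?)", (key, value))
    return value, "done"


# First workflow fills the table, second workflow reuses what it can.
first = [cached_word_size(w) for w in ["Oote", "oote", "Boe"]]
second = [cached_word_size(w) for w in ["Oe", "oe", "oote"]]

print(first)
print(second)
```

Only 'oote' was seen in the first run, so it is the only word the second run can retrieve.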

Even where the results of the individual word_size jobs are retrieved from the database, we still had to go through the trouble of looking up each of them before arriving at what we actually wanted: the result of format_string. If you want to cache the result of an entire workflow, pack the workflow in another scheduled function!

Versioning

We may add a version string to a function. This version is taken into account when looking up results in the database.

[6]:
@schedule(version='1.0')
def word_size_phrase(phrase):
    words = re.sub(r"[^\w]", " ", phrase).split()
    space = re.sub(r"[\w]+", "{}", phrase)
    word_lengths = map(word_size, words)
    return format_string(space, *word_lengths)

run_and_print_log(
    word_size_phrase("Kneu kneu kneu kneu ote kneu eur"),
    highlight=[1, 17])
2018-02-08 11:28:04,517 - job            … ize_phrase('Kneu kneu ...  kneu eur')
2018-02-08 11:28:04,530 - result         … eur')]: done -> workflow 7f1164b07198
2018-02-08 11:28:04,531 - job            13: word_size('Kneu')
2018-02-08 11:28:04,532 - job            14: word_size('kneu')
2018-02-08 11:28:04,533 - job            15: word_size('kneu')
2018-02-08 11:28:04,535 - job            16: word_size('kneu')
2018-02-08 11:28:04,536 - job            17: word_size('ote')
2018-02-08 11:28:04,537 - job            18: word_size('kneu')
2018-02-08 11:28:04,538 - job            19: word_size('eur')
2018-02-08 11:28:04,540 - result         13 [word_size('Kneu')]: done -> 4
2018-02-08 11:28:04,540 - result         14 [word_size('kneu')]: done -> 4
2018-02-08 11:28:04,541 - result         15 [word_size('kneu')]: attached -> 4
2018-02-08 11:28:04,541 - result         16 [word_size('kneu')]: attached -> 4
2018-02-08 11:28:04,541 - result         18 [word_size('kneu')]: attached -> 4
2018-02-08 11:28:04,541 - result         17 [word_size('ote')]: done -> 3
2018-02-08 11:28:04,541 - result         19 [word_size('eur')]: done -> 3
2018-02-08 11:28:04,542 - job            … {} {} {} {} {}', 4, 4, 4, 4, 3, 4, 3)
2018-02-08 11:28:04,545 - result         … 4, 3, 4, 3)]: done -> '4 4 4 4 3 4 3'
2018-02-08 11:28:04,546 - -end-of-queue-
[6]:
'4 4 4 4 3 4 3'

See how the first job is evaluated to return a new workflow. Note that if the version is omitted, it is automatically generated from the source of the function. For example, let’s say we decided that the function word_size_phrase should return a dictionary of all word sizes instead of a string. Here we use the function called lift to transform a dictionary containing promises into a promise of a dictionary. lift can handle lists, dictionaries, sets, tuples and objects that are constructible from their __dict__ member.

[7]:
from noodles import lift

def word_size_phrase(phrase):
    words = re.sub(r"[^\w]", " ", phrase).split()
    return lift({word: word_size(word) for word in words})

display_workflows(prefix='poetry', lift=word_size_phrase("Kneu kneu kneu kneu ote kneu eur"))
[Figure: workflow graph “lift”]
[8]:
run_and_print_log(word_size_phrase("Kneu kneu kneu kneu ote kneu eur"))
2018-02-08 11:28:19,992 - job            21: word_size('Kneu')
2018-02-08 11:28:19,992 - job            22: word_size('kneu')
2018-02-08 11:28:19,992 - job            23: word_size('ote')
2018-02-08 11:28:19,992 - job            24: word_size('eur')
2018-02-08 11:28:19,997 - result         21 [word_size('Kneu')]: retrieved -> 4
2018-02-08 11:28:19,998 - job            25: make_tuple('Kneu', 4)
2018-02-08 11:28:19,999 - result         22 [word_size('kneu')]: retrieved -> 4
2018-02-08 11:28:20,000 - job            26: make_tuple('kneu', 4)
2018-02-08 11:28:20,001 - result         23 [word_size('ote')]: retrieved -> 3
2018-02-08 11:28:20,002 - job            27: make_tuple('ote', 3)
2018-02-08 11:28:20,003 - result         24 [word_size('eur')]: retrieved -> 3
2018-02-08 11:28:20,004 - job            28: make_tuple('eur', 3)
2018-02-08 11:28:20,005 - result         … uple('Kneu', 4)]: done -> ('Kneu', 4)
2018-02-08 11:28:20,005 - result         … uple('kneu', 4)]: done -> ('kneu', 4)
2018-02-08 11:28:20,006 - result         … _tuple('ote', 3)]: done -> ('ote', 3)
2018-02-08 11:28:20,006 - result         … _tuple('eur', 3)]: done -> ('eur', 3)
2018-02-08 11:28:20,006 - job            …  ('kneu', 4), ('ote', 3), ('eur', 3))
2018-02-08 11:28:20,008 - result         … u': 4, 'kneu': 4, 'ote': 3, 'eur': 3}
2018-02-08 11:28:20,008 - -end-of-queue-
[8]:
{'Kneu': 4, 'eur': 3, 'kneu': 4, 'ote': 3}
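
The idea behind lift, turning a container of pending values into a single value, has a rough counterpart in the standard library. The sketch below uses concurrent.futures futures instead of Noodles promises, and the helper resolve is made up for this illustration. Unlike lift, which produces a new promise, resolve simply blocks until every nested future has a result.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def resolve(obj):
    """Recursively replace futures inside common containers by their results."""
    if isinstance(obj, Future):
        return obj.result()
    if isinstance(obj, dict):
        return {key: resolve(value) for key, value in obj.items()}
    if isinstance(obj, (list, tuple, set)):
        return type(obj)(resolve(item) for item in obj)
    return obj


words = "Kneu kneu kneu kneu ote kneu eur".split()

with ThreadPoolExecutor(max_workers=4) as pool:
    # A dictionary whose values are not known yet; repeated words
    # overwrite their earlier entry, leaving four distinct keys.
    pending = {word: pool.submit(len, word) for word in words}
    sizes = resolve(pending)

print(sizes)
```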

Be careful with versions! Noodles will take you at your word! If we lie about the version, it will go ahead and retrieve the result belonging to the old function:

[9]:
@schedule(version='1.0')
def word_size_phrase(phrase):
    words = re.sub(r"[^\w]", " ", phrase).split()
    return lift({word: word_size(word) for word in words})

run_and_print_log(
    word_size_phrase("Kneu kneu kneu kneu ote kneu eur"),
    highlight=[1])
2018-02-08 11:28:29,728 - job            … ize_phrase('Kneu kneu ...  kneu eur')
2018-02-08 11:28:29,734 - result         …  eur')]: retrieved -> '4 4 4 4 3 4 3'
2018-02-08 11:28:29,734 - -end-of-queue-
[9]:
'4 4 4 4 3 4 3'
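
Why the stale string comes back can be seen in a toy cache whose lookup key contains the function name, the version string and the arguments, but not the function body. This is an assumption-laden sketch of the mechanism, with a made-up decorator called versioned and an in-memory dictionary in place of the database:

```python
cache = {}


def versioned(version):
    """Hypothetical decorator: cache results under (name, version, args)."""
    def decorator(func):
        def wrapper(*args):
            key = (func.__name__, version, args)
            if key in cache:
                return cache[key], "retrieved"
            cache[key] = func(*args)
            return cache[key], "done"
        return wrapper
    return decorator


@versioned("1.0")
def word_size_phrase(phrase):
    return " ".join(str(len(w)) for w in phrase.split())


old = word_size_phrase("Kneu kneu ote")


@versioned("1.0")    # new body, but we claim it is still version 1.0
def word_size_phrase(phrase):
    return {w: len(w) for w in phrase.split()}


stale = word_size_phrase("Kneu kneu ote")


@versioned("1.1")    # an honest version bump misses the old entry
def word_size_phrase(phrase):
    return {w: len(w) for w in phrase.split()}


fresh = word_size_phrase("Kneu kneu ote")

print(old)
print(stale)
print(fresh)
```

Bumping the version string whenever the behaviour of a function changes keeps old and new results apart.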