Real World Tutorial 1: Translating Poetry
First example
We build workflows by calling functions. The simplest example of this is the “diamond workflow”:
[1]:
from noodles import run_single
from noodles.tutorial import (add, sub, mul)
u = add(5, 4)
v = sub(u, 3)
w = sub(u, 2)
x = mul(v, w)
answer = run_single(x)
print("The answer is {0}.".format(answer))
The answer is 42.
That looks like any other Python code! But this example is a bit silly. How do we leverage Noodles to earn an honest living? Here’s a slightly less silly example (but only just!). We will build a small translation engine that translates sentences by submitting each word to an online dictionary over a REST API. To do this we make loops (“For thou shalt make loops of blue”). First we build the program as you would do in Python, then we sprinkle some Noodles magic and make it work in parallel! Furthermore, we’ll see how to:
- make more loops
- cache results for reuse
Making loops
That’s all swell, but how do we make a parallel loop? Let’s look at a map operation; in Python there are several ways to perform a function on all elements in an array. For this example, we will translate some words using the Glosbe service, which has a nice REST interface. We first build some functionality to use this interface.
[2]:
import urllib.request
import json
import re
class Translate:
    """Translate words and sentences in the worst possible way. The Glosbe dictionary
    has a nice REST interface that we query for a phrase. We then take the first result.
    To translate a sentence, we cut it in pieces, translate it and paste it back into
    a Frankenstein monster."""
    def __init__(self, src_lang='en', tgt_lang='fy'):
        self.src = src_lang
        self.tgt = tgt_lang
        self.url = 'https://glosbe.com/gapi/translate?' \
                   'from={src}&dest={tgt}&' \
                   'phrase={{phrase}}&format=json'.format(
                       src=src_lang, tgt=tgt_lang)

    def query_phrase(self, phrase):
        with urllib.request.urlopen(self.url.format(phrase=phrase.lower())) as response:
            translation = json.loads(response.read().decode())
        return translation

    def word(self, phrase):
        translation = self.query_phrase(phrase)
        # translation = {'tuc': [{'phrase': {'text': phrase.lower()[::-1]}}]}
        if len(translation['tuc']) > 0 and 'phrase' in translation['tuc'][0]:
            result = translation['tuc'][0]['phrase']['text']
            if phrase[0].isupper():
                return result.title()
            else:
                return result
        else:
            return "<" + phrase + ">"

    def sentence(self, phrase):
        words = re.sub(r"[^\w]", " ", phrase).split()
        space = re.sub(r"[\w]+", "{}", phrase)
        return space.format(*map(self.word, words))
We start with a list of strings that desperately need translation, and add a little routine to print them in a gracious manner.
[3]:
shakespeare = [
    "If music be the food of love, play on,",
    "Give me excess of it; that surfeiting,",
    "The appetite may sicken, and so die."]
def print_poem(intro, poem):
    print(intro)
    for line in poem:
        print(" ", line)
    print()
print_poem("Original:", shakespeare)
Original:
If music be the food of love, play on,
Give me excess of it; that surfeiting,
The appetite may sicken, and so die.
Beginning Python programmers like to append things; this is not how you are supposed to program in Python; if you do, please go and read Jeff Knupp’s Writing Idiomatic Python.
[4]:
shakespeare_auf_deutsch = []
for line in shakespeare:
    shakespeare_auf_deutsch.append(
        Translate('en', 'de').sentence(line))
print_poem("Auf Deutsch:", shakespeare_auf_deutsch)
print_poem("Auf Deutsch:", shakespeare_auf_deutsch)
Auf Deutsch:
Wenn Musik sein der Essen von Minne, spielen an,
Geben ich Übermaß von es; das übersättigend,
Der Appetit dürfen erkranken, und so sterben.
Rather, use a comprehension, like so:
[5]:
shakespeare_ynt_frysk = \
    (Translate('en', 'fy').sentence(line) for line in shakespeare)
print_poem("Yn it Frysk:", shakespeare_ynt_frysk)
Yn it Frysk:
At muzyk wêze de fiedsel fan leafde, boartsje oan,
Jaan <me> by fersin fan it; dat <surfeiting>,
De <appetite> maaie <sicken>, en dus deagean.
Or use map:
[6]:
shakespeare_pa_dansk = \
    map(Translate('en', 'da').sentence, shakespeare)
print_poem("På Dansk:", shakespeare_pa_dansk)
På Dansk:
Hvis musik være de mad af kærlighed, spil på,
Give mig udskejelser af det; som <surfeiting>,
De appetit må <sicken>, og så dø.
Noodlify!
If your connection is a bit slow, you may find that the translations take a while to process. Wouldn’t it be nice to do them in parallel? How much code would we have to change to get there with Noodles? Let’s take the slow part of the program, add a @schedule decorator, and run! Sadly, it is not that simple. We can add @schedule to the word method. This means that it will return a promise.
- Rule: functions that take promises need to be scheduled functions, or refer to a scheduled function at some level.
We could write
return schedule(space.format)(*(self.word(w) for w in words))
in the last line of the sentence method, but the string format method doesn’t support wrapping. Noodles relies on getting the signature of a function by calling inspect.signature, and for some built-in functions this raises an exception. We may find a workaround for these cases in future versions of Noodles. For the moment we’ll have to define a little wrapper function.
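The rule above can be pictured with a toy model of promises. This is a pure-Python sketch of the idea only, not the Noodles implementation (Noodles builds a workflow graph that a scheduler evaluates; here a promise is just a thunk with a run method):

```python
# Toy stand-in for a Noodles promise: a deferred call that is only
# evaluated when run() is invoked.
class Promise:
    def __init__(self, fn, *args):
        self.fn, self.args = fn, args

    def run(self):
        # Force any promised arguments first, then apply the function.
        args = [a.run() if isinstance(a, Promise) else a for a in self.args]
        return self.fn(*args)


def schedule(fn):
    # A "scheduled" function returns a Promise instead of a value.
    def wrapper(*args):
        return Promise(fn, *args)
    return wrapper


@schedule
def add(a, b):
    return a + b


u = add(1, 2)   # a Promise, not 3
v = add(u, 4)   # scheduled functions may receive promises...
print(v.run())  # 7

# ...but a plain function sees the Promise object itself:
# str.format(u) would format the Promise, not the number it stands for.
```

This is why space.format cannot consume promises directly: it is an ordinary function that would operate on the promise objects themselves, which is what the format_string wrapper below works around.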
[7]:
from noodles import schedule

@schedule
def format_string(s, *args, **kwargs):
    return s.format(*args, **kwargs)
import urllib.request
import json
import re
class Translate:
    """Translate words and sentences in the worst possible way. The Glosbe dictionary
    has a nice REST interface that we query for a phrase. We then take the first result.
    To translate a sentence, we cut it in pieces, translate it and paste it back into
    a Frankenstein monster."""
    def __init__(self, src_lang='en', tgt_lang='fy'):
        self.src = src_lang
        self.tgt = tgt_lang
        self.url = 'https://glosbe.com/gapi/translate?' \
                   'from={src}&dest={tgt}&' \
                   'phrase={{phrase}}&format=json'.format(
                       src=src_lang, tgt=tgt_lang)

    def query_phrase(self, phrase):
        with urllib.request.urlopen(self.url.format(phrase=phrase.lower())) as response:
            translation = json.loads(response.read().decode())
        return translation

    @schedule
    def word(self, phrase):
        # translation = {'tuc': [{'phrase': {'text': phrase.lower()[::-1]}}]}
        translation = self.query_phrase(phrase)
        if len(translation['tuc']) > 0 and 'phrase' in translation['tuc'][0]:
            result = translation['tuc'][0]['phrase']['text']
            if phrase[0].isupper():
                return result.title()
            else:
                return result
        else:
            return "<" + phrase + ">"

    def sentence(self, phrase):
        words = re.sub(r"[^\w]", " ", phrase).split()
        space = re.sub(r"[\w]+", "{}", phrase)
        return format_string(space, *map(self.word, words))

    def __str__(self):
        return "[{} -> {}]".format(self.src, self.tgt)

    def __serialize__(self, pack):
        return pack({'src_lang': self.src,
                     'tgt_lang': self.tgt})

    @classmethod
    def __construct__(cls, msg):
        return cls(**msg)
Let’s take stock of the mutations to the original. We’ve added a @schedule decorator to word, and changed a function call in sentence. We also added a __str__ method; this is only needed to plot the workflow graph. Let’s run the new script.
[8]:
from noodles import gather, run_parallel
from noodles.tutorial import get_workflow_graph
shakespeare_en_esperanto = \
    map(Translate('en', 'eo').sentence, shakespeare)
wf = gather(*shakespeare_en_esperanto)
workflow_graph = get_workflow_graph(wf._workflow)
result = run_parallel(wf, n_threads=8)
print_poem("Shakespeare en Esperanto:", result)
Shakespeare en Esperanto:
Se muziko esti la manĝaĵo de ami, ludi sur,
Doni mi eksceso de ĝi; ke <surfeiting>,
La apetito povi naŭzi, kaj tiel morti.
The last peculiar thing that you may notice is the gather function. It collects the promises that map generates and creates a single new promise. The definition of gather is very simple:
@schedule
def gather(*lst):
    return lst
The workflow graph of the Esperanto translator script looks like this:
[9]:
workflow_graph.attr(size='10')
workflow_graph
[9]:
(workflow graph of the Esperanto translator)
Dealing with repetition
In the following example we have a line with some repetition.
[1]:
from noodles import (schedule, gather_all)
import re
@schedule
def word_size(word):
    return len(word)

@schedule
def format_string(s, *args, **kwargs):
    return s.format(*args, **kwargs)

def word_size_phrase(phrase):
    words = re.sub(r"[^\w]", " ", phrase).split()
    space = re.sub(r"[\w]+", "{}", phrase)
    word_lengths = map(word_size, words)
    return format_string(space, *word_lengths)
[2]:
from noodles.tutorial import display_workflows, run_and_print_log
display_workflows(
    prefix='poetry',
    sizes=word_size_phrase("Oote oote oote, Boe"))
(workflow graph: sizes)
Let’s run the example workflows now, but focus on the actions taken by looking at the logs. The function run_and_print_log in the tutorial module runs our workflow with four parallel threads and caches results in an SQLite3 database.
To see how this program is being run, we monitor the job submission, retrieval and result storage. First, should you have run this tutorial before, remove the database file.
[3]:
# remove the database if it already exists
!rm -f tutorial.db
Running the workflow, we can now see that at the second occurrence of the word ‘oote’, the function call is attached to the first job that asked for the same result. The job word_size('oote') is run only once.
[4]:
run_and_print_log(word_size_phrase("Oote oote oote, Boe"), highlight=range(4, 8))
2018-02-08 11:27:49,767 - job 1: word_size('Oote')
2018-02-08 11:27:49,767 - job 2: word_size('oote')
2018-02-08 11:27:49,768 - job 3: word_size('oote')
2018-02-08 11:27:49,768 - job 4: word_size('Boe')
2018-02-08 11:27:49,772 - result 1 [word_size('Oote')]: done -> 4
2018-02-08 11:27:49,772 - result 2 [word_size('oote')]: done -> 4
2018-02-08 11:27:49,773 - result 3 [word_size('oote')]: attached -> 4
2018-02-08 11:27:49,773 - result 4 [word_size('Boe')]: done -> 3
2018-02-08 11:27:49,773 - job … at_string('{} {} {}, {}', 4, 4, 4, 3)
2018-02-08 11:27:49,775 - result … {}', 4, 4, 4, 3)]: done -> '4 4 4, 3'
2018-02-08 11:27:49,775 - -end-of-queue-
[4]:
'4 4 4, 3'
Now, running a similar workflow again, notice that previous results are retrieved from the database.
[5]:
run_and_print_log(word_size_phrase("Oe oe oote oote oote"), highlight=range(5, 10))
2018-02-08 11:28:02,608 - job 6: word_size('Oe')
2018-02-08 11:28:02,609 - job 7: word_size('oe')
2018-02-08 11:28:02,609 - job 8: word_size('oote')
2018-02-08 11:28:02,609 - job 9: word_size('oote')
2018-02-08 11:28:02,609 - job 10: word_size('oote')
2018-02-08 11:28:02,618 - result 6 [word_size('Oe')]: done -> 2
2018-02-08 11:28:02,619 - result 7 [word_size('oe')]: done -> 2
2018-02-08 11:28:02,620 - result 8 [word_size('oote')]: retrieved -> 4
2018-02-08 11:28:02,620 - result 9 [word_size('oote')]: retrieved -> 4
2018-02-08 11:28:02,621 - result 10 [word_size('oote')]: retrieved -> 4
2018-02-08 11:28:02,621 - job … ring('{} {} {} {} {}', 2, 2, 4, 4, 4)
2018-02-08 11:28:02,624 - result … 2, 2, 4, 4, 4)]: done -> '2 2 4 4 4'
2018-02-08 11:28:02,624 - -end-of-queue-
[5]:
'2 2 4 4 4'
Although the result of every single job is retrieved, we still had to go through the trouble of looking up the results of word_size('Oote'), word_size('oote'), and word_size('Boe') to find out that we wanted the result of format_string. If you want to cache the result of an entire workflow, pack the workflow in another scheduled function!
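The payoff of packing the whole computation into one cached call can be sketched in plain Python, with a dict keyed on the function name and arguments standing in for the SQLite database (this illustrates only the idea, not how Noodles hashes jobs):

```python
# Pure-Python sketch of call-level caching; NOT the Noodles implementation.
cache = {}
calls = []   # record which words are actually computed


def cached(fn):
    def wrapper(*args):
        key = (fn.__name__, args)
        if key not in cache:
            cache[key] = fn(*args)
        return cache[key]
    return wrapper


@cached
def word_size(word):
    calls.append(word)
    return len(word)


@cached
def sizes_phrase(phrase):
    # Because the *outer* call is cached too, repeating the whole phrase
    # later costs one lookup instead of one lookup per word.
    return ' '.join(str(word_size(w)) for w in phrase.split())


print(sizes_phrase("Oote oote oote Boe"))   # computes Oote, oote, Boe once each
print(sizes_phrase("Oote oote oote Boe"))   # single cache hit for the phrase
print(calls)                                # ['Oote', 'oote', 'Boe']
```

The second call never touches word_size at all, which is the effect the text above suggests: wrap the entire workflow in one scheduled function and the database lookup happens once.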
Versioning
We may add a version string to a function. This version is taken into account when looking up results in the database.
[6]:
@schedule(version='1.0')
def word_size_phrase(phrase):
    words = re.sub(r"[^\w]", " ", phrase).split()
    space = re.sub(r"[\w]+", "{}", phrase)
    word_lengths = map(word_size, words)
    return format_string(space, *word_lengths)
run_and_print_log(
    word_size_phrase("Kneu kneu kneu kneu ote kneu eur"),
    highlight=[1, 17])
2018-02-08 11:28:04,517 - job … ize_phrase('Kneu kneu ... kneu eur')
2018-02-08 11:28:04,530 - result … eur')]: done -> workflow 7f1164b07198
2018-02-08 11:28:04,531 - job 13: word_size('Kneu')
2018-02-08 11:28:04,532 - job 14: word_size('kneu')
2018-02-08 11:28:04,533 - job 15: word_size('kneu')
2018-02-08 11:28:04,535 - job 16: word_size('kneu')
2018-02-08 11:28:04,536 - job 17: word_size('ote')
2018-02-08 11:28:04,537 - job 18: word_size('kneu')
2018-02-08 11:28:04,538 - job 19: word_size('eur')
2018-02-08 11:28:04,540 - result 13 [word_size('Kneu')]: done -> 4
2018-02-08 11:28:04,540 - result 14 [word_size('kneu')]: done -> 4
2018-02-08 11:28:04,541 - result 15 [word_size('kneu')]: attached -> 4
2018-02-08 11:28:04,541 - result 16 [word_size('kneu')]: attached -> 4
2018-02-08 11:28:04,541 - result 18 [word_size('kneu')]: attached -> 4
2018-02-08 11:28:04,541 - result 17 [word_size('ote')]: done -> 3
2018-02-08 11:28:04,541 - result 19 [word_size('eur')]: done -> 3
2018-02-08 11:28:04,542 - job … {} {} {} {} {}', 4, 4, 4, 4, 3, 4, 3)
2018-02-08 11:28:04,545 - result … 4, 3, 4, 3)]: done -> '4 4 4 4 3 4 3'
2018-02-08 11:28:04,546 - -end-of-queue-
[6]:
'4 4 4 4 3 4 3'
See how the first job is evaluated to return a new workflow. Note that if the version is omitted, it is automatically generated from the source of the function. For example, let’s say we decided that word_size_phrase should return a dictionary of all word sizes instead of a string. Here we use the function lift to transform a dictionary containing promises into a promise of a dictionary. lift can handle lists, dictionaries, sets, tuples and objects that are constructible from their __dict__ member.
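Conceptually, lift turns a container of promises inside-out: a dict of promises becomes one promise of a dict. A pure-Python sketch with zero-argument callables standing in for promises (the helper name lift_dict is ours; this is not the Noodles implementation, which schedules extra jobs such as the make_tuple calls visible in the log below):

```python
# Sketch of the semantics of `lift` for dictionaries, using thunks
# (zero-argument callables) as stand-ins for promises.
def lift_dict(container):
    # One thunk that forces every inner thunk and returns a plain dict.
    return lambda: {key: promise() for key, promise in container.items()}


words = ["Kneu", "ote", "eur"]
promised = {w: (lambda w=w: len(w)) for w in words}   # dict of "promises"
whole = lift_dict(promised)                           # one "promise" of a dict
print(whole())   # {'Kneu': 4, 'ote': 3, 'eur': 3}
```

Nothing is computed until the outer promise is forced, which is what lets Noodles treat the whole dictionary as a single node in the workflow graph.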
[7]:
from noodles import lift
def word_size_phrase(phrase):
    words = re.sub(r"[^\w]", " ", phrase).split()
    return lift({word: word_size(word) for word in words})
display_workflows(prefix='poetry', lift=word_size_phrase("Kneu kneu kneu kneu ote kneu eur"))
(workflow graph: lift)
[8]:
run_and_print_log(word_size_phrase("Kneu kneu kneu kneu ote kneu eur"))
2018-02-08 11:28:19,992 - job 21: word_size('Kneu')
2018-02-08 11:28:19,992 - job 22: word_size('kneu')
2018-02-08 11:28:19,992 - job 23: word_size('ote')
2018-02-08 11:28:19,992 - job 24: word_size('eur')
2018-02-08 11:28:19,997 - result 21 [word_size('Kneu')]: retrieved -> 4
2018-02-08 11:28:19,998 - job 25: make_tuple('Kneu', 4)
2018-02-08 11:28:19,999 - result 22 [word_size('kneu')]: retrieved -> 4
2018-02-08 11:28:20,000 - job 26: make_tuple('kneu', 4)
2018-02-08 11:28:20,001 - result 23 [word_size('ote')]: retrieved -> 3
2018-02-08 11:28:20,002 - job 27: make_tuple('ote', 3)
2018-02-08 11:28:20,003 - result 24 [word_size('eur')]: retrieved -> 3
2018-02-08 11:28:20,004 - job 28: make_tuple('eur', 3)
2018-02-08 11:28:20,005 - result … uple('Kneu', 4)]: done -> ('Kneu', 4)
2018-02-08 11:28:20,005 - result … uple('kneu', 4)]: done -> ('kneu', 4)
2018-02-08 11:28:20,006 - result … _tuple('ote', 3)]: done -> ('ote', 3)
2018-02-08 11:28:20,006 - result … _tuple('eur', 3)]: done -> ('eur', 3)
2018-02-08 11:28:20,006 - job … ('kneu', 4), ('ote', 3), ('eur', 3))
2018-02-08 11:28:20,008 - result … u': 4, 'kneu': 4, 'ote': 3, 'eur': 3}
2018-02-08 11:28:20,008 - -end-of-queue-
[8]:
{'Kneu': 4, 'eur': 3, 'kneu': 4, 'ote': 3}
Be careful with versions! Noodles will take you at your word! If we lie about the version, it will go ahead and retrieve the result belonging to the old function:
[9]:
@schedule(version='1.0')
def word_size_phrase(phrase):
    words = re.sub(r"[^\w]", " ", phrase).split()
    return lift({word: word_size(word) for word in words})
run_and_print_log(
    word_size_phrase("Kneu kneu kneu kneu ote kneu eur"),
    highlight=[1])
2018-02-08 11:28:29,728 - job … ize_phrase('Kneu kneu ... kneu eur')
2018-02-08 11:28:29,734 - result … eur')]: retrieved -> '4 4 4 4 3 4 3'
2018-02-08 11:28:29,734 - -end-of-queue-
[9]:
'4 4 4 4 3 4 3'