Myslím, že kladete několik velmi dobrých otázek, které zdůrazňují, jak užitečný může být SWF jako služba. Stručně řečeno, neříkáte svým serverům, aby koordinovaly práci mezi sebou. To vše za vás řídí váš rozhodovatel s pomocí služby SWF.
Implementace vašeho pracovního postupu bude probíhat následovně:
- Registrace vašeho pracovního postupu a jeho aktivit ve službě (jednorázová).
- Implementujte rozhodce a pracovníky.
- Nechte své pracovníky a osoby, které rozhodují, běžet.
- Začněte nový pracovní postup.
Existuje řada způsobů, jak zadat přihlašovací údaje do kódu boto.swf. Pro účely tohoto cvičení je doporučuji exportovat do prostředí před spuštěním níže uvedeného kódu:
export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>
1) Pro registraci domény, pracovního postupu a činností proveďte následující:
# ab_setup.py
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
2) Implementujte a řiďte rozhodující a pracovníky.
# ab_decider.py
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class ABDecider(swf.Decider):
domain = DOMAIN
task_list = 'default_tasks'
version = VERSION
def run(self):
history = self.poll()
# Print history to familiarize yourself with its format.
print history
if 'events' in history:
# Get a list of non-decision events to see what event came in last.
workflow_events = [e for e in history['events']
if not e['eventType'].startswith('Decision')]
decisions = swf.Layer1Decisions()
# Record latest non-decision event.
last_event = workflow_events[-1]
last_event_type = last_event['eventType']
if last_event_type == 'WorkflowExecutionStarted':
# At the start, get the worker to fetch the first assignment.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
ACTIVITY1, VERSION, task_list='a_tasks')
elif last_event_type == 'ActivityTaskCompleted':
# Take decision based on the name of activity that has just completed.
# 1) Get activity's event id.
last_event_attrs = last_event['activityTaskCompletedEventAttributes']
completed_activity_id = last_event_attrs['scheduledEventId'] - 1
# 2) Extract its name.
activity_data = history['events'][completed_activity_id]
activity_attrs = activity_data['activityTaskScheduledEventAttributes']
activity_name = activity_attrs['activityType']['name']
# 3) Optionally, get the result from the activity.
result = last_event['activityTaskCompletedEventAttributes'].get('result')
# Take the decision.
if activity_name == ACTIVITY1:
# Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
ACTIVITY2, VERSION, task_list='b_tasks', input=result)
elif activity_name == ACTIVITY2:
# Server B completed activity. We're done.
decisions.complete_workflow_execution()
self.complete(decisions=decisions)
return True
Pracovníci jsou mnohem jednodušší, pokud nechcete, nemusíte používat dědictví.
# ab_worker.py
import os
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class MyBaseWorker(swf.ActivityWorker):
domain = DOMAIN
version = VERSION
task_list = None
def run(self):
activity_task = self.poll()
print activity_task
if 'activityId' in activity_task:
# Get input.
# Get the method for the requested activity.
try:
self.activity(activity_task.get('input'))
except Exception, error:
self.fail(reason=str(error))
raise error
return True
def activity(self, activity_input):
raise NotImplementedError
class WorkerA(MyBaseWorker):
task_list = 'a_tasks'
def activity(self, activity_input):
result = str(time.time())
print 'worker a reporting time: %s' % result
self.complete(result=result)
class WorkerB(MyBaseWorker):
task_list = 'b_tasks'
def activity(self, activity_input):
result = str(os.getpid())
print 'worker b returning pid: %s' % result
self.complete(result=result)
3) Spusťte své rozhodovací jednotky a pracovníky. Vaše rozhodovací zařízení a pracovníci mohou běžet ze samostatných hostitelů nebo z jednoho a stejného počítače. Otevřete čtyři terminály a spusťte své herce:
Nejprve vaše rozhodnutí
$ python -i ab_decider.py
>>> while ABDecider().run(): pass
...
Pak pracovník A, můžete to udělat ze serveru A:
$ python -i ab_workers.py
>>> while WorkerA().run(): pass
Pak pracovník B, možná ze serveru B, ale pokud je všechny spustíte z notebooku, bude to fungovat stejně dobře:
$ python -i ab_workers.py
>>> while WorkerB().run(): pass
...
4) Nakonec spusťte pracovní postup.
$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41)
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackoverflow').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>>
Přepněte zpět, abyste viděli, co se stane s vašimi herci. Mohou se odpojit od služby po jedné minutě nečinnosti. Pokud k tomu dojde, stisknutím šipky-nahoru+enter znovu vstoupíte do smyčky hlasování.
Nyní můžete přejít na panel SWF vaší konzoly pro správu AWS, zkontrolovat, jak si provádění vedou, a zobrazit jejich historii. Případně se na něj můžete zeptat pomocí příkazového řádku.
>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted',
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'},
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy':
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version':
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2,
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes':
{'startToCloseTimeout': '300', 'taskList': {'name': ...
To je jen příklad pracovního postupu se sériovým prováděním činností, ale je také možné, aby rozhodující osoba naplánovala a koordinovala paralelní provádění činností.
Doufám, že vás to alespoň nastartuje. Pro trochu složitější příklad sériového workflow doporučuji podívat se na toto.
Nemám žádný příklad kódu ke sdílení, ale rozhodně můžete použít SWF ke koordinaci provádění skriptů na dvou serverech. Hlavní myšlenkou je vytvořit tři části kódu, které budou komunikovat se SWF:
- Komponenta, která ví, který skript se má spustit jako první a co má dělat, jakmile je spuštění prvního skriptu dokončeno. To se nazývá „rozhodující“ v termínech SWF.
- Dvě součásti, z nichž každá rozumí tomu, jak spustit konkrétní skript, který chcete spustit na každém počítači. Tito se nazývají „pracovníci činností“ v termínech SWF.
První komponenta, rozhodování, volá dvě rozhraní SWF API:PollForDecisionTask a RespondDecisionTaskCompleted. Požadavek na průzkum poskytne komponentě rozhodování aktuální historii spouštěného pracovního postupu, v podstatě informace o stavu „kde jsem“ pro váš spouštěč skriptů. Napíšete kód, který se podívá na tyto události, a zjistíte, který skript by se měl spustit. Tyto "příkazy" ke spuštění skriptu by měly formu plánování úkolu aktivity, který je vrácen jako součást volání RespondDecisionTaskCompleted.
Druhá komponenta, kterou napíšete, pracovníci aktivity, každá volá dvě rozhraní API SWF:PollForActivityTask a RespondActivityTaskCompleted. Požadavek na průzkum poskytne pracovníkovi aktivity indikaci, že by měl spustit skript, o kterém ví, co SWF nazývá úlohou aktivity. Informace vrácené z požadavku na dotaz do SWF mohou zahrnovat jednotlivá data specifická pro provedení, která byla odeslána do SWF jako součást plánování úlohy aktivity. Každý z vašich serverů by nezávisle dotazoval SWF na úlohy aktivity, aby indikoval provedení lokálního skriptu na tomto hostiteli. Jakmile pracovník dokončí provádění skriptu, zavolá zpět SWF prostřednictvím rozhraní API RespondActivityTaskCompleted.
Zpětné volání od vašeho pracovníka aktivity do SWF má za následek předání nové historie komponentě rozhodování, kterou jsem již zmínil. Podívá se na historii, uvidí, že první skript je hotový, a naplánuje spuštění druhého. Jakmile uvidí, že je hotovo druhé, může „uzavřít“ pracovní postup pomocí jiného typu rozhodnutí.
Celý proces spouštění skriptů na každém hostiteli zahájíte voláním rozhraní API StartWorkflowExecution. Tím se vytvoří záznam celkového procesu v SWF a spustí se první historie procesu rozhodování, který naplánuje spuštění prvního skriptu na prvním hostiteli.
Doufejme, že to poskytuje trochu více kontextu, jak dosáhnout tohoto typu pracovního postupu pomocí SWF. Pokud jste to ještě neudělali, podíval bych se do průvodce pro vývojáře na stránce SWF, kde najdete další informace.