ATasks is an asynchronous distributed task queue system.
Every task is defined as an asynchronous coroutine. We call such a task an atask:
a(synchronous) task.
An atask looks like a usual asynchronous coroutine. It may be awaited using
the await syntax and controlled by the asyncio package.
An atask may await other coroutines and atasks. Because an atask is
asynchronous, it doesn't block the thread evaluating it, and it
allows easy and transparent task decomposition as in a usual asynchronous
procedure, including sequential and parallel awaiting of other atasks.
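The sequential and parallel decomposition mentioned above can be sketched with plain asyncio coroutines. This is only an illustration: fetch_part, combine_sequential, and combine_parallel are hypothetical names, and plain coroutines stand in for atasks so that the sketch runs without the package installed.

```python
import asyncio


async def fetch_part(n):
    # Stand-in for an atask; a real atask might run on another worker.
    await asyncio.sleep(0.01)
    return n * 2


async def combine_sequential(values):
    # Sequential decomposition: each await finishes before the next starts.
    results = []
    for v in values:
        results.append(await fetch_part(v))
    return results


async def combine_parallel(values):
    # Parallel decomposition: all awaits are in flight at the same time.
    return await asyncio.gather(*(fetch_part(v) for v in values))


sequential = asyncio.run(combine_sequential([1, 2, 3]))
parallel = asyncio.run(combine_parallel([1, 2, 3]))
print(sequential, parallel)
```

Both variants return the same results; only the way the awaits overlap in time differs.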
Stable version from the PyPI package repository:
pip install atasks
Latest development version from the GitHub source version control system:
pip install git+git://github.com/nnseva/atasks.git
Before execution, several core objects should be constructed and initialized.
from atasks.transport.backends.amqp import AMQPTransport
from atasks.router import get_router
from atasks.codecs import PickleCodec
...
PickleCodec()
transport = AMQPTransport()
await transport.connect()
if mode == 'server':
    router = get_router()
    await router.activate(transport)
The codec determines how objects passed through the network are encoded and decoded. It should support as many types as possible.
The atasks.codecs.PickleCodec provided by the package uses the standard Python pickle package.
It is universal, but not always a safe solution.
from atasks.codecs import PickleCodec
...
PickleCodec()
Users can inherit from atasks.codecs.Codec as a base class to create their own codec implementation.
Just override all methods that raise NotImplementedError. Note that most of the methods are asynchronous.
from atasks.codecs import Codec
class MyCodec(Codec):
    async def encode(self, obj):
        ...
    async def decode(self, content):
        ...
To activate a codec, you just need to create an instance of it. The codec is installed into the system during construction.
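As an illustration of the encode/decode pair, here is a minimal sketch of a JSON-based codec. It is deliberately a standalone class so that it runs without the package; a real codec would inherit from atasks.codecs.Codec. The class name JSONCodecSketch, the round_trip helper, and the assumption that the encoded content is a bytes object are all hypothetical.

```python
import asyncio
import json


class JSONCodecSketch:
    """Standalone sketch; a real codec would subclass atasks.codecs.Codec."""

    async def encode(self, obj):
        # Assumption: encoded content is passed around as bytes.
        return json.dumps(obj).encode("utf-8")

    async def decode(self, content):
        return json.loads(content.decode("utf-8"))


async def round_trip(obj):
    codec = JSONCodecSketch()
    return await codec.decode(await codec.encode(obj))


restored = asyncio.run(round_trip({"args": [42], "kwargs": {"a": 1}}))
print(restored)
```

The trade-off is the one noted for PickleCodec: JSON handles far fewer types than pickle, but decoding JSON does not execute code, whereas unpickling data from an untrusted source can.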
The transport determines how requests and results are passed
from the awaiter to the performing coroutine and back, to support awaiting
atasks across a network.
The atasks.transport.base.LoopbackTransport provided by the package passes
all requests back to the awaiter thread only. It doesn't allow distributing
atasks among several processes or even threads. You can
use it for testing purposes.
The atasks.transport.backends.amqp.AMQPTransport provided by the package passes
requests through RabbitMQ or another AMQP broker to any ATasks worker started
on the same or another host.
After creating a transport instance, the asynchronous connect() method of the
newly created instance should be awaited.
from atasks.transport.base import LoopbackTransport
from atasks.transport.backends.amqp import AMQPTransport
...
# `transport` initially holds the transport kind name and is rebound to the instance
if transport == 'loopback':
    transport = LoopbackTransport()
elif transport == 'amqp':
    transport = AMQPTransport()
await transport.connect()
Other transport kinds may be implemented later.
Users can inherit from atasks.transport.base.Transport as a base class to create their own
transport implementation. Just override all methods that raise NotImplementedError. Note that
most of the methods are asynchronous.
from atasks.transport.base import Transport
class MyTransport(Transport):
    async def connect(self):
        ...
    async def disconnect(self):
        ...
    async def send_request(self, name, content):
        ...
The router determines what a reference looks like, how it is awaited, what data is passed over the network, etc. The router is the core of the ATasks package.
The atasks.router.Router is the only default router implementation.
Users can inherit from atasks.router.Router to create their own
router implementation if necessary.
As a rule, you don't need to do this; you can just
use the get_router() function to get a default router instance.
from atasks.router import get_router
...
router = get_router()
If your application only sends requests, no other actions are required at the initialization stage.
A server application that listens for events should also activate a transport to receive requests:
server = AMQPTransport()
...
router = get_router()
await router.activate(server)
The atasks.tasks.atask decorator marks an asynchronous coroutine
(or even a synchronous function returning a future object) as an asynchronous distributed
task.
Note that the first call to the wrapper creates a default router. If necessary, create your own Router (or subclass) instance before the first call to the wrapper.
@atask
async def some_task(a):
    ...
As a rule, the client and the server should use the same module defining atasks.
The atask name is used to await an atask. The default name is determined
by the coroutine name and the containing module. You can replace the default name
using the additional name parameter of the decorator:
@atask(name="some_other_name")
async def some_task(a):
    ...
The atask is awaited like a usual coroutine. You can use the await keyword, or
get a future by calling the atask synchronously and control the future using the asyncio module.
@atask
async def some_task(a):
    ret = await some_other_task(a)
@atask
async def some_other_task(a):
    ...
async def not_a_task_just_coro():
    a = await some_task(42)
    ...
Objects may be instantiated in separate namespaces. Just
pass an additional namespace=... parameter to:
- the constructor of a codec, transport, or router object
- the atask decorator
- the get_router, get_transport, or get_codec function
Each namespace is completely separate from the others. Every namespace uses its own set of router, transport, and codec, so initialize them separately for every namespace used in your application.
The default namespace is named default.
You can await a task from one namespace in another.
@atask(namespace='one')
async def some_task():
    await some_other_task()
    ...
@atask(namespace='other')
async def some_other_task():
    ...
The package uses the Django management subsystem to provide a command-line interface.
A Django project using the django_atasks application has the following command:
python manage.py run_atask file-or-module [options here]
The command runs any file-or-module given on the command line that contains
@atask definitions and an optional aiomain asynchronous coroutine. The
optional aiomain coroutine is evaluated when the file is run.
All options passed to the command are then passed to aiomain as keyword parameters.
The run_atask management command initializes all necessary objects (as
described above) to run the module in one of three available modes: server, client,
and loopback. The loopback mode allows using the same process instance
as server and client simultaneously.
Note that if you use a dedicated server process instance, you should not use
the loopback transport (which cannot reach the dedicated server
in this case). Use amqp (or another interprocess transport) instead.
Module naming differs slightly depending on whether you pass a file name
or a module name on the command line. Use the same module naming when starting the server
and the client to avoid misnaming of atasks.
You can start several modules simultaneously in one process instance by listing them all on the command line.
You can start several server process instances; the client will then send requests to them in arbitrary order.
Call the help command to see the command details.
See the dev/tests/scenarios.py file for an example of a file that can be called
by the run_atask management command.
The idea of ATasks has been inspired by asyncio, Celery
and aiotasks packages.
The main advantages of ATasks compared with Celery:
- asynchronous task evaluation instead of synchronous tasks
- free combining of atask awaits inside another atask using await
- easy awaiting of an atask and getting a result
- parallelization using standard asynchronous syntax
- no restrictions on recurrent awaits
The main advantages of ATasks compared with aiotasks:
- easier getting of a result (await instead of async with)
- full transparency: the only difference from a usual coroutine await is distributing atasks evaluation across a network
- active development
The main disadvantages compared with Celery and aiotasks:
- delay(), send(), async_call(), s() etc. syntax is not available, and will never be implemented
See the scenarios.py file for usual scenarios.
After an atask is started, it runs in one thread from the beginning
to the end. Other atasks may share the same thread in an asynchronous manner.
On the other hand, another atask called from the first one may
run on any ATasks worker, either the same as the first one or another
worker and host, depending on the decision taken at the transport layer
and on the ATasks workers currently connected to the same transport layer.
The point where the atask is awaited is the only point where the decision
about where the awaited atask should run is taken. The
transport layer takes this decision.
An ATasks application can issue remote awaits immediately after
the transport connect(). An ATasks application receives remote
awaits after the activate() call of the Router.
The LoopbackTransport always passes all awaits immediately to
coroutines in the same thread. It may be used for testing purposes.
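The loopback behaviour can be pictured with a toy model in plain asyncio. This is not the package's implementation: REGISTRY, toy_task, and loopback_send are hypothetical names, and the "module.name" form of the default task name is an assumption based on the statement that the name is derived from the coroutine name and the containing module.

```python
import asyncio

REGISTRY = {}  # hypothetical: task name -> coroutine function


def toy_task(func):
    # Assumption: the default name combines module and coroutine name.
    name = f"{func.__module__}.{func.__qualname__}"
    REGISTRY[name] = func

    async def wrapper(*args, **kwargs):
        # Awaiting the wrapper sends a request by name
        # instead of calling func directly.
        return await loopback_send(name, args, kwargs)

    wrapper.task_name = name
    return wrapper


async def loopback_send(name, args, kwargs):
    # Loopback: resolve the name and await the coroutine
    # in the same event loop, without leaving the thread.
    return await REGISTRY[name](*args, **kwargs)


@toy_task
async def add(a, b):
    return a + b


total = asyncio.run(add(2, 3))
print(total)
```

A networked transport would replace loopback_send with code that encodes the name and arguments, delivers them to a worker, and awaits the encoded result.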
Other transports may allow remote awaits inside a process
or a host, or pass them across a network.
The AMQPTransport allows using RabbitMQ (or an analogue) to
pass remote awaits across a network to any number
of instances.
???
The awaiting coroutine receives an exception if the atask crashes
with an exception. The exception should be serializable by the codec.
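Why the exception has to be serializable can be seen in a small sketch that uses only pickle and asyncio. It is a toy model under stated assumptions, not the package's mechanism: failing_task, run_on_worker, and awaiter are hypothetical names, and the worker and the awaiter run in the same process here.

```python
import asyncio
import pickle


async def failing_task():
    raise ValueError("bad input")


async def run_on_worker():
    # Worker side: capture the exception and encode it for the wire.
    try:
        await failing_task()
    except Exception as exc:
        return pickle.dumps(exc)


async def awaiter():
    # Awaiter side: decode the payload and re-raise it locally.
    payload = await run_on_worker()
    raise pickle.loads(payload)


try:
    asyncio.run(awaiter())
except ValueError as exc:
    caught = exc
print(repr(caught))
```

An exception that the codec cannot encode (for example, one holding an open socket or a lock) could not cross the boundary in this way, so task code is better off raising exceptions that carry only simple data.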
???
???