asynqueue.base.TaskQueue(object)
class documentation
Part of asynqueue.base
Known subclasses: asynqueue.null.NullQueue, asynqueue.process.ProcessQueue, asynqueue.threads.ThreadQueue
I am a task queue for dispatching arbitrary callables to be run by one or more worker objects.
You can construct me with one or more workers, or you can attach them later with attachWorker, in which case you'll receive an ID that you can use to detach the worker.
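As a quick orientation, here is a minimal sketch of both construction styles. It assumes the ThreadQueue subclass and a ThreadWorker from asynqueue.threads; treat those names and the exact constructor arguments as assumptions rather than a definitive recipe.

    from twisted.internet import defer, reactor
    from asynqueue.base import TaskQueue
    from asynqueue.threads import ThreadQueue, ThreadWorker   # assumed import paths

    @defer.inlineCallbacks
    def demo():
        # Style 1: a subclass that constructs its own workers.
        q = ThreadQueue()
        result = yield q.call(pow, 2, 10)     # fires with 1024 once a worker runs it
        print(result)
        yield q.shutdown()                    # always wait for shutdown's Deferred

        # Style 2: a bare TaskQueue with a worker hired afterward.
        q = TaskQueue()
        workerID = yield q.attachWorker(ThreadWorker())
        total = yield q.call(sum, [1, 2, 3])
        print(total)
        yield q.detachWorker(workerID)
        yield q.shutdown()

    reactor.callWhenRunning(lambda: demo().addBoth(lambda _: reactor.stop()))
    reactor.run()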
Parameters
  timeout: A number of seconds after which to more drastically terminate my workers if they haven't gracefully shut down by that point.
  warn: Merely log errors via an 'asynqueue' logger with ERROR events. The default is to stop the reactor and print an error message on stderr when an error is encountered.
  verbose: Provide detailed info about tasks that are logged or result in errors.
  spew: Log all task calls, whether they raise errors or not. Can generate huge logs! Implies verbose=True.
  returnFailure: If a task raises an exception, call its errback with a Failure. Default is to either log an error (if 'warn' is set) or stop the reactor.
Methods
  __init__: TaskQueue(self, *args, **kw)
  __len__: Returns my "length" as the number of workers currently at my disposal.
  __bool__: I evaluate as True if I am running and have at least one worker.
  isRunning: Returns True if shutdown has not been initiated and both my task handler and queue are running, False otherwise.
  shutdown: You must call this and wait for the Deferred it returns when you're done with me. Calls Queue.shutdown, among other things.
  attachWorker: Registers a new provider of IWorker for working on tasks from the queue.
  detachWorker: Detaches and terminates the worker supplied or specified by its ID.
  qualifyWorker: Adds the specified series to the qualifications of the supplied worker.
  workers: Returns the worker object specified by ID, or None if that worker is not employed with me.
  taskDone: Processes the status/result tuple from a worker running a task. You don't need to call this directly.
  newTask: Makes a new tasks.Task object from a func-args-kw combo. You won't call this directly.
  call: Queues up a function call.
  update: Sets an update task from func with any supplied arguments and keywords to be run directly on all current and future workers.
  _getWorkerID: Undocumented
TaskQueue(self, *args, **kw)

isRunning
Returns True if shutdown has not been initiated and both my task handler and queue are running, False otherwise.
def shutdown(self):
You must call this and wait for the Deferred it returns when you're done with me. Calls Queue.shutdown, among other things.
In an earlier version, a system event trigger called this method before shutdown. But in user code that had the unfortunate side effect of killing task queues before all the tasks that might need to run in them could be called, so you have to make sure to call this method yourself at some point.
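Since the trigger is no longer registered for you, one hedged way to get the old behavior back in your own code is to register it yourself (here q is an already-constructed queue); the reactor will wait on the Deferred that shutdown returns before finishing its own shutdown:

    from twisted.internet import reactor

    # q = ThreadQueue(), or any other TaskQueue, constructed elsewhere
    reactor.addSystemEventTrigger('before', 'shutdown', q.shutdown)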
attachWorker
Registers a new provider of IWorker for working on tasks from the queue.
Returns: A Deferred that fires with an integer ID uniquely identifying the worker.
See Also: tasks.TaskHandler.hire
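A small sketch of hiring a worker after construction; ThreadWorker and its import path are assumptions, but the Deferred-with-ID behavior is as documented above (queue is an existing TaskQueue):

    from asynqueue.threads import ThreadWorker   # assumed import path

    d = queue.attachWorker(ThreadWorker())
    d.addCallback(lambda workerID: print("Hired worker", workerID))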
detachWorker
Detaches and terminates the worker supplied or specified by its ID.
Returns a Deferred that fires with a list of any tasks left unfinished by the worker.
If reassign is set True, any tasks left unfinished by the worker are put into new assignments for other or future workers. Otherwise, they are returned via the Deferred's callback.
See Also: tasks.TaskHandler.terminate
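For illustration, both detachment modes, where workerID is a value previously obtained from attachWorker:

    # Hand any unfinished tasks back to the queue for other or future workers
    queue.detachWorker(workerID, reassign=True)

    # Or collect the unfinished tasks via the Deferred's callback
    d = queue.detachWorker(workerID)
    d.addCallback(lambda leftovers: print(len(leftovers), "tasks left unfinished"))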
qualifyWorker
Adds the specified series to the qualifications of the supplied worker.
workers
Returns the worker object specified by ID, or None if that worker is not employed with me. If no ID is specified, a list of the workers currently attached, in no particular order, will be returned instead.
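Both usages, as a sketch:

    worker = queue.workers(workerID)   # a specific worker, or None if no longer employed
    everyone = queue.workers()         # all currently attached workers, in no particular order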
taskDone
Processes the status/result tuple from a worker running a task. You don't need to call this directly. The status character is one of the following:
- e: An exception was raised; the result is a pretty-printed traceback string. If the keyword 'returnFailure' was set for my constructor or this task, I will make it into a failure so the task's errback is triggered.
- r: The task ran fine, the result is the return value of the call.
- i: Ran fine, but the result was an iterable other than a standard Python one. In that case my result is a Deferator that yields Deferreds to the worker's iterations, or, if you specified a consumer, an IterationProducer registered with the consumer that needs to get running to write iterations to it (see the sketch after this list). If the iterator was empty, the result is just an empty list.
- c: Ran fine (on an AMP server), but the result is being chunked because it was too big for a single return value. So the result is a deferred that will eventually fire with the result after all the chunks of the return value have arrived and been magically pieced together and unpickled.
- t: The task timed out. I'll try to re-run it, once.
- n: The task returned [n]othing, as will I.
- d: The task didn't run, probably because there was a disconnection. I'll re-run it.
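How two of these statuses look from the caller's side, as a hedged sketch: brokenTask and iteratingTask are hypothetical callables, and the Deferator loop follows the yields-Deferreds behavior described above, so treat the details as assumptions.

    from twisted.internet import defer

    @defer.inlineCallbacks
    def consume(queue):
        # 'e' status with returnFailure: the exception arrives at your errback as a Failure
        d = queue.call(brokenTask, returnFailure=True)
        d.addErrback(lambda f: print("task failed:", f.getErrorMessage()))
        yield d

        # 'i' status with no consumer: the result is a Deferator yielding Deferreds
        dr = yield queue.call(iteratingTask)
        for d in dr:
            item = yield d
            print("iteration:", item)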
newTask
Makes a new tasks.Task object from a func-args-kw combo. You won't call this directly.
call
Queues up a function call.
Puts a call to func with any supplied arguments and keywords into the pipeline. This is perhaps the single most important method of the AsynQueue API.
Scheduling of the call is impacted by the niceness keyword that can be included in addition to any keywords for the call. As with UNIX niceness, the value should be an integer where 0 is normal scheduling, negative numbers are higher priority, and positive numbers are lower priority.
Tasks in a series of tasks all having niceness N+10 are dequeued and run at approximately half the rate of tasks in another series with niceness N.
Parameters
  niceness: Scheduling niceness, an integer between -20 and 20, with lower numbers having higher scheduling priority, as in UNIX nice and renice.
  series: A hashable object uniquely identifying a series for this task. Tasks of multiple different series will be run with somewhat concurrent scheduling between the series even if they are dumped into the queue in big batches, whereas tasks within a single series will always run in sequence (except for niceness adjustments).
  doNext: Set True to assign the highest possible priority, even higher than a deeply queued task with niceness = -20.
  doLast: Set True to assign priority so low that any other-priority task gets run before this one, no matter how long this task has been queued up.
  timeout: A timeout interval in seconds from when a worker gets a task assignment for the call, after which the call will be retried.
  consumer: An implementor of interfaces.IConsumer that will receive iterations if the result of the call is an iterator. In that case, the returned result is a Deferred that fires (with a reference to the consumer) when the iterations have all been produced.
  returnFailure: If a task raises an exception, call its errback with a Failure. Default is to either log an error (if 'warn' is set) or stop the queue.
Returns: A Deferred to the eventual result of the call when it is eventually pulled from the pipeline and run.
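Putting the pieces together, a hedged usage sketch of call; crunch, writeLog, and checkHealth stand in for your own functions and are not part of asynqueue:

    from twisted.internet import defer

    @defer.inlineCallbacks
    def work(queue, dataChunk):
        # Normal-priority call; the Deferred fires with crunch's return value
        result = yield queue.call(crunch, dataChunk)

        # Low-priority housekeeping in its own 'logging' series
        queue.call(writeLog, result, niceness=15, series='logging')

        # Jump ahead of everything already queued
        status = yield queue.call(checkHealth, doNext=True)
        defer.returnValue((result, status))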
update
Sets an update task from func with any supplied arguments and keywords to be run directly on all current and future workers.
Returns a Deferred to the result of the call on all current workers, though there is no mechanism for obtaining such results for new hires, so it's probably best not to rely too much on them.
The updates are run directly via tasks.TaskHandler.update, not through the queue. Because of the disconnect between queued and direct calls, it is likely but not guaranteed that any jobs you have queued when this method is called will run on a particular worker after this update is run. Wait for the Deferred from this method to fire before queuing any jobs that need the update to be in place before running.
If you don't want the task saved to the update list, but only run on the workers currently attached, set the ephemeral keyword True.
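A sketch of the pattern described above: run the update, wait for its Deferred, and only then queue the jobs that depend on it (setupWorker and processRecord are illustrative names, not part of asynqueue):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def prepareAndRun(queue, records):
        # Broadcast the setup to all current workers and save it for future hires
        yield queue.update(setupWorker)
        # Now it is safe to queue work that relies on the setup being in place
        results = yield defer.gatherResults(
            [queue.call(processRecord, r) for r in records])
        defer.returnValue(results)

    # A one-off broadcast that future hires should not repeat:
    # queue.update(setupWorker, ephemeral=True)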