I am a task queue for dispatching arbitrary callables to be run by one or more worker objects.

You can construct me with one or more workers, or you can attach them later with attachWorker, in which case you'll receive an ID that you can use to detach the worker.

Parameters
timeout: A number of seconds after which to more drastically terminate my workers if they haven't gracefully shut down by that point.
warn: Merely log errors via an 'asynqueue' logger with ERROR events. The default is to stop the reactor and print an error message on stderr when an error is encountered.
verbose: Provide detailed info about tasks that are logged or result in errors.
spew: Log all task calls, whether they raise errors or not. Can generate huge logs! Implies verbose=True.
returnFailure: If a task raises an exception, call its errback with a Failure. Default is to either log an error (if 'warn' is set) or stop the reactor.
Method  __init__       TaskQueue(self, *args, **kw)
Method  __len__        Returns my "length" as the number of workers currently at my disposal.
Method  __bool__       I evaluate as True if I am running and have at least one worker.
Method  isRunning      Returns True if shutdown has not been initiated and both my task handler and queue are running, False otherwise.
Method  shutdown       You must call this and wait for the Deferred it returns when you're done with me. Calls Queue.shutdown, among other things.
Method  attachWorker   Registers a new provider of IWorker for working on tasks from the queue.
Method  detachWorker   Detaches and terminates the worker supplied or specified by its ID.
Method  qualifyWorker  Adds the specified series to the qualifications of the supplied worker.
Method  workers        Returns the worker object specified by ID, or None if that worker is not employed with me.
Method  taskDone       Processes the status/result tuple from a worker running a task. You don't need to call this directly.
Method  newTask        Makes a new tasks.Task object from a func-args-kw combo. You won't call this directly.
Method  call           Queues up a function call.
Method  update         Sets an update task from func with any supplied arguments and keywords to be run directly on all current and future workers.
Method  _getWorkerID   Undocumented
def __init__(self, *args, **kw):
def __len__(self):

Returns my "length" as the number of workers currently at my disposal.

def __bool__(self):

I evaluate as True if I am running and have at least one worker.

def isRunning(self):

Returns True if shutdown has not been initiated and both my task handler and queue are running, False otherwise.

@defer.inlineCallbacks
def shutdown(self):

You must call this and wait for the Deferred it returns when you're done with me. Calls Queue.shutdown, among other things.

In an earlier version, there was a system event trigger that called this method before shutdown. But, in user code, that had the unfortunate side effect of killing task queues before all the tasks that might need to run in them could be called. So you have to make sure to call this method sometime yourself.

def attachWorker(self, worker):

Registers a new provider of IWorker for working on tasks from the queue.

Returns: A Deferred that fires with an integer ID uniquely identifying the worker.
See also: tasks.TaskHandler.hire.
def _getWorkerID(self, workerOrID):
Undocumented
def detachWorker(self, workerOrID, reassign=False, crash=False):

Detaches and terminates the worker supplied or specified by its ID.

Returns a Deferred that fires with a list of any tasks left unfinished by the worker.

If reassign is set True, any tasks left unfinished by the worker are put into new assignments for other or future workers. Otherwise, they are returned via the deferred's callback.

See also: tasks.TaskHandler.terminate.
def qualifyWorker(self, worker, series):

Adds the specified series to the qualifications of the supplied worker.

def workers(self, ID=None):

Returns the worker object specified by ID, or None if that worker is not employed with me.

If no ID is specified, a list of the workers currently attached, in no particular order, will be returned instead.

def taskDone(self, statusResult, task, **kw):

Processes the status/result tuple from a worker running a task. You don't need to call this directly.

The status is one of the following single-letter codes:

  • e: An exception was raised; the result is a pretty-printed traceback string. If the keyword 'returnFailure' was set for my constructor or this task, I will make it into a failure so the task's errback is triggered.
  • r: The task ran fine; the result is the return value of the call.
  • i: Ran fine, but the result was an iterable other than a standard Python one. So my result is a Deferator that yields deferreds to the worker's iterations, or, if you specified a consumer, an IterationProducer registered with the consumer that needs to get running to write iterations to it. If the iterator was empty, the result is just an empty list.
  • c: Ran fine (on an AMP server), but the result is being chunked because it was too big for a single return value. So the result is a deferred that will eventually fire with the result after all the chunks of the return value have arrived and been magically pieced together and unpickled.
  • t: The task timed out. I'll try to re-run it, once.
  • n: The task returned [n]othing, as will I.
  • d: The task didn't run, probably because there was a disconnection. I'll re-run it.
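
As a quick reference, the codes listed above can be condensed into a lookup table. This is only an illustrative sketch, not AsynQueue's taskDone implementation: the names STATUS_ACTIONS and describe are hypothetical, and the (status, result) ordering of the tuple is an assumption based on the "status/result" wording.

```python
# Hypothetical summary of the status codes listed above; not AsynQueue code.
STATUS_ACTIONS = {
    "e": "errback or log",   # exception raised; result is a traceback string
    "r": "deliver result",   # ran fine; result is the call's return value
    "i": "iterate",          # result is an iterable delivered piece by piece
    "c": "await chunks",     # oversized result arrives in chunks
    "t": "re-run once",      # the task timed out
    "n": "deliver nothing",  # the task returned nothing
    "d": "re-run",           # the task didn't run (probable disconnection)
}

def describe(status_result):
    """Return (action, result) for an assumed (status, result) tuple."""
    status, result = status_result
    if status not in STATUS_ACTIONS:
        raise ValueError("unknown status code: %r" % (status,))
    return STATUS_ACTIONS[status], result
```

For instance, describe(("r", 42)) pairs the "deliver result" action with the value 42, while an unrecognized code raises ValueError.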
def newTask(self, func, args, kw):

Makes a new tasks.Task object from a func-args-kw combo. You won't call this directly.

def call(self, func, *args, **kw):

Queues up a function call.

Puts a call to func with any supplied arguments and keywords into the pipeline. This is perhaps the single most important method of the AsynQueue API.

Scheduling of the call is impacted by the niceness keyword that can be included in addition to any keywords for the call. As with UNIX niceness, the value should be an integer where 0 is normal scheduling, negative numbers are higher priority, and positive numbers are lower priority.

Tasks in a series of tasks all having niceness N+10 are dequeued and run at approximately half the rate of tasks in another series with niceness N.
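
To make the halving rule concrete, here is a toy model of it. It is not AsynQueue's scheduler: it assumes each series gets a share proportional to 2 ** (-niceness / 10) and uses simple stride scheduling over a heap. The names weight and simulate are hypothetical.

```python
import heapq

def weight(niceness):
    """Assumed weighting: every +10 of niceness halves the share."""
    return 2.0 ** (-niceness / 10.0)

def simulate(series_niceness, slots):
    """Count how many of `slots` dequeues each series receives."""
    # Heap entries are (virtual_time, name); the smallest virtual time runs next.
    heap = [(0.0, name) for name in sorted(series_niceness)]
    heapq.heapify(heap)
    counts = {name: 0 for name in series_niceness}
    for _ in range(slots):
        vtime, name = heapq.heappop(heap)
        counts[name] += 1
        # A lower weight means a longer stride, so the series comes up less often.
        stride = 1.0 / weight(series_niceness[name])
        heapq.heappush(heap, (vtime + stride, name))
    return counts

counts = simulate({"normal": 0, "nice": 10}, 3000)
```

In this model the series with niceness 0 is dequeued about twice as often as the one with niceness 10, matching the "approximately half the rate" description.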

Parameters
niceness: Scheduling niceness, an integer between -20 and 20, with lower numbers having higher scheduling priority as in UNIX nice and renice.
series: A hashable object uniquely identifying a series for this task. Tasks of multiple different series will be run with somewhat concurrent scheduling between the series even if they are dumped into the queue in big batches, whereas tasks within a single series will always run in sequence (except for niceness adjustments).
doNext: Set True to assign highest possible priority, even higher than a deeply queued task with niceness = -20.
doLast: Set True to assign priority so low that any other-priority task gets run before this one, no matter how long this task has been queued up.
timeout: A timeout interval in seconds from when a worker gets a task assignment for the call, after which the call will be retried.
consumer: An implementor of interfaces.IConsumer that will receive iterations if the result of the call is an iterator. In that case, the returned result is a deferred that fires (with a reference to the consumer) when the iterations have all been produced.
returnFailure: If a task raises an exception, call its errback with a Failure. Default is to either log an error (if 'warn' is set) or stop the queue.
Returns: A Deferred to the result of the call, which fires once the call is pulled from the pipeline and run.
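
The relationship between doNext, doLast and niceness can be pictured as three tiers. The sketch below is a toy ordering model under that assumption, not AsynQueue's queue: ToyPipeline and its methods are hypothetical, and it deliberately ignores both series and the time-in-queue effects that the doNext and doLast descriptions hint at.

```python
import heapq
import itertools

class ToyPipeline:
    """Toy ordering model (hypothetical; not AsynQueue's queue)."""

    def __init__(self):
        self._heap = []
        self._order = itertools.count()  # tie-breaker: first queued, first run

    def put(self, name, niceness=0, doNext=False, doLast=False):
        if doNext and doLast:
            raise ValueError("doNext and doLast are mutually exclusive here")
        # Tier 0 beats every niceness value; tier 2 loses to every one.
        tier = 0 if doNext else 2 if doLast else 1
        rank = niceness if tier == 1 else 0
        heapq.heappush(self._heap, (tier, rank, next(self._order), name))

    def drain(self):
        """Yield task names in the order this toy model would run them."""
        while self._heap:
            yield heapq.heappop(self._heap)[-1]

pipeline = ToyPipeline()
pipeline.put("normal")
pipeline.put("urgent", niceness=-20)
pipeline.put("cleanup", doLast=True)
pipeline.put("jump", doNext=True)
pipeline.put("background", niceness=10)
order = list(pipeline.drain())
```

Here the doNext task comes out ahead of the niceness -20 task, and the doLast task comes out after everything else even though it was queued third.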
def update(self, func, *args, **kw):

Sets an update task from func with any supplied arguments and keywords to be run directly on all current and future workers.

Returns a Deferred to the result of the call on all current workers, though there is no mechanism for obtaining such results for new hires, so it's probably best not to rely too much on them.

The updates are run directly via tasks.TaskHandler.update, not through the queue. Because of the disconnect between queued and direct calls, it is likely but not guaranteed that any jobs you have queued when this method is called will run on a particular worker after this update is run. Wait for the Deferred from this method to fire before queuing any jobs that need the update to be in place before running.

If you don't want the task saved to the update list, but only run on the workers currently attached, set the ephemeral keyword True.
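
The difference between a saved update and an ephemeral one can be sketched with a toy model. This is an assumption-laden illustration rather than the behavior of tasks.TaskHandler itself: ToyHandler, set_option and the use of plain dicts as "workers" are all hypothetical, and calls run synchronously instead of returning Deferreds.

```python
class ToyHandler:
    """Toy model of an update list (hypothetical; not tasks.TaskHandler)."""

    def __init__(self):
        self.workers = []  # each toy "worker" is just a dict of state
        self.updates = []  # saved (func, args, kw) replayed on new hires

    def hire(self, worker):
        # A new hire first receives every saved update, in order.
        for func, args, kw in self.updates:
            func(worker, *args, **kw)
        self.workers.append(worker)

    def update(self, func, *args, ephemeral=False, **kw):
        # Run on all current workers right away, bypassing any queue.
        results = [func(worker, *args, **kw) for worker in self.workers]
        if not ephemeral:
            self.updates.append((func, args, kw))
        return results

def set_option(worker, key, value):
    worker[key] = value
    return key

handler = ToyHandler()
first = {}
handler.hire(first)
handler.update(set_option, "loglevel", "debug")
handler.update(set_option, "scratch", True, ephemeral=True)
second = {}
handler.hire(second)
```

In this model the first worker ends up with both settings, while the later hire receives only the saved "loglevel" update because the "scratch" update was ephemeral.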

API Documentation for AsynQueue, generated by pydoctor at 2022-11-17 13:13:24.