# AsynQueue:
# Asynchronous task queueing based on the Twisted framework, with task
# prioritization and a powerful worker interface.
#
# Copyright (C) 2006-2007, 2015 by Edwin A. Suominen,
# http://edsuom.com/AsynQueue
#
# See edsuom.com for API documentation as well as information about
# Ed's background and other projects, software and otherwise.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language
# governing permissions and limitations under the License.
"""
The L{TaskQueue} and its immediate support staff.
"""
import heapq, logging, sys
from contextlib import contextmanager
from zope.interface import implementer
from twisted.python.failure import Failure
from twisted.internet import reactor, interfaces, defer
# Use C Deferreds if possible, for efficiency
try:
from twisted.internet import cdefer
except:
pass
else:
defer.Deferred = cdefer.Deferred
from asynqueue import errors, tasks, iteration
from asynqueue.info import Info
class Priority(object):
"""
I provide simple, asynchronous access to a priority heap.
"""
def __init__(self):
self.heap = []
self.pendingGetCalls = []
def shutdown(self):
"""
Shuts down the priority heap, firing errbacks of the deferreds of
any L{get} requests that will not be fulfilled.
"""
if self.pendingGetCalls:
msg = "No more items forthcoming"
theFailure = Failure(errors.QueueRunError(msg))
for d in self.pendingGetCalls:
d.errback(theFailure)
def get(self):
"""
Gets an item with the highest priority (lowest value) from the
heap, returning a C{Deferred} that fires when the item becomes
available.
"""
if len(self.heap):
d = defer.succeed(heapq.heappop(self.heap))
else:
d = defer.Deferred()
self.pendingGetCalls.insert(0, d)
return d
def put(self, item):
"""
Adds the supplied I{item} to the heap, firing the oldest getter
deferred if any L{get} calls are pending.
"""
heapq.heappush(self.heap, item)
if len(self.pendingGetCalls):
d = self.pendingGetCalls.pop()
d.callback(heapq.heappop(self.heap))
def cancel(self, selector):
"""
Removes all pending items from the heap that the supplied I{selector}
function selects. The function must take an item as its sole argument
and return C{True} if it selects the item for queue removal.
"""
for item in self.heap:
if selector(item):
self.heap.remove(item)
# Fix up the possibly mangled heap list
heapq.heapify(self.heap)
@implementer(interfaces.IPushProducer)
class LoadInfoProducer(object):
"""
Produces task queue loading information.
I produce information about the current load of a
L{TaskQueue}. The information consists of the number of tasks
currently queued, and is written as a single integer to my
consumers as a single integer whenever a task is queued up and
again when it is completed.
@ivar consumer: A list of the consumers for whom I'm producing
information.
"""
def __init__(self):
self.queued = 0
self.producing = True
self.consumers = []
def registerConsumer(self, consumer):
"""
Call this with a provider of C{IConsumer} and I'll produce for it
in addition to any others already registered with me.
"""
try:
consumer.registerProducer(self, True)
except RuntimeError:
# I must have already been registered with this consumer
return
self.consumers.append(consumer)
def shutdown(self):
"""
Stop me from producing and unregister any consumers I have.
"""
self.producing = False
for consumer in self.consumers:
consumer.unregisterProducer()
def oneLess(self):
self._update(-1)
def oneMore(self):
self._update(+1)
def _update(self, increment):
self.queued += increment
if self.queued < 0:
self.queued = 0
if self.producing:
for consumer in self.consumers:
consumer.write(self.queued)
#--- IPushProducer implementation -----------------------------------------
def pauseProducing(self):
self.producing = False
def resumeProducing(self):
self.producing = True
def stopProducing(self):
self.shutdown()
class Queue(object):
"""
I am an asynchronous priority queue. Construct me with an item
handler that can be called with each item from the queue and
call L{shutdown} when I'm done.
Put anything you like in the queue except C{None} objects. Those
are reserved for triggering a queue shutdown.
You will probably use a L{TaskQueue} instead of me directly.
"""
def __init__(self, handler, timeout=None):
"""
Starts up a deferred-yielding loop that runs the queue. This
method can only be run once, by the constructor upon
instantiation.
"""
@defer.inlineCallbacks
def runner():
while True:
self._runFlag = True
item = yield self.heap.get()
if item is None:
break
self.loadInfoProducer.oneLess()
yield self.handler(item)
# Clean up after the loop exits
result = yield self.handler.shutdown(timeout)
self.heap.shutdown()
defer.returnValue(result)
if self.isRunning():
raise errors.QueueRunError(
"Startup only occurs upon instantiation")
self.heap = Priority()
self.handler = handler
self.loadInfoProducer = LoadInfoProducer()
# Start my loop
self._d = runner()
def isRunning(self):
"""
Returns C{True} if the queue is running, C{False} otherwise.
"""
return getattr(self, '_runFlag', False)
def shutdown(self):
"""
Initiates a shutdown of the queue by putting a lowest-possible
priority C{None} object onto the priority heap.
@return: A deferred that fires when my handler has shut down,
with a list of any items left unhandled in the queue.
"""
if self.isRunning():
self.heap.put(None)
d = self._d
else:
d = defer.succeed([])
self._runFlag = False
return d
def put(self, item):
"""
Put an item into my heap, only if the queue is running. Otherwise
ignored.
"""
if self.isRunning():
self.heap.put(item)
self.loadInfoProducer.oneMore()
def cancelSeries(self, series):
"""
Cancels any pending items in the specified I{series},
unceremoniously removing them from the queue.
"""
self.heap.cancel(
lambda item: getattr(item, 'series', None) == series)
def cancelAll(self):
"""
Cancels all pending items, unceremoniously removing them from the
queue.
"""
self.heap.cancel(lambda item: True)
def subscribe(self, consumer):
"""
Subscribes the supplied provider of C{IConsumer} to updates on the
number of items queued whenever it goes up or down.
The figure is the integer number of calls currently pending,
i.e., the number of items that have been queued up but haven't
yet been handled plus those that have been called but haven't
yet returned a result.
"""
if interfaces.IConsumer.providedBy(consumer):
self.loadInfoProducer.registerConsumer(consumer)
else:
raise errors.ImplementationError(
"Object doesn't provide the IConsumer interface")
class TaskQueue(object):
"""
I am a task queue for dispatching arbitrary callables to be run by
one or more worker objects.
You can construct me with one or more workers, or you can attach
them later with L{attachWorker}, in which case you'll receive an
ID that you can use to detach the worker.
@keyword timeout: A number of seconds after which to more
drastically terminate my workers if they haven't gracefully shut
down by that point.
@keyword warn: Merely log errors via an 'asynqueue' logger with
ERROR events. The default is to stop the reactor and print an
error message on C{stderr} when an error is encountered.
@keyword verbose: Provide detailed info about tasks that are logged
or result in errors.
@keyword spew: Log all task calls, whether they raise errors or
not. Can generate huge logs! Implies C{verbose=True}.
@keyword returnFailure: If a task raises an exception, call its
errback with a Failure. Default is to either log an error (if
'warn' is set) or stop the reactor.
"""
def __init__(self, *args, **kw):
"""C{TaskQueue}(self, *args, **kw)"""
# Options
self.timeout = kw.get('timeout', None)
self.warn = kw.get('warn', False)
self.spew = kw.get('spew', False)
self.returnFailure = kw.get('returnFailure', False)
if self.warn or self.spew:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter(
"%(levelname)s: %(message)s"))
self.logger = logging.getLogger('asynqueue')
self.logger.setLevel(logging.INFO if self.spew else logging.ERROR)
if kw.get('verbose', False) or self.spew:
self.info = Info(remember=True)
# Bookkeeping
self.tasksBeingRetried = []
# Tools
self.th = tasks.TaskHandler()
self.taskFactory = tasks.TaskFactory()
# Attach any workers provided now
for worker in args:
self.attachWorker(worker)
# Start things up with my very own live asynchronous queue
# using a TaskHandler
self.q = Queue(self.th, self.timeout)
def __len__(self):
"""
Returns my "length" as the number of workers currently at my
disposal.
"""
return len(self.th.roster())
def __bool__(self):
"""
I evaluate as C{True} if I am running and have at least one
worker.
"""
return self.isRunning() and len(self)
def isRunning(self):
"""
Returns C{True} if shutdown has not been initiated and both my
task handler and queue are running, C{False} otherwise.
"""
if getattr(self, '_shutdownInitiated', False):
return False
return self.th.isRunning and self.q.isRunning()
@defer.inlineCallbacks
def shutdown(self):
"""
You must call this and wait for the C{Deferred} it returns when
you're done with me. Calls L{Queue.shutdown}, among other
things.
In an earlier version, there was a system event trigger that
called this method before shutdown. But, in user code, that
had the unfortunate side effect of killing task queues before
all the tasks that might need to run in them could be
called. So you have to make sure to call this method sometime
yourself.
"""
def oops(failure):
failure.printDetailedTraceback()
if self.isRunning():
self._shutdownInitiated = True
yield self.th.shutdown().addErrback(oops)
yield self.q.shutdown().addErrback(oops)
if hasattr(self, '_dc') and self._dc.active():
self._dc.cancel()
for dc in tasks.Task.timeoutCalls:
if dc.active():
dc.cancel()
def attachWorker(self, worker):
"""
Registers a new provider of C{IWorker} for working on tasks from
the queue.
@return: A C{Deferred} that fires with an integer ID uniquely
identifying the worker.
@see: L{tasks.TaskHandler.hire}.
"""
return self.th.hire(worker)
def _getWorkerID(self, workerOrID):
if workerOrID in self.th.workers:
return workerOrID
for thisID, worker in self.th.workers.items():
if worker == workerOrID:
return thisID
def detachWorker(self, workerOrID, reassign=False, crash=False):
"""
Detaches and terminates the worker supplied or specified by its
ID.
Returns a C{Deferred} that fires with a list of any tasks left
unfinished by the worker.
If I{reassign} is set C{True}, any tasks left unfinished by
the worker are put into new assignments for other or future
workers. Otherwise, they are returned via the deferred's
callback.
@see: L{tasks.TaskHandler.terminate}.
"""
ID = self._getWorkerID(workerOrID)
if ID is None:
return defer.succeed([])
if crash:
return self.th.terminate(ID, crash=True, reassign=reassign)
return self.th.terminate(ID, self.timeout, reassign=reassign)
def qualifyWorker(self, worker, series):
"""
Adds the specified I{series} to the qualifications of the supplied
I{worker}.
"""
if series not in worker.iQualified:
worker.iQualified.append(series)
self.th.assignmentFactory.request(worker, series)
def workers(self, ID=None):
"""
Returns the worker object specified by I{ID}, or C{None} if that
worker is not employed with me.
If no ID is specified, a list of the workers currently
attached, in no particular order, will be returned instead.
"""
if ID is None:
return list(self.th.workers.values())
return self.th.workers.get(ID, None)
def taskDone(self, statusResult, task, **kw):
"""
Processes the status/result tuple from a worker running a
task. You don't need to call this directly.
- B{e}: An B{e}xception was raised; the result is a
pretty-printed traceback string. If the keyword
'returnFailure' was set for my constructor or this task, I
will make it into a failure so the task's errback is
triggered.
- B{r}: The task B{r}an fine, the result is the return value
of the call.
- B{i}: Ran fine, but the result was an B{i}terable other
than a standard Python one. So my result is a Deferator
that yields deferreds to the worker's iterations, or, if
you specified a consumer, an IterationProducer registered
with the consumer that needs to get running to write
iterations to it. If the iterator was empty, the result is
just an empty list.
- B{c}: Ran fine (on an AMP server), but the result is being
B{c}hunked because it was too big for a single return
value. So the result is a deferred that will eventually
fire with the result after all the chunks of the return
value have arrived and been magically pieced together and
unpickled.
- B{t}: The task B{t}imed out. I'll try to re-run it, once.
- B{n}: The task returned [n]othing, as will I.
- B{d}: The task B{d}idn't run, probably because there was a
disconnection. I'll re-run it.
"""
@contextmanager
def taskInfo(ID):
if hasattr(self, 'logger'):
if ID:
taskInfo = self.info.aboutCall(ID)
self.info.forgetID(ID)
yield taskInfo
else:
# Why do logging without an info object?
yield "TASK"
else:
yield None
if self.spew:
taskInfo += " -> {}".format(result)
self.logger.info(taskInfo)
def retryTask():
self.tasksBeingRetried.append(task)
task.rush()
self.q.put(task)
return task.reset().addCallback(self.taskDone, task, **kw)
status, result = statusResult
# Deal with any info for this task call
with taskInfo(kw.get('ID', None)) as prefix:
if status == b'e':
# There was an error...
if prefix:
# ...log it
self.logger.error("{}: {}".format(prefix, result))
if kw.get('rf', False):
# ...just return the Failure
result = Failure(errors.WorkerError(result))
elif not self.warn:
# ...stop the reactor
import sys
for msg in ("ERROR: {}".format(result),
"Shutting down in one second!\n"):
sys.stderr.write("\n{}".format(msg))
self._dc = reactor.callLater(1.0, reactor.stop)
return result
if status in (b'r', b'c'):
# A plain result, or a deferred to a chunked one
return result
if status == b'i':
# An iteration, possibly an IterationConsumer that we need
# to run now
if kw.get('consumer', None):
if hasattr(result, 'run'):
return result.run()
# Nothing to produce from an empty iterator, consider
# the iterations "done" right away.
return defer.succeed(None)
return result
if status == b't':
# Timed out. Try again, once.
if task in self.tasksBeingRetried:
self.tasksBeingRetried.remove(task)
return Failure(
errors.TimeoutError(
"Timed out after two tries, gave up"))
return retryTask()
if status == b'n':
# None object
return
if status == b'd':
# Didn't run. Try again, hopefully with a different worker.
return retryTask()
return Failure(
errors.WorkerError("Unknown status '{}'".format(status)))
def newTask(self, func, args, kw):
"""
Makes a new L{tasks.Task} object from a func-args-kw combo. You
won't call this directly.
"""
if not self.isRunning():
text = Info().setCall(func, args, kw).aboutCall()
raise errors.QueueRunError(text)
# Some parameters just for me, not for the task
niceness = kw.pop('niceness', 0 )
series = kw.pop('series', None )
timeout = kw.pop('timeout', None )
doLast = kw.pop('doLast', False )
rf = kw.pop('returnFailure', self.returnFailure )
task = self.taskFactory.new(func, args, kw, niceness, series, timeout)
# Workers have to honor the consumer and doNext keywords, too
if kw.get('doNext', False):
task.rush()
elif doLast:
task.relax()
kwTD = { 'rf': rf, 'consumer': kw.get('consumer', None) }
if hasattr(self, 'info'):
kwTD['ID'] = self.info.setCall(func, args, kw).ID
task.addCallback(self.taskDone, task, **kwTD)
return task
def call(self, func, *args, **kw):
"""
Queues up a function call.
Puts a call to I{func} with any supplied arguments and
keywords into the pipeline. This is perhaps the B{single most
important method} of the AsynQueue API.
Scheduling of the call is impacted by the I{niceness} keyword
that can be included in addition to any keywords for the
call. As with UNIX niceness, the value should be an integer
where 0 is normal scheduling, negative numbers are higher
priority, and positive numbers are lower priority.
Tasks in a series of tasks all having niceness N+10 are
dequeued and run at approximately half the rate of tasks in
another series with niceness N.
@return: A C{Deferred} to the eventual result of the call when
it is eventually pulled from the pipeline and run.
@keyword niceness: Scheduling niceness, an integer between -20
and 20, with lower numbers having higher scheduling priority
as in UNIX C{nice} and C{renice}.
@keyword series: A hashable object uniquely identifying a
series for this task. Tasks of multiple different series
will be run with somewhat concurrent scheduling between the
series even if they are dumped into the queue in big
batches, whereas tasks within a single series will always
run in sequence (except for niceness adjustments).
@keyword doNext: Set C{True} to assign highest possible
priority, even higher than a deeply queued task with
niceness = -20.
@keyword doLast: Set C{True} to assign priority so low that
any other-priority task gets run before this one, no matter
how long this task has been queued up.
@keyword timeout: A timeout interval in seconds from when a
worker gets a task assignment for the call, after which the
call will be retried.
@keyword consumer: An implementor of C{interfaces.IConsumer}
that will receive iterations if the result of the call is an
interator. In such case, the returned result is a deferred
that fires (with a reference to the consumer) when the
iterations have all been produced.
@keyword returnFailure: If a task raises an exception, call
its errback with a Failure. Default is to either log an
error (if 'warn' is set) or stop the queue.
"""
task = self.newTask(func, args, kw)
self.q.put(task)
return task.d
def update(self, func, *args, **kw):
"""
Sets an update task from I{func} with any supplied arguments and
keywords to be run directly on all current and future
workers.
Returns a C{Deferred} to the result of the call on all current
workers, though there is no mechanism for obtaining such
results for new hires, so it's probably best not to rely too
much on them.
The updates are run directly via L{tasks.TaskHandler.update},
not through the queue. Because of the disconnect between
queued and direct calls, it is likely but not guaranteed that
any jobs you have queued when this method is called will run
on a particular worker B{after} this update is run. Wait for
the C{Deferred} from this method to fire before queuing any
jobs that need the update to be in place before running.
If you don't want the task saved to the update list, but only
run on the workers currently attached, set the I{ephemeral}
keyword C{True}.
"""
if 'consumer' in kw:
raise ValueError(
"Can't supply a consumer for an update because there "+\
"may be multiple iteration producers")
ephemeral = kw.pop('ephemeral', False)
task = self.newTask(func, args, kw)
return self.th.update(task, ephemeral)