# AsynQueue:
# Asynchronous task queueing based on the Twisted framework, with task
# prioritization and a powerful worker interface.
#
# Copyright (C) 2006-2007, 2015 by Edwin A. Suominen,
# http://edsuom.com/AsynQueue
#
# See edsuom.com for API documentation as well as information about
# Ed's background and other projects, software and otherwise.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language
# governing permissions and limitations under the License.
"""
Iteration, Twisted style!
This module contains multitudes; consider it carefully. It provides a
way of dealing with iterations asynchronously. The L{Deferator} yields
C{Deferred} objects, an asynchronous version of an iterator.
Even cooler is L{IterationProducer}, which I{produces} iterations to
an implementor of C{twisted.internet.interfaces.IConsumer}. You can
make one out of an iterator with L{iteratorToProducer}.
The L{Delay} object is also very useful, both as a
Deferred-after-delay callable and a way to get a C{Deferred} that
fires when an event occurs. This is the key to getting
L{process.ProcessWorker} to work so nicely via Python's standard
multiprocessing module.
"""
import time, inspect
from zope.interface import implementer
from twisted.internet import defer, reactor
from twisted.python.failure import Failure
from twisted.internet.interfaces import IPushProducer, IConsumer
from asynqueue import errors
def deferToDelay(delay):
"""
Returns a C{Deferred} that fires after the specified I{delay} (in
seconds).
"""
return Delay(delay)()
def isIterator(x):
"""
@see: L{Deferator.isIterator}
"""
return Deferator.isIterator(x)
class Delay(object):
"""
I let you delay things and wait for things that may take a while,
in Twisted fashion.
Perhaps a bit more suited to the L{util} module, but that would
require this module to import it, and it imports this module.
With event delays of 100 ms to 1 second (in
L{process.ProcessWorker}, setting I{backoff} to 1.10 seems more
efficient than 1.05 or 1.20, with an (initial) I{interval} of 50
ms. However, you may want to tune things for your application and
system.
@ivar interval: The initial event-checking interval, in seconds.
@type interval: float
@ivar backoff: The backoff exponent.
@type backoff: float
"""
_interval = 0.001
_backoff = 1.10
def __init__(self, interval=None, backoff=None, timeout=None):
self.interval = self._interval if interval is None else interval
self.backoff = self._backoff if backoff is None else backoff
if timeout: self.timeout = timeout
if self.backoff < 1.0 or self.backoff > 1.3:
raise ValueError(
"Unworkable backoff {:f}, keep it in 1.0-1.3 range".format(
self.backoff))
self.pendingCalls = {}
self.triggerID = reactor.addSystemEventTrigger(
'before', 'shutdown', self.shutdown)
def shutdown(self):
"""
This gets called before the reactor shuts down. Causes any pending
delays or L{untilEvent} calls to finish up pronto.
Does not return a C{Deferred}, because it doesn't return until
it's forced everything to wind up.
"""
if self.triggerID is None:
return
reactor.removeSystemEventTrigger(self.triggerID)
self.triggerID = None
while self.pendingCalls:
call = list(self.pendingCalls.keys())[0]
if call.active():
self.pendingCalls[call].callback(None)
call.cancel()
def __call__(self, delay=None):
"""
Returns a C{Deferred} that fires after my default delay interval
or one you specify.
You can have it fire in the next reactor iteration by setting
I{delay} to zero (not C{None}, as that will use the default
delay instead).
The default interval is 10ms unless you override that in by
setting my I{interval} attribute to something else.
"""
def delayOver(null):
self.pendingCalls.pop(call, None)
if self.triggerID is None:
return defer.succeed(None)
if delay is None:
delay = self.interval
d = defer.Deferred().addCallback(delayOver)
call = reactor.callLater(delay, d.callback, None)
self.pendingCalls[call] = d
return d
@defer.inlineCallbacks
def untilEvent(self, eventChecker, *args, **kw):
"""
Returns a C{Deferred} that fires when a call to the supplied
event-checking callable returns an affirmative result, or
until the optional timeout limit is reached.
An affirmative result evaluates at C{True}. (Not C{None},
C{False}, zero, etc.) The result of the C{Deferred} is C{True}
if the event actually happened, or C{False} if a timeout
occurred. Call with:
- I{eventChecker}: A callable that returns an immediate
boolean value indicating if an event occurred.
- I{*args}: Any args for the event checker callable.
- I{**kw}: Any keywords for the event checker callable.
The event checker should B{not} return a C{Deferred}. I call
the event checker less and less frequently as the wait goes
on, depending on the backoff exponent (default is C{1.04}).
"""
if not callable(eventChecker):
raise TypeError("You must supply a callable event checker")
# We do two very quick checks before entering the delay loop,
# to minimize overhead when dealing with very fast events.
if self.triggerID is None or eventChecker(*args, **kw):
# First, if I have been shut down or the event has
# already happened, we don't enter the loop at all.
defer.returnValue(True)
else:
t0 = time.time()
interval = self.interval
# Second, we "wait" until the very next reactor iteration
# to do another check, as the first and possibly only loop
# iteration.
yield self(0)
while self.triggerID is not None:
if eventChecker(*args, **kw):
defer.returnValue(True)
break
if hasattr(self, 'timeout') and time.time()-t0 > self.timeout:
defer.returnValue(False)
break
# No response yet, check again after the poll interval,
# which increases exponentially so that each incremental
# delay is somewhat proportional to the amount of time
# spent waiting thus far.
yield self(interval)
interval *= self.backoff
class Deferator(object):
"""
B{Defer}red-yielding iterB{ator}.
Use an instance of me in place of a task result that is an
iterable other than one of Python's built-in containers (C{list},
C{dict}, etc.). I yield deferreds to the next iteration of the
result and maintain an internal deferred that fires when the
iterations are done or terminated cleanly with a call to my
L{stop} method. The deferred fires with C{True} if the iterations
were completed, or C{False} if not, i.e., a stop was done.
Access the done-iterating deferred via my I{d} attribute. I also
try to provide access to its methods attributes and attributes as
if they were my own.
When the deferred from my first L{next} call fires, with the first
iteration of the underlying (possibly remote) iterable, you can
call L{next} again to get a deferred to the next one, and so on,
until I raise L{StopIteration} just like a regular iterable.
B{NOTE}: There are two very important rules. First, you B{must}
wrap my iteration in a C{defer.inlineCallbacks} loop or otherwise
wait for each yielded deferred to fire before asking for the next
one. Second, you must call the L{stop} method of the Deferator (or
the deferreds it yields) before doing a C{break} or C{return} to
prematurely terminate the loop.
Good behavior looks something like this::
@defer.inlineCallbacks
def printItems(self, ID):
for d in Deferator("remoteIterator", getMore, ID):
listItem = yield d
print listItem
if listItem == "Danger Will Robinson":
d.stop()
# You still have to break out of the loop after calling
# the deferator's stop method
return
Instantiate me with a string representation of the underlying
iterable (or the object itself, if it's handy) and a function
(along with any args and kw) that returns a deferred to a 3-tuple
containing (1) the next value yielded from the task result, (2) a
Bool indicating if this value is valid or a bogus first one from
an empty iterator, and (3) a Bool indicating whether there are
more iterations left.
This requires your get-more function to be one step ahead somehow,
returning C{False} as its status indicator when the I{next} call
would raise L{StopIteration}. Use L{Prefetcherator.getNext} after
setting up the prefetcherator with a suitable iterator or
next-item callable.
The object (or string representation) isn't strictly needed; it's
for informative purposes in case an error gets propagated back
somewhere. You can cheat and just use C{None} for the first
constructor argument. Or you can supply a Prefetcherator as the
first and sole argument, or an iterator for which a
L{Prefetcherator} will be constructed internally.
"""
builtIns = (
str, list, tuple, bytearray, memoryview, dict, set, frozenset)
@classmethod
def isIterator(cls, obj):
"""
Tells you if I{obj} is a suitable iterator.
@return: C{True} if the object is an iterator suitable for use
with me, C{False} otherwise.
"""
if isinstance(obj, defer.Deferred):
return False
if isinstance(obj, cls.builtIns):
return False
if inspect.isgenerator(obj) or inspect.isgeneratorfunction(obj):
return True
for attrName in ('__iter__', '__next__'):
if not callable(getattr(obj, attrName, None)):
return False
return True
def __init__(self, objOrRep, *args, **kw):
self.d = defer.Deferred()
self.moreLeft = True
if isinstance(objOrRep, str):
# Use supplied string representation
self.representation = objOrRep.strip('<>')
else:
# Use repr of the object itself
self.representation = repr(objOrRep)
if args:
# A callTuple was supplied
self.callTuple = args[0], args[1:], kw
return
if isinstance(objOrRep, Prefetcherator):
# A Prefetcherator was supplied
self.callTuple = (objOrRep.getNext, [], {})
return
if self.isIterator(objOrRep):
# An iterator was supplied for which I will make my own
# Prefetcherator
pf = Prefetcherator()
if pf.setup(objOrRep):
self.callTuple = (pf.getNext, [], {})
return
# Nothing worked; make me equivalent to an empty iterator
self.representation = repr([])
# The non-existent iteration was "complete" since nothing was
# terminated prematurely.
self._callback(True)
# Nothing more left, because equivalent to empty
self.moreLeft = False
def __repr__(self):
"""
We all want to be nicely represented.
"""
return "<Deferator wrapping of\n <{}>,\nat 0x{}>".format(
self.representation, format(id(self), '012x'))
def __getattr__(self, name):
"""
Provides access to my done-iterating deferred's attributes as if
they were my own.
"""
if name == 'd':
raise AttributeError("No internal deferred is defined!")
return getattr(self.d, name)
def _callback(self, wasCompleteIteration):
if not self.d.called:
self.d.callback(wasCompleteIteration)
# Iterator implementation
#--------------------------------------------------------------------------
def __iter__(self):
"""
One of two methods needed for me to act like an iterator.
"""
return self
def __next__(self):
"""
One of two methods needed for me to act like an iterator. The
result (unless C{StopIteration} is raised) is a C{Deferred} to
the underlying iterator's next value, not the value itself.
Cool, huh? It took a B{lot} of work to figure this out. You
have to play nice, too, calling this method again only after
the C{Deferred} fires and calling L{stop} if you want to break
out of the iterations early.
"""
def gotNext(result):
value, isValid, self.moreLeft = result
return value
if self.moreLeft:
if hasattr(self, 'dIterate') and not self.dIterate.called:
raise errors.NotReadyError(
"You didn't wait for the last deferred to fire!")
f, args, kw = self.callTuple
self.dIterate = f(*args, **kw).addCallback(gotNext)
self.dIterate.stop = self.stop
return self.dIterate
if hasattr(self, 'dIterate'):
del self.dIterate
self._callback(True)
raise StopIteration
def stop(self):
"""
You must call this to cleanly break out of a loop of my iterations.
Not part of the official iterator implementation, but
necessary for a Twisted way of iterating. You need a way of
letting whatever is producing the iterations know that there
won't be any more of them.
For convenience, each C{Deferred} that I yield while iterating
has a reference to this method via its own C{stop} attribute.
"""
self._callback(False)
# Now that my Deferred's callback has been fired, there really
# is no more left--if my iterating function runs in a thread,
# it no longer need stay alive for me.
self.moreLeft = False
class Prefetcherator(object):
"""
I prefetch iterations from an iterator, providing a L{getNext}
method suitable for L{Deferator}.
You can supply an ID for me, purely to provide a more informative
representation and something you can retrieve via my I{ID}
attribute.
"""
__slots__ = ['ID', 'nextCallTuple', 'lastFetch']
def __init__(self, ID=None):
self.ID = ID
def __repr__(self):
"""
An informative representation. You may thank me for having this
during development.
"""
text = "<Prefetcherator instance '{}'".format(self.ID)
if self.isBusy():
text += "\n with nextCallTuple '{}'>".format(
repr(self.nextCallTuple))
else:
text += ">"
return text
def isBusy(self):
"""
@return: C{True} if I've been set up with a call to L{setup} and
am still running whatever iteration that involved.
"""
return hasattr(self, 'nextCallTuple')
def _tryNext(self):
"""
Returns a deferred that fires with the value from my
I{nextCallTuple} along with a C{bool} indicating if it's a
valid value.
Deletes the I{nextValue} reference after it returns with a
failure.
"""
def done(value):
return value, True
def oops(failureObj):
del self.nextCallTuple
return None, False
if not hasattr(self, 'nextCallTuple'):
return defer.succeed((None, False))
f, args, kw = self.nextCallTuple
return defer.maybeDeferred(f, *args, **kw).addCallbacks(done, oops)
def setup(self, *args, **kw):
"""
Sets me up with an attempt at an initial prefetch.
Set me up with a new iterator, or the callable for an
iterator-like-object, along with any args or keywords. Does a
first prefetch.
@return: A C{Deferred} that fires with C{True} if all goes
well or C{False} otherwise.
"""
def parseArgs():
if not args:
return False
if Deferator.isIterator(args[0]):
iterator = args[0]
if not hasattr(iterator, '__next__'):
iterator = iter(iterator)
if not hasattr(iterator, '__next__'):
raise AttributeError(
"Can't get a nextCallTuple from so-called "+\
"iterator '{}'".format(repr(args[0])))
self.nextCallTuple = (iterator.__next__, [], {})
return True
if callable(args[0]):
self.nextCallTuple = (args[0], args[1:], kw)
return True
return False
def done(result):
self.lastFetch = result
return result[1]
if self.isBusy() or not parseArgs():
return defer.succeed(False)
return self._tryNext().addCallback(done)
def getNext(self):
"""
Prefetch analog to C{__next__} on a regular iterator.
Gets the next value from my current iterator, or a deferred value
from my current nextCallTuple, returning it along with a Bool
indicating if this is a valid value and another one indicating
if more values are left.
Once a prefetch returns a bogus value, the result of this call
will remain (None, False, False), until a new iterator or
nextCallable is set.
Use this method as the callable (second constructor argument)
of L{Deferator}.
"""
def done(thisFetch):
nextIsValid = thisFetch[1]
if not nextIsValid:
if hasattr(self, 'lastFetch'):
del self.lastFetch
# This call's value is valid, but there's no more
return value, True, False
# This call's value is valid and there is more to come
result = value, True, True
self.lastFetch = thisFetch
return result
value, isValid = getattr(self, 'lastFetch', (None, False))
if not isValid:
# The last prefetch returned a bogus value, and obviously
# no more are left now.
return defer.succeed((None, False, False))
# The prefetch of this call's value was valid, so try a
# prefetch for a possible next call after this one.
return self._tryNext().addCallback(done)
@implementer(IConsumer)
class ListConsumer(object):
"""
Bare-bones iteration consumer.
I am a bare-bones iteration consumer that accumulates iterations
as list items, processing each item by running it through
L{processItem}, which you of course can override in your
subclass. It can return a C{Deferred}.
Call my instance to get a C{Deferred} that fires with the
underlying list when the producer unregisters.
Set any attributes you want me to have using keywords.
"""
def __init__(self, **kw):
self.x = {}
self.count = 0
self.dPending = []
self.dp = defer.Deferred()
for name in kw:
setattr(self, name, kw[name])
def __call__(self):
"""
Call to get a (deferred) list of what I consumed.
"""
def done(null):
return [self.x[key] for key in sorted(list(self.x))]
dList = [d for d in self.dPending if not d.called]
if hasattr(self, 'dp'):
# "Wait" for the producer to unregister and fire its
# deferred
dList.append(self.dp)
return defer.DeferredList(dList).addCallback(done)
def registerProducer(self, producer, streaming):
"""
C{IConsumer} implementation.
"""
if hasattr(self, 'producer'):
raise RuntimeError()
if not streaming:
raise TypeError("I only work with push producers")
# Create a deferred that will be fired when production is done
self.producer = producer
def unregisterProducer(self):
"""
C{IConsumer} implementation.
"""
if hasattr(self, 'producer'):
del self.producer
if hasattr(self, 'dp') and not self.dp.called:
self.dp.callback(None)
def write(self, data):
"""
Records data such that it will be returned in the order written,
even if L{processItem} takes a different amount of time for
each.
"""
def doneProcessing(result, k):
if hasattr(self, 'producer'):
self.producer.resumeProducing()
self.x[k] = result
self.dPending.remove(d)
self.count += 1
self.producer.pauseProducing()
d = defer.maybeDeferred(self.processItem, data).addCallback(
doneProcessing, self.count)
self.dPending.append(d)
def processItem(self, item):
"""
Process list items as they come in.
Override this to do special processing on each item as it arrives,
returning the (possibly deferred) value of the item that
should actually get appended to the list.
"""
return item
@implementer(IPushProducer)
class IterationProducer(object):
"""
Producer of iterations from a L{Deferator}.
I am a producer of iterations from a L{Deferator}. Get me running
with a call to L{run}, which returns a deferred that fires when
I'm done iterating or when the consumer has stopped me, whichever
comes first.
"""
def __init__(self, dr, consumer=None):
if not isinstance(dr, Deferator):
raise TypeError("Object {} is not a Deferator".format(repr(dr)))
self.dr = dr
self.delay = Delay()
if consumer is not None:
self.registerConsumer(consumer)
def deferUntilDone(self):
"""
Returns a deferred that fires (with a reference to my consumer)
when I am done producing iterations.
"""
d = defer.Deferred().addCallback(lambda _: self.consumer)
self.dr.chainDeferred(d)
return d
def registerConsumer(self, consumer):
"""
How could we push to a consumer without knowing what it is?
"""
if not IConsumer.providedBy(consumer):
raise errors.ImplementationError(
"Object {} isn't a consumer".format(repr(consumer)))
try:
consumer.registerProducer(self, True)
except RuntimeError:
# Ignore any exception raised from a consumer already
# having registered me.
pass
self.consumer = consumer
@defer.inlineCallbacks
def run(self):
"""
Produces my iterations, returning a C{Deferred} that fires (with a
reference to my consumer) when done.
"""
if not hasattr(self, 'consumer'):
raise AttributeError("Can't run without a consumer registered")
self.paused = False
self.running = True
for d in self.dr:
# Pause/stop opportunity after the last item write (if
# any) and before the deferred fires
if not self.running:
break
if self.paused:
yield self.delay.untilEvent(lambda: not self.paused)
item = yield d
# Another pause/stop opportunity before the item write
if not self.running:
break
if self.paused:
yield self.delay.untilEvent(lambda: not self.paused)
# Write the item and do the next iteration
self.consumer.write(item)
# Done with the iteration, and with producer/consumer
# interaction
self.consumer.unregisterProducer()
defer.returnValue(self.consumer)
def pauseProducing(self):
"""
C{IPushProducer} implementation.
"""
self.paused = True
def resumeProducing(self):
"""
C{IPushProducer} implementation.
"""
self.paused = False
def stopProducing(self):
"""
C{IPushProducer} implementation.
"""
self.running = False
self.dr.stop()
@defer.inlineCallbacks
def iteratorToProducer(iterator, consumer=None, wrapper=None):
"""
Makes an iterator into an L{IterationProducer}.
Converts a possibly slow-running iterator into a Twisted-friendly
producer, returning a deferred that fires with the producer when
it's ready. If the the supplied object is not a suitable iterator
(perhaps empty), the result will be C{None}.
If a consumer is not supplied, whatever consumer gets this must
register with the producer by calling its non-interface method
L{IterationProducer.registerConsumer} and then its
L{IterationProducer.run} method to start the iteration/production.
If you supply a consumer, those two steps will be done
automatically, and this method will fire with a C{Deferred} that
fires when the iteration/production is done.
"""
result = None
if Deferator.isIterator(iterator):
pf = Prefetcherator()
ok = yield pf.setup(iterator)
if ok:
if wrapper:
if callable(wrapper):
args = (wrapper, pf.getNext)
else:
result = Failure(TypeError(
"Wrapper '{}' is not a callable".format(
repr(wrapper))))
else:
args = (pf.getNext,)
dr = Deferator(repr(iterator), *args)
result = IterationProducer(dr, consumer)
if consumer:
yield result.run()
defer.returnValue(result)