I allow you to track and wait for Twisted Deferred objects without actually having received a reference to them, or interfering with their callback chains.

Uses instances of iteration.Delay to check a count of pending deferreds that have been tracked with calls to put or addWait.

Method __init__ Undocumented
Method addWait Adds a wait condition for me to track that gets removed when you call removeWait.
Method removeWait Removes a wait condition added by addWait. Don't call this unless you've called addWait!
Method quitWaiting Call this to have me quit waiting for any pending or future Deferreds. Only use this if you really don't need me to wait on anything ever again.
Method put Put another Deferred in the tracker.
Method notWaiting Returns True if I'm not waiting for any Deferreds to fire.
Method deferToAll Returns a Deferred that tracks all active deferreds that haven't yet fired.
Method deferToAny Returns a Deferred that fires with True when any of the active deferreds fire.
Method deferUntilFewer Returns a Deferred that fires with True when there are fewer than N tracked deferreds pending.
def __init__(self, interval=None, backoff=None):
Undocumented
def addWait(self):

Adds a wait condition for me to track that gets removed when you call removeWait.

Calling this multiple times before calling removeWait adds nested wait conditions. Make sure you call removeWait once for each addWait call!

def removeWait(self):

Removes a wait condition added by addWait. Don't call this unless you've called addWait!

Caution: Calling this will cause any pending deferToAny calls to fire. It will also reduce the number of pending deferreds that any calls to deferUntilFewer are waiting on.
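One way to keep addWait and removeWait calls balanced is to wrap the pair in a context manager. The sketch below is only an illustration: StubTracker is a hypothetical stand-in that counts wait conditions and nothing else, and the waiting helper is not part of AsynQueue. Only the method names addWait, removeWait and notWaiting come from this page.

```python
from contextlib import contextmanager


class StubTracker:
    """Hypothetical stand-in that only counts wait conditions."""

    def __init__(self):
        self.waits = 0

    def addWait(self):
        self.waits += 1

    def removeWait(self):
        self.waits -= 1

    def notWaiting(self):
        # The real tracker also considers pending Deferreds; this stub does not.
        return self.waits == 0


@contextmanager
def waiting(tracker):
    """Hold one wait condition on `tracker` for the duration of a block."""
    tracker.addWait()
    try:
        yield tracker
    finally:
        # Runs even if the block raises, so the calls stay paired.
        tracker.removeWait()


tracker = StubTracker()
with waiting(tracker):
    with waiting(tracker):      # nested wait condition
        inner = tracker.waits   # two wait conditions active
    outer = tracker.waits       # back down to one
balanced = tracker.notWaiting()
```

The same helper should work with any object exposing addWait and removeWait, as long as the guarded work finishes inside the with block. For work that completes later in a callback, removeWait would have to be called from that callback instead.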

def quitWaiting(self):

Call this to have me quit waiting for any pending or future Deferreds. Only use this if you really don't need me to wait on anything ever again.

Useful for quickly clearing everybody off the stage (and keeping them off!) during shutdown.

def put(self, d):

Put another Deferred in the tracker.

def notWaiting(self):

Returns True if I'm not waiting for any Deferreds to fire.

def deferToAll(self, timeout=None):

Returns a Deferred that tracks all active deferreds that haven't yet fired.

When all the tracked deferreds fire, the returned deferred fires, too, with True. The tracked deferreds do not get bogged down by the callback chain for the Deferred returned by this method.

If the tracked deferreds never fire and a specified timeout expires, the returned deferred will fire with False. Calling quitWaiting will make it fire almost immediately.
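The True/False result can be pictured as the outcome of polling a count of pending deferreds until it reaches zero or a timeout expires. The sketch below is a blocking, standard-library-only illustration of that idea and is not how iteration.Delay is implemented. The name poll_until is hypothetical, and the interval and backoff parameters only borrow their names from the constructor signature; their meaning here (initial sleep time and growth factor) is an assumption.

```python
import time


def poll_until(predicate, timeout=None, interval=0.01, backoff=1.5):
    """Return True once predicate() holds, or False if `timeout` seconds pass first."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while not predicate():
        if deadline is None:
            time.sleep(interval)
        else:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            # Never sleep past the deadline.
            time.sleep(min(interval, remaining))
        # Check less and less often as the wait drags on (assumed behavior).
        interval *= backoff
    return True


pending = []                        # nothing outstanding
all_done = poll_until(lambda: len(pending) == 0, timeout=0.1)

pending.append("never fires")       # one item that never completes
timed_out = poll_until(lambda: len(pending) == 0, timeout=0.05)
```

Here all_done is True because nothing was pending, and timed_out is False because the pending item never went away. The real tracker does its waiting inside the Twisted reactor and returns a Deferred rather than blocking the caller.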

def deferToAny(self, timeout=None):

Returns a Deferred that fires with True when any of the active deferreds fire.

The tracked deferreds do not get bogged down by the callback chain for the returned one. If none of them fires before a timeout specified in seconds expires, the returned deferred will fire with False. Calling quitWaiting will make it fire almost immediately, with True.

Caution: Don't add any deferreds with calls to put while waiting for this method to finish, because it will mess up the count. You will have to wait for one more deferred to fire for each new one you add before the returned deferred fires.
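The arithmetic behind this caution can be modeled with a plain counter. The sketch assumes that deferToAny remembers how many deferreds were pending when it was called and waits for the count to drop below that number. That mechanism is inferred from the caution itself, not confirmed from the source, and fires_before_any is a hypothetical helper.

```python
def fires_before_any(pending_at_call, puts_while_waiting):
    """Count how many tracked deferreds must fire before the wait ends.

    Assumes pending_at_call >= 1 and that the wait ends once the pending
    count drops below its value at call time.
    """
    threshold = pending_at_call
    pending = pending_at_call + puts_while_waiting
    fires = 0
    while pending >= threshold:
        pending -= 1    # one tracked deferred fires
        fires += 1
    return fires


quiet = fires_before_any(3, 0)   # 1: a single firing ends the wait
busy = fires_before_any(3, 2)    # 3: one firing plus one per extra put
```

Under that assumption, each put made during the wait raises the pending count by one, so one additional deferred has to fire before the count falls below the remembered threshold.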

def deferUntilFewer(self, N, timeout=None):

Returns a Deferred that fires with True when there are fewer than N tracked deferreds pending.

The tracked deferreds do not get bogged down by the callback chain for the returned one. If not enough of the tracked deferreds ever fire and a timeout specified in seconds expires, the returned deferred will fire with False. Calling quitWaiting will make it fire almost immediately, with True.

You can add deferreds with calls to put while waiting for this method to finish. That will just add to the number of tracked deferreds pending.

See the source for the ade package's Population object for a simple example of this in action. Here's the pertinent code:

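   # Excerpt: populate() is a nested function, so self, blank, dt (the
   # DeferredTracker) and the helper functions come from the enclosing scope.
   @defer.inlineCallbacks
   def populate():
       k = 0
       while running():
           i = getIndividual()
           if blank:
               i.SSE = np.inf
               addIndividual(i)
               continue
           k += 1
           d = i.evaluate()
           d.addCallback(evaluated, d)
           d.addErrback(oops)
           # Track this evaluation
           dt.put(d)
           # Pause until fewer than N_maxParallel tracked deferreds are pending
           yield dt.deferUntilFewer(self.N_maxParallel)
           if k >= self.Np:
               break
       # Wait for the evaluations still in progress
       yield dt.deferToAll()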

The Deferred that gets yielded while the loop is running fires immediately if there are fewer evaluations going on at once than the limit. Otherwise, it fires when enough evaluations finish to put things below the limit.
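The throttling effect of that loop can be checked with a small synchronous simulation. This is a toy model under stated assumptions, not code from ade or AsynQueue: jobs are plain integers, the oldest pending job is the one that "finishes" whenever the loop has to wait, and simulate is a hypothetical name.

```python
from collections import deque


def simulate(total_jobs, limit):
    """Return the peak number of jobs pending at once (assumes limit >= 1)."""
    pending = deque()
    peak = 0
    for job in range(total_jobs):
        pending.append(job)                 # like dt.put(d)
        peak = max(peak, len(pending))
        # Like `yield dt.deferUntilFewer(limit)`: continue only once
        # fewer than `limit` jobs are pending.
        while len(pending) >= limit:
            pending.popleft()               # the oldest job finishes
    pending.clear()                         # like `yield dt.deferToAll()`
    return peak


peak_throttled = simulate(10, 3)    # 3: never more than the limit at once
peak_unthrottled = simulate(2, 5)   # 2: the limit is never reached
```

In this model the number of jobs pending at once never exceeds the limit, which matches the description above: the wait is a no-op while the count is below the limit and otherwise holds the loop until enough jobs finish.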

API Documentation for AsynQueue, generated by pydoctor at 2022-11-17 13:13:24.