# AsynQueue : asynqueue.threads.py

# AsynQueue:
# Asynchronous task queueing based on the Twisted framework, with task
# prioritization and a powerful worker interface.
#
# Copyright (C) 2006-2007, 2015, 2019 by Edwin A. Suominen,
# http://edsuom.com/AsynQueue
#
# See edsuom.com for API documentation as well as information about
# Ed's background and other projects, software and otherwise.
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""
L{ThreadQueue}, L{ThreadWorker} and their support staff. Also, a
cool implementation of the oft-desired C{deferToThread}, in
L{ThreadQueue.deferToThread}.
"""

import threading

from zope.interface import implementer
from twisted.internet import defer, reactor
from twisted.python import threadable, threadpool
from twisted.python.failure import Failure
from twisted.internet.interfaces import IConsumer, IPushProducer
threadable.init()

from asynqueue.base import TaskQueue
from asynqueue.interfaces import IWorker
from asynqueue import errors, util, iteration


_DTL = [None]
def deferToThread(*fargs, **kw):
    """
    Module-level function that lets you call a function in a dedicated
    thread and get a C{Deferred} to its result, with no fuss on your
    part.

    The thread will remain alive and will be used for further calls to
    this function and this function only. 

    Call with I{f}, I{*args}, and I{**kw} as usual.

    This is AsynQueue's single-threaded, queued, I{doNext}-able,
    L{iteration.Deferator}-able answer to Twisted's C{deferToThread}.

    If you expect a deferred iterator as your result (an instance of
    L{iteration.Deferator}), supply an C{IConsumer} implementor via
    the I{consumer} keyword. Each iteration will be written to it, and
    the deferred will fire when the iterations are done. Otherwise,
    the deferred will fire with an L{iteration.Deferator}.
    
    If you want to kill the dedicated thread, just call this function
    with no arguments, not even a callable object I{f}. A C{Deferred}
    will be returned that fires when the thread is gone. A subsequent
    call with a callable will start a fresh dedicated thread.
    """
    if not fargs:
        # Thread-kill request: stop the dedicated ThreadLooper, if any
        tl = _DTL[0]
        if tl is None:
            return defer.succeed(None)
        # Forget the looper so a future call spawns a fresh thread
        # instead of dispatching calls to a stopped one
        _DTL[0] = None
        return tl.stop()
    if _DTL[0] is None:
        # First call (or first call after a stop): make the dedicated
        # ThreadLooper and arrange for its clean shutdown
        tl = _DTL[0] = ThreadLooper()
        reactor.addSystemEventTrigger('before', 'shutdown', tl.stop)
    return _DTL[0].deferToThread(*fargs, **kw)
    

class ThreadQueue(TaskQueue):
    """
    I am a L{TaskQueue} for dispatching arbitrary callables to be run
    by a single worker thread.

    Having one and just one worker thread is surprisingly useful. It
    lets you do synchronous processing without blocking Twisted's
    event loop, yet assures you that objects processed during one
    queued task will not be disturbed until completion of the Deferred
    callback chain from that task.
    """
    def __init__(self, **kw):
        """C{ThreadQueue}(**kw)"""
        # Pull out my own 'raw' option before the base TaskQueue
        # constructor sees the keywords
        rawIterators = kw.pop('raw', False)
        TaskQueue.__init__(self, **kw)
        worker = ThreadWorker(raw=rawIterators)
        self.worker = worker
        # Fires once my single worker has been attached
        self.d = self.attachWorker(worker)

    def deferToThread(self, f, *args, **kw):
        """
        Runs the f-args-kw call in my dedicated worker thread, skipping
        past the queue. As with a regular L{TaskQueue.call}, returns a
        C{Deferred} that fires with the result and deals with
        iterators.
        """
        # Wait (if necessary) for my worker to be attached, then hand
        # the call straight to its ThreadLooper
        fDTT = self.worker.t.deferToThread
        return util.callAfterDeferred(self, 'd', fDTT, f, *args, **kw)


@implementer(IWorker)
class ThreadWorker(object):
    """
    I implement an L{IWorker} that runs tasks in a dedicated worker
    thread.

    @cvar cQualified: A task series that all instances of me are
      qualified to perform.

    @ivar iQualified: A task series that this instance of me is
      qualified to perform. Usually left blank, unless you want only
      some workers doing certain tasks.

    @ivar tasks: A list of pending tasks.

    @ivar t: An instance of L{ThreadLooper}.
    
    @keyword series: A list of one or more task series that this
        particular instance of me is qualified to handle.

    @keyword raw: Set C{True} if you want raw iterators to be returned
        instead of L{iteration.Deferator} instances. You can override
        this in with the same keyword set C{False} in a call.
    """
    cQualified = ['thread', 'local']

    def __init__(self, series=None, raw=False):
        """C{ThreadWorker}(series=None, raw=False)"""
        self.tasks = []
        # Maps each pending task to a Deferred that fires when the
        # task completes, so L{stop} can wait for all of them.
        self.tasksPendingBeforeShutdown = {}
        # A None sentinel is used instead of a mutable [] default so
        # that instances never share a single qualification list.
        self.iQualified = [] if series is None else series
        self.t = ThreadLooper(raw)

    def setResignator(self, callableObject):
        """
        Registers I{callableObject} to be called when my thread loop's
        lock is stopped.
        """
        self.t.dLock.addStopper(callableObject)

    def run(self, task):
        """
        Returns a C{Deferred} that fires only after the threaded call is
        done for the supplied I{task}.

        I do basic FIFO queuing of calls to this method, but priority
        queuing is above my paygrade and you'd best honor my deferred
        and let someone like L{tasks.TaskHandler} only call this
        method when I say I'm ready.

        One simple thing I B{will} do is apply the I{doNext} keyword
        to any task with the highest priority, -20 or lower (for a
        L{base.TaskQueue.call} with its own I{doNext} keyword set). If
        you call this method one task at a time like you're supposed
        to, even that won't make a difference, except that it will cut
        in front of any existing call with I{doNext} set. So use
        judiciously.
        """
        def done(statusResult):
            # The task is no longer pending
            if task in self.tasks:
                self.tasks.remove(task)
            if statusResult[0] == b'i':
                # What we got is a Deferator, but if a consumer was
                # supplied, we need to couple an IterationProducer to
                # it and fire the task callback with the deferred from
                # running the producer.
                if consumer:
                    dr = statusResult[1]
                    ip = iteration.IterationProducer(dr, consumer)
                    statusResult = (b'i', ip)
            task.d.callback(statusResult)
            # If a stop is pending, let it know this task is done
            if task in self.tasksPendingBeforeShutdown:
                self.tasksPendingBeforeShutdown.pop(task).callback(None)
        
        self.tasks.append(task)
        f, args, kw = task.callTuple
        consumer = kw.pop('consumer', None)
        if task.priority <= -20:
            # Highest-priority task: cut to the front of the line
            kw['doNext'] = True
        return self.t.call(f, *args, **kw).addCallback(done)

    def stop(self):
        """
        Returns a C{Deferred} that fires when all pending tasks have been
        run, the task loop has ended and its thread has terminated.

        Waits for all pending tasks to finish because they run in the
        task loop and can't do so once the task loop has ended. So the
        list of outstanding tasks that is the deferred result should
        always be empty.
        """
        def tasksDone(null):
            return self.t.stop()
        
        dList = []
        for task in self.tasks:
            # Each pending task gets a Deferred that run()'s done
            # callback will fire when the task completes
            d = defer.Deferred()
            self.tasksPendingBeforeShutdown[task] = d
            dList.append(d)
        return defer.DeferredList(dList).addCallback(tasksDone)

    def crash(self):
        """
        Since a thread can only terminate itself, calling this method only
        forces firing of the deferred returned from a previous call to
        L{stop} and returns the task that hung the thread.
        """
        self.t.stop()
        return self.tasks


class ThreadLooper(object):
    """
    I run function calls in a dedicated thread.

    Each call returns a C{Deferred} to its eventual result, which is a
    2-tuple containing the status of the last call and its result
    according to the format of L{util.CallRunner}.

    If the result is an iterable other than one of Python's built-in
    ones, the C{Deferred} fires with an instance of
    L{iteration.Prefetcherator} instead. Couple it to your own
    deferator to iterate over the underlying iterable running in my
    thread. You can disable this behavior by setting C{raw=True} in
    the constructor, or enable/disable it on an individual call by
    setting raw=True/False.

    @ivar timeout: The wait timeout, which defaults to 60 (one
      minute). This is just how long the thread loop waits before
      checking for a pending deferred and firing it with a timeout
      error. Otherwise, it simply waits another minute, and it can do
      that forever with no problem.

    @keyword raw: Set C{True} to have calls return (deferred)
        iterators rather than instances of L{Prefetcherator}.
    """
    timeout = 60
    
    def __init__(self, raw=False):
        """C{ThreadLooper}(raw=False)"""
        # Just a simple attribute to indicate if the thread loop is
        # running, mostly for unit testing
        self.threadRunning = True
        # Tools
        self.runner = util.CallRunner(raw)
        self.dLock = util.DeferredLock()
        self.event = threading.Event()
        self.thread = threading.Thread(name=repr(self), target=self.loop)
        self.thread.start()
        # Deferred Tracker to wait for any running Deferators before
        # shutdown
        self.dt = util.DeferredTracker()
        
    def loop(self):
        """
        Runs a loop in a dedicated thread that waits for new tasks. The loop
        exits when a C{None} object is supplied as a task.
        """
        def callback(status, result):
            # Fire the shared deferred safely on the reactor thread
            reactor.callFromThread(self.d.callback, (status, result))
        
        self.threadRunning = True
        while True:
            # Wait here for my main-thread caller to release me for
            # another call. Event.wait returns False if it timed out
            # without the event being set (Python 2.7+).
            if not self.event.wait(self.timeout):
                # Timed out waiting for the next call. If there indeed
                # was one, we need to let the caller know. That
                # shouldn't ever happen, though.
                if hasattr(self, 'd') and not self.d.called:
                    # The status must be a bytes object, because
                    # statusResult tuples are checked against b'e'
                    # elsewhere; a str 'e' would never match.
                    callback(b'e', "Thread timed out waiting for this call!")
                continue
            if self.callTuple is None:
                # Shutdown was requested
                break
            status, result = self.runner(self.callTuple)
            # We are about to call back the shared deferred, so clear
            # the event to force me to wait for the next call at the
            # top of the loop. The main thread will not set the event
            # again until the callback is done, so this is safe.
            self.event.clear()
            # OK, now call the shared deferred
            callback(status, result)
        # Broken out of loop, the thread now dies
        self.threadRunning = False

    @defer.inlineCallbacks
    def call(self, f, *args, **kw):
        """
        Runs the supplied callable function with any args and keywords in
        a dedicated thread, returning a C{Deferred} that fires with a
        status/result tuple.

        Calls are done in the order received, unless you set
        C{doNext=True}.

        Set C{raw=True} to have a raw iterator returned instead of a
        Deferator, or C{raw=False} to have a L{Deferator} returned
        instead of a raw iterator, contrary to the instance-wide
        default set with the constructor keyword 'raw'.
        """
        yield self.dLock.acquire(kw.pop('doNext', False))
        self.callTuple = f, args, kw
        self.d = defer.Deferred()
        # The callTuple is set for this call along with the deferred
        # to be called back with its result, so release the thread to
        # work on it, firing this deferred's callback with its result.
        self.event.set()
        statusResult = yield self.d
        # The deferred lock is released after the call is done so
        # that another call can proceed. This is NOT the same as
        # the event used as a threading lock. It keeps the main
        # thread from setting that event before the thread loop is
        # ready for that.
        self.dLock.release()
        status, result = statusResult
        if status == b'i':
            # The result is an iterator; try to wrap it in a
            # Prefetcherator/Deferator pair
            ID = str(hash(result))
            pf = iteration.Prefetcherator(ID)
            ok = yield pf.setup(result)

            if ok:
                # OK, we can iterate this
                result = iteration.Deferator(
                    repr(pf), self.deferToThread, pf.getNext)
                # Make sure Deferator is done before shutting down
                self.dt.put(result.d)
            else:
                # An iterator, but not one we could prefetch
                # from. Probably empty.
                result = []
        # Not an iterator, at least not one being specially
        # processed; we already have our result
        defer.returnValue((status, result))

    def dr2ip(self, dr, consumer=None):
        """
        Converts a L{Deferator} into an L{iteration.IterationProducer},
        with a consumer registered if you supply one.

        Then each iteration will be written to your consumer, and the
        deferred returned will fire when the iterations are
        done. Otherwise, the deferred will fire with an
        L{iteration.IterationProducer} and you will have to register
        with and run it yourself.
        """
        ip = iteration.IterationProducer(dr)
        if consumer:
            ip.registerConsumer(consumer)
            return ip.run()
        return ip
        
    def deferToThread(self, f, *args, **kw):
        """
        My single-threaded, queued, doNext-able, Deferator-able answer to
        Twisted's deferToThread.

        If you expect a deferred iterator as your result (an instance
        of L{iteration.Deferator}), supply an C{IConsumer} implementor
        via the I{consumer} keyword. Each iteration will be written to
        it, and the deferred will fire when the iterations are
        done. Otherwise, unless the I{raw} option has been set C{True}
        in my constructor or as a keyword to this call, the deferred
        will fire with an L{iteration.Deferator}.
        """
        def done(statusResult):
            status, result = statusResult
            if status == b'e':
                # An error in the threaded call; wrap it as a Failure
                return Failure(errors.ThreadError(result))
            elif status == b'i':
                if consumer:
                    # 'result' is the Deferator here; couple it to the
                    # supplied consumer via an IterationProducer. (The
                    # previous code referenced an undefined name 'dr'.)
                    ip = iteration.IterationProducer(result, consumer)
                    return ip.run()
                return result
            return result

        consumer = kw.pop('consumer', None)
        return self.call(f, *args, **kw).addCallback(done)

    def stop(self):
        """
        Returns a C{Deferred} that fires when all tasks and instances of
        L{Deferator} are done, the task loop has ended, and its thread
        has terminated.
        """
        def deferatorsDone(null):
            if self.threadRunning:
                # Tell the thread to quit with a null task
                self.callTuple = None
                self.event.set()
                # Now stop the lock
                self.dLock.addStopper(self.thread.join)
                return self.dLock.stop()
        return self.dt.deferToAll().addCallback(deferatorsDone)


class PoolUser(object):
    """
    Abstract base class for objects that access a global thread pool
    instead of starting their own threads.
    """
    minThreads = 2
    maxThreads = 10
    
    @classmethod
    def setup(cls, maxThreads=None):
        """
        Sets up stuff class-wide, with all the potential pitfalls that
        entails.
        
        Sets up all present and future instances of L{Consumerator},
        L{Filerator}, L{OrderedItemProducer}, and any other subclasses
        of me with a thread pool having at least two and no more than
        I{maxThreads} threads.

        If this method is called with a thread pool already
        instantiated, and I{maxThreads} is specified and different
        from the current value, it adjusts the maximum thread pool
        size for B{all} instances, current and future, absent yet
        another call with still different values.

        @note: In the several years that have gone by since I wrote
            this, I realized that it's almost always a terrible idea
            to make class-wide settings of anything. But it's mature
            code and hasn't given me any trouble.

        @keyword maxThreads: Set to a maximum number of threads to
            use, for all present and future instances.
        """
        poolExists = hasattr(cls, '_pool')
        if maxThreads:
            # A new maximum was specified; resize any existing pool
            # and remember the setting class-wide
            if poolExists and maxThreads != cls.maxThreads:
                cls._pool.adjustPoolSize(
                    minThreads=cls.minThreads,
                    maxThreads=maxThreads)
            cls.maxThreads = maxThreads
        if not poolExists:
            # First-time setup: create the class-wide pool and arrange
            # for it to be shut down with the reactor
            cls._pool = threadpool.ThreadPool(
                minthreads=cls.minThreads, maxthreads=cls.maxThreads,
                name="AsynQueue.IterationGetter")
            reactor.addSystemEventTrigger('before', 'shutdown', cls.shutdown)
        if not hasattr(cls, 'dtp'):
            # Class-wide tracker of Deferreds for threaded calls
            cls.dtp = util.DeferredTracker()
    
    @classmethod
    def shutdown(cls):
        """
        Shuts down all threads, returning a C{Deferred} that fires when
        everything's done, class-wide.
        """
        def ready(null):
            pool = getattr(cls, '_pool', None)
            if pool is not None:
                pool.stop()
                del cls._pool
        return cls.dtp.deferToAll().addCallback(ready)

    @classmethod
    def deferToThreadInPool(cls, f, *args, **kw):
        """
        Runs the C{f-args-kw} call combo in one of my threads, returning a
        C{Deferred} that fires with the eventual result. Can be run
        from the class or any instance of me with the exact same result.
        """
        d = defer.Deferred()
        def threadDone(success, result):
            # Route the result back onto the reactor thread, as a
            # callback on success and an errback otherwise
            fire = d.callback if success else d.errback
            reactor.callFromThread(fire, result)
        cls.dtp.put(d)
        if not cls._pool.started:
            cls._pool.start()
        cls._pool.callInThreadWithCallback(threadDone, f, *args, **kw)
        return d

    @property
    def pool(self):
        """
        Returns a reference to the class-wide threadpool, starting it if
        this is the first time it's been used.
        """
        thePool = self._pool
        if not thePool.started:
            thePool.start()
        return thePool


class IterationGetter(PoolUser):
    """
    Abstract base class for objects that munch data on one end and act
    like iterators to yield it on the other end.

    @ivar runState: One of 'init', 'started', 'running', 'stopping',
        'stopped', tracking my iteration lifecycle.

    @see: L{Consumerator} and L{Filerator}.
    """
    class IterationStopper:
        """Sentinel object passed through the pipeline to end iteration."""
        pass

    def __init__(self, maxThreads=None):
        """C{IterationGetter(maxThreads=None)}"""
        # Class-wide thread pool and Deferred tracker (see PoolUser.setup)
        self.setup(maxThreads)
        self.runState = 'init'
        # Deferreds that fire when each started loop thread finishes
        self.dList = []

    def start(self):
        """
        Call this when I should start listening for iterations.

        Sets up locks for my iteration-consuming thread (from my
        C{ThreadPool}), the blocking-iterator thread, and the
        next-iteration event. Lock both of my iteration-processing
        loops until an iteration is received, but leaves the
        next-iteration lock I{nLock} unlocked. The iteration-consuming
        thread will lock it to overwrite the blocking-iterator
        thread's value of each iteration.

        Calls my subclass's L{loop} method in its own thread. If too
        many threads are currently open, queues the loop call until
        one finishes up in some other instance of me.
        """
        def done(success, result):
            # Fire d from the reactor thread, as a callback on success
            # or an errback otherwise
            f = d.callback if success else d.errback
            reactor.callFromThread(f, result)

        self.runState = 'started'
        d = defer.Deferred()
        self.dtp.put(d)
        self.dList.append(d)
        # Locks for my iteration-consuming thread, the
        # blocking-iterator thread, and the next-iteration event.
        # Note that cLock is a Semaphore, so multiple releases
        # accumulate rather than error out.
        self.cLock = threading.Semaphore()
        self.bLock = threading.Lock()
        self.nLock = threading.Lock()
        # Lock both of my iteration-processing loops until an
        # iteration is received
        self.cLock.acquire()
        self.bLock.acquire()
        # Call my subclass's loop function in a pool thread
        self.pool.callInThreadWithCallback(done, self.loop)

    def loop(self):
        """
        Override this method to generate a value for my iterations. The
        method must be synchronized with a series of locking
        primitives:

            1. First, wait to acquire I{cLock}. This will be released
               when a value has been given to my subclass instance,
               e.g., through its C{write} method. At this point, you
               can retrieve the value and let your value provider know
               it is free to provide more.
    
            2. Then wait to acquire I{nLock}. This will be released
               when the L{next} method has obtained its next iteration
               value from I{bIterationValue}. At this point, overwrite
               I{bIterationValue} with the new value you've just
               obtained, or with an instance of L{IterationStopper} if
               iteration is done.
            
            3. Release I{bLock} to let the L{next} loop know it can
               process the next iteration value (or iteration stopper)
               now in I{bIterationValue}.
        
        @see: L{Consumerator.loop} and L{Filerator.loop}
        """
        raise NotImplementedError("You must override this in a subclass")

    def deferUntilDone(self):
        """
        Returns a C{Deferred} that fires when I am done iterating.
        """
        return defer.DeferredList(self.dList)
        
    # Iterator implementation -------------------------------------------------
    # Call in its own thread

    def __iter__(self):
        """Iterator protocol: I am my own iterator."""
        return self

    def __next__(self):
        """
        Iterator protocol: blocks (on I{bLock}) until my loop thread
        supplies the next value, raising C{StopIteration} when that
        value is the L{IterationStopper} sentinel.
        """
        # Wait for the next iteration to be produced
        self.bLock.acquire()
        # Get a local reference to the iteration value
        value = self.bIterationValue
        # Now it can be changed, so release my iteration-consuming
        # loop to do so
        self.nLock.release()
        if isinstance(value, self.IterationStopper):
            # We are done iterating. The blocking caller will
            # immediately exit its loop.
            raise StopIteration
        # This is a legit iteration value, return it. Since this
        # method runs in the blocking-iterator thread, it won't
        # get called again until the caller is ready for another
        # iteration.
        return value


@implementer(IConsumer)
class Consumerator(IterationGetter):
    """
    I act like an C{IConsumer} for your Twisted code and an iterator
    for your blocking code running via a L{ThreadWorker}.

    This is handy when you are using a conventional library that
    relies on an iterator as its input::

      def render(request):
          w = png.Writer()
          c = asynqueue.Consumerator()
          c.deferUntilDone().addCallback(lambda _: request.finish())
          p = self.producePixelRows(c)
          w.write(request, c)
          return server.NOT_DONE_YET

    I work with either an I{IPushProducer} or an I{IPullProducer}. You
    can construct me with an instance of the former and I'll get
    started right away. Otherwise, call my L{registerProducer} method
    with the producer and whether it is streaming (push) or not.

    @ivar runState: 'init', 'started', 'running', 'stopping', 'stopped'

    @ivar d: A C{Deferred} that fires when iterations are done.

    @keyword producer: The producer for my instance to register, if
        you want to supply an C{IPushProducer} one on
        instantiation. Otherwise, use L{registerProducer}.
    """
    def __init__(self, producer=None, maxThreads=None):
        """C{Consumerator(producer=None, maxThreads=None)}"""
        super(Consumerator, self).__init__(maxThreads)
        # Serializes writes so data is handled in the order received
        self.dLock = util.DeferredLock()
        if producer:
            # Only an IPushProducer may be supplied at construction;
            # streaming is assumed True here
            self.registerProducer(producer, True)

    def loop(self):
        """
        Runs a loop in a dedicated thread that waits for new iterations to
        be produced.

        When I get an instance of L{IterationGetter.IterationStopper},
        the loop exits. I then call my "all done" C{Deferred} and
        delete my reference to the producer.
        """
        self.runState = 'running'
        while True:
            # Wait for an iteration
            self.cLock.acquire()
            # Get a copy of the value
            value = self.cIterationValue
            # Release the consumer interface to write another
            # iteration. Must be done via the reactor thread because
            # dLock is a Twisted-side lock, not a threading one.
            reactor.callFromThread(self.dLock.release)
            # Wait until it's safe to overwrite the blocking-iterator
            # loop's copy
            self.nLock.acquire()
            # Now do so and release it to work on the new copy
            self.bIterationValue = value
            self.bLock.release()
            if isinstance(value, self.IterationStopper):
                # This was the post-iteration signal; this loop is now
                # done.
                break
        # Wait until we know the iteration stopper was noticed and the
        # blocking iterations stopped.
        self.runState = 'stopping'
        self.nLock.acquire()
        self.runState = 'stopped'
        reactor.callFromThread(delattr, self, 'producer')
        
    def stop(self):
        """
        Good manners urge you to call this to cleanly break out of a loop
        of my iterations so that my producer doesn't keep working for
        nothing.

        Calling this method at the Twisted main-loop level is also a
        fine way to quit producing and iterating when you know you're
        done.

        Not part of the official iterator implementation, but
        useful for a Twisted way of iterating. You need a way of
        letting whatever is producing the iterations know that there
        won't be any more of them.
        """
        if hasattr(self, 'producer'):
            self.producer.stopProducing()
        return self.unregisterProducer()
        
    # --- IConsumer implementation --------------------------------------------

    def write(self, data):
        """
        The producer calls this with a chunk of I{data}. It goes through
        two stages to emerge from my blocking end as an iteration, via
        L{next}.
        """
        def handleData(null, x):
            self.cIterationValue = x
            # NOTE(review): if runState is not 'started'/'running',
            # the value is silently dropped and dLock stays acquired
            # -- presumably only reachable during shutdown; confirm.
            if self.runState in ('started', 'running'):
                # Release my iteration-consuming loop to work on the next
                # iteration value
                self.cLock.release()
                # The producer can and should write another iteration now
                self.producer.resumeProducing()

        if self.streaming and self.runState in ('started', 'running'):
            # The producer is a IPushProducer, so tell it to hold off
            # on any more iteration values for the moment while
            # everything it's sent (and may yet send) gets processed
            self.producer.pauseProducing()
        # Handle the data in the order received
        return self.dLock.acquire().addCallback(handleData, data)
    
    def registerProducer(self, producer, streaming):
        """
        C{IConsumer} implementation. Registers I{producer} and starts my
        loop thread. Raises C{RuntimeError} if a producer is already
        registered. For a non-streaming (pull) producer, kicks off
        production with C{resumeProducing}.
        """
        if hasattr(self, 'producer'):
            raise RuntimeError()
        self.producer = producer
        self.streaming = streaming
        if not streaming:
            producer.resumeProducing()
        self.start()

    def unregisterProducer(self):
        """
        C{IConsumer} implementation. Writes an L{IterationStopper} through
        the pipeline, returning the C{Deferred} from that write; a
        no-op (immediate C{Deferred}) if no producer is registered.
        """
        if not hasattr(self, 'producer'):
            return defer.succeed(None)
        return self.write(self.IterationStopper())
        

class Filerator(IterationGetter):
    """
    Stream data to me in one end and I will iterate it out the other.
    
    Acts like a file handle for writing in one thread (even the main
    one under the Twisted event loop) and an iterator in another
    thread. Hook me up to an L{iteration.Deferator} to stream data
    over a worker interface.

    You must call my L{close} method to stop me from iterating.
    """
    def __init__(self, maxThreads=None):
        """C{Filerator}(maxThreads=None)"""
        super(Filerator, self).__init__(maxThreads)
        # FIFO buffer of written-but-not-yet-iterated chunks
        self.itemBuffer = []
        self.start()

    @property
    def closed(self):
        """C{True} once my loop has fully shut down."""
        return self.runState == 'stopped'
        
    def loop(self):
        """
        Runs a loop in a dedicated thread that waits for new iterations to
        be written. When I get an instance of
        L{IterationGetter.IterationStopper}, the loop exits.
        """
        self.runState = 'running'
        while True:
            # Wait for an iteration
            self.cLock.acquire()
            # Get the oldest value in the FIFO buffer
            value = self.itemBuffer.pop(0)
            # Wait until it's safe to overwrite the blocking-iterator
            # loop's copy
            self.nLock.acquire()
            # Now do so and release it to work on the new copy
            self.bIterationValue = value
            self.bLock.release()
            if isinstance(value, self.IterationStopper):
                # This was the post-iteration signal; this loop is now
                # done.
                break
        # Wait until we know the iteration stopper was noticed and the
        # blocking iterations stopped.
        self.runState = 'stopping'
        self.nLock.acquire()
        self.runState = 'stopped'
    
    def write(self, data):
        """
        This is called with a chunk of I{data}. It goes through two stages
        to emerge from my blocking end as an iteration, via L{next}.

        @raise ValueError: If I have been closed.
        """
        if self.closed:
            raise ValueError("Closed, not accepting writes")
        self.itemBuffer.append(data)
        # Check for 'started' as well as 'running' (matching
        # Consumerator's behavior), so that a write landing before my
        # loop thread gets going still releases the semaphore and
        # isn't stranded in the buffer.
        if self.runState in ('started', 'running'):
            # Release my iteration-consuming loop to work on the next
            # iteration value. The cLock object is actually a
            # semaphore, so it's OK if this gets called multiple times
            # before the other loop can acquire it again.
            self.cLock.release()
        
    def writelines(self, lines):
        """
        Adds a list full of data chunks to my buffer.
        """
        for line in lines:
            self.write(line)
    
    def flush(self):
        """
        Doesn't do anything, because I am always trying to flush my buffer
        by iterating its contents.
        """
        
    def close(self):
        """
        Closing me as a "file" tells me that I can stop iterating once the
        buffer is flushed.
        """
        if not self.closed:
            self.write(self.IterationStopper())


@implementer(IPushProducer)
class OrderedItemProducer(PoolUser):
    """
    Produces blocking iterations in the order they are requested via
    an asynchronous function call.
    
    I am an implementor of Twisted's C{IPushProducer} interface that
    produces an iteration to a blocking call I{fb} for every time you
    call a non-blocking item-generating function I{fp} via my
    L{produceItem} method. Significantly, the items are buffered as
    needed so that the iterations appear in the order of the calls to
    L{produceItem} that generated them, B{not} necessarily in the
    order in which they are actually generated.

    Start things off by constructing an instance of me, with an
    existing task queue if you have one you want me to use, and then
    running L{start} with your blocking f-args-kw combination. Then
    call L{produceItem} repeatedly with whatever f-args-kw combination
    results (eventually) in new items to iterate. These calls may
    return deferred results and should not block.

    When you are done having me produce iterations, call
    L{stop}. Whatever loop the blocking-iterator call is in will then
    terminate and function I{fb} should end.
    
    @ivar i: My L{Consumerator} instance, which acts like an iterator
      for whatever function you supply to L{start}.

    @ivar q: The L{TaskQueue} instance I use, either supplied by you
      during construction or instantiated by me. Either way, you will
      have to call L{TaskQueue.shutdown} on this eventually when
      you're done with the queue.
    """
    def __init__(self, maxThreads=None):
        """
        Constructs me, optionally limiting my thread pool to
        I{maxThreads} threads via L{PoolUser.setup}.
        """
        self.setup(maxThreads)
        # Out-of-order items wait here, keyed by sequence number
        self.itemBuffer = {}
        # k1 counts produceItem calls made; k2 counts items actually
        # written in order
        self.k1 = 0
        self.k2 = 0
        self.seriesID = hash(self)
        self.i = Consumerator(self)
        self.dLock = defer.DeferredLock()
        self.dt = util.DeferredTracker()
        # Hold the lock until start() releases it
        self.dLock.acquire()
        self.produce = True

    def start(self, fb, *args, **kw):
        """
        Starts the blocking function call C{fb(i, *args, **kw)} that
        relies on my L{Consumerator} instance I{i} for iterations, in
        traditional blocking fashion. The function must accept C{i} as
        its first argument, and can also accept further arguments
        C{*args} and keywords C{**kw}, which you can specify in your
        call to L{start}.

        @return: A C{Deferred} that fires when the blocking call has
          started in its own thread. Shouldn't take long at all,
          unless the pool is fully occupied and we need to wait for a
          thread to get freed up.
        """
        def started():
            # Runs on the reactor thread: fb is now underway, so
            # release the lock that __init__ acquired, letting
            # produceItem calls proceed
            self.dLock.release()
            dStarted.callback(None)
        def runner():
            # Runs on a pool thread
            reactor.callFromThread(started)
            # The actual blocking call
            return fb(self.i, *args, **kw)
        def finished(success, result):
            # Called on the pool thread when fb returns or raises;
            # hands the (success, result) pair back to the reactor
            # thread, unless dFinished has already fired
            if not self.dFinished.called:
                reactor.callFromThread(
                    self.dFinished.callback, (success, result))
            self.stopProducing()
        dStarted = defer.Deferred()
        # Fires with fb's (success, result) pair; stop() waits on it
        self.dFinished = defer.Deferred()
        self.dtp.put(self.dFinished)
        self.pool.callInThreadWithCallback(finished, runner)
        return dStarted
    
    def produceItem(self, fp, *args, **kw):
        """
        Runs C{fp(*args, **kw)} to generate an item that I produce as an
        iteration to whatever blocking call was (or will be) set
        running via L{start}.

        While I am running, the returned C{Deferred} fires after the
        call to I{fp} with the item value produced by the call to
        I{fp}. You don't need to do anything with these deferreds if
        you don't want to use them for concurrency limiting; they are
        accounted for in L{stop}.

        If an exception is raised during the call, the I{dFinished}
        callback is called with a corresponding C{Failure} object and
        iterations will be stopped. This makes more sense than firing
        an errback of the C{Deferred} returning from this
        C{produceItem} method, because it's the end result of calling
        L{stop} to indicate that iterations are done. *That* is when
        the user should expect a status of the overall item-producing
        operation.

        If my L{stopProducing} method has been called, I no longer
        produce iterations and calls to this method do not run
        I{fp}. The returned C{Deferred} fires immediately with C{None}.
        """
        def gotItem(item):
            # We have a result, but we need to wait our turn to
            # actually produce it
            return self.dLock.acquire().addCallback(gotLock, item)
        def oops(failureObj):
            # A failure from fp ends the whole production run
            self.stop(failureObj)
        def gotLock(lock, item):
            # k1 was captured when this call was made; if it's this
            # item's turn (and production isn't paused or stopped),
            # write it now...
            if self.k2 == k1 and self.produce:
                self._writeItem(item)
            elif self.produce is not None:
                # ...otherwise buffer it under its sequence number,
                # unless production was stopped entirely
                self.itemBuffer[k1] = item
                self._flushBuffer()
            self.dLock.release()
            return item
        if self.produce is None:
            return defer.succeed(None)
        # Assign this call the next sequence number
        k1 = self.k1
        self.k1 += 1
        d = defer.maybeDeferred(fp, *args, **kw)
        d.addCallback(gotItem)
        d.addErrback(oops)
        # Track the deferred so stop() can wait on it
        self.dt.put(d)
        return d

    @defer.inlineCallbacks
    def stop(self, failureObj=None):
        """
        Call this to indicate that iterations are done. After any pending
        calls from L{produceItem} are done, my L{Consumerator} will
        raise L{StopIteration} for the blocking iteration-caller in
        I{fb} and that function should exit. Whatever value it returns
        will fire the C{Deferred} that is returned here.

        This method's C{Deferred} may have fired already, if I{fb}
        exited early for some reason, or with a C{Failure} object that
        may have been generated either by the iteration caller or by a
        call to the I{fp} function of L{produceItem}.

        Repeated calls to this method make no sense and will be
        rewarded with deferreds immediately firing with C{None}.

        @param failureObj: An optional C{Failure}; if supplied, it
          replaces whatever result I{fb} produced.
        """
        # Wait for all outstanding produceItem deferreds to fire
        yield self.dt.deferToAll()
        # Take the production lock so no further item can be written
        yield self.dLock.acquire()
        # Tell my Consumerator to wind down its iteration
        yield self.i.stop()
        if hasattr(self, 'dFinished'):
            # Wait for fb's (success, result) pair from start()
            success, result = yield self.dFinished
            del self.dFinished
        else:
            # A previous call already consumed dFinished; repeated
            # calls get None
            result = None
        if failureObj:
            # A supplied failure overrides fb's result
            result = failureObj
        self.dLock.release()
        defer.returnValue(result)
        
    def _writeItem(self, item):
        self.i.write(item)
        self.k2 += 1
        self._flushBuffer()

    def _flushBuffer(self):
        if self.k2 in self.itemBuffer:
            item = self.itemBuffer.pop(self.k2)
            # This will result in another call to resumeProducing
            self._writeItem(item)
        
    def stopProducing(self):
        """
        C{IPushProducer} hook: permanently stops my production by
        setting my I{produce} flag to C{None}.
        """
        self.produce = None

    def pauseProducing(self):
        """
        C{IPushProducer} hook: temporarily pauses my production by
        setting my I{produce} flag to C{False}.
        """
        self.produce = False

    def resumeProducing(self):
        """
        C{IPushProducer} hook: resumes my production by setting my
        I{produce} flag to C{True}.
        """
        self.produce = True