# AsynQueue : asynqueue.workers.py

# AsynQueue:
# Asynchronous task queueing based on the Twisted framework, with task
# prioritization and a powerful worker interface.
#
# Copyright (C) 2006-2007, 2015 by Edwin A. Suominen,
# http://edsuom.com/AsynQueue
#
# See edsuom.com for API documentation as well as information about
# Ed's background and other projects, software and otherwise.
# 
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
# 
#   http://www.apache.org/licenses/LICENSE-2.0
# 
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""
Implementors of the L{interfaces.IWorker} interface. These objects
are what handle the tasks in your L{base.TaskQueue}.
"""
from __future__ import absolute_import
import sys, os, os.path, tempfile, shutil

from zope.interface import implementer
from twisted.internet import defer

from asynqueue import errors, info, util, iteration
from asynqueue.interfaces import IWorker


# Make all our workers (except WireWorker, because of asynqueue.wire
# interpreter invocation) importable from this module
from asynqueue.threads import ThreadWorker
from asynqueue.process import ProcessWorker
# Not this one
#from asynqueue.wire import WireWorker


@implementer(IWorker)
class AsyncWorker(object):
    """
    I implement an L{IWorker} that runs tasks in the Twisted main
    loop.

    I run each L{tasks.Task} one at a time but in a well-behaved
    non-blocking manner. If the task callable doesn't return a
    C{Deferred}, it better get its work done fast. You just can't get
    away with blocking in the Twisted main loop.

    You can supply a I{series} keyword containing a list of one or
    more task series that I am qualified to handle.

    This class was mostly written for testing during development, but
    it helped keep the basic functions of a worker in mind. And who
    knows; it might be useful where you want the benefits of priority
    queueing without leaving the Twisted mindset even for a moment.

    @cvar cQualified: Task series that every instance of me lists as
      handled, in addition to any per-instance I{series}.
    """
    cQualified = ['async', 'local']

    def __init__(self, series=None, raw=False):
        """
        Constructs an instance of me with a L{util.DeferredLock}.

        @param series: A list of one or more task series that this
          particular instance of me is qualified to handle. If
          omitted, a new empty list is used for each instance.

        @param raw: Set C{True} if you want raw iterators to be
          returned instead of L{iteration.Deferator} instances. You
          can override this with the same keyword set C{False} in a
          call.
        """
        # A fresh list per instance: a literal [] default would be one
        # shared object mutated by every instance that relied on it.
        self.iQualified = [] if series is None else series
        self.raw = raw
        self.info = info.Info()
        self.dLock = util.DeferredLock()

    def setResignator(self, callableObject):
        """
        Registers I{callableObject} with my lock via its C{addStopper}
        method.

        @param callableObject: The callable handed to
          C{self.dLock.addStopper}. Presumably it gets called when the
          lock is stopped; confirm against L{util.DeferredLock}.
        """
        self.dLock.addStopper(callableObject)

    def run(self, task):
        """
        Implements L{IWorker.run}, running the I{task} in the main
        thread. The task callable B{must} not block.

        The following keywords are popped from the task's keyword
        dict before the call is made:

          - I{raw}: Overrides my instance-wide I{raw} setting for this
            call when present and not C{None}.

          - I{consumer}: If set and the result is an iterator that
            gets wrapped in a L{iteration.Deferator}, the deferator is
            further wrapped in an L{iteration.IterationProducer}
            constructed with this consumer.

          - I{doNext}: Set C{True} to request VIP acquisition of my
            lock. A task priority of -20 or lower does the same.

        The outcome is delivered through C{task.callback} as a
        2-tuple of status and result: C{b'r'} for a regular result,
        C{b'i'} for an iteration result, or C{b'e'} with text about
        the failure.

        @param task: The task to run. Its I{callTuple} supplies the
          callable, args, and keywords, and its I{priority} is
          consulted for VIP treatment.

        @return: The C{Deferred} from acquiring my lock, chained to
          the task call and its result handling.
        """
        def ready(null):
            # THOU SHALT NOT BLOCK!
            return defer.maybeDeferred(
                f, *args, **kw).addCallbacks(done, oops)

        def done(result):
            if not raw and iteration.isIterator(result):
                try:
                    result = iteration.Deferator(result)
                except Exception:
                    # Best effort: an iterator that can't be wrapped
                    # is reported as an empty iteration. Only ordinary
                    # exceptions are absorbed; KeyboardInterrupt and
                    # SystemExit still propagate.
                    result = []
                else:
                    if consumer:
                        result = iteration.IterationProducer(result, consumer)
                status = b'i'
            else:
                status = b'r'
            # Hangs if release is done after the task callback
            self.dLock.release()
            task.callback((status, result))

        def oops(failureObj):
            #import pdb; pdb.set_trace()
            text = self.info.setCall(f, args, kw).aboutFailure(failureObj)
            # The lock was acquired before the call was made, so it
            # must be given back on the failure path too; otherwise
            # every later task waits forever on acquire. As in done(),
            # release before firing the task callback.
            self.dLock.release()
            task.callback((b'e', text))

        f, args, kw = task.callTuple
        # A per-call 'raw' keyword takes precedence over my default
        raw = kw.pop('raw', None)
        if raw is None:
            raw = self.raw
        consumer = kw.pop('consumer', None)
        vip = (kw.pop('doNext', False) or task.priority <= -20)
        return self.dLock.acquire(vip).addCallback(ready)

    def stop(self):
        """
        Implements L{IWorker.stop}.

        @return: Whatever my lock's C{stop} method returns.
        """
        return self.dLock.stop()

    def crash(self):
        """
        There's no point to implementing this because the Twisted main
        loop will block along with any task you give this worker.
        """


# Only names actually bound in this module may be listed here.
# WireWorker is deliberately not imported above, so listing it would
# make "from asynqueue.workers import *" fail with an AttributeError.
__all__ = [
    'ThreadWorker', 'ProcessWorker', 'AsyncWorker',
    'IWorker'
]