Implements interfaces: asynqueue.interfaces.IWorker

Runs tasks "over the wire," via Twisted AMP running on an endpoint connection.

I implement an IWorker that runs named tasks in a remote Python interpreter via Twisted's Asynchronous Messaging Protocol over an endpoint that can be a UNIX socket, TCP/IP, SSL, etc. The task callable must be a method of a subclass of WireWorkerUniverse that has been imported into the same module as the one in which your instance of me is constructed. No pickled callables are sent over the wire, just strings defining the method name of that class instance.
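
The name-based dispatch described above can be illustrated with a minimal, standard-library-only sketch. It is not AsynQueue's implementation: `ExampleUniverse` and `dispatch` are hypothetical names, and the class is a plain stand-in rather than a real WireWorkerUniverse subclass. It only shows the idea of sending a method name as a string and resolving it on an instance at the receiving end.

```python
class ExampleUniverse:
    """Hypothetical stand-in for a WireWorkerUniverse subclass."""

    def add(self, a, b):
        return a + b

    def _secret(self):
        # Leading underscore: treated as private by the dispatcher below.
        return "not reachable by name"


def dispatch(universe, method_name, args=(), kw=None):
    """Resolve a public method by its string name and call it.

    Only the name, args, and kw would need to cross the wire; the
    callable itself is never serialized.
    """
    if method_name.startswith("_"):
        raise ValueError("refusing private name: %r" % method_name)
    method = getattr(universe, method_name, None)
    if not callable(method):
        raise ValueError("no such task method: %r" % method_name)
    return method(*args, **(kw or {}))


universe = ExampleUniverse()
result = dispatch(universe, "add", (2, 3))
```

Rejecting names that are private or missing is a design choice of this sketch; whether and how the real class validates names is not stated here.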

For most applications, see process.ProcessWorker instead.

You can also supply a series keyword containing a list of one or more task series that I am qualified to handle.

When running tasks via me, don't assume that you can just call blocking code because it's done remotely. The AMP server on the other end runs under Twisted, too. (The result of the call may be a Deferred, and that's fine.) If the call is a blocking one, set the thread keyword True for it and it will run via an instance of threads.ThreadLooper.
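
Why a blocking call needs its own thread can be sketched with the standard library alone. This is an analogy, not Twisted or AsynQueue code: it uses asyncio in place of the Twisted reactor and a one-thread `ThreadPoolExecutor` in place of threads.ThreadLooper, and `blocking_task` and `heartbeat` are hypothetical names. The point is that the event loop keeps servicing other work while the blocking call runs elsewhere.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_task(x):
    # Stands in for any call that blocks its calling thread.
    time.sleep(0.05)
    return 2 * x


async def main():
    loop = asyncio.get_running_loop()
    ticks = 0

    async def heartbeat():
        # Counts how often the event loop gets a turn.
        nonlocal ticks
        while True:
            await asyncio.sleep(0.005)
            ticks += 1

    hb = asyncio.ensure_future(heartbeat())
    # One dedicated thread runs the blocking call; the loop stays free.
    with ThreadPoolExecutor(max_workers=1) as pool:
        result = await loop.run_in_executor(pool, blocking_task, 21)
    hb.cancel()
    return result, ticks


result, ticks = asyncio.run(main())
```

Had `blocking_task` been called directly inside `main`, the heartbeat would not have advanced during the sleep, which is the stall the paragraph above warns about.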

Class AMP Special disconnection-alerting AMP protocol. When my connection is made, I construct a Deferred referenced as d_lcww, which I will fire if I get disconnected.
Method __init__ Constructs me with a reference wwu to a WireWorkerUniverse and a client connection description.
Method stopper Undocumented
Method assembleChunkedResult Undocumented
Method do_next Do a next call of the iterator held by my subordinate, over the wire (socket) and in Twisted fashion.
Method resign Undocumented
Method setResignator I resign if my underlying AMP connection is lost.
Method run Sends the task callable, args, kw to the process and returns a deferred to the eventual result.
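Method stop Attempts to gracefully shut down the worker, returning a Deferred that fires when the worker is done with all assigned tasks.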
Method crash Takes drastic action to shut down the worker, rudely and synchronously.
Method _handleNext Undocumented
def __init__(self, wwu, description, series=[], raw=False, thread=False, N_concurrent=1):

Constructs me with a reference wwu to a WireWorkerUniverse and a client connection description.

Immediately connects to a WireServer running on another Python interpreter via the AMP protocol.

Parameters
N_concurrent: The number of tasks I can have outstanding.
def stopper(self):
Undocumented
def _handleNext(self, ID):
Undocumented
@defer.inlineCallbacks
def assembleChunkedResult(self, ID):
Undocumented
def do_next(self, ID):

Do a next call of the iterator held by my subordinate, over the wire (socket) and in Twisted fashion.

This is done with highest priority because something is waiting for possibly lots of calls of this to complete.

def resign(self, *args):
Undocumented
def setResignator(self, callableObject):

I resign if my underlying AMP connection is lost.

@defer.inlineCallbacks
def run(self, task):

Sends the task callable, args, kw to the process and returns a deferred to the eventual result.

def stop(self):

Attempts to gracefully shut down the worker, returning a Deferred that fires when the worker is done with all assigned tasks and will not cause any errors if the reactor is stopped or its object is deleted.

The Deferred returned by your implementation of this method must not fire until after the results of all pending tasks have been obtained. Thus the deferred must be chained to each task.d somehow.

Make sure that any callbacks you add to the task's internal deferred object task.d return the callback argument. Otherwise, the result of your task will be lost in the callback chain.
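
The rule about returning the callback argument follows from how a callback chain works: each callback's return value becomes the next callback's argument. The sketch below shows this with a toy, synchronous stand-in. `MiniDeferred` is a hypothetical class written for this illustration, not Twisted's Deferred; it has no errbacks and no asynchrony.

```python
class MiniDeferred:
    """Toy stand-in for a Deferred: callbacks only, fired synchronously."""

    def __init__(self):
        self._callbacks = []
        self.result = None

    def addCallback(self, f):
        self._callbacks.append(f)
        return self

    def callback(self, result):
        # Each callback receives whatever the previous one returned.
        for f in self._callbacks:
            result = f(result)
        self.result = result


def note_done_badly(result):
    # Forgets to return the result: the next callback receives None.
    pass


def note_done(result):
    # Passes the result along unchanged.
    return result


lost, kept = [], []

d1 = MiniDeferred()
d1.addCallback(note_done_badly).addCallback(lost.append)
d1.callback(42)

d2 = MiniDeferred()
d2.addCallback(note_done).addCallback(kept.append)
d2.callback(42)
```

After both chains fire, `lost` holds `None` while `kept` holds the task result, which is why a bookkeeping callback attached to task.d should end with `return result`.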

def crash(self):

Takes drastic action to shut down the worker, rudely and synchronously.

Returns: A list of task objects, one for each task left uncompleted. You shouldn't have to call this method if no tasks are left pending; the stop method should be enough in that case.
API Documentation for AsynQueue, generated by pydoctor at 2022-11-17 13:13:24.