asynqueue.workers.ProcessWorker(object) : class documentation

Part of asynqueue.workers

Implements interfaces: asynqueue.workers.IWorker

I implement an IWorker that runs tasks in a dedicated worker process.

You can also supply a series keyword containing a list of one or more task series that I am qualified to handle.

Note: Each task's callable is pickled along with its arguments so that it can be sent over the interprocess pipe. It must therefore be something that can be reconstructed in the worker process, e.g., a method of an instance of a class that is importable by that process. Keep this in mind if you get errors like this:


 cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup
 __builtin__.function failed
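
For instance (Python 3's pickle shown here; the Python 2 message above is the same failure, and none of these names come from AsynQueue):

import pickle

def moduleLevel(x):
    # Defined at module level, so it is importable by name and pickles fine
    return x * 2

class Doubler(object):
    def double(self, x):
        # A bound method of an importable class also pickles fine in Python 3
        return x * 2

pickle.dumps(moduleLevel)
pickle.dumps(Doubler().double)
try:
    pickle.dumps(lambda x: x * 2)  # not importable by name
except pickle.PicklingError as e:
    print(e)  # Can't pickle <function <lambda> ...>: attribute lookup failed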

Tuning the backoff coefficient

I did some testing of backoff coefficients with unit tests, where the reactor wasn't doing much other than running the asynqueue and Twisted's trial test runner.

With tasks whose completion times ranged from 0 to 1 second, a backoff of 1.10 was significantly more efficient than 1.15, and even more so than 1.20.

Backoff of 1.05 was somewhat more efficient than 1.10 with completion times ranging from 0 to 200 ms: 96.7% process/worker efficiency vs. 94.5%, with the mean overhead cut in half to around 3.3ms. But that's not the whole story: with a constant completion time of 100ms, 1.05 was actually less efficient: 95.5% vs. 99%, and mean overhead increased from around 0.5 ms to around 4 ms!

After 100 ms, there will have been 7 checks, with the check interval finally doubling to 2.1 ms. Things take off rapidly; reaching 200 ms takes just another 4 checks, and the interval is then 3.1 ms.

It's only the longer calls that benefit from a smaller backoff. Going from 1.05 to 1.10 decreased the efficiency of 1-second calls from 97.5% to 94.3%, because the overhead more than doubled, from 25 ms to 60 ms. After that many event checks, the check interval had grown enough to add significant dead time after the calls were done. By the time the second is up, there will have been 48 checks and the check interval will be 0.1 second.

A backoff of 1.10 is a bit magic numerically in that the current check interval is always about one tenth the amount of time that has passed since the first check. For a backoff of 1.05, the interval is half that. (It takes 81 checks to reach 1 second instead of 48.)
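
You can check this arithmetic directly: the elapsed time after n checks is the geometric sum t0*(b^n - 1)/(b - 1), where t0 is the initial interval and b the backoff. A quick illustrative script (not AsynQueue code) reproduces the figures above:

def checksToReach(elapsed, t0=0.001, b=1.10):
    # Count checks until the cumulative interval reaches 'elapsed' seconds
    n, t, dt = 0, 0.0, t0
    while t < elapsed:
        t += dt
        dt *= b
        n += 1
    return n, dt

print(checksToReach(1.0, b=1.10))  # ~48 checks; interval has grown to ~0.1 s
print(checksToReach(1.0, b=1.05))  # ~81 checks; interval ~0.05 s, half as long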

The cost of having too many checks is considerable, though, and must be worse with a busy reactor, so a backoff of less than 1.10 with an initial interval of 0.001 seconds isn't recommended. But you might consider tuning both for your application and system.

Instance Variable interval The initial event-checking interval, in seconds. (type: float)
Instance Variable backoff The backoff exponent. (type: float)
Class Variable cQualified 'process', 'local'
Static Method cores Returns the number of CPU cores available.
Method __init__ Constructs me with a ThreadLooper and an empty list of tasks.
Method next Do a next call of the iterator held by my process, over the pipe and in Twisted fashion.
Method stats Assembles and returns a (deferred) list of call times.
Method setResignator Registers the supplied callableObject to be called if the worker deems it necessary to resign, e.g., a remote connection has been lost.
Method run Sends the task's callable, args, and kw to the process (all of which must be picklable) and polls the interprocess connection for a result, with exponential backoff.
Method stop Stops the worker, returning a Deferred that fires when the task loop has ended and its process terminated.
Method crash Takes drastic action to shut down the worker, rudely and synchronously.
Method _killProcess Undocumented
interval =
The initial event-checking interval, in seconds. (type: float)
backoff =
The backoff exponent. (type: float)
cQualified =
'process', 'local'
@staticmethod
def cores():
Returns: The number of CPU cores available. (type: int)
def __init__(self, series=[], raw=False, callStats=False):

Constructs me with a ThreadLooper and an empty list of tasks.

Parameters:
series: A list of one or more task series that this particular instance of me is qualified to handle.
raw: Set True if you want raw iterators to be returned instead of iteration.Deferator instances. You can override this in a call with the same keyword set False.
callStats: Set True if you want stats kept about how long the calls took to send and to run on the process. This might add significant memory usage and slow things down a bit overall if there are lots of calls. Obtain a list of the call times here and on the process (2-tuples) with the stats method.
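
For orientation, a minimal usage sketch (hedged: it assumes TaskQueue and its attachWorker, call, and shutdown methods are importable from the package top level, and myjobs is a hypothetical module that the worker process can import):

from twisted.internet import reactor
from asynqueue import TaskQueue, ProcessWorker
import myjobs  # hypothetical; must be importable by the worker process

def main():
    q = TaskQueue()
    q.attachWorker(ProcessWorker(callStats=True))
    d = q.call(myjobs.slowSquare, 7)  # Deferred that fires with the result
    d.addCallback(lambda result: print(result))
    d.addCallback(lambda _: q.shutdown())
    d.addCallback(lambda _: reactor.stop())

reactor.callWhenRunning(main)
reactor.run()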
def _killProcess(self):
Undocumented
def next(self, ID):

Do a next call of the iterator held by my process, over the pipe and in Twisted fashion.

Parameters:
ID: A unique identifier for the iterator.
def stats(self):

Assembles and returns a (deferred) list of call times. Each list item is a 2-tuple: the first element is the time it took to get the result back from the process after sending the call to it, and the second element is how long the call took to run on the process.
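
For example, a sketch (worker stands for a ProcessWorker constructed with callStats=True):

def showStats(pairs):
    for sendTime, runTime in pairs:
        # (time to get the result back, time the call ran on the process)
        print("round trip %.1f ms, run %.1f ms" % (1e3*sendTime, 1e3*runTime))

worker.stats().addCallback(showStats)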

def setResignator(self, callableObject):

Registers the supplied callableObject to be called if the worker deems it necessary to resign, e.g., a remote connection has been lost.
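
For example, a sketch (assuming the callable is invoked with no arguments, with q and worker as in the constructor example above):

def resigned():
    # Replace a worker whose process has been lost
    q.attachWorker(ProcessWorker())

worker.setResignator(resigned)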

@defer.inlineCallbacks
def run(self, task):

Sends the task's callable, args, and kw to the process (all of which must be picklable) and polls the interprocess connection for a result, with exponential backoff.

This actually works very well, O ye Twisted event-purists.
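
Stripped to its essentials, the pattern is: send the pickled call down the pipe, then check the parent end for a result at intervals that grow by the backoff factor. A toy sketch of that pattern (not AsynQueue's actual implementation):

from twisted.internet import defer, reactor, task

@defer.inlineCallbacks
def pollForResult(conn, interval=0.001, backoff=1.10):
    # conn: parent end of a multiprocessing.Pipe(); the child is expected
    # to conn.send() the result when the call completes
    while not conn.poll():
        yield task.deferLater(reactor, interval, lambda: None)
        interval *= backoff  # check progressively less often
    defer.returnValue(conn.recv())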

def stop(self):
Returns: A Deferred that fires when the task loop has ended and its process terminated.
def crash(self):

Takes drastic action to shut down the worker, rudely and synchronously.

Returns: A list of task objects, one for each task left uncompleted. You shouldn't have to call this method if no tasks are left pending; the stop method should be enough in that case.
API Documentation for AsynQueue, generated by pydoctor at 2016-11-16 14:52:11.