Implements interfaces: IWorker

I implement an IWorker that runs tasks in a dedicated worker process.

You can also supply a series keyword containing a list of one or more task series that I am qualified to handle.

Note: Each task's callable is pickled along with its arguments to be sent over the interprocess pipe. Thus it must be something that can be reconstructed in the worker process, e.g., a module-level function or a method of an instance of a class that is importable by the process. Keep this in mind if you get errors like this:

 cPickle.PicklingError:
   Can't pickle <type 'function'>: attribute lookup
   __builtin__.function failed
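
For instance (an illustrative sketch, not part of AsynQueue itself): a module-level function pickles fine because pickle stores only its importable name, while a lambda does not. The error wording above is Python 2's; Python 3 raises a similar PicklingError (or AttributeError for nested functions):

  import pickle

  def double(x):
      # Module-level: pickle records the importable name
      # __main__.double, which another process can look up.
      return 2 * x

  pickle.dumps(double)  # works

  try:
      # A lambda has no importable name, so the lookup fails.
      pickle.dumps(lambda x: 2 * x)
  except (pickle.PicklingError, AttributeError) as e:
      print(e)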

Tuning the backoff coefficient

I did some testing of backoff coefficients with unit tests, where the reactor wasn't doing much other than running the asynqueue and Twisted's trial test runner.

With tasks whose completion times range from 0 to 1 second, a backoff of 1.10 was significantly more efficient than 1.15, and even more so than 1.20.

Backoff of 1.05 was somewhat more efficient than 1.10 with completion times ranging from 0 to 200 ms: 96.7% process/worker efficiency vs. 94.5%, with the mean overhead cut in half to around 3.3ms. But that's not the whole story: with a constant completion time of 100ms, 1.05 was actually less efficient: 95.5% vs. 99%, and mean overhead increased from around 0.5 ms to around 4 ms!

After 100 ms, there will have been 7 checks, with the check interval finally doubling to 2.1 ms. Things take off rapidly; reaching 200 ms takes just another 4 checks, and the interval is then 3.1 ms.

It's only the calls that take longer that benefit from a smaller backoff. Going from 1.05 to 1.10 decreased the efficiency of 1-second calls from 97.5% to 94.3% because the overhead more than doubled, from 25 ms to 60 ms. After so many event checks, the check interval had increased considerably, enough to add significant dead time after the calls were done. By the time the second is up, there will have been 48 checks and the check interval will be 0.1 second.

A backoff of 1.10 is a bit magic numerically in that the current check interval is always about one tenth the amount of time that has passed since the first check. For a backoff of 1.05, the interval is half that. (It takes 81 checks to reach 1 second instead of 48.)

The cost of having too many checks is considerable, though, and must be worse with a busy reactor, so a backoff less than 1.10 with an (initial) interval of 0.001 isn't recommended. But you might consider tuning it for your application and system.
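
The arithmetic behind these figures is easy to reproduce. A minimal sketch, assuming the poll loop simply multiplies its interval by backoff after every check, which is what the numbers above imply:

  def checks_until(target, interval=0.001, backoff=1.10):
      # Count event checks until 'target' seconds of polling have
      # accumulated; returns (number of checks, final interval).
      n, elapsed = 0, 0.0
      while elapsed < target:
          elapsed += interval
          interval *= backoff
          n += 1
      return n, interval

  print(checks_until(1.0))                # ~48 checks, interval ~0.1 s
  print(checks_until(1.0, backoff=1.05))  # ~81 checks, interval ~0.05 s
  # The interval tends toward (backoff - 1) times the elapsed time,
  # which is the "one tenth" property of 1.10 noted above.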

Instance Variable interval: The initial event-checking interval, in seconds. (type: float)
Instance Variable backoff: The backoff exponent. (type: float)
Class Variable cQualified: ['process', 'local']
Static Method cores: Returns the number of CPU cores available.
Method __init__: Constructs me with a ThreadLooper and an empty list of tasks.
Method do_next: Do a next call of the iterator held by my process, over the pipe and in Twisted fashion.
Method stats: Assembles and returns a (deferred) list of call times.
Method memUsage: On a real operating system, returns the memory usage of the Python sub-process I use, in kilobytes as an int, or None if the process is not running.
Method setResignator: Undocumented
Method run: Sends the task callable, args, and kw to the process (must all be picklable) and polls the interprocess connection for a result, with exponential backoff.
Method stop: The resulting Deferred fires when all tasks and Deferators are done, the task loop has ended, and its process has terminated.
Method crash: Undocumented
Method _killProcess: Undocumented
interval =
The initial event-checking interval, in seconds. (type: float)
backoff =
The backoff exponent. (type: float)
cQualified =
['process', 'local']
@staticmethod
def cores():
Returns: The number of CPU cores available. (type: int)
def __init__(self, series=[], raw=False, callStats=False, useReactor=False):

Constructs me with a ThreadLooper and an empty list of tasks.

Parameters:
  series: A list of one or more task series that this particular instance of me is qualified to handle.
  raw: Set True if you want raw iterators to be returned instead of iteration.Deferator instances. You can override this in a call with the same keyword set to False.
  callStats: Set True if you want stats kept about how long the calls took to send and to run on the process. Might add significant memory usage and slow things down a bit overall if there are lots of calls. Obtain a list of 2-tuples of the call times (here and on the process) with the stats method.
  useReactor: Set True to use a Twisted reactor in the process. (Currently only compatible with raw mode.)
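
A usage sketch: the TaskQueue import path and the attachWorker/call/shutdown calls follow the package's general pattern but are assumptions, not taken from this page:

  from twisted.internet import defer, reactor
  from asynqueue.base import TaskQueue          # assumed import path
  from asynqueue.process import ProcessWorker

  def square(x):
      # Module-level, so it survives the pickling described above.
      return x * x

  @defer.inlineCallbacks
  def main():
      q = TaskQueue()
      # A worker qualified for a 'math' series, keeping call stats.
      q.attachWorker(ProcessWorker(series=['math'], callStats=True))
      result = yield q.call(square, 7, series='math')
      print(result)  # 49
      yield q.shutdown()
      reactor.stop()

  reactor.callWhenRunning(main)
  reactor.run()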
def _killProcess(self):
Undocumented
def do_next(self, ID):

Do a next call of the iterator held by my process, over the pipe and in Twisted fashion.

Parameters:
  ID: A unique identifier for the iterator.
def stats(self):

Assembles and returns a (deferred) list of call times. Each list item is a 2-tuple. The first element is the time it took to get the result from the process after sending the call to it, and the second element is how long the call took to run on the process.
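
A sketch of reading these stats, assuming the worker was constructed with callStats=True:

  from twisted.internet import defer

  @defer.inlineCallbacks
  def showStats(worker):
      callTimes = yield worker.stats()
      for sent, ran in callTimes:
          # sent: round-trip time to get the result back from the
          # process; ran: time the call took inside the process.
          print("round trip {:.4f} s, ran {:.4f} s".format(sent, ran))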

def memUsage(self):

On a real operating system, returns the memory usage of the Python sub-process I use, in kilobytes as an int, or None if the process is not running.
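
For example, a periodic check (an illustrative sketch; the once-a-minute schedule and 500 MB threshold are arbitrary choices, not part of the API):

  from twisted.internet import task

  def watchMemory(worker, limitKB=500000):
      # memUsage() returns an int in KB, or None if the worker's
      # process isn't running.
      def check():
          usage = worker.memUsage()
          if usage is not None and usage > limitKB:
              print("Worker using {} KB, over the limit".format(usage))
      return task.LoopingCall(check).start(60)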

def setResignator(self, callableObject):
Undocumented
@defer.inlineCallbacks
def run(self, task):

Sends the task callable and args, kw to the process (must all be picklable) and polls the interprocess connection for a result, with exponential backoff.

This actually works very well, O ye Twisted event-purists.
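
In outline, the polling amounts to something like this sketch (not the actual implementation; connection stands for this end of the multiprocessing pipe):

  from twisted.internet import defer, reactor, task

  @defer.inlineCallbacks
  def pollForResult(connection, interval=0.001, backoff=1.10):
      # Check the pipe; if nothing has arrived yet, wait a little
      # longer each time before checking again.
      while not connection.poll():
          yield task.deferLater(reactor, interval, lambda: None)
          interval *= backoff
      defer.returnValue(connection.recv())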

@defer.inlineCallbacks
def stop(self):

The resulting Deferred fires when all tasks and Deferators are done, the task loop has ended, and its process has terminated.

def crash(self):
Undocumented