asynqueue.workers.ProcessWorker(object)
class documentation
Part of asynqueue.workers
Implements interfaces: IWorker
I implement an IWorker that runs tasks in a dedicated worker process.
You can also supply a series keyword argument containing a list of one or more task series that I am qualified to handle.
Note: Each task's callable is pickled along with its arguments to be sent over the interprocess pipe. It must therefore be something that can be reconstructed in the process, i.e., a method of an instance of a class that the process can import. Keep this in mind if you get errors like this:
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
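A quick way to find out in advance whether a task will survive the trip over the pipe is to try pickling it yourself. The sketch below assumes a task travels as a (callable, args, kw) tuple, as the note above describes. The helper is_picklable and the class Squarer are hypothetical illustrations, not part of asynqueue. On Python 3 a lambda fails with a PicklingError similar to the cPickle one quoted above, while a bound method of an instance of an importable class pickles fine.

```python
import pickle


class Squarer:
    """A module-level (hence importable) class: its bound methods can be pickled."""

    def square(self, x):
        return x * x


def is_picklable(obj):
    """Return True if *obj* survives pickle.dumps(), False otherwise."""
    try:
        pickle.dumps(obj)
    except (pickle.PicklingError, AttributeError, TypeError):
        return False
    return True


# A task as it might travel over the pipe: (callable, args, kw).
good_task = (Squarer().square, (7,), {})
bad_task = (lambda x: x * x, (7,), {})

print(is_picklable(good_task))  # True
print(is_picklable(bad_task))   # False: a lambda can't be looked up by name
```

Pickling records the method as "this instance, plus the name square", so the receiving side must be able to import Squarer to rebuild it.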
Tuning the backoff coefficient
I tested several backoff coefficients using unit tests in which the reactor was doing little besides running asynqueue and Twisted's trial test runner.
With tasks whose completion times ranged from 0 to 1 second, a backoff of 1.10 was significantly more efficient than 1.15, and more efficient still than 1.20.
A backoff of 1.05 was somewhat more efficient than 1.10 with completion times ranging from 0 to 200 ms: 96.7% process/worker efficiency vs. 94.5%, with the mean overhead cut in half to around 3.3 ms. But that's not the whole story: with a constant completion time of 100 ms, 1.05 was actually less efficient (95.5% vs. 99%), and the mean overhead increased from around 0.5 ms to around 4 ms!
With a backoff of 1.10, after about 10 ms there will have been 7 checks, and the check interval will have roughly doubled, to 2.1 ms. From there the interval grows quickly: reaching 20 ms takes just another 4 checks, by which point the interval is 3.1 ms.
Only the longer calls benefit from a smaller backoff. Going from 1.05 to 1.10 decreased the efficiency of 1-second calls from 97.5% to 94.3% because the overhead more than doubled, from 25 ms to 60 ms. After so many event checks, the check interval had grown enough to add significant dead time after each call finished. With a backoff of 1.10, by the time one second is up there will have been 48 checks, and the check interval will be 0.1 second.
A backoff of 1.10 is numerically convenient: the current check interval is always about one tenth of the time that has passed since the first check. For a backoff of 1.05, the interval is half that, and it takes 81 checks to reach 1 second instead of 48.
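The one-tenth rule of thumb follows from summing the geometric series of check intervals. This derivation assumes each interval is exactly the backoff times the previous one, writing $i_0$ for the initial interval, $b$ for the backoff, and $T_n$ for the time elapsed after $n$ checks:

```latex
T_n = i_0 \sum_{k=0}^{n-1} b^k = i_0 \, \frac{b^n - 1}{b - 1}
\quad\Longrightarrow\quad
i_0 b^n = i_0 + (b - 1)\,T_n \approx (b - 1)\,T_n \qquad (T_n \gg i_0)

n = \frac{\ln\bigl(1 + (b - 1)\,T / i_0\bigr)}{\ln b}
```

So the interval in effect is about $T/10$ for $b = 1.10$ and $T/20$ for $b = 1.05$. With $i_0 = 0.001$ s and $T = 1$ s, the second formula gives $\ln 101 / \ln 1.10 \approx 48.4$ checks for $b = 1.10$ and $\ln 51 / \ln 1.05 \approx 80.6$ checks for $b = 1.05$.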
The cost of too many checks is considerable, though, and is likely worse with a busy reactor, so a backoff below 1.10 with an initial interval of 0.001 s isn't recommended. You might still consider tuning it for your application and system.
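Before tuning, you can check the check-count arithmetic for a candidate interval and backoff with a short script. The sketch assumes the check interval starts at interval and is multiplied by backoff after every check, which is the model the 48- and 81-check figures imply. The function names are hypothetical, not part of asynqueue.

```python
import math


def checks_to_reach(elapsed, interval=0.001, backoff=1.10):
    """Continuous estimate of the number of checks made by *elapsed* seconds.

    Solves elapsed = interval * (backoff**n - 1) / (backoff - 1) for n.
    """
    return math.log(1 + elapsed * (backoff - 1) / interval) / math.log(backoff)


def interval_at(elapsed, interval=0.001, backoff=1.10):
    """Check interval in effect once *elapsed* seconds have gone by."""
    return interval + (backoff - 1) * elapsed


def simulate(elapsed, interval=0.001, backoff=1.10):
    """Step through the schedule one check at a time.

    Returns (checks, next_interval) for the first check made at or after
    *elapsed* seconds.
    """
    t, checks = 0.0, 0
    while t < elapsed:
        t += interval        # wait out the current interval...
        checks += 1          # ...then check the pipe once
        interval *= backoff  # grow the next wait geometrically
    return checks, interval


for b in (1.05, 1.10):
    n = checks_to_reach(1.0, backoff=b)
    print(f"backoff {b:.2f}: ~{n:.1f} checks to reach 1 s, "
          f"interval then ~{interval_at(1.0, backoff=b):.3f} s")
```

Rounded, the continuous estimates give 81 and 48 checks. simulate counts whole checks, so for 1.10 it lands on 49: the 48th check comes at about 0.96 s, just short of the one-second mark.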
Instance Variable | interval | The initial event-checking interval, in seconds. (type: float) |
Instance Variable | backoff | The backoff coefficient, by which the event-checking interval is multiplied after each check. (type: float) |
Class Variable | cQualified | 'process', 'local' |
Static Method | cores | |
Method | __init__ | Constructs me with a ThreadLooper and an empty list of tasks. |
Method | do_next | Do a next call of the iterator held by my process, over the pipe and in Twisted fashion. |
Method | stats | No summary |
Method | memUsage | On a real operating system, returns the memory usage of the Python sub-process I use, in kilobytes as an int, or None if the process is not running. |
Method | setResignator | Undocumented |
Method | run | Sends the task callable and args, kw to the process (must all be picklable) and polls the interprocess connection for a result, with exponential backoff. |
Method | stop | The resulting Deferred fires when all tasks and Deferators are done, the task loop has ended, and its process has terminated. |
Method | crash | Undocumented |
Method | _killProcess | Undocumented |
Constructs me with a ThreadLooper and an empty list of tasks.
Parameters | series | A list of one or more task series that this particular instance of me is qualified to handle. |
raw | Set True if you want raw iterators to be returned instead of iteration.Deferator instances. You can override this in a call by setting the same keyword to False. |
callStats | Set True if you want stats kept about how long the calls took to send and to run on the process. This may add significant memory usage and slow things down somewhat if there are many calls. Obtain a list of the call times here and on the process (2-tuples) with the stats method. |
useReactor | Set True to use a Twisted reactor in the process (currently only compatible with raw mode). |
Do a next call of the iterator held by my process, over the pipe and in Twisted fashion.
Parameters | ID | A unique identifier for the iterator. |
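Generators and many other iterators can't be pickled, which is presumably why do_next names the iterator by ID instead of sending the iterator itself. The sketch below shows one way a process-side registry keyed by such IDs might look. IteratorRegistry and its (True, value) / (False, None) reply convention are hypothetical, not asynqueue's actual protocol.

```python
import itertools


class IteratorRegistry:
    """Hypothetical process-side store of iterators, keyed by integer ID.

    The iterators stay in the process; each next-call request over the
    pipe gets back one value at a time.
    """

    def __init__(self):
        self._iterators = {}
        self._ids = itertools.count(1)

    def register(self, iterable):
        """Store an iterator over *iterable* and return its new ID."""
        ID = next(self._ids)
        self._iterators[ID] = iter(iterable)
        return ID

    def do_next(self, ID):
        """Return (True, value) for the next item, or (False, None) when done.

        An exhausted iterator is dropped; later requests for its ID raise
        KeyError, as do requests for IDs that were never registered.
        """
        try:
            return True, next(self._iterators[ID])
        except StopIteration:
            del self._iterators[ID]
            return False, None
```

Only the small reply tuples would need to be pickled, never the iterator.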
Assembles and returns a (deferred) list of call times. Each list item is a 2-tuple. The first element is the time it took to get the result from the process after sending the call to it. The second is how long the call took to run on the process.
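The difference between the two elements of each 2-tuple is the per-call overhead discussed in the backoff section. Here is a sketch of summarizing such a list. The function name is hypothetical. Treating efficiency as total run time divided by total round-trip time is an assumption, since the section doesn't define it.

```python
from statistics import mean


def summarize_call_times(call_times):
    """Summarize (round_trip, run_on_process) 2-tuples, in seconds.

    Returns the number of calls, the mean per-call overhead (round trip
    minus time spent running on the process), and the fraction of total
    round-trip time spent doing actual work (an assumed definition of
    efficiency).
    """
    if not call_times:
        raise ValueError("no call times recorded")
    overheads = [total - run for total, run in call_times]
    total_time = sum(total for total, _ in call_times)
    run_time = sum(run for _, run in call_times)
    return {
        "calls": len(call_times),
        "mean_overhead": mean(overheads),
        "efficiency": run_time / total_time,
    }


print(summarize_call_times([(0.105, 0.100), (0.215, 0.200)]))
```

With a worker constructed with callStats=True, this could presumably be attached to the Deferred that stats returns, e.g. worker.stats().addCallback(summarize_call_times).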
On a real operating system, returns the memory usage of the Python sub-process I use, in kilobytes as an int, or None if the process is not running.
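This section doesn't say how memUsage measures memory. One possible approach on Linux is to read the process's resident set size from the VmRSS line of /proc/<pid>/status, which the kernel reports in kB. The sketch below does that. The function name and the choice of VmRSS are assumptions. Like memUsage, it returns None when there is no process to measure, and also when /proc isn't available.

```python
import os


def mem_usage_kb(pid):
    """Resident memory of process *pid* in kilobytes, or None.

    Assumption: a Linux-style /proc filesystem. Returns None when *pid* is
    None, when no such process exists, or when /proc isn't available.
    """
    if pid is None:
        return None
    try:
        with open(os.path.join("/proc", str(pid), "status")) as fh:
            for line in fh:
                # The line looks like "VmRSS:     12345 kB".
                if line.startswith("VmRSS:"):
                    return int(line.split()[1])
    except (OSError, ValueError):
        pass
    return None


print(mem_usage_kb(os.getpid()))
```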
def run(self, task):
Sends the task callable and args, kw to the process (must all be picklable) and polls the interprocess connection for a result, with exponential backoff.
This actually works very well, O ye Twisted event-purists.
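You can reproduce the polling loop that run describes outside Twisted. This sketch swaps in asyncio for the Twisted reactor and uses a multiprocessing Pipe. A timer thread stands in for the worker process so the example runs anywhere. The names poll_with_backoff and demo are hypothetical. The sketch doesn't show how the real method handles timeouts, errors, or crashes.

```python
import asyncio
import threading
from multiprocessing import Pipe


async def poll_with_backoff(conn, interval=0.001, backoff=1.10):
    """Wait for one object on *conn*, polling with exponential backoff.

    Returns (obj, checks), where *checks* counts the poll() calls made.
    """
    checks = 0
    while True:
        checks += 1
        # poll() with no timeout returns at once: True if data is waiting.
        if conn.poll():
            return conn.recv(), checks
        # Yield to the event loop instead of blocking it while we wait.
        await asyncio.sleep(interval)
        # Grow the wait geometrically: 1 ms, 1.1 ms, 1.21 ms, ...
        interval *= backoff


def demo():
    parent, child = Pipe()
    # Stand-in for the worker process: a timer thread sends a result after 50 ms.
    timer = threading.Timer(0.05, child.send, args=(("ok", 42),))
    timer.start()
    try:
        return asyncio.run(poll_with_backoff(parent))
    finally:
        timer.join()
        parent.close()
        child.close()


if __name__ == "__main__":
    result, checks = demo()
    print(result, checks)
```

Each wait is an await on asyncio.sleep, so the event loop can run other work between checks. That is the advantage of polling over blocking on recv().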