I manage asynchronous access to a database.
Before you use any instance of me, you must specify the parameters for creating an SQLAlchemy database engine. A single argument is used, which specifies a connection to a database via an RFC-1738 URL. In addition, the verbose keyword option can be used to log messages about calls and errors. (Use that only for debugging.)
You can set an engine globally, for all instances of me via the
function, or via my
class method. Alternatively, you can specify an engine for one particular
instance by supplying the parameters to the constructor.
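To make the shape of an RFC-1738 connection URL concrete, here is a minimal sketch that splits one into its parts with the standard library. The URL itself (user, password, host, port, and database name) is made up for illustration, and the code does not touch SQLAlchemy or this class.

```python
from urllib.parse import urlsplit

# Hypothetical connection URL; credentials, host, and database are invented.
url = "postgresql://scott:tiger@localhost:5432/mydb"

parts = urlsplit(url)
components = {
    "dialect": parts.scheme,            # which kind of database
    "username": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,                 # parsed as an integer
    "database": parts.path.lstrip("/"),
}
print(components)
```

The same single string is what would be passed as the url argument, whether globally or per instance.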
I use AsynQueue to queue up transactions asynchronously
and perform them one at a time. But I still want a connection pool for my
database engine to handle asynchronous iteration of rows from query
results. See the source for
SQLAlchemy has excellent documentation, which describes the engine parameters in plenty of detail. See http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html.
|Instance Variable||q||A property-generated reference to a threaded task queue that is dedicated to my database connection.|
|Instance Variable||connection||The current SQLAlchemy connection object, if any yet exists. Generated by
|Class Method||setup||Constructs a global queue for all instances of me, returning a deferred that fires with it.|
|Method||__init__||Constructs an instance of me, optionally specifying parameters for an SQLAlchemy engine object that serves this instance only.|
|Method||callWhenRunning||Calls the f-args-kw combo when the broker is running and ready for transactions.|
|Method||table||Instantiates a new table object, creating it in the transaction thread as needed.|
|Method||startup||This method runs before the first transaction to start my synchronous task queue.|
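|Method||shutdown||Shuts down my database transaction functionality and threaded task queue, returning a Deferred that fires when all queued tasks are done and the shutdown is complete.|
|Method||s||Polymorphic method for working with select instances within a cached selection subcontext.|
|Method||select||Just returns an SQLAlchemy select object.|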
|Method||selex||Supply columns as arguments and this method generates a select on the columns, yielding a placeholder object with the same attributes as the select object itself.|
|Method||sql||Executes raw SQL as a transaction, in my thread with my current connection, with any rows of the result returned as a list.|
|Method||deferToQueue||Dispatches callable(*args, **kw) as a task via the like-named
method of my asynchronous queue, returning a
def setup(cls, url, **kw):
Constructs a global queue for all instances of me, returning a deferred that fires with it.
Deferred that fires when the broker is running
and ready for transactions.
Calls the f-args-kw combo when the broker is running and ready for transactions.
def table(self, name, *cols, **kw):
Instantiates a new table object, creating it in the transaction thread as needed.
One or more indexes other than the primary key can be defined via a keyword prefixed with index_ or unique_ and having the index name as the suffix. Use the unique_ prefix if the index is to be a unique one. The value of the keyword is a list or tuple containing the names of all columns in the index.
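The keyword convention for indexes can be illustrated with a small sketch. The helper split_index_keywords below is hypothetical (it is not part of this class) and only shows one way the index_ and unique_ prefixes could be separated from other keywords; the keyword names in the example are invented.

```python
def split_index_keywords(kw):
    """Separate index_/unique_ keywords from the rest (hypothetical helper).

    Returns a list of (index_name, column_names, is_unique) tuples and a
    dict of the remaining keywords.
    """
    indexes = []
    others = {}
    for key, value in kw.items():
        for prefix, unique in (("index_", False), ("unique_", True)):
            if key.startswith(prefix):
                # The suffix after the prefix is the index name.
                indexes.append((key[len(prefix):], tuple(value), unique))
                break
        else:
            others[key] = value
    return indexes, others


indexes, others = split_index_keywords({
    "index_by_date": ["year", "month"],   # ordinary index on two columns
    "unique_email": ("email",),           # unique index on one column
    "some_option": 1,                     # unrelated keyword, passed through
})
print(indexes)
print(others)
```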
This method runs before the first transaction to start my synchronous task queue.
Override it to get whatever pre-transaction stuff you have run in the main loop before a database engine/connection is created.
This method automatically runs as the first transaction after completion
Override it to define table contents or whatever else you want as a
first transaction that immediately follows your pre-transaction stuff.
You don't need to decorate the method with
it doesn't break anything if you do.
def shutdown(self, *null):
Shuts down my database transaction functionality and threaded task
queue, returning a
Deferred that fires when all queued tasks
are done and the shutdown is complete.
If you call this before I have even started up for some reason, the
Deferred will not fire until my startup sequence has
Repeated calls after I've already shut down will be rewarded with a
Deferred that fires immediately.
A method of a subclass of me that returns a
(rather than just being decorated with the
could possibly have some database processing pending when I am told to
shutdown. To ensure that I wait for such a
Deferred to fire
before proceeding to shut down my thread queue, decorate the method with
def handleResult(self, result, consumer=None, conn=None, asList=False, N=1):
Handles the result of a transaction or connection.execute. If it's a
ResultProxy and possibly an implementor of
IConsumer, returns a (deferred) instance of Deferator or
couples your consumer to an IterationProducer.
You can supply a connection to be closed after iterations are done with
the keyword 'conn'. With the keyword asList, you can force the rows of the
ResultProxy to be fetched (in my thread) and returned as
a (deferred) list of rows.
Polymorphic method for working with
select instances within
a cached selection subcontext.
- When called with a single argument (the select object's name as a string) and no keywords, this method indicates whether the named select object already exists and sets its selection subcontext to name.
- With multiple arguments or any keywords, the method acts like a call to sqlalchemy.select(...).compile(), except that nothing is returned. Instead, the resulting select object is stored in the current selection subcontext.
- With no arguments or keywords, the method returns the select object for the current selection subcontext.
Call from inside a transaction.
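The three calling modes can be sketched with a toy stand-in. The class SelectCache below is hypothetical and does not use SQLAlchemy: where the real method would build and compile a select object, this sketch simply stores the arguments it was given, so only the dispatch logic is shown.

```python
class SelectCache:
    """Toy model of a polymorphic, name-keyed select cache (hypothetical)."""

    def __init__(self):
        self._cache = {}
        self._current = None  # name of the current selection subcontext

    def s(self, *args, **kw):
        if len(args) == 1 and isinstance(args[0], str) and not kw:
            # Mode 1: set the subcontext and report whether it is cached.
            self._current = args[0]
            return self._current in self._cache
        if args or kw:
            # Mode 2: build and store the object; nothing is returned.
            # (Stand-in for the compiled select object.)
            self._cache[self._current] = (args, kw)
            return None
        # Mode 3: return the object for the current subcontext.
        return self._cache[self._current]


cache = SelectCache()
first_seen = cache.s("by_name")           # not cached yet
if not first_seen:
    cache.s(["name", "age"], order_by="name")
second_seen = cache.s("by_name")          # cached now
stored = cache.s()
print(first_seen, second_seen, stored)
```

The point of the pattern is that the relatively expensive construction in the second mode runs only once per name.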
Just returns an
SQLAlchemy select object. You do everything
This is an immediate result, not a
def selex(self, *args, **kw):
Supply columns as arguments and this method generates a select on the columns, yielding a placeholder object with the same attributes as the select object itself.
Supply a callable as an argument (along with any of its args) and it yields a placeholder whose attributes are the same as the result of that call.
In either case, you do stuff with the placeholder and call it to execute the connection with it. Supply the name of a ResultProxy method (and any of its args) to the call to get that method's result instead of the ResultProxy itself. Do all of this within the context of the placeholder:
    with <me>.selex(<table>.c.foo) as sh:
        sh.where(<table>.c.bar == "correct")
        for dRow in sh():
            row = yield dRow
            ...
You can call this outside of a transaction and get a deferred result from calling the placeholder object. For such usage, you can specify the usual transaction keywords via keywords to this method.
def selectorator(self, selectObj, consumer=None, de=None, N=1):
When called with a select object that results in an iterable
ResultProxy when executed, returns a deferred that fires with a
Deferator that yields deferreds when iterated, each of which
fires with a successive row of the select's
If you supply an
IConsumer with the consumer
keyword, I will couple an
iteration.IterationProducer to your
consumer and run it. The returned
Deferred will fire (with a
reference to your consumer) when the iterations are done, but you don't
need to wait for that before doing another transaction.
If you supply a
Deferred via the de keyword, it will
be fired (unless already fired for some reason) with
the select object has been executed, but before iterations have begun.
If you want multiple rows produced at once using the
fetchmany method of the
ResultProxy, set N
to the number of rows you want at a time.
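The batching behavior of fetchmany can be seen in isolation with the standard library's sqlite3 module, whose cursors offer a method of the same name. This is only a sketch of the idea: the table, the data, and the helper iter_batches are invented, and it runs synchronously rather than through a Deferator.

```python
import sqlite3


def iter_batches(cursor, n=1):
    """Yield lists of up to n rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(n)
        if not rows:
            break
        yield rows


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (x INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(5)])

cursor = conn.execute("SELECT x FROM t ORDER BY x")
batches = list(iter_batches(cursor, n=2))  # consume before closing
conn.close()
print(batches)
```

With five rows and n=2, the last batch is shorter than the others, so code consuming the batches should not assume a fixed length.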
Call directly, *not* from inside a transaction:
    dr = yield <me>.selectorator(<select>)
    for d in dr:
        row = yield d
        <proceed with row>

    d = <me>.selectorator(<select>, <consumer>)
    <do other stuff>
    consumer = yield d
    consumer.revealMagnificentSummary()
def execute(self, *args, **kw):
Runs connection.execute(*args, **kw) as a transaction, in
my thread with my current connection, with all the usual handling of the
Executes raw SQL as a transaction, in my thread with my current connection, with any rows of the result returned as a list.
def produceRows(self, f, iterator, table, colName, **kw):
Calls the single-argument function f repeatedly, with each value yielded from the supplied iterator, to produce values for new rows in the specified table. The named column colName will have that value set for each row, in order. Other columns will be set to values that are constant across the rows, as specified via keywords.
The function f must not block, but it may return either an immediate or deferred result. It doesn't matter if some calls to f take longer than others; the rows will be written in the same order as the input values to f are yielded from iterator.
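The ordering guarantee can be illustrated with a small sketch. It uses asyncio awaitables as a stand-in for Twisted deferreds, and it is not necessarily how produceRows is implemented; the function names are invented. It shows that results can be collected in input order even when f returns a mix of immediate and delayed results that finish out of order.

```python
import asyncio
import inspect


async def produce_values(f, iterator):
    """Call f on each input and return the results in input order."""
    async def call(value):
        result = f(value)
        if inspect.isawaitable(result):
            result = await result  # delayed result
        return result

    # gather() returns results in the order of its arguments,
    # regardless of which call finishes first.
    return await asyncio.gather(*(call(v) for v in iterator))


async def slow_for_small(value):
    # Smaller inputs take longer, so completion order is reversed.
    await asyncio.sleep(0.03 - 0.01 * value)
    return value * 10


def f(value):
    # Immediate result for odd inputs, awaitable for even ones.
    return value * 10 if value % 2 else slow_for_small(value)


results = asyncio.run(produce_values(f, iter([0, 1, 2])))
print(results)
```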
Warning: Needs unit testing.
def deferToQueue(self, func, *args, **kw):
Dispatches callable(*args, **kw) as a task via the like-named
method of my asynchronous queue, returning a
Deferred to its
Scheduling of the call is impacted by the niceness keyword that can be included in **kw. As with UNIX niceness, the value should be an integer where 0 is normal scheduling, negative numbers are higher priority, and positive numbers are lower priority.
There will be no shutdown of my queue until the deferred result is obtained.
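The effect of niceness on scheduling can be sketched with a priority heap. The class NiceQueue below is hypothetical and runs tasks synchronously; it is not the AsynQueue implementation. It assumes the -20 to 20 range for niceness and keeps first-in, first-out order among tasks of equal niceness.

```python
import heapq
import itertools


class NiceQueue:
    """Toy task queue ordered by niceness, then arrival (hypothetical)."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker for equal niceness

    def put(self, func, *args, niceness=0, **kw):
        if not -20 <= niceness <= 20:
            raise ValueError("niceness must be between -20 and 20")
        entry = (niceness, next(self._counter), func, args, kw)
        heapq.heappush(self._heap, entry)

    def run_all(self):
        """Run every queued task, lowest niceness first."""
        results = []
        while self._heap:
            _, _, func, args, kw = heapq.heappop(self._heap)
            results.append(func(*args, **kw))
        return results


q = NiceQueue()
q.put(str.upper, "normal")               # niceness 0
q.put(str.upper, "low", niceness=10)     # lower priority
q.put(str.upper, "urgent", niceness=-5)  # higher priority
order = q.run_all()
print(order)
```

The counter in each heap entry ensures the callables themselves are never compared when two tasks share a niceness value.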
|kw||niceness||Scheduling niceness, an integer between -20 and 20, with lower numbers having higher scheduling priority, as in UNIX.|