sasync.database.AccessBroker(object)
class documentation
Part of sasync.database
Known subclasses: sasync.items.Transactor, sasync.parray.Transactor, sasync.test.people.PeopleBroker
I manage asynchronous access to a database.
Before you use any instance of me, you must specify the parameters for creating an SQLAlchemy database engine. A single argument is used, which specifies a connection to a database via an RFC-1738 URL. In addition, the verbose keyword option can be used to log messages about calls and errors. (Use that only for debugging.)
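As a rough illustration of what an RFC-1738 style database URL carries, the sketch below takes one apart with the standard library. The URL itself (user, password, host, port, and database name) is made up for the example, and this is not how SQLAlchemy parses URLs internally.

```python
from urllib.parse import urlsplit

# Hypothetical connection URL; every component here is invented for illustration.
url = "postgresql://scott:tiger@localhost:5432/mydatabase"

parts = urlsplit(url)
components = {
    "dialect": parts.scheme,                 # which kind of database to talk to
    "username": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,
    "database": parts.path.lstrip("/"),      # path without its leading slash
}
print(components)
```

A string of this general shape is what you would supply as the single engine argument described above.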
You can set an engine globally, for all instances of me, via the sasync.engine package-level function, or via my engine class method. Alternatively, you can specify an engine for one particular instance by supplying the parameters to the constructor.
I employ AsynQueue to queue up transactions asynchronously and perform them one at a time. But I still want a connection pool for my database engine to handle asynchronous iteration of rows from query results. See the source for handleResult.
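The one-at-a-time idea can be sketched with the standard library alone. This is an analogy, not AsynQueue or sasync code: a single-worker ThreadPoolExecutor stands in for the dedicated transaction thread, sqlite3 stands in for the database, and the names connect, add_person, all_people, and the people table are all invented for the example.

```python
import sqlite3
import threading
from concurrent.futures import ThreadPoolExecutor

# One worker thread means queued tasks run strictly one at a time, in order.
queue = ThreadPoolExecutor(max_workers=1)
conn = None  # created and used only inside the worker thread


def connect():
    """Open the connection in the worker thread and create a demo table."""
    global conn
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE people (name TEXT)")
    return threading.current_thread().name


def add_person(name):
    """A stand-in 'transaction': insert one row and commit."""
    conn.execute("INSERT INTO people VALUES (?)", (name,))
    conn.commit()
    return threading.current_thread().name


def all_people():
    """Fetch every name, in insertion order, as a plain list."""
    return [row[0] for row in conn.execute("SELECT name FROM people ORDER BY rowid")]


worker = queue.submit(connect).result()
futures = [queue.submit(add_person, n) for n in ("Alice", "Bob", "Carol")]
threads_used = {f.result() for f in futures}
names = queue.submit(all_people).result()
queue.shutdown(wait=True)
```

Every task runs in the same worker thread, so the caller never touches the connection directly and the inserts cannot interleave.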
SQLAlchemy has excellent documentation, which describes the engine parameters in plenty of detail. See http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html.
See Also | http://edsuom.com/AsynQueue/asynqueue.threads.ThreadQueue.html |
Instance Variable | q | A property-generated reference to a threaded task queue that is dedicated to my database connection. |
Instance Variable | connection | The current SQLAlchemy connection object, if any yet exists. Generated by my connect method. |
Class Method | setup | Constructs a global queue for all instances of me, returning a deferred that fires with it. |
Method | __init__ | Constructs an instance of me, optionally specifying parameters for an SQLAlchemy engine object that serves this instance only. |
Method | singleton | Undocumented |
Method | connect | Undocumented |
Method | waitUntilRunning | Returns a Deferred that fires when the broker is running and ready for transactions. |
Method | callWhenRunning | Calls the f-args-kw combo when the broker is running and ready for transactions. |
Method | table | Instantiates a new table object, creating it in the transaction thread as needed. |
Method | startup | This method runs before the first transaction to start my synchronous task queue. |
Method | first | No summary |
Method | shutdown | Shuts down my database transaction functionality and threaded task queue, returning a Deferred that fires when all queued tasks are done and the shutdown is complete. |
Method | handleResult | No summary |
Method | s | Polymorphic method for working with select instances within a cached selection subcontext. |
Method | select | Just returns an SQLAlchemy select object. You do everything else. |
Method | selex | Supply columns as arguments and this method generates a select on the columns, yielding a placeholder object with the same attributes as the select object itself. |
Method | selectorator | No summary |
Method | execute | Does a connection.execute(*args, **kw) as a transaction, in my thread with my current connection, with all the usual handling of the ResultProxy. |
Method | sql | Executes raw SQL as a transaction, in my thread with my current connection, with any rows of the result returned as a list. |
Method | produceRows | No summary |
Method | deferToQueue | Dispatches callable(*args, **kw) as a task via the like-named method of my asynchronous queue, returning a Deferred to its eventual result. |
def setup(cls, url, **kw):
Constructs a global queue for all instances of me, returning a deferred that fires with it.
Constructs an instance of me, optionally specifying parameters for an SQLAlchemy engine object that serves this instance only.
def waitUntilRunning(self):
Returns a Deferred that fires when the broker is running and ready for transactions.
Calls the f-args-kw combo when the broker is running and ready for transactions.
def table(self, name, *cols, **kw):
Instantiates a new table object, creating it in the transaction thread as needed.
One or more indexes other than the primary key can be defined via a keyword prefixed with index_ or unique_ and having the index name as the suffix. Use the unique_ prefix if the index is to be a unique one. The value of the keyword is a list or tuple containing the names of all columns in the index.
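To make the keyword convention concrete, here is a minimal sketch of how index_ and unique_ keywords could be separated from other keywords. The helper split_index_keywords, the column names, and the extra keyword are hypothetical; this is not the code sasync uses.

```python
def split_index_keywords(kw):
    """Separate index_*/unique_* keywords from the rest (hypothetical helper)."""
    indexes = {}
    others = {}
    for key, value in kw.items():
        for prefix, unique in (("index_", False), ("unique_", True)):
            if key.startswith(prefix):
                # The suffix after the prefix is the index name.
                name = key[len(prefix):]
                indexes[name] = {"columns": list(value), "unique": unique}
                break
        else:
            # Not an index keyword; pass it through untouched.
            others[key] = value
    return indexes, others


# Keywords shaped like those described above for table(name, *cols, **kw).
indexes, others = split_index_keywords({
    "index_name": ["name_last", "name_first"],   # ordinary two-column index
    "unique_email": ("email",),                  # unique single-column index
    "extra": True,                               # made-up non-index keyword
})
print(indexes)
print(others)
```

Under this convention, a keyword such as index_name=["name_last", "name_first"] would define an index called "name" over those two columns.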
This method runs before the first transaction to start my synchronous task queue.
Override it to get whatever pre-transaction stuff you have run in the main loop before a database engine/connection is created.
This method automatically runs as the first transaction after completion of startup.
Override it to define table contents or whatever else you want as a first transaction that immediately follows your pre-transaction stuff.
You don't need to decorate the method with @transact, but it doesn't break anything if you do.
def shutdown(self, *null):
Shuts down my database transaction functionality and threaded task queue, returning a Deferred that fires when all queued tasks are done and the shutdown is complete.
If you call this before I have even started up for some reason, the Deferred will not fire until my startup sequence has completed.
Repeated calls after I've already shut down will be rewarded with a Deferred that fires immediately.
A method of a subclass of me that returns a Deferred (rather than just being decorated with the transact function) could possibly have some database processing pending when I am told to shut down. To ensure that I wait for such a Deferred to fire before proceeding to shut down my thread queue, decorate the method with the wait function.
@defer.inlineCallbacks
def handleResult(self, result, consumer=None, conn=None, asList=False, N=1):
Handles the result of a transaction or connection.execute. If the result is a ResultProxy, returns a (deferred) instance of Deferator or, if you supplied an implementor of IConsumer, couples your consumer to an IterationProducer.
You can supply a connection to be closed after iterations are done with the keyword conn. With the keyword asList, you can force the rows of a ResultProxy to be fetched (in my thread) and returned as a (deferred) list of rows.
Polymorphic method for working with select instances within a cached selection subcontext.
- When called with a single argument (the select object's name as a string) and no keywords, this method indicates if the named select object already exists and sets its selection subcontext to name.
- With multiple arguments or any keywords, the method acts like a call to sqlalchemy.select(...).compile(), except that nothing is returned. Instead, the resulting select object is stored in the current selection subcontext.
- With no arguments or keywords, the method returns the select object for the current selection subcontext.
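The three call forms can be pictured with a toy dispatcher. This is only a sketch of the calling pattern described in the list above: the SelectCache class is hypothetical, a plain tuple stands in for a compiled select object, and nothing here touches SQLAlchemy or reproduces sasync's implementation.

```python
class SelectCache:
    """Toy stand-in for a cached selection subcontext (not sasync code)."""

    def __init__(self):
        self._cache = {}
        self._current = None

    def s(self, *args, **kw):
        if len(args) == 1 and isinstance(args[0], str) and not kw:
            # Form 1: name the subcontext and report whether it is already cached.
            self._current = args[0]
            return self._current in self._cache
        if args or kw:
            # Form 2: build and store an object for the current subcontext.
            # A tuple stands in for a compiled select; nothing is returned.
            self._cache[self._current] = ("select", args, kw)
            return None
        # Form 3: return whatever is stored for the current subcontext.
        return self._cache[self._current]


cache = SelectCache()
first_time = cache.s("by_name")             # nothing cached under this name yet
if not first_time:
    cache.s("people.name", "people.age")    # build once, store in the subcontext
stored = cache.s()                          # retrieve the stored object
second_time = cache.s("by_name")            # the name is now known
```

The point of the pattern is that the relatively expensive construction step runs only the first time a given name is used.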
Call from inside a transaction.
Just returns an SQLAlchemy select object. You do everything else.
This is an immediate result, not a Deferred.
def selex(self, *args, **kw):
Supply columns as arguments and this method generates a select on the columns, yielding a placeholder object with the same attributes as the select object itself.
Supply a callable as an argument (along with any of its args) and it yields a placeholder whose attributes are the same as the result of that call.
In either case, you do stuff with the placeholder and then call it to execute it with my connection. Supply the name of a ResultProxy method (and any of its args) to the call to get the result of that method instead of the ResultProxy itself. Do all of this within the context of the placeholder:
    with <me>.selex(<table>.c.foo) as sh:
        sh.where(<table>.c.bar == "correct")
        for dRow in sh():
            row = yield dRow
            ...
You can call this outside of a transaction and get a deferred result from calling the placeholder object. For such usage, you can specify the usual transaction keywords via keywords to this method.
@defer.inlineCallbacks
def selectorator(self, selectObj, consumer=None, de=None, N=1):
When called with a select object that results in an iterable ResultProxy when executed, returns a deferred that fires with a Deferator. Iterating over the Deferator yields deferreds, each of which fires with a successive row of the select's ResultProxy.
If you supply an IConsumer with the consumer keyword, I will couple an iteration.IterationProducer to your consumer and run it. The returned Deferred will fire (with a reference to your consumer) when the iterations are done, but you don't need to wait for that before doing another transaction.
If you supply a Deferred via the de keyword, it will be fired (unless already fired for some reason) with None when the select object has been executed, but before iterations have begun.
If you want multiple rows produced at once using the fetchmany method of the ResultProxy, set N to the number of rows you want at a time.
Call directly, *not* from inside a transaction:
    dr = yield <me>.selectorator(<select>)
    for d in dr:
        row = yield d
        <proceed with row>

    d = <me>.selectorator(<select>, <consumer>)
    <do other stuff>
    consumer = yield d
    consumer.revealMagnificentSummary()
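For a feel of what fetching N rows at a time means, the sketch below batches rows with the DB-API fetchmany method of a sqlite3 cursor. It is a standard-library analogy for the N keyword, not sasync or SQLAlchemy code; the batches helper and the table t are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (n INTEGER)")
conn.executemany("INSERT INTO t VALUES (?)", [(i,) for i in range(7)])


def batches(cursor, N=1):
    """Yield lists of up to N rows until the cursor is exhausted."""
    while True:
        rows = cursor.fetchmany(N)
        if not rows:
            break
        yield rows


cursor = conn.execute("SELECT n FROM t ORDER BY n")
sizes = [len(chunk) for chunk in batches(cursor, N=3)]
conn.close()
print(sizes)
```

Larger batches mean fewer round trips through the iteration machinery, at the cost of holding more rows in memory at once.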
def execute(self, *args, **kw):
Does a connection.execute(*args, **kw) as a transaction, in my thread with my current connection, with all the usual handling of the ResultProxy.
Executes raw SQL as a transaction, in my thread with my current connection, with any rows of the result returned as a list.
@defer.inlineCallbacks
def produceRows(self, f, iterator, table, colName, **kw):
Calls the single-argument function f repeatedly, with each value yielded from the supplied iterator, to produce values for new rows in the specified table. The named column colName will have that value set for each row, in order. Other columns will be set to values that are constant across the rows, as specified via keywords.
The function f must not block, but it may return either an immediate or deferred result. It doesn't matter if some calls to f take longer than others; the rows will be written in the same order as the input values to f are yielded from iterator.
Warning: Needs unit testing.
Returns | A Deferred that fires with a list of the primary key values for each row, in the same order as iterator, when all the values have been generated and written. |
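The ordering guarantee (results follow input order even when some calls to f finish later than others) can be illustrated with asyncio from the standard library. This is an analogy only: sasync is built on Twisted Deferreds, and the functions f and produce below are invented for the example.

```python
import asyncio

finished = []  # records the order in which calls actually complete


async def f(value):
    # Earlier inputs sleep longer, so they tend to finish after later ones.
    await asyncio.sleep(0.03 - 0.01 * value)
    finished.append(value)
    return value * value


async def produce(values):
    # gather() returns results in the order the awaitables were passed in,
    # regardless of which one completes first.
    return await asyncio.gather(*(f(v) for v in values))


results = asyncio.run(produce(iter([0, 1, 2])))
print(results, finished)
```

Here results lines up with the input values, while finished shows whatever order the calls happened to complete in.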
def deferToQueue(self, func, *args, **kw):
Dispatches callable(*args, **kw) as a task via the like-named method of my asynchronous queue, returning a Deferred to its eventual result.
Scheduling of the call is impacted by the niceness keyword that can be included in **kw. As with UNIX niceness, the value should be an integer where 0 is normal scheduling, negative numbers are higher priority, and positive numbers are lower priority.
There will be no shutdown of my queue until the deferred result is obtained.
Parameters | niceness | Scheduling niceness, an integer between -20 and 20, with lower numbers having higher scheduling priority as in UNIX nice and renice. |
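A simple way to picture niceness-based scheduling is a priority heap in which lower niceness is served first and ties are broken by arrival order. The NicenessQueue class below is a hypothetical sketch built on heapq; AsynQueue's real scheduling may well differ.

```python
import heapq
import itertools


class NicenessQueue:
    """Toy task queue ordered by niceness, then by arrival (not AsynQueue)."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # tie-breaker preserving arrival order

    def put(self, func, *args, niceness=0):
        if not -20 <= niceness <= 20:
            raise ValueError("niceness must be between -20 and 20")
        heapq.heappush(self._heap, (niceness, next(self._counter), func, args))

    def run_all(self):
        """Run every queued task, lowest niceness first, and collect results."""
        results = []
        while self._heap:
            _, _, func, args = heapq.heappop(self._heap)
            results.append(func(*args))
        return results


q = NicenessQueue()
q.put(str.upper, "background", niceness=10)   # low priority
q.put(str.upper, "normal")                    # default niceness of 0
q.put(str.upper, "urgent", niceness=-5)       # high priority
q.put(str.upper, "also normal")
order = q.run_all()
print(order)
```

The unique counter in each heap entry means two tasks with equal niceness are never compared by their callables.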