I manage asynchronous access to a database.

Before you use any instance of me, you must specify the parameters for creating an SQLAlchemy database engine. A single argument is used, which specifies a connection to a database via an RFC-1738 url. In addition, the verbose keyword options can be employed to spew out log messages about calls and errors. (Use that only for debugging.)

You can set an engine globally, for all instances of me via the sasync.engine package-level function, or via my engine class method. Alternatively, you can specify an engine for one particular instance by supplying the parameters to the constructor.

I employ AsynQueue to queue up transactions asynchronously and perform them one at a time. But I still want a connection pool for my database engine to handle asynchronous iteration of rows from query results. See the source for handleResult.

SQLAlchemy has excellent documentation, which describes the engine parameters in plenty of detail. See http://docs.sqlalchemy.org/en/rel_1_0/core/engines.html.

See Alsohttp://edsuom.com/AsynQueue/asynqueue.threads.ThreadQueue.html
Instance Variable q A property-generated reference to a threaded task queue that is dedicated to my database connection.
Instance Variable connection The current SQLAlchemy connection object, if any yet exists. Generated by my connect method.
Class Method setup Constructs a global queue for all instances of me, returning a deferred that fires with it.
Method __init__ Constructs an instance of me, optionally specifying parameters for an SQLAlchemy engine object that serves this instance only.
Method singleton Undocumented
Method connect Undocumented
Method waitUntilRunning Returns a Deferred that fires when the broker is running and ready for transactions.
Method callWhenRunning Calls the f-args-kw combo when the broker is running and ready for transactions.
Method table Instantiates a new table object, creating it in the transaction thread as needed.
Method startup This method runs before the first transaction to start my synchronous task queue.
Method first No summary
Method shutdown Shuts down my database transaction functionality and threaded task queue, returning a Deferred that fires when all queued tasks are done and the shutdown is complete.
Method handleResult No summary
Method s Polymorphic method for working with select instances within a cached selection subcontext.
Method select Just returns an SQLAlchemy select object. You do everything else.
Method selex Supply columns as arguments and this method generates a select on the columns, yielding a placeholder object with the same attributes as the select object itself.
Method selectorator No summary
Method execute Does a connection.execute(*args, **kw) as a transaction, in my thread with my current connection, with all the usual handling of the ResultProxy.
Method sql Executes raw SQL as a transaction, in my thread with my current connection, with any rows of the result returned as a list.
Method produceRows No summary
Method deferToQueue Dispatches callable(*args, **kw) as a task via the like-named method of my asynchronous queue, returning a Deferred to its eventual result.
q =
A property-generated reference to a threaded task queue that is dedicated to my database connection.
connection =
The current SQLAlchemy connection object, if any yet exists. Generated by my connect method.
@classmethod
def setup(cls, url, **kw):

Constructs a global queue for all instances of me, returning a deferred that fires with it.

def __init__(self, *args, **kw):

Constructs an instance of me, optionally specifying parameters for an SQLAlchemy engine object that serves this instance only.

@property
def singleton(self):
Undocumented
def connect(self):
Undocumented
@defer.inlineCallbacks
def waitUntilRunning(self):

Returns a Deferred that fires when the broker is running and ready for transactions.

def callWhenRunning(self, f, *args, **kw):

Calls the f-args-kw combo when the broker is running and ready for transactions.

@defer.inlineCallbacks
def table(self, name, *cols, **kw):

Instantiates a new table object, creating it in the transaction thread as needed.

One or more indexes other than the primary key can be defined via a keyword prefixed with index_ or unique_ and having the index name as the suffix. Use the unique_ prefix if the index is to be a unique one. The value of the keyword is a list or tuple containing the names of all columns in the index.

def startup(self):

This method runs before the first transaction to start my synchronous task queue.

Override it to get whatever pre-transaction stuff you have run in the main loop before a database engine/connection is created.

def first(self):

This method automatically runs as the first transaction after completion of startup. Override it to define table contents or whatever else you want as a first transaction that immediately follows your pre-transaction stuff.

You don't need to decorate the method with @transact, but it doesn't break anything if you do.

@defer.inlineCallbacks
def shutdown(self, *null):

Shuts down my database transaction functionality and threaded task queue, returning a Deferred that fires when all queued tasks are done and the shutdown is complete.

If you call this before I have even started up for some reason, the Deferred will not fire until my startup sequence has completed.

Repeated calls after I've already shutdown will be rewarded with a Deferred that fires immediately.

A method of a subclass of me that returns a Deferred (rather than just being decorated with the transact function) could possibly have some database processing pending when I am told to shutdown. To ensure that I wait for such a Deferred to fire before proceeding to shut down my thread queue, decorate the method with the wait function.

@wait
@defer.inlineCallbacks
def handleResult(self, result, consumer=None, conn=None, asList=False, N=1):

Handles the result of a transaction or connection.execute. If it's a ResultsProxy and possibly an implementor of IConsumer, returns a (deferred) instance of Deferator or couples your consumer to an IterationProducer.

You can supply a connection to be closed after iterations are done with the keyword 'conn'. With the keyword asList, you can force the rows of a ResultProxy to be fetched (in my thread) and returned as a (deferred) list of rows.

def s(self, *args, **kw):

Polymorphic method for working with select instances within a cached selection subcontext.

  • When called with a single argument (the select object's name as a string) and no keywords, this method indicates if the named select object already exists and sets its selection subcontext to name.
  • With multiple arguments or any keywords, the method acts like a call to sqlalchemy.select(...).compile(), except that nothing is returned. Instead, the resulting select object is stored in the current selection subcontext.
  • With no arguments or keywords, the method returns the select object for the current selection subcontext.

Call from inside a transaction.

def select(self, *args, **kw):

Just returns an SQLAlchemy select object. You do everything else.

This is an immediate result, not a Deferred.

@contextmanager
def selex(self, *args, **kw):

Supply columns as arguments and this method generates a select on the columns, yielding a placeholder object with the same attributes as the select object itself.

Supply a callable as an argument (along with any of its args) and it yields a placeholder whose attributes are the same as the result of that call.

In either case, you do stuff with the placeholder and call it to execute the connection with it. Supply the name of a resultsproxy method (and any of its args) to the call to get the result instead of the rp. Do all of this within the context of the placeholder:

 with <me>.selex(<table>.c.foo) as sh:
     sh.where(<table>.c.bar == "correct")
 for dRow in sh():
     row = yield dRow
     ...

You can call this outside of a transaction and get a deferred result from calling the placeholder object. For such usage, you can specify the usual transaction keywords via keywords to this method.

@wait
@defer.inlineCallbacks
def selectorator(self, selectObj, consumer=None, de=None, N=1):

When called with a select object that results in an iterable ResultProxy when executed, returns a deferred that fires with a Deferator that can be iterated over deferreds, each of which fires with a successive row of the select's ResultProxy.

If you supply an IConsumer with the consumer keyword, I will couple an iteration.IterationProducer to your consumer and run it. The returned Deferred will fire (with a reference to your consumer) when the iterations are done, but you don't need to wait for that before doing another transaction.

If you supply a Deferred via the de keyword, it will be fired (unless already fired for some reason) with None when the select object has been executed, but before iterations have begun.

If you want multiple rows produced at once using the fetchmany method of the ResultProxy, set N to the number of rows you want at a time.

Call directly, *not* from inside a transaction:

 dr = yield <me>.selectorator(<select>)
 for d in dr:
     row = yield d
     <proceed with row>
 d = <me>.selectorator(<select>, <consumer>)
 <do other stuff>
 consumer = yield d
 consumer.revealMagnificentSummary()
@transact
def execute(self, *args, **kw):

Does a connection.execute(*args, **kw) as a transaction, in my thread with my current connection, with all the usual handling of the ResultProxy.

def sql(self, sqlText, **kw):

Executes raw SQL as a transaction, in my thread with my current connection, with any rows of the result returned as a list.

@wait
@defer.inlineCallbacks
def produceRows(self, f, iterator, table, colName, **kw):

Calls the single-argument function f repeatedly, with each value yielded from the supplied iterator, to produce values for new rows in the specified table. The named column colName will have that value set for each row, in order. Other column will be set to values that are constant across the rows, as specified via keywords.

The function f must not block, but it may return either an immediate or deferred result. It doesn't matter if some calls to f take longer than others; the rows will be written in the same order as the input values to f are yielded from iterator.

Warning: Needs unit testing.

ReturnsA Deferred that fires with a list of the primary key values for each row, in the same order as iterator, when all the values have been generated and written.
@wait
def deferToQueue(self, func, *args, **kw):

Dispatches callable(*args, **kw) as a task via the like-named method of my asynchronous queue, returning a Deferred to its eventual result.

Scheduling of the call is impacted by the niceness keyword that can be included in **kw. As with UNIX niceness, the value should be an integer where 0 is normal scheduling, negative numbers are higher priority, and positive numbers are lower priority.

There will be no shutdown of my queue until the deferred result is obtained.

ParametersnicenessScheduling niceness, an integer between -20 and 20, with lower numbers having higher scheduling priority as in UNIX nice and renice.
API Documentation for sAsync, generated by pydoctor at 2021-09-18 08:41:22.