# sAsync:
# An enhancement to the SQLAlchemy package that provides persistent
# item-value stores, arrays, and dictionaries, and an access broker for
# conveniently managing database access, table setup, and
# transactions. Everything can be run in an asynchronous fashion using
# the Twisted framework and its deferred processing capabilities.
# Copyright (C) 2006, 2015 by Edwin A. Suominen.
# See the sAsync project documentation for the API reference as well
# as information about Ed's background and other projects, software
# and otherwise.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#   http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an "AS
# IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language
# governing permissions and limitations under the License.

"""
Asynchronous database transactions via C{SQLAlchemy} and
C{Twisted}. You will surely have a subclass of L{AccessBroker}.
"""

import inspect, logging
from contextlib import contextmanager

from twisted.internet import defer, reactor
from twisted.python.failure import Failure

import sqlalchemy as SA
from sqlalchemy import pool

import asynqueue
from asynqueue import iteration, threads

import errors, queue
from selex import SelectAndResultHolder

def transaction(self, func, *t_args, **t_kw):
    """
    Everything making up a transaction, and everything run in the
    thread, is contained within this little function, including of
    course a call to C{func}.

    A transaction is begun on C{self.connection}, I{func} is called,
    and the transaction is committed if the call returns normally. If
    the call raises an exception, the transaction is rolled back and
    an L{errors.TransactionError} is raised in its place.

    @param func: The callable to run inside the transaction. Unless it
      has an C{im_self} attribute, I{self} is prepended to its
      positional arguments.

    @return: Whatever I{func} returned.

    @raise errors.TransactionError: if I{func} raised any
      C{Exception}. The error text is produced by C{asynqueue.Info}
      from the call and the exception.
    """
    trans = self.connection.begin()
    if not hasattr(func, 'im_self'):
        # Not a bound method, so it needs the broker as its first arg
        t_args = (self,) + t_args
    try:
        result = func(*t_args, **t_kw)
    except Exception as e:
        # Undo whatever the failed call did before reporting the error
        trans.rollback()
        text = asynqueue.Info().setCall(
            func, t_args, t_kw).aboutException(exception=e)
        raise errors.TransactionError(text)
    else:
        trans.commit()
        return result

def nextFromRP(rp, N=1):
    """
    I{Transaction magic.}

    Fetches the next row from the result proxy I{rp}, or the next
    I{N} rows as a list if I{N} is other than 1.

    @param rp: An object with C{fetchone} and C{fetchmany} methods.

    @param N: The number of rows to fetch per call.

    @return: The row (or list of rows) fetched.

    @raise StopIteration: if nothing was fetched, or if the fetch
      itself raised an exception.
    """
    try:
        if N == 1:
            result = rp.fetchone()
        else:
            result = rp.fetchmany(N)
    except Exception:
        # A failed fetch is deliberately treated the same as running
        # out of rows
        result = []
    if result:
        return result
    raise StopIteration
def isNested(self):
    """
    I{Transaction magic.}

    Walks up the call stack from the caller and reports whether any
    calling frame is executing the code of L{transaction} or, if
    I{self} has a C{first} attribute with a C{func_code}, the code of
    that C{first} method.

    @return: C{True} if such a frame was found, C{False} if the top of
      the stack was reached without finding one.
    """
    firstCode = None
    frame = inspect.currentframe()
    while True:
        frame = frame.f_back
        if frame is None:
            # Ran off the top of the stack without finding a transaction
            result = False
            break
        if frame.f_code == transaction.func_code:
            result = True
            break
        # Check if nested inside first-transaction code if necessary
        if firstCode is None and hasattr(self, 'first'):
            # False (rather than None) marks "looked, nothing there" so
            # the lookup isn't repeated for every frame
            firstCode = getattr(self.first, 'func_code', False)
        if firstCode and frame.f_code == firstCode:
            result = True
            break
    # Drop the frame and code references explicitly
    del frame
    del firstCode
    return result

def transact(f):
    """
    Transaction decorator.

    Use this function as a decorator to wrap the supplied method I{f}
    of L{AccessBroker} in a transaction that runs C{f(*args, **kw)} in
    its own transaction.

    Immediately returns a C{Deferred} that will eventually have its
    callback called with the result of the transaction. Inspired by
    and largely copied from Valentino Volonghi's C{makeTransactWith}
    code.

    A method named C{first} is returned undecorated, because the
    first transaction can't wait for the broker's lock.

    You can add the following keyword options to your function call:

    @keyword niceness: Scheduling niceness, an integer between -20 and
      20, with lower numbers having higher scheduling priority as in
      UNIX C{nice} and C{renice}.

    @keyword doNext: Set C{True} to assign highest possible priority,
      even higher than with niceness = -20.

    @keyword doLast: Set C{True} to assign lower possible priority,
      even lower than with niceness = 20.

    @keyword consumer: Set this to a consumer object (must implement
      the C{twisted.interfaces.IConsumer} interface) and the
      C{SA.ResultProxy} will write its rows to it in Twisted
      fashion. The returned C{Deferred} will fire when all rows have
      been written.

    @keyword raw: Set C{True} to have the transaction result returned
      in its original form, without being passed through
      L{AccessBroker.handleResult}, even if it's a C{RowProxy} or
      other database result object. Cannot be combined with
      I{consumer}.

    @keyword asList: Set C{True} to have the rows of a C{ResultProxy}
      fetched and returned as a list; passed on to
      L{AccessBroker.handleResult}.
    """
    def substituteFunction(self, *args, **kw):
        """
        Puts the original function in the synchronous task queue and
        returns a deferred to its result when it is eventually run.

        If the transaction resulted in a C{ResultProxy} object, the
        C{Deferred} fires with an C{asynqueue.iteration.Deferator},
        unless you've supplied an IConsumer with the I{consumer}
        keyword. Then the deferred result is an
        C{asynqueue.iteration.IterationProducer} coupled to your
        consumer.

        This function will be given the same name as the original
        function so that it can be asked to masquerade as the original
        function. As a result, the threaded call to the original
        function that it makes inside its C{transaction} sub-function
        will be able to use the arguments for that original
        function. (The caller will actually be calling this substitute
        function, but it won't know that.)

        The original function should be a method of a L{AccessBroker}
        subclass instance, and the queue for that instance will be
        used to run it.

        @see: The AsynQueue C{asynqueue.iteration} API documentation.
        """
        def oops(failureObj):
            # Encapsulate the failure object in a list to avoid the
            # errback chain until we are ready.
            return [failureObj]

        @defer.inlineCallbacks
        def doTransaction():
            # Decide once whether the lock is held for the whole
            # transaction, so acquire and release always pair up
            holdLock = self.singleton
            if holdLock or not self.running:
                # Not yet running, "wait" here for queue, engine, and
                # connection
                yield self.lock.acquire()
                if not holdLock:
                    # If we can handle multiple connections (TODO), we
                    # don't want to hold onto the lock because
                    # transactions are queued in the ThreadQueue and
                    # additional connections can be obtained when doing
                    # ResultProxy iteration.
                    self.lock.release()
            try:
                # NOTE(review): the call target on this line was lost
                # from the file; restored as the queue's 'call' method,
                # which is what accepts the niceness/doNext/doLast/raw
                # keywords -- confirm.
                result = yield self.q.call(
                    transaction, self, f, *args, **kw).addErrback(oops)
                if not raw:
                    result = yield self.handleResult(
                        result, consumer=consumer, asList=asList)
            finally:
                if holdLock:
                    # If we can't handle multiple connections, we held
                    # onto the lock throughout all of this
                    self.lock.release()
            # If the result is a failure, raise its exception to trigger
            # the errback outside this function. Very weird, I
            # know. Basically, there must be an exception IN this function
            # to trigger the errback OUTSIDE of it. Just returning a
            # Failure doesn't seem to work.
            if isinstance(result, list):
                if len(result) == 1 and isinstance(result[0], Failure):
                    result[0].raiseException()
            defer.returnValue(result)

        # The 'raw' keyword is also used by the queue/worker, but
        # 'asList', 'isNested', and 'consumer' are not.
        raw = kw.get('raw', False)
        asList = kw.pop('asList', False)
        consumer = kw.pop('consumer', None)
        if consumer and raw:
            raise ValueError(
                "Can't supply a consumer for a raw transaction result")
        if kw.pop('isNested', False) or isNested(self):
            # The call and its result only get special treatment in
            # the outermost @transact function
            return f(self, *args, **kw)
        # Here's where the ThreadQueue actually runs the
        # transaction
        return doTransaction()

    # We need to ignore any transact decorator on an AccessBroker's
    # 'first' method, because it can't wait for the lock
    if f.func_name == 'first':
        return f
    substituteFunction.func_name = f.func_name
    return substituteFunction

class AccessBroker(object):
    """
    I manage asynchronous access to a database.

    Before you use any instance of me, you must specify the parameters
    for creating an SQLAlchemy database engine. A single argument is
    used, which specifies a connection to a database via an RFC-1738
    url. In addition, the I{verbose} keyword option can be employed
    to spew out log messages about calls and errors. (Use that only
    for debugging.)

    You can set an engine globally, for all instances of me via the
    L{sasync.engine} package-level function, or via my L{engine} class
    method. Alternatively, you can specify an engine for one
    particular instance by supplying the parameters to the
    constructor.

    I employ C{AsynQueue} to queue up transactions asynchronously and
    perform them one at a time. But I still want a connection pool for
    my database engine to handle asynchronous iteration of rows from
    query results. See the source for L{handleResult}.

    SQLAlchemy has excellent documentation, which describes the engine
    parameters in plenty of detail. See the SQLAlchemy engine
    configuration documentation.

    @ivar q: A property-generated reference to a threaded task queue
      that is dedicated to my database connection.

    @ivar connection: The current SQLAlchemy connection object, if
      any yet exists. Generated by my L{connect} method.

    @see: The SQLAlchemy engine configuration documentation.
    """
    # A single class-wide queue factory
    qFactory = queue.Factory()

    @classmethod
    def setup(cls, url, **kw):
        """
        Constructs a global queue for all instances of me, returning a
        deferred that fires with it.

        @param url: Passed, along with any keywords, to the
          C{setGlobal} method of my class-wide queue factory.
        """
        return cls.qFactory.setGlobal(url, **kw)
    def __init__(self, *args, **kw):
        """
        Constructs an instance of me, optionally specifying parameters for
        an SQLAlchemy engine object that serves this instance only.

        Any arguments and keywords are passed to my class-wide queue
        factory to obtain the queue (with attached engine) that I
        will use.
        """
        def firstAsTransaction():
            # Need to call the first transaction here rather than via
            # a regular 'transact' substitute call to avoid competing
            # for the lock
            with self.connection.begin():
                self.first()

        @defer.inlineCallbacks
        def startup(null):
            # Queue with attached engine, possibly shared with other
            # AccessBrokers
            self.q = yield self.qFactory(*args, **kw)
            # A connection of my very own
            self.connection = yield self.connect()
            # Pre-transaction startup, called in main loop after
            # connection made.
            yield defer.maybeDeferred(self.startup)
            # First transaction, called in thread
            yield self.q.deferToThread(firstAsTransaction)
            # Ready for regular transactions
            self.running = True
            # NOTE(review): the release of the startup lock was lost
            # from the file; everything that waits on the lock
            # (waitUntilRunning, @transact) needs it released once
            # startup is done -- confirm placement.
            self.lock.release()
            reactor.addSystemEventTrigger(
                'before', 'shutdown', self.shutdown)

        self.selects = {}
        self.rowProxies = []
        self.running = False
        # The deferred lock lets us easily wait until setup is done
        # and avoids running multiple transactions at once when they
        # aren't wanted.
        self.lock = asynqueue.DeferredLock()
        # NOTE(review): the statement that kicks off 'startup' was
        # lost from the file; restored as running it under the lock so
        # that waiters block until startup completes -- confirm.
        self.lock.acquire().addCallback(startup)

    @property
    def singleton(self):
        """
        C{True} if the name of my queue's engine is C{'sqlite'}.

        The value is computed on first access and cached from then
        on. If I have no queue or engine yet at that first access, the
        engine name is taken to be C{'sqlite'} and C{True} is what
        gets cached.
        """
        if not hasattr(self, '_singleton'):
            engine = getattr(getattr(self, 'q', None), 'engine', None)
            self._singleton = getattr(engine, 'name', 'sqlite') == 'sqlite'
        return self._singleton
    def connect(self):
        """
        Returns a C{Deferred} that fires with a connection from my
        queue's engine.

        If I don't have a queue yet, the connection isn't requested
        until L{waitUntilRunning} has fired.
        """
        def nowConnect(null):
            # NOTE(review): the body of this function was lost from
            # the file. Restored as obtaining a connection from the
            # engine via the queue; confirm both the queue method and
            # the engine method used.
            return self.q.call(self.q.engine.connect)

        if not getattr(self, 'q', None):
            return self.waitUntilRunning().addCallback(nowConnect)
        return nowConnect(None)

    @defer.inlineCallbacks
    def waitUntilRunning(self):
        """
        Returns a C{Deferred} that fires when the broker is running and
        ready for transactions.

        If I'm already running, the C{Deferred} fires immediately.
        """
        if not self.running:
            # The lock is only used here as a gate: get it, then
            # give it right back so that others can proceed.
            yield self.lock.acquire()
            self.lock.release()

    def callWhenRunning(self, f, *args, **kw):
        """
        Calls the I{f-args-kw} combo when the broker is running and ready
        for transactions.

        @return: The C{Deferred} from L{waitUntilRunning}, with the
          call to I{f} added as a callback, so that it fires with
          the result of the call.
        """
        def doCall(null):
            return f(*args, **kw)

        return self.waitUntilRunning().addCallback(doCall)

    @defer.inlineCallbacks
    def table(self, name, *cols, **kw):
        """
        Instantiates a new table object, creating it in the transaction
        thread as needed.

        One or more indexes other than the primary key can be defined
        via a keyword prefixed with I{index_} or I{unique_} and having
        the index name as the suffix. Use the I{unique_} prefix if the
        index is to be a unique one. The value of the keyword is a
        list or tuple containing the names of all columns in the
        index.

        The table object is set as an attribute of me under
        I{name}. Nothing is done if I already have an attribute of
        that name.

        @return: A C{Deferred} that fires when the table and any
          indexes have been dealt with.
        """
        def haveQueue():
            return getattr(self, 'q', None)

        def makeTable():
            if not hasattr(self, '_meta'):
                self._meta = SA.MetaData(self.q.engine)
            indexes = {}
            # Iterate over a copy of the keys, since the index
            # keywords are removed from kw along the way
            for key in list(kw):
                if key.startswith('index_'):
                    unique = False
                elif key.startswith('unique_'):
                    unique = True
                else:
                    # An ordinary keyword for the Table constructor
                    continue
                indexes[key] = kw.pop(key), unique
            kw.setdefault('useexisting', True)
            table = SA.Table(name, self._meta, *cols, **kw)
            # NOTE(review): the statement that creates the table in
            # the database was lost from the file; restored per the
            # "creating it ... as needed" contract -- confirm.
            table.create(checkfirst=True)
            setattr(self, name, table)
            return table, indexes

        def makeIndex(tableInfo):
            table, indexes = tableInfo
            for key, info in indexes.items():
                colNames, unique = info
                try:
                    # This is stupid. Why can't I see if the index
                    # already exists and only create it if needed?
                    index = SA.Index(
                        key,
                        *[getattr(table.c, x) for x in colNames],
                        **{'unique': unique})
                    index.create()
                except Exception:
                    # Deliberate best effort: the usual reason for
                    # failure is that the index already exists.
                    logging.getLogger(__name__).debug(
                        "Index '%s' not created", key, exc_info=True)

        if not hasattr(self, name):
            if not haveQueue():
                # This is tricky; startup hasn't finished, but making
                # a table is likely to be part of the startup. What we
                # really need to wait for is the presence of a queue.
                yield iteration.Delay(backoff=1.02).untilEvent(haveQueue)
            tableInfo = yield self.q.deferToThread(makeTable)
            yield self.q.deferToThread(makeIndex, tableInfo)
    def startup(self):
        """
        This method runs before the first transaction to start my
        synchronous task queue. B{Override it} to get whatever
        pre-transaction stuff you have run in the main loop before a
        database engine/connection is created.

        @return: This default implementation returns an already-fired
          C{Deferred}.
        """
        return defer.succeed(None)

    def first(self):
        """
        This method automatically runs as the first transaction after
        completion of L{startup}. B{Override it} to define table
        contents or whatever else you want as a first transaction that
        immediately follows your pre-transaction stuff.

        You don't need to decorate the method with C{@transact}, but
        it doesn't break anything if you do.

        This default implementation does nothing.
        """

    @defer.inlineCallbacks
    def shutdown(self, *null):
        """
        Shuts down my database transaction functionality and threaded task
        queue, returning a C{Deferred} that fires when all queued
        tasks are done and the shutdown is complete.

        If you call this before I have even started up for some
        reason, the C{Deferred} will not fire until my startup
        sequence has completed.

        Repeated calls after I've already shutdown will be rewarded
        with a C{Deferred} that fires immediately.
        """
        def closeConnection():
            conn = getattr(self, 'connection', None)
            if conn is not None:
                if hasattr(conn, 'connection'):
                    # Close the raw DBAPI connection rather than a
                    # proxied one. Does this actually make any
                    # difference?
                    conn = conn.connection
                conn.close()

        if self.running:
            # Regular shutdown, called after already running
            self._haveShutdown = True
            self.running = False
            if self.q.isRunning():
                with self.lock.context() as d:
                    yield d
                    # Calling this via the queue is a problem if the
                    # queue is shared and has been shut down. But
                    # calling it in the main thread seems to work
                    closeConnection()
            yield self.qFactory.kill(self.q)
        elif not getattr(self, '_haveShutdown', False):
            # Shutdown called before I have even started running, need
            # to wait for startup before initiating shutdown
            yield self.waitUntilRunning()
            yield self.shutdown()

    @defer.inlineCallbacks
    def handleResult(self, result, consumer=None, conn=None, asList=False, N=1):
        """
        Handles the result of a transaction or connection.execute. If it's
        a C{ResultsProxy} and possibly an implementor of C{IConsumer},
        returns a (deferred) instance of Deferator or couples your
        consumer to an IterationProducer.

        You can supply a connection to be closed after iterations are
        done with the keyword 'conn'. With the keyword I{asList}, you
        can force the rows of a C{ResultProxy} to be fetched (in my
        thread) and returned as a (deferred) list of rows.

        A I{result} without a true C{returns_rows} attribute is
        passed through unchanged.

        @keyword N: The number of rows fetched per iteration; see
          L{nextFromRP}.

        @return: A C{Deferred} that fires with the list of rows, the
          C{Deferator}, your consumer (once production to it has
          finished), an empty list for an empty or invalid result, or
          the original I{result}.
        """
        # Keep my own reference to what was passed in, because
        # 'result' gets rebound below and closing must act on the
        # original object.
        rp = result

        def close(null):
            if callable(getattr(rp, 'close', None)):
                rp.close()
            if conn:
                conn.close()

        if getattr(rp, 'returns_rows', False):
            # A ResultsProxy gets special handling
            if asList:
                # ...except with asList, when all its rows are fetched
                # in my thread and simply returned as a list
                result = yield self.q.deferToThread(rp.fetchall)
                close(None)
            else:
                pf = iteration.Prefetcherator(repr(rp))
                ok = yield pf.setup(
                    self.q.deferToThread, nextFromRP, rp, N)
                if ok:
                    dr = iteration.Deferator(pf)
                    if consumer:
                        # A consumer was supplied, so try to make an
                        # IterationProducer couple to it.
                        ip = iteration.IterationProducer(dr, consumer)
                        # We "wait" here for the iteration/production
                        # to finish. What actually happens is that the
                        # caller receives a deferred that fires when
                        # iteration/production is done. However, the
                        # consumer gets the iterations in the
                        # meantime.
                        yield ip.run()
                        close(None)
                        result = consumer
                    else:
                        # No consumer supplied, just "return" the Deferator
                        result = dr
                        # NOTE(review): the statement that hooked up
                        # close() for this case was lost from the
                        # file. It is attached here only if the
                        # Deferator exposes a done-iterating Deferred
                        # as 'd' -- confirm against AsynQueue.
                        doneIterating = getattr(dr, 'd', None)
                        if isinstance(doneIterating, defer.Deferred):
                            doneIterating.addCallback(close)
                else:
                    # Empty/invalid ResultsProxy, just "return" an empty list
                    close(None)
                    result = []
        defer.returnValue(result)
    def s(self, *args, **kw):
        """
        Polymorphic method for working with C{select} instances within a
        cached selection subcontext.

          - When called with a single argument (the select object's name
            as a string) and no keywords, this method indicates if the
            named select object already exists and sets its selection
            subcontext to I{name}.

          - With multiple arguments or any keywords, the method acts
            like a call to C{sqlalchemy.select(...).compile()}, except
            that nothing is returned. Instead, the resulting select
            object is stored in the current selection subcontext.

          - With no arguments or keywords, the method returns the select
            object for the current selection subcontext.

        Call from inside a transaction.
        """
        if kw or (len(args) > 1):
            # It's a compilation.
            context = getattr(self, 'context', None)
            self.selects[context] = SA.select(*args, **kw).compile()
        elif len(args) == 1:
            # It's a lookup to see if the select has been previously
            # seen and compiled; return True or False.
            self.context = args[0]
            return self.context in self.selects
        else:
            # It's a retrieval of a compiled selection object, keyed off
            # the most recently mentioned context.
            context = getattr(self, 'context', None)
            return self.selects.get(context)

    def select(self, *args, **kw):
        """
        Just returns an C{SQLAlchemy} select object, constructed from
        the supplied arguments and keywords. You do everything else
        with it.

        This is an immediate result, not a C{Deferred}.
        """
        return SA.select(*args, **kw)

    @contextmanager
    def selex(self, *args, **kw):
        """
        Supply columns as arguments and this method generates a select on
        the columns, yielding a placeholder object with the same
        attributes as the select object itself.

        Supply a callable as an argument (along with any of its args)
        and it yields a placeholder whose attributes are the same as
        the result of that call.

        In either case, you do stuff with the placeholder and call it
        to execute the connection with it. Supply the name of a
        resultsproxy method (and any of its args) to the call to get
        the result instead of the rp. Do all of this within the
        context of the placeholder::

          with <me>.selex(<table>.c.<column>) as sh:
              sh.where(<table>.c.<column> == "correct")
          for dRow in sh():
              row = yield dRow

        You can call this outside of a transaction and get a deferred
        result from calling the placeholder object. For such usage,
        you can specify the usual transaction keywords via keywords to
        this method.
        """
        kw['isNested'] = isNested(self)
        sh = SelectAndResultHolder(self, *args, **kw)
        # NOTE(review): lines are missing from this file; if the
        # holder needs cleanup when the with-block exits, that
        # statement is not visible here -- confirm.
        yield sh

    @defer.inlineCallbacks
    def selectorator(self, selectObj, consumer=None, de=None, N=1):
        """
        When called with a select object that results in an iterable
        C{ResultProxy} when executed, returns a deferred that fires
        with a C{Deferator} that can be iterated over deferreds, each
        of which fires with a successive row of the select's
        result.

        If you supply an C{IConsumer} with the I{consumer} keyword, I
        will couple an C{iteration.IterationProducer} to your consumer
        and run it. The returned C{Deferred} will fire (with a
        reference to your consumer) when the iterations are done, but
        you don't need to wait for that before doing another
        transaction.

        If you supply a C{Deferred} via the I{de} keyword, it will be
        fired (unless already fired for some reason) with C{None} when
        the select object has been executed, but before iterations
        have begun.

        If you want multiple rows produced at once using the
        C{fetchmany} method of the C{ResultProxy}, set I{N} to the
        number of rows you want at a time.

        Call directly, *not* from inside a transaction::

          dr = yield <me>.selectorator(<select>)
          for d in dr:
              row = yield d
              <proceed with row>

          d = <me>.selectorator(<select>, <consumer>)
          <do other stuff>
          consumer = yield d
        """
        yield self.waitUntilRunning()
        # A new connection just for this iteration, so that other
        # transactions can (hopefully) proceed while iteration is
        # happening.
        connection = yield self.connect()
        # NOTE(review): the call target on this line was lost from the
        # file; restored as executing the select on the new connection
        # via the queue -- confirm.
        rp = yield self.q.call(connection.execute, selectObj)
        if isinstance(de, defer.Deferred) and not de.called:
            de.callback(None)
        result = yield self.handleResult(
            rp, consumer=consumer, conn=connection, N=N)
        defer.returnValue(result)

    @transact
    def execute(self, *args, **kw):
        """
        Does a C{connection.execute(*args, **kw)} as a transaction, in my
        thread with my current connection, with all the usual handling
        of the C{ResultProxy}.

        The usual L{transact} keywords may be supplied.
        """
        return self.connection.execute(*args, **kw)

    def sql(self, sqlText, **kw):
        """
        Executes raw SQL as a transaction, in my thread with my current
        connection, with any rows of the result returned as a list.

        @param sqlText: The SQL, which is wrapped in an C{SA.text}
          object and passed to L{execute} along with any keywords.
        """
        kw['asList'] = True
        return self.execute(SA.text(sqlText), **kw)

    @defer.inlineCallbacks
    def produceRows(self, f, iterator, table, colName, **kw):
        """
        Calls the single-argument function I{f} repeatedly, with each
        value yielded from the supplied I{iterator}, to produce values
        for new rows in the specified I{table}. The named column
        I{colName} will have that value set for each row, in
        order. Other columns will be set to values that are constant
        across the rows, as specified via keywords.

        The function I{f} must not block, but it may return either an
        immediate or deferred result. It doesn't matter if some calls
        to I{f} take longer than others; the rows will be written in
        the same order as the B{input} values to I{f} are yielded from
        I{iterator}.

        B{Warning:} Needs unit testing.

        @return: A C{Deferred} that fires with a list of the primary
          key values for each row, in the same order as I{iterator},
          when all the values have been generated and written.
        """
        def insert(i):
            pkList = []
            # All of the rows are written in a single transaction
            with self.connection.begin():
                for value in i:
                    kw[colName] = value
                    rp = table.insert().execute(**kw)
                    # NOTE(review): the statement collecting each
                    # row's primary key was lost from the file;
                    # restored using the ResultProxy's
                    # 'inserted_primary_key' -- confirm.
                    pkList.append(rp.inserted_primary_key)
            return pkList

        p = threads.OrderedItemProducer()
        yield p.start(insert)
        for x in iterator:
            p.produceItem(f, x)
        result = yield p.stop()
        defer.returnValue(result)
    def deferToQueue(self, func, *args, **kw):
        """
        Dispatches I{func(*args, **kw)} as a task via my asynchronous
        queue once I am running, returning a C{Deferred} to its
        eventual result.

        Scheduling of the call is impacted by the I{niceness} keyword
        that can be included in I{**kw}. As with UNIX niceness, the
        value should be an integer where 0 is normal scheduling,
        negative numbers are higher priority, and positive numbers are
        lower priority.

        @keyword niceness: Scheduling niceness, an integer between -20
          and 20, with lower numbers having higher scheduling priority
          as in UNIX C{nice} and C{renice}.
        """
        def dispatch(null):
            # NOTE(review): the call target here was lost from the
            # file; restored as the queue's 'call' method, which is
            # what takes the niceness keyword -- confirm.
            return self.q.call(func, *args, **kw)

        return self.waitUntilRunning().addCallback(dispatch)

__all__ = ['transact', 'AccessBroker', 'SA']