Re: psycopg2: proper positioning of .commit() within try: except: blocks

Поиск
Список
Период
Сортировка
От Karsten Hilbert
Тема Re: psycopg2: proper positioning of .commit() within try: except: blocks
Дата
Msg-id Zty7ZCMwKQl4C4Id@hermes.hilbert.loc
обсуждение исходный текст
Ответ на Re: psycopg2: proper positioning of .commit() within try: except: blocks  (Adrian Klaver <adrian.klaver@aklaver.com>)
Ответы Re: psycopg2: proper positioning of .commit() within try: except: blocks
Список psycopg
Am Sat, Sep 07, 2024 at 01:03:34PM -0700 schrieb Adrian Klaver:

> In the case you show you are doing commit() before the close() so any errors in the
> transactions will show up then. My first thought would be to wrap the commit() in a
> try/except and deal with error there.

Right, and this was suggested elsewhere ;)

And, yeah, the actual code is much more involved :-D

#------------------------------------------------------------------------
def __safely_close_cursor_and_rollback_close_conn(close_cursor=None, rollback_tx=None, close_conn=None):
    """Best-effort teardown: close cursor, roll back, close connection.

    Each argument is an optional zero-argument callable; falsy ones
    are skipped. The steps always run in the order cursor ->
    rollback -> connection. A PG_ERROR_EXCEPTION raised by one step
    is logged and does not keep the remaining steps from running.
    Any other exception type propagates to the caller.
    """
    teardown_steps = (
        (close_cursor, 'cannot close cursor'),
        # need to rollback so ABORT state isn't retained in pooled connections
        (rollback_tx, 'cannot rollback transaction'),
        (close_conn, 'cannot close connection'),
    )
    for step, failure_msg in teardown_steps:
        if not step:
            continue
        try:
            step()
        except PG_ERROR_EXCEPTION as pg_exc:
            _log.exception(failure_msg)
            gmConnectionPool.log_pg_exception_details(pg_exc)

#------------------------------------------------------------------------
def run_rw_queries (
    link_obj:_TLnkObj=None,
    queries:_TQueries=None,
    end_tx:bool=False,
    return_data:bool|None=None,
    get_col_idx:bool=False,
    verbose:bool=False
) -> tuple[list[dbapi.extras.DictRow] | None, dict[str, int] | None]:
    """Convenience function for running read-write queries.

    Typically (part of) a transaction.

    Args:
        link_obj: None, cursor, connection
        queries:

        * a list of dicts [{'cmd': <string>, 'args': <dict> or <tuple>}]
        * 'args' is optional, a missing key means "no arguments"
        * to be executed as a single transaction
        * the last query may usefully return rows, such as:

            SELECT currval('some_sequence');
                or
            INSERT/UPDATE ... RETURNING some_value;

        end_tx:

        * controls whether the transaction is finalized (eg.
          COMMITted/ROLLed BACK) or not, this allows the
          call to run_rw_queries() to be part of a framing
          transaction
        * if link_obj is a *connection* then "end_tx" will
          default to False unless it is explicitly set to
          True which is taken to mean "yes, you do have full
          control over the transaction" in which case the
          transaction is properly finalized
        * if link_obj is a *cursor* we CANNOT finalize the
          transaction because we would need the connection for that
        * if link_obj is *None* "end_tx" will, of course, always
          be True, because we always have full control over the
          connection, not ending the transaction would be pointless

        return_data:

        * if true, the returned data will include the rows
            the last query selected
        * if false, it returns None instead

        get_col_idx:

        * True: the returned tuple will include a dictionary
            mapping field names to column positions
            (only evaluated when "return_data" is true)
        * False: the returned tuple includes None instead of a field mapping dictionary

        verbose: if true, the cursor state is logged after each successful query

    Returns:

        * (None, None) if "return_data" is false
        * ("fetchall() result", <index>) if "return_data" is true

        * for *index* see "get_col_idx"

    Raises:

        * ValueError if link_obj is neither None, a cursor, nor a connection
        * gmExceptions.AccessDenied if a query fails with INSUFFICIENT_PRIVILEGE
        * any other exception raised while executing, fetching, or
          committing is re-raised after cleanup; cleanup only touches
          what this function controls (see "end_tx")
    """
    assert queries is not None, '<queries> must not be None'

    def _noop(*_ignored):
        return None

    if link_obj is None:
        conn = get_connection(readonly = False)
        curs = conn.cursor()
        conn_close = conn.close
        tx_commit = conn.commit
        tx_rollback = conn.rollback
        curs_close = curs.close
        notices_accessor = conn
    else:
        # we do not own the connection: by default touch neither
        # the transaction nor the connection nor a passed-in cursor
        conn_close = _noop
        tx_commit = _noop
        tx_rollback = _noop
        curs_close = _noop
        if isinstance(link_obj, dbapi._psycopg.cursor):
            curs = link_obj
            notices_accessor = curs.connection
        elif isinstance(link_obj, dbapi._psycopg.connection):
            if end_tx:
                tx_commit = link_obj.commit
                tx_rollback = link_obj.rollback
            curs = link_obj.cursor()
            curs_close = curs.close
            notices_accessor = link_obj
        else:
            raise ValueError('link_obj must be cursor, connection or None but not [%s]' % link_obj)

    def _flush_notices():
        # log pending notices with newlines folded into '/', then clear them
        for notice in notices_accessor.notices:
            _log.debug(notice.replace('\n', '/'))
        del notices_accessor.notices[:]

    def _abort():
        # best-effort: close cursor, roll back, close connection
        __safely_close_cursor_and_rollback_close_conn (
            curs_close,
            tx_rollback,
            conn_close
        )

    for query in queries:
        args = query.get('args')
        try:
            curs.execute(query['cmd'], args)
            if verbose:
                gmConnectionPool.log_cursor_state(curs)
            _flush_notices()
        # DB related exceptions
        except dbapi.Error as pg_exc:
            _log.error('query failed in RW connection')
            gmConnectionPool.log_pg_exception_details(pg_exc)
            _flush_notices()
            _abort()
            # privilege problem ?
            if pg_exc.pgcode == PG_error_codes.INSUFFICIENT_PRIVILEGE:
                details = 'Query: [%s]' % curs.query.decode(errors = 'replace').strip()
                # truthiness test also tolerates a statusmessage of None
                if curs.statusmessage:
                    details = 'Status: %s\n%s' % (
                        curs.statusmessage.strip(),
                        details
                    )
                if pg_exc.pgerror is None:
                    msg = '[%s]' % pg_exc.pgcode
                else:
                    msg = '[%s]: %s' % (pg_exc.pgcode, pg_exc.pgerror)
                raise gmExceptions.AccessDenied (
                    msg,
                    source = 'PostgreSQL',
                    code = pg_exc.pgcode,
                    details = details
                ) from pg_exc

            # other DB problem
            gmLog2.log_stack_trace()
            raise

        # other exception
        except Exception:
            _log.exception('error running query in RW connection')
            gmConnectionPool.log_cursor_state(curs)
            _flush_notices()
            gmLog2.log_stack_trace()
            _abort()
            raise

    data = None
    col_idx = None
    if return_data:
        try:
            data = curs.fetchall()
            if get_col_idx:
                col_idx = get_col_indices(curs)
        except Exception:
            _log.exception('error fetching data from RW query')
            gmLog2.log_stack_trace()
            _abort()
            raise

    # Errors can surface as late as COMMIT, so a failure here must
    # still roll back and release the connection rather than leave
    # it open or in ABORT state.
    try:
        curs_close()
        tx_commit()
    except Exception as exc:
        _log.exception('cannot finalize transaction in RW connection')
        if isinstance(exc, dbapi.Error):
            gmConnectionPool.log_pg_exception_details(exc)
        _abort()
        raise

    conn_close()
    return (data, col_idx)

#------------------------------------------------------------------------


Best,
Karsten
--
GPG  40BE 5B0E C98E 1713 AFA6  5BC0 3BEA AC80 7D4F C89B



В списке psycopg по дате отправления: