Source code for ae.db_core

"""
database connection and data manipulation base classes
======================================================

This module provides generic base classes and helper methods to implement and integrate
any database driver that is compatible with the :pep:`Python DB API <249>`. The base classes
already provide all attributes and methods needed to handle database connections,
data selections and manipulations.

Additionally, this package defines generalized interfaces that let you switch dynamically
between databases without adapting the code of your application - simply by selecting
another database connector class within your :ref:`configuration files <config-files>`.


basic usage
-----------

The abstract base class :class:`DbBase` allows you to implement a db-specific class with only a few
lines of code. Simply inherit from :class:`DbBase` and implement a :meth:`~DbBase.connect`
method that sets the :attr:`~DbBase.conn` instance attribute to a
:pep:`Python DB API <249>` compatible connection object.

For a fictitious database driver package `SpecificDbDriver` that provides a `connect`
method, a minimal example would look like::

    from ae.db_core import DbBase
    ...
    class SpecificDb(DbBase):
        def connect(self) -> str:
            self.last_err_msg = ''
            try:
                # pass the merged credentials/features as keyword arguments to the driver
                self.conn = SpecificDbDriver.connect(**self.connect_params())
            except Exception as ex:
                self.last_err_msg = f"SpecificDb-connect() error: {ex} for {self}"
            else:
                # only create the cursor if the connection could be established
                self.create_cursor()
            return self.last_err_msg

The helper method :meth:`~DbBase.connect_params` provided by this module helps you merge
any specified database system credentials and features into a dict that is passed as connection parameters
to the database-specific connection class/function/factory of the database driver.
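
As a rough illustration of that merge, the following standalone sketch mimics the behaviour without
importing this module. The function name ``merge_connect_params`` is hypothetical, ``ast.literal_eval`` is
only an assumed stand-in for the literal evaluation done by this module, and falling back to the raw
string for non-literal values is likewise an assumption:

```python
import ast
from typing import Any, Dict, List


def merge_connect_params(credentials: Dict[str, str], features: List[str]) -> Dict[str, Any]:
    # hypothetical helper: credentials are copied first, with lower-cased keys
    params: Dict[str, Any] = {key.lower(): val for key, val in credentials.items()}
    for feat in features:
        if not feat:
            continue                    # skip empty feature strings
        name, sep, literal = feat.partition('=')
        if sep:
            try:
                # assumption: evaluate the feature value as a Python literal
                value: Any = ast.literal_eval(literal)
            except (ValueError, SyntaxError):
                value = literal         # assumption: keep non-literals as plain string
        else:
            value = True                # a feature without a value is a boolean flag
        params[name.lower()] = value
    return params


params = merge_connect_params(
    {'User': 'app_user', 'password': 'secret'},
    ['autocommit', 'connect_timeout=10', "sslmode='require'"],
)
print(params)
```

Credential keys are lower-cased, a feature without a value becomes ``True`` and a feature with a value
is evaluated, so ``connect_timeout`` ends up as an int rather than a string.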

Another helper :meth:`~DbBase.create_cursor` provided by :class:`DbBase` can be used in the
:meth:`~DbBase.connect` method to create a Python DB API-compatible cursor object and
assign it to the :attr:`~DbBase.curs` attribute.

With this implementation, your class ``SpecificDb`` also automatically inherits useful methods from :class:`DbBase`
to execute :meth:`INSERT <DbBase.insert>`, :meth:`DELETE <DbBase.delete>`,
:meth:`UPDATE <DbBase.update>` and :meth:`UPSERT <DbBase.upsert>` commands,
to run :meth:`SELECT <DbBase.select>` queries and
to call :meth:`stored procedures <DbBase.call_proc>`.
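
To give a feel for the kind of statement these methods send to the driver, here is a minimal sketch that
builds an INSERT with named bind variables and a SELECT filtered through a ``CV_``-prefixed bind variable.
It runs against an in-memory SQLite database from the standard library, chosen here only because its driver
accepts the named parameter style. The helper ``build_insert_sql`` and the table ``clients`` are made up
for this illustration and are not part of this module:

```python
import sqlite3
from typing import Any, Dict


def build_insert_sql(table_name: str, col_values: Dict[str, Any]) -> str:
    # hypothetical helper: one named bind variable (":column") per column
    cols = ", ".join(col_values)
    binds = ", ".join(":" + col for col in col_values)
    return f"INSERT INTO {table_name} ({cols}) VALUES ({binds})"


conn = sqlite3.connect(":memory:")
curs = conn.cursor()
curs.execute("CREATE TABLE clients (id INTEGER, name TEXT)")

row = {'id': 1, 'name': 'Ada'}
insert_sql = build_insert_sql('clients', row)
curs.execute(insert_sql, row)           # the column dict doubles as bind variables dict
conn.commit()

# filter values get a separate, prefixed bind variable name
curs.execute("SELECT id, name FROM clients WHERE id = :CV_id", {'CV_id': 1})
rows = curs.fetchall()
conn.close()
print(insert_sql, rows)
```

Keeping the filter value under a prefixed name lets the same column appear with a new value in a SET
clause and with its old value in the WHERE clause.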

.. hint::
    Examples of specific database class implementations are the portion packages
    :mod:`ae.db_pg` for Postgres and :mod:`ae.db_ora` for Oracle.


connection parameters
---------------------

:class:`DbBase` supports the following connection parameter keys:

    * **user** : user name or database account name.
    * **password** : user account password.
    * **dbname** : name of the database to connect to.
    * **host** : host name or IP address of the database server.
    * **port** : port number of the database server.

Some database drivers use default values for most of these connection parameters
if they are not specified. The mandatory connection parameters differ between database
drivers. Most of them need at least a user name and password.

.. hint::
    Most database drivers, like `psycopg2 <http://initd.org/psycopg/docs/module.html>`_ or
    `pg8000 <https://kite.com/python/docs/pg8000.connect>`_ for Postgres databases,
    support all of these connection parameter keys. For others, like
    `cx_Oracle <https://cx-oracle.readthedocs.io/en/7.2/module.html>`_, which do not support
    some of them (e.g. `host` and `dbname`), you may also need to override the
    :meth:`~DbBase.connect_params` method to convert/adapt unsupported connection parameter keys.

Alternatively you can provide the connection parameters needed by the database driver connector
with a single database url string in the
`SQLAlchemy Database URL format <https://docs.sqlalchemy.org/en/13/core/engines.html#database-urls>`_::

    dialect+driver://user:password@host:port/dbname

So if your connection parameters dict (that is, your database system credentials and features)
provides a `url` key, then :meth:`~DbBase.connect_params` will parse the URL and split it into
separate connection parameters.

.. note::
    When you specify a connection parameter as a separate key and also within the `url` value,
    the separate key value takes precedence. The scheme part (dialect+driver), including
    the following colon character of the URL string, is not evaluated and can be omitted.
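
The splitting and the precedence rule can be sketched with the standard library alone. The function name
``split_db_url`` and the example host, account and database names are hypothetical:

```python
from typing import Any, Dict
from urllib.parse import urlparse


def split_db_url(conn_params: Dict[str, Any]) -> Dict[str, Any]:
    # hypothetical helper: work on a copy and take the url key out of it
    params = dict(conn_params)
    url = params.pop('url', None)
    if url:
        parts = urlparse(url)
        from_url = {
            'user': parts.username,
            'password': parts.password,
            'host': parts.hostname,
            'port': parts.port,
            'dbname': parts.path[1:],   # drop the leading slash of the path
        }
        for key, val in from_url.items():
            if val and key not in params:   # separately specified keys win
                params[key] = val
    return params


full = split_db_url({
    'url': 'postgresql+psycopg2://app_user:secret@db.example.com:5432/sales',
    'user': 'admin',
})
no_scheme = split_db_url({'url': '//app_user:secret@localhost/sales'})
print(full)
print(no_scheme)
```

In the first call the separately passed ``user`` value is kept and the URL only supplies the missing keys.
The second call shows that a URL without the scheme part is still split into its components.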


bind variables format
---------------------

Bind variables in your sql query strings have to be formatted in the
:pep:`named parameter style <249#paramstyle>`. If your database driver only supports the
`pyformat` parameter style, then override the ``__init__`` method and set the :attr:`~DbBase.param_style`
attribute to ``'pyformat'``. All sql query strings will then be converted automatically by :class:`DbBase`
before they get sent to the database::

    class SpecificDb(DbBase):
        def __init__(self, system):
            super().__init__(system)
            self.param_style = 'pyformat'
        ...
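
The conversion itself is a plain text replacement per bind variable name. A minimal standalone sketch of
the idea, where the function name ``named_to_pyformat`` and the example query are hypothetical:

```python
from typing import Any, Dict


def named_to_pyformat(sql: str, bind_vars: Dict[str, Any]) -> str:
    # hypothetical helper: replace each ":name" with "%(name)s"
    for name in bind_vars:
        sql = sql.replace(':' + name, '%(' + name + ')s')
    return sql


converted = named_to_pyformat(
    "SELECT name FROM clients WHERE id = :id AND city = :city",
    {'id': 42, 'city': 'Madrid'},
)
print(converted)
```

Because this is a simple string replacement, a bind variable name that is a prefix of another one
(like ``id`` and ``id_2``) can be converted incorrectly, so distinct names are the safer choice.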

"""
from abc import ABC, abstractmethod
from copy import deepcopy
from typing import Any, Dict, List, Optional, Sequence, Tuple
import urllib.parse

from ae.dynamicod import try_eval                               # type: ignore  # for mypy
from ae.lockname import NamedLocks                              # type: ignore  # for mypy
from ae.sys_core import SystemBase, SystemConnectorBase         # type: ignore  # for mypy


__version__ = '0.3.15'


NAMED_BIND_VAR_PREFIX: str = ':'        #: character to mark a bind variable in a sql query
CHK_BIND_VAR_PREFIX: str = 'CV_'
""" bind variable name prefix, to allow for same column/-name a new/separate value in e.g. the SET clause and
an old value in the WHERE clause. Gets added to bind variables in filters/chk_values and extra where clauses. """


def connect_args_from_params(conn_params: Dict[str, Any]) -> Tuple[Dict[str, str], List[str]]:
    """ split dict with database connection parameters into credentials and features.

    :param conn_params:     connection params dict.
    :return:                tuple of credentials dict and features list.
    """
    credentials = {}
    features = []
    for arg_name, arg_val in conn_params.items():
        if isinstance(arg_val, str):
            credentials[arg_name] = arg_val
        elif not isinstance(arg_val, bool):
            features.append(f"{arg_name}={arg_val!r}")
        elif arg_val:
            features.append(arg_name)
    return credentials, features


def _normalize_col_values(col_values: Dict[str, Any]) -> Dict[str, Any]:
    """ convert empty strings into real None values.

    :param col_values:  dict of column values (dict key is the column name).
    :return:            col_values dict where empty values got replaced with None (also in original dict).
    """
    for key, val in col_values.items():
        if isinstance(val, str) and not val:
            col_values[key] = None
    return col_values


def _prepare_in_clause(sql: str, bind_vars: Optional[Dict[str, Any]] = None,
                       additional_col_values: Optional[Dict[str, Any]] = None) -> Tuple[str, Dict[str, Any]]:
    """ replace list bind variables used in an IN clause with separate bind variables for each list item.

    :param sql:                     query to be executed.
    :param bind_vars:               dict of all available bind variables for the execution.
    :param additional_col_values:   additional bind variables.
    :return:                        tuple of adapted query string and joined bind variables.
    """
    new_bind_vars = deepcopy(additional_col_values or {})
    if bind_vars:
        for key, val in bind_vars.items():
            if isinstance(val, list):   # expand IN clause bind list variable to separate bind variables
                var_list = [key + '_' + str(_) for _ in range(len(val))]
                in_vars = ','.join([NAMED_BIND_VAR_PREFIX + c for c in var_list])
                sql = sql.replace(NAMED_BIND_VAR_PREFIX + key, in_vars)
                for var_val in zip(var_list, val):
                    new_bind_vars[var_val[0]] = var_val[1]
            else:
                new_bind_vars[key] = val
    return sql, new_bind_vars


def _rebind(chk_values: Optional[Dict[str, Any]] = None, where_group_order: str = "",
            bind_vars: Optional[Dict[str, Any]] = None, extra_bind: Optional[Dict[str, Any]] = None
            ) -> Tuple[Optional[Dict[str, Any]], str, Dict[str, Any]]:
    """ merge where_group_order string with chk_values filter dict and merge/rename bind variables.

    :param chk_values:          dict with column_name: value items, used to filter/restrict the resulting rows.

                                This method compiles this dict into a sql WHERE clause expression. If you also
                                passed additional sql clauses into the :paramref:`~_rebind.where_group_order`
                                then it will be merged with the compiled expression. The names of the sql
                                parameters and related bind variables are built from the column names/keys
                                of this dict, and will be prefixed with :data:`CHK_BIND_VAR_PREFIX`.

                                Passing None or an empty dict to this and to the :paramref:`~_rebind.extra_bind`
                                arguments will disable any filtering. If only this argument is None/empty
                                then the first item of the :paramref:`~_rebind.extra_bind` dict will be used
                                as filter.
    :param where_group_order:   sql part with optional WHERE/GROUP/ORDER clauses (the part after the WHERE),
                                including bind variables in the :ref:`named parameter style <sql-parameter-style>`.
    :param bind_vars:           dict with bind variables (variable name has to be prefixed
                                in :paramref:`~_rebind.where_group_order` with :data:`CHK_BIND_VAR_PREFIX`).
    :param extra_bind:          additional dict with bind variables (variable name has NOT to be prefixed/adapted
                                in :paramref:`~_rebind.where_group_order` argument).
    :return:                    tuple of corrected/rebound values of chk_values, where_group_order and bind_vars.
    """
    rebound_vars: Dict[str, Any] = {}   # use new instance to not change callers bind_vars dict

    if extra_bind is not None:
        rebound_vars.update(extra_bind)
        if not chk_values:
            def_pkey: list = [next(iter(extra_bind.items()))]   # use first dict item as pkey check value
            chk_values = dict(def_pkey)     # mypy+PyCharm: merging this with previous code line shows type error

    if chk_values:
        rebound_vars.update({CHK_BIND_VAR_PREFIX + k: v for k, v in chk_values.items()})
        extra_where = " AND ".join([f"{k} = {NAMED_BIND_VAR_PREFIX}{CHK_BIND_VAR_PREFIX}{k}"
                                    for k in chk_values.keys()])
        if not where_group_order:
            where_group_order = extra_where
        elif where_group_order.upper().startswith(('GROUP BY', 'ORDER BY')):
            where_group_order = f"{extra_where} {where_group_order}"
        else:
            where_group_order = f"({extra_where}) AND {where_group_order}"

    if not where_group_order:
        where_group_order = '1=1'

    if bind_vars:
        rebound_vars.update({CHK_BIND_VAR_PREFIX + k: v for k, v in bind_vars.items()})

    return chk_values, where_group_order, rebound_vars


class DbBase(SystemConnectorBase, ABC):
    """ abstract database connector base class for the `ae` namespace database system layers.

    This class inherits from :class:`~ae.sys_core.SystemConnectorBase` the following attributes:

    **system**: instance of the related :class:`ae.sys_core.SystemBase` class.

    **console_app**: instance of the :class:`application <ae.console_app.ConsoleApp>` using this database system.

    **last_err_msg**: the error message string of the last error or an empty string if no error occurred in the
    last database action/operation.
    """
    def __init__(self, system: SystemBase):
        """ create instance of generic database object (base class for real database like e.g. postgres or oracle).

        :param system:  :class:`~ae.sys_core.SystemBase` instance. Providing useful attributes, like e.g.:

                        **credentials**: dict with database driver specific account credentials.

                        **features**: list of features. Features will also be passed as connection parameters
                        to the database driver. The main differences to credentials are that a feature without
                        any value will be interpreted as boolean True and that features can have any value
                        that can be specified as a literal (like int or even datetime).

                        **console_app**: instance of the :class:`application <ae.console_app.ConsoleApp>`
                        using this database system.
        """
        super().__init__(system)            # init self.system and self.console_app

        self.conn = None                    #: database driver connection
        self.curs = None                    #: database driver cursor
        self.param_style: str = 'named'     #: database driver bind variable/parameter style

    @abstractmethod
    def connect(self) -> str:
        """ sub-class has to implement this connect method """

    def _adapt_sql(self, sql: str, bind_vars: Dict[str, Any]) -> str:
        """ replace the parameter style of bind variables from `named` into `pyformat`.

        :param sql:         query to scan for named bind variables.
        :param bind_vars:   dict of all available bind variables.
        :return:            adapted query string.

        .. _sql-parameter-style:

        .. note::
            For database drivers - like psycopg2 - that support only the `pyformat` parameter style syntax
            (in the format ``%(bind_var)s``) the sql query string will be adapted, by converting all bind
            variables from the parameter style `named` into `pyformat`.

            The returned query will be unchanged for all other database drivers (that are directly supporting
            the `named` parameter style).
        """
        new_sql = sql
        if self.param_style == 'pyformat':
            for key in bind_vars.keys():
                new_sql = new_sql.replace(NAMED_BIND_VAR_PREFIX + key, '%(' + key + ')s')
        return new_sql

    def call_proc(self, proc_name: str, proc_args: Sequence, ret_dict: Optional[Dict[str, Any]] = None) -> str:
        """ execute stored procedure on database server.

        :param proc_name:   name of the stored procedure.
        :param proc_args:   tuple of parameters/arguments passed to the stored procedure.
        :param ret_dict:    optional dict - if passed then the dict item with the key `return` will be set/updated
                            to the value returned from the stored procedure/database.
        :return:            empty string if no error occurred else the error message.
        """
        self.last_err_msg = ""
        try:
            assert self.curs is not None, f"call_proc(): cursor is not initialized for {self}"    # mypy
            ret = self.curs.callproc(proc_name, proc_args)
            if ret_dict is not None:
                ret_dict['return'] = ret
        except Exception as ex:
            self.last_err_msg = f"call_proc() error: {ex} for {self}"
        return self.last_err_msg

    def close(self, commit: bool = True) -> str:
        """ close the connection to the database driver and server.

        :param commit:  pass False to prevent commit (and also execute rollback) before closure of connection.
        :return:        empty string if no error occurred else the error message.
        """
        self.last_err_msg = ""
        if self.conn:
            if commit:
                self.last_err_msg = self.commit()
            else:
                self.last_err_msg = self.rollback()
            try:
                if self.curs:
                    self.curs.close()
                    self.curs = None
                    self.console_app.dpo(f"close(): cursor closed for {self}")
                self.conn.close()
                self.conn = None
                self.console_app.dpo(f"close(): connection closed for {self}")
            except Exception as ex:
                self.last_err_msg += f"close() error: {ex} for {self}"
        return self.last_err_msg

    def connect_params(self) -> Dict[str, Any]:
        """ merges self.system.credentials with self.system.features into a database driver connection parameters dict.

        :return:    new dict with the items from self.system.credentials and then extended with the entries of
                    self.system.features. Credential values are always of type str, only features can be of any
                    type. Features without a specified value are set True. All keys of the returned dict
                    will be in lower-case.
        """
        conn_dict = {k.lower(): v for k, v in self.system.credentials.items()}

        for feat in [_ for _ in self.system.features if _]:
            key_val = feat.split('=', maxsplit=1)
            conn_dict[key_val[0].lower()] = try_eval(key_val[1]) if len(key_val) > 1 else True

        if 'url' in conn_dict:
            url_parts = urllib.parse.urlparse(conn_dict.pop('url'))
            if 'user' not in conn_dict and url_parts.username:
                conn_dict['user'] = url_parts.username
            if 'password' not in conn_dict and url_parts.password:
                conn_dict['password'] = url_parts.password
            if 'host' not in conn_dict and url_parts.hostname:
                conn_dict['host'] = url_parts.hostname
            if 'port' not in conn_dict and url_parts.port:
                conn_dict['port'] = url_parts.port
            if 'dbname' not in conn_dict and url_parts.path:
                conn_dict['dbname'] = url_parts.path[1:]    # remove leading slash character (path root)

        return conn_dict

    def create_cursor(self):
        """ allow sub-class to create Python DB API-conform database driver cursor """
        try:
            self.curs = self.conn.cursor()
            self.console_app.dpo(f"create_cursor(): database cursor created for {self}")
        except Exception as ex:
            self.last_err_msg = f"create_cursor() error: {ex} for {self}"

    def cursor_description(self) -> Optional[str]:
        """ return description text if opened cursor or None if cursor is closed or not yet opened. """
        if self.curs is not None:
            return self.curs.description
        return None

    def fetch_all(self) -> Sequence[Tuple]:
        """ fetch all the rows found from the last executed SELECT query.

        :return:    empty list on error or if query result is empty, else a list of database rows.
        """
        self.last_err_msg = ""
        rows: Sequence = []
        try:
            assert self.curs is not None, f"fetch_all(): cursor is not initialized for {self}"    # mypy
            rows = self.curs.fetchall()
            self.console_app.dpo(f"fetch_all(), 1st of {len(rows)} recs: {rows[:1]} for {self}")
        except Exception as ex:
            self.last_err_msg = f"fetch_all() exception: {ex} for {self}"
            self.console_app.po(self.last_err_msg)
        return rows

    def fetch_value(self, col_idx: int = 0) -> Any:
        """ fetch the value of a column of the first/next row of the found rows of the last SELECT query.

        :param col_idx:     index of the column with the value to fetch and return.
        :return:            value of the column at index :paramref:`~.fetch_value.col_idx`.
        """
        self.last_err_msg = ""
        val = None
        try:
            assert self.curs is not None, f"fetch_value(): cursor is not initialized for {self}"    # mypy
            values = self.curs.fetchone()
            if values:
                val = values[col_idx]
            self.console_app.dpo(f"fetch_value() retrieved values: {values}[{col_idx}]={val!r} for {self}")
        except Exception as ex:
            assert self.curs is not None, f"fetch_value()-EXCEPT: cursor is not initialized for {self}"
            self.last_err_msg = \
                f"fetch_value()[{col_idx}] exception: {ex}; status message={self.curs.statusmessage} for {self}"
            self.console_app.po(self.last_err_msg)
        return val

    def execute_sql(self, sql: str, commit: bool = False, bind_vars: Optional[Dict[str, Any]] = None) -> str:
        """ execute sql query with optional bind variables.

        :param sql:         sql query to execute.
        :param commit:      pass True to execute a COMMIT command directly after the query execution.
        :param bind_vars:   optional dict with bind variables.
        :return:            empty string if no error occurred else the error message.
        """
        action = sql.split()[0]
        if action.startswith(('--', '/*')):
            action = 'SCRIPT'
        elif action.upper().startswith('CREATE'):
            action += ' ' + sql.split()[1]

        if self.conn or not self.connect():     # lazy connection
            self.last_err_msg = ""
            sql, bind_vars = _prepare_in_clause(sql, bind_vars)
            sql = self._adapt_sql(sql, bind_vars)
            try:
                assert self.curs is not None, f"execute_sql(): cursor is not initialized for {self}"    # mypy
                if bind_vars:
                    self.curs.execute(sql, bind_vars)
                else:
                    # if no bind vars then call without to prevent error "'dict' object does not support indexing"
                    # .. in scripts with the % char (like e.g. dba_create_audit.sql)
                    self.curs.execute(sql)
                assert self.conn is not None, f"execute_sql(): connection is not initialized for {self}"    # mypy
                if commit:
                    self.conn.commit()
                self.console_app.dpo(f"execute_sql({sql}, {bind_vars}) {action} rows={self.curs.rowcount}"
                                     f" desc:{self.curs.description} for {self}")
            except Exception as ex:
                self.last_err_msg += f"execute_sql() {action} error={ex}; {sql}, {bind_vars} for {self}"

        if self.last_err_msg and self.console_app.debug:
            self.console_app.po(self.last_err_msg)

        return self.last_err_msg

    def delete(self, table_name: str, chk_values: Optional[Dict[str, Any]] = None, where_group_order: str = '',
               bind_vars: Optional[Dict[str, Any]] = None, commit: bool = False) -> str:
        """ execute a DELETE command against a table.

        :param table_name:          name of the database table.
        :param chk_values:          dict of column names/values to identify the record(s) to delete.
        :param where_group_order:   extra sql added after the WHERE clause (merged with chk_values by
                                    :meth:`._rebind`). This string can include additional WHERE expressions
                                    with extra bind variables.
        :param bind_vars:           dict of extra bind variables (key=variable name, value=value).
        :param commit:              bool value to specify if commit should be done. Pass True to commit.
        :return:                    last error message or empty string if no errors occurred.
        """
        chk_values, where_group_order, bind_vars = _rebind(chk_values, where_group_order, bind_vars)
        sql = f"DELETE FROM {table_name} WHERE {where_group_order}"

        assert chk_values is not None, f"delete(): chk_values cannot be None for {self}"    # mypy
        with self.thread_lock_init(table_name, chk_values):
            self.execute_sql(sql, commit=commit, bind_vars=bind_vars)

        return self.last_err_msg

    def insert(self, table_name: str, col_values: Dict[str, Any], returning_column: str = '', commit: bool = False
               ) -> str:
        """ execute an INSERT command to add one record to a database table.

        :param table_name:          name of the database table.
        :param col_values:          dict of inserted column values with the column name as key.
        :param returning_column:    name of column which value will be returned by next fetch_all/fetch_value() call.
        :param commit:              bool value to specify if commit should be done. Pass True to commit.
        :return:                    last error message or empty string if no errors occurred.
        """
        _normalize_col_values(col_values)
        sql = f"INSERT INTO {table_name} (" + ", ".join(col_values.keys()) \
              + ") VALUES (" + ", ".join([NAMED_BIND_VAR_PREFIX + c for c in col_values.keys()]) + ")"
        if returning_column:
            sql += " RETURNING " + returning_column
        return self.execute_sql(sql, commit=commit, bind_vars=col_values)

    def select(self, from_join: str = "", cols: Sequence[str] = (), chk_values: Optional[Dict[str, Any]] = None,
               where_group_order: str = '', bind_vars: Optional[Dict[str, Any]] = None, hints: str = '') -> str:
        """ execute a SELECT query against a database table.

        :param from_join:           name(s) of the involved database table(s), optional with JOIN clause(s).
                                    Passing an empty string results in a SELECT statement without the FROM keyword.
        :param cols:                sequence of the column names that will be selected and included in the
                                    resulting data-rows.
        :param chk_values:          dict of column names/values to identify selected record(s).
        :param where_group_order:   extra sql added after the WHERE clause (merged with chk_values by
                                    :meth:`._rebind`). This string can include additional WHERE expressions
                                    with extra bind variables, ORDER BY and GROUP BY expressions.
                                    These special/extra bind variables have to be specified in the
                                    :paramref:`~.select.bind_vars` argument and have to be prefixed with the
                                    string `'CV_'` in the WHERE clause (see also the
                                    :data:`CHK_BIND_VAR_PREFIX` data constant in this module).
        :param bind_vars:           dict of extra bind variables (key=variable name, value=value).
        :param hints:               optional SELECT optimization hint string.
        :return:                    last error message or empty string if no errors occurred.
                                    Use the methods :meth:`.fetch_all` or :meth:`.fetch_value`
                                    to retrieve the resulting data-rows.
        """
        if not cols:
            cols = list('*')

        chk_values, where_group_order, bind_vars = _rebind(chk_values, where_group_order, bind_vars)
        if from_join:
            from_join = " FROM " + from_join
        sql = f"SELECT {hints} {','.join(cols)}{from_join} WHERE {where_group_order}"

        return self.execute_sql(sql, bind_vars=bind_vars)

    def update(self, table_name: str, col_values: Dict[str, Any], chk_values: Optional[Dict[str, Any]] = None,
               where_group_order: str = '', bind_vars: Optional[Dict[str, Any]] = None,
               commit: bool = False, locked_cols: Sequence[str] = ()) -> str:
        """ execute an UPDATE command against a database table.

        :param table_name:          name of the database table.
        :param col_values:          dict of inserted/updated column values with the column name as key.
        :param chk_values:          dict of column names/values to identify affected record(s).
                                    If not passed then the first name/value of :paramref:`~update.col_values`
                                    is used as primary key check/filter value.
        :param where_group_order:   extra sql added after the WHERE clause (merged with chk_values by
                                    :meth:`._rebind`). This string can include additional WHERE expressions
                                    with extra bind variables, ORDER BY and GROUP BY expressions.
                                    These special/extra bind variables have to be specified in the
                                    :paramref:`~.update.bind_vars` argument and have to be prefixed with the
                                    string `'CV_'` (see also the :data:`CHK_BIND_VAR_PREFIX` data constant
                                    in this module).
        :param bind_vars:           dict of extra bind variables (key=variable name, value=value).
        :param commit:              bool value to specify if commit should be done. Pass True to commit.
        :param locked_cols:         list of column names that will not be overwritten on update if the
                                    column value is not empty.
        :return:                    last error message or empty string if no errors occurred.
        """
        _normalize_col_values(col_values)
        chk_values, where_group_order, bind_vars = _rebind(chk_values, where_group_order, bind_vars,
                                                           extra_bind=col_values)
        sql = "UPDATE " + table_name \
              + " SET " + ", ".join([
                  f"{col} = " + (f"COALESCE({col}, {NAMED_BIND_VAR_PREFIX}{col})" if col in locked_cols else
                                 f"{NAMED_BIND_VAR_PREFIX}{col}")
                  for col in col_values.keys()])
        if where_group_order:
            sql += " WHERE " + where_group_order

        assert chk_values is not None, f"update(): chk_values cannot be None (passed col_values) for {self}"    # mypy
        with self.thread_lock_init(table_name, chk_values):
            self.execute_sql(sql, commit=commit, bind_vars=bind_vars)

        return self.last_err_msg

    def upsert(self, table_name: str, col_values: Dict[str, Any], chk_values: Dict[str, Any],
               where_group_order: str = '', bind_vars: Optional[Dict[str, Any]] = None,
               returning_column: str = '', commit: bool = False, locked_cols: Sequence[str] = (),
               multiple_row_update: bool = True) -> str:
        """ execute an INSERT or UPDATE command against a record of a database table (UPDATE if record already exists).

        :param table_name:          name of the database table.
        :param col_values:          dict of inserted/updated column values with the column name as key.
        :param chk_values:          dict of column names/values to identify affected record(s), also used to check
                                    if record already exists (and data has to be updated instead of inserted).
                                    If not passed then the first name/value of col_values is used as
                                    primary key check/filter value.
        :param where_group_order:   extra sql added after the WHERE clause (merged with chk_values by
                                    :meth:`._rebind`). This string can include additional WHERE expressions
                                    with extra bind variables, ORDER BY and GROUP BY expressions.
                                    These special/extra bind variables have to be specified in the
                                    :paramref:`~.upsert.bind_vars` argument and have to be prefixed with the
                                    string `'CV_'` in the query string (see also the
                                    :data:`CHK_BIND_VAR_PREFIX` data constant in this module).
        :param bind_vars:           dict of extra bind variables (key=variable name, value=value).
        :param returning_column:    name of column which value will be returned by next fetch_all/fetch_value() call.
        :param commit:              bool value to specify if commit should be done. Pass True to commit
                                    record changes.
        :param locked_cols:         list of column names that will not be overwritten on update if the
                                    column value is not empty.
        :param multiple_row_update: allow update of multiple records with the same chk_values.
        :return:                    last error message or empty string if no errors occurred.
        """
        _normalize_col_values(col_values)

        with self.thread_lock_init(table_name, chk_values):
            if not self.select(table_name, ["count(*)"],
                               chk_values=chk_values, where_group_order=where_group_order, bind_vars=bind_vars):
                count = self.fetch_value()
                if not self.last_err_msg:
                    if count == 1 or (multiple_row_update and count > 1):
                        if not self.update(table_name, col_values, chk_values=chk_values,
                                           where_group_order=where_group_order, bind_vars=bind_vars,
                                           commit=commit, locked_cols=locked_cols) \
                                and returning_column:
                            self.select(table_name, [returning_column],
                                        chk_values=chk_values, where_group_order=where_group_order,
                                        bind_vars=bind_vars)
                    elif count == 0:
                        col_values.update(chk_values)
                        self.insert(table_name, col_values, returning_column=returning_column, commit=commit)
                    else:               # count not in (0, 1) and multi_row_update ==
                        self.last_err_msg = f"upsert(): SELECT COUNT(*) returned {count}; args={table_name}, " \
                                            f"{col_values}, {chk_values}, {where_group_order}, {bind_vars} for {self}"
        return self.last_err_msg

    def commit(self, reset_last_err_msg: bool = False) -> str:
        """ commit the current transaction if the database driver supports/implements a commit method.

        :param reset_last_err_msg:  pass True to reset the last error message of this instance before the commit.
        :return:                    last error message or empty string if no error happened.
        """
        if reset_last_err_msg:
            self.last_err_msg = ""
        if self.conn and getattr(self.conn, 'commit', None):
            try:
                assert self.conn is not None, f"commit(): connection is not initialized for {self}"    # mypy
                self.conn.commit()
                self.console_app.dpo(f"commit() on {self}")
            except Exception as ex:
                self.last_err_msg = f"commit() error: {ex} for {self}"
        return self.last_err_msg

    def rollback(self, reset_last_err_msg: bool = False) -> str:
        """ roll the current transaction back if the DB driver supports transactions and does have a rollback method.

        :param reset_last_err_msg:  pass True to reset the last error message of this instance before the rollback.
        :return:                    last error message or empty string if no error happened.
        """
        if reset_last_err_msg:
            self.last_err_msg = ""
        if self.conn and getattr(self.conn, 'rollback', None):
            try:
                assert self.conn is not None, f"rollback(): connection is not initialized for {self}"    # mypy
                self.conn.rollback()
                self.console_app.dpo(f"rollback() on {self}")
            except Exception as ex:
                self.last_err_msg = f"rollback() error: {ex} for {self}"
        return self.last_err_msg

    def get_row_count(self) -> int:
        """ determine rowcount of last executed query.

        :return:    the number of affected rows of the last query.
        """
        assert self.curs is not None, f"get_row_count(): cursor is not initialized for {self}"    # mypy
        return self.curs.rowcount

    def selected_column_names(self) -> List[str]:
        """ determine the column names of the last executed SELECT query.

        :return:    list of column names - ordered by column index.
        """
        curs_desc = self.cursor_description()
        col_names = []
        if curs_desc:
            for col_desc in curs_desc:
                col_names.append(col_desc[0])
        return col_names

    @staticmethod
    def thread_lock_init(table_name: str, chk_values: Dict[str, Any]) -> NamedLocks:
        """ create named locking instance for passed table and filter/check expression.

        :return:    :class:`~.lockname.NamedLocks` instance for this table and filter.
        """
        return NamedLocks(table_name + str(sorted(chk_values.items())))