"""
postgres database layer
=======================
this module provides the class :class:`PostgresDb` to connect and interact with a Postgres database server.
the communication with the database driver is done with great help of the
`psycopg2 pypi package <https://pypi.org/project/psycopg2>`_.
basic usage of ae.db_pg
-----------------------
to create an instance of the class :class:`PostgresDb` you first have to create a :class:`~ae.sys_core.SystemBase`
instance. this can be done either programmatically by providing an application instance (of the class
:class:`~ae.ae_console.ConsoleApp` or an inherited subclass of it) plus any database parameters, like required
credentials and any database configuration features/options::
app = ConsoleApp()
system = SystemBase('system-id', app, dict(User='user name', Password='password', Dbname=...), ...)
alternatively provide all system-specific info within the :ref:`ae config files<config-files>` and let
:class:`~ae.sys_core.UsedSystems` load it::
system = used_systems['system-id']
finally pass the database parameters in `system` to create an instance of :class:`PostgresDb`::
pg_db = PostgresDb(system)
then call the :meth:`~PostgresDb.connect` method of this instance to connect to the Postgres database server::
error_message = pg_db.connect()
if error_message:
print(error_message)
if the connection could not be established then :meth:`~PostgresDb.connect` is returning an error message string. if
the return value is an empty string then you can use all the methods provided by :class:`~ae.db_core.DbBase`, like e.g.
:meth:`~ae.db_core.DbBase.update`::
error_message = pg_db.update('my_table', {'my_col': 'new value'})
if error_message:
print(error_message)
error_message = pg_db.rollback()
an explicit call of :meth:`~ae.db_core.DbBase.rollback` is only needed if you use transactions (autocommit is False).
in this case you should also use :meth:`~ae.db_core.DbBase.commit` at the end of each transaction to store any data
updates::
error_message = pg_db.commit()
alternatively you can use the `commit` argument that is provided by the :class:`~ae.db_core.DbBase` DML methods: by
passing a `True` value to this argument, the method will automatically execute a :meth:`~ae.db_core.DbBase.commit` call
for you if no error occurred in the DML method::
error_message = pg_db.update('table', {'column': 369}, commit=True)
finally after all database actions are done you can close the connection to the database server with the
:meth:`~ae.db_core.DbBase.close` method::
error_message = pg_db.close()
"""
from typing import Any, Dict, Optional
import psycopg2 # type: ignore # for mypy
# from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
from ae.sys_core import SystemBase # type: ignore # for mypy
from ae.db_core import DbBase # type: ignore # for mypy
__version__ = '0.3.7'
class PostgresDb(DbBase):
    """ an instance of this class represents a Postgres database.

    the connection itself is established via the `psycopg2` driver; the generic DML helpers are
    inherited from :class:`~ae.db_core.DbBase`.
    """

    def __init__(self, system: SystemBase):
        """ create instance of Postgres database object.

        :param system:  instance of a :class:`~ae.sys_core.SystemBase` class.
                        :class:`~ae.sys_core.SystemBase` (defined in the module :mod:`ae.sys_core`) is providing
                        the credentials and features, which get retrieved from :ref:`config-files`, then converted
                        by :meth:`~ae.db_core.DbBase.connect_params` into :ref:`connection parameters` to connect
                        to the Postgres database.

                        for connections via SSL to the Postgres server you have to add either the connection
                        parameters **sslmode**, **sslcert** and **sslkey** or **sslrootcert** and **sslcrl**
                        (depending on the configuration of your server).

                        **features** : optional list of features.
        """
        super().__init__(system)
        # for "named" PEP-0249 sql will be adapted to fit postgres driver "(pyformat)" sql bind-var/parameter syntax
        self.param_style = 'pyformat'

    def connect(self) -> str:
        """ connect this instance to the Postgres database server, using the credentials provided at instantiation.

        the connection parameters are taken from :meth:`connect_params`; if they do not contain an
        `application_name` entry and the console app has a non-empty `app_name`, then the app name is passed as
        `application_name` to the driver.

        :return:        error message in case of error or empty string if not.
        """
        self.last_err_msg = ''

        connection_params = self.connect_params()
        if 'application_name' not in connection_params and self.console_app.app_name:
            connection_params['application_name'] = self.console_app.app_name

        try:
            self.conn = psycopg2.connect(**connection_params)
            self.console_app.dpo(f"PostgresDb.connect(): connected"
                                 f" via api/server {psycopg2.apilevel}/{self.conn.server_version}"
                                 f" with encoding {self.conn.encoding} for {self}")
        except Exception as ex:
            # any failure while connecting gets reported via the returned error message instead of being raised
            self.last_err_msg = f"PostgresDb-connect() error: {ex} for {self}"
        else:
            # only prepare a cursor if the connection got established without any error
            self.create_cursor()

        return self.last_err_msg

    def execute_sql(self, sql: str, commit: bool = False, bind_vars: Optional[Dict[str, Any]] = None,
                    auto_commit: bool = False) -> str:
        """ execute sql query or sql command.

        :param sql:         sql query or command to execute.
        :param commit:      pass True to commit (after INSERT or UPDATE queries).
        :param bind_vars:   dict of bind variables (key=variable name, value=value).
        :param auto_commit: pass True to activate auto-commit-mode for this postgres session.
        :return:            last error message or empty string if no errors occurred. if the automatic rollback
                            after an error fails too, then the rollback error gets appended to the error message.

        .. hint::
            overwriting generic execute_sql for Postgres because if auto_commit is False then a db error
            is invalidating the connection until it gets rolled back (optionally to a save-point).
            unfortunately psycopg2 does not provide/implement save-points. could be done alternatively with
            execute("SAVEPOINT NonAutoCommErrRollback") but RELEASE/ROLLBACK makes it complicated (see also
            https://stackoverflow.com/questions/2370328/continuing-a-transaction-after-primary-key-violation-error)::

                save_point = None if auto_commit else self.conn.setSavepoint('NonAutoCommErrRollback')
                super().execute_sql(sql, commit=commit, auto_commit=auto_commit, bind_vars=bind_vars)
                if save_point:
                    if self.last_err_msg:
                        self.conn.rollback(save_point)
                    else:
                        self.conn.releaseSavepoint(save_point)
                return self.last_err_msg

            therefore KISS - a simple rollback will do it also.
        """
        # connect lazily: self.connect() returns an empty string on success
        if self.conn or not self.connect():
            if auto_commit:
                self.conn.autocommit = True     # or use: self.conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)

            # auto_commit is handled above on the connection, therefore it is not passed on to the base class
            super().execute_sql(sql, commit=commit, bind_vars=bind_vars)

            if self.last_err_msg and not auto_commit:
                self.console_app.dpo("PostgresDb.execute_sql(): automatic rollback after error (connection recycling)")
                try:
                    self.conn.rollback()
                except psycopg2.Error as ex:
                    # e.g. the connection got lost/closed: keep the original error and report the rollback
                    # failure via the returned message instead of raising out of this method
                    self.last_err_msg += f" -- PostgresDb.execute_sql() rollback error: {ex}"

        return self.last_err_msg