# Source code for ae.sys_core_sh

"""
SiHOT PMS system core xml interface
===================================

This portion is very old and needs refactoring and many more unit tests.

The classes provided by this portion allow the implementation of client
and server components that communicate with the Sihot PMS system.

TODO:
    - use another xml library, because the xml modules of the Python standard library (like xml.etree) are not
      secure against maliciously constructed data - the problem here is that the xml generated by the Sihot system
      does not conform 100% to the xml standards.
    - refactor SihotXmlParser and inherited classes: migrating the attributes oc, tn, id, rc, hn, ... to a dict.
    - inject cae app instance into _SihotTcpClient, RequestXmlHandler and TcpServer (replacing ae.core.po()).
"""
import datetime
import pprint
import re
import socket
import threading
import time
import socketserver
from traceback import format_exc
from typing import Any, Dict, List, Optional, Sequence, Tuple, Union

from abc import ABCMeta, abstractmethod

# import xml.etree.ElementTree as Et
# noinspection StandardLibraryXml
from xml.etree.ElementTree import XMLParser, ParseError

from ae.base import DATE_ISO, DEF_ENCODE_ERRORS, round_traditional                                  # type: ignore
from ae.core import DEBUG_LEVEL_DISABLED, DEBUG_LEVEL_VERBOSE, po                                   # type: ignore
from ae.console import ConsoleApp                                                                   # type: ignore
from ae.sys_core import SystemConnectorBase                                                         # type: ignore


__version__ = '0.3.6'


SDI_SH = 'Sh'                               #: Sihot System Interface Id

SH_DEF_SEARCH_FIELD = 'ShId'    #: default search field for external systems (used by sys_data_sh.cl_field_data())

# names of the config options/settings used by this module (see the cae.get_opt() calls in SihotXmlBuilder)
SDF_SH_SERVER_ADDRESS = 'shServerIP'        #: Sihot Server address or ip
SDF_SH_KERNEL_PORT = 'shServerKernelPort'   #: Sihot Kernel Interface port
SDF_SH_WEB_PORT = 'shServerPort'            #: Sihot Web interfaces port
SDF_SH_CLIENT_PORT = 'shClientPort'         #: Sihot Server client port
SDF_SH_TIMEOUT = 'shTimeout'                #: timeout passed to the tcp client in SihotXmlBuilder.send_to_server()
SDF_SH_XML_ENCODING = 'shXmlEncoding'       #: encoding of the xml header and of the bytes sent to the Sihot server
# NOTE(review): the following four option names are not used within this module - presumably they get consumed by
# .. other portions (e.g. sys_data_sh); confirm against their usages.
SDF_SH_USE_KERNEL_FOR_CLIENT = 'shUseKernelForClient'
SDF_SH_USE_KERNEL_FOR_RES = 'shUseKernelForRes'
SDF_SH_CLIENT_MAP = 'shMapClient'
SDF_SH_RES_MAP = 'shMapRes'


# latin1 (synonym to ISO-8859-1) doesn't have the Euro-symbol
# .. so we use ISO-8859-15 instead ?!?!? (see
# .. http://www.gerd-riesselmann.net/webentwicklung/utf-8-latin1-aka-iso-8859-1-und-das-euro-zeichen/  and
# .. http://www.i18nqa.com/debug/table-iso8859-1-vs-windows-1252.html  and
# .. http://www.i18nqa.com/debug/table-iso8859-1-vs-iso8859-15.html   )
# SXML_DEF_ENCODING = 'ISO-8859-15'
# But even with ISO-8859-15 we are getting errors with e.g. 'ACUTE ACCENT' (U+00B4/0xb4) therefore next tried UTF8
# SXML_DEF_ENCODING = 'utf8'
# .. but then I get the following error in reading all the clients:
# .. 'charmap' codec can't decode byte 0x90 in position 2: character maps to <undefined>
# then added an output type handler to the connection (see db_core.py) which did not solve the problem (because
# .. the db_core.py module is not using this default encoding but the one in NLS_LANG env var)
# to fix showing umlaut character correctly tried cp1252 (windows charset)
# .. and finally this worked for all characters (because it has fewer undefined code points)
# SXML_DEF_ENCODING = 'cp1252'
# But with the added errors=DEF_ENCODE_ERRORS argument for the bytes() new/call used in the
# .. _SihotTcpClient.send_to_server() method we try sihot interface encoding again
# but SXML_DEF_ENCODING = 'ISO-8859-1' failed again with umlaut characters
# .. Y203585/HUN - Name decoded wrongly with ISO
SXML_DEF_ENCODING = 'cp1252'                                #: encoding used by the Sihot xml interface
ERR_MESSAGE_PREFIX_CONTINUE = 'CONTINUE:'                   #: error message prefix for ignorable errors
TCP_CONNECTION_BROKEN_MSG = "socket connection broken!"     #: error message fragment added if connection is broken

# private module constants
_TCP_MAXBUFLEN = 8192                                       #: tcp buffer length
_TCP_END_OF_MSG_CHAR = b'\x04'                              #: end-of-message character of the Sihot xml interface
_DEBUG_RUNNING_CHARS = "|/-\\"                              #: progress animation characters for console output


# pretty-print formatter (returning the formatted string) with the indent/width/depth used for debug outputs
ppf = pprint.PrettyPrinter(indent=12, width=96, depth=9).pformat


#  HELPER METHODS AND CLASSES ###################################

def elem_to_attr(elem: str) -> str:
    """ map the name of an xml element onto the name of the corresponding python attribute.

    :param elem:    element string.
    :return:        lower-case version of the passed element string, with every hyphen replaced by an underscore.
    """
    lowered = elem.lower()
    return lowered.replace('-', '_')
class _SihotTcpClient:
    """ local sihot tcp client used by :meth:`SihotXmlBuilder.send_to_server`. """
    error_message = ""      # error message of the last send_to_server() call (empty string if no error occurred)
    received_xml = ""       # decoded response of the last send_to_server() call

    def __init__(self, server_ip: str, server_port: int, timeout: float = 3.6, encoding: str = 'utf-8',
                 debug_level: int = DEBUG_LEVEL_DISABLED):
        """ store the connection settings; the connection itself gets established in :meth:`.send_to_server`.

        :param server_ip:       address or ip of the Sihot server.
        :param server_port:     port of the Sihot server interface.
        :param timeout:         socket timeout in seconds.
        :param encoding:        encoding used to encode the sent xml and to decode the received xml.
        :param debug_level:     debug level; controls the verbosity of console outputs and error messages.
        """
        self.server_ip = server_ip
        self.server_port = server_port
        self.timeout = timeout
        self.encoding = encoding
        self.debug_level = debug_level

    def send_to_server(self, xml: str) -> str:
        """ send passed xml string to the Sihot server.

        The response gets stored in :attr:`.received_xml`.

        :param xml:     xml string to send.
        :return:        error message or empty string if no errors occurred.
        """
        self.error_message = ""
        self.received_xml = ""
        try:
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                if self.debug_level >= DEBUG_LEVEL_VERBOSE:
                    po("_SihotTcpClient connecting to server ", self.server_ip, " on port ", self.server_port,
                       " with encoding", self.encoding, " and timeout", self.timeout)
                # adding sock.setblocking(0) is resulting in a BlockingIOError exception
                sock.settimeout(self.timeout)
                sock.connect((self.server_ip, self.server_port))
                buf = bytes(xml, encoding=self.encoding, errors=DEF_ENCODE_ERRORS)
                sock.sendall(buf + _TCP_END_OF_MSG_CHAR)
                # _receive_response() may also set self.error_message (e.g. if the connection got broken)
                self.received_xml = self._receive_response(sock)
        except Exception as ex:
            self.error_message = "_SihotTcpClient.send_to_server() exception: " + str(ex) \
                + (" (sent XML=" + xml + ")" + "\n" + format_exc() if self.debug_level >= DEBUG_LEVEL_VERBOSE else "")

        return self.error_message

    def _receive_response(self, sock) -> str:
        """ receive response from Sihot server.

        Receive errors are stored in :attr:`.error_message`; a decoding error of the received bytes is raised.

        :param sock:    used socket for the connection to the Sihot server.
        :return:        received string (without the end-of-message character).
        """
        def _handle_err_gracefully(extra_msg=""):
            # socket connection broken, see https://docs.python.org/3/howto/sockets.html#socket-howto
            # .. and for 10054 see https://stackoverflow.com/questions/35542404
            self.error_message = "_SihotTcpClient._receive_response(): " + TCP_CONNECTION_BROKEN_MSG + extra_msg
            if self.debug_level:
                po(self.error_message)

        xml_recv = b""
        try:
            while xml_recv[-1:] != _TCP_END_OF_MSG_CHAR:
                chunk = sock.recv(_TCP_MAXBUFLEN)
                if not chunk:
                    _handle_err_gracefully()
                    break
                xml_recv += chunk
            # remove TCP_END_OF_MSG_CHAR - but only if it got received: if the connection broke before the
            # .. terminator arrived then the last byte is payload and has to be kept
            if xml_recv.endswith(_TCP_END_OF_MSG_CHAR):
                xml_recv = xml_recv[:-len(_TCP_END_OF_MSG_CHAR)]

        except Exception as ex:
            if 10054 in ex.args:
                # [ErrNo|WinError 10054] An existing connection was forcibly closed by the remote host
                _handle_err_gracefully(" ErrNo=10054 (data loss is possible)")
            else:
                # decode with errors='replace' so that a decoding problem cannot mask the original exception
                self.error_message = "_SihotTcpClient._receive_response() err: " + str(ex) \
                    + (" (received XML=" + str(xml_recv, self.encoding, errors='replace') + ")" + "\n" + format_exc()
                       if self.debug_level >= DEBUG_LEVEL_VERBOSE else "")

        return str(xml_recv, self.encoding)
class RequestXmlHandler(socketserver.BaseRequestHandler, metaclass=ABCMeta):
    """ abstract base of the server-side request handlers that receive xml data sent by the Sihot system. """
    error_message = ""      # message of the last error, printed by notify()

    # def setup(self):
    #    # the socket is called request in the request handler
    #    self.request.settimeout(1.0)
    #    #self.request.setblocking(False)

    def notify(self):
        """ print the current error message to the console output. """
        po("**** " + self.error_message)

    def handle(self):
        """ receive the xml sent by the Sihot system, pass it to :meth:`.handle_xml` and send back its response. """
        xml_recv = b""
        try:
            # read chunk-wise until the end-of-message character terminates the received data
            while not xml_recv.endswith(_TCP_END_OF_MSG_CHAR):
                received = self.request.recv(_TCP_MAXBUFLEN)
                if received:
                    xml_recv += received
                    continue
                # socket connection broken, see https://docs.python.org/3/howto/sockets.html#socket-howto
                self.error_message = "RequestXmlHandler.handle(): " + TCP_CONNECTION_BROKEN_MSG
                self.notify()
                return

            xml_recv = xml_recv[:-1]  # remove TCP_END_OF_MSG_CHAR
            response = self.handle_xml(xml_recv)
            self.request.sendall(response + _TCP_END_OF_MSG_CHAR)

        except Exception as ex:
            self.error_message = f"RequestXmlHandler.handle() exception='{ex}' (XML={xml_recv})\n{format_exc()}"
            self.notify()

    @abstractmethod
    def handle_xml(self, xml_from_client: bytes) -> bytes:
        """ abstract method to be implemented by the inheriting class.

        :param xml_from_client: xml request sent from Sihot xml client as a bytes string.
        :return:                xml response to the Sihot xml client as bytes string.
        """
class _ThreadedServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """ threading tcp server (TCPServer extended by the ThreadingMixIn), used by :class:`~ae.sys_core_sh.TcpServer`. """
class TcpServer:
    """ tcp server class to process xml sent by the Sihot client. """

    def __init__(self, ip, port, cls_xml_handler, debug_level=DEBUG_LEVEL_DISABLED):
        """ create the threaded tcp server and start serving in a separate daemon thread.

        :param ip:                  ip address to bind the server to.
        :param port:                port to bind the server to.
        :param cls_xml_handler:     request handler class (a :class:`RequestXmlHandler` subclass with an
                                    overridden handle_xml() method).
        :param debug_level:         debug level; controls the verbosity of the console outputs.
        """
        self.debug_level = debug_level
        # cls_xml_handler is a RequestXmlHandler subclass with an overridden handle_xml() method
        server = _ThreadedServer((ip, port), cls_xml_handler)

        if debug_level >= DEBUG_LEVEL_VERBOSE:
            po("TcpServer initialized on ip/port: ", server.server_address)

        # start a thread with the server - which then start one more thread for each request/client-socket
        server_thread = threading.Thread(target=server.serve_forever)
        # exit server thread when main thread terminates
        server_thread.daemon = True
        server_thread.start()

        if debug_level >= DEBUG_LEVEL_VERBOSE:
            po("TcpServer running in thread:", server_thread.name)

        self.server = server

    def run(self, display_animation: bool = False):
        """ run/start the server.

        This method blocks the calling thread until it gets interrupted by an exception. The server is shut down
        and closed on every exit path - also on KeyboardInterrupt/SystemExit, which are re-raised afterwards.

        :param display_animation:   pass True to display progress animation at console output.
        """
        try:
            sleep_time = 0.5 / len(_DEBUG_RUNNING_CHARS)
            index = 0
            while True:
                if display_animation:
                    index = (index + 1) % len(_DEBUG_RUNNING_CHARS)
                    po("Server is running " + _DEBUG_RUNNING_CHARS[index], end="\r", flush=True)
                time.sleep(sleep_time)
        except Exception as ex:
            po("Server killed with exception: ", ex)
            if self.debug_level:
                po(format_exc())
        finally:
            # in finally-block to release the server socket also if the loop got left by a BaseException
            self.server.shutdown()
            self.server.server_close()
# XML PARSER ##############################################################################
class SihotXmlParser:
    """ XMLParser interface used by client to parse the responses from the Sihot server.

    An instance of this class is passed as target to :class:`xml.etree.ElementTree.XMLParser`, which calls the
    methods :meth:`.start`, :meth:`.data`, :meth:`.end` and :meth:`.close` while parsing. The data of each element
    listed in the base tags gets stored in the instance attribute named by :func:`elem_to_attr`.
    """

    def __init__(self, cae: ConsoleApp):
        """ initialize the parser state and the default values of the main/base xml elements.

        :param cae:     app instance, used for debug console outputs.
        """
        self._xml = ''
        self._base_tags = ['ERROR-LEVEL', 'ERROR-TEXT', 'ID', 'MSG', 'OC', 'ORG', 'RC', 'TN', 'VER',
                           'MATCHCODE', 'OBJID']
        self._curr_tag = ''
        self._curr_attr = ''
        self._elem_path: List[str] = []     # element path implemented as list stack

        # main xml elements/items
        self.oc = ''
        self.tn = '0'
        self.id = '1'
        self.matchcode = None
        self.objid = None
        self.rc = '0'
        self.msg = ''
        self.ver = ''
        self.error_level = '0'  # used by kernel interface instead of RC/MSG
        self.error_text = ''
        self.cae = cae  # only needed for logging with debug_out()/dpo()
        self._parser: Optional[XMLParser] = None  # set to XMLParser() in self.parse_xml() and close in self.close()

    def parse_xml(self, xml: str):
        """ parse the xml response string sent by the Sihot server.

        If the passed xml string is not parsable then the parsing gets repeated after replacing the numeric
        character references that are not accepted by the xml parser.

        :param xml:     xml string to parse.
        :raises ParseError: if the xml is also not parsable after the replacement of the character references.
        """
        self.cae.dpo("SihotXmlParser.parse_xml():", xml)
        self._xml = xml
        # noinspection PyTypeChecker
        self._parser = XMLParser(target=self)
        try:
            self._parser.feed(xml)
        except ParseError:
            def _char_ref_to_char(match) -> str:
                char = chr(int(match.group(1)))
                # references to xml markup characters have to stay escaped to keep the xml well-formed
                return match.group(0) if char in '&<>' else char

            # replacing '&#128;' with '€', '&#1;' with '¿1¿' and '&#7;' with '¿7¿' for Sihot XML
            self._xml = self._xml.replace('&#1;', '¿1¿').replace('&#7;', '¿7¿').replace('&#128;', '€')
            # replacing '&#NNN;' with chr(NNN) for Sihot XML
            self._xml = re.sub(r"&#([0-9]+);", _char_ref_to_char, self._xml)

            # a parser that already reported an error keeps failing on any further feed() call - therefore
            # .. restart with a new parser and reset the path state left over from the failed first pass.
            # NOTE: values collected by inheriting classes in the failed first pass are not reset here.
            self._elem_path.clear()
            self._curr_tag = ''
            self._curr_attr = ''
            # noinspection PyTypeChecker
            self._parser = XMLParser(target=self)
            self._parser.feed(self._xml)

    def get_xml(self) -> str:
        """ get the xml string to be parsed.

        :return:    xml string to be parsed.
        """
        return self._xml

    # xml parsing interface

    def start(self, tag: str, _attrib: Dict[str, str]) -> Optional[str]:
        """ parse next opening xml tag; called for each opening tag.

        :param tag:         tag string.
        :param _attrib:     attribute string (not used in Sihot xml elements).
        :return:            None if the tag got parsed and recognized/processed else the unrecognized tag string.
        """
        self._curr_tag = tag
        self._curr_attr = ''  # used as flag for a currently parsed base tag (for self.data())
        self._elem_path.append(tag)
        if tag in self._base_tags:
            self.cae.dpo("SihotXmlParser.start():", self._elem_path)
            self._curr_attr = elem_to_attr(tag)
            setattr(self, self._curr_attr, '')
            return None
        # collect extra info on error response (RC != '0') within the MSG tag field
        if tag[:4] in ('MSG-', "INDE", "VALU"):
            self._curr_attr = 'msg'
            # Q&D: by simply using tag[4:] to remove MSG- prefix, INDEX will be shown as X= and VALUE as E=
            setattr(self, self._curr_attr, getattr(self, self._curr_attr, '') + " " + tag[4:] + "=")
            return None
        return tag

    def data(self, data: str) -> Optional[str]:
        """ process parsed data string; called on each chunk (separated by XMLParser on spaces, special chars, ...).

        :param data:    data string chunk.
        :return:        None if data chunk string got parsed and recognized else the unprocessed data string.
        """
        # chunks consisting only of whitespace are not added to the attribute of the current base tag
        if self._curr_attr and data.strip():
            self.cae.dpo("SihotXmlParser.data(): ", self._elem_path, data)
            setattr(self, self._curr_attr, getattr(self, self._curr_attr) + data)
            return None
        return data

    def end(self, tag: str) -> Optional[str]:
        """ parser detected end tag of element; called for each closing tag.

        :param tag:     closing element tag string.
        :return:        closing element tag string.
        """
        self.cae.dpo("SihotXmlParser.end():", self._elem_path)
        self._curr_tag = ''
        self._curr_attr = ''
        if self._elem_path:     # Q&D Fix for TestGuestSearch to prevent pop() on empty _elem_path list
            self._elem_path.pop()
        return tag

    def close(self) -> 'SihotXmlParser':
        """ end of xml string reached; called when all data has been parsed.

        :return:    this :class:`SihotXmlParser` instance.
        """
        if self._parser:
            self._parser.close()
        return self  # ._max_depth

    def server_error(self) -> str:
        """ get the server error code string.

        :return:    '0' if no error occurred, else the Sihot error return code as string.
        """
        if self.rc != '0':
            return self.rc
        if self.error_level != '0':
            return self.error_level
        return '0'

    def server_err_msg(self) -> str:
        """ get the server error message.

        :return:    empty string if no error occurred, else the Sihot error message string.
        """
        if self.rc != '0':
            return self.msg
        if self.error_level != '0':
            return self.error_text
        return ''
class Request(SihotXmlParser):
    """ parser for the generic xml requests sent by SIHOT. """

    def get_operation_code(self) -> str:
        """ determine the operation code of the parsed Sihot xml request.

        :return:    Sihot operation code string (the value of the `oc` attribute).
        """
        operation_code = self.oc
        return operation_code
class RoomChange(SihotXmlParser):
    """ parser for the room change notifications sent by the Sihot server. """

    def __init__(self, cae: ConsoleApp):
        """ extend the base tags by the elements of a room change notification.

        :param cae:     app instance, passed on to the base class.
        """
        super().__init__(cae)
        # add base tags for room/GDS number, old room/GDS number and guest objid
        self._base_tags.extend([
            'HN',
            'RN',
            'ORN',
            'GDSNO',
            'RES-NR',
            'SUB-NR',
            'OSUB-NR',
            'GID',          # Sihot guest object id
            'MC',           # ae:05-12-2018 - added to detect and suppress rental reservations
        ])
        # the attributes of the added base tags are declared here to prevent pycharm warnings
        self.hn = None
        self.rn = None
        self.orn = None
        self.gdsno = None
        self.res_nr = None
        self.sub_nr = None
        self.osub_nr = None
        self.gid = None
        self.mc = None
class ResChange(SihotXmlParser):
    """ parser for the reservation change notifications sent by the Sihot server. """

    # element tags stored within the reservation dict (last item of rgr_list), mapped onto their dict key
    _RES_KEYS_OF_TAG = {
        'RNO': 'rgr_res_id',
        'RSNO': 'rgr_sub_id',
        'GDSNO': 'rgr_gds_no',
        # RT has different values (1=definitive, 2=tentative, 3=cxl)
        # data = 'S' if data == '3' else data
        # .. so using undocumented RT_SIHOT to prevent conversion
        'RT_SIHOT': 'rgr_status',
    }
    # elements only stored if they are direct children of SIHOT-Reservation, mapped onto their dict key
    _RES_KEYS_OF_CHILD = {
        'OBJID': 'rgr_obj_id',              # not provided by CI/CO/RM
        'ARR': 'rgr_arrival',
        'DEP': 'rgr_departure',
        'NOPAX': 'rgr_adults',
        'NOCHILDS': 'rgr_children',         # not provided by CR
    }
    # rgr/reservation group elements that are repeated (e.g. for each PAX in SIHOT-Person sections), mapped onto
    # .. a tuple of their dict key and the value length from which further data chunks get ignored
    _RES_KEYS_OF_REPEATED_TAG = {
        'CAT': ('rgr_room_cat_id', 4),
        'MC': ('rgr_mkt_segment', 2),
    }
    # rgc/reservation clients element tags stored within the dict of the last person, mapped onto their dict key
    _PERSON_KEYS_OF_TAG = {
        'GID': 'PersShId',                  # Sihot Guest object ID
        'MATCHCODE': 'PersAcuId',
        'SN': 'rgc_surname',
        'CN': 'rgc_firstname',
        'DOB': 'rgc_dob',
        'PHONE': 'rgc_phone',
        'EMAIL': 'rgc_email',
        'LN': 'rgc_language',
        'COUNTRY': 'rgc_country',
    }

    def __init__(self, cae: ConsoleApp):
        """ extend the base tags by the hotel id and prepare the list collecting the parsed reservations.

        :param cae:     app instance, passed on to the base class.
        """
        super().__init__(cae)
        self._base_tags.append('HN')    # def hotel ID as base tag, because is outside of 1st SIHOT-Reservation block
        self.hn = None                  # added instance var to remove pycharm warning
        self.rgr_list: List[Dict[str, Any]] = []

    def start(self, tag: str, attrib: Dict[str, str]) -> None:
        """ parser detected start tag of next/new xml element; called for each opening tag.

        :param tag:     tag string.
        :param attrib:  attributes of the element (passed on to the base class).
        """
        processed_by_base = super().start(tag, attrib) is None
        if processed_by_base and tag not in ('MATCHCODE', 'OBJID'):
            return None

        self.cae.dpo("ResChange.start():", self._elem_path)
        if tag == 'SIHOT-Reservation':
            new_res = dict(rgr_ho_fk=self.hn, ResPersons=[])
            self.rgr_list.append(new_res)
        elif tag in ('FIRST-Person', 'SIHOT-Person'):   # FIRST-Person only seen in room change (CI) on first occ
            self.rgr_list[-1]['ResPersons'].append({})
        return None

    def data(self, data: str) -> None:
        """ process parsed element data chunk.

        :param data:    data string chunk.
        """
        processed_by_base = super().data(data) is None
        if processed_by_base and self._curr_tag not in ('MATCHCODE', 'OBJID'):
            return None

        self.cae.dpo("ResChange.data():", self._elem_path, self.rgr_list)
        tag = self._curr_tag
        path = self._elem_path
        append = True   # flag to detect and prevent multiple values

        # because data can be sent in chunks on parsing, we first determine the dictionary (dic) and the item key (key)
        if tag in self._RES_KEYS_OF_TAG:
            dic, key = self.rgr_list[-1], self._RES_KEYS_OF_TAG[tag]
        elif len(path) == 3 and path[:2] == ['SIHOT-Document', 'SIHOT-Reservation'] \
                and path[2] in self._RES_KEYS_OF_CHILD:
            dic, key = self.rgr_list[-1], self._RES_KEYS_OF_CHILD[path[2]]
        elif tag in self._RES_KEYS_OF_REPEATED_TAG:
            key, max_len = self._RES_KEYS_OF_REPEATED_TAG[tag]
            dic = self.rgr_list[-1]
            append = key in dic and len(dic[key]) < max_len
        elif tag in self._PERSON_KEYS_OF_TAG:
            dic, key = self.rgr_list[-1]['ResPersons'][-1], self._PERSON_KEYS_OF_TAG[tag]
        elif tag == 'RN':
            self.rgr_list[-1]['rgr_room_id'] = data     # update also rgr_room_id with same value
            dic, key = self.rgr_list[-1]['ResPersons'][-1], 'rgc_room_id'
        else:
            # unsupported elements
            self.cae.vpo("ResChange.data(): ignoring element ", self._elem_path, "; data chunk=", data)
            return None

        # add data - after check if we need to add or to extend the dictionary item
        if key not in dic:
            dic[key] = data
        elif append:
            dic[key] += data
        return None
class ResResponse(SihotXmlParser):
    """ parser for the responses of the kernel or web interface. """

    def __init__(self, cae: ConsoleApp):
        """ extend the base tags by the reservation id elements.

        :param cae:     app instance, passed on to the base class.
        """
        super().__init__(cae)
        # web and kernel (guest/client and reservation) interface response elements
        self._base_tags += ['GDSNO', 'RES-NR', 'SUB-NR']
        self.gdsno = None
        self.res_nr = None
        self.sub_nr = None
class AvailCatInfoResponse(SihotXmlParser):
    """ parser for the response to the CATINFO operation code of the WEB interface. """

    def __init__(self, cae: ConsoleApp):
        """ prepare the dict collecting the availability data, nested by room category and day.

        :param cae:     app instance, passed on to the base class.
        """
        super().__init__(cae)
        self._curr_cat = ''
        self._curr_day = ''
        self.avail_room_cats: Dict[str, Dict[str, Any]] = {}

    def data(self, data: str) -> Optional[str]:
        """ process parsed element data chunk.

        :param data:    data string chunk.
        :return:        None if the data chunk got processed by the base class, else the passed data string.
        """
        if super().data(data) is None:
            return None

        tag = self._curr_tag
        cats = self.avail_room_cats
        if tag == 'CAT':
            cats[data] = {}
            self._curr_cat = data
        elif tag == 'D':
            cats[self._curr_cat][data] = {}
            self._curr_day = data
        elif tag in ('TOTAL', 'OOO'):
            count = int(data)
            cats[self._curr_cat][self._curr_day][tag] = count
        elif tag == 'OCC':
            occupation = float(data)
            cats[self._curr_cat][self._curr_day][tag] = occupation
            day_values = cats[self._curr_cat][self._curr_day]
            # available rooms: the total reduced by the occupation percentage, minus the OOO value
            free_ratio = 1.0 - day_values['OCC'] / 100.0
            day_values['AVAIL'] = int(round_traditional(day_values['TOTAL'] * free_ratio)) - day_values['OOO']
        return data
class CatRoomResponse(SihotXmlParser):
    """ parser for the room category responses of the Sihot server. """

    def __init__(self, cae: ConsoleApp):
        """ extend the base tags by the category name and room number elements.

        :param cae:     app instance, passed on to the base class.
        """
        super().__init__(cae)
        # ALLROOMS response of the WEB interface
        self._base_tags.extend(('NAME', 'RN'))
        self.name = None    # added to remove pycharm warning
        self.rn = None
        self.cat_rooms: Dict[str, List[str]] = {}   # to store the dict with all key values

    def end(self, tag):
        """ parser detected end tag of element.

        :param tag:     closing element tag string.
        :return:        None if the base class returned None, else the closing element tag string.
        """
        base_result = super().end(tag)
        if base_result is None:
            return None     # tag used/processed by base class

        if tag == 'NAME':
            # the end of a category name element starts the room list of this category
            self.cat_rooms[self.name] = []
        elif tag == 'RN':
            rooms = self.cat_rooms[self.name]
            rooms.append(self.rn)
        return tag
class ConfigDictResponse(SihotXmlParser):
    """ parser for the configuration setting responses of the Sihot server. """

    def __init__(self, cae: ConsoleApp):
        """ extend the base tags by the key and value elements of a configuration setting.

        :param cae:     app instance, passed on to the base class.
        """
        super().__init__(cae)
        # response to GCF operation code of the WEB interface
        self._base_tags.extend(('KEY', 'VALUE'))    # VALUE for key value (remove from additional error info - see 'VALU')
        self.value = ''     # added to remove pycharm warning
        self.key = ''
        self.key_values: Dict[str, str] = {}    # to store the dict with all key values

    def end(self, tag: str) -> Optional[str]:
        """ parser detected end tag of Sihot config element.

        :param tag:     closing element tag string.
        :return:        None if the base class returned None, else the closing element tag string.
        """
        base_result = super().end(tag)
        if base_result is None:
            return None     # tag used/processed by base class

        if tag == 'SIHOT-CFG':
            # end of one config item: store the value under the key parsed within this item
            key, value = self.key, self.value
            self.key_values[key] = value
        return tag
class ResKernelResponse(SihotXmlParser):
    """ parser for the response to the RESERVATION-GET oc/request of the KERNEL interface. """

    def __init__(self, cae: ConsoleApp):
        """ extend the base tags by the hotel id and the reservation id elements.

        :param cae:     app instance, passed on to the base class.
        """
        super().__init__(cae)
        self._base_tags.extend(['HN', 'RES-NR', 'SUB-NR', 'GDS-NR'])
        # the attributes of the added base tags (see elem_to_attr() for the attribute names)
        self.hn = None
        self.res_nr = None
        self.sub_nr = None
        self.gds_nr = None
# XML BUILDER ##############################################################################
class SihotXmlBuilder:
    """ generic class to build and send Sihot xml requests. """
    tn: str = '1'       # transaction number of the last built request (class default; overwritten per instance)

    def __init__(self, cae: ConsoleApp, use_kernel: bool = False):
        """ create an instance of this class.

        :param cae:             instance of the running :class:`~ae.console.ConsoleApp` app.
        :param use_kernel:      pass True to use the Sihot kernel interface (False==use Sihot sxml interface).
        """
        self.cae = cae
        self.debug_level = cae.get_opt('debug_level')
        self.use_kernel_interface = use_kernel
        self.response: Optional[SihotXmlParser] = None  # parser of the last response (set by send_to_server())

        self._xml = ''

    def beg_xml(self, operation_code: str, add_inner_xml: str = '', transaction_number: str = ''):
        """ create a new xml request string including xml header, operation code and transaction number.

        :param operation_code:      Sihot operation code of the new xml element.
        :param add_inner_xml:       inner xml block/elements.
        :param transaction_number:  Sihot transaction number string; if not passed then the transaction number of
                                    the last request gets incremented (not used by the kernel interface).
        """
        enc = self.cae.get_opt(SDF_SH_XML_ENCODING) or ""
        if enc:
            enc = f' encoding="{enc.lower()}"'
        self._xml = f'<?xml version="1.0"{enc}?>\n<SIHOT-Document>\n'
        if self.use_kernel_interface:
            self._xml += '<SIHOT-XML-REQUEST>\n'
            self.add_tag('REQUEST-TYPE', operation_code)
        else:
            self.add_tag('OC', operation_code)
            if transaction_number:
                self.tn = transaction_number
            else:
                try:
                    self.tn = str(int(self.tn) + 1)
                except (OverflowError, ValueError):
                    # restart the numbering, e.g. if a non-numeric transaction number got passed in a previous call
                    self.tn = '1'
            self.add_tag('TN', self.tn)
        self._xml += add_inner_xml

    def end_xml(self):
        """ terminate a Sihot xml request string. """
        if self.use_kernel_interface:
            self._xml += '\n</SIHOT-XML-REQUEST>'
        self._xml += '\n</SIHOT-Document>'

    def add_tag(self, tag: str, val: str = ''):
        """ add a new xml element tag with the passed value.

        :param tag:     tag of the xml element to add.
        :param val:     value of the xml element to add.
        """
        self._xml += self.new_tag(tag, val)

    def send_to_server(self, response_parser: Optional['SihotXmlParser'] = None) -> str:
        """ send the built xml request to the Sihot server.

        :param response_parser:     used parser to parse the response from the Sihot server (def=SihotXmlParser).
        :return:                    error string or empty string if no errors occurred.
        """
        stc = _SihotTcpClient(self.cae.get_opt(SDF_SH_SERVER_ADDRESS),
                              self.cae.get_opt(SDF_SH_KERNEL_PORT if self.use_kernel_interface else SDF_SH_WEB_PORT),
                              timeout=self.cae.get_opt(SDF_SH_TIMEOUT),
                              encoding=self.cae.get_opt(SDF_SH_XML_ENCODING),
                              debug_level=self.debug_level)
        self.cae.dpo("SihotXmlBuilder.send_to_server(): resp_parser={}\nxml=\n{}".format(response_parser, self.xml))
        err_msg = stc.send_to_server(self.xml)
        if not err_msg:
            self.response = response_parser or SihotXmlParser(self.cae)
            self.response.parse_xml(stc.received_xml)
            err_num = self.response.server_error()
            if err_num != '0':
                err_msg = self.response.server_err_msg()
                if err_msg:
                    err_msg = "msg='{}'".format(err_msg)
                elif err_num == '29':
                    err_msg = "No Reservations Found"
                if err_num != '1' or self.debug_level >= DEBUG_LEVEL_VERBOSE:
                    # the slice removes the leading "; " separator if there is no message text to separate from
                    err_msg += "; sent xml='{}'; got xml='{}'".format(self.xml, stc.received_xml)[0 if err_msg else 2:]
                err_msg = "server return code {} {}".format(err_num, err_msg)

        if err_msg:
            self.cae.po("**** SihotXmlBuilder.send_to_server() error: {}".format(err_msg))
        return err_msg

    @staticmethod
    def new_tag(tag: str, val: Any = '', opening: bool = True, closing: bool = True):
        """ create new xml element with the passed tag and value.

        :param tag:         tag of the new xml element.
        :param val:         value of the new xml element. NOTE: any falsy value (like None, '', 0 or False)
                            results in an empty element value.
        :param opening:     pass False to NOT add the element opening tag.
        :param closing:     pass False to NOT add the element closing tag.
        :return:            new xml element or element part/fragment.
        """
        return \
            ('<' + tag + '>' if opening else '') \
            + str(val or '') \
            + ('</' + tag + '>' if closing else '')

    @staticmethod
    def convert_value_to_xml_string(value: Any) -> str:
        """ convert any element value type to the corresponding xml string, replacing & < > characters with escapes.

        :param value:   element value to be converted.
        :return:        element value as string.
        """
        ret = '' if value is None else str(value)  # convert None to empty string
        # cut the time part off if a date/datetime value has no time/midnight
        if isinstance(value, (datetime.datetime, datetime.date)) and ret.endswith(' 00:00:00'):
            ret = ret[:-9]
        # escape special characters while preserving already escaped characters - by first un-escape then escape again
        for key, val in [('&amp;', '&'), ('&lt;', '<'), ('&gt;', '>'), ('&', '&amp;'), ('<', '&lt;'), ('>', '&gt;')]:
            ret = ret.replace(key, val)
        return ret

    @property
    def xml(self) -> str:
        """ property to determine the currently built xml string.

        :getter:    return built xml string.
        :setter:    change xml string.
        """
        return self._xml

    @xml.setter
    def xml(self, value):
        self.cae.dpo('SihotXmlBuilder.xml-set:', value)
        self._xml = value
class AvailCatInfo(SihotXmlBuilder):
    """ build xml request and send it to get available room categories from the Sihot server. """

    def avail_rooms(self, hotel_id: str = '', room_cat: str = '',
                    from_date: Optional[datetime.date] = None, to_date: Optional[datetime.date] = None
                    ) -> Union[dict, str]:
        """ determine available rooms for the specified hotel, room category and date range.

        :param hotel_id:    Sihot hotel id or empty string to get available rooms of all hotels.
        :param room_cat:    Sihot room category or empty string to get available rooms of all categories.
        :param from_date:   start date of the date range: defaulting to today (determined on each call).
        :param to_date:     end date of the date range; defaulting to today (determined on each call).
        :return:            Sihot response as dict created by :class:`.AvailCatInfoResponse` with the results
                            or an error message string if an error occurred.
        """
        # the date defaults are determined here, because a default value in the signature gets evaluated only
        # .. once (at import time), resulting in outdated dates within long-running processes
        if from_date is None:
            from_date = datetime.date.today()
        if to_date is None:
            to_date = datetime.date.today()

        # flags=''):  # SKIP-HIDDEN-ROOM-TYPES'):
        self.beg_xml(operation_code='CATINFO')
        if hotel_id:
            self.add_tag('ID', hotel_id)
        self.add_tag('FROM', datetime.date.strftime(from_date, DATE_ISO))  # mandatory
        self.add_tag('TO', datetime.date.strftime(to_date, DATE_ISO))
        if room_cat:
            self.add_tag('CAT', room_cat)
        # if flags:
        #     self.add_tag('FLAGS', flags)  # there is no FLAGS element for the CATINFO oc?!?!?
        self.end_xml()

        err_msg = self.send_to_server(response_parser=AvailCatInfoResponse(self.cae))
        if err_msg:
            return err_msg
        assert isinstance(self.response, AvailCatInfoResponse)
        return self.response.avail_room_cats
class CatRooms(SihotXmlBuilder):
    """ build room category request and send it to Sihot server. """

    def get_cat_rooms(self, hotel_id: str = '1',
                      from_date: Optional[datetime.date] = None, to_date: Optional[datetime.date] = None,
                      scope: str = '') -> Union[dict, str]:
        """ determine configured room categories of the Sihot system for the specified hotel and date range.

        :param hotel_id:    Sihot hotel id to get the room categories.
        :param from_date:   start date of the date range: defaulting to today (determined on each call).
        :param to_date:     end date of the date range; defaulting to today (determined on each call).
        :param scope:       scope string to request additional information - see Sihot documentation.
        :return:            Sihot response as dict created by :class:`.CatRoomResponse` with the results
                            or an error message string if an error occurred.
        """
        # the date defaults are determined here, because a default value in the signature gets evaluated only
        # .. once (at import time), resulting in outdated dates within long-running processes
        if from_date is None:
            from_date = datetime.date.today()
        if to_date is None:
            to_date = datetime.date.today()

        self.beg_xml(operation_code='ALLROOMS')
        self.add_tag('ID', hotel_id)  # mandatory
        self.add_tag('FROM', datetime.date.strftime(from_date, DATE_ISO))  # mandatory
        self.add_tag('TO', datetime.date.strftime(to_date, DATE_ISO))
        if scope:
            self.add_tag('SCOPE', scope)  # pass 'DESC' to get room description
        self.end_xml()

        err_msg = self.send_to_server(response_parser=CatRoomResponse(self.cae))
        if err_msg:
            return err_msg
        assert isinstance(self.response, CatRoomResponse)
        return self.response.cat_rooms
class ConfigDict(SihotXmlBuilder):
    """ request builder to fetch configuration settings from the Sihot system. """

    def get_key_values(self, config_type: str, hotel_id: str = '1', language: str = 'EN') -> Union[dict, str]:
        """ determine a configuration setting of the Sihot system for the specified hotel and language.

        :param config_type:     Sihot config setting type - see Sihot documentation.
        :param hotel_id:        Sihot hotel id to get the configuration setting.
        :param language:        language id for the configuration description texts - see Sihot documentation.
        :return:                Sihot response as dict created by :class:`.ConfigDictResponse` with the results
                                or an error message string if an error occurred.
        """
        self.beg_xml(operation_code='GCF')
        for tag, val in (('CFTYPE', config_type),
                         ('HN', hotel_id),      # mandatory
                         ('LN', language)):
            self.add_tag(tag, val)
        self.end_xml()

        parser = ConfigDictResponse(self.cae)
        err_msg = self.send_to_server(response_parser=parser)
        if err_msg:
            return err_msg

        assert isinstance(self.response, ConfigDictResponse)
        return self.response.key_values
class PostMessage(SihotXmlBuilder):
    """ request builder to add a message to the message/notification queue of the Sihot system. """

    def post_message(self, msg: str, level: int = 3, system: str = 'sys_core_sh_module') -> str:
        """ build and send request to add a message into the Sihot system message/notification queue.

        :param msg:         message text string to add to the Sihot system messages.
        :param level:       importance level.
        :param system:      message sender system id/string.
        :return:            error message or empty string if no error occurred.
        """
        self.beg_xml(operation_code='SYSMESSAGE')
        self.add_tag('MSG', msg)
        self.add_tag('LEVEL', str(level))
        self.add_tag('SYSTEM', system)
        self.end_xml()

        err_msg = self.send_to_server()
        if err_msg:
            return err_msg

        assert isinstance(self.response, SihotXmlParser)
        # report a non-zero return code of the response as error
        if self.response.rc == '0':
            return ''
        return 'Error code ' + self.response.rc
class ResKernelGet(SihotXmlBuilder):
    """ request builder for generic requests sent to the Sihot kernel interface. """

    def __init__(self, cae: ConsoleApp):
        """ create a builder instance that is always using the kernel interface.

        :param cae:     instance of the running app, passed on to the base class.
        """
        super().__init__(cae, use_kernel=True)

    def fetch_res_no(self, obj_id: str, scope='GET') -> Tuple[Any, ...]:
        """ determine reservation and guest data for the passed reservation object id.

        :param obj_id:  Sihot reservation object id.
        :param scope:   search scope string (see 7.3.1.2 in Sihot KERNEL interface doc V 9.0).
        :return:        either the reservation ids as tuple of (hotel_id, res_id, sub_id, gds_no)
                        or the tuple (None, "error") if the reservation was not found.
        """
        log_prefix = "ResKernelGet.fetch_res_no({}, {}) ".format(obj_id, scope)

        self.beg_xml(operation_code='RESERVATION-GET')
        profile_xml = self.new_tag('OBJID', obj_id) + self.new_tag('SCOPE', scope)
        self.add_tag('RESERVATION-PROFILE', profile_xml)
        self.end_xml()

        err_msg = self.send_to_server(response_parser=ResKernelResponse(self.cae))
        if err_msg or not isinstance(self.response, ResKernelResponse):
            self.cae.po(log_prefix + "error='{}'".format(err_msg))
            return None, err_msg

        resp = self.response
        res_data: Tuple[Any, ...] = (resp.hn, resp.res_nr, resp.sub_nr, resp.gds_nr)
        self.cae.dpo(log_prefix + "res_no={};\nxml=\n{}".format(res_data, self.xml))
        return res_data
class ShSysConnector(SystemConnectorBase):
    """ connector class for the Sihot system. """

    def connect(self) -> str:
        """ not needed - lazy connection.

        :return:    the current value of the last error message attribute.
        """
        return self.last_err_msg

    @staticmethod
    def clients_match_field_init(match_fields: Sequence) -> str:
        """ check and return the match field for a client search.

        :param match_fields:    tuple/list of length 1 with the match field name; if empty then the default
                                search field gets used.
        :return:                match field name or an error message if an error occurred.
        """
        msg = "ShSysConnector.clients_match_field_init({}) expects ".format(match_fields)
        supported_match_fields = [SH_DEF_SEARCH_FIELD, 'AcuId', 'Surname', 'Email']

        if not match_fields:
            return SH_DEF_SEARCH_FIELD

        match_field = match_fields[0]
        if len(match_fields) > 1:
            return msg + "single match field"
        if match_field not in supported_match_fields:
            # prefixed with msg (like the error above), so the caller can see the origin of the error message
            return msg + "only one of the match fields {} (not {})".format(supported_match_fields, match_field)
        return match_field