Source code for ae.sideloading_server

"""
sideloading server
==================

this ae namespace portion provides a simple http server to download a file to another device in the same local network,
using any web browser.

by adding this module to your android app you can distribute e.g. the APK file of your app directly to another android
device to install it there - without requiring an app store or an internet connection.

The implementation is inspired by the accepted answer to `<https://stackoverflow.com/questions/18543640
/how-would-i-create-a-python-web-server-that-downloads-a-file-on-any-get-request>`_.

Useful debugging tools can be found at `<https://www.maketecheasier.com/transferring-files-using-python-http-server>`_.


sideloading server lifecycle
----------------------------

The sideloading server provided by this ae namespace portion can be started by first creating an instance of the
sideloading app and then calling its :meth:`~SideloadingServerApp.start_server` method::

    from ae.sideloading_server import server_factory

    sideloading_server_app = server_factory()
    sideloading_server_app.start_server()

By calling :meth:`~SideloadingServerApp.start_server` without arguments a default file mask (:data:`DEFAULT_FILE_MASK`)
will be used, which specifies the APK file of the main app situated in the `Downloads` folder of the device.

The web address that has to be entered in the web browser of the receiving device can be determined with the
:meth:`~SideloadingServerApp.server_url` method::

    client_url = sideloading_server_app.server_url()

While the server is running the :meth:`~SideloadingServerApp.client_progress` method is providing the progress state
of any currently running sideloading task. More detailed info of the running sideloading process can be gathered
by calling the :meth:`~SideloadingServerApp.fetch_log_entries` method.

To temporarily stop the sideloading server simply call the :meth:`~SideloadingServerApp.stop_server` method. On app
exit additionally call the method :meth:`~SideloadingServerApp.shutdown`.

.. hint::
    The ae namespace portion :mod:`~ae.kivy_sideloading` is providing a package for an easy integration of this
    sideloading server into your kivy app.

    An example usage of this sideloading server and the :mod:`ae.kivy_sideloading` package is integrated into the demo
    apps `GlslTester <https://github.com/AndiEcker/glsl_tester>`_, `Lisz <https://gitlab.com/ae-group/kivy_lisz>`_ and
    `ComPartY <https://gitlab.com/ae-group/comparty>`_.

"""
import datetime
import glob
import os
import threading

from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer     # ThreadingHTTPServer available in Python>=3.7
from typing import Any, Callable, Dict, List, Optional, Tuple

from ae.base import DATE_TIME_ISO, dummy_function, os_local_ip          # type: ignore
from ae.files import copy_bytes                                         # type: ignore
from ae.paths import normalize                                          # type: ignore
from ae.console import ConsoleApp                                       # type: ignore


__version__ = '0.3.11'


DEFAULT_FILE_MASK = "{downloads}/{main_app_name}*.apk"     #: default glob file mask to sideloading file
FILE_COUNT_MISMATCH = " files found matching "  #: err msg part of :meth:`SideloadingServerApp.start_server`

SERVER_BIND = ""                    #: setting BIND to '' or None to allow connections for all local devices
SERVER_PORT = 36996                 #: http server listening port (a port number under 1024 requires root privileges)

SOCKET_BUF_LEN = 16 * 1024          #: buf length for socket sends in :meth:`~SideloadingServerApp.response_to_request`

SHUTDOWN_TIMEOUT = 3.9              #: timeout(in seconds) to shut down/stop the console app and http sideloading server

SideloadingKwargs = Dict[str, Any]  #: command/log format of sideloading requests

clients_lock = threading.Lock()     #: guards access to :attr:`SideloadingServerApp.client_handlers`
requests_lock = threading.Lock()    #: guards access to :attr:`SideloadingServerApp.load_requests`


def server_factory(task_id_func: Optional[Callable[[str, str, str], str]] = None) -> 'SideloadingServerApp':
    """ create the sideloading server app instance and register its command line options.

    the created instance is also stored in the module-level ``server_app`` variable, which is used by the
    request handler class to delegate incoming requests.

    :param task_id_func:    callable to return an unique id for a transfer request task. if specified then it
                            replaces the :meth:`~SideloadingServerApp.id_of_task` method of the created instance.
    :return:                sideloading server app instance.
    """
    global server_app                           #: singleton server instance

    # bind the global first, so it is already set while the options get registered
    server_app = SideloadingServerApp(app_name='sideloading_server', multi_threading=True, disable_buffering=True)

    option_definitions = (
        ('bind', "server bind address", SERVER_BIND, 'b'),
        ('port', "server listening port", SERVER_PORT, 'p'),
        ('file_mask', "glob file mask for the sideloading file", DEFAULT_FILE_MASK, 'm'),
        ('buf_len', "socket buffer length", SOCKET_BUF_LEN, 'l'),
    )
    for opt_name, opt_description, opt_default, opt_short in option_definitions:
        server_app.add_option(opt_name, opt_description, opt_default, opt_short)

    if task_id_func:
        # noinspection PyTypeHints
        server_app.id_of_task = task_id_func    # type: ignore

    return server_app
def update_handler_progress(handler: Optional['SimpleHTTPRequestHandler'] = None,
                            transferred_bytes: int = -3, total_bytes: int = -3, **_kwargs):
    """ default progress callback, storing the passed progress values onto the request handler.

    :param handler:             server request handler - receiving the progress attributes. nothing happens if
                                no (or a falsy) handler got passed.
    :param transferred_bytes:   already transferred bytes or error code.
    :param total_bytes:         total sideloading bytes.
    :param _kwargs:             additional/optional kwargs (e.g. client_ip) - ignored by this callback.
    """
    if not handler:
        return

    handler.progress_transferred, handler.progress_total = transferred_bytes, total_bytes
class SimpleHTTPRequestHandler(BaseHTTPRequestHandler):
    """ request handler of the sideloading server.

    the real work is delegated to the module-level ``server_app`` instance; this class only carries the
    progress state of the transfer it is serving.
    """
    progress_total: int = -1            #: total file size in bytes to be transferred
    progress_transferred: int = -1      #: transferred bytes in this sideloading progress thread

    # noinspection PyPep8Naming
    def do_GET(self):
        """ callback for a http GET request - forwards this handler to the server app. """
        server_app.response_to_request(self)

    def log_request(self, code='-', size='-'):
        """ overwritten to suppress the request console output unless the server app is in debug mode.

        :param code:            response status code - passed through to the base class implementation.
        :param size:            response size - passed through to the base class implementation.
        """
        if not server_app.debug:
            return
        super().log_request(code=code, size=size)           # pragma: no cover
class SideloadingServerApp(ConsoleApp):
    """ server service app class.

    the instance attributes :attr:`client_handlers`, :attr:`load_requests`, :attr:`server_instance` and
    :attr:`server_thread` get (re-)initialized by :meth:`start_server`. access to :attr:`client_handlers` is
    guarded by the module lock ``clients_lock`` and access to :attr:`load_requests` by ``requests_lock``.
    """
    client_handlers: Dict[str, SimpleHTTPRequestHandler]    #: handler for each active client connection
    file_path: str                                          #: path to the found sideloading file
    load_requests: List[SideloadingKwargs]                  #: sideloading requests and error/debug log
    progress_callback: Callable = dummy_function            #: progress event callback
    server_instance: Optional[ThreadingHTTPServer]          #: server class instance
    server_thread: Optional[threading.Thread]               #: server thread (main or separate thread)

    def _release_stale_locks(self, pre: str):
        """ forcibly release the module locks if they are still held, e.g. left over from a crashed thread.

        :param pre:             log message prefix (name of the calling method).
        """
        if requests_lock.locked():                                          # pragma: no cover
            requests_lock.release()     # release before self.log() call (new acquiring)
            self.log('print', f"{pre}: released requests lock")
        if clients_lock.locked():                                           # pragma: no cover
            self.log('print', f"{pre}: releasing clients lock")
            clients_lock.release()

    def client_progress(self, client_ip: str) -> Tuple[int, int]:
        """ determine sideloading progress (transferred bytes) for the client with the passed ip address.

        This method can be used alternatively to the callback :attr:`~SideloadingServerApp.progress_callback`.

        :param client_ip:       client ip address.
        :return:                tuple of two int values: status and file size of the sideloading file.
                                A positive status value signifies the number of already transferred bytes.
                                A negative value in this first int signals an error. Possible error codes are:

                                * -1 if sideloading task did not start yet
                                * -2 if not exists or sideloading task has already finished
                                * -3 if the default progress callback :func:`update_handler_progress` got called
                                  without a byte count (the default value of its arguments)

                                Please note that the file size given in the second int can be 0 if an error has
                                occurred or if the sideloading task is not yet fully created/started.
        """
        # the context manager guarantees the release of the lock, even if the lookup raises an exception
        with clients_lock:
            handler = self.client_handlers.get(client_ip)

        return (handler.progress_transferred, handler.progress_total) if handler else (-2, 0)

    @staticmethod
    def id_of_task(action: str, object_type: str, object_key: str) -> str:
        """ compile the id of the sideloading request task.

        :param action:          action or log level string.
        :param object_type:     task object type (currently only 'log' is implemented/supported).
        :param object_key:      task key (for log entries the timestamp of the log entry is used).
        :return:                unique key identifying the task/log-entry.
        """
        return action + '_' + object_type + ':' + object_key

    def log(self, log_level: str, message: str):
        """ print log message and optionally add it to the requests (to be read by controller app).

        :param log_level:       'print' always prints, 'debug' prints if `self.debug` is True and 'verbose' prints
                                a message to the log if `self.verbose` is True.
        :param message:         message to print.
        """
        out_method = getattr(self, log_level + '_out', None)
        if callable(out_method):
            out_method(("" if self.active_log_stream else f"{threading.current_thread().name: <15}") + f"{message}")

        if getattr(self, log_level, True) and hasattr(self, 'load_requests'):
            log_time = datetime.datetime.now()
            # build the entry before taking the lock: id_of_task may be a user-supplied callable that could raise
            entry = dict(
                method_name=log_level + '_' + 'log', message=message, log_time=log_time,
                rt_id=self.id_of_task(log_level, 'log', log_time.strftime(DATE_TIME_ISO)))
            with requests_lock:
                self.load_requests.append(entry)

    def fetch_log_entries(self) -> Dict[str, SideloadingKwargs]:
        """ collect last log entries, return them and remove them from this app instance.

        :return:                Dict[request_tasks_id, sideloading_request_kwargs] with fetched requests log entries.
        """
        with requests_lock:
            requests = self.load_requests
            log_entries = {entry['rt_id']: entry for entry in requests}
            requests[:] = []            # empty the list in place to keep the identity of the list object

        return log_entries

    def response_to_request(self, handler: SimpleHTTPRequestHandler):
        """ process request to this server by sending the sideloading file as download to the client.

        .. note:: this method may run in a separate thread (created by the server to process this request).

        :param handler:         stream request handler instance.
        """
        pre = "SideloadingServerApp.response_to_request()"
        self.log('verbose', f"{pre} from client={handler.client_address}")

        client_ip = handler.client_address[0]
        with clients_lock:
            self.client_handlers[client_ip] = handler   # removed by controlling process/app on finished sideloading

        errors: List[str] = []
        try:
            handler.send_response(200)
            handler.send_header("Content-Type", 'application/octet-stream')
            handler.send_header("Content-Disposition", f'attachment; filename="{os.path.basename(self.file_path)}"')
            with open(self.file_path, 'rb') as file_handle:
                file_size = os.fstat(file_handle.fileno()).st_size
                handler.progress_total = file_size
                handler.send_header("Content-Length", str(file_size))
                handler.end_headers()
                progress_kwargs = dict(client_ip=client_ip, handler=handler)
                copy_bytes(file_handle, handler.wfile, total_bytes=file_size, buf_size=self.get_opt('buf_len'),
                           errors=errors, progress_func=self.progress_callback, **progress_kwargs)
                if errors:
                    self.log('print', f"{pre} copy_bytes errors: {errors}")
                else:
                    handler.progress_transferred = file_size
        except Exception as ex:         # best-effort: a failing transfer must not crash the request thread
            self.log('print', f"{pre} exception {ex} on send of {self.file_path}")
        else:
            if not errors:
                self.log('debug', f"{pre} side-loaded {file_size} bytes of {self.file_path} to {client_ip}")

    def server_url(self) -> str:
        """ determine an url string to put into the address field of any browser to start sideloading.

        :return:                server url string. the port number is omitted if it is the http default port 80.
        """
        # noinspection HttpUrlsUsage
        url = "http://" + os_local_ip()
        port = self.get_opt('port')
        if port != 80:
            url += ":" + str(port)
        return url

    def shutdown(self, exit_code: Optional[int] = 0, timeout: Optional[float] = None):
        """ overwritten to stop any running sideloading server instance/threads on shutdown of this app instance.

        :param exit_code:       set application OS exit code - see :meth:`~ae.core.AppBase.shutdown`.
        :param timeout:         timeout float value in seconds - see :meth:`~ae.core.AppBase.shutdown`.
        """
        self.stop_server()
        super().shutdown(exit_code=exit_code, timeout=timeout)

    def start_server(self, file_mask: str = "", threaded: bool = False, progress: Callable = update_handler_progress
                     ) -> str:
        """ start http file sideloading server to run until :meth:`~.stop_server` get called.

        :param file_mask:       optional glob.glob file mask to specify the sideloading file. If not passed then the
                                :data:`DEFAULT_FILE_MASK` will be used which specifies an APK file of the main app
                                situated in the `Downloads` folder of the device. The sideloading server will only be
                                started if the file mask matches exactly one file. The file path of the matched file
                                can be accessed via the :attr:`~SideloadingServerApp.file_path` attribute.
        :param threaded:        optionally pass True to use separate thread to run the server instance. If False
                                then this method blocks the calling thread until the server got stopped.
        :param progress:        optional callback executed for each transferred/side-loaded chunk. If not specified
                                then the default callback method :func:`update_handler_progress` will be called to
                                update the progress attributes of the request handler, which can be polled via the
                                :meth:`~SideloadingServerApp.client_progress` method.
        :return:                empty string/"" if server instance/thread got started else the error message string.

        .. note::
            if :paramref:`~.start_server.threaded` is `True` then the :paramref:`~.start_server.progress` callback
            get executed in its own thread.

        """
        pre = "SideloadingServerApp.start_server()"
        if not file_mask:
            file_mask = self.get_opt('file_mask', DEFAULT_FILE_MASK)
        self.progress_callback = progress
        self.server_instance = self.server_thread = None
        self.client_handlers = {}
        self.load_requests = []
        self._release_stale_locks(pre)
        self.log('debug', f"{pre}: {'threaded' if threaded else ''} with file mask='{file_mask}' and cb={progress}")

        err_msg = ""
        try:
            files = glob.glob(normalize(file_mask))
            file_count = len(files)
            if file_count != 1:
                return f"{pre}: {str(file_count) if file_count else 'no'}{FILE_COUNT_MISMATCH}{file_mask}"
            self.file_path = files[0]

            server_address = (self.get_opt('bind'), self.get_opt('port'))
            ThreadingHTTPServer.allow_reuse_address = True
            SimpleHTTPRequestHandler.protocol_version = "HTTP/1.0"
            self.server_instance = ThreadingHTTPServer(server_address, SimpleHTTPRequestHandler)
            self.server_instance.timeout = SHUTDOWN_TIMEOUT
            self.log('verbose', f"{pre}: {self.file_path} (ip,port)={server_address}"
                                f"/{self.server_instance.server_address}"
                                f"/{self.server_instance.socket.getsockname()}")

            tct = threading.current_thread()
            if threaded:
                # Start a thread with the server -- that thread will then start one more thread for each request
                self.server_thread = threading.Thread(name="SideloadingTrd", target=self.server_instance.serve_forever)
                self.server_thread.start()
                self.log('verbose', f"{pre}: started server from thread={tct.name} in thread={self.server_thread.name}")
            else:
                self.log('verbose', f"{pre}: starting server loop; blocking this main thread={tct.name}")
                self.server_thread = tct
                self.server_instance.serve_forever()
        except Exception as ex:
            err_msg = f"{pre}: exception {ex}"
            self.log('print', err_msg)

            # close the listening socket of an already created server, to not leak it on a failed start
            if self.server_instance is not None:
                self.server_instance.server_close()
            self.server_instance = self.server_thread = None

        return err_msg

    def stop_server(self):
        """ stop/pause sideloading server - callable also if not running to reset/prepare this app instance.

        the listening socket of the server gets closed as soon as the serve loop is known to be stopped.
        """
        pre = "SideloadingServerApp.stop_server()"
        self._release_stale_locks(pre)
        self.log('verbose', f"{pre}")

        if getattr(self, 'server_instance', False) and getattr(self, 'server_thread', False):
            server = self.server_instance
            server_thread = self.server_thread
            if server_thread == threading.current_thread():                # pragma: no cover
                # shutdown() blocks until the serve loop exits, so it has to run in another thread than the loop
                thread = threading.Thread(name="StopSideloadingServerThread", target=server.shutdown)
                thread.start()
                thread.join(timeout=SHUTDOWN_TIMEOUT)
                loop_stopped = not thread.is_alive()
                if not loop_stopped:
                    self.log('print', f"{pre}: server shutdown thread join timed out")
            else:
                server.shutdown()       # returns after the serve loop has exited
                loop_stopped = True
                server_thread.join(timeout=SHUTDOWN_TIMEOUT)
                if server_thread.is_alive():
                    self.log('print', f"{pre}: server thread join timed out")   # pragma: no cover

            if loop_stopped:
                server.server_close()   # shutdown() only ends the serve loop; the socket has to be closed explicitly

        self.server_instance = self.server_thread = None
if __name__ == '__main__':                                                  # pragma: no cover
    # script entry point: create the singleton app instance (with its command line options) ...
    server_app = server_factory()
    # ... let the console app process the command line arguments/options ...
    server_app.run_app()
    # ... and finally run the server loop in this thread, using the configured/default file mask.
    server_app.start_server()