"""
transfer client and server services
===================================
this ae portion is providing client and server services to transfer files and text messages between two devices in
the same local network.
the number of parallel running file and message transfers is only limited by the available resources of the involved
devices.
if a file transfers gets interrupted it can be recovered later and without the need to resend the already transferred
file content.
standard file paths - like e.g. the documents or downloads folders - are getting automatically adopted to the specific
path structures of each involved device and operating system.
transfer service life cycle
---------------------------
the transfer service can be invoked in different ways: standalone as a separate process or attached and embedded into
a controlling application.
run transfer service in standalone mode
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
execute this module to run the transfer services as a separate standalone process via::
python transfer_service.py [--bind=...] [--port=...] [--buf_len=...]
the following command line options are overwriting the default server address and socket buffer length (see also
:func:`service_factory`):
* 'bind' to restrict the incoming connections to an ip address/range (overwriting the default :data:`SERVER_BIND`).
* 'port' to specify the socket port (overwriting the default port :data:`SERVER_PORT`).
* 'buf_len' to specify the socket buffer length (overwriting the default buffer length :data:`SOCKET_BUF_LEN`).
after that the transfer service will be able to receive files send from another process or device.
.. note::
on Android a standalone transfer service has to be started as android service.
run transfer service attached to any app
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
alternatively you can run the transfer service server in a separate thread within respectively attached to your
application::
from ae.transfer_service import service_factory
transfer_service_app = service_factory()
transfer_service_app.set_option('port', 12345, save_to_config=False)
transfer_service_app.set_option('buf_len', 34567, save_to_config=False)
transfer_service_app.start_server(threaded=True)
pause or stop transfer service
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
to manually pause the transfer service, store the app instance of the transfer service app
(`transfer_service_app` in the example above) and call its :meth:`~TransferServiceApp.stop_server` method::
transfer_service_app.stop_server()
to fully stop the transfer service and terminate the transfer service app call instead its
:meth:`~TransferServiceApp.shutdown` method::
transfer_service_app.shutdown()
.. hint::
the :meth:`~ae.core.AppBase.shutdown` method of the base app instance (:class:`~ae.core.AppBase`) automatically
ensures a clean shutdown of the transfer service server on app quit/exit.
send file to another transfer service server
--------------------------------------------
to send files from one transfer server to another running transfer server, a separate client process has to be
started on the device storing the file to be send.
to initiate the file transfer the client process has to make a tcp connection to the transfer server running on the
same device, specifying the path of the file to send and the remote ip of the receiving transfer server and finally call
the remote procedure `send_file` like shown in the following example::
import socket
from ae.transfer_service import connect_and_request
request_kwargs = dict(method_name='send_file', file_path='path_to_file/file_name.ext', remote_ip='192.168.3.123')
with socket.socket() as sock:
response_kwargs = connect_and_request(sock, request_kwargs)
if 'error' in response_kwargs:
# handle error (e.g. display to user or add to a log file)
if the file transfer failed then the transfer kwargs dict returned by :func:`connect_and_request` will contain an
`error` key containing the error message text.
implemented remote procedures
-----------------------------
the following remote procedures are provided by the transfer service server:
* `cancel_request`: cancel running file transfer.
* `pending_requests`: get log info the progress/status of all currently running file transfers.
* `recv_file`: receive file from other transfer service server.
* `recv_message`: receive text message from other transfer service server.
* `send_file`: send file to other transfer service server.
* `send_message`: send text message to other transfer service server.
.. hint::
the demo app `ComPartY <https://gitlab.com/ae-group/comparty>`_ is using all provided remote procedures.
"""
import ast
import datetime
import os
import socket
import threading
from copy import deepcopy
from socketserver import StreamRequestHandler, ThreadingTCPServer
from typing import Any, Callable, Dict, List, Optional, Union
from ae.base import DATE_TIME_ISO, UNSET, os_local_ip # type: ignore
from ae.files import copy_bytes # type: ignore
from ae.paths import PATH_PLACEHOLDERS, normalize, placeholder_path, series_file_name # type: ignore
from ae.deep import deep_replace # type: ignore
from ae.console import ConsoleApp # type: ignore
__version__ = '0.3.10'
CONNECTION_TIMEOUT = 2.7 #: default timeout (in seconds) to connect and request a server process
CONNECT_ERR_PREFIX = "transfer_service.connect_and_request() exception "
""" error message string prefix if error happened directly in :func:`connect_and_request` helper (no protocol error).
"""
ENCODING_KWARGS = dict(encoding='UTF-8', errors='ignore') #: default encoding and encoding error handling strategy
SERVER_BIND = "" #: setting BIND to '' or None to allow connections for all available interfaces
SERVER_PORT = 36969 #: server listening port
SHUTDOWN_TIMEOUT = 3.9 #: timeout (in seconds) to shutdown/stop the console app
SOCKET_BUF_LEN = 16 * 1024 #: buf length for socket receives and sends
TRANSFER_KWARGS_DATE_TIME_NAME_PARTS = ('_date', '_time') #: kwarg name suffix for values automatically converted
TRANSFER_KWARGS_LINE_END_CHAR = "\n" #: end of line/command character as string
TRANSFER_KWARGS_LINE_END_BYTE = bytes(TRANSFER_KWARGS_LINE_END_CHAR, **ENCODING_KWARGS) #: .. and as byte value
# InetAddress = Tuple[str, int] #: for socket.connect / server.bind
TransferKwargs = Dict[str, Any] #: command/action format of requests and responses
requests_lock = threading.Lock() #: locking requests TransferKwargs in :attr:`TransferServiceApp.reqs_and_logs`
server_app: Optional['TransferServiceApp'] = None #: transfer service server app
[docs]def clean_log_str(log_str: Union[str, bytes]) -> str:
""" remove high-commas and backslashes from the passed string to add it to the logs (preventing \\ duplicates).
:param log_str: log string or bytes array to clean up.
:return: cleaned log string.
"""
if isinstance(log_str, bytes):
log_str = str(log_str, **ENCODING_KWARGS)
log_str = log_str.replace("\\n", "")
log_str = log_str.replace("\n", "")
log_str = log_str.replace("\r", "")
log_str = log_str.replace("\\", "")
log_str = log_str.replace("'", "")
return log_str.replace('"', "")
[docs]def connect_and_request(sock: socket.socket, request_kwargs: TransferKwargs,
buf_len: int = SOCKET_BUF_LEN, timeout: Optional[float] = CONNECTION_TIMEOUT) -> TransferKwargs:
""" connect to remote, send first command/action and return response as transfer kwargs dict.
:param sock: new socket instance.
:param request_kwargs: first/initial request kwargs dict. if the key `'server_address'` is not provided (with
the server address as (host, ip) tuple), then ('localhost', SERVER_PORT) is used.
if the key 'local_ip' is not specified then the local ip address will be used.
:param buf_len: socket buffer length. if not passed then :data:`SOCKET_BUF_LEN` will be used. pass
zero/0 to use the buf length defined via the 'buf_len' command line option.
:param timeout: timeout in seconds or None to use socket/system default timeout. if not passed
then the default timeout specified by :data:`CONNECTION_TIMEOUT` will be used.
:return: response transfer kwargs dict.
"""
if 'local_ip' not in request_kwargs:
request_kwargs['local_ip'] = os_local_ip()
server_app and server_app.vpo(f"transfer_service.connect_and_request(): timeout={timeout} req={request_kwargs}")
try:
sock.settimeout(timeout)
sock.connect(request_kwargs.get('server_address', ('localhost', SERVER_PORT)))
sock.sendall(bytes(transfer_kwargs_literal(request_kwargs), **ENCODING_KWARGS))
response_lit = str(recv_bytes(sock, buf_len=buf_len), **ENCODING_KWARGS)[:-1]
return transfer_kwargs_from_literal(response_lit)
except (Exception, IOError, OSError, SyntaxError, ValueError) as ex:
request_kwargs['error'] = CONNECT_ERR_PREFIX + f"{ex} processing received request {request_kwargs}"
return request_kwargs
[docs]def recv_bytes(sock: socket.socket, buf_len: int = SOCKET_BUF_LEN) -> bytes:
""" receive all bytes from the passed client socket instance until connection lost or line end reached.
:param sock: socket instance.
:param buf_len: socket buffer length. if not passed then :data:`SOCKET_BUF_LEN` will be used. pass
zero/0 to use the buf length defined via the 'buf_len' command line option.
:return: received bytes.
"""
pre = "transfer_service.recv_bytes()"
if not buf_len:
buf_len = server_app.get_opt('buf_len') if server_app else SOCKET_BUF_LEN
server_app and server_app.vpo(f"{pre}: sock={sock} buf_len={buf_len} ... waiting for receive")
buf = b""
while True:
chunk = sock.recv(buf_len)
server_app and server_app.vpo(f"{pre}: received {len(chunk)} bytes chunk=({clean_log_str(chunk)}); sock={sock}")
buf += chunk
if buf[-1:] == TRANSFER_KWARGS_LINE_END_BYTE:
server_app and server_app.vpo(f"{pre}: received end-of-line-char from socket {sock}")
break
if not chunk: # pragma: no cover
server_app and server_app.vpo(f"{pre}: received empty chunk from socket {sock}")
buf = bytes(transfer_kwargs_literal(dict(error=f"{pre}: empty chunk error")), **ENCODING_KWARGS)
break
return buf
[docs]def service_factory(task_id_func: Optional[Callable[[str, str, str], str]] = None) -> 'TransferServiceApp':
""" create server app instance including the command line options `bind` and `port`.
:param task_id_func: callable to return an unique id for a transfer request task.
:return: transfer service app instance.
"""
global server_app #: singleton server instance
server_app = TransferServiceApp(app_name='transfer_service', multi_threading=True, disable_buffering=True)
server_app.add_option('bind', "server bind address", SERVER_BIND, 'b')
server_app.add_option('port', "server listening port", SERVER_PORT, 'p')
server_app.add_option('buf_len', "socket buffer length", SOCKET_BUF_LEN, 'l')
if task_id_func:
# noinspection PyTypeHints
server_app.id_of_task = task_id_func # type: ignore
return server_app
[docs]def transfer_kwargs_error(transfer_kwargs: TransferKwargs, err_msg: str):
""" add/append error to transfer kwargs dict without overwriting any previous error.
:param transfer_kwargs: request/response transfer kwargs dict.
:param err_msg: error message to add.
"""
old_err = transfer_kwargs.get('error', "")
if old_err:
err_msg = f"{old_err}+++{err_msg}"
transfer_kwargs['error'] = err_msg
[docs]def transfer_kwargs_from_literal(transfer_kwargs_lit: str) -> TransferKwargs:
""" convert dict literal (created with transfer_kwargs_literal()) to transfer kwargs dict.
:param transfer_kwargs_lit: request/response dict literal string.
:return: transfer kwargs dict.
"""
transfer_kwargs = ast.literal_eval(transfer_kwargs_lit)
deep_replace(
transfer_kwargs,
lambda dp, key, val:
datetime.datetime(*val)
if isinstance(key, str) and any(fragment in key for fragment in TRANSFER_KWARGS_DATE_TIME_NAME_PARTS)
else UNSET,
key_filter=lambda index_or_attr: False # allow also kwarg key names with a leading underscore character
)
return transfer_kwargs
[docs]def transfer_kwargs_literal(transfer_kwargs: TransferKwargs) -> str:
""" convert dict to str literal to be sent via sockets (re-instantiable via transfer_kwargs_from_literal()).
.. note::
to ensure security (and prevent injections) only the following basic types can be used: `int`, `float`,
`boolean`, `str`, `bytes`, `list`, `tuple` and `dict`. date/time values are only allowed as dict value and of
the type `datetime.datetime`; additionally the key of this dict value has to contain one of the fragments
defined in :data:`TRANSFER_KWARGS_DATE_TIME_NAME_PARTS`.
:param transfer_kwargs: request/response transfer kwargs dict.
:return: literal string of transfer kwargs dict terminated with TRANSFER_KWARGS_LINE_END_CHAR.
"""
transfer_kwargs = deepcopy(transfer_kwargs)
deep_replace(
transfer_kwargs,
lambda pd, key, val:
tuple(val.timetuple())[:7]
if isinstance(key, str) and any(fragment in key for fragment in TRANSFER_KWARGS_DATE_TIME_NAME_PARTS)
else UNSET,
key_filter=lambda index_or_attr: False # allow also kwarg key names with a leading underscore character
)
ret = str(transfer_kwargs).replace(TRANSFER_KWARGS_LINE_END_CHAR, "\\n") + TRANSFER_KWARGS_LINE_END_CHAR
return ret
[docs]def transfer_kwargs_update(*variables, **kwargs):
""" update multiple transfer dicts with the same keys/values (locking with requests_lock).
:param variables: transfer kwargs variables/dicts.
:param kwargs: kwargs to update.
"""
requests_lock.acquire()
for var in variables:
var.update(**kwargs)
requests_lock.release()
[docs]class ThreadedTCPRequestHandler(StreamRequestHandler):
""" server request handler.
self.rfile is a file-like object created by the handler; to use e.g. readline() instead of raw recv()
likewise, self.wfile is a file-like object used to write back to the client.
"""
[docs] def handle(self):
""" handle a single request """
pre = "ThreadedTCPRequestHandler.handle() "
try:
request_lit = str(self.rfile.readline(), **ENCODING_KWARGS)[:-1]
server_app.vpo(f"{pre}request len={len(request_lit)}; req={clean_log_str(request_lit)}")
response_lit = server_app.response_to_request(request_lit, self)
server_app.vpo(f"{pre}response len={len(response_lit)}; res={clean_log_str(response_lit)}")
self.wfile.write(bytes(response_lit, **ENCODING_KWARGS))
except (IOError, OSError, Exception) as ex:
server_app.log('print', f"{pre}error: {ex}")
[docs]class TransferServiceApp(ConsoleApp):
""" server service app class """
reqs_and_logs: List[TransferKwargs] #: list of transfer_kwargs of currently processed requests
server_instance: Optional[ThreadingTCPServer] #: server class instance
server_thread: Optional[threading.Thread] #: server thread (main or separate thread)
[docs] def cancel_request(self, request_kwargs: TransferKwargs, handler: StreamRequestHandler) -> TransferKwargs:
""" cancel running request.
:param request_kwargs: request transfer kwargs dict, specifying via `rt_id_to_cancel` the request to cancel.
:param handler: request handler instance.
:return: response kwargs, having `error` key if request could not be found/cancelled.
"""
pre = "TransferServiceApp.cancel_request()"
rt_id_to_cancel = request_kwargs['rt_id_to_cancel']
msg = f"'{rt_id_to_cancel}' from client {handler.client_address}"
self.log('debug', f"{pre}: {msg}")
response_kwargs = request_kwargs.copy()
requests_lock.acquire()
for req in self.reqs_and_logs:
if req['rt_id'] == rt_id_to_cancel:
req['error'] = msg + " cancelled"
request_kwargs['completed'] = True
break
else:
response_kwargs['error'] = msg + " not found/cancelled"
requests_lock.release()
return response_kwargs
[docs] @staticmethod
def id_of_task(action: str, object_type: str, object_key: str) -> str:
""" compile the id of a transfer request task.
:param action: action or log level string.
:param object_type: task object type ('log' for log entries, 'file' for file transfers, else 'message').
:param object_key: task key (file path for file transfers, message string for messages and timestamp
for log entries).
:return: unique key identifying the task/log-entry.
"""
return action + '_' + object_type + ':' + object_key
[docs] def log(self, log_level: str, message: str):
""" print log message and add it to reqs_and_logs (to be read by controller app).
.. note::
please note that you have to use :func:`print` or one of the console print methods of the :class:`AppBase`
(like e.g. :meth:`~AppBase.verbose_out`, respective self.vpo) instead of this method for the logging of
low level transport methods/functions (like e.g. :meth:`~TransferServiceApp.pending_requests`,
:meth:`~TransferServiceApp.response_to_request`, :meth:`~ThreadedTCPRequestHandler.handle` or
:func:`recv_bytes`). this will prevent the duplication of a log message, because each call of this method
creates a new entry in :attr:`~TransferServiceApp.reqs_and_logs` which will be sent to the controlling app
via the low level transport methods (which would recursively grow the sent messages until the system
freezes), especially if the transfer kwargs are included into the log message.
:param log_level: 'print' always prints, 'debug' prints if self.debug, 'verbose' prints if self.verbose.
:param message: message to print.
"""
out_method = getattr(self, log_level + '_out', None) # calling print() via self.po/.dpo/.vpo()
if callable(out_method):
out_method(("" if self.active_log_stream else f"{threading.current_thread().name: <15}") + f"{message}")
if getattr(self, log_level, True) and hasattr(self, 'reqs_and_logs'):
log_time = datetime.datetime.now()
requests_lock.acquire()
self.reqs_and_logs.append(dict(
method_name=log_level + '_' + 'log', message=message, completed=True,
log_time=log_time, rt_id=self.id_of_task(log_level, 'log', log_time.strftime(DATE_TIME_ISO))))
requests_lock.release()
[docs] def pending_requests(self, request_kwargs: TransferKwargs, handler: StreamRequestHandler) -> TransferKwargs:
""" determine currently running/pending server requests and debug log messages (in debug mode only).
:param request_kwargs: request transfer kwargs dict.
:param handler: request handler class instance.
:return: response transfer kwargs dict keys.
"""
self.vpo(f"TransferServiceApp.pending_requests {request_kwargs} from {handler.client_address}")
requests_lock.acquire()
# copy all log messages and pending transfer requests (the pending_requests get not added to reqs_and_logs)
request_kwargs['pending_requests'] = self.reqs_and_logs.copy()
# remove completed transfers, cancellations, errors and debug log messages (just passed to the controlling app)
self.reqs_and_logs[:] = [_ for _ in self.reqs_and_logs if 'completed' not in _ and 'error' not in _]
requests_lock.release()
return request_kwargs
# uncomment the following method for verbose logging/debugging of threading issues
# def po(self, *objects, **kwargs):
# """ overwritten to add thread name to console printouts. """
# if not self.active_log_stream:
# objects = (f"{threading.current_thread().name: <18}", ) + objects
# super().po(*objects, **kwargs)
[docs] def recv_file(self, request_kwargs: TransferKwargs, handler: StreamRequestHandler) -> TransferKwargs:
""" receive binary file content from client.
:param request_kwargs: request transfer kwargs dict with the following keys:
* `'file_path'`: file path (can contain path placeholders).
* `'total_bytes'`: total file length in bytes.
* `'series_file'`: optionally, pass any value to ensures new file name.
:param handler: request handler class instance.
:return: response transfer kwargs dict keys (request kwargs extended with additional keys):
* `'error'`: error message string if an error occurred.
* `'transferred_bytes'`: start offset on recovered transfer (previously received bytes).
"""
file_path = normalize(request_kwargs['file_path'], make_absolute=False, resolve_sym_links=False)
file_folder, file_name = os.path.split(file_path)
if not os.path.exists(file_folder):
file_folder = PATH_PLACEHOLDERS['downloads']
recv_file = os.path.join(file_folder, file_name)
if request_kwargs.get('series_file'):
recv_file = request_kwargs['series_file_name'] = series_file_name(recv_file)
file_length = request_kwargs['total_bytes']
pre = "TransferServiceApp.recv_file()"
self.log('debug', f"{pre}: receive '{recv_file}' ({file_length} bytes) from client {handler.client_address}")
if not recv_file or not file_length:
request_kwargs['error'] = f"{pre} called without file name/length arguments"
return request_kwargs
if not os.path.exists(recv_file):
start_offset = 0
elif not os.path.isfile(recv_file):
request_kwargs['error'] = f"{pre} destination {recv_file} is not a file"
return request_kwargs
else:
with open(recv_file, 'ab+') as file_handle:
file_handle.seek(0, 2)
start_offset = file_handle.tell() # ==os.fstat(...).st_size; tell() faster: EOF seek anyway needed
self.log('debug', f"{pre}: recovering interrupted transfer at position {start_offset}")
transfer_kwargs_update(request_kwargs, transferred_bytes=start_offset)
if start_offset >= file_length:
msg = "already transferred" if start_offset == file_length else f"size err {start_offset}>{file_length}"
self.log('print', f"{pre}: file {msg}")
request_kwargs['error'] = f"{pre}: file {msg}"
return request_kwargs
response_kwargs = request_kwargs.copy()
handler.wfile.write(bytes(transfer_kwargs_literal(response_kwargs), **ENCODING_KWARGS))
def _progress(**kwargs) -> str:
""" copy_bytes progress callback.
:param kwargs: transfer kwargs to update.
:return: error message string if error (from other thread) detected, else empty string.
"""
self.vpo(f"{pre}._progress(): copy bytes progress kwargs={kwargs}")
if 'error' in request_kwargs: # pragma: no cover
return f"{pre}._progress(): error in request kwargs={request_kwargs}" # cancel transfer
transfer_kwargs_update(request_kwargs, response_kwargs, **kwargs)
return ""
errors: List[str] = []
copy_bytes(handler.rfile, recv_file, total_bytes=file_length, transferred_bytes=start_offset,
buf_size=self.get_opt('buf_len'), recoverable=True, errors=errors, progress_func=_progress)
if errors:
transfer_kwargs_update(request_kwargs, response_kwargs, error="\n".join(errors))
return response_kwargs
[docs] def recv_message(self, request_kwargs: TransferKwargs, handler: StreamRequestHandler) -> TransferKwargs:
""" receive message from client/peer. """
msg = request_kwargs['message']
self.log('debug', f"TransferServiceApp.recv_message '{msg}' from client {handler.client_address}")
response_kwargs = request_kwargs.copy()
response_kwargs['transferred_bytes'] = len(msg)
return response_kwargs
[docs] def response_to_request(self, request_lit: str, handler: StreamRequestHandler) -> str:
""" process request to this server and return response string.
.. note:: this method is running in a separate thread (created by the server to process this request).
:param request_lit: request string, which is a dict literal with `'method_name'` key.
:param handler: stream request handler instance.
:return: response string with transfer kwargs as literal.
if an error occurred then the returned response kwargs contains an `'error'` key
which stores the related error message.
"""
pre = "TransferServiceApp.response_to_request()"
self.vpo(f"{pre} from client={handler.client_address} request={request_lit}")
try:
request_kwargs = transfer_kwargs_from_literal(request_lit)
except (KeyError, SyntaxError, ValueError) as ex:
self.po(f"{pre} exception {ex} on eval of request literal {request_lit[:180]}...")
response_kwargs = dict(error=f"{pre} exception='{ex}' in parsing the request literal '{request_lit}'")
else:
response_kwargs = {} # default response if exception get raised
method_name = request_kwargs['method_name']
try:
if method_name != 'pending_requests':
requests_lock.acquire()
self.reqs_and_logs.append(request_kwargs) # removed in pending_requests()
requests_lock.release()
response_kwargs = getattr(self, method_name)(request_kwargs, handler)
if not response_kwargs:
raise IOError(f"{method_name} call returned empty response={response_kwargs}; req={request_kwargs}")
if 'error' in response_kwargs: # cancelled by other peer
transfer_kwargs_error(request_kwargs, response_kwargs['error'])
elif 'error' in request_kwargs: # cancel_request() called by this peer
transfer_kwargs_error(response_kwargs, request_kwargs['error'])
elif 'transferred_bytes' in response_kwargs: # update pending requests progress
transferred = response_kwargs['transferred_bytes']
requests_lock.acquire()
if transferred and transferred == request_kwargs.get('total_bytes', 0):
request_kwargs['transferred_bytes'] = transferred
request_kwargs['completed'] = True
requests_lock.release()
except (KeyError, IOError, OSError, SyntaxError, ValueError, Exception) as ex:
self.log('print', f"{pre} {method_name} exception {ex}; req={request_kwargs}; res={response_kwargs}")
if not response_kwargs:
response_kwargs = request_kwargs.copy()
transfer_kwargs_error(response_kwargs, f"{pre} exception '{ex}' req={request_kwargs}")
return transfer_kwargs_literal(response_kwargs)
[docs] def send_file(self, request_kwargs: TransferKwargs, handler: StreamRequestHandler) -> TransferKwargs:
""" send binary file content to remote server. """
pre = "TransferServiceApp.send_file()"
file_path = request_kwargs['file_path']
try:
with open(file_path, 'rb') as file_handle:
content = file_handle.read()
count = len(content)
except (FileNotFoundError, IOError, OSError) as ex:
self.log('print', f"{pre} exception {ex} on reading {file_path} file content; request={request_kwargs}")
request_kwargs['error'] = f"{pre} exception {ex} while reading content of file {file_path} to be send"
return request_kwargs
transfer_kwargs_update(request_kwargs, total_bytes=count, transferred_bytes=0)
if not count:
request_kwargs['error'] = f"file {file_path} is empty"
return request_kwargs
self.log('debug', f"{pre} {count} bytes on behalf of {handler.client_address}; req={request_kwargs}")
file_path = placeholder_path(request_kwargs['file_path'])
local_ip = request_kwargs['local_ip']
remote_ip = request_kwargs['remote_ip']
recv_kwargs = dict(method_name='recv_file', file_path=file_path,
transferred_bytes=0, total_bytes=request_kwargs['total_bytes'],
remote_ip=remote_ip, local_ip=local_ip,
server_address=(remote_ip, SERVER_PORT),
rt_id=self.id_of_task('recv', 'file', file_path + '@' + local_ip))
if remote_ip == local_ip:
recv_kwargs['series_file'] = True # create duplicate for debugging and testing
with socket.socket() as sock: # use socket default args: socket.AF_INET, socket.SOCK_STREAM
response_kwargs = connect_and_request(sock, recv_kwargs, buf_len=0)
self.log('verbose', f"{pre} received response to {recv_kwargs['method_name']} method: {response_kwargs}")
if 'error' in response_kwargs:
return response_kwargs
requests_lock.acquire()
offset = request_kwargs['transferred_bytes'] = response_kwargs['transferred_bytes']
requests_lock.release()
if offset: # pragma: no cover
self.log('debug', f"{pre} recovering interrupted transfer at offset {offset}")
content = content[offset:]
# instead of sock.sendall(content) send in chunks to allow progress display
buf_len = self.get_opt('buf_len')
while offset < count and 'error' not in request_kwargs:
chunk = content[:buf_len]
sock.send(chunk)
offset += len(chunk)
transfer_kwargs_update(request_kwargs, response_kwargs, transferred_bytes=offset)
content = content[buf_len:]
return response_kwargs
[docs] def send_message(self, request_kwargs: TransferKwargs, handler: StreamRequestHandler) -> TransferKwargs:
""" send message to remote server. """
pre = "TransferServiceApp.send_message()"
self.log('debug', f"{pre} {request_kwargs} on behalf of {handler.client_address}")
requests_lock.acquire()
msg = request_kwargs['message']
request_kwargs['total_bytes'] = len(msg)
request_kwargs['transferred_bytes'] = 0
requests_lock.release()
recv_kwargs = request_kwargs.copy()
recv_kwargs['method_name'] = 'recv_message'
recv_kwargs['server_address'] = (recv_kwargs['remote_ip'], SERVER_PORT)
recv_kwargs['rt_id'] = self.id_of_task('recv', 'message', msg + '@' + recv_kwargs['local_ip'])
with socket.socket() as sock: # use socket default args: socket.AF_INET, socket.SOCK_STREAM
response_kwargs = connect_and_request(sock, recv_kwargs, buf_len=0)
self.log('verbose', f"{pre}: received response to {recv_kwargs['method_name']} method call: {response_kwargs}")
return response_kwargs
[docs] def shutdown(self, exit_code: Optional[int] = 0, timeout: Optional[float] = None):
""" overwritten to stop a running transfer service server/threads on shutdown of this app instance.
:param exit_code: set application OS exit code - see :meth:`~ae.core.AppBase.shutdown`.
:param timeout: timeout float value in seconds - see :meth:`~ae.core.AppBase.shutdown`.
"""
self.stop_server()
super().shutdown(exit_code=exit_code, timeout=timeout)
[docs] def start_server(self, threaded: bool = False) -> str:
""" start server and run until main app :meth:`~.stop_server`.
:param threaded: optionally pass True to use separate thread for the server process.
:return: empty string/"" if server instance/thread got started else error message string.
"""
pre = "TransferServiceApp.start_server()"
self.reqs_and_logs = []
self.server_instance = self.server_thread = None
if requests_lock.locked(): # pragma: no cover
requests_lock.release() # reset from crashed request
self.log('print', f"{pre}: released requests lock")
self.log('debug', f"{pre}: threaded={threaded}")
err_msg = ""
try:
server_address = (self.get_opt('bind'), self.get_opt('port'))
ThreadingTCPServer.allow_reuse_address = True # patching class: https://stackoverflow.com/a/15278302/90580
self.server_instance = ThreadingTCPServer(server_address, ThreadedTCPRequestHandler)
self.log('verbose', f"{pre}: (ip,port)={server_address}/{self.server_instance.server_address}")
tct = threading.current_thread()
if threaded:
# start a thread with the server -- that thread will then start one more thread for each request
self.server_thread = threading.Thread(name="TransferThread", target=self.server_instance.serve_forever)
self.server_thread.start()
self.log('verbose', f"{pre}: server started from thread={tct.name} in thread={self.server_thread.name}")
else:
self.log('verbose', f"{pre}: starting server loop - using current thread={tct.name}")
self.server_thread = tct
self.server_instance.serve_forever()
except (IOError, OSError, Exception) as ex:
err_msg = f"{pre}: exception {ex}"
self.log('print', err_msg)
self.server_instance = self.server_thread = None
return err_msg
[docs] def stop_server(self):
""" stop/pause transfer service server - callable also if not running to reset/prepare this app instance. """
pre = "TransferServiceApp.stop_server"
if requests_lock.locked(): # pragma: no cover
requests_lock.release()
self.log('print', f"{pre}: released requests lock")
if getattr(self, 'server_instance', False) and getattr(self, 'server_thread', False):
if threading.current_thread() == self.server_thread: # pragma: no cover
thread = threading.Thread(name="StopTransferService", target=self.server_instance.shutdown)
thread.start()
thread.join(timeout=SHUTDOWN_TIMEOUT)
if thread.is_alive():
self.log('print', f"{pre}: server shutdown thread join timed out")
else:
self.server_instance.shutdown()
self.server_thread.join(timeout=SHUTDOWN_TIMEOUT)
if self.server_thread.is_alive(): # pragma: no cover
self.log('print', f"{pre}: server thread join timed out")
self.server_instance = self.server_thread = None
if __name__ == '__main__': # pragma: no cover
# create app instance, parse command line args and start server
server_app = service_factory()
server_app.run_app()
server_app.start_server()