# Source code for pyscenario.client

"""
IFSEI Telnet Client Module.

This module contains the IFSEITelnetClient class, a custom TelnetClient for handling
communication with IFSEI devices over Telnet. It manages the connection, sending,
and receiving of data, and handles connection loss gracefully.

"""

import asyncio
import logging
from collections.abc import Callable
from typing import Any

from telnetlib3 import TelnetClient, TelnetReader, TelnetWriter

from pyscenario import Protocol, QueueManager, TaskManager
from pyscenario.const import IFSEI_ATTR_SEND_DELAY, RESPONSE_TERMINATOR

logger = logging.getLogger(__name__)


class IFSEITelnetClient(TelnetClient):
    """
    Custom TelnetClient for handling IFSEI device communication.

    This client handles the connection, sending, and receiving of data to and
    from an IFSEI device over a Telnet connection. It manages tasks for sending
    and receiving data, and can handle connection loss gracefully.

    Attributes
    ----------
    protocol : Protocol
        The communication protocol (TCP/UDP).
    task_manager : TaskManager
        Manager for send and receive tasks.
    queue_manager : QueueManager
        Manager for send and receive queues.
    send_delay : float
        Delay between sending commands.
    shell : Callable
        Shell function to run asynchronously.
    on_connection_lost_callback : Callable | None
        Callback when connection is lost.

    """

    def __init__(
        self,
        queue_manager: QueueManager,
        on_connection_lost_callback: Callable[[], None] | None,
        *args: Any,
        **kwds: Any,
    ) -> None:
        """
        Initialize the IFSEITelnetClient.

        Parameters
        ----------
        queue_manager : QueueManager
            Manager for send and receive queues.
        on_connection_lost_callback : Callable | None
            Callback when connection is lost.
        *args : Any
            Additional positional arguments for the TelnetClient.
        **kwds : Any
            Additional keyword arguments for the TelnetClient.

        """
        super().__init__(*args, **kwds)
        self.protocol: Protocol = Protocol.TCP
        self.task_manager = TaskManager()
        self.queue_manager = queue_manager
        self.send_delay = IFSEI_ATTR_SEND_DELAY
        # Set after super().__init__() so it replaces whatever shell the base
        # initializer stored.
        # NOTE(review): presumably the base client runs `shell` once the
        # connection is established -- confirm against telnetlib3.
        self.shell = self._async_run_shell
        self.on_connection_lost_callback: Callable[[], None] | None = (
            on_connection_lost_callback
        )

    async def _async_run_shell(
        self, reader: TelnetReader, writer: TelnetWriter
    ) -> None:
        """
        Run the asynchronous shell for communication.

        The arguments are accepted to satisfy the shell signature but are not
        used here; the send/receive loops go through ``self.reader`` and
        ``self.writer``.

        Parameters
        ----------
        reader : TelnetReader
            Telnet reader object.
        writer : TelnetWriter
            Telnet writer object.

        """
        await self._async_start_tasks()

    async def _async_start_tasks(self) -> None:
        """Start the asynchronous tasks for sending and receiving data."""
        # Cancel leftovers first so at most one send/receive pair is running.
        self._stop_tasks()
        logger.debug("[protocol=%s] starting send/receive tasks", self.protocol.name)
        self.task_manager.send_task = asyncio.create_task(self._async_send_data())
        self.task_manager.receive_task = asyncio.create_task(
            self._async_receive_data()
        )

    def _stop_tasks(self) -> None:
        """
        Request cancellation of the send and receive tasks, if any.

        ``Task.cancel()`` only schedules the cancellation and does not raise,
        so no exception handling is needed around it. Each task reference is
        cleared afterwards, which makes repeated calls harmless.
        """
        send_task = self.task_manager.send_task
        if send_task is not None:
            send_task.cancel()
            self.task_manager.send_task = None
            logger.debug("send task cancel requested")

        receive_task = self.task_manager.receive_task
        if receive_task is not None:
            receive_task.cancel()
            self.task_manager.receive_task = None
            logger.debug("receive task cancel requested")

    async def _async_send_data(self) -> None:
        """
        Send data to the IFSEI device from the queue.

        Loops until the writer reports the connection as closed or the task is
        cancelled. Cancellation is logged and absorbed; any other error raised
        while sending propagates and ends the task.
        """
        try:
            logger.debug("[protocol=%s] starting data sending loop", self.protocol.name)
            while True:
                command = await self.queue_manager.send_queue.get()
                logger.debug("[cmd=%s] dequeued, sending", command)
                if self.protocol == Protocol.TCP:
                    await self._async_send_command_tcp(command)
                elif self.protocol == Protocol.UDP:
                    # Delegates to the UDP stub, which raises
                    # NotImplementedError.
                    self._send_command_udp(command)

                if self.writer.connection_closed:
                    logger.debug("[cmd=%s] writer closed, send loop ending", command)
                    break

                # Pace consecutive commands.
                await asyncio.sleep(self.send_delay)
        except asyncio.CancelledError:
            logger.debug("send task cancelled")

    async def _async_send_command_tcp(self, command: str) -> None:
        """
        Send a command using TCP.

        The command is written followed by a carriage return and the writer is
        drained before returning.

        Parameters
        ----------
        command : str
            Command to be sent.

        Raises
        ------
        ConnectionResetError
            If the connection is reset while sending (logged, then re-raised).
        Exception
            Any other error raised while writing (logged, then re-raised).

        """
        try:
            self.writer.write(command + "\r")
            await self.writer.drain()
        except ConnectionResetError:
            logger.error("[cmd=%s] connection reset while sending", command)
            raise
        except Exception as e:
            logger.error("[cmd=%s] failed to send (TCP): %s", command, e)
            raise
        else:
            logger.debug("[cmd=%s] sent (TCP)", command)

    def _send_command_udp(self, command: str) -> None:
        """
        Send a command using UDP.

        Parameters
        ----------
        command : str
            Command to be sent.

        Raises
        ------
        NotImplementedError
            UDP protocol is not implemented.

        """
        raise NotImplementedError

    async def _async_receive_data(self) -> None:
        """
        Receive data from the IFSEI device.

        Non-empty responses are put on the receive queue. The loop ends when
        the reader reports the connection as closed or has reached end of
        stream. Errors are logged and re-raised.
        """
        try:
            logger.debug(
                "[protocol=%s] starting data receiving loop", self.protocol.name
            )
            while True:
                response = await self._async_read_until_prompt()
                if response:
                    logger.debug("[response=%s] enqueueing for processing", response)
                    await self.queue_manager.receive_queue.put(response)

                # Also stop at end of stream: no further data can arrive, and
                # looping again would only produce empty reads.
                if self.reader.connection_closed or self.reader.at_eof():
                    logger.debug("reader closed, receive loop ending")
                    break
        except Exception as e:
            logger.error("error receiving data: %s", e)
            raise

    async def _async_read_until_prompt(self) -> str:
        """
        Read data from the IFSEI device until a prompt is received.

        Characters are read one at a time until the accumulated text ends with
        ``RESPONSE_TERMINATOR``, the reader reports the connection as closed,
        or the reader reaches end of stream. The terminator (if present) and
        trailing CR/LF characters are stripped from the result.

        Returns
        -------
        str
            The response received from the device (possibly empty).

        Raises
        ------
        NotImplementedError
            If the configured protocol is anything other than TCP.

        """
        try:
            response = ""
            while True:
                if self.protocol == Protocol.TCP:
                    char = await self.reader.read(1)
                else:
                    # Only TCP has a read path; raising here also guarantees
                    # `char` is never used unbound.
                    raise NotImplementedError

                response += char
                if (
                    response.endswith(RESPONSE_TERMINATOR)
                    or self.reader.connection_closed
                ):
                    break
                # An empty read at end of stream would repeat forever without
                # yielding to the event loop. at_eof() is checked as well,
                # since an empty read alone may not mean the stream has ended.
                if not char and self.reader.at_eof():
                    break

            if response.endswith(RESPONSE_TERMINATOR):
                response = response[: -len(RESPONSE_TERMINATOR)]
            response = response.rstrip("\r\n")
            if response:
                logger.debug("[response=%s] received (raw)", response)
            return response
        except asyncio.exceptions.CancelledError:
            logger.debug("data receiving loop cancelled")
            raise
        except Exception as e:
            logger.error("error reading data: %s", e)
            raise

    def connection_lost(self, exc: None | Exception, /) -> None:
        """
        Handle connection loss.

        Delegates to the base class, stops the send/receive tasks and then
        invokes the connection-lost callback when one was provided.

        Parameters
        ----------
        exc : None | Exception
            Exception that caused the connection loss.

        """
        logger.debug("connection_lost callback fired (exc=%s)", exc)
        super().connection_lost(exc)
        self._stop_tasks()
        if self.on_connection_lost_callback is not None:
            self.on_connection_lost_callback()

    async def async_close(self) -> None:
        """
        Disconnect from the IFSEI device.

        Stops the send/receive tasks, then closes the writer and the reader.

        Raises
        ------
        Exception
            Any error raised while disconnecting (logged, then re-raised).

        """
        try:
            self._stop_tasks()
            self.writer.close()
            self.reader.close()
            logger.info("disconnected from IFSEI")
        except Exception as e:
            logger.error("failed to disconnect: %s", e)
            raise