src.transport.server.tcp_server 源代码

"""TCP 服务器模块。

本模块实现了 DLT645 协议的 TCP 服务器功能。
"""

import socket
import threading
import time
from typing import Optional

from ...common.transform import bytes_to_spaced_hex
from ...common.message_capture import MessageCapture
from ...protocol.protocol import DLT645Protocol
from ...transport.server.log import log


[文档] class TcpServer: """TCP 服务器类,用于与 DLT645 客户端进行 TCP 通信。 :ivar ip: 服务器 IP 地址。 :ivar port: 服务器端口号。 :ivar timeout: 连接超时时间(秒)。 :ivar ln: 监听套接字。 :ivar service: 服务实例,用于处理业务逻辑。 """ def __init__(self, ip: str, port: int, timeout: float, service): """初始化 TCP 服务器。 :param ip: 服务器 IP 地址(如 '0.0.0.0')。 :type ip: str :param port: 服务器端口号。 :type port: int :param timeout: 连接超时时间(秒)。 :type timeout: float :param service: 服务实例,用于处理业务逻辑。 :type service: Any """ self.ip = ip self.port = port self.timeout = timeout self.ln = None self.service = service self._server_thread = None self._running = False self._stop_event = threading.Event() # 跟踪所有活跃的客户端连接 self._connections = [] # 用于保护连接列表的锁 self._connections_lock = threading.Lock() # 报文捕获管理器 self._message_capture: Optional[MessageCapture] = None
[文档] def start(self): """启动TCP服务器(非阻塞,在后台线程中运行)""" if self._running: log.warning("TCP server is already running") return None self._stop_event.clear() self._running = True # 在后台线程中启动服务器 self._server_thread = threading.Thread(target=self._run_server, daemon=True) self._server_thread.start() # 等待服务器启动完成 time.sleep(0.1) log.info(f"TCP server starting in background on {self.ip}:{self.port}") return None
def _run_server(self): """服务器主循环(在后台线程中运行)""" try: # 创建 TCP 套接字 self.ln = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 设置地址可重用 self.ln.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 设置非阻塞超时,以便能够响应停止信号 self.ln.settimeout(1.0) # 绑定地址和端口 self.ln.bind((self.ip, self.port)) # 开始监听 self.ln.listen(5) log.info(f"TCP server started on port {self.port}") while not self._stop_event.is_set(): try: # 接受连接 conn, addr = self.ln.accept() log.info(f"Accepted connection from {addr}") # 设置超时时间 conn.settimeout(self.timeout) # 将连接添加到活跃连接列表 with self._connections_lock: self._connections.append(conn) # 启动新线程处理连接 threading.Thread( target=self.handle_connection, args=(conn,), daemon=True ).start() except socket.timeout: # 超时是正常的,继续检查停止信号 continue except socket.error as e: if self._stop_event.is_set(): break log.error(f"Failed to accept connection: {e}") if hasattr(e, "errno") and e.errno == 10038: # 套接字关闭错误 break except Exception as e: log.error(f"Failed to start TCP server: {e}") finally: self._running = False if self.ln: try: self.ln.close() except: pass log.info("TCP server stopped")
[文档] def stop(self): """停止TCP服务器""" if not self._running: log.warning("TCP server is not running") return None log.info("Shutting down TCP server...") # 设置停止信号 self._stop_event.set() # 主动关闭所有活跃的客户端连接 with self._connections_lock: for conn in self._connections: try: conn.close() log.info(f"Closed active connection: {conn}") except Exception as e: log.error(f"Error closing client connection: {e}") # 清空连接列表 self._connections.clear() # 关闭服务器套接字 if self.ln: try: self.ln.close() except Exception as e: log.error(f"Error closing server socket: {e}") # 等待服务器线程结束 if self._server_thread and self._server_thread.is_alive(): self._server_thread.join(timeout=5.0) if self._server_thread.is_alive(): log.warning("Server thread did not stop gracefully") self._running = False log.info("TCP server shutdown complete") return None
[文档] def is_running(self): """检查服务器是否正在运行""" return self._running
[文档] def handle_connection(self, conn): try: # 初始化数据缓冲区,用于累积接收的数据 data_buffer = bytearray() while not self._stop_event.is_set(): try: # 接收数据 buf = conn.recv(256) if not buf: continue # 将新接收的数据添加到缓冲区 data_buffer.extend(buf) log.info(f"RX: {bytes_to_spaced_hex(buf)} (len:{len(data_buffer)})") # 捕获接收的报文 current_tx_id: Optional[str] = None if self._message_capture: current_tx_id = self._message_capture.capture_rx_for_server(buf) # 尝试解析缓冲区中的数据 try: # 尝试从缓冲区中提取完整的帧 remaining_data, frame = ( DLT645Protocol.deserialize_with_remaining( bytes(data_buffer) ) ) if frame is not None: # 解析成功,更新缓冲区为剩余未解析的数据 data_buffer = bytearray(remaining_data) log.info( f"Successfully parsed frame, remaining buffer size: {len(remaining_data)}" ) # 业务处理 try: resp = self.service.handle_request(frame) # 响应 if resp: try: conn.sendall(resp) log.info( f"TX: {bytes_to_spaced_hex(resp)} (len:{len(resp)})" ) # 捕获发送的报文并与接收配对 if self._message_capture: self._message_capture.capture_tx_for_server(resp, current_tx_id) except Exception as e: log.error(f"Error writing response: {e}") except Exception as e: log.error(f"Error handling request: {e}") # 继续循环,处理缓冲区中剩余的数据 continue # 如果没有完整的帧,但收到了新数据,继续等待 # 注意:这里不会清空缓冲区,而是保持数据以等待后续数据 except Exception as e: # 解析错误,但不立即清空缓冲区 # 可能是因为数据不完整,等待更多数据 log.warning( f"Error parsing frame (might be incomplete data): {e}" ) # 只有在确定是无效数据格式时才清空缓冲区 # 这里可以根据具体的异常类型或错误信息来判断 except socket.timeout: # 清空缓冲区 if len(data_buffer) > 0: log.warning( f"Connection timeout with {len(data_buffer)} bytes in buffer, clearing buffer {bytes_to_spaced_hex(data_buffer)}" ) data_buffer.clear() except Exception as e: log.error(f"Error handling connection: {e}") finally: try: # 将连接从活跃连接列表中移除 with self._connections_lock: if conn in self._connections: self._connections.remove(conn) conn.close() log.info("Connection closed") except Exception as e: log.error(f"Error closing connection: {e}")