src.transport.client.tcp_client 源代码

"""TCP 客户端模块。

本模块实现了 DLT645 协议的 TCP 客户端功能。
"""

import socket
import time
from typing import Optional, Any

from ...common.transform import bytes_to_spaced_hex
from ...common.message_capture import MessageCapture
from ...protocol.protocol import DLT645Protocol
from ...transport.client.log import log


[文档] class TcpClient: """TCP 客户端类,用于与 DLT645 设备进行 TCP 通信。 :ivar ip: 服务器 IP 地址。 :ivar port: 服务器端口号。 :ivar timeout: 连接超时时间(秒)。 :ivar conn: socket 连接对象。 """ def __init__(self, ip: str = "", port: int = 0, timeout: float = 5.0): """初始化 TCP 客户端。 :param ip: 服务器 IP 地址(如 '0.0.0.0')。 :type ip: str :param port: 服务器端口号。 :type port: int :param timeout: 连接超时时间(秒),默认 5.0。 :type timeout: float """ self.ip = ip self.port = port self.timeout = timeout self.conn: Optional[socket.socket] = None # 报文捕获管理器 self._message_capture: Optional[MessageCapture] = None
[文档] def connect(self) -> bool: """连接到服务器""" address = f"{self.ip}:{self.port}" try: self.conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.conn.settimeout(self.timeout) self.conn.connect((self.ip, self.port)) log.info(f"成功连接到服务器 {address}") return True except Exception as e: log.error(f"连接服务器失败: {e}") return False
[文档] def disconnect(self) -> bool: """断开与服务器的连接""" if self.conn is not None: try: self.conn.close() self.conn = None log.info("已断开与服务器的连接") return True except Exception as e: log.error(f"断开连接失败: {e}") return False return True
[文档] def send_request( self, data: bytes, write_timeout: float = 2.0, read_timeout: float = 5.0, total_timeout: float = 10.0, min_response_len: int = 1, retries: int = 1, ) -> Optional[bytes]: """增强版TCP请求-响应(支持超时控制和分片数据处理) Args: data: 要发送的请求数据 write_timeout: 数据写入超时(秒) read_timeout: 单次recv操作的超时(秒) total_timeout: 整个请求-响应的总超时(秒) min_response_len: 最小有效响应长度 retries: 失败重试次数 Returns: bytes: 成功接收的响应数据 None: 超时或失败时返回 """ if self.conn is None: log.error("Not connected to server") return None original_timeout = self.conn.gettimeout() # 保存原始超时设置 data_buffer = bytearray() for attempt in range(retries + 1): try: # 0. 初始化计时器和缓冲区 start_time = time.time() data_buffer.clear() # 1. 设置socket超时(影响后续所有操作) self.conn.settimeout(read_timeout) # 2. 带超时的数据写入 try: self.conn.sendall(data) log.info(f"TX: {bytes_to_spaced_hex(data)}") # 捕获发送的报文 current_tx_id: Optional[str] = None if self._message_capture: current_tx_id = self._message_capture.capture_tx(data) except socket.timeout: raise TimeoutError(f"Write timeout after {write_timeout}s") # 3. 接收数据(带总超时控制和分片处理) while (time.time() - start_time) < total_timeout: try: # 读取数据到缓冲区 chunk = self.conn.recv(1024) # 增加缓冲区大小 if chunk: data_buffer.extend(chunk) log.info( f"RX: {bytes_to_spaced_hex(chunk)} (buffer size: {len(data_buffer)})" ) # 尝试解析缓冲区中的数据 try: remaining_data, frame = ( DLT645Protocol.deserialize_with_remaining( bytes(data_buffer) ) ) if frame is not None: log.debug( f"Successfully received complete response: {bytes_to_spaced_hex(data_buffer)}" ) # 捕获接收的报文并与发送配对 if self._message_capture: self._message_capture.capture_rx(bytes(data_buffer), current_tx_id) # 恢复原始超时设置 self.conn.settimeout(original_timeout) return data_buffer except Exception as parse_error: log.warning( f"Parse error (might be incomplete data): {parse_error}" ) # 继续等待更多数据 continue else: # 空数据表示连接关闭 log.warning("Connection closed by peer") break except socket.timeout: # 单次recv超时,检查总超时 if (time.time() - start_time) >= total_timeout: log.warning(f"Total timeout reached after {total_timeout}s") break # 继续等待更多数据 continue # 超时或中断处理 if len(data_buffer) >= min_response_len: log.warning( f"Incomplete response ({len(data_buffer)} bytes): {bytes_to_spaced_hex(data_buffer)}" ) else: log.error(f"No valid response within {total_timeout}s") except TimeoutError as e: log.error(str(e)) except Exception as e: log.error(f"Attempt {attempt} failed: {type(e).__name__}: {str(e)}") # 非最后一次尝试时延迟重试 if attempt < retries: log.info(f"Retrying ({attempt+1}/{retries})...") time.sleep(0.5 * (attempt + 1)) # 指数退避 # 重连逻辑(如果连接已断开) if not self._ensure_connection(): continue # 恢复原始超时设置 self.conn.settimeout(original_timeout) return None
def _ensure_connection(self) -> bool: """确保连接有效(用于重试时重新连接)""" if self.conn is None: return self.connect() # 简单检查连接是否仍有效 try: self.conn.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR) return True except: self.disconnect() return self.connect() def _safe_sendall(self, data: bytes, timeout: float) -> bool: """带超时的sendall实现""" self.conn.settimeout(timeout) try: self.conn.sendall(data) return True except socket.timeout: return False finally: self.conn.settimeout(self.timeout) # 恢复原始超时 def __enter__(self): """上下文管理器入口""" self.connect() return self def __exit__(self, exc_type, exc_val, exc_tb): """上下文管理器退出""" self.disconnect()