src.transport.client.rtu_client 源代码

"""RTU 客户端模块。

本模块实现了 DLT645 协议的 RTU(串口)客户端功能。
"""

import time
import serial
from typing import Optional
from ...common.transform import bytes_to_spaced_hex
from ...common.message_capture import MessageCapture
from ...transport.client.log import log
from ...protocol.frame import FRAME_START_BYTE, FRAME_END_BYTE
from ...protocol.protocol import DLT645Protocol


[文档] class RtuClient: """RTU 客户端类,用于与 DLT645 设备进行串口通信。 该类实现了 RTU(Remote Terminal Unit)客户端功能, 支持与 DLT645 协议设备进行串口通信。 :ivar port: 串口名称。 :ivar baud_rate: 波特率。 :ivar data_bits: 数据位。 :ivar stop_bits: 停止位。 :ivar parity: 校验位。 :ivar timeout: 超时时间(秒)。 :ivar conn: 串口连接对象。 """ def __init__( self, port: str = "", baud_rate: int = 9600, data_bits: int = 8, stop_bits: int = 1, parity: str = serial.PARITY_NONE, timeout: float = 1.0, ): """初始化 RTU 客户端。 :param port: 串口名称(如 "COM1" 或 "/dev/ttyUSB0")。 :type port: str :param baud_rate: 波特率,默认 9600。 :type baud_rate: int :param data_bits: 数据位,默认 8。 :type data_bits: int :param stop_bits: 停止位,默认 1。 :type stop_bits: int :param parity: 校验位,默认无校验。 :type parity: str :param timeout: 超时时间(秒),默认 1.0。 :type timeout: float """ self.port = port self.baud_rate = baud_rate self.data_bits = data_bits self.stop_bits = stop_bits self.parity = parity self.timeout = timeout self.conn: Optional[serial.Serial] = None # 报文捕获管理器 self._message_capture: Optional[MessageCapture] = None
[文档] def connect(self) -> bool: """连接到串口 建立与指定串口的连接,配置为DLT645协议通信所需的参数。 Returns: bool: 连接成功返回True,失败返回False """ try: self.conn = serial.Serial( port=self.port, baudrate=self.baud_rate, bytesize=self.data_bits, stopbits=self.stop_bits, parity=self.parity, timeout=self.timeout, ) log.info(f"RTU client connected to port {self.port}") return True except Exception as e: log.error(f"Failed to open serial port: {e}") return False
def _is_valid_response(self, response: bytearray) -> bool: """检查响应是否为有效的DLT645帧 Args: response: 接收到的响应数据 Returns: bool: 如果响应包含完整的DLT645帧则返回True """ # 检查是否同时包含起始字节和结束字节 if FRAME_START_BYTE in response and FRAME_END_BYTE in response: # 确保结束字节在起始字节之后 start_pos = response.find(FRAME_START_BYTE) end_pos = response.find(FRAME_END_BYTE, start_pos) if end_pos > start_pos: # 检查帧长度是否合理(最小帧长度约为12字节) if end_pos - start_pos >= 12: return True return False
[文档] def disconnect(self) -> bool: """断开与串口的连接 关闭串口连接并释放资源。 Returns: bool: 断开连接成功返回True,失败返回False """ if self.conn is not None: try: self.conn.close() self.conn = None log.info(f"RTU client disconnected from port {self.port}") return True except Exception as e: log.error(f"Failed to close serial port: {e}") return False return True
def _check_timeout(self, start_time: float) -> bool: """检查是否超时 Args: start_time: 开始时间 Returns: bool: 如果超时则返回True,否则返回False """ return time.time() - start_time > self.timeout
[文档] def send_request( self, data: bytes, retries: int = 1, ) -> Optional[bytes]: """发送请求并接收响应 向DLT645设备发送请求数据,并接收响应。支持超时处理和自动重试机制。 Args: data: 要发送的请求数据 retries: 失败重试次数,默认1次 Returns: bytes: 成功接收的响应数据 None: 失败时返回 """ # 确保连接已建立 if not self._ensure_connection(): log.error("Failed to establish serial port connection") return None # 保存原始超时设置 original_timeout = self.conn.timeout for attempt in range(retries + 1): last_data_time = time.time() try: # 清空缓冲区 if not self._safe_clear_buffer(): log.warning("Buffer clearance failed, proceeding anyway") # 发送数据 written = self.conn.write(data) if written != len(data): log.error(f"TX: Write incomplete ({written}/{len(data)} bytes)") continue log.info(f"TX: {bytes_to_spaced_hex(data)}") # 捕获发送的报文 current_tx_id: Optional[str] = None if self._message_capture: current_tx_id = self._message_capture.capture_tx(data) # 初始化数据缓冲区和接收状态 data_buffer = bytearray() max_buffer_size = 1024 # 增加缓冲区大小以处理较大的数据 # 持续读取直到收到完整帧或超时 while not self._check_timeout(last_data_time): # 读取数据, 在没有数据时等待直到超时 chunk = self.conn.read(256) if chunk: data_buffer.extend(chunk) last_data_time = time.time() log.info(f"RX: {bytes_to_spaced_hex(chunk)} len{len(chunk)}") # 无论是否收到数据,只要缓冲区不为空就尝试解析 if data_buffer: try: # 使用deserialize_with_remaining解析分片数据 remaining_data, frame = ( DLT645Protocol.deserialize_with_remaining(data_buffer) ) if frame is not None: # 成功解析到完整帧 log.info( f"RX: Successfully parsed frame, buffer size after parsing: {len(remaining_data)}" ) # 捕获接收的报文并与发送配对 if self._message_capture: self._message_capture.capture_rx(bytes(data_buffer), current_tx_id) return bytes(data_buffer) else: # 更新缓冲区为未解析的剩余数据 data_buffer = remaining_data # 检查缓冲区大小限制 if len(data_buffer) > max_buffer_size: log.warning( f"RX: Buffer overflow, clearing {len(data_buffer)} bytes" ) data_buffer.clear() except Exception as e: log.error( f"Error parsing data: {type(e).__name__}: {str(e)}" ) # 出错时保留数据继续尝试 # 小延迟避免CPU占用过高 time.sleep(0.001) log.warning( f"RX: Timeout after {original_timeout} seconds, received incomplete data" ) except Exception as e: log.error(f"Attempt {attempt} failed: {type(e).__name__}: {str(e)}") # 非最后一次尝试时延迟重试 if attempt < retries: log.info(f"Retrying ({attempt + 1}/{retries})...") time.sleep(0.5 * (attempt + 1)) # 指数退避 log.error("All attempts failed") return None
def _safe_clear_buffer(self) -> bool: """安全清空串口缓冲区 清空输入和输出缓冲区,避免旧数据干扰新的通信。 Returns: bool: 清空成功返回True,失败返回False """ try: if self.conn is not None: self.conn.reset_input_buffer() if hasattr(self.conn, "reset_output_buffer"): self.conn.reset_output_buffer() return True except Exception as e: log.warning(f"Clear buffer failed: {str(e)}") return False def _ensure_connection(self) -> bool: """确保串口连接已建立,如果连接断开则尝试重新连接 Returns: bool: 连接是否成功建立 """ try: # 检查连接是否存在且打开 if self.conn is None or not self.conn.is_open: log.info( "Connection lost or not established, attempting to reconnect..." ) return self.connect() return True except Exception as e: log.error(f"Connection check failed: {str(e)}") # 尝试重新连接 return self.connect()