"""RTU 服务器模块。
本模块实现了 DLT645 协议的 RTU(串口)服务器功能。
"""
from typing import Optional, Any
import serial
import threading
import time
from ...common.transform import bytes_to_spaced_hex
from ...common.message_capture import MessageCapture
from ...protocol.protocol import DLT645Protocol
from ...transport.server.log import log
[文档]
class RtuServer:
"""RTU 服务器类,用于与 DLT645 客户端进行串口通信。
该类实现了 RTU(Remote Terminal Unit)服务器功能,
支持与 DLT645 协议客户端进行串口通信。
:ivar port: 串口名称。
:ivar data_bits: 数据位。
:ivar stop_bits: 停止位。
:ivar baud_rate: 波特率。
:ivar parity: 校验位。
:ivar timeout: 超时时间(秒)。
:ivar service: 服务实例,用于处理业务逻辑。
:ivar conn: 串口连接对象。
"""
def __init__(
self,
port: str,
data_bits: int = 8,
stop_bits: int = 1,
baud_rate: int = 9600,
parity: str = serial.PARITY_NONE,
timeout: float = 5.0,
service=None,
):
"""初始化 RTU 服务器。
:param port: 串口端口名(如 '/dev/ttyUSB0')。
:type port: str
:param data_bits: 数据位,默认 8。
:type data_bits: int
:param stop_bits: 停止位,默认 1。
:type stop_bits: int
:param baud_rate: 波特率,默认 9600。
:type baud_rate: int
:param parity: 校验位,默认无校验。
:type parity: str
:param timeout: 超时时间(秒),默认 5.0。
:type timeout: float
:param service: 服务实例,用于处理业务逻辑。
:type service: Any
"""
self.port = port
self.data_bits = data_bits
self.stop_bits = stop_bits
self.baud_rate = baud_rate
self.parity = parity
self.timeout = timeout
self.service = service
self.conn: Optional[serial.Serial] = None
self._server_thread = None
self._running = False
self._stop_event = threading.Event()
# 报文捕获管理器
self._message_capture: Optional[MessageCapture] = None
[文档]
def start(self) -> bool:
"""启动RTU服务器(非阻塞,在后台线程中运行)"""
if self._running:
log.warning("RTU server is already running")
return True
self._stop_event.clear()
self._running = True
# 在后台线程中启动服务器
self._server_thread = threading.Thread(target=self._run_server, daemon=True)
self._server_thread.start()
# 等待服务器启动完成
time.sleep(0.1)
log.info(
f"RTU server starting in background on {self.port}, baud_rate={self.baud_rate}, data_bits={self.data_bits}, stop_bits={self.stop_bits}, parity={self.parity}, timeout={self.timeout}"
)
return True
def _run_server(self):
"""服务器主循环(在后台线程中运行)"""
try:
self.conn = serial.Serial(
port=self.port,
baudrate=self.baud_rate,
bytesize=self.data_bits,
stopbits=self.stop_bits,
parity=self.parity,
timeout=self.timeout,
)
log.info(f"RTU server started on port {self.port}")
# 启动连接处理循环
self.handle_connection(self.conn)
except Exception as e:
log.error(f"Failed to open serial port: {e}")
finally:
self._running = False
if self.conn:
try:
self.conn.close()
self.conn = None
except:
pass
log.info("RTU server stopped")
[文档]
def stop(self) -> bool:
"""停止RTU服务器"""
if not self._running:
log.warning("RTU server is not running")
return True
log.info("Shutting down RTU server...")
# 设置停止信号
self._stop_event.set()
# 关闭串口连接
if self.conn is not None:
try:
self.conn.close()
self.conn = None
except Exception as e:
log.error(f"Error closing serial connection: {e}")
# 等待服务器线程结束
if self._server_thread and self._server_thread.is_alive():
self._server_thread.join(timeout=5.0)
if self._server_thread.is_alive():
log.warning("Server thread did not stop gracefully")
self._running = False
log.info("RTU server shutdown complete")
return True
[文档]
def is_running(self) -> bool:
"""检查服务器是否正在运行"""
return self._running
def _check_timeout(self, last_data_time: float) -> bool:
"""检查是否超时"""
current_time = time.time()
if current_time - last_data_time > self.timeout:
log.error(
f"Buffer timeout: clearing {len(data_buffer)} bytes of incomplete data: {bytes_to_spaced_hex(data_buffer)}"
)
return True
return False
[文档]
def handle_connection(self, conn: Any) -> None:
"""处理单个串口连接
Args:
conn: 串口连接对象,必须是 serial.Serial 实例
"""
if not isinstance(conn, serial.Serial):
log.error(f"Invalid connection type: {type(conn)}")
return
log.debug(
f"Starting to handle connection on {self.port} - Baud rate: {conn.baudrate}, Data bits: {conn.bytesize}, "
f"Stop bits: {conn.stopbits}, Parity: {conn.parity}, Timeout: {conn.timeout}"
)
# 初始化数据缓冲区,用于累积接收的数据
data_buffer = bytearray()
# 记录最后一次收到数据的时间
last_data_time = time.time()
try:
# 确保串口已经打开
if not conn.is_open:
log.error("Serial port is not open")
return
log.info("Waiting for data...")
while not self._stop_event.is_set():
# 检查串口是否仍然打开
if not conn.is_open:
log.error("Serial port was closed unexpectedly")
break
# 使用较短的超时,以便定期检查停止信号
try:
# 检查缓冲区超时 - 如果缓冲区有数据但超过timeout时间没有收到新数据,清空缓冲区
if len(data_buffer) > 0 and self._check_timeout(last_data_time):
data_buffer = bytearray()
continue
# 先检查是否有数据可读
if conn.in_waiting > 0:
# 读取所有可用数据
data = conn.read(conn.in_waiting)
# 将新接收的数据添加到缓冲区
data_buffer.extend(data)
log.info(f"RX: {bytes_to_spaced_hex(data)} ({len(data)} bytes)")
# 更新最后收到数据的时间
last_data_time = time.time()
# 捕获接收的报文
current_tx_id: Optional[str] = None
if self._message_capture:
current_tx_id = self._message_capture.capture_rx_for_server(data)
# 尝试解析缓冲区中的数据
try:
# 尝试从缓冲区中提取完整的帧
remaining_data, frame = (
DLT645Protocol.deserialize_with_remaining(
bytes(data_buffer)
)
)
if frame is not None:
# 解析成功,更新缓冲区为剩余未解析的数据
data_buffer = bytearray(remaining_data)
log.info(
f"Successfully parsed frame, remaining buffer size: {len(data_buffer)}"
)
# 业务处理
try:
resp = self.service.handle_request(frame)
# 响应
if resp is not None:
try:
bytes_written = conn.write(resp)
log.info(
f"TX: {bytes_to_spaced_hex(resp)} ({bytes_written} bytes)"
)
# 捕获发送的报文并与接收配对
if self._message_capture:
self._message_capture.capture_tx_for_server(resp, current_tx_id)
except Exception as e:
log.error(f"Error writing response: {e}")
except Exception as e:
log.error(f"Error handling request: {e}")
# 继续循环,处理缓冲区中剩余的数据
continue
# 如果没有完整的帧,但收到了新数据,继续等待
# 注意:这里不会清空缓冲区,而是保持数据以等待后续数据
except Exception as e:
# 解析错误,但不立即清空缓冲区
# 可能是因为数据不完整,等待更多数据
log.warning(
f"Error parsing frame (might be incomplete data): {e}"
)
else:
# 没有数据可读,短暂等待以避免CPU占用过高
time.sleep(0.01)
except Exception as read_error:
log.error(f"Error reading from serial port: {read_error}")
# 短暂暂停后继续尝试
time.sleep(0.1)
except Exception as e:
if not self._stop_event.is_set():
log.error(f"Connection handling error: {e}")
finally:
try:
if conn and conn.is_open:
conn.close()
log.info("Serial connection closed")
except Exception as e:
log.error(f"Error closing connection: {e}")