"""DLT645 客户端服务模块。
本模块实现了 DLT645 协议的客户端业务服务功能,包括:
- 电能数据读取
- 需量数据读取
- 变量数据读取
- 事件记录读取
- 参变量读写
- 通讯地址读写
- 密码管理
"""
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
import time
import struct
from typing import Optional, Union
from ...common.transform import (
bcd_to_float,
bcd_to_time,
bcd_to_string,
bytes_to_int,
bytes_to_spaced_hex,
string_to_bcd,
)
from ...model.validators import validate_device
from ...model.types.data_type import DataFormat, DataItem
from ...model.types.dlt645_type import (
DI_LEN,
ADDRESS_LEN,
PASSWORD_LEN,
CtrlCode,
Demand,
EventRecord,
PasswordManager,
ErrorCode,
get_error_msg,
)
from ...protocol.protocol import DLT645Protocol
from ...protocol.frame import Frame
from ...model.data import data_handler as data
from ...service.clientsvc.log import log
from ...transport.client.rtu_client import RtuClient
from ...transport.client.tcp_client import TcpClient
from ...common.message_capture import MessageCapture
from ...common.message_types import MessageRecord, MessagePair
from typing import List
[文档]
class MeterClientService:
"""电表客户端服务类。
用于与 DLT645 电表设备进行通信,提供数据读写等业务功能。
:ivar address: 设备地址(6字节)。
:ivar password_manager: 密码管理器。
:ivar operation_code: 操作码(4字节)。
:ivar client: 通信客户端(TCP 或 RTU)。
"""
def __init__(self, client: Union[TcpClient, RtuClient]):
"""初始化电表客户端服务。
:param client: 通信客户端实例(TcpClient 或 RtuClient)。
:type client: Union[TcpClient, RtuClient]
"""
self.address = bytearray(6) # 6字节地址
self.password_manager: PasswordManager = PasswordManager() # 4字节密码
self.operation_code = bytearray(4) # 4字节操作码
self.client = client
self._executor = ThreadPoolExecutor(max_workers=1) # 用于超时控制
[文档]
@classmethod
def new_tcp_client(
cls, ip: str, port: int, timeout: float = 30.0
) -> Optional["MeterClientService"]:
"""创建TCP客户端"""
tcp_client = TcpClient(ip=ip, port=port, timeout=timeout)
# 创建业务服务实例
return cls.new_meter_client_service(tcp_client)
[文档]
@classmethod
def new_rtu_client(
cls,
port: str,
baudrate: int,
databits: int,
stopbits: int,
parity: str,
timeout: float,
) -> Optional["MeterClientService"]:
"""创建RTU客户端"""
rtu_client = RtuClient(
port=port,
baud_rate=baudrate,
data_bits=databits,
stop_bits=stopbits,
parity=parity,
timeout=timeout,
)
# 创建业务服务实例
return cls.new_meter_client_service(rtu_client)
[文档]
@classmethod
def new_meter_client_service(
cls, client: Union[TcpClient, RtuClient]
) -> Optional["MeterClientService"]:
"""创建新的MeterService实例"""
service = cls(client)
return service
[文档]
def get_time(self, t: bytes) -> datetime:
"""从字节数据获取时间"""
timestamp = bytes_to_int(t)
log.debug(f"timestamp: {timestamp}")
return datetime.fromtimestamp(timestamp)
[文档]
def set_address(self, address: str) -> bool:
"""设置设备地址"""
address = string_to_bcd(address)
if len(address) != ADDRESS_LEN:
log.error("无效的地址长度")
return False
self.address = address
log.info(f"设置客户端通讯地址: {bytes_to_spaced_hex(self.address)}")
return True
[文档]
def set_password(self, password: str) -> bool:
"""设置设备密码, 修改数据的命令会带上密码发送出去"""
password = string_to_bcd(password)
self.password_manager.set_password(password)
log.info(f"设置客户端密码: {bytes_to_spaced_hex(password)}")
return True
[文档]
def change_password(self, old_password: str, new_password: str) -> bool:
"""修改设备密码"""
old_password = string_to_bcd(old_password)
new_password = string_to_bcd(new_password)
if not self.password_manager.is_password_valid(old_password):
return False
if not self.password_manager.is_password_valid(new_password):
return False
new_level = new_password[0]
di = 0x40000C00 | new_level # 新密码的DI
write_data = old_password + new_password
data_bytes = struct.pack("<I", di) + write_data # 小端序
frame_bytes = DLT645Protocol.build_frame(
self.address, CtrlCode.ChangePassword, data_bytes
)
return self.send_and_handle_request(frame_bytes)
[文档]
def read_00(self, di: int) -> Optional[DataItem]:
"""读取电能"""
data_bytes = struct.pack("<I", di) # 小端序
frame_bytes = DLT645Protocol.build_frame(
self.address, CtrlCode.ReadData, data_bytes
)
return self.send_and_handle_request(frame_bytes)
[文档]
def read_01(self, di: int) -> Optional[DataItem]:
"""读取最大需量及发生时间"""
data_bytes = struct.pack("<I", di) # 小端序
frame_bytes = DLT645Protocol.build_frame(
self.address, CtrlCode.ReadData, data_bytes
)
return self.send_and_handle_request(frame_bytes)
[文档]
def read_02(self, di: int) -> Optional[DataItem]:
"""读取变量"""
data_bytes = struct.pack("<I", di) # 小端序
frame_bytes = DLT645Protocol.build_frame(
self.address, CtrlCode.ReadData, data_bytes
)
return self.send_and_handle_request(frame_bytes)
[文档]
def read_03(self, di: int) -> Optional[DataItem]:
"""读取事件记录"""
data_bytes = struct.pack("<I", di) # 小端序
frame_bytes = DLT645Protocol.build_frame(
self.address, CtrlCode.ReadData, data_bytes
)
return self.send_and_handle_request(frame_bytes)
[文档]
def read_04(self, di: int) -> Optional[DataItem]:
"""读取参变量"""
data_bytes = struct.pack("<I", di) # 小端序
frame_bytes = DLT645Protocol.build_frame(
self.address, CtrlCode.ReadData, data_bytes
)
return self.send_and_handle_request(frame_bytes)
[文档]
def write_04(self, di: int, value: str, password: str) -> Optional[DataItem]:
"""写参变量"""
# 密码 + 操作码 + 值
password = string_to_bcd(password)
write_data = (
password + self.operation_code + string_to_bcd(value, endian="little")
)
data_bytes = struct.pack("<I", di) + write_data # 小端序
frame_bytes = DLT645Protocol.build_frame(
self.address, CtrlCode.WriteData, data_bytes
)
return self.send_and_handle_request(frame_bytes)
[文档]
def read_address(self) -> Optional[DataItem]:
"""读取通讯地址"""
# 读取通讯地址需要使用特殊的广播地址0xAAAAAAAAAAAA
broadcast_address = bytearray([0xAA, 0xAA, 0xAA, 0xAA, 0xAA, 0xAA])
frame_bytes = DLT645Protocol.build_frame(
broadcast_address, CtrlCode.ReadAddress, None
)
return self.send_and_handle_request(frame_bytes)
[文档]
def write_address(self, new_address: bytes) -> Optional[DataItem]:
"""写通讯地址"""
if len(new_address) != ADDRESS_LEN:
log.error("无效的新地址长度")
return None
frame_bytes = DLT645Protocol.build_frame(
self.address, CtrlCode.WriteAddress, new_address
)
return self.send_and_handle_request(frame_bytes)
[文档]
def send_and_handle_request(
self,
frame_bytes: bytes,
) -> Optional[DataItem]:
"""发送请求并处理响应(带超时控制)
Args:
frame_bytes: 要发送的帧数据
Returns:
DataItem: 成功时返回数据项
None: 超时或失败时返回
"""
try:
if self.client is None:
log.error("连接未初始化")
return None
# 确保连接有效(重用连接,如果断开则重新连接)
if not self.client._ensure_connection():
log.error("连接失败")
return None
# 请求阶段超时控制
response = self.client.send_request(frame_bytes)
if response is None:
return None
# 解析阶段
frame = DLT645Protocol.deserialize(response)
if frame is None:
log.error("解析响应失败")
return None
# 处理响应
data_item = self.handle_response(frame)
return data_item
except Exception as e:
log.error(f"未知错误: {str(e)}", exc_info=True)
return None
def _is_valid_response(self, frame: Frame) -> bool:
"""验证响应帧是否有效"""
# 检测异常处理帧 (DLT645协议中,异常响应的控制码次高位为1)
if (frame.ctrl_code & 0x40) == 0x40: # 检查次高位
error_code = frame.data[0] if len(frame.data) > 0 else None
error_msg = "设备返回异常响应"
# 如果数据域不为空,尝试解析错误码
if frame.data:
error_code = frame.data[0] if len(frame.data) > 0 else None
# 根据常见的DLT645错误码定义错误信息
if any(error_code == ec for ec in ErrorCode):
error_msg = f"设备返回异常响应: {get_error_msg(error_code)} (错误码: {error_code:02X})"
else:
error_msg = f"设备返回异常响应: 未知错误码"
log.error(error_msg)
return False
return True
[文档]
def handle_response(self, frame: Frame) -> Optional[DataItem]:
"""处理响应帧,包括异常帧检测"""
try:
if not self._is_valid_response(frame):
return None
# 验证设备地址 - 特殊控制码不需要验证
if not validate_device(self.address, frame.ctrl_code, frame.addr):
log.warning(f"验证设备地址: {bytes_to_spaced_hex(frame.addr)} 失败")
return None
# 根据控制码判断响应类型
if frame.ctrl_code == (CtrlCode.BroadcastTimeSync | 0x80): # 广播校时响应
log.debug(f"广播校时响应: {bytes_to_spaced_hex(frame.data)}")
time_value = self.get_time(frame.data[0:4])
data_item = data.get_data_item(bytes_to_int(frame.data[0:4]))
if not data_item:
log.warning("获取数据项失败")
return None
data_item.value = time_value
return data_item
elif frame.ctrl_code == (CtrlCode.ReadData | 0x80): # 读数据响应
# 解析数据标识
if len(frame.data) < DI_LEN:
log.warning("读数据响应数据长度无效")
return None
di = frame.data[0:DI_LEN]
di3 = di[3]
if di3 == 0x00: # 读取电能响应
log.debug(f"读取电能响应: {bytes_to_spaced_hex(frame.data)}")
data_item = data.get_data_item(bytes_to_int(di))
if not data_item:
log.error("获取电能数据项失败")
return None
data_item.value = bcd_to_float(
frame.data[4:8], data_item.data_format, "little"
)
return data_item
elif di3 == 0x01: # 读取最大需量及发生时间响应
log.debug(
f"读取最大需量及发生时间响应: {bytes_to_spaced_hex(frame.data)}"
)
data_item = data.get_data_item(bytes_to_int(di))
if not data_item:
log.error("获取最大需量数据项失败")
return None
# 转换时间
occur_time = bcd_to_time(frame.data[7:12])
# 转换需量值
demand_value = bcd_to_float(
frame.data[DI_LEN : DI_LEN + 3], data_item.data_format, "little"
)
data_item.value = Demand(value=demand_value, time=occur_time)
return data_item
elif di3 == 0x02:
data_item = data.get_data_item(bytes_to_int(di))
if not data_item:
log.error("获取变量数据项失败")
return None
data_item.value = bcd_to_float(
frame.data[DI_LEN : DI_LEN + 4], data_item.data_format, "little"
)
return data_item
elif di3 == 0x03:
log.debug(f"读取事件记录响应: {bytes_to_spaced_hex(frame.data)}")
data_item = data.get_data_item(bytes_to_int(di))
if not data_item:
log.error("获取事件记录数据项失败")
return None
start_len = DI_LEN
for i, item in enumerate(data_item):
event_record: EventRecord = item.value
if isinstance(event_record.event, tuple):
step = len(item.data_format.split(",")[0]) // 2
event_list = list(event_record.event)
for i, _ in enumerate(event_list):
# 提取BCD数据部分
bcd_data = frame.data[start_len : start_len + step]
start_len += step
event_list[i] = bcd_to_string(bcd_data, "little")
event_record.event = tuple(reversed(event_list))
else:
step = len(item.data_format) // 2
# 提取BCD数据部分
bcd_data = frame.data[start_len : start_len + step]
start_len += step
item.value = bcd_to_string(bcd_data, "little")
return data_item
elif di3 == 0x04: # 读参变量响应
log.debug(f"读取参变量响应: {bytes_to_spaced_hex(frame.data)}")
data_item = data.get_data_item(bytes_to_int(di))
if not data_item:
log.error("获取参变量数据项失败")
return None
start_len = DI_LEN
# 时段表数据
if (
0x04010000
<= int.from_bytes(di, byteorder="little")
<= 0x04020008
):
for i, item in enumerate(data_item):
step = len(item.data_format) // 2
# 提取BCD数据部分
bcd_data = frame.data[start_len : start_len + step]
start_len += step
item.value = bcd_to_string(bcd_data, "little")
else:
# 提取BCD数据部分
bcd_data = frame.data[start_len:]
start_len += len(bcd_data)
data_item.value = bcd_to_string(bcd_data, "little")
return data_item
else:
log.warning(f"未知数据项: {bytes_to_spaced_hex(di)}")
return None
elif frame.ctrl_code == (CtrlCode.WriteData | 0x80): # 写参变量响应
log.debug(f"写参变量")
return None
elif frame.ctrl_code == (CtrlCode.ReadAddress | 0x80): # 读通讯地址响应
log.debug(f"读通讯地址响应: {bytes_to_spaced_hex(frame.data)}")
if len(frame.data) == ADDRESS_LEN:
self.address = frame.data[:ADDRESS_LEN]
return DataItem(
di=bytes_to_int(frame.data[0:DI_LEN]),
name="通讯地址",
data_format=DataFormat.XXXXXXXX.value,
value=bcd_to_string(frame.data),
unit="",
update_time=datetime.now(),
)
elif frame.ctrl_code == (CtrlCode.WriteAddress | 0x80): # 写通讯地址响应
log.debug(f"写通讯地址响应: {bytes_to_spaced_hex(frame.data)}")
return DataItem(
di=bytes_to_int(frame.data[0:DI_LEN]),
name="通讯地址",
data_format=DataFormat.XXXXXXXX.value,
value=bcd_to_string(frame.data),
unit="",
update_time=datetime.now(),
)
elif frame.ctrl_code == (CtrlCode.ChangePassword | 0x80): # 写密码响应
log.debug(f"写密码响应: {bytes_to_spaced_hex(frame.data)}")
password = frame.data[:DI_LEN]
self.password_manager.set_password(password)
else:
log.warning(f"Unknown control code: {frame.ctrl_code}")
return None
except Exception as e:
log.error(f"处理响应帧时出错: {e}")
raise
# ==================== 报文捕获方法 ====================
[文档]
def enable_message_capture(self, queue_size: int = 100) -> None:
"""启用报文捕获功能。
:param queue_size: 报文队列大小,默认100
:type queue_size: int
"""
if self.client._message_capture is None:
self.client._message_capture = MessageCapture(enabled=True, queue_size=queue_size)
else:
self.client._message_capture.enable()
self.client._message_capture.set_queue_size(queue_size)
log.info(f"报文捕获已启用,队列大小: {queue_size}")
[文档]
def disable_message_capture(self) -> None:
"""禁用报文捕获功能。"""
if self.client._message_capture:
self.client._message_capture.disable()
log.info("报文捕获已禁用")
[文档]
def get_captured_messages(self, count: int = 0) -> List[MessageRecord]:
"""获取捕获的报文列表。
:param count: 要获取的数量,0表示全部
:type count: int
:return: 报文列表
:rtype: List[MessageRecord]
"""
if self.client._message_capture:
return self.client._message_capture.get_all_messages(count)
return []
[文档]
def get_captured_tx_messages(self, count: int = 0) -> List[MessageRecord]:
"""获取捕获的发送报文列表。
:param count: 要获取的数量,0表示全部
:type count: int
:return: 发送报文列表
:rtype: List[MessageRecord]
"""
if self.client._message_capture:
return self.client._message_capture.get_tx_messages(count)
return []
[文档]
def get_captured_rx_messages(self, count: int = 0) -> List[MessageRecord]:
"""获取捕获的接收报文列表。
:param count: 要获取的数量,0表示全部
:type count: int
:return: 接收报文列表
:rtype: List[MessageRecord]
"""
if self.client._message_capture:
return self.client._message_capture.get_rx_messages(count)
return []
[文档]
def get_captured_pairs(self, count: int = 0) -> List[MessagePair]:
"""获取捕获的TX/RX配对列表。
:param count: 要获取的数量,0表示全部
:type count: int
:return: 配对列表
:rtype: List[MessagePair]
"""
if self.client._message_capture:
return self.client._message_capture.get_pairs(count)
return []
[文档]
def clear_captured_messages(self) -> None:
"""清空所有捕获的报文。"""
if self.client._message_capture:
self.client._message_capture.clear()
log.info("捕获的报文已清空")
[文档]
def get_message_capture_stats(self) -> dict:
"""获取报文捕获统计信息。
:return: 统计信息字典
:rtype: dict
"""
if self.client._message_capture:
return self.client._message_capture.get_stats()
return {"enabled": False}