"""DLT645 服务端服务模块。
本模块实现了 DLT645 协议的服务端业务服务功能,包括:
- 处理客户端数据读取请求
- 处理客户端数据写入请求
- 通讯地址管理
- 密码验证和管理
"""
import struct
from typing import Optional, Union, List
from ...common.transform import (
bytes_to_spaced_hex,
float_to_bcd,
datetime_to_bcd,
string_to_bcd,
bcd_to_value,
bcd_to_string,
)
from ...model.data.data_handler import set_data_item, get_data_item
from ...model.types.data_type import DataItem
from ...model.validators import validate_device
from ...model.types.dlt645_type import (
DI_LEN,
PASSWORD_LEN,
ADDRESS_LEN,
OPERATOR_CODE_LEN,
CtrlCode,
Demand,
ErrorCode,
EventRecord,
PasswordManager,
)
from ...protocol.protocol import DLT645Protocol
from ...model.data import data_handler as data
from ...service.serversvc.log import log
from ...transport.server.rtu_server import RtuServer
from ...transport.server.tcp_server import TcpServer
from ...common.message_capture import MessageCapture
from ...common.message_types import MessageRecord, MessagePair
from typing import List
[文档]
class MeterServerService:
"""电表服务端服务类。
用于模拟 DLT645 电表设备,响应客户端的数据读写请求。
:ivar server: 通信服务器(TCP 或 RTU)。
:ivar address: 设备地址(6字节)。
:ivar password_manager: 密码管理器。
:ivar clear_meter_event_records: 电表清零事件记录列表。
:ivar event_records: 事件记录列表。
"""
def __init__(
self,
server: Union[TcpServer, RtuServer],
address: Optional[bytearray] = bytearray([0x00, 0x00, 0x00, 0x00, 0x00, 0x00]),
password_manager: Optional[PasswordManager] = PasswordManager(),
):
"""初始化电表服务端服务。
:param server: 通信服务器实例(TcpServer 或 RtuServer)。
:type server: Union[TcpServer, RtuServer]
:param address: 设备地址,默认为全零。
:type address: Optional[bytearray]
:param password_manager: 密码管理器,默认创建新实例。
:type password_manager: Optional[PasswordManager]
"""
self.server = server
self.address = address
self.password_manager = password_manager
self.clear_meter_event_records = [] # 记录电表清零事件
self.event_records = [] # 记录事件
[文档]
@classmethod
def new_tcp_server(
cls, ip: str, port: int, timeout: float = 5.0
) -> "MeterServerService":
"""创建 TCP 服务器
:param ip: IP 地址
:param port: 端口
:param timeout: 超时时间
:return:
"""
# 1. 先创建 TcpServer
tcp_server = TcpServer(ip, port, timeout, None)
# 2. 创建 MeterServerService,注入 TcpServer(作为 Server 接口)
return cls.new_meter_server_service(tcp_server)
[文档]
@classmethod
def new_rtu_server(
cls,
port: str,
data_bits: int,
stop_bits: int,
baud_rate: int,
parity: str,
timeout: float,
) -> "MeterServerService":
"""创建 RTU 服务器
:param port: 端口
:param data_bits: 数据位
:param stop_bits: 停止位
:param baud_rate: 波特率
:param parity: 校验位
:param timeout: 超时时间
:return:
"""
# 1. 先创建 RtuServer
rtu_server = RtuServer(port, data_bits, stop_bits, baud_rate, parity, timeout)
# 2. 创建 MeterServerService,注入 RtuServer(作为 Server 接口)
return cls.new_meter_server_service(rtu_server)
[文档]
@classmethod
def new_meter_server_service(
cls, server: Union[TcpServer, RtuServer]
) -> "MeterServerService":
"""创建新的MeterServerService实例
:param server: 服务器实例(TCP或RTU)
:return: MeterServerService实例
"""
# 创建业务服务实例
meter_service = cls(server)
# 将服务实例注入回服务器
server.service = meter_service
return meter_service
# 设置时间,需根据实际情况实现
[文档]
def set_time(self, data_bytes):
pass
[文档]
def set_address(self, address: str):
"""写通讯地址
:param address:
:return:
"""
address = string_to_bcd(address)
if len(address) != ADDRESS_LEN:
raise ValueError("invalid address length")
self.address = address
[文档]
def set_password(self, password: str) -> None:
"""写密码
:param password:
:return:
"""
password = string_to_bcd(password)
self.password_manager.set_password(password)
log.info(f"设置密码: {bytes_to_spaced_hex(password)}")
[文档]
def set_00(self, di: int, value: float) -> bool:
"""写电能量
:param di: 数据项
:param value: 值
:return:
"""
ok = set_data_item(di, value)
if not ok:
log.error(f"写电能量失败")
return ok
[文档]
def set_01(self, di: int, demand: Demand) -> bool:
"""写最大需量及发生时间
:param di: 数据项
:param demand: 值
:return:
"""
ok = set_data_item(di, demand)
if not ok:
log.error(f"写最大需量及发生时间失败")
return ok
[文档]
def set_02(self, di: int, value: float) -> bool:
"""写变量
:param di: 数据项
:param value: 值
:return:
"""
data_item = get_data_item(di)
if data_item is None:
log.error(f"获取变量数据项失败")
return False
ok = set_data_item(di, value)
if not ok:
log.error(f"写变量失败")
return False
return ok
[文档]
def set_03(self, di: int, value: list[str, tuple[str, str]]) -> bool:
"""写事件记录
:param di: 数据项
:param value: 值
:return:
"""
data_item = get_data_item(di)
if data_item is None:
log.error(f"获取事件记录数据项失败")
return False
if not set_data_item(di, value):
log.error(f"写事件记录失败")
return False
return True
[文档]
def set_04(self, di: int, value: str | list) -> bool:
"""写参变量
:param di: 数据项
:param value: 值
:return:
"""
data_item = get_data_item(di)
if data_item is None:
log.error(f"获取参变量数据项失败")
return False
if not set_data_item(di, value):
log.error(f"写参变量失败")
return False
return True
[文档]
def get_data_item(self, di: int) -> Optional[DataItem]:
"""获取数据项
:param di: 数据项
:return:
"""
return get_data_item(di)
[文档]
def handle_request(self, frame):
"""处理读数据请求
:param frame:
:return:
"""
try:
# 1. 验证设备
if not validate_device(self.address, frame.ctrl_code, frame.addr):
log.error(f"验证设备地址: {bytes_to_spaced_hex(frame.addr)} 失败")
# 返回未授权异常帧
return self._build_error_response(
frame, error_code=ErrorCode.OtherError
)
# 2. 根据控制码判断请求类型
if frame.ctrl_code == CtrlCode.BroadcastTimeSync: # 广播校时
log.info(f"广播校时: {bytes_to_spaced_hex(frame.data)}")
self.set_time(frame.data)
return DLT645Protocol.build_frame(
frame.addr, frame.ctrl_code | 0x80, frame.data
)
elif frame.ctrl_code == CtrlCode.ReadData:
# 解析数据标识
di = frame.data
di3 = di[3]
if di3 == 0x00: # 读取电能
# 构建响应帧
res_data = bytearray(8)
# 解析数据标识为 32 位无符号整数
data_id = struct.unpack("<I", frame.data[:DI_LEN])[0]
data_item = get_data_item(data_id)
if data_item is None:
log.error(f"数据项未找到: {data_id}")
return self._build_error_response(
frame, error_code=ErrorCode.RequestDataEmpty
)
res_data[:DI_LEN] = frame.data[:DI_LEN] # 仅复制前 4 字节数据标识
value = data_item.value
# 转换为 BCD 码
bcd_value = float_to_bcd(value, data_item.data_format, "little")
res_data[DI_LEN:] = bcd_value
return DLT645Protocol.build_frame(
frame.addr, frame.ctrl_code | 0x80, bytes(res_data)
)
elif di3 == 0x01: # 读取最大需量及发生时间
res_data = bytearray(12)
# 解析数据标识为 32 位无符号整数
data_id = struct.unpack("<I", frame.data[:DI_LEN])[0]
data_item = get_data_item(data_id)
if data_item is None:
log.error(f"数据项未找到: {data_id}")
return self._build_error_response(
frame, error_code=ErrorCode.RequestDataEmpty
)
res_data[:DI_LEN] = frame.data[:DI_LEN] # 返回数据标识
demand: Demand = data_item.value
# 转换为 BCD 码
bcd_value = float_to_bcd(
demand.value, data_item.data_format, "little"
)
res_data[DI_LEN : DI_LEN + 3] = bcd_value[:3]
# 需量发生时间
res_data[DI_LEN + 3 : 12] = datetime_to_bcd(demand.time)
return DLT645Protocol.build_frame(
frame.addr, frame.ctrl_code | 0x80, bytes(res_data)
)
elif di3 == 0x02: # 读变量
# 解析数据标识为 32 位无符号整数
data_id = struct.unpack("<I", frame.data[:DI_LEN])[0]
data_item = get_data_item(data_id)
if data_item is None:
log.error(f"数据项未找到: {data_id}")
return self._build_error_response(
frame, error_code=ErrorCode.RequestDataEmpty
)
# 变量数据长度
data_len = DI_LEN
data_len += (
len(data_item.data_format) - 1
) // 2 # (数据格式长度 - 1 位小数点)/2
# 构建响应帧
res_data = bytearray(data_len)
res_data[:DI_LEN] = frame.data[:DI_LEN] # 仅复制前 DI_LEN 字节
value = data_item.value
# 转换为 BCD 码(小端序)
bcd_value = float_to_bcd(value, data_item.data_format, "little")
res_data[DI_LEN:data_len] = bcd_value
return DLT645Protocol.build_frame(
frame.addr, frame.ctrl_code | 0x80, bytes(res_data)
)
elif di3 == 0x03: # 读事件记录
# 解析数据标识为 32 位无符号整数
data_id = struct.unpack("<I", frame.data[:DI_LEN])[0]
data_item: Optional[List[DataItem]] = get_data_item(data_id)
if data_item is None:
log.error(f"数据项未找到: {data_id}")
return self._build_error_response(
frame, error_code=ErrorCode.RequestDataEmpty
)
res_data = bytearray()
res_data.extend(frame.data[:DI_LEN]) # 仅复制前 DI_LEN 字节
for item in data_item:
event_record: EventRecord = item.value
if isinstance(event_record.event, tuple):
for event_item in reversed(event_record.event):
value = string_to_bcd(event_item, "little")
res_data.extend(value)
elif isinstance(event_record.event, str):
value = string_to_bcd(event_record.event, "little")
res_data.extend(value)
elif isinstance(event_record.event, float):
value = float_to_bcd(event_record.event, "little")
res_data.extend(value)
return DLT645Protocol.build_frame(
frame.addr, frame.ctrl_code | 0x80, bytes(res_data)
)
elif di3 == 0x04: # 读参变量
# 解析数据标识为 32 位无符号整数
data_id = struct.unpack("<I", frame.data[:DI_LEN])[0]
data_item = get_data_item(data_id)
if data_item is None:
log.error(f"数据项未找到: {data_id}")
return self._build_error_response(
frame, error_code=ErrorCode.RequestDataEmpty
)
# 变量数据长度
data_len = DI_LEN
# 时段表数据
if (
0x04010000
<= int.from_bytes(di, byteorder="little")
<= 0x04020008
):
res_data = bytearray(DI_LEN + len(data_item) * 2)
for i, item in enumerate(data_item):
step = len(item.data_format) // 2
data_len += step
res_data[:DI_LEN] = frame.data[:DI_LEN] # 复制数据标识
value = item.value
bcd_value = string_to_bcd(value, "little")
# 扩展res_data以容纳BCD数据
res_data[DI_LEN + step * i : DI_LEN + step * (i + 1)] = (
bcd_value
)
return DLT645Protocol.build_frame(
frame.addr, frame.ctrl_code | 0x80, bytes(res_data)
)
else:
# 根据数据格式确定数据长度
data_format = data_item.data_format
data_len += len(data_format) // 2
# 构建响应帧
res_data = bytearray(data_len)
res_data[:DI_LEN] = frame.data[:DI_LEN] # 复制数据标识
value = data_item.value
bcd_value = string_to_bcd(value, "little")
# 扩展res_data以容纳BCD数据
res_data[DI_LEN : DI_LEN + data_len] = bcd_value
return DLT645Protocol.build_frame(
frame.addr, frame.ctrl_code | 0x80, bytes(res_data)
)
else:
log.error(f"未知的数据标识类型: {hex(di3)}")
return self._build_error_response(
frame, error_code=ErrorCode.OtherError
)
elif frame.ctrl_code == CtrlCode.WriteData:
log.debug(f"收到写数据请求: {bytes_to_spaced_hex(frame.data)}")
# 解析数据标识为 32 位无符号整数
data_id = struct.unpack("<I", frame.data[:DI_LEN])[0]
data_item = get_data_item(data_id)
if data_item is None:
log.error(f"数据项未找到: {data_id}")
return self._build_error_response(
frame, error_code=ErrorCode.RequestDataEmpty
)
# 提取密码
password = frame.data[DI_LEN : DI_LEN + PASSWORD_LEN]
if not self.password_manager.check_password(password):
log.error(f"密码错误: {bytes_to_spaced_hex(password)}")
return self._build_error_response(
frame, error_code=ErrorCode.AuthFailed
)
# 提取操作者代码
operator_code = frame.data[
DI_LEN + PASSWORD_LEN : DI_LEN + PASSWORD_LEN + OPERATOR_CODE_LEN
]
# 提取数据
data_len = len(frame.data) - DI_LEN - PASSWORD_LEN
data = frame.data[
DI_LEN
+ PASSWORD_LEN
+ OPERATOR_CODE_LEN : DI_LEN
+ PASSWORD_LEN
+ OPERATOR_CODE_LEN
+ data_len
]
# 解析数据
value = bcd_to_value(data, data_item.data_format, "little")
if not set_data_item(data_id, value):
log.error(f"设置数据项 {data_id} 失败")
return self._build_error_response(
frame, error_code=ErrorCode.OtherError
)
# 构建响应帧
res_data = bytearray() # 广播校时不需要返回数据
return DLT645Protocol.build_frame(
frame.addr, frame.ctrl_code | 0x80, res_data
)
elif frame.ctrl_code == CtrlCode.ReadAddress:
# 构建响应帧
res_data = self.address[:ADDRESS_LEN]
return DLT645Protocol.build_frame(
bytes(self.address), frame.ctrl_code | 0x80, bytes(res_data)
)
elif frame.ctrl_code == CtrlCode.WriteAddress:
res_data = bytearray()
# 解析数据
addr = frame.data[:ADDRESS_LEN]
self.set_address(addr) # 设置通讯地址
return DLT645Protocol.build_frame(
bytes(self.address), frame.ctrl_code | 0x80, res_data
)
elif frame.ctrl_code == CtrlCode.ChangePassword:
# 解析数据
res_data = bytearray()
old_password = frame.data[DI_LEN : DI_LEN + PASSWORD_LEN]
new_password = frame.data[
DI_LEN + PASSWORD_LEN : DI_LEN + PASSWORD_LEN * 2
]
if not self.password_manager.change_password(
old_password, new_password
):
log.error(f"修改密码失败")
return self._build_error_response(
frame, error_code=ErrorCode.AuthFailed
)
res_data = new_password # 返回新密码响应
return DLT645Protocol.build_frame(
bytes(self.address), frame.ctrl_code | 0x80, res_data
)
elif frame.ctrl_code == CtrlCode.ClearDemand:
log.debug(f"收到需量清零请求: {bytes_to_spaced_hex(frame.data)}")
# 解析数据标识为 32 位无符号整数
data_id = struct.unpack("<I", frame.data[:DI_LEN])[0]
data_item = get_data_item(data_id)
if data_item is None:
log.error(f"数据项未找到: {data_id}")
return self._build_error_response(
frame, error_code=ErrorCode.RequestDataEmpty
)
# 提取密码
password = frame.data[DI_LEN : DI_LEN + PASSWORD_LEN]
if not self.password_manager.check_password(password):
log.error(f"密码错误: {bytes_to_spaced_hex(password)}")
return self._build_error_response(
frame, error_code=ErrorCode.AuthFailed
)
# 提取操作者代码
operator_code = frame.data[
DI_LEN + PASSWORD_LEN : DI_LEN + PASSWORD_LEN + OPERATOR_CODE_LEN
]
# 执行需量清零:将需量值设为0,时间设为当前时间
from datetime import datetime
cleared_demand = Demand(0.0, datetime.now())
if not set_data_item(data_id, cleared_demand):
log.error(f"需量清零失败: {data_id}")
return self._build_error_response(
frame, error_code=ErrorCode.OtherError
)
log.info(f"需量清零成功: DI={hex(data_id)}, 操作者代码={bytes_to_spaced_hex(operator_code)}")
# 构建响应帧
res_data = bytearray()
return DLT645Protocol.build_frame(
frame.addr, frame.ctrl_code | 0x80, res_data
)
else:
log.error(f"未知的控制码: {hex(frame.ctrl_code)}")
return self._build_error_response(
frame, error_code=ErrorCode.OtherError
)
except Exception as e:
# 捕获其他未预期的异常
log.error(f"处理请求时发生未预期异常: {str(e)}")
# 返回通用错误异常帧
return self._build_error_response(frame, error_code=ErrorCode.OtherError)
def _build_error_response(self, frame, error_code: int):
"""构建异常响应帧
:param frame: 原始请求帧
:param error_code: 错误码
:return: 异常响应帧
"""
# 构建异常响应帧,控制码最高位设为1表示响应
log.debug(
f"构建异常响应帧: 地址={frame.addr.hex()}, 控制码={hex(frame.ctrl_code | 0xC0)}, 错误码={error_code}"
)
error_data = bytes([error_code])
return DLT645Protocol.build_frame( # D7=1, D6=1表示异常响应, C=1100
frame.addr, frame.ctrl_code | 0xC0, error_data
)
# ==================== 报文捕获方法 ====================
[文档]
def enable_message_capture(self, queue_size: int = 100) -> None:
"""启用报文捕获功能。
:param queue_size: 报文队列大小,默认100
:type queue_size: int
"""
if self.server._message_capture is None:
self.server._message_capture = MessageCapture(enabled=True, queue_size=queue_size)
else:
self.server._message_capture.enable()
self.server._message_capture.set_queue_size(queue_size)
log.info(f"报文捕获已启用,队列大小: {queue_size}")
[文档]
def disable_message_capture(self) -> None:
"""禁用报文捕获功能。"""
if self.server._message_capture:
self.server._message_capture.disable()
log.info("报文捕获已禁用")
[文档]
def get_captured_messages(self, count: int = 0) -> List[MessageRecord]:
"""获取捕获的报文列表。
:param count: 要获取的数量,0表示全部
:type count: int
:return: 报文列表
:rtype: List[MessageRecord]
"""
if self.server._message_capture:
return self.server._message_capture.get_all_messages(count)
return []
[文档]
def get_captured_tx_messages(self, count: int = 0) -> List[MessageRecord]:
"""获取捕获的发送报文列表。
:param count: 要获取的数量,0表示全部
:type count: int
:return: 发送报文列表
:rtype: List[MessageRecord]
"""
if self.server._message_capture:
return self.server._message_capture.get_tx_messages(count)
return []
[文档]
def get_captured_rx_messages(self, count: int = 0) -> List[MessageRecord]:
"""获取捕获的接收报文列表。
:param count: 要获取的数量,0表示全部
:type count: int
:return: 接收报文列表
:rtype: List[MessageRecord]
"""
if self.server._message_capture:
return self.server._message_capture.get_rx_messages(count)
return []
[文档]
def get_captured_pairs(self, count: int = 0) -> List[MessagePair]:
"""获取捕获的TX/RX配对列表。
:param count: 要获取的数量,0表示全部
:type count: int
:return: 配对列表
:rtype: List[MessagePair]
"""
if self.server._message_capture:
return self.server._message_capture.get_pairs(count)
return []
[文档]
def clear_captured_messages(self) -> None:
"""清空所有捕获的报文。"""
if self.server._message_capture:
self.server._message_capture.clear()
log.info("捕获的报文已清空")
[文档]
def get_message_capture_stats(self) -> dict:
"""获取报文捕获统计信息。
:return: 统计信息字典
:rtype: dict
"""
if self.server._message_capture:
return self.server._message_capture.get_stats()
return {"enabled": False}