这里介绍通过python语言,实现对支持485通讯的电表进行电表读数的采集实战。
串口通讯库采用的是pyserial,电能表采用的是德力西PD606E(支持Modbus、485通讯)
废话不多说,直接上代码:
#--*--coding:utf-8--*--
from PySide2.QtCore import QObject,Signal
from utils.config import SysConfig
from utils.DBPowerMeter import DBPowerMeter
import threading,time,logging,serial,struct,logging
class PowerMeter(QObject):
'''
返回读取到的电表读数
参数:
P1:机号
P2:电表读数,单位W
'''
onPowerMeterResult = Signal(int,int)
__thread:threading.Thread = None
__running:bool = True
__serialPort:str = ""
__baudrate:int = 0
__serial:serial.Serial = None
__initialized:bool = False
__machineList:list = None
__machineLocker:threading.Lock = threading.Lock()
def __init__(self, parent = ...):
super().__init__(parent)
cfg = SysConfig()
pms = cfg.valueByOption("comm","powermeterserial","")
if pms.__len__()==0:
self.__initialized = False
return
parts = pms.split(",")
if parts.__len__()!=2:
self.__initialized = False
return
self.__serialPort = parts[0]
self.__baudrate = int(parts[1])
self.__machineList = []
self.reloadMachineList()
self.__initialized = True
def reloadMachineList(self):
self.__machineLocker.acquire()
try:
db = DBPowerMeter()
data = db.loadMachineAddress()
if data is not None:
self.__machineList = data[:]
except Exception as e:
logging.error(f"重载电表机号列表失败:{e}")
finally:
self.__machineLocker.release()
def isInitialized(self)->bool:
return self.__initialized
def start(self)->bool:
if not self.__initialized:
return False
self.__running = True
self.__thread = threading.Thread(target=self.__run,daemon=True,name="PowerMeterThread")
self.__thread.start()
def __connectToSerial(self)->bool:
logging.debug(f"开始连接电表串口:{self.__serialPort}")
try:
self.__serial = serial.Serial(self.__serialPort,self.__baudrate,timeout=0.1)
if not self.__serial.is_open:
raise Exception("串口未打开")
return True
except Exception as e:
logging.error(f"串口连接失败:{e}")
if self.__serial is not None:
del self.__serial
self.__serial = None
return False
def __run(self):
interval:int = 0
DEF_CAPTURE_INTERVAL:int = 30 #采集间隔时间计时次数,0.1秒*x次数
while self.__running:
while self.__running and not self.__connectToSerial():
time.sleep(1)
while self.__running:
if interval<DEF_CAPTURE_INTERVAL:
interval += 1
time.sleep(0.1)
continue
interval = 0
addrs = None
self.__machineLocker.acquire()
try:
addrs = self.__machineList[:]
except Exception as e:
logging.error(f"电量采集获取机号列表失败:{e}")
continue
finally:
self.__machineLocker.release()
#读取电表
for i in range(addrs.__len__()):
if not self.__running:
break
addr = addrs[i]
if addr<=0:
continue
data = bytearray([addr,0x03,0x00,0x31,0x00,0x01])
crc = PowerMeter.calc_crc16modbus(data)
data.extend(crc)
try:
self.__serial.write(data)
except Exception as e:
logging.error(f"串口发送数据失败:{e}")
break
time.sleep(0.05)
try:
recvBegin = int(time.time()*1000)
while int(time.time()*1000)-recvBegin<500:
if self.__serial.in_waiting>0:
break
time.sleep(0.05)
if self.__serial.in_waiting<=0:
logging.error(f"超时未接收到{addr}电表数据")
continue
recv = self.__serial.read(self.__serial.in_waiting)
if recv.__len__()<9:
logging.error("接收到电表数据长度不足")
continue
except Exception as e:
logging.error(f"串口接收数据失败:{e}")
break
if recv[0]==addr and recv[1]==0x03 and recv[2]==0x04:
recvData = recv[3:7]
recvV = (recvData[0]<<24)|(recvData[1]<<16)|(recvData[2]<<8)|recvData[3]
v:float = struct.unpack("<f",struct.pack("<I",recvV))[0]
wv:int = int(v*1000)
self.onPowerMeterResult.emit(addr,wv) #推送结果
logging.debug(f"电表{addr}读数:{wv}W")
logging.error("电表串口已断开")
if self.__serial is not None:
try:
self.__serial.close()
except:
pass
del self.__serial
self.__serial = None
def shutdown(self):
if self.__thread is not None:
self.__running = False
self.__thread.join(5000)
@staticmethod
def calc_crc16modbus(data: bytes) -> bytes:
"""
用于计算数据的CRC16-MODBUS校验值的函数
:param data: 待计算校验值的数据
:return: 交换了高低字节的计算结果(2字节bytes)
"""
crc = 0xFFFF
for pos in data:
crc ^= pos
for i in range(8):
if (crc & 1) != 0:
crc >>= 1
crc ^= 0xA001
else:
crc >>= 1
return bytes([(crc & 0xFF), (crc >> 8) & 0xFF])以上代码是一个完整封装的采集类。因为界面使用到了PySide2,所以其中有些对PySide2的引用,如果有感兴趣的需要使用代码的朋友,可以将代码稍微修改一下。
以下对核心代码进行详细的讲解:
1、串口通讯的初始化
def __connectToSerial(self)->bool:
logging.debug(f"开始连接电表串口:{self.__serialPort}")
try:
self.__serial = serial.Serial(self.__serialPort,self.__baudrate,timeout=0.1)
if not self.__serial.is_open:
raise Exception("串口未打开")
return True
except Exception as e:
logging.error(f"串口连接失败:{e}")
if self.__serial is not None:
del self.__serial
self.__serial = None
return False以上代码在初始化串口时,传入的timeout=0.1,串口读取时,如果100毫秒内未收到数据,将抛出一个错误,而非一直阻塞等待接收数据。设置超时时间后,将在后面串口数据逻辑设计时更好处理。
2、发送指令读取电量值,并接收返回数据
#读取电表
for i in range(addrs.__len__()):
if not self.__running:
break
addr = addrs[i]
if addr<=0:
continue
data = bytearray([addr,0x03,0x00,0x31,0x00,0x01])
crc = PowerMeter.calc_crc16modbus(data)
data.extend(crc)
try:
self.__serial.write(data)
except Exception as e:
logging.error(f"串口发送数据失败:{e}")
break
time.sleep(0.05)
try:
recvBegin = int(time.time()*1000)
while int(time.time()*1000)-recvBegin<500:
if self.__serial.in_waiting>0:
break
time.sleep(0.05)
if self.__serial.in_waiting<=0:
logging.error(f"超时未接收到{addr}电表数据")
continue
recv = self.__serial.read(self.__serial.in_waiting)
if recv.__len__()<9:
logging.error("接收到电表数据长度不足")
continue
except Exception as e:
logging.error(f"串口接收数据失败:{e}")
break
if recv[0]==addr and recv[1]==0x03 and recv[2]==0x04:
recvData = recv[3:7]
recvV = (recvData[0]<<24)|(recvData[1]<<16)|(recvData[2]<<8)|recvData[3]
v:float = struct.unpack("<f",struct.pack("<I",recvV))[0]
wv:int = int(v*1000)
self.onPowerMeterResult.emit(addr,wv) #推送结果
logging.debug(f"电表{addr}读数:{wv}W")data = bytearray([addr,0x03,0x00,0x31,0x00,0x01]) crc = PowerMeter.calc_crc16modbus(data) data.extend(crc)
这段代码是拼接一个读取电量的读取指令,电表地址为:addr,读取指令为0x03(如果对Modbus指令结构不熟悉的,可以看我的另一篇文章,专门介绍Modbus协议)
recvBegin = int(time.time()*1000)
while int(time.time()*1000)-recvBegin<500:
if self.__serial.in_waiting>0:
break
time.sleep(0.05)
if self.__serial.in_waiting<=0:
logging.error(f"超时未接收到{addr}电表数据")
continue
recv = self.__serial.read(self.__serial.in_waiting)
if recv.__len__()<9:
logging.error("接收到电表数据长度不足")
continue这段代码是等待500毫秒,检测是否有数据返回。并接收数据
if recv[0]==addr and recv[1]==0x03 and recv[2]==0x04:
recvData = recv[3:7]
recvV = (recvData[0]<<24)|(recvData[1]<<16)|(recvData[2]<<8)|recvData[3]
v:float = struct.unpack("<f",struct.pack("<I",recvV))[0]
wv:int = int(v*1000)
self.onPowerMeterResult.emit(addr,wv) #推送结果
logging.debug(f"电表{addr}读数:{wv}W")以上这段代码是对接收到的数据进行数据处理,将读到的值进行解析并通过PySide2的信号onPowerMeterResult推送给调用者。
备案/许可证编号:

wishst