不香不帅的个人主页

最后更新:2026-02-05
原创 Python通过Modbus采集电表电量实战
浏览: 111

这里介绍通过python语言,实现对支持485通讯的电表进行电表读数的采集实战。

串口通讯库采用的是pyserial,电能表采用的是德力西PD606E(支持Modbus、485通讯)


废话不多说,直接上代码:

#--*--coding:utf-8--*--

from PySide2.QtCore import QObject,Signal
from utils.config import SysConfig
from utils.DBPowerMeter import DBPowerMeter
import threading,time,logging,serial,struct,logging

class PowerMeter(QObject):

    '''
    返回读取到的电表读数
    参数:
    P1:机号
    P2:电表读数,单位W
    '''
    onPowerMeterResult = Signal(int,int)

    __thread:threading.Thread = None
    __running:bool = True

    __serialPort:str = ""
    __baudrate:int = 0

    __serial:serial.Serial = None

    __initialized:bool = False
    __machineList:list = None

    __machineLocker:threading.Lock = threading.Lock()

    def __init__(self, parent = ...):
        super().__init__(parent)

        cfg = SysConfig()
        pms = cfg.valueByOption("comm","powermeterserial","")
        if pms.__len__()==0:
            self.__initialized = False
            return
        parts = pms.split(",")
        if parts.__len__()!=2:
            self.__initialized = False
            return
        self.__serialPort = parts[0]
        self.__baudrate = int(parts[1])

        self.__machineList = []

        self.reloadMachineList()

        self.__initialized = True

    def reloadMachineList(self):
        self.__machineLocker.acquire()
        try:
            db = DBPowerMeter()
            data = db.loadMachineAddress()
            if data is not None:
                self.__machineList = data[:]
        except Exception as e:
            logging.error(f"重载电表机号列表失败:{e}")
        finally:
            self.__machineLocker.release()

    def isInitialized(self)->bool:
        return self.__initialized
   
    def start(self)->bool:
        if not self.__initialized:
            return False
       
        self.__running = True
        self.__thread = threading.Thread(target=self.__run,daemon=True,name="PowerMeterThread")
        self.__thread.start()


    def __connectToSerial(self)->bool:
        logging.debug(f"开始连接电表串口:{self.__serialPort}")

        try:
            self.__serial = serial.Serial(self.__serialPort,self.__baudrate,timeout=0.1)
            if not self.__serial.is_open:
                raise Exception("串口未打开")
           
            return True
        except Exception as e:
            logging.error(f"串口连接失败:{e}")
            if self.__serial is not None:
                del self.__serial
                self.__serial = None
            return False
               

    def __run(self):
        interval:int = 0
       
        DEF_CAPTURE_INTERVAL:int = 30 #采集间隔时间计时次数,0.1秒*x次数
       
        while self.__running:
            while self.__running and not self.__connectToSerial():
                time.sleep(1)

            while self.__running:
                if interval<DEF_CAPTURE_INTERVAL:
                    interval += 1
                    time.sleep(0.1)
                    continue
                interval = 0

                addrs = None
                self.__machineLocker.acquire()
                try:
                    addrs = self.__machineList[:]
                except Exception as e:
                    logging.error(f"电量采集获取机号列表失败:{e}")
                    continue
                finally:    
                    self.__machineLocker.release()                    

                #读取电表
                for i in range(addrs.__len__()):
                    if not self.__running:
                        break
                   
                    addr = addrs[i]
                    if addr<=0:
                        continue

                    data = bytearray([addr,0x03,0x00,0x31,0x00,0x01])
                    crc = PowerMeter.calc_crc16modbus(data)
                    data.extend(crc)

                    try:
                        self.__serial.write(data)
                    except Exception as e:
                        logging.error(f"串口发送数据失败:{e}")
                        break

                    time.sleep(0.05)

                    try:
                        recvBegin = int(time.time()*1000)
                        while int(time.time()*1000)-recvBegin<500:
                            if self.__serial.in_waiting>0:
                                break
                            time.sleep(0.05)
                        if self.__serial.in_waiting<=0:
                            logging.error(f"超时未接收到{addr}电表数据")
                            continue

                        recv = self.__serial.read(self.__serial.in_waiting)
                        if recv.__len__()<9:
                            logging.error("接收到电表数据长度不足")
                            continue
                    except Exception as e:
                        logging.error(f"串口接收数据失败:{e}")
                        break

                    if recv[0]==addr and recv[1]==0x03 and recv[2]==0x04:
                        recvData = recv[3:7]
                        recvV = (recvData[0]<<24)|(recvData[1]<<16)|(recvData[2]<<8)|recvData[3]
                        v:float = struct.unpack("<f",struct.pack("<I",recvV))[0]
                        wv:int = int(v*1000)

                        self.onPowerMeterResult.emit(addr,wv)    #推送结果
                        logging.debug(f"电表{addr}读数:{wv}W")

            logging.error("电表串口已断开")
            if self.__serial is not None:
                try:
                    self.__serial.close()
                except:
                    pass
                del self.__serial
                self.__serial = None


    def shutdown(self):
        if self.__thread is not None:
            self.__running = False
            self.__thread.join(5000)

    @staticmethod
    def calc_crc16modbus(data: bytes) -> bytes:
        """
        用于计算数据的CRC16-MODBUS校验值的函数
        :param data: 待计算校验值的数据
        :return: 交换了高低字节的计算结果(2字节bytes)
        """
        crc = 0xFFFF
        for pos in data:
            crc ^= pos
            for i in range(8):
                if (crc & 1) != 0:
                    crc >>= 1
                    crc ^= 0xA001
                else:
                    crc >>= 1

        return bytes([(crc & 0xFF), (crc >> 8) & 0xFF])


以上代码是一个完整封装的采集类。因为界面使用到了PySide2,所以其中有些对PySide2的引用,如果有感兴趣的需要使用代码的朋友,可以将代码稍微修改一下。

以下对核心代码进行详细的讲解:


1、串口通讯的初始化

def __connectToSerial(self)->bool:
        logging.debug(f"开始连接电表串口:{self.__serialPort}")

        try:
            self.__serial = serial.Serial(self.__serialPort,self.__baudrate,timeout=0.1)
            if not self.__serial.is_open:
                raise Exception("串口未打开")
           
            return True
        except Exception as e:
            logging.error(f"串口连接失败:{e}")
            if self.__serial is not None:
                del self.__serial
                self.__serial = None
            return False


以上代码在初始化串口时,传入的timeout=0.1,串口读取时,如果100毫秒内未收到数据,将抛出一个错误,而非一直阻塞等待接收数据。设置超时时间后,将在后面串口数据逻辑设计时更好处理。


2、发送指令读取电量值,并接收返回数据

#读取电表
                for i in range(addrs.__len__()):
                    if not self.__running:
                        break
                   
                    addr = addrs[i]
                    if addr<=0:
                        continue

                    data = bytearray([addr,0x03,0x00,0x31,0x00,0x01])
                    crc = PowerMeter.calc_crc16modbus(data)
                    data.extend(crc)

                    try:
                        self.__serial.write(data)
                    except Exception as e:
                        logging.error(f"串口发送数据失败:{e}")
                        break

                    time.sleep(0.05)

                    try:
                        recvBegin = int(time.time()*1000)
                        while int(time.time()*1000)-recvBegin<500:
                            if self.__serial.in_waiting>0:
                                break
                            time.sleep(0.05)
                        if self.__serial.in_waiting<=0:
                            logging.error(f"超时未接收到{addr}电表数据")
                            continue

                        recv = self.__serial.read(self.__serial.in_waiting)
                        if recv.__len__()<9:
                            logging.error("接收到电表数据长度不足")
                            continue
                    except Exception as e:
                        logging.error(f"串口接收数据失败:{e}")
                        break

                    if recv[0]==addr and recv[1]==0x03 and recv[2]==0x04:
                        recvData = recv[3:7]
                        recvV = (recvData[0]<<24)|(recvData[1]<<16)|(recvData[2]<<8)|recvData[3]
                        v:float = struct.unpack("<f",struct.pack("<I",recvV))[0]
                        wv:int = int(v*1000)

                        self.onPowerMeterResult.emit(addr,wv)    #推送结果
                        logging.debug(f"电表{addr}读数:{wv}W")
data = bytearray([addr,0x03,0x00,0x31,0x00,0x01])
crc = PowerMeter.calc_crc16modbus(data)
data.extend(crc)


这段代码是拼接一个读取电量的读取指令,电表地址为:addr,读取指令为0x03(如果对Modbus指令结构不熟悉的,可以看我的另一篇文章,专门介绍Modbus协议)

recvBegin = int(time.time()*1000)
while int(time.time()*1000)-recvBegin<500:
    if self.__serial.in_waiting>0:
        break
    time.sleep(0.05)
if self.__serial.in_waiting<=0:
    logging.error(f"超时未接收到{addr}电表数据")
    continue

recv = self.__serial.read(self.__serial.in_waiting)
if recv.__len__()<9:
    logging.error("接收到电表数据长度不足")
    continue


这段代码是等待500毫秒,检测是否有数据返回。并接收数据

if recv[0]==addr and recv[1]==0x03 and recv[2]==0x04:
    recvData = recv[3:7]
    recvV = (recvData[0]<<24)|(recvData[1]<<16)|(recvData[2]<<8)|recvData[3]
    v:float = struct.unpack("<f",struct.pack("<I",recvV))[0]
    wv:int = int(v*1000)

    self.onPowerMeterResult.emit(addr,wv)    #推送结果
    logging.debug(f"电表{addr}读数:{wv}W")

以上这段代码是对接收到的数据进行数据处理,将读到的值进行解析并通过PySide2的信号onPowerMeterResult推送给调用者。


提醒:原创不易,如果您需要转载本文。为表对本人的尊重烦请标注出本文的出处。内容包括:作者和本文链接。可直接复制以下内容:

原文作者:<a href='http://www.hn-lxm.com' target="_blank">不香不帅</a>

原文链接: <a href='http://www.hn-lxm.com/article/V2lsbGlhbTM5TGVl.html' title='Python通过Modbus采集电表电量实战' target="_blank">Python通过Modbus采集电表电量实战</a>

相关文章
评论