1 Star 1 Fork 24

JefferyZh / SerialGenius

forked from edwinfound / SerialGenius 
加入 Gitee
与超过 1200万 开发者一起发现、参与优秀开源项目,私有仓库也完全免费 :)
免费加入
该仓库未声明开源许可证文件(LICENSE),使用请关注具体项目描述及其代码上游依赖。
克隆/下载
SerialUtil.py 5.02 KB
一键复制 编辑 原始数据 按行查看 历史
# coding:utf-8
import threading
import time
import serial
import Global
import ELog
# 界面
PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE = 'N', 'E', 'O', 'M', 'S'
STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO = (1, 1.5, 2)
FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS = (5, 6, 7, 8)
PARITY_NAMES = {
PARITY_NONE: 'None',
PARITY_EVEN: 'Even',
PARITY_ODD: 'Odd',
PARITY_MARK: 'Mark',
PARITY_SPACE: 'Space',
}
BAUDRATES = (50, 75, 110, 134, 150, 200, 300, 600, 1200, 1800, 2400, 4800,
9600, 19200, 38400, 57600, 115200, 128000, 192000, 256000,
384000, 512000, 768000, 1024000, 2048000, 4096000)
BYTESIZES = (FIVEBITS, SIXBITS, SEVENBITS, EIGHTBITS)
PARITIES = (PARITY_NONE, PARITY_EVEN, PARITY_ODD, PARITY_MARK, PARITY_SPACE)
STOPBITS = (STOPBITS_ONE, STOPBITS_ONE_POINT_FIVE, STOPBITS_TWO)
class SerialThread:
def __init__(self, port=0, baudrate=9600, bytesize=EIGHTBITS, parity=PARITY_NONE, stopbits=STOPBITS_ONE, timeout=2):
self.l_serial = None
self.port = port
self.baudrate = baudrate
self.bytesize = bytesize
self.parity = parity
self.stopbits = stopbits
self.timeout = timeout
self.exceptStop = False
self.running = False
self.thread_read = None
self.thread_write = None
self.thread_monitor = None
self.writeBuffer = []
'''
self.waitEnd = None
self.port = Port
'''
def start(self):
try:
self.l_serial = serial.Serial()
self.l_serial.port = self.port
self.l_serial.baudrate = self.baudrate
self.l_serial.parity = self.parity
self.l_serial.stopbits = self.stopbits
self.l_serial.timeout = self.timeout
self.l_serial.open()
if self.l_serial.isOpen():
# self.waitEnd = threading.Event()
self.running = True
self.exceptStop = False
self.thread_read = threading.Thread(target=self.serialReader)
self.thread_read.setDaemon(True)
self.thread_read.start()
self.thread_write = threading.Thread(target=self.serialWriter)
self.thread_write.setDaemon(True)
self.thread_write.start()
self.thread_monitor = threading.Thread(target=self.serialMonitor)
self.thread_monitor.setDaemon(True)
self.thread_monitor.start()
return True
else:
return False
except:
ELog.Trace()
return False
def stop(self):
self.running = False
try:
self.thread_read.join()
self.thread_write.join()
if self.l_serial.isOpen():
self.l_serial.close()
except:
pass
def serialMonitor(self):
self.thread_read.join()
self.thread_write.join()
if self.exceptStop:
self.stop()
Global.var['PanelTool'].OnOpenPort(None, True)
def SendAppend(self, data):
# print data
self.writeBuffer.extend(data)
def serialWriter(self):
while self.running:
try:
if len(self.writeBuffer):
c = self.writeBuffer.pop(0)
if 0 <= c <= 255:
self.l_serial.write(chr(c))
# else:
# print '0x%X' % c
else:
time.sleep(0.01)
except:
self.running = False
self.exceptStop = True
ELog.Trace()
def serialReader(self):
while self.running:
try:
n = self.l_serial.inWaiting()
if n:
data = self.l_serial.read(n)
rawData = []
for i in range(0, n):
rawData.append(ord(data[i]))
for i in range(0, len(Global.Modules)):
if Global.var['PanelMain'].modules[i].isUsing:
Global.var['PanelMain'].modules[i].RecvData(rawData)
else:
time.sleep(0.001)
except:
self.running = False
self.exceptStop = True
ELog.Trace()
'''
返回可用的串口
'''
def GetValidPort():
valid_port = []
t_serial = None
for i in range(0, 20):
try:
t_serial = serial.Serial()
t_serial.port = i
t_serial.baudrate = 9600
t_serial.timeout = 2
t_serial.bytesize = EIGHTBITS
t_serial.parity = PARITY_NONE
t_serial.stopbits = STOPBITS_ONE
t_serial.open()
except:
pass
if t_serial.isOpen():
try:
t_serial.close()
except:
pass
valid_port.append(i)
return valid_port
if __name__ == '__main__':
print GetValidPort()
1
https://gitee.com/Jeffiy/SerialGenius.git
git@gitee.com:Jeffiy/SerialGenius.git
Jeffiy
SerialGenius
SerialGenius
master

搜索帮助