From 2630d837e84d1d9aa40855af64bea285b7937143 Mon Sep 17 00:00:00 2001 From: bianruifeng <912736557@qq.com> Date: Tue, 24 May 2022 14:31:23 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0DUF=E6=A8=A1=E5=BC=8F?= =?UTF-8?q?=E7=83=A7=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Package/Firmware Installer.aip | 1757 ++++++++++++++++++++------------ src/avr_isp/__init__.py | 5 + src/avr_isp/errorBase.py | 16 + src/avr_isp/stk500v2.py | 484 +++++---- src/firmwareInstaller.py | 794 +++++++++------ src/pydfu.py | 856 ++++++++++++++++ 打包.bat | 2 +- 7 files changed, 2668 insertions(+), 1246 deletions(-) create mode 100644 src/avr_isp/errorBase.py create mode 100644 src/pydfu.py diff --git a/Package/Firmware Installer.aip b/Package/Firmware Installer.aip index f61557c..4585b9c 100644 --- a/Package/Firmware Installer.aip +++ b/Package/Firmware Installer.aip @@ -2,7 +2,7 @@ - + @@ -34,36 +34,39 @@ - + - + + - + - + - + + - + - + + - - - - - + + + + + @@ -79,24 +82,25 @@ - + - + - + - + - + - + + @@ -107,34 +111,46 @@ - - - + + + + + + - + + - - - - - - - + + + + + + + + + + + - + + + + + @@ -145,11 +161,12 @@ + - + - - + + @@ -157,10 +174,14 @@ + + + - + + @@ -168,7 +189,9 @@ + + @@ -179,47 +202,53 @@ - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - - - + + + + + + + + + + + + + + - + @@ -285,6 +314,8 @@ + + @@ -299,217 +330,249 @@ - - - - - - - - + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + - - - + + + + + + + - - - - + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - - - + + + + + + + @@ -530,9 +593,124 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -552,6 +730,9 @@ + + + @@ -614,17 +795,22 @@ + + + - - - - + + + + - - + + + + - - + + @@ -644,7 +830,7 @@ - + @@ -661,9 +847,9 @@ - + - + @@ -672,28 +858,28 @@ - - + + - - - + + + - - + + - - + + - - - - - - + + + + + + - - + + @@ -715,7 +901,7 @@ - + @@ -750,7 +936,7 @@ - + @@ -785,7 +971,7 @@ - + @@ -934,7 +1120,7 @@ - + @@ -963,10 +1149,10 @@ - - + + - + @@ -1162,10 +1348,10 @@ - + - + @@ -1214,10 +1400,10 @@ - + - + @@ -1271,10 +1457,10 @@ - + - + @@ -1311,10 +1497,10 @@ - + - + @@ -1364,10 +1550,10 @@ - + - + @@ -1415,10 +1601,10 @@ - + - - + + @@ -1428,8 +1614,8 @@ - - + + @@ -1483,7 +1669,7 @@ - + @@ -1494,10 +1680,10 @@ - + - + @@ -1507,53 +1693,53 @@ - - + + - - + + - - - - + + + + - - + + - - + + - - + + - - + + - - + + - - + + - - - - + + + + - - + + - - + + - - + + - - + + @@ -1577,10 +1763,10 @@ - + - - + + @@ -1590,8 +1776,8 @@ - - + + @@ -1600,14 +1786,17 @@ - - + + + + + - - + + - - + + @@ -1629,7 +1818,7 @@ - + @@ -1664,7 +1853,7 @@ - + @@ -1699,7 +1888,7 @@ - + @@ -1848,7 +2037,7 @@ - + @@ -1877,9 +2066,10 @@ - + + - + @@ -2015,6 +2205,7 @@ + @@ -2065,6 +2256,7 @@ + @@ -2075,10 +2267,10 @@ - + - + @@ -2091,6 +2283,7 @@ + @@ -2104,7 +2297,9 @@ + + @@ -2118,6 +2313,7 @@ + @@ -2126,10 +2322,10 @@ - + - + @@ -2139,6 +2335,7 @@ + @@ -2153,6 +2350,7 @@ + @@ -2172,6 +2370,7 @@ + @@ -2182,10 +2381,10 @@ - + - + @@ -2198,6 +2397,7 @@ + @@ -2213,6 +2413,7 @@ + @@ -2221,10 +2422,10 @@ - + - + @@ -2235,6 +2436,7 @@ + @@ -2263,6 +2465,7 @@ + @@ -2273,10 +2476,10 @@ - + - + @@ -2287,6 +2490,7 @@ + @@ -2301,8 +2505,10 @@ - - + + + + @@ -2349,7 +2555,7 @@ - + @@ -2360,10 +2566,10 @@ - + - + @@ -2373,29 +2579,27 @@ - - + + - - - - + + + + - - - + + + - - + + - - + + - - - - + + @@ -2415,7 +2619,7 @@ - + @@ -2432,9 +2636,9 @@ - + - + @@ -2443,47 +2647,325 @@ - - + + - - - + + + - - + + - - + + - - - - - - + + + + + + + - - + + - - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - - - - + + + + - - - + + + - - + + - - + + @@ -2599,114 +3081,13 @@ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - + - + @@ -2810,11 +3191,26 @@ + + + + + + + + + + + + + + + + - @@ -2833,6 +3229,9 @@ + + + @@ -2843,7 +3242,6 @@ - @@ -2852,7 +3250,6 @@ - @@ -2871,44 +3268,43 @@ - + + + + - + - - - - - + - + + + - + + + - - - - - + @@ -2927,129 +3323,22 @@ - + - + - + - - - - - + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - @@ -3100,7 +3389,7 @@ - + @@ -3141,6 +3430,138 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/avr_isp/__init__.py b/src/avr_isp/__init__.py index e69de29..fb9bd4d 100644 --- a/src/avr_isp/__init__.py +++ b/src/avr_isp/__init__.py @@ -0,0 +1,5 @@ +from .chipDB import * +from .errorBase import portError +from .intelHex import * +from .ispBase import * +from .stk500v2 import * \ No newline at end of file diff --git a/src/avr_isp/errorBase.py b/src/avr_isp/errorBase.py new file mode 100644 index 0000000..8c2fbfb --- /dev/null +++ b/src/avr_isp/errorBase.py @@ -0,0 +1,16 @@ +class portError(Exception): + errorInvalid = 0 + errorBusy = 1 + errorOpen = 2 + + def __init__(self, value, port): + self.value = value + self.port = str(port) + + def __str__(self): + if self.value == self.errorInvalid: + return "Invalid serial port : " + self.port + " !" + elif self.value == self.errorBusy: + return "Serial port " + self.port + " is busy!" + elif self.value == self.errorOpen: + return "Serial port " + self.port + " failed to open!" diff --git a/src/avr_isp/stk500v2.py b/src/avr_isp/stk500v2.py index 54220ae..3435c2d 100644 --- a/src/avr_isp/stk500v2.py +++ b/src/avr_isp/stk500v2.py @@ -3,10 +3,10 @@ Created on 2017年8月15日 @author: CreatBot-SW ''' -#=============================================================================== +# =============================================================================== # STK500v2 protocol implementation for programming AVR chips. # The STK500v2 protocol is used by the ArduinoMega2560 and a few other Arduino platforms to load firmware. -#=============================================================================== +# =============================================================================== import struct, sys import time @@ -16,289 +16,279 @@ from PyQt5.QtSerialPort import QSerialPort, QSerialPortInfo from PyQt5.QtWidgets import QApplication from avr_isp import intelHex, ispBase +from .errorBase import portError class Stk500v2(ispBase.IspBase, QSerialPort): - progressCallback = pyqtSignal(int, int) + progressCallback = pyqtSignal(int, int) - def __init__(self): - super(Stk500v2, self).__init__() - self.seq = 1 - self.lastAddr = -1 - self.portInfo = None + def __init__(self): + super(Stk500v2, self).__init__() + self.seq = 1 + self.lastAddr = -1 + self.portInfo = None - def connect(self, port = 'COM4', speed = 115200): - self.portInfo = QSerialPortInfo(port) - self.setPort(self.portInfo) - self.setBaudRate(speed) + def connect(self, port='COM4', speed=115200): + self.portInfo = QSerialPortInfo(port) + self.setPort(self.portInfo) + self.setBaudRate(speed) - if self.portInfo.isNull(): - raise portError(portError.errorInvalid, port) - else: - if self.portInfo.isBusy(): - raise portError(portError.errorBusy, port) - else: - if self.open(QIODevice.ReadWrite): -# self.setBreakEnabled() - self.entryISP() - else: - raise portError(portError.errorOpen, port) + if self.portInfo.isNull(): + raise portError(portError.errorInvalid, port) + else: + if self.portInfo.isBusy(): + raise portError(portError.errorBusy, port) + else: + if self.open(QIODevice.ReadWrite): + # self.setBreakEnabled() + self.entryISP() + else: + raise portError(portError.errorOpen, port) - def close(self): - super(Stk500v2, self).close() - self.portInfo = None + def close(self): + super(Stk500v2, self).close() + self.portInfo = None - def entryISP(self): - self.seq = 1 - # Reset the controller - self.setDataTerminalReady(True) - QThread.msleep(100) - self.setDataTerminalReady(False) - QThread.msleep(200) - self.clear() + def entryISP(self): + self.seq = 1 + # Reset the controller + self.setDataTerminalReady(True) + QThread.msleep(100) + self.setDataTerminalReady(False) + QThread.msleep(200) + self.clear() - recv = self.sendMessage([1])[3:] - if "".join([chr(c) for c in recv]) != "AVRISP_2": - raise ispBase.IspError("Unkonwn bootloaders!") + recv = self.sendMessage([1])[3:] + if "".join([chr(c) for c in recv]) != "AVRISP_2": + raise ispBase.IspError("Unkonwn bootloaders!") - if self.sendMessage([0x10, 0xc8, 0x64, 0x19, 0x20, 0x00, 0x53, 0x03, 0xac, 0x53, 0x00, 0x00]) != [0x10, 0x00]: - raise ispBase.IspError("Failed to enter programming mode!") + if self.sendMessage([0x10, 0xc8, 0x64, 0x19, 0x20, 0x00, 0x53, 0x03, 0xac, 0x53, 0x00, 0x00]) != [0x10, 0x00]: + raise ispBase.IspError("Failed to enter programming mode!") - def leaveISP(self): - if self.portInfo is not None: - if self.sendMessage([0x11]) != [0x11, 0x00]: - raise ispBase.IspError("Failed to leave programming mode!") + def leaveISP(self): + if self.portInfo is not None: + if self.sendMessage([0x11]) != [0x11, 0x00]: + raise ispBase.IspError("Failed to leave programming mode!") - def isConnected(self): - return self.isOpen() + def isConnected(self): + return self.isOpen() - def sendISP(self, data): - recv = self.sendMessage([0x1D, 4, 4, 0, data[0], data[1], data[2], data[3]]) - return recv[2:6] + def sendISP(self, data): + recv = self.sendMessage([0x1D, 4, 4, 0, data[0], data[1], data[2], data[3]]) + return recv[2:6] - def writeFlash(self, flashData): - # Set load addr to 0, in case we have more then 64k flash we need to enable the address extension - pageSize = self.chip['pageSize'] * 2 - flashSize = pageSize * self.chip['pageCount'] - if flashSize > 0xFFFF: - self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00]) - else: - self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00]) + def writeFlash(self, flashData): + # Set load addr to 0, in case we have more then 64k flash we need to enable the address extension + pageSize = self.chip['pageSize'] * 2 + flashSize = pageSize * self.chip['pageCount'] + if flashSize > 0xFFFF: + self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00]) + else: + self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00]) - loadCount = (len(flashData) + pageSize - 1) // pageSize - for i in range(0, loadCount): - self.sendMessage([0x13, pageSize >> 8, pageSize & 0xFF, 0xc1, 0x0a, 0x40, 0x4c, 0x20, 0x00, 0x00] + flashData[(i * pageSize):(i * pageSize + pageSize)]) - self.progressCallback.emit(i + 1, loadCount * 2) + loadCount = (len(flashData) + pageSize - 1) // pageSize + for i in range(0, loadCount): + self.sendMessage( + [0x13, pageSize >> 8, pageSize & 0xFF, 0xc1, 0x0a, 0x40, 0x4c, 0x20, 0x00, 0x00] + flashData[ + (i * pageSize):( + i * pageSize + pageSize)]) + self.progressCallback.emit(i + 1, loadCount * 2) - def verifyFlash(self, flashData): - # Set load addr to 0, in case we have more then 64k flash we need to enable the address extension - flashSize = self.chip['pageSize'] * 2 * self.chip['pageCount'] - if flashSize > 0xFFFF: - self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00]) - else: - self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00]) + def verifyFlash(self, flashData): + # Set load addr to 0, in case we have more then 64k flash we need to enable the address extension + flashSize = self.chip['pageSize'] * 2 * self.chip['pageCount'] + if flashSize > 0xFFFF: + self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00]) + else: + self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00]) - loadCount = (len(flashData) + 0xFF) // 0x100 - for i in range(0, loadCount): - recv = self.sendMessage([0x14, 0x01, 0x00, 0x20])[2:0x102] - self.progressCallback.emit(loadCount + i + 1, loadCount * 2) - for j in range(0, 0x100): - if i * 0x100 + j < len(flashData) and flashData[i * 0x100 + j] != recv[j]: - raise ispBase.IspError('Verify error at: 0x%x' % (i * 0x100 + j)) + loadCount = (len(flashData) + 0xFF) // 0x100 + for i in range(0, loadCount): + recv = self.sendMessage([0x14, 0x01, 0x00, 0x20])[2:0x102] + self.progressCallback.emit(loadCount + i + 1, loadCount * 2) + for j in range(0, 0x100): + if i * 0x100 + j < len(flashData) and flashData[i * 0x100 + j] != recv[j]: + raise ispBase.IspError('Verify error at: 0x%x' % (i * 0x100 + j)) - def fastReset(self): - QThread.msleep(50) - self.setDataTerminalReady(True) - self.setDataTerminalReady(False) + def fastReset(self): + QThread.msleep(50) + self.setDataTerminalReady(True) + self.setDataTerminalReady(False) - def sendMessage(self, data): - message = struct.pack(">BBHB", 0x1B, self.seq, len(data), 0x0E) - for c in data: - message += struct.pack(">B", c) - checksum = 0 - for c in message: - checksum ^= c - message += struct.pack(">B", checksum) - try: - self.write(message) - self.flush() - except: - raise ispBase.IspError("Serial send timeout") - self.seq = (self.seq + 1) & 0xFF - - if(self.waitForReadyRead(100)): - return self.recvMessage() - else: - raise ispBase.IspError("Serial recv timeout") + def sendMessage(self, data): + message = struct.pack(">BBHB", 0x1B, self.seq, len(data), 0x0E) + for c in data: + message += struct.pack(">B", c) + checksum = 0 + for c in message: + checksum ^= c + message += struct.pack(">B", checksum) + try: + self.write(message) + self.flush() + except: + raise ispBase.IspError("Serial send timeout") + self.seq = (self.seq + 1) & 0xFF - def recvMessage(self): - state = 'Start' - checksum = 0 - while True: - s = self.read(1) - if(len(s) < 1): - if(self.waitForReadyRead(20)): - continue - else: - raise ispBase.IspError("Serial read timeout") - b = struct.unpack(">B", s)[0] - checksum ^= b - if state == 'Start': - if b == 0x1B: - state = 'GetSeq' - checksum = 0x1B - elif state == 'GetSeq': - state = 'MsgSize1' - elif state == 'MsgSize1': - msgSize = b << 8 - state = 'MsgSize2' - elif state == 'MsgSize2': - msgSize |= b - state = 'Token' - elif state == 'Token': - if b != 0x0E: - state = 'Start' - else: - state = 'Data' - data = [] - elif state == 'Data': - data.append(b) - if len(data) == msgSize: - state = 'Checksum' - elif state == 'Checksum': - if checksum != 0: - state = 'Start' - else: - return data + if (self.waitForReadyRead(100)): + return self.recvMessage() + else: + raise ispBase.IspError("Serial recv timeout") - -class portError(Exception): - errorInvalid = 0 - errorBusy = 1 - errorOpen = 2 - - def __init__(self, value, port): - self.value = value - self.port = str(port) - - def __str__(self): - if self.value == self.errorInvalid: - return "Invalid serial port : " + self.port + " !" - elif self.value == self.errorBusy: - return "Serial port " + self.port + " is busy!" - elif self.value == self.errorOpen: - return "Serial port " + self.port + " failed to open!" + def recvMessage(self): + state = 'Start' + checksum = 0 + while True: + s = self.read(1) + if (len(s) < 1): + if (self.waitForReadyRead(20)): + continue + else: + raise ispBase.IspError("Serial read timeout") + b = struct.unpack(">B", s)[0] + checksum ^= b + if state == 'Start': + if b == 0x1B: + state = 'GetSeq' + checksum = 0x1B + elif state == 'GetSeq': + state = 'MsgSize1' + elif state == 'MsgSize1': + msgSize = b << 8 + state = 'MsgSize2' + elif state == 'MsgSize2': + msgSize |= b + state = 'Token' + elif state == 'Token': + if b != 0x0E: + state = 'Start' + else: + state = 'Data' + data = [] + elif state == 'Data': + data.append(b) + if len(data) == msgSize: + state = 'Checksum' + elif state == 'Checksum': + if checksum != 0: + state = 'Start' + else: + return data class stk500v2Thread(QThread): - stateCallback = pyqtSignal([str], [Exception]) + stateCallback = pyqtSignal([str], [Exception]) - def __init__(self, parent, port, speed, filename, callback = None): - super(stk500v2Thread, self).__init__() - self.parent = parent - self.port = port - self.speed = speed - self.filename = filename - self.callback = callback - self.programmer = None - self.isWork = False - self.finished.connect(self.done) + def __init__(self, parent, port, speed, filename, callback=None): + super(stk500v2Thread, self).__init__() + self.parent = parent + self.port = port + self.speed = speed + self.filename = filename + self.callback = callback + self.programmer = None + self.isWork = False + self.finished.connect(self.done) - def run(self): - self.isWork = True - try: - self.programmer = Stk500v2() - if self.callback is not None: - self.programmer.progressCallback.connect(self.callback) + def run(self): - if self.parent is None: - runProgrammer(self.port, self.speed, self.filename, self.programmer) - else: - self.stateCallback[str].emit(self.tr("Connecting...")) - self.msleep(200) - self.programmer.connect(self.port, self.speed) + try: + self.isWork = True + self.programmer = Stk500v2() + if self.callback is not None: + self.programmer.progressCallback.connect(self.callback) - self.stateCallback[str].emit(self.tr("Programming...")) - self.programmer.programChip(intelHex.readHex(self.filename)) - except Exception as e: - if(self.isInterruptionRequested()): - print("Int") - else: - if self.parent is not None: - self.stateCallback[Exception].emit(e) - while(self.isWork): - pass # 等待父进程处理异常 - else: - raise e - self.isWork = False - finally: - if self.programmer.isConnected(): - self.programmer.fastReset() - self.programmer.close() - self.programmer = None + if self.parent is None: + runProgrammer(self.port, self.speed, self.filename, self.programmer) + else: + self.stateCallback[str].emit(self.tr("Connecting..")) + self.msleep(200) + self.programmer.connect(self.port, self.speed) - def isReady(self): - return self.programmer is not None and self.programmer.isConnected() + self.stateCallback[str].emit(self.tr("Programming...")) + self.programmer.programChip(intelHex.readHex(self.filename)) - def done(self): - if self.parent is not None: - if(self.isWork): - self.isWork = False - self.stateCallback[str].emit(self.tr("Done!")) - else: - print("Success!") - else: - print("Failure!") + except Exception as e: + if self.isInterruptionRequested(): + print("Int") + else: + if self.parent is not None: + self.stateCallback[Exception].emit(e) + # while self.isWork: + # pass # 等待父进程处理异常 + else: + raise e + self.isWork = False + finally: + self.isWork = False + if self.programmer.isConnected(): + self.programmer.fastReset() + self.programmer.close() + self.programmer = None - def terminate(self): - if self.programmer is not None: - self.requestInterruption() - self.programmer.close() - self.programmer = None - return super(stk500v2Thread, self).terminate() + def isReady(self): + return self.programmer is not None and self.programmer.isConnected() + + def done(self): + print("结束烧录程序") + if self.parent is not None: + if self.isWork: + self.isWork = False + self.stateCallback[str].emit(self.tr("Done!")) + else: + print("Success!") + else: + print("Failure!") + + def terminate(self): + if self.programmer is not None: + self.requestInterruption() + self.programmer.close() + self.programmer = None + return super(stk500v2Thread, self).terminate() -def runProgrammer(port, speed, filename, programmer = None): - """ Run an STK500v2 program on serial port 'port' and write 'filename' into flash. """ - if programmer is None: - programmer = Stk500v2() - programmer.connect(port = port, speed = speed) - programmer.programChip(intelHex.readHex(filename)) - programmer.close() +def runProgrammer(port, speed, filename, programmer=None): + """ Run an STK500v2 program on serial port 'port' and write 'filename' into flash. """ + if programmer is None: + programmer = Stk500v2() + programmer.connect(port=port, speed=speed) + programmer.programChip(intelHex.readHex(filename)) + programmer.close() def main(): - """ Entry point to call the stk500v2 programmer from the commandline. """ - programmer = Stk500v2() - try: - if(len(sys.argv) > 2): - programmer.connect(sys.argv[1]) - programmer.programChip(intelHex.readHex(sys.argv[2])) - else: - programmer.connect("COM4") - programmer.programChip(intelHex.readHex("D:/OneDrive/Desktop/CreatBot F160 01 EN KTC ( AUTO_LEVELING ).hex")) - except portError as e: - print(e.value) - print("PortError: " + str(e)) - except ispBase.IspError as e: - print("IspError: " + str(e)) - except intelHex.formatError as e: - print("HexError: " + str(e)) - finally: - programmer.close() + """ Entry point to call the stk500v2 programmer from the commandline. """ + programmer = Stk500v2() + try: + if len(sys.argv) > 2: + programmer.connect(sys.argv[1]) + programmer.programChip(intelHex.readHex(sys.argv[2])) + else: + programmer.connect("COM4") + programmer.programChip( + intelHex.readHex("D:/OneDrive/Desktop/CreatBot F160 01 EN KTC ( AUTO_LEVELING ).hex")) + except portError as e: + print(e.value) + print("PortError: " + str(e)) + except ispBase.IspError as e: + print("IspError: " + str(e)) + except intelHex.formatError as e: + print("HexError: " + str(e)) + finally: + programmer.close() if __name__ == '__main__': -# main() -# runProgrammer("COM4", 115200, "D:/OneDrive/Desktop/CreatBot F160 01 EN KTC ( AUTO_LEVELING ).hex") + # main() + # runProgrammer("COM4", 115200, "D:/OneDrive/Desktop/CreatBot F160 01 EN KTC ( AUTO_LEVELING ).hex") - app = QApplication(sys.argv) - task = stk500v2Thread(None, "COM4", 115200, "D:/OneDrive/Desktop/CreatBot F160 01 EN KTC ( AUTO_LEVELING ).hex") -# task = stk500v2Thread(None, "COM4", 115200, "D:/OneDrive/Desktop/test.hex") - try: - task.start() - except Exception as e: - print(e) - - sys.exit(app.exec()) + app = QApplication(sys.argv) + task = stk500v2Thread(None, "COM4", 115200, "D:/OneDrive/Desktop/CreatBot F160 01 EN KTC ( AUTO_LEVELING ).hex") + # task = stk500v2Thread(None, "COM4", 115200, "D:/OneDrive/Desktop/test.hex") + try: + task.start() + except Exception as e: + print(e) + sys.exit(app.exec()) diff --git a/src/firmwareInstaller.py b/src/firmwareInstaller.py index c05886b..c4a794a 100644 --- a/src/firmwareInstaller.py +++ b/src/firmwareInstaller.py @@ -6,414 +6,548 @@ Created on 2017年8月15日 import os import sys +import time +from PyQt5 import QtSerialPort, QtCore, QtWidgets from PyQt5.Qt import pyqtSignal from PyQt5.QtCore import QSize, QDir, QTimer from PyQt5.QtGui import QIcon from PyQt5.QtSerialPort import QSerialPortInfo from PyQt5.QtWidgets import QApplication, QVBoxLayout, QGroupBox, \ - QRadioButton, QGridLayout, QWidget, QProgressBar, QStatusBar, QComboBox, QLabel, \ - QHBoxLayout, QLineEdit, QPushButton, QFileDialog, QCheckBox + QRadioButton, QGridLayout, QWidget, QProgressBar, QStatusBar, QComboBox, QLabel, \ + QHBoxLayout, QLineEdit, QPushButton, QFileDialog, QCheckBox from avr_isp.intelHex import formatError from avr_isp.ispBase import IspError from avr_isp.stk500v2 import stk500v2Thread, portError +# from DfuseTool import DfuseTool as DFUse +from pydfu import DFUTool as DFUse if getattr(sys, 'frozen', False): - bundle_dir = sys._MEIPASS + bundle_dir = sys._MEIPASS else: - bundle_dir = os.path.dirname(os.path.abspath(__file__)) + bundle_dir = os.path.dirname(os.path.abspath(__file__)) def portListAll(): - return [port.portName() for port in QSerialPortInfo.availablePorts()] + return [port.portName() for port in QSerialPortInfo.availablePorts()] def portList(): - return [port.portName() for port in QSerialPortInfo.availablePorts() if port.description() == "Arduino Mega 2560"] + return [port.portName() for port in QSerialPortInfo.availablePorts() if port.description() == "Arduino Mega 2560"] class countLabel(QLabel): - def __init__(self, text): - super(countLabel, self).__init__(text) + def __init__(self, text): + super(countLabel, self).__init__(text) - def mouseDoubleClickEvent(self, e): - self.setText("0") - return super(countLabel, self).mouseDoubleClickEvent(e) + def mouseDoubleClickEvent(self, e): + self.setText("0") + return super(countLabel, self).mouseDoubleClickEvent(e) class portCombox(QComboBox): - - showPopupSignal = pyqtSignal(bool) - - def __init__(self): - super(portCombox, self).__init__() - - def showPopup(self): - self.showPopupSignal.emit(True) - self.setSizeAdjustPolicy(QComboBox.AdjustToContents) - return super(portCombox, self).showPopup() + showPopupSignal = pyqtSignal(bool) + + def __init__(self): + super(portCombox, self).__init__() + + def showPopup(self): + self.showPopupSignal.emit(True) + self.setSizeAdjustPolicy(QComboBox.AdjustToContents) + return super(portCombox, self).showPopup() + class mainWindow(QWidget): - def __init__(self): - super(mainWindow, self).__init__() - self.setWindowTitle(self.tr("Firmware Installer")) - self.setWindowIcon(QIcon(os.path.join(bundle_dir, "ico.ico"))) - self.setFixedSize(QSize(480, 240)) - self.setAcceptDrops(True) + def __init__(self): + super(mainWindow, self).__init__() + self.setWindowTitle(self.tr("Firmware Installer")) + self.setWindowIcon(QIcon(os.path.join(bundle_dir, "ico.ico"))) + self.setFixedSize(QSize(480, 240)) + self.setAcceptDrops(True) - self.portUpdateTimer = QTimer() - self.portUpdateTimer.timeout.connect(self.portUpdate) - self.portUpdateTimer.start(100) - self.autoTimer = QTimer() - self.autoTimer.setSingleShot(True) - self.autoTimer.timeout.connect(self.installFile) - self.task = None + self.portUpdateTimer = QTimer() + self.portUpdateTimer.timeout.connect(self.portUpdate) + self.portUpdateTimer.start(200) - self.initUI() - self.configUI() - self.resize() + self.autoTimer = QTimer() + # self.autoTimer.setSingleShot(True) + self.autoTimer.timeout.connect(self.portUpdate) + # self.autoTimer.start(1000) - def initUI(self): - mainLayout = QVBoxLayout() + self.task = None - self.portBox = QGroupBox(self.tr("Port Selection")) - self.fileBox = QGroupBox(self.tr("File Selection")) - self.ctrlBox = QGroupBox(self.tr("Control")) + self.initUI() + self.configUI() + self.resize() - # PortBox widget - self.autoRadio = QRadioButton(self.tr("Auto")) - self.manualRadio = QRadioButton(self.tr("Manual")) + def initUI(self): + mainLayout = QVBoxLayout() - self.manualBox = QGroupBox() - manualLayout = QHBoxLayout() - self.manualBox.setLayout(manualLayout) + self.portBox = QGroupBox(self.tr("Port Selection")) + self.fileBox = QGroupBox(self.tr("File Selection")) + self.ctrlBox = QGroupBox(self.tr("Control")) - self.portCombo = portCombox() - self.baudCombo = QComboBox() - manualLayout.addWidget(QLabel(self.tr("Port:"))) - manualLayout.addWidget(self.portCombo) - manualLayout.addStretch() - manualLayout.addWidget(QLabel(self.tr("Baudrate:"))) - manualLayout.addWidget(self.baudCombo) - manualLayout.addStretch() + # PortBox widget + self.autoRadio = QRadioButton(self.tr("Auto")) + self.manualRadio = QRadioButton(self.tr("Manual")) - portLayout = QGridLayout() - self.portBox.setLayout(portLayout) - portLayout.addWidget(self.autoRadio, 0, 0) - portLayout.addWidget(self.manualRadio, 0, 1) - portLayout.addWidget(self.manualBox, 1, 0, 1, 2) + self.manualBox = QGroupBox() + manualLayout = QHBoxLayout() + self.manualBox.setLayout(manualLayout) - # FileBox widget - self.file = QLineEdit() - self.file.setPlaceholderText("Please select a hex file.") - self.file.setReadOnly(True) - self.file.setFrame(False) - self.file.setDragEnabled(True) -# self.file.setDisabled(True) - self.fileBtn = QPushButton(self.tr("&Open")) + self.portCombo = portCombox() # 端口号下拉选框 + self.baudCombo = QComboBox() # 波特率 + manualLayout.addWidget(QLabel(self.tr("Port:"))) + manualLayout.addWidget(self.portCombo) + manualLayout.addStretch() + manualLayout.addWidget(QLabel(self.tr("Baudrate:"))) + manualLayout.addWidget(self.baudCombo) + manualLayout.addStretch() - fileLayout = QHBoxLayout() - self.fileBox.setLayout(fileLayout) - fileLayout.addWidget(QLabel(self.tr("Hex file:"))) - fileLayout.addWidget(self.file) - fileLayout.addWidget(self.fileBtn) + portLayout = QGridLayout() + self.portBox.setLayout(portLayout) + portLayout.addWidget(self.autoRadio, 0, 0) + portLayout.addWidget(self.manualRadio, 0, 1) + portLayout.addWidget(self.manualBox, 1, 0, 1, 2) - # CtrlBox widget - self.installBtn = QPushButton(self.tr("&Install")) - self.stopBtn = QPushButton(self.tr("&Stop")) - self.autoCheck = QCheckBox(self.tr("Auto Install")) - self.autoTimeLabel = QLabel(self.tr("Idle Time:")) - self.autoTimeLabel2 = QLabel(self.tr("s")) - self.autoTime = QLineEdit("2") - self.autoTime.setInputMask("00") - self.autoTime.setMaximumWidth(20) + # FileBox widget + self.file = QLineEdit() + self.file.setPlaceholderText("Please select a firmware file.") + self.file.setReadOnly(True) + self.file.setFrame(False) + self.file.setDragEnabled(True) + # self.file.setDisabled(True) + self.fileBtn = QPushButton(self.tr("&Open")) - layout1 = QVBoxLayout() - layout1.addWidget(self.autoCheck) - layout2 = QHBoxLayout() - layout2.addWidget(self.autoTimeLabel) - layout2.addWidget(self.autoTime) - layout2.addWidget(self.autoTimeLabel2) - layout1.addLayout(layout2) - self.autoInstallWidget = QWidget() - self.autoInstallWidget.setLayout(layout1) + fileLayout = QHBoxLayout() + self.fileBox.setLayout(fileLayout) + fileLayout.addWidget(QLabel(self.tr("firmware:"))) + fileLayout.addWidget(self.file) + fileLayout.addWidget(self.fileBtn) - ctrlLayout = QHBoxLayout() - self.ctrlBox.setLayout(ctrlLayout) + # CtrlBox widget + self.installBtn = QPushButton(self.tr("&Install")) + self.stopBtn = QPushButton(self.tr("&Stop")) + self.autoCheck = QCheckBox(self.tr("Auto Install")) + self.autoTimeLabel = QLabel(self.tr("Idle Time:")) + self.autoTimeLabel2 = QLabel(self.tr("s")) + self.autoTime = QLineEdit("3") + self.autoTime.setInputMask("00") + self.autoTime.setMaximumWidth(20) - ctrlLayout.addWidget(self.autoInstallWidget) - ctrlLayout.addStretch() - ctrlLayout.addWidget(self.installBtn) - ctrlLayout.addStretch() - ctrlLayout.addWidget(self.stopBtn) - ctrlLayout.addStretch() + layout1 = QVBoxLayout() + layout1.addWidget(self.autoCheck) + layout2 = QHBoxLayout() + layout2.addWidget(self.autoTimeLabel) + layout2.addWidget(self.autoTime) + layout2.addWidget(self.autoTimeLabel2) + layout1.addLayout(layout2) + self.autoInstallWidget = QWidget() + self.autoInstallWidget.setLayout(layout1) - # 进度条和状态栏 - self.progress = QProgressBar() - self.progress.setTextVisible(False) - self.progress.setRange(0, 100) - self.statusBar = QStatusBar() - self.statusBar.setSizeGripEnabled(False) + ctrlLayout = QHBoxLayout() + self.ctrlBox.setLayout(ctrlLayout) - self.tryAgainLabel = QLabel(self.tr("Try again...")) - self.tryAgain = QLabel("0") - self.statusBar.addPermanentWidget(self.tryAgainLabel) - self.statusBar.addPermanentWidget(self.tryAgain) + ctrlLayout.addWidget(self.autoInstallWidget) + ctrlLayout.addStretch() + ctrlLayout.addWidget(self.installBtn) + ctrlLayout.addStretch() + ctrlLayout.addWidget(self.stopBtn) + ctrlLayout.addStretch() - # 计数栏 - self.countBar = QStatusBar() - self.countBar.setSizeGripEnabled(False) - self.countBar.setStyleSheet("QStatusBar::item { border: 0 } QLabel {border:0; font-size: 14px; font-weight: bold}") - countSuccessLabel = QLabel("Success: ") - countFailureLabel = QLabel("Failure: ") - self.countSuccess = countLabel("0") - self.countFailure = countLabel("0") - countSuccessLabel.setStyleSheet("QLabel {color: green}") - self.countSuccess.setStyleSheet("QLabel {color: green}") - countFailureLabel.setStyleSheet("QLabel {color: red}") - self.countFailure.setStyleSheet("QLabel {color: red}") + # 进度条和状态栏 + self.progress = QProgressBar() + self.progress.setTextVisible(False) + self.progress.setRange(0, 100) + self.statusBar = QStatusBar() + self.statusBar.setSizeGripEnabled(False) -# print(self.countFailure.mouseDoubleClickEvent()) - self.url = QLabel("www.CreatBot.com") - self.url.setOpenExternalLinks(True) + self.tryAgainLabel = QLabel(self.tr("Try again...")) + self.tryAgain = QLabel("0") + self.statusBar.addPermanentWidget(self.tryAgainLabel) + self.statusBar.addPermanentWidget(self.tryAgain) - self.countBar.addWidget(QLabel(""), 1) - self.countBar.addWidget(countSuccessLabel) - self.countBar.addWidget(self.countSuccess, 1) - self.countBar.addWidget(countFailureLabel) - self.countBar.addWidget(self.countFailure, 1) - self.countBar.addWidget(self.url) - self.countBar.addWidget(QLabel(""), 1) + # 计数栏 + self.countBar = QStatusBar() + self.countBar.setSizeGripEnabled(False) + self.countBar.setStyleSheet( + "QStatusBar::item { border: 0 } QLabel {border:0; font-size: 14px; font-weight: bold}") + countSuccessLabel = QLabel("Success: ") + countFailureLabel = QLabel("Failure: ") + self.countSuccess = countLabel("0") + self.countFailure = countLabel("0") + countSuccessLabel.setStyleSheet("QLabel {color: green}") + self.countSuccess.setStyleSheet("QLabel {color: green}") + countFailureLabel.setStyleSheet("QLabel {color: red}") + self.countFailure.setStyleSheet("QLabel {color: red}") - # MainLayout - mainLayout.addWidget(self.portBox) - mainLayout.addWidget(self.fileBox) - mainLayout.addWidget(self.ctrlBox) - mainLayout.addWidget(self.progress) - mainLayout.addWidget(self.statusBar) - mainLayout.addWidget(self.countBar) - self.setLayout(mainLayout) + # print(self.countFailure.mouseDoubleClickEvent()) + self.url = QLabel("www.CreatBot.com") + self.url.setOpenExternalLinks(True) - def configUI(self): - self.baudCombo.addItems([str(baud) for baud in QSerialPortInfo.standardBaudRates()]) + self.countBar.addWidget(QLabel(""), 1) + self.countBar.addWidget(countSuccessLabel) + self.countBar.addWidget(self.countSuccess, 1) + self.countBar.addWidget(countFailureLabel) + self.countBar.addWidget(self.countFailure, 1) + self.countBar.addWidget(self.url) + self.countBar.addWidget(QLabel(""), 1) - self.progress.hide() - self.statusBar.hide() - self.stopBtn.setDisabled(True) - self.portCombo.show() - self.autoRadio.toggled.connect(self.manualBox.setDisabled) - self.autoRadio.toggled.connect(self.autoInstallWidget.setVisible) - self.autoRadio.toggled.connect(self.resize) - self.manualRadio.clicked.connect(self.disableAutoInstall) - self.manualRadio.clicked.connect(self.resize) - self.autoCheck.toggled.connect(self.autoTimeLabel.setEnabled) - self.autoCheck.toggled.connect(self.autoTime.setEnabled) - self.autoCheck.toggled.connect(self.autoTimeLabel2.setEnabled) - self.autoCheck.stateChanged.connect(self.autoStateChangeAction) - self.portCombo.showPopupSignal.connect(self.portUpdate) - self.autoTime.returnPressed.connect(self.autoTimeChangeAction) - self.statusBar.messageChanged.connect(self.stateClearAction) + # MainLayout + mainLayout.addWidget(self.portBox) + mainLayout.addWidget(self.fileBox) + mainLayout.addWidget(self.ctrlBox) + mainLayout.addWidget(self.progress) + mainLayout.addWidget(self.statusBar) + mainLayout.addWidget(self.countBar) + self.setLayout(mainLayout) - self.autoCheck.click() - self.autoCheck.click() - self.autoRadio.click() + def configUI(self): + self.baudCombo.addItems([str(baud) for baud in QSerialPortInfo.standardBaudRates()]) - self.fileBtn.clicked.connect(self.selectFile) - self.installBtn.clicked.connect(self.installFile) - self.installBtn.setFocus() - self.stopBtn.clicked.connect(self.stopInstall) + self.progress.hide() + self.statusBar.hide() + self.stopBtn.setDisabled(True) + self.portCombo.show() - self.file.__class__.dragEnterEvent = self.dragEnterEvent + self.autoRadio.toggled.connect(self.manualBox.setDisabled) + self.autoRadio.toggled.connect(self.autoInstallWidget.setVisible) + self.autoRadio.toggled.connect(self.resize) - def portUpdate(self, forceUpdate = False): - if self.autoRadio.isChecked(): - self.baudCombo.setCurrentText("115200") -# self.portCombo.addItems(portList()) - self.portCombo.clear() - for port in QSerialPortInfo.availablePorts(): - if port.description() not in ["Arduino Mega 2560", "USB-SERIAL CH340"]: # 过滤2560和CH340 - continue - self.portCombo.addItem(port.portName() + " (" + port.description() + ")" , port.portName()) - else: - currentPortData = self.portCombo.currentData() - if forceUpdate or (currentPortData and currentPortData not in [port.portName() for port in QSerialPortInfo.availablePorts()]): - self.portCombo.clear() - for port in QSerialPortInfo.availablePorts(): - self.portCombo.addItem(port.portName() + " (" + port.description() + ")" , port.portName()) - self.portCombo.setCurrentIndex(self.portCombo.findData(currentPortData)) - - self.portCombo.setSizeAdjustPolicy(QComboBox.AdjustToContents) - self.baudCombo.setSizeAdjustPolicy(QComboBox.AdjustToContents) + self.manualRadio.clicked.connect(self.disableAutoInstall) + self.manualRadio.clicked.connect(self.resize) - def disableAutoInstall(self): - self.autoCheck.setChecked(False) + self.autoCheck.toggled.connect(self.autoTimeLabel.setEnabled) + self.autoCheck.toggled.connect(self.autoTime.setEnabled) + self.autoCheck.toggled.connect(self.autoTimeLabel2.setEnabled) + self.autoCheck.stateChanged.connect(self.autoStateChangeAction) - def autoStateChangeAction(self, check): - if not check and self.autoTimer.remainingTime() > 0: - self.stopInstall() + self.portCombo.showPopupSignal.connect(self.portUpdate) # 弹出下拉选框,更新可用端口列表 + self.autoTime.returnPressed.connect(self.autoTimeChangeAction) + self.statusBar.messageChanged.connect(self.stateClearAction) - def autoTimeChangeAction(self): - self.autoTime.clearFocus() - if self.autoCheck.isChecked() and self.autoTimer.remainingTime() > 0: - self.autoTimer.stop() - self.installFile() + # 默认选中 + self.autoRadio.click() + # self.manualRadio.click() + # self.autoCheck.click() - def stateClearAction(self, msg): - if msg is "" and self.statusBar.isVisible(): - self.statusBar.hide() - if msg is not "" and self.statusBar.isHidden(): - self.statusBar.show() - self.resize() + self.fileBtn.clicked.connect(self.selectFile) + self.installBtn.clicked.connect(self.installFile) + self.installBtn.setFocus() + self.stopBtn.clicked.connect(self.stopInstall) - def selectFile(self): - hexFileDialog = QFileDialog() - hexFileDialog.setWindowTitle(self.tr("Select hex file")) - hexFileDialog.setNameFilter(self.tr("Hex files(*.hex)")) - hexFileDialog.setFileMode(QFileDialog.ExistingFile) - if(self.file.text() is ""): - hexFileDialog.setDirectory(QDir.home()) # 设置为Home目录 - else: - fileDir = QDir(self.file.text()) - fileDir.cdUp() - if(fileDir.exists()): - hexFileDialog.setDirectory(fileDir) # 设置为当前文件所在目录 - else: - hexFileDialog.setDirectory(QDir.home()) - if(hexFileDialog.exec()): - self.file.setText(hexFileDialog.selectedFiles()[0]) + self.file.__class__.dragEnterEvent = self.dragEnterEvent - def installFile(self, notFromButton = True): - if(self.file.text() is ""): - if(not notFromButton): - self.selectFile() - self.installFile(True) - else: - if self.autoTimer.remainingTime() is not -1: - self.autoTimer.stop() + def portUpdate(self, forceUpdate=False): + """ Auto 监听端口 """ + print("search port") + if self.autoRadio.isChecked(): - self.portBox.setDisabled(True) - self.fileBox.setDisabled(True) - self.installBtn.setDisabled(True) - self.stopBtn.setEnabled(True) - self.progress.show() - self.resize() + self.baudCombo.setCurrentText("115200") + # self.portCombo.addItems(portList()) + # self.portCombo.clear() + port_list = QSerialPortInfo.availablePorts() + for port in port_list: + print(':--:', port.portName(), port.description()) + if port.description() not in ["Arduino Mega 2560", "USB-SERIAL CH340"]: # 过滤2560和CH340 + continue + self.portCombo.addItem(port.portName() + " (" + port.description() + ")", port.portName()) - self.task = stk500v2Thread(self, self.portCombo.currentData(), int(self.baudCombo.currentText()), self.file.text(), self.progressUpdate) - self.task.stateCallback[str].connect(self.stateUpdate) - self.task.stateCallback[Exception].connect(self.stateUpdate) - self.task.finished.connect(self.autoAction) - self.task.start() - self.statusBar.showMessage(" ") + if len(port_list) > 0: + port = port_list[0] + info = QSerialPortInfo(port) + print(info.productIdentifier()) + if info.productIdentifier() == 0: + return + if not self.installBtn.isEnabled(): + self.installFile() - def stopInstall(self, succeed = False, autoInstall = False): - if autoInstall: - self.progress.reset() - else: - self.portBox.setEnabled(True) - self.fileBox.setEnabled(True) - self.installBtn.setEnabled(True) - self.stopBtn.setDisabled(True) - self.progress.reset() - self.progress.hide() - self.tryAgain.setText("0") - self.resize() + else: + currentPortData = self.portCombo.currentData() + if forceUpdate or (currentPortData and currentPortData not in [port.portName() for port in + QSerialPortInfo.availablePorts()]): + self.portCombo.clear() + for port in QSerialPortInfo.availablePorts(): + self.portCombo.addItem(port.portName() + " (" + port.description() + ")", port.portName()) + self.portCombo.setCurrentIndex(self.portCombo.findData(currentPortData)) - if succeed: - self.countSuccess.setText(str(int(self.countSuccess.text()) + 1)) - self.task = None - else: - if self.autoTimer.remainingTime() is not -1: - self.autoTimer.stop() - if self.task is not None and self.task.isRunning(): - self.task.finished.disconnect() - if self.task.isReady(): - self.countFailure.setText(str(int(self.countFailure.text()) + 1)) + self.portCombo.setSizeAdjustPolicy(QComboBox.AdjustToContents) + self.baudCombo.setSizeAdjustPolicy(QComboBox.AdjustToContents) - if(self.task.isInterruptionRequested()): - pass - else: -# self.task.requestInterruption() - self.task.terminate() - self.statusBar.clearMessage() -# self.task = None - else: - self.statusBar.clearMessage() + def disableAutoInstall(self): + self.autoCheck.setChecked(False) - def autoAction(self): - self.stopInstall(True, self.autoCheck.isChecked()) - if self.autoCheck.isChecked(): - self.autoTimer.start(int(self.autoTime.text()) * 1000) + def autoStateChangeAction(self, check): + if not check and self.autoTimer.remainingTime() > 0: + self.stopInstall() - def resize(self): - self.setFixedHeight(self.sizeHint().height()) + if check: + # 开启自动安装的端口扫描 + self.portUpdateTimer.stop() + self.autoTimer.start(int(self.autoTime.text())*1000) + else: + self.autoTimer.stop() + self.portUpdateTimer.start(200) - def progressUpdate(self, cur, total): - self.progress.setMaximum(total) - self.progress.setValue(cur) + def autoTimeChangeAction(self): + """ 修改时间间隔 """ + self.autoTime.clearFocus() + if self.autoCheck.isChecked() and self.autoTimer.remainingTime() > 0: + self.autoTimer.stop() + self.installFile() - def stateUpdate(self, stateOrError): - self.tryAgainLabel.setHidden(True) - self.tryAgain.setHidden(True) - if self.task.isReady(): - self.tryAgain.setText("0") + def stateClearAction(self, msg): + """ 清理状态栏 """ + if msg == "" and self.statusBar.isVisible(): + self.statusBar.hide() + if msg != "" and self.statusBar.isHidden(): + self.statusBar.show() + self.resize() - if type(stateOrError) == str: -# self.statusBar.setStyleSheet("QStatusBar::item { border: 0 } QLabel {color: red; font-weight: bold}") - self.statusBar.setStyleSheet("QStatusBar {font-weight: bold; color: black} QStatusBar::item { border: 0 } QLabel {font-weight: bold}") - if self.task is not None and not self.task.isWork and not self.autoCheck.isChecked(): - self.statusBar.showMessage(stateOrError, 3000) - else: - self.statusBar.showMessage(stateOrError) - else: - self.task.requestInterruption() - self.statusBar.setStyleSheet("QStatusBar {font-weight: bold; color: red} QStatusBar::item { border: 0 } QLabel {font-weight: bold; color: red}") + def selectFile(self): + """ 选择固件文件 """ + hexFileDialog = QFileDialog() + hexFileDialog.setWindowTitle(self.tr("Select firmware file")) + # filesFilter = "firmware file (*.hex)|*.bin|" "All files (*.*)|*.*" + filesFilter = "firmware file (*.hex | *.bin)" + hexFileDialog.setNameFilter(self.tr(filesFilter)) + hexFileDialog.setFileMode(QFileDialog.ExistingFile) + if (self.file.text() == ""): + hexFileDialog.setDirectory(QDir.home()) # 设置为Home目录 + else: + fileDir = QDir(self.file.text()) + fileDir.cdUp() + if (fileDir.exists()): + hexFileDialog.setDirectory(fileDir) # 设置为当前文件所在目录 + else: + hexFileDialog.setDirectory(QDir.home()) + if (hexFileDialog.exec()): + self.file.setText(hexFileDialog.selectedFiles()[0]) - if type(stateOrError) == portError: - if (stateOrError.value in [portError.errorInvalid, portError.errorBusy]) and (int(self.tryAgain.text()) < 20): - self.statusBar.showMessage("PortError: " + str(stateOrError)) - self.tryAgain.setText(str(int(self.tryAgain.text()) + 1)) - self.tryAgainLabel.setVisible(True) - self.tryAgain.setVisible(True) - self.stopInstall(autoInstall = True) - self.autoTimer.start(1000) # 1秒自动重试 - else: - self.statusBar.showMessage("PortError: " + str(stateOrError), 5000) - self.stopInstall() - elif type(stateOrError) == IspError: - if self.autoCheck.isChecked(): - self.statusBar.showMessage("IspError: " + str(stateOrError)) - self.tryAgainLabel.setVisible(True) - self.tryAgain.setVisible(True) - self.stopInstall(autoInstall = True) - self.autoTimer.start(int(self.autoTime.text()) * 1000) - else: - self.statusBar.showMessage("IspError: " + str(stateOrError), 5000) - self.stopInstall() - elif type(stateOrError) == formatError: - self.statusBar.showMessage("HexError: " + str(stateOrError), 5000) - self.stopInstall() - else: - self.statusBar.showMessage("Error: " + str(stateOrError), 5000) - self.stopInstall() + def installFile(self, notFromButton=True): + """ 开始安装 """ + print("-------------------开始安装-------------------") + # for port in QSerialPortInfo.availablePorts(): + # print('port.description===', port.portName()) + # print('port.description===', port.description()) + # info = QSerialPortInfo(port) + # print(info.productIdentifier()) + # print(info.vendorIdentifier()) - self.task.isWork = False - self.task.wait(100) - self.task = None + if self.file.text() == "": + if not notFromButton: + self.selectFile() + self.installFile(True) + else: + if self.autoTimer.remainingTime() > 0: + self.autoTimer.stop() + + port_list = QSerialPortInfo.availablePorts() + if len(port_list) <= 0: + QtWidgets.QMessageBox.about(self, "提示", "没有可用的串口!") + return + + port = port_list[0] + info = QSerialPortInfo(port) + + port_Name = None + baud_rate = None + if self.manualRadio.isChecked(): + if self.portCombo.currentData() is None: + QtWidgets.QMessageBox.about(self, "提示", "请选择串口设备") + return + else: + port_Name = self.portCombo.currentData() + baud_rate = int(self.baudCombo.currentText()) + + else: + port_Name = info.portName() + baud_rate = QtSerialPort.QSerialPort.Baud115200 + + print("port:", port_Name) + print("baudCombo:", baud_rate) + print(info.portName()) + print(info.description()) + print(info.productIdentifier()) + print(info.vendorIdentifier()) + if info.productIdentifier() == 0: + self.autoTimer.start(1000) + pass + + if info.vendorIdentifier() == 1155: + print("------命令开启DFU模式 start------") + self.statusBar.showMessage("Serial to DFU...") + serial = QtSerialPort.QSerialPort(self) + serial.setPortName(port_Name) + serial.setBaudRate(baud_rate) + serial.setDataBits(QtSerialPort.QSerialPort.Data8) + serial.setParity(QtSerialPort.QSerialPort.NoParity) + serial.setStopBits(QtSerialPort.QSerialPort.OneStop) + serial.setFlowControl(QtSerialPort.QSerialPort.NoFlowControl) + + if not serial.open(QtCore.QIODevice.ReadWrite): + # QtWidgets.QMessageBox.about(self, "提示", "无法打开串口!") + return + + data = bytes("M9999\r", encoding='utf-8') + data = QtCore.QByteArray(data) + serial.write(data) + print("------命令开启DFU模式 end------") + self.statusBar.showMessage("Serial to dfu...") + + self.task = DFUse(self, self.file.text(), self.progressUpdate) + self.task.stateCallback[str].connect(self.stateUpdate) + self.task.stateCallback[Exception].connect(self.stateUpdate) + self.task.finished.connect(self.autoAction) # 检查是否自动烧写,并启动。 + # self.task.start() + + elif info.vendorIdentifier() != 0: + self.task = stk500v2Thread(self, self.portCombo.currentData(), int(self.baudCombo.currentText()), + self.file.text(), self.progressUpdate) + self.task.stateCallback[str].connect(self.stateUpdate) + self.task.stateCallback[Exception].connect(self.stateUpdate) + self.task.finished.connect(self.autoAction) + self.task.start() + + # 开始烧录,刷新UI + # self.statusBar.showMessage(" ") + self.portBox.setDisabled(True) + self.fileBox.setDisabled(True) + self.installBtn.setDisabled(True) + self.stopBtn.setEnabled(True) + self.progress.show() + self.resize() + + def stopInstall(self, succeed=False, autoInstall=False): + """ 停止自动安装 """ + if autoInstall: + self.progress.reset() + else: + self.portBox.setEnabled(True) + self.fileBox.setEnabled(True) + self.installBtn.setEnabled(True) + self.stopBtn.setDisabled(True) + self.progress.reset() + self.progress.hide() + self.tryAgain.setText("0") + self.resize() + + if succeed: + self.countSuccess.setText(str(int(self.countSuccess.text()) + 1)) + self.task = None + else: + # if self.autoTimer.remainingTime() != -1: + # self.autoTimer.stop() + if self.task is not None and self.task.isRunning(): + self.task.finished.disconnect() + if self.task.isReady(): + self.countFailure.setText(str(int(self.countFailure.text()) + 1)) + + if self.task.isInterruptionRequested(): + pass + else: + # self.task.requestInterruption() + self.task.terminate() + self.statusBar.clearMessage() + # self.task = None + else: + self.statusBar.clearMessage() + + def autoAction(self): + """ 上一个任务结束后,开启新的烧录任务 """ + self.task = None + self.statusBar.showMessage("Done!") + self.stopInstall(True, self.autoCheck.isChecked()) + + # 开启自动安装 + if self.autoCheck.isChecked(): + if self.autoTimer.remainingTime() > 0: + self.autoTimer.stop() + self.autoTimer.start(int(self.autoTime.text()) * 1000) + + def resize(self): + self.setFixedHeight(self.sizeHint().height()) + + def progressUpdate(self, cur, total): + """ 进度条 """ + self.progress.setMaximum(total) + self.progress.setValue(cur) + + def stateUpdate(self, stateOrError): + """ 安装状态 """ + self.tryAgainLabel.setHidden(True) + self.tryAgain.setHidden(True) + if self.task.isReady(): + self.tryAgain.setText("0") + + print(stateOrError, str) + if type(stateOrError) == str: + print("222---") + # self.statusBar.setStyleSheet("QStatusBar::item { border: 0 } QLabel {color: red; font-weight: bold}") + self.statusBar.setStyleSheet( + "QStatusBar {font-weight: bold; color: black} QStatusBar::item { border: 0 } QLabel {font-weight: bold}") + if self.task is not None and not self.task.isWork and not self.autoCheck.isChecked(): + print("statusBar---") + self.statusBar.showMessage(stateOrError, 3000) + else: + print("else---") + self.statusBar.showMessage(stateOrError) + + else: + print("333---") + self.task.requestInterruption() + self.statusBar.setStyleSheet( + "QStatusBar {font-weight: bold; color: red} QStatusBar::item { border: 0 } QLabel {font-weight: bold; color: red}") + + if type(stateOrError) == portError: + if (stateOrError.value in [portError.errorInvalid, portError.errorBusy]) and ( + int(self.tryAgain.text()) < 20): + self.statusBar.showMessage("PortError: " + str(stateOrError)) + self.tryAgain.setText(str(int(self.tryAgain.text()) + 1)) + self.tryAgainLabel.setVisible(True) + self.tryAgain.setVisible(True) + self.stopInstall(autoInstall=True) + self.autoTimer.start(1000) # 1秒自动重试 + else: + self.statusBar.showMessage("PortError: " + str(stateOrError), 5000) + self.stopInstall() + elif type(stateOrError) == IspError: + if self.autoCheck.isChecked(): + self.statusBar.showMessage("IspError: " + str(stateOrError)) + self.tryAgainLabel.setVisible(True) + self.tryAgain.setVisible(True) + self.stopInstall(autoInstall=True) + self.autoTimer.start(int(self.autoTime.text()) * 1000) + else: + self.statusBar.showMessage("IspError: " + str(stateOrError), 5000) + self.stopInstall() + elif type(stateOrError) == formatError: + self.statusBar.showMessage("HexError: " + str(stateOrError), 5000) + self.stopInstall() + else: + self.statusBar.showMessage("Error: " + str(stateOrError), 5000) + self.stopInstall() + print("1111---") + + self.task.isWork = False + self.task.wait(100) + self.task = None if __name__ == '__main__': - app = QApplication(sys.argv) - win = mainWindow() - win.show() + app = QApplication(sys.argv) - if(len(sys.argv) > 1): # 关联hex文件自动安装 - win.portUpdate() - win.file.setText(sys.argv[1]) - win.installBtn.click() + win = mainWindow() + win.show() - sys.exit(app.exec()) + if len(sys.argv) > 1: # 关联hex文件自动安装 + win.portUpdate() + win.file.setText(sys.argv[1]) + win.installBtn.click() + + sys.exit(app.exec()) diff --git a/src/pydfu.py b/src/pydfu.py new file mode 100644 index 0000000..98f0b81 --- /dev/null +++ b/src/pydfu.py @@ -0,0 +1,856 @@ +#!/usr/bin/env python +# This file is part of the OpenMV project. +# Copyright (c) 2013/2014 Ibrahim Abdelkader +# This work is licensed under the MIT license, see the file LICENSE for +# details. + +"""This module implements enough functionality to program the STM32F4xx over +DFU, without requiring dfu-util. + +See app note AN3156 for a description of the DFU protocol. +See document UM0391 for a dscription of the DFuse file. +""" + +from __future__ import print_function + +import argparse +import collections +import inspect +import re +import struct +import sys, time +import usb.core +import usb.util +import zlib + +# USB request __TIMEOUT +__TIMEOUT = 4000 + +# DFU commands +__DFU_DETACH = 0 +__DFU_DNLOAD = 1 +__DFU_UPLOAD = 2 +__DFU_GETSTATUS = 3 +__DFU_CLRSTATUS = 4 +__DFU_GETSTATE = 5 +__DFU_ABORT = 6 + +# DFU status +__DFU_STATE_APP_IDLE = 0x00 +__DFU_STATE_APP_DETACH = 0x01 +__DFU_STATE_DFU_IDLE = 0x02 +__DFU_STATE_DFU_DOWNLOAD_SYNC = 0x03 +__DFU_STATE_DFU_DOWNLOAD_BUSY = 0x04 +__DFU_STATE_DFU_DOWNLOAD_IDLE = 0x05 +__DFU_STATE_DFU_MANIFEST_SYNC = 0x06 +__DFU_STATE_DFU_MANIFEST = 0x07 +__DFU_STATE_DFU_MANIFEST_WAIT_RESET = 0x08 +__DFU_STATE_DFU_UPLOAD_IDLE = 0x09 +__DFU_STATE_DFU_ERROR = 0x0A + +_DFU_DESCRIPTOR_TYPE = 0x21 + +__DFU_STATUS_STR = { + __DFU_STATE_APP_IDLE: "STATE_APP_IDLE", + __DFU_STATE_APP_DETACH: "STATE_APP_DETACH", + __DFU_STATE_DFU_IDLE: "STATE_DFU_IDLE", + __DFU_STATE_DFU_DOWNLOAD_SYNC: "STATE_DFU_DOWNLOAD_SYNC", + __DFU_STATE_DFU_DOWNLOAD_BUSY: "STATE_DFU_DOWNLOAD_BUSY", + __DFU_STATE_DFU_DOWNLOAD_IDLE: "STATE_DFU_DOWNLOAD_IDLE", + __DFU_STATE_DFU_MANIFEST_SYNC: "STATE_DFU_MANIFEST_SYNC", + __DFU_STATE_DFU_MANIFEST: "STATE_DFU_MANIFEST", + __DFU_STATE_DFU_MANIFEST_WAIT_RESET: "STATE_DFU_MANIFEST_WAIT_RESET", + __DFU_STATE_DFU_UPLOAD_IDLE: "STATE_DFU_UPLOAD_IDLE", + __DFU_STATE_DFU_ERROR: "STATE_DFU_ERROR", +} + +# USB device handle +__dev = None + +# Configuration descriptor of the device +__cfg_descr = None + +__verbose = None + +# USB DFU interface +__DFU_INTERFACE = 0 + +# Python 3 deprecated getargspec in favour of getfullargspec, but +# Python 2 doesn't have the latter, so detect which one to use +getargspec = getattr(inspect, "getfullargspec", inspect.getargspec) + +if "length" in getargspec(usb.util.get_string).args: + # PyUSB 1.0.0.b1 has the length argument + def get_string(dev, index): + return usb.util.get_string(dev, 255, index) + + +else: + # PyUSB 1.0.0.b2 dropped the length argument + def get_string(dev, index): + return usb.util.get_string(dev, index) + + +def find_dfu_cfg_descr(descr): + if len(descr) == 9 and descr[0] == 9 and descr[1] == _DFU_DESCRIPTOR_TYPE: + nt = collections.namedtuple( + "CfgDescr", + [ + "bLength", + "bDescriptorType", + "bmAttributes", + "wDetachTimeOut", + "wTransferSize", + "bcdDFUVersion", + ], + ) + return nt(*struct.unpack("= 5: + # break + + if not devices: + raise ValueError("No DFU device found") + if len(devices) > 1: + raise ValueError("Multiple DFU devices found") + __dev = devices[0] + __dev.set_configuration() + + # Claim DFU interface + usb.util.claim_interface(__dev, __DFU_INTERFACE) + + # Find the DFU configuration descriptor, either in the device or interfaces + __cfg_descr = None + for cfg in __dev.configurations(): + __cfg_descr = find_dfu_cfg_descr(cfg.extra_descriptors) + if __cfg_descr: + break + for itf in cfg.interfaces(): + __cfg_descr = find_dfu_cfg_descr(itf.extra_descriptors) + if __cfg_descr: + break + + # Get device into idle state + for attempt in range(4): + status = get_status() + if status == __DFU_STATE_DFU_IDLE: + break + elif status == __DFU_STATE_DFU_DOWNLOAD_IDLE or status == __DFU_STATE_DFU_UPLOAD_IDLE: + abort_request() + else: + clr_status() + + +def abort_request(): + """Sends an abort request.""" + __dev.ctrl_transfer(0x21, __DFU_ABORT, 0, __DFU_INTERFACE, None, __TIMEOUT) + + +def clr_status(): + """Clears any error status (perhaps left over from a previous session).""" + __dev.ctrl_transfer(0x21, __DFU_CLRSTATUS, 0, __DFU_INTERFACE, None, __TIMEOUT) + + +def get_status(): + """Get the status of the last operation.""" + stat = __dev.ctrl_transfer(0xA1, __DFU_GETSTATUS, 0, __DFU_INTERFACE, 6, 20000) + + # firmware can provide an optional string for any error + if stat[5]: + message = get_string(__dev, stat[5]) + if message: + print(message) + + return stat[4] + + +def check_status(stage, expected): + status = get_status() + if status != expected: + raise SystemExit("DFU: %s failed (%s)" % (stage, __DFU_STATUS_STR.get(status, status))) + + +def mass_erase(): + """Performs a MASS erase (i.e. erases the entire device).""" + # Send DNLOAD with first byte=0x41 + __dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, "\x41", __TIMEOUT) + + # Execute last command + check_status("erase", __DFU_STATE_DFU_DOWNLOAD_BUSY) + + # Check command state + check_status("erase", __DFU_STATE_DFU_DOWNLOAD_IDLE) + + +def page_erase(addr): + """Erases a single page.""" + if __verbose: + print("Erasing page: 0x%x..." % (addr)) + + # Send DNLOAD with first byte=0x41 and page address + buf = struct.pack(" 0: + write_size = size + if not mass_erase_used: + for segment in mem_layout: + if addr >= segment["addr"] and addr <= segment["last_addr"]: + # We found the page containing the address we want to + # write, erase it + page_size = segment["page_size"] + page_addr = addr & ~(page_size - 1) + if addr + write_size > page_addr + page_size: + write_size = page_addr + page_size - addr + page_erase(page_addr) + break + print("addr===", addr, "\nwrite_size===", write_size, "\n", progress, "elem_addr===", elem_addr, + "elem_size===", elem_size) + write_memory(addr, data[:write_size], progress, elem_addr, elem_size) + data = data[write_size:] + addr += write_size + size -= write_size + if progress: + progress(elem_addr, addr - elem_addr, elem_size) + + +def cli_progress(addr, offset, size): + """Prints a progress report suitable for use on the command line.""" + width = 25 + done = offset * width // size + print( + "\r0x{:08x} {:7d} [{}{}] {:3d}% ".format( + addr, size, "=" * done, " " * (width - done), offset * 100 // size + ), + end="", + ) + try: + sys.stdout.flush() + except OSError: + pass # Ignore Windows CLI "WinError 87" on Python 3.6 + if offset == size: + print("") + + +def main(): + """Test program for verifying this files functionality.""" + global __verbose + # Parse CMD args + parser = argparse.ArgumentParser(description="DFU Python Util") + parser.add_argument( + "-l", "--list", help="list available DFU devices", action="store_true", default=False + ) + # parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=None) + # parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=None) + parser.add_argument( + "-m", "--mass-erase", help="mass erase device", action="store_true", default=False + ) + # parser.add_argument( + # "-u", "--upload", help="read file from DFU device", dest="path", default=False + # ) + + parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=0x0483) + parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=0xdf11) + # parser.add_argument( + # "-u", "--upload", help="read file from DFU device", dest="path", default="C:\\Users\\User\\Documents\\stm32-test\\board-STM32F103-Mini\\usbdfu.bin" + # ) + parser.add_argument( + "-u", "--upload", help="read file from DFU device", dest="path", + default="C:\\Users\\User\\Downloads\\stm32loader-master\\max.dfu" + ) + + parser.add_argument("-x", "--exit", help="Exit DFU", action="store_true", default=False) + parser.add_argument( + "-v", "--verbose", help="increase output verbosity", action="store_true", default=False + ) + args = parser.parse_args() + + __verbose = args.verbose + + kwargs = {} + if args.vid: + kwargs["idVendor"] = args.vid + + if args.pid: + kwargs["idProduct"] = args.pid + + if args.list: + list_dfu_devices(**kwargs) + return + + init(**kwargs) + + # 测试写入其他类型文件 + # file = "C:\\Users\\User\\Downloads\\stm32loader-master\\pro.bin" + # data = open(file, 'rb').read() + # + # write_page(data, 0) + # return + command_run = False + if args.mass_erase: + print("Mass erase...") + mass_erase() + command_run = True + + if args.path: + elements = read_dfu_file(args.path) + if not elements: + print("No data in dfu file") + return + print("Writing memory...") + write_elements(elements, args.mass_erase, progress=cli_progress) + + print("Exiting DFU...") + exit_dfu() + command_run = True + + if args.exit: + print("Exiting DFU...") + exit_dfu() + command_run = True + + if command_run: + print("Finished") + else: + print("No command specified") + + +# if __name__ == "__main__": +# main() + +from avr_isp.errorBase import portError + +from PyQt5.QtCore import QIODevice, QThread, pyqtSignal +from PyQt5.QtSerialPort import QSerialPort, QSerialPortInfo +from PyQt5.QtWidgets import QApplication + + +class DFUTool(QThread): + print("DFUTool(QThread)") + stateCallback = pyqtSignal([str], [Exception]) + progressCallback = pyqtSignal(int, int) + + def __init__(self, parent, filename, callback=None): + super(DFUTool, self).__init__() + print("----------------------") + self.parent = parent + self.filename = filename + self.progress = callback + self.isWork = False + self.finished.connect(self.done) + + global __verbose + # Parse CMD args + parser = argparse.ArgumentParser(description="DFU Python Util") + parser.add_argument( + "-l", "--list", help="list available DFU devices", action="store_true", default=False + ) + # parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=None) + # parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=None) + parser.add_argument( + "-m", "--mass-erase", help="mass erase device", action="store_true", default=False + ) + # parser.add_argument( + # "-u", "--upload", help="read file from DFU device", dest="path", default=False + # ) + + parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=0x0483) + parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=0xdf11) + # parser.add_argument( + # "-u", "--upload", help="read file from DFU device", dest="path", default="C:\\Users\\User\\Documents\\stm32-test\\board-STM32F103-Mini\\usbdfu.bin" + # ) + parser.add_argument( + "-u", "--upload", help="read file from DFU device", dest="path", + default=self.filename + ) + + parser.add_argument("-x", "--exit", help="Exit DFU", action="store_true", default=False) + parser.add_argument( + "-v", "--verbose", help="increase output verbosity", action="store_true", default=False + ) + self.args = parser.parse_args() + + __verbose = self.args.verbose + + kwargs = {} + if self.args.vid: + kwargs["idVendor"] = self.args.vid + + if self.args.pid: + kwargs["idProduct"] = self.args.pid + + def w(state): + if state is False: + self.stateCallback[str].emit("Done!") + self.quit() + return + + try: + self.thread.exit() + init(**kwargs) + self.start() + except Exception as err: + print("----------------------------======================") + print(err) + print(1) + + self.thread = DFUSearch(self, kwargs=kwargs) + self.thread.searchResults.connect(w) # 异步完成后执行函数w + self.thread.start() + + def cl_progress(self, addr, offset, size): + """Prints a progress report suitable for use on the command line.""" + self.progressCallback.emit(offset, size) + + def run(self): + if self.progress is not None: + self.progressCallback.connect(self.progress) + print(self.args) + self.stateCallback[str].emit("read fils") + with open(self.args.path, "rb") as fin: + dfu_file = fin.read() + + if dfu_file is None: + print("file is None") + return + + self.isWork = True + + elem = {"addr": 134217728, "size": len(dfu_file), "data": dfu_file} + + try: + self.stateCallback[str].emit("Write file...") + write_elements([elem], self.args.mass_erase, progress=self.cl_progress) + except Exception as err: + if self.isInterruptionRequested(): + print("int") + else: + if self.parent is not None: + self.stateCallback[Exception].emit(err) + while self.isWork: + pass # 等待父进程处理异常 + else: + raise err + self.isWork = False + finally: + self.isWork = False + self.stateCallback[str].emit("Exiting DFU...") + print("Exiting DFU...") + exit_dfu() + # 退出DFU模式 + print("Done...") + self.ude = None + + def isReady(self): + return True + try: + status = get_status() + print(status) + + if status[1] == 0x02: + return True + return False + + except Exception as err: + print("isReady", err) + return False + + def done(self): + print("结束烧录程序") + if self.parent is not None: + if self.isWork: + self.isWork = False + self.stateCallback[str].emit(self.tr("Done!")) + else: + print("Success!") + else: + print("Failure!") + + def terminate(self): + self.requestInterruption() + return super(DFUTool, self).terminate() + + +class DFUSearch(QThread): + searchResults = pyqtSignal(bool) # 信号 + + def __init__(self, parent=None, kwargs=None): + super(DFUSearch, self).__init__() + self.kwargs = kwargs + + def __del__(self): + self.wait() + + def run(self): + # 耗时内容 + devices = get_dfu_devices(**self.kwargs) + # Waiting 2 seconds before trying again..." + attempts = 0 + while not devices: + devices = get_dfu_devices(**self.kwargs) + attempts += 1 + print("搜索DFU设备", attempts) + if attempts > 10: + self.searchResults.emit(False) + self.quit() + return + time.sleep(2) + + self.searchResults.emit(True) + + def terminate(self): + self.requestInterruption() + return super(DFUSearch, self).terminate() + + +if __name__ == '__main__': + # main() + app = QApplication(sys.argv) + task = DFUTool(None, None) + # task = stk500v2Thread(None, "COM4", 115200, "D:/OneDrive/Desktop/test.hex") + try: + task.start() + except Exception as e: + print(e) + + sys.exit(app.exec()) diff --git a/打包.bat b/打包.bat index c753ba7..8cec051 100644 --- a/打包.bat +++ b/打包.bat @@ -1,2 +1,2 @@ -call "%USERPROFILE%\.virtualenvs\FirmwareInstaller\Scripts\activate.bat" +rem call "%USERPROFILE%\.virtualenvs\FirmwareInstaller\Scripts\activate.bat" @pyinstaller --workpath Package/build --distpath Package/dist -y Installer.spec \ No newline at end of file