STM32 DFU 烧录完善

This commit is contained in:
边瑞峰 2022-05-26 14:20:46 +08:00
parent f564adb140
commit ba98c39c3d
14 changed files with 1023 additions and 2482 deletions

5
.gitignore vendored
View File

@ -202,4 +202,7 @@ local.properties
/Package/Firmware Installer Setup.exe
/Package/Firmware Installer-cache/
.idea
.Package/
.DS_Store
.idea/
firmwareInstaller.bak.py

View File

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

4
.idea/misc.xml generated
View File

@ -1,4 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectRootManager" version="2" project-jdk-name="Python 3.7 (FirmwareInstaller)" project-jdk-type="Python SDK" />
</project>

View File

@ -3,12 +3,14 @@
block_cipher = None
binaries = [
('C:\\Windows\\System32\\libusb0.dll', '.'),
]
a = Analysis(['src\\firmwareInstaller.py'],
pathex=['D:\\Work\\Project\\FirmwareInstaller'],
binaries=[],
binaries=binaries,
datas=[('src\\ico.ico', '.')],
hiddenimports=[],
hiddenimports=['usb'],
hookspath=[],
hooksconfig={},
runtime_hooks=[],

File diff suppressed because it is too large Load Diff

View File

@ -6,8 +6,8 @@ VSVersionInfo(
ffi=FixedFileInfo(
# filevers and prodvers should be always a tuple with four items: (1, 2, 3, 4)
# Set not needed items to zero 0.
filevers=(3, 0, 0, 1),
prodvers=(3, 0, 0, 1),
filevers=(3, 0, 0, 2),
prodvers=(3, 0, 0, 2),
# Contains a bitmask that specifies the valid bits 'flags'r
mask=0x3f,
# Contains a bitmask that specifies the Boolean attributes of the file.
@ -31,13 +31,13 @@ VSVersionInfo(
u'040904B0',
[StringStruct(u'CompanyName', u'CreatBot'),
StringStruct(u'FileDescription', u'Hex Installer'),
StringStruct(u'FileVersion', u'3.0.0.1'),
StringStruct(u'FileVersion', u'3.0.0.2'),
StringStruct(u'InternalName', u'Firmware Installer'),
StringStruct(u'LegalCopyright', u'Copyright(C) CreatBot 2021. All rights reserved'),
StringStruct(u'LegalCopyright', u'Copyright(C) CreatBot 2022. All rights reserved'),
StringStruct(u'LegalTrademarks', u'CreatBot'),
StringStruct(u'OriginalFilename', u'Installer.exe'),
StringStruct(u'ProductName', u'Firmware Installer'),
StringStruct(u'ProductVersion', u'3.0.0.1')])
StringStruct(u'ProductVersion', u'3.0.0.2')])
]),
VarFileInfo([VarStruct(u'Translation', [1033, 1200])])
]

Binary file not shown.

View File

@ -2,4 +2,4 @@ from .chipDB import *
from .errorBase import portError
from .intelHex import *
from .ispBase import *
from .stk500v2 import *
from .stk500v2 import *

View File

@ -4,27 +4,27 @@ Created on 2017年8月15日
@author: CreatBot-SW
'''
#===============================================================================
# ===============================================================================
# Database of AVR chips for avr_isp programming. Contains signatures and flash sizes from the AVR datasheets.
# To support more chips add the relevant data to the avrChipDB list.
#===============================================================================
# ===============================================================================
avrChipDB = {
'ATMega1280': {
'signature': [0x1E, 0x97, 0x03],
'pageSize': 128,
'pageCount': 512,
},
'ATMega2560': {
'signature': [0x1E, 0x98, 0x01],
'pageSize': 128,
'pageCount': 1024,
},
'ATMega1280': {
'signature': [0x1E, 0x97, 0x03],
'pageSize': 128,
'pageCount': 512,
},
'ATMega2560': {
'signature': [0x1E, 0x98, 0x01],
'pageSize': 128,
'pageCount': 1024,
},
}
def getChipFromDB(sig):
for chip in avrChipDB.values():
if chip['signature'] == sig:
return chip
return False
for chip in avrChipDB.values():
if chip['signature'] == sig:
return chip
return False

View File

@ -5,62 +5,62 @@ Created on 2017年8月15日
'''
import io
#===============================================================================
# ===============================================================================
# Module to read intel hex files into binary data blobs.
# IntelHex files are commonly used to distribute firmware
# See: http://en.wikipedia.org/wiki/Intel_HEX
#===============================================================================
# ===============================================================================
def readHex(filename):
"""
Read an verify an intel hex file. Return the data as an list of bytes.
"""
data = []
extraAddr = 0
f = io.open(filename, "r")
for line in f:
line = line.strip()
if line[0] != ':':
raise formatError("Hex file must start with ':' @ " + line)
recLen = int(line[1:3], 16)
addr = int(line[3:7], 16) + extraAddr
recType = int(line[7:9], 16)
if len(line) != recLen * 2 + 11:
raise formatError("Length error in hex file @ " + line)
checkSum = 0
for i in range(0, recLen + 5):
checkSum += int(line[i * 2 + 1:i * 2 + 3], 16)
checkSum &= 0xFF
if checkSum != 0:
raise formatError("Checksum error in hex file @ " + line)
"""
Read an verify an intel hex file. Return the data as an list of bytes.
"""
data = []
extraAddr = 0
f = io.open(filename, "r")
for line in f:
line = line.strip()
if line[0] != ':':
raise formatError("Hex file must start with ':' @ " + line)
recLen = int(line[1:3], 16)
addr = int(line[3:7], 16) + extraAddr
recType = int(line[7:9], 16)
if len(line) != recLen * 2 + 11:
raise formatError("Length error in hex file @ " + line)
checkSum = 0
for i in range(0, recLen + 5):
checkSum += int(line[i * 2 + 1:i * 2 + 3], 16)
checkSum &= 0xFF
if checkSum != 0:
raise formatError("Checksum error in hex file @ " + line)
if recType == 0: # Data record
while len(data) < addr + recLen:
data.append(0)
for i in range(0, recLen):
data[addr + i] = int(line[i * 2 + 9:i * 2 + 11], 16)
elif recType == 1: # End Of File record
pass
elif recType == 2: # Extended Segment Address Record
extraAddr = int(line[9:13], 16) * 16
elif recType == 3: # Start Segment Address Record
raise formatError("Dont support record type 03")
elif recType == 4: # Extended Linear Address Record
extraAddr = int(line[9:13], 16) << 16
elif recType == 5: # Start Linear Address Record
raise formatError("Dont support record type 05")
else:
print(recType, recLen, addr, checkSum, line)
f.close()
return data
if recType == 0: # Data record
while len(data) < addr + recLen:
data.append(0)
for i in range(0, recLen):
data[addr + i] = int(line[i * 2 + 9:i * 2 + 11], 16)
elif recType == 1: # End Of File record
pass
elif recType == 2: # Extended Segment Address Record
extraAddr = int(line[9:13], 16) * 16
elif recType == 3: # Start Segment Address Record
raise formatError("Dont support record type 03")
elif recType == 4: # Extended Linear Address Record
extraAddr = int(line[9:13], 16) << 16
elif recType == 5: # Start Linear Address Record
raise formatError("Dont support record type 05")
else:
print(recType, recLen, addr, checkSum, line)
f.close()
return data
class formatError(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)

View File

@ -5,74 +5,75 @@ Created on 2017年8月15日
'''
from avr_isp import chipDB
#===============================================================================
# ===============================================================================
# General interface for Isp based AVR programmers.
# The ISP AVR programmer can load firmware into AVR chips. Which are commonly used on 3D printers.
#
# Needs to be subclassed to support different programmers.
# Currently only the stk500v2 subclass exists.
#===============================================================================
# ===============================================================================
class IspBase():
"""
Base class for ISP based AVR programmers.
Functions in this class raise an IspError when something goes wrong.
"""
"""
Base class for ISP based AVR programmers.
Functions in this class raise an IspError when something goes wrong.
"""
def programChip(self, flashData):
""" Program a chip with the given flash data. """
self.curExtAddr = -1
self.chip = chipDB.getChipFromDB(self.getSignature())
if not self.chip:
raise IspError("Chip with signature: " + str(self.getSignature()) + "not found")
self.chipErase()
def programChip(self, flashData):
""" Program a chip with the given flash data. """
self.curExtAddr = -1
self.chip = chipDB.getChipFromDB(self.getSignature())
if not self.chip:
raise IspError("Chip with signature: " + str(self.getSignature()) + "not found")
self.chipErase()
print("Flashing %i bytes" % len(flashData))
self.writeFlash(flashData)
print("Verifying %i bytes" % len(flashData))
self.verifyFlash(flashData)
print("Flashing %i bytes" % len(flashData))
self.writeFlash(flashData)
print("Verifying %i bytes" % len(flashData))
self.verifyFlash(flashData)
def getSignature(self):
"""
Get the AVR signature from the chip. This is a 3 byte array which describes which chip we are connected to.
This is important to verify that we are programming the correct type of chip and that we use proper flash block sizes.
"""
sig = []
sig.append(self.sendISP([0x30, 0x00, 0x00, 0x00])[3])
sig.append(self.sendISP([0x30, 0x00, 0x01, 0x00])[3])
sig.append(self.sendISP([0x30, 0x00, 0x02, 0x00])[3])
return sig
def getSignature(self):
"""
Get the AVR signature from the chip. This is a 3 byte array which describes which chip we are connected to.
This is important to verify that we are programming the correct type of chip and that we use proper flash block sizes.
"""
sig = []
sig.append(self.sendISP([0x30, 0x00, 0x00, 0x00])[3])
sig.append(self.sendISP([0x30, 0x00, 0x01, 0x00])[3])
sig.append(self.sendISP([0x30, 0x00, 0x02, 0x00])[3])
return sig
def sendISP(self, data):
"""
Send data to chip, needs to be implemented in a subclass.
"""
raise IspError("Called undefined sendISP")
def sendISP(self, data):
"""
Send data to chip, needs to be implemented in a subclass.
"""
raise IspError("Called undefined sendISP")
def chipErase(self):
"""
Do a full chip erase, clears all data, and lockbits.
"""
self.sendISP([0xAC, 0x80, 0x00, 0x00])
def chipErase(self):
"""
Do a full chip erase, clears all data, and lockbits.
"""
self.sendISP([0xAC, 0x80, 0x00, 0x00])
def writeFlash(self, flashData):
"""
Write the flash data, needs to be implemented in a subclass.
"""
raise IspError("Called undefined writeFlash")
def writeFlash(self, flashData):
"""
Write the flash data, needs to be implemented in a subclass.
"""
raise IspError("Called undefined writeFlash")
def verifyFlash(self, flashData):
"""
Verify the flash data, needs to be implemented in a subclass.
"""
raise IspError("Called undefined verifyFlash")
def verifyFlash(self, flashData):
"""
Verify the flash data, needs to be implemented in a subclass.
"""
raise IspError("Called undefined verifyFlash")
class IspError(Exception):
def __init__(self, value):
self.value = value
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
def __str__(self):
return repr(self.value)

View File

@ -8,14 +8,14 @@ Created on 2017年8月15日
# The STK500v2 protocol is used by the ArduinoMega2560 and a few other Arduino platforms to load firmware.
# ===============================================================================
import struct, sys
import time
import struct
import sys
from PyQt5.QtCore import QIODevice, QThread, pyqtSignal
from PyQt5.QtSerialPort import QSerialPort, QSerialPortInfo
from PyQt5.QtWidgets import QApplication
from avr_isp import intelHex, ispBase
from .errorBase import portError
@ -88,10 +88,8 @@ class Stk500v2(ispBase.IspBase, QSerialPort):
loadCount = (len(flashData) + pageSize - 1) // pageSize
for i in range(0, loadCount):
self.sendMessage(
[0x13, pageSize >> 8, pageSize & 0xFF, 0xc1, 0x0a, 0x40, 0x4c, 0x20, 0x00, 0x00] + flashData[
(i * pageSize):(
i * pageSize + pageSize)])
self.sendMessage([0x13, pageSize >> 8, pageSize & 0xFF, 0xc1, 0x0a, 0x40, 0x4c, 0x20, 0x00, 0x00] + flashData[(i * pageSize):(
i * pageSize + pageSize)])
self.progressCallback.emit(i + 1, loadCount * 2)
def verifyFlash(self, flashData):
@ -191,9 +189,8 @@ class stk500v2Thread(QThread):
self.finished.connect(self.done)
def run(self):
self.isWork = True
try:
self.isWork = True
self.programmer = Stk500v2()
if self.callback is not None:
self.programmer.progressCallback.connect(self.callback)
@ -201,26 +198,24 @@ class stk500v2Thread(QThread):
if self.parent is None:
runProgrammer(self.port, self.speed, self.filename, self.programmer)
else:
self.stateCallback[str].emit(self.tr("Connecting.."))
self.stateCallback[str].emit(self.tr("Connecting..."))
self.msleep(200)
self.programmer.connect(self.port, self.speed)
self.stateCallback[str].emit(self.tr("Programming..."))
self.programmer.programChip(intelHex.readHex(self.filename))
except Exception as e:
if self.isInterruptionRequested():
print("Int")
else:
if self.parent is not None:
self.stateCallback[Exception].emit(e)
# while self.isWork:
# pass # 等待父进程处理异常
while self.isWork:
pass # 等待父进程处理异常
else:
raise e
self.isWork = False
finally:
self.isWork = False
if self.programmer.isConnected():
self.programmer.fastReset()
self.programmer.close()
@ -266,8 +261,7 @@ def main():
programmer.programChip(intelHex.readHex(sys.argv[2]))
else:
programmer.connect("COM4")
programmer.programChip(
intelHex.readHex("D:/OneDrive/Desktop/CreatBot F160 01 EN KTC ( AUTO_LEVELING ).hex"))
programmer.programChip(intelHex.readHex("D:/OneDrive/Desktop/CreatBot F160 01 EN KTC ( AUTO_LEVELING ).hex"))
except portError as e:
print(e.value)
print("PortError: " + str(e))

View File

@ -6,13 +6,12 @@ Created on 2017年8月15日
import os
import sys
import time
from PyQt5 import QtSerialPort, QtCore, QtWidgets
from PyQt5 import QtCore
from PyQt5.Qt import pyqtSignal
from PyQt5.QtCore import QSize, QDir, QTimer
from PyQt5.QtGui import QIcon
from PyQt5.QtSerialPort import QSerialPortInfo
from PyQt5.QtSerialPort import QSerialPortInfo, QSerialPort
from PyQt5.QtWidgets import QApplication, QVBoxLayout, QGroupBox, \
QRadioButton, QGridLayout, QWidget, QProgressBar, QStatusBar, QComboBox, QLabel, \
QHBoxLayout, QLineEdit, QPushButton, QFileDialog, QCheckBox
@ -20,8 +19,6 @@ from PyQt5.QtWidgets import QApplication, QVBoxLayout, QGroupBox, \
from avr_isp.intelHex import formatError
from avr_isp.ispBase import IspError
from avr_isp.stk500v2 import stk500v2Thread, portError
# from DfuseTool import DfuseTool as DFUse
from pydfu import DFUTool as DFUse
if getattr(sys, 'frozen', False):
@ -64,20 +61,17 @@ class mainWindow(QWidget):
def __init__(self):
super(mainWindow, self).__init__()
self.setWindowTitle(self.tr("Firmware Installer"))
self.setWindowTitle(self.tr("Firmware Installer" + " V3.1.0"))
self.setWindowIcon(QIcon(os.path.join(bundle_dir, "ico.ico")))
self.setFixedSize(QSize(480, 240))
self.setAcceptDrops(True)
self.portUpdateTimer = QTimer()
self.portUpdateTimer.timeout.connect(self.portUpdate)
self.portUpdateTimer.start(200)
self.portUpdateTimer.start(100)
self.autoTimer = QTimer()
# self.autoTimer.setSingleShot(True)
self.autoTimer.timeout.connect(self.portUpdate)
# self.autoTimer.start(1000)
self.autoTimer.timeout.connect(self.installFile)
self.task = None
self.initUI()
@ -135,7 +129,7 @@ class mainWindow(QWidget):
self.autoCheck = QCheckBox(self.tr("Auto Install"))
self.autoTimeLabel = QLabel(self.tr("Idle Time:"))
self.autoTimeLabel2 = QLabel(self.tr("s"))
self.autoTime = QLineEdit("3")
self.autoTime = QLineEdit("2")
self.autoTime.setInputMask("00")
self.autoTime.setMaximumWidth(20)
@ -174,8 +168,7 @@ class mainWindow(QWidget):
# 计数栏
self.countBar = QStatusBar()
self.countBar.setSizeGripEnabled(False)
self.countBar.setStyleSheet(
"QStatusBar::item { border: 0 } QLabel {border:0; font-size: 14px; font-weight: bold}")
self.countBar.setStyleSheet("QStatusBar::item { border: 0 } QLabel {border:0; font-size: 14px; font-weight: bold}")
countSuccessLabel = QLabel("Success: ")
countFailureLabel = QLabel("Failure: ")
self.countSuccess = countLabel("0")
@ -231,9 +224,9 @@ class mainWindow(QWidget):
self.statusBar.messageChanged.connect(self.stateClearAction)
# 默认选中
self.autoCheck.click()
self.autoCheck.click()
self.autoRadio.click()
# self.manualRadio.click()
# self.autoCheck.click()
self.fileBtn.clicked.connect(self.selectFile)
self.installBtn.clicked.connect(self.installFile)
@ -244,36 +237,24 @@ class mainWindow(QWidget):
def portUpdate(self, forceUpdate=False):
""" Auto 监听端口 """
print("search port")
if self.autoRadio.isChecked():
self.baudCombo.setCurrentText("115200")
# self.portCombo.addItems(portList())
# self.portCombo.clear()
port_list = QSerialPortInfo.availablePorts()
for port in port_list:
print(':--:', port.portName(), port.description())
if port.description() not in ["Arduino Mega 2560", "USB-SERIAL CH340"]: # 过滤2560和CH340
self.portCombo.clear()
for port in QSerialPortInfo.availablePorts():
if port.description() not in ["Arduino Mega 2560", "USB-SERIAL CH340", "USB 串行设备"]: # 过滤2560和CH340
continue
self.portCombo.addItem(port.portName() + " (" + port.description() + ")", port.portName())
if len(port_list) > 0:
port = port_list[0]
info = QSerialPortInfo(port)
print(info.productIdentifier())
if info.productIdentifier() == 0:
return
if not self.installBtn.isEnabled():
self.installFile()
portInfo = QSerialPortInfo(port)
self.portCombo.addItem(port.portName() + " (" + port.description() + ")", (port.portName(), portInfo.vendorIdentifier()))
else:
currentPortData = self.portCombo.currentData()
if forceUpdate or (currentPortData and currentPortData not in [port.portName() for port in
QSerialPortInfo.availablePorts()]):
currentPortData = self.portCombo.currentText()
if forceUpdate or (currentPortData and currentPortData not in [port.portName() for port in QSerialPortInfo.availablePorts()]):
self.portCombo.clear()
for port in QSerialPortInfo.availablePorts():
self.portCombo.addItem(port.portName() + " (" + port.description() + ")", port.portName())
self.portCombo.setCurrentIndex(self.portCombo.findData(currentPortData))
portInfo = QSerialPortInfo(port)
self.portCombo.addItem(port.portName() + " (" + port.description() + ")",
(port.portName(), portInfo.vendorIdentifier()))
self.portCombo.setCurrentIndex(self.portCombo.findText(currentPortData))
self.portCombo.setSizeAdjustPolicy(QComboBox.AdjustToContents)
self.baudCombo.setSizeAdjustPolicy(QComboBox.AdjustToContents)
@ -285,14 +266,6 @@ class mainWindow(QWidget):
if not check and self.autoTimer.remainingTime() > 0:
self.stopInstall()
if check:
# 开启自动安装的端口扫描
self.portUpdateTimer.stop()
self.autoTimer.start(int(self.autoTime.text())*1000)
else:
self.autoTimer.stop()
self.portUpdateTimer.start(200)
def autoTimeChangeAction(self):
""" 修改时间间隔 """
self.autoTime.clearFocus()
@ -329,65 +302,35 @@ class mainWindow(QWidget):
self.file.setText(hexFileDialog.selectedFiles()[0])
def installFile(self, notFromButton=True):
""" 开始安装 """
print("-------------------开始安装-------------------")
# for port in QSerialPortInfo.availablePorts():
# print('port.description===', port.portName())
# print('port.description===', port.description())
# info = QSerialPortInfo(port)
# print(info.productIdentifier())
# print(info.vendorIdentifier())
if self.portCombo.currentData() is None:
return
if self.file.text() == "":
if not notFromButton:
self.selectFile()
self.installFile(True)
else:
if self.autoTimer.remainingTime() > 0:
if self.autoTimer.remainingTime() != -1:
self.autoTimer.stop()
port_list = QSerialPortInfo.availablePorts()
if len(port_list) <= 0:
QtWidgets.QMessageBox.about(self, "提示", "没有可用的串口!")
return
self.portBox.setDisabled(True)
self.fileBox.setDisabled(True)
self.installBtn.setDisabled(True)
port = port_list[0]
info = QSerialPortInfo(port)
self.progress.show()
self.resize()
port_Name = None
baud_rate = None
if self.manualRadio.isChecked():
if self.portCombo.currentData() is None:
QtWidgets.QMessageBox.about(self, "提示", "请选择串口设备")
return
else:
port_Name = self.portCombo.currentData()
baud_rate = int(self.baudCombo.currentText())
else:
port_Name = info.portName()
baud_rate = QtSerialPort.QSerialPort.Baud115200
print("port:", port_Name)
print("baudCombo:", baud_rate)
print(info.portName())
print(info.description())
print(info.productIdentifier())
print(info.vendorIdentifier())
if info.productIdentifier() == 0:
self.autoTimer.start(1000)
pass
if info.vendorIdentifier() == 1155:
print("------命令开启DFU模式 start------")
info = self.portCombo.currentData()
if info[1] == 1155:
self.stopBtn.setEnabled(False)
self.statusBar.showMessage("Serial to DFU...")
serial = QtSerialPort.QSerialPort(self)
serial.setPortName(port_Name)
serial.setBaudRate(baud_rate)
serial.setDataBits(QtSerialPort.QSerialPort.Data8)
serial.setParity(QtSerialPort.QSerialPort.NoParity)
serial.setStopBits(QtSerialPort.QSerialPort.OneStop)
serial.setFlowControl(QtSerialPort.QSerialPort.NoFlowControl)
serial = QSerialPort(self)
serial.setPortName(info[0])
serial.setBaudRate(QSerialPort.Baud115200)
serial.setDataBits(QSerialPort.Data8)
serial.setParity(QSerialPort.NoParity)
serial.setStopBits(QSerialPort.OneStop)
serial.setFlowControl(QSerialPort.NoFlowControl)
if not serial.open(QtCore.QIODevice.ReadWrite):
# QtWidgets.QMessageBox.about(self, "提示", "无法打开串口!")
@ -396,25 +339,35 @@ class mainWindow(QWidget):
data = bytes("M9999\r", encoding='utf-8')
data = QtCore.QByteArray(data)
serial.write(data)
print("------命令开启DFU模式 end------")
self.statusBar.showMessage("Serial to dfu...")
self.task = DFUse(self, self.file.text(), self.progressUpdate)
self.task = DFUse(self, self.portCombo.currentData()[0], int(self.baudCombo.currentText()), self.file.text(),
self.progressUpdate)
self.task.stateCallback[str].connect(self.stateUpdate)
self.task.stateCallback[Exception].connect(self.stateUpdate)
self.task.finished.connect(self.autoAction) # 检查是否自动烧写,并启动。
# self.task.start()
elif info.vendorIdentifier() != 0:
self.task = stk500v2Thread(self, self.portCombo.currentData(), int(self.baudCombo.currentText()),
elif info[1] != 0:
self.stopBtn.setEnabled(True)
self.task = stk500v2Thread(self, info[0], int(self.baudCombo.currentText()),
self.file.text(), self.progressUpdate)
self.task.stateCallback[str].connect(self.stateUpdate)
self.task.stateCallback[Exception].connect(self.stateUpdate)
self.task.finished.connect(self.autoAction)
self.task.start()
# 开始烧录刷新UI
# self.statusBar.showMessage(" ")
return
if self.file.text() == "":
if not notFromButton:
self.selectFile()
self.installFile(True)
else:
if self.autoTimer.remainingTime() != -1:
self.autoTimer.stop()
self.portBox.setDisabled(True)
self.fileBox.setDisabled(True)
self.installBtn.setDisabled(True)
@ -422,6 +375,14 @@ class mainWindow(QWidget):
self.progress.show()
self.resize()
self.task = DFUse(self, self.portCombo.currentData()[0], int(self.baudCombo.currentText()), self.file.text(),
self.progressUpdate)
self.task.stateCallback[str].connect(self.stateUpdate)
self.task.stateCallback[Exception].connect(self.stateUpdate)
self.task.finished.connect(self.autoAction)
self.task.start()
self.statusBar.showMessage(" ")
def stopInstall(self, succeed=False, autoInstall=False):
""" 停止自动安装 """
if autoInstall:
@ -440,8 +401,8 @@ class mainWindow(QWidget):
self.countSuccess.setText(str(int(self.countSuccess.text()) + 1))
self.task = None
else:
# if self.autoTimer.remainingTime() != -1:
# self.autoTimer.stop()
if self.autoTimer.remainingTime() != -1:
self.autoTimer.stop()
if self.task is not None and self.task.isRunning():
self.task.finished.disconnect()
if self.task.isReady():
@ -459,14 +420,10 @@ class mainWindow(QWidget):
def autoAction(self):
""" 上一个任务结束后,开启新的烧录任务 """
self.task = None
self.statusBar.showMessage("Done!")
self.stopBtn.setEnabled(True)
self.stopInstall(True, self.autoCheck.isChecked())
# 开启自动安装
if self.autoCheck.isChecked():
if self.autoTimer.remainingTime() > 0:
self.autoTimer.stop()
self.autoTimer.start(int(self.autoTime.text()) * 1000)
def resize(self):
@ -481,31 +438,24 @@ class mainWindow(QWidget):
""" 安装状态 """
self.tryAgainLabel.setHidden(True)
self.tryAgain.setHidden(True)
if self.task.isReady():
self.tryAgain.setText("0")
# if self.task.isReady():
# self.tryAgain.setText("0")
print(stateOrError, str)
if type(stateOrError) == str:
print("222---")
# self.statusBar.setStyleSheet("QStatusBar::item { border: 0 } QLabel {color: red; font-weight: bold}")
self.statusBar.setStyleSheet(
"QStatusBar {font-weight: bold; color: black} QStatusBar::item { border: 0 } QLabel {font-weight: bold}")
if self.task is not None and not self.task.isWork and not self.autoCheck.isChecked():
print("statusBar---")
self.statusBar.showMessage(stateOrError, 3000)
else:
print("else---")
self.statusBar.showMessage(stateOrError)
else:
print("333---")
self.task.requestInterruption()
self.statusBar.setStyleSheet(
"QStatusBar {font-weight: bold; color: red} QStatusBar::item { border: 0 } QLabel {font-weight: bold; color: red}")
if type(stateOrError) == portError:
if (stateOrError.value in [portError.errorInvalid, portError.errorBusy]) and (
int(self.tryAgain.text()) < 20):
if (stateOrError.value in [portError.errorInvalid, portError.errorBusy]) and (int(self.tryAgain.text()) < 20):
self.statusBar.showMessage("PortError: " + str(stateOrError))
self.tryAgain.setText(str(int(self.tryAgain.text()) + 1))
self.tryAgainLabel.setVisible(True)
@ -531,7 +481,6 @@ class mainWindow(QWidget):
else:
self.statusBar.showMessage("Error: " + str(stateOrError), 5000)
self.stopInstall()
print("1111---")
self.task.isWork = False
self.task.wait(100)
@ -539,7 +488,6 @@ class mainWindow(QWidget):
if __name__ == '__main__':
app = QApplication(sys.argv)
win = mainWindow()

View File

@ -18,10 +18,12 @@ import collections
import inspect
import re
import struct
import sys, time
import sys
import time
import zlib
import usb.core
import usb.util
import zlib
# USB request __TIMEOUT
__TIMEOUT = 4000
@ -658,24 +660,213 @@ def main():
# if __name__ == "__main__":
# main()
from avr_isp.errorBase import portError
from PyQt5.QtCore import QIODevice, QThread, pyqtSignal
from PyQt5.QtCore import QIODevice, QThread, pyqtSignal, QByteArray
from PyQt5.QtSerialPort import QSerialPort, QSerialPortInfo
from PyQt5.QtWidgets import QApplication
from avr_isp import ispBase
from avr_isp.errorBase import portError
class STM32Dev(ispBase.IspBase, QSerialPort):
progressCallback = pyqtSignal(int, int)
def __init__(self):
super(STM32Dev, self).__init__()
self.seq = 1
self.lastAddr = -1
self.portInfo = None
def connect(self, port='COM4', speed=115200):
print("connect", port)
self.portInfo = QSerialPortInfo(port)
print("portInfo", self.portInfo)
self.setPortName(port)
# self.setBaudRate(speed)
# self.setPortName("COM3")
self.setBaudRate(QSerialPort.Baud115200)
self.setDataBits(QSerialPort.Data8)
self.setParity(QSerialPort.NoParity)
self.setStopBits(QSerialPort.OneStop)
self.setFlowControl(QSerialPort.NoFlowControl)
if self.portInfo.isNull():
raise portError(portError.errorInvalid, port)
else:
if self.portInfo.isBusy():
raise portError(portError.errorBusy, port)
else:
if self.open(QIODevice.ReadWrite):
# self.setBreakEnabled()
print("open")
# self.entryISP()
print("open--end")
else:
raise portError(portError.errorOpen, port)
def close(self):
super(STM32Dev, self).close()
self.portInfo = None
def serial_DFU(self):
print("serial_DFU")
# if not self.open(QIODevice.ReadWrite):
# # QtWidgets.QMessageBox.about(self, "提示", "无法打开串口!")
# return
print("1111")
data = bytes("M9999\r", encoding='utf-8')
data = QByteArray(data)
print("2222")
# self.write(data)
print(self.write(data))
print("serial_DFU---end")
self.close()
def entryISP(self):
self.seq = 1
# Reset the controller
self.setDataTerminalReady(True)
QThread.msleep(100)
self.setDataTerminalReady(False)
QThread.msleep(200)
self.clear()
print("=====")
recv = self.sendMessage([1])[3:]
if "".join([chr(c) for c in recv]) != "AVRISP_2":
raise ispBase.IspError("Unkonwn bootloaders!")
if self.sendMessage([0x10, 0xc8, 0x64, 0x19, 0x20, 0x00, 0x53, 0x03, 0xac, 0x53, 0x00, 0x00]) != [0x10, 0x00]:
raise ispBase.IspError("Failed to enter programming mode!")
def leaveISP(self):
if self.portInfo is not None:
if self.sendMessage([0x11]) != [0x11, 0x00]:
raise ispBase.IspError("Failed to leave programming mode!")
def isConnected(self):
return self.isOpen()
def sendISP(self, data):
recv = self.sendMessage([0x1D, 4, 4, 0, data[0], data[1], data[2], data[3]])
return recv[2:6]
def writeFlash(self, flashData):
# Set load addr to 0, in case we have more then 64k flash we need to enable the address extension
pageSize = self.chip['pageSize'] * 2
flashSize = pageSize * self.chip['pageCount']
if flashSize > 0xFFFF:
self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00])
else:
self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00])
loadCount = (len(flashData) + pageSize - 1) // pageSize
for i in range(0, loadCount):
self.sendMessage([0x13, pageSize >> 8, pageSize & 0xFF, 0xc1, 0x0a, 0x40, 0x4c, 0x20, 0x00, 0x00] + flashData[(i * pageSize):(
i * pageSize + pageSize)])
self.progressCallback.emit(i + 1, loadCount * 2)
def verifyFlash(self, flashData):
# Set load addr to 0, in case we have more then 64k flash we need to enable the address extension
flashSize = self.chip['pageSize'] * 2 * self.chip['pageCount']
if flashSize > 0xFFFF:
self.sendMessage([0x06, 0x80, 0x00, 0x00, 0x00])
else:
self.sendMessage([0x06, 0x00, 0x00, 0x00, 0x00])
loadCount = (len(flashData) + 0xFF) // 0x100
for i in range(0, loadCount):
recv = self.sendMessage([0x14, 0x01, 0x00, 0x20])[2:0x102]
self.progressCallback.emit(loadCount + i + 1, loadCount * 2)
for j in range(0, 0x100):
if i * 0x100 + j < len(flashData) and flashData[i * 0x100 + j] != recv[j]:
raise ispBase.IspError('Verify error at: 0x%x' % (i * 0x100 + j))
def fastReset(self):
QThread.msleep(50)
self.setDataTerminalReady(True)
self.setDataTerminalReady(False)
def sendMessage(self, data):
message = struct.pack(">BBHB", 0x1B, self.seq, len(data), 0x0E)
for c in data:
message += struct.pack(">B", c)
checksum = 0
for c in message:
checksum ^= c
message += struct.pack(">B", checksum)
try:
print("----00")
self.write(message)
self.flush()
print("----11")
except:
raise ispBase.IspError("Serial send timeout")
self.seq = (self.seq + 1) & 0xFF
print("----222")
# time.sleep(1)
if self.waitForReadyRead(1000):
print("----33")
return self.recvMessage()
else:
print("----44")
raise ispBase.IspError("Serial recv timeout")
def recvMessage(self):
state = 'Start'
checksum = 0
while True:
s = self.read(1)
if len(s) < 1:
if self.waitForReadyRead(20):
continue
else:
raise ispBase.IspError("Serial read timeout")
b = struct.unpack(">B", s)[0]
checksum ^= b
if state == 'Start':
if b == 0x1B:
state = 'GetSeq'
checksum = 0x1B
elif state == 'GetSeq':
state = 'MsgSize1'
elif state == 'MsgSize1':
msgSize = b << 8
state = 'MsgSize2'
elif state == 'MsgSize2':
msgSize |= b
state = 'Token'
elif state == 'Token':
if b != 0x0E:
state = 'Start'
else:
state = 'Data'
data = []
elif state == 'Data':
data.append(b)
if len(data) == msgSize:
state = 'Checksum'
elif state == 'Checksum':
if checksum != 0:
state = 'Start'
else:
return data
class DFUTool(QThread):
print("DFUTool(QThread)")
stateCallback = pyqtSignal([str], [Exception])
progressCallback = pyqtSignal(int, int)
def __init__(self, parent, filename, callback=None):
def __init__(self, parent, port, speed, filename, callback=None):
super(DFUTool, self).__init__()
print("----------------------")
self.parent = parent
self.port = port
self.speed = speed
self.filename = filename
self.progress = callback
self.callback = callback
self.programmer = None
self.isWork = False
self.finished.connect(self.done)
@ -721,6 +912,7 @@ class DFUTool(QThread):
def w(state):
if state is False:
self.stateCallback[Exception].emit(portError(portError.errorOpen, port))
self.stateCallback[str].emit("Done!")
self.quit()
return
@ -737,30 +929,40 @@ class DFUTool(QThread):
self.thread = DFUSearch(self, kwargs=kwargs)
self.thread.searchResults.connect(w) # 异步完成后执行函数w
self.thread.start()
self.isWork = True
def cl_progress(self, addr, offset, size):
"""Prints a progress report suitable for use on the command line."""
print("offset", offset, "size", size)
self.progressCallback.emit(offset, size)
def disconnect(self, QMetaObject_Connection=None):
print("QMetaObject_Connection")
def run(self):
if self.progress is not None:
self.progressCallback.connect(self.progress)
print(self.args)
self.stateCallback[str].emit("read fils")
with open(self.args.path, "rb") as fin:
dfu_file = fin.read()
if dfu_file is None:
print("file is None")
return
self.isWork = True
elem = {"addr": 134217728, "size": len(dfu_file), "data": dfu_file}
try:
self.stateCallback[str].emit("Write file...")
write_elements([elem], self.args.mass_erase, progress=self.cl_progress)
with open(self.args.path, "rb") as fin:
dfu_file = fin.read()
if dfu_file is None:
print("file is None")
return
elem = {"addr": 134217728, "size": len(dfu_file), "data": dfu_file}
self.programmer = STM32Dev()
if self.callback is not None:
self.progressCallback.connect(self.callback)
if self.parent is None:
pass
else:
self.stateCallback[str].emit(self.tr("Programming..."))
# self.programmer.
write_elements([elem], self.args.mass_erase, progress=self.cl_progress)
exit_dfu() # 退出DFU模式
except Exception as err:
if self.isInterruptionRequested():
print("int")
@ -773,16 +975,13 @@ class DFUTool(QThread):
raise err
self.isWork = False
finally:
self.isWork = False
self.stateCallback[str].emit("Exiting DFU...")
print("Exiting DFU...")
exit_dfu()
# 退出DFU模式
print("Done...")
self.ude = None
self.stateCallback[str].emit("Done!")
self.programmer = None
def isReady(self):
return True
return self.programmer is not None and self.programmer.isConnected()
try:
status = get_status()
print(status)
@ -807,6 +1006,9 @@ class DFUTool(QThread):
print("Failure!")
def terminate(self):
if self.thread.isRunning():
self.thread.exit()
self.requestInterruption()
return super(DFUTool, self).terminate()
@ -830,11 +1032,11 @@ class DFUSearch(QThread):
devices = get_dfu_devices(**self.kwargs)
attempts += 1
print("搜索DFU设备", attempts)
if attempts > 10:
if attempts > 20:
self.searchResults.emit(False)
self.quit()
return
time.sleep(2)
time.sleep(1)
self.searchResults.emit(True)