增加DUF模式烧录

This commit is contained in:
边瑞峰 2022-05-24 14:31:23 +08:00
parent 76b12e2c11
commit 2630d837e8
7 changed files with 2668 additions and 1246 deletions

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,5 @@
from .chipDB import *
from .errorBase import portError
from .intelHex import *
from .ispBase import *
from .stk500v2 import *

16
src/avr_isp/errorBase.py Normal file
View File

@ -0,0 +1,16 @@
class portError(Exception):
errorInvalid = 0
errorBusy = 1
errorOpen = 2
def __init__(self, value, port):
self.value = value
self.port = str(port)
def __str__(self):
if self.value == self.errorInvalid:
return "Invalid serial port : " + self.port + " !"
elif self.value == self.errorBusy:
return "Serial port " + self.port + " is busy!"
elif self.value == self.errorOpen:
return "Serial port " + self.port + " failed to open!"

View File

@ -16,6 +16,7 @@ from PyQt5.QtSerialPort import QSerialPort, QSerialPortInfo
from PyQt5.QtWidgets import QApplication
from avr_isp import intelHex, ispBase
from .errorBase import portError
class Stk500v2(ispBase.IspBase, QSerialPort):
@ -87,7 +88,10 @@ class Stk500v2(ispBase.IspBase, QSerialPort):
loadCount = (len(flashData) + pageSize - 1) // pageSize
for i in range(0, loadCount):
self.sendMessage([0x13, pageSize >> 8, pageSize & 0xFF, 0xc1, 0x0a, 0x40, 0x4c, 0x20, 0x00, 0x00] + flashData[(i * pageSize):(i * pageSize + pageSize)])
self.sendMessage(
[0x13, pageSize >> 8, pageSize & 0xFF, 0xc1, 0x0a, 0x40, 0x4c, 0x20, 0x00, 0x00] + flashData[
(i * pageSize):(
i * pageSize + pageSize)])
self.progressCallback.emit(i + 1, loadCount * 2)
def verifyFlash(self, flashData):
@ -172,24 +176,6 @@ class Stk500v2(ispBase.IspBase, QSerialPort):
return data
class portError(Exception):
errorInvalid = 0
errorBusy = 1
errorOpen = 2
def __init__(self, value, port):
self.value = value
self.port = str(port)
def __str__(self):
if self.value == self.errorInvalid:
return "Invalid serial port : " + self.port + " !"
elif self.value == self.errorBusy:
return "Serial port " + self.port + " is busy!"
elif self.value == self.errorOpen:
return "Serial port " + self.port + " failed to open!"
class stk500v2Thread(QThread):
stateCallback = pyqtSignal([str], [Exception])
@ -205,8 +191,9 @@ class stk500v2Thread(QThread):
self.finished.connect(self.done)
def run(self):
self.isWork = True
try:
self.isWork = True
self.programmer = Stk500v2()
if self.callback is not None:
self.programmer.progressCallback.connect(self.callback)
@ -214,24 +201,26 @@ class stk500v2Thread(QThread):
if self.parent is None:
runProgrammer(self.port, self.speed, self.filename, self.programmer)
else:
self.stateCallback[str].emit(self.tr("Connecting..."))
self.stateCallback[str].emit(self.tr("Connecting.."))
self.msleep(200)
self.programmer.connect(self.port, self.speed)
self.stateCallback[str].emit(self.tr("Programming..."))
self.programmer.programChip(intelHex.readHex(self.filename))
except Exception as e:
if(self.isInterruptionRequested()):
if self.isInterruptionRequested():
print("Int")
else:
if self.parent is not None:
self.stateCallback[Exception].emit(e)
while(self.isWork):
pass # 等待父进程处理异常
# while self.isWork:
# pass # 等待父进程处理异常
else:
raise e
self.isWork = False
finally:
self.isWork = False
if self.programmer.isConnected():
self.programmer.fastReset()
self.programmer.close()
@ -241,8 +230,9 @@ class stk500v2Thread(QThread):
return self.programmer is not None and self.programmer.isConnected()
def done(self):
print("结束烧录程序")
if self.parent is not None:
if(self.isWork):
if self.isWork:
self.isWork = False
self.stateCallback[str].emit(self.tr("Done!"))
else:
@ -271,12 +261,13 @@ def main():
""" Entry point to call the stk500v2 programmer from the commandline. """
programmer = Stk500v2()
try:
if(len(sys.argv) > 2):
if len(sys.argv) > 2:
programmer.connect(sys.argv[1])
programmer.programChip(intelHex.readHex(sys.argv[2]))
else:
programmer.connect("COM4")
programmer.programChip(intelHex.readHex("D:/OneDrive/Desktop/CreatBot F160 01 EN KTC ( AUTO_LEVELING ).hex"))
programmer.programChip(
intelHex.readHex("D:/OneDrive/Desktop/CreatBot F160 01 EN KTC ( AUTO_LEVELING ).hex"))
except portError as e:
print(e.value)
print("PortError: " + str(e))
@ -301,4 +292,3 @@ if __name__ == '__main__':
print(e)
sys.exit(app.exec())

View File

@ -6,7 +6,9 @@ Created on 2017年8月15日
import os
import sys
import time
from PyQt5 import QtSerialPort, QtCore, QtWidgets
from PyQt5.Qt import pyqtSignal
from PyQt5.QtCore import QSize, QDir, QTimer
from PyQt5.QtGui import QIcon
@ -19,6 +21,8 @@ from avr_isp.intelHex import formatError
from avr_isp.ispBase import IspError
from avr_isp.stk500v2 import stk500v2Thread, portError
# from DfuseTool import DfuseTool as DFUse
from pydfu import DFUTool as DFUse
if getattr(sys, 'frozen', False):
bundle_dir = sys._MEIPASS
@ -45,7 +49,6 @@ class countLabel(QLabel):
class portCombox(QComboBox):
showPopupSignal = pyqtSignal(bool)
def __init__(self):
@ -56,6 +59,7 @@ class portCombox(QComboBox):
self.setSizeAdjustPolicy(QComboBox.AdjustToContents)
return super(portCombox, self).showPopup()
class mainWindow(QWidget):
def __init__(self):
@ -67,10 +71,13 @@ class mainWindow(QWidget):
self.portUpdateTimer = QTimer()
self.portUpdateTimer.timeout.connect(self.portUpdate)
self.portUpdateTimer.start(100)
self.portUpdateTimer.start(200)
self.autoTimer = QTimer()
self.autoTimer.setSingleShot(True)
self.autoTimer.timeout.connect(self.installFile)
# self.autoTimer.setSingleShot(True)
self.autoTimer.timeout.connect(self.portUpdate)
# self.autoTimer.start(1000)
self.task = None
self.initUI()
@ -92,8 +99,8 @@ class mainWindow(QWidget):
manualLayout = QHBoxLayout()
self.manualBox.setLayout(manualLayout)
self.portCombo = portCombox()
self.baudCombo = QComboBox()
self.portCombo = portCombox() # 端口号下拉选框
self.baudCombo = QComboBox() # 波特率
manualLayout.addWidget(QLabel(self.tr("Port:")))
manualLayout.addWidget(self.portCombo)
manualLayout.addStretch()
@ -109,7 +116,7 @@ class mainWindow(QWidget):
# FileBox widget
self.file = QLineEdit()
self.file.setPlaceholderText("Please select a hex file.")
self.file.setPlaceholderText("Please select a firmware file.")
self.file.setReadOnly(True)
self.file.setFrame(False)
self.file.setDragEnabled(True)
@ -118,7 +125,7 @@ class mainWindow(QWidget):
fileLayout = QHBoxLayout()
self.fileBox.setLayout(fileLayout)
fileLayout.addWidget(QLabel(self.tr("Hex file:")))
fileLayout.addWidget(QLabel(self.tr("firmware:")))
fileLayout.addWidget(self.file)
fileLayout.addWidget(self.fileBtn)
@ -128,7 +135,7 @@ class mainWindow(QWidget):
self.autoCheck = QCheckBox(self.tr("Auto Install"))
self.autoTimeLabel = QLabel(self.tr("Idle Time:"))
self.autoTimeLabel2 = QLabel(self.tr("s"))
self.autoTime = QLineEdit("2")
self.autoTime = QLineEdit("3")
self.autoTime.setInputMask("00")
self.autoTime.setMaximumWidth(20)
@ -167,7 +174,8 @@ class mainWindow(QWidget):
# 计数栏
self.countBar = QStatusBar()
self.countBar.setSizeGripEnabled(False)
self.countBar.setStyleSheet("QStatusBar::item { border: 0 } QLabel {border:0; font-size: 14px; font-weight: bold}")
self.countBar.setStyleSheet(
"QStatusBar::item { border: 0 } QLabel {border:0; font-size: 14px; font-weight: bold}")
countSuccessLabel = QLabel("Success: ")
countFailureLabel = QLabel("Failure: ")
self.countSuccess = countLabel("0")
@ -205,22 +213,27 @@ class mainWindow(QWidget):
self.statusBar.hide()
self.stopBtn.setDisabled(True)
self.portCombo.show()
self.autoRadio.toggled.connect(self.manualBox.setDisabled)
self.autoRadio.toggled.connect(self.autoInstallWidget.setVisible)
self.autoRadio.toggled.connect(self.resize)
self.manualRadio.clicked.connect(self.disableAutoInstall)
self.manualRadio.clicked.connect(self.resize)
self.autoCheck.toggled.connect(self.autoTimeLabel.setEnabled)
self.autoCheck.toggled.connect(self.autoTime.setEnabled)
self.autoCheck.toggled.connect(self.autoTimeLabel2.setEnabled)
self.autoCheck.stateChanged.connect(self.autoStateChangeAction)
self.portCombo.showPopupSignal.connect(self.portUpdate)
self.portCombo.showPopupSignal.connect(self.portUpdate) # 弹出下拉选框,更新可用端口列表
self.autoTime.returnPressed.connect(self.autoTimeChangeAction)
self.statusBar.messageChanged.connect(self.stateClearAction)
self.autoCheck.click()
self.autoCheck.click()
# 默认选中
self.autoRadio.click()
# self.manualRadio.click()
# self.autoCheck.click()
self.fileBtn.clicked.connect(self.selectFile)
self.installBtn.clicked.connect(self.installFile)
@ -230,17 +243,33 @@ class mainWindow(QWidget):
self.file.__class__.dragEnterEvent = self.dragEnterEvent
def portUpdate(self, forceUpdate=False):
""" Auto 监听端口 """
print("search port")
if self.autoRadio.isChecked():
self.baudCombo.setCurrentText("115200")
# self.portCombo.addItems(portList())
self.portCombo.clear()
for port in QSerialPortInfo.availablePorts():
# self.portCombo.clear()
port_list = QSerialPortInfo.availablePorts()
for port in port_list:
print(':--:', port.portName(), port.description())
if port.description() not in ["Arduino Mega 2560", "USB-SERIAL CH340"]: # 过滤2560和CH340
continue
self.portCombo.addItem(port.portName() + " (" + port.description() + ")", port.portName())
if len(port_list) > 0:
port = port_list[0]
info = QSerialPortInfo(port)
print(info.productIdentifier())
if info.productIdentifier() == 0:
return
if not self.installBtn.isEnabled():
self.installFile()
else:
currentPortData = self.portCombo.currentData()
if forceUpdate or (currentPortData and currentPortData not in [port.portName() for port in QSerialPortInfo.availablePorts()]):
if forceUpdate or (currentPortData and currentPortData not in [port.portName() for port in
QSerialPortInfo.availablePorts()]):
self.portCombo.clear()
for port in QSerialPortInfo.availablePorts():
self.portCombo.addItem(port.portName() + " (" + port.description() + ")", port.portName())
@ -256,25 +285,38 @@ class mainWindow(QWidget):
if not check and self.autoTimer.remainingTime() > 0:
self.stopInstall()
if check:
# 开启自动安装的端口扫描
self.portUpdateTimer.stop()
self.autoTimer.start(int(self.autoTime.text())*1000)
else:
self.autoTimer.stop()
self.portUpdateTimer.start(200)
def autoTimeChangeAction(self):
""" 修改时间间隔 """
self.autoTime.clearFocus()
if self.autoCheck.isChecked() and self.autoTimer.remainingTime() > 0:
self.autoTimer.stop()
self.installFile()
def stateClearAction(self, msg):
if msg is "" and self.statusBar.isVisible():
""" 清理状态栏 """
if msg == "" and self.statusBar.isVisible():
self.statusBar.hide()
if msg is not "" and self.statusBar.isHidden():
if msg != "" and self.statusBar.isHidden():
self.statusBar.show()
self.resize()
def selectFile(self):
""" 选择固件文件 """
hexFileDialog = QFileDialog()
hexFileDialog.setWindowTitle(self.tr("Select hex file"))
hexFileDialog.setNameFilter(self.tr("Hex files(*.hex)"))
hexFileDialog.setWindowTitle(self.tr("Select firmware file"))
# filesFilter = "firmware file (*.hex)|*.bin|" "All files (*.*)|*.*"
filesFilter = "firmware file (*.hex | *.bin)"
hexFileDialog.setNameFilter(self.tr(filesFilter))
hexFileDialog.setFileMode(QFileDialog.ExistingFile)
if(self.file.text() is ""):
if (self.file.text() == ""):
hexFileDialog.setDirectory(QDir.home()) # 设置为Home目录
else:
fileDir = QDir(self.file.text())
@ -287,14 +329,92 @@ class mainWindow(QWidget):
self.file.setText(hexFileDialog.selectedFiles()[0])
def installFile(self, notFromButton=True):
if(self.file.text() is ""):
if(not notFromButton):
""" 开始安装 """
print("-------------------开始安装-------------------")
# for port in QSerialPortInfo.availablePorts():
# print('port.description===', port.portName())
# print('port.description===', port.description())
# info = QSerialPortInfo(port)
# print(info.productIdentifier())
# print(info.vendorIdentifier())
if self.file.text() == "":
if not notFromButton:
self.selectFile()
self.installFile(True)
else:
if self.autoTimer.remainingTime() is not -1:
if self.autoTimer.remainingTime() > 0:
self.autoTimer.stop()
port_list = QSerialPortInfo.availablePorts()
if len(port_list) <= 0:
QtWidgets.QMessageBox.about(self, "提示", "没有可用的串口!")
return
port = port_list[0]
info = QSerialPortInfo(port)
port_Name = None
baud_rate = None
if self.manualRadio.isChecked():
if self.portCombo.currentData() is None:
QtWidgets.QMessageBox.about(self, "提示", "请选择串口设备")
return
else:
port_Name = self.portCombo.currentData()
baud_rate = int(self.baudCombo.currentText())
else:
port_Name = info.portName()
baud_rate = QtSerialPort.QSerialPort.Baud115200
print("port:", port_Name)
print("baudCombo:", baud_rate)
print(info.portName())
print(info.description())
print(info.productIdentifier())
print(info.vendorIdentifier())
if info.productIdentifier() == 0:
self.autoTimer.start(1000)
pass
if info.vendorIdentifier() == 1155:
print("------命令开启DFU模式 start------")
self.statusBar.showMessage("Serial to DFU...")
serial = QtSerialPort.QSerialPort(self)
serial.setPortName(port_Name)
serial.setBaudRate(baud_rate)
serial.setDataBits(QtSerialPort.QSerialPort.Data8)
serial.setParity(QtSerialPort.QSerialPort.NoParity)
serial.setStopBits(QtSerialPort.QSerialPort.OneStop)
serial.setFlowControl(QtSerialPort.QSerialPort.NoFlowControl)
if not serial.open(QtCore.QIODevice.ReadWrite):
# QtWidgets.QMessageBox.about(self, "提示", "无法打开串口!")
return
data = bytes("M9999\r", encoding='utf-8')
data = QtCore.QByteArray(data)
serial.write(data)
print("------命令开启DFU模式 end------")
self.statusBar.showMessage("Serial to dfu...")
self.task = DFUse(self, self.file.text(), self.progressUpdate)
self.task.stateCallback[str].connect(self.stateUpdate)
self.task.stateCallback[Exception].connect(self.stateUpdate)
self.task.finished.connect(self.autoAction) # 检查是否自动烧写,并启动。
# self.task.start()
elif info.vendorIdentifier() != 0:
self.task = stk500v2Thread(self, self.portCombo.currentData(), int(self.baudCombo.currentText()),
self.file.text(), self.progressUpdate)
self.task.stateCallback[str].connect(self.stateUpdate)
self.task.stateCallback[Exception].connect(self.stateUpdate)
self.task.finished.connect(self.autoAction)
self.task.start()
# 开始烧录刷新UI
# self.statusBar.showMessage(" ")
self.portBox.setDisabled(True)
self.fileBox.setDisabled(True)
self.installBtn.setDisabled(True)
@ -302,14 +422,8 @@ class mainWindow(QWidget):
self.progress.show()
self.resize()
self.task = stk500v2Thread(self, self.portCombo.currentData(), int(self.baudCombo.currentText()), self.file.text(), self.progressUpdate)
self.task.stateCallback[str].connect(self.stateUpdate)
self.task.stateCallback[Exception].connect(self.stateUpdate)
self.task.finished.connect(self.autoAction)
self.task.start()
self.statusBar.showMessage(" ")
def stopInstall(self, succeed=False, autoInstall=False):
""" 停止自动安装 """
if autoInstall:
self.progress.reset()
else:
@ -326,14 +440,14 @@ class mainWindow(QWidget):
self.countSuccess.setText(str(int(self.countSuccess.text()) + 1))
self.task = None
else:
if self.autoTimer.remainingTime() is not -1:
self.autoTimer.stop()
# if self.autoTimer.remainingTime() != -1:
# self.autoTimer.stop()
if self.task is not None and self.task.isRunning():
self.task.finished.disconnect()
if self.task.isReady():
self.countFailure.setText(str(int(self.countFailure.text()) + 1))
if(self.task.isInterruptionRequested()):
if self.task.isInterruptionRequested():
pass
else:
# self.task.requestInterruption()
@ -344,36 +458,54 @@ class mainWindow(QWidget):
self.statusBar.clearMessage()
def autoAction(self):
""" 上一个任务结束后,开启新的烧录任务 """
self.task = None
self.statusBar.showMessage("Done!")
self.stopInstall(True, self.autoCheck.isChecked())
# 开启自动安装
if self.autoCheck.isChecked():
if self.autoTimer.remainingTime() > 0:
self.autoTimer.stop()
self.autoTimer.start(int(self.autoTime.text()) * 1000)
def resize(self):
self.setFixedHeight(self.sizeHint().height())
def progressUpdate(self, cur, total):
""" 进度条 """
self.progress.setMaximum(total)
self.progress.setValue(cur)
def stateUpdate(self, stateOrError):
""" 安装状态 """
self.tryAgainLabel.setHidden(True)
self.tryAgain.setHidden(True)
if self.task.isReady():
self.tryAgain.setText("0")
print(stateOrError, str)
if type(stateOrError) == str:
print("222---")
# self.statusBar.setStyleSheet("QStatusBar::item { border: 0 } QLabel {color: red; font-weight: bold}")
self.statusBar.setStyleSheet("QStatusBar {font-weight: bold; color: black} QStatusBar::item { border: 0 } QLabel {font-weight: bold}")
self.statusBar.setStyleSheet(
"QStatusBar {font-weight: bold; color: black} QStatusBar::item { border: 0 } QLabel {font-weight: bold}")
if self.task is not None and not self.task.isWork and not self.autoCheck.isChecked():
print("statusBar---")
self.statusBar.showMessage(stateOrError, 3000)
else:
print("else---")
self.statusBar.showMessage(stateOrError)
else:
print("333---")
self.task.requestInterruption()
self.statusBar.setStyleSheet("QStatusBar {font-weight: bold; color: red} QStatusBar::item { border: 0 } QLabel {font-weight: bold; color: red}")
self.statusBar.setStyleSheet(
"QStatusBar {font-weight: bold; color: red} QStatusBar::item { border: 0 } QLabel {font-weight: bold; color: red}")
if type(stateOrError) == portError:
if (stateOrError.value in [portError.errorInvalid, portError.errorBusy]) and (int(self.tryAgain.text()) < 20):
if (stateOrError.value in [portError.errorInvalid, portError.errorBusy]) and (
int(self.tryAgain.text()) < 20):
self.statusBar.showMessage("PortError: " + str(stateOrError))
self.tryAgain.setText(str(int(self.tryAgain.text()) + 1))
self.tryAgainLabel.setVisible(True)
@ -399,6 +531,7 @@ class mainWindow(QWidget):
else:
self.statusBar.showMessage("Error: " + str(stateOrError), 5000)
self.stopInstall()
print("1111---")
self.task.isWork = False
self.task.wait(100)
@ -406,12 +539,13 @@ class mainWindow(QWidget):
if __name__ == '__main__':
app = QApplication(sys.argv)
win = mainWindow()
win.show()
if(len(sys.argv) > 1): # 关联hex文件自动安装
if len(sys.argv) > 1: # 关联hex文件自动安装
win.portUpdate()
win.file.setText(sys.argv[1])
win.installBtn.click()

856
src/pydfu.py Normal file
View File

@ -0,0 +1,856 @@
#!/usr/bin/env python
# This file is part of the OpenMV project.
# Copyright (c) 2013/2014 Ibrahim Abdelkader <i.abdalkader@gmail.com>
# This work is licensed under the MIT license, see the file LICENSE for
# details.
"""This module implements enough functionality to program the STM32F4xx over
DFU, without requiring dfu-util.
See app note AN3156 for a description of the DFU protocol.
See document UM0391 for a dscription of the DFuse file.
"""
from __future__ import print_function
import argparse
import collections
import inspect
import re
import struct
import sys, time
import usb.core
import usb.util
import zlib
# USB request __TIMEOUT
__TIMEOUT = 4000
# DFU commands
__DFU_DETACH = 0
__DFU_DNLOAD = 1
__DFU_UPLOAD = 2
__DFU_GETSTATUS = 3
__DFU_CLRSTATUS = 4
__DFU_GETSTATE = 5
__DFU_ABORT = 6
# DFU status
__DFU_STATE_APP_IDLE = 0x00
__DFU_STATE_APP_DETACH = 0x01
__DFU_STATE_DFU_IDLE = 0x02
__DFU_STATE_DFU_DOWNLOAD_SYNC = 0x03
__DFU_STATE_DFU_DOWNLOAD_BUSY = 0x04
__DFU_STATE_DFU_DOWNLOAD_IDLE = 0x05
__DFU_STATE_DFU_MANIFEST_SYNC = 0x06
__DFU_STATE_DFU_MANIFEST = 0x07
__DFU_STATE_DFU_MANIFEST_WAIT_RESET = 0x08
__DFU_STATE_DFU_UPLOAD_IDLE = 0x09
__DFU_STATE_DFU_ERROR = 0x0A
_DFU_DESCRIPTOR_TYPE = 0x21
__DFU_STATUS_STR = {
__DFU_STATE_APP_IDLE: "STATE_APP_IDLE",
__DFU_STATE_APP_DETACH: "STATE_APP_DETACH",
__DFU_STATE_DFU_IDLE: "STATE_DFU_IDLE",
__DFU_STATE_DFU_DOWNLOAD_SYNC: "STATE_DFU_DOWNLOAD_SYNC",
__DFU_STATE_DFU_DOWNLOAD_BUSY: "STATE_DFU_DOWNLOAD_BUSY",
__DFU_STATE_DFU_DOWNLOAD_IDLE: "STATE_DFU_DOWNLOAD_IDLE",
__DFU_STATE_DFU_MANIFEST_SYNC: "STATE_DFU_MANIFEST_SYNC",
__DFU_STATE_DFU_MANIFEST: "STATE_DFU_MANIFEST",
__DFU_STATE_DFU_MANIFEST_WAIT_RESET: "STATE_DFU_MANIFEST_WAIT_RESET",
__DFU_STATE_DFU_UPLOAD_IDLE: "STATE_DFU_UPLOAD_IDLE",
__DFU_STATE_DFU_ERROR: "STATE_DFU_ERROR",
}
# USB device handle
__dev = None
# Configuration descriptor of the device
__cfg_descr = None
__verbose = None
# USB DFU interface
__DFU_INTERFACE = 0
# Python 3 deprecated getargspec in favour of getfullargspec, but
# Python 2 doesn't have the latter, so detect which one to use
getargspec = getattr(inspect, "getfullargspec", inspect.getargspec)
if "length" in getargspec(usb.util.get_string).args:
# PyUSB 1.0.0.b1 has the length argument
def get_string(dev, index):
return usb.util.get_string(dev, 255, index)
else:
# PyUSB 1.0.0.b2 dropped the length argument
def get_string(dev, index):
return usb.util.get_string(dev, index)
def find_dfu_cfg_descr(descr):
if len(descr) == 9 and descr[0] == 9 and descr[1] == _DFU_DESCRIPTOR_TYPE:
nt = collections.namedtuple(
"CfgDescr",
[
"bLength",
"bDescriptorType",
"bmAttributes",
"wDetachTimeOut",
"wTransferSize",
"bcdDFUVersion",
],
)
return nt(*struct.unpack("<BBBHHH", bytearray(descr)))
return None
def init(**kwargs):
"""Initializes the found DFU device so that we can program it."""
global __dev, __cfg_descr
devices = get_dfu_devices(**kwargs)
# Waiting 2 seconds before trying again..."
attempts = 0
while not devices:
devices = get_dfu_devices(**kwargs)
attempts += 1
print(" + str(attempts) + ", attempts)
time.sleep(2)
# # 尝试5次后报错
# if attempts >= 5:
# break
if not devices:
raise ValueError("No DFU device found")
if len(devices) > 1:
raise ValueError("Multiple DFU devices found")
__dev = devices[0]
__dev.set_configuration()
# Claim DFU interface
usb.util.claim_interface(__dev, __DFU_INTERFACE)
# Find the DFU configuration descriptor, either in the device or interfaces
__cfg_descr = None
for cfg in __dev.configurations():
__cfg_descr = find_dfu_cfg_descr(cfg.extra_descriptors)
if __cfg_descr:
break
for itf in cfg.interfaces():
__cfg_descr = find_dfu_cfg_descr(itf.extra_descriptors)
if __cfg_descr:
break
# Get device into idle state
for attempt in range(4):
status = get_status()
if status == __DFU_STATE_DFU_IDLE:
break
elif status == __DFU_STATE_DFU_DOWNLOAD_IDLE or status == __DFU_STATE_DFU_UPLOAD_IDLE:
abort_request()
else:
clr_status()
def abort_request():
"""Sends an abort request."""
__dev.ctrl_transfer(0x21, __DFU_ABORT, 0, __DFU_INTERFACE, None, __TIMEOUT)
def clr_status():
"""Clears any error status (perhaps left over from a previous session)."""
__dev.ctrl_transfer(0x21, __DFU_CLRSTATUS, 0, __DFU_INTERFACE, None, __TIMEOUT)
def get_status():
"""Get the status of the last operation."""
stat = __dev.ctrl_transfer(0xA1, __DFU_GETSTATUS, 0, __DFU_INTERFACE, 6, 20000)
# firmware can provide an optional string for any error
if stat[5]:
message = get_string(__dev, stat[5])
if message:
print(message)
return stat[4]
def check_status(stage, expected):
status = get_status()
if status != expected:
raise SystemExit("DFU: %s failed (%s)" % (stage, __DFU_STATUS_STR.get(status, status)))
def mass_erase():
"""Performs a MASS erase (i.e. erases the entire device)."""
# Send DNLOAD with first byte=0x41
__dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, "\x41", __TIMEOUT)
# Execute last command
check_status("erase", __DFU_STATE_DFU_DOWNLOAD_BUSY)
# Check command state
check_status("erase", __DFU_STATE_DFU_DOWNLOAD_IDLE)
def page_erase(addr):
"""Erases a single page."""
if __verbose:
print("Erasing page: 0x%x..." % (addr))
# Send DNLOAD with first byte=0x41 and page address
buf = struct.pack("<BI", 0x41, addr)
__dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, __TIMEOUT)
# Execute last command
check_status("erase", __DFU_STATE_DFU_DOWNLOAD_BUSY)
# Check command state
check_status("erase", __DFU_STATE_DFU_DOWNLOAD_IDLE)
def set_address(addr):
"""Sets the address for the next operation."""
# Send DNLOAD with first byte=0x21 and page address
buf = struct.pack("<BI", 0x21, addr)
__dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, buf, __TIMEOUT)
# Execute last command
check_status("set address", __DFU_STATE_DFU_DOWNLOAD_BUSY)
# Check command state
check_status("set address", __DFU_STATE_DFU_DOWNLOAD_IDLE)
def write_memory(addr, buf, progress=None, progress_addr=0, progress_size=0):
"""Writes a buffer into memory. This routine assumes that memory has
already been erased.
"""
xfer_count = 0
xfer_bytes = 0
xfer_total = len(buf)
xfer_base = addr
while xfer_bytes < xfer_total:
if __verbose and xfer_count % 512 == 0:
print(
"Addr 0x%x %dKBs/%dKBs..."
% (xfer_base + xfer_bytes, xfer_bytes // 1024, xfer_total // 1024)
)
if progress and xfer_count % 2 == 0:
progress(progress_addr, xfer_base + xfer_bytes - progress_addr, progress_size)
# Set mem write address
set_address(xfer_base + xfer_bytes)
# Send DNLOAD with fw data
chunk = min(__cfg_descr.wTransferSize, xfer_total - xfer_bytes)
__dev.ctrl_transfer(
0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE, buf[xfer_bytes: xfer_bytes + chunk], __TIMEOUT
)
# Execute last command
check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_BUSY)
# Check command state
check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_IDLE)
xfer_count += 1
xfer_bytes += chunk
def write_page(buf, xfer_offset):
"""Writes a single page. This routine assumes that memory has already
been erased.
"""
xfer_base = 0x08000000
# Set mem write address
set_address(xfer_base + xfer_offset)
# Send DNLOAD with fw data
__dev.ctrl_transfer(0x21, __DFU_DNLOAD, 2, __DFU_INTERFACE, buf, __TIMEOUT)
# Execute last command
check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_BUSY)
# Check command state
check_status("write memory", __DFU_STATE_DFU_DOWNLOAD_IDLE)
if __verbose:
print("Write: 0x%x " % (xfer_base + xfer_offset))
def exit_dfu():
"""Exit DFU mode, and start running the program."""
# Set jump address
set_address(0x08000000)
# Send DNLOAD with 0 length to exit DFU
__dev.ctrl_transfer(0x21, __DFU_DNLOAD, 0, __DFU_INTERFACE, None, __TIMEOUT)
try:
# Execute last command
if get_status() != __DFU_STATE_DFU_MANIFEST:
print("Failed to reset device")
# Release device
usb.util.dispose_resources(__dev)
except:
pass
def named(values, names):
"""Creates a dict with `names` as fields, and `values` as values."""
return dict(zip(names.split(), values))
def consume(fmt, data, names):
"""Parses the struct defined by `fmt` from `data`, stores the parsed fields
into a named tuple using `names`. Returns the named tuple, and the data
with the struct stripped off."""
size = struct.calcsize(fmt)
return named(struct.unpack(fmt, data[:size]), names), data[size:]
def cstring(string):
"""Extracts a null-terminated string from a byte array."""
return string.decode("utf-8", "ignore").split("\0", 1)[0]
def compute_crc(data):
"""Computes the CRC32 value for the data passed in."""
return 0xFFFFFFFF & -zlib.crc32(data) - 1
def read_dfu_file(filename):
"""Reads a DFU file, and parses the individual elements from the file.
Returns an array of elements. Each element is a dictionary with the
following keys:
num - The element index.
address - The address that the element data should be written to.
size - The size of the element data.
data - The element data.
If an error occurs while parsing the file, then None is returned.
"""
print("File: {}".format(filename))
with open(filename, "rb") as fin:
data = fin.read()
crc = compute_crc(data[:-4])
elements = []
# Decode the DFU Prefix
#
# <5sBIB
# < little endian Endianness
# 5s char[5] signature "DfuSe"
# B uint8_t version 1
# I uint32_t size Size of the DFU file (without suffix)
# B uint8_t targets Number of targets
dfu_prefix, data = consume("<5sBIB", data, "signature version size targets")
print(
" %(signature)s v%(version)d, image size: %(size)d, "
"targets: %(targets)d" % dfu_prefix
)
for target_idx in range(dfu_prefix["targets"]):
# Decode the Image Prefix
#
# <6sBI255s2I
# < little endian Endianness
# 6s char[6] signature "Target"
# B uint8_t altsetting
# I uint32_t named Bool indicating if a name was used
# 255s char[255] name Name of the target
# I uint32_t size Size of image (without prefix)
# I uint32_t elements Number of elements in the image
img_prefix, data = consume(
"<6sBI255s2I", data, "signature altsetting named name " "size elements"
)
img_prefix["num"] = target_idx
if img_prefix["named"]:
img_prefix["name"] = cstring(img_prefix["name"])
else:
img_prefix["name"] = ""
print(
" %(signature)s %(num)d, alt setting: %(altsetting)s, "
'name: "%(name)s", size: %(size)d, elements: %(elements)d' % img_prefix
)
target_size = img_prefix["size"]
target_data = data[:target_size]
data = data[target_size:]
for elem_idx in range(img_prefix["elements"]):
# Decode target prefix
#
# <2I
# < little endian Endianness
# I uint32_t element Address
# I uint32_t element Size
elem_prefix, target_data = consume("<2I", target_data, "addr size")
elem_prefix["num"] = elem_idx
print(" %(num)d, address: 0x%(addr)08x, size: %(size)d" % elem_prefix)
elem_size = elem_prefix["size"]
elem_data = target_data[:elem_size]
target_data = target_data[elem_size:]
elem_prefix["data"] = elem_data
elements.append(elem_prefix)
if len(target_data):
print("target %d PARSE ERROR" % target_idx)
# Decode DFU Suffix
#
# <4H3sBI
# < little endian Endianness
# H uint16_t device Firmware version
# H uint16_t product
# H uint16_t vendor
# H uint16_t dfu 0x11a (DFU file format version)
# 3s char[3] ufd "UFD"
# B uint8_t len 16
# I uint32_t crc32 Checksum
dfu_suffix = named(
struct.unpack("<4H3sBI", data[:16]), "device product vendor dfu ufd len crc"
)
print(
" usb: %(vendor)04x:%(product)04x, device: 0x%(device)04x, "
"dfu: 0x%(dfu)04x, %(ufd)s, %(len)d, 0x%(crc)08x" % dfu_suffix
)
if crc != dfu_suffix["crc"]:
print("CRC ERROR: computed crc32 is 0x%08x" % crc)
return
data = data[16:]
if data:
print("PARSE ERROR")
return
return elements
class FilterDFU(object):
"""Class for filtering USB devices to identify devices which are in DFU
mode.
"""
def __call__(self, device):
for cfg in device:
for intf in cfg:
return intf.bInterfaceClass == 0xFE and intf.bInterfaceSubClass == 1
def get_dfu_devices(*args, **kwargs):
"""Returns a list of USB devices which are currently in DFU mode.
Additional filters (like idProduct and idVendor) can be passed in
to refine the search.
"""
# Convert to list for compatibility with newer PyUSB
return list(usb.core.find(*args, find_all=True, custom_match=FilterDFU(), **kwargs))
def get_memory_layout(device):
"""Returns an array which identifies the memory layout. Each entry
of the array will contain a dictionary with the following keys:
addr - Address of this memory segment.
last_addr - Last address contained within the memory segment.
size - Size of the segment, in bytes.
num_pages - Number of pages in the segment.
page_size - Size of each page, in bytes.
"""
cfg = device[0]
intf = cfg[(0, 0)]
mem_layout_str = get_string(device, intf.iInterface)
mem_layout = mem_layout_str.split("/")
result = []
for mem_layout_index in range(1, len(mem_layout), 2):
addr = int(mem_layout[mem_layout_index], 0)
segments = mem_layout[mem_layout_index + 1].split(",")
seg_re = re.compile(r"(\d+)\*(\d+)(.)(.)")
for segment in segments:
seg_match = seg_re.match(segment)
num_pages = int(seg_match.groups()[0], 10)
page_size = int(seg_match.groups()[1], 10)
multiplier = seg_match.groups()[2]
if multiplier == "K":
page_size *= 1024
if multiplier == "M":
page_size *= 1024 * 1024
size = num_pages * page_size
last_addr = addr + size - 1
result.append(
named(
(addr, last_addr, size, num_pages, page_size),
"addr last_addr size num_pages page_size",
)
)
addr += size
return result
def list_dfu_devices(*args, **kwargs):
"""Prints a lits of devices detected in DFU mode."""
devices = get_dfu_devices(*args, **kwargs)
if not devices:
raise SystemExit("No DFU capable devices found")
for device in devices:
print(
"Bus {} Device {:03d}: ID {:04x}:{:04x}".format(
device.bus, device.address, device.idVendor, device.idProduct
)
)
layout = get_memory_layout(device)
print("Memory Layout")
for entry in layout:
print(
" 0x{:x} {:2d} pages of {:3d}K bytes".format(
entry["addr"], entry["num_pages"], entry["page_size"] // 1024
)
)
def write_elements(elements, mass_erase_used, progress=None):
"""Writes the indicated elements into the target memory,
erasing as needed.
"""
mem_layout = get_memory_layout(__dev)
for elem in elements:
addr = elem["addr"]
size = elem["size"] # 固件总长度
data = elem["data"]
elem_size = size
elem_addr = addr
if progress and elem_size:
progress(elem_addr, 0, elem_size)
while size > 0:
write_size = size
if not mass_erase_used:
for segment in mem_layout:
if addr >= segment["addr"] and addr <= segment["last_addr"]:
# We found the page containing the address we want to
# write, erase it
page_size = segment["page_size"]
page_addr = addr & ~(page_size - 1)
if addr + write_size > page_addr + page_size:
write_size = page_addr + page_size - addr
page_erase(page_addr)
break
print("addr===", addr, "\nwrite_size===", write_size, "\n", progress, "elem_addr===", elem_addr,
"elem_size===", elem_size)
write_memory(addr, data[:write_size], progress, elem_addr, elem_size)
data = data[write_size:]
addr += write_size
size -= write_size
if progress:
progress(elem_addr, addr - elem_addr, elem_size)
def cli_progress(addr, offset, size):
"""Prints a progress report suitable for use on the command line."""
width = 25
done = offset * width // size
print(
"\r0x{:08x} {:7d} [{}{}] {:3d}% ".format(
addr, size, "=" * done, " " * (width - done), offset * 100 // size
),
end="",
)
try:
sys.stdout.flush()
except OSError:
pass # Ignore Windows CLI "WinError 87" on Python 3.6
if offset == size:
print("")
def main():
"""Test program for verifying this files functionality."""
global __verbose
# Parse CMD args
parser = argparse.ArgumentParser(description="DFU Python Util")
parser.add_argument(
"-l", "--list", help="list available DFU devices", action="store_true", default=False
)
# parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=None)
# parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=None)
parser.add_argument(
"-m", "--mass-erase", help="mass erase device", action="store_true", default=False
)
# parser.add_argument(
# "-u", "--upload", help="read file from DFU device", dest="path", default=False
# )
parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=0x0483)
parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=0xdf11)
# parser.add_argument(
# "-u", "--upload", help="read file from DFU device", dest="path", default="C:\\Users\\User\\Documents\\stm32-test\\board-STM32F103-Mini\\usbdfu.bin"
# )
parser.add_argument(
"-u", "--upload", help="read file from DFU device", dest="path",
default="C:\\Users\\User\\Downloads\\stm32loader-master\\max.dfu"
)
parser.add_argument("-x", "--exit", help="Exit DFU", action="store_true", default=False)
parser.add_argument(
"-v", "--verbose", help="increase output verbosity", action="store_true", default=False
)
args = parser.parse_args()
__verbose = args.verbose
kwargs = {}
if args.vid:
kwargs["idVendor"] = args.vid
if args.pid:
kwargs["idProduct"] = args.pid
if args.list:
list_dfu_devices(**kwargs)
return
init(**kwargs)
# 测试写入其他类型文件
# file = "C:\\Users\\User\\Downloads\\stm32loader-master\\pro.bin"
# data = open(file, 'rb').read()
#
# write_page(data, 0)
# return
command_run = False
if args.mass_erase:
print("Mass erase...")
mass_erase()
command_run = True
if args.path:
elements = read_dfu_file(args.path)
if not elements:
print("No data in dfu file")
return
print("Writing memory...")
write_elements(elements, args.mass_erase, progress=cli_progress)
print("Exiting DFU...")
exit_dfu()
command_run = True
if args.exit:
print("Exiting DFU...")
exit_dfu()
command_run = True
if command_run:
print("Finished")
else:
print("No command specified")
# if __name__ == "__main__":
# main()
from avr_isp.errorBase import portError
from PyQt5.QtCore import QIODevice, QThread, pyqtSignal
from PyQt5.QtSerialPort import QSerialPort, QSerialPortInfo
from PyQt5.QtWidgets import QApplication
class DFUTool(QThread):
print("DFUTool(QThread)")
stateCallback = pyqtSignal([str], [Exception])
progressCallback = pyqtSignal(int, int)
def __init__(self, parent, filename, callback=None):
super(DFUTool, self).__init__()
print("----------------------")
self.parent = parent
self.filename = filename
self.progress = callback
self.isWork = False
self.finished.connect(self.done)
global __verbose
# Parse CMD args
parser = argparse.ArgumentParser(description="DFU Python Util")
parser.add_argument(
"-l", "--list", help="list available DFU devices", action="store_true", default=False
)
# parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=None)
# parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=None)
parser.add_argument(
"-m", "--mass-erase", help="mass erase device", action="store_true", default=False
)
# parser.add_argument(
# "-u", "--upload", help="read file from DFU device", dest="path", default=False
# )
parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=0x0483)
parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=0xdf11)
# parser.add_argument(
# "-u", "--upload", help="read file from DFU device", dest="path", default="C:\\Users\\User\\Documents\\stm32-test\\board-STM32F103-Mini\\usbdfu.bin"
# )
parser.add_argument(
"-u", "--upload", help="read file from DFU device", dest="path",
default=self.filename
)
parser.add_argument("-x", "--exit", help="Exit DFU", action="store_true", default=False)
parser.add_argument(
"-v", "--verbose", help="increase output verbosity", action="store_true", default=False
)
self.args = parser.parse_args()
__verbose = self.args.verbose
kwargs = {}
if self.args.vid:
kwargs["idVendor"] = self.args.vid
if self.args.pid:
kwargs["idProduct"] = self.args.pid
def w(state):
if state is False:
self.stateCallback[str].emit("Done!")
self.quit()
return
try:
self.thread.exit()
init(**kwargs)
self.start()
except Exception as err:
print("----------------------------======================")
print(err)
print(1)
self.thread = DFUSearch(self, kwargs=kwargs)
self.thread.searchResults.connect(w) # 异步完成后执行函数w
self.thread.start()
def cl_progress(self, addr, offset, size):
"""Prints a progress report suitable for use on the command line."""
self.progressCallback.emit(offset, size)
def run(self):
if self.progress is not None:
self.progressCallback.connect(self.progress)
print(self.args)
self.stateCallback[str].emit("read fils")
with open(self.args.path, "rb") as fin:
dfu_file = fin.read()
if dfu_file is None:
print("file is None")
return
self.isWork = True
elem = {"addr": 134217728, "size": len(dfu_file), "data": dfu_file}
try:
self.stateCallback[str].emit("Write file...")
write_elements([elem], self.args.mass_erase, progress=self.cl_progress)
except Exception as err:
if self.isInterruptionRequested():
print("int")
else:
if self.parent is not None:
self.stateCallback[Exception].emit(err)
while self.isWork:
pass # 等待父进程处理异常
else:
raise err
self.isWork = False
finally:
self.isWork = False
self.stateCallback[str].emit("Exiting DFU...")
print("Exiting DFU...")
exit_dfu()
# 退出DFU模式
print("Done...")
self.ude = None
def isReady(self):
return True
try:
status = get_status()
print(status)
if status[1] == 0x02:
return True
return False
except Exception as err:
print("isReady", err)
return False
def done(self):
print("结束烧录程序")
if self.parent is not None:
if self.isWork:
self.isWork = False
self.stateCallback[str].emit(self.tr("Done!"))
else:
print("Success!")
else:
print("Failure!")
def terminate(self):
self.requestInterruption()
return super(DFUTool, self).terminate()
class DFUSearch(QThread):
searchResults = pyqtSignal(bool) # 信号
def __init__(self, parent=None, kwargs=None):
super(DFUSearch, self).__init__()
self.kwargs = kwargs
def __del__(self):
self.wait()
def run(self):
# 耗时内容
devices = get_dfu_devices(**self.kwargs)
# Waiting 2 seconds before trying again..."
attempts = 0
while not devices:
devices = get_dfu_devices(**self.kwargs)
attempts += 1
print("搜索DFU设备", attempts)
if attempts > 10:
self.searchResults.emit(False)
self.quit()
return
time.sleep(2)
self.searchResults.emit(True)
def terminate(self):
self.requestInterruption()
return super(DFUSearch, self).terminate()
if __name__ == '__main__':
# main()
app = QApplication(sys.argv)
task = DFUTool(None, None)
# task = stk500v2Thread(None, "COM4", 115200, "D:/OneDrive/Desktop/test.hex")
try:
task.start()
except Exception as e:
print(e)
sys.exit(app.exec())

View File

@ -1,2 +1,2 @@
call "%USERPROFILE%\.virtualenvs\FirmwareInstaller\Scripts\activate.bat"
rem call "%USERPROFILE%\.virtualenvs\FirmwareInstaller\Scripts\activate.bat"
@pyinstaller --workpath Package/build --distpath Package/dist -y Installer.spec