From 9df31a7b378c897f392465a488f501d5607b7dd5 Mon Sep 17 00:00:00 2001 From: bianruifeng <912736557@qq.com> Date: Tue, 31 May 2022 11:41:28 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=85=B3=E8=81=94=E6=96=87?= =?UTF-8?q?=E4=BB=B6=EF=BC=8C=E5=8F=8C=E5=87=BB=E6=89=93=E5=BC=80=E5=AE=89?= =?UTF-8?q?=E8=A3=85=E5=B4=A9=E6=BA=83=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/pydfu.py | 197 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 136 insertions(+), 61 deletions(-) diff --git a/src/pydfu.py b/src/pydfu.py index 43542dc..1e4530e 100644 --- a/src/pydfu.py +++ b/src/pydfu.py @@ -16,6 +16,8 @@ from __future__ import print_function import argparse import collections import inspect +import json +import os import re import struct import sys @@ -24,6 +26,17 @@ import zlib import usb.core import usb.util +import logging + +import usb.backend.libusb1 as libusb1 +import usb.backend.libusb0 as libusb0 +import usb.backend.openusb as openusb + +from usb.libloader import * + +from PyQt5.QtWidgets import QMessageBox + +_LOGGER = logging.getLogger(__name__) # USB request __TIMEOUT __TIMEOUT = 4000 @@ -116,11 +129,23 @@ def init(**kwargs): devices = get_dfu_devices(**kwargs) if not devices: raise ValueError("No DFU device found") - if len(devices) > 1: - raise ValueError("Multiple DFU devices found") - __dev = devices[0] - __dev.set_configuration() + # if len(devices) > 1: + # raise ValueError("Multiple DFU devices found") + for dev in devices: + try: + __dev = dev + dev.set_configuration() + break + except Exception as err: + print(err) + continue + + # __dev = devices[1] + # print("__dev", __dev) + # __dev.set_configuration() + # __dev.get_active_configuration() + # print("__dev.set_configuration", __dev) # Claim DFU interface usb.util.claim_interface(__dev, __DFU_INTERFACE) @@ -437,14 +462,99 @@ class FilterDFU(object): return intf.bInterfaceClass == 0xFE and intf.bInterfaceSubClass == 1 +def _load_locate_library_ex(candidates, cygwin_lib, name, + win_cls=None, cygwin_cls=None, others_cls=None, + find_library=None): + """Locates and loads a library. + Returns: the loaded library + arguments: + * candidates -- candidates list for locate_library() + * cygwin_lib -- name of the cygwin library + * name -- lib identifier (for logging). Defaults to None. + * win_cls -- class that is used to instantiate the library on + win32 platforms. Defaults to None (-> ctypes.CDLL). + * cygwin_cls -- library class for cygwin platforms. + Defaults to None (-> ctypes.CDLL). + * others_cls -- library class for all other platforms. + Defaults to None (-> ctypes.CDLL). + * find_library -- see locate_library(). Defaults to None. + raises: + * NoLibraryCandidatesException + * LibraryNotFoundException + * LibraryNotLoadedException + * LibraryMissingSymbolsException + """ + if sys.platform == 'cygwin': + if cygwin_lib: + loaded_lib = load_library(cygwin_lib, name, cygwin_cls) + else: + raise NoLibraryCandidatesException(name) + elif candidates: + lib = locate_library(candidates, find_library) + if lib: + if sys.platform == 'win32': + loaded_lib = load_library(lib, name, win_cls) + else: + loaded_lib = load_library(lib, name, others_cls) + _LOGGER.info('{}'.format(loaded_lib)) + else: + _LOGGER.info('%r could not be found', (name or candidates)) + raise LibraryNotFoundException(name) + else: + raise NoLibraryCandidatesException(name) + + if loaded_lib is None: + raise LibraryNotLoadedException(name) + else: + return loaded_lib + + +def _load_library_ex(find_library=None): + return _load_locate_library_ex( + ('usb-0.1', 'usb', 'libusb0'), + 'cygusb0.dll', 'Libusb 0', + find_library=find_library + ) + +def _get_backend_ex(): + try: + if libusb0._lib is None: + _LOGGER.info('_get_backend_ex(): using backend "%s"' % libusb0.__name__) + libusb0._lib = _load_library_ex()#find_library + libusb0._setup_prototypes(libusb0._lib) + libusb0._lib.usb_init() + return libusb0._LibUSB() + except usb.libloader.LibraryException: + # exception already logged (if any) + _LOGGER.error('Error loading libusb 0.1 backend') + return None + except Exception: + _LOGGER.error('Error loading libusb 0.1 backend') + return None + + +def my_get_backend_ex(): + for m in (libusb1, openusb, libusb0): + backend = m.get_backend() + if backend is not None: + _LOGGER.info('my_get_backend_ex(): using backend "%s"' % m.__name__) + break + elif m == libusb0: + backend = _get_backend_ex() + if backend is not None: break + else: + raise ('No backend available') + + return backend + def get_dfu_devices(*args, **kwargs): """Returns a list of USB devices which are currently in DFU mode. Additional filters (like idProduct and idVendor) can be passed in to refine the search. """ - + backend = my_get_backend_ex() # Convert to list for compatibility with newer PyUSB - return list(usb.core.find(*args, find_all=True, custom_match=FilterDFU(), **kwargs)) + return list(usb.core.find(*args, find_all=True, backend=backend, custom_match=FilterDFU(), **kwargs)) def get_memory_layout(device): @@ -568,12 +678,12 @@ def main(): # Parse CMD args parser = argparse.ArgumentParser(description="DFU Python Util") parser.add_argument( - "-l", "--list", help="list available DFU devices", action="store_true", default=False + "-l", "--list", help="list available DFU devices", action="store", default=False ) # parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=None) # parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=None) parser.add_argument( - "-m", "--mass-erase", help="mass erase device", action="store_true", default=False + "-m", "--mass-erase", help="mass erase device", action="store", default=False ) # parser.add_argument( # "-u", "--upload", help="read file from DFU device", dest="path", default=False @@ -589,9 +699,9 @@ def main(): default="C:\\Users\\User\\Downloads\\stm32loader-master\\max.dfu" ) - parser.add_argument("-x", "--exit", help="Exit DFU", action="store_true", default=False) + parser.add_argument("-x", "--exit", help="Exit DFU", action="store", default=False) parser.add_argument( - "-v", "--verbose", help="increase output verbosity", action="store_true", default=False + "-v", "--verbose", help="increase output verbosity", action="store", default=False ) args = parser.parse_args() @@ -842,11 +952,17 @@ class STM32Dev(ispBase.IspBase, QSerialPort): class DFUTool(QThread): - print("DFUTool(QThread)") stateCallback = pyqtSignal([str], [Exception]) progressCallback = pyqtSignal(int, int) + def is_valid_file(self, parser, arg): + + if not os.path.exists(arg): + parser.error("The file %s does not exist!" % arg) + else: + return open(arg, 'r') # return an open file handle + def __init__(self, parent, port, speed, filename, callback=None): super(DFUTool, self).__init__() self.parent = parent @@ -856,65 +972,26 @@ class DFUTool(QThread): self.callback = callback self.isWork = False self.finished.connect(self.done) - print("__init__") global __verbose - # Parse CMD args - parser = argparse.ArgumentParser(description="DFU Python Util") - parser.add_argument( - "-l", "--list", help="list available DFU devices", action="store_true", default=False - ) - # parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=None) - # parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=None) - parser.add_argument( - "-m", "--mass-erase", help="mass erase device", action="store_true", default=False - ) - # parser.add_argument( - # "-u", "--upload", help="read file from DFU device", dest="path", default=False - # ) - parser.add_argument("--vid", help="USB Vendor ID", type=lambda x: int(x, 0), default=0x0483) - parser.add_argument("--pid", help="USB Product ID", type=lambda x: int(x, 0), default=0xdf11) - # parser.add_argument( - # "-u", "--upload", help="read file from DFU device", dest="path", default="C:\\Users\\User\\Documents\\stm32-test\\board-STM32F103-Mini\\usbdfu.bin" - # ) - parser.add_argument( - "-u", "--upload", help="read file from DFU device", dest="path", - default=self.filename - ) + __verbose = False - parser.add_argument("-x", "--exit", help="Exit DFU", action="store_true", default=False) - parser.add_argument( - "-v", "--verbose", help="increase output verbosity", action="store_true", default=False - ) - self.args = parser.parse_args() - - __verbose = self.args.verbose - - kwargs = {} - if self.args.vid: - kwargs["idVendor"] = self.args.vid - - if self.args.pid: - kwargs["idProduct"] = self.args.pid + kwargs = {"idVendor": 0x0483, "idProduct": 0xdf11} def w(state): if state is False: - print("未找到DFU") self.stateCallback[Exception].emit(portError(portError.errorOpen, port)) self.stateCallback[str].emit("Done!") self.quit() return try: - print("找到DFU") self.thread.exit() init(**kwargs) self.start() except Exception as err: - print("----------------------------======================") print(err) - print(1) self.thread = DFUSearch(self, kwargs=kwargs) self.thread.searchResults.connect(w) # 异步完成后执行函数w @@ -930,15 +1007,14 @@ class DFUTool(QThread): print("QMetaObject_Connection") def run(self): - print("run") + self.isWork = True try: - print("try") - with open(self.args.path, "rb") as fin: + + with open(self.filename, "rb") as fin: dfu_file = fin.read() if dfu_file is None: - print("file is None") return elem = {"addr": 134217728, "size": len(dfu_file), "data": dfu_file} @@ -950,10 +1026,11 @@ class DFUTool(QThread): else: self.stateCallback[str].emit(self.tr("Programming...")) - write_elements([elem], self.args.mass_erase, progress=self.cl_progress) + write_elements([elem], False, progress=self.cl_progress) exit_dfu() # 退出DFU模式 - print("exit_dfu") + except Exception as err: + print(err) if self.isInterruptionRequested(): print("int") else: @@ -971,7 +1048,7 @@ class DFUTool(QThread): return True def done(self): - print("结束烧录程序") + if self.parent is not None: if self.isWork: self.isWork = False @@ -1001,14 +1078,12 @@ class DFUSearch(QThread): def run(self): # 耗时内容 - print("self.kwargs", self.kwargs) devices = get_dfu_devices(**self.kwargs) # Waiting 2 seconds before trying again..." attempts = 0 while not devices: devices = get_dfu_devices(**self.kwargs) attempts += 1 - print("搜索DFU设备", attempts) if attempts > 20: self.searchResults.emit(False) self.quit()