#!/usr/bin/env python3 import logging import os import re import socket import subprocess import time import typing from abc import ABC, abstractmethod from dataclasses import dataclass, field from flipper.app import App # When adding an interface, also add it to SWD_TRANSPORT in fbt/ufbt options class Programmer(ABC): root_logger = logging.getLogger("Programmer") @abstractmethod def flash(self, file_path: str, do_verify: bool) -> bool: pass @abstractmethod def probe(self) -> bool: pass @abstractmethod def get_name(self) -> str: pass @abstractmethod def set_serial(self, serial: str): pass @classmethod def _spawn_and_await(cls, process_params, show_progress: bool = False): cls.root_logger.debug(f"Launching: {' '.join(process_params)}") process = subprocess.Popen( process_params, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) if show_progress: while process.poll() is None: time.sleep(0.25) print(".", end="", flush=True) print() else: process.wait() return process @dataclass class OpenOCDInterface: name: str config_file: str serial_cmd: str additional_args: typing.Optional[list[str]] = field(default_factory=list) class OpenOCDProgrammer(Programmer): def __init__(self, interface: OpenOCDInterface): self.interface = interface self.logger = self.root_logger.getChild("OpenOCD") self.serial: typing.Optional[str] = None def _add_file(self, params: list[str], file: str): params += ["-f", file] def _add_command(self, params: list[str], command: str): params += ["-c", command] def _add_serial(self, params: list[str], serial: str): self._add_command(params, f"{self.interface.serial_cmd} {serial}") def set_serial(self, serial: str): self.serial = serial def flash(self, file_path: str, do_verify: bool) -> bool: if os.altsep: file_path = file_path.replace(os.sep, os.altsep) openocd_launch_params = ["openocd"] self._add_file(openocd_launch_params, self.interface.config_file) if self.serial: self._add_serial(openocd_launch_params, self.serial) for additional_arg in self.interface.additional_args: self._add_command(openocd_launch_params, additional_arg) self._add_file(openocd_launch_params, "target/stm32wbx.cfg") self._add_command(openocd_launch_params, "init") program_params = [ "program", f'"{file_path}"', "verify" if do_verify else "", "reset", "exit", "0x8000000" if file_path.endswith(".bin") else "", ] self._add_command(openocd_launch_params, " ".join(program_params)) # join the list of parameters into a string, but add quote if there are spaces openocd_launch_params_string = " ".join( [f'"{p}"' if " " in p else p for p in openocd_launch_params] ) self.logger.debug(f"Launching: {openocd_launch_params_string}") process = self._spawn_and_await(openocd_launch_params, True) success = process.returncode == 0 if not success: self.logger.error("OpenOCD failed to flash") if process.stdout: self.logger.error(process.stdout.read().decode("utf-8").strip()) return success def probe(self) -> bool: openocd_launch_params = ["openocd"] self._add_file(openocd_launch_params, self.interface.config_file) if self.serial: self._add_serial(openocd_launch_params, self.serial) for additional_arg in self.interface.additional_args: self._add_command(openocd_launch_params, additional_arg) self._add_file(openocd_launch_params, "target/stm32wbx.cfg") self._add_command(openocd_launch_params, "init") self._add_command(openocd_launch_params, "exit") process = self._spawn_and_await(openocd_launch_params) success = process.returncode == 0 output = process.stdout.read().decode("utf-8").strip() if process.stdout else "" self.logger.debug(output) # Find target voltage using regex if match := re.search(r"Target voltage: (\d+\.\d+)", output): voltage = float(match.group(1)) if not success: if voltage < 1: self.logger.warning( f"Found {self.get_name()}, but device is not connected" ) else: self.logger.warning( f"Device is connected, but {self.get_name()} failed to attach. Is System>Debug enabled?" ) if "cannot read IDR" in output: self.logger.warning( f"Found {self.get_name()}, but failed to attach. Is device connected and is System>Debug enabled?" ) success = False return success def get_name(self) -> str: return self.interface.name def blackmagic_find_serial(serial: str): import serial.tools.list_ports as list_ports if serial and os.name == "nt": if not serial.startswith("\\\\.\\"): serial = f"\\\\.\\{serial}" ports = list(list_ports.grep("blackmagic")) if len(ports) == 0: return None elif len(ports) > 2: if serial: ports = list( filter( lambda p: p.serial_number == serial or p.name == serial or p.device == serial, ports, ) ) if len(ports) == 0: return None if len(ports) > 2: raise Exception("More than one Blackmagic probe found") # If you're getting any issues with auto lookup, uncomment this # print("\n".join([f"{p.device} {vars(p)}" for p in ports])) port = sorted(ports, key=lambda p: f"{p.location}_{p.name}")[0] if serial: if ( serial != port.serial_number and serial != port.name and serial != port.device ): return None if os.name == "nt": port.device = f"\\\\.\\{port.device}" return port.device def _resolve_hostname(hostname): try: return socket.gethostbyname(hostname) except socket.gaierror: return None def blackmagic_find_networked(serial: str): if not serial or serial == "auto": serial = "blackmagic.local" # remove the tcp: prefix if it's there if serial.startswith("tcp:"): serial = serial[4:] # remove the port if it's there if ":" in serial: serial = serial.split(":")[0] if not (probe := _resolve_hostname(serial)): return None return f"tcp:{probe}:2345" class BlackmagicProgrammer(Programmer): def __init__( self, port_resolver, # typing.Callable[typing.Union[str, None], typing.Optional[str]] name: str, ): self.port_resolver = port_resolver self.name = name self.logger = self.root_logger.getChild(f"Blackmagic{name}") self.port: typing.Optional[str] = None def _add_command(self, params: list[str], command: str): params.append("-ex") params.append(command) def _valid_ip(self, address): try: socket.inet_aton(address) return True except Exception: return False def set_serial(self, serial: str): if self._valid_ip(serial): self.port = f"{serial}:2345" elif ip := _resolve_hostname(serial): self.port = f"{ip}:2345" else: self.port = serial def _get_gdb_core_params(self) -> list[str]: gdb_launch_params = ["arm-none-eabi-gdb"] self._add_command(gdb_launch_params, f"target extended-remote {self.port}") self._add_command(gdb_launch_params, "set pagination off") self._add_command(gdb_launch_params, "set confirm off") self._add_command(gdb_launch_params, "monitor swdp_scan") return gdb_launch_params def flash(self, file_path: str, do_verify: bool) -> bool: if not self.port: if not self.probe(): return False # We can convert .bin to .elf with objcopy: # arm-none-eabi-objcopy -I binary -O elf32-littlearm --change-section-address=.data=0x8000000 -B arm -S app.bin app.elf # But I choose to use the .elf file directly because we are flashing our own firmware and it always has an elf predecessor. if file_path.endswith(".bin"): file_path = file_path[:-4] + ".elf" if not os.path.exists(file_path): self.logger.error( f"Sorry, but Blackmagic can't flash .bin file, and {file_path} doesn't exist" ) return False # arm-none-eabi-gdb build/f7-firmware-D/firmware.bin # -ex 'set pagination off' # -ex 'target extended-remote /dev/cu.usbmodem21201' # -ex 'set confirm off' # -ex 'monitor swdp_scan' # -ex 'attach 1' # -ex 'set mem inaccessible-by-default off' # -ex 'load' # -ex 'compare-sections' # -ex 'quit' gdb_launch_params = self._get_gdb_core_params() self._add_command(gdb_launch_params, "attach 1") self._add_command(gdb_launch_params, "set mem inaccessible-by-default off") self._add_command(gdb_launch_params, "load") if do_verify: self._add_command(gdb_launch_params, "compare-sections") self._add_command(gdb_launch_params, "quit") gdb_launch_params.append(file_path) process = self._spawn_and_await(gdb_launch_params, True) if not process.stdout: return False output = process.stdout.read().decode("utf-8").strip() flashed = ( "Loading section .text," in output and "MIS-MATCHED!" not in output and "target image does not match the loaded file" not in output ) if not flashed: self.logger.error("Blackmagic failed to flash") self.logger.error(output) return flashed def probe(self) -> bool: if not (port := self.port_resolver(self.port)): return False self.port = port gdb_launch_params = self._get_gdb_core_params() self._add_command(gdb_launch_params, "quit") process = self._spawn_and_await(gdb_launch_params) if not process.stdout or process.returncode != 0: return False output = process.stdout.read().decode("utf-8").strip() if "SW-DP scan failed!" in output: self.logger.warning( f"Found {self.get_name()} at {self.port}, but failed to attach. Is device connected and is System>Debug enabled?" ) return False return True def get_name(self) -> str: return self.name #################### local_flash_interfaces: list[Programmer] = [ OpenOCDProgrammer( OpenOCDInterface( "cmsis-dap", "interface/cmsis-dap.cfg", "cmsis_dap_serial", ["transport select swd"], ), ), OpenOCDProgrammer( OpenOCDInterface( "stlink", "interface/stlink.cfg", "hla_serial", ["transport select hla_swd"], ), ), BlackmagicProgrammer(blackmagic_find_serial, "blackmagic_usb"), ] network_flash_interfaces: list[Programmer] = [ BlackmagicProgrammer(blackmagic_find_networked, "blackmagic_wifi") ] all_flash_interfaces = [*local_flash_interfaces, *network_flash_interfaces] #################### class Main(App): AUTO_INTERFACE = "auto" def init(self): Programmer.root_logger = self.logger self.parser.add_argument( "filename", type=str, help="File to flash", ) self.parser.add_argument( "--verify", "-v", action="store_true", help="Verify flash after programming", default=False, ) self.parser.add_argument( "--interface", choices=( self.AUTO_INTERFACE, *[i.get_name() for i in all_flash_interfaces], ), type=str, default=self.AUTO_INTERFACE, help="Interface to use", ) self.parser.add_argument( "--serial", type=str, default=self.AUTO_INTERFACE, help="Serial number or port of the programmer", ) self.parser.set_defaults(func=self.flash) def _search_interface(self, interface_list: list[Programmer]) -> list[Programmer]: found_programmers = [] for p in interface_list: name = p.get_name() if (serial := self.args.serial) != self.AUTO_INTERFACE: p.set_serial(serial) self.logger.debug(f"Trying {name} with {serial}") else: self.logger.debug(f"Trying {name}") if p.probe(): self.logger.debug(f"Found {name}") found_programmers.append(p) else: self.logger.debug(f"Failed to probe {name}") return found_programmers def flash(self): start_time = time.time() file_path = os.path.abspath(self.args.filename) if not os.path.exists(file_path): self.logger.error(f"Binary file not found: {file_path}") return 1 if self.args.interface != self.AUTO_INTERFACE: available_interfaces = list( filter( lambda p: p.get_name() == self.args.interface, all_flash_interfaces, ) ) else: self.logger.info("Probing for local interfaces...") available_interfaces = self._search_interface(local_flash_interfaces) if not available_interfaces: # Probe network blackmagic self.logger.info("Probing for network interfaces...") available_interfaces = self._search_interface(network_flash_interfaces) if not available_interfaces: self.logger.error("No availiable interfaces") return 1 elif len(available_interfaces) > 1: self.logger.error("Multiple interfaces found:") self.logger.error( f"Please specify '--interface={[i.get_name() for i in available_interfaces]}'" ) return 1 interface = available_interfaces.pop(0) if self.args.serial != self.AUTO_INTERFACE: interface.set_serial(self.args.serial) self.logger.info(f"Using {interface.get_name()} with {self.args.serial}") else: self.logger.info(f"Using {interface.get_name()}") self.logger.info(f"Flashing {file_path}") if not interface.flash(file_path, self.args.verify): self.logger.error(f"Failed to flash via {interface.get_name()}") return 1 flash_time = time.time() - start_time self.logger.info(f"Flashed successfully in {flash_time:.2f}s") if file_path.endswith(".bin"): bin_size = os.path.getsize(file_path) self.logger.info( f"Effective speed: {bin_size / flash_time / 1024:.2f} KiB/s" ) return 0 if __name__ == "__main__": Main()()