unleashed-firmware/scripts/fwflash.py
Sergey Gavrilov 809418b9da
[FL-3563] StorageListRequest: size filter (#3018)
* Protobuf: size filter
* Update protobuf
* Scripts: types for fwflash.py
* RPC: handle fliter for StorageListRequest
* RPC: StorageListRequest tests for filtering
* Fix unit tests configuration
* Assets: sync protobuf with upstream

Co-authored-by: Aleksandr Kutuzov <alleteam@gmail.com>
2023-09-01 10:23:37 +09:00

499 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
import logging
import os
import re
import socket
import subprocess
import time
import typing
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from flipper.app import App
from serial.tools.list_ports_common import ListPortInfo
# When adding an interface, also add it to SWD_TRANSPORT in fbt/ufbt options
class Programmer(ABC):
root_logger = logging.getLogger("Programmer")
@abstractmethod
def flash(self, file_path: str, do_verify: bool) -> bool:
pass
@abstractmethod
def probe(self) -> bool:
pass
@abstractmethod
def get_name(self) -> str:
pass
@abstractmethod
def set_serial(self, serial: str):
pass
@classmethod
def _spawn_and_await(cls, process_params, show_progress: bool = False):
cls.root_logger.debug(f"Launching: {' '.join(process_params)}")
process = subprocess.Popen(
process_params,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
if show_progress:
while process.poll() is None:
time.sleep(0.25)
print(".", end="", flush=True)
print()
else:
process.wait()
return process
@dataclass
class OpenOCDInterface:
name: str
config_file: str
serial_cmd: str
additional_args: typing.Optional[list[str]] = field(default_factory=list)
class OpenOCDProgrammer(Programmer):
def __init__(self, interface: OpenOCDInterface):
self.interface = interface
self.logger = self.root_logger.getChild("OpenOCD")
self.serial: typing.Optional[str] = None
def _add_file(self, params: list[str], file: str):
params += ["-f", file]
def _add_command(self, params: list[str], command: str):
params += ["-c", command]
def _add_serial(self, params: list[str], serial: str):
self._add_command(params, f"{self.interface.serial_cmd} {serial}")
def set_serial(self, serial: str):
self.serial = serial
def flash(self, file_path: str, do_verify: bool) -> bool:
if os.altsep:
file_path = file_path.replace(os.sep, os.altsep)
openocd_launch_params = ["openocd"]
self._add_file(openocd_launch_params, self.interface.config_file)
if self.serial:
self._add_serial(openocd_launch_params, self.serial)
if self.interface.additional_args:
for additional_arg in self.interface.additional_args:
self._add_command(openocd_launch_params, additional_arg)
self._add_file(openocd_launch_params, "target/stm32wbx.cfg")
self._add_command(openocd_launch_params, "init")
program_params = [
"program",
f'"{file_path}"',
"verify" if do_verify else "",
"reset",
"exit",
"0x8000000" if file_path.endswith(".bin") else "",
]
self._add_command(openocd_launch_params, " ".join(program_params))
# join the list of parameters into a string, but add quote if there are spaces
openocd_launch_params_string = " ".join(
[f'"{p}"' if " " in p else p for p in openocd_launch_params]
)
self.logger.debug(f"Launching: {openocd_launch_params_string}")
process = self._spawn_and_await(openocd_launch_params, True)
success = process.returncode == 0
if not success:
self.logger.error("OpenOCD failed to flash")
if process.stdout:
self.logger.error(process.stdout.read().decode("utf-8").strip())
return success
def probe(self) -> bool:
openocd_launch_params = ["openocd"]
self._add_file(openocd_launch_params, self.interface.config_file)
if self.serial:
self._add_serial(openocd_launch_params, self.serial)
if self.interface.additional_args:
for additional_arg in self.interface.additional_args:
self._add_command(openocd_launch_params, additional_arg)
self._add_file(openocd_launch_params, "target/stm32wbx.cfg")
self._add_command(openocd_launch_params, "init")
self._add_command(openocd_launch_params, "exit")
process = self._spawn_and_await(openocd_launch_params)
success = process.returncode == 0
output = process.stdout.read().decode("utf-8").strip() if process.stdout else ""
self.logger.debug(output)
# Find target voltage using regex
if match := re.search(r"Target voltage: (\d+\.\d+)", output):
voltage = float(match.group(1))
if not success:
if voltage < 1:
self.logger.warning(
f"Found {self.get_name()}, but device is not connected"
)
else:
self.logger.warning(
f"Device is connected, but {self.get_name()} failed to attach. Is System>Debug enabled?"
)
if "cannot read IDR" in output:
self.logger.warning(
f"Found {self.get_name()}, but failed to attach. Is device connected and is System>Debug enabled?"
)
success = False
return success
def get_name(self) -> str:
return self.interface.name
def blackmagic_find_serial(serial: str):
import serial.tools.list_ports as list_ports
if serial and os.name == "nt":
if not serial.startswith("\\\\.\\"):
serial = f"\\\\.\\{serial}"
# idk why, but python thinks that list_ports.grep returns tuple[str, str, str]
ports: list[ListPortInfo] = list(list_ports.grep("blackmagic")) # type: ignore
if len(ports) == 0:
return None
elif len(ports) > 2:
if serial:
ports = list(
filter(
lambda p: p.serial_number == serial
or p.name == serial
or p.device == serial,
ports,
)
)
if len(ports) == 0:
return None
if len(ports) > 2:
raise Exception("More than one Blackmagic probe found")
# If you're getting any issues with auto lookup, uncomment this
# print("\n".join([f"{p.device} {vars(p)}" for p in ports]))
port = sorted(ports, key=lambda p: f"{p.location}_{p.name}")[0]
if serial:
if (
serial != port.serial_number
and serial != port.name
and serial != port.device
):
return None
if os.name == "nt":
port.device = f"\\\\.\\{port.device}"
return port.device
def _resolve_hostname(hostname):
try:
return socket.gethostbyname(hostname)
except socket.gaierror:
return None
def blackmagic_find_networked(serial: str):
if not serial or serial == "auto":
serial = "blackmagic.local"
# remove the tcp: prefix if it's there
if serial.startswith("tcp:"):
serial = serial[4:]
# remove the port if it's there
if ":" in serial:
serial = serial.split(":")[0]
if not (probe := _resolve_hostname(serial)):
return None
return f"tcp:{probe}:2345"
class BlackmagicProgrammer(Programmer):
def __init__(
self,
port_resolver, # typing.Callable[typing.Union[str, None], typing.Optional[str]]
name: str,
):
self.port_resolver = port_resolver
self.name = name
self.logger = self.root_logger.getChild(f"Blackmagic{name}")
self.port: typing.Optional[str] = None
def _add_command(self, params: list[str], command: str):
params.append("-ex")
params.append(command)
def _valid_ip(self, address):
try:
socket.inet_aton(address)
return True
except Exception:
return False
def set_serial(self, serial: str):
if self._valid_ip(serial):
self.port = f"{serial}:2345"
elif ip := _resolve_hostname(serial):
self.port = f"{ip}:2345"
else:
self.port = serial
def _get_gdb_core_params(self) -> list[str]:
gdb_launch_params = ["arm-none-eabi-gdb"]
self._add_command(gdb_launch_params, f"target extended-remote {self.port}")
self._add_command(gdb_launch_params, "set pagination off")
self._add_command(gdb_launch_params, "set confirm off")
self._add_command(gdb_launch_params, "monitor swdp_scan")
return gdb_launch_params
def flash(self, file_path: str, do_verify: bool) -> bool:
if not self.port:
if not self.probe():
return False
# We can convert .bin to .elf with objcopy:
# arm-none-eabi-objcopy -I binary -O elf32-littlearm --change-section-address=.data=0x8000000 -B arm -S app.bin app.elf
# But I choose to use the .elf file directly because we are flashing our own firmware and it always has an elf predecessor.
if file_path.endswith(".bin"):
file_path = file_path[:-4] + ".elf"
if not os.path.exists(file_path):
self.logger.error(
f"Sorry, but Blackmagic can't flash .bin file, and {file_path} doesn't exist"
)
return False
# arm-none-eabi-gdb build/f7-firmware-D/firmware.bin
# -ex 'set pagination off'
# -ex 'target extended-remote /dev/cu.usbmodem21201'
# -ex 'set confirm off'
# -ex 'monitor swdp_scan'
# -ex 'attach 1'
# -ex 'set mem inaccessible-by-default off'
# -ex 'load'
# -ex 'compare-sections'
# -ex 'quit'
gdb_launch_params = self._get_gdb_core_params()
self._add_command(gdb_launch_params, "attach 1")
self._add_command(gdb_launch_params, "set mem inaccessible-by-default off")
self._add_command(gdb_launch_params, "load")
if do_verify:
self._add_command(gdb_launch_params, "compare-sections")
self._add_command(gdb_launch_params, "quit")
gdb_launch_params.append(file_path)
process = self._spawn_and_await(gdb_launch_params, True)
if not process.stdout:
return False
output = process.stdout.read().decode("utf-8").strip()
flashed = (
"Loading section .text," in output
and "MIS-MATCHED!" not in output
and "target image does not match the loaded file" not in output
)
if not flashed:
self.logger.error("Blackmagic failed to flash")
self.logger.error(output)
return flashed
def probe(self) -> bool:
if not (port := self.port_resolver(self.port)):
return False
self.port = port
gdb_launch_params = self._get_gdb_core_params()
self._add_command(gdb_launch_params, "quit")
process = self._spawn_and_await(gdb_launch_params)
if not process.stdout or process.returncode != 0:
return False
output = process.stdout.read().decode("utf-8").strip()
if "SW-DP scan failed!" in output:
self.logger.warning(
f"Found {self.get_name()} at {self.port}, but failed to attach. Is device connected and is System>Debug enabled?"
)
return False
return True
def get_name(self) -> str:
return self.name
####################
local_flash_interfaces: list[Programmer] = [
OpenOCDProgrammer(
OpenOCDInterface(
"cmsis-dap",
"interface/cmsis-dap.cfg",
"cmsis_dap_serial",
["transport select swd"],
),
),
OpenOCDProgrammer(
OpenOCDInterface(
"stlink",
"interface/stlink.cfg",
"hla_serial",
["transport select hla_swd"],
),
),
BlackmagicProgrammer(blackmagic_find_serial, "blackmagic_usb"),
]
network_flash_interfaces: list[Programmer] = [
BlackmagicProgrammer(blackmagic_find_networked, "blackmagic_wifi")
]
all_flash_interfaces = [*local_flash_interfaces, *network_flash_interfaces]
####################
class Main(App):
AUTO_INTERFACE = "auto"
def init(self):
Programmer.root_logger = self.logger
self.parser.add_argument(
"filename",
type=str,
help="File to flash",
)
self.parser.add_argument(
"--verify",
"-v",
action="store_true",
help="Verify flash after programming",
default=False,
)
self.parser.add_argument(
"--interface",
choices=(
self.AUTO_INTERFACE,
*[i.get_name() for i in all_flash_interfaces],
),
type=str,
default=self.AUTO_INTERFACE,
help="Interface to use",
)
self.parser.add_argument(
"--serial",
type=str,
default=self.AUTO_INTERFACE,
help="Serial number or port of the programmer",
)
self.parser.set_defaults(func=self.flash)
def _search_interface(self, interface_list: list[Programmer]) -> list[Programmer]:
found_programmers = []
for p in interface_list:
name = p.get_name()
if (serial := self.args.serial) != self.AUTO_INTERFACE:
p.set_serial(serial)
self.logger.debug(f"Trying {name} with {serial}")
else:
self.logger.debug(f"Trying {name}")
if p.probe():
self.logger.debug(f"Found {name}")
found_programmers.append(p)
else:
self.logger.debug(f"Failed to probe {name}")
return found_programmers
def flash(self):
start_time = time.time()
file_path = os.path.abspath(self.args.filename)
if not os.path.exists(file_path):
self.logger.error(f"Binary file not found: {file_path}")
return 1
if self.args.interface != self.AUTO_INTERFACE:
available_interfaces = list(
filter(
lambda p: p.get_name() == self.args.interface,
all_flash_interfaces,
)
)
else:
self.logger.info("Probing for local interfaces...")
available_interfaces = self._search_interface(local_flash_interfaces)
if not available_interfaces:
# Probe network blackmagic
self.logger.info("Probing for network interfaces...")
available_interfaces = self._search_interface(network_flash_interfaces)
if not available_interfaces:
self.logger.error("No availiable interfaces")
return 1
elif len(available_interfaces) > 1:
self.logger.error("Multiple interfaces found:")
self.logger.error(
f"Please specify '--interface={[i.get_name() for i in available_interfaces]}'"
)
return 1
interface = available_interfaces.pop(0)
if self.args.serial != self.AUTO_INTERFACE:
interface.set_serial(self.args.serial)
self.logger.info(f"Using {interface.get_name()} with {self.args.serial}")
else:
self.logger.info(f"Using {interface.get_name()}")
self.logger.info(f"Flashing {file_path}")
if not interface.flash(file_path, self.args.verify):
self.logger.error(f"Failed to flash via {interface.get_name()}")
return 1
flash_time = time.time() - start_time
self.logger.info(f"Flashed successfully in {flash_time:.2f}s")
if file_path.endswith(".bin"):
bin_size = os.path.getsize(file_path)
self.logger.info(
f"Effective speed: {bin_size / flash_time / 1024:.2f} KiB/s"
)
return 0
if __name__ == "__main__":
Main()()