diff --git a/internalblue/cmds.py b/internalblue/cmds.py index 251d9c6..33d7158 100644 --- a/internalblue/cmds.py +++ b/internalblue/cmds.py @@ -25,6 +25,7 @@ from __future__ import print_function +import binascii import re from builtins import str from builtins import hex @@ -45,25 +46,23 @@ import json from pwnlib.context import context from pwnlib.asm import disasm, asm from pwnlib.exception import PwnlibException +from pwnlib.log import Progress from pwnlib.ui import yesno from pwnlib.util.fiddling import isprint from internalblue.utils.pwnlib_wrapper import log, flat, read, p8, p32, u32, p16 from internalblue.utils import bytes_to_hex - -try: - from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Type - - if TYPE_CHECKING: - from internalblue.core import InternalBlue - from internalblue.hci import HCI - from internalblue import Record, BluetoothAddress, Address -except: - pass +from internalblue.hci import HCI_COMND -def getCmdList(): - # type: () -> List[Type['Cmd']] +from typing import List, Optional, Any, TYPE_CHECKING, Type, cast + +if TYPE_CHECKING: + from internalblue.core import InternalBlue + from internalblue import Record, BluetoothAddress, Address + + +def getCmdList() -> List[Type["Cmd"]]: """ Returns a list of all commands which are defined in this cmds.py file. This is done by searching for all subclasses of Cmd """ @@ -130,12 +129,14 @@ class Cmd(object): command should be located in the work() method. """ - keywords = [] # type: List[str] + description: str + parser: argparse.ArgumentParser + keywords: List[str] = [] + aborted: bool + progress_log: Optional[Progress] + memory_image: Optional[bytes] = None - memory_image = None # type: Optional[bytes] - - def __init__(self, cmdline, internalblue): - # type: (str, InternalBlue) -> None + def __init__(self, cmdline: str, internalblue: InternalBlue) -> None: self.cmdline = cmdline self.internalblue = internalblue self.memory_image_template_filename = ( @@ -150,21 +151,17 @@ class Cmd(object): ) def __str__(self): - # type: () -> str return self.cmdline - def work(self): - # type: () -> bool + def work(self) -> bool: return True - def abort_cmd(self): - # type: () -> None + def abort_cmd(self) -> None: self.aborted = True if hasattr(self, "progress_log"): self.progress_log.failure("Command aborted") - def getArgs(self): - # type: () -> Any + def getArgs(self) -> Optional[argparse.Namespace]: try: return self.parser.parse_args(self.cmdline.split(" ")[1:]) except SystemExit: @@ -181,7 +178,7 @@ class Cmd(object): ): continue - if address >= section.start_addr and address <= section.end_addr: + if section.start_addr <= address <= section.end_addr: if address + length <= section.end_addr: return True else: @@ -296,7 +293,7 @@ class CmdHelp(Cmd): command_list = getCmdList() if len(args) > 1: cmd = findCmd(args[1]) - if cmd == None: + if cmd is None: log.info("No command with the name: " + args[1]) return True if hasattr(cmd, "parser"): @@ -358,7 +355,7 @@ class CmdLogLevel(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True loglevel = args.level if loglevel.upper() in self.log_levels: @@ -386,13 +383,14 @@ class CmdMonitor(Cmd): @staticmethod def getMonitorController(internalblue): - if CmdMonitor.MonitorController.instance == None: + if CmdMonitor.MonitorController.instance is None: # Encapsulation type: Bluetooth H4 with linux header (99) None: CmdMonitor.MonitorController.instance = CmdMonitor.MonitorController.__MonitorController( internalblue, 0xC9 ) return CmdMonitor.MonitorController.instance + # noinspection PyPep8Naming,SpellCheckingInspection class __MonitorController(object): def __init__(self, internalblue, pcap_data_link_type): self.internalblue = internalblue @@ -457,7 +455,7 @@ class CmdMonitor(Cmd): return True def _pollTimer(self): - if self.running and self.wireshark_process != None: + if self.running and self.wireshark_process is not None: if self.wireshark_process.poll() == 0: # Process has ended log.debug("_pollTimer: Wireshark has terminated") @@ -473,7 +471,7 @@ class CmdMonitor(Cmd): log.warn("HCI Monitor already running!") return False - if self.wireshark_process == None: + if self.wireshark_process is None: if not self._spawnWireshark(): log.info("Unable to start HCI Monitor.") return False @@ -501,10 +499,10 @@ class CmdMonitor(Cmd): def killMonitor(self): if self.running: self.stopMonitor() - if self.poll_timer != None: + if self.poll_timer is not None: self.poll_timer.cancel() self.poll_timer = None - if self.wireshark_process != None: + if self.wireshark_process is not None: log.info("Killing Wireshark process...") try: self.wireshark_process.terminate() @@ -517,7 +515,7 @@ class CmdMonitor(Cmd): # type: (Record) -> None hcipkt, orig_len, inc_len, flags, drops, recvtime = record - dummy = "\x00\x00\x00" # TODO: Figure out purpose of these fields + dummy = b"\x00\x00\x00" # TODO: Figure out purpose of these fields direction = p8(flags & 0x01) packet = dummy + direction + hcipkt.getRaw() length = len(packet) @@ -533,7 +531,9 @@ class CmdMonitor(Cmd): self.wireshark_process.stdin.flush() log.debug("HciMonitorController._callback: done") except IOError as e: - log.warn("HciMonitorController._callback: broken pipe. terminate.") + log.warn( + "HciMonitorController._callback: broken pipe. terminate." f"{e}" + ) self.killMonitor() def work(self): @@ -585,7 +585,7 @@ class CmdRepeat(Cmd): repcmdline = " ".join(args[2:]) cmdclass = findCmd(args[2]) - if cmdclass == None: + if cmdclass is None: log.warn("Unknown command: " + args[2]) return False @@ -712,7 +712,7 @@ class CmdSearchMem(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True pattern = " ".join(args.pattern) @@ -765,7 +765,7 @@ class CmdHexdump(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True # if not self.isAddressInSections(args.address, args.length): @@ -779,7 +779,7 @@ class CmdHexdump(Cmd): else: dump = self.readMem(args.address, args.length) - if dump == None: + if dump is None: return False log.hexdump(bytes(dump), begin=args.address) @@ -832,7 +832,7 @@ class CmdTelescope(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True if not self.isAddressInSections(args.address, args.length): @@ -844,7 +844,7 @@ class CmdTelescope(Cmd): return False dump = self.readMem(args.address, args.length + 4) - if dump == None: + if dump is None: return False for index in range(0, len(dump) - 4, 4): @@ -877,7 +877,7 @@ class CmdDisasm(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True if not self.isAddressInSections(args.address, args.length): @@ -890,11 +890,17 @@ class CmdDisasm(Cmd): dump = self.readMem(args.address, args.length) - if dump == None: + if dump is None: return False - - print(disasm(dump, vma=args.address)) - return True + else: + # PyCharm thinks disasm wants a str and not bytes + # so until pwnlibs gets type annotations we just trick the type checker to to prevent a false positive + if TYPE_CHECKING: + d = str(dump) + else: + d = dump + print(disasm(d, vma=args.address)) # type: ignore + return True class CmdWriteMem(Cmd): @@ -932,10 +938,10 @@ class CmdWriteMem(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True - if args.file != None: + if args.file is not None: data = read(args.file) elif len(args.data) > 0: data = " ".join(args.data) @@ -997,10 +1003,10 @@ class CmdWriteAsm(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True - if args.file != None: + if args.file is not None: if not os.path.exists(args.file): f = open(args.file, "w") f.write("/* Write arm thumb code here.\n") @@ -1091,7 +1097,7 @@ class CmdExec(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True filename = self.internalblue.data_directory + "/exec_%s.s" % args.cmd @@ -1178,7 +1184,7 @@ class CmdSendHciCmd(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True if args.cmdcode > 0xFFFF: @@ -1194,8 +1200,6 @@ class CmdSendHciCmd(Cmd): return self.internalblue.sendHciCommand(args.cmdcode, data) - return True - class CmdPatch(Cmd): keywords = ["patch"] @@ -1233,10 +1237,10 @@ class CmdPatch(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True - if args.slot != None: + if args.slot is not None: if ( args.slot < 0 or args.slot > self.internalblue.fw.PATCHRAM_NUMBER_OF_SLOTS @@ -1249,31 +1253,34 @@ class CmdPatch(Cmd): # Patch Deletion if args.delete: - if args.slot != None: + if args.slot is not None: log.info("Deleting patch in slot %d..." % args.slot) - elif args.address != None: + elif args.address is not None: log.info("Deleting patch at address 0x%x..." % args.address) else: log.warn("Address or Slot number required!") return False return self.internalblue.disableRomPatch(args.address, args.slot) - if args.address == None: + if args.address is None: log.warn("Address is required!") return False if len(args.data) > 0: - data = " ".join(args.data) + argument_data = " ".join(args.data) if args.hex: try: - data = data.decode("hex") + data = binascii.unhexlify(argument_data) except TypeError as e: log.warn("Data string cannot be converted to hexstring: " + str(e)) return False elif args.int: - data = p32(auto_int(data)) + data = p32(auto_int(argument_data)) elif args.asm: - data = asm(data, vma=args.address) + data = asm(argument_data, vma=args.address) + else: + log.warning("--hex, --int or --asm are required") + return else: self.parser.print_usage() print("Data is required!") @@ -1284,9 +1291,9 @@ class CmdPatch(Cmd): data = data[0:4] if len(data) < 4: log.warn("Data size is %d bytes. 0-Padding to 4 byte!" % len(data)) - data = data.ljust(4, "\x00") + data = data.ljust(4, b"\x00") - if args.address != None and not self.isAddressInSections( + if args.address is not None and not self.isAddressInSections( args.address, len(data), sectiontype="ROM" ): answer = yesno( @@ -1332,18 +1339,20 @@ class CmdSendLmp(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True # initially assume we are master is_master = True # automatically get the first valid connection handle if not set - if args.conn_handle == None: + if args.conn_handle is None: if hasattr(self.internalblue.fw, "CONNECTION_MAX"): for i in range(self.internalblue.fw.CONNECTION_MAX): - connection = self.internalblue.readConnectionInformation(i + 1) - if connection == None: + connection = self.internalblue.readConnectionInformation( + cast("ConnectionNumber", i + 1) + ) + if connection is None: continue if ( connection.connection_handle != 0 @@ -1354,7 +1363,7 @@ class CmdSendLmp(Cmd): break # if still not set, typical connection handles seem to be 0x0b...0x0d - if args.conn_handle == None: + if args.conn_handle is None: args.conn_handle = 0x0C # arguments override auto detection @@ -1363,10 +1372,8 @@ class CmdSendLmp(Cmd): if args.master: is_master = True - data = None - try: - data = args.data.decode("hex") + data = binascii.unhexlify(args.data) except TypeError as e: log.warn("Data string cannot be converted to hexstring: " + str(e)) return False @@ -1428,7 +1435,9 @@ class CmdSendLcp(Cmd): "Sending data=%s to connection index=0x%04x" % (data.encode("hex"), args.conn_index) ) - return self.internalblue.sendLcpPacket(args.conn_index, data) + return self.internalblue.sendLcpPacket( + cast("ConnectionIndex", args.conn_index), data + ) class CmdInfo(Cmd): @@ -1462,8 +1471,10 @@ class CmdInfo(Cmd): return False for i in range(self.internalblue.fw.CONNECTION_MAX): - connection = self.internalblue.readConnectionInformation(i + 1) - if connection == None: + connection = self.internalblue.readConnectionInformation( + cast("ConnectionNumber", i + 1) + ) + if connection is None: continue log.info("### | Connection ---%02d--- ###" % i) @@ -1580,9 +1591,9 @@ class CmdInfo(Cmd): # TODO: waitlist marker_str = "> " - if bloc_address != None and heappool["address"] == bloc_address: + if bloc_address is not None and heappool["address"] == bloc_address: bloc_for_details = heappool - elif bloc_index != None and heappool["index"] == bloc_index: + elif bloc_index is not None and heappool["index"] == bloc_index: bloc_for_details = heappool else: marker_str = " " @@ -1622,7 +1633,7 @@ class CmdInfo(Cmd): log.info("") # Print Bloc Buffer Details - if bloc_for_details == None: + if bloc_for_details is None: progress_log.success("done") return True @@ -1684,16 +1695,17 @@ class CmdInfo(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True - subcommands = {} - subcommands["connections"] = self.infoConnections - subcommands["device"] = self.infoDevice - subcommands["patchram"] = self.infoPatchram - subcommands["heap"] = self.infoHeap - subcommands["bloc"] = self.infoHeap - subcommands["queue"] = self.infoQueue + subcommands = { + "connections": self.infoConnections, + "device": self.infoDevice, + "patchram": self.infoPatchram, + "heap": self.infoHeap, + "bloc": self.infoHeap, + "queue": self.infoQueue, + } if args.type in subcommands: return subcommands[args.type](args.args) @@ -1720,11 +1732,11 @@ class CmdTracepoint(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True if args.command in ["add", "set"]: - if args.address == None: + if args.address is None: log.warn("Missing address. Use tracepoint add
") return False log.info("Inserting tracepoint at 0x%x..." % args.address) @@ -1734,7 +1746,7 @@ class CmdTracepoint(Cmd): return False elif args.command in ["remove", "delete", "del"]: - if args.address == None: + if args.address is None: log.warn("Missing address. Use tracepoint del ") return False log.info("Deleting tracepoint at 0x%x..." % args.address) @@ -1768,7 +1780,7 @@ class CmdBreakpoint(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True log.info("Inserting breakpoint at 0x%x..." % args.address) @@ -1823,7 +1835,7 @@ class CmdConnectLeCmd(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True addr = parse_bt_addr(args.btaddr) @@ -1862,8 +1874,10 @@ class CmdCustom(Cmd): try: with open(file, "r") as reader: custom_commands = json.loads(reader.read()) - except: - log.critical("Something went wrong with loading custom commands!") + except Exception as e: + log.critical( + "Encountered an error while trying to load custom commands!" f"{e}" + ) @staticmethod def save(custom_commands): @@ -1873,7 +1887,7 @@ class CmdCustom(Cmd): def work(self): args = self.getArgs() - if args == None: + if args is None: return True if args.do == "list": @@ -1914,7 +1928,7 @@ class CmdCustom(Cmd): matching_cmd = findCmd(cmd.split(" ")[0]) - if matching_cmd == None: + if matching_cmd is None: log.warn("Command unknown: " + cmd) return False @@ -1932,7 +1946,7 @@ class CmdCustom(Cmd): return True if args.do == "remove": - if not args.alias in CmdCustom.custom_commands: + if args.alias not in CmdCustom.custom_commands: log.info("Custom command not found: " + args.alias) return False @@ -1963,12 +1977,14 @@ class CmdReadAfhChannelMap(Cmd): def work(self): args = self.getArgs() - if args == None or args.conn_handle == None: + if args is None or args.conn_handle is None: # automatically get all connection handles if not set if hasattr(self.internalblue.fw, "CONNECTION_MAX"): for i in range(self.internalblue.fw.CONNECTION_MAX): - connection = self.internalblue.readConnectionInformation(i + 1) - if connection == None: + connection = self.internalblue.readConnectionInformation( + cast("ConnectionNumber", i + 1) + ) + if connection is None: continue else: self.readafh(connection.connection_handle) @@ -1982,7 +1998,9 @@ class CmdReadAfhChannelMap(Cmd): def readafh(self, handle): """ This is a standard HCI command but might be useful when playing around with the physical layer. """ - response = self.internalblue.sendHciCommand(0x1406, p16(handle)) + response = self.internalblue.sendHciCommand( + HCI_COMND.Read_AFH_Channel_Map, p16(handle) + ) if len(response) < 17 or response[8:] == b"\x00" * 9: log.info("Connection 0x%04x is not established." % handle) @@ -2048,12 +2066,12 @@ class CmdSendDiagCmd(Cmd): if not args or not args.data: return True - data = "" + data = b"" for data_part in args.data: if data_part[0:2] == "0x": data += p32(auto_int(data_part)) else: - data += data_part.decode("hex") + data += binascii.unhexlify(data_part) self.internalblue.sendH4(args.type, data) diff --git a/internalblue/fw/fw.py b/internalblue/fw/fw.py index d80f694..b88bab5 100644 --- a/internalblue/fw/fw.py +++ b/internalblue/fw/fw.py @@ -50,6 +50,10 @@ class MemorySection(object): class FirmwareDefinition: + DEVICE_NAME: Address + + BD_ADDR: Address + SECTIONS: List[MemorySection] TRACEPOINT_BODY_ASM_SNIPPET: str TRACEPOINT_HOOKS_LOCATION: int diff --git a/internalblue/objects/connection_information.py b/internalblue/objects/connection_information.py index 829a870..072ec12 100644 --- a/internalblue/objects/connection_information.py +++ b/internalblue/objects/connection_information.py @@ -9,7 +9,7 @@ class ConnectionInformation(object): master_of_connection = False remote_name_address = 0 remote_address = None - id = None + id: bytearray public_rand = None extended_lmp_feat = None link_key = None