Fix various issues in cmds.py

This commit is contained in:
Florian Magin
2020-03-07 15:09:03 +01:00
parent d737068304
commit bc3d52f00e
3 changed files with 125 additions and 103 deletions
+120 -102
View File
@@ -25,6 +25,7 @@
from __future__ import print_function
import binascii
import re
from builtins import str
from builtins import hex
@@ -45,25 +46,23 @@ import json
from pwnlib.context import context
from pwnlib.asm import disasm, asm
from pwnlib.exception import PwnlibException
from pwnlib.log import Progress
from pwnlib.ui import yesno
from pwnlib.util.fiddling import isprint
from internalblue.utils.pwnlib_wrapper import log, flat, read, p8, p32, u32, p16
from internalblue.utils import bytes_to_hex
try:
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Type
if TYPE_CHECKING:
from internalblue.core import InternalBlue
from internalblue.hci import HCI
from internalblue import Record, BluetoothAddress, Address
except:
pass
from internalblue.hci import HCI_COMND
def getCmdList():
# type: () -> List[Type['Cmd']]
from typing import List, Optional, Any, TYPE_CHECKING, Type, cast
if TYPE_CHECKING:
from internalblue.core import InternalBlue
from internalblue import Record, BluetoothAddress, Address
def getCmdList() -> List[Type["Cmd"]]:
""" Returns a list of all commands which are defined in this cmds.py file.
This is done by searching for all subclasses of Cmd
"""
@@ -130,12 +129,14 @@ class Cmd(object):
command should be located in the work() method.
"""
keywords = [] # type: List[str]
description: str
parser: argparse.ArgumentParser
keywords: List[str] = []
aborted: bool
progress_log: Optional[Progress]
memory_image: Optional[bytes] = None
memory_image = None # type: Optional[bytes]
def __init__(self, cmdline, internalblue):
# type: (str, InternalBlue) -> None
def __init__(self, cmdline: str, internalblue: InternalBlue) -> None:
self.cmdline = cmdline
self.internalblue = internalblue
self.memory_image_template_filename = (
@@ -150,21 +151,17 @@ class Cmd(object):
)
def __str__(self):
# type: () -> str
return self.cmdline
def work(self):
# type: () -> bool
def work(self) -> bool:
return True
def abort_cmd(self):
# type: () -> None
def abort_cmd(self) -> None:
self.aborted = True
if hasattr(self, "progress_log"):
self.progress_log.failure("Command aborted")
def getArgs(self):
# type: () -> Any
def getArgs(self) -> Optional[argparse.Namespace]:
try:
return self.parser.parse_args(self.cmdline.split(" ")[1:])
except SystemExit:
@@ -181,7 +178,7 @@ class Cmd(object):
):
continue
if address >= section.start_addr and address <= section.end_addr:
if section.start_addr <= address <= section.end_addr:
if address + length <= section.end_addr:
return True
else:
@@ -296,7 +293,7 @@ class CmdHelp(Cmd):
command_list = getCmdList()
if len(args) > 1:
cmd = findCmd(args[1])
if cmd == None:
if cmd is None:
log.info("No command with the name: " + args[1])
return True
if hasattr(cmd, "parser"):
@@ -358,7 +355,7 @@ class CmdLogLevel(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
loglevel = args.level
if loglevel.upper() in self.log_levels:
@@ -386,13 +383,14 @@ class CmdMonitor(Cmd):
@staticmethod
def getMonitorController(internalblue):
if CmdMonitor.MonitorController.instance == None:
if CmdMonitor.MonitorController.instance is None:
# Encapsulation type: Bluetooth H4 with linux header (99) None:
CmdMonitor.MonitorController.instance = CmdMonitor.MonitorController.__MonitorController(
internalblue, 0xC9
)
return CmdMonitor.MonitorController.instance
# noinspection PyPep8Naming,SpellCheckingInspection
class __MonitorController(object):
def __init__(self, internalblue, pcap_data_link_type):
self.internalblue = internalblue
@@ -457,7 +455,7 @@ class CmdMonitor(Cmd):
return True
def _pollTimer(self):
if self.running and self.wireshark_process != None:
if self.running and self.wireshark_process is not None:
if self.wireshark_process.poll() == 0:
# Process has ended
log.debug("_pollTimer: Wireshark has terminated")
@@ -473,7 +471,7 @@ class CmdMonitor(Cmd):
log.warn("HCI Monitor already running!")
return False
if self.wireshark_process == None:
if self.wireshark_process is None:
if not self._spawnWireshark():
log.info("Unable to start HCI Monitor.")
return False
@@ -501,10 +499,10 @@ class CmdMonitor(Cmd):
def killMonitor(self):
if self.running:
self.stopMonitor()
if self.poll_timer != None:
if self.poll_timer is not None:
self.poll_timer.cancel()
self.poll_timer = None
if self.wireshark_process != None:
if self.wireshark_process is not None:
log.info("Killing Wireshark process...")
try:
self.wireshark_process.terminate()
@@ -517,7 +515,7 @@ class CmdMonitor(Cmd):
# type: (Record) -> None
hcipkt, orig_len, inc_len, flags, drops, recvtime = record
dummy = "\x00\x00\x00" # TODO: Figure out purpose of these fields
dummy = b"\x00\x00\x00" # TODO: Figure out purpose of these fields
direction = p8(flags & 0x01)
packet = dummy + direction + hcipkt.getRaw()
length = len(packet)
@@ -533,7 +531,9 @@ class CmdMonitor(Cmd):
self.wireshark_process.stdin.flush()
log.debug("HciMonitorController._callback: done")
except IOError as e:
log.warn("HciMonitorController._callback: broken pipe. terminate.")
log.warn(
"HciMonitorController._callback: broken pipe. terminate." f"{e}"
)
self.killMonitor()
def work(self):
@@ -585,7 +585,7 @@ class CmdRepeat(Cmd):
repcmdline = " ".join(args[2:])
cmdclass = findCmd(args[2])
if cmdclass == None:
if cmdclass is None:
log.warn("Unknown command: " + args[2])
return False
@@ -712,7 +712,7 @@ class CmdSearchMem(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
pattern = " ".join(args.pattern)
@@ -765,7 +765,7 @@ class CmdHexdump(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
# if not self.isAddressInSections(args.address, args.length):
@@ -779,7 +779,7 @@ class CmdHexdump(Cmd):
else:
dump = self.readMem(args.address, args.length)
if dump == None:
if dump is None:
return False
log.hexdump(bytes(dump), begin=args.address)
@@ -832,7 +832,7 @@ class CmdTelescope(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
if not self.isAddressInSections(args.address, args.length):
@@ -844,7 +844,7 @@ class CmdTelescope(Cmd):
return False
dump = self.readMem(args.address, args.length + 4)
if dump == None:
if dump is None:
return False
for index in range(0, len(dump) - 4, 4):
@@ -877,7 +877,7 @@ class CmdDisasm(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
if not self.isAddressInSections(args.address, args.length):
@@ -890,11 +890,17 @@ class CmdDisasm(Cmd):
dump = self.readMem(args.address, args.length)
if dump == None:
if dump is None:
return False
print(disasm(dump, vma=args.address))
return True
else:
# PyCharm thinks disasm wants a str and not bytes
# so until pwnlibs gets type annotations we just trick the type checker to to prevent a false positive
if TYPE_CHECKING:
d = str(dump)
else:
d = dump
print(disasm(d, vma=args.address)) # type: ignore
return True
class CmdWriteMem(Cmd):
@@ -932,10 +938,10 @@ class CmdWriteMem(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
if args.file != None:
if args.file is not None:
data = read(args.file)
elif len(args.data) > 0:
data = " ".join(args.data)
@@ -997,10 +1003,10 @@ class CmdWriteAsm(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
if args.file != None:
if args.file is not None:
if not os.path.exists(args.file):
f = open(args.file, "w")
f.write("/* Write arm thumb code here.\n")
@@ -1091,7 +1097,7 @@ class CmdExec(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
filename = self.internalblue.data_directory + "/exec_%s.s" % args.cmd
@@ -1178,7 +1184,7 @@ class CmdSendHciCmd(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
if args.cmdcode > 0xFFFF:
@@ -1194,8 +1200,6 @@ class CmdSendHciCmd(Cmd):
return self.internalblue.sendHciCommand(args.cmdcode, data)
return True
class CmdPatch(Cmd):
keywords = ["patch"]
@@ -1233,10 +1237,10 @@ class CmdPatch(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
if args.slot != None:
if args.slot is not None:
if (
args.slot < 0
or args.slot > self.internalblue.fw.PATCHRAM_NUMBER_OF_SLOTS
@@ -1249,31 +1253,34 @@ class CmdPatch(Cmd):
# Patch Deletion
if args.delete:
if args.slot != None:
if args.slot is not None:
log.info("Deleting patch in slot %d..." % args.slot)
elif args.address != None:
elif args.address is not None:
log.info("Deleting patch at address 0x%x..." % args.address)
else:
log.warn("Address or Slot number required!")
return False
return self.internalblue.disableRomPatch(args.address, args.slot)
if args.address == None:
if args.address is None:
log.warn("Address is required!")
return False
if len(args.data) > 0:
data = " ".join(args.data)
argument_data = " ".join(args.data)
if args.hex:
try:
data = data.decode("hex")
data = binascii.unhexlify(argument_data)
except TypeError as e:
log.warn("Data string cannot be converted to hexstring: " + str(e))
return False
elif args.int:
data = p32(auto_int(data))
data = p32(auto_int(argument_data))
elif args.asm:
data = asm(data, vma=args.address)
data = asm(argument_data, vma=args.address)
else:
log.warning("--hex, --int or --asm are required")
return
else:
self.parser.print_usage()
print("Data is required!")
@@ -1284,9 +1291,9 @@ class CmdPatch(Cmd):
data = data[0:4]
if len(data) < 4:
log.warn("Data size is %d bytes. 0-Padding to 4 byte!" % len(data))
data = data.ljust(4, "\x00")
data = data.ljust(4, b"\x00")
if args.address != None and not self.isAddressInSections(
if args.address is not None and not self.isAddressInSections(
args.address, len(data), sectiontype="ROM"
):
answer = yesno(
@@ -1332,18 +1339,20 @@ class CmdSendLmp(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
# initially assume we are master
is_master = True
# automatically get the first valid connection handle if not set
if args.conn_handle == None:
if args.conn_handle is None:
if hasattr(self.internalblue.fw, "CONNECTION_MAX"):
for i in range(self.internalblue.fw.CONNECTION_MAX):
connection = self.internalblue.readConnectionInformation(i + 1)
if connection == None:
connection = self.internalblue.readConnectionInformation(
cast("ConnectionNumber", i + 1)
)
if connection is None:
continue
if (
connection.connection_handle != 0
@@ -1354,7 +1363,7 @@ class CmdSendLmp(Cmd):
break
# if still not set, typical connection handles seem to be 0x0b...0x0d
if args.conn_handle == None:
if args.conn_handle is None:
args.conn_handle = 0x0C
# arguments override auto detection
@@ -1363,10 +1372,8 @@ class CmdSendLmp(Cmd):
if args.master:
is_master = True
data = None
try:
data = args.data.decode("hex")
data = binascii.unhexlify(args.data)
except TypeError as e:
log.warn("Data string cannot be converted to hexstring: " + str(e))
return False
@@ -1428,7 +1435,9 @@ class CmdSendLcp(Cmd):
"Sending data=%s to connection index=0x%04x"
% (data.encode("hex"), args.conn_index)
)
return self.internalblue.sendLcpPacket(args.conn_index, data)
return self.internalblue.sendLcpPacket(
cast("ConnectionIndex", args.conn_index), data
)
class CmdInfo(Cmd):
@@ -1462,8 +1471,10 @@ class CmdInfo(Cmd):
return False
for i in range(self.internalblue.fw.CONNECTION_MAX):
connection = self.internalblue.readConnectionInformation(i + 1)
if connection == None:
connection = self.internalblue.readConnectionInformation(
cast("ConnectionNumber", i + 1)
)
if connection is None:
continue
log.info("### | Connection ---%02d--- ###" % i)
@@ -1580,9 +1591,9 @@ class CmdInfo(Cmd):
# TODO: waitlist
marker_str = "> "
if bloc_address != None and heappool["address"] == bloc_address:
if bloc_address is not None and heappool["address"] == bloc_address:
bloc_for_details = heappool
elif bloc_index != None and heappool["index"] == bloc_index:
elif bloc_index is not None and heappool["index"] == bloc_index:
bloc_for_details = heappool
else:
marker_str = " "
@@ -1622,7 +1633,7 @@ class CmdInfo(Cmd):
log.info("")
# Print Bloc Buffer Details
if bloc_for_details == None:
if bloc_for_details is None:
progress_log.success("done")
return True
@@ -1684,16 +1695,17 @@ class CmdInfo(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
subcommands = {}
subcommands["connections"] = self.infoConnections
subcommands["device"] = self.infoDevice
subcommands["patchram"] = self.infoPatchram
subcommands["heap"] = self.infoHeap
subcommands["bloc"] = self.infoHeap
subcommands["queue"] = self.infoQueue
subcommands = {
"connections": self.infoConnections,
"device": self.infoDevice,
"patchram": self.infoPatchram,
"heap": self.infoHeap,
"bloc": self.infoHeap,
"queue": self.infoQueue,
}
if args.type in subcommands:
return subcommands[args.type](args.args)
@@ -1720,11 +1732,11 @@ class CmdTracepoint(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
if args.command in ["add", "set"]:
if args.address == None:
if args.address is None:
log.warn("Missing address. Use tracepoint add <address>")
return False
log.info("Inserting tracepoint at 0x%x..." % args.address)
@@ -1734,7 +1746,7 @@ class CmdTracepoint(Cmd):
return False
elif args.command in ["remove", "delete", "del"]:
if args.address == None:
if args.address is None:
log.warn("Missing address. Use tracepoint del <address>")
return False
log.info("Deleting tracepoint at 0x%x..." % args.address)
@@ -1768,7 +1780,7 @@ class CmdBreakpoint(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
log.info("Inserting breakpoint at 0x%x..." % args.address)
@@ -1823,7 +1835,7 @@ class CmdConnectLeCmd(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
addr = parse_bt_addr(args.btaddr)
@@ -1862,8 +1874,10 @@ class CmdCustom(Cmd):
try:
with open(file, "r") as reader:
custom_commands = json.loads(reader.read())
except:
log.critical("Something went wrong with loading custom commands!")
except Exception as e:
log.critical(
"Encountered an error while trying to load custom commands!" f"{e}"
)
@staticmethod
def save(custom_commands):
@@ -1873,7 +1887,7 @@ class CmdCustom(Cmd):
def work(self):
args = self.getArgs()
if args == None:
if args is None:
return True
if args.do == "list":
@@ -1914,7 +1928,7 @@ class CmdCustom(Cmd):
matching_cmd = findCmd(cmd.split(" ")[0])
if matching_cmd == None:
if matching_cmd is None:
log.warn("Command unknown: " + cmd)
return False
@@ -1932,7 +1946,7 @@ class CmdCustom(Cmd):
return True
if args.do == "remove":
if not args.alias in CmdCustom.custom_commands:
if args.alias not in CmdCustom.custom_commands:
log.info("Custom command not found: " + args.alias)
return False
@@ -1963,12 +1977,14 @@ class CmdReadAfhChannelMap(Cmd):
def work(self):
args = self.getArgs()
if args == None or args.conn_handle == None:
if args is None or args.conn_handle is None:
# automatically get all connection handles if not set
if hasattr(self.internalblue.fw, "CONNECTION_MAX"):
for i in range(self.internalblue.fw.CONNECTION_MAX):
connection = self.internalblue.readConnectionInformation(i + 1)
if connection == None:
connection = self.internalblue.readConnectionInformation(
cast("ConnectionNumber", i + 1)
)
if connection is None:
continue
else:
self.readafh(connection.connection_handle)
@@ -1982,7 +1998,9 @@ class CmdReadAfhChannelMap(Cmd):
def readafh(self, handle):
""" This is a standard HCI command but might be useful when playing around with the physical layer.
"""
response = self.internalblue.sendHciCommand(0x1406, p16(handle))
response = self.internalblue.sendHciCommand(
HCI_COMND.Read_AFH_Channel_Map, p16(handle)
)
if len(response) < 17 or response[8:] == b"\x00" * 9:
log.info("Connection 0x%04x is not established." % handle)
@@ -2048,12 +2066,12 @@ class CmdSendDiagCmd(Cmd):
if not args or not args.data:
return True
data = ""
data = b""
for data_part in args.data:
if data_part[0:2] == "0x":
data += p32(auto_int(data_part))
else:
data += data_part.decode("hex")
data += binascii.unhexlify(data_part)
self.internalblue.sendH4(args.type, data)
+4
View File
@@ -50,6 +50,10 @@ class MemorySection(object):
class FirmwareDefinition:
DEVICE_NAME: Address
BD_ADDR: Address
SECTIONS: List[MemorySection]
TRACEPOINT_BODY_ASM_SNIPPET: str
TRACEPOINT_HOOKS_LOCATION: int
@@ -9,7 +9,7 @@ class ConnectionInformation(object):
master_of_connection = False
remote_name_address = 0
remote_address = None
id = None
id: bytearray
public_rand = None
extended_lmp_feat = None
link_key = None