Fix various issues in core.py

This commit is contained in:
Florian Magin
2020-03-07 15:23:43 +01:00
parent bc3d52f00e
commit ca070290c5
3 changed files with 45 additions and 43 deletions
+1 -1
View File
@@ -136,7 +136,7 @@ class Cmd(object):
progress_log: Optional[Progress]
memory_image: Optional[bytes] = None
def __init__(self, cmdline: str, internalblue: InternalBlue) -> None:
def __init__(self, cmdline: str, internalblue: 'InternalBlue') -> None:
self.cmdline = cmdline
self.internalblue = internalblue
self.memory_image_template_filename = (
+42 -41
View File
@@ -36,7 +36,7 @@ import pwnlib
from pwnlib.asm import asm
from pwnlib.exception import PwnlibException
from pwnlib.util.fiddling import bits, unbits
from .utils.pwnlib_wrapper import p16, p8, u32, u16, p32, u8, log, context, flat
from .utils.pwnlib_wrapper import p16, p8, u32, u16, p32, log, context, flat
from .fw import FirmwareDefinition
standard_library.install_aliases()
@@ -68,6 +68,7 @@ try:
Union,
NewType,
Callable,
cast,
)
from internalblue import (
Address,
@@ -97,14 +98,13 @@ except:
class InternalBlue(with_metaclass(ABCMeta, object)):
def __init__(
self,
queue_size=1000,
btsnooplog_filename="btsnoop.log",
log_level="info",
fix_binutils="True",
data_directory=".",
replay=False,
):
# type: (int, str, str, bool, str, bool) -> None
queue_size: int = 1000,
btsnooplog_filename: str = "btsnoop.log",
log_level: str = "info",
fix_binutils: bool = True,
data_directory: str = ".",
replay: bool = False,
) -> None:
context.log_level = log_level
context.log_file = data_directory + "/_internalblue.log"
context.arch = "thumb"
@@ -354,7 +354,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
out = p8(h4type) + data
# if the caller expects a response: register a queue to receive the response
if queue != None and filter_function != None:
if queue is not None and filter_function is not None:
recvQueue = queue2k.Queue(1)
self.registerHciRecvQueue(recvQueue, filter_function)
@@ -363,6 +363,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
log.debug("_sendThreadFunc: Send: " + bytes_to_hex(out))
self.s_inject.send(out)
except socket.error:
# TODO: For some reason this was required for proper save and replay, so this should be handled globally somehow. Or by implementing proper testing instead of the save/replay hack
pass
except socket.error as e:
log.warn(
@@ -376,7 +377,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
# if the caller expects a response:
# Wait for the HCI event response by polling the recvQueue
if queue != None and filter_function != None:
if queue is not None and filter_function is not None:
try:
record = recvQueue.get(timeout=2)
hcipkt = record[0]
@@ -451,7 +452,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
dump_address = u32(hcipkt.data[6:10])
data = hcipkt.data[10:]
if self.tracepoint_memdump_address == None:
if self.tracepoint_memdump_address is None:
self.tracepoint_memdump_address = dump_address
normalized_address = dump_address - self.tracepoint_memdump_address
self.tracepoint_memdump_parts[normalized_address] = data
@@ -743,11 +744,11 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
self.exit_requested = True
# unregister stackDumpReceiver callback:
if self.stackDumpReceiver != None:
if self.stackDumpReceiver is not None:
self.stackDumpReceiver = None
# unregister stackDumpReceiver callback:
if self.stackDumpReceiver != None:
if self.stackDumpReceiver is not None:
self.unregisterHciCallback(self.stackDumpReceiver.recvPacket)
# Wait until both threads have actually finished
@@ -1039,7 +1040,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
outbuffer += data
read_addr += len(data)
byte_counter += len(data)
if progress_log != None:
if progress_log is not None:
msg = "receiving data... %d / %d Bytes (%d%%)" % (
bytes_done + byte_counter,
bytes_total,
@@ -1192,20 +1193,20 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
HCI_COMND.VSC_Write_RAM,
p32(write_addr) + data[byte_counter : byte_counter + blocksize],
)
if response == None:
if response is None:
log.warn(
"writeMem: Timeout while reading response, probably need to wait longer."
)
return False
elif response[3] != 0:
log.warn(
"writeMem: Got error code %s in command complete event."
% bytes_to_hex(response[3])
"writeMem: Got error code %d in command complete event."
% response[3]
)
return False
write_addr += blocksize
byte_counter += blocksize
if progress_log != None:
if progress_log is not None:
msg = "sending data... %d / %d Bytes" % (
bytes_done + byte_counter,
bytes_total,
@@ -1343,7 +1344,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
alignment = address % 4
if alignment != 0:
log.debug("patchRom: Address 0x%x is not 4-byte aligned!" % address)
if slot != None:
if slot is not None:
log.warn(
"patchRom: Patch must be splitted into two slots, but fixed slot value was enforced. Do nothing!"
)
@@ -1375,14 +1376,14 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
self.writeMem(self.fw.PATCHRAM_VALUE_TABLE_ADDRESS + slot * 4, patch)
return True
if slot == None:
if slot is None:
# Find free slot:
for i in range(self.fw.PATCHRAM_NUMBER_OF_SLOTS):
if table_addresses[i] == None:
if table_addresses[i] is None:
slot = i
log.info("patchRom: Choosing next free slot: %d" % slot)
break
if slot == None:
if slot is None:
log.warn("patchRom: All slots are in use!")
return False
else:
@@ -1432,8 +1433,8 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
table_addresses, table_values, table_slots = self.getPatchramState()
if slot == None:
if address == None:
if slot is None:
if address is None:
log.warn("disableRomPatch: address is None.")
return False
for i in range(self.fw.PATCHRAM_NUMBER_OF_SLOTS):
@@ -1441,7 +1442,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
slot = i
log.info("Slot for address 0x%x is: %d" % (address, slot))
break
if slot == None:
if slot is None:
log.warn("No slot contains address: 0x%x" % address)
return False
@@ -1541,7 +1542,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
def sendLmpPacket(
self, opcode, payload="", is_master=True, conn_handle=0x0C, extended_op=False
):
# type: (Opcode, Any, bool, ConnectionNumber, bool) -> bool
# type: (Opcode, bytes, bool, ConnectionNumber, bool) -> bool
"""
Inject a LMP packet into a Bluetooth connection (i.e. send a LMP packet
to a remote device which is paired and connected with our local device).
@@ -1570,8 +1571,8 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
return False
# must be string...
if payload == None:
payload = ""
if payload is None:
payload = b""
if ((not extended_op) and opcode > (0xFF >> 1)) or (
extended_op and opcode > 0xFF
@@ -1591,7 +1592,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
# -> 2 bytes connection handle, 1 byte length, which means 17 bytes for opcode and payload remaining
# sendlmp --data 11223344556677889900112233445566 01 -> actually works
# always pad to 17 data bytes...
data = opcode_data + payload + "\x00" * (17 - len(opcode_data) - len(payload))
data = opcode_data + payload + b"\x00" * (17 - len(opcode_data) - len(payload))
if len(data) > 17:
log.warn(
@@ -1610,7 +1611,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
)
return False
else:
error_status = u8(result[3])
error_status = result[3]
if error_status != 0:
log.warn("sendLmpPacket: got error status 0x%02x" % error_status)
@@ -1705,7 +1706,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
# and appending the LMP packet data.
asm_code = self.fw.SENDLMP_ASM_CODE % (conn_nr) # type: str
asm_code_with_data = asm_code + "".join(
[".byte 0x%02x\n" % ord(x) for x in data.ljust(20, "\x00")]
[".byte 0x%02x\n" % x for x in data.ljust(20, b"\x00")]
)
# Assemble the snippet and write it to SENDLMP_CODE_BASE_ADDRESS
@@ -1747,7 +1748,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
# and appending the LMP packet data.
asm_code = self.fw.SENDLCP_ASM_CODE % (conn_idx, len(payload))
asm_code_with_data = asm_code + "".join(
[".byte 0x%02x\n" % ord(x) for x in payload.ljust(20, "\x00")]
[".byte 0x%02x\n" % x for x in payload.ljust(20, b"\x00")]
)
# Assemble the snippet and write it to SENDLCP_CODE_BASE_ADDRESS
@@ -1790,7 +1791,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
# TODO: expose more of the connection create parameters (instead of
# passing 0's.
self.sendHciCommand(
HCI_COMND.Create_Connection, bt_addr[::-1] + "\x00\x00\x00\x00\x00\x00\x01"
HCI_COMND.Create_Connection, bt_addr[::-1] + b"\x00\x00\x00\x00\x00\x00\x01"
)
def connectToRemoteLEDevice(self, bt_addr, addr_type=0x00):
@@ -1810,10 +1811,10 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
# passing 0's.
self.sendHciCommand(
HCI_COMND.LE_Create_Connection,
"\x60\x00\x30\x00\x00"
b"\x60\x00\x30\x00\x00"
+ p8(addr_type)
+ bt_addr[::-1]
+ "\x01\x18\x00\x28\x00\x00\x00\xd0\x07\x00\x00\x00\x00",
+ b"\x01\x18\x00\x28\x00\x00\x00\xd0\x07\x00\x00\x00\x00",
)
def connectionStatusCallback(self, record):
@@ -1826,7 +1827,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
_hcipkt = record[0]
if not issubclass(_hcipkt.__class__, hci.HCI_Event):
return
hcipkt: hci.HCI_Event = _hcipkt # get HCI Event packet
hcipkt: hci.HCI_Event = cast(hci.HCI_Event, _hcipkt) # get HCI Event packet
timestamp = record[5] # get timestamp
# Check if event is Connection Create Status Event
@@ -1837,7 +1838,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
# Check if event is Connection Create Complete Event
if hcipkt.event_code == 0x03:
status = u8(hcipkt.data[0])
status = hcipkt.data[0]
status_str = (
hex(status)
if status not in hcipkt.HCI_COMMAND_ERROR_STR
@@ -1885,7 +1886,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
return
def readHeapInformation(self):
# type: () -> Optional[Union[HeapInformation, bool]]
# type: () -> Optional[Union[List[HeapInformation], bool]]
"""
Traverses the double-linked list of BLOC structs and returns them as a
list of dictionaries. The dicts have the following fields:
@@ -1951,7 +1952,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
# Old BLOC Struct
else:
bloc_fields = struct.unpack("I" * 12, bloc_struct)
if bloc_fields[0] != u32("COLB"):
if bloc_fields[0] != u32(b"COLB"):
log.warn(
"readHeapInformation: BLOC double-linked list contains non-BLOC element. abort."
)
@@ -2032,7 +2033,7 @@ class InternalBlue(with_metaclass(ABCMeta, object)):
): # Traverse at most 100 (don't loop forever if linked-list is corrupted)
queue_struct = self.readMem(current_queue_struct_address, 0x38)
queue_fields = struct.unpack("I" * 14, queue_struct)
if queue_fields[0] != u32("UEUQ"):
if queue_fields[0] != u32(b"UEUQ"):
log.warn(
"readQueueInformation: QUEUE double-linked list contains non-QUEU element. abort."
)
+2 -1
View File
@@ -1,6 +1,7 @@
# from pwnlib.util.packing import *
from typing import Union
def bytes_to_hex(bytes):
# type: (bytearray) -> str
# type: (Union[bytes, bytearray]) -> str
return "".join(format(x, "02x") for x in bytearray(bytes))