26 Commits

Author SHA1 Message Date
Florian Magin 7b1577fe06 Refactor hci hooking 2020-01-10 13:16:44 +01:00
Florian Magin 0fd8545d93 Fix logging in core.py and print exceptions in _sendThreadFunc in the log 2020-01-10 13:14:46 +01:00
Florian Magin f8afcaad84 Partial refactoring of hooking 2020-01-10 10:38:53 +01:00
Florian Magin ef8ed5bd7e Add recvfrom in SocketRecvHook 2020-01-10 10:38:00 +01:00
Florian Magin c225be4f9b Add lowest level HCI tracing functionality 2020-01-07 11:42:31 +01:00
Florian Magin 90af9fbde3 Fix minor None check 2020-01-06 13:48:58 +01:00
Florian Magin 79fc673617 Allow specification of decive via CLI arg 2020-01-06 13:43:27 +01:00
Florian Magin c36b8fe93d Add type annotation to device_list 2020-01-06 13:42:47 +01:00
Florian Magin ef624644b3 Add required interface methods as NotImplemented 2020-01-06 13:40:37 +01:00
Florian Magin 9766a12ece Add hci command test 2020-01-06 12:09:59 +01:00
Florian Magin 01a7decfc9 Add first testcases 2020-01-06 12:09:59 +01:00
Florian Magin 8f136d1172 Make args to internalblue_cli explicit so it can be called with custom args from python 2020-01-06 12:09:59 +01:00
Florian Magin 21d3ff3b44 Annotate testcore device_list 2020-01-06 12:09:59 +01:00
Florian Magin 4dafd0bf8f Remove usage of logging wrapper for now 2020-01-06 12:09:59 +01:00
Florian Magin ac6f8ea630 Add --overwrite to memdump for testing 2020-01-06 12:09:59 +01:00
Florian Magin a1222c897a Add logging wrapper 2020-01-06 12:09:59 +01:00
Florian Magin ce463df61b Add init command argument for testing 2020-01-06 12:09:59 +01:00
Florian Magin 0c8bf457a3 Annotations and import fixes 2020-01-06 12:09:59 +01:00
Florian Magin 28e39d6c34 Minor fixes and annotations 2020-01-06 12:03:49 +01:00
Florian Magin 88bdc3e58e Add type annotations for cmds 2020-01-06 12:03:49 +01:00
Florian Magin d190a2537a Type annotations for core.py and general type aliases 2020-01-06 12:03:49 +01:00
Davide Toldo 9377c8d17e Add write memory functionality to testcore 2020-01-06 12:03:49 +01:00
Davide Toldo c6575d904d Fix bug 2020-01-06 12:03:49 +01:00
Davide Toldo 9a16c93191 Fix bug 2020-01-06 12:03:49 +01:00
Davide Toldo a8a9736bf1 Some cleanup 2020-01-06 12:03:49 +01:00
Davide Toldo c43c4bb580 Functional startup and memory reads 2020-01-06 12:03:49 +01:00
26 changed files with 682 additions and 49 deletions
BIN
View File
Binary file not shown.
+61
View File
@@ -1 +1,62 @@
try:
from Queue import Queue
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable, Dict
if TYPE_CHECKING:
import datetime
from internalblue.hci import HCI
from internalblue.core import InternalBlue
Address = NewType("Address", int)
Record = Tuple[HCI, int, int, int, Any, datetime.datetime]
FilterFunction = Callable[[Record], bool]
ConnectionNumber = NewType("ConnectionNumber", int)
ConnectionIndex = NewType("ConnectionIndex", int)
BluetoothAddress = NewType("BluetoothAddress", bytes)
ConnectionDict = NewType("ConnectionDict", Dict[str,Any])
HeapInformation = NewType("HeapInformation", Dict[str, Any])
QueueInformation = NewType('QueueInformation', Dict[str, Any])
Opcode = NewType('Opcode', int)
HCI_CMD = NewType('HCI_CMD', int)
Task = Tuple[HCI_CMD, bytes, Queue.Queue, Callable[[Record], bool]]
Device = NewType("Device", Dict[str, Any])
"""{"dev_id": dev_id,
"dev_name": dev_name,
"dev_bdaddr": dev_bdaddr,
"dev_flags": dev_flags,
"dev_flags_str": dev_flags_str}"""
# InternalBlueCore, Device Name, SomeString
DeviceTuple = Tuple[InternalBlue, str, str]
except:
pass
import logging
class ProgressLog():
"""
Hack to get around the dependency to the pwnlib logger
This looses functionality, but at some point this can be replaced by something like progressbar2
"""
def __init__(self, init_msg,logger):
self.logger = logger
self.logger.warning(init_msg)
def failure(self, msg):
self.logger.warning(msg)
def status(self, msg):
self.logger.info(msg)
def success(self, msg):
self.logger.info(msg)
def getLogger(name):
logger = logging.getLogger(name)
logger.progress = lambda msg: ProgressLog(msg, logger)
return logger
+2 -2
View File
@@ -4,11 +4,11 @@ import datetime
import socket
import Queue
import random
import hci
from internalblue import hci
from pwn import *
from core import InternalBlue
from .core import InternalBlue
class ADBCore(InternalBlue):
+56 -13
View File
@@ -33,11 +33,20 @@ import os
import traceback
import argparse
from adbcore import ADBCore
from hcicore import HCICore
from .adbcore import ADBCore
from .hcicore import HCICore
from sys import platform
import cmds
from . import cmds
try:
import typing
from typing import List, Optional
from internalblue.core import InternalBlue
from . import DeviceTuple
except:
pass
HISTFILE = "_internalblue.hist"
@@ -53,11 +62,15 @@ type <help> for usage information!\n\n"""
for line in banner:
term.output(text.blue(line))
def commandLoop(internalblue):
def commandLoop(internalblue, init_commands=None):
cmdstack = init_commands.split(';')[::-1] if init_commands else None
while internalblue.running and not internalblue.exit_requested:
cmd_instance = None
try:
cmdline = term.readline.readline(prompt='> ').strip()
if cmdstack:
cmdline = cmdstack.pop().strip()
else:
cmdline = term.readline.readline(prompt='> ').strip()
cmdword = cmdline.split(' ')[0].split('=')[0]
if(cmdword == ''):
continue
@@ -89,7 +102,7 @@ def commandLoop(internalblue):
# Main Program Start
def internalblue_cli():
def internalblue_cli(argv):
print_banner()
parser = argparse.ArgumentParser()
@@ -97,7 +110,11 @@ def internalblue_cli():
parser.add_argument("--verbose", "-v", help="Set log level to DEBUG", action="store_true")
parser.add_argument("--ios-device", "-i", help="Tell internalblue to connect to a remote iPhone HCI socket. Specify socket IP address and port (i.e., 172.20.10.1:1234).")
parser.add_argument("--serialsu", "-s", help="On ADB, directly try su/serial/busybox scripting, if you do not have a special bluetooth.default.so file.", action="store_true")
args = parser.parse_args()
parser.add_argument("--testdevice", "-t", help="Use a dummy test device to execute testcases", action="store_true")
parser.add_argument("--trace", help="Trace hci connection")
parser.add_argument("--device", help="Specify device/core to be used")
parser.add_argument("--commands", "-c", help="CLI command to run before prompting, seperated by ';' (used for easier testing)")
args = parser.parse_args(argv)
if args.data_directory is not None:
data_directory = args.data_directory
@@ -119,13 +136,26 @@ def internalblue_cli():
readline_completer = term.completer.LongestPrefixCompleter(words=cmd_keywords)
term.readline.set_completer(readline_completer)
if args.trace:
from .socket_hooks import hook
from internalblue import socket_hooks
HookClass = getattr(socket_hooks, args.trace)
hook(HCICore, HookClass)
# Initalize cores and get devices
# As macOS has additional dependencies (objc), only import it here if needed
connection_methods = [] # type: List[InternalBlue]
if args.ios_device:
from ioscore import iOSCore
from .ioscore import iOSCore
connection_methods = [iOSCore(args.ios_device, log_level=log_level, data_directory=data_directory)]
elif args.testdevice:
from .testcore import testCore
connection_methods = [testCore(log_level=log_level, data_directory=data_directory)]
elif platform == "darwin":
from macoscore import macOSCore
from .macoscore import macOSCore
connection_methods = [
macOSCore(log_level=log_level, data_directory=data_directory),
ADBCore(log_level=log_level, data_directory=data_directory)]
@@ -134,12 +164,25 @@ def internalblue_cli():
ADBCore(log_level=log_level, data_directory=data_directory, serial=args.serialsu),
HCICore(log_level=log_level, data_directory=data_directory)]
devices = []
devices = [] # type: List[DeviceTuple]
for connection_method in connection_methods:
devices.extend(connection_method.device_list())
device = None # type: Optional[DeviceTuple]
if len(devices) > 0:
if len(devices) == 1:
if args.device:
matching_devices = [ dev for dev in devices if dev[1] == args.device]
if len(matching_devices) > 1:
log.critical("Found multiple matching devices")
exit(-1)
elif len(matching_devices) == 1:
log.info("Found device is: {}".format(matching_devices[0]))
device = matching_devices[0]
else:
log.critical("No matching devices found")
exit(-1)
elif len(devices) == 1:
device = devices[0]
else:
i = options('Please specify device:', [d[2] for d in devices], 0)
@@ -160,7 +203,7 @@ def internalblue_cli():
exit(-1)
# Enter command loop (runs until user quits)
commandLoop(reference)
commandLoop(reference, init_commands=args.commands)
# shutdown connection
reference.shutdown()
@@ -175,5 +218,5 @@ def internalblue_cli():
if __name__ == "__main__":
internalblue_cli()
internalblue_cli(sys.argv[1:])
+40 -6
View File
@@ -36,7 +36,25 @@ import time
import select
import json
from internalblue import getLogger
#log = getLogger(__name__)
try:
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Type
if TYPE_CHECKING:
from internalblue.core import InternalBlue
from internalblue.hci import HCI
from internalblue import Record, BluetoothAddress, Address
except:
pass
def getCmdList():
# type: () -> List[Type['Cmd']]
""" Returns a list of all commands which are defined in this cmds.py file.
This is done by searching for all subclasses of Cmd
"""
@@ -44,6 +62,7 @@ def getCmdList():
if inspect.isclass(obj) and issubclass(obj, Cmd)][1:]
def findCmd(keyword):
# type: (str) -> Optional[Type['Cmd']]
""" Find and return a Cmd subclass for a given keyword.
"""
command_list = getCmdList()
@@ -61,11 +80,13 @@ def auto_int(x):
return int(x, 0)
def bt_addr_to_str(bt_addr):
# type: (BluetoothAddress) -> str
""" Convert a Bluetooth address (6 bytes) into a human readable format.
"""
return ":".join([b.encode("hex") for b in bt_addr])
def parse_bt_addr(bt_addr):
# type: (Any) -> Optional[BluetoothAddress]
""" Convert Bluetooth address argument and check lengths.
"""
addr = bt_addr
@@ -92,11 +113,12 @@ class Cmd:
'keywords' list as member variable. The actual implementation of the
command should be located in the work() method.
"""
keywords = []
keywords = [] # type: List[str]
memory_image = None
memory_image = None # type: Optional[bytes]
def __init__(self, cmdline, internalblue):
# type: (str, InternalBlue) -> None
self.cmdline = cmdline
self.internalblue = internalblue
self.memory_image_template_filename = internalblue.data_directory + "/memdump__template.bin"
@@ -105,23 +127,28 @@ class Cmd:
self.internalblue.fw.__name__[6:12] + "_template.bin"
def __str__(self):
# type: () -> str
return self.cmdline
def work(self):
# type: () -> bool
return True
def abort_cmd(self):
# type: () -> None
self.aborted = True
if hasattr(self, 'progress_log'):
self.progress_log.failure("Command aborted")
def getArgs(self):
# type: () -> Any
try:
return self.parser.parse_args(self.cmdline.split(' ')[1:])
except SystemExit:
return None
def isAddressInSections(self, address, length=0, sectiontype=""):
# type: (int, int, str) -> bool
if not self.internalblue.fw:
return False
@@ -137,12 +164,15 @@ class Cmd:
return False
def readMem(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
# type: (Address, int, Optional[Any], int, int) -> Optional[bytes]
return self.internalblue.readMem(address, length, progress_log, bytes_done, bytes_total)
def writeMem(self, address, data, progress_log=None, bytes_done=0, bytes_total=0):
# type: (Address, bytes, Optional[Any], int, int) -> bool
return self.internalblue.writeMem(address, data, progress_log, bytes_done, bytes_total)
def initMemoryImage(self):
# type: () -> None
"""
Initially read out a chip's memory, all sections (RAM+ROM).
:return:
@@ -168,6 +198,7 @@ class Cmd:
self.refreshMemoryImage()
def refreshMemoryImage(self):
# type: () -> None
"""
Update an existing memory dump, only RAM sections.
:return:
@@ -178,12 +209,13 @@ class Cmd:
for section in self.internalblue.fw.SECTIONS:
if not section.is_rom:
sectiondump = self.readMem(section.start_addr, section.size(), self.progress_log, bytes_done, bytes_total)
if sectiondump:
if sectiondump and Cmd.memory_image:
Cmd.memory_image = Cmd.memory_image[0:section.start_addr] + sectiondump + Cmd.memory_image[section.end_addr:]
bytes_done += section.size()
self.progress_log.success("Received Data: complete")
def getMemoryImage(self, refresh=False):
# type: (bool) -> Any
if Cmd.memory_image is None:
self.initMemoryImage()
elif refresh:
@@ -411,6 +443,7 @@ class CmdMonitor(Cmd):
self.wireshark_process = None
def adbhciCallback(self, record):
# type: (Record) -> None
hcipkt, orig_len, inc_len, flags, drops, recvtime = record
dummy = "\x00\x00\x00" # TODO: Figure out purpose of these fields
@@ -503,6 +536,7 @@ class CmdDumpMem(Cmd):
help="Only dump the two RAM sections.")
parser.add_argument("--file", "-f", default="memdump.bin",
help="Filename of memory dump (default: %(default)s)")
parser.add_argument("--overwrite", action='store_true')
def work(self):
args = self.getArgs()
@@ -517,7 +551,7 @@ class CmdDumpMem(Cmd):
for section in filter(lambda s: s.is_ram, self.internalblue.fw.SECTIONS):
filename = args.file + "_" + hex(section.start_addr)
if os.path.exists(filename):
if not yesno("Update '%s'?" % filename):
if not (args.overwrite or yesno("Update '%s'?" % filename)):
log.info("Skipping section @%s" % hex(section.start_addr))
bytes_done += section.size()
continue
@@ -531,7 +565,7 @@ class CmdDumpMem(Cmd):
# Get complete memory image
if os.path.exists(args.file):
if not yesno("Update '%s'?" % os.path.abspath(args.file)):
if not (args.overwrite or yesno("Update '%s'?" % os.path.abspath(args.file))):
return False
dump = self.getMemoryImage(refresh=not args.norefresh)
@@ -926,7 +960,7 @@ class CmdSendHciCmd(Cmd):
else:
data += data_part.decode('hex')
self.internalblue.sendHciCommand(args.cmdcode, data)
return self.internalblue.sendHciCommand(args.cmdcode, data)
return True
+69 -11
View File
@@ -28,17 +28,31 @@
from abc import ABCMeta, abstractmethod
from pwn import *
from fw.fw import Firmware
from .fw.fw import Firmware
import datetime
import time
import Queue
import hci
from . import hci
try:
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable
from internalblue import Address, Record, Task, HCI_CMD, FilterFunction, ConnectionNumber, ConnectionDict, \
ConnectionIndex, BluetoothAddress, HeapInformation, QueueInformation, Opcode
from internalblue.hci import HCI
from . import DeviceTuple
if TYPE_CHECKING:
pass
except:
pass
#import logging
#log = logging.getLogger(__name__)
class InternalBlue:
__metaclass__ = ABCMeta
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory="."):
# type: (int, str, str, bool, str) -> None
context.log_level = log_level
context.log_file = data_directory + '/_internalblue.log'
context.arch = "thumb"
@@ -48,11 +62,11 @@ class InternalBlue:
self.data_directory = data_directory
self.s_inject = None # This is the TCP socket to the HCI inject port
self.s_snoop = None # This is the TCP socket to the HCI snoop port
self.s_inject = None #type: socket.socket # This is the TCP socket to the HCI inject port
self.s_snoop = None #type: socket.socket # This is the TCP socket to the HCI snoop port
# If btsnooplog_filename is set, write all incomming HCI packets to a file (can be viewed in wireshark for debugging)
if btsnooplog_filename != None:
if btsnooplog_filename is not None:
self.write_btsnooplog = True
self.btsnooplog_file = open(self.data_directory + "/" + btsnooplog_filename, "wb")
else:
@@ -78,7 +92,7 @@ class InternalBlue:
# firmware (the response is recognized with the help of the filter function).
# Once the response arrived, it puts the response into the response_queue from
# the tuple. See sendH4() and sendHciCommand().
self.sendQueue = Queue.Queue(queue_size)
self.sendQueue = Queue.Queue(queue_size) # type: Queue.Queue[Task]
self.recvThread = None # The thread which is responsible for the HCI snoop socket
self.sendThread = None # The thread which is responsible for the HCI inject socket
@@ -103,7 +117,7 @@ class InternalBlue:
# filter_function will be called for each packet that is received and only if it returns
# True, the packet will be put into the queue. The filter_function can be None in order
# to put all packets into the queue.
self.registeredHciRecvQueues = []
self.registeredHciRecvQueues = [] # type: List[Tuple[Queue.Queue[Record], FilterFunction]]
self.exit_requested = False # Will be set to true when the framework wants to shut down (e.g. on error or user exit)
self.running = False # 'running' is True once the connection to the HCI sockets is established
@@ -162,6 +176,7 @@ class InternalBlue:
return False
def _parse_time(self, time):
# type: (Any) -> datetime.datetime
"""
Taken from: https://github.com/joekickass/python-btsnoop
@@ -178,9 +193,11 @@ class InternalBlue:
@abstractmethod
def _recvThreadFunc(self):
# type: () -> None
pass
def _sendThreadFunc(self):
# type: () -> None
"""
This is the run-function of the sendThread. It polls the sendQueue for new 'send tasks'
and executes them (sends H4 commands to the chip and returns the response).
@@ -262,8 +279,8 @@ class InternalBlue:
try:
log.debug("_sendThreadFunc: Send: " + str(out.encode('hex')))
self.s_inject.send(out)
except:
log.warn("_sendThreadFunc: Sending to socket failed, reestablishing connection.\nWith HCI sockets, some HCI commands require root!")
except Exception as e:
log.warn("_sendThreadFunc: Sending to socket failed with {}, reestablishing connection.\nWith HCI sockets, some HCI commands require root!".format(e))
# socket are terminated by hcicore..
self._teardownSockets()
self._setupSockets()
@@ -287,6 +304,7 @@ class InternalBlue:
log.debug("Send Thread terminated.")
def _tracepointHciCallbackFunction(self, record):
# type: (Record) -> None
hcipkt = record[0] # get HCI Event packet
timestamp = record[5] # get timestamp
@@ -348,6 +366,7 @@ class InternalBlue:
def addTracepoint(self, address):
# type: (Address) -> bool
# Check if constants are defined in fw.py
for const in ['TRACEPOINT_BODY_ASM_LOCATION', 'TRACEPOINT_BODY_ASM_SNIPPET',
'TRACEPOINT_HOOK_ASM', 'TRACEPOINT_HOOKS_LOCATION',
@@ -441,6 +460,7 @@ class InternalBlue:
return True
def deleteTracepoint(self, address):
# type: (Address) -> bool
if not self.check_running():
return False
@@ -460,6 +480,7 @@ class InternalBlue:
return True
def check_running(self):
# type: () -> bool
"""
Check if the framework is running (i.e. the sockets are connected,
the recv and send threads are running and exit_requested is not True)
@@ -475,10 +496,11 @@ class InternalBlue:
@abstractmethod
def device_list(self):
# type: () -> List[DeviceTuple]
pass
def connect(self):
# type: () -> bool
if self.exit_requested:
self.shutdown()
@@ -524,6 +546,7 @@ class InternalBlue:
return True
def initialize_fimware(self):
# type: () -> bool
"""
Checks if we are running on a Broadcom chip and loads available firmware information based
on LMP subversion.
@@ -561,6 +584,7 @@ class InternalBlue:
return True
def shutdown(self):
# type: () -> None
"""
Shutdown the framework by stopping the send and recv threads. Socket shutdown
also terminates port forwarding if adb is used.
@@ -593,6 +617,7 @@ class InternalBlue:
log.info("Shutdown complete.")
def registerHciCallback(self, callback):
# type: (Callable[[Record], None ]) -> None
"""
Add a new callback function to self.registeredHciCallbacks.
The function will be called every time the recvThread receives
@@ -612,6 +637,7 @@ class InternalBlue:
self.registeredHciCallbacks.append(callback)
def unregisterHciCallback(self, callback):
# type: (Callable[[Tuple[HCI, int, int, int, Any, datetime.datetime]], None ]) -> None
"""
Remove a callback function from self.registeredHciCallbacks.
"""
@@ -622,6 +648,7 @@ class InternalBlue:
log.warn("registerHciCallback: no such callback is registered!")
def registerHciRecvQueue(self, queue, filter_function=None):
# type: (Queue.Queue[Record], FilterFunction) -> None
"""
Add a new queue to self.registeredHciRecvQueues.
The queue will be filled by the recvThread every time the thread receives
@@ -644,6 +671,7 @@ class InternalBlue:
self.registeredHciRecvQueues.append((queue, filter_function))
def unregisterHciRecvQueue(self, queue):
# type: (Queue.Queue[Tuple[HCI, int, int, int, Any, datetime]]) -> None
"""
Remove a queue from self.registeredHciRecvQueues.
"""
@@ -655,6 +683,7 @@ class InternalBlue:
log.warn("registerHciRecvQueue: no such queue is registered!")
def sendHciCommand(self, opcode, data, timeout=3):
# type: (Opcode, bytes, int) -> Optional[Any]
"""
Send an arbitrary HCI command packet by pushing a send-task into the
sendQueue. This function blocks until the response is received
@@ -674,6 +703,7 @@ class InternalBlue:
# define a filter function which recognizes the response (command complete
# or command status event).
def recvFilterFunction(record):
# type: (Record) -> bool
hcipkt = record[0]
log.debug("sendHciCommand.recvFilterFunction: got response")
@@ -703,6 +733,7 @@ class InternalBlue:
return None
def sendH4(self, h4type, data, timeout=2):
# type: (HCI_CMD, bytes, int) -> bool
"""
Send an arbitrary H4 packet by pushing a send-task into the
sendQueue. This function does not wait for a response! If you
@@ -719,6 +750,7 @@ class InternalBlue:
return False
def recvPacket(self, timeout=None):
# type: (Optional[int]) -> Optional[Record]
"""
This function polls the recvQueue for the next available HCI
packet and returns it. The function checks whether it is called
@@ -743,6 +775,7 @@ class InternalBlue:
return None
def readMem(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
# type: (int, int, Optional[Any], int, int) -> Optional[bytes]
"""
Reads <length> bytes from the memory space of the firmware at the given
address. Reading from unmapped memory or certain memory-mapped-IO areas
@@ -827,6 +860,7 @@ class InternalBlue:
return outbuffer
def readMemAligned(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
# type: (int, int, Optional[Any], int, int) -> Any
"""
This is an alternative to readMem() which enforces a strictly aligned access
to the memory that is read. This is needed for e.g. the memory-mapped-IO
@@ -864,6 +898,7 @@ class InternalBlue:
recvQueue = Queue.Queue(1)
def hciFilterFunction(record):
# type: (Record) -> bool
hcipkt = record[0]
if not issubclass(hcipkt.__class__, hci.HCI_Event):
return False
@@ -922,6 +957,7 @@ class InternalBlue:
return outbuffer
def writeMem(self, address, data, progress_log=None, bytes_done=0, bytes_total=0):
# type: (int, bytes, Optional[Any], int, int) -> Optional[bool]
"""
Writes the <data> to the memory space of the firmware at the given
address.
@@ -964,6 +1000,7 @@ class InternalBlue:
return True
def launchRam(self, address):
# type: (int) -> bool
"""
Executes a function at the specified address in the context of the HCI
handler thread. The function has to comply with the calling convention.
@@ -988,6 +1025,7 @@ class InternalBlue:
return True
def getPatchramState(self):
# type: () -> Tuple[List[Optional[int]], List[Any], List[Any]]
"""
Retrieves the current state of the patchram unit. The return value
is a tuple containing 3 lists which are indexed by the slot number:
@@ -1033,6 +1071,7 @@ class InternalBlue:
return (table_addresses, table_values, slot_bits)
def patchRom(self, address, patch, slot=None):
# type: (int, Any, Optional[Any]) -> bool
"""
Patch a 4-byte value (DWORD) inside the ROM section of the firmware
(0x0 - 0x8FFFF) using the patchram mechanism. There are 128 available
@@ -1114,6 +1153,7 @@ class InternalBlue:
return True
def disableRomPatch(self, address, slot=None):
# type: (int, Optional[int]) -> bool
"""
Disable a patchram slot (see also patchRom()). The slot can either be
specified by the target address (address that was patched) or by providing
@@ -1156,6 +1196,7 @@ class InternalBlue:
return True
def readConnectionInformation(self, conn_number):
# type: (ConnectionNumber) -> Optional[ConnectionDict]
"""
Reads and parses a connection struct based on the connection number.
Note: The connection number is different from the connection index!
@@ -1221,6 +1262,7 @@ class InternalBlue:
return conn_dict
def sendLmpPacket(self, opcode, payload='', is_master=True, conn_handle=0x0c, extended_op=False):
# type: (Opcode, Any, bool, ConnectionNumber, bool) -> bool
"""
Inject a LMP packet into a Bluetooth connection (i.e. send a LMP packet
to a remote device which is paired and connected with our local device).
@@ -1285,6 +1327,7 @@ class InternalBlue:
return True
def fuzzLmp(self):
# type: ()-> bool
"""
Installs a patch inside the sendLmp HCI handler that allows sending arbitrary
LMP payloads. Afterwards, use sendLmpPacket as before.
@@ -1312,6 +1355,7 @@ class InternalBlue:
return True
def sendLmpPacketLegacy(self, conn_nr, opcode, payload, extended_op=False):
# type: (int, Opcode, bytes, bool) -> bool
"""
Inject a LMP packet into a Bluetooth connection (i.e. send a LMP packet
to a remote device which is paired and connected with our local device).
@@ -1348,7 +1392,7 @@ class InternalBlue:
# Prepare the assembler snippet by injecting the connection number
# and appending the LMP packet data.
asm_code = self.fw.SENDLMP_ASM_CODE % (conn_nr)
asm_code = self.fw.SENDLMP_ASM_CODE % (conn_nr) # type: str
asm_code_with_data = asm_code + ''.join([".byte 0x%02x\n" % ord(x)
for x in data.ljust(20, "\x00")])
@@ -1364,6 +1408,7 @@ class InternalBlue:
return False
def sendLcpPacket(self, conn_idx, payload):
# type: (ConnectionIndex, bytes) -> bool
"""
Inject a LCP packet into a Bluetooth LE connection (i.e. send a LCP packet
to a remote device which is paired and connected with our local device).
@@ -1400,6 +1445,7 @@ class InternalBlue:
return False
def connectToRemoteDevice(self, bt_addr):
# type: (BluetoothAddress) -> None
"""
Send a HCI Connect Command to the firmware. This will setup
a connection (inserted into the connection structure) if the
@@ -1427,6 +1473,7 @@ class InternalBlue:
self.sendHciCommand(0x0405, bt_addr[::-1] + '\x00\x00\x00\x00\x00\x00\x01')
def connectToRemoteLEDevice(self, bt_addr, addr_type=0x00):
# type: (BluetoothAddress, int) -> None
"""
Send a HCI LE Create Connection Command to the firmware as
defined in the Bluetooth Core Specification 5.0 p. 1266.
@@ -1443,6 +1490,7 @@ class InternalBlue:
self.sendHciCommand(0x200d, '\x60\x00\x30\x00\x00' + p8(addr_type) + bt_addr[::-1] + '\x01\x18\x00\x28\x00\x00\x00\xd0\x07\x00\x00\x00\x00')
def connectionStatusCallback(self, record):
# type: (Record) -> None
"""
HCI Callback function to detect HCI Events related to
Create Connection
@@ -1475,6 +1523,7 @@ class InternalBlue:
log.info("[Disconnect Complete: Handle=0x%x]" % (conn_handle))
def coexStatusCallback(self, record):
# type: (Record) -> None
"""
Coexistence Callback Function
Interprets debug counters for coexistence with WiFi/LTE
@@ -1499,6 +1548,7 @@ class InternalBlue:
return
def readHeapInformation(self):
# type: () -> Optional[Union[HeapInformation, bool]]
"""
Traverses the double-linked list of BLOC structs and returns them as a
list of dictionaries. The dicts have the following fields:
@@ -1592,6 +1642,7 @@ class InternalBlue:
def readQueueInformation(self):
# type: () -> Optional[Union[bool, QueueInformation]]
"""
Traverses the double-linked list of QUEUE structs and returns them as a
list of dictionaries. The dicts have the following fields:
@@ -1655,6 +1706,7 @@ class InternalBlue:
return queuelist
def enableBroadcomDiagnosticLogging(self, enable):
# type: (bool) -> None
"""
Broadcom implemented their own H4 layer protocol. Normally H4 handles HCI
messages like HCI commands, SCO and ACL data, and HCI events. Their types are
@@ -1681,3 +1733,9 @@ class InternalBlue:
# itself crashes when receiving diagnostic frames...
else:
log.warn("Diagnostic protocol requires modified Android driver!")
def _setupSockets(self):
raise NotImplementedError()
def _teardownSockets(self):
raise NotImplementedError()
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# iPhone 6
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# Samsung Galaxy S8
+1 -1
View File
@@ -23,7 +23,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
FW_NAME = "BCM20702A2"
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# Evaluation Kit CYW920735
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# Evaluation Kit CYW927019
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# iPhone 8/X/XR
+1 -1
View File
@@ -22,7 +22,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
FW_NAME = "BCM2070B0 (MacBook Pro 2011)"
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# This runs on an iPhone 7
+1 -1
View File
@@ -25,7 +25,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# This runs on Nexus 5, Xperia Z3, Samsung Galaxy Note 3
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# iPhone 6
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# iPhone SE
+1 -1
View File
@@ -22,7 +22,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
FW_NAME = "default (unknown firmware)"
+1
View File
@@ -56,6 +56,7 @@ class HCI(object):
return HCI_UART_TYPE_CLASS[uart_type].from_data(data[1:])
def __init__(self, uart_type):
self.event_code = None
self.uart_type = uart_type
def getRaw(self):
+14 -5
View File
@@ -4,11 +4,19 @@ import subprocess
import datetime
from pwn import *
import fcntl
from core import InternalBlue
from .core import InternalBlue
import hci
import Queue
import threading
try:
from typing import List
from internalblue import Device
except:
pass
# from /usr/include/bluetooth/hci.h:
#define HCIDEVUP _IOW('H', 201, int)
#define HCIGETDEVLIST _IOR('H', 210, int)
@@ -34,6 +42,7 @@ class HCICore(InternalBlue):
self.doublecheck = False
def getHciDeviceList(self):
# type: () -> List[Device]
"""
Get a list of available HCI devices. The list is obtained by executing
ioctl syscalls HCIGETDEVLIST and HCIGETDEVINFO. The returned list
@@ -123,8 +132,8 @@ class HCICore(InternalBlue):
log.info("HCI device: %s [%s] flags=%d<%s>" %
(dev["dev_name"], dev["dev_bdaddr"],
dev["dev_flags"], dev["dev_flags_str"]))
device_list.append([self, dev["dev_name"], 'hci: %s (%s) <%s>' %
(dev["dev_bdaddr"], dev["dev_name"], dev["dev_flags_str"])])
device_list.append((self, dev["dev_name"], 'hci: %s (%s) <%s>' %
(dev["dev_bdaddr"], dev["dev_name"], dev["dev_flags_str"])))
if len(device_list) == 0:
log.info('No connected HCI device found')
@@ -184,8 +193,8 @@ class HCICore(InternalBlue):
record_data = self.s_snoop.recv(1024)
except socket.timeout:
continue # this is ok. just try again without error
except Exception:
log.critical("Lost device interface, terminating receive thread...")
except Exception as e:
log.critical("Lost device interface with exception {}, terminating receive thread...".format(e))
self.exit_requested = True
continue
+175
View File
@@ -0,0 +1,175 @@
import binascii
import time
class SocketRecvHook():
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
def recv_hook(self, data):
raise NotImplementedError()
def recv_replace(self, length, **kwargs):
raise NotImplementedError()
def recv(self, length, **kwargs):
if not self.replace:
data = self.socket.recv(length, **kwargs)
else:
data = self.recv_replace(length, **kwargs)
self.recv_hook(data)
return data
def recvfrom(self, length):
# type: (int) -> Tuple[bytes, Any]
if not self.replace:
data, addr = self.socket.recvfrom(length)
self.recv_hook(data)
return data
class SocketInjectHook():
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
def close(self):
self.socket.close()
def send(self,data):
self.send_hook(data)
if not self.replace:
try:
self.socket.send(data)
except Exception as e:
self.send_exception(e)
raise e
else:
self.send_replace(data)
def send_hook(self,result):
raise NotImplementedError()
def send_replace(self,data):
raise NotImplementedError()
def send_exception(self, e):
raise NotImplementedError()
class SocketDuplexHook(SocketInjectHook, SocketRecvHook):
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
pass
class HookBase():
def send_hook(self,data):
raise NotImplementedError
def recv_hook(self, data):
raise NotImplementedError
class TraceToFileHook(SocketDuplexHook):
def __init__(self, socket, filename='/tmp/bt_hci.log'):
SocketDuplexHook.__init__(self, socket)
self.file = open(filename, 'a')
self.replace = False
self.log = []
def recv_hook(self, data):
line = "RX {}\n".format(binascii.hexlify(data))
print(line)
self.log.append(line)
def send_hook(self, data):
line = "TX {}\n".format(binascii.hexlify(data))
print(line)
self.log.append(line)
def send_exception(self, e):
line = "EX '{}'\n".format(e)
print(line)
self.log.append(line)
def close(self):
self.socket.close()
self.log.append("Socket closed\n")
self.file.writelines(self.log)
self.file.close()
import socket
class PrintTrace(SocketDuplexHook):
def send_hook(self, data, **kwargs):
print("Sent: {}".format(binascii.hexlify(data)))
def recv_hook(self, data, **kwargs):
print("Recv: {}".format(binascii.hexlify(data)))
def recvfrom_hook(self, data, **kwargs):
print("Recv: {}".format(binascii.hexlify(data)))
class ReplaySocket():
def __init__(self, filename='/tmp/bt.log'):
super(ReplaySocket, self).__init__()
self.log = open(filename).readlines()
self.index = 0
def send(self, data, **kwargs):
encoded_data = "" # type: str
direction, encoded_data = self.log[self.index].split(" ")
assert(direction == "TX")
log_data = binascii.unhexlify(encoded_data.rstrip('\n'))
assert(data == log_data)
self.index+=1
def recv(self, **kwargs):
time.sleep(0.01)
direction, encoded_data = self.log[self.index].split(" ")
if direction == "RX":
return binascii.unhexlify(encoded_data.rstrip('\n'))
else:
raise socket.timeout()
from internalblue.core import InternalBlue
try:
import typing
from typing import Type
except ImportError:
pass
def hook(core, socket_hook):
# type: (Type[InternalBlue], Type[SocketDuplexHook]) -> None
def wrap_socket_setup(orig_func):
def wrapped_socket_setup(self, *args, **kwargs):
status = orig_func(self, *args, **kwargs)
if self.s_inject == self.s_snoop:
h = socket_hook(self.s_inject)
self.s_inject = h
self.s_snoop = h
else:
self.s_inject = socket_hook(self.s_inject)
self.s_snoop = socket_hook(self.s_snoop)
return status
return wrapped_socket_setup
core._setupSockets = wrap_socket_setup(core._setupSockets)
+159
View File
@@ -0,0 +1,159 @@
#!/usr/bin/env python2
import socket
import Queue
import hci
from pwn import *
from core import InternalBlue
import binascii
filepath = os.path.dirname(os.path.abspath(__file__))
try:
import typing
from typing import List, Tuple, Any
from internalblue.core import InternalBlue
except:
pass
class testCore(InternalBlue):
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory="."):
super(testCore, self).__init__(queue_size, btsnooplog_filename, log_level, fix_binutils, data_directory=".")
file = open(filepath+'/../dummymemdump.bin', mode='rb')
self.memory = file.read()
file.close()
self.doublecheck = False
def device_list(self):
# type: () -> List[Tuple[InternalBlue,str,str]]
"""
Get a list of connected devices
"""
if self.exit_requested:
self.shutdown()
if self.running:
log.warn("Already running. Call shutdown() first!")
return []
# assume that a explicitly specified iPhone exists
device_list = [(self, "Testchip", "Testchip")]
return device_list
def sendH4(self, h4type, data, timeout=2):
"""
Send an arbitrary HCI packet by pushing a send-task into the
sendQueue. This function blocks until the response is received
or the timeout expires. The return value is the Payload of the
HCI Command Complete Event which was received in response to
the command or None if no response was received within the timeout.
"""
queue = Queue.Queue(1)
try:
self.sendQueue.put((h4type, data, queue, None), timeout=timeout)
ret = queue.get(timeout=timeout)
return ret
except Queue.Empty:
log.warn("sendH4: waiting for response timed out!")
return None
except Queue.Full:
log.warn("sendH4: send queue is full!")
return None
def local_connect(self):
return True
def _setupSockets(self):
self.hciport = random.randint(60000, 65535 - 1)
log.debug("_setupSockets: Selected random ports snoop=%d and inject=%d" % (self.hciport, self.hciport + 1))
log.info("Wireshark configuration (on Loopback interface): udp.port == %d || udp.port == %d" % (
self.hciport, self.hciport + 1))
# Create s_snoop socket
self.s_snoop = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s_snoop.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.s_snoop.bind(('127.0.0.1', self.hciport))
self.s_snoop.settimeout(0.5)
self.s_snoop.setblocking(True)
# Create s_inject
self.s_inject = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s_inject.settimeout(0.5)
self.s_inject.setblocking(True)
time.sleep(1.5)
return True
def _recvThreadFunc(self):
log.debug("Receive Thread terminated.")
def _sendThreadFunc(self):
log.debug("Send Thread started.")
while not self.exit_requested:
# Little bit ugly: need to re-apply changes to the global context to the thread-copy
context.log_level = self.log_level
# Wait for 'send task' in send queue
try:
task = self.sendQueue.get(timeout=0.5)
except Queue.Empty:
continue
# Extract the components of the task
h4type, data, queue, filter_function = task
# Prepend UART TYPE and length.
out = p8(h4type) + p8(len(data)) + data
# Send command to the chip using IOBluetoothExtended framework
h4type, data, queue, filter_function = task
opcode = binascii.hexlify(data[1]) + binascii.hexlify(data[0])
log.debug("Sending command: 0x" + binascii.hexlify(data))
# if the caller expects a response: register a queue to receive the response
if queue is not None and filter_function is not None:
recvQueue = Queue.Queue(1)
self.registerHciRecvQueue(recvQueue, filter_function)
# if the caller expects a response:
# Wait for the HCI event response by polling the recvQueue
if queue is not None and filter_function is not None:
# Return responses according to the opcode & operands
if opcode == '1001':
record_data = '040E0C0101100006b415060f000e22'.decode('hex')
data = hci.parse_hci_packet(record_data).data
elif opcode == 'fc4d':
length = int(binascii.hexlify(data[7]), 16)
address = int(binascii.hexlify(data[6]+data[5]+data[4]+data[3]), 16)
data = '014dfc00'.decode('hex') + self.memory[address:address+length]
elif opcode == 'fc4c':
log.info(data.encode('hex'))
length = int(binascii.hexlify(data[2]), 16)
address = int(binascii.hexlify(data[6]+data[5]+data[4]+data[3]), 16)
self.memory = self.memory[:address] + data[7:len(data)] + self.memory[address+length:]
else:
print(opcode)
queue.put(data)
self.unregisterHciRecvQueue(recvQueue)
log.debug("Send Thread terminated.")
def enableBroadcomDiagnosticLogging(self, enable):
return
def _teardownSockets(self):
return True
def shutdown(self):
return True
View File
+17
View File
@@ -0,0 +1,17 @@
import unittest
from internalblue.testcore import testCore
class DummyCoreTest(unittest.TestCase):
def setUp(self):
t = testCore(log_level='debug', data_directory='/tmp')
dev = t.device_list()[0]
reference = dev[0]
reference.interface = dev[1]
self.assert_(reference.connect(), 'Connect failed')
self.reference = reference
def tearDown(self):
self.reference.shutdown()
+31
View File
@@ -0,0 +1,31 @@
import nose
from .dummy_core_test import DummyCoreTest
from internalblue.cmds import CmdHexdump, CmdSendHciCmd
import unittest
class TestCmdSendHciCmd(DummyCoreTest):
def test_version(self):
cmd = CmdSendHciCmd("sendhcicmd 0x1001", self.reference)
result = cmd.work()
nose.tools.assert_equal(result, b'\x01\x01\x10\x00\x06\xb4\x15\x06\x0f\x00\x0e"')
pass
def test_write(self):
cmd = CmdSendHciCmd("sendhcicmd 0xfc4c", self.reference)
pass
#cmd.readMem()
if __name__ == '__main__':
unittest.main()
+45
View File
@@ -0,0 +1,45 @@
import nose
try:
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from internalblue import Address
except ImportError:
Address = lambda x: x
pass
from .dummy_core_test import DummyCoreTest
from internalblue.cmds import CmdHexdump
import unittest
class TestDirectMemReadWrite(DummyCoreTest):
def test_read_mem(self):
from internalblue.cmds import CmdHexdump
hdxdmp = CmdHexdump("hexdump 0xc0 -l 0x20", self.reference)
dump = hdxdmp.readMem(Address(0xc0), 0x20)
nose.tools.assert_equal(dump, b'\\\x01\x00\x00-.\x03\x00Copyright (c) Broadcom C')
def test_write_mem(self):
from internalblue.cmds import CmdWriteMem
cmd = CmdWriteMem("writemem --hex 0xc0 41424344", self.reference)
data = b'FOOBAR'
status = cmd.writeMem(Address(0x1000), data)
nose.tools.assert_true(status)
read = cmd.readMem(Address(0x1000), len(data))
nose.tools.assert_equal(data, read)
if __name__ == '__main__':
unittest.main()