4 Commits

Author SHA1 Message Date
Jiska Classen e89a84812e updated readme 2020-02-06 00:55:20 +01:00
Jiska Classen dd9d76cff9 Merge branch 'master' of https://dev.seemoo.tu-darmstadt.de/bcm/internalblue 2020-02-03 23:38:32 +01:00
Jiska Classen 58f9688b84 updated firmware files 2020-02-03 23:38:00 +01:00
Davide Toldo 45ec18744e Make macos core less verbose (forgot to remove line) 2020-01-22 12:52:07 +01:00
30 changed files with 83 additions and 688 deletions
+12 -1
View File
@@ -54,6 +54,17 @@ was also recorded and gives a more high level overview.
Our talk [Playing with Bluetooth](https://media.ccc.de/v/2019-185-playing-with-bluetooth) focuses on new device support
within *InternalBlue* and the Patchram state of various devices.
* **36C3 Talk** (12/2019)
The rather generic talk [All wireless communication stacks are equally broken](https://media.ccc.de/v/36c3-10531-all_wireless_communication_stacks_are_equally_broken)
points out a couple of new research directions and new Bluetooth projects coming up.
* **EWSN Paper & Demo** (02/2020)
We did some work on improving blacklisting performance of BLE data connections. Currently in a separate *blacklisting* branch.
Supported Features
@@ -214,7 +225,7 @@ can replace them with anything you want.
License
-------
Copyright 2018-2019 Dennis Mantz, Jiska Classen
Copyright 2018-2020 The InternalBlue Team
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
BIN
View File
Binary file not shown.
-61
View File
@@ -1,62 +1 @@
try:
from Queue import Queue
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable, Dict
if TYPE_CHECKING:
import datetime
from internalblue.hci import HCI
from internalblue.core import InternalBlue
Address = NewType("Address", int)
Record = Tuple[HCI, int, int, int, Any, datetime.datetime]
FilterFunction = Callable[[Record], bool]
ConnectionNumber = NewType("ConnectionNumber", int)
ConnectionIndex = NewType("ConnectionIndex", int)
BluetoothAddress = NewType("BluetoothAddress", bytes)
ConnectionDict = NewType("ConnectionDict", Dict[str,Any])
HeapInformation = NewType("HeapInformation", Dict[str, Any])
QueueInformation = NewType('QueueInformation', Dict[str, Any])
Opcode = NewType('Opcode', int)
HCI_CMD = NewType('HCI_CMD', int)
Task = Tuple[HCI_CMD, bytes, Queue.Queue, Callable[[Record], bool]]
Device = NewType("Device", Dict[str, Any])
"""{"dev_id": dev_id,
"dev_name": dev_name,
"dev_bdaddr": dev_bdaddr,
"dev_flags": dev_flags,
"dev_flags_str": dev_flags_str}"""
# InternalBlueCore, Device Name, SomeString
DeviceTuple = Tuple[InternalBlue, str, str]
except:
pass
import logging
class ProgressLog():
"""
Hack to get around the dependency to the pwnlib logger
This looses functionality, but at some point this can be replaced by something like progressbar2
"""
def __init__(self, init_msg,logger):
self.logger = logger
self.logger.warning(init_msg)
def failure(self, msg):
self.logger.warning(msg)
def status(self, msg):
self.logger.info(msg)
def success(self, msg):
self.logger.info(msg)
def getLogger(name):
logger = logging.getLogger(name)
logger.progress = lambda msg: ProgressLog(msg, logger)
return logger
+2 -2
View File
@@ -4,11 +4,11 @@ import datetime
import socket
import Queue
import random
from internalblue import hci
import hci
from pwn import *
from .core import InternalBlue
from core import InternalBlue
class ADBCore(InternalBlue):
+13 -56
View File
@@ -33,20 +33,11 @@ import os
import traceback
import argparse
from .adbcore import ADBCore
from .hcicore import HCICore
from adbcore import ADBCore
from hcicore import HCICore
from sys import platform
from . import cmds
try:
import typing
from typing import List, Optional
from internalblue.core import InternalBlue
from . import DeviceTuple
except:
pass
import cmds
HISTFILE = "_internalblue.hist"
@@ -62,15 +53,11 @@ type <help> for usage information!\n\n"""
for line in banner:
term.output(text.blue(line))
def commandLoop(internalblue, init_commands=None):
cmdstack = init_commands.split(';')[::-1] if init_commands else None
def commandLoop(internalblue):
while internalblue.running and not internalblue.exit_requested:
cmd_instance = None
try:
if cmdstack:
cmdline = cmdstack.pop().strip()
else:
cmdline = term.readline.readline(prompt='> ').strip()
cmdline = term.readline.readline(prompt='> ').strip()
cmdword = cmdline.split(' ')[0].split('=')[0]
if(cmdword == ''):
continue
@@ -102,7 +89,7 @@ def commandLoop(internalblue, init_commands=None):
# Main Program Start
def internalblue_cli(argv):
def internalblue_cli():
print_banner()
parser = argparse.ArgumentParser()
@@ -110,11 +97,7 @@ def internalblue_cli(argv):
parser.add_argument("--verbose", "-v", help="Set log level to DEBUG", action="store_true")
parser.add_argument("--ios-device", "-i", help="Tell internalblue to connect to a remote iPhone HCI socket. Specify socket IP address and port (i.e., 172.20.10.1:1234).")
parser.add_argument("--serialsu", "-s", help="On ADB, directly try su/serial/busybox scripting, if you do not have a special bluetooth.default.so file.", action="store_true")
parser.add_argument("--testdevice", "-t", help="Use a dummy test device to execute testcases", action="store_true")
parser.add_argument("--trace", help="Trace hci connection")
parser.add_argument("--device", help="Specify device/core to be used")
parser.add_argument("--commands", "-c", help="CLI command to run before prompting, seperated by ';' (used for easier testing)")
args = parser.parse_args(argv)
args = parser.parse_args()
if args.data_directory is not None:
data_directory = args.data_directory
@@ -136,26 +119,13 @@ def internalblue_cli(argv):
readline_completer = term.completer.LongestPrefixCompleter(words=cmd_keywords)
term.readline.set_completer(readline_completer)
if args.trace:
from .socket_hooks import hook
from internalblue import socket_hooks
HookClass = getattr(socket_hooks, args.trace)
hook(HCICore, HookClass)
# Initalize cores and get devices
# As macOS has additional dependencies (objc), only import it here if needed
connection_methods = [] # type: List[InternalBlue]
if args.ios_device:
from .ioscore import iOSCore
from ioscore import iOSCore
connection_methods = [iOSCore(args.ios_device, log_level=log_level, data_directory=data_directory)]
elif args.testdevice:
from .testcore import testCore
connection_methods = [testCore(log_level=log_level, data_directory=data_directory)]
elif platform == "darwin":
from .macoscore import macOSCore
from macoscore import macOSCore
connection_methods = [
macOSCore(log_level=log_level, data_directory=data_directory),
ADBCore(log_level=log_level, data_directory=data_directory)]
@@ -164,25 +134,12 @@ def internalblue_cli(argv):
ADBCore(log_level=log_level, data_directory=data_directory, serial=args.serialsu),
HCICore(log_level=log_level, data_directory=data_directory)]
devices = [] # type: List[DeviceTuple]
devices = []
for connection_method in connection_methods:
devices.extend(connection_method.device_list())
device = None # type: Optional[DeviceTuple]
if len(devices) > 0:
if args.device:
matching_devices = [ dev for dev in devices if dev[1] == args.device]
if len(matching_devices) > 1:
log.critical("Found multiple matching devices")
exit(-1)
elif len(matching_devices) == 1:
log.info("Found device is: {}".format(matching_devices[0]))
device = matching_devices[0]
else:
log.critical("No matching devices found")
exit(-1)
elif len(devices) == 1:
if len(devices) == 1:
device = devices[0]
else:
i = options('Please specify device:', [d[2] for d in devices], 0)
@@ -203,7 +160,7 @@ def internalblue_cli(argv):
exit(-1)
# Enter command loop (runs until user quits)
commandLoop(reference, init_commands=args.commands)
commandLoop(reference)
# shutdown connection
reference.shutdown()
@@ -218,5 +175,5 @@ def internalblue_cli(argv):
if __name__ == "__main__":
internalblue_cli(sys.argv[1:])
internalblue_cli()
+6 -40
View File
@@ -36,25 +36,7 @@ import time
import select
import json
from internalblue import getLogger
#log = getLogger(__name__)
try:
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Type
if TYPE_CHECKING:
from internalblue.core import InternalBlue
from internalblue.hci import HCI
from internalblue import Record, BluetoothAddress, Address
except:
pass
def getCmdList():
# type: () -> List[Type['Cmd']]
""" Returns a list of all commands which are defined in this cmds.py file.
This is done by searching for all subclasses of Cmd
"""
@@ -62,7 +44,6 @@ def getCmdList():
if inspect.isclass(obj) and issubclass(obj, Cmd)][1:]
def findCmd(keyword):
# type: (str) -> Optional[Type['Cmd']]
""" Find and return a Cmd subclass for a given keyword.
"""
command_list = getCmdList()
@@ -80,13 +61,11 @@ def auto_int(x):
return int(x, 0)
def bt_addr_to_str(bt_addr):
# type: (BluetoothAddress) -> str
""" Convert a Bluetooth address (6 bytes) into a human readable format.
"""
return ":".join([b.encode("hex") for b in bt_addr])
def parse_bt_addr(bt_addr):
# type: (Any) -> Optional[BluetoothAddress]
""" Convert Bluetooth address argument and check lengths.
"""
addr = bt_addr
@@ -113,12 +92,11 @@ class Cmd:
'keywords' list as member variable. The actual implementation of the
command should be located in the work() method.
"""
keywords = [] # type: List[str]
keywords = []
memory_image = None # type: Optional[bytes]
memory_image = None
def __init__(self, cmdline, internalblue):
# type: (str, InternalBlue) -> None
self.cmdline = cmdline
self.internalblue = internalblue
self.memory_image_template_filename = internalblue.data_directory + "/memdump__template.bin"
@@ -127,28 +105,23 @@ class Cmd:
self.internalblue.fw.__name__[6:12] + "_template.bin"
def __str__(self):
# type: () -> str
return self.cmdline
def work(self):
# type: () -> bool
return True
def abort_cmd(self):
# type: () -> None
self.aborted = True
if hasattr(self, 'progress_log'):
self.progress_log.failure("Command aborted")
def getArgs(self):
# type: () -> Any
try:
return self.parser.parse_args(self.cmdline.split(' ')[1:])
except SystemExit:
return None
def isAddressInSections(self, address, length=0, sectiontype=""):
# type: (int, int, str) -> bool
if not self.internalblue.fw:
return False
@@ -164,15 +137,12 @@ class Cmd:
return False
def readMem(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
# type: (Address, int, Optional[Any], int, int) -> Optional[bytes]
return self.internalblue.readMem(address, length, progress_log, bytes_done, bytes_total)
def writeMem(self, address, data, progress_log=None, bytes_done=0, bytes_total=0):
# type: (Address, bytes, Optional[Any], int, int) -> bool
return self.internalblue.writeMem(address, data, progress_log, bytes_done, bytes_total)
def initMemoryImage(self):
# type: () -> None
"""
Initially read out a chip's memory, all sections (RAM+ROM).
:return:
@@ -198,7 +168,6 @@ class Cmd:
self.refreshMemoryImage()
def refreshMemoryImage(self):
# type: () -> None
"""
Update an existing memory dump, only RAM sections.
:return:
@@ -209,13 +178,12 @@ class Cmd:
for section in self.internalblue.fw.SECTIONS:
if not section.is_rom:
sectiondump = self.readMem(section.start_addr, section.size(), self.progress_log, bytes_done, bytes_total)
if sectiondump and Cmd.memory_image:
if sectiondump:
Cmd.memory_image = Cmd.memory_image[0:section.start_addr] + sectiondump + Cmd.memory_image[section.end_addr:]
bytes_done += section.size()
self.progress_log.success("Received Data: complete")
def getMemoryImage(self, refresh=False):
# type: (bool) -> Any
if Cmd.memory_image is None:
self.initMemoryImage()
elif refresh:
@@ -443,7 +411,6 @@ class CmdMonitor(Cmd):
self.wireshark_process = None
def adbhciCallback(self, record):
# type: (Record) -> None
hcipkt, orig_len, inc_len, flags, drops, recvtime = record
dummy = "\x00\x00\x00" # TODO: Figure out purpose of these fields
@@ -536,7 +503,6 @@ class CmdDumpMem(Cmd):
help="Only dump the two RAM sections.")
parser.add_argument("--file", "-f", default="memdump.bin",
help="Filename of memory dump (default: %(default)s)")
parser.add_argument("--overwrite", action='store_true')
def work(self):
args = self.getArgs()
@@ -551,7 +517,7 @@ class CmdDumpMem(Cmd):
for section in filter(lambda s: s.is_ram, self.internalblue.fw.SECTIONS):
filename = args.file + "_" + hex(section.start_addr)
if os.path.exists(filename):
if not (args.overwrite or yesno("Update '%s'?" % filename)):
if not yesno("Update '%s'?" % filename):
log.info("Skipping section @%s" % hex(section.start_addr))
bytes_done += section.size()
continue
@@ -565,7 +531,7 @@ class CmdDumpMem(Cmd):
# Get complete memory image
if os.path.exists(args.file):
if not (args.overwrite or yesno("Update '%s'?" % os.path.abspath(args.file))):
if not yesno("Update '%s'?" % os.path.abspath(args.file)):
return False
dump = self.getMemoryImage(refresh=not args.norefresh)
@@ -960,7 +926,7 @@ class CmdSendHciCmd(Cmd):
else:
data += data_part.decode('hex')
return self.internalblue.sendHciCommand(args.cmdcode, data)
self.internalblue.sendHciCommand(args.cmdcode, data)
return True
+11 -69
View File
@@ -28,31 +28,17 @@
from abc import ABCMeta, abstractmethod
from pwn import *
from .fw.fw import Firmware
from fw.fw import Firmware
import datetime
import time
import Queue
from . import hci
import hci
try:
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable
from internalblue import Address, Record, Task, HCI_CMD, FilterFunction, ConnectionNumber, ConnectionDict, \
ConnectionIndex, BluetoothAddress, HeapInformation, QueueInformation, Opcode
from internalblue.hci import HCI
from . import DeviceTuple
if TYPE_CHECKING:
pass
except:
pass
#import logging
#log = logging.getLogger(__name__)
class InternalBlue:
__metaclass__ = ABCMeta
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory="."):
# type: (int, str, str, bool, str) -> None
context.log_level = log_level
context.log_file = data_directory + '/_internalblue.log'
context.arch = "thumb"
@@ -62,11 +48,11 @@ class InternalBlue:
self.data_directory = data_directory
self.s_inject = None #type: socket.socket # This is the TCP socket to the HCI inject port
self.s_snoop = None #type: socket.socket # This is the TCP socket to the HCI snoop port
self.s_inject = None # This is the TCP socket to the HCI inject port
self.s_snoop = None # This is the TCP socket to the HCI snoop port
# If btsnooplog_filename is set, write all incomming HCI packets to a file (can be viewed in wireshark for debugging)
if btsnooplog_filename is not None:
if btsnooplog_filename != None:
self.write_btsnooplog = True
self.btsnooplog_file = open(self.data_directory + "/" + btsnooplog_filename, "wb")
else:
@@ -92,7 +78,7 @@ class InternalBlue:
# firmware (the response is recognized with the help of the filter function).
# Once the response arrived, it puts the response into the response_queue from
# the tuple. See sendH4() and sendHciCommand().
self.sendQueue = Queue.Queue(queue_size) # type: Queue.Queue[Task]
self.sendQueue = Queue.Queue(queue_size)
self.recvThread = None # The thread which is responsible for the HCI snoop socket
self.sendThread = None # The thread which is responsible for the HCI inject socket
@@ -117,7 +103,7 @@ class InternalBlue:
# filter_function will be called for each packet that is received and only if it returns
# True, the packet will be put into the queue. The filter_function can be None in order
# to put all packets into the queue.
self.registeredHciRecvQueues = [] # type: List[Tuple[Queue.Queue[Record], FilterFunction]]
self.registeredHciRecvQueues = []
self.exit_requested = False # Will be set to true when the framework wants to shut down (e.g. on error or user exit)
self.running = False # 'running' is True once the connection to the HCI sockets is established
@@ -176,7 +162,6 @@ class InternalBlue:
return False
def _parse_time(self, time):
# type: (Any) -> datetime.datetime
"""
Taken from: https://github.com/joekickass/python-btsnoop
@@ -193,11 +178,9 @@ class InternalBlue:
@abstractmethod
def _recvThreadFunc(self):
# type: () -> None
pass
def _sendThreadFunc(self):
# type: () -> None
"""
This is the run-function of the sendThread. It polls the sendQueue for new 'send tasks'
and executes them (sends H4 commands to the chip and returns the response).
@@ -279,8 +262,8 @@ class InternalBlue:
try:
log.debug("_sendThreadFunc: Send: " + str(out.encode('hex')))
self.s_inject.send(out)
except Exception as e:
log.warn("_sendThreadFunc: Sending to socket failed with {}, reestablishing connection.\nWith HCI sockets, some HCI commands require root!".format(e))
except:
log.warn("_sendThreadFunc: Sending to socket failed, reestablishing connection.\nWith HCI sockets, some HCI commands require root!")
# socket are terminated by hcicore..
self._teardownSockets()
self._setupSockets()
@@ -304,7 +287,6 @@ class InternalBlue:
log.debug("Send Thread terminated.")
def _tracepointHciCallbackFunction(self, record):
# type: (Record) -> None
hcipkt = record[0] # get HCI Event packet
timestamp = record[5] # get timestamp
@@ -366,7 +348,6 @@ class InternalBlue:
def addTracepoint(self, address):
# type: (Address) -> bool
# Check if constants are defined in fw.py
for const in ['TRACEPOINT_BODY_ASM_LOCATION', 'TRACEPOINT_BODY_ASM_SNIPPET',
'TRACEPOINT_HOOK_ASM', 'TRACEPOINT_HOOKS_LOCATION',
@@ -460,7 +441,6 @@ class InternalBlue:
return True
def deleteTracepoint(self, address):
# type: (Address) -> bool
if not self.check_running():
return False
@@ -480,7 +460,6 @@ class InternalBlue:
return True
def check_running(self):
# type: () -> bool
"""
Check if the framework is running (i.e. the sockets are connected,
the recv and send threads are running and exit_requested is not True)
@@ -496,11 +475,10 @@ class InternalBlue:
@abstractmethod
def device_list(self):
# type: () -> List[DeviceTuple]
pass
def connect(self):
# type: () -> bool
if self.exit_requested:
self.shutdown()
@@ -546,7 +524,6 @@ class InternalBlue:
return True
def initialize_fimware(self):
# type: () -> bool
"""
Checks if we are running on a Broadcom chip and loads available firmware information based
on LMP subversion.
@@ -584,7 +561,6 @@ class InternalBlue:
return True
def shutdown(self):
# type: () -> None
"""
Shutdown the framework by stopping the send and recv threads. Socket shutdown
also terminates port forwarding if adb is used.
@@ -617,7 +593,6 @@ class InternalBlue:
log.info("Shutdown complete.")
def registerHciCallback(self, callback):
# type: (Callable[[Record], None ]) -> None
"""
Add a new callback function to self.registeredHciCallbacks.
The function will be called every time the recvThread receives
@@ -637,7 +612,6 @@ class InternalBlue:
self.registeredHciCallbacks.append(callback)
def unregisterHciCallback(self, callback):
# type: (Callable[[Tuple[HCI, int, int, int, Any, datetime.datetime]], None ]) -> None
"""
Remove a callback function from self.registeredHciCallbacks.
"""
@@ -648,7 +622,6 @@ class InternalBlue:
log.warn("registerHciCallback: no such callback is registered!")
def registerHciRecvQueue(self, queue, filter_function=None):
# type: (Queue.Queue[Record], FilterFunction) -> None
"""
Add a new queue to self.registeredHciRecvQueues.
The queue will be filled by the recvThread every time the thread receives
@@ -671,7 +644,6 @@ class InternalBlue:
self.registeredHciRecvQueues.append((queue, filter_function))
def unregisterHciRecvQueue(self, queue):
# type: (Queue.Queue[Tuple[HCI, int, int, int, Any, datetime]]) -> None
"""
Remove a queue from self.registeredHciRecvQueues.
"""
@@ -683,7 +655,6 @@ class InternalBlue:
log.warn("registerHciRecvQueue: no such queue is registered!")
def sendHciCommand(self, opcode, data, timeout=3):
# type: (Opcode, bytes, int) -> Optional[Any]
"""
Send an arbitrary HCI command packet by pushing a send-task into the
sendQueue. This function blocks until the response is received
@@ -703,7 +674,6 @@ class InternalBlue:
# define a filter function which recognizes the response (command complete
# or command status event).
def recvFilterFunction(record):
# type: (Record) -> bool
hcipkt = record[0]
log.debug("sendHciCommand.recvFilterFunction: got response")
@@ -733,7 +703,6 @@ class InternalBlue:
return None
def sendH4(self, h4type, data, timeout=2):
# type: (HCI_CMD, bytes, int) -> bool
"""
Send an arbitrary H4 packet by pushing a send-task into the
sendQueue. This function does not wait for a response! If you
@@ -750,7 +719,6 @@ class InternalBlue:
return False
def recvPacket(self, timeout=None):
# type: (Optional[int]) -> Optional[Record]
"""
This function polls the recvQueue for the next available HCI
packet and returns it. The function checks whether it is called
@@ -775,7 +743,6 @@ class InternalBlue:
return None
def readMem(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
# type: (int, int, Optional[Any], int, int) -> Optional[bytes]
"""
Reads <length> bytes from the memory space of the firmware at the given
address. Reading from unmapped memory or certain memory-mapped-IO areas
@@ -860,7 +827,6 @@ class InternalBlue:
return outbuffer
def readMemAligned(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
# type: (int, int, Optional[Any], int, int) -> Any
"""
This is an alternative to readMem() which enforces a strictly aligned access
to the memory that is read. This is needed for e.g. the memory-mapped-IO
@@ -898,7 +864,6 @@ class InternalBlue:
recvQueue = Queue.Queue(1)
def hciFilterFunction(record):
# type: (Record) -> bool
hcipkt = record[0]
if not issubclass(hcipkt.__class__, hci.HCI_Event):
return False
@@ -957,7 +922,6 @@ class InternalBlue:
return outbuffer
def writeMem(self, address, data, progress_log=None, bytes_done=0, bytes_total=0):
# type: (int, bytes, Optional[Any], int, int) -> Optional[bool]
"""
Writes the <data> to the memory space of the firmware at the given
address.
@@ -1000,7 +964,6 @@ class InternalBlue:
return True
def launchRam(self, address):
# type: (int) -> bool
"""
Executes a function at the specified address in the context of the HCI
handler thread. The function has to comply with the calling convention.
@@ -1025,7 +988,6 @@ class InternalBlue:
return True
def getPatchramState(self):
# type: () -> Tuple[List[Optional[int]], List[Any], List[Any]]
"""
Retrieves the current state of the patchram unit. The return value
is a tuple containing 3 lists which are indexed by the slot number:
@@ -1071,7 +1033,6 @@ class InternalBlue:
return (table_addresses, table_values, slot_bits)
def patchRom(self, address, patch, slot=None):
# type: (int, Any, Optional[Any]) -> bool
"""
Patch a 4-byte value (DWORD) inside the ROM section of the firmware
(0x0 - 0x8FFFF) using the patchram mechanism. There are 128 available
@@ -1153,7 +1114,6 @@ class InternalBlue:
return True
def disableRomPatch(self, address, slot=None):
# type: (int, Optional[int]) -> bool
"""
Disable a patchram slot (see also patchRom()). The slot can either be
specified by the target address (address that was patched) or by providing
@@ -1196,7 +1156,6 @@ class InternalBlue:
return True
def readConnectionInformation(self, conn_number):
# type: (ConnectionNumber) -> Optional[ConnectionDict]
"""
Reads and parses a connection struct based on the connection number.
Note: The connection number is different from the connection index!
@@ -1262,7 +1221,6 @@ class InternalBlue:
return conn_dict
def sendLmpPacket(self, opcode, payload='', is_master=True, conn_handle=0x0c, extended_op=False):
# type: (Opcode, Any, bool, ConnectionNumber, bool) -> bool
"""
Inject a LMP packet into a Bluetooth connection (i.e. send a LMP packet
to a remote device which is paired and connected with our local device).
@@ -1327,7 +1285,6 @@ class InternalBlue:
return True
def fuzzLmp(self):
# type: ()-> bool
"""
Installs a patch inside the sendLmp HCI handler that allows sending arbitrary
LMP payloads. Afterwards, use sendLmpPacket as before.
@@ -1355,7 +1312,6 @@ class InternalBlue:
return True
def sendLmpPacketLegacy(self, conn_nr, opcode, payload, extended_op=False):
# type: (int, Opcode, bytes, bool) -> bool
"""
Inject a LMP packet into a Bluetooth connection (i.e. send a LMP packet
to a remote device which is paired and connected with our local device).
@@ -1392,7 +1348,7 @@ class InternalBlue:
# Prepare the assembler snippet by injecting the connection number
# and appending the LMP packet data.
asm_code = self.fw.SENDLMP_ASM_CODE % (conn_nr) # type: str
asm_code = self.fw.SENDLMP_ASM_CODE % (conn_nr)
asm_code_with_data = asm_code + ''.join([".byte 0x%02x\n" % ord(x)
for x in data.ljust(20, "\x00")])
@@ -1408,7 +1364,6 @@ class InternalBlue:
return False
def sendLcpPacket(self, conn_idx, payload):
# type: (ConnectionIndex, bytes) -> bool
"""
Inject a LCP packet into a Bluetooth LE connection (i.e. send a LCP packet
to a remote device which is paired and connected with our local device).
@@ -1445,7 +1400,6 @@ class InternalBlue:
return False
def connectToRemoteDevice(self, bt_addr):
# type: (BluetoothAddress) -> None
"""
Send a HCI Connect Command to the firmware. This will setup
a connection (inserted into the connection structure) if the
@@ -1473,7 +1427,6 @@ class InternalBlue:
self.sendHciCommand(0x0405, bt_addr[::-1] + '\x00\x00\x00\x00\x00\x00\x01')
def connectToRemoteLEDevice(self, bt_addr, addr_type=0x00):
# type: (BluetoothAddress, int) -> None
"""
Send a HCI LE Create Connection Command to the firmware as
defined in the Bluetooth Core Specification 5.0 p. 1266.
@@ -1490,7 +1443,6 @@ class InternalBlue:
self.sendHciCommand(0x200d, '\x60\x00\x30\x00\x00' + p8(addr_type) + bt_addr[::-1] + '\x01\x18\x00\x28\x00\x00\x00\xd0\x07\x00\x00\x00\x00')
def connectionStatusCallback(self, record):
# type: (Record) -> None
"""
HCI Callback function to detect HCI Events related to
Create Connection
@@ -1523,7 +1475,6 @@ class InternalBlue:
log.info("[Disconnect Complete: Handle=0x%x]" % (conn_handle))
def coexStatusCallback(self, record):
# type: (Record) -> None
"""
Coexistence Callback Function
Interprets debug counters for coexistence with WiFi/LTE
@@ -1548,7 +1499,6 @@ class InternalBlue:
return
def readHeapInformation(self):
# type: () -> Optional[Union[HeapInformation, bool]]
"""
Traverses the double-linked list of BLOC structs and returns them as a
list of dictionaries. The dicts have the following fields:
@@ -1642,7 +1592,6 @@ class InternalBlue:
def readQueueInformation(self):
# type: () -> Optional[Union[bool, QueueInformation]]
"""
Traverses the double-linked list of QUEUE structs and returns them as a
list of dictionaries. The dicts have the following fields:
@@ -1706,7 +1655,6 @@ class InternalBlue:
return queuelist
def enableBroadcomDiagnosticLogging(self, enable):
# type: (bool) -> None
"""
Broadcom implemented their own H4 layer protocol. Normally H4 handles HCI
messages like HCI commands, SCO and ACL data, and HCI events. Their types are
@@ -1733,9 +1681,3 @@ class InternalBlue:
# itself crashes when receiving diagnostic frames...
else:
log.warn("Diagnostic protocol requires modified Android driver!")
def _setupSockets(self):
raise NotImplementedError()
def _teardownSockets(self):
raise NotImplementedError()
+5 -1
View File
@@ -50,7 +50,7 @@ SECTIONS = [ MemorySection(0x0, 0x90000, True , False),
MemorySection(0xd0000, 0xd8000, False, True ),
#MemorySection(0xe0000, 0x1f0000, True , False),
MemorySection(0x200000, 0x21ffff, False, True ),
MemorySection(0x260000, 0x268000, True , False),
#MemorySection(0x260000, 0x268000, True , False), # might crash? issue 14
#MemorySection(0x280000, 0x2a0000, True , False),
MemorySection(0x318000, 0x320000, False, False),
MemorySection(0x324000, 0x360000, False, False),
@@ -74,6 +74,10 @@ PATCHRAM_VALUE_TABLE_ADDRESS = 0xd0000
PATCHRAM_NUMBER_OF_SLOTS = 128
PATCHRAM_ALIGNED = False
# Heap
BLOC_HEAD = 0x200588 # g_dynamic_memory_GeneralUsePools
BLOC_NG = True # Next Generation Bloc Buffer
# Snippet for sendLcpPacket()
SENDLCP_CODE_BASE_ADDRESS = 0x21a000
SENDLCP_ASM_CODE = """
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
# iPhone 6
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
# Samsung Galaxy S8
+1 -1
View File
@@ -23,7 +23,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
FW_NAME = "BCM20702A2"
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
# Evaluation Kit CYW920735
+3 -3
View File
@@ -20,11 +20,11 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
# Evaluation Kit CYW927019
FW_NAME = "CYW27039B1 (NOT iPhone X/XR!)"
# Evaluation Kit CYW920719
FW_NAME = "CYW20739B1 (NOT iPhone X/XR!)"
# TODO this is not the iPhone firmware, we need to add a switch in fw.py
# Device Infos
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
# iPhone 8/X/XR
+1 -1
View File
@@ -22,7 +22,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
FW_NAME = "BCM2070B0 (MacBook Pro 2011)"
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
# This runs on an iPhone 7
+1 -1
View File
@@ -25,7 +25,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
# This runs on Nexus 5, Xperia Z3, Samsung Galaxy Note 3
+4
View File
@@ -73,6 +73,10 @@ PATCHRAM_VALUE_TABLE_ADDRESS = 0xd0000
PATCHRAM_NUMBER_OF_SLOTS = 128
PATCHRAM_ALIGNED = False
# Heap
BLOC_HEAD = 0x200490 # g_dynamic_memory_GeneralUsePools
BLOC_NG = True # Next Generation Bloc Buffer
# Snippet for sendLcpPacket()
SENDLCP_CODE_BASE_ADDRESS = 0x21f000
SENDLCP_ASM_CODE = """
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
# iPhone 6
+1 -1
View File
@@ -20,7 +20,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
# iPhone SE
+1 -1
View File
@@ -22,7 +22,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from .fw import MemorySection
from fw import MemorySection
# Firmware Infos
FW_NAME = "default (unknown firmware)"
+11 -1
View File
@@ -56,7 +56,6 @@ class HCI(object):
return HCI_UART_TYPE_CLASS[uart_type].from_data(data[1:])
def __init__(self, uart_type):
self.event_code = None
self.uart_type = uart_type
def getRaw(self):
@@ -958,6 +957,7 @@ class StackDumpReceiver:
if self.memdump_addr == None:
self.memdump_addr = addr
self.memdumps[addr-self.memdump_addr] = data[4:]
log.debug("Stack dump handling addr %08x", addr-self.memdump_addr)
def finishStackDump(self):
dump = fit(self.memdumps)
@@ -1092,6 +1092,16 @@ class StackDumpReceiver:
self.finishStackDump()
return True
# On a Raspberry Pi 3, the last packet of a stack dump is '1b0340df0338'.... so it's 0x40
elif packet_type == 0xe8:
# FIXME Raspi memdump is divided in two parts!
# address change from 0001fe38 to packet type e8 and then it's computing addr -0130000
# negative addr does not work with finishStackDump()
# so even though the last packet is 0x40, let's just finish on 0xe8
log.info("End of first stackdump block, writing to file and skipping second...")
self.finishStackDump()
return True
return False
+5 -14
View File
@@ -4,19 +4,11 @@ import subprocess
import datetime
from pwn import *
import fcntl
from .core import InternalBlue
from core import InternalBlue
import hci
import Queue
import threading
try:
from typing import List
from internalblue import Device
except:
pass
# from /usr/include/bluetooth/hci.h:
#define HCIDEVUP _IOW('H', 201, int)
#define HCIGETDEVLIST _IOR('H', 210, int)
@@ -42,7 +34,6 @@ class HCICore(InternalBlue):
self.doublecheck = False
def getHciDeviceList(self):
# type: () -> List[Device]
"""
Get a list of available HCI devices. The list is obtained by executing
ioctl syscalls HCIGETDEVLIST and HCIGETDEVINFO. The returned list
@@ -132,8 +123,8 @@ class HCICore(InternalBlue):
log.info("HCI device: %s [%s] flags=%d<%s>" %
(dev["dev_name"], dev["dev_bdaddr"],
dev["dev_flags"], dev["dev_flags_str"]))
device_list.append((self, dev["dev_name"], 'hci: %s (%s) <%s>' %
(dev["dev_bdaddr"], dev["dev_name"], dev["dev_flags_str"])))
device_list.append([self, dev["dev_name"], 'hci: %s (%s) <%s>' %
(dev["dev_bdaddr"], dev["dev_name"], dev["dev_flags_str"])])
if len(device_list) == 0:
log.info('No connected HCI device found')
@@ -193,8 +184,8 @@ class HCICore(InternalBlue):
record_data = self.s_snoop.recv(1024)
except socket.timeout:
continue # this is ok. just try again without error
except Exception as e:
log.critical("Lost device interface with exception {}, terminating receive thread...".format(e))
except Exception:
log.critical("Lost device interface, terminating receive thread...")
self.exit_requested = True
continue
-2
View File
@@ -114,8 +114,6 @@ class macOSCore(InternalBlue):
# Put all relevant infos into a tuple. The HCI packet is parsed with the help of hci.py.
record = (hci.parse_hci_packet(record_data), 0, 0, 0, 0, 0) #TODO not sure if this causes trouble?
log.debug("Recv: " + str(record[0]))
log.info(binascii.hexlify(record_data))
# Put the record into all queues of registeredHciRecvQueues if their
# filter function matches.
for queue, filter_function in self.registeredHciRecvQueues: # TODO filter_function not working with bluez modifications
-175
View File
@@ -1,175 +0,0 @@
import binascii
import time
class SocketRecvHook():
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
def recv_hook(self, data):
raise NotImplementedError()
def recv_replace(self, length, **kwargs):
raise NotImplementedError()
def recv(self, length, **kwargs):
if not self.replace:
data = self.socket.recv(length, **kwargs)
else:
data = self.recv_replace(length, **kwargs)
self.recv_hook(data)
return data
def recvfrom(self, length):
# type: (int) -> Tuple[bytes, Any]
if not self.replace:
data, addr = self.socket.recvfrom(length)
self.recv_hook(data)
return data
class SocketInjectHook():
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
def close(self):
self.socket.close()
def send(self,data):
self.send_hook(data)
if not self.replace:
try:
self.socket.send(data)
except Exception as e:
self.send_exception(e)
raise e
else:
self.send_replace(data)
def send_hook(self,result):
raise NotImplementedError()
def send_replace(self,data):
raise NotImplementedError()
def send_exception(self, e):
raise NotImplementedError()
class SocketDuplexHook(SocketInjectHook, SocketRecvHook):
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
pass
class HookBase():
def send_hook(self,data):
raise NotImplementedError
def recv_hook(self, data):
raise NotImplementedError
class TraceToFileHook(SocketDuplexHook):
def __init__(self, socket, filename='/tmp/bt_hci.log'):
SocketDuplexHook.__init__(self, socket)
self.file = open(filename, 'a')
self.replace = False
self.log = []
def recv_hook(self, data):
line = "RX {}\n".format(binascii.hexlify(data))
print(line)
self.log.append(line)
def send_hook(self, data):
line = "TX {}\n".format(binascii.hexlify(data))
print(line)
self.log.append(line)
def send_exception(self, e):
line = "EX '{}'\n".format(e)
print(line)
self.log.append(line)
def close(self):
self.socket.close()
self.log.append("Socket closed\n")
self.file.writelines(self.log)
self.file.close()
import socket
class PrintTrace(SocketDuplexHook):
def send_hook(self, data, **kwargs):
print("Sent: {}".format(binascii.hexlify(data)))
def recv_hook(self, data, **kwargs):
print("Recv: {}".format(binascii.hexlify(data)))
def recvfrom_hook(self, data, **kwargs):
print("Recv: {}".format(binascii.hexlify(data)))
class ReplaySocket():
def __init__(self, filename='/tmp/bt.log'):
super(ReplaySocket, self).__init__()
self.log = open(filename).readlines()
self.index = 0
def send(self, data, **kwargs):
encoded_data = "" # type: str
direction, encoded_data = self.log[self.index].split(" ")
assert(direction == "TX")
log_data = binascii.unhexlify(encoded_data.rstrip('\n'))
assert(data == log_data)
self.index+=1
def recv(self, **kwargs):
time.sleep(0.01)
direction, encoded_data = self.log[self.index].split(" ")
if direction == "RX":
return binascii.unhexlify(encoded_data.rstrip('\n'))
else:
raise socket.timeout()
from internalblue.core import InternalBlue
try:
import typing
from typing import Type
except ImportError:
pass
def hook(core, socket_hook):
# type: (Type[InternalBlue], Type[SocketDuplexHook]) -> None
def wrap_socket_setup(orig_func):
def wrapped_socket_setup(self, *args, **kwargs):
status = orig_func(self, *args, **kwargs)
if self.s_inject == self.s_snoop:
h = socket_hook(self.s_inject)
self.s_inject = h
self.s_snoop = h
else:
self.s_inject = socket_hook(self.s_inject)
self.s_snoop = socket_hook(self.s_snoop)
return status
return wrapped_socket_setup
core._setupSockets = wrap_socket_setup(core._setupSockets)
-159
View File
@@ -1,159 +0,0 @@
#!/usr/bin/env python2
import socket
import Queue
import hci
from pwn import *
from core import InternalBlue
import binascii
filepath = os.path.dirname(os.path.abspath(__file__))
try:
import typing
from typing import List, Tuple, Any
from internalblue.core import InternalBlue
except:
pass
class testCore(InternalBlue):
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory="."):
super(testCore, self).__init__(queue_size, btsnooplog_filename, log_level, fix_binutils, data_directory=".")
file = open(filepath+'/../dummymemdump.bin', mode='rb')
self.memory = file.read()
file.close()
self.doublecheck = False
def device_list(self):
# type: () -> List[Tuple[InternalBlue,str,str]]
"""
Get a list of connected devices
"""
if self.exit_requested:
self.shutdown()
if self.running:
log.warn("Already running. Call shutdown() first!")
return []
# assume that a explicitly specified iPhone exists
device_list = [(self, "Testchip", "Testchip")]
return device_list
def sendH4(self, h4type, data, timeout=2):
"""
Send an arbitrary HCI packet by pushing a send-task into the
sendQueue. This function blocks until the response is received
or the timeout expires. The return value is the Payload of the
HCI Command Complete Event which was received in response to
the command or None if no response was received within the timeout.
"""
queue = Queue.Queue(1)
try:
self.sendQueue.put((h4type, data, queue, None), timeout=timeout)
ret = queue.get(timeout=timeout)
return ret
except Queue.Empty:
log.warn("sendH4: waiting for response timed out!")
return None
except Queue.Full:
log.warn("sendH4: send queue is full!")
return None
def local_connect(self):
return True
def _setupSockets(self):
self.hciport = random.randint(60000, 65535 - 1)
log.debug("_setupSockets: Selected random ports snoop=%d and inject=%d" % (self.hciport, self.hciport + 1))
log.info("Wireshark configuration (on Loopback interface): udp.port == %d || udp.port == %d" % (
self.hciport, self.hciport + 1))
# Create s_snoop socket
self.s_snoop = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s_snoop.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.s_snoop.bind(('127.0.0.1', self.hciport))
self.s_snoop.settimeout(0.5)
self.s_snoop.setblocking(True)
# Create s_inject
self.s_inject = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s_inject.settimeout(0.5)
self.s_inject.setblocking(True)
time.sleep(1.5)
return True
def _recvThreadFunc(self):
log.debug("Receive Thread terminated.")
def _sendThreadFunc(self):
log.debug("Send Thread started.")
while not self.exit_requested:
# Little bit ugly: need to re-apply changes to the global context to the thread-copy
context.log_level = self.log_level
# Wait for 'send task' in send queue
try:
task = self.sendQueue.get(timeout=0.5)
except Queue.Empty:
continue
# Extract the components of the task
h4type, data, queue, filter_function = task
# Prepend UART TYPE and length.
out = p8(h4type) + p8(len(data)) + data
# Send command to the chip using IOBluetoothExtended framework
h4type, data, queue, filter_function = task
opcode = binascii.hexlify(data[1]) + binascii.hexlify(data[0])
log.debug("Sending command: 0x" + binascii.hexlify(data))
# if the caller expects a response: register a queue to receive the response
if queue is not None and filter_function is not None:
recvQueue = Queue.Queue(1)
self.registerHciRecvQueue(recvQueue, filter_function)
# if the caller expects a response:
# Wait for the HCI event response by polling the recvQueue
if queue is not None and filter_function is not None:
# Return responses according to the opcode & operands
if opcode == '1001':
record_data = '040E0C0101100006b415060f000e22'.decode('hex')
data = hci.parse_hci_packet(record_data).data
elif opcode == 'fc4d':
length = int(binascii.hexlify(data[7]), 16)
address = int(binascii.hexlify(data[6]+data[5]+data[4]+data[3]), 16)
data = '014dfc00'.decode('hex') + self.memory[address:address+length]
elif opcode == 'fc4c':
log.info(data.encode('hex'))
length = int(binascii.hexlify(data[2]), 16)
address = int(binascii.hexlify(data[6]+data[5]+data[4]+data[3]), 16)
self.memory = self.memory[:address] + data[7:len(data)] + self.memory[address+length:]
else:
print(opcode)
queue.put(data)
self.unregisterHciRecvQueue(recvQueue)
log.debug("Send Thread terminated.")
def enableBroadcomDiagnosticLogging(self, enable):
return
def _teardownSockets(self):
return True
def shutdown(self):
return True
View File
-17
View File
@@ -1,17 +0,0 @@
import unittest
from internalblue.testcore import testCore
class DummyCoreTest(unittest.TestCase):
def setUp(self):
t = testCore(log_level='debug', data_directory='/tmp')
dev = t.device_list()[0]
reference = dev[0]
reference.interface = dev[1]
self.assert_(reference.connect(), 'Connect failed')
self.reference = reference
def tearDown(self):
self.reference.shutdown()
-31
View File
@@ -1,31 +0,0 @@
import nose
from .dummy_core_test import DummyCoreTest
from internalblue.cmds import CmdHexdump, CmdSendHciCmd
import unittest
class TestCmdSendHciCmd(DummyCoreTest):
def test_version(self):
cmd = CmdSendHciCmd("sendhcicmd 0x1001", self.reference)
result = cmd.work()
nose.tools.assert_equal(result, b'\x01\x01\x10\x00\x06\xb4\x15\x06\x0f\x00\x0e"')
pass
def test_write(self):
cmd = CmdSendHciCmd("sendhcicmd 0xfc4c", self.reference)
pass
#cmd.readMem()
if __name__ == '__main__':
unittest.main()
-45
View File
@@ -1,45 +0,0 @@
import nose
try:
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from internalblue import Address
except ImportError:
Address = lambda x: x
pass
from .dummy_core_test import DummyCoreTest
from internalblue.cmds import CmdHexdump
import unittest
class TestDirectMemReadWrite(DummyCoreTest):
def test_read_mem(self):
from internalblue.cmds import CmdHexdump
hdxdmp = CmdHexdump("hexdump 0xc0 -l 0x20", self.reference)
dump = hdxdmp.readMem(Address(0xc0), 0x20)
nose.tools.assert_equal(dump, b'\\\x01\x00\x00-.\x03\x00Copyright (c) Broadcom C')
def test_write_mem(self):
from internalblue.cmds import CmdWriteMem
cmd = CmdWriteMem("writemem --hex 0xc0 41424344", self.reference)
data = b'FOOBAR'
status = cmd.writeMem(Address(0x1000), data)
nose.tools.assert_true(status)
read = cmd.readMem(Address(0x1000), len(data))
nose.tools.assert_equal(data, read)
if __name__ == '__main__':
unittest.main()