Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| e89a84812e | |||
| dd9d76cff9 | |||
| 58f9688b84 | |||
| 45ec18744e |
@@ -54,6 +54,17 @@ was also recorded and gives a more high level overview.
|
||||
|
||||
Our talk [Playing with Bluetooth](https://media.ccc.de/v/2019-185-playing-with-bluetooth) focuses on new device support
|
||||
within *InternalBlue* and the Patchram state of various devices.
|
||||
|
||||
* **36C3 Talk** (12/2019)
|
||||
|
||||
The rather generic talk [All wireless communication stacks are equally broken](https://media.ccc.de/v/36c3-10531-all_wireless_communication_stacks_are_equally_broken)
|
||||
points out a couple of new research directions and new Bluetooth projects coming up.
|
||||
|
||||
* **EWSN Paper & Demo** (02/2020)
|
||||
|
||||
We did some work on improving blacklisting performance of BLE data connections. Currently in a separate *blacklisting* branch.
|
||||
|
||||
|
||||
|
||||
|
||||
Supported Features
|
||||
@@ -214,7 +225,7 @@ can replace them with anything you want.
|
||||
License
|
||||
-------
|
||||
|
||||
Copyright 2018-2019 Dennis Mantz, Jiska Classen
|
||||
Copyright 2018-2020 The InternalBlue Team
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy of
|
||||
this software and associated documentation files (the "Software"), to deal in
|
||||
|
||||
Binary file not shown.
@@ -1,62 +1 @@
|
||||
|
||||
try:
|
||||
from Queue import Queue
|
||||
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable, Dict
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import datetime
|
||||
from internalblue.hci import HCI
|
||||
from internalblue.core import InternalBlue
|
||||
Address = NewType("Address", int)
|
||||
Record = Tuple[HCI, int, int, int, Any, datetime.datetime]
|
||||
FilterFunction = Callable[[Record], bool]
|
||||
ConnectionNumber = NewType("ConnectionNumber", int)
|
||||
ConnectionIndex = NewType("ConnectionIndex", int)
|
||||
|
||||
BluetoothAddress = NewType("BluetoothAddress", bytes)
|
||||
ConnectionDict = NewType("ConnectionDict", Dict[str,Any])
|
||||
HeapInformation = NewType("HeapInformation", Dict[str, Any])
|
||||
QueueInformation = NewType('QueueInformation', Dict[str, Any])
|
||||
Opcode = NewType('Opcode', int)
|
||||
HCI_CMD = NewType('HCI_CMD', int)
|
||||
Task = Tuple[HCI_CMD, bytes, Queue.Queue, Callable[[Record], bool]]
|
||||
|
||||
Device = NewType("Device", Dict[str, Any])
|
||||
"""{"dev_id": dev_id,
|
||||
"dev_name": dev_name,
|
||||
"dev_bdaddr": dev_bdaddr,
|
||||
"dev_flags": dev_flags,
|
||||
"dev_flags_str": dev_flags_str}"""
|
||||
# InternalBlueCore, Device Name, SomeString
|
||||
DeviceTuple = Tuple[InternalBlue, str, str]
|
||||
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
class ProgressLog():
|
||||
"""
|
||||
Hack to get around the dependency to the pwnlib logger
|
||||
This looses functionality, but at some point this can be replaced by something like progressbar2
|
||||
"""
|
||||
def __init__(self, init_msg,logger):
|
||||
self.logger = logger
|
||||
self.logger.warning(init_msg)
|
||||
|
||||
def failure(self, msg):
|
||||
self.logger.warning(msg)
|
||||
|
||||
def status(self, msg):
|
||||
self.logger.info(msg)
|
||||
|
||||
def success(self, msg):
|
||||
self.logger.info(msg)
|
||||
|
||||
def getLogger(name):
|
||||
logger = logging.getLogger(name)
|
||||
logger.progress = lambda msg: ProgressLog(msg, logger)
|
||||
return logger
|
||||
|
||||
@@ -4,11 +4,11 @@ import datetime
|
||||
import socket
|
||||
import Queue
|
||||
import random
|
||||
from internalblue import hci
|
||||
import hci
|
||||
|
||||
from pwn import *
|
||||
|
||||
from .core import InternalBlue
|
||||
from core import InternalBlue
|
||||
|
||||
|
||||
class ADBCore(InternalBlue):
|
||||
|
||||
+13
-56
@@ -33,20 +33,11 @@ import os
|
||||
import traceback
|
||||
import argparse
|
||||
|
||||
from .adbcore import ADBCore
|
||||
from .hcicore import HCICore
|
||||
from adbcore import ADBCore
|
||||
from hcicore import HCICore
|
||||
from sys import platform
|
||||
|
||||
from . import cmds
|
||||
|
||||
try:
|
||||
import typing
|
||||
from typing import List, Optional
|
||||
from internalblue.core import InternalBlue
|
||||
from . import DeviceTuple
|
||||
|
||||
except:
|
||||
pass
|
||||
import cmds
|
||||
|
||||
HISTFILE = "_internalblue.hist"
|
||||
|
||||
@@ -62,15 +53,11 @@ type <help> for usage information!\n\n"""
|
||||
for line in banner:
|
||||
term.output(text.blue(line))
|
||||
|
||||
def commandLoop(internalblue, init_commands=None):
|
||||
cmdstack = init_commands.split(';')[::-1] if init_commands else None
|
||||
def commandLoop(internalblue):
|
||||
while internalblue.running and not internalblue.exit_requested:
|
||||
cmd_instance = None
|
||||
try:
|
||||
if cmdstack:
|
||||
cmdline = cmdstack.pop().strip()
|
||||
else:
|
||||
cmdline = term.readline.readline(prompt='> ').strip()
|
||||
cmdline = term.readline.readline(prompt='> ').strip()
|
||||
cmdword = cmdline.split(' ')[0].split('=')[0]
|
||||
if(cmdword == ''):
|
||||
continue
|
||||
@@ -102,7 +89,7 @@ def commandLoop(internalblue, init_commands=None):
|
||||
|
||||
|
||||
# Main Program Start
|
||||
def internalblue_cli(argv):
|
||||
def internalblue_cli():
|
||||
print_banner()
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
@@ -110,11 +97,7 @@ def internalblue_cli(argv):
|
||||
parser.add_argument("--verbose", "-v", help="Set log level to DEBUG", action="store_true")
|
||||
parser.add_argument("--ios-device", "-i", help="Tell internalblue to connect to a remote iPhone HCI socket. Specify socket IP address and port (i.e., 172.20.10.1:1234).")
|
||||
parser.add_argument("--serialsu", "-s", help="On ADB, directly try su/serial/busybox scripting, if you do not have a special bluetooth.default.so file.", action="store_true")
|
||||
parser.add_argument("--testdevice", "-t", help="Use a dummy test device to execute testcases", action="store_true")
|
||||
parser.add_argument("--trace", help="Trace hci connection")
|
||||
parser.add_argument("--device", help="Specify device/core to be used")
|
||||
parser.add_argument("--commands", "-c", help="CLI command to run before prompting, seperated by ';' (used for easier testing)")
|
||||
args = parser.parse_args(argv)
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.data_directory is not None:
|
||||
data_directory = args.data_directory
|
||||
@@ -136,26 +119,13 @@ def internalblue_cli(argv):
|
||||
readline_completer = term.completer.LongestPrefixCompleter(words=cmd_keywords)
|
||||
term.readline.set_completer(readline_completer)
|
||||
|
||||
|
||||
|
||||
if args.trace:
|
||||
from .socket_hooks import hook
|
||||
from internalblue import socket_hooks
|
||||
HookClass = getattr(socket_hooks, args.trace)
|
||||
hook(HCICore, HookClass)
|
||||
|
||||
|
||||
# Initalize cores and get devices
|
||||
# As macOS has additional dependencies (objc), only import it here if needed
|
||||
connection_methods = [] # type: List[InternalBlue]
|
||||
if args.ios_device:
|
||||
from .ioscore import iOSCore
|
||||
from ioscore import iOSCore
|
||||
connection_methods = [iOSCore(args.ios_device, log_level=log_level, data_directory=data_directory)]
|
||||
elif args.testdevice:
|
||||
from .testcore import testCore
|
||||
connection_methods = [testCore(log_level=log_level, data_directory=data_directory)]
|
||||
elif platform == "darwin":
|
||||
from .macoscore import macOSCore
|
||||
from macoscore import macOSCore
|
||||
connection_methods = [
|
||||
macOSCore(log_level=log_level, data_directory=data_directory),
|
||||
ADBCore(log_level=log_level, data_directory=data_directory)]
|
||||
@@ -164,25 +134,12 @@ def internalblue_cli(argv):
|
||||
ADBCore(log_level=log_level, data_directory=data_directory, serial=args.serialsu),
|
||||
HCICore(log_level=log_level, data_directory=data_directory)]
|
||||
|
||||
devices = [] # type: List[DeviceTuple]
|
||||
devices = []
|
||||
for connection_method in connection_methods:
|
||||
devices.extend(connection_method.device_list())
|
||||
|
||||
|
||||
device = None # type: Optional[DeviceTuple]
|
||||
if len(devices) > 0:
|
||||
if args.device:
|
||||
matching_devices = [ dev for dev in devices if dev[1] == args.device]
|
||||
if len(matching_devices) > 1:
|
||||
log.critical("Found multiple matching devices")
|
||||
exit(-1)
|
||||
elif len(matching_devices) == 1:
|
||||
log.info("Found device is: {}".format(matching_devices[0]))
|
||||
device = matching_devices[0]
|
||||
else:
|
||||
log.critical("No matching devices found")
|
||||
exit(-1)
|
||||
elif len(devices) == 1:
|
||||
if len(devices) == 1:
|
||||
device = devices[0]
|
||||
else:
|
||||
i = options('Please specify device:', [d[2] for d in devices], 0)
|
||||
@@ -203,7 +160,7 @@ def internalblue_cli(argv):
|
||||
exit(-1)
|
||||
|
||||
# Enter command loop (runs until user quits)
|
||||
commandLoop(reference, init_commands=args.commands)
|
||||
commandLoop(reference)
|
||||
|
||||
# shutdown connection
|
||||
reference.shutdown()
|
||||
@@ -218,5 +175,5 @@ def internalblue_cli(argv):
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
internalblue_cli(sys.argv[1:])
|
||||
internalblue_cli()
|
||||
|
||||
|
||||
+6
-40
@@ -36,25 +36,7 @@ import time
|
||||
import select
|
||||
import json
|
||||
|
||||
|
||||
from internalblue import getLogger
|
||||
|
||||
#log = getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
try:
|
||||
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Type
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from internalblue.core import InternalBlue
|
||||
from internalblue.hci import HCI
|
||||
from internalblue import Record, BluetoothAddress, Address
|
||||
except:
|
||||
pass
|
||||
|
||||
def getCmdList():
|
||||
# type: () -> List[Type['Cmd']]
|
||||
""" Returns a list of all commands which are defined in this cmds.py file.
|
||||
This is done by searching for all subclasses of Cmd
|
||||
"""
|
||||
@@ -62,7 +44,6 @@ def getCmdList():
|
||||
if inspect.isclass(obj) and issubclass(obj, Cmd)][1:]
|
||||
|
||||
def findCmd(keyword):
|
||||
# type: (str) -> Optional[Type['Cmd']]
|
||||
""" Find and return a Cmd subclass for a given keyword.
|
||||
"""
|
||||
command_list = getCmdList()
|
||||
@@ -80,13 +61,11 @@ def auto_int(x):
|
||||
return int(x, 0)
|
||||
|
||||
def bt_addr_to_str(bt_addr):
|
||||
# type: (BluetoothAddress) -> str
|
||||
""" Convert a Bluetooth address (6 bytes) into a human readable format.
|
||||
"""
|
||||
return ":".join([b.encode("hex") for b in bt_addr])
|
||||
|
||||
def parse_bt_addr(bt_addr):
|
||||
# type: (Any) -> Optional[BluetoothAddress]
|
||||
""" Convert Bluetooth address argument and check lengths.
|
||||
"""
|
||||
addr = bt_addr
|
||||
@@ -113,12 +92,11 @@ class Cmd:
|
||||
'keywords' list as member variable. The actual implementation of the
|
||||
command should be located in the work() method.
|
||||
"""
|
||||
keywords = [] # type: List[str]
|
||||
keywords = []
|
||||
|
||||
memory_image = None # type: Optional[bytes]
|
||||
memory_image = None
|
||||
|
||||
def __init__(self, cmdline, internalblue):
|
||||
# type: (str, InternalBlue) -> None
|
||||
self.cmdline = cmdline
|
||||
self.internalblue = internalblue
|
||||
self.memory_image_template_filename = internalblue.data_directory + "/memdump__template.bin"
|
||||
@@ -127,28 +105,23 @@ class Cmd:
|
||||
self.internalblue.fw.__name__[6:12] + "_template.bin"
|
||||
|
||||
def __str__(self):
|
||||
# type: () -> str
|
||||
return self.cmdline
|
||||
|
||||
def work(self):
|
||||
# type: () -> bool
|
||||
return True
|
||||
|
||||
def abort_cmd(self):
|
||||
# type: () -> None
|
||||
self.aborted = True
|
||||
if hasattr(self, 'progress_log'):
|
||||
self.progress_log.failure("Command aborted")
|
||||
|
||||
def getArgs(self):
|
||||
# type: () -> Any
|
||||
try:
|
||||
return self.parser.parse_args(self.cmdline.split(' ')[1:])
|
||||
except SystemExit:
|
||||
return None
|
||||
|
||||
def isAddressInSections(self, address, length=0, sectiontype=""):
|
||||
# type: (int, int, str) -> bool
|
||||
if not self.internalblue.fw:
|
||||
return False
|
||||
|
||||
@@ -164,15 +137,12 @@ class Cmd:
|
||||
return False
|
||||
|
||||
def readMem(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
|
||||
# type: (Address, int, Optional[Any], int, int) -> Optional[bytes]
|
||||
return self.internalblue.readMem(address, length, progress_log, bytes_done, bytes_total)
|
||||
|
||||
def writeMem(self, address, data, progress_log=None, bytes_done=0, bytes_total=0):
|
||||
# type: (Address, bytes, Optional[Any], int, int) -> bool
|
||||
return self.internalblue.writeMem(address, data, progress_log, bytes_done, bytes_total)
|
||||
|
||||
def initMemoryImage(self):
|
||||
# type: () -> None
|
||||
"""
|
||||
Initially read out a chip's memory, all sections (RAM+ROM).
|
||||
:return:
|
||||
@@ -198,7 +168,6 @@ class Cmd:
|
||||
self.refreshMemoryImage()
|
||||
|
||||
def refreshMemoryImage(self):
|
||||
# type: () -> None
|
||||
"""
|
||||
Update an existing memory dump, only RAM sections.
|
||||
:return:
|
||||
@@ -209,13 +178,12 @@ class Cmd:
|
||||
for section in self.internalblue.fw.SECTIONS:
|
||||
if not section.is_rom:
|
||||
sectiondump = self.readMem(section.start_addr, section.size(), self.progress_log, bytes_done, bytes_total)
|
||||
if sectiondump and Cmd.memory_image:
|
||||
if sectiondump:
|
||||
Cmd.memory_image = Cmd.memory_image[0:section.start_addr] + sectiondump + Cmd.memory_image[section.end_addr:]
|
||||
bytes_done += section.size()
|
||||
self.progress_log.success("Received Data: complete")
|
||||
|
||||
def getMemoryImage(self, refresh=False):
|
||||
# type: (bool) -> Any
|
||||
if Cmd.memory_image is None:
|
||||
self.initMemoryImage()
|
||||
elif refresh:
|
||||
@@ -443,7 +411,6 @@ class CmdMonitor(Cmd):
|
||||
self.wireshark_process = None
|
||||
|
||||
def adbhciCallback(self, record):
|
||||
# type: (Record) -> None
|
||||
hcipkt, orig_len, inc_len, flags, drops, recvtime = record
|
||||
|
||||
dummy = "\x00\x00\x00" # TODO: Figure out purpose of these fields
|
||||
@@ -536,7 +503,6 @@ class CmdDumpMem(Cmd):
|
||||
help="Only dump the two RAM sections.")
|
||||
parser.add_argument("--file", "-f", default="memdump.bin",
|
||||
help="Filename of memory dump (default: %(default)s)")
|
||||
parser.add_argument("--overwrite", action='store_true')
|
||||
|
||||
def work(self):
|
||||
args = self.getArgs()
|
||||
@@ -551,7 +517,7 @@ class CmdDumpMem(Cmd):
|
||||
for section in filter(lambda s: s.is_ram, self.internalblue.fw.SECTIONS):
|
||||
filename = args.file + "_" + hex(section.start_addr)
|
||||
if os.path.exists(filename):
|
||||
if not (args.overwrite or yesno("Update '%s'?" % filename)):
|
||||
if not yesno("Update '%s'?" % filename):
|
||||
log.info("Skipping section @%s" % hex(section.start_addr))
|
||||
bytes_done += section.size()
|
||||
continue
|
||||
@@ -565,7 +531,7 @@ class CmdDumpMem(Cmd):
|
||||
|
||||
# Get complete memory image
|
||||
if os.path.exists(args.file):
|
||||
if not (args.overwrite or yesno("Update '%s'?" % os.path.abspath(args.file))):
|
||||
if not yesno("Update '%s'?" % os.path.abspath(args.file)):
|
||||
return False
|
||||
|
||||
dump = self.getMemoryImage(refresh=not args.norefresh)
|
||||
@@ -960,7 +926,7 @@ class CmdSendHciCmd(Cmd):
|
||||
else:
|
||||
data += data_part.decode('hex')
|
||||
|
||||
return self.internalblue.sendHciCommand(args.cmdcode, data)
|
||||
self.internalblue.sendHciCommand(args.cmdcode, data)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
+11
-69
@@ -28,31 +28,17 @@
|
||||
from abc import ABCMeta, abstractmethod
|
||||
|
||||
from pwn import *
|
||||
from .fw.fw import Firmware
|
||||
from fw.fw import Firmware
|
||||
import datetime
|
||||
import time
|
||||
import Queue
|
||||
from . import hci
|
||||
import hci
|
||||
|
||||
try:
|
||||
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable
|
||||
from internalblue import Address, Record, Task, HCI_CMD, FilterFunction, ConnectionNumber, ConnectionDict, \
|
||||
ConnectionIndex, BluetoothAddress, HeapInformation, QueueInformation, Opcode
|
||||
from internalblue.hci import HCI
|
||||
from . import DeviceTuple
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
except:
|
||||
pass
|
||||
|
||||
#import logging
|
||||
#log = logging.getLogger(__name__)
|
||||
|
||||
class InternalBlue:
|
||||
__metaclass__ = ABCMeta
|
||||
|
||||
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory="."):
|
||||
# type: (int, str, str, bool, str) -> None
|
||||
context.log_level = log_level
|
||||
context.log_file = data_directory + '/_internalblue.log'
|
||||
context.arch = "thumb"
|
||||
@@ -62,11 +48,11 @@ class InternalBlue:
|
||||
|
||||
|
||||
self.data_directory = data_directory
|
||||
self.s_inject = None #type: socket.socket # This is the TCP socket to the HCI inject port
|
||||
self.s_snoop = None #type: socket.socket # This is the TCP socket to the HCI snoop port
|
||||
self.s_inject = None # This is the TCP socket to the HCI inject port
|
||||
self.s_snoop = None # This is the TCP socket to the HCI snoop port
|
||||
|
||||
# If btsnooplog_filename is set, write all incomming HCI packets to a file (can be viewed in wireshark for debugging)
|
||||
if btsnooplog_filename is not None:
|
||||
if btsnooplog_filename != None:
|
||||
self.write_btsnooplog = True
|
||||
self.btsnooplog_file = open(self.data_directory + "/" + btsnooplog_filename, "wb")
|
||||
else:
|
||||
@@ -92,7 +78,7 @@ class InternalBlue:
|
||||
# firmware (the response is recognized with the help of the filter function).
|
||||
# Once the response arrived, it puts the response into the response_queue from
|
||||
# the tuple. See sendH4() and sendHciCommand().
|
||||
self.sendQueue = Queue.Queue(queue_size) # type: Queue.Queue[Task]
|
||||
self.sendQueue = Queue.Queue(queue_size)
|
||||
|
||||
self.recvThread = None # The thread which is responsible for the HCI snoop socket
|
||||
self.sendThread = None # The thread which is responsible for the HCI inject socket
|
||||
@@ -117,7 +103,7 @@ class InternalBlue:
|
||||
# filter_function will be called for each packet that is received and only if it returns
|
||||
# True, the packet will be put into the queue. The filter_function can be None in order
|
||||
# to put all packets into the queue.
|
||||
self.registeredHciRecvQueues = [] # type: List[Tuple[Queue.Queue[Record], FilterFunction]]
|
||||
self.registeredHciRecvQueues = []
|
||||
|
||||
self.exit_requested = False # Will be set to true when the framework wants to shut down (e.g. on error or user exit)
|
||||
self.running = False # 'running' is True once the connection to the HCI sockets is established
|
||||
@@ -176,7 +162,6 @@ class InternalBlue:
|
||||
return False
|
||||
|
||||
def _parse_time(self, time):
|
||||
# type: (Any) -> datetime.datetime
|
||||
"""
|
||||
Taken from: https://github.com/joekickass/python-btsnoop
|
||||
|
||||
@@ -193,11 +178,9 @@ class InternalBlue:
|
||||
|
||||
@abstractmethod
|
||||
def _recvThreadFunc(self):
|
||||
# type: () -> None
|
||||
pass
|
||||
|
||||
def _sendThreadFunc(self):
|
||||
# type: () -> None
|
||||
"""
|
||||
This is the run-function of the sendThread. It polls the sendQueue for new 'send tasks'
|
||||
and executes them (sends H4 commands to the chip and returns the response).
|
||||
@@ -279,8 +262,8 @@ class InternalBlue:
|
||||
try:
|
||||
log.debug("_sendThreadFunc: Send: " + str(out.encode('hex')))
|
||||
self.s_inject.send(out)
|
||||
except Exception as e:
|
||||
log.warn("_sendThreadFunc: Sending to socket failed with {}, reestablishing connection.\nWith HCI sockets, some HCI commands require root!".format(e))
|
||||
except:
|
||||
log.warn("_sendThreadFunc: Sending to socket failed, reestablishing connection.\nWith HCI sockets, some HCI commands require root!")
|
||||
# socket are terminated by hcicore..
|
||||
self._teardownSockets()
|
||||
self._setupSockets()
|
||||
@@ -304,7 +287,6 @@ class InternalBlue:
|
||||
log.debug("Send Thread terminated.")
|
||||
|
||||
def _tracepointHciCallbackFunction(self, record):
|
||||
# type: (Record) -> None
|
||||
hcipkt = record[0] # get HCI Event packet
|
||||
timestamp = record[5] # get timestamp
|
||||
|
||||
@@ -366,7 +348,6 @@ class InternalBlue:
|
||||
|
||||
|
||||
def addTracepoint(self, address):
|
||||
# type: (Address) -> bool
|
||||
# Check if constants are defined in fw.py
|
||||
for const in ['TRACEPOINT_BODY_ASM_LOCATION', 'TRACEPOINT_BODY_ASM_SNIPPET',
|
||||
'TRACEPOINT_HOOK_ASM', 'TRACEPOINT_HOOKS_LOCATION',
|
||||
@@ -460,7 +441,6 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def deleteTracepoint(self, address):
|
||||
# type: (Address) -> bool
|
||||
if not self.check_running():
|
||||
return False
|
||||
|
||||
@@ -480,7 +460,6 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def check_running(self):
|
||||
# type: () -> bool
|
||||
"""
|
||||
Check if the framework is running (i.e. the sockets are connected,
|
||||
the recv and send threads are running and exit_requested is not True)
|
||||
@@ -496,11 +475,10 @@ class InternalBlue:
|
||||
|
||||
@abstractmethod
|
||||
def device_list(self):
|
||||
# type: () -> List[DeviceTuple]
|
||||
pass
|
||||
|
||||
def connect(self):
|
||||
# type: () -> bool
|
||||
|
||||
if self.exit_requested:
|
||||
self.shutdown()
|
||||
|
||||
@@ -546,7 +524,6 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def initialize_fimware(self):
|
||||
# type: () -> bool
|
||||
"""
|
||||
Checks if we are running on a Broadcom chip and loads available firmware information based
|
||||
on LMP subversion.
|
||||
@@ -584,7 +561,6 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def shutdown(self):
|
||||
# type: () -> None
|
||||
"""
|
||||
Shutdown the framework by stopping the send and recv threads. Socket shutdown
|
||||
also terminates port forwarding if adb is used.
|
||||
@@ -617,7 +593,6 @@ class InternalBlue:
|
||||
log.info("Shutdown complete.")
|
||||
|
||||
def registerHciCallback(self, callback):
|
||||
# type: (Callable[[Record], None ]) -> None
|
||||
"""
|
||||
Add a new callback function to self.registeredHciCallbacks.
|
||||
The function will be called every time the recvThread receives
|
||||
@@ -637,7 +612,6 @@ class InternalBlue:
|
||||
self.registeredHciCallbacks.append(callback)
|
||||
|
||||
def unregisterHciCallback(self, callback):
|
||||
# type: (Callable[[Tuple[HCI, int, int, int, Any, datetime.datetime]], None ]) -> None
|
||||
"""
|
||||
Remove a callback function from self.registeredHciCallbacks.
|
||||
"""
|
||||
@@ -648,7 +622,6 @@ class InternalBlue:
|
||||
log.warn("registerHciCallback: no such callback is registered!")
|
||||
|
||||
def registerHciRecvQueue(self, queue, filter_function=None):
|
||||
# type: (Queue.Queue[Record], FilterFunction) -> None
|
||||
"""
|
||||
Add a new queue to self.registeredHciRecvQueues.
|
||||
The queue will be filled by the recvThread every time the thread receives
|
||||
@@ -671,7 +644,6 @@ class InternalBlue:
|
||||
self.registeredHciRecvQueues.append((queue, filter_function))
|
||||
|
||||
def unregisterHciRecvQueue(self, queue):
|
||||
# type: (Queue.Queue[Tuple[HCI, int, int, int, Any, datetime]]) -> None
|
||||
"""
|
||||
Remove a queue from self.registeredHciRecvQueues.
|
||||
"""
|
||||
@@ -683,7 +655,6 @@ class InternalBlue:
|
||||
log.warn("registerHciRecvQueue: no such queue is registered!")
|
||||
|
||||
def sendHciCommand(self, opcode, data, timeout=3):
|
||||
# type: (Opcode, bytes, int) -> Optional[Any]
|
||||
"""
|
||||
Send an arbitrary HCI command packet by pushing a send-task into the
|
||||
sendQueue. This function blocks until the response is received
|
||||
@@ -703,7 +674,6 @@ class InternalBlue:
|
||||
# define a filter function which recognizes the response (command complete
|
||||
# or command status event).
|
||||
def recvFilterFunction(record):
|
||||
# type: (Record) -> bool
|
||||
hcipkt = record[0]
|
||||
|
||||
log.debug("sendHciCommand.recvFilterFunction: got response")
|
||||
@@ -733,7 +703,6 @@ class InternalBlue:
|
||||
return None
|
||||
|
||||
def sendH4(self, h4type, data, timeout=2):
|
||||
# type: (HCI_CMD, bytes, int) -> bool
|
||||
"""
|
||||
Send an arbitrary H4 packet by pushing a send-task into the
|
||||
sendQueue. This function does not wait for a response! If you
|
||||
@@ -750,7 +719,6 @@ class InternalBlue:
|
||||
return False
|
||||
|
||||
def recvPacket(self, timeout=None):
|
||||
# type: (Optional[int]) -> Optional[Record]
|
||||
"""
|
||||
This function polls the recvQueue for the next available HCI
|
||||
packet and returns it. The function checks whether it is called
|
||||
@@ -775,7 +743,6 @@ class InternalBlue:
|
||||
return None
|
||||
|
||||
def readMem(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
|
||||
# type: (int, int, Optional[Any], int, int) -> Optional[bytes]
|
||||
"""
|
||||
Reads <length> bytes from the memory space of the firmware at the given
|
||||
address. Reading from unmapped memory or certain memory-mapped-IO areas
|
||||
@@ -860,7 +827,6 @@ class InternalBlue:
|
||||
return outbuffer
|
||||
|
||||
def readMemAligned(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
|
||||
# type: (int, int, Optional[Any], int, int) -> Any
|
||||
"""
|
||||
This is an alternative to readMem() which enforces a strictly aligned access
|
||||
to the memory that is read. This is needed for e.g. the memory-mapped-IO
|
||||
@@ -898,7 +864,6 @@ class InternalBlue:
|
||||
|
||||
recvQueue = Queue.Queue(1)
|
||||
def hciFilterFunction(record):
|
||||
# type: (Record) -> bool
|
||||
hcipkt = record[0]
|
||||
if not issubclass(hcipkt.__class__, hci.HCI_Event):
|
||||
return False
|
||||
@@ -957,7 +922,6 @@ class InternalBlue:
|
||||
return outbuffer
|
||||
|
||||
def writeMem(self, address, data, progress_log=None, bytes_done=0, bytes_total=0):
|
||||
# type: (int, bytes, Optional[Any], int, int) -> Optional[bool]
|
||||
"""
|
||||
Writes the <data> to the memory space of the firmware at the given
|
||||
address.
|
||||
@@ -1000,7 +964,6 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def launchRam(self, address):
|
||||
# type: (int) -> bool
|
||||
"""
|
||||
Executes a function at the specified address in the context of the HCI
|
||||
handler thread. The function has to comply with the calling convention.
|
||||
@@ -1025,7 +988,6 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def getPatchramState(self):
|
||||
# type: () -> Tuple[List[Optional[int]], List[Any], List[Any]]
|
||||
"""
|
||||
Retrieves the current state of the patchram unit. The return value
|
||||
is a tuple containing 3 lists which are indexed by the slot number:
|
||||
@@ -1071,7 +1033,6 @@ class InternalBlue:
|
||||
return (table_addresses, table_values, slot_bits)
|
||||
|
||||
def patchRom(self, address, patch, slot=None):
|
||||
# type: (int, Any, Optional[Any]) -> bool
|
||||
"""
|
||||
Patch a 4-byte value (DWORD) inside the ROM section of the firmware
|
||||
(0x0 - 0x8FFFF) using the patchram mechanism. There are 128 available
|
||||
@@ -1153,7 +1114,6 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def disableRomPatch(self, address, slot=None):
|
||||
# type: (int, Optional[int]) -> bool
|
||||
"""
|
||||
Disable a patchram slot (see also patchRom()). The slot can either be
|
||||
specified by the target address (address that was patched) or by providing
|
||||
@@ -1196,7 +1156,6 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def readConnectionInformation(self, conn_number):
|
||||
# type: (ConnectionNumber) -> Optional[ConnectionDict]
|
||||
"""
|
||||
Reads and parses a connection struct based on the connection number.
|
||||
Note: The connection number is different from the connection index!
|
||||
@@ -1262,7 +1221,6 @@ class InternalBlue:
|
||||
return conn_dict
|
||||
|
||||
def sendLmpPacket(self, opcode, payload='', is_master=True, conn_handle=0x0c, extended_op=False):
|
||||
# type: (Opcode, Any, bool, ConnectionNumber, bool) -> bool
|
||||
"""
|
||||
Inject a LMP packet into a Bluetooth connection (i.e. send a LMP packet
|
||||
to a remote device which is paired and connected with our local device).
|
||||
@@ -1327,7 +1285,6 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def fuzzLmp(self):
|
||||
# type: ()-> bool
|
||||
"""
|
||||
Installs a patch inside the sendLmp HCI handler that allows sending arbitrary
|
||||
LMP payloads. Afterwards, use sendLmpPacket as before.
|
||||
@@ -1355,7 +1312,6 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def sendLmpPacketLegacy(self, conn_nr, opcode, payload, extended_op=False):
|
||||
# type: (int, Opcode, bytes, bool) -> bool
|
||||
"""
|
||||
Inject a LMP packet into a Bluetooth connection (i.e. send a LMP packet
|
||||
to a remote device which is paired and connected with our local device).
|
||||
@@ -1392,7 +1348,7 @@ class InternalBlue:
|
||||
|
||||
# Prepare the assembler snippet by injecting the connection number
|
||||
# and appending the LMP packet data.
|
||||
asm_code = self.fw.SENDLMP_ASM_CODE % (conn_nr) # type: str
|
||||
asm_code = self.fw.SENDLMP_ASM_CODE % (conn_nr)
|
||||
asm_code_with_data = asm_code + ''.join([".byte 0x%02x\n" % ord(x)
|
||||
for x in data.ljust(20, "\x00")])
|
||||
|
||||
@@ -1408,7 +1364,6 @@ class InternalBlue:
|
||||
return False
|
||||
|
||||
def sendLcpPacket(self, conn_idx, payload):
|
||||
# type: (ConnectionIndex, bytes) -> bool
|
||||
"""
|
||||
Inject a LCP packet into a Bluetooth LE connection (i.e. send a LCP packet
|
||||
to a remote device which is paired and connected with our local device).
|
||||
@@ -1445,7 +1400,6 @@ class InternalBlue:
|
||||
return False
|
||||
|
||||
def connectToRemoteDevice(self, bt_addr):
|
||||
# type: (BluetoothAddress) -> None
|
||||
"""
|
||||
Send a HCI Connect Command to the firmware. This will setup
|
||||
a connection (inserted into the connection structure) if the
|
||||
@@ -1473,7 +1427,6 @@ class InternalBlue:
|
||||
self.sendHciCommand(0x0405, bt_addr[::-1] + '\x00\x00\x00\x00\x00\x00\x01')
|
||||
|
||||
def connectToRemoteLEDevice(self, bt_addr, addr_type=0x00):
|
||||
# type: (BluetoothAddress, int) -> None
|
||||
"""
|
||||
Send a HCI LE Create Connection Command to the firmware as
|
||||
defined in the Bluetooth Core Specification 5.0 p. 1266.
|
||||
@@ -1490,7 +1443,6 @@ class InternalBlue:
|
||||
self.sendHciCommand(0x200d, '\x60\x00\x30\x00\x00' + p8(addr_type) + bt_addr[::-1] + '\x01\x18\x00\x28\x00\x00\x00\xd0\x07\x00\x00\x00\x00')
|
||||
|
||||
def connectionStatusCallback(self, record):
|
||||
# type: (Record) -> None
|
||||
"""
|
||||
HCI Callback function to detect HCI Events related to
|
||||
Create Connection
|
||||
@@ -1523,7 +1475,6 @@ class InternalBlue:
|
||||
log.info("[Disconnect Complete: Handle=0x%x]" % (conn_handle))
|
||||
|
||||
def coexStatusCallback(self, record):
|
||||
# type: (Record) -> None
|
||||
"""
|
||||
Coexistence Callback Function
|
||||
Interprets debug counters for coexistence with WiFi/LTE
|
||||
@@ -1548,7 +1499,6 @@ class InternalBlue:
|
||||
return
|
||||
|
||||
def readHeapInformation(self):
|
||||
# type: () -> Optional[Union[HeapInformation, bool]]
|
||||
"""
|
||||
Traverses the double-linked list of BLOC structs and returns them as a
|
||||
list of dictionaries. The dicts have the following fields:
|
||||
@@ -1642,7 +1592,6 @@ class InternalBlue:
|
||||
|
||||
|
||||
def readQueueInformation(self):
|
||||
# type: () -> Optional[Union[bool, QueueInformation]]
|
||||
"""
|
||||
Traverses the double-linked list of QUEUE structs and returns them as a
|
||||
list of dictionaries. The dicts have the following fields:
|
||||
@@ -1706,7 +1655,6 @@ class InternalBlue:
|
||||
return queuelist
|
||||
|
||||
def enableBroadcomDiagnosticLogging(self, enable):
|
||||
# type: (bool) -> None
|
||||
"""
|
||||
Broadcom implemented their own H4 layer protocol. Normally H4 handles HCI
|
||||
messages like HCI commands, SCO and ACL data, and HCI events. Their types are
|
||||
@@ -1733,9 +1681,3 @@ class InternalBlue:
|
||||
# itself crashes when receiving diagnostic frames...
|
||||
else:
|
||||
log.warn("Diagnostic protocol requires modified Android driver!")
|
||||
|
||||
def _setupSockets(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _teardownSockets(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
@@ -50,7 +50,7 @@ SECTIONS = [ MemorySection(0x0, 0x90000, True , False),
|
||||
MemorySection(0xd0000, 0xd8000, False, True ),
|
||||
#MemorySection(0xe0000, 0x1f0000, True , False),
|
||||
MemorySection(0x200000, 0x21ffff, False, True ),
|
||||
MemorySection(0x260000, 0x268000, True , False),
|
||||
#MemorySection(0x260000, 0x268000, True , False), # might crash? issue 14
|
||||
#MemorySection(0x280000, 0x2a0000, True , False),
|
||||
MemorySection(0x318000, 0x320000, False, False),
|
||||
MemorySection(0x324000, 0x360000, False, False),
|
||||
@@ -74,6 +74,10 @@ PATCHRAM_VALUE_TABLE_ADDRESS = 0xd0000
|
||||
PATCHRAM_NUMBER_OF_SLOTS = 128
|
||||
PATCHRAM_ALIGNED = False
|
||||
|
||||
# Heap
|
||||
BLOC_HEAD = 0x200588 # g_dynamic_memory_GeneralUsePools
|
||||
BLOC_NG = True # Next Generation Bloc Buffer
|
||||
|
||||
# Snippet for sendLcpPacket()
|
||||
SENDLCP_CODE_BASE_ADDRESS = 0x21a000
|
||||
SENDLCP_ASM_CODE = """
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# iPhone 6
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# Samsung Galaxy S8
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
FW_NAME = "BCM20702A2"
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# Evaluation Kit CYW920735
|
||||
|
||||
@@ -20,11 +20,11 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# Evaluation Kit CYW927019
|
||||
FW_NAME = "CYW27039B1 (NOT iPhone X/XR!)"
|
||||
# Evaluation Kit CYW920719
|
||||
FW_NAME = "CYW20739B1 (NOT iPhone X/XR!)"
|
||||
# TODO this is not the iPhone firmware, we need to add a switch in fw.py
|
||||
|
||||
# Device Infos
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# iPhone 8/X/XR
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
FW_NAME = "BCM2070B0 (MacBook Pro 2011)"
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# This runs on an iPhone 7
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# This runs on Nexus 5, Xperia Z3, Samsung Galaxy Note 3
|
||||
|
||||
@@ -73,6 +73,10 @@ PATCHRAM_VALUE_TABLE_ADDRESS = 0xd0000
|
||||
PATCHRAM_NUMBER_OF_SLOTS = 128
|
||||
PATCHRAM_ALIGNED = False
|
||||
|
||||
# Heap
|
||||
BLOC_HEAD = 0x200490 # g_dynamic_memory_GeneralUsePools
|
||||
BLOC_NG = True # Next Generation Bloc Buffer
|
||||
|
||||
# Snippet for sendLcpPacket()
|
||||
SENDLCP_CODE_BASE_ADDRESS = 0x21f000
|
||||
SENDLCP_ASM_CODE = """
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# iPhone 6
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# iPhone SE
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from .fw import MemorySection
|
||||
from fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
FW_NAME = "default (unknown firmware)"
|
||||
|
||||
+11
-1
@@ -56,7 +56,6 @@ class HCI(object):
|
||||
return HCI_UART_TYPE_CLASS[uart_type].from_data(data[1:])
|
||||
|
||||
def __init__(self, uart_type):
|
||||
self.event_code = None
|
||||
self.uart_type = uart_type
|
||||
|
||||
def getRaw(self):
|
||||
@@ -958,6 +957,7 @@ class StackDumpReceiver:
|
||||
if self.memdump_addr == None:
|
||||
self.memdump_addr = addr
|
||||
self.memdumps[addr-self.memdump_addr] = data[4:]
|
||||
log.debug("Stack dump handling addr %08x", addr-self.memdump_addr)
|
||||
|
||||
def finishStackDump(self):
|
||||
dump = fit(self.memdumps)
|
||||
@@ -1092,6 +1092,16 @@ class StackDumpReceiver:
|
||||
self.finishStackDump()
|
||||
return True
|
||||
|
||||
# On a Raspberry Pi 3, the last packet of a stack dump is '1b0340df0338'.... so it's 0x40
|
||||
elif packet_type == 0xe8:
|
||||
# FIXME Raspi memdump is divided in two parts!
|
||||
# address change from 0001fe38 to packet type e8 and then it's computing addr -0130000
|
||||
# negative addr does not work with finishStackDump()
|
||||
# so even though the last packet is 0x40, let's just finish on 0xe8
|
||||
log.info("End of first stackdump block, writing to file and skipping second...")
|
||||
self.finishStackDump()
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
|
||||
+5
-14
@@ -4,19 +4,11 @@ import subprocess
|
||||
import datetime
|
||||
from pwn import *
|
||||
import fcntl
|
||||
from .core import InternalBlue
|
||||
from core import InternalBlue
|
||||
import hci
|
||||
import Queue
|
||||
import threading
|
||||
|
||||
try:
|
||||
from typing import List
|
||||
from internalblue import Device
|
||||
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
# from /usr/include/bluetooth/hci.h:
|
||||
#define HCIDEVUP _IOW('H', 201, int)
|
||||
#define HCIGETDEVLIST _IOR('H', 210, int)
|
||||
@@ -42,7 +34,6 @@ class HCICore(InternalBlue):
|
||||
self.doublecheck = False
|
||||
|
||||
def getHciDeviceList(self):
|
||||
# type: () -> List[Device]
|
||||
"""
|
||||
Get a list of available HCI devices. The list is obtained by executing
|
||||
ioctl syscalls HCIGETDEVLIST and HCIGETDEVINFO. The returned list
|
||||
@@ -132,8 +123,8 @@ class HCICore(InternalBlue):
|
||||
log.info("HCI device: %s [%s] flags=%d<%s>" %
|
||||
(dev["dev_name"], dev["dev_bdaddr"],
|
||||
dev["dev_flags"], dev["dev_flags_str"]))
|
||||
device_list.append((self, dev["dev_name"], 'hci: %s (%s) <%s>' %
|
||||
(dev["dev_bdaddr"], dev["dev_name"], dev["dev_flags_str"])))
|
||||
device_list.append([self, dev["dev_name"], 'hci: %s (%s) <%s>' %
|
||||
(dev["dev_bdaddr"], dev["dev_name"], dev["dev_flags_str"])])
|
||||
|
||||
if len(device_list) == 0:
|
||||
log.info('No connected HCI device found')
|
||||
@@ -193,8 +184,8 @@ class HCICore(InternalBlue):
|
||||
record_data = self.s_snoop.recv(1024)
|
||||
except socket.timeout:
|
||||
continue # this is ok. just try again without error
|
||||
except Exception as e:
|
||||
log.critical("Lost device interface with exception {}, terminating receive thread...".format(e))
|
||||
except Exception:
|
||||
log.critical("Lost device interface, terminating receive thread...")
|
||||
self.exit_requested = True
|
||||
continue
|
||||
|
||||
|
||||
@@ -114,8 +114,6 @@ class macOSCore(InternalBlue):
|
||||
# Put all relevant infos into a tuple. The HCI packet is parsed with the help of hci.py.
|
||||
record = (hci.parse_hci_packet(record_data), 0, 0, 0, 0, 0) #TODO not sure if this causes trouble?
|
||||
log.debug("Recv: " + str(record[0]))
|
||||
log.info(binascii.hexlify(record_data))
|
||||
|
||||
# Put the record into all queues of registeredHciRecvQueues if their
|
||||
# filter function matches.
|
||||
for queue, filter_function in self.registeredHciRecvQueues: # TODO filter_function not working with bluez modifications
|
||||
|
||||
@@ -1,175 +0,0 @@
|
||||
import binascii
|
||||
import time
|
||||
|
||||
|
||||
class SocketRecvHook():
|
||||
def __init__(self, socket):
|
||||
# type: (socket.socket) -> None
|
||||
self.socket = socket
|
||||
self.replace = False
|
||||
|
||||
def recv_hook(self, data):
|
||||
raise NotImplementedError()
|
||||
|
||||
def recv_replace(self, length, **kwargs):
|
||||
raise NotImplementedError()
|
||||
|
||||
def recv(self, length, **kwargs):
|
||||
if not self.replace:
|
||||
data = self.socket.recv(length, **kwargs)
|
||||
else:
|
||||
data = self.recv_replace(length, **kwargs)
|
||||
self.recv_hook(data)
|
||||
return data
|
||||
|
||||
def recvfrom(self, length):
|
||||
# type: (int) -> Tuple[bytes, Any]
|
||||
if not self.replace:
|
||||
data, addr = self.socket.recvfrom(length)
|
||||
self.recv_hook(data)
|
||||
return data
|
||||
|
||||
class SocketInjectHook():
|
||||
def __init__(self, socket):
|
||||
# type: (socket.socket) -> None
|
||||
self.socket = socket
|
||||
self.replace = False
|
||||
|
||||
def close(self):
|
||||
self.socket.close()
|
||||
|
||||
def send(self,data):
|
||||
self.send_hook(data)
|
||||
if not self.replace:
|
||||
try:
|
||||
self.socket.send(data)
|
||||
except Exception as e:
|
||||
self.send_exception(e)
|
||||
raise e
|
||||
else:
|
||||
self.send_replace(data)
|
||||
|
||||
def send_hook(self,result):
|
||||
raise NotImplementedError()
|
||||
|
||||
def send_replace(self,data):
|
||||
raise NotImplementedError()
|
||||
|
||||
def send_exception(self, e):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class SocketDuplexHook(SocketInjectHook, SocketRecvHook):
|
||||
|
||||
def __init__(self, socket):
|
||||
# type: (socket.socket) -> None
|
||||
self.socket = socket
|
||||
self.replace = False
|
||||
pass
|
||||
|
||||
|
||||
|
||||
class HookBase():
|
||||
def send_hook(self,data):
|
||||
raise NotImplementedError
|
||||
def recv_hook(self, data):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class TraceToFileHook(SocketDuplexHook):
|
||||
def __init__(self, socket, filename='/tmp/bt_hci.log'):
|
||||
SocketDuplexHook.__init__(self, socket)
|
||||
self.file = open(filename, 'a')
|
||||
self.replace = False
|
||||
self.log = []
|
||||
|
||||
def recv_hook(self, data):
|
||||
line = "RX {}\n".format(binascii.hexlify(data))
|
||||
print(line)
|
||||
self.log.append(line)
|
||||
|
||||
def send_hook(self, data):
|
||||
line = "TX {}\n".format(binascii.hexlify(data))
|
||||
print(line)
|
||||
self.log.append(line)
|
||||
|
||||
def send_exception(self, e):
|
||||
line = "EX '{}'\n".format(e)
|
||||
print(line)
|
||||
self.log.append(line)
|
||||
|
||||
def close(self):
|
||||
self.socket.close()
|
||||
self.log.append("Socket closed\n")
|
||||
self.file.writelines(self.log)
|
||||
self.file.close()
|
||||
|
||||
|
||||
import socket
|
||||
|
||||
|
||||
class PrintTrace(SocketDuplexHook):
|
||||
|
||||
def send_hook(self, data, **kwargs):
|
||||
print("Sent: {}".format(binascii.hexlify(data)))
|
||||
|
||||
def recv_hook(self, data, **kwargs):
|
||||
print("Recv: {}".format(binascii.hexlify(data)))
|
||||
|
||||
def recvfrom_hook(self, data, **kwargs):
|
||||
print("Recv: {}".format(binascii.hexlify(data)))
|
||||
|
||||
|
||||
|
||||
|
||||
class ReplaySocket():
|
||||
def __init__(self, filename='/tmp/bt.log'):
|
||||
super(ReplaySocket, self).__init__()
|
||||
self.log = open(filename).readlines()
|
||||
self.index = 0
|
||||
|
||||
def send(self, data, **kwargs):
|
||||
encoded_data = "" # type: str
|
||||
direction, encoded_data = self.log[self.index].split(" ")
|
||||
assert(direction == "TX")
|
||||
log_data = binascii.unhexlify(encoded_data.rstrip('\n'))
|
||||
assert(data == log_data)
|
||||
self.index+=1
|
||||
|
||||
def recv(self, **kwargs):
|
||||
time.sleep(0.01)
|
||||
direction, encoded_data = self.log[self.index].split(" ")
|
||||
if direction == "RX":
|
||||
return binascii.unhexlify(encoded_data.rstrip('\n'))
|
||||
else:
|
||||
raise socket.timeout()
|
||||
|
||||
|
||||
|
||||
from internalblue.core import InternalBlue
|
||||
|
||||
try:
|
||||
import typing
|
||||
from typing import Type
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
def hook(core, socket_hook):
|
||||
# type: (Type[InternalBlue], Type[SocketDuplexHook]) -> None
|
||||
|
||||
def wrap_socket_setup(orig_func):
|
||||
def wrapped_socket_setup(self, *args, **kwargs):
|
||||
status = orig_func(self, *args, **kwargs)
|
||||
if self.s_inject == self.s_snoop:
|
||||
h = socket_hook(self.s_inject)
|
||||
self.s_inject = h
|
||||
self.s_snoop = h
|
||||
else:
|
||||
self.s_inject = socket_hook(self.s_inject)
|
||||
self.s_snoop = socket_hook(self.s_snoop)
|
||||
return status
|
||||
|
||||
return wrapped_socket_setup
|
||||
|
||||
core._setupSockets = wrap_socket_setup(core._setupSockets)
|
||||
|
||||
@@ -1,159 +0,0 @@
|
||||
#!/usr/bin/env python2
|
||||
|
||||
import socket
|
||||
import Queue
|
||||
import hci
|
||||
|
||||
from pwn import *
|
||||
|
||||
from core import InternalBlue
|
||||
import binascii
|
||||
|
||||
filepath = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
try:
|
||||
import typing
|
||||
from typing import List, Tuple, Any
|
||||
from internalblue.core import InternalBlue
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
class testCore(InternalBlue):
|
||||
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory="."):
|
||||
super(testCore, self).__init__(queue_size, btsnooplog_filename, log_level, fix_binutils, data_directory=".")
|
||||
file = open(filepath+'/../dummymemdump.bin', mode='rb')
|
||||
self.memory = file.read()
|
||||
file.close()
|
||||
self.doublecheck = False
|
||||
|
||||
def device_list(self):
|
||||
# type: () -> List[Tuple[InternalBlue,str,str]]
|
||||
"""
|
||||
Get a list of connected devices
|
||||
"""
|
||||
|
||||
if self.exit_requested:
|
||||
self.shutdown()
|
||||
|
||||
if self.running:
|
||||
log.warn("Already running. Call shutdown() first!")
|
||||
return []
|
||||
|
||||
# assume that a explicitly specified iPhone exists
|
||||
device_list = [(self, "Testchip", "Testchip")]
|
||||
|
||||
return device_list
|
||||
|
||||
def sendH4(self, h4type, data, timeout=2):
|
||||
"""
|
||||
Send an arbitrary HCI packet by pushing a send-task into the
|
||||
sendQueue. This function blocks until the response is received
|
||||
or the timeout expires. The return value is the Payload of the
|
||||
HCI Command Complete Event which was received in response to
|
||||
the command or None if no response was received within the timeout.
|
||||
"""
|
||||
|
||||
queue = Queue.Queue(1)
|
||||
|
||||
try:
|
||||
self.sendQueue.put((h4type, data, queue, None), timeout=timeout)
|
||||
ret = queue.get(timeout=timeout)
|
||||
return ret
|
||||
except Queue.Empty:
|
||||
log.warn("sendH4: waiting for response timed out!")
|
||||
return None
|
||||
except Queue.Full:
|
||||
log.warn("sendH4: send queue is full!")
|
||||
return None
|
||||
|
||||
def local_connect(self):
|
||||
return True
|
||||
|
||||
def _setupSockets(self):
|
||||
self.hciport = random.randint(60000, 65535 - 1)
|
||||
log.debug("_setupSockets: Selected random ports snoop=%d and inject=%d" % (self.hciport, self.hciport + 1))
|
||||
log.info("Wireshark configuration (on Loopback interface): udp.port == %d || udp.port == %d" % (
|
||||
self.hciport, self.hciport + 1))
|
||||
|
||||
# Create s_snoop socket
|
||||
self.s_snoop = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.s_snoop.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.s_snoop.bind(('127.0.0.1', self.hciport))
|
||||
self.s_snoop.settimeout(0.5)
|
||||
self.s_snoop.setblocking(True)
|
||||
|
||||
# Create s_inject
|
||||
self.s_inject = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.s_inject.settimeout(0.5)
|
||||
self.s_inject.setblocking(True)
|
||||
|
||||
time.sleep(1.5)
|
||||
|
||||
return True
|
||||
|
||||
def _recvThreadFunc(self):
|
||||
log.debug("Receive Thread terminated.")
|
||||
|
||||
def _sendThreadFunc(self):
|
||||
log.debug("Send Thread started.")
|
||||
while not self.exit_requested:
|
||||
# Little bit ugly: need to re-apply changes to the global context to the thread-copy
|
||||
context.log_level = self.log_level
|
||||
|
||||
# Wait for 'send task' in send queue
|
||||
try:
|
||||
task = self.sendQueue.get(timeout=0.5)
|
||||
except Queue.Empty:
|
||||
continue
|
||||
|
||||
# Extract the components of the task
|
||||
h4type, data, queue, filter_function = task
|
||||
|
||||
# Prepend UART TYPE and length.
|
||||
out = p8(h4type) + p8(len(data)) + data
|
||||
|
||||
# Send command to the chip using IOBluetoothExtended framework
|
||||
h4type, data, queue, filter_function = task
|
||||
opcode = binascii.hexlify(data[1]) + binascii.hexlify(data[0])
|
||||
log.debug("Sending command: 0x" + binascii.hexlify(data))
|
||||
|
||||
# if the caller expects a response: register a queue to receive the response
|
||||
if queue is not None and filter_function is not None:
|
||||
recvQueue = Queue.Queue(1)
|
||||
self.registerHciRecvQueue(recvQueue, filter_function)
|
||||
|
||||
# if the caller expects a response:
|
||||
# Wait for the HCI event response by polling the recvQueue
|
||||
|
||||
if queue is not None and filter_function is not None:
|
||||
# Return responses according to the opcode & operands
|
||||
if opcode == '1001':
|
||||
record_data = '040E0C0101100006b415060f000e22'.decode('hex')
|
||||
data = hci.parse_hci_packet(record_data).data
|
||||
elif opcode == 'fc4d':
|
||||
length = int(binascii.hexlify(data[7]), 16)
|
||||
address = int(binascii.hexlify(data[6]+data[5]+data[4]+data[3]), 16)
|
||||
data = '014dfc00'.decode('hex') + self.memory[address:address+length]
|
||||
elif opcode == 'fc4c':
|
||||
log.info(data.encode('hex'))
|
||||
length = int(binascii.hexlify(data[2]), 16)
|
||||
address = int(binascii.hexlify(data[6]+data[5]+data[4]+data[3]), 16)
|
||||
self.memory = self.memory[:address] + data[7:len(data)] + self.memory[address+length:]
|
||||
else:
|
||||
print(opcode)
|
||||
|
||||
queue.put(data)
|
||||
self.unregisterHciRecvQueue(recvQueue)
|
||||
|
||||
log.debug("Send Thread terminated.")
|
||||
|
||||
def enableBroadcomDiagnosticLogging(self, enable):
|
||||
return
|
||||
|
||||
def _teardownSockets(self):
|
||||
return True
|
||||
|
||||
def shutdown(self):
|
||||
return True
|
||||
@@ -1,17 +0,0 @@
|
||||
import unittest
|
||||
|
||||
from internalblue.testcore import testCore
|
||||
|
||||
|
||||
class DummyCoreTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
t = testCore(log_level='debug', data_directory='/tmp')
|
||||
dev = t.device_list()[0]
|
||||
reference = dev[0]
|
||||
reference.interface = dev[1]
|
||||
self.assert_(reference.connect(), 'Connect failed')
|
||||
self.reference = reference
|
||||
|
||||
def tearDown(self):
|
||||
self.reference.shutdown()
|
||||
@@ -1,31 +0,0 @@
|
||||
import nose
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
from .dummy_core_test import DummyCoreTest
|
||||
from internalblue.cmds import CmdHexdump, CmdSendHciCmd
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
class TestCmdSendHciCmd(DummyCoreTest):
|
||||
def test_version(self):
|
||||
|
||||
cmd = CmdSendHciCmd("sendhcicmd 0x1001", self.reference)
|
||||
result = cmd.work()
|
||||
nose.tools.assert_equal(result, b'\x01\x01\x10\x00\x06\xb4\x15\x06\x0f\x00\x0e"')
|
||||
pass
|
||||
|
||||
def test_write(self):
|
||||
cmd = CmdSendHciCmd("sendhcicmd 0xfc4c", self.reference)
|
||||
pass
|
||||
#cmd.readMem()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -1,45 +0,0 @@
|
||||
import nose
|
||||
|
||||
|
||||
|
||||
try:
|
||||
from typing import TYPE_CHECKING
|
||||
if TYPE_CHECKING:
|
||||
from internalblue import Address
|
||||
except ImportError:
|
||||
Address = lambda x: x
|
||||
pass
|
||||
|
||||
|
||||
|
||||
from .dummy_core_test import DummyCoreTest
|
||||
from internalblue.cmds import CmdHexdump
|
||||
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
class TestDirectMemReadWrite(DummyCoreTest):
|
||||
|
||||
def test_read_mem(self):
|
||||
from internalblue.cmds import CmdHexdump
|
||||
hdxdmp = CmdHexdump("hexdump 0xc0 -l 0x20", self.reference)
|
||||
dump = hdxdmp.readMem(Address(0xc0), 0x20)
|
||||
nose.tools.assert_equal(dump, b'\\\x01\x00\x00-.\x03\x00Copyright (c) Broadcom C')
|
||||
|
||||
|
||||
def test_write_mem(self):
|
||||
from internalblue.cmds import CmdWriteMem
|
||||
cmd = CmdWriteMem("writemem --hex 0xc0 41424344", self.reference)
|
||||
data = b'FOOBAR'
|
||||
status = cmd.writeMem(Address(0x1000), data)
|
||||
nose.tools.assert_true(status)
|
||||
|
||||
read = cmd.readMem(Address(0x1000), len(data))
|
||||
|
||||
nose.tools.assert_equal(data, read)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user