Compare commits
26 Commits
python2
...
hci_hooking
| Author | SHA1 | Date | |
|---|---|---|---|
| 7b1577fe06 | |||
| 0fd8545d93 | |||
| f8afcaad84 | |||
| ef8ed5bd7e | |||
| c225be4f9b | |||
| 90af9fbde3 | |||
| 79fc673617 | |||
| c36b8fe93d | |||
| ef624644b3 | |||
| 9766a12ece | |||
| 01a7decfc9 | |||
| 8f136d1172 | |||
| 21d3ff3b44 | |||
| 4dafd0bf8f | |||
| ac6f8ea630 | |||
| a1222c897a | |||
| ce463df61b | |||
| 0c8bf457a3 | |||
| 28e39d6c34 | |||
| 88bdc3e58e | |||
| d190a2537a | |||
| 9377c8d17e | |||
| c6575d904d | |||
| 9a16c93191 | |||
| a8a9736bf1 | |||
| c43c4bb580 |
Binary file not shown.
@@ -1 +1,62 @@
|
||||
|
||||
try:
|
||||
from Queue import Queue
|
||||
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable, Dict
|
||||
|
||||
if TYPE_CHECKING:
|
||||
import datetime
|
||||
from internalblue.hci import HCI
|
||||
from internalblue.core import InternalBlue
|
||||
Address = NewType("Address", int)
|
||||
Record = Tuple[HCI, int, int, int, Any, datetime.datetime]
|
||||
FilterFunction = Callable[[Record], bool]
|
||||
ConnectionNumber = NewType("ConnectionNumber", int)
|
||||
ConnectionIndex = NewType("ConnectionIndex", int)
|
||||
|
||||
BluetoothAddress = NewType("BluetoothAddress", bytes)
|
||||
ConnectionDict = NewType("ConnectionDict", Dict[str,Any])
|
||||
HeapInformation = NewType("HeapInformation", Dict[str, Any])
|
||||
QueueInformation = NewType('QueueInformation', Dict[str, Any])
|
||||
Opcode = NewType('Opcode', int)
|
||||
HCI_CMD = NewType('HCI_CMD', int)
|
||||
Task = Tuple[HCI_CMD, bytes, Queue.Queue, Callable[[Record], bool]]
|
||||
|
||||
Device = NewType("Device", Dict[str, Any])
|
||||
"""{"dev_id": dev_id,
|
||||
"dev_name": dev_name,
|
||||
"dev_bdaddr": dev_bdaddr,
|
||||
"dev_flags": dev_flags,
|
||||
"dev_flags_str": dev_flags_str}"""
|
||||
# InternalBlueCore, Device Name, SomeString
|
||||
DeviceTuple = Tuple[InternalBlue, str, str]
|
||||
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
|
||||
import logging
|
||||
|
||||
|
||||
class ProgressLog():
|
||||
"""
|
||||
Hack to get around the dependency to the pwnlib logger
|
||||
This looses functionality, but at some point this can be replaced by something like progressbar2
|
||||
"""
|
||||
def __init__(self, init_msg,logger):
|
||||
self.logger = logger
|
||||
self.logger.warning(init_msg)
|
||||
|
||||
def failure(self, msg):
|
||||
self.logger.warning(msg)
|
||||
|
||||
def status(self, msg):
|
||||
self.logger.info(msg)
|
||||
|
||||
def success(self, msg):
|
||||
self.logger.info(msg)
|
||||
|
||||
def getLogger(name):
|
||||
logger = logging.getLogger(name)
|
||||
logger.progress = lambda msg: ProgressLog(msg, logger)
|
||||
return logger
|
||||
|
||||
@@ -4,11 +4,11 @@ import datetime
|
||||
import socket
|
||||
import Queue
|
||||
import random
|
||||
import hci
|
||||
from internalblue import hci
|
||||
|
||||
from pwn import *
|
||||
|
||||
from core import InternalBlue
|
||||
from .core import InternalBlue
|
||||
|
||||
|
||||
class ADBCore(InternalBlue):
|
||||
|
||||
+56
-13
@@ -33,11 +33,20 @@ import os
|
||||
import traceback
|
||||
import argparse
|
||||
|
||||
from adbcore import ADBCore
|
||||
from hcicore import HCICore
|
||||
from .adbcore import ADBCore
|
||||
from .hcicore import HCICore
|
||||
from sys import platform
|
||||
|
||||
import cmds
|
||||
from . import cmds
|
||||
|
||||
try:
|
||||
import typing
|
||||
from typing import List, Optional
|
||||
from internalblue.core import InternalBlue
|
||||
from . import DeviceTuple
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
HISTFILE = "_internalblue.hist"
|
||||
|
||||
@@ -53,11 +62,15 @@ type <help> for usage information!\n\n"""
|
||||
for line in banner:
|
||||
term.output(text.blue(line))
|
||||
|
||||
def commandLoop(internalblue):
|
||||
def commandLoop(internalblue, init_commands=None):
|
||||
cmdstack = init_commands.split(';')[::-1] if init_commands else None
|
||||
while internalblue.running and not internalblue.exit_requested:
|
||||
cmd_instance = None
|
||||
try:
|
||||
cmdline = term.readline.readline(prompt='> ').strip()
|
||||
if cmdstack:
|
||||
cmdline = cmdstack.pop().strip()
|
||||
else:
|
||||
cmdline = term.readline.readline(prompt='> ').strip()
|
||||
cmdword = cmdline.split(' ')[0].split('=')[0]
|
||||
if(cmdword == ''):
|
||||
continue
|
||||
@@ -89,7 +102,7 @@ def commandLoop(internalblue):
|
||||
|
||||
|
||||
# Main Program Start
|
||||
def internalblue_cli():
|
||||
def internalblue_cli(argv):
|
||||
print_banner()
|
||||
|
||||
parser = argparse.ArgumentParser()
|
||||
@@ -97,7 +110,11 @@ def internalblue_cli():
|
||||
parser.add_argument("--verbose", "-v", help="Set log level to DEBUG", action="store_true")
|
||||
parser.add_argument("--ios-device", "-i", help="Tell internalblue to connect to a remote iPhone HCI socket. Specify socket IP address and port (i.e., 172.20.10.1:1234).")
|
||||
parser.add_argument("--serialsu", "-s", help="On ADB, directly try su/serial/busybox scripting, if you do not have a special bluetooth.default.so file.", action="store_true")
|
||||
args = parser.parse_args()
|
||||
parser.add_argument("--testdevice", "-t", help="Use a dummy test device to execute testcases", action="store_true")
|
||||
parser.add_argument("--trace", help="Trace hci connection")
|
||||
parser.add_argument("--device", help="Specify device/core to be used")
|
||||
parser.add_argument("--commands", "-c", help="CLI command to run before prompting, seperated by ';' (used for easier testing)")
|
||||
args = parser.parse_args(argv)
|
||||
|
||||
if args.data_directory is not None:
|
||||
data_directory = args.data_directory
|
||||
@@ -119,13 +136,26 @@ def internalblue_cli():
|
||||
readline_completer = term.completer.LongestPrefixCompleter(words=cmd_keywords)
|
||||
term.readline.set_completer(readline_completer)
|
||||
|
||||
|
||||
|
||||
if args.trace:
|
||||
from .socket_hooks import hook
|
||||
from internalblue import socket_hooks
|
||||
HookClass = getattr(socket_hooks, args.trace)
|
||||
hook(HCICore, HookClass)
|
||||
|
||||
|
||||
# Initalize cores and get devices
|
||||
# As macOS has additional dependencies (objc), only import it here if needed
|
||||
connection_methods = [] # type: List[InternalBlue]
|
||||
if args.ios_device:
|
||||
from ioscore import iOSCore
|
||||
from .ioscore import iOSCore
|
||||
connection_methods = [iOSCore(args.ios_device, log_level=log_level, data_directory=data_directory)]
|
||||
elif args.testdevice:
|
||||
from .testcore import testCore
|
||||
connection_methods = [testCore(log_level=log_level, data_directory=data_directory)]
|
||||
elif platform == "darwin":
|
||||
from macoscore import macOSCore
|
||||
from .macoscore import macOSCore
|
||||
connection_methods = [
|
||||
macOSCore(log_level=log_level, data_directory=data_directory),
|
||||
ADBCore(log_level=log_level, data_directory=data_directory)]
|
||||
@@ -134,12 +164,25 @@ def internalblue_cli():
|
||||
ADBCore(log_level=log_level, data_directory=data_directory, serial=args.serialsu),
|
||||
HCICore(log_level=log_level, data_directory=data_directory)]
|
||||
|
||||
devices = []
|
||||
devices = [] # type: List[DeviceTuple]
|
||||
for connection_method in connection_methods:
|
||||
devices.extend(connection_method.device_list())
|
||||
|
||||
|
||||
device = None # type: Optional[DeviceTuple]
|
||||
if len(devices) > 0:
|
||||
if len(devices) == 1:
|
||||
if args.device:
|
||||
matching_devices = [ dev for dev in devices if dev[1] == args.device]
|
||||
if len(matching_devices) > 1:
|
||||
log.critical("Found multiple matching devices")
|
||||
exit(-1)
|
||||
elif len(matching_devices) == 1:
|
||||
log.info("Found device is: {}".format(matching_devices[0]))
|
||||
device = matching_devices[0]
|
||||
else:
|
||||
log.critical("No matching devices found")
|
||||
exit(-1)
|
||||
elif len(devices) == 1:
|
||||
device = devices[0]
|
||||
else:
|
||||
i = options('Please specify device:', [d[2] for d in devices], 0)
|
||||
@@ -160,7 +203,7 @@ def internalblue_cli():
|
||||
exit(-1)
|
||||
|
||||
# Enter command loop (runs until user quits)
|
||||
commandLoop(reference)
|
||||
commandLoop(reference, init_commands=args.commands)
|
||||
|
||||
# shutdown connection
|
||||
reference.shutdown()
|
||||
@@ -175,5 +218,5 @@ def internalblue_cli():
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
internalblue_cli()
|
||||
internalblue_cli(sys.argv[1:])
|
||||
|
||||
|
||||
+40
-6
@@ -36,7 +36,25 @@ import time
|
||||
import select
|
||||
import json
|
||||
|
||||
|
||||
from internalblue import getLogger
|
||||
|
||||
#log = getLogger(__name__)
|
||||
|
||||
|
||||
|
||||
try:
|
||||
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Type
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from internalblue.core import InternalBlue
|
||||
from internalblue.hci import HCI
|
||||
from internalblue import Record, BluetoothAddress, Address
|
||||
except:
|
||||
pass
|
||||
|
||||
def getCmdList():
|
||||
# type: () -> List[Type['Cmd']]
|
||||
""" Returns a list of all commands which are defined in this cmds.py file.
|
||||
This is done by searching for all subclasses of Cmd
|
||||
"""
|
||||
@@ -44,6 +62,7 @@ def getCmdList():
|
||||
if inspect.isclass(obj) and issubclass(obj, Cmd)][1:]
|
||||
|
||||
def findCmd(keyword):
|
||||
# type: (str) -> Optional[Type['Cmd']]
|
||||
""" Find and return a Cmd subclass for a given keyword.
|
||||
"""
|
||||
command_list = getCmdList()
|
||||
@@ -61,11 +80,13 @@ def auto_int(x):
|
||||
return int(x, 0)
|
||||
|
||||
def bt_addr_to_str(bt_addr):
|
||||
# type: (BluetoothAddress) -> str
|
||||
""" Convert a Bluetooth address (6 bytes) into a human readable format.
|
||||
"""
|
||||
return ":".join([b.encode("hex") for b in bt_addr])
|
||||
|
||||
def parse_bt_addr(bt_addr):
|
||||
# type: (Any) -> Optional[BluetoothAddress]
|
||||
""" Convert Bluetooth address argument and check lengths.
|
||||
"""
|
||||
addr = bt_addr
|
||||
@@ -92,11 +113,12 @@ class Cmd:
|
||||
'keywords' list as member variable. The actual implementation of the
|
||||
command should be located in the work() method.
|
||||
"""
|
||||
keywords = []
|
||||
keywords = [] # type: List[str]
|
||||
|
||||
memory_image = None
|
||||
memory_image = None # type: Optional[bytes]
|
||||
|
||||
def __init__(self, cmdline, internalblue):
|
||||
# type: (str, InternalBlue) -> None
|
||||
self.cmdline = cmdline
|
||||
self.internalblue = internalblue
|
||||
self.memory_image_template_filename = internalblue.data_directory + "/memdump__template.bin"
|
||||
@@ -105,23 +127,28 @@ class Cmd:
|
||||
self.internalblue.fw.__name__[6:12] + "_template.bin"
|
||||
|
||||
def __str__(self):
|
||||
# type: () -> str
|
||||
return self.cmdline
|
||||
|
||||
def work(self):
|
||||
# type: () -> bool
|
||||
return True
|
||||
|
||||
def abort_cmd(self):
|
||||
# type: () -> None
|
||||
self.aborted = True
|
||||
if hasattr(self, 'progress_log'):
|
||||
self.progress_log.failure("Command aborted")
|
||||
|
||||
def getArgs(self):
|
||||
# type: () -> Any
|
||||
try:
|
||||
return self.parser.parse_args(self.cmdline.split(' ')[1:])
|
||||
except SystemExit:
|
||||
return None
|
||||
|
||||
def isAddressInSections(self, address, length=0, sectiontype=""):
|
||||
# type: (int, int, str) -> bool
|
||||
if not self.internalblue.fw:
|
||||
return False
|
||||
|
||||
@@ -137,12 +164,15 @@ class Cmd:
|
||||
return False
|
||||
|
||||
def readMem(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
|
||||
# type: (Address, int, Optional[Any], int, int) -> Optional[bytes]
|
||||
return self.internalblue.readMem(address, length, progress_log, bytes_done, bytes_total)
|
||||
|
||||
def writeMem(self, address, data, progress_log=None, bytes_done=0, bytes_total=0):
|
||||
# type: (Address, bytes, Optional[Any], int, int) -> bool
|
||||
return self.internalblue.writeMem(address, data, progress_log, bytes_done, bytes_total)
|
||||
|
||||
def initMemoryImage(self):
|
||||
# type: () -> None
|
||||
"""
|
||||
Initially read out a chip's memory, all sections (RAM+ROM).
|
||||
:return:
|
||||
@@ -168,6 +198,7 @@ class Cmd:
|
||||
self.refreshMemoryImage()
|
||||
|
||||
def refreshMemoryImage(self):
|
||||
# type: () -> None
|
||||
"""
|
||||
Update an existing memory dump, only RAM sections.
|
||||
:return:
|
||||
@@ -178,12 +209,13 @@ class Cmd:
|
||||
for section in self.internalblue.fw.SECTIONS:
|
||||
if not section.is_rom:
|
||||
sectiondump = self.readMem(section.start_addr, section.size(), self.progress_log, bytes_done, bytes_total)
|
||||
if sectiondump:
|
||||
if sectiondump and Cmd.memory_image:
|
||||
Cmd.memory_image = Cmd.memory_image[0:section.start_addr] + sectiondump + Cmd.memory_image[section.end_addr:]
|
||||
bytes_done += section.size()
|
||||
self.progress_log.success("Received Data: complete")
|
||||
|
||||
def getMemoryImage(self, refresh=False):
|
||||
# type: (bool) -> Any
|
||||
if Cmd.memory_image is None:
|
||||
self.initMemoryImage()
|
||||
elif refresh:
|
||||
@@ -411,6 +443,7 @@ class CmdMonitor(Cmd):
|
||||
self.wireshark_process = None
|
||||
|
||||
def adbhciCallback(self, record):
|
||||
# type: (Record) -> None
|
||||
hcipkt, orig_len, inc_len, flags, drops, recvtime = record
|
||||
|
||||
dummy = "\x00\x00\x00" # TODO: Figure out purpose of these fields
|
||||
@@ -503,6 +536,7 @@ class CmdDumpMem(Cmd):
|
||||
help="Only dump the two RAM sections.")
|
||||
parser.add_argument("--file", "-f", default="memdump.bin",
|
||||
help="Filename of memory dump (default: %(default)s)")
|
||||
parser.add_argument("--overwrite", action='store_true')
|
||||
|
||||
def work(self):
|
||||
args = self.getArgs()
|
||||
@@ -517,7 +551,7 @@ class CmdDumpMem(Cmd):
|
||||
for section in filter(lambda s: s.is_ram, self.internalblue.fw.SECTIONS):
|
||||
filename = args.file + "_" + hex(section.start_addr)
|
||||
if os.path.exists(filename):
|
||||
if not yesno("Update '%s'?" % filename):
|
||||
if not (args.overwrite or yesno("Update '%s'?" % filename)):
|
||||
log.info("Skipping section @%s" % hex(section.start_addr))
|
||||
bytes_done += section.size()
|
||||
continue
|
||||
@@ -531,7 +565,7 @@ class CmdDumpMem(Cmd):
|
||||
|
||||
# Get complete memory image
|
||||
if os.path.exists(args.file):
|
||||
if not yesno("Update '%s'?" % os.path.abspath(args.file)):
|
||||
if not (args.overwrite or yesno("Update '%s'?" % os.path.abspath(args.file))):
|
||||
return False
|
||||
|
||||
dump = self.getMemoryImage(refresh=not args.norefresh)
|
||||
@@ -926,7 +960,7 @@ class CmdSendHciCmd(Cmd):
|
||||
else:
|
||||
data += data_part.decode('hex')
|
||||
|
||||
self.internalblue.sendHciCommand(args.cmdcode, data)
|
||||
return self.internalblue.sendHciCommand(args.cmdcode, data)
|
||||
|
||||
return True
|
||||
|
||||
|
||||
+69
-11
@@ -28,17 +28,31 @@
|
||||
from abc import ABCMeta, abstractmethod
|
||||
|
||||
from pwn import *
|
||||
from fw.fw import Firmware
|
||||
from .fw.fw import Firmware
|
||||
import datetime
|
||||
import time
|
||||
import Queue
|
||||
import hci
|
||||
from . import hci
|
||||
|
||||
try:
|
||||
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable
|
||||
from internalblue import Address, Record, Task, HCI_CMD, FilterFunction, ConnectionNumber, ConnectionDict, \
|
||||
ConnectionIndex, BluetoothAddress, HeapInformation, QueueInformation, Opcode
|
||||
from internalblue.hci import HCI
|
||||
from . import DeviceTuple
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
except:
|
||||
pass
|
||||
|
||||
#import logging
|
||||
#log = logging.getLogger(__name__)
|
||||
|
||||
class InternalBlue:
|
||||
__metaclass__ = ABCMeta
|
||||
|
||||
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory="."):
|
||||
# type: (int, str, str, bool, str) -> None
|
||||
context.log_level = log_level
|
||||
context.log_file = data_directory + '/_internalblue.log'
|
||||
context.arch = "thumb"
|
||||
@@ -48,11 +62,11 @@ class InternalBlue:
|
||||
|
||||
|
||||
self.data_directory = data_directory
|
||||
self.s_inject = None # This is the TCP socket to the HCI inject port
|
||||
self.s_snoop = None # This is the TCP socket to the HCI snoop port
|
||||
self.s_inject = None #type: socket.socket # This is the TCP socket to the HCI inject port
|
||||
self.s_snoop = None #type: socket.socket # This is the TCP socket to the HCI snoop port
|
||||
|
||||
# If btsnooplog_filename is set, write all incomming HCI packets to a file (can be viewed in wireshark for debugging)
|
||||
if btsnooplog_filename != None:
|
||||
if btsnooplog_filename is not None:
|
||||
self.write_btsnooplog = True
|
||||
self.btsnooplog_file = open(self.data_directory + "/" + btsnooplog_filename, "wb")
|
||||
else:
|
||||
@@ -78,7 +92,7 @@ class InternalBlue:
|
||||
# firmware (the response is recognized with the help of the filter function).
|
||||
# Once the response arrived, it puts the response into the response_queue from
|
||||
# the tuple. See sendH4() and sendHciCommand().
|
||||
self.sendQueue = Queue.Queue(queue_size)
|
||||
self.sendQueue = Queue.Queue(queue_size) # type: Queue.Queue[Task]
|
||||
|
||||
self.recvThread = None # The thread which is responsible for the HCI snoop socket
|
||||
self.sendThread = None # The thread which is responsible for the HCI inject socket
|
||||
@@ -103,7 +117,7 @@ class InternalBlue:
|
||||
# filter_function will be called for each packet that is received and only if it returns
|
||||
# True, the packet will be put into the queue. The filter_function can be None in order
|
||||
# to put all packets into the queue.
|
||||
self.registeredHciRecvQueues = []
|
||||
self.registeredHciRecvQueues = [] # type: List[Tuple[Queue.Queue[Record], FilterFunction]]
|
||||
|
||||
self.exit_requested = False # Will be set to true when the framework wants to shut down (e.g. on error or user exit)
|
||||
self.running = False # 'running' is True once the connection to the HCI sockets is established
|
||||
@@ -162,6 +176,7 @@ class InternalBlue:
|
||||
return False
|
||||
|
||||
def _parse_time(self, time):
|
||||
# type: (Any) -> datetime.datetime
|
||||
"""
|
||||
Taken from: https://github.com/joekickass/python-btsnoop
|
||||
|
||||
@@ -178,9 +193,11 @@ class InternalBlue:
|
||||
|
||||
@abstractmethod
|
||||
def _recvThreadFunc(self):
|
||||
# type: () -> None
|
||||
pass
|
||||
|
||||
def _sendThreadFunc(self):
|
||||
# type: () -> None
|
||||
"""
|
||||
This is the run-function of the sendThread. It polls the sendQueue for new 'send tasks'
|
||||
and executes them (sends H4 commands to the chip and returns the response).
|
||||
@@ -262,8 +279,8 @@ class InternalBlue:
|
||||
try:
|
||||
log.debug("_sendThreadFunc: Send: " + str(out.encode('hex')))
|
||||
self.s_inject.send(out)
|
||||
except:
|
||||
log.warn("_sendThreadFunc: Sending to socket failed, reestablishing connection.\nWith HCI sockets, some HCI commands require root!")
|
||||
except Exception as e:
|
||||
log.warn("_sendThreadFunc: Sending to socket failed with {}, reestablishing connection.\nWith HCI sockets, some HCI commands require root!".format(e))
|
||||
# socket are terminated by hcicore..
|
||||
self._teardownSockets()
|
||||
self._setupSockets()
|
||||
@@ -287,6 +304,7 @@ class InternalBlue:
|
||||
log.debug("Send Thread terminated.")
|
||||
|
||||
def _tracepointHciCallbackFunction(self, record):
|
||||
# type: (Record) -> None
|
||||
hcipkt = record[0] # get HCI Event packet
|
||||
timestamp = record[5] # get timestamp
|
||||
|
||||
@@ -348,6 +366,7 @@ class InternalBlue:
|
||||
|
||||
|
||||
def addTracepoint(self, address):
|
||||
# type: (Address) -> bool
|
||||
# Check if constants are defined in fw.py
|
||||
for const in ['TRACEPOINT_BODY_ASM_LOCATION', 'TRACEPOINT_BODY_ASM_SNIPPET',
|
||||
'TRACEPOINT_HOOK_ASM', 'TRACEPOINT_HOOKS_LOCATION',
|
||||
@@ -441,6 +460,7 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def deleteTracepoint(self, address):
|
||||
# type: (Address) -> bool
|
||||
if not self.check_running():
|
||||
return False
|
||||
|
||||
@@ -460,6 +480,7 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def check_running(self):
|
||||
# type: () -> bool
|
||||
"""
|
||||
Check if the framework is running (i.e. the sockets are connected,
|
||||
the recv and send threads are running and exit_requested is not True)
|
||||
@@ -475,10 +496,11 @@ class InternalBlue:
|
||||
|
||||
@abstractmethod
|
||||
def device_list(self):
|
||||
# type: () -> List[DeviceTuple]
|
||||
pass
|
||||
|
||||
def connect(self):
|
||||
|
||||
# type: () -> bool
|
||||
if self.exit_requested:
|
||||
self.shutdown()
|
||||
|
||||
@@ -524,6 +546,7 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def initialize_fimware(self):
|
||||
# type: () -> bool
|
||||
"""
|
||||
Checks if we are running on a Broadcom chip and loads available firmware information based
|
||||
on LMP subversion.
|
||||
@@ -561,6 +584,7 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def shutdown(self):
|
||||
# type: () -> None
|
||||
"""
|
||||
Shutdown the framework by stopping the send and recv threads. Socket shutdown
|
||||
also terminates port forwarding if adb is used.
|
||||
@@ -593,6 +617,7 @@ class InternalBlue:
|
||||
log.info("Shutdown complete.")
|
||||
|
||||
def registerHciCallback(self, callback):
|
||||
# type: (Callable[[Record], None ]) -> None
|
||||
"""
|
||||
Add a new callback function to self.registeredHciCallbacks.
|
||||
The function will be called every time the recvThread receives
|
||||
@@ -612,6 +637,7 @@ class InternalBlue:
|
||||
self.registeredHciCallbacks.append(callback)
|
||||
|
||||
def unregisterHciCallback(self, callback):
|
||||
# type: (Callable[[Tuple[HCI, int, int, int, Any, datetime.datetime]], None ]) -> None
|
||||
"""
|
||||
Remove a callback function from self.registeredHciCallbacks.
|
||||
"""
|
||||
@@ -622,6 +648,7 @@ class InternalBlue:
|
||||
log.warn("registerHciCallback: no such callback is registered!")
|
||||
|
||||
def registerHciRecvQueue(self, queue, filter_function=None):
|
||||
# type: (Queue.Queue[Record], FilterFunction) -> None
|
||||
"""
|
||||
Add a new queue to self.registeredHciRecvQueues.
|
||||
The queue will be filled by the recvThread every time the thread receives
|
||||
@@ -644,6 +671,7 @@ class InternalBlue:
|
||||
self.registeredHciRecvQueues.append((queue, filter_function))
|
||||
|
||||
def unregisterHciRecvQueue(self, queue):
|
||||
# type: (Queue.Queue[Tuple[HCI, int, int, int, Any, datetime]]) -> None
|
||||
"""
|
||||
Remove a queue from self.registeredHciRecvQueues.
|
||||
"""
|
||||
@@ -655,6 +683,7 @@ class InternalBlue:
|
||||
log.warn("registerHciRecvQueue: no such queue is registered!")
|
||||
|
||||
def sendHciCommand(self, opcode, data, timeout=3):
|
||||
# type: (Opcode, bytes, int) -> Optional[Any]
|
||||
"""
|
||||
Send an arbitrary HCI command packet by pushing a send-task into the
|
||||
sendQueue. This function blocks until the response is received
|
||||
@@ -674,6 +703,7 @@ class InternalBlue:
|
||||
# define a filter function which recognizes the response (command complete
|
||||
# or command status event).
|
||||
def recvFilterFunction(record):
|
||||
# type: (Record) -> bool
|
||||
hcipkt = record[0]
|
||||
|
||||
log.debug("sendHciCommand.recvFilterFunction: got response")
|
||||
@@ -703,6 +733,7 @@ class InternalBlue:
|
||||
return None
|
||||
|
||||
def sendH4(self, h4type, data, timeout=2):
|
||||
# type: (HCI_CMD, bytes, int) -> bool
|
||||
"""
|
||||
Send an arbitrary H4 packet by pushing a send-task into the
|
||||
sendQueue. This function does not wait for a response! If you
|
||||
@@ -719,6 +750,7 @@ class InternalBlue:
|
||||
return False
|
||||
|
||||
def recvPacket(self, timeout=None):
|
||||
# type: (Optional[int]) -> Optional[Record]
|
||||
"""
|
||||
This function polls the recvQueue for the next available HCI
|
||||
packet and returns it. The function checks whether it is called
|
||||
@@ -743,6 +775,7 @@ class InternalBlue:
|
||||
return None
|
||||
|
||||
def readMem(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
|
||||
# type: (int, int, Optional[Any], int, int) -> Optional[bytes]
|
||||
"""
|
||||
Reads <length> bytes from the memory space of the firmware at the given
|
||||
address. Reading from unmapped memory or certain memory-mapped-IO areas
|
||||
@@ -827,6 +860,7 @@ class InternalBlue:
|
||||
return outbuffer
|
||||
|
||||
def readMemAligned(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
|
||||
# type: (int, int, Optional[Any], int, int) -> Any
|
||||
"""
|
||||
This is an alternative to readMem() which enforces a strictly aligned access
|
||||
to the memory that is read. This is needed for e.g. the memory-mapped-IO
|
||||
@@ -864,6 +898,7 @@ class InternalBlue:
|
||||
|
||||
recvQueue = Queue.Queue(1)
|
||||
def hciFilterFunction(record):
|
||||
# type: (Record) -> bool
|
||||
hcipkt = record[0]
|
||||
if not issubclass(hcipkt.__class__, hci.HCI_Event):
|
||||
return False
|
||||
@@ -922,6 +957,7 @@ class InternalBlue:
|
||||
return outbuffer
|
||||
|
||||
def writeMem(self, address, data, progress_log=None, bytes_done=0, bytes_total=0):
|
||||
# type: (int, bytes, Optional[Any], int, int) -> Optional[bool]
|
||||
"""
|
||||
Writes the <data> to the memory space of the firmware at the given
|
||||
address.
|
||||
@@ -964,6 +1000,7 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def launchRam(self, address):
|
||||
# type: (int) -> bool
|
||||
"""
|
||||
Executes a function at the specified address in the context of the HCI
|
||||
handler thread. The function has to comply with the calling convention.
|
||||
@@ -988,6 +1025,7 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def getPatchramState(self):
|
||||
# type: () -> Tuple[List[Optional[int]], List[Any], List[Any]]
|
||||
"""
|
||||
Retrieves the current state of the patchram unit. The return value
|
||||
is a tuple containing 3 lists which are indexed by the slot number:
|
||||
@@ -1033,6 +1071,7 @@ class InternalBlue:
|
||||
return (table_addresses, table_values, slot_bits)
|
||||
|
||||
def patchRom(self, address, patch, slot=None):
|
||||
# type: (int, Any, Optional[Any]) -> bool
|
||||
"""
|
||||
Patch a 4-byte value (DWORD) inside the ROM section of the firmware
|
||||
(0x0 - 0x8FFFF) using the patchram mechanism. There are 128 available
|
||||
@@ -1114,6 +1153,7 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def disableRomPatch(self, address, slot=None):
|
||||
# type: (int, Optional[int]) -> bool
|
||||
"""
|
||||
Disable a patchram slot (see also patchRom()). The slot can either be
|
||||
specified by the target address (address that was patched) or by providing
|
||||
@@ -1156,6 +1196,7 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def readConnectionInformation(self, conn_number):
|
||||
# type: (ConnectionNumber) -> Optional[ConnectionDict]
|
||||
"""
|
||||
Reads and parses a connection struct based on the connection number.
|
||||
Note: The connection number is different from the connection index!
|
||||
@@ -1221,6 +1262,7 @@ class InternalBlue:
|
||||
return conn_dict
|
||||
|
||||
def sendLmpPacket(self, opcode, payload='', is_master=True, conn_handle=0x0c, extended_op=False):
|
||||
# type: (Opcode, Any, bool, ConnectionNumber, bool) -> bool
|
||||
"""
|
||||
Inject a LMP packet into a Bluetooth connection (i.e. send a LMP packet
|
||||
to a remote device which is paired and connected with our local device).
|
||||
@@ -1285,6 +1327,7 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def fuzzLmp(self):
|
||||
# type: ()-> bool
|
||||
"""
|
||||
Installs a patch inside the sendLmp HCI handler that allows sending arbitrary
|
||||
LMP payloads. Afterwards, use sendLmpPacket as before.
|
||||
@@ -1312,6 +1355,7 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def sendLmpPacketLegacy(self, conn_nr, opcode, payload, extended_op=False):
|
||||
# type: (int, Opcode, bytes, bool) -> bool
|
||||
"""
|
||||
Inject a LMP packet into a Bluetooth connection (i.e. send a LMP packet
|
||||
to a remote device which is paired and connected with our local device).
|
||||
@@ -1348,7 +1392,7 @@ class InternalBlue:
|
||||
|
||||
# Prepare the assembler snippet by injecting the connection number
|
||||
# and appending the LMP packet data.
|
||||
asm_code = self.fw.SENDLMP_ASM_CODE % (conn_nr)
|
||||
asm_code = self.fw.SENDLMP_ASM_CODE % (conn_nr) # type: str
|
||||
asm_code_with_data = asm_code + ''.join([".byte 0x%02x\n" % ord(x)
|
||||
for x in data.ljust(20, "\x00")])
|
||||
|
||||
@@ -1364,6 +1408,7 @@ class InternalBlue:
|
||||
return False
|
||||
|
||||
def sendLcpPacket(self, conn_idx, payload):
|
||||
# type: (ConnectionIndex, bytes) -> bool
|
||||
"""
|
||||
Inject a LCP packet into a Bluetooth LE connection (i.e. send a LCP packet
|
||||
to a remote device which is paired and connected with our local device).
|
||||
@@ -1400,6 +1445,7 @@ class InternalBlue:
|
||||
return False
|
||||
|
||||
def connectToRemoteDevice(self, bt_addr):
|
||||
# type: (BluetoothAddress) -> None
|
||||
"""
|
||||
Send a HCI Connect Command to the firmware. This will setup
|
||||
a connection (inserted into the connection structure) if the
|
||||
@@ -1427,6 +1473,7 @@ class InternalBlue:
|
||||
self.sendHciCommand(0x0405, bt_addr[::-1] + '\x00\x00\x00\x00\x00\x00\x01')
|
||||
|
||||
def connectToRemoteLEDevice(self, bt_addr, addr_type=0x00):
|
||||
# type: (BluetoothAddress, int) -> None
|
||||
"""
|
||||
Send a HCI LE Create Connection Command to the firmware as
|
||||
defined in the Bluetooth Core Specification 5.0 p. 1266.
|
||||
@@ -1443,6 +1490,7 @@ class InternalBlue:
|
||||
self.sendHciCommand(0x200d, '\x60\x00\x30\x00\x00' + p8(addr_type) + bt_addr[::-1] + '\x01\x18\x00\x28\x00\x00\x00\xd0\x07\x00\x00\x00\x00')
|
||||
|
||||
def connectionStatusCallback(self, record):
|
||||
# type: (Record) -> None
|
||||
"""
|
||||
HCI Callback function to detect HCI Events related to
|
||||
Create Connection
|
||||
@@ -1475,6 +1523,7 @@ class InternalBlue:
|
||||
log.info("[Disconnect Complete: Handle=0x%x]" % (conn_handle))
|
||||
|
||||
def coexStatusCallback(self, record):
|
||||
# type: (Record) -> None
|
||||
"""
|
||||
Coexistence Callback Function
|
||||
Interprets debug counters for coexistence with WiFi/LTE
|
||||
@@ -1499,6 +1548,7 @@ class InternalBlue:
|
||||
return
|
||||
|
||||
def readHeapInformation(self):
|
||||
# type: () -> Optional[Union[HeapInformation, bool]]
|
||||
"""
|
||||
Traverses the double-linked list of BLOC structs and returns them as a
|
||||
list of dictionaries. The dicts have the following fields:
|
||||
@@ -1592,6 +1642,7 @@ class InternalBlue:
|
||||
|
||||
|
||||
def readQueueInformation(self):
|
||||
# type: () -> Optional[Union[bool, QueueInformation]]
|
||||
"""
|
||||
Traverses the double-linked list of QUEUE structs and returns them as a
|
||||
list of dictionaries. The dicts have the following fields:
|
||||
@@ -1655,6 +1706,7 @@ class InternalBlue:
|
||||
return queuelist
|
||||
|
||||
def enableBroadcomDiagnosticLogging(self, enable):
|
||||
# type: (bool) -> None
|
||||
"""
|
||||
Broadcom implemented their own H4 layer protocol. Normally H4 handles HCI
|
||||
messages like HCI commands, SCO and ACL data, and HCI events. Their types are
|
||||
@@ -1681,3 +1733,9 @@ class InternalBlue:
|
||||
# itself crashes when receiving diagnostic frames...
|
||||
else:
|
||||
log.warn("Diagnostic protocol requires modified Android driver!")
|
||||
|
||||
def _setupSockets(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
def _teardownSockets(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# iPhone 6
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# Samsung Galaxy S8
|
||||
|
||||
@@ -23,7 +23,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
FW_NAME = "BCM20702A2"
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# Evaluation Kit CYW920735
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# Evaluation Kit CYW927019
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# iPhone 8/X/XR
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
FW_NAME = "BCM2070B0 (MacBook Pro 2011)"
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# This runs on an iPhone 7
|
||||
|
||||
@@ -25,7 +25,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# This runs on Nexus 5, Xperia Z3, Samsung Galaxy Note 3
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# iPhone 6
|
||||
|
||||
@@ -20,7 +20,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# iPhone SE
|
||||
|
||||
@@ -22,7 +22,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
FW_NAME = "default (unknown firmware)"
|
||||
|
||||
@@ -56,6 +56,7 @@ class HCI(object):
|
||||
return HCI_UART_TYPE_CLASS[uart_type].from_data(data[1:])
|
||||
|
||||
def __init__(self, uart_type):
|
||||
self.event_code = None
|
||||
self.uart_type = uart_type
|
||||
|
||||
def getRaw(self):
|
||||
|
||||
+14
-5
@@ -4,11 +4,19 @@ import subprocess
|
||||
import datetime
|
||||
from pwn import *
|
||||
import fcntl
|
||||
from core import InternalBlue
|
||||
from .core import InternalBlue
|
||||
import hci
|
||||
import Queue
|
||||
import threading
|
||||
|
||||
try:
|
||||
from typing import List
|
||||
from internalblue import Device
|
||||
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
# from /usr/include/bluetooth/hci.h:
|
||||
#define HCIDEVUP _IOW('H', 201, int)
|
||||
#define HCIGETDEVLIST _IOR('H', 210, int)
|
||||
@@ -34,6 +42,7 @@ class HCICore(InternalBlue):
|
||||
self.doublecheck = False
|
||||
|
||||
def getHciDeviceList(self):
|
||||
# type: () -> List[Device]
|
||||
"""
|
||||
Get a list of available HCI devices. The list is obtained by executing
|
||||
ioctl syscalls HCIGETDEVLIST and HCIGETDEVINFO. The returned list
|
||||
@@ -123,8 +132,8 @@ class HCICore(InternalBlue):
|
||||
log.info("HCI device: %s [%s] flags=%d<%s>" %
|
||||
(dev["dev_name"], dev["dev_bdaddr"],
|
||||
dev["dev_flags"], dev["dev_flags_str"]))
|
||||
device_list.append([self, dev["dev_name"], 'hci: %s (%s) <%s>' %
|
||||
(dev["dev_bdaddr"], dev["dev_name"], dev["dev_flags_str"])])
|
||||
device_list.append((self, dev["dev_name"], 'hci: %s (%s) <%s>' %
|
||||
(dev["dev_bdaddr"], dev["dev_name"], dev["dev_flags_str"])))
|
||||
|
||||
if len(device_list) == 0:
|
||||
log.info('No connected HCI device found')
|
||||
@@ -184,8 +193,8 @@ class HCICore(InternalBlue):
|
||||
record_data = self.s_snoop.recv(1024)
|
||||
except socket.timeout:
|
||||
continue # this is ok. just try again without error
|
||||
except Exception:
|
||||
log.critical("Lost device interface, terminating receive thread...")
|
||||
except Exception as e:
|
||||
log.critical("Lost device interface with exception {}, terminating receive thread...".format(e))
|
||||
self.exit_requested = True
|
||||
continue
|
||||
|
||||
|
||||
@@ -0,0 +1,175 @@
|
||||
import binascii
|
||||
import time
|
||||
|
||||
|
||||
class SocketRecvHook():
|
||||
def __init__(self, socket):
|
||||
# type: (socket.socket) -> None
|
||||
self.socket = socket
|
||||
self.replace = False
|
||||
|
||||
def recv_hook(self, data):
|
||||
raise NotImplementedError()
|
||||
|
||||
def recv_replace(self, length, **kwargs):
|
||||
raise NotImplementedError()
|
||||
|
||||
def recv(self, length, **kwargs):
|
||||
if not self.replace:
|
||||
data = self.socket.recv(length, **kwargs)
|
||||
else:
|
||||
data = self.recv_replace(length, **kwargs)
|
||||
self.recv_hook(data)
|
||||
return data
|
||||
|
||||
def recvfrom(self, length):
|
||||
# type: (int) -> Tuple[bytes, Any]
|
||||
if not self.replace:
|
||||
data, addr = self.socket.recvfrom(length)
|
||||
self.recv_hook(data)
|
||||
return data
|
||||
|
||||
class SocketInjectHook():
|
||||
def __init__(self, socket):
|
||||
# type: (socket.socket) -> None
|
||||
self.socket = socket
|
||||
self.replace = False
|
||||
|
||||
def close(self):
|
||||
self.socket.close()
|
||||
|
||||
def send(self,data):
|
||||
self.send_hook(data)
|
||||
if not self.replace:
|
||||
try:
|
||||
self.socket.send(data)
|
||||
except Exception as e:
|
||||
self.send_exception(e)
|
||||
raise e
|
||||
else:
|
||||
self.send_replace(data)
|
||||
|
||||
def send_hook(self,result):
|
||||
raise NotImplementedError()
|
||||
|
||||
def send_replace(self,data):
|
||||
raise NotImplementedError()
|
||||
|
||||
def send_exception(self, e):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
class SocketDuplexHook(SocketInjectHook, SocketRecvHook):
|
||||
|
||||
def __init__(self, socket):
|
||||
# type: (socket.socket) -> None
|
||||
self.socket = socket
|
||||
self.replace = False
|
||||
pass
|
||||
|
||||
|
||||
|
||||
class HookBase():
|
||||
def send_hook(self,data):
|
||||
raise NotImplementedError
|
||||
def recv_hook(self, data):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class TraceToFileHook(SocketDuplexHook):
|
||||
def __init__(self, socket, filename='/tmp/bt_hci.log'):
|
||||
SocketDuplexHook.__init__(self, socket)
|
||||
self.file = open(filename, 'a')
|
||||
self.replace = False
|
||||
self.log = []
|
||||
|
||||
def recv_hook(self, data):
|
||||
line = "RX {}\n".format(binascii.hexlify(data))
|
||||
print(line)
|
||||
self.log.append(line)
|
||||
|
||||
def send_hook(self, data):
|
||||
line = "TX {}\n".format(binascii.hexlify(data))
|
||||
print(line)
|
||||
self.log.append(line)
|
||||
|
||||
def send_exception(self, e):
|
||||
line = "EX '{}'\n".format(e)
|
||||
print(line)
|
||||
self.log.append(line)
|
||||
|
||||
def close(self):
|
||||
self.socket.close()
|
||||
self.log.append("Socket closed\n")
|
||||
self.file.writelines(self.log)
|
||||
self.file.close()
|
||||
|
||||
|
||||
import socket
|
||||
|
||||
|
||||
class PrintTrace(SocketDuplexHook):
|
||||
|
||||
def send_hook(self, data, **kwargs):
|
||||
print("Sent: {}".format(binascii.hexlify(data)))
|
||||
|
||||
def recv_hook(self, data, **kwargs):
|
||||
print("Recv: {}".format(binascii.hexlify(data)))
|
||||
|
||||
def recvfrom_hook(self, data, **kwargs):
|
||||
print("Recv: {}".format(binascii.hexlify(data)))
|
||||
|
||||
|
||||
|
||||
|
||||
class ReplaySocket():
|
||||
def __init__(self, filename='/tmp/bt.log'):
|
||||
super(ReplaySocket, self).__init__()
|
||||
self.log = open(filename).readlines()
|
||||
self.index = 0
|
||||
|
||||
def send(self, data, **kwargs):
|
||||
encoded_data = "" # type: str
|
||||
direction, encoded_data = self.log[self.index].split(" ")
|
||||
assert(direction == "TX")
|
||||
log_data = binascii.unhexlify(encoded_data.rstrip('\n'))
|
||||
assert(data == log_data)
|
||||
self.index+=1
|
||||
|
||||
def recv(self, **kwargs):
|
||||
time.sleep(0.01)
|
||||
direction, encoded_data = self.log[self.index].split(" ")
|
||||
if direction == "RX":
|
||||
return binascii.unhexlify(encoded_data.rstrip('\n'))
|
||||
else:
|
||||
raise socket.timeout()
|
||||
|
||||
|
||||
|
||||
from internalblue.core import InternalBlue
|
||||
|
||||
try:
|
||||
import typing
|
||||
from typing import Type
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
def hook(core, socket_hook):
|
||||
# type: (Type[InternalBlue], Type[SocketDuplexHook]) -> None
|
||||
|
||||
def wrap_socket_setup(orig_func):
|
||||
def wrapped_socket_setup(self, *args, **kwargs):
|
||||
status = orig_func(self, *args, **kwargs)
|
||||
if self.s_inject == self.s_snoop:
|
||||
h = socket_hook(self.s_inject)
|
||||
self.s_inject = h
|
||||
self.s_snoop = h
|
||||
else:
|
||||
self.s_inject = socket_hook(self.s_inject)
|
||||
self.s_snoop = socket_hook(self.s_snoop)
|
||||
return status
|
||||
|
||||
return wrapped_socket_setup
|
||||
|
||||
core._setupSockets = wrap_socket_setup(core._setupSockets)
|
||||
|
||||
@@ -0,0 +1,159 @@
|
||||
#!/usr/bin/env python2
|
||||
|
||||
import socket
|
||||
import Queue
|
||||
import hci
|
||||
|
||||
from pwn import *
|
||||
|
||||
from core import InternalBlue
|
||||
import binascii
|
||||
|
||||
filepath = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
try:
|
||||
import typing
|
||||
from typing import List, Tuple, Any
|
||||
from internalblue.core import InternalBlue
|
||||
|
||||
except:
|
||||
pass
|
||||
|
||||
class testCore(InternalBlue):
|
||||
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory="."):
|
||||
super(testCore, self).__init__(queue_size, btsnooplog_filename, log_level, fix_binutils, data_directory=".")
|
||||
file = open(filepath+'/../dummymemdump.bin', mode='rb')
|
||||
self.memory = file.read()
|
||||
file.close()
|
||||
self.doublecheck = False
|
||||
|
||||
def device_list(self):
|
||||
# type: () -> List[Tuple[InternalBlue,str,str]]
|
||||
"""
|
||||
Get a list of connected devices
|
||||
"""
|
||||
|
||||
if self.exit_requested:
|
||||
self.shutdown()
|
||||
|
||||
if self.running:
|
||||
log.warn("Already running. Call shutdown() first!")
|
||||
return []
|
||||
|
||||
# assume that a explicitly specified iPhone exists
|
||||
device_list = [(self, "Testchip", "Testchip")]
|
||||
|
||||
return device_list
|
||||
|
||||
def sendH4(self, h4type, data, timeout=2):
|
||||
"""
|
||||
Send an arbitrary HCI packet by pushing a send-task into the
|
||||
sendQueue. This function blocks until the response is received
|
||||
or the timeout expires. The return value is the Payload of the
|
||||
HCI Command Complete Event which was received in response to
|
||||
the command or None if no response was received within the timeout.
|
||||
"""
|
||||
|
||||
queue = Queue.Queue(1)
|
||||
|
||||
try:
|
||||
self.sendQueue.put((h4type, data, queue, None), timeout=timeout)
|
||||
ret = queue.get(timeout=timeout)
|
||||
return ret
|
||||
except Queue.Empty:
|
||||
log.warn("sendH4: waiting for response timed out!")
|
||||
return None
|
||||
except Queue.Full:
|
||||
log.warn("sendH4: send queue is full!")
|
||||
return None
|
||||
|
||||
def local_connect(self):
|
||||
return True
|
||||
|
||||
def _setupSockets(self):
|
||||
self.hciport = random.randint(60000, 65535 - 1)
|
||||
log.debug("_setupSockets: Selected random ports snoop=%d and inject=%d" % (self.hciport, self.hciport + 1))
|
||||
log.info("Wireshark configuration (on Loopback interface): udp.port == %d || udp.port == %d" % (
|
||||
self.hciport, self.hciport + 1))
|
||||
|
||||
# Create s_snoop socket
|
||||
self.s_snoop = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.s_snoop.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
|
||||
self.s_snoop.bind(('127.0.0.1', self.hciport))
|
||||
self.s_snoop.settimeout(0.5)
|
||||
self.s_snoop.setblocking(True)
|
||||
|
||||
# Create s_inject
|
||||
self.s_inject = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
self.s_inject.settimeout(0.5)
|
||||
self.s_inject.setblocking(True)
|
||||
|
||||
time.sleep(1.5)
|
||||
|
||||
return True
|
||||
|
||||
def _recvThreadFunc(self):
|
||||
log.debug("Receive Thread terminated.")
|
||||
|
||||
def _sendThreadFunc(self):
|
||||
log.debug("Send Thread started.")
|
||||
while not self.exit_requested:
|
||||
# Little bit ugly: need to re-apply changes to the global context to the thread-copy
|
||||
context.log_level = self.log_level
|
||||
|
||||
# Wait for 'send task' in send queue
|
||||
try:
|
||||
task = self.sendQueue.get(timeout=0.5)
|
||||
except Queue.Empty:
|
||||
continue
|
||||
|
||||
# Extract the components of the task
|
||||
h4type, data, queue, filter_function = task
|
||||
|
||||
# Prepend UART TYPE and length.
|
||||
out = p8(h4type) + p8(len(data)) + data
|
||||
|
||||
# Send command to the chip using IOBluetoothExtended framework
|
||||
h4type, data, queue, filter_function = task
|
||||
opcode = binascii.hexlify(data[1]) + binascii.hexlify(data[0])
|
||||
log.debug("Sending command: 0x" + binascii.hexlify(data))
|
||||
|
||||
# if the caller expects a response: register a queue to receive the response
|
||||
if queue is not None and filter_function is not None:
|
||||
recvQueue = Queue.Queue(1)
|
||||
self.registerHciRecvQueue(recvQueue, filter_function)
|
||||
|
||||
# if the caller expects a response:
|
||||
# Wait for the HCI event response by polling the recvQueue
|
||||
|
||||
if queue is not None and filter_function is not None:
|
||||
# Return responses according to the opcode & operands
|
||||
if opcode == '1001':
|
||||
record_data = '040E0C0101100006b415060f000e22'.decode('hex')
|
||||
data = hci.parse_hci_packet(record_data).data
|
||||
elif opcode == 'fc4d':
|
||||
length = int(binascii.hexlify(data[7]), 16)
|
||||
address = int(binascii.hexlify(data[6]+data[5]+data[4]+data[3]), 16)
|
||||
data = '014dfc00'.decode('hex') + self.memory[address:address+length]
|
||||
elif opcode == 'fc4c':
|
||||
log.info(data.encode('hex'))
|
||||
length = int(binascii.hexlify(data[2]), 16)
|
||||
address = int(binascii.hexlify(data[6]+data[5]+data[4]+data[3]), 16)
|
||||
self.memory = self.memory[:address] + data[7:len(data)] + self.memory[address+length:]
|
||||
else:
|
||||
print(opcode)
|
||||
|
||||
queue.put(data)
|
||||
self.unregisterHciRecvQueue(recvQueue)
|
||||
|
||||
log.debug("Send Thread terminated.")
|
||||
|
||||
def enableBroadcomDiagnosticLogging(self, enable):
|
||||
return
|
||||
|
||||
def _teardownSockets(self):
|
||||
return True
|
||||
|
||||
def shutdown(self):
|
||||
return True
|
||||
@@ -0,0 +1,17 @@
|
||||
import unittest
|
||||
|
||||
from internalblue.testcore import testCore
|
||||
|
||||
|
||||
class DummyCoreTest(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
t = testCore(log_level='debug', data_directory='/tmp')
|
||||
dev = t.device_list()[0]
|
||||
reference = dev[0]
|
||||
reference.interface = dev[1]
|
||||
self.assert_(reference.connect(), 'Connect failed')
|
||||
self.reference = reference
|
||||
|
||||
def tearDown(self):
|
||||
self.reference.shutdown()
|
||||
@@ -0,0 +1,31 @@
|
||||
import nose
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
from .dummy_core_test import DummyCoreTest
|
||||
from internalblue.cmds import CmdHexdump, CmdSendHciCmd
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
class TestCmdSendHciCmd(DummyCoreTest):
|
||||
def test_version(self):
|
||||
|
||||
cmd = CmdSendHciCmd("sendhcicmd 0x1001", self.reference)
|
||||
result = cmd.work()
|
||||
nose.tools.assert_equal(result, b'\x01\x01\x10\x00\x06\xb4\x15\x06\x0f\x00\x0e"')
|
||||
pass
|
||||
|
||||
def test_write(self):
|
||||
cmd = CmdSendHciCmd("sendhcicmd 0xfc4c", self.reference)
|
||||
pass
|
||||
#cmd.readMem()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -0,0 +1,45 @@
|
||||
import nose
|
||||
|
||||
|
||||
|
||||
try:
|
||||
from typing import TYPE_CHECKING
|
||||
if TYPE_CHECKING:
|
||||
from internalblue import Address
|
||||
except ImportError:
|
||||
Address = lambda x: x
|
||||
pass
|
||||
|
||||
|
||||
|
||||
from .dummy_core_test import DummyCoreTest
|
||||
from internalblue.cmds import CmdHexdump
|
||||
|
||||
|
||||
import unittest
|
||||
|
||||
|
||||
class TestDirectMemReadWrite(DummyCoreTest):
|
||||
|
||||
def test_read_mem(self):
|
||||
from internalblue.cmds import CmdHexdump
|
||||
hdxdmp = CmdHexdump("hexdump 0xc0 -l 0x20", self.reference)
|
||||
dump = hdxdmp.readMem(Address(0xc0), 0x20)
|
||||
nose.tools.assert_equal(dump, b'\\\x01\x00\x00-.\x03\x00Copyright (c) Broadcom C')
|
||||
|
||||
|
||||
def test_write_mem(self):
|
||||
from internalblue.cmds import CmdWriteMem
|
||||
cmd = CmdWriteMem("writemem --hex 0xc0 41424344", self.reference)
|
||||
data = b'FOOBAR'
|
||||
status = cmd.writeMem(Address(0x1000), data)
|
||||
nose.tools.assert_true(status)
|
||||
|
||||
read = cmd.readMem(Address(0x1000), len(data))
|
||||
|
||||
nose.tools.assert_equal(data, read)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
Reference in New Issue
Block a user