15 Commits

Author SHA1 Message Date
Florian Magin 2ce2224421 Fix printing by regressing to using pwn directly 2020-02-27 14:55:02 +01:00
Florian Magin 104a35a79a Fix pwnlib related refactor so it still works with python2 2020-02-27 14:41:45 +01:00
Florian Magin e6b99906c9 Remove all 'from pwn import *' in internalblue code 2020-02-27 13:00:01 +01:00
Florian Magin d3059b01d8 Fix subtle byte vs int issues ( 0 != '\x00 is True) 2020-02-27 11:18:21 +01:00
Florian Magin a7066170fc Declare startup trace on adbcore as flaky (works when run directly, doesn't work as part of suite) 2020-02-27 11:13:47 +01:00
Davide Toldo 748c713f67 Fix one of the traces in Python 3 2020-02-23 16:58:01 +01:00
Florian Magin 0864e96569 Propagate exceptions to test framework, ensure required pwnlibs version, ensure that traces don't abort early 2020-02-20 15:23:03 +01:00
Davide Toldo c6e39cb18f * Make pytest testcases Python 2 & 3 compatible
* Make InternalBlue start with ADBCore in Python 3
* Make InternalBlue start with HCICore in Python 3
* Substitute var.decode('hex') with bytearray.fromhex(var) in most places I found which works in Python 2 and 3 and performs the same task
* Substitute var.encode('hex') with new byte_to_hex function that works with Python 2 and 3 (in util.py)
* Declare variables explicitly as bytes instead of strings
* Fix small issue in macOS Core (explicit declaration of variable as byte)
2020-02-17 03:02:59 +01:00
Davide Toldo 07c5c4c336 * CLI now starts up (at least on mac) on Python 2 and 3
* Tests run, some throw errors.
2020-02-16 20:07:23 +01:00
Florian Magin 01589f8eee Hack around pwnlibs treating everything as bytes 2020-02-15 16:58:54 +01:00
Florian Magin d9de8f0d83 Some byte fixes in hcicore 2020-02-15 16:58:19 +01:00
Florian Magin 9b8d5b0740 Move requirements to setup.py 2020-02-15 16:51:43 +01:00
Florian Magin e53edb1ec9 Some byte fixes 2020-02-15 16:51:30 +01:00
Florian Magin b72e12b5a6 Fix subtle syntax change for python3 2020-02-15 16:39:59 +01:00
Florian Magin d7b3b8e7a1 First steps of automatic Python3 conversion
Commands in order:
- `futurize ./ -w -n`

- git grep -l "queue\.Empty" | xargs sed -i 's/queue\.Empty/queue2k.Empty/'

- git grep -l "queue\.Queue" | xargs sed -i 's/queue\.Queue/queue2k.Queue/'

- git grep -l "import queue" | xargs sed -i 's/import queue/import queue as queue2k/'

The search and replaces are needed because the local variables named
`queue` break the module that is called `queue` instead of `Queue` in
Python3

The testcases all still pass with python 2 but with python3 break
completely due to pwnlib issues
2020-02-15 15:52:12 +01:00
38 changed files with 420 additions and 191 deletions
+2
View File
@@ -17,9 +17,11 @@ btsnoop.log
# xcode
xcuserdata
*.xcworkspace
macos-framework/IOBluetoothExtended.framework/
# venv
venv
venv3
# pycharm
*.idea
+1
View File
@@ -4,6 +4,7 @@
# Get receive statistics on a Nexus 5 for BLE connection events
from builtins import range
from pwn import *
from internalblue.adbcore import ADBCore
import internalblue.hci as hci
+1
View File
@@ -4,6 +4,7 @@
# Get receive statistics on a Samsung Galaxy S8 for BLE connection events
from builtins import range
from pwn import *
from internalblue.adbcore import ADBCore
import internalblue.hci as hci
+3 -1
View File
@@ -1,6 +1,8 @@
from future import standard_library
standard_library.install_aliases()
try:
from Queue import Queue
from queue import Queue
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable, Dict
if TYPE_CHECKING:
View File
+16 -6
View File
@@ -1,13 +1,23 @@
#!/usr/bin/env python2
import struct
from time import sleep
from future import standard_library
from pwnlib import adb
from pwnlib.exception import PwnlibException
standard_library.install_aliases()
from builtins import str
import datetime
import socket
import Queue
import queue as queue2k
import random
from internalblue import hci
from internalblue.utils import bytes_to_hex
from pwn import *
from internalblue.utils.pwnlib_wrapper import log, context, u32
from .core import InternalBlue
@@ -141,7 +151,7 @@ class ADBCore(InternalBlue):
while(not self.exit_requested and len(record_hdr) < 24):
try:
recv_data = self.s_snoop.recv(24 - len(record_hdr))
log.debug("recvThreadFunc: received bt_snoop data " + recv_data.encode('hex'))
log.debug("recvThreadFunc: received bt_snoop data " + bytes_to_hex(recv_data))
if len(recv_data) == 0:
log.info("recvThreadFunc: bt_snoop socket was closed by remote site. stopping recv thread...")
self.exit_requested = True
@@ -163,7 +173,7 @@ class ADBCore(InternalBlue):
orig_len, inc_len, flags, drops, time64 = struct.unpack( ">IIIIq", record_hdr)
# Read the record data
record_data = b''
record_data = bytearray()
while(not self.exit_requested and len(record_data) < inc_len):
try:
recv_data = self.s_snoop.recv(inc_len - len(record_data))
@@ -171,7 +181,7 @@ class ADBCore(InternalBlue):
log.info("recvThreadFunc: bt_snoop socket was closed by remote site. stopping..")
self.exit_requested = True
break
record_data += recv_data
record_data += bytearray(recv_data)
except socket.timeout:
pass # this is ok. just try again without error
@@ -201,7 +211,7 @@ class ADBCore(InternalBlue):
if filter_function == None or filter_function(record):
try:
queue.put(record, block=False)
except Queue.Full:
except queue.Full:
log.warn("recvThreadFunc: A recv queue is full. dropping packets..")
# Call all callback functions inside registeredHciCallbacks and pass the
+33 -28
View File
@@ -28,7 +28,12 @@
# Software.
from pwn import *
from __future__ import print_function
import socket
import sys
from builtins import str
import internalblue.utils.pwnlib_wrapper as pwnlib
import os
import traceback
import argparse
@@ -60,7 +65,7 @@ def print_banner():
type <help> for usage information!\n\n"""
for line in banner:
term.output(text.blue(line))
pwnlib.term.output(pwnlib.text.blue(line))
def commandLoop(internalblue, init_commands=None):
cmdstack = init_commands.split(';')[::-1] if init_commands else None
@@ -70,40 +75,39 @@ def commandLoop(internalblue, init_commands=None):
if cmdstack:
cmdline = cmdstack.pop().strip()
else:
cmdline = term.readline.readline(prompt='> ').strip()
cmdline = pwnlib.term.readline.readline(prompt='> ').strip().decode('utf-8')
cmdword = cmdline.split(' ')[0].split('=')[0]
if(cmdword == ''):
continue
log.debug("Command Line: [[" + cmdword + "]] " + cmdline)
pwnlib.log.debug("Command Line: [[" + cmdword + "]] " + cmdline)
matching_cmd = cmds.findCmd(cmdword)
if matching_cmd == None:
log.warn("Command unknown: " + cmdline)
pwnlib.log.warn("Command unknown: " + cmdline)
continue
cmd_instance = matching_cmd(cmdline, internalblue)
if(not cmd_instance.work()):
log.warn("Command failed: " + str(cmd_instance))
pwnlib.log.warn("Command failed: " + str(cmd_instance))
except ValueError as e:
log.warn("commandLoop: ValueError: " + str(e))
continue
pwnlib.log.warn("commandLoop: ValueError: " + str(e))
raise
except KeyboardInterrupt:
if(cmd_instance != None):
cmd_instance.abort_cmd()
else:
log.info("Got Ctrl-C; exiting...")
pwnlib.log.info("Got Ctrl-C; exiting...")
internalblue.exit_requested = True
break
except AssertionError as e:
raise
except socket.error as e:
if e.args == (1, "Operation not permitted"):
log.critical("Received an 'Operation not permitted' socket.error, you might need root for the command '{}'".format(cmdline))
log.critical(traceback.format_exc())
pwnlib.log.critical("Received an 'Operation not permitted' socket.error, you might need root for the command '{}'".format(cmdline))
pwnlib.log.critical(traceback.format_exc())
except Exception as e:
internalblue.exit_requested = True # Make sure all threads terminate
log.critical("Uncaught exception (%s). Abort." % str(e))
pwnlib.log.critical("Uncaught exception (%s). Abort." % str(e))
print(traceback.format_exc())
break
raise
cmd_instance = None
def _parse_argv(argv):
@@ -145,8 +149,8 @@ def internalblue_cli(argv, args=None):
for cmd in cmds.getCmdList():
for keyword in cmd.keywords:
cmd_keywords.append(keyword)
readline_completer = term.completer.LongestPrefixCompleter(words=cmd_keywords)
term.readline.set_completer(readline_completer)
readline_completer = pwnlib.term.completer.LongestPrefixCompleter(words=cmd_keywords)
pwnlib.term.readline.set_completer(readline_completer)
@@ -217,18 +221,18 @@ def internalblue_cli(argv, args=None):
elif args.device:
matching_devices = [ dev for dev in devices if dev[1] == args.device]
if len(matching_devices) > 1:
log.critical("Found multiple matching devices")
pwnlib.log.critical("Found multiple matching devices")
exit(-1)
elif len(matching_devices) == 1:
log.info("Found device is: {}".format(matching_devices[0]))
pwnlib.log.info("Found device is: {}".format(matching_devices[0]))
device = matching_devices[0]
else:
log.critical("No matching devices found")
pwnlib.log.critical("No matching devices found")
exit(-1)
elif len(devices) == 1:
device = devices[0]
else:
i = options('Please specify device:', [d[2] for d in devices], 0)
i = pwnlib.options('Please specify device:', [d[2] for d in devices], 0)
device = devices[i]
# Setup device
@@ -237,28 +241,29 @@ def internalblue_cli(argv, args=None):
# Restore readline history:
if os.path.exists(reference.data_directory + "/" + HISTFILE):
readline_history = read(reference.data_directory + "/" + HISTFILE)
term.readline.history = readline_history.split('\n')
readline_history = pwnlib.read(reference.data_directory + "/" + HISTFILE)
pwnlib.term.readline.history = readline_history.split(b'\n')
# Connect to device
if not reference.connect():
log.critical("No connection to target device.")
pwnlib.log.critical("No connection to target device.")
exit(-1)
# Enter command loop (runs until user quits)
log.info("Starting commandLoop for reference {}".format(reference))
pwnlib.log.info("Starting commandLoop for reference {}".format(reference))
commandLoop(reference, init_commands=args.commands)
# shutdown connection
reference.shutdown()
# Save readline history:
f = open(reference.data_directory + "/" + HISTFILE, "w")
f.write("\n".join(term.readline.history))
f.close()
# TODO: - This causes issues, have to fix ASAP
# f = open(reference.data_directory + "/" + HISTFILE, "w")
# f.write("\n".join(term.readline.history))
# f.close()
# Cleanup
log.info("Goodbye")
pwnlib.log.info("Goodbye")
if __name__ == "__main__":
+47 -35
View File
@@ -23,7 +23,13 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from pwn import *
from __future__ import print_function
import re
from builtins import str
from builtins import hex
from builtins import range
from builtins import object
import os
import sys
import inspect
@@ -36,10 +42,14 @@ import time
import select
import json
from pwnlib.context import context
from pwnlib.asm import disasm, asm
from pwnlib.exception import PwnlibException
from pwnlib.ui import yesno
from pwnlib.util.fiddling import isprint
from internalblue.utils.pwnlib_wrapper import log, flat, read, p8, p32, u32, p16
from internalblue.utils import bytes_to_hex
try:
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Type
@@ -81,7 +91,7 @@ def bt_addr_to_str(bt_addr):
# type: (BluetoothAddress) -> str
""" Convert a Bluetooth address (6 bytes) into a human readable format.
"""
return ":".join([b.encode("hex") for b in bt_addr])
return ":".join(format(x, '02x') for x in bytearray(bt_addr))
def parse_bt_addr(bt_addr):
# type: (Any) -> Optional[BluetoothAddress]
@@ -105,7 +115,7 @@ def parse_bt_addr(bt_addr):
return addr
class Cmd:
class Cmd(object):
""" This class is the superclass of a CLI command. Every CLI command
must be defined as subclass of Cmd. The subclass must define the
'keywords' list as member variable. The actual implementation of the
@@ -186,7 +196,7 @@ class Cmd:
dumped_sections[section.start_addr] = self.readMem(section.start_addr, section.size(), self.progress_log, bytes_done, bytes_total)
bytes_done += section.size()
self.progress_log.success("Received Data: complete")
Cmd.memory_image = fit(dumped_sections, filler='\x00')
Cmd.memory_image = flat(dumped_sections, filler='\x00')
f = open(self.memory_image_template_filename, 'wb')
f.write(Cmd.memory_image)
f.close()
@@ -284,7 +294,8 @@ class CmdLogLevel(Cmd):
log_levels = ['CRITICAL', 'DEBUG', 'ERROR', 'INFO', 'NOTSET', 'WARN', 'WARNING']
for keyword in list(keywords):
keywords.extend(['%s %s' % (keyword, log_level) for log_level in log_levels])
for log_level in log_levels:
keywords.append('%s %s' % (keyword, log_level))
parser = argparse.ArgumentParser(prog=keywords[0],
description=description,
@@ -316,7 +327,7 @@ class CmdMonitor(Cmd):
parser.add_argument("command",
help="One of: start, stop, kill")
class MonitorController:
class MonitorController(object):
instance = None
@staticmethod
@@ -326,7 +337,7 @@ class CmdMonitor(Cmd):
CmdMonitor.MonitorController.instance = CmdMonitor.MonitorController.__MonitorController(internalblue, 0xC9)
return CmdMonitor.MonitorController.instance
class __MonitorController:
class __MonitorController(object):
def __init__(self, internalblue, pcap_data_link_type):
self.internalblue = internalblue
self.running = False
@@ -546,7 +557,7 @@ class CmdDumpMem(Cmd):
bytes_total = sum([s.size() for s in self.internalblue.fw.SECTIONS if s.is_ram])
bytes_done = 0
self.progress_log = log.progress("Downloading RAM sections...")
for section in filter(lambda s: s.is_ram, self.internalblue.fw.SECTIONS):
for section in [s for s in self.internalblue.fw.SECTIONS if s.is_ram]:
filename = args.file + "_" + hex(section.start_addr)
if os.path.exists(filename):
if not (args.overwrite or yesno("Update '%s'?" % filename)):
@@ -599,10 +610,10 @@ class CmdSearchMem(Cmd):
highlight = pattern
if args.hex:
try:
pattern = pattern.decode('hex')
pattern = bytearray.fromhex(pattern)
highlight = pattern
except TypeError as e:
log.warn("Search pattern cannot be converted to hexstring: " + str(e))
log.warn("Search pattern cannot be converted to bytestring: " + str(e))
return False
elif args.address:
pattern = p32(int(pattern, 16))
@@ -651,7 +662,7 @@ class CmdHexdump(Cmd):
if dump == None:
return False
log.hexdump(dump, begin=args.address)
log.hexdump(bytes(dump), begin=args.address)
return True
class CmdTelescope(Cmd):
@@ -766,9 +777,9 @@ class CmdWriteMem(Cmd):
data = ' '.join(args.data)
if args.hex:
try:
data = data.decode('hex')
data = bytearray.fromhex(data)
except TypeError as e:
log.warn("Data string cannot be converted to hexstring: " + str(e))
log.warn("Hex string cannot be converted to bytestring: " + str(e))
return False
elif args.int:
data = p32(auto_int(data))
@@ -951,7 +962,7 @@ class CmdSendHciCmd(Cmd):
log.info("cmdcode needs to be in the range of 0x0000 - 0xffff")
return False
data = ''
data = b''
for data_part in args.data:
if data_part[0:2] == "0x":
data += p32(auto_int(data_part))
@@ -1181,16 +1192,16 @@ class CmdInfo(Cmd):
log.info(" - Remote BT name: %08X" % connection.remote_name_address)
log.info(" - Master of Conn.: %s" % str(connection.master_of_connection))
log.info(" - Conn. Handle: 0x%X" % connection.connection_handle)
log.info(" - Public RAND: %s" % connection.public_rand.encode('hex'))
#log.info(" - PIN: %s" % connection.pin.encode('hex'))
log.info(" - Public RAND: %s" % bytes_to_hex(connection.public_rand))
#log.info(" - PIN: %s" % bytes_to_hex(connection.pin)
#log.info(" - BT addr for key: %s" % bt_addr_to_str(connection.bt_addr_for_key))
log.info(" - Effective Key Len: %d byte (%d bit)" % (connection.effective_key_len, 8*connection["effective_key_len"]))
log.info(" - Link Key: %s" % connection.link_key.encode('hex'))
log.info(" - LMP Features: %s" % connection.extended_lmp_feat.encode('hex'))
log.info(" - Host Supported F: %s" % connection.host_supported_feat.encode('hex'))
log.info(" - Link Key: %s" % bytes_to_hex(connection.link_key))
log.info(" - LMP Features: %s" % bytes_to_hex(connection.extended_lmp_feat))
log.info(" - Host Supported F: %s" % bytes_to_hex(connection.host_supported_feat))
log.info(" - TX Power (dBm): %d" % connection.tx_pwr_lvl_dBm)
log.info(" - Array Index: %s" % connection.id.encode('hex'))
print
log.info(" - Array Index: %s" % bytes_to_hex(connection.id))
print()
return True
def infoDevice(self, args):
@@ -1199,14 +1210,14 @@ class CmdInfo(Cmd):
log.warn(" '%s' not in fw.py. FEATURE NOT SUPPORTED!" % const)
return False
bt_addr = self.readMem(self.internalblue.fw.BD_ADDR, 6)[::-1]
bt_addr_str = ":".join([b.encode("hex") for b in bt_addr])
bt_addr_str = bt_addr_to_str(bt_addr)
device_name = self.readMem(self.internalblue.fw.DEVICE_NAME, 258)
device_name_len = u8(device_name[0])-1
device_name_len = device_name[0]-1
device_name = device_name[2:2+device_name_len]
adb_serial = context.device
log.info("### | Device ###")
log.info(" - Name: %s" % device_name)
log.info(" - Name: %s" % device_name.decode('utf-8'))
log.info(" - ADB Serial: %s" % adb_serial)
log.info(" - Address: %s" % bt_addr_str)
return True
@@ -1228,7 +1239,7 @@ class CmdInfo(Cmd):
code = disasm(table_values[i],vma=table_addresses[i],byte=False,offset=False)
code = code.replace(" ", " ").replace("\n", "; ")
log.info("[%03d] 0x%08X: %s (%s)" % (i, table_addresses[i],
table_values[i].encode('hex'),
bytes_to_hex(table_values[i]),
code))
return True
@@ -1306,7 +1317,7 @@ class CmdInfo(Cmd):
# Print Buffer Details
buffer_size = bloc_for_details["buffer_size"] + 4
for buffer_address, buffer_hdr in bloc_for_details["buffer_headers"].iteritems():
for buffer_address, buffer_hdr in bloc_for_details["buffer_headers"].items():
progress_log.status("Dumping buffers from BLOC[%d]: 0x%06X" % (bloc_for_details["index"], buffer_address))
# Buffer in use!
if buffer_hdr == bloc_for_details["address"]:
@@ -1362,7 +1373,7 @@ class CmdInfo(Cmd):
if args.type in subcommands:
return subcommands[args.type](args.args)
else:
log.warn("Unkown type: %s\nKnown types: %s" % (args.type, subcommands.keys()))
log.warn("Unkown type: %s\nKnown types: %s" % (args.type, list(subcommands.keys())))
return False
@@ -1486,7 +1497,8 @@ class CmdCustom(Cmd):
actions = ['list', 'add', 'run', 'remove']
for keyword in list(keywords):
keywords.extend(['%s %s' % (keyword, action) for action in actions])
for action in actions:
keywords.append('%s %s' % (keyword, action))
parser = argparse.ArgumentParser(prog=keywords[0],
description=description,
@@ -1521,7 +1533,7 @@ class CmdCustom(Cmd):
return True
if args.do == 'list':
custom_cmds= ["\t%s\t\t%s\n" % (k, v) for k, v in sorted(CmdCustom.custom_commands.iteritems())]
custom_cmds= ["\t%s\t\t%s\n" % (k, v) for k, v in sorted(CmdCustom.custom_commands.items())]
log.info("Custom commands:\n%s" % ''.join(custom_cmds))
return True
@@ -1619,15 +1631,15 @@ class CmdReadAfhChannelMap(Cmd):
"""
response = self.internalblue.sendHciCommand(0x1406, p16(handle))
if len(response) < 17 or response[8:] == '\x00'*9:
if len(response) < 17 or response[8:] == b'\x00'*9:
log.info("Connection 0x%04x is not established." % handle)
return False
log.info("Connection Handle: 0x%04x" % handle)
log.info("AFH Enabled: %s" % bool(response[7] != '\x00'))
log.info("AFH Enabled: %s" % bool(response[7] != 0))
channels = ""
for c in response[8:]:
bits = format(ord(c), '08b')
bits = format(c, '08b')
for b in bits:
if b == "1":
channels = channels + " *"
+61 -40
View File
@@ -25,16 +25,38 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from __future__ import division
import socket
import struct
from future import standard_library
import pwnlib
from pwnlib.asm import asm
from pwnlib.exception import PwnlibException
from pwnlib.util.fiddling import bits, unbits
from .utils.pwnlib_wrapper import p16, p8, u32, u16, p32, u8
standard_library.install_aliases()
from builtins import hex
from builtins import str
from builtins import range
from builtins import object
from past.utils import old_div
from abc import ABCMeta, abstractmethod
from pwn import *
from .fw.fw import Firmware
import datetime
import time
import Queue
import queue as queue2k
from . import hci
from .objects.queue_element import QueueElement
from .objects.connection_information import ConnectionInformation
from future.utils import with_metaclass
from internalblue.utils import bytes_to_hex
from internalblue.utils.pwnlib_wrapper import log, context, flat
try:
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable
@@ -50,9 +72,7 @@ except:
#import logging
#log = logging.getLogger(__name__)
class InternalBlue:
__metaclass__ = ABCMeta
class InternalBlue(with_metaclass(ABCMeta, object)):
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory=".", replay=False):
# type: (int, str, str, bool, str, bool) -> None
context.log_level = log_level
@@ -94,7 +114,7 @@ class InternalBlue:
# firmware (the response is recognized with the help of the filter function).
# Once the response arrived, it puts the response into the response_queue from
# the tuple. See sendH4() and sendHciCommand().
self.sendQueue = Queue.Queue(queue_size) # type: Queue.Queue[Task]
self.sendQueue = queue2k.Queue(queue_size) # type: Queue.Queue[Task]
self.recvThread = None # The thread which is responsible for the HCI snoop socket
self.sendThread = None # The thread which is responsible for the HCI inject socket
@@ -225,7 +245,7 @@ class InternalBlue:
# Wait for 'send task' in send queue
try:
task = self.sendQueue.get(timeout=0.5)
except Queue.Empty:
except queue2k.Empty:
continue
# Extract the components of the task
@@ -277,12 +297,12 @@ class InternalBlue:
# if the caller expects a response: register a queue to receive the response
if queue != None and filter_function != None:
recvQueue = Queue.Queue(1)
recvQueue = queue2k.Queue(1)
self.registerHciRecvQueue(recvQueue, filter_function)
# Send command to the chip using s_inject socket
try:
log.debug("_sendThreadFunc: Send: " + str(out.encode('hex')))
log.debug("_sendThreadFunc: Send: " + bytes_to_hex(out))
self.s_inject.send(out)
except socket.error:
pass
@@ -299,7 +319,7 @@ class InternalBlue:
record = recvQueue.get(timeout=2)
hcipkt = record[0]
data = hcipkt.data
except Queue.Empty:
except queue2k.Empty:
log.warn("_sendThreadFunc: No response from the firmware.")
data = None
self.unregisterHciRecvQueue(recvQueue)
@@ -363,7 +383,7 @@ class InternalBlue:
# Check if this was the last packet
if len(self.tracepoint_memdump_parts) == self.fw.TRACEPOINT_RAM_DUMP_PKT_COUNT:
dump = fit(self.tracepoint_memdump_parts)
dump = flat(self.tracepoint_memdump_parts)
#TODO: use this to start qemu
filename = self.data_directory + "/" + "internalblue_tracepoint_0x%x_%s.bin" % (self.tracepoint_memdump_address, datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
log.info("Captured Ram Dump for Tracepoint 0x%x to %s" % (self.tracepoint_memdump_address, filename))
@@ -560,7 +580,7 @@ class InternalBlue:
"""
# send Read_Local_Version_Information
version = self.sendHciCommand(0x1001, '')
version = self.sendHciCommand(0x1001, ''.encode('utf-8'))
if not version or len(version) < 11:
log.warn("""initialize_fimware: Failed to send a HCI command to the Bluetooth driver.
@@ -570,12 +590,12 @@ class InternalBlue:
return False
# Broadcom uses 0x000f as vendor ID, Cypress 0x0131
vendor = (u8(version[9]) << 8) + u8(version[8])
vendor = (version[9] << 8) + version[8]
if vendor != 0xf and vendor != 0x131:
log.critical("Not running on a Broadcom or Cypress chip!")
return False
else:
subversion = (u8(version[11]) << 8) + u8(version[10])
subversion = (version[11] << 8) + version[10]
iOS = False
if self.__class__.__name__ == "iOSCore":
@@ -690,7 +710,7 @@ class InternalBlue:
log.warn("registerHciRecvQueue: no such queue is registered!")
def sendHciCommand(self, opcode, data, timeout=3):
# type: (Opcode, bytes, int) -> Optional[Any]
# type: (Opcode, bytes, int) -> Optional[bytearray]
"""
Send an arbitrary HCI command packet by pushing a send-task into the
sendQueue. This function blocks until the response is received
@@ -702,7 +722,7 @@ class InternalBlue:
# return this instead of the Command Complete Event (which will
# follow later and will be ignored). This should be fixed..
queue = Queue.Queue(1)
queue = queue2k.Queue(1)
# standard HCI command structure
payload = p16(opcode) + p8(len(data)) + data
@@ -732,14 +752,14 @@ class InternalBlue:
timeout=timeout)
ret = queue.get(timeout=timeout)
return ret
except Queue.Empty:
except queue2k.Empty:
log.warn("sendHciCommand: waiting for response timed out!")
# If there was no response because the Trace Replay Hook throw an assert it will be in this attribute.
# Raise this so the main thread doesn't ignore this and it will be caught by any testing framework
if hasattr(self, 'test_failed'):
raise self.test_failed
return None
except Queue.Full:
except queue2k.Full:
log.warn("sendHciCommand: send queue is full!")
return None
@@ -756,7 +776,7 @@ class InternalBlue:
try:
self.sendQueue.put((h4type, data, None, None), timeout=timeout)
return True
except Queue.Full:
except queue2k.Full:
log.warn("sendH4: send queue is full!")
return False
@@ -782,7 +802,7 @@ class InternalBlue:
try:
return self.recvQueue.get(timeout=timeout)
except Queue.Empty:
except queue2k.Empty:
return None
def readMem(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
@@ -805,7 +825,7 @@ class InternalBlue:
read_addr = address # read_addr is the address of the next Read_RAM HCI command
byte_counter = 0 # tracks the number of received bytes
outbuffer = '' # buffer which stores all accumulated data read from the chip
outbuffer = bytearray() # buffer which stores all accumulated data read from the chip
if bytes_total == 0: # If no total bytes where given just use length
bytes_total = length
retry = 3 # Retry on failures
@@ -840,7 +860,7 @@ class InternalBlue:
log.debug("readMem: insufficient bytes returned, retrying...")
continue
status = ord(response[3])
status = response[3]
if status != 0:
# It is not yet reverse engineered what this byte means. For almost
# all memory addresses it will be 0. But for some it will be different,
@@ -856,7 +876,7 @@ class InternalBlue:
response_check = self.sendHciCommand(0xfc4d, p32(read_addr) + p8(blocksize))
if response != response_check:
log.debug("readMem: double checking response failed at 0x%x! retry..." % read_addr)
sleep(0.3)
time.sleep(0.3)
retry = retry - 1
continue
@@ -865,7 +885,7 @@ class InternalBlue:
byte_counter += len(data)
if(progress_log != None):
msg = "receiving data... %d / %d Bytes (%d%%)" % (bytes_done+byte_counter,
bytes_total, (bytes_done+byte_counter)*100/bytes_total)
bytes_total, old_div((bytes_done+byte_counter)*100,bytes_total))
progress_log.status(msg)
retry = 3 # this round worked, so we re-enable retries
return outbuffer
@@ -907,7 +927,7 @@ class InternalBlue:
log.warn("readMemAligned: address (0x%x) must be 4-byte aligned!" % address)
return None
recvQueue = Queue.Queue(1)
recvQueue = queue2k.Queue(1)
def hciFilterFunction(record):
# type: (Record) -> bool
hcipkt = record[0]
@@ -933,7 +953,7 @@ class InternalBlue:
blocksize = 244
# Customize the assembler snippet with the current read_addr and blocksize
code = asm(self.fw.READ_MEM_ALIGNED_ASM_SNIPPET % (blocksize, read_addr, blocksize/4), vma=self.fw.READ_MEM_ALIGNED_ASM_LOCATION, arch='thumb')
code = asm(self.fw.READ_MEM_ALIGNED_ASM_SNIPPET % (blocksize, read_addr, old_div(blocksize,4)), vma=self.fw.READ_MEM_ALIGNED_ASM_LOCATION, arch='thumb')
# Write snippet to the RAM (TODO: maybe backup and restore content of this area?)
self.writeMem(self.fw.READ_MEM_ALIGNED_ASM_LOCATION, code)
@@ -950,7 +970,7 @@ class InternalBlue:
# wait for the custom HCI event sent by the snippet:
try:
record = recvQueue.get(timeout=1)
except Queue.Empty:
except queue2k.Empty:
log.warn("readMemAligned: No response from assembler snippet.")
return None
@@ -961,7 +981,7 @@ class InternalBlue:
byte_counter += len(data)
if progress_log is not None:
msg = "receiving data... %d / %d Bytes (%d%%)" % (bytes_done+byte_counter,
bytes_total, (bytes_done+byte_counter)*100/bytes_total)
bytes_total, old_div((bytes_done+byte_counter)*100,bytes_total))
progress_log.status(msg)
self.unregisterHciRecvQueue(recvQueue)
@@ -1000,8 +1020,8 @@ class InternalBlue:
if(response == None):
log.warn("writeMem: Timeout while reading response, probably need to wait longer.")
return False
elif (response[3] != '\x00'):
log.warn("writeMem: Got error code %s in command complete event." % response[3].encode('hex'))
elif (response[3] != 0):
log.warn("writeMem: Got error code %s in command complete event." % bytes_to_hex(response[3]))
return False
write_addr += blocksize
byte_counter += blocksize
@@ -1024,8 +1044,9 @@ class InternalBlue:
log.warn("Empty HCI response during launchRam, driver crashed due to invalid code or destination")
return False
if response[3] != '\x00':
log.warn("Got error code %x in command complete event." % u8(response[3]))
error_code = response[3]
if error_code != 0:
log.warn("Got error code %x in command complete event." % error_code)
return False
# Nexus 6P Bugfix
@@ -1036,7 +1057,7 @@ class InternalBlue:
return True
def getPatchramState(self):
# type: () -> Tuple[List[Optional[int]], List[Any], List[Any]]
# type: () -> Union[bool, Tuple[List[Optional[int]], List[Union[Union[int, bytes, None], Any]], list]]
"""
Retrieves the current state of the patchram unit. The return value
is a tuple containing 3 lists which are indexed by the slot number:
@@ -1056,10 +1077,10 @@ class InternalBlue:
# On Nexus 5, ReadMemAligned is required, while Nexus 6P supports this memory area with ReadRAM
if self.fw.PATCHRAM_ALIGNED:
slot_dump = self.readMemAligned(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, slot_count/4)
slot_dump = self.readMemAligned(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, old_div(slot_count,4))
table_addr_dump = self.readMemAligned(self.fw.PATCHRAM_TARGET_TABLE_ADDRESS, slot_count*4)
else:
slot_dump = self.readMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, slot_count/4)
slot_dump = self.readMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, old_div(slot_count,4))
table_addr_dump = self.readMem(self.fw.PATCHRAM_TARGET_TABLE_ADDRESS, slot_count*4)
table_val_dump = self.readMem(self.fw.PATCHRAM_VALUE_TABLE_ADDRESS, slot_count*4)
@@ -1067,11 +1088,11 @@ class InternalBlue:
table_values = []
slot_dwords = []
slot_bits = []
for dword in range(slot_count/32):
for dword in range(old_div(slot_count,32)):
slot_dwords.append(slot_dump[dword*32:(dword+1)*32])
for dword in slot_dwords:
slot_bits.extend(bits(dword[::-1])[::-1])
slot_bits.extend(bits(bytes(dword[::-1]))[::-1])
for i in range(slot_count):
if slot_bits[i]:
table_addresses.append(u32(table_addr_dump[i*4:i*4+4])<<2)
@@ -1157,7 +1178,7 @@ class InternalBlue:
# Enable patchram slot (enable bitfield starts at 0x310204)
# (We need to enable the slot by setting a bit in a multi-dword bitfield)
target_dword = int(slot / 32)
target_dword = int(old_div(slot, 32))
table_slots[slot] = 1
slot_dword = unbits(table_slots[target_dword*32:(target_dword+1)*32][::-1])[::-1]
self.writeMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS + target_dword*4, slot_dword)
@@ -1196,7 +1217,7 @@ class InternalBlue:
# Disable patchram slot (enable bitfield starts at 0x310204)
# (We need to disable the slot by clearing a bit in a multi-dword bitfield)
target_dword = int(slot / 32)
target_dword = int(old_div(slot, 32))
table_slots[slot] = 0
slot_dword = unbits(table_slots[target_dword*32:(target_dword+1)*32][::-1])[::-1]
self.writeMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS + target_dword*4, slot_dword)
@@ -1715,7 +1736,7 @@ class InternalBlue:
"""
if not self.serial:
self.sendH4(hci.HCI.BCM_DIAG, '\xf0' + p8(enable))
self.sendH4(hci.HCI.BCM_DIAG, b'\xf0' + b'\x01' if enable else b'\x00')
# We can send the activation to the serial, but then the Android driver
# itself crashes when receiving diagnostic frames...
+4 -2
View File
@@ -23,10 +23,12 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from builtins import hex
from builtins import object
from pwn import log
class Firmware:
class Firmware(object):
def __init__(self, version=None, iOS=False):
"""
Load and initialize the actual firmware add-ons for Nexus 5, Raspi3, etc.
@@ -60,7 +62,7 @@ class Firmware:
log.info("Loaded firmware information for " + self.firmware.FW_NAME + ".")
class MemorySection:
class MemorySection(object):
"""
All firmwares have memory sections that can be RAM, ROM or neither of both.
"""
+2 -1
View File
@@ -1,3 +1,4 @@
from __future__ import absolute_import
# fw_0x420e.py
#
# Generic firmware file in case we do not know something...
@@ -20,7 +21,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# Samsung S10/S10e/S10+
+2 -1
View File
@@ -27,6 +27,7 @@
# Firmware Infos
# This runs on Rasperry Pi 3
from builtins import object
FW_NAME = "BCM43430A1"
# Device Infos
@@ -34,7 +35,7 @@ DEVICE_NAME = 0x20401C
BD_ADDR = 0x201C64
# Memory Sections
class MemorySection:
class MemorySection(object):
def __init__(self, start_addr, end_addr, is_rom, is_ram):
self.start_addr = start_addr
self.end_addr = end_addr
+2 -1
View File
@@ -1,3 +1,4 @@
from __future__ import absolute_import
# fw_0x420e.py
#
# Generic firmware file in case we do not know something...
@@ -20,7 +21,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# Evaluation Kit CYW20706
+2 -1
View File
@@ -1,3 +1,4 @@
from __future__ import absolute_import
# fw_0x420e.py
#
# Generic firmware file in case we do not know something...
@@ -20,7 +21,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# Evaluation Kit CYW920819
+2 -1
View File
@@ -23,7 +23,8 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from __future__ import absolute_import
from .fw import MemorySection
# Firmware Infos
FW_NAME = "BCM20702A1 (USB Bluetooth dongle)"
+2 -1
View File
@@ -22,7 +22,8 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from __future__ import absolute_import
from .fw import MemorySection
# Firmware Infos
FW_NAME = "BCM20703A2 (MacBook Pro 2016)"
+2 -1
View File
@@ -25,7 +25,8 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from __future__ import absolute_import
from .fw import MemorySection
# Firmware Infos
# This runs on Nexus 6P, Samsung Galaxy S6, Samsung Galaxy S6 edge
+2 -1
View File
@@ -27,13 +27,14 @@
# Firmware Infos
# This runs on Rasperry Pi 3+
from builtins import object
FW_NAME = "BCM4345C0"
# Device Infos
DEVICE_NAME = 0x204954
# Memory Sections
class MemorySection:
class MemorySection(object):
def __init__(self, start_addr, end_addr, is_rom, is_ram):
self.start_addr = start_addr
self.end_addr = end_addr
+17 -10
View File
@@ -25,7 +25,14 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from pwn import *
from __future__ import absolute_import
from builtins import hex
from builtins import range
from builtins import object
from internalblue.utils.pwnlib_wrapper import p8, u16, p16, unbits, bits_str, u8, bits, p32, u32
from internalblue.utils.pwnlib_wrapper import log
from pwnlib.util.packing import flat
HCI_UART_TYPE_CLASS = {}
@@ -52,7 +59,7 @@ class HCI(object):
@staticmethod
def from_data(data):
uart_type = ord(data[0])
uart_type = data[0]
return HCI_UART_TYPE_CLASS[uart_type].from_data(data[1:])
def __init__(self, uart_type):
@@ -573,7 +580,7 @@ class HCI_Cmd(HCI):
0xffed : "COMND VSC_EnterDownloadMode"
}
HCI_CMD_STR_REVERSE = {v: k for k, v in HCI_CMD_STR.iteritems()}
HCI_CMD_STR_REVERSE = {v: k for k, v in HCI_CMD_STR.items()}
@staticmethod
def cmd_name(opcode):
@@ -607,7 +614,7 @@ class HCI_Cmd(HCI):
@staticmethod
def from_data(data):
return HCI_Cmd(u16(data[0:2]), ord(data[2]), data[3:])
return HCI_Cmd(u16(data[0:2]), data[2], data[3:])
def __init__(self, opcode, length, data):
HCI.__init__(self, HCI.HCI_CMD)
@@ -623,7 +630,7 @@ class HCI_Cmd(HCI):
cmdname = "unknown"
if self.opcode in self.HCI_CMD_STR:
cmdname = self.HCI_CMD_STR[self.opcode]
return parent + "<0x%04x %s (len=%d): %s>" % (self.opcode, cmdname, self.length, self.data[0:16].encode('hex'))
return parent + "<0x%04x %s (len=%d): %s>" % (self.opcode, cmdname, self.length, ''.join(format(x, '02x') for x in self.data[0:16]))
class HCI_Acl(HCI):
@@ -698,7 +705,7 @@ class HCI_Diag(HCI):
cmdname = "unknown"
if self.opcode in self.BCM_DIAG_STR:
cmdname = self.BCM_DIAG_STR[self.opcode]
return parent + "<0x%02x %s: %s>" % (self.opcode, cmdname, self.data[0:16].encode('hex'))
return parent + "<0x%02x %s: %s>" % (self.opcode, cmdname, ''.join(format(x, '02x') for x in self.data[0:16]))
class HCI_Event(HCI):
@@ -887,7 +894,7 @@ class HCI_Event(HCI):
@staticmethod
def from_data(data):
return HCI_Event(ord(data[0]), ord(data[1]), data[2:])
return HCI_Event(data[0], data[1], data[2:])
def __init__(self, event_code, length, data):
HCI.__init__(self, HCI.HCI_EVT)
@@ -903,7 +910,7 @@ class HCI_Event(HCI):
eventname = "unknown"
if self.event_code in self.HCI_EVENT_STR:
eventname = self.HCI_EVENT_STR[self.event_code]
return parent + "<0x%02x %s (len=%d): %s>" % (self.event_code, eventname, self.length, self.data[0:].encode('hex'))
return parent + "<0x%02x %s (len=%d): %s>" % (self.event_code, eventname, self.length, ''.join(format(x, '02x') for x in self.data[0:]))
HCI_UART_TYPE_CLASS = {
HCI.HCI_CMD : HCI_Cmd,
@@ -917,7 +924,7 @@ def parse_hci_packet(data):
return HCI.from_data(data)
class StackDumpReceiver:
class StackDumpReceiver(object):
memdump_addr = None
memdumps = {}
stack_dump_has_happend = False
@@ -961,7 +968,7 @@ class StackDumpReceiver:
log.debug("Stack dump handling addr %08x", addr-self.memdump_addr)
def finishStackDump(self):
dump = fit(self.memdumps)
dump = flat(self.memdumps)
log.warn("Stack dump @0x%08x written to %s!" % (self.memdump_addr, self.stack_dump_filename))
f = open(self.stack_dump_filename, "wb")
f.write(dump)
+22 -11
View File
@@ -1,12 +1,22 @@
#!/usr/bin/env python2
from __future__ import absolute_import
import socket
import struct
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import zip
from builtins import range
import subprocess
import datetime
from pwn import *
from internalblue.utils.pwnlib_wrapper import log, context, p32, u16, p16, u32
import fcntl
from .core import InternalBlue
import hci
import Queue
from . import hci
import queue as queue2k
import threading
try:
@@ -65,7 +75,7 @@ class HCICore(InternalBlue):
# Do ioctl(s,HCIGETDEVLIST,arg) to get the number of available devices:
# arg is struct hci_dev_list_req (/usr/include/bluetooth/hci.h)
arg = p32(16) # dl->dev_num = HCI_MAX_DEV which is 16 (little endian)
arg += "\x00"*(8*16)
arg += b"\x00"*(8*16)
devices_raw = fcntl.ioctl(s.fileno(), HCIGETDEVLIST, arg)
num_devices = u16(devices_raw[:2])
log.debug("Found %d HCI devices via ioctl(HCIGETDEVLIST)!" % num_devices)
@@ -76,10 +86,10 @@ class HCICore(InternalBlue):
dev_id = u16(devices_raw[dev_struct_start:dev_struct_start+2])
# arg is struct hci_dev_info (/usr/include/bluetooth/hci.h)
arg = p16(dev_id) # di->dev_id = <device_id>
arg += "\x00"*20 # Enough space for name, bdaddr and flags
dev_info_raw = fcntl.ioctl(s.fileno(), HCIGETDEVINFO, arg)
dev_name = dev_info_raw[2:10].replace("\x00","")
dev_bdaddr = ":".join(["%02X" % ord(x) for x in dev_info_raw[10:16][::-1]])
arg += b"\x00"*20 # Enough space for name, bdaddr and flags
dev_info_raw = bytearray(fcntl.ioctl(s.fileno(), HCIGETDEVINFO, arg))
dev_name = dev_info_raw[2:10].replace(b"\x00",b"").decode()
dev_bdaddr = ":".join(["%02X" % x for x in dev_info_raw[10:16][::-1]])
dev_flags = u32(dev_info_raw[16:20])
if dev_flags == 0:
dev_flags_str = "DOWN"
@@ -192,6 +202,7 @@ class HCICore(InternalBlue):
# Read the record data
try:
record_data = self.s_snoop.recv(1024)
record_data = bytearray(record_data)
except socket.timeout:
continue # this is ok. just try again without error
except Exception as e:
@@ -227,7 +238,7 @@ class HCICore(InternalBlue):
if filter_function == None or filter_function(record):
try:
queue.put(record, block=False)
except Queue.Full:
except queue.Full:
log.warn("recvThreadFunc: A recv queue is full. dropping packets..")
# Call all callback functions inside registeredHciCallbacks and pass the
@@ -278,7 +289,7 @@ class HCICore(InternalBlue):
"""
# TODO still seems to only forward incoming events?!
self.s_snoop.setsockopt(socket.SOL_HCI, socket.HCI_FILTER,
'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00') #type mask, event mask, event mask, opcode
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00') #type mask, event mask, event mask, opcode
interface_num = device["dev_id"]
log.debug("Socket interface number: %s" % (interface_num))
@@ -292,7 +303,7 @@ class HCICore(InternalBlue):
# Write Header to btsnoop file (if file is still empty):
if self.write_btsnooplog and self.btsnooplog_file.tell() == 0:
# BT Snoop Header: btsnoop\x00, version: 1, data link type: 1002
btsnoop_hdr = "btsnoop\x00" + p32(1,endian="big") + p32(1002,endian="big")
btsnoop_hdr = b"btsnoop\x00" + p32(1,endian="big") + p32(1002,endian="big")
with self.btsnooplog_file_lock:
self.btsnooplog_file.write(btsnoop_hdr)
self.btsnooplog_file.flush()
+15 -8
View File
@@ -1,12 +1,19 @@
#!/usr/bin/env python2
from __future__ import absolute_import
import struct
from future import standard_library
standard_library.install_aliases()
from builtins import str
import socket
import Queue
import hci
import queue as queue2k
from . import hci
from pwn import *
from internalblue.utils.pwnlib_wrapper import log, context
from core import InternalBlue
from .core import InternalBlue
class iOSCore(InternalBlue):
@@ -49,16 +56,16 @@ class iOSCore(InternalBlue):
the command or None if no response was received within the timeout.
"""
queue = Queue.Queue(1)
queue = queue2k.Queue(1)
try:
self.sendQueue.put((h4type, data, queue, None), timeout=timeout)
ret = queue.get(timeout=timeout)
return ret
except Queue.Empty:
except queue2k.Empty:
log.warn("sendH4: waiting for response timed out!")
return None
except Queue.Full:
except queue.Full:
log.warn("sendH4: send queue is full!")
return None
@@ -173,7 +180,7 @@ class iOSCore(InternalBlue):
for queue, filter_function in self.registeredHciRecvQueues: # TODO filter_function not working with bluez modifications
try:
queue.put(record, block=False)
except Queue.Full:
except queue.Full:
log.warn("recvThreadFunc: A recv queue is full. dropping packets..")
# Call all callback functions inside registeredHciCallbacks and pass the
+24 -12
View File
@@ -1,17 +1,27 @@
#!/usr/bin/env python2
from __future__ import absolute_import
import random
import time
from future import standard_library
standard_library.install_aliases()
from builtins import str
import socket
import Queue
import hci
import queue as queue2k
from . import hci
from pwn import *
from core import InternalBlue
from internalblue.utils.pwnlib_wrapper import log, context, p8
from .core import InternalBlue
import binascii
import os
filepath = os.path.dirname(os.path.abspath(__file__))
IOBE = None
class macOSCore(InternalBlue):
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory=".", replay=False):
@@ -89,7 +99,7 @@ class macOSCore(InternalBlue):
# read record data
try:
data, addr = self.s_snoop.recvfrom(1024)
record_data = data
record_data = bytearray(data)
except socket.timeout:
continue # this is ok. just try again without error
@@ -103,7 +113,7 @@ class macOSCore(InternalBlue):
for queue, filter_function in self.registeredHciRecvQueues: # TODO filter_function not working with bluez modifications
try:
queue.put(record, block=False)
except Queue.Full:
except queue.Full:
log.warn("recvThreadFunc: A recv queue is full. dropping packets..>" + record_data)
# Call all callback functions inside registeredHciCallbacks and pass the
@@ -122,7 +132,7 @@ class macOSCore(InternalBlue):
# Wait for 'send task' in send queue
try:
task = self.sendQueue.get(timeout=0.5)
except Queue.Empty:
except queue2k.Empty:
continue
# Extract the components of the task
@@ -133,8 +143,10 @@ class macOSCore(InternalBlue):
# Send command to the chip using IOBluetoothExtended framework
h4type, data, queue, filter_function = task
opcode = binascii.hexlify(data[1]) + binascii.hexlify(data[0])
log.debug("Sending command: 0x" + binascii.hexlify(data) + ", opcode: " + opcode)
data = bytearray(data)
opcode = format(data[1], '02x') + format(data[0], '02x')
log.debug("Sending command: 0x" + ''.join(format(x, '02x') for x in data) + ", opcode: " + opcode)
if not(h4type == 0x01 or h4type == 0x02):
log.warn("H4 Type {0} not supported by macOS Core!".format(str(h4type)))
@@ -144,7 +156,7 @@ class macOSCore(InternalBlue):
# if the caller expects a response: register a queue to receive the response
if queue is not None and filter_function is not None:
recvQueue = Queue.Queue(1)
recvQueue = queue2k.Queue(1)
self.registerHciRecvQueue(recvQueue, filter_function)
# Sending command
@@ -157,7 +169,7 @@ class macOSCore(InternalBlue):
record = recvQueue.get(timeout=10)
hcipkt = record[0]
data = hcipkt.data
except Queue.Empty:
except queue2k.Empty:
log.warn("_sendThreadFunc: No response from the firmware.")
data = None
self.unregisterHciRecvQueue(recvQueue)
@@ -182,5 +194,5 @@ class macOSCore(InternalBlue):
def shutdown(self):
if not self.replay:
self.iobe.shutdown()
self.s_inject.sendto("", ('127.0.0.1', self.s_snoop.getsockname()[1]))
self.s_inject.sendto(b'', ('127.0.0.1', self.s_snoop.getsockname()[1]))
super(macOSCore, self).shutdown()
@@ -1,7 +1,8 @@
from pwnlib.util.packing import u32, u16, u8
from builtins import object
from internalblue.utils.pwnlib_wrapper import u32, u16, u8
class ConnectionInformation:
class ConnectionInformation(object):
connection_handle = 0
connection_number = 0
master_of_connection = False
+2 -1
View File
@@ -1,4 +1,5 @@
class QueueElement:
from builtins import object
class QueueElement(object):
index = 0
next_item = 0
prev = 0
+7 -3
View File
@@ -1,3 +1,5 @@
from __future__ import print_function
from builtins import object
import binascii
import time
@@ -8,7 +10,7 @@ except ImportError:
pass
class SocketRecvHook():
class SocketRecvHook(object):
def __init__(self, socket):
# type: (socket.socket) -> None
self.snoop_socket = socket
@@ -43,7 +45,7 @@ class SocketRecvHook():
self.recvfrom_hook(data, addr)
return data, addr
class SocketInjectHook():
class SocketInjectHook(object):
def __init__(self, socket, core):
# type: (socket.socket, InternalBlue) -> None
self.inject_socket = socket
@@ -114,7 +116,7 @@ class SocketDuplexHook(SocketInjectHook, SocketRecvHook):
pass
class HookBase():
class HookBase(object):
def send_hook(self, data):
raise NotImplementedError
@@ -254,6 +256,8 @@ class ReplaySocket(SocketDuplexHook):
def getsockname(self):
return (None, 0)
def close(self):
assert self.index + 1 == len(self.log)
from internalblue.core import InternalBlue
+21 -9
View File
@@ -1,12 +1,24 @@
#!/usr/bin/env python2
from __future__ import print_function
from __future__ import absolute_import
import os
import random
import time
from future import standard_library
from pwnlib.context import context
standard_library.install_aliases()
import socket
import Queue
import hci
import queue as queue2k
from . import hci
from pwn import *
from internalblue.utils.pwnlib_wrapper import log, p8
from core import InternalBlue
from .core import InternalBlue
import binascii
filepath = os.path.dirname(os.path.abspath(__file__))
@@ -55,16 +67,16 @@ class testCore(InternalBlue):
the command or None if no response was received within the timeout.
"""
queue = Queue.Queue(1)
queue = queue2k.Queue(1)
try:
self.sendQueue.put((h4type, data, queue, None), timeout=timeout)
ret = queue.get(timeout=timeout)
return ret
except Queue.Empty:
except queue2k.Empty:
log.warn("sendH4: waiting for response timed out!")
return None
except Queue.Full:
except queue.Full:
log.warn("sendH4: send queue is full!")
return None
@@ -105,7 +117,7 @@ class testCore(InternalBlue):
# Wait for 'send task' in send queue
try:
task = self.sendQueue.get(timeout=0.5)
except Queue.Empty:
except queue2k.Empty:
continue
# Extract the components of the task
@@ -121,7 +133,7 @@ class testCore(InternalBlue):
# if the caller expects a response: register a queue to receive the response
if queue is not None and filter_function is not None:
recvQueue = Queue.Queue(1)
recvQueue = queue2k.Queue(1)
self.registerHciRecvQueue(recvQueue, filter_function)
# if the caller expects a response:
+11
View File
@@ -0,0 +1,11 @@
#from pwnlib.util.packing import *
def bytes_to_hex(bytes):
# type: (bytearray) -> str
return ''.join(format(x, '02x') for x in bytearray(bytes))
+91
View File
@@ -0,0 +1,91 @@
"""
The following proxies various utilities from pwnlibs by explicitly importing them
To replace a "from pwn import *" remove it and let your IDE highlight all missing methods (Hint: F2 in PyCharm goes to next error)
import the missing (and only the missing!) methods from this module, e.g. with "from internalblue.utils import term, read, log, text, options"
In some cases like "from pwn import socket" this just imports another module.
Use an IPython shell to run "from pwn import *" and check where some method/module actually comes from and either import it directly or add it to this module
"""
# Imports that used to be imported via 'from pwn import *'
import pwnlib
from pwnlib import term
from pwnlib.util import iters
from pwnlib.util.misc import read
from pwnlib.context import context
#TODO: Logging via pwnlib doesn't work yet, so for now it is still used via pwn
# import pwnlib.log
# pwnlib.log.install_default_handler()
# log = pwnlib.log.getLogger('internalbue')
from pwn import log
from pwnlib.term import text
from pwnlib.ui import options, yesno
from pwnlib.util.packing import flat
from pwnlib.asm import disasm
from pwnlib.util.fiddling import isprint, unbits, bits_str, bits
"""
The packers like u8 are generated in a fairly convoluted way that breaks IDE introspection.
The following code remedies this by:
- Explicitly defining a stub function with type annotations
- Generating all the packers like pwnlibs would
- Only if if the current module already has the name of the packer as an attribute (i.e. has a stub function defined) it will be replaced with the pwnlibs version
This means:
- All import issues in the rest of the code are genuine as the imports are only available if an explicit stub function is added
- The functions can be easily replaced by just implementing them and removing the for loop at the end
"""
# Imports needed for this hack
from pwnlib.util.packing import ops, sizes, make_multi
import sys
try:
from typing import Union, Optional, Literal
endianess = Union[Literal['big']]
except ImportError:
pass
mod = sys.modules[__name__]
_DEFINES = ['u8', 'p8', 'u32', 'u16', 'p32']
def u8(data, endian = None):
# type: (bytes, Optional[endianess]) -> int
pass
def p8(number, endian = None):
# type: (int, Optional[endianess]) -> bytes
pass
def u16(data, endian = None):
# type: (bytes, Optional[endianess]) -> int
pass
def p16(number, endian = None):
# type: (int, Optional[endianess]) -> bytes
pass
def u32(data, endian = None):
# type: (bytes, Optional[endianess]) -> int
pass
def p32(number, endian = None):
# type: (int, Optional[endianess]) -> bytes
pass
for op, size in iters.product(ops, sizes):
name, routine = make_multi(op, size)
if hasattr(mod, name):
setattr(mod, name, routine)
-2
View File
@@ -1,2 +0,0 @@
pwntools==3.12.2
pyelftools==0.24
+4 -2
View File
@@ -11,10 +11,12 @@ setup(name='internalblue',
license='MIT',
packages=['internalblue', 'internalblue/fw'],
install_requires=[
'pwntools',
'pwntools>=4.2.0.dev0',
'pyelftools',
],
extras_require={
"macoscore": ["pyobjc"],
"macoscore": ["pyobjc"],
"ipython": ["IPython"]
},
entry_points = {
'console_scripts': ['internalblue=internalblue.cli:internalblue_cli']
+1
View File
@@ -1,3 +1,4 @@
from __future__ import print_function
from internalblue.cli import _parse_argv
from internalblue.hcicore import HCICore
+1
View File
@@ -1,3 +1,4 @@
from __future__ import print_function
from internalblue.cli import _parse_argv
from internalblue.adbcore import ADBCore
+6 -5
View File
@@ -1,3 +1,4 @@
from __future__ import print_function
from internalblue.cli import _parse_argv
from internalblue.adbcore import ADBCore
from internalblue.objects.connection_information import ConnectionInformation
@@ -13,11 +14,11 @@ except ImportError:
def test_info_conn_7():
dummy = ConnectionInformation(7, '0023023a1a2e'.decode('hex'), 0, True, 0xc,
'e98a5eaaff39ecb5ce4447590dfb73a4'.decode('hex'), 16,
'dbea2d9c47bc1aa6afe664ff31591aa6'.decode('hex'), -87,
'0a00c821ffff8ffa'.decode('hex'), '9bff598701000000'.decode('hex'),
'00'.decode('hex'))
dummy = ConnectionInformation(7, bytearray.fromhex('0023023a1a2e'), 0, True, 0xc,
bytearray.fromhex('e98a5eaaff39ecb5ce4447590dfb73a4'), 16,
bytearray.fromhex('dbea2d9c47bc1aa6afe664ff31591aa6'), -87,
bytearray.fromhex('0a00c821ffff8ffa'), bytearray.fromhex('9bff598701000000'),
bytearray.fromhex('00'))
trace = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'traces/adbcore/dictionary_tests/info_conn_7.trace')
+5 -4
View File
@@ -1,3 +1,4 @@
from __future__ import print_function
from internalblue.cli import _parse_argv
from internalblue.adbcore import ADBCore
from internalblue.objects.connection_information import ConnectionInformation
@@ -12,10 +13,10 @@ except ImportError:
def test_info_conn_9():
dummy = ConnectionInformation(9, '000000000000'.decode('hex'), 0, False, 12,
'00000000000000000000000000000000'.decode('hex'), 0, '', -87,
'0000000000000000'.decode('hex'),
'0000000000000000'.decode('hex'), '00'.decode('hex'))
dummy = ConnectionInformation(9, bytearray.fromhex('000000000000'), 0, False, 12,
bytearray.fromhex('00000000000000000000000000000000'), 0, b'', -87,
bytearray.fromhex('0000000000000000'),
bytearray.fromhex('0000000000000000'), bytearray.fromhex('00'))
trace = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'traces/adbcore/dictionary_tests/info_conn_9.trace')
+1
View File
@@ -1,3 +1,4 @@
from __future__ import print_function
from internalblue.cli import _parse_argv
from internalblue.adbcore import ADBCore
from internalblue.objects.queue_element import QueueElement
+3 -1
View File
@@ -1,5 +1,7 @@
from __future__ import print_function
from __future__ import absolute_import
from testwrapper import trace_test, get_trace_path_cmd_tuple
from .testwrapper import trace_test, get_trace_path_cmd_tuple
import os
+2 -1
View File
@@ -1,3 +1,4 @@
from builtins import object
import argparse
from internalblue.cli import internalblue_cli, _parse_argv
@@ -13,7 +14,7 @@ except ImportError:
tracedir = os.path.dirname(__file__)
class Fakeargs():
class Fakeargs(object):
def __init__(self):
self.data_directory = None
self.verbose = False