Compare commits
15 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 2ce2224421 | |||
| 104a35a79a | |||
| e6b99906c9 | |||
| d3059b01d8 | |||
| a7066170fc | |||
| 748c713f67 | |||
| 0864e96569 | |||
| c6e39cb18f | |||
| 07c5c4c336 | |||
| 01589f8eee | |||
| d9de8f0d83 | |||
| 9b8d5b0740 | |||
| e53edb1ec9 | |||
| b72e12b5a6 | |||
| d7b3b8e7a1 |
@@ -17,9 +17,11 @@ btsnoop.log
|
||||
# xcode
|
||||
xcuserdata
|
||||
*.xcworkspace
|
||||
macos-framework/IOBluetoothExtended.framework/
|
||||
|
||||
# venv
|
||||
venv
|
||||
venv3
|
||||
|
||||
# pycharm
|
||||
*.idea
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
# Get receive statistics on a Nexus 5 for BLE connection events
|
||||
|
||||
from builtins import range
|
||||
from pwn import *
|
||||
from internalblue.adbcore import ADBCore
|
||||
import internalblue.hci as hci
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
|
||||
# Get receive statistics on a Samsung Galaxy S8 for BLE connection events
|
||||
|
||||
from builtins import range
|
||||
from pwn import *
|
||||
from internalblue.adbcore import ADBCore
|
||||
import internalblue.hci as hci
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
|
||||
from future import standard_library
|
||||
standard_library.install_aliases()
|
||||
try:
|
||||
from Queue import Queue
|
||||
from queue import Queue
|
||||
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable, Dict
|
||||
|
||||
if TYPE_CHECKING:
|
||||
|
||||
+16
-6
@@ -1,13 +1,23 @@
|
||||
#!/usr/bin/env python2
|
||||
import struct
|
||||
from time import sleep
|
||||
|
||||
from future import standard_library
|
||||
|
||||
from pwnlib import adb
|
||||
from pwnlib.exception import PwnlibException
|
||||
|
||||
standard_library.install_aliases()
|
||||
from builtins import str
|
||||
import datetime
|
||||
import socket
|
||||
import Queue
|
||||
import queue as queue2k
|
||||
import random
|
||||
from internalblue import hci
|
||||
from internalblue.utils import bytes_to_hex
|
||||
|
||||
from pwn import *
|
||||
|
||||
from internalblue.utils.pwnlib_wrapper import log, context, u32
|
||||
from .core import InternalBlue
|
||||
|
||||
|
||||
@@ -141,7 +151,7 @@ class ADBCore(InternalBlue):
|
||||
while(not self.exit_requested and len(record_hdr) < 24):
|
||||
try:
|
||||
recv_data = self.s_snoop.recv(24 - len(record_hdr))
|
||||
log.debug("recvThreadFunc: received bt_snoop data " + recv_data.encode('hex'))
|
||||
log.debug("recvThreadFunc: received bt_snoop data " + bytes_to_hex(recv_data))
|
||||
if len(recv_data) == 0:
|
||||
log.info("recvThreadFunc: bt_snoop socket was closed by remote site. stopping recv thread...")
|
||||
self.exit_requested = True
|
||||
@@ -163,7 +173,7 @@ class ADBCore(InternalBlue):
|
||||
orig_len, inc_len, flags, drops, time64 = struct.unpack( ">IIIIq", record_hdr)
|
||||
|
||||
# Read the record data
|
||||
record_data = b''
|
||||
record_data = bytearray()
|
||||
while(not self.exit_requested and len(record_data) < inc_len):
|
||||
try:
|
||||
recv_data = self.s_snoop.recv(inc_len - len(record_data))
|
||||
@@ -171,7 +181,7 @@ class ADBCore(InternalBlue):
|
||||
log.info("recvThreadFunc: bt_snoop socket was closed by remote site. stopping..")
|
||||
self.exit_requested = True
|
||||
break
|
||||
record_data += recv_data
|
||||
record_data += bytearray(recv_data)
|
||||
except socket.timeout:
|
||||
pass # this is ok. just try again without error
|
||||
|
||||
@@ -201,7 +211,7 @@ class ADBCore(InternalBlue):
|
||||
if filter_function == None or filter_function(record):
|
||||
try:
|
||||
queue.put(record, block=False)
|
||||
except Queue.Full:
|
||||
except queue.Full:
|
||||
log.warn("recvThreadFunc: A recv queue is full. dropping packets..")
|
||||
|
||||
# Call all callback functions inside registeredHciCallbacks and pass the
|
||||
|
||||
+33
-28
@@ -28,7 +28,12 @@
|
||||
# Software.
|
||||
|
||||
|
||||
from pwn import *
|
||||
from __future__ import print_function
|
||||
|
||||
import socket
|
||||
import sys
|
||||
from builtins import str
|
||||
import internalblue.utils.pwnlib_wrapper as pwnlib
|
||||
import os
|
||||
import traceback
|
||||
import argparse
|
||||
@@ -60,7 +65,7 @@ def print_banner():
|
||||
|
||||
type <help> for usage information!\n\n"""
|
||||
for line in banner:
|
||||
term.output(text.blue(line))
|
||||
pwnlib.term.output(pwnlib.text.blue(line))
|
||||
|
||||
def commandLoop(internalblue, init_commands=None):
|
||||
cmdstack = init_commands.split(';')[::-1] if init_commands else None
|
||||
@@ -70,40 +75,39 @@ def commandLoop(internalblue, init_commands=None):
|
||||
if cmdstack:
|
||||
cmdline = cmdstack.pop().strip()
|
||||
else:
|
||||
cmdline = term.readline.readline(prompt='> ').strip()
|
||||
cmdline = pwnlib.term.readline.readline(prompt='> ').strip().decode('utf-8')
|
||||
cmdword = cmdline.split(' ')[0].split('=')[0]
|
||||
if(cmdword == ''):
|
||||
continue
|
||||
log.debug("Command Line: [[" + cmdword + "]] " + cmdline)
|
||||
pwnlib.log.debug("Command Line: [[" + cmdword + "]] " + cmdline)
|
||||
matching_cmd = cmds.findCmd(cmdword)
|
||||
if matching_cmd == None:
|
||||
log.warn("Command unknown: " + cmdline)
|
||||
pwnlib.log.warn("Command unknown: " + cmdline)
|
||||
continue
|
||||
cmd_instance = matching_cmd(cmdline, internalblue)
|
||||
|
||||
if(not cmd_instance.work()):
|
||||
log.warn("Command failed: " + str(cmd_instance))
|
||||
pwnlib.log.warn("Command failed: " + str(cmd_instance))
|
||||
except ValueError as e:
|
||||
log.warn("commandLoop: ValueError: " + str(e))
|
||||
continue
|
||||
pwnlib.log.warn("commandLoop: ValueError: " + str(e))
|
||||
raise
|
||||
except KeyboardInterrupt:
|
||||
if(cmd_instance != None):
|
||||
cmd_instance.abort_cmd()
|
||||
else:
|
||||
log.info("Got Ctrl-C; exiting...")
|
||||
pwnlib.log.info("Got Ctrl-C; exiting...")
|
||||
internalblue.exit_requested = True
|
||||
break
|
||||
except AssertionError as e:
|
||||
raise
|
||||
except socket.error as e:
|
||||
if e.args == (1, "Operation not permitted"):
|
||||
log.critical("Received an 'Operation not permitted' socket.error, you might need root for the command '{}'".format(cmdline))
|
||||
log.critical(traceback.format_exc())
|
||||
pwnlib.log.critical("Received an 'Operation not permitted' socket.error, you might need root for the command '{}'".format(cmdline))
|
||||
pwnlib.log.critical(traceback.format_exc())
|
||||
except Exception as e:
|
||||
internalblue.exit_requested = True # Make sure all threads terminate
|
||||
log.critical("Uncaught exception (%s). Abort." % str(e))
|
||||
pwnlib.log.critical("Uncaught exception (%s). Abort." % str(e))
|
||||
print(traceback.format_exc())
|
||||
break
|
||||
raise
|
||||
cmd_instance = None
|
||||
|
||||
def _parse_argv(argv):
|
||||
@@ -145,8 +149,8 @@ def internalblue_cli(argv, args=None):
|
||||
for cmd in cmds.getCmdList():
|
||||
for keyword in cmd.keywords:
|
||||
cmd_keywords.append(keyword)
|
||||
readline_completer = term.completer.LongestPrefixCompleter(words=cmd_keywords)
|
||||
term.readline.set_completer(readline_completer)
|
||||
readline_completer = pwnlib.term.completer.LongestPrefixCompleter(words=cmd_keywords)
|
||||
pwnlib.term.readline.set_completer(readline_completer)
|
||||
|
||||
|
||||
|
||||
@@ -217,18 +221,18 @@ def internalblue_cli(argv, args=None):
|
||||
elif args.device:
|
||||
matching_devices = [ dev for dev in devices if dev[1] == args.device]
|
||||
if len(matching_devices) > 1:
|
||||
log.critical("Found multiple matching devices")
|
||||
pwnlib.log.critical("Found multiple matching devices")
|
||||
exit(-1)
|
||||
elif len(matching_devices) == 1:
|
||||
log.info("Found device is: {}".format(matching_devices[0]))
|
||||
pwnlib.log.info("Found device is: {}".format(matching_devices[0]))
|
||||
device = matching_devices[0]
|
||||
else:
|
||||
log.critical("No matching devices found")
|
||||
pwnlib.log.critical("No matching devices found")
|
||||
exit(-1)
|
||||
elif len(devices) == 1:
|
||||
device = devices[0]
|
||||
else:
|
||||
i = options('Please specify device:', [d[2] for d in devices], 0)
|
||||
i = pwnlib.options('Please specify device:', [d[2] for d in devices], 0)
|
||||
device = devices[i]
|
||||
|
||||
# Setup device
|
||||
@@ -237,28 +241,29 @@ def internalblue_cli(argv, args=None):
|
||||
|
||||
# Restore readline history:
|
||||
if os.path.exists(reference.data_directory + "/" + HISTFILE):
|
||||
readline_history = read(reference.data_directory + "/" + HISTFILE)
|
||||
term.readline.history = readline_history.split('\n')
|
||||
readline_history = pwnlib.read(reference.data_directory + "/" + HISTFILE)
|
||||
pwnlib.term.readline.history = readline_history.split(b'\n')
|
||||
|
||||
# Connect to device
|
||||
if not reference.connect():
|
||||
log.critical("No connection to target device.")
|
||||
pwnlib.log.critical("No connection to target device.")
|
||||
exit(-1)
|
||||
|
||||
# Enter command loop (runs until user quits)
|
||||
log.info("Starting commandLoop for reference {}".format(reference))
|
||||
pwnlib.log.info("Starting commandLoop for reference {}".format(reference))
|
||||
commandLoop(reference, init_commands=args.commands)
|
||||
|
||||
# shutdown connection
|
||||
reference.shutdown()
|
||||
|
||||
# Save readline history:
|
||||
f = open(reference.data_directory + "/" + HISTFILE, "w")
|
||||
f.write("\n".join(term.readline.history))
|
||||
f.close()
|
||||
# TODO: - This causes issues, have to fix ASAP
|
||||
# f = open(reference.data_directory + "/" + HISTFILE, "w")
|
||||
# f.write("\n".join(term.readline.history))
|
||||
# f.close()
|
||||
|
||||
# Cleanup
|
||||
log.info("Goodbye")
|
||||
pwnlib.log.info("Goodbye")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
+47
-35
@@ -23,7 +23,13 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from pwn import *
|
||||
from __future__ import print_function
|
||||
|
||||
import re
|
||||
from builtins import str
|
||||
from builtins import hex
|
||||
from builtins import range
|
||||
from builtins import object
|
||||
import os
|
||||
import sys
|
||||
import inspect
|
||||
@@ -36,10 +42,14 @@ import time
|
||||
import select
|
||||
import json
|
||||
|
||||
from pwnlib.context import context
|
||||
from pwnlib.asm import disasm, asm
|
||||
from pwnlib.exception import PwnlibException
|
||||
from pwnlib.ui import yesno
|
||||
from pwnlib.util.fiddling import isprint
|
||||
|
||||
|
||||
|
||||
|
||||
from internalblue.utils.pwnlib_wrapper import log, flat, read, p8, p32, u32, p16
|
||||
from internalblue.utils import bytes_to_hex
|
||||
|
||||
try:
|
||||
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Type
|
||||
@@ -81,7 +91,7 @@ def bt_addr_to_str(bt_addr):
|
||||
# type: (BluetoothAddress) -> str
|
||||
""" Convert a Bluetooth address (6 bytes) into a human readable format.
|
||||
"""
|
||||
return ":".join([b.encode("hex") for b in bt_addr])
|
||||
return ":".join(format(x, '02x') for x in bytearray(bt_addr))
|
||||
|
||||
def parse_bt_addr(bt_addr):
|
||||
# type: (Any) -> Optional[BluetoothAddress]
|
||||
@@ -105,7 +115,7 @@ def parse_bt_addr(bt_addr):
|
||||
return addr
|
||||
|
||||
|
||||
class Cmd:
|
||||
class Cmd(object):
|
||||
""" This class is the superclass of a CLI command. Every CLI command
|
||||
must be defined as subclass of Cmd. The subclass must define the
|
||||
'keywords' list as member variable. The actual implementation of the
|
||||
@@ -186,7 +196,7 @@ class Cmd:
|
||||
dumped_sections[section.start_addr] = self.readMem(section.start_addr, section.size(), self.progress_log, bytes_done, bytes_total)
|
||||
bytes_done += section.size()
|
||||
self.progress_log.success("Received Data: complete")
|
||||
Cmd.memory_image = fit(dumped_sections, filler='\x00')
|
||||
Cmd.memory_image = flat(dumped_sections, filler='\x00')
|
||||
f = open(self.memory_image_template_filename, 'wb')
|
||||
f.write(Cmd.memory_image)
|
||||
f.close()
|
||||
@@ -284,7 +294,8 @@ class CmdLogLevel(Cmd):
|
||||
log_levels = ['CRITICAL', 'DEBUG', 'ERROR', 'INFO', 'NOTSET', 'WARN', 'WARNING']
|
||||
|
||||
for keyword in list(keywords):
|
||||
keywords.extend(['%s %s' % (keyword, log_level) for log_level in log_levels])
|
||||
for log_level in log_levels:
|
||||
keywords.append('%s %s' % (keyword, log_level))
|
||||
|
||||
parser = argparse.ArgumentParser(prog=keywords[0],
|
||||
description=description,
|
||||
@@ -316,7 +327,7 @@ class CmdMonitor(Cmd):
|
||||
parser.add_argument("command",
|
||||
help="One of: start, stop, kill")
|
||||
|
||||
class MonitorController:
|
||||
class MonitorController(object):
|
||||
instance = None
|
||||
|
||||
@staticmethod
|
||||
@@ -326,7 +337,7 @@ class CmdMonitor(Cmd):
|
||||
CmdMonitor.MonitorController.instance = CmdMonitor.MonitorController.__MonitorController(internalblue, 0xC9)
|
||||
return CmdMonitor.MonitorController.instance
|
||||
|
||||
class __MonitorController:
|
||||
class __MonitorController(object):
|
||||
def __init__(self, internalblue, pcap_data_link_type):
|
||||
self.internalblue = internalblue
|
||||
self.running = False
|
||||
@@ -546,7 +557,7 @@ class CmdDumpMem(Cmd):
|
||||
bytes_total = sum([s.size() for s in self.internalblue.fw.SECTIONS if s.is_ram])
|
||||
bytes_done = 0
|
||||
self.progress_log = log.progress("Downloading RAM sections...")
|
||||
for section in filter(lambda s: s.is_ram, self.internalblue.fw.SECTIONS):
|
||||
for section in [s for s in self.internalblue.fw.SECTIONS if s.is_ram]:
|
||||
filename = args.file + "_" + hex(section.start_addr)
|
||||
if os.path.exists(filename):
|
||||
if not (args.overwrite or yesno("Update '%s'?" % filename)):
|
||||
@@ -599,10 +610,10 @@ class CmdSearchMem(Cmd):
|
||||
highlight = pattern
|
||||
if args.hex:
|
||||
try:
|
||||
pattern = pattern.decode('hex')
|
||||
pattern = bytearray.fromhex(pattern)
|
||||
highlight = pattern
|
||||
except TypeError as e:
|
||||
log.warn("Search pattern cannot be converted to hexstring: " + str(e))
|
||||
log.warn("Search pattern cannot be converted to bytestring: " + str(e))
|
||||
return False
|
||||
elif args.address:
|
||||
pattern = p32(int(pattern, 16))
|
||||
@@ -651,7 +662,7 @@ class CmdHexdump(Cmd):
|
||||
if dump == None:
|
||||
return False
|
||||
|
||||
log.hexdump(dump, begin=args.address)
|
||||
log.hexdump(bytes(dump), begin=args.address)
|
||||
return True
|
||||
|
||||
class CmdTelescope(Cmd):
|
||||
@@ -766,9 +777,9 @@ class CmdWriteMem(Cmd):
|
||||
data = ' '.join(args.data)
|
||||
if args.hex:
|
||||
try:
|
||||
data = data.decode('hex')
|
||||
data = bytearray.fromhex(data)
|
||||
except TypeError as e:
|
||||
log.warn("Data string cannot be converted to hexstring: " + str(e))
|
||||
log.warn("Hex string cannot be converted to bytestring: " + str(e))
|
||||
return False
|
||||
elif args.int:
|
||||
data = p32(auto_int(data))
|
||||
@@ -951,7 +962,7 @@ class CmdSendHciCmd(Cmd):
|
||||
log.info("cmdcode needs to be in the range of 0x0000 - 0xffff")
|
||||
return False
|
||||
|
||||
data = ''
|
||||
data = b''
|
||||
for data_part in args.data:
|
||||
if data_part[0:2] == "0x":
|
||||
data += p32(auto_int(data_part))
|
||||
@@ -1181,16 +1192,16 @@ class CmdInfo(Cmd):
|
||||
log.info(" - Remote BT name: %08X" % connection.remote_name_address)
|
||||
log.info(" - Master of Conn.: %s" % str(connection.master_of_connection))
|
||||
log.info(" - Conn. Handle: 0x%X" % connection.connection_handle)
|
||||
log.info(" - Public RAND: %s" % connection.public_rand.encode('hex'))
|
||||
#log.info(" - PIN: %s" % connection.pin.encode('hex'))
|
||||
log.info(" - Public RAND: %s" % bytes_to_hex(connection.public_rand))
|
||||
#log.info(" - PIN: %s" % bytes_to_hex(connection.pin)
|
||||
#log.info(" - BT addr for key: %s" % bt_addr_to_str(connection.bt_addr_for_key))
|
||||
log.info(" - Effective Key Len: %d byte (%d bit)" % (connection.effective_key_len, 8*connection["effective_key_len"]))
|
||||
log.info(" - Link Key: %s" % connection.link_key.encode('hex'))
|
||||
log.info(" - LMP Features: %s" % connection.extended_lmp_feat.encode('hex'))
|
||||
log.info(" - Host Supported F: %s" % connection.host_supported_feat.encode('hex'))
|
||||
log.info(" - Link Key: %s" % bytes_to_hex(connection.link_key))
|
||||
log.info(" - LMP Features: %s" % bytes_to_hex(connection.extended_lmp_feat))
|
||||
log.info(" - Host Supported F: %s" % bytes_to_hex(connection.host_supported_feat))
|
||||
log.info(" - TX Power (dBm): %d" % connection.tx_pwr_lvl_dBm)
|
||||
log.info(" - Array Index: %s" % connection.id.encode('hex'))
|
||||
print
|
||||
log.info(" - Array Index: %s" % bytes_to_hex(connection.id))
|
||||
print()
|
||||
return True
|
||||
|
||||
def infoDevice(self, args):
|
||||
@@ -1199,14 +1210,14 @@ class CmdInfo(Cmd):
|
||||
log.warn(" '%s' not in fw.py. FEATURE NOT SUPPORTED!" % const)
|
||||
return False
|
||||
bt_addr = self.readMem(self.internalblue.fw.BD_ADDR, 6)[::-1]
|
||||
bt_addr_str = ":".join([b.encode("hex") for b in bt_addr])
|
||||
bt_addr_str = bt_addr_to_str(bt_addr)
|
||||
device_name = self.readMem(self.internalblue.fw.DEVICE_NAME, 258)
|
||||
device_name_len = u8(device_name[0])-1
|
||||
device_name_len = device_name[0]-1
|
||||
device_name = device_name[2:2+device_name_len]
|
||||
adb_serial = context.device
|
||||
|
||||
log.info("### | Device ###")
|
||||
log.info(" - Name: %s" % device_name)
|
||||
log.info(" - Name: %s" % device_name.decode('utf-8'))
|
||||
log.info(" - ADB Serial: %s" % adb_serial)
|
||||
log.info(" - Address: %s" % bt_addr_str)
|
||||
return True
|
||||
@@ -1228,7 +1239,7 @@ class CmdInfo(Cmd):
|
||||
code = disasm(table_values[i],vma=table_addresses[i],byte=False,offset=False)
|
||||
code = code.replace(" ", " ").replace("\n", "; ")
|
||||
log.info("[%03d] 0x%08X: %s (%s)" % (i, table_addresses[i],
|
||||
table_values[i].encode('hex'),
|
||||
bytes_to_hex(table_values[i]),
|
||||
code))
|
||||
return True
|
||||
|
||||
@@ -1306,7 +1317,7 @@ class CmdInfo(Cmd):
|
||||
|
||||
# Print Buffer Details
|
||||
buffer_size = bloc_for_details["buffer_size"] + 4
|
||||
for buffer_address, buffer_hdr in bloc_for_details["buffer_headers"].iteritems():
|
||||
for buffer_address, buffer_hdr in bloc_for_details["buffer_headers"].items():
|
||||
progress_log.status("Dumping buffers from BLOC[%d]: 0x%06X" % (bloc_for_details["index"], buffer_address))
|
||||
# Buffer in use!
|
||||
if buffer_hdr == bloc_for_details["address"]:
|
||||
@@ -1362,7 +1373,7 @@ class CmdInfo(Cmd):
|
||||
if args.type in subcommands:
|
||||
return subcommands[args.type](args.args)
|
||||
else:
|
||||
log.warn("Unkown type: %s\nKnown types: %s" % (args.type, subcommands.keys()))
|
||||
log.warn("Unkown type: %s\nKnown types: %s" % (args.type, list(subcommands.keys())))
|
||||
return False
|
||||
|
||||
|
||||
@@ -1486,7 +1497,8 @@ class CmdCustom(Cmd):
|
||||
actions = ['list', 'add', 'run', 'remove']
|
||||
|
||||
for keyword in list(keywords):
|
||||
keywords.extend(['%s %s' % (keyword, action) for action in actions])
|
||||
for action in actions:
|
||||
keywords.append('%s %s' % (keyword, action))
|
||||
|
||||
parser = argparse.ArgumentParser(prog=keywords[0],
|
||||
description=description,
|
||||
@@ -1521,7 +1533,7 @@ class CmdCustom(Cmd):
|
||||
return True
|
||||
|
||||
if args.do == 'list':
|
||||
custom_cmds= ["\t%s\t\t%s\n" % (k, v) for k, v in sorted(CmdCustom.custom_commands.iteritems())]
|
||||
custom_cmds= ["\t%s\t\t%s\n" % (k, v) for k, v in sorted(CmdCustom.custom_commands.items())]
|
||||
log.info("Custom commands:\n%s" % ''.join(custom_cmds))
|
||||
return True
|
||||
|
||||
@@ -1619,15 +1631,15 @@ class CmdReadAfhChannelMap(Cmd):
|
||||
"""
|
||||
response = self.internalblue.sendHciCommand(0x1406, p16(handle))
|
||||
|
||||
if len(response) < 17 or response[8:] == '\x00'*9:
|
||||
if len(response) < 17 or response[8:] == b'\x00'*9:
|
||||
log.info("Connection 0x%04x is not established." % handle)
|
||||
return False
|
||||
|
||||
log.info("Connection Handle: 0x%04x" % handle)
|
||||
log.info("AFH Enabled: %s" % bool(response[7] != '\x00'))
|
||||
log.info("AFH Enabled: %s" % bool(response[7] != 0))
|
||||
channels = ""
|
||||
for c in response[8:]:
|
||||
bits = format(ord(c), '08b')
|
||||
bits = format(c, '08b')
|
||||
for b in bits:
|
||||
if b == "1":
|
||||
channels = channels + " *"
|
||||
|
||||
+61
-40
@@ -25,16 +25,38 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from __future__ import division
|
||||
|
||||
import socket
|
||||
import struct
|
||||
|
||||
from future import standard_library
|
||||
|
||||
import pwnlib
|
||||
from pwnlib.asm import asm
|
||||
from pwnlib.exception import PwnlibException
|
||||
from pwnlib.util.fiddling import bits, unbits
|
||||
from .utils.pwnlib_wrapper import p16, p8, u32, u16, p32, u8
|
||||
|
||||
standard_library.install_aliases()
|
||||
from builtins import hex
|
||||
from builtins import str
|
||||
from builtins import range
|
||||
from builtins import object
|
||||
from past.utils import old_div
|
||||
from abc import ABCMeta, abstractmethod
|
||||
|
||||
from pwn import *
|
||||
from .fw.fw import Firmware
|
||||
import datetime
|
||||
import time
|
||||
import Queue
|
||||
import queue as queue2k
|
||||
from . import hci
|
||||
from .objects.queue_element import QueueElement
|
||||
from .objects.connection_information import ConnectionInformation
|
||||
from future.utils import with_metaclass
|
||||
from internalblue.utils import bytes_to_hex
|
||||
|
||||
from internalblue.utils.pwnlib_wrapper import log, context, flat
|
||||
|
||||
try:
|
||||
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable
|
||||
@@ -50,9 +72,7 @@ except:
|
||||
#import logging
|
||||
#log = logging.getLogger(__name__)
|
||||
|
||||
class InternalBlue:
|
||||
__metaclass__ = ABCMeta
|
||||
|
||||
class InternalBlue(with_metaclass(ABCMeta, object)):
|
||||
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory=".", replay=False):
|
||||
# type: (int, str, str, bool, str, bool) -> None
|
||||
context.log_level = log_level
|
||||
@@ -94,7 +114,7 @@ class InternalBlue:
|
||||
# firmware (the response is recognized with the help of the filter function).
|
||||
# Once the response arrived, it puts the response into the response_queue from
|
||||
# the tuple. See sendH4() and sendHciCommand().
|
||||
self.sendQueue = Queue.Queue(queue_size) # type: Queue.Queue[Task]
|
||||
self.sendQueue = queue2k.Queue(queue_size) # type: Queue.Queue[Task]
|
||||
|
||||
self.recvThread = None # The thread which is responsible for the HCI snoop socket
|
||||
self.sendThread = None # The thread which is responsible for the HCI inject socket
|
||||
@@ -225,7 +245,7 @@ class InternalBlue:
|
||||
# Wait for 'send task' in send queue
|
||||
try:
|
||||
task = self.sendQueue.get(timeout=0.5)
|
||||
except Queue.Empty:
|
||||
except queue2k.Empty:
|
||||
continue
|
||||
|
||||
# Extract the components of the task
|
||||
@@ -277,12 +297,12 @@ class InternalBlue:
|
||||
|
||||
# if the caller expects a response: register a queue to receive the response
|
||||
if queue != None and filter_function != None:
|
||||
recvQueue = Queue.Queue(1)
|
||||
recvQueue = queue2k.Queue(1)
|
||||
self.registerHciRecvQueue(recvQueue, filter_function)
|
||||
|
||||
# Send command to the chip using s_inject socket
|
||||
try:
|
||||
log.debug("_sendThreadFunc: Send: " + str(out.encode('hex')))
|
||||
log.debug("_sendThreadFunc: Send: " + bytes_to_hex(out))
|
||||
self.s_inject.send(out)
|
||||
except socket.error:
|
||||
pass
|
||||
@@ -299,7 +319,7 @@ class InternalBlue:
|
||||
record = recvQueue.get(timeout=2)
|
||||
hcipkt = record[0]
|
||||
data = hcipkt.data
|
||||
except Queue.Empty:
|
||||
except queue2k.Empty:
|
||||
log.warn("_sendThreadFunc: No response from the firmware.")
|
||||
data = None
|
||||
self.unregisterHciRecvQueue(recvQueue)
|
||||
@@ -363,7 +383,7 @@ class InternalBlue:
|
||||
|
||||
# Check if this was the last packet
|
||||
if len(self.tracepoint_memdump_parts) == self.fw.TRACEPOINT_RAM_DUMP_PKT_COUNT:
|
||||
dump = fit(self.tracepoint_memdump_parts)
|
||||
dump = flat(self.tracepoint_memdump_parts)
|
||||
#TODO: use this to start qemu
|
||||
filename = self.data_directory + "/" + "internalblue_tracepoint_0x%x_%s.bin" % (self.tracepoint_memdump_address, datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S"))
|
||||
log.info("Captured Ram Dump for Tracepoint 0x%x to %s" % (self.tracepoint_memdump_address, filename))
|
||||
@@ -560,7 +580,7 @@ class InternalBlue:
|
||||
"""
|
||||
|
||||
# send Read_Local_Version_Information
|
||||
version = self.sendHciCommand(0x1001, '')
|
||||
version = self.sendHciCommand(0x1001, ''.encode('utf-8'))
|
||||
|
||||
if not version or len(version) < 11:
|
||||
log.warn("""initialize_fimware: Failed to send a HCI command to the Bluetooth driver.
|
||||
@@ -570,12 +590,12 @@ class InternalBlue:
|
||||
return False
|
||||
|
||||
# Broadcom uses 0x000f as vendor ID, Cypress 0x0131
|
||||
vendor = (u8(version[9]) << 8) + u8(version[8])
|
||||
vendor = (version[9] << 8) + version[8]
|
||||
if vendor != 0xf and vendor != 0x131:
|
||||
log.critical("Not running on a Broadcom or Cypress chip!")
|
||||
return False
|
||||
else:
|
||||
subversion = (u8(version[11]) << 8) + u8(version[10])
|
||||
subversion = (version[11] << 8) + version[10]
|
||||
|
||||
iOS = False
|
||||
if self.__class__.__name__ == "iOSCore":
|
||||
@@ -690,7 +710,7 @@ class InternalBlue:
|
||||
log.warn("registerHciRecvQueue: no such queue is registered!")
|
||||
|
||||
def sendHciCommand(self, opcode, data, timeout=3):
|
||||
# type: (Opcode, bytes, int) -> Optional[Any]
|
||||
# type: (Opcode, bytes, int) -> Optional[bytearray]
|
||||
"""
|
||||
Send an arbitrary HCI command packet by pushing a send-task into the
|
||||
sendQueue. This function blocks until the response is received
|
||||
@@ -702,7 +722,7 @@ class InternalBlue:
|
||||
# return this instead of the Command Complete Event (which will
|
||||
# follow later and will be ignored). This should be fixed..
|
||||
|
||||
queue = Queue.Queue(1)
|
||||
queue = queue2k.Queue(1)
|
||||
|
||||
# standard HCI command structure
|
||||
payload = p16(opcode) + p8(len(data)) + data
|
||||
@@ -732,14 +752,14 @@ class InternalBlue:
|
||||
timeout=timeout)
|
||||
ret = queue.get(timeout=timeout)
|
||||
return ret
|
||||
except Queue.Empty:
|
||||
except queue2k.Empty:
|
||||
log.warn("sendHciCommand: waiting for response timed out!")
|
||||
# If there was no response because the Trace Replay Hook throw an assert it will be in this attribute.
|
||||
# Raise this so the main thread doesn't ignore this and it will be caught by any testing framework
|
||||
if hasattr(self, 'test_failed'):
|
||||
raise self.test_failed
|
||||
return None
|
||||
except Queue.Full:
|
||||
except queue2k.Full:
|
||||
log.warn("sendHciCommand: send queue is full!")
|
||||
return None
|
||||
|
||||
@@ -756,7 +776,7 @@ class InternalBlue:
|
||||
try:
|
||||
self.sendQueue.put((h4type, data, None, None), timeout=timeout)
|
||||
return True
|
||||
except Queue.Full:
|
||||
except queue2k.Full:
|
||||
log.warn("sendH4: send queue is full!")
|
||||
return False
|
||||
|
||||
@@ -782,7 +802,7 @@ class InternalBlue:
|
||||
|
||||
try:
|
||||
return self.recvQueue.get(timeout=timeout)
|
||||
except Queue.Empty:
|
||||
except queue2k.Empty:
|
||||
return None
|
||||
|
||||
def readMem(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
|
||||
@@ -805,7 +825,7 @@ class InternalBlue:
|
||||
|
||||
read_addr = address # read_addr is the address of the next Read_RAM HCI command
|
||||
byte_counter = 0 # tracks the number of received bytes
|
||||
outbuffer = '' # buffer which stores all accumulated data read from the chip
|
||||
outbuffer = bytearray() # buffer which stores all accumulated data read from the chip
|
||||
if bytes_total == 0: # If no total bytes where given just use length
|
||||
bytes_total = length
|
||||
retry = 3 # Retry on failures
|
||||
@@ -840,7 +860,7 @@ class InternalBlue:
|
||||
log.debug("readMem: insufficient bytes returned, retrying...")
|
||||
continue
|
||||
|
||||
status = ord(response[3])
|
||||
status = response[3]
|
||||
if status != 0:
|
||||
# It is not yet reverse engineered what this byte means. For almost
|
||||
# all memory addresses it will be 0. But for some it will be different,
|
||||
@@ -856,7 +876,7 @@ class InternalBlue:
|
||||
response_check = self.sendHciCommand(0xfc4d, p32(read_addr) + p8(blocksize))
|
||||
if response != response_check:
|
||||
log.debug("readMem: double checking response failed at 0x%x! retry..." % read_addr)
|
||||
sleep(0.3)
|
||||
time.sleep(0.3)
|
||||
retry = retry - 1
|
||||
continue
|
||||
|
||||
@@ -865,7 +885,7 @@ class InternalBlue:
|
||||
byte_counter += len(data)
|
||||
if(progress_log != None):
|
||||
msg = "receiving data... %d / %d Bytes (%d%%)" % (bytes_done+byte_counter,
|
||||
bytes_total, (bytes_done+byte_counter)*100/bytes_total)
|
||||
bytes_total, old_div((bytes_done+byte_counter)*100,bytes_total))
|
||||
progress_log.status(msg)
|
||||
retry = 3 # this round worked, so we re-enable retries
|
||||
return outbuffer
|
||||
@@ -907,7 +927,7 @@ class InternalBlue:
|
||||
log.warn("readMemAligned: address (0x%x) must be 4-byte aligned!" % address)
|
||||
return None
|
||||
|
||||
recvQueue = Queue.Queue(1)
|
||||
recvQueue = queue2k.Queue(1)
|
||||
def hciFilterFunction(record):
|
||||
# type: (Record) -> bool
|
||||
hcipkt = record[0]
|
||||
@@ -933,7 +953,7 @@ class InternalBlue:
|
||||
blocksize = 244
|
||||
|
||||
# Customize the assembler snippet with the current read_addr and blocksize
|
||||
code = asm(self.fw.READ_MEM_ALIGNED_ASM_SNIPPET % (blocksize, read_addr, blocksize/4), vma=self.fw.READ_MEM_ALIGNED_ASM_LOCATION, arch='thumb')
|
||||
code = asm(self.fw.READ_MEM_ALIGNED_ASM_SNIPPET % (blocksize, read_addr, old_div(blocksize,4)), vma=self.fw.READ_MEM_ALIGNED_ASM_LOCATION, arch='thumb')
|
||||
|
||||
# Write snippet to the RAM (TODO: maybe backup and restore content of this area?)
|
||||
self.writeMem(self.fw.READ_MEM_ALIGNED_ASM_LOCATION, code)
|
||||
@@ -950,7 +970,7 @@ class InternalBlue:
|
||||
# wait for the custom HCI event sent by the snippet:
|
||||
try:
|
||||
record = recvQueue.get(timeout=1)
|
||||
except Queue.Empty:
|
||||
except queue2k.Empty:
|
||||
log.warn("readMemAligned: No response from assembler snippet.")
|
||||
return None
|
||||
|
||||
@@ -961,7 +981,7 @@ class InternalBlue:
|
||||
byte_counter += len(data)
|
||||
if progress_log is not None:
|
||||
msg = "receiving data... %d / %d Bytes (%d%%)" % (bytes_done+byte_counter,
|
||||
bytes_total, (bytes_done+byte_counter)*100/bytes_total)
|
||||
bytes_total, old_div((bytes_done+byte_counter)*100,bytes_total))
|
||||
progress_log.status(msg)
|
||||
|
||||
self.unregisterHciRecvQueue(recvQueue)
|
||||
@@ -1000,8 +1020,8 @@ class InternalBlue:
|
||||
if(response == None):
|
||||
log.warn("writeMem: Timeout while reading response, probably need to wait longer.")
|
||||
return False
|
||||
elif (response[3] != '\x00'):
|
||||
log.warn("writeMem: Got error code %s in command complete event." % response[3].encode('hex'))
|
||||
elif (response[3] != 0):
|
||||
log.warn("writeMem: Got error code %s in command complete event." % bytes_to_hex(response[3]))
|
||||
return False
|
||||
write_addr += blocksize
|
||||
byte_counter += blocksize
|
||||
@@ -1024,8 +1044,9 @@ class InternalBlue:
|
||||
log.warn("Empty HCI response during launchRam, driver crashed due to invalid code or destination")
|
||||
return False
|
||||
|
||||
if response[3] != '\x00':
|
||||
log.warn("Got error code %x in command complete event." % u8(response[3]))
|
||||
error_code = response[3]
|
||||
if error_code != 0:
|
||||
log.warn("Got error code %x in command complete event." % error_code)
|
||||
return False
|
||||
|
||||
# Nexus 6P Bugfix
|
||||
@@ -1036,7 +1057,7 @@ class InternalBlue:
|
||||
return True
|
||||
|
||||
def getPatchramState(self):
|
||||
# type: () -> Tuple[List[Optional[int]], List[Any], List[Any]]
|
||||
# type: () -> Union[bool, Tuple[List[Optional[int]], List[Union[Union[int, bytes, None], Any]], list]]
|
||||
"""
|
||||
Retrieves the current state of the patchram unit. The return value
|
||||
is a tuple containing 3 lists which are indexed by the slot number:
|
||||
@@ -1056,10 +1077,10 @@ class InternalBlue:
|
||||
|
||||
# On Nexus 5, ReadMemAligned is required, while Nexus 6P supports this memory area with ReadRAM
|
||||
if self.fw.PATCHRAM_ALIGNED:
|
||||
slot_dump = self.readMemAligned(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, slot_count/4)
|
||||
slot_dump = self.readMemAligned(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, old_div(slot_count,4))
|
||||
table_addr_dump = self.readMemAligned(self.fw.PATCHRAM_TARGET_TABLE_ADDRESS, slot_count*4)
|
||||
else:
|
||||
slot_dump = self.readMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, slot_count/4)
|
||||
slot_dump = self.readMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, old_div(slot_count,4))
|
||||
table_addr_dump = self.readMem(self.fw.PATCHRAM_TARGET_TABLE_ADDRESS, slot_count*4)
|
||||
table_val_dump = self.readMem(self.fw.PATCHRAM_VALUE_TABLE_ADDRESS, slot_count*4)
|
||||
|
||||
@@ -1067,11 +1088,11 @@ class InternalBlue:
|
||||
table_values = []
|
||||
slot_dwords = []
|
||||
slot_bits = []
|
||||
for dword in range(slot_count/32):
|
||||
for dword in range(old_div(slot_count,32)):
|
||||
slot_dwords.append(slot_dump[dword*32:(dword+1)*32])
|
||||
|
||||
for dword in slot_dwords:
|
||||
slot_bits.extend(bits(dword[::-1])[::-1])
|
||||
slot_bits.extend(bits(bytes(dword[::-1]))[::-1])
|
||||
for i in range(slot_count):
|
||||
if slot_bits[i]:
|
||||
table_addresses.append(u32(table_addr_dump[i*4:i*4+4])<<2)
|
||||
@@ -1157,7 +1178,7 @@ class InternalBlue:
|
||||
|
||||
# Enable patchram slot (enable bitfield starts at 0x310204)
|
||||
# (We need to enable the slot by setting a bit in a multi-dword bitfield)
|
||||
target_dword = int(slot / 32)
|
||||
target_dword = int(old_div(slot, 32))
|
||||
table_slots[slot] = 1
|
||||
slot_dword = unbits(table_slots[target_dword*32:(target_dword+1)*32][::-1])[::-1]
|
||||
self.writeMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS + target_dword*4, slot_dword)
|
||||
@@ -1196,7 +1217,7 @@ class InternalBlue:
|
||||
|
||||
# Disable patchram slot (enable bitfield starts at 0x310204)
|
||||
# (We need to disable the slot by clearing a bit in a multi-dword bitfield)
|
||||
target_dword = int(slot / 32)
|
||||
target_dword = int(old_div(slot, 32))
|
||||
table_slots[slot] = 0
|
||||
slot_dword = unbits(table_slots[target_dword*32:(target_dword+1)*32][::-1])[::-1]
|
||||
self.writeMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS + target_dword*4, slot_dword)
|
||||
@@ -1715,7 +1736,7 @@ class InternalBlue:
|
||||
"""
|
||||
|
||||
if not self.serial:
|
||||
self.sendH4(hci.HCI.BCM_DIAG, '\xf0' + p8(enable))
|
||||
self.sendH4(hci.HCI.BCM_DIAG, b'\xf0' + b'\x01' if enable else b'\x00')
|
||||
|
||||
# We can send the activation to the serial, but then the Android driver
|
||||
# itself crashes when receiving diagnostic frames...
|
||||
|
||||
@@ -23,10 +23,12 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from builtins import hex
|
||||
from builtins import object
|
||||
from pwn import log
|
||||
|
||||
|
||||
class Firmware:
|
||||
class Firmware(object):
|
||||
def __init__(self, version=None, iOS=False):
|
||||
"""
|
||||
Load and initialize the actual firmware add-ons for Nexus 5, Raspi3, etc.
|
||||
@@ -60,7 +62,7 @@ class Firmware:
|
||||
log.info("Loaded firmware information for " + self.firmware.FW_NAME + ".")
|
||||
|
||||
|
||||
class MemorySection:
|
||||
class MemorySection(object):
|
||||
"""
|
||||
All firmwares have memory sections that can be RAM, ROM or neither of both.
|
||||
"""
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from __future__ import absolute_import
|
||||
# fw_0x420e.py
|
||||
#
|
||||
# Generic firmware file in case we do not know something...
|
||||
@@ -20,7 +21,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# Samsung S10/S10e/S10+
|
||||
|
||||
@@ -27,6 +27,7 @@
|
||||
|
||||
# Firmware Infos
|
||||
# This runs on Rasperry Pi 3
|
||||
from builtins import object
|
||||
FW_NAME = "BCM43430A1"
|
||||
|
||||
# Device Infos
|
||||
@@ -34,7 +35,7 @@ DEVICE_NAME = 0x20401C
|
||||
BD_ADDR = 0x201C64
|
||||
|
||||
# Memory Sections
|
||||
class MemorySection:
|
||||
class MemorySection(object):
|
||||
def __init__(self, start_addr, end_addr, is_rom, is_ram):
|
||||
self.start_addr = start_addr
|
||||
self.end_addr = end_addr
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from __future__ import absolute_import
|
||||
# fw_0x420e.py
|
||||
#
|
||||
# Generic firmware file in case we do not know something...
|
||||
@@ -20,7 +21,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# Evaluation Kit CYW20706
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from __future__ import absolute_import
|
||||
# fw_0x420e.py
|
||||
#
|
||||
# Generic firmware file in case we do not know something...
|
||||
@@ -20,7 +21,7 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# Evaluation Kit CYW920819
|
||||
|
||||
@@ -23,7 +23,8 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from __future__ import absolute_import
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
FW_NAME = "BCM20702A1 (USB Bluetooth dongle)"
|
||||
|
||||
@@ -22,7 +22,8 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from __future__ import absolute_import
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
FW_NAME = "BCM20703A2 (MacBook Pro 2016)"
|
||||
|
||||
@@ -25,7 +25,8 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from fw import MemorySection
|
||||
from __future__ import absolute_import
|
||||
from .fw import MemorySection
|
||||
|
||||
# Firmware Infos
|
||||
# This runs on Nexus 6P, Samsung Galaxy S6, Samsung Galaxy S6 edge
|
||||
|
||||
@@ -27,13 +27,14 @@
|
||||
|
||||
# Firmware Infos
|
||||
# This runs on Rasperry Pi 3+
|
||||
from builtins import object
|
||||
FW_NAME = "BCM4345C0"
|
||||
|
||||
# Device Infos
|
||||
DEVICE_NAME = 0x204954
|
||||
|
||||
# Memory Sections
|
||||
class MemorySection:
|
||||
class MemorySection(object):
|
||||
def __init__(self, start_addr, end_addr, is_rom, is_ram):
|
||||
self.start_addr = start_addr
|
||||
self.end_addr = end_addr
|
||||
|
||||
+17
-10
@@ -25,7 +25,14 @@
|
||||
# out of or in connection with the Software or the use or other dealings in the
|
||||
# Software.
|
||||
|
||||
from pwn import *
|
||||
from __future__ import absolute_import
|
||||
from builtins import hex
|
||||
from builtins import range
|
||||
from builtins import object
|
||||
|
||||
from internalblue.utils.pwnlib_wrapper import p8, u16, p16, unbits, bits_str, u8, bits, p32, u32
|
||||
from internalblue.utils.pwnlib_wrapper import log
|
||||
from pwnlib.util.packing import flat
|
||||
|
||||
HCI_UART_TYPE_CLASS = {}
|
||||
|
||||
@@ -52,7 +59,7 @@ class HCI(object):
|
||||
|
||||
@staticmethod
|
||||
def from_data(data):
|
||||
uart_type = ord(data[0])
|
||||
uart_type = data[0]
|
||||
return HCI_UART_TYPE_CLASS[uart_type].from_data(data[1:])
|
||||
|
||||
def __init__(self, uart_type):
|
||||
@@ -573,7 +580,7 @@ class HCI_Cmd(HCI):
|
||||
0xffed : "COMND VSC_EnterDownloadMode"
|
||||
}
|
||||
|
||||
HCI_CMD_STR_REVERSE = {v: k for k, v in HCI_CMD_STR.iteritems()}
|
||||
HCI_CMD_STR_REVERSE = {v: k for k, v in HCI_CMD_STR.items()}
|
||||
|
||||
@staticmethod
|
||||
def cmd_name(opcode):
|
||||
@@ -607,7 +614,7 @@ class HCI_Cmd(HCI):
|
||||
|
||||
@staticmethod
|
||||
def from_data(data):
|
||||
return HCI_Cmd(u16(data[0:2]), ord(data[2]), data[3:])
|
||||
return HCI_Cmd(u16(data[0:2]), data[2], data[3:])
|
||||
|
||||
def __init__(self, opcode, length, data):
|
||||
HCI.__init__(self, HCI.HCI_CMD)
|
||||
@@ -623,7 +630,7 @@ class HCI_Cmd(HCI):
|
||||
cmdname = "unknown"
|
||||
if self.opcode in self.HCI_CMD_STR:
|
||||
cmdname = self.HCI_CMD_STR[self.opcode]
|
||||
return parent + "<0x%04x %s (len=%d): %s>" % (self.opcode, cmdname, self.length, self.data[0:16].encode('hex'))
|
||||
return parent + "<0x%04x %s (len=%d): %s>" % (self.opcode, cmdname, self.length, ''.join(format(x, '02x') for x in self.data[0:16]))
|
||||
|
||||
class HCI_Acl(HCI):
|
||||
|
||||
@@ -698,7 +705,7 @@ class HCI_Diag(HCI):
|
||||
cmdname = "unknown"
|
||||
if self.opcode in self.BCM_DIAG_STR:
|
||||
cmdname = self.BCM_DIAG_STR[self.opcode]
|
||||
return parent + "<0x%02x %s: %s>" % (self.opcode, cmdname, self.data[0:16].encode('hex'))
|
||||
return parent + "<0x%02x %s: %s>" % (self.opcode, cmdname, ''.join(format(x, '02x') for x in self.data[0:16]))
|
||||
|
||||
class HCI_Event(HCI):
|
||||
|
||||
@@ -887,7 +894,7 @@ class HCI_Event(HCI):
|
||||
|
||||
@staticmethod
|
||||
def from_data(data):
|
||||
return HCI_Event(ord(data[0]), ord(data[1]), data[2:])
|
||||
return HCI_Event(data[0], data[1], data[2:])
|
||||
|
||||
def __init__(self, event_code, length, data):
|
||||
HCI.__init__(self, HCI.HCI_EVT)
|
||||
@@ -903,7 +910,7 @@ class HCI_Event(HCI):
|
||||
eventname = "unknown"
|
||||
if self.event_code in self.HCI_EVENT_STR:
|
||||
eventname = self.HCI_EVENT_STR[self.event_code]
|
||||
return parent + "<0x%02x %s (len=%d): %s>" % (self.event_code, eventname, self.length, self.data[0:].encode('hex'))
|
||||
return parent + "<0x%02x %s (len=%d): %s>" % (self.event_code, eventname, self.length, ''.join(format(x, '02x') for x in self.data[0:]))
|
||||
|
||||
HCI_UART_TYPE_CLASS = {
|
||||
HCI.HCI_CMD : HCI_Cmd,
|
||||
@@ -917,7 +924,7 @@ def parse_hci_packet(data):
|
||||
return HCI.from_data(data)
|
||||
|
||||
|
||||
class StackDumpReceiver:
|
||||
class StackDumpReceiver(object):
|
||||
memdump_addr = None
|
||||
memdumps = {}
|
||||
stack_dump_has_happend = False
|
||||
@@ -961,7 +968,7 @@ class StackDumpReceiver:
|
||||
log.debug("Stack dump handling addr %08x", addr-self.memdump_addr)
|
||||
|
||||
def finishStackDump(self):
|
||||
dump = fit(self.memdumps)
|
||||
dump = flat(self.memdumps)
|
||||
log.warn("Stack dump @0x%08x written to %s!" % (self.memdump_addr, self.stack_dump_filename))
|
||||
f = open(self.stack_dump_filename, "wb")
|
||||
f.write(dump)
|
||||
|
||||
+22
-11
@@ -1,12 +1,22 @@
|
||||
#!/usr/bin/env python2
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import socket
|
||||
import struct
|
||||
|
||||
from future import standard_library
|
||||
standard_library.install_aliases()
|
||||
from builtins import str
|
||||
from builtins import zip
|
||||
from builtins import range
|
||||
import subprocess
|
||||
import datetime
|
||||
from pwn import *
|
||||
from internalblue.utils.pwnlib_wrapper import log, context, p32, u16, p16, u32
|
||||
import fcntl
|
||||
from .core import InternalBlue
|
||||
import hci
|
||||
import Queue
|
||||
from . import hci
|
||||
import queue as queue2k
|
||||
import threading
|
||||
|
||||
try:
|
||||
@@ -65,7 +75,7 @@ class HCICore(InternalBlue):
|
||||
# Do ioctl(s,HCIGETDEVLIST,arg) to get the number of available devices:
|
||||
# arg is struct hci_dev_list_req (/usr/include/bluetooth/hci.h)
|
||||
arg = p32(16) # dl->dev_num = HCI_MAX_DEV which is 16 (little endian)
|
||||
arg += "\x00"*(8*16)
|
||||
arg += b"\x00"*(8*16)
|
||||
devices_raw = fcntl.ioctl(s.fileno(), HCIGETDEVLIST, arg)
|
||||
num_devices = u16(devices_raw[:2])
|
||||
log.debug("Found %d HCI devices via ioctl(HCIGETDEVLIST)!" % num_devices)
|
||||
@@ -76,10 +86,10 @@ class HCICore(InternalBlue):
|
||||
dev_id = u16(devices_raw[dev_struct_start:dev_struct_start+2])
|
||||
# arg is struct hci_dev_info (/usr/include/bluetooth/hci.h)
|
||||
arg = p16(dev_id) # di->dev_id = <device_id>
|
||||
arg += "\x00"*20 # Enough space for name, bdaddr and flags
|
||||
dev_info_raw = fcntl.ioctl(s.fileno(), HCIGETDEVINFO, arg)
|
||||
dev_name = dev_info_raw[2:10].replace("\x00","")
|
||||
dev_bdaddr = ":".join(["%02X" % ord(x) for x in dev_info_raw[10:16][::-1]])
|
||||
arg += b"\x00"*20 # Enough space for name, bdaddr and flags
|
||||
dev_info_raw = bytearray(fcntl.ioctl(s.fileno(), HCIGETDEVINFO, arg))
|
||||
dev_name = dev_info_raw[2:10].replace(b"\x00",b"").decode()
|
||||
dev_bdaddr = ":".join(["%02X" % x for x in dev_info_raw[10:16][::-1]])
|
||||
dev_flags = u32(dev_info_raw[16:20])
|
||||
if dev_flags == 0:
|
||||
dev_flags_str = "DOWN"
|
||||
@@ -192,6 +202,7 @@ class HCICore(InternalBlue):
|
||||
# Read the record data
|
||||
try:
|
||||
record_data = self.s_snoop.recv(1024)
|
||||
record_data = bytearray(record_data)
|
||||
except socket.timeout:
|
||||
continue # this is ok. just try again without error
|
||||
except Exception as e:
|
||||
@@ -227,7 +238,7 @@ class HCICore(InternalBlue):
|
||||
if filter_function == None or filter_function(record):
|
||||
try:
|
||||
queue.put(record, block=False)
|
||||
except Queue.Full:
|
||||
except queue.Full:
|
||||
log.warn("recvThreadFunc: A recv queue is full. dropping packets..")
|
||||
|
||||
# Call all callback functions inside registeredHciCallbacks and pass the
|
||||
@@ -278,7 +289,7 @@ class HCICore(InternalBlue):
|
||||
"""
|
||||
# TODO still seems to only forward incoming events?!
|
||||
self.s_snoop.setsockopt(socket.SOL_HCI, socket.HCI_FILTER,
|
||||
'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00') #type mask, event mask, event mask, opcode
|
||||
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00') #type mask, event mask, event mask, opcode
|
||||
|
||||
interface_num = device["dev_id"]
|
||||
log.debug("Socket interface number: %s" % (interface_num))
|
||||
@@ -292,7 +303,7 @@ class HCICore(InternalBlue):
|
||||
# Write Header to btsnoop file (if file is still empty):
|
||||
if self.write_btsnooplog and self.btsnooplog_file.tell() == 0:
|
||||
# BT Snoop Header: btsnoop\x00, version: 1, data link type: 1002
|
||||
btsnoop_hdr = "btsnoop\x00" + p32(1,endian="big") + p32(1002,endian="big")
|
||||
btsnoop_hdr = b"btsnoop\x00" + p32(1,endian="big") + p32(1002,endian="big")
|
||||
with self.btsnooplog_file_lock:
|
||||
self.btsnooplog_file.write(btsnoop_hdr)
|
||||
self.btsnooplog_file.flush()
|
||||
|
||||
+15
-8
@@ -1,12 +1,19 @@
|
||||
#!/usr/bin/env python2
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import struct
|
||||
|
||||
from future import standard_library
|
||||
standard_library.install_aliases()
|
||||
from builtins import str
|
||||
import socket
|
||||
import Queue
|
||||
import hci
|
||||
import queue as queue2k
|
||||
from . import hci
|
||||
|
||||
from pwn import *
|
||||
from internalblue.utils.pwnlib_wrapper import log, context
|
||||
|
||||
from core import InternalBlue
|
||||
from .core import InternalBlue
|
||||
|
||||
class iOSCore(InternalBlue):
|
||||
|
||||
@@ -49,16 +56,16 @@ class iOSCore(InternalBlue):
|
||||
the command or None if no response was received within the timeout.
|
||||
"""
|
||||
|
||||
queue = Queue.Queue(1)
|
||||
queue = queue2k.Queue(1)
|
||||
|
||||
try:
|
||||
self.sendQueue.put((h4type, data, queue, None), timeout=timeout)
|
||||
ret = queue.get(timeout=timeout)
|
||||
return ret
|
||||
except Queue.Empty:
|
||||
except queue2k.Empty:
|
||||
log.warn("sendH4: waiting for response timed out!")
|
||||
return None
|
||||
except Queue.Full:
|
||||
except queue.Full:
|
||||
log.warn("sendH4: send queue is full!")
|
||||
return None
|
||||
|
||||
@@ -173,7 +180,7 @@ class iOSCore(InternalBlue):
|
||||
for queue, filter_function in self.registeredHciRecvQueues: # TODO filter_function not working with bluez modifications
|
||||
try:
|
||||
queue.put(record, block=False)
|
||||
except Queue.Full:
|
||||
except queue.Full:
|
||||
log.warn("recvThreadFunc: A recv queue is full. dropping packets..")
|
||||
|
||||
# Call all callback functions inside registeredHciCallbacks and pass the
|
||||
|
||||
+24
-12
@@ -1,17 +1,27 @@
|
||||
#!/usr/bin/env python2
|
||||
|
||||
from __future__ import absolute_import
|
||||
|
||||
import random
|
||||
import time
|
||||
|
||||
from future import standard_library
|
||||
standard_library.install_aliases()
|
||||
from builtins import str
|
||||
import socket
|
||||
import Queue
|
||||
import hci
|
||||
import queue as queue2k
|
||||
from . import hci
|
||||
|
||||
from pwn import *
|
||||
|
||||
from core import InternalBlue
|
||||
from internalblue.utils.pwnlib_wrapper import log, context, p8
|
||||
from .core import InternalBlue
|
||||
|
||||
import binascii
|
||||
import os
|
||||
filepath = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
IOBE = None
|
||||
|
||||
class macOSCore(InternalBlue):
|
||||
|
||||
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory=".", replay=False):
|
||||
@@ -89,7 +99,7 @@ class macOSCore(InternalBlue):
|
||||
# read record data
|
||||
try:
|
||||
data, addr = self.s_snoop.recvfrom(1024)
|
||||
record_data = data
|
||||
record_data = bytearray(data)
|
||||
except socket.timeout:
|
||||
continue # this is ok. just try again without error
|
||||
|
||||
@@ -103,7 +113,7 @@ class macOSCore(InternalBlue):
|
||||
for queue, filter_function in self.registeredHciRecvQueues: # TODO filter_function not working with bluez modifications
|
||||
try:
|
||||
queue.put(record, block=False)
|
||||
except Queue.Full:
|
||||
except queue.Full:
|
||||
log.warn("recvThreadFunc: A recv queue is full. dropping packets..>" + record_data)
|
||||
|
||||
# Call all callback functions inside registeredHciCallbacks and pass the
|
||||
@@ -122,7 +132,7 @@ class macOSCore(InternalBlue):
|
||||
# Wait for 'send task' in send queue
|
||||
try:
|
||||
task = self.sendQueue.get(timeout=0.5)
|
||||
except Queue.Empty:
|
||||
except queue2k.Empty:
|
||||
continue
|
||||
|
||||
# Extract the components of the task
|
||||
@@ -133,8 +143,10 @@ class macOSCore(InternalBlue):
|
||||
|
||||
# Send command to the chip using IOBluetoothExtended framework
|
||||
h4type, data, queue, filter_function = task
|
||||
opcode = binascii.hexlify(data[1]) + binascii.hexlify(data[0])
|
||||
log.debug("Sending command: 0x" + binascii.hexlify(data) + ", opcode: " + opcode)
|
||||
data = bytearray(data)
|
||||
opcode = format(data[1], '02x') + format(data[0], '02x')
|
||||
|
||||
log.debug("Sending command: 0x" + ''.join(format(x, '02x') for x in data) + ", opcode: " + opcode)
|
||||
|
||||
if not(h4type == 0x01 or h4type == 0x02):
|
||||
log.warn("H4 Type {0} not supported by macOS Core!".format(str(h4type)))
|
||||
@@ -144,7 +156,7 @@ class macOSCore(InternalBlue):
|
||||
|
||||
# if the caller expects a response: register a queue to receive the response
|
||||
if queue is not None and filter_function is not None:
|
||||
recvQueue = Queue.Queue(1)
|
||||
recvQueue = queue2k.Queue(1)
|
||||
self.registerHciRecvQueue(recvQueue, filter_function)
|
||||
|
||||
# Sending command
|
||||
@@ -157,7 +169,7 @@ class macOSCore(InternalBlue):
|
||||
record = recvQueue.get(timeout=10)
|
||||
hcipkt = record[0]
|
||||
data = hcipkt.data
|
||||
except Queue.Empty:
|
||||
except queue2k.Empty:
|
||||
log.warn("_sendThreadFunc: No response from the firmware.")
|
||||
data = None
|
||||
self.unregisterHciRecvQueue(recvQueue)
|
||||
@@ -182,5 +194,5 @@ class macOSCore(InternalBlue):
|
||||
def shutdown(self):
|
||||
if not self.replay:
|
||||
self.iobe.shutdown()
|
||||
self.s_inject.sendto("", ('127.0.0.1', self.s_snoop.getsockname()[1]))
|
||||
self.s_inject.sendto(b'', ('127.0.0.1', self.s_snoop.getsockname()[1]))
|
||||
super(macOSCore, self).shutdown()
|
||||
|
||||
@@ -1,7 +1,8 @@
|
||||
from pwnlib.util.packing import u32, u16, u8
|
||||
from builtins import object
|
||||
from internalblue.utils.pwnlib_wrapper import u32, u16, u8
|
||||
|
||||
|
||||
class ConnectionInformation:
|
||||
class ConnectionInformation(object):
|
||||
connection_handle = 0
|
||||
connection_number = 0
|
||||
master_of_connection = False
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
class QueueElement:
|
||||
from builtins import object
|
||||
class QueueElement(object):
|
||||
index = 0
|
||||
next_item = 0
|
||||
prev = 0
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from __future__ import print_function
|
||||
from builtins import object
|
||||
import binascii
|
||||
import time
|
||||
|
||||
@@ -8,7 +10,7 @@ except ImportError:
|
||||
pass
|
||||
|
||||
|
||||
class SocketRecvHook():
|
||||
class SocketRecvHook(object):
|
||||
def __init__(self, socket):
|
||||
# type: (socket.socket) -> None
|
||||
self.snoop_socket = socket
|
||||
@@ -43,7 +45,7 @@ class SocketRecvHook():
|
||||
self.recvfrom_hook(data, addr)
|
||||
return data, addr
|
||||
|
||||
class SocketInjectHook():
|
||||
class SocketInjectHook(object):
|
||||
def __init__(self, socket, core):
|
||||
# type: (socket.socket, InternalBlue) -> None
|
||||
self.inject_socket = socket
|
||||
@@ -114,7 +116,7 @@ class SocketDuplexHook(SocketInjectHook, SocketRecvHook):
|
||||
pass
|
||||
|
||||
|
||||
class HookBase():
|
||||
class HookBase(object):
|
||||
def send_hook(self, data):
|
||||
raise NotImplementedError
|
||||
|
||||
@@ -254,6 +256,8 @@ class ReplaySocket(SocketDuplexHook):
|
||||
def getsockname(self):
|
||||
return (None, 0)
|
||||
|
||||
def close(self):
|
||||
assert self.index + 1 == len(self.log)
|
||||
|
||||
from internalblue.core import InternalBlue
|
||||
|
||||
|
||||
@@ -1,12 +1,24 @@
|
||||
#!/usr/bin/env python2
|
||||
|
||||
from __future__ import print_function
|
||||
from __future__ import absolute_import
|
||||
|
||||
import os
|
||||
import random
|
||||
import time
|
||||
|
||||
from future import standard_library
|
||||
|
||||
from pwnlib.context import context
|
||||
|
||||
standard_library.install_aliases()
|
||||
import socket
|
||||
import Queue
|
||||
import hci
|
||||
import queue as queue2k
|
||||
from . import hci
|
||||
|
||||
from pwn import *
|
||||
from internalblue.utils.pwnlib_wrapper import log, p8
|
||||
|
||||
from core import InternalBlue
|
||||
from .core import InternalBlue
|
||||
import binascii
|
||||
|
||||
filepath = os.path.dirname(os.path.abspath(__file__))
|
||||
@@ -55,16 +67,16 @@ class testCore(InternalBlue):
|
||||
the command or None if no response was received within the timeout.
|
||||
"""
|
||||
|
||||
queue = Queue.Queue(1)
|
||||
queue = queue2k.Queue(1)
|
||||
|
||||
try:
|
||||
self.sendQueue.put((h4type, data, queue, None), timeout=timeout)
|
||||
ret = queue.get(timeout=timeout)
|
||||
return ret
|
||||
except Queue.Empty:
|
||||
except queue2k.Empty:
|
||||
log.warn("sendH4: waiting for response timed out!")
|
||||
return None
|
||||
except Queue.Full:
|
||||
except queue.Full:
|
||||
log.warn("sendH4: send queue is full!")
|
||||
return None
|
||||
|
||||
@@ -105,7 +117,7 @@ class testCore(InternalBlue):
|
||||
# Wait for 'send task' in send queue
|
||||
try:
|
||||
task = self.sendQueue.get(timeout=0.5)
|
||||
except Queue.Empty:
|
||||
except queue2k.Empty:
|
||||
continue
|
||||
|
||||
# Extract the components of the task
|
||||
@@ -121,7 +133,7 @@ class testCore(InternalBlue):
|
||||
|
||||
# if the caller expects a response: register a queue to receive the response
|
||||
if queue is not None and filter_function is not None:
|
||||
recvQueue = Queue.Queue(1)
|
||||
recvQueue = queue2k.Queue(1)
|
||||
self.registerHciRecvQueue(recvQueue, filter_function)
|
||||
|
||||
# if the caller expects a response:
|
||||
|
||||
@@ -0,0 +1,11 @@
|
||||
|
||||
|
||||
#from pwnlib.util.packing import *
|
||||
|
||||
def bytes_to_hex(bytes):
|
||||
# type: (bytearray) -> str
|
||||
return ''.join(format(x, '02x') for x in bytearray(bytes))
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -0,0 +1,91 @@
|
||||
"""
|
||||
The following proxies various utilities from pwnlibs by explicitly importing them
|
||||
To replace a "from pwn import *" remove it and let your IDE highlight all missing methods (Hint: F2 in PyCharm goes to next error)
|
||||
import the missing (and only the missing!) methods from this module, e.g. with "from internalblue.utils import term, read, log, text, options"
|
||||
In some cases like "from pwn import socket" this just imports another module.
|
||||
Use an IPython shell to run "from pwn import *" and check where some method/module actually comes from and either import it directly or add it to this module
|
||||
"""
|
||||
|
||||
|
||||
|
||||
# Imports that used to be imported via 'from pwn import *'
|
||||
import pwnlib
|
||||
from pwnlib import term
|
||||
from pwnlib.util import iters
|
||||
from pwnlib.util.misc import read
|
||||
from pwnlib.context import context
|
||||
|
||||
#TODO: Logging via pwnlib doesn't work yet, so for now it is still used via pwn
|
||||
# import pwnlib.log
|
||||
# pwnlib.log.install_default_handler()
|
||||
# log = pwnlib.log.getLogger('internalbue')
|
||||
|
||||
from pwn import log
|
||||
|
||||
|
||||
from pwnlib.term import text
|
||||
from pwnlib.ui import options, yesno
|
||||
from pwnlib.util.packing import flat
|
||||
from pwnlib.asm import disasm
|
||||
from pwnlib.util.fiddling import isprint, unbits, bits_str, bits
|
||||
|
||||
|
||||
|
||||
|
||||
"""
|
||||
The packers like u8 are generated in a fairly convoluted way that breaks IDE introspection.
|
||||
The following code remedies this by:
|
||||
- Explicitly defining a stub function with type annotations
|
||||
- Generating all the packers like pwnlibs would
|
||||
- Only if if the current module already has the name of the packer as an attribute (i.e. has a stub function defined) it will be replaced with the pwnlibs version
|
||||
|
||||
This means:
|
||||
- All import issues in the rest of the code are genuine as the imports are only available if an explicit stub function is added
|
||||
- The functions can be easily replaced by just implementing them and removing the for loop at the end
|
||||
|
||||
"""
|
||||
|
||||
# Imports needed for this hack
|
||||
from pwnlib.util.packing import ops, sizes, make_multi
|
||||
import sys
|
||||
try:
|
||||
from typing import Union, Optional, Literal
|
||||
endianess = Union[Literal['big']]
|
||||
|
||||
except ImportError:
|
||||
pass
|
||||
mod = sys.modules[__name__]
|
||||
|
||||
|
||||
_DEFINES = ['u8', 'p8', 'u32', 'u16', 'p32']
|
||||
|
||||
|
||||
|
||||
def u8(data, endian = None):
|
||||
# type: (bytes, Optional[endianess]) -> int
|
||||
pass
|
||||
|
||||
def p8(number, endian = None):
|
||||
# type: (int, Optional[endianess]) -> bytes
|
||||
pass
|
||||
def u16(data, endian = None):
|
||||
# type: (bytes, Optional[endianess]) -> int
|
||||
pass
|
||||
|
||||
def p16(number, endian = None):
|
||||
# type: (int, Optional[endianess]) -> bytes
|
||||
pass
|
||||
|
||||
def u32(data, endian = None):
|
||||
# type: (bytes, Optional[endianess]) -> int
|
||||
pass
|
||||
|
||||
def p32(number, endian = None):
|
||||
# type: (int, Optional[endianess]) -> bytes
|
||||
pass
|
||||
|
||||
|
||||
for op, size in iters.product(ops, sizes):
|
||||
name, routine = make_multi(op, size)
|
||||
if hasattr(mod, name):
|
||||
setattr(mod, name, routine)
|
||||
@@ -1,2 +0,0 @@
|
||||
pwntools==3.12.2
|
||||
pyelftools==0.24
|
||||
@@ -11,10 +11,12 @@ setup(name='internalblue',
|
||||
license='MIT',
|
||||
packages=['internalblue', 'internalblue/fw'],
|
||||
install_requires=[
|
||||
'pwntools',
|
||||
'pwntools>=4.2.0.dev0',
|
||||
'pyelftools',
|
||||
],
|
||||
extras_require={
|
||||
"macoscore": ["pyobjc"],
|
||||
"macoscore": ["pyobjc"],
|
||||
"ipython": ["IPython"]
|
||||
},
|
||||
entry_points = {
|
||||
'console_scripts': ['internalblue=internalblue.cli:internalblue_cli']
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from __future__ import print_function
|
||||
from internalblue.cli import _parse_argv
|
||||
from internalblue.hcicore import HCICore
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from __future__ import print_function
|
||||
from internalblue.cli import _parse_argv
|
||||
from internalblue.adbcore import ADBCore
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from __future__ import print_function
|
||||
from internalblue.cli import _parse_argv
|
||||
from internalblue.adbcore import ADBCore
|
||||
from internalblue.objects.connection_information import ConnectionInformation
|
||||
@@ -13,11 +14,11 @@ except ImportError:
|
||||
|
||||
|
||||
def test_info_conn_7():
|
||||
dummy = ConnectionInformation(7, '0023023a1a2e'.decode('hex'), 0, True, 0xc,
|
||||
'e98a5eaaff39ecb5ce4447590dfb73a4'.decode('hex'), 16,
|
||||
'dbea2d9c47bc1aa6afe664ff31591aa6'.decode('hex'), -87,
|
||||
'0a00c821ffff8ffa'.decode('hex'), '9bff598701000000'.decode('hex'),
|
||||
'00'.decode('hex'))
|
||||
dummy = ConnectionInformation(7, bytearray.fromhex('0023023a1a2e'), 0, True, 0xc,
|
||||
bytearray.fromhex('e98a5eaaff39ecb5ce4447590dfb73a4'), 16,
|
||||
bytearray.fromhex('dbea2d9c47bc1aa6afe664ff31591aa6'), -87,
|
||||
bytearray.fromhex('0a00c821ffff8ffa'), bytearray.fromhex('9bff598701000000'),
|
||||
bytearray.fromhex('00'))
|
||||
|
||||
trace = os.path.join(os.path.dirname(os.path.abspath(__file__)),
|
||||
'traces/adbcore/dictionary_tests/info_conn_7.trace')
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from __future__ import print_function
|
||||
from internalblue.cli import _parse_argv
|
||||
from internalblue.adbcore import ADBCore
|
||||
from internalblue.objects.connection_information import ConnectionInformation
|
||||
@@ -12,10 +13,10 @@ except ImportError:
|
||||
|
||||
|
||||
def test_info_conn_9():
|
||||
dummy = ConnectionInformation(9, '000000000000'.decode('hex'), 0, False, 12,
|
||||
'00000000000000000000000000000000'.decode('hex'), 0, '', -87,
|
||||
'0000000000000000'.decode('hex'),
|
||||
'0000000000000000'.decode('hex'), '00'.decode('hex'))
|
||||
dummy = ConnectionInformation(9, bytearray.fromhex('000000000000'), 0, False, 12,
|
||||
bytearray.fromhex('00000000000000000000000000000000'), 0, b'', -87,
|
||||
bytearray.fromhex('0000000000000000'),
|
||||
bytearray.fromhex('0000000000000000'), bytearray.fromhex('00'))
|
||||
|
||||
trace = os.path.join(os.path.dirname(os.path.abspath(__file__)),
|
||||
'traces/adbcore/dictionary_tests/info_conn_9.trace')
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from __future__ import print_function
|
||||
from internalblue.cli import _parse_argv
|
||||
from internalblue.adbcore import ADBCore
|
||||
from internalblue.objects.queue_element import QueueElement
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
from __future__ import print_function
|
||||
from __future__ import absolute_import
|
||||
|
||||
from testwrapper import trace_test, get_trace_path_cmd_tuple
|
||||
from .testwrapper import trace_test, get_trace_path_cmd_tuple
|
||||
|
||||
|
||||
import os
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
from builtins import object
|
||||
import argparse
|
||||
|
||||
from internalblue.cli import internalblue_cli, _parse_argv
|
||||
@@ -13,7 +14,7 @@ except ImportError:
|
||||
tracedir = os.path.dirname(__file__)
|
||||
|
||||
|
||||
class Fakeargs():
|
||||
class Fakeargs(object):
|
||||
def __init__(self):
|
||||
self.data_directory = None
|
||||
self.verbose = False
|
||||
|
||||
Reference in New Issue
Block a user