12 Commits

Author SHA1 Message Date
Florian Magin d3059b01d8 Fix subtle byte vs int issues ( 0 != '\x00 is True) 2020-02-27 11:18:21 +01:00
Florian Magin a7066170fc Declare startup trace on adbcore as flaky (works when run directly, doesn't work as part of suite) 2020-02-27 11:13:47 +01:00
Davide Toldo 748c713f67 Fix one of the traces in Python 3 2020-02-23 16:58:01 +01:00
Florian Magin 0864e96569 Propagate exceptions to test framework, ensure required pwnlibs version, ensure that traces don't abort early 2020-02-20 15:23:03 +01:00
Davide Toldo c6e39cb18f * Make pytest testcases Python 2 & 3 compatible
* Make InternalBlue start with ADBCore in Python 3
* Make InternalBlue start with HCICore in Python 3
* Substitute var.decode('hex') with bytearray.fromhex(var) in most places I found which works in Python 2 and 3 and performs the same task
* Substitute var.encode('hex') with new byte_to_hex function that works with Python 2 and 3 (in util.py)
* Declare variables explicitly as bytes instead of strings
* Fix small issue in macOS Core (explicit declaration of variable as byte)
2020-02-17 03:02:59 +01:00
Davide Toldo 07c5c4c336 * CLI now starts up (at least on mac) on Python 2 and 3
* Tests run, some throw errors.
2020-02-16 20:07:23 +01:00
Florian Magin 01589f8eee Hack around pwnlibs treating everything as bytes 2020-02-15 16:58:54 +01:00
Florian Magin d9de8f0d83 Some byte fixes in hcicore 2020-02-15 16:58:19 +01:00
Florian Magin 9b8d5b0740 Move requirements to setup.py 2020-02-15 16:51:43 +01:00
Florian Magin e53edb1ec9 Some byte fixes 2020-02-15 16:51:30 +01:00
Florian Magin b72e12b5a6 Fix subtle syntax change for python3 2020-02-15 16:39:59 +01:00
Florian Magin d7b3b8e7a1 First steps of automatic Python3 conversion
Commands in order:
- `futurize ./ -w -n`

- git grep -l "queue\.Empty" | xargs sed -i 's/queue\.Empty/queue2k.Empty/'

- git grep -l "queue\.Queue" | xargs sed -i 's/queue\.Queue/queue2k.Queue/'

- git grep -l "import queue" | xargs sed -i 's/import queue/import queue as queue2k/'

The search and replaces are needed because the local variables named
`queue` break the module that is called `queue` instead of `Queue` in
Python3

The testcases all still pass with python 2 but with python3 break
completely due to pwnlib issues
2020-02-15 15:52:12 +01:00
36 changed files with 233 additions and 154 deletions
+2
View File
@@ -17,9 +17,11 @@ btsnoop.log
# xcode
xcuserdata
*.xcworkspace
macos-framework/IOBluetoothExtended.framework/
# venv
venv
venv3
# pycharm
*.idea
+1
View File
@@ -4,6 +4,7 @@
# Get receive statistics on a Nexus 5 for BLE connection events
from builtins import range
from pwn import *
from internalblue.adbcore import ADBCore
import internalblue.hci as hci
+1
View File
@@ -4,6 +4,7 @@
# Get receive statistics on a Samsung Galaxy S8 for BLE connection events
from builtins import range
from pwn import *
from internalblue.adbcore import ADBCore
import internalblue.hci as hci
+3 -1
View File
@@ -1,6 +1,8 @@
from future import standard_library
standard_library.install_aliases()
try:
from Queue import Queue
from queue import Queue
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable, Dict
if TYPE_CHECKING:
+9 -5
View File
@@ -1,10 +1,14 @@
#!/usr/bin/env python2
from future import standard_library
standard_library.install_aliases()
from builtins import str
import datetime
import socket
import Queue
import queue as queue2k
import random
from internalblue import hci
from internalblue.utils import bytes_to_hex
from pwn import *
@@ -141,7 +145,7 @@ class ADBCore(InternalBlue):
while(not self.exit_requested and len(record_hdr) < 24):
try:
recv_data = self.s_snoop.recv(24 - len(record_hdr))
log.debug("recvThreadFunc: received bt_snoop data " + recv_data.encode('hex'))
log.debug("recvThreadFunc: received bt_snoop data " + bytes_to_hex(recv_data))
if len(recv_data) == 0:
log.info("recvThreadFunc: bt_snoop socket was closed by remote site. stopping recv thread...")
self.exit_requested = True
@@ -163,7 +167,7 @@ class ADBCore(InternalBlue):
orig_len, inc_len, flags, drops, time64 = struct.unpack( ">IIIIq", record_hdr)
# Read the record data
record_data = b''
record_data = bytearray()
while(not self.exit_requested and len(record_data) < inc_len):
try:
recv_data = self.s_snoop.recv(inc_len - len(record_data))
@@ -171,7 +175,7 @@ class ADBCore(InternalBlue):
log.info("recvThreadFunc: bt_snoop socket was closed by remote site. stopping..")
self.exit_requested = True
break
record_data += recv_data
record_data += bytearray(recv_data)
except socket.timeout:
pass # this is ok. just try again without error
@@ -201,7 +205,7 @@ class ADBCore(InternalBlue):
if filter_function == None or filter_function(record):
try:
queue.put(record, block=False)
except Queue.Full:
except queue.Full:
log.warn("recvThreadFunc: A recv queue is full. dropping packets..")
# Call all callback functions inside registeredHciCallbacks and pass the
+10 -8
View File
@@ -28,6 +28,8 @@
# Software.
from __future__ import print_function
from builtins import str
from pwn import *
import os
import traceback
@@ -70,7 +72,7 @@ def commandLoop(internalblue, init_commands=None):
if cmdstack:
cmdline = cmdstack.pop().strip()
else:
cmdline = term.readline.readline(prompt='> ').strip()
cmdline = term.readline.readline(prompt='> ').strip().decode('utf-8')
cmdword = cmdline.split(' ')[0].split('=')[0]
if(cmdword == ''):
continue
@@ -80,12 +82,11 @@ def commandLoop(internalblue, init_commands=None):
log.warn("Command unknown: " + cmdline)
continue
cmd_instance = matching_cmd(cmdline, internalblue)
if(not cmd_instance.work()):
log.warn("Command failed: " + str(cmd_instance))
except ValueError as e:
log.warn("commandLoop: ValueError: " + str(e))
continue
raise
except KeyboardInterrupt:
if(cmd_instance != None):
cmd_instance.abort_cmd()
@@ -103,7 +104,7 @@ def commandLoop(internalblue, init_commands=None):
internalblue.exit_requested = True # Make sure all threads terminate
log.critical("Uncaught exception (%s). Abort." % str(e))
print(traceback.format_exc())
break
raise
cmd_instance = None
def _parse_argv(argv):
@@ -238,7 +239,7 @@ def internalblue_cli(argv, args=None):
# Restore readline history:
if os.path.exists(reference.data_directory + "/" + HISTFILE):
readline_history = read(reference.data_directory + "/" + HISTFILE)
term.readline.history = readline_history.split('\n')
term.readline.history = readline_history.split(b'\n')
# Connect to device
if not reference.connect():
@@ -253,9 +254,10 @@ def internalblue_cli(argv, args=None):
reference.shutdown()
# Save readline history:
f = open(reference.data_directory + "/" + HISTFILE, "w")
f.write("\n".join(term.readline.history))
f.close()
# TODO: - This causes issues, have to fix ASAP
# f = open(reference.data_directory + "/" + HISTFILE, "w")
# f.write("\n".join(term.readline.history))
# f.close()
# Cleanup
log.info("Goodbye")
+38 -31
View File
@@ -23,6 +23,11 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from __future__ import print_function
from builtins import str
from builtins import hex
from builtins import range
from builtins import object
from pwn import *
import os
import sys
@@ -35,7 +40,7 @@ import struct
import time
import select
import json
from internalblue.utils import bytes_to_hex
@@ -81,7 +86,7 @@ def bt_addr_to_str(bt_addr):
# type: (BluetoothAddress) -> str
""" Convert a Bluetooth address (6 bytes) into a human readable format.
"""
return ":".join([b.encode("hex") for b in bt_addr])
return ":".join(format(x, '02x') for x in bytearray(bt_addr))
def parse_bt_addr(bt_addr):
# type: (Any) -> Optional[BluetoothAddress]
@@ -105,7 +110,7 @@ def parse_bt_addr(bt_addr):
return addr
class Cmd:
class Cmd(object):
""" This class is the superclass of a CLI command. Every CLI command
must be defined as subclass of Cmd. The subclass must define the
'keywords' list as member variable. The actual implementation of the
@@ -284,7 +289,8 @@ class CmdLogLevel(Cmd):
log_levels = ['CRITICAL', 'DEBUG', 'ERROR', 'INFO', 'NOTSET', 'WARN', 'WARNING']
for keyword in list(keywords):
keywords.extend(['%s %s' % (keyword, log_level) for log_level in log_levels])
for log_level in log_levels:
keywords.append('%s %s' % (keyword, log_level))
parser = argparse.ArgumentParser(prog=keywords[0],
description=description,
@@ -316,7 +322,7 @@ class CmdMonitor(Cmd):
parser.add_argument("command",
help="One of: start, stop, kill")
class MonitorController:
class MonitorController(object):
instance = None
@staticmethod
@@ -326,7 +332,7 @@ class CmdMonitor(Cmd):
CmdMonitor.MonitorController.instance = CmdMonitor.MonitorController.__MonitorController(internalblue, 0xC9)
return CmdMonitor.MonitorController.instance
class __MonitorController:
class __MonitorController(object):
def __init__(self, internalblue, pcap_data_link_type):
self.internalblue = internalblue
self.running = False
@@ -546,7 +552,7 @@ class CmdDumpMem(Cmd):
bytes_total = sum([s.size() for s in self.internalblue.fw.SECTIONS if s.is_ram])
bytes_done = 0
self.progress_log = log.progress("Downloading RAM sections...")
for section in filter(lambda s: s.is_ram, self.internalblue.fw.SECTIONS):
for section in [s for s in self.internalblue.fw.SECTIONS if s.is_ram]:
filename = args.file + "_" + hex(section.start_addr)
if os.path.exists(filename):
if not (args.overwrite or yesno("Update '%s'?" % filename)):
@@ -599,10 +605,10 @@ class CmdSearchMem(Cmd):
highlight = pattern
if args.hex:
try:
pattern = pattern.decode('hex')
pattern = bytearray.fromhex(pattern)
highlight = pattern
except TypeError as e:
log.warn("Search pattern cannot be converted to hexstring: " + str(e))
log.warn("Search pattern cannot be converted to bytestring: " + str(e))
return False
elif args.address:
pattern = p32(int(pattern, 16))
@@ -651,7 +657,7 @@ class CmdHexdump(Cmd):
if dump == None:
return False
log.hexdump(dump, begin=args.address)
log.hexdump(bytes(dump), begin=args.address)
return True
class CmdTelescope(Cmd):
@@ -766,9 +772,9 @@ class CmdWriteMem(Cmd):
data = ' '.join(args.data)
if args.hex:
try:
data = data.decode('hex')
data = bytearray.fromhex(data)
except TypeError as e:
log.warn("Data string cannot be converted to hexstring: " + str(e))
log.warn("Hex string cannot be converted to bytestring: " + str(e))
return False
elif args.int:
data = p32(auto_int(data))
@@ -951,7 +957,7 @@ class CmdSendHciCmd(Cmd):
log.info("cmdcode needs to be in the range of 0x0000 - 0xffff")
return False
data = ''
data = b''
for data_part in args.data:
if data_part[0:2] == "0x":
data += p32(auto_int(data_part))
@@ -1181,16 +1187,16 @@ class CmdInfo(Cmd):
log.info(" - Remote BT name: %08X" % connection.remote_name_address)
log.info(" - Master of Conn.: %s" % str(connection.master_of_connection))
log.info(" - Conn. Handle: 0x%X" % connection.connection_handle)
log.info(" - Public RAND: %s" % connection.public_rand.encode('hex'))
#log.info(" - PIN: %s" % connection.pin.encode('hex'))
log.info(" - Public RAND: %s" % bytes_to_hex(connection.public_rand))
#log.info(" - PIN: %s" % bytes_to_hex(connection.pin)
#log.info(" - BT addr for key: %s" % bt_addr_to_str(connection.bt_addr_for_key))
log.info(" - Effective Key Len: %d byte (%d bit)" % (connection.effective_key_len, 8*connection["effective_key_len"]))
log.info(" - Link Key: %s" % connection.link_key.encode('hex'))
log.info(" - LMP Features: %s" % connection.extended_lmp_feat.encode('hex'))
log.info(" - Host Supported F: %s" % connection.host_supported_feat.encode('hex'))
log.info(" - Link Key: %s" % bytes_to_hex(connection.link_key))
log.info(" - LMP Features: %s" % bytes_to_hex(connection.extended_lmp_feat))
log.info(" - Host Supported F: %s" % bytes_to_hex(connection.host_supported_feat))
log.info(" - TX Power (dBm): %d" % connection.tx_pwr_lvl_dBm)
log.info(" - Array Index: %s" % connection.id.encode('hex'))
print
log.info(" - Array Index: %s" % bytes_to_hex(connection.id))
print()
return True
def infoDevice(self, args):
@@ -1199,14 +1205,14 @@ class CmdInfo(Cmd):
log.warn(" '%s' not in fw.py. FEATURE NOT SUPPORTED!" % const)
return False
bt_addr = self.readMem(self.internalblue.fw.BD_ADDR, 6)[::-1]
bt_addr_str = ":".join([b.encode("hex") for b in bt_addr])
bt_addr_str = bt_addr_to_str(bt_addr)
device_name = self.readMem(self.internalblue.fw.DEVICE_NAME, 258)
device_name_len = u8(device_name[0])-1
device_name_len = device_name[0]-1
device_name = device_name[2:2+device_name_len]
adb_serial = context.device
log.info("### | Device ###")
log.info(" - Name: %s" % device_name)
log.info(" - Name: %s" % device_name.decode('utf-8'))
log.info(" - ADB Serial: %s" % adb_serial)
log.info(" - Address: %s" % bt_addr_str)
return True
@@ -1228,7 +1234,7 @@ class CmdInfo(Cmd):
code = disasm(table_values[i],vma=table_addresses[i],byte=False,offset=False)
code = code.replace(" ", " ").replace("\n", "; ")
log.info("[%03d] 0x%08X: %s (%s)" % (i, table_addresses[i],
table_values[i].encode('hex'),
bytes_to_hex(table_values[i]),
code))
return True
@@ -1306,7 +1312,7 @@ class CmdInfo(Cmd):
# Print Buffer Details
buffer_size = bloc_for_details["buffer_size"] + 4
for buffer_address, buffer_hdr in bloc_for_details["buffer_headers"].iteritems():
for buffer_address, buffer_hdr in bloc_for_details["buffer_headers"].items():
progress_log.status("Dumping buffers from BLOC[%d]: 0x%06X" % (bloc_for_details["index"], buffer_address))
# Buffer in use!
if buffer_hdr == bloc_for_details["address"]:
@@ -1362,7 +1368,7 @@ class CmdInfo(Cmd):
if args.type in subcommands:
return subcommands[args.type](args.args)
else:
log.warn("Unkown type: %s\nKnown types: %s" % (args.type, subcommands.keys()))
log.warn("Unkown type: %s\nKnown types: %s" % (args.type, list(subcommands.keys())))
return False
@@ -1486,7 +1492,8 @@ class CmdCustom(Cmd):
actions = ['list', 'add', 'run', 'remove']
for keyword in list(keywords):
keywords.extend(['%s %s' % (keyword, action) for action in actions])
for action in actions:
keywords.append('%s %s' % (keyword, action))
parser = argparse.ArgumentParser(prog=keywords[0],
description=description,
@@ -1521,7 +1528,7 @@ class CmdCustom(Cmd):
return True
if args.do == 'list':
custom_cmds= ["\t%s\t\t%s\n" % (k, v) for k, v in sorted(CmdCustom.custom_commands.iteritems())]
custom_cmds= ["\t%s\t\t%s\n" % (k, v) for k, v in sorted(CmdCustom.custom_commands.items())]
log.info("Custom commands:\n%s" % ''.join(custom_cmds))
return True
@@ -1619,15 +1626,15 @@ class CmdReadAfhChannelMap(Cmd):
"""
response = self.internalblue.sendHciCommand(0x1406, p16(handle))
if len(response) < 17 or response[8:] == '\x00'*9:
if len(response) < 17 or response[8:] == b'\x00'*9:
log.info("Connection 0x%04x is not established." % handle)
return False
log.info("Connection Handle: 0x%04x" % handle)
log.info("AFH Enabled: %s" % bool(response[7] != '\x00'))
log.info("AFH Enabled: %s" % bool(response[7] != 0))
channels = ""
for c in response[8:]:
bits = format(ord(c), '08b')
bits = format(c, '08b')
for b in bits:
if b == "1":
channels = channels + " *"
+45 -36
View File
@@ -25,16 +25,26 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from __future__ import division
from future import standard_library
standard_library.install_aliases()
from builtins import hex
from builtins import str
from builtins import range
from builtins import object
from past.utils import old_div
from abc import ABCMeta, abstractmethod
from pwn import *
from .fw.fw import Firmware
import datetime
import time
import Queue
import queue as queue2k
from . import hci
from .objects.queue_element import QueueElement
from .objects.connection_information import ConnectionInformation
from future.utils import with_metaclass
from internalblue.utils import bytes_to_hex
try:
from typing import List, Optional, Any, TYPE_CHECKING, Tuple, Union, NewType, Callable
@@ -50,9 +60,7 @@ except:
#import logging
#log = logging.getLogger(__name__)
class InternalBlue:
__metaclass__ = ABCMeta
class InternalBlue(with_metaclass(ABCMeta, object)):
def __init__(self, queue_size=1000, btsnooplog_filename='btsnoop.log', log_level='info', fix_binutils='True', data_directory=".", replay=False):
# type: (int, str, str, bool, str, bool) -> None
context.log_level = log_level
@@ -94,7 +102,7 @@ class InternalBlue:
# firmware (the response is recognized with the help of the filter function).
# Once the response arrived, it puts the response into the response_queue from
# the tuple. See sendH4() and sendHciCommand().
self.sendQueue = Queue.Queue(queue_size) # type: Queue.Queue[Task]
self.sendQueue = queue2k.Queue(queue_size) # type: Queue.Queue[Task]
self.recvThread = None # The thread which is responsible for the HCI snoop socket
self.sendThread = None # The thread which is responsible for the HCI inject socket
@@ -225,7 +233,7 @@ class InternalBlue:
# Wait for 'send task' in send queue
try:
task = self.sendQueue.get(timeout=0.5)
except Queue.Empty:
except queue2k.Empty:
continue
# Extract the components of the task
@@ -277,12 +285,12 @@ class InternalBlue:
# if the caller expects a response: register a queue to receive the response
if queue != None and filter_function != None:
recvQueue = Queue.Queue(1)
recvQueue = queue2k.Queue(1)
self.registerHciRecvQueue(recvQueue, filter_function)
# Send command to the chip using s_inject socket
try:
log.debug("_sendThreadFunc: Send: " + str(out.encode('hex')))
log.debug("_sendThreadFunc: Send: " + bytes_to_hex(out))
self.s_inject.send(out)
except socket.error:
pass
@@ -299,7 +307,7 @@ class InternalBlue:
record = recvQueue.get(timeout=2)
hcipkt = record[0]
data = hcipkt.data
except Queue.Empty:
except queue2k.Empty:
log.warn("_sendThreadFunc: No response from the firmware.")
data = None
self.unregisterHciRecvQueue(recvQueue)
@@ -560,7 +568,7 @@ class InternalBlue:
"""
# send Read_Local_Version_Information
version = self.sendHciCommand(0x1001, '')
version = self.sendHciCommand(0x1001, ''.encode('utf-8'))
if not version or len(version) < 11:
log.warn("""initialize_fimware: Failed to send a HCI command to the Bluetooth driver.
@@ -570,12 +578,12 @@ class InternalBlue:
return False
# Broadcom uses 0x000f as vendor ID, Cypress 0x0131
vendor = (u8(version[9]) << 8) + u8(version[8])
vendor = (version[9] << 8) + version[8]
if vendor != 0xf and vendor != 0x131:
log.critical("Not running on a Broadcom or Cypress chip!")
return False
else:
subversion = (u8(version[11]) << 8) + u8(version[10])
subversion = (version[11] << 8) + version[10]
iOS = False
if self.__class__.__name__ == "iOSCore":
@@ -690,7 +698,7 @@ class InternalBlue:
log.warn("registerHciRecvQueue: no such queue is registered!")
def sendHciCommand(self, opcode, data, timeout=3):
# type: (Opcode, bytes, int) -> Optional[Any]
# type: (Opcode, bytes, int) -> Optional[bytearray]
"""
Send an arbitrary HCI command packet by pushing a send-task into the
sendQueue. This function blocks until the response is received
@@ -702,7 +710,7 @@ class InternalBlue:
# return this instead of the Command Complete Event (which will
# follow later and will be ignored). This should be fixed..
queue = Queue.Queue(1)
queue = queue2k.Queue(1)
# standard HCI command structure
payload = p16(opcode) + p8(len(data)) + data
@@ -732,14 +740,14 @@ class InternalBlue:
timeout=timeout)
ret = queue.get(timeout=timeout)
return ret
except Queue.Empty:
except queue2k.Empty:
log.warn("sendHciCommand: waiting for response timed out!")
# If there was no response because the Trace Replay Hook throw an assert it will be in this attribute.
# Raise this so the main thread doesn't ignore this and it will be caught by any testing framework
if hasattr(self, 'test_failed'):
raise self.test_failed
return None
except Queue.Full:
except queue.Full:
log.warn("sendHciCommand: send queue is full!")
return None
@@ -756,7 +764,7 @@ class InternalBlue:
try:
self.sendQueue.put((h4type, data, None, None), timeout=timeout)
return True
except Queue.Full:
except queue.Full:
log.warn("sendH4: send queue is full!")
return False
@@ -782,7 +790,7 @@ class InternalBlue:
try:
return self.recvQueue.get(timeout=timeout)
except Queue.Empty:
except queue2k.Empty:
return None
def readMem(self, address, length, progress_log=None, bytes_done=0, bytes_total=0):
@@ -805,7 +813,7 @@ class InternalBlue:
read_addr = address # read_addr is the address of the next Read_RAM HCI command
byte_counter = 0 # tracks the number of received bytes
outbuffer = '' # buffer which stores all accumulated data read from the chip
outbuffer = bytearray() # buffer which stores all accumulated data read from the chip
if bytes_total == 0: # If no total bytes where given just use length
bytes_total = length
retry = 3 # Retry on failures
@@ -840,7 +848,7 @@ class InternalBlue:
log.debug("readMem: insufficient bytes returned, retrying...")
continue
status = ord(response[3])
status = response[3]
if status != 0:
# It is not yet reverse engineered what this byte means. For almost
# all memory addresses it will be 0. But for some it will be different,
@@ -865,7 +873,7 @@ class InternalBlue:
byte_counter += len(data)
if(progress_log != None):
msg = "receiving data... %d / %d Bytes (%d%%)" % (bytes_done+byte_counter,
bytes_total, (bytes_done+byte_counter)*100/bytes_total)
bytes_total, old_div((bytes_done+byte_counter)*100,bytes_total))
progress_log.status(msg)
retry = 3 # this round worked, so we re-enable retries
return outbuffer
@@ -907,7 +915,7 @@ class InternalBlue:
log.warn("readMemAligned: address (0x%x) must be 4-byte aligned!" % address)
return None
recvQueue = Queue.Queue(1)
recvQueue = queue2k.Queue(1)
def hciFilterFunction(record):
# type: (Record) -> bool
hcipkt = record[0]
@@ -933,7 +941,7 @@ class InternalBlue:
blocksize = 244
# Customize the assembler snippet with the current read_addr and blocksize
code = asm(self.fw.READ_MEM_ALIGNED_ASM_SNIPPET % (blocksize, read_addr, blocksize/4), vma=self.fw.READ_MEM_ALIGNED_ASM_LOCATION, arch='thumb')
code = asm(self.fw.READ_MEM_ALIGNED_ASM_SNIPPET % (blocksize, read_addr, old_div(blocksize,4)), vma=self.fw.READ_MEM_ALIGNED_ASM_LOCATION, arch='thumb')
# Write snippet to the RAM (TODO: maybe backup and restore content of this area?)
self.writeMem(self.fw.READ_MEM_ALIGNED_ASM_LOCATION, code)
@@ -950,7 +958,7 @@ class InternalBlue:
# wait for the custom HCI event sent by the snippet:
try:
record = recvQueue.get(timeout=1)
except Queue.Empty:
except queue2k.Empty:
log.warn("readMemAligned: No response from assembler snippet.")
return None
@@ -961,7 +969,7 @@ class InternalBlue:
byte_counter += len(data)
if progress_log is not None:
msg = "receiving data... %d / %d Bytes (%d%%)" % (bytes_done+byte_counter,
bytes_total, (bytes_done+byte_counter)*100/bytes_total)
bytes_total, old_div((bytes_done+byte_counter)*100,bytes_total))
progress_log.status(msg)
self.unregisterHciRecvQueue(recvQueue)
@@ -1000,8 +1008,8 @@ class InternalBlue:
if(response == None):
log.warn("writeMem: Timeout while reading response, probably need to wait longer.")
return False
elif (response[3] != '\x00'):
log.warn("writeMem: Got error code %s in command complete event." % response[3].encode('hex'))
elif (response[3] != 0):
log.warn("writeMem: Got error code %s in command complete event." % bytes_to_hex(response[3]))
return False
write_addr += blocksize
byte_counter += blocksize
@@ -1024,8 +1032,9 @@ class InternalBlue:
log.warn("Empty HCI response during launchRam, driver crashed due to invalid code or destination")
return False
if response[3] != '\x00':
log.warn("Got error code %x in command complete event." % u8(response[3]))
error_code = response[3]
if error_code != 0:
log.warn("Got error code %x in command complete event." % error_code)
return False
# Nexus 6P Bugfix
@@ -1056,10 +1065,10 @@ class InternalBlue:
# On Nexus 5, ReadMemAligned is required, while Nexus 6P supports this memory area with ReadRAM
if self.fw.PATCHRAM_ALIGNED:
slot_dump = self.readMemAligned(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, slot_count/4)
slot_dump = self.readMemAligned(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, old_div(slot_count,4))
table_addr_dump = self.readMemAligned(self.fw.PATCHRAM_TARGET_TABLE_ADDRESS, slot_count*4)
else:
slot_dump = self.readMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, slot_count/4)
slot_dump = self.readMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS, old_div(slot_count,4))
table_addr_dump = self.readMem(self.fw.PATCHRAM_TARGET_TABLE_ADDRESS, slot_count*4)
table_val_dump = self.readMem(self.fw.PATCHRAM_VALUE_TABLE_ADDRESS, slot_count*4)
@@ -1067,11 +1076,11 @@ class InternalBlue:
table_values = []
slot_dwords = []
slot_bits = []
for dword in range(slot_count/32):
for dword in range(old_div(slot_count,32)):
slot_dwords.append(slot_dump[dword*32:(dword+1)*32])
for dword in slot_dwords:
slot_bits.extend(bits(dword[::-1])[::-1])
slot_bits.extend(bits(bytes(dword[::-1]))[::-1])
for i in range(slot_count):
if slot_bits[i]:
table_addresses.append(u32(table_addr_dump[i*4:i*4+4])<<2)
@@ -1157,7 +1166,7 @@ class InternalBlue:
# Enable patchram slot (enable bitfield starts at 0x310204)
# (We need to enable the slot by setting a bit in a multi-dword bitfield)
target_dword = int(slot / 32)
target_dword = int(old_div(slot, 32))
table_slots[slot] = 1
slot_dword = unbits(table_slots[target_dword*32:(target_dword+1)*32][::-1])[::-1]
self.writeMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS + target_dword*4, slot_dword)
@@ -1196,7 +1205,7 @@ class InternalBlue:
# Disable patchram slot (enable bitfield starts at 0x310204)
# (We need to disable the slot by clearing a bit in a multi-dword bitfield)
target_dword = int(slot / 32)
target_dword = int(old_div(slot, 32))
table_slots[slot] = 0
slot_dword = unbits(table_slots[target_dword*32:(target_dword+1)*32][::-1])[::-1]
self.writeMem(self.fw.PATCHRAM_ENABLED_BITMAP_ADDRESS + target_dword*4, slot_dword)
@@ -1715,7 +1724,7 @@ class InternalBlue:
"""
if not self.serial:
self.sendH4(hci.HCI.BCM_DIAG, '\xf0' + p8(enable))
self.sendH4(hci.HCI.BCM_DIAG, b'\xf0' + b'\x01' if enable else b'\x00')
# We can send the activation to the serial, but then the Android driver
# itself crashes when receiving diagnostic frames...
+4 -2
View File
@@ -23,10 +23,12 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from builtins import hex
from builtins import object
from pwn import log
class Firmware:
class Firmware(object):
def __init__(self, version=None, iOS=False):
"""
Load and initialize the actual firmware add-ons for Nexus 5, Raspi3, etc.
@@ -60,7 +62,7 @@ class Firmware:
log.info("Loaded firmware information for " + self.firmware.FW_NAME + ".")
class MemorySection:
class MemorySection(object):
"""
All firmwares have memory sections that can be RAM, ROM or neither of both.
"""
+2 -1
View File
@@ -1,3 +1,4 @@
from __future__ import absolute_import
# fw_0x420e.py
#
# Generic firmware file in case we do not know something...
@@ -20,7 +21,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# Samsung S10/S10e/S10+
+2 -1
View File
@@ -27,6 +27,7 @@
# Firmware Infos
# This runs on Rasperry Pi 3
from builtins import object
FW_NAME = "BCM43430A1"
# Device Infos
@@ -34,7 +35,7 @@ DEVICE_NAME = 0x20401C
BD_ADDR = 0x201C64
# Memory Sections
class MemorySection:
class MemorySection(object):
def __init__(self, start_addr, end_addr, is_rom, is_ram):
self.start_addr = start_addr
self.end_addr = end_addr
+2 -1
View File
@@ -1,3 +1,4 @@
from __future__ import absolute_import
# fw_0x420e.py
#
# Generic firmware file in case we do not know something...
@@ -20,7 +21,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# Evaluation Kit CYW20706
+2 -1
View File
@@ -1,3 +1,4 @@
from __future__ import absolute_import
# fw_0x420e.py
#
# Generic firmware file in case we do not know something...
@@ -20,7 +21,7 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from .fw import MemorySection
# Firmware Infos
# Evaluation Kit CYW920819
+2 -1
View File
@@ -23,7 +23,8 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from __future__ import absolute_import
from .fw import MemorySection
# Firmware Infos
FW_NAME = "BCM20702A1 (USB Bluetooth dongle)"
+2 -1
View File
@@ -22,7 +22,8 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from __future__ import absolute_import
from .fw import MemorySection
# Firmware Infos
FW_NAME = "BCM20703A2 (MacBook Pro 2016)"
+2 -1
View File
@@ -25,7 +25,8 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from fw import MemorySection
from __future__ import absolute_import
from .fw import MemorySection
# Firmware Infos
# This runs on Nexus 6P, Samsung Galaxy S6, Samsung Galaxy S6 edge
+2 -1
View File
@@ -27,13 +27,14 @@
# Firmware Infos
# This runs on Rasperry Pi 3+
from builtins import object
FW_NAME = "BCM4345C0"
# Device Infos
DEVICE_NAME = 0x204954
# Memory Sections
class MemorySection:
class MemorySection(object):
def __init__(self, start_addr, end_addr, is_rom, is_ram):
self.start_addr = start_addr
self.end_addr = end_addr
+12 -8
View File
@@ -25,6 +25,10 @@
# out of or in connection with the Software or the use or other dealings in the
# Software.
from __future__ import absolute_import
from builtins import hex
from builtins import range
from builtins import object
from pwn import *
HCI_UART_TYPE_CLASS = {}
@@ -52,7 +56,7 @@ class HCI(object):
@staticmethod
def from_data(data):
uart_type = ord(data[0])
uart_type = data[0]
return HCI_UART_TYPE_CLASS[uart_type].from_data(data[1:])
def __init__(self, uart_type):
@@ -573,7 +577,7 @@ class HCI_Cmd(HCI):
0xffed : "COMND VSC_EnterDownloadMode"
}
HCI_CMD_STR_REVERSE = {v: k for k, v in HCI_CMD_STR.iteritems()}
HCI_CMD_STR_REVERSE = {v: k for k, v in HCI_CMD_STR.items()}
@staticmethod
def cmd_name(opcode):
@@ -607,7 +611,7 @@ class HCI_Cmd(HCI):
@staticmethod
def from_data(data):
return HCI_Cmd(u16(data[0:2]), ord(data[2]), data[3:])
return HCI_Cmd(u16(data[0:2]), data[2], data[3:])
def __init__(self, opcode, length, data):
HCI.__init__(self, HCI.HCI_CMD)
@@ -623,7 +627,7 @@ class HCI_Cmd(HCI):
cmdname = "unknown"
if self.opcode in self.HCI_CMD_STR:
cmdname = self.HCI_CMD_STR[self.opcode]
return parent + "<0x%04x %s (len=%d): %s>" % (self.opcode, cmdname, self.length, self.data[0:16].encode('hex'))
return parent + "<0x%04x %s (len=%d): %s>" % (self.opcode, cmdname, self.length, ''.join(format(x, '02x') for x in self.data[0:16]))
class HCI_Acl(HCI):
@@ -698,7 +702,7 @@ class HCI_Diag(HCI):
cmdname = "unknown"
if self.opcode in self.BCM_DIAG_STR:
cmdname = self.BCM_DIAG_STR[self.opcode]
return parent + "<0x%02x %s: %s>" % (self.opcode, cmdname, self.data[0:16].encode('hex'))
return parent + "<0x%02x %s: %s>" % (self.opcode, cmdname, ''.join(format(x, '02x') for x in self.data[0:16]))
class HCI_Event(HCI):
@@ -887,7 +891,7 @@ class HCI_Event(HCI):
@staticmethod
def from_data(data):
return HCI_Event(ord(data[0]), ord(data[1]), data[2:])
return HCI_Event(data[0], data[1], data[2:])
def __init__(self, event_code, length, data):
HCI.__init__(self, HCI.HCI_EVT)
@@ -903,7 +907,7 @@ class HCI_Event(HCI):
eventname = "unknown"
if self.event_code in self.HCI_EVENT_STR:
eventname = self.HCI_EVENT_STR[self.event_code]
return parent + "<0x%02x %s (len=%d): %s>" % (self.event_code, eventname, self.length, self.data[0:].encode('hex'))
return parent + "<0x%02x %s (len=%d): %s>" % (self.event_code, eventname, self.length, ''.join(format(x, '02x') for x in self.data[0:]))
HCI_UART_TYPE_CLASS = {
HCI.HCI_CMD : HCI_Cmd,
@@ -917,7 +921,7 @@ def parse_hci_packet(data):
return HCI.from_data(data)
class StackDumpReceiver:
class StackDumpReceiver(object):
memdump_addr = None
memdumps = {}
stack_dump_has_happend = False
+17 -10
View File
@@ -1,12 +1,18 @@
#!/usr/bin/env python2
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
from builtins import str
from builtins import zip
from builtins import range
import subprocess
import datetime
from pwn import *
import fcntl
from .core import InternalBlue
import hci
import Queue
from . import hci
import queue as queue2k
import threading
try:
@@ -65,7 +71,7 @@ class HCICore(InternalBlue):
# Do ioctl(s,HCIGETDEVLIST,arg) to get the number of available devices:
# arg is struct hci_dev_list_req (/usr/include/bluetooth/hci.h)
arg = p32(16) # dl->dev_num = HCI_MAX_DEV which is 16 (little endian)
arg += "\x00"*(8*16)
arg += b"\x00"*(8*16)
devices_raw = fcntl.ioctl(s.fileno(), HCIGETDEVLIST, arg)
num_devices = u16(devices_raw[:2])
log.debug("Found %d HCI devices via ioctl(HCIGETDEVLIST)!" % num_devices)
@@ -76,10 +82,10 @@ class HCICore(InternalBlue):
dev_id = u16(devices_raw[dev_struct_start:dev_struct_start+2])
# arg is struct hci_dev_info (/usr/include/bluetooth/hci.h)
arg = p16(dev_id) # di->dev_id = <device_id>
arg += "\x00"*20 # Enough space for name, bdaddr and flags
dev_info_raw = fcntl.ioctl(s.fileno(), HCIGETDEVINFO, arg)
dev_name = dev_info_raw[2:10].replace("\x00","")
dev_bdaddr = ":".join(["%02X" % ord(x) for x in dev_info_raw[10:16][::-1]])
arg += b"\x00"*20 # Enough space for name, bdaddr and flags
dev_info_raw = bytearray(fcntl.ioctl(s.fileno(), HCIGETDEVINFO, arg))
dev_name = dev_info_raw[2:10].replace(b"\x00",b"").decode()
dev_bdaddr = ":".join(["%02X" % x for x in dev_info_raw[10:16][::-1]])
dev_flags = u32(dev_info_raw[16:20])
if dev_flags == 0:
dev_flags_str = "DOWN"
@@ -192,6 +198,7 @@ class HCICore(InternalBlue):
# Read the record data
try:
record_data = self.s_snoop.recv(1024)
record_data = bytearray(record_data)
except socket.timeout:
continue # this is ok. just try again without error
except Exception as e:
@@ -227,7 +234,7 @@ class HCICore(InternalBlue):
if filter_function == None or filter_function(record):
try:
queue.put(record, block=False)
except Queue.Full:
except queue.Full:
log.warn("recvThreadFunc: A recv queue is full. dropping packets..")
# Call all callback functions inside registeredHciCallbacks and pass the
@@ -278,7 +285,7 @@ class HCICore(InternalBlue):
"""
# TODO still seems to only forward incoming events?!
self.s_snoop.setsockopt(socket.SOL_HCI, socket.HCI_FILTER,
'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00') #type mask, event mask, event mask, opcode
b'\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00') #type mask, event mask, event mask, opcode
interface_num = device["dev_id"]
log.debug("Socket interface number: %s" % (interface_num))
@@ -292,7 +299,7 @@ class HCICore(InternalBlue):
# Write Header to btsnoop file (if file is still empty):
if self.write_btsnooplog and self.btsnooplog_file.tell() == 0:
# BT Snoop Header: btsnoop\x00, version: 1, data link type: 1002
btsnoop_hdr = "btsnoop\x00" + p32(1,endian="big") + p32(1002,endian="big")
btsnoop_hdr = b"btsnoop\x00" + p32(1,endian="big") + p32(1002,endian="big")
with self.btsnooplog_file_lock:
self.btsnooplog_file.write(btsnoop_hdr)
self.btsnooplog_file.flush()
+11 -7
View File
@@ -1,12 +1,16 @@
#!/usr/bin/env python2
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
from builtins import str
import socket
import Queue
import hci
import queue as queue2k
from . import hci
from pwn import *
from core import InternalBlue
from .core import InternalBlue
class iOSCore(InternalBlue):
@@ -49,16 +53,16 @@ class iOSCore(InternalBlue):
the command or None if no response was received within the timeout.
"""
queue = Queue.Queue(1)
queue = queue2k.Queue(1)
try:
self.sendQueue.put((h4type, data, queue, None), timeout=timeout)
ret = queue.get(timeout=timeout)
return ret
except Queue.Empty:
except queue2k.Empty:
log.warn("sendH4: waiting for response timed out!")
return None
except Queue.Full:
except queue.Full:
log.warn("sendH4: send queue is full!")
return None
@@ -173,7 +177,7 @@ class iOSCore(InternalBlue):
for queue, filter_function in self.registeredHciRecvQueues: # TODO filter_function not working with bluez modifications
try:
queue.put(record, block=False)
except Queue.Full:
except queue.Full:
log.warn("recvThreadFunc: A recv queue is full. dropping packets..")
# Call all callback functions inside registeredHciCallbacks and pass the
+17 -11
View File
@@ -1,12 +1,16 @@
#!/usr/bin/env python2
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
from builtins import str
import socket
import Queue
import hci
import queue as queue2k
from . import hci
from pwn import *
from core import InternalBlue
from .core import InternalBlue
import binascii
import os
@@ -89,7 +93,7 @@ class macOSCore(InternalBlue):
# read record data
try:
data, addr = self.s_snoop.recvfrom(1024)
record_data = data
record_data = bytearray(data)
except socket.timeout:
continue # this is ok. just try again without error
@@ -103,7 +107,7 @@ class macOSCore(InternalBlue):
for queue, filter_function in self.registeredHciRecvQueues: # TODO filter_function not working with bluez modifications
try:
queue.put(record, block=False)
except Queue.Full:
except queue.Full:
log.warn("recvThreadFunc: A recv queue is full. dropping packets..>" + record_data)
# Call all callback functions inside registeredHciCallbacks and pass the
@@ -122,7 +126,7 @@ class macOSCore(InternalBlue):
# Wait for 'send task' in send queue
try:
task = self.sendQueue.get(timeout=0.5)
except Queue.Empty:
except queue2k.Empty:
continue
# Extract the components of the task
@@ -133,8 +137,10 @@ class macOSCore(InternalBlue):
# Send command to the chip using IOBluetoothExtended framework
h4type, data, queue, filter_function = task
opcode = binascii.hexlify(data[1]) + binascii.hexlify(data[0])
log.debug("Sending command: 0x" + binascii.hexlify(data) + ", opcode: " + opcode)
data = bytearray(data)
opcode = format(data[1], '02x') + format(data[0], '02x')
log.debug("Sending command: 0x" + ''.join(format(x, '02x') for x in data) + ", opcode: " + opcode)
if not(h4type == 0x01 or h4type == 0x02):
log.warn("H4 Type {0} not supported by macOS Core!".format(str(h4type)))
@@ -144,7 +150,7 @@ class macOSCore(InternalBlue):
# if the caller expects a response: register a queue to receive the response
if queue is not None and filter_function is not None:
recvQueue = Queue.Queue(1)
recvQueue = queue2k.Queue(1)
self.registerHciRecvQueue(recvQueue, filter_function)
# Sending command
@@ -157,7 +163,7 @@ class macOSCore(InternalBlue):
record = recvQueue.get(timeout=10)
hcipkt = record[0]
data = hcipkt.data
except Queue.Empty:
except queue2k.Empty:
log.warn("_sendThreadFunc: No response from the firmware.")
data = None
self.unregisterHciRecvQueue(recvQueue)
@@ -182,5 +188,5 @@ class macOSCore(InternalBlue):
def shutdown(self):
if not self.replay:
self.iobe.shutdown()
self.s_inject.sendto("", ('127.0.0.1', self.s_snoop.getsockname()[1]))
self.s_inject.sendto(b'', ('127.0.0.1', self.s_snoop.getsockname()[1]))
super(macOSCore, self).shutdown()
@@ -1,7 +1,8 @@
from builtins import object
from pwnlib.util.packing import u32, u16, u8
class ConnectionInformation:
class ConnectionInformation(object):
connection_handle = 0
connection_number = 0
master_of_connection = False
+2 -1
View File
@@ -1,4 +1,5 @@
class QueueElement:
from builtins import object
class QueueElement(object):
index = 0
next_item = 0
prev = 0
+7 -3
View File
@@ -1,3 +1,5 @@
from __future__ import print_function
from builtins import object
import binascii
import time
@@ -8,7 +10,7 @@ except ImportError:
pass
class SocketRecvHook():
class SocketRecvHook(object):
def __init__(self, socket):
# type: (socket.socket) -> None
self.snoop_socket = socket
@@ -43,7 +45,7 @@ class SocketRecvHook():
self.recvfrom_hook(data, addr)
return data, addr
class SocketInjectHook():
class SocketInjectHook(object):
def __init__(self, socket, core):
# type: (socket.socket, InternalBlue) -> None
self.inject_socket = socket
@@ -114,7 +116,7 @@ class SocketDuplexHook(SocketInjectHook, SocketRecvHook):
pass
class HookBase():
class HookBase(object):
def send_hook(self, data):
raise NotImplementedError
@@ -254,6 +256,8 @@ class ReplaySocket(SocketDuplexHook):
def getsockname(self):
return (None, 0)
def close(self):
assert self.index + 1 == len(self.log)
from internalblue.core import InternalBlue
+12 -8
View File
@@ -1,12 +1,16 @@
#!/usr/bin/env python2
from __future__ import print_function
from __future__ import absolute_import
from future import standard_library
standard_library.install_aliases()
import socket
import Queue
import hci
import queue as queue2k
from . import hci
from pwn import *
from core import InternalBlue
from .core import InternalBlue
import binascii
filepath = os.path.dirname(os.path.abspath(__file__))
@@ -55,16 +59,16 @@ class testCore(InternalBlue):
the command or None if no response was received within the timeout.
"""
queue = Queue.Queue(1)
queue = queue2k.Queue(1)
try:
self.sendQueue.put((h4type, data, queue, None), timeout=timeout)
ret = queue.get(timeout=timeout)
return ret
except Queue.Empty:
except queue2k.Empty:
log.warn("sendH4: waiting for response timed out!")
return None
except Queue.Full:
except queue.Full:
log.warn("sendH4: send queue is full!")
return None
@@ -105,7 +109,7 @@ class testCore(InternalBlue):
# Wait for 'send task' in send queue
try:
task = self.sendQueue.get(timeout=0.5)
except Queue.Empty:
except queue2k.Empty:
continue
# Extract the components of the task
@@ -121,7 +125,7 @@ class testCore(InternalBlue):
# if the caller expects a response: register a queue to receive the response
if queue is not None and filter_function is not None:
recvQueue = Queue.Queue(1)
recvQueue = queue2k.Queue(1)
self.registerHciRecvQueue(recvQueue, filter_function)
# if the caller expects a response:
+3
View File
@@ -0,0 +1,3 @@
def bytes_to_hex(bytes):
# type: (bytearray) -> str
return ''.join(format(x, '02x') for x in bytearray(bytes))
-2
View File
@@ -1,2 +0,0 @@
pwntools==3.12.2
pyelftools==0.24
+2 -1
View File
@@ -11,7 +11,8 @@ setup(name='internalblue',
license='MIT',
packages=['internalblue', 'internalblue/fw'],
install_requires=[
'pwntools',
'pwntools>=4.2.0.dev0',
'pyelftools',
],
extras_require={
"macoscore": ["pyobjc"],
+1
View File
@@ -1,3 +1,4 @@
from __future__ import print_function
from internalblue.cli import _parse_argv
from internalblue.hcicore import HCICore
+1
View File
@@ -1,3 +1,4 @@
from __future__ import print_function
from internalblue.cli import _parse_argv
from internalblue.adbcore import ADBCore
+6 -5
View File
@@ -1,3 +1,4 @@
from __future__ import print_function
from internalblue.cli import _parse_argv
from internalblue.adbcore import ADBCore
from internalblue.objects.connection_information import ConnectionInformation
@@ -13,11 +14,11 @@ except ImportError:
def test_info_conn_7():
dummy = ConnectionInformation(7, '0023023a1a2e'.decode('hex'), 0, True, 0xc,
'e98a5eaaff39ecb5ce4447590dfb73a4'.decode('hex'), 16,
'dbea2d9c47bc1aa6afe664ff31591aa6'.decode('hex'), -87,
'0a00c821ffff8ffa'.decode('hex'), '9bff598701000000'.decode('hex'),
'00'.decode('hex'))
dummy = ConnectionInformation(7, bytearray.fromhex('0023023a1a2e'), 0, True, 0xc,
bytearray.fromhex('e98a5eaaff39ecb5ce4447590dfb73a4'), 16,
bytearray.fromhex('dbea2d9c47bc1aa6afe664ff31591aa6'), -87,
bytearray.fromhex('0a00c821ffff8ffa'), bytearray.fromhex('9bff598701000000'),
bytearray.fromhex('00'))
trace = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'traces/adbcore/dictionary_tests/info_conn_7.trace')
+5 -4
View File
@@ -1,3 +1,4 @@
from __future__ import print_function
from internalblue.cli import _parse_argv
from internalblue.adbcore import ADBCore
from internalblue.objects.connection_information import ConnectionInformation
@@ -12,10 +13,10 @@ except ImportError:
def test_info_conn_9():
dummy = ConnectionInformation(9, '000000000000'.decode('hex'), 0, False, 12,
'00000000000000000000000000000000'.decode('hex'), 0, '', -87,
'0000000000000000'.decode('hex'),
'0000000000000000'.decode('hex'), '00'.decode('hex'))
dummy = ConnectionInformation(9, bytearray.fromhex('000000000000'), 0, False, 12,
bytearray.fromhex('00000000000000000000000000000000'), 0, b'', -87,
bytearray.fromhex('0000000000000000'),
bytearray.fromhex('0000000000000000'), bytearray.fromhex('00'))
trace = os.path.join(os.path.dirname(os.path.abspath(__file__)),
'traces/adbcore/dictionary_tests/info_conn_9.trace')
+1
View File
@@ -1,3 +1,4 @@
from __future__ import print_function
from internalblue.cli import _parse_argv
from internalblue.adbcore import ADBCore
from internalblue.objects.queue_element import QueueElement
+3 -1
View File
@@ -1,5 +1,7 @@
from __future__ import print_function
from __future__ import absolute_import
from testwrapper import trace_test, get_trace_path_cmd_tuple
from .testwrapper import trace_test, get_trace_path_cmd_tuple
import os
+2 -1
View File
@@ -1,3 +1,4 @@
from builtins import object
import argparse
from internalblue.cli import internalblue_cli, _parse_argv
@@ -13,7 +14,7 @@ except ImportError:
tracedir = os.path.dirname(__file__)
class Fakeargs():
class Fakeargs(object):
def __init__(self):
self.data_directory = None
self.verbose = False