Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 7b1577fe06 | |||
| 0fd8545d93 | |||
| f8afcaad84 |
+5
-4
@@ -111,7 +111,7 @@ def internalblue_cli(argv):
|
||||
parser.add_argument("--ios-device", "-i", help="Tell internalblue to connect to a remote iPhone HCI socket. Specify socket IP address and port (i.e., 172.20.10.1:1234).")
|
||||
parser.add_argument("--serialsu", "-s", help="On ADB, directly try su/serial/busybox scripting, if you do not have a special bluetooth.default.so file.", action="store_true")
|
||||
parser.add_argument("--testdevice", "-t", help="Use a dummy test device to execute testcases", action="store_true")
|
||||
parser.add_argument("--trace", help="Trace hci connection", action="store_true")
|
||||
parser.add_argument("--trace", help="Trace hci connection")
|
||||
parser.add_argument("--device", help="Specify device/core to be used")
|
||||
parser.add_argument("--commands", "-c", help="CLI command to run before prompting, seperated by ';' (used for easier testing)")
|
||||
args = parser.parse_args(argv)
|
||||
@@ -139,9 +139,10 @@ def internalblue_cli(argv):
|
||||
|
||||
|
||||
if args.trace:
|
||||
from .socket_hooks import wrap_socket_setup
|
||||
ADBCore._setupSockets = wrap_socket_setup(ADBCore._setupSockets)
|
||||
HCICore._setupSockets = wrap_socket_setup(HCICore._setupSockets)
|
||||
from .socket_hooks import hook
|
||||
from internalblue import socket_hooks
|
||||
HookClass = getattr(socket_hooks, args.trace)
|
||||
hook(HCICore, HookClass)
|
||||
|
||||
|
||||
# Initalize cores and get devices
|
||||
|
||||
@@ -45,8 +45,8 @@ try:
|
||||
except:
|
||||
pass
|
||||
|
||||
import logging
|
||||
log = logging.getLogger(__name__)
|
||||
#import logging
|
||||
#log = logging.getLogger(__name__)
|
||||
|
||||
class InternalBlue:
|
||||
__metaclass__ = ABCMeta
|
||||
@@ -62,8 +62,8 @@ class InternalBlue:
|
||||
|
||||
|
||||
self.data_directory = data_directory
|
||||
self.s_inject = None # This is the TCP socket to the HCI inject port
|
||||
self.s_snoop = None # This is the TCP socket to the HCI snoop port
|
||||
self.s_inject = None #type: socket.socket # This is the TCP socket to the HCI inject port
|
||||
self.s_snoop = None #type: socket.socket # This is the TCP socket to the HCI snoop port
|
||||
|
||||
# If btsnooplog_filename is set, write all incomming HCI packets to a file (can be viewed in wireshark for debugging)
|
||||
if btsnooplog_filename is not None:
|
||||
@@ -279,8 +279,8 @@ class InternalBlue:
|
||||
try:
|
||||
log.debug("_sendThreadFunc: Send: " + str(out.encode('hex')))
|
||||
self.s_inject.send(out)
|
||||
except:
|
||||
log.warn("_sendThreadFunc: Sending to socket failed, reestablishing connection.\nWith HCI sockets, some HCI commands require root!")
|
||||
except Exception as e:
|
||||
log.warn("_sendThreadFunc: Sending to socket failed with {}, reestablishing connection.\nWith HCI sockets, some HCI commands require root!".format(e))
|
||||
# socket are terminated by hcicore..
|
||||
self._teardownSockets()
|
||||
self._setupSockets()
|
||||
|
||||
+145
-31
@@ -1,61 +1,175 @@
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
import binascii
|
||||
import time
|
||||
|
||||
|
||||
class SocketRecvHook():
|
||||
def __init__(self, socket, recv_hook):
|
||||
self.recv_hook = recv_hook
|
||||
def __init__(self, socket):
|
||||
# type: (socket.socket) -> None
|
||||
self.socket = socket
|
||||
self.replace = False
|
||||
|
||||
def recv(self, length):
|
||||
data = self.socket.recv(length)
|
||||
def recv_hook(self, data):
|
||||
raise NotImplementedError()
|
||||
|
||||
def recv_replace(self, length, **kwargs):
|
||||
raise NotImplementedError()
|
||||
|
||||
def recv(self, length, **kwargs):
|
||||
if not self.replace:
|
||||
data = self.socket.recv(length, **kwargs)
|
||||
else:
|
||||
data = self.recv_replace(length, **kwargs)
|
||||
self.recv_hook(data)
|
||||
return data
|
||||
|
||||
|
||||
def recvfrom(self, length):
|
||||
# type: (int) -> Tuple[bytes, Any]
|
||||
data, addr = self.socket.recvfrom(length)
|
||||
self.recv_hook(data)
|
||||
return data
|
||||
if not self.replace:
|
||||
data, addr = self.socket.recvfrom(length)
|
||||
self.recv_hook(data)
|
||||
return data
|
||||
|
||||
class SocketInjectHook():
|
||||
def __init__(self, socket, send_hook):
|
||||
self.send_hook = send_hook
|
||||
def __init__(self, socket):
|
||||
# type: (socket.socket) -> None
|
||||
self.socket = socket
|
||||
self.replace = False
|
||||
|
||||
def close(self):
|
||||
self.socket.close()
|
||||
|
||||
def send(self,data):
|
||||
self.send_hook(data)
|
||||
self.socket.send(data)
|
||||
if not self.replace:
|
||||
try:
|
||||
self.socket.send(data)
|
||||
except Exception as e:
|
||||
self.send_exception(e)
|
||||
raise e
|
||||
else:
|
||||
self.send_replace(data)
|
||||
|
||||
def send_hook(self,result):
|
||||
raise NotImplementedError()
|
||||
|
||||
def send_replace(self,data):
|
||||
raise NotImplementedError()
|
||||
|
||||
import binascii
|
||||
def send_print_hook(data):
|
||||
print("Sent: ", binascii.hexlify(data))
|
||||
def send_exception(self, e):
|
||||
raise NotImplementedError()
|
||||
|
||||
def recv_print_hook(data):
|
||||
print("Recv: ", binascii.hexlify(data))
|
||||
|
||||
class SocketDuplexHook(SocketInjectHook, SocketRecvHook):
|
||||
|
||||
def __init__(self, socket, send_hook=send_print_hook, recv_hook=recv_print_hook):
|
||||
self.recv_hook = recv_hook
|
||||
self.send_hook = send_hook
|
||||
def __init__(self, socket):
|
||||
# type: (socket.socket) -> None
|
||||
self.socket = socket
|
||||
self.replace = False
|
||||
pass
|
||||
|
||||
def wrap_socket_setup(orig_func, send_hook=None, recv_hook=None):
|
||||
def wrapped_socket_setup(self, *args, **kwargs):
|
||||
status = orig_func(self, *args, **kwargs)
|
||||
self.s_inject = SocketDuplexHook(self.s_inject)
|
||||
self.s_snoop = SocketDuplexHook(self.s_snoop)
|
||||
return status
|
||||
|
||||
return wrapped_socket_setup
|
||||
|
||||
class HookBase():
|
||||
def send_hook(self,data):
|
||||
raise NotImplementedError
|
||||
def recv_hook(self, data):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class TraceToFileHook(SocketDuplexHook):
|
||||
def __init__(self, socket, filename='/tmp/bt_hci.log'):
|
||||
SocketDuplexHook.__init__(self, socket)
|
||||
self.file = open(filename, 'a')
|
||||
self.replace = False
|
||||
self.log = []
|
||||
|
||||
def recv_hook(self, data):
|
||||
line = "RX {}\n".format(binascii.hexlify(data))
|
||||
print(line)
|
||||
self.log.append(line)
|
||||
|
||||
def send_hook(self, data):
|
||||
line = "TX {}\n".format(binascii.hexlify(data))
|
||||
print(line)
|
||||
self.log.append(line)
|
||||
|
||||
def send_exception(self, e):
|
||||
line = "EX '{}'\n".format(e)
|
||||
print(line)
|
||||
self.log.append(line)
|
||||
|
||||
def close(self):
|
||||
self.socket.close()
|
||||
self.log.append("Socket closed\n")
|
||||
self.file.writelines(self.log)
|
||||
self.file.close()
|
||||
|
||||
|
||||
import socket
|
||||
|
||||
|
||||
class PrintTrace(SocketDuplexHook):
|
||||
|
||||
def send_hook(self, data, **kwargs):
|
||||
print("Sent: {}".format(binascii.hexlify(data)))
|
||||
|
||||
def recv_hook(self, data, **kwargs):
|
||||
print("Recv: {}".format(binascii.hexlify(data)))
|
||||
|
||||
def recvfrom_hook(self, data, **kwargs):
|
||||
print("Recv: {}".format(binascii.hexlify(data)))
|
||||
|
||||
|
||||
|
||||
|
||||
class ReplaySocket():
|
||||
def __init__(self, filename='/tmp/bt.log'):
|
||||
super(ReplaySocket, self).__init__()
|
||||
self.log = open(filename).readlines()
|
||||
self.index = 0
|
||||
|
||||
def send(self, data, **kwargs):
|
||||
encoded_data = "" # type: str
|
||||
direction, encoded_data = self.log[self.index].split(" ")
|
||||
assert(direction == "TX")
|
||||
log_data = binascii.unhexlify(encoded_data.rstrip('\n'))
|
||||
assert(data == log_data)
|
||||
self.index+=1
|
||||
|
||||
def recv(self, **kwargs):
|
||||
time.sleep(0.01)
|
||||
direction, encoded_data = self.log[self.index].split(" ")
|
||||
if direction == "RX":
|
||||
return binascii.unhexlify(encoded_data.rstrip('\n'))
|
||||
else:
|
||||
raise socket.timeout()
|
||||
|
||||
|
||||
|
||||
from internalblue.core import InternalBlue
|
||||
|
||||
try:
|
||||
import typing
|
||||
from typing import Type
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
def hook(core, socket_hook):
|
||||
# type: (Type[InternalBlue], Type[SocketDuplexHook]) -> None
|
||||
|
||||
def wrap_socket_setup(orig_func):
|
||||
def wrapped_socket_setup(self, *args, **kwargs):
|
||||
status = orig_func(self, *args, **kwargs)
|
||||
if self.s_inject == self.s_snoop:
|
||||
h = socket_hook(self.s_inject)
|
||||
self.s_inject = h
|
||||
self.s_snoop = h
|
||||
else:
|
||||
self.s_inject = socket_hook(self.s_inject)
|
||||
self.s_snoop = socket_hook(self.s_snoop)
|
||||
return status
|
||||
|
||||
return wrapped_socket_setup
|
||||
|
||||
core._setupSockets = wrap_socket_setup(core._setupSockets)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user