3 Commits

Author SHA1 Message Date
Florian Magin 7b1577fe06 Refactor hci hooking 2020-01-10 13:16:44 +01:00
Florian Magin 0fd8545d93 Fix logging in core.py and print exceptions in _sendThreadFunc in the log 2020-01-10 13:14:46 +01:00
Florian Magin f8afcaad84 Partial refactoring of hooking 2020-01-10 10:38:53 +01:00
3 changed files with 156 additions and 41 deletions
+5 -4
View File
@@ -111,7 +111,7 @@ def internalblue_cli(argv):
parser.add_argument("--ios-device", "-i", help="Tell internalblue to connect to a remote iPhone HCI socket. Specify socket IP address and port (i.e., 172.20.10.1:1234).")
parser.add_argument("--serialsu", "-s", help="On ADB, directly try su/serial/busybox scripting, if you do not have a special bluetooth.default.so file.", action="store_true")
parser.add_argument("--testdevice", "-t", help="Use a dummy test device to execute testcases", action="store_true")
parser.add_argument("--trace", help="Trace hci connection", action="store_true")
parser.add_argument("--trace", help="Trace hci connection")
parser.add_argument("--device", help="Specify device/core to be used")
parser.add_argument("--commands", "-c", help="CLI command to run before prompting, seperated by ';' (used for easier testing)")
args = parser.parse_args(argv)
@@ -139,9 +139,10 @@ def internalblue_cli(argv):
if args.trace:
from .socket_hooks import wrap_socket_setup
ADBCore._setupSockets = wrap_socket_setup(ADBCore._setupSockets)
HCICore._setupSockets = wrap_socket_setup(HCICore._setupSockets)
from .socket_hooks import hook
from internalblue import socket_hooks
HookClass = getattr(socket_hooks, args.trace)
hook(HCICore, HookClass)
# Initalize cores and get devices
+6 -6
View File
@@ -45,8 +45,8 @@ try:
except:
pass
import logging
log = logging.getLogger(__name__)
#import logging
#log = logging.getLogger(__name__)
class InternalBlue:
__metaclass__ = ABCMeta
@@ -62,8 +62,8 @@ class InternalBlue:
self.data_directory = data_directory
self.s_inject = None # This is the TCP socket to the HCI inject port
self.s_snoop = None # This is the TCP socket to the HCI snoop port
self.s_inject = None #type: socket.socket # This is the TCP socket to the HCI inject port
self.s_snoop = None #type: socket.socket # This is the TCP socket to the HCI snoop port
# If btsnooplog_filename is set, write all incomming HCI packets to a file (can be viewed in wireshark for debugging)
if btsnooplog_filename is not None:
@@ -279,8 +279,8 @@ class InternalBlue:
try:
log.debug("_sendThreadFunc: Send: " + str(out.encode('hex')))
self.s_inject.send(out)
except:
log.warn("_sendThreadFunc: Sending to socket failed, reestablishing connection.\nWith HCI sockets, some HCI commands require root!")
except Exception as e:
log.warn("_sendThreadFunc: Sending to socket failed with {}, reestablishing connection.\nWith HCI sockets, some HCI commands require root!".format(e))
# socket are terminated by hcicore..
self._teardownSockets()
self._setupSockets()
+145 -31
View File
@@ -1,61 +1,175 @@
import binascii
import time
class SocketRecvHook():
def __init__(self, socket, recv_hook):
self.recv_hook = recv_hook
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
def recv(self, length):
data = self.socket.recv(length)
def recv_hook(self, data):
raise NotImplementedError()
def recv_replace(self, length, **kwargs):
raise NotImplementedError()
def recv(self, length, **kwargs):
if not self.replace:
data = self.socket.recv(length, **kwargs)
else:
data = self.recv_replace(length, **kwargs)
self.recv_hook(data)
return data
def recvfrom(self, length):
# type: (int) -> Tuple[bytes, Any]
data, addr = self.socket.recvfrom(length)
self.recv_hook(data)
return data
if not self.replace:
data, addr = self.socket.recvfrom(length)
self.recv_hook(data)
return data
class SocketInjectHook():
def __init__(self, socket, send_hook):
self.send_hook = send_hook
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
def close(self):
self.socket.close()
def send(self,data):
self.send_hook(data)
self.socket.send(data)
if not self.replace:
try:
self.socket.send(data)
except Exception as e:
self.send_exception(e)
raise e
else:
self.send_replace(data)
def send_hook(self,result):
raise NotImplementedError()
def send_replace(self,data):
raise NotImplementedError()
import binascii
def send_print_hook(data):
print("Sent: ", binascii.hexlify(data))
def send_exception(self, e):
raise NotImplementedError()
def recv_print_hook(data):
print("Recv: ", binascii.hexlify(data))
class SocketDuplexHook(SocketInjectHook, SocketRecvHook):
def __init__(self, socket, send_hook=send_print_hook, recv_hook=recv_print_hook):
self.recv_hook = recv_hook
self.send_hook = send_hook
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
pass
def wrap_socket_setup(orig_func, send_hook=None, recv_hook=None):
def wrapped_socket_setup(self, *args, **kwargs):
status = orig_func(self, *args, **kwargs)
self.s_inject = SocketDuplexHook(self.s_inject)
self.s_snoop = SocketDuplexHook(self.s_snoop)
return status
return wrapped_socket_setup
class HookBase():
def send_hook(self,data):
raise NotImplementedError
def recv_hook(self, data):
raise NotImplementedError
class TraceToFileHook(SocketDuplexHook):
def __init__(self, socket, filename='/tmp/bt_hci.log'):
SocketDuplexHook.__init__(self, socket)
self.file = open(filename, 'a')
self.replace = False
self.log = []
def recv_hook(self, data):
line = "RX {}\n".format(binascii.hexlify(data))
print(line)
self.log.append(line)
def send_hook(self, data):
line = "TX {}\n".format(binascii.hexlify(data))
print(line)
self.log.append(line)
def send_exception(self, e):
line = "EX '{}'\n".format(e)
print(line)
self.log.append(line)
def close(self):
self.socket.close()
self.log.append("Socket closed\n")
self.file.writelines(self.log)
self.file.close()
import socket
class PrintTrace(SocketDuplexHook):
def send_hook(self, data, **kwargs):
print("Sent: {}".format(binascii.hexlify(data)))
def recv_hook(self, data, **kwargs):
print("Recv: {}".format(binascii.hexlify(data)))
def recvfrom_hook(self, data, **kwargs):
print("Recv: {}".format(binascii.hexlify(data)))
class ReplaySocket():
def __init__(self, filename='/tmp/bt.log'):
super(ReplaySocket, self).__init__()
self.log = open(filename).readlines()
self.index = 0
def send(self, data, **kwargs):
encoded_data = "" # type: str
direction, encoded_data = self.log[self.index].split(" ")
assert(direction == "TX")
log_data = binascii.unhexlify(encoded_data.rstrip('\n'))
assert(data == log_data)
self.index+=1
def recv(self, **kwargs):
time.sleep(0.01)
direction, encoded_data = self.log[self.index].split(" ")
if direction == "RX":
return binascii.unhexlify(encoded_data.rstrip('\n'))
else:
raise socket.timeout()
from internalblue.core import InternalBlue
try:
import typing
from typing import Type
except ImportError:
pass
def hook(core, socket_hook):
# type: (Type[InternalBlue], Type[SocketDuplexHook]) -> None
def wrap_socket_setup(orig_func):
def wrapped_socket_setup(self, *args, **kwargs):
status = orig_func(self, *args, **kwargs)
if self.s_inject == self.s_snoop:
h = socket_hook(self.s_inject)
self.s_inject = h
self.s_snoop = h
else:
self.s_inject = socket_hook(self.s_inject)
self.s_snoop = socket_hook(self.s_snoop)
return status
return wrapped_socket_setup
core._setupSockets = wrap_socket_setup(core._setupSockets)