Refactor hci hooking

This commit is contained in:
Florian Magin
2020-01-10 13:16:44 +01:00
parent 0fd8545d93
commit 7b1577fe06
2 changed files with 88 additions and 34 deletions
+1 -1
View File
@@ -142,7 +142,7 @@ def internalblue_cli(argv):
from .socket_hooks import hook
from internalblue import socket_hooks
HookClass = getattr(socket_hooks, args.trace)
hook(HCICore, HookClass())
hook(HCICore, HookClass)
# Initalize cores and get devices
+87 -33
View File
@@ -1,55 +1,73 @@
import binascii
import time
class SocketRecvHook():
def __init__(self, socket, recv_hook):
self.recv_hook = recv_hook
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
def recv(self, length):
data = self.socket.recv(length)
def recv_hook(self, data):
raise NotImplementedError()
def recv_replace(self, length, **kwargs):
raise NotImplementedError()
def recv(self, length, **kwargs):
if not self.replace:
data = self.socket.recv(length, **kwargs)
else:
data = self.recv_replace(length, **kwargs)
self.recv_hook(data)
return data
def recvfrom(self, length):
# type: (int) -> Tuple[bytes, Any]
data, addr = self.socket.recvfrom(length)
self.recv_hook(data)
return data
if not self.replace:
data, addr = self.socket.recvfrom(length)
self.recv_hook(data)
return data
class SocketInjectHook():
def __init__(self, socket, send_hook):
self.send_hook = send_hook
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
def close(self):
self.socket.close()
def send(self,data):
self.send_hook(data)
self.socket.send(data)
if not self.replace:
try:
self.socket.send(data)
except Exception as e:
self.send_exception(e)
raise e
else:
self.send_replace(data)
def send_hook(self,result):
raise NotImplementedError()
def send_replace(self,data):
raise NotImplementedError()
def send_exception(self, e):
raise NotImplementedError()
class SocketDuplexHook(SocketInjectHook, SocketRecvHook):
def __init__(self, socket, send_hook, recv_hook):
self.recv_hook = recv_hook
self.send_hook = send_hook
def __init__(self, socket):
# type: (socket.socket) -> None
self.socket = socket
self.replace = False
pass
def wrap_socket_setup(orig_func, send_hook=None, recv_hook=None):
def wrapped_socket_setup(self, *args, **kwargs):
status = orig_func(self, *args, **kwargs)
self.s_inject = SocketDuplexHook(self.s_inject, send_hook, recv_hook)
self.s_snoop = SocketDuplexHook(self.s_snoop, send_hook, recv_hook)
return status
return wrapped_socket_setup
class HookBase():
def send_hook(self,data):
@@ -58,33 +76,53 @@ class HookBase():
raise NotImplementedError
class TraceToFileHook(HookBase):
def __init__(self,filename='/tmp/bt_hci.log'):
self.file = open(filename, 'w')
class TraceToFileHook(SocketDuplexHook):
def __init__(self, socket, filename='/tmp/bt_hci.log'):
SocketDuplexHook.__init__(self, socket)
self.file = open(filename, 'a')
self.replace = False
self.log = []
def recv_hook(self, data):
line = "RX {}\n".format(binascii.hexlify(data))
self.file.writelines([line])
print(line)
self.log.append(line)
def send_hook(self, data):
line = "TX {}\n".format(binascii.hexlify(data))
self.file.writelines([line])
print(line)
self.log.append(line)
def send_exception(self, e):
line = "EX '{}'\n".format(e)
print(line)
self.log.append(line)
def close(self):
self.socket.close()
self.log.append("Socket closed\n")
self.file.writelines(self.log)
self.file.close()
import socket
class PrintTrace(socket.socket):
class PrintTrace(SocketDuplexHook):
def send_hook(self, data):
def send_hook(self, data, **kwargs):
print("Sent: {}".format(binascii.hexlify(data)))
def recv_hook(self, data):
def recv_hook(self, data, **kwargs):
print("Recv: {}".format(binascii.hexlify(data)))
def recvfrom_hook(self, data, **kwargs):
print("Recv: {}".format(binascii.hexlify(data)))
class ReplaySocket(socket.socket):
class ReplaySocket():
def __init__(self, filename='/tmp/bt.log'):
super(ReplaySocket, self).__init__()
self.log = open(filename).readlines()
@@ -99,6 +137,7 @@ class ReplaySocket(socket.socket):
self.index+=1
def recv(self, **kwargs):
time.sleep(0.01)
direction, encoded_data = self.log[self.index].split(" ")
if direction == "RX":
return binascii.unhexlify(encoded_data.rstrip('\n'))
@@ -115,7 +154,22 @@ try:
except ImportError:
pass
def hook(core, hook):
# type: (Type[InternalBlue], HookBase) -> None
core._setupSockets = wrap_socket_setup(core._setupSockets, hook.send_hook, hook.recv_hook)
def hook(core, socket_hook):
# type: (Type[InternalBlue], Type[SocketDuplexHook]) -> None
def wrap_socket_setup(orig_func):
def wrapped_socket_setup(self, *args, **kwargs):
status = orig_func(self, *args, **kwargs)
if self.s_inject == self.s_snoop:
h = socket_hook(self.s_inject)
self.s_inject = h
self.s_snoop = h
else:
self.s_inject = socket_hook(self.s_inject)
self.s_snoop = socket_hook(self.s_snoop)
return status
return wrapped_socket_setup
core._setupSockets = wrap_socket_setup(core._setupSockets)