Improve capture performance with PCAP dump

PCAP dump is now performed into a separate thread. This greatly reduces
the chance for packet loss in root mode as well as preventing latency
spikes in VPN mode.
This commit is contained in:
emanuele-f
2022-01-28 18:00:40 +01:00
parent a2d795152f
commit 672e810bec
4 changed files with 97 additions and 43 deletions
@@ -99,7 +99,9 @@ public class CaptureService extends VpnService implements Runnable {
private Thread mCaptureThread;
private Thread mBlacklistsUpdateThread;
private Thread mConnUpdateThread;
private Thread mDumperThread;
private final LinkedBlockingDeque<Pair<ConnectionDescriptor[], ConnectionUpdate[]>> mPendingUpdates = new LinkedBlockingDeque<>(32);
private LinkedBlockingDeque<byte[]> mDumpQueue;
private String vpn_ipv4;
private String vpn_dns;
private String dns_server;
@@ -273,6 +275,9 @@ public class CaptureService extends VpnService implements Runnable {
}
if(mDumper != null) {
// Max memory usage = (JAVA_PCAP_BUFFER_SIZE * 64) = 32 MB
mDumpQueue = new LinkedBlockingDeque<>(64);
try {
mDumper.startDumper();
} catch (IOException | SecurityException e) {
@@ -358,6 +363,11 @@ public class CaptureService extends VpnService implements Runnable {
mConnUpdateThread = new Thread(this::connUpdateWork, "UpdateListener");
mConnUpdateThread.start();
if(mDumper != null) {
mDumperThread = new Thread(this::dumpWork, "DumperThread");
mDumperThread.start();
}
// Start the native capture thread
mQueueFull = false;
mCaptureThread = new Thread(this, "PacketCapture");
@@ -395,6 +405,7 @@ public class CaptureService extends VpnService implements Runnable {
e.printStackTrace();
}
mDumper = null;
mDumpQueue.clear();
}
appsResolver = null;
@@ -571,7 +582,10 @@ public class CaptureService extends VpnService implements Runnable {
private void stop() {
stopPacketLoop();
mPendingUpdates.offer(new Pair<>(null, null)); // signal termination to the mConnUpdateThread
// signal termination
mPendingUpdates.offer(new Pair<>(null, null));
mDumpQueue.offer(new byte[0]);
while((mCaptureThread != null) && (mCaptureThread.isAlive())) {
try {
@@ -594,6 +608,18 @@ public class CaptureService extends VpnService implements Runnable {
}
mConnUpdateThread = null;
while((mDumperThread != null) && (mDumperThread.isAlive())) {
try {
Log.d(TAG, "Joining dumper thread...");
mDumperThread.join();
} catch (InterruptedException e) {
Log.e(TAG, "Joining dumper thread failed");
mDumpQueue.offer(new byte[0]);
}
}
mDumperThread = null;
mDumper = null;
if(mParcelFileDescriptor != null) {
try {
mParcelFileDescriptor.close();
@@ -603,15 +629,6 @@ public class CaptureService extends VpnService implements Runnable {
mParcelFileDescriptor = null;
}
if(mDumper != null) {
try {
mDumper.stopDumper();
} catch (IOException e) {
e.printStackTrace();
}
mDumper = null;
}
mPcapUri = null;
mPendingUpdates.clear();
unregisterNetworkCallbacks();
@@ -780,28 +797,60 @@ public class CaptureService extends VpnService implements Runnable {
}
private void connUpdateWork() {
try {
while(true) {
Pair<ConnectionDescriptor[], ConnectionUpdate[]> item = mPendingUpdates.take();
if(item.first == null) // termination request
break;
ConnectionDescriptor[] new_conns = item.first;
ConnectionUpdate[] conns_updates = item.second;
checkBlacklistsUpdates();
// synchronize the conn_reg to ensure that newConnections and connectionsUpdates run atomically
// thus preventing the ConnectionsAdapter from interleaving other operations
synchronized (conn_reg) {
if(new_conns.length > 0)
conn_reg.newConnections(new_conns);
if(conns_updates.length > 0)
conn_reg.connectionsUpdates(conns_updates);
}
while(true) {
Pair<ConnectionDescriptor[], ConnectionUpdate[]> item;
try {
item = mPendingUpdates.take();
} catch (InterruptedException e) {
continue;
}
} catch (InterruptedException e) {
if(item.first == null) // termination request
break;
ConnectionDescriptor[] new_conns = item.first;
ConnectionUpdate[] conns_updates = item.second;
checkBlacklistsUpdates();
// synchronize the conn_reg to ensure that newConnections and connectionsUpdates run atomically
// thus preventing the ConnectionsAdapter from interleaving other operations
synchronized (conn_reg) {
if(new_conns.length > 0)
conn_reg.newConnections(new_conns);
if(conns_updates.length > 0)
conn_reg.connectionsUpdates(conns_updates);
}
}
}
private void dumpWork() {
while(true) {
byte[] data;
try {
data = mDumpQueue.take();
} catch (InterruptedException e) {
continue;
}
if(data.length == 0) // termination request
break;
try {
mDumper.dumpData(data);
} catch (IOException e) {
// Stop the capture
e.printStackTrace();
reportError(e.getLocalizedMessage());
mHandler.post(this::stop);
break;
}
}
try {
mDumper.stopDumper();
} catch (IOException e) {
e.printStackTrace();
}
}
@@ -941,13 +990,16 @@ public class CaptureService extends VpnService implements Runnable {
/* Exports a PCAP data chunk */
public void dumpPcapData(byte[] data) {
if(mDumper != null) {
try {
mDumper.dumpData(data);
} catch (IOException e) {
e.printStackTrace();
reportError(e.getLocalizedMessage());
stopPacketLoop();
if((mDumper != null) && (data.length > 0)) {
while(true) {
try {
// wait until the queue has space to insert the data. If the queue is full, we
// will experience slow-downs/drops but this is expected
mDumpQueue.put(data);
break;
} catch (InterruptedException e) {
// retry
}
}
}
}
+3 -3
View File
@@ -528,12 +528,12 @@ int run_root(pcapdroid_t *pd) {
pd_refresh_time(pd);
if(!running)
break;
if(!FD_ISSET(sock, &fdset))
goto housekeeping;
if(!running)
break;
ssize_t xrv = xread(sock, &hdr, sizeof(hdr));
if(xrv != sizeof(hdr)) {
if(xrv < 0)
+1 -1
View File
@@ -97,7 +97,7 @@ static void sendStatsDump(pcapdroid_t *pd) {
static void sendPcapDump(pcapdroid_t *pd) {
JNIEnv *env = pd->env;
log_d("Exporting a %d B PCAP buffer", pd->pcap_dump.buffer_idx);
//log_d("Exporting a %d B PCAP buffer", pd->pcap_dump.buffer_idx);
jbyteArray barray = (*env)->NewByteArray(env, pd->pcap_dump.buffer_idx);
if(jniCheckException(env))
+3 -1
View File
@@ -915,7 +915,9 @@ cleanup:
if(rt.tun)
zdtun_finalize(rt.tun);
log_i("Pkts: %u rcvd, %u drops, %u iface_drops", rt.stats.ps_recv, rt.stats.ps_drop, rt.stats.ps_ifdrop);
log_i("Pkts: %u rcvd, %u drops (%.1f%%), %u iface_drops", rt.stats.ps_recv, rt.stats.ps_drop,
rt.stats.ps_drop * 100.f / (rt.stats.ps_recv + rt.stats.ps_drop + 1),
rt.stats.ps_ifdrop);
return rv;
}