13 Commits

Author SHA1 Message Date
l-jonas e2a246220e Update whatsnew 2018-11-26 08:29:37 +01:00
l-jonas 98d6656683 Update whatsnew 2018-11-26 08:27:02 +01:00
l-jonas c307953fce Update Releasing.md 2018-11-26 08:26:46 +01:00
l-jonas 68f541f00b Release 0.3.5 2018-11-26 08:24:15 +01:00
l-jonas 29c71f1ca9 Add crash handler (#103)
* Add crash handler
2018-11-25 19:21:45 +01:00
l-jonas 76ddbdd3b4 New connection handling (#71) 2018-11-25 19:10:05 +01:00
l-jonas cae1026f35 Bugfixes (#100)
* Fix loading subdirectories on the main thread (which caused a crash)
* Fix LibraryHandler creation in the background (ContentProvider)
2018-11-25 18:01:31 +01:00
l-jonas d07c934ea7 Catch index updates after shutdown (#96)
* Catch index updates after shutdown
* Re-add wrongly removed line
2018-11-21 14:52:01 +01:00
l-jonas d829c18e76 Fix some warnings (#97) 2018-11-21 14:50:48 +01:00
l-jonas e41ed80d05 Document release process (#69)
* Move google to the top of allrepositories.google

This should fix build issues according to https://gitlab.com/fdroid/fdroiddata/issues/1423

* Add documentation about the releasing process

* Remove release scripts

* Remove library from the release process

* Update Releasing.md

* Update Releasing.md

* Update Releasing.md

* Update Releasing.md

* Update Releasing.md
2018-11-20 13:42:28 +01:00
l-jonas 3e691b61c0 Fix handling of paths with tilde (#91)
* Refactor PathUtils
2018-11-18 08:26:51 +01:00
l-jonas 0fb7a9e93d Release 0.3.4 2018-11-13 07:33:03 +01:00
l-jonas 1b4205b04a Update library version to fix proguard warnings 2018-11-13 07:30:36 +01:00
60 changed files with 1817 additions and 1667 deletions
+8
View File
@@ -0,0 +1,8 @@
# Releasing
- do tests
- update translations using ``tx pull -a -af`` (as extra merge request or branch for the case it does not build correctly)
- update the version name and version code of the app
- update the changelog at [app/src/main/play/en-GB/whatsnew](https://github.com/syncthing/syncthing-lite/blob/master/app/src/main/play/en-GB/whatsnew)
- create a tag/ release in GitHub with an changelog; The tag name should be the version number
- F-Droid picks up the release by the tag; additonally, the tag triggers a CI build which uploads the generated APK to Google Play
+2 -4
View File
@@ -19,8 +19,8 @@ android {
applicationId "net.syncthing.lite"
minSdkVersion 21
targetSdkVersion 26
versionCode 13
versionName "0.3.3"
versionCode 15
versionName "0.3.5"
multiDexEnabled true
playAccountConfig = playAccountConfigs.defaultAccountConfig
}
@@ -89,11 +89,9 @@ dependencies {
*/
implementation(project(':syncthing-client')) {
exclude group: 'commons-logging', module: 'commons-logging'
exclude group: 'org.apache.httpcomponents', module: 'httpclient'
exclude group: 'org.slf4j'
exclude group: 'ch.qos.logback'
}
implementation 'org.apache.httpcomponents:httpclient-android:4.3.5.1'
implementation 'sk.baka.slf4j:slf4j-handroid:1.7.26'
implementation 'com.google.zxing:android-integration:3.3.0'
+1
View File
@@ -4,6 +4,7 @@
<uses-permission android:name="android.permission.INTERNET"/>
<application
android:name=".android.Application"
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
@@ -59,7 +59,7 @@ class FolderBrowserActivity : SyncthingActivity() {
}
}
val folder = intent.getStringExtra(EXTRA_FOLDER_NAME)
libraryHandler?.syncthingClient {
libraryHandler.syncthingClient {
indexBrowser = it.indexHandler.newIndexBrowser(folder, true, true)
indexBrowser.setOnFolderChangedListener(this::onFolderChanged)
}
@@ -82,7 +82,7 @@ class FolderBrowserActivity : SyncthingActivity() {
override fun onActivityResult(requestCode: Int, resultCode: Int, intent: Intent?) {
if (requestCode == REQUEST_SELECT_UPLOAD_FILE && resultCode == Activity.RESULT_OK) {
libraryHandler?.syncthingClient { syncthingClient ->
libraryHandler.syncthingClient { syncthingClient ->
GlobalScope.launch (Dispatchers.Main) {
// FIXME: it would be better if the dialog would use the library handler
FileUploadDialog(this@FolderBrowserActivity, syncthingClient, intent!!.data,
@@ -100,18 +100,18 @@ class IntroActivity : AppIntro() {
override fun onCreateView(inflater: LayoutInflater, container: ViewGroup?, savedInstanceState: Bundle?): View {
binding = DataBindingUtil.inflate(inflater, R.layout.fragment_intro_two, container, false)
binding.enterDeviceId!!.scanQrCode.setOnClickListener {
binding.enterDeviceId.scanQrCode.setOnClickListener {
FragmentIntentIntegrator(this@IntroFragmentTwo).initiateScan()
}
binding.enterDeviceId!!.scanQrCode.setImageResource(R.drawable.ic_qr_code_white_24dp)
binding.enterDeviceId.scanQrCode.setImageResource(R.drawable.ic_qr_code_white_24dp)
return binding.root
}
override fun onActivityResult(requestCode: Int, resultCode: Int, intent: Intent?) {
val scanResult = IntentIntegrator.parseActivityResult(requestCode, resultCode, intent)
if (scanResult?.contents != null && scanResult.contents.isNotBlank()) {
binding.enterDeviceId!!.deviceId.setText(scanResult.contents)
binding.enterDeviceId!!.deviceIdHolder.isErrorEnabled = false
binding.enterDeviceId.deviceId.setText(scanResult.contents)
binding.enterDeviceId.deviceIdHolder.isErrorEnabled = false
}
}
@@ -121,11 +121,11 @@ class IntroActivity : AppIntro() {
*/
fun isDeviceIdValid(): Boolean {
return try {
val deviceId = binding.enterDeviceId!!.deviceId.text.toString()
val deviceId = binding.enterDeviceId.deviceId.text.toString()
Util.importDeviceId(libraryHandler, context, deviceId, { })
true
} catch (e: IOException) {
binding.enterDeviceId!!.deviceId.error = getString(R.string.invalid_device_id)
binding.enterDeviceId.deviceId.error = getString(R.string.invalid_device_id)
false
}
}
@@ -0,0 +1,52 @@
package net.syncthing.lite.android
import android.app.Application
import android.content.ClipData
import android.content.ClipboardManager
import android.content.Context
import android.util.Log
import net.syncthing.lite.BuildConfig
import org.jetbrains.anko.defaultSharedPreferences
import java.io.PrintWriter
import java.io.StringWriter
class Application: Application() {
companion object {
private const val LOG_TAG = "Application"
private const val PREF_ENABLE_CRASH_HANDLER = "crash_handler"
}
override fun onCreate() {
super.onCreate()
val clipboard = getSystemService(Context.CLIPBOARD_SERVICE) as ClipboardManager
val defaultHandler = Thread.getDefaultUncaughtExceptionHandler()
if (defaultHandler == null) {
Log.w(LOG_TAG, "could not get default crash handler")
}
Thread.setDefaultUncaughtExceptionHandler { thread, ex ->
Log.w(LOG_TAG, "app crashed", ex)
val enableCustomCrashHandling = defaultSharedPreferences.getBoolean(PREF_ENABLE_CRASH_HANDLER, false)
if (enableCustomCrashHandling) {
clipboard.primaryClip = ClipData.newPlainText(
"stacktrace",
StringWriter().apply {
append("Version: ").append(BuildConfig.VERSION_NAME).append('\n')
append(Log.getStackTraceString(ex)).append('\n')
ex.printStackTrace(PrintWriter(this))
}.buffer.toString()
)
}
if (defaultHandler != null) {
defaultHandler.uncaughtException(thread, ex)
} else {
System.exit(1)
}
}
}
}
@@ -39,12 +39,12 @@ class DevicesFragment : SyncthingFragment() {
override fun onResume() {
super.onResume()
libraryHandler?.syncthingClient { it.addOnConnectionChangedListener { _ -> updateDeviceList() } }
libraryHandler.syncthingClient { it.addOnConnectionChangedListener { _ -> updateDeviceList() } }
}
override fun onPause() {
super.onPause()
libraryHandler?.syncthingClient { it.removeOnConnectionChangedListener{ _ -> updateDeviceList() } }
libraryHandler.syncthingClient { it.removeOnConnectionChangedListener{ _ -> updateDeviceList() } }
}
override fun onLibraryLoaded() {
@@ -61,10 +61,12 @@ class DevicesFragment : SyncthingFragment() {
.setTitle(getString(R.string.remove_device_title, deviceInfo.name))
.setMessage(getString(R.string.remove_device_message, deviceInfo.deviceId.deviceId.substring(0, 7)))
.setPositiveButton(android.R.string.yes) { _, _ ->
libraryHandler?.configuration { config ->
libraryHandler.library { config, syncthingClient, _ ->
config.peers = config.peers.filterNot { it.deviceId == deviceInfo.deviceId }.toSet()
config.persistLater()
updateDeviceList()
syncthingClient.disconnectFromRemovedDevices()
}
}
.setNegativeButton(android.R.string.no, null)
@@ -18,9 +18,9 @@ class SettingsFragment : PreferenceFragmentCompat() {
val versionName = activity.packageManager.getPackageInfo(activity.packageName, 0)?.versionName
appVersion.summary = versionName
activity.libraryHandler?.configuration { localDeviceName.text = it.localDeviceName }
activity.libraryHandler.configuration { localDeviceName.text = it.localDeviceName }
localDeviceName.setOnPreferenceChangeListener { _, _ ->
activity.libraryHandler?.configuration { conf ->
activity.libraryHandler.configuration { conf ->
conf.localDeviceName = localDeviceName.text
conf.persistLater()
}
@@ -28,4 +28,4 @@ class SettingsFragment : PreferenceFragmentCompat() {
}
}
}
}
}
@@ -72,43 +72,35 @@ class DownloadFileTask(private val fileStorageDirectory: File,
return@launch
}
syncthingClient.getBlockPuller(fileInfo.folder, { blockPuller ->
val job = launch {
try {
if (!file.filesDirectory.isDirectory) {
if (!file.filesDirectory.mkdirs()) {
throw IOException("could not create output directory")
}
}
// download the file to a temp location
val inputStream = blockPuller.pullFileCoroutine(fileInfo, this@DownloadFileTask::callProgress)
try {
FileUtils.copyInputStreamToFile(inputStream, file.tempFile)
file.tempFile.renameTo(file.targetFile)
} finally {
file.tempFile.delete()
}
if (BuildConfig.DEBUG) {
Log.i(TAG, "Downloaded file $fileInfo")
}
callComplete(file.targetFile)
} catch (e: Exception) {
callError(e)
if (BuildConfig.DEBUG) {
Log.w(TAG, "Failed to download file $fileInfo", e)
}
try {
if (!file.filesDirectory.isDirectory) {
if (!file.filesDirectory.mkdirs()) {
throw IOException("could not create output directory")
}
}
cancellationSignal.setOnCancelListener {
job.cancel()
// download the file to a temp location
val inputStream = syncthingClient.pullFile(fileInfo, this@DownloadFileTask::callProgress)
try {
FileUtils.copyInputStreamToFile(inputStream, file.tempFile)
file.tempFile.renameTo(file.targetFile)
} finally {
file.tempFile.delete()
}
}, { callError(IOException("could not get block puller for file")) })
if (BuildConfig.DEBUG) {
Log.i(TAG, "Downloaded file $fileInfo")
}
callComplete(file.targetFile)
} catch (e: Exception) {
callError(e)
if (BuildConfig.DEBUG) {
Log.w(TAG, "Failed to download file $fileInfo", e)
}
}
}
}
@@ -3,7 +3,8 @@ package net.syncthing.lite.library
import android.os.Handler
import android.os.Looper
import java.util.concurrent.Executors
import kotlin.coroutines.experimental.suspendCoroutine
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
/**
* This class manages the access to an LibraryInstance
@@ -5,6 +5,8 @@ import android.net.Uri
import android.os.Handler
import android.os.Looper
import android.util.Log
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import net.syncthing.java.bep.BlockPusher
import net.syncthing.java.client.SyncthingClient
import net.syncthing.java.core.utils.PathUtils
@@ -31,22 +33,28 @@ class UploadFileTask(context: Context, syncthingClient: SyncthingClient,
init {
Log.i(TAG, "Uploading file $localFile to folder $syncthingFolder:$syncthingPath")
syncthingClient.getBlockPusher(syncthingFolder, { blockPusher ->
val observer = blockPusher.pushFile(uploadStream, syncthingFolder, syncthingPath)
handler.post { onProgress(observer) }
GlobalScope.launch {
try {
val blockPusher = syncthingClient.getBlockPusher(folderId = syncthingFolder)
val observer = blockPusher.pushFile(uploadStream, syncthingFolder, syncthingPath)
while (!observer.isCompleted()) {
if (isCancelled)
return@getBlockPusher
observer.waitForProgressUpdate()
Log.i(TAG, "upload progress = ${observer.progressPercentage()}%")
handler.post { onProgress(observer) }
while (!observer.isCompleted()) {
if (isCancelled)
return@launch
observer.waitForProgressUpdate()
Log.i(TAG, "upload progress = ${observer.progressPercentage()}%")
handler.post { onProgress(observer) }
}
IOUtils.closeQuietly(uploadStream)
handler.post { onComplete() }
} catch (ex: Exception) {
handler.post { onError() }
}
IOUtils.closeQuietly(uploadStream)
handler.post { onComplete() }
}, { handler.post { onError() } })
}
}
fun cancel() {
@@ -44,10 +44,11 @@ object Util {
fun importDeviceId(libraryHandler: LibraryHandler?, context: Context?, deviceId: String,
onComplete: () -> Unit) {
val deviceId2 = DeviceId(deviceId.toUpperCase(Locale.US))
libraryHandler?.configuration { configuration ->
libraryHandler?.library { configuration, syncthingClient, _ ->
if (!configuration.peerIds.contains(deviceId2)) {
configuration.peers = configuration.peers + DeviceInfo(deviceId2, null)
configuration.persistLater()
syncthingClient.connectToNewlyAddedDevices()
GlobalScope.launch (Dispatchers.Main) {
context?.toast(context.getString(R.string.device_import_success, deviceId2.shortId))
onComplete()
+2 -5
View File
@@ -1,6 +1,3 @@
- add option to export files
- send correct file names to apps by which files are opened
- adaptive icon
- updated translations
- validate discovery servers
- new connection handling
- option for users to get detailed crash reports
- bugfixes
+2
View File
@@ -42,6 +42,8 @@
<string name="settings_local_device_summary">The name that other devices will see for this device</string>
<string name="settings_shutdown_delay_title">Shutdown delay</string>
<string name="settings_shutdown_delay_summary">Time before shuting down the Syncthing client after its last usage</string>
<string name="settings_crash_handler_title">Custom Crash-Handler</string>
<string name="settings_crash_handler_summary">Copy the error message to the clipboard when the App crashes</string>
<string name="device_id_dialog_title">Enter Device ID</string>
<string name="settings_shutdown_delay_10_seconds">10 seconds</string>
<string name="settings_shutdown_delay_30_seconds">30 seconds</string>
+5
View File
@@ -24,6 +24,11 @@
-->
<CheckBoxPreference
android:key="crash_handler"
android:title="@string/settings_crash_handler_title"
android:summary="@string/settings_crash_handler_summary" />
<Preference
android:key="app_version"
android:title="@string/settings_app_version_title"/>
+1 -1
View File
@@ -4,7 +4,7 @@ buildscript {
ext.kotlin_version = '1.3.0'
ext.support_version = '27.0.2'
ext.build_tools_version = '3.2.0'
ext.anko_version = '0.10.7'
ext.anko_version = '0.10.8'
ext.protobuf_lite_version = '3.0.1'
repositories {
mavenLocal()
-54
View File
@@ -1,54 +0,0 @@
#!/bin/bash
set -e
NEW_VERSION_NAME=$1
OLD_VERSION_NAME=$(grep "versionName" "app/build.gradle" | awk '{print $2}' | tr -d "\"")
if [[ -z ${NEW_VERSION_NAME} ]]
then
echo "New version name is empty. Please set a new version. Current version: $OLD_VERSION_NAME"
exit
fi
echo "
Updating Translations
-----------------------------
"
tx push -s
# Force push/pull to make sure this is executed. Apparently tx only compares timestamps, not file
# contents. So if a file was `touch`ed, it won't be updated by default.
tx pull -a -f
git add -A "app/src/main/res/values-*/strings.xml"
if ! git diff --cached --exit-code;
then
git commit -m "Imported translations"
fi
echo "
Updating Version
-----------------------------
"
OLD_VERSION_CODE=$(grep "versionCode" "app/build.gradle" -m 1 | awk '{print $2}')
NEW_VERSION_CODE=$(($OLD_VERSION_CODE + 1))
sed -i "s/versionCode $OLD_VERSION_CODE/versionCode $NEW_VERSION_CODE/" "app/build.gradle"
sed -i "s/versionName \"$OLD_VERSION_NAME\"/versionName \"$NEW_VERSION_NAME\"/" "app/build.gradle"
LIBRARY_NAME="com.github.Nutomic:syncthing-java"
sed -i "s/$LIBRARY_NAME:$OLD_VERSION_NAME/$LIBRARY_NAME:$NEW_VERSION_NAME/" "app/build.gradle"
git add "app/build.gradle"
git commit -m "Version $NEW_VERSION_NAME"
git tag ${NEW_VERSION_NAME}
echo "
Running Lint
-----------------------------
"
./gradlew clean lintVitalRelease
echo "
Update ready.
"
-37
View File
@@ -1,37 +0,0 @@
#!/usr/bin/env bash
set -e
version=$(git describe --tags)
regex='^[0-9]+\.[0-9]+\.[0-9]+$'
if [[ ! ${version} =~ $regex ]]
then
echo "Current commit is not a release"
exit;
fi
echo "
Pushing to Github
-----------------------------
"
git push
git push --tags
echo "
Push to Google Play
-----------------------------
"
read -s -p "Enter signing password: " password
SIGNING_PASSWORD=${password} ./gradlew assembleRelease
# Upload apk and listing to Google Play
SIGNING_PASSWORD=${password} ./gradlew publishRelease
echo "
Release published!
"
+1 -1
View File
@@ -1 +1 @@
include ':app', ':syncthing-repository-android', ':syncthing-repository-default', ':syncthing-relay-client', ':syncthing-bep', ':syncthing-core', ':syncthing-client', ':syncthing-discovery', ':syncthing-client-cli', ':syncthing-http-relay-client'
include ':app', ':syncthing-repository-android', ':syncthing-repository-default', ':syncthing-relay-client', ':syncthing-bep', ':syncthing-core', ':syncthing-client', ':syncthing-discovery', ':syncthing-client-cli'
-1
View File
@@ -6,7 +6,6 @@ dependencies {
compile fileTree(dir: 'libs', include: ['*.jar'])
compile project(':syncthing-core')
compile project(':syncthing-relay-client')
compile project(':syncthing-http-relay-client')
compile "net.jpountz.lz4:lz4:1.3.0"
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.0.0'
@@ -17,58 +17,44 @@ package net.syncthing.java.bep
import com.google.protobuf.ByteString
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import net.syncthing.java.bep.BlockExchangeProtos.ErrorCode
import net.syncthing.java.bep.BlockExchangeProtos.Request
import net.syncthing.java.bep.connectionactor.ConnectionActorWrapper
import net.syncthing.java.bep.utils.longSumBy
import net.syncthing.java.core.beans.BlockInfo
import net.syncthing.java.core.beans.FileBlocks
import net.syncthing.java.core.beans.FileInfo
import net.syncthing.java.core.interfaces.TempRepository
import net.syncthing.java.core.utils.NetworkUtils
import org.bouncycastle.util.encoders.Hex
import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream
import java.io.IOException
import java.io.InputStream
import java.io.SequenceInputStream
import java.io.*
import java.security.MessageDigest
import java.util.*
import kotlin.collections.HashMap
import kotlin.coroutines.resume
class BlockPuller internal constructor(private val connectionHandler: ConnectionHandler,
private val indexHandler: IndexHandler,
private val responseHandler: ResponseHandler,
private val tempRepository: TempRepository) {
object BlockPuller {
private val logger = LoggerFactory.getLogger(javaClass)
fun pullFileSync(
suspend fun pullFile(
fileInfo: FileInfo,
progressListener: (status: BlockPullerStatus) -> Unit = { }
progressListener: (status: BlockPullerStatus) -> Unit = { },
connections: List<ConnectionActorWrapper>,
indexHandler: IndexHandler,
tempRepository: TempRepository
): InputStream {
return runBlocking {
pullFileCoroutine(fileInfo, progressListener)
val connectionHelper = MultiConnectionHelper(connections) {
it.hasFolder(fileInfo.folder)
}
}
suspend fun pullFileCoroutine(
fileInfo: FileInfo,
progressListener: (status: BlockPullerStatus) -> Unit = { }
): InputStream {
val fileBlocks = indexHandler.waitForRemoteIndexAcquired(connectionHandler)
.getFileInfoAndBlocksByPath(fileInfo.folder, fileInfo.path)
?.value
?: throw IOException("file not found in local index for folder = ${fileInfo.folder} path = ${fileInfo.path}")
logger.info("pulling file = {}", fileBlocks)
NetworkUtils.assertProtocol(connectionHandler.hasFolder(fileBlocks.folder), { "supplied connection handler $connectionHandler will not share folder ${fileBlocks.folder}" })
// fail early if there is no matching connection
connectionHelper.pickConnection()
val (newFileInfo, fileBlocks) = indexHandler.getFileInfoAndBlocksByPath(fileInfo.folder, fileInfo.path) ?: throw FileNotFoundException()
// the file could have changed since the caller read it
// this would save the file using a wrong name, so throw here
if (fileBlocks.hash != fileInfo.hash) {
throw IllegalStateException("the current file entry hash does not match the hash of the provided one")
}
logger.info("pulling file = {}", fileBlocks)
val blockTempIdByHash = Collections.synchronizedMap(HashMap<String, String>())
var status = BlockPullerStatus(
@@ -77,6 +63,47 @@ class BlockPuller internal constructor(private val connectionHandler: Connection
totalFileSize = fileBlocks.size
)
suspend fun pullBlock(fileBlocks: FileBlocks, block: BlockInfo, timeoutInMillis: Long, connectionActorWrapper: ConnectionActorWrapper): ByteArray {
logger.debug("sent message for block, hash = {}", block.hash)
val response =
withTimeout(timeoutInMillis) {
try {
connectionActorWrapper.sendRequest(
BlockExchangeProtos.Request.newBuilder()
.setFolder(fileBlocks.folder)
.setName(fileBlocks.path)
.setOffset(block.offset)
.setSize(block.size)
.setHash(ByteString.copyFrom(Hex.decode(block.hash)))
.buildPartial()
)
} catch (ex: TimeoutCancellationException) {
// It seems like the TimeoutCancellationException
// is handled differently so that the timeout is ignored.
// Due to that, it's converted to an IOException.
throw IOException("timeout during requesting block")
}
}
if (response.code != BlockExchangeProtos.ErrorCode.NO_ERROR) {
// the server does not have/ want to provide this file -> don't ask him again
connectionHelper.disableConnection(connectionActorWrapper)
throw IOException("received error response ${response.code}")
}
val data = response.data.toByteArray()
val hash = Hex.toHexString(MessageDigest.getInstance("SHA-256").digest(data))
if (hash != block.hash) {
throw IllegalStateException("expected block with hash ${block.hash}, but got block with hash $hash")
}
return data
}
try {
val reportProgressLock = Object()
@@ -96,9 +123,31 @@ class BlockPuller internal constructor(private val connectionHandler: Connection
repeat(4 /* 4 blocks per time */) { workerNumber ->
async {
for (block in pipe) {
logger.debug("request block with hash = {} from worker {}", block.hash, workerNumber)
logger.debug("message block with hash = {} from worker {}", block.hash, workerNumber)
val blockContent = pullBlock(fileBlocks, block, 1000 * 60 /* 60 seconds timeout per block */)
lateinit var blockContent: ByteArray
val attempts = 0..4
for (attempt in attempts) {
try {
blockContent = pullBlock(fileBlocks, block, 1000 * 60 /* 60 seconds timeout per block */, connectionHelper.pickConnection())
break
} catch (ex: IOException) {
if (attempt == attempts.last) {
throw ex
} else {
// will retry after a pause
// 0: 300 ms after the first attempt
// 1: 1200 ms after the second attempt
// 2: 2700 ms after the third attempt
// 3: 4800 ms after the third attempt
// total: 9000 ms
delay((attempt + 1) * (attempt + 1) * 300L)
}
}
}
blockTempIdByHash[block.hash] = tempRepository.pushTempData(blockContent)
@@ -140,57 +189,6 @@ class BlockPuller internal constructor(private val connectionHandler: Connection
throw ex
}
}
private suspend fun pullBlock(fileBlocks: FileBlocks, block: BlockInfo, timeoutInMillis: Long): ByteArray {
logger.debug("sent request for block, hash = {}", block.hash)
val response =
withTimeout(timeoutInMillis) {
try {
doRequest(
Request.newBuilder()
.setFolder(fileBlocks.folder)
.setName(fileBlocks.path)
.setOffset(block.offset)
.setSize(block.size)
.setHash(ByteString.copyFrom(Hex.decode(block.hash)))
)
} catch (ex: TimeoutCancellationException) {
// It seems like the TimeoutCancellationException
// is handled differently so that the timeout is ignored.
// Due to that, it's converted to an IOException.
throw IOException("timeout during requesting block")
}
}
NetworkUtils.assertProtocol(response.code == ErrorCode.NO_ERROR) {
"received error response, code = ${response.code}"
}
val data = response.data.toByteArray()
val hash = Hex.toHexString(MessageDigest.getInstance("SHA-256").digest(data))
if (hash != block.hash) {
throw IllegalStateException("expected block with hash ${block.hash}, but got block with hash $hash")
}
return data
}
private suspend fun doRequest(request: Request.Builder): BlockExchangeProtos.Response {
return suspendCancellableCoroutine { continuation ->
val requestId = responseHandler.registerListener { response ->
continuation.resume(response)
}
connectionHandler.sendMessage(
request
.setId(requestId)
.build()
)
}
}
}
data class BlockPullerStatus(
@@ -1,5 +1,6 @@
/*
/*
* Copyright (C) 2016 Davide Imbriaco
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -14,14 +15,15 @@
package net.syncthing.java.bep
import com.google.protobuf.ByteString
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import net.syncthing.java.bep.BlockExchangeProtos.Vector
import net.syncthing.java.bep.connectionactor.ConnectionActorWrapper
import net.syncthing.java.core.beans.*
import net.syncthing.java.core.beans.FileInfo.Version
import net.syncthing.java.core.utils.BlockUtils
import net.syncthing.java.core.utils.NetworkUtils
import net.syncthing.java.core.utils.submitLogging
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.tuple.Pair
import org.bouncycastle.util.encoders.Hex
import org.slf4j.LoggerFactory
import java.io.Closeable
@@ -31,36 +33,35 @@ import java.nio.ByteBuffer
import java.security.MessageDigest
import java.util.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
class BlockPusher internal constructor(private val localDeviceId: DeviceId,
private val connectionHandler: ConnectionHandler,
private val indexHandler: IndexHandler) {
// TODO: refactor this
class BlockPusher(private val localDeviceId: DeviceId,
private val connectionHandler: ConnectionActorWrapper,
private val indexHandler: IndexHandler,
private val requestHandlerRegistry: RequestHandlerRegistry) {
private val logger = LoggerFactory.getLogger(javaClass)
fun pushDelete(folderId: String, targetPath: String): IndexEditObserver {
suspend fun pushDelete(folderId: String, targetPath: String): BlockExchangeProtos.IndexUpdate {
val fileInfo = indexHandler.waitForRemoteIndexAcquired(connectionHandler).getFileInfoByPath(folderId, targetPath)!!
NetworkUtils.assertProtocol(connectionHandler.hasFolder(fileInfo.folder), {"supplied connection handler $connectionHandler will not share folder ${fileInfo.folder}"})
return IndexEditObserver(sendIndexUpdate(folderId, BlockExchangeProtos.FileInfo.newBuilder()
return sendIndexUpdate(folderId, BlockExchangeProtos.FileInfo.newBuilder()
.setName(targetPath)
.setType(BlockExchangeProtos.FileInfoType.valueOf(fileInfo.type.name))
.setDeleted(true), fileInfo.versionList))
.setDeleted(true), fileInfo.versionList)
}
fun pushDir(folder: String, path: String): IndexEditObserver {
suspend fun pushDir(folder: String, path: String): BlockExchangeProtos.IndexUpdate {
NetworkUtils.assertProtocol(connectionHandler.hasFolder(folder), {"supplied connection handler $connectionHandler will not share folder $folder"})
return IndexEditObserver(sendIndexUpdate(folder, BlockExchangeProtos.FileInfo.newBuilder()
return sendIndexUpdate(folder, BlockExchangeProtos.FileInfo.newBuilder()
.setName(path)
.setType(BlockExchangeProtos.FileInfoType.DIRECTORY), null))
.setType(BlockExchangeProtos.FileInfoType.DIRECTORY), null)
}
fun pushFile(inputStream: InputStream, folderId: String, targetPath: String): FileUploadObserver {
suspend fun pushFile(inputStream: InputStream, folderId: String, targetPath: String): FileUploadObserver {
val fileInfo = indexHandler.waitForRemoteIndexAcquired(connectionHandler).getFileInfoByPath(folderId, targetPath)
NetworkUtils.assertProtocol(connectionHandler.hasFolder(folderId), {"supplied connection handler $connectionHandler will not share folder $folderId"})
assert(fileInfo == null || fileInfo.folder == folderId)
@@ -72,38 +73,33 @@ class BlockPusher internal constructor(private val localDeviceId: DeviceId,
val uploadError = AtomicReference<Exception>()
val isCompleted = AtomicBoolean(false)
val updateLock = Object()
val listener = {request: BlockExchangeProtos.Request ->
if (request.folder == folderId && request.name == targetPath) {
val requestFilter = RequestHandlerFilter(
deviceId = connectionHandler.deviceId,
folderId = folderId,
path = targetPath
)
requestHandlerRegistry.registerListener(requestFilter) { request ->
GlobalScope.async {
val hash = Hex.toHexString(request.hash.toByteArray())
logger.debug("handling block request = {}:{}-{} ({})", request.name, request.offset, request.size, hash)
val data = dataSource.getBlock(request.offset, request.size, hash)
val future = connectionHandler.sendMessage(BlockExchangeProtos.Response.newBuilder()
sentBlocks.add(hash)
synchronized(updateLock) {
updateLock.notifyAll()
}
BlockExchangeProtos.Response.newBuilder()
.setCode(BlockExchangeProtos.ErrorCode.NO_ERROR)
.setData(ByteString.copyFrom(data))
.setId(request.id)
.build())
monitoringProcessExecutorService.submitLogging {
try {
future.get()
sentBlocks.add(hash)
synchronized(updateLock) {
updateLock.notifyAll()
}
//TODO retry on error, register error and throw on watcher
} catch (ex: InterruptedException) {
//return and do nothing
} catch (ex: ExecutionException) {
uploadError.set(ex)
synchronized(updateLock) {
updateLock.notifyAll()
}
}
}
.build()
}
}
connectionHandler.registerOnRequestMessageReceivedListeners(listener)
logger.debug("send index update for file = {}", targetPath)
val indexListener = { folderInfo: FolderInfo, newRecords: List<FileInfo>, indexInfo: IndexInfo ->
val indexListener = { folderInfo: FolderInfo, newRecords: List<FileInfo>, _: IndexInfo ->
if (folderInfo.folderId == folderId) {
for (fileInfo2 in newRecords) {
if (fileInfo2.path == targetPath && fileInfo2.hash == dataSource.getHash()) { //TODO check not invalid
@@ -121,7 +117,7 @@ class BlockPusher internal constructor(private val localDeviceId: DeviceId,
.setName(targetPath)
.setSize(fileSize)
.setType(BlockExchangeProtos.FileInfoType.FILE)
.addAllBlocks(dataSource.blocks), fileInfo?.versionList).right
.addAllBlocks(dataSource.blocks), fileInfo?.versionList)
return object : FileUploadObserver() {
override fun progressPercentage() = if (isCompleted.get()) 100 else (sentBlocks.size.toFloat() / dataSource.getHashes().size).toInt()
@@ -133,7 +129,7 @@ class BlockPusher internal constructor(private val localDeviceId: DeviceId,
logger.debug("closing upload process")
monitoringProcessExecutorService.shutdown()
indexHandler.unregisterOnIndexRecordAcquiredListener(indexListener)
connectionHandler.unregisterOnRequestMessageReceivedListeners(listener)
requestHandlerRegistry.unregisterListener(requestFilter)
val fileInfo1 = indexHandler.pushRecord(indexUpdate.folder, indexUpdate.filesList.single())
logger.info("sent file info record = {}", fileInfo1)
}
@@ -152,8 +148,8 @@ class BlockPusher internal constructor(private val localDeviceId: DeviceId,
}
}
private fun sendIndexUpdate(folderId: String, fileInfoBuilder: BlockExchangeProtos.FileInfo.Builder,
oldVersions: Iterable<Version>?): Pair<Future<*>, BlockExchangeProtos.IndexUpdate> {
private suspend fun sendIndexUpdate(folderId: String, fileInfoBuilder: BlockExchangeProtos.FileInfo.Builder,
oldVersions: Iterable<Version>?): BlockExchangeProtos.IndexUpdate {
run {
val nextSequence = indexHandler.sequencer().nextSequence()
val list = oldVersions ?: emptyList()
@@ -182,7 +178,10 @@ class BlockPusher internal constructor(private val localDeviceId: DeviceId,
.addFiles(fileInfo)
.build()
logger.debug("index update = {}", fileInfo)
return Pair.of(connectionHandler.sendMessage(indexUpdate), indexUpdate)
connectionHandler.sendIndexUpdate(indexUpdate)
return indexUpdate
}
abstract inner class FileUploadObserver : Closeable {
@@ -203,33 +202,6 @@ class BlockPusher internal constructor(private val localDeviceId: DeviceId,
}
}
inner class IndexEditObserver(private val future: Future<*>, private val indexUpdate: BlockExchangeProtos.IndexUpdate) : Closeable {
//throw exception if job has errors
@Throws(InterruptedException::class, ExecutionException::class)
fun isCompleted(): Boolean {
return if (future.isDone) {
future.get()
true
} else {
false
}
}
constructor(pair: Pair<Future<*>, BlockExchangeProtos.IndexUpdate>) : this(pair.left, pair.right)
@Throws(InterruptedException::class, ExecutionException::class)
fun waitForComplete() {
future.get()
}
@Throws(IOException::class)
override fun close() {
indexHandler.pushRecord(indexUpdate.folder, indexUpdate.filesList.single())
}
}
private class DataSource @Throws(IOException::class) constructor(private val inputStream: InputStream) {
var size: Long = 0
@@ -1,517 +0,0 @@
/*
* Copyright (C) 2016 Davide Imbriaco
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep
import com.google.protobuf.ByteString
import com.google.protobuf.MessageLite
import net.jpountz.lz4.LZ4Factory
import net.syncthing.java.bep.BlockExchangeProtos.*
import net.syncthing.java.client.protocol.rp.RelayClient
import net.syncthing.java.core.beans.DeviceAddress
import net.syncthing.java.core.beans.DeviceId
import net.syncthing.java.core.beans.DeviceInfo
import net.syncthing.java.core.beans.FolderInfo
import net.syncthing.java.core.configuration.Configuration
import net.syncthing.java.core.interfaces.TempRepository
import net.syncthing.java.core.security.KeystoreHandler
import net.syncthing.java.core.utils.NetworkUtils
import net.syncthing.java.core.utils.submitLogging
import net.syncthing.java.httprelay.HttpRelayClient
import org.apache.commons.io.IOUtils
import org.apache.commons.lang3.tuple.Pair
import org.slf4j.LoggerFactory
import java.io.Closeable
import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.IOException
import java.lang.reflect.InvocationTargetException
import java.nio.ByteBuffer
import java.security.cert.CertificateException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.Executors
import java.util.concurrent.Future
import java.util.concurrent.TimeUnit
import javax.net.ssl.SSLSocket
class ConnectionHandler(private val configuration: Configuration, val address: DeviceAddress,
private val indexHandler: IndexHandler,
private val tempRepository: TempRepository,
private val onNewFolderSharedListener: (ConnectionHandler, FolderInfo) -> Unit,
private val onConnectionChangedListener: (ConnectionHandler) -> Unit) : Closeable {
private val logger = LoggerFactory.getLogger(javaClass)
private val outExecutorService = Executors.newSingleThreadExecutor()
private val inExecutorService = Executors.newSingleThreadExecutor()
private val messageProcessingService = Executors.newCachedThreadPool()
private val periodicExecutorService = Executors.newSingleThreadScheduledExecutor()
private lateinit var socket: SSLSocket
private var inputStream: DataInputStream? = null
private var outputStream: DataOutputStream? = null
private var lastActive = Long.MIN_VALUE
internal var clusterConfigInfo: ClusterConfigInfo? = null
private set
private val clusterConfigWaitingLock = Object()
private val responseHandler = ResponseHandler()
private val blockPuller = BlockPuller(this, indexHandler, responseHandler, tempRepository)
private val blockPusher = BlockPusher(configuration.localDeviceId, this, indexHandler)
private val onRequestMessageReceivedListeners = mutableSetOf<(Request) -> Unit>()
private var isClosed = false
var isConnected = false
private set
fun deviceId(): DeviceId = address.deviceId()
private fun checkNotClosed() {
NetworkUtils.assertProtocol(!isClosed, {"connection $this closed"})
}
internal fun registerOnRequestMessageReceivedListeners(listener: (Request) -> Unit) {
onRequestMessageReceivedListeners.add(listener)
}
internal fun unregisterOnRequestMessageReceivedListeners(listener: (Request) -> Unit) {
assert(onRequestMessageReceivedListeners.contains(listener))
onRequestMessageReceivedListeners.remove(listener)
}
@Throws(IOException::class, KeystoreHandler.CryptoException::class)
fun connect(): ConnectionHandler {
checkNotClosed()
assert(!isConnected, {"already connected!"})
logger.info("connecting to {}", address.address)
val keystoreHandler = KeystoreHandler.Loader().loadKeystore(configuration)
socket = when (address.getType()) {
DeviceAddress.AddressType.TCP -> {
logger.debug("opening tcp ssl connection")
keystoreHandler.createSocket(address.getSocketAddress(), KeystoreHandler.BEP)
}
DeviceAddress.AddressType.RELAY -> {
logger.debug("opening relay connection")
keystoreHandler.wrapSocket(RelayClient(configuration).openRelayConnection(address), KeystoreHandler.BEP)
}
DeviceAddress.AddressType.HTTP_RELAY, DeviceAddress.AddressType.HTTPS_RELAY -> {
logger.debug("opening http relay connection")
keystoreHandler.wrapSocket(HttpRelayClient().openRelayConnection(address), KeystoreHandler.BEP)
}
else -> throw UnsupportedOperationException("unsupported address type = " + address.getType())
}
inputStream = DataInputStream(socket.inputStream)
outputStream = DataOutputStream(socket.outputStream)
sendHelloMessage(BlockExchangeProtos.Hello.newBuilder()
.setClientName(configuration.clientName)
.setClientVersion(configuration.clientVersion)
.setDeviceName(configuration.localDeviceName)
.build().toByteArray())
markActivityOnSocket()
receiveHelloMessage()
try {
KeystoreHandler.assertSocketCertificateValid(socket, address.deviceId())
} catch (e: CertificateException) {
throw IOException(e)
}
run {
val clusterConfigBuilder = ClusterConfig.newBuilder()
for (folder in configuration.folders) {
val folderBuilder = Folder.newBuilder()
.setId(folder.folderId)
.setLabel(folder.label)
run {
//our device
val deviceBuilder = Device.newBuilder()
.setId(ByteString.copyFrom(configuration.localDeviceId.toHashData()))
.setIndexId(indexHandler.sequencer().indexId())
.setMaxSequence(indexHandler.sequencer().currentSequence())
folderBuilder.addDevices(deviceBuilder)
}
run {
//other device
val deviceBuilder = Device.newBuilder()
.setId(ByteString.copyFrom(DeviceId(address.deviceId).toHashData()))
val indexSequenceInfo = indexHandler.indexRepository.findIndexInfoByDeviceAndFolder(address.deviceId(), folder.folderId)
indexSequenceInfo?.let {
deviceBuilder
.setIndexId(indexSequenceInfo.indexId)
.setMaxSequence(indexSequenceInfo.localSequence)
logger.info("send delta index info device = {} index = {} max (local) sequence = {}",
indexSequenceInfo.deviceId,
indexSequenceInfo.indexId,
indexSequenceInfo.localSequence)
}
folderBuilder.addDevices(deviceBuilder)
}
clusterConfigBuilder.addFolders(folderBuilder)
//TODO other devices??
}
sendMessage(clusterConfigBuilder.build())
}
synchronized(clusterConfigWaitingLock) {
startMessageListenerService()
while (clusterConfigInfo == null && !isClosed) {
logger.debug("wait for cluster config")
try {
clusterConfigWaitingLock.wait()
} catch (e: InterruptedException) {
throw IOException(e)
}
}
if (clusterConfigInfo == null) {
throw IOException("unable to retrieve cluster config from peer!")
}
}
for (folder in configuration.folders) {
if (hasFolder(folder.folderId)) {
sendIndexMessage(folder.folderId)
}
}
periodicExecutorService.scheduleWithFixedDelay({ this.sendPing() }, 90, 90, TimeUnit.SECONDS)
isConnected = true
onConnectionChangedListener(this)
return this
}
fun getBlockPuller(): BlockPuller {
return blockPuller
}
fun getBlockPusher(): BlockPusher {
return blockPusher
}
private fun sendIndexMessage(folderId: String) {
sendMessage(Index.newBuilder()
.setFolder(folderId)
.build())
}
fun closeBg() {
Thread { close() }.start()
}
/**
* Receive hello message and save device name to configuration.
*/
@Throws(IOException::class)
private fun receiveHelloMessage() {
val magic = inputStream!!.readInt()
NetworkUtils.assertProtocol(magic == MAGIC, {"magic mismatch, expected $MAGIC, got $magic"})
val length = inputStream!!.readShort().toInt()
NetworkUtils.assertProtocol(length > 0, {"invalid lenght, must be >0, got $length"})
val buffer = ByteArray(length)
inputStream!!.readFully(buffer)
val hello = BlockExchangeProtos.Hello.parseFrom(buffer)
logger.info("Received hello message, deviceName=${hello.deviceName}, clientName=${hello.clientName}, clientVersion=${hello.clientVersion}")
configuration.peers = configuration.peers.map { peer ->
if (peer.deviceId == deviceId()) {
DeviceInfo(deviceId(), hello.deviceName)
} else {
peer
}
}.toSet()
configuration.persistLater()
}
private fun sendHelloMessage(payload: ByteArray): Future<*> {
return outExecutorService.submitLogging {
try {
logger.debug("Sending hello message")
val header = ByteBuffer.allocate(6)
header.putInt(MAGIC)
header.putShort(payload.size.toShort())
outputStream!!.write(header.array())
outputStream!!.write(payload)
outputStream!!.flush()
} catch (ex: IOException) {
if (outExecutorService.isShutdown) {
return@submitLogging
}
logger.error("error writing to output stream", ex)
closeBg()
}
}
}
private fun sendPing(): Future<*> {
return sendMessage(Ping.newBuilder().build())
}
private fun markActivityOnSocket() {
lastActive = System.currentTimeMillis()
}
@Throws(IOException::class)
private fun receiveMessage(): Pair<BlockExchangeProtos.MessageType, MessageLite> {
var headerLength = inputStream!!.readShort().toInt()
while (headerLength == 0) {
logger.warn("got headerLength == 0, skipping short")
headerLength = inputStream!!.readShort().toInt()
}
markActivityOnSocket()
NetworkUtils.assertProtocol(headerLength > 0, {"invalid lenght, must be >0, got $headerLength"})
val headerBuffer = ByteArray(headerLength)
inputStream!!.readFully(headerBuffer)
val header = BlockExchangeProtos.Header.parseFrom(headerBuffer)
var messageLength = 0
while (messageLength == 0) {
logger.warn("received readInt() == 0, expecting 'bep message header length' (int >0), ignoring (keepalive?)")
messageLength = inputStream!!.readInt()
}
NetworkUtils.assertProtocol(messageLength >= 0, {"invalid lenght, must be >=0, got $messageLength"})
var messageBuffer = ByteArray(messageLength)
inputStream!!.readFully(messageBuffer)
markActivityOnSocket()
if (header.compression == BlockExchangeProtos.MessageCompression.LZ4) {
val uncompressedLength = ByteBuffer.wrap(messageBuffer).int
messageBuffer = LZ4Factory.fastestInstance().fastDecompressor().decompress(messageBuffer, 4, uncompressedLength)
}
val messageTypeInfo = messageTypesByProtoMessageType[header.type]
NetworkUtils.assertProtocol(messageTypeInfo != null, {"unsupported message type = ${header.type}"})
try {
val message = messageTypeInfo!!.parseFrom(messageBuffer)
return Pair.of(header.type, message)
} catch (e: Exception) {
when (e) {
is IllegalAccessException, is IllegalArgumentException, is InvocationTargetException, is NoSuchMethodException, is SecurityException ->
throw IOException(e)
else -> throw e
}
}
}
internal fun sendMessage(message: MessageLite): Future<*> {
checkNotClosed()
val messageTypeInfo = messageTypesByJavaClass[message.javaClass]
messageTypeInfo!!
val header = BlockExchangeProtos.Header.newBuilder()
.setCompression(BlockExchangeProtos.MessageCompression.NONE)
// invert map
.setType(messageTypeInfo.protoMessageType)
.build()
val headerData = header.toByteArray()
val messageData = message.toByteArray() //TODO compression
return outExecutorService.submit<Any> {
try {
logger.debug("sending message type = {} {}", header.type, getIdForMessage(message))
markActivityOnSocket()
outputStream!!.writeShort(headerData.size)
outputStream!!.write(headerData)
outputStream!!.writeInt(messageData.size)//with compression, check this
outputStream!!.write(messageData)
outputStream!!.flush()
markActivityOnSocket()
} catch (ex: IOException) {
if (!outExecutorService.isShutdown) {
logger.error("error writing to output stream", ex)
closeBg()
}
throw ex
}
null
}
}
override fun close() {
if (!isClosed) {
sendMessage(Close.getDefaultInstance())
isClosed = true
isConnected = false
periodicExecutorService.shutdown()
outExecutorService.shutdown()
inExecutorService.shutdown()
messageProcessingService.shutdown()
assert(onRequestMessageReceivedListeners.isEmpty())
if (outputStream != null) {
IOUtils.closeQuietly(outputStream)
outputStream = null
}
if (inputStream != null) {
IOUtils.closeQuietly(inputStream)
inputStream = null
}
try {
IOUtils.closeQuietly(socket)
} catch (ex: Exception) {
// ignore this
// this can throw an exception if socket was not yet initialized/ set
// as Kotlin does an check about this, the closeQuietly does not catch it
}
logger.info("closed connection {}", address)
synchronized(clusterConfigWaitingLock) {
clusterConfigWaitingLock.notifyAll()
}
onConnectionChangedListener(this)
try {
periodicExecutorService.awaitTermination(2, TimeUnit.SECONDS)
outExecutorService.awaitTermination(2, TimeUnit.SECONDS)
inExecutorService.awaitTermination(2, TimeUnit.SECONDS)
messageProcessingService.awaitTermination(2, TimeUnit.SECONDS)
} catch (ex: InterruptedException) {
logger.warn("", ex)
}
}
}
/**
* return time elapsed since last activity on socket, inputStream millis
*
* @return
*/
fun getLastActive(): Long {
return System.currentTimeMillis() - lastActive
}
private fun startMessageListenerService() {
inExecutorService.submitLogging {
try {
while (!Thread.interrupted()) {
val message = receiveMessage()
messageProcessingService.submitLogging {
logger.debug("received message type = {} {}", message.left, getIdForMessage(message.right))
when (message.left) {
BlockExchangeProtos.MessageType.INDEX -> {
val index = message.value as Index
indexHandler.handleIndexMessageReceivedEvent(index.folder, index.filesList, this)
}
BlockExchangeProtos.MessageType.INDEX_UPDATE -> {
val update = message.value as IndexUpdate
indexHandler.handleIndexMessageReceivedEvent(update.folder, update.filesList, this)
}
BlockExchangeProtos.MessageType.REQUEST -> {
onRequestMessageReceivedListeners.forEach { it(message.value as Request) }
}
BlockExchangeProtos.MessageType.RESPONSE -> {
responseHandler.handleResponse(message.value as Response)
}
BlockExchangeProtos.MessageType.PING -> logger.debug("ping message received")
BlockExchangeProtos.MessageType.CLOSE -> {
val close = message.value as BlockExchangeProtos.Close
logger.info("received close message, reason=${close.reason}")
closeBg()
}
BlockExchangeProtos.MessageType.CLUSTER_CONFIG -> {
NetworkUtils.assertProtocol(clusterConfigInfo == null, {"received cluster config message twice!"})
clusterConfigInfo = ClusterConfigInfo()
val clusterConfig = message.value as ClusterConfig
for (folder in clusterConfig.foldersList ?: emptyList()) {
val folderInfo = ClusterConfigFolderInfo(folder.id, folder.label)
val devicesById = (folder.devicesList ?: emptyList())
.associateBy { input ->
DeviceId.fromHashData(input.id!!.toByteArray())
}
val otherDevice = devicesById[address.deviceId()]
val ourDevice = devicesById[configuration.localDeviceId]
if (otherDevice != null) {
folderInfo.isAnnounced = true
}
if (ourDevice != null) {
folderInfo.isShared = true
logger.info("folder shared from device = {} folder = {}", address.deviceId, folderInfo)
val folderIds = configuration.folders.map { it.folderId }
if (!folderIds.contains(folderInfo.folderId)) {
val fi = FolderInfo(folderInfo.folderId, folderInfo.label)
configuration.folders = configuration.folders + fi
onNewFolderSharedListener(this, fi)
logger.info("new folder shared = {}", folderInfo)
}
} else {
logger.info("folder not shared from device = {} folder = {}", address.deviceId, folderInfo)
}
clusterConfigInfo!!.putFolderInfo(folderInfo)
}
configuration.persistLater()
indexHandler.handleClusterConfigMessageProcessedEvent(clusterConfig)
synchronized(clusterConfigWaitingLock) {
clusterConfigWaitingLock.notifyAll()
}
}
}
}
}
} catch (ex: IOException) {
if (inExecutorService.isShutdown) {
return@submitLogging
}
logger.error("error receiving message", ex)
closeBg()
}
}
}
override fun toString(): String {
return "ConnectionHandler{" + "address=" + address + ", lastActive=" + getLastActive() / 1000.0 + "secs ago}"
}
internal inner class ClusterConfigInfo {
private val folderInfoById = ConcurrentHashMap<String, ClusterConfigFolderInfo>()
fun getSharedFolders(): Set<String> = folderInfoById.values.filter { it.isShared }.map { it.folderId }.toSet()
fun putFolderInfo(folderInfo: ClusterConfigFolderInfo) {
folderInfoById[folderInfo.folderId] = folderInfo
}
}
fun hasFolder(folder: String): Boolean {
return clusterConfigInfo!!.getSharedFolders().contains(folder)
}
companion object {
private const val MAGIC = 0x2EA7D90B
private val messageTypes = listOf(
MessageTypeInfo(MessageType.CLOSE, Close::class.java) { Close.parseFrom(it) },
MessageTypeInfo(MessageType.CLUSTER_CONFIG, ClusterConfig::class.java) { ClusterConfig.parseFrom(it) },
MessageTypeInfo(MessageType.DOWNLOAD_PROGRESS, DownloadProgress::class.java) { DownloadProgress.parseFrom(it) },
MessageTypeInfo(MessageType.INDEX, Index::class.java) { Index.parseFrom(it) },
MessageTypeInfo(MessageType.INDEX_UPDATE, IndexUpdate::class.java) { IndexUpdate.parseFrom(it) },
MessageTypeInfo(MessageType.PING, Ping::class.java) { Ping.parseFrom(it) },
MessageTypeInfo(MessageType.REQUEST, Request::class.java) { Request.parseFrom(it) },
MessageTypeInfo(MessageType.RESPONSE, Response::class.java) { Response.parseFrom(it) }
)
private val messageTypesByProtoMessageType = messageTypes.map { it.protoMessageType to it }.toMap()
private val messageTypesByJavaClass = messageTypes.map { it.javaClass to it }.toMap()
/**
* get id for message bean/instance, for log tracking
*
* @param message
* @return id for message bean
*/
private fun getIdForMessage(message: MessageLite): String {
return when (message) {
is Request -> Integer.toString(message.id)
is Response -> Integer.toString(message.id)
else -> Integer.toString(Math.abs(message.hashCode()))
}
}
}
data class MessageTypeInfo(
val protoMessageType: MessageType,
val javaClass: Class<out MessageLite>,
val parseFrom: (data: ByteArray) -> MessageLite
)
}
@@ -55,7 +55,7 @@ class IndexBrowser internal constructor(private val indexRepository: IndexReposi
}
}
internal fun onIndexChangedevent(folder: String, newRecord: FileInfo) {
internal fun onIndexChangedevent(folder: String) {
if (folder == this.folder) {
preloadFileInfoForCurrentPath()
}
@@ -13,6 +13,8 @@
*/
package net.syncthing.java.bep
import net.syncthing.java.bep.connectionactor.ClusterConfigInfo
import net.syncthing.java.bep.connectionactor.ConnectionActorWrapper
import net.syncthing.java.core.beans.*
import net.syncthing.java.core.beans.FileInfo.Version
import net.syncthing.java.core.configuration.Configuration
@@ -21,9 +23,8 @@ import net.syncthing.java.core.interfaces.Sequencer
import net.syncthing.java.core.interfaces.TempRepository
import net.syncthing.java.core.utils.NetworkUtils
import net.syncthing.java.core.utils.awaitTerminationSafe
import net.syncthing.java.core.utils.submitLogging
import net.syncthing.java.core.utils.trySubmitLogging
import org.apache.commons.lang3.tuple.Pair
import org.apache.http.util.TextUtils
import org.bouncycastle.util.encoders.Hex
import org.slf4j.LoggerFactory
import java.io.Closeable
@@ -94,9 +95,9 @@ class IndexHandler(private val configuration: Configuration, val indexRepository
}
}
internal fun isRemoteIndexAcquired(clusterConfigInfo: ConnectionHandler.ClusterConfigInfo, peerDeviceId: DeviceId): Boolean {
internal fun isRemoteIndexAcquired(clusterConfigInfo: ClusterConfigInfo, peerDeviceId: DeviceId): Boolean {
var ready = true
for (folder in clusterConfigInfo.getSharedFolders()) {
for (folder in clusterConfigInfo.sharedFolderIds) {
val indexSequenceInfo = indexRepository.findIndexInfoByDeviceAndFolder(peerDeviceId, folder)
if (indexSequenceInfo == null || indexSequenceInfo.localSequence < indexSequenceInfo.maxSequence) {
logger.debug("waiting for index on folder = {} sequenceInfo = {}", folder, indexSequenceInfo)
@@ -107,12 +108,12 @@ class IndexHandler(private val configuration: Configuration, val indexRepository
}
@Throws(InterruptedException::class)
fun waitForRemoteIndexAcquired(connectionHandler: ConnectionHandler, timeoutSecs: Long? = null): IndexHandler {
fun waitForRemoteIndexAcquired(connectionHandler: ConnectionActorWrapper, timeoutSecs: Long? = null): IndexHandler {
val timeoutMillis = (timeoutSecs ?: DEFAULT_INDEX_TIMEOUT) * 1000
synchronized(indexWaitLock) {
while (!isRemoteIndexAcquired(connectionHandler.clusterConfigInfo!!, connectionHandler.deviceId())) {
while (!isRemoteIndexAcquired(connectionHandler.getClusterConfig(), connectionHandler.deviceId)) {
indexWaitLock.wait(timeoutMillis)
NetworkUtils.assertProtocol(connectionHandler.getLastActive() < timeoutMillis || lastActive() < timeoutMillis,
NetworkUtils.assertProtocol(/* TODO connectionHandler.getLastActive() < timeoutMillis || */ lastActive() < timeoutMillis,
{"unable to acquire index from connection $connectionHandler, timeout reached!"})
}
}
@@ -137,8 +138,8 @@ class IndexHandler(private val configuration: Configuration, val indexRepository
}
}
fun handleIndexMessageReceivedEvent(folderId: String, filesList: List<BlockExchangeProtos.FileInfo>, connectionHandler: ConnectionHandler) {
indexMessageProcessor.handleIndexMessageReceivedEvent(folderId, filesList, connectionHandler)
internal fun handleIndexMessageReceivedEvent(folderId: String, filesList: List<BlockExchangeProtos.FileInfo>, clusterConfigInfo: ClusterConfigInfo, peerDeviceId: DeviceId) {
indexMessageProcessor.handleIndexMessageReceivedEvent(folderId, filesList, clusterConfigInfo, peerDeviceId)
}
fun pushRecord(folder: String, bepFileInfo: BlockExchangeProtos.FileInfo): FileInfo? {
@@ -215,7 +216,7 @@ class IndexHandler(private val configuration: Configuration, val indexRepository
indexRepository.updateFileInfo(record, fileBlocks)
logger.trace("loaded new record = {}", record)
indexBrowsers.forEach {
it.onIndexChangedevent(record.folder, record)
it.onIndexChangedevent(record.folder)
}
record
}
@@ -233,9 +234,9 @@ class IndexHandler(private val configuration: Configuration, val indexRepository
} else {
assert(fileInfo.isFile())
val fileBlocks = indexRepository.findFileBlocks(folder, path)
checkNotNull(fileBlocks, {"file blocks not found for file info = $fileInfo"})
checkNotNull(fileBlocks) {"file blocks not found for file info = $fileInfo"}
FileInfo.checkBlocks(fileInfo, fileBlocks!!)
FileInfo.checkBlocks(fileInfo, fileBlocks)
Pair.of(fileInfo, fileBlocks)
}
@@ -243,7 +244,7 @@ class IndexHandler(private val configuration: Configuration, val indexRepository
private fun updateFolderInfo(folder: String, label: String?): FolderInfo {
var folderInfo: FolderInfo? = folderInfoByFolder[folder]
if (folderInfo == null || !TextUtils.isEmpty(label)) {
if (folderInfo == null || label.isNullOrEmpty()) {
folderInfo = FolderInfo(folder, label)
folderInfoByFolder.put(folderInfo.folderId, folderInfo)
}
@@ -292,11 +293,9 @@ class IndexHandler(private val configuration: Configuration, val indexRepository
// private final int MIN_DELAY = 0, MAX_DELAY = 5000, MAX_RECORD_PER_PROCESS = 16, DELAY_FACTOR = 1;
private var startTime: Long? = null
fun handleIndexMessageReceivedEvent(folderId: String, filesList: List<BlockExchangeProtos.FileInfo>, connectionHandler: ConnectionHandler) {
fun handleIndexMessageReceivedEvent(folderId: String, filesList: List<BlockExchangeProtos.FileInfo>, clusterConfigInfo: ClusterConfigInfo, peerDeviceId: DeviceId) {
logger.info("received index message event, preparing (queued records = {} event record count = {})", queuedRecords, filesList.size)
markActive()
val clusterConfigInfo = connectionHandler.clusterConfigInfo
val peerDeviceId = connectionHandler.deviceId()
// List<BlockExchangeProtos.FileInfo> fileList = event.getFilesList();
// for (int index = 0; index < fileList.size(); index += MAX_RECORD_PER_PROCESS) {
// BlockExchangeProtos.IndexUpdate data = BlockExchangeProtos.IndexUpdate.newBuilder()
@@ -320,23 +319,23 @@ class IndexHandler(private val configuration: Configuration, val indexRepository
}
}
private fun processBg(data: BlockExchangeProtos.IndexUpdate, clusterConfigInfo: ConnectionHandler.ClusterConfigInfo?, peerDeviceId: DeviceId) {
private fun processBg(data: BlockExchangeProtos.IndexUpdate, clusterConfigInfo: ClusterConfigInfo?, peerDeviceId: DeviceId) {
logger.debug("received index message event, queuing for processing")
queuedMessages++
queuedRecords += data.filesCount.toLong()
executorService.submitLogging(object : ProcessingRunnable() {
executorService.trySubmitLogging(object : ProcessingRunnable() {
override fun runProcess() {
doHandleIndexMessageReceivedEvent(data, clusterConfigInfo, peerDeviceId)
}
})
}
private fun storeAndProcessBg(data: BlockExchangeProtos.IndexUpdate, clusterConfigInfo: ConnectionHandler.ClusterConfigInfo?, peerDeviceId: DeviceId) {
private fun storeAndProcessBg(data: BlockExchangeProtos.IndexUpdate, clusterConfigInfo: ClusterConfigInfo?, peerDeviceId: DeviceId) {
val key = tempRepository.pushTempData(data.toByteArray())
logger.debug("received index message event, stored to temp record {}, queuing for processing", key)
queuedMessages++
queuedRecords += data.filesCount.toLong()
executorService.submitLogging(object : ProcessingRunnable() {
executorService.trySubmitLogging(object : ProcessingRunnable() {
override fun runProcess() {
try {
doHandleIndexMessageReceivedEvent(key, clusterConfigInfo, peerDeviceId)
@@ -369,7 +368,7 @@ class IndexHandler(private val configuration: Configuration, val indexRepository
// return fileSequence < localSequence;
// }
@Throws(IOException::class)
protected fun doHandleIndexMessageReceivedEvent(key: String, clusterConfigInfo: ConnectionHandler.ClusterConfigInfo?, peerDeviceId: DeviceId) {
protected fun doHandleIndexMessageReceivedEvent(key: String, clusterConfigInfo: ClusterConfigInfo?, peerDeviceId: DeviceId) {
logger.debug("processing index message event from temp record {}", key)
markActive()
val data = tempRepository.popTempData(key)
@@ -377,7 +376,7 @@ class IndexHandler(private val configuration: Configuration, val indexRepository
doHandleIndexMessageReceivedEvent(message, clusterConfigInfo, peerDeviceId)
}
protected fun doHandleIndexMessageReceivedEvent(message: BlockExchangeProtos.IndexUpdate, clusterConfigInfo: ConnectionHandler.ClusterConfigInfo?, peerDeviceId: DeviceId) {
protected fun doHandleIndexMessageReceivedEvent(message: BlockExchangeProtos.IndexUpdate, clusterConfigInfo: ClusterConfigInfo?, peerDeviceId: DeviceId) {
// synchronized (writeAccessLock) {
// if (addProcessingDelayForInterface) {
// delay = Math.min(MAX_DELAY, Math.max(MIN_DELAY, lastRecordProcessingTime * DELAY_FACTOR));
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep
import net.syncthing.java.bep.connectionactor.ConnectionActorWrapper
import java.io.IOException
import java.util.*
class MultiConnectionHelper (
initialConnections: List<ConnectionActorWrapper>,
private val connectionFilter: (ConnectionActorWrapper) -> Boolean
) {
companion object {
private val random = Random()
}
private val usableConnections = initialConnections.toMutableList()
fun pickConnection(): ConnectionActorWrapper {
val possibleConnections = synchronized(usableConnections) {
usableConnections.filter { it.isConnected and connectionFilter(it) }
}
if (possibleConnections.isEmpty()) {
throw IOException("no matching connection is available")
} else if (possibleConnections.size == 1) {
return possibleConnections.first()
} else {
return possibleConnections[random.nextInt(possibleConnections.size)]
}
}
fun disableConnection(wrapper: ConnectionActorWrapper) {
synchronized(usableConnections) {
usableConnections.remove(wrapper)
}
}
}
@@ -0,0 +1,54 @@
package net.syncthing.java.bep
import kotlinx.coroutines.Deferred
import net.syncthing.java.core.beans.DeviceId
import java.io.IOException
class RequestHandlerRegistry {
private val listeners = mutableMapOf<RequestHandlerFilter, (BlockExchangeProtos.Request) -> Deferred<BlockExchangeProtos.Response>>()
suspend fun handleRequest(source: DeviceId, request: BlockExchangeProtos.Request): BlockExchangeProtos.Response {
val rule = RequestHandlerFilter(
deviceId = source,
folderId = request.folder,
path = request.name
)
val matchingListener = synchronized(listeners) {
listeners[rule]
}
if (matchingListener != null) {
return matchingListener(request).await()
} else {
return BlockExchangeProtos.Response.newBuilder()
.setId(request.id)
.setCode(BlockExchangeProtos.ErrorCode.GENERIC)
.build()
}
}
fun registerListener(filter: RequestHandlerFilter, listener: (BlockExchangeProtos.Request) -> Deferred<BlockExchangeProtos.Response>) {
synchronized(listeners) {
val oldListener = listeners[filter]
if (oldListener != null) {
throw IOException("there is already an listener for this filter")
}
listeners[filter] = listener
}
}
fun unregisterListener(filter: RequestHandlerFilter) {
synchronized(listeners) {
listeners.remove(filter)
}
}
}
data class RequestHandlerFilter(
val deviceId: DeviceId,
val folderId: String,
val path: String
)
@@ -1,50 +0,0 @@
/*
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep
import org.slf4j.LoggerFactory
import java.util.*
import java.util.concurrent.atomic.AtomicInteger
import kotlin.collections.HashMap
class ResponseHandler {
companion object {
private val logger = LoggerFactory.getLogger(ResponseHandler::class.java)
}
private val responseListeners = Collections.synchronizedMap(HashMap<Int, (BlockExchangeProtos.Response) -> Unit>())
private val nextRequestId = AtomicInteger(0)
fun registerListener(listener: (BlockExchangeProtos.Response) -> Unit): Int {
val requestId = nextRequestId.getAndIncrement()
responseListeners[requestId] = listener
return requestId
}
fun unregisterListener(requestId: Int) {
responseListeners.remove(requestId)
}
fun handleResponse(response: BlockExchangeProtos.Response) {
val listener = responseListeners.remove(response.id)
if (listener != null) {
listener(response)
} else {
logger.warn("received response for {} without associated handler", response.id)
}
}
}
@@ -0,0 +1,139 @@
/*
* Copyright (C) 2016 Davide Imbriaco
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep.connectionactor
import com.google.protobuf.ByteString
import net.syncthing.java.bep.BlockExchangeProtos
import net.syncthing.java.bep.IndexHandler
import net.syncthing.java.core.beans.DeviceId
import net.syncthing.java.core.beans.FolderInfo
import net.syncthing.java.core.configuration.Configuration
import org.slf4j.LoggerFactory
object ClusterConfigHandler {
private val logger = LoggerFactory.getLogger(ClusterConfigHandler::class.java)
fun buildClusterConfig(
configuration: Configuration,
indexHandler: IndexHandler,
deviceId: DeviceId
): BlockExchangeProtos.ClusterConfig {
val builder = BlockExchangeProtos.ClusterConfig.newBuilder()
for (folder in configuration.folders) {
val folderBuilder = BlockExchangeProtos.Folder.newBuilder()
.setId(folder.folderId)
.setLabel(folder.label)
// add this device
folderBuilder.addDevices(
BlockExchangeProtos.Device.newBuilder()
.setId(ByteString.copyFrom(configuration.localDeviceId.toHashData()))
.setIndexId(indexHandler.sequencer().indexId())
.setMaxSequence(indexHandler.sequencer().currentSequence())
)
// add other device
val indexSequenceInfo = indexHandler.indexRepository.findIndexInfoByDeviceAndFolder(deviceId, folder.folderId)
folderBuilder.addDevices(
BlockExchangeProtos.Device.newBuilder()
.setId(ByteString.copyFrom(deviceId.toHashData()))
.apply {
indexSequenceInfo?.let {
setIndexId(indexSequenceInfo.indexId)
setMaxSequence(indexSequenceInfo.localSequence)
logger.info("send delta index info device = {} index = {} max (local) sequence = {}",
indexSequenceInfo.deviceId,
indexSequenceInfo.indexId,
indexSequenceInfo.localSequence)
}
}
)
builder.addFolders(folderBuilder)
// TODO: add the other devices to the cluster config
}
return builder.build()
}
// TODO: understand this
internal fun handleReceivedClusterConfig(
clusterConfig: BlockExchangeProtos.ClusterConfig,
configuration: Configuration,
otherDeviceId: DeviceId,
indexHandler: IndexHandler
): ClusterConfigInfo {
val folderInfoList = mutableListOf<ClusterConfigFolderInfo>()
val newSharedFolders = mutableListOf<FolderInfo>()
for (folder in clusterConfig.foldersList ?: emptyList()) {
var folderInfo = ClusterConfigFolderInfo(folder.id, folder.label)
val devicesById = (folder.devicesList ?: emptyList())
.associateBy { input ->
DeviceId.fromHashData(input.id!!.toByteArray())
}
val otherDevice = devicesById[otherDeviceId]
val ourDevice = devicesById[configuration.localDeviceId]
if (otherDevice != null) {
folderInfo = folderInfo.copy(isAnnounced = true)
}
if (ourDevice != null) {
folderInfo = folderInfo.copy(isShared = true)
logger.info("folder shared from device = {} folder = {}", otherDeviceId, folderInfo)
val folderIds = configuration.folders.map { it.folderId }
if (!folderIds.contains(folderInfo.folderId)) {
val fi = FolderInfo(folderInfo.folderId, folderInfo.label)
configuration.folders = configuration.folders + fi
newSharedFolders.add(fi)
logger.info("new folder shared = {}", folderInfo)
}
} else {
logger.info("folder not shared from device = {} folder = {}", otherDeviceId, folderInfo)
}
folderInfoList.add(folderInfo)
}
configuration.persistLater()
indexHandler.handleClusterConfigMessageProcessedEvent(clusterConfig)
return ClusterConfigInfo(folderInfoList, newSharedFolders)
}
}
class ClusterConfigInfo (val folderInfo: List<ClusterConfigFolderInfo>, val newSharedFolders: List<FolderInfo>) {
companion object {
val dummy = ClusterConfigInfo(folderInfo = emptyList(), newSharedFolders = emptyList())
}
val folderInfoById = folderInfo.associateBy { it.folderId }
val sharedFolderIds: Set<String> by lazy {
folderInfo.filter { it.isShared }.map { it.folderId }.toSet()
}
}
data class ClusterConfigFolderInfo(
val folderId: String,
val label: String = folderId,
val isAnnounced: Boolean = false,
val isShared: Boolean = false
) {
init {
assert(folderId.isNotEmpty())
}
}
@@ -0,0 +1,29 @@
/*
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep.connectionactor
import kotlinx.coroutines.CompletableDeferred
import net.syncthing.java.bep.BlockExchangeProtos
sealed class ConnectionAction
object CloseConnectionAction: ConnectionAction()
class SendRequestConnectionAction(
val request: BlockExchangeProtos.Request,
val completableDeferred: CompletableDeferred<BlockExchangeProtos.Response>
): ConnectionAction()
class ConfirmIsConnectedAction(val completableDeferred: CompletableDeferred<ClusterConfigInfo>): ConnectionAction()
class SendIndexUpdateAction(
val message: BlockExchangeProtos.IndexUpdate,
val completableDeferred: CompletableDeferred<Unit?>
): ConnectionAction()
@@ -0,0 +1,204 @@
/*
* Copyright (C) 2016 Davide Imbriaco
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep.connectionactor
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import net.syncthing.java.bep.BlockExchangeProtos
import net.syncthing.java.bep.IndexHandler
import net.syncthing.java.core.beans.DeviceAddress
import net.syncthing.java.core.configuration.Configuration
import org.slf4j.LoggerFactory
import java.io.IOException
object ConnectionActorGenerator {
private val closed = Channel<ConnectionAction>().apply { cancel() }
private val logger = LoggerFactory.getLogger(ConnectionActorGenerator::class.java)
private fun deviceAddressesGenerator(deviceAddress: ReceiveChannel<DeviceAddress>) = GlobalScope.produce<List<DeviceAddress>> (capacity = Channel.CONFLATED) {
val addresses = mutableMapOf<String, DeviceAddress>()
deviceAddress.consumeEach { address ->
val isNew = addresses[address.address] == null
addresses[address.address] = address
if (isNew) {
send(
addresses.values.sortedBy { it.score }
)
}
}
}
private fun <T> waitForFirstValue(source: ReceiveChannel<T>, time: Long) = GlobalScope.produce<T> {
source.consume {
val firstValue = source.receive()
var lastValue = firstValue
try {
withTimeout(time) {
while (true) {
lastValue = source.receive()
}
}
throw IllegalStateException()
} catch (ex: TimeoutCancellationException) {
// this is expected here
}
send(lastValue)
// other values without delay
for (value in source) {
send(value)
}
}
}
fun generateConnectionActors(
deviceAddress: ReceiveChannel<DeviceAddress>,
configuration: Configuration,
indexHandler: IndexHandler,
requestHandler: (BlockExchangeProtos.Request) -> Deferred<BlockExchangeProtos.Response>
) = generateConnectionActorsFromDeviceAddressList(
deviceAddressSource = waitForFirstValue(
source = deviceAddressesGenerator(deviceAddress),
time = 1000
),
configuration = configuration,
indexHandler = indexHandler,
requestHandler = requestHandler
)
fun generateConnectionActorsFromDeviceAddressList(
deviceAddressSource: ReceiveChannel<List<DeviceAddress>>,
configuration: Configuration,
indexHandler: IndexHandler,
requestHandler: (BlockExchangeProtos.Request) -> Deferred<BlockExchangeProtos.Response>
) = GlobalScope.produce<Pair<SendChannel<ConnectionAction>, ClusterConfigInfo>> {
var currentActor: SendChannel<ConnectionAction> = closed
var currentDeviceAddress: DeviceAddress? = null
suspend fun closeCurrent() {
if (currentActor != closed) {
currentActor.close()
currentActor = closed
send(currentActor to ClusterConfigInfo.dummy)
}
}
suspend fun tryConnectingToAddressHandleBaseErrors(deviceAddress: DeviceAddress) = try {
val newActor = ConnectionActor.createInstance(deviceAddress, configuration, indexHandler, requestHandler)
val clusterConfig = ConnectionActorUtil.waitUntilConnected(newActor)
newActor to clusterConfig
} catch (ex: Exception) {
logger.warn("failed to connect to $deviceAddress", ex)
when (ex) {
is IOException -> {/* expected -> ignore */}
is InterruptedException -> {/* expected -> ignore */}
else -> throw ex
}
null
}
suspend fun dispatchConnection(
connection: SendChannel<ConnectionAction>,
clusterConfig: ClusterConfigInfo,
deviceAddress: DeviceAddress
) {
currentActor = connection
currentDeviceAddress = deviceAddress
send(connection to clusterConfig)
}
suspend fun tryConnectingToAddress(deviceAddress: DeviceAddress): Boolean {
closeCurrent()
var connection = tryConnectingToAddressHandleBaseErrors(deviceAddress) ?: return false
if (connection.second.newSharedFolders.isNotEmpty()) {
logger.debug("connected to $deviceAddress with new folders -> reconnect")
// reconnect to send new cluster config
connection.first.close()
connection = tryConnectingToAddressHandleBaseErrors(deviceAddress) ?: return false
}
logger.debug("connected to $deviceAddress")
dispatchConnection(connection.first, connection.second, deviceAddress)
return true
}
fun isConnected() = !currentActor.isClosedForSend
invokeOnClose {
currentActor.close()
}
val reconnectTicker = ticker(delayMillis = 30 * 1000, initialDelayMillis = 0)
deviceAddressSource.consume {
var lastDeviceAddressList: List<DeviceAddress> = emptyList()
while (true) {
if (isConnected()) {
lastDeviceAddressList = deviceAddressSource.poll() ?: lastDeviceAddressList
if (lastDeviceAddressList.isNotEmpty()) {
if (reconnectTicker.poll() != null) {
if (currentDeviceAddress != lastDeviceAddressList.first()) {
val oldDeviceAddress = currentDeviceAddress!!
if (!tryConnectingToAddress(lastDeviceAddressList.first())) {
tryConnectingToAddress(oldDeviceAddress)
}
}
}
} else {
closeCurrent()
}
delay(500) // don't take too much CPU
} else /* is not connected */ {
// get the new list version if there is any
lastDeviceAddressList = deviceAddressSource.poll() ?: lastDeviceAddressList
// try all addresses
for (address in lastDeviceAddressList) {
if (tryConnectingToAddress(address)) {
break
}
}
// reset countdown before trying other connection if it would be time now
// this does not reset if it has not counted down the whole time yet
reconnectTicker.poll()
// wait for new device address list but not more than 15 seconds before the next iteration
lastDeviceAddressList = withTimeoutOrNull(15 * 1000) {
deviceAddressSource.receive()
} ?: lastDeviceAddressList
}
}
}
}
}
@@ -0,0 +1,45 @@
/*
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep.connectionactor
import kotlinx.coroutines.CompletableDeferred
import kotlinx.coroutines.channels.SendChannel
import net.syncthing.java.bep.BlockExchangeProtos
object ConnectionActorUtil {
suspend fun waitUntilConnected(actor: SendChannel<ConnectionAction>): ClusterConfigInfo {
val deferred = CompletableDeferred<ClusterConfigInfo>()
actor.send(ConfirmIsConnectedAction(deferred))
actor.invokeOnClose { deferred.cancel() }
return deferred.await()
}
suspend fun sendRequest(request: BlockExchangeProtos.Request, actor: SendChannel<ConnectionAction>): BlockExchangeProtos.Response {
val deferred = CompletableDeferred<BlockExchangeProtos.Response>()
actor.send(SendRequestConnectionAction(request, deferred))
return deferred.await()
}
suspend fun sendIndexUpdate(update: BlockExchangeProtos.IndexUpdate, actor: SendChannel<ConnectionAction>) {
val deferred = CompletableDeferred<Unit?>()
actor.send(SendIndexUpdateAction(update, deferred))
deferred.await()
}
}
@@ -0,0 +1,86 @@
/*
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep.connectionactor
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import net.syncthing.java.bep.BlockExchangeProtos
import net.syncthing.java.core.beans.DeviceId
import java.io.IOException
class ConnectionActorWrapper (
private val source: ReceiveChannel<Pair<SendChannel<ConnectionAction>, ClusterConfigInfo>>,
val deviceId: DeviceId,
val connectivityChangeListener: () -> Unit
) {
private val job = Job()
private var currentConnectionActor: SendChannel<ConnectionAction>? = null
private var clusterConfigInfo: ClusterConfigInfo? = null
var isConnected = false
get() = currentConnectionActor?.isClosedForSend == false
init {
GlobalScope.launch (job) {
source.consumeEach { (connectionActor, clusterConfig) ->
currentConnectionActor = connectionActor
clusterConfigInfo = clusterConfig
}
}
// this is a very simple solution but it does its job
GlobalScope.launch (job) {
var previousConnected = false
while (isActive) {
val nowConnected = isConnected
if (previousConnected != nowConnected) {
previousConnected = nowConnected
connectivityChangeListener()
}
delay(200)
}
}
}
suspend fun sendRequest(request: BlockExchangeProtos.Request) = ConnectionActorUtil.sendRequest(
request,
currentConnectionActor ?: throw IOException("not connected")
)
suspend fun sendIndexUpdate(update: BlockExchangeProtos.IndexUpdate) = ConnectionActorUtil.sendIndexUpdate(
update,
currentConnectionActor ?: throw IOException("not connected")
)
fun hasFolder(folderId: String) = clusterConfigInfo?.sharedFolderIds?.contains(folderId) ?: false
fun getClusterConfig() = clusterConfigInfo ?: throw IOException("not connected")
fun shutdown() {
job.cancel()
}
// this triggers a disconnection
// the ConnectionActorGenerator will reconnect soon
fun reconnect() {
currentConnectionActor?.close()
}
}
@@ -1,5 +1,6 @@
/*
/*
* Copyright (C) 2016 Davide Imbriaco
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -11,13 +12,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep
internal data class ClusterConfigFolderInfo(val folderId: String, var label: String = folderId,
var isAnnounced: Boolean = false, var isShared: Boolean = false) {
init {
assert(folderId.isNotEmpty())
}
package net.syncthing.java.bep.connectionactor
object ConnectionConstants {
const val MAGIC = 0x2EA7D90B
}
@@ -0,0 +1,95 @@
/*
* Copyright (C) 2016 Davide Imbriaco
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep.connectionactor
import net.syncthing.java.bep.BlockExchangeProtos
import net.syncthing.java.core.beans.DeviceId
import net.syncthing.java.core.beans.DeviceInfo
import net.syncthing.java.core.configuration.Configuration
import net.syncthing.java.core.utils.NetworkUtils
import org.slf4j.LoggerFactory
import java.io.DataInputStream
import java.io.DataOutputStream
import java.nio.ByteBuffer
object HelloMessageHandler {
private val logger = LoggerFactory.getLogger(HelloMessageHandler::class.java)
fun sendHelloMessage(configuration: Configuration, outputStream: DataOutputStream) {
sendHelloMessage(
BlockExchangeProtos.Hello.newBuilder()
.setClientName(configuration.clientName)
.setClientVersion(configuration.clientVersion)
.setDeviceName(configuration.localDeviceName)
.build(),
outputStream
)
}
private fun sendHelloMessage(message: BlockExchangeProtos.Hello, outputStream: DataOutputStream) {
sendHelloMessage(message.toByteArray(), outputStream)
}
private fun sendHelloMessage(payload: ByteArray, outputStream: DataOutputStream) {
logger.debug("Sending hello message")
outputStream.apply {
write(
ByteBuffer.allocate(6).apply {
putInt(ConnectionConstants.MAGIC)
putShort(payload.size.toShort())
}.array()
)
write(payload)
flush()
}
}
fun receiveHelloMessage(
inputStream: DataInputStream
): BlockExchangeProtos.Hello {
val magic = inputStream.readInt()
NetworkUtils.assertProtocol(magic == ConnectionConstants.MAGIC) {"magic mismatch, got $magic"}
val length = inputStream.readShort().toInt()
NetworkUtils.assertProtocol(length > 0) {"invalid length, must be > 0, got $length"}
return BlockExchangeProtos.Hello.parseFrom(
ByteArray(length).apply {
inputStream.readFully(this)
}
)
}
fun processHelloMessage(
hello: BlockExchangeProtos.Hello,
configuration: Configuration,
deviceId: DeviceId
) {
logger.info("Received hello message, deviceName=${hello.deviceName}, clientName=${hello.clientName}, clientVersion=${hello.clientVersion}")
// update the local device name
// TODO: this could need some locking
configuration.peers = configuration.peers.map { peer ->
if (peer.deviceId == deviceId) {
DeviceInfo(deviceId, hello.deviceName)
} else {
peer
}
}.toSet()
configuration.persistLater()
}
}
@@ -0,0 +1,225 @@
/*
* Copyright (C) 2016 Davide Imbriaco
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep.connectionactor
import com.google.protobuf.MessageLite
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import net.syncthing.java.bep.BlockExchangeProtos
import net.syncthing.java.bep.IndexHandler
import net.syncthing.java.core.beans.DeviceAddress
import net.syncthing.java.core.configuration.Configuration
import net.syncthing.java.core.security.KeystoreHandler
import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.IOException
import java.util.*
object ConnectionActor {
fun createInstance(
address: DeviceAddress,
configuration: Configuration,
indexHandler: IndexHandler,
requestHandler: (BlockExchangeProtos.Request) -> Deferred<BlockExchangeProtos.Response>
): SendChannel<ConnectionAction> {
val channel = Channel<ConnectionAction>(Channel.RENDEZVOUS)
GlobalScope.async (Dispatchers.IO) {
OpenConnection.openSocketConnection(address, configuration).use { socket ->
val inputStream = DataInputStream(socket.inputStream)
val outputStream = DataOutputStream(socket.outputStream)
val helloMessage = coroutineScope {
async { HelloMessageHandler.sendHelloMessage(configuration, outputStream) }
async { HelloMessageHandler.receiveHelloMessage(inputStream) }.await()
}
// the hello message exchange should happen before the certificate validation
KeystoreHandler.assertSocketCertificateValid(socket, address.deviceId)
// now (after the validation) use the content of the hello message
HelloMessageHandler.processHelloMessage(helloMessage, configuration, address.deviceId)
// helpers for messages
val sendPostAuthMessageLock = Mutex()
val receivePostAuthMessageLock = Mutex()
suspend fun sendPostAuthMessage(message: MessageLite) = sendPostAuthMessageLock.withLock {
PostAuthenticationMessageHandler.sendMessage(outputStream, message, markActivityOnSocket = {})
}
suspend fun receivePostAuthMessage() = receivePostAuthMessageLock.withLock {
PostAuthenticationMessageHandler.receiveMessage(inputStream, markActivityOnSocket = {})
}
// cluster config exchange
val clusterConfig = coroutineScope {
launch { sendPostAuthMessage(ClusterConfigHandler.buildClusterConfig(configuration, indexHandler, address.deviceId)) }
async { receivePostAuthMessage() }.await()
}.second
if (!(clusterConfig is BlockExchangeProtos.ClusterConfig)) {
throw IOException("first message was not a cluster config message")
}
val clusterConfigInfo = ClusterConfigHandler.handleReceivedClusterConfig(
clusterConfig = clusterConfig,
configuration = configuration,
otherDeviceId = address.deviceId,
indexHandler = indexHandler
)
fun hasFolder(folder: String) = clusterConfigInfo.sharedFolderIds.contains(folder)
val messageListeners = Collections.synchronizedMap(mutableMapOf<Int, CompletableDeferred<BlockExchangeProtos.Response>>())
try {
launch {
while (isActive) {
val message = receivePostAuthMessage().second
when (message) {
is BlockExchangeProtos.Response -> {
val listener = messageListeners.remove(message.id)
listener
?: throw IOException("got response ${message.id} but there is no response listener")
listener.complete(message)
}
is BlockExchangeProtos.Index -> {
indexHandler.handleIndexMessageReceivedEvent(
folderId = message.folder,
filesList = message.filesList,
clusterConfigInfo = clusterConfigInfo,
peerDeviceId = address.deviceId
)
}
is BlockExchangeProtos.IndexUpdate -> {
indexHandler.handleIndexMessageReceivedEvent(
folderId = message.folder,
filesList = message.filesList,
clusterConfigInfo = clusterConfigInfo,
peerDeviceId = address.deviceId
)
}
is BlockExchangeProtos.Request -> {
launch {
val response = requestHandler(message).await()
try {
sendPostAuthMessage(response)
} catch (ex: IOException) {
// the connection was closed in the time between - ignore it
}
}
}
is BlockExchangeProtos.Ping -> { /* nothing to do */
}
is BlockExchangeProtos.ClusterConfig -> throw IOException("received cluster config twice")
is BlockExchangeProtos.Close -> socket.close()
else -> throw IOException("unsupported message type ${message.javaClass}")
}
}
}
// send index messages - TODO: Why?
for (folder in configuration.folders) {
if (hasFolder(folder.folderId)) {
sendPostAuthMessage(
BlockExchangeProtos.Index.newBuilder()
.setFolder(folder.folderId)
.build()
)
}
}
launch {
// send ping all 90 seconds
// TODO: only send when there were no messages for 90 seconds
while (isActive) {
delay(90 * 1000)
launch { sendPostAuthMessage(BlockExchangeProtos.Ping.getDefaultInstance()) }
}
}
var nextRequestId = 0
channel.consumeEach { action ->
when (action) {
CloseConnectionAction -> throw InterruptedException()
is SendRequestConnectionAction -> {
val requestId = nextRequestId++
messageListeners[requestId] = action.completableDeferred
// async to allow handling the next action faster
async {
try {
sendPostAuthMessage(
action.request.toBuilder()
.setId(requestId)
.build()
)
} catch (ex: Exception) {
action.completableDeferred.cancel(ex)
}
}
}
is ConfirmIsConnectedAction -> {
action.completableDeferred.complete(clusterConfigInfo)
// otherwise, Kotlin would warn that the return
// type does not match to the other branches
null
}
is SendIndexUpdateAction -> {
async {
try {
sendPostAuthMessage(action.message)
} catch (ex: Exception) {
action.completableDeferred.cancel(ex)
}
}
}
}.let { /* prevents compiling if one action is not handled */ }
}
} finally {
// send close message
withContext(NonCancellable) {
if (socket.isConnected) {
sendPostAuthMessage(BlockExchangeProtos.Close.getDefaultInstance())
}
}
// cancel all pending listeners
messageListeners.values.forEach { it.cancel() }
}
}
}.invokeOnCompletion { ex ->
if (ex != null) {
channel.cancel(ex)
} else {
channel.cancel()
}
}
return channel
}
}
@@ -0,0 +1,46 @@
/*
* Copyright (C) 2016 Davide Imbriaco
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep.connectionactor
import com.google.protobuf.MessageLite
import net.syncthing.java.bep.BlockExchangeProtos
object MessageTypes {
val messageTypes = listOf(
MessageTypeInfo(BlockExchangeProtos.MessageType.CLOSE, BlockExchangeProtos.Close::class.java) { BlockExchangeProtos.Close.parseFrom(it) },
MessageTypeInfo(BlockExchangeProtos.MessageType.CLUSTER_CONFIG, BlockExchangeProtos.ClusterConfig::class.java) { BlockExchangeProtos.ClusterConfig.parseFrom(it) },
MessageTypeInfo(BlockExchangeProtos.MessageType.DOWNLOAD_PROGRESS, BlockExchangeProtos.DownloadProgress::class.java) { BlockExchangeProtos.DownloadProgress.parseFrom(it) },
MessageTypeInfo(BlockExchangeProtos.MessageType.INDEX, BlockExchangeProtos.Index::class.java) { BlockExchangeProtos.Index.parseFrom(it) },
MessageTypeInfo(BlockExchangeProtos.MessageType.INDEX_UPDATE, BlockExchangeProtos.IndexUpdate::class.java) { BlockExchangeProtos.IndexUpdate.parseFrom(it) },
MessageTypeInfo(BlockExchangeProtos.MessageType.PING, BlockExchangeProtos.Ping::class.java) { BlockExchangeProtos.Ping.parseFrom(it) },
MessageTypeInfo(BlockExchangeProtos.MessageType.REQUEST, BlockExchangeProtos.Request::class.java) { BlockExchangeProtos.Request.parseFrom(it) },
MessageTypeInfo(BlockExchangeProtos.MessageType.RESPONSE, BlockExchangeProtos.Response::class.java) { BlockExchangeProtos.Response.parseFrom(it) }
)
val messageTypesByProtoMessageType = messageTypes.map { it.protoMessageType to it }.toMap()
val messageTypesByJavaClass = messageTypes.map { it.javaClass to it }.toMap()
fun getIdForMessage(message: MessageLite) = when (message) {
is BlockExchangeProtos.Request -> Integer.toString(message.id)
is BlockExchangeProtos.Response -> Integer.toString(message.id)
else -> Integer.toString(Math.abs(message.hashCode()))
}
}
data class MessageTypeInfo(
val protoMessageType: BlockExchangeProtos.MessageType,
val javaClass: Class<out MessageLite>,
val parseFrom: (data: ByteArray) -> MessageLite
)
@@ -0,0 +1,45 @@
/*
* Copyright (C) 2016 Davide Imbriaco
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep.connectionactor
import net.syncthing.java.client.protocol.rp.RelayClient
import net.syncthing.java.core.beans.DeviceAddress
import net.syncthing.java.core.configuration.Configuration
import net.syncthing.java.core.security.KeystoreHandler
import org.slf4j.LoggerFactory
import javax.net.ssl.SSLSocket
object OpenConnection {
private val logger = LoggerFactory.getLogger(OpenConnection::class.java)
fun openSocketConnection(
address: DeviceAddress,
configuration: Configuration
): SSLSocket {
val keystoreHandler = KeystoreHandler.Loader().loadKeystore(configuration)
return when (address.type) {
DeviceAddress.AddressType.TCP -> {
logger.debug("opening tcp ssl connection")
keystoreHandler.createSocket(address.getSocketAddress())
}
DeviceAddress.AddressType.RELAY -> {
logger.debug("opening relay connection")
keystoreHandler.wrapSocket(RelayClient(configuration).openRelayConnection(address))
}
else -> throw UnsupportedOperationException("unsupported address type ${address.type}")
}
}
}
@@ -0,0 +1,140 @@
/*
* Copyright (C) 2016 Davide Imbriaco
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.bep.connectionactor
import com.google.protobuf.MessageLite
import net.jpountz.lz4.LZ4Factory
import net.syncthing.java.bep.BlockExchangeProtos
import net.syncthing.java.core.utils.NetworkUtils
import org.slf4j.LoggerFactory
import java.io.DataInputStream
import java.io.DataOutputStream
import java.io.IOException
import java.lang.reflect.InvocationTargetException
import java.nio.ByteBuffer
object PostAuthenticationMessageHandler {
private val logger = LoggerFactory.getLogger(PostAuthenticationMessageHandler::class.java)
fun sendMessage(
outputStream: DataOutputStream,
message: MessageLite,
markActivityOnSocket: () -> Unit
) {
val messageTypeInfo = MessageTypes.messageTypesByJavaClass[message.javaClass]!!
val header = BlockExchangeProtos.Header.newBuilder()
.setCompression(BlockExchangeProtos.MessageCompression.NONE)
.setType(messageTypeInfo.protoMessageType)
.build()
val headerData = header.toByteArray()
val messageData = message.toByteArray() //TODO support compression
logger.debug("sending message type = {} {}", header.type, MessageTypes.getIdForMessage(message))
markActivityOnSocket()
outputStream.apply {
writeShort(headerData.size)
write(headerData)
writeInt(messageData.size)
write(messageData)
flush()
}
markActivityOnSocket()
}
fun receiveMessage(
inputStream: DataInputStream,
markActivityOnSocket: () -> Unit
): Pair<BlockExchangeProtos.MessageType, MessageLite> {
val header = BlockExchangeProtos.Header.parseFrom(readHeader(
inputStream = inputStream,
retryReadingLength = true,
markActivityOnSocket = markActivityOnSocket
))
var messageBuffer = readMessage(
inputStream = inputStream,
retryReadingLength = true,
markActivityOnSocket = markActivityOnSocket
)
if (header.compression == BlockExchangeProtos.MessageCompression.LZ4) {
val uncompressedLength = ByteBuffer.wrap(messageBuffer).int
messageBuffer = LZ4Factory.fastestInstance().fastDecompressor().decompress(messageBuffer, 4, uncompressedLength)
}
val messageTypeInfo = MessageTypes.messageTypesByProtoMessageType[header.type]
NetworkUtils.assertProtocol(messageTypeInfo != null) {"unsupported message type = ${header.type}"}
try {
return header.type to messageTypeInfo!!.parseFrom(messageBuffer)
} catch (e: Exception) {
when (e) {
is IllegalAccessException, is IllegalArgumentException, is InvocationTargetException, is NoSuchMethodException, is SecurityException ->
throw IOException(e)
else -> throw e
}
}
}
private fun readHeader(
inputStream: DataInputStream,
markActivityOnSocket: () -> Unit,
retryReadingLength: Boolean
): ByteArray {
var headerLength = inputStream.readShort().toInt()
// TODO: what is this good for?
if (retryReadingLength) {
while (headerLength == 0) {
logger.warn("got headerLength == 0, skipping short")
headerLength = inputStream.readShort().toInt()
}
}
markActivityOnSocket()
NetworkUtils.assertProtocol(headerLength > 0) {"invalid length, must be > 0, got $headerLength"}
return ByteArray(headerLength).apply {
inputStream.readFully(this)
}
}
private fun readMessage(
inputStream: DataInputStream,
markActivityOnSocket: () -> Unit,
retryReadingLength: Boolean
): ByteArray {
var messageLength = inputStream.readInt()
// TODO: what is this good for?
if (retryReadingLength) {
while (messageLength == 0) {
logger.warn("received readInt() == 0, expecting 'bep message header length' (int >0), ignoring (keepalive?)")
messageLength = inputStream.readInt()
}
}
NetworkUtils.assertProtocol(messageLength >= 0) {"invalid length, must be >= 0, got $messageLength"}
val messageBuffer = ByteArray(messageLength)
inputStream.readFully(messageBuffer)
markActivityOnSocket()
return messageBuffer
}
}
+1
View File
@@ -7,6 +7,7 @@ dependencies {
compile project(':syncthing-repository-default')
compile "commons-cli:commons-cli:1.4"
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.0.0'
}
run {
@@ -13,6 +13,7 @@
*/
package net.syncthing.java.client.cli
import kotlinx.coroutines.runBlocking
import net.syncthing.java.client.SyncthingClient
import net.syncthing.java.core.beans.DeviceId
import net.syncthing.java.core.beans.DeviceInfo
@@ -91,28 +92,23 @@ class Main(private val commandLine: CommandLine) {
System.out.println("file path = $folderAndPath")
val folder = folderAndPath.split(":".toRegex()).dropLastWhile({ it.isEmpty() }).toTypedArray()[0]
val path = folderAndPath.split(":".toRegex()).dropLastWhile({ it.isEmpty() }).toTypedArray()[1]
val latch = CountDownLatch(1)
val fileInfo = FileInfo(folder = folder, path = path, type = FileInfo.FileType.FILE)
syncthingClient.getBlockPuller(folder, { blockPuller ->
try {
val inputStream = blockPuller.pullFileSync(fileInfo)
val fileName = syncthingClient.indexHandler.getFileInfoByPath(folder, path)!!.fileName
val file =
if (commandLine.hasOption("o")) {
val param = File(commandLine.getOptionValue("o"))
if (param.isDirectory) File(param, fileName) else param
} else {
File(fileName)
}
FileUtils.copyInputStreamToFile(inputStream, file)
System.out.println("saved file to = $file.absolutePath")
} catch (e: InterruptedException) {
logger.warn("", e)
} catch (e: IOException) {
logger.warn("", e)
try {
val inputStream = syncthingClient.pullFileSync(fileInfo)
val fileName = syncthingClient.indexHandler.getFileInfoByPath(folder, path)!!.fileName
val file = if (commandLine.hasOption("o")) {
val param = File(commandLine.getOptionValue("o"))
if (param.isDirectory) File(param, fileName) else param
} else {
File(fileName)
}
}, { logger.warn("Failed to pull file") })
latch.await()
FileUtils.copyInputStreamToFile(inputStream, file)
System.out.println("saved file to = $file.absolutePath")
} catch (e: InterruptedException) {
logger.warn("", e)
} catch (e: IOException) {
logger.warn("", e)
}
}
"P" -> {
var path = option.value
@@ -122,20 +118,20 @@ class Main(private val commandLine: CommandLine) {
val folder = path.split(":".toRegex()).dropLastWhile({ it.isEmpty() }).toTypedArray()[0]
path = path.split(":".toRegex()).dropLastWhile({ it.isEmpty() }).toTypedArray()[1]
val latch = CountDownLatch(1)
syncthingClient.getBlockPusher(folder, { blockPusher ->
val observer = blockPusher.pushFile(FileInputStream(file), folder, path)
while (!observer.isCompleted()) {
try {
observer.waitForProgressUpdate()
} catch (e: InterruptedException) {
logger.warn("", e)
}
val blockPusher = syncthingClient.getBlockPusher(folder)
System.out.println("upload progress ${observer.progressPercentage()}%")
val observer = runBlocking {
blockPusher.pushFile(FileInputStream(file), folder, path)
}
while (!observer.isCompleted()) {
try {
observer.waitForProgressUpdate()
} catch (e: InterruptedException) {
logger.warn("", e)
}
latch.countDown()
}, { logger.warn("Failed to upload file") })
latch.await()
System.out.println("upload progress ${observer.progressPercentage()}%")
}
System.out.println("uploaded file to network")
}
"D" -> {
@@ -143,17 +139,16 @@ class Main(private val commandLine: CommandLine) {
val folder = path.split(":".toRegex()).dropLastWhile({ it.isEmpty() }).toTypedArray()[0]
path = path.split(":".toRegex()).dropLastWhile({ it.isEmpty() }).toTypedArray()[1]
System.out.println("delete path = $path")
val latch = CountDownLatch(1)
syncthingClient.getBlockPusher(folder, { blockPusher ->
try {
blockPusher.pushDelete(folder, path).waitForComplete()
} catch (e: InterruptedException) {
logger.warn("", e)
}
try {
val blockPusher = syncthingClient.getBlockPusher(folder)
latch.countDown()
}, { System.out.println("Failed to delete path") })
latch.await()
runBlocking {
blockPusher.pushDelete(folder, path)
}
} catch (e: InterruptedException) {
logger.warn("", e)
System.out.println("Failed to delete path")
}
System.out.println("deleted path")
}
"M" -> {
@@ -161,17 +156,16 @@ class Main(private val commandLine: CommandLine) {
val folder = path.split(":".toRegex()).dropLastWhile({ it.isEmpty() }).toTypedArray()[0]
path = path.split(":".toRegex()).dropLastWhile({ it.isEmpty() }).toTypedArray()[1]
System.out.println("dir path = $path")
val latch = CountDownLatch(1)
syncthingClient.getBlockPusher(folder, { blockPusher ->
try {
blockPusher.pushDir(folder, path).waitForComplete()
} catch (e: InterruptedException) {
logger.warn("", e)
}
try {
val blockPusher = syncthingClient.getBlockPusher(folder)
latch.countDown()
}, { System.out.println("Failed to push directory") })
latch.await()
runBlocking {
blockPusher.pushDir(folder, path)
}
} catch (e: InterruptedException) {
System.out.println("Failed to push directory")
logger.warn("", e)
}
System.out.println("uploaded dir to network")
}
"L" -> {
+1
View File
@@ -6,4 +6,5 @@ dependencies {
compile project(':syncthing-bep')
compile project(':syncthing-discovery')
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.0.0'
}
@@ -0,0 +1,49 @@
/*
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.client
import net.syncthing.java.bep.connectionactor.ConnectionActorWrapper
import net.syncthing.java.core.beans.DeviceId
class Connections (val generate: (DeviceId) -> ConnectionActorWrapper) {
private val map = mutableMapOf<DeviceId, ConnectionActorWrapper>()
fun getByDeviceId(deviceId: DeviceId): ConnectionActorWrapper {
return synchronized(map) {
val oldEntry = map[deviceId]
if (oldEntry != null) {
return oldEntry
} else {
val newEntry = generate(deviceId)
map[deviceId] = newEntry
return newEntry
}
}
}
fun shutdown() {
synchronized(map) {
map.values.forEach { it.shutdown() }
}
}
fun reconnectAllConnections() {
synchronized(map) {
map.values.forEach { it.reconnect() }
}
}
}
@@ -13,57 +13,63 @@
*/
package net.syncthing.java.client
import net.syncthing.java.bep.BlockPuller
import net.syncthing.java.bep.BlockPusher
import net.syncthing.java.bep.ConnectionHandler
import net.syncthing.java.bep.IndexHandler
import net.syncthing.java.core.beans.DeviceAddress
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking
import net.syncthing.java.bep.*
import net.syncthing.java.bep.connectionactor.ConnectionActorGenerator
import net.syncthing.java.bep.connectionactor.ConnectionActorWrapper
import net.syncthing.java.core.beans.DeviceId
import net.syncthing.java.core.beans.DeviceInfo
import net.syncthing.java.core.beans.FileInfo
import net.syncthing.java.core.configuration.Configuration
import net.syncthing.java.core.interfaces.IndexRepository
import net.syncthing.java.core.interfaces.TempRepository
import net.syncthing.java.core.security.KeystoreHandler
import net.syncthing.java.core.utils.awaitTerminationSafe
import net.syncthing.java.discovery.DiscoveryHandler
import org.slf4j.LoggerFactory
import java.io.Closeable
import java.io.IOException
import java.util.Collections
import java.util.TreeSet
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import kotlin.collections.ArrayList
import kotlin.collections.HashMap
import java.io.InputStream
import java.util.*
class SyncthingClient(
private val configuration: Configuration,
private val repository: IndexRepository,
private val tempRepository: TempRepository
) : Closeable {
private val logger = LoggerFactory.getLogger(javaClass)
val discoveryHandler: DiscoveryHandler
val indexHandler: IndexHandler
private val connections = Collections.synchronizedSet(createConnectionsSet())
private val connectByDeviceIdLocks = Collections.synchronizedMap(HashMap<DeviceId, Object>())
val indexHandler = IndexHandler(configuration, repository, tempRepository)
val discoveryHandler = DiscoveryHandler(configuration)
private val onConnectionChangedListeners = Collections.synchronizedList(mutableListOf<(DeviceId) -> Unit>())
private var connectDevicesScheduler = Executors.newSingleThreadScheduledExecutor()
private fun createConnectionsSet() = TreeSet<ConnectionHandler>(compareBy { it.address.score })
init {
indexHandler = IndexHandler(configuration, repository, tempRepository)
discoveryHandler = DiscoveryHandler(configuration)
connectDevicesScheduler.scheduleAtFixedRate(this::updateIndexFromPeers, 0, 15, TimeUnit.SECONDS)
}
private val requestHandlerRegistry = RequestHandlerRegistry()
private val connections = Connections(
generate = { deviceId ->
ConnectionActorWrapper(
source = ConnectionActorGenerator.generateConnectionActors(
deviceAddress = discoveryHandler.devicesAddressesManager.getDeviceAddressManager(deviceId).streamCurrentDeviceAddresses(),
requestHandler = { request ->
GlobalScope.async {
requestHandlerRegistry.handleRequest(
source = deviceId,
request = request
)
}
},
indexHandler = indexHandler,
configuration = configuration
),
deviceId = deviceId,
connectivityChangeListener = {
synchronized(onConnectionChangedListeners) {
onConnectionChangedListeners.forEach { it(deviceId) }
}
}
)
}
)
fun clearCacheAndIndex() {
indexHandler.clearIndex()
configuration.folders = emptySet()
configuration.persistLater()
updateIndexFromPeers()
connections.reconnectAllConnections()
}
fun addOnConnectionChangedListener(listener: (DeviceId) -> Unit) {
@@ -75,158 +81,61 @@ class SyncthingClient(
onConnectionChangedListeners.remove(listener)
}
@Throws(IOException::class, KeystoreHandler.CryptoException::class)
private fun openConnection(deviceAddress: DeviceAddress): ConnectionHandler {
logger.debug("Connecting to ${deviceAddress.deviceId}, active connections: ${connections.map { it.deviceId().deviceId }}")
val connectionHandler = ConnectionHandler(
configuration, deviceAddress, indexHandler, tempRepository, { connectionHandler, _ ->
connectionHandler.close()
openConnection(deviceAddress)
},
{connection ->
if (!connection.isConnected) {
connections.remove(connection)
}
onConnectionChangedListeners.forEach { it(connection.deviceId()) }
})
private fun getConnections() = configuration.peerIds.map { connections.getByDeviceId(it) }
try {
connectionHandler.connect()
} catch (ex: Exception) {
connectionHandler.closeBg()
throw ex
}
connections.add(connectionHandler)
return connectionHandler
init {
discoveryHandler.newDeviceAddressSupplier() // starts the discovery
getConnections()
}
/**
* Takes discovered addresses from [[DiscoveryHandler]] and connects to devices.
*
* We need to make sure that we are only connecting once to each device.
*/
private fun getPeerConnections(listener: (connection: ConnectionHandler) -> Unit, completeListener: () -> Unit) {
// create an copy to prevent dispatching an action two times
val connectionsWhichWereDispatched = createConnectionsSet()
synchronized (connections) {
connectionsWhichWereDispatched.addAll(connections)
}
connectionsWhichWereDispatched.forEach { listener(it) }
discoveryHandler.newDeviceAddressSupplier()
.takeWhile { it != null }
.filterNotNull()
.groupBy { it.deviceId() }
.filterNot { it.value.isEmpty() }
.forEach { (deviceId, addresses) ->
// create an lock per device id to prevent multiple connections to one device
synchronized (connectByDeviceIdLocks) {
if (connectByDeviceIdLocks[deviceId] == null) {
connectByDeviceIdLocks[deviceId] = Object()
}
}
synchronized (connectByDeviceIdLocks[deviceId]!!) {
val existingConnection = connections.find { it.deviceId() == deviceId && it.isConnected }
if (existingConnection != null) {
connectionsWhichWereDispatched.add(existingConnection)
listener(existingConnection)
return@synchronized
}
// try to use all addresses
for (address in addresses.distinctBy { it.address }) {
try {
val newConnection = openConnection(address)
connectionsWhichWereDispatched.add(newConnection)
listener(newConnection)
break // it worked, no need to try more
} catch (e: IOException) {
logger.warn("error connecting to device = $address", e)
} catch (e: KeystoreHandler.CryptoException) {
logger.warn("error connecting to device = $address", e)
}
}
}
}
// use all connections which were added in the time between and were not added by this function call
val newConnectionsBackup = createConnectionsSet()
synchronized (connections) {
newConnectionsBackup.addAll(connections)
}
connectionsWhichWereDispatched.forEach { newConnectionsBackup.remove(it) }
newConnectionsBackup.forEach { listener(it) }
completeListener()
fun connectToNewlyAddedDevices() {
getConnections()
}
private fun updateIndexFromPeers() {
getPeerConnections({ connection ->
try {
indexHandler.waitForRemoteIndexAcquired(connection)
} catch (ex: InterruptedException) {
logger.warn("exception while waiting for index", ex)
}
}, {})
fun disconnectFromRemovedDevices() {
// TODO: implement this
}
private fun getConnectionForFolder(folder: String, listener: (connection: ConnectionHandler) -> Unit,
errorListener: () -> Unit) {
val isConnected = AtomicBoolean(false)
getPeerConnections({ connection ->
if (connection.hasFolder(folder) && !isConnected.get()) {
listener(connection)
isConnected.set(true)
}
}, {
if (!isConnected.get()) {
errorListener()
}
})
fun getActiveConnectionsForFolder(folderId: String) = configuration.peerIds
.map { connections.getByDeviceId(it) }
.filter { it.isConnected && it.hasFolder(folderId) }
suspend fun pullFile(
fileInfo: FileInfo,
progressListener: (status: BlockPullerStatus) -> Unit = { }
): InputStream = BlockPuller.pullFile(
fileInfo = fileInfo,
progressListener = progressListener,
connections = getConnections(),
indexHandler = indexHandler,
tempRepository = tempRepository
)
fun pullFileSync(fileInfo: FileInfo) = runBlocking { pullFile(fileInfo) }
fun getBlockPusher(folderId: String): BlockPusher {
val connection = getActiveConnectionsForFolder(folderId).first()
return BlockPusher(
localDeviceId = connection.deviceId,
connectionHandler = connection,
indexHandler = indexHandler,
requestHandlerRegistry = requestHandlerRegistry
)
}
fun getBlockPuller(folderId: String, listener: (BlockPuller) -> Unit, errorListener: () -> Unit) {
getConnectionForFolder(folderId, { connection ->
listener(connection.getBlockPuller())
}, errorListener)
}
fun getBlockPusher(folderId: String, listener: (BlockPusher) -> Unit, errorListener: () -> Unit) {
getConnectionForFolder(folderId, { connection ->
listener(connection.getBlockPusher())
}, errorListener)
}
fun getPeerStatus(): List<DeviceInfo> {
return configuration.peers.map { device ->
val isConnected = connections.find { it.deviceId() == device.deviceId }?.isConnected ?: false
device.copy(isConnected = isConnected)
}
fun getPeerStatus() = configuration.peers.map { device ->
device.copy(
isConnected = connections.getByDeviceId(device.deviceId).isConnected
)
}
override fun close() {
connectDevicesScheduler.awaitTerminationSafe()
discoveryHandler.close()
// Create copy of list, because it will be modified by handleConnectionClosedEvent(), causing ConcurrentModificationException.
ArrayList(connections).forEach{it.close()}
indexHandler.close()
repository.close()
tempRepository.close()
connections.shutdown()
assert(onConnectionChangedListeners.isEmpty())
}
}
-1
View File
@@ -9,7 +9,6 @@ dependencies {
compile "org.slf4j:slf4j-api:1.7.25"
compile "ch.qos.logback:logback-classic:1.2.3"
compile "com.google.code.gson:gson:2.8.2"
compile "org.apache.httpcomponents:httpclient:4.5.4"
compile "org.bouncycastle:bcmail-jdk15on:1.59"
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
}
@@ -1,4 +1,4 @@
/*
/*
* Copyright (C) 2016 Davide Imbriaco
*
* This Java file is subject to the terms of the Mozilla Public
@@ -18,44 +18,38 @@ import java.net.InetSocketAddress
import java.net.UnknownHostException
import java.util.*
/**
*
* TODO: this class cant use [[DeviceId]] because [[GlobalDiscoveryHandler.pickAnnounceServers]] uses that field for discovery server URLs.
*/
class DeviceAddress private constructor(val deviceId: String, private val instanceId: Long?, val address: String, producer: AddressProducer?, score: Int?, lastModified: Date?) {
// TODO: this should use a data class, but the custom equals prevents it
class DeviceAddress private constructor(val deviceId: DeviceId, private val instanceId: Long?, val address: String, producer: AddressProducer?, score: Int?, lastModified: Date?) {
private val producer = producer ?: AddressProducer.UNKNOWN
val score = score ?: Integer.MAX_VALUE
private val lastModified = lastModified ?: Date()
@Deprecated(message = "should use deviceIdObject instead")
fun deviceId() = DeviceId(deviceId)
val deviceIdObject: DeviceId by lazy { DeviceId(deviceId) }
@Throws(UnknownHostException::class)
private fun getInetAddress(): InetAddress = InetAddress.getByName(address.replaceFirst("^[^:]+://".toRegex(), "").replaceFirst("(:[0-9]+)?(/.*)?$".toRegex(), ""))
private fun getPort(): Int = if (address.matches("^[a-z]+://[^:]+:([0-9]+).*".toRegex())) {
private val port: Int by lazy {
if (address.matches("^[a-z]+://[^:]+:([0-9]+).*".toRegex())) {
Integer.parseInt(address.replaceFirst("^[a-z]+://[^:]+:([0-9]+).*".toRegex(), "$1"))
} else {
DEFAULT_PORT_BY_PROTOCOL[getType()]!!
DEFAULT_PORT_BY_PROTOCOL[type]!!
}
}
fun getType(): AddressType = when {
address.isEmpty() -> AddressType.NULL
address.startsWith("tcp://") -> AddressType.TCP
address.startsWith("relay://") -> AddressType.RELAY
address.startsWith("relay-http://") -> AddressType.HTTP_RELAY
address.startsWith("relay-https://") -> AddressType.HTTPS_RELAY
else -> AddressType.OTHER
val type: AddressType by lazy {
when {
address.isEmpty() -> AddressType.NULL
address.startsWith("tcp://") -> AddressType.TCP
address.startsWith("relay://") -> AddressType.RELAY
else -> AddressType.OTHER
}
}
@Throws(UnknownHostException::class)
fun getSocketAddress(): InetSocketAddress = InetSocketAddress(getInetAddress(), getPort())
fun getSocketAddress(): InetSocketAddress = InetSocketAddress(getInetAddress(), port)
fun isWorking(): Boolean = score < Integer.MAX_VALUE
constructor(deviceId: String, address: String) : this(deviceId, null, address, null, null, null)
constructor(deviceId: String, address: String) : this(DeviceId(deviceId), null, address, null, null, null)
fun containsUriParamValue(key: String): Boolean {
return !getUriParam(key).isNullOrEmpty()
@@ -79,7 +73,7 @@ class DeviceAddress private constructor(val deviceId: String, private val instan
}
enum class AddressType {
TCP, RELAY, OTHER, NULL, HTTP_RELAY, HTTPS_RELAY
TCP, RELAY, OTHER, NULL
}
enum class AddressProducer {
@@ -97,18 +91,18 @@ class DeviceAddress private constructor(val deviceId: String, private val instan
return hash
}
override fun equals(obj: Any?): Boolean {
if (this === obj) {
override fun equals(other: Any?): Boolean {
if (this === other) {
return true
}
if (obj == null) {
if (other == null) {
return false
}
if (javaClass != obj.javaClass) {
if (javaClass != other.javaClass) {
return false
}
val other = obj as DeviceAddress?
if (this.deviceId != other!!.deviceId) {
other as DeviceAddress
if (this.deviceId != other.deviceId) {
return false
}
return this.address == other.address
@@ -120,7 +114,7 @@ class DeviceAddress private constructor(val deviceId: String, private val instan
class Builder {
private var deviceId: String? = null
private var deviceId: DeviceId? = null
private var instanceId: Long? = null
private var address: String? = null
private var producer: AddressProducer? = null
@@ -129,7 +123,7 @@ class DeviceAddress private constructor(val deviceId: String, private val instan
constructor()
internal constructor(deviceId: String, instanceId: Long?, address: String, producer: AddressProducer, score: Int?, lastModified: Date) {
internal constructor(deviceId: DeviceId, instanceId: Long?, address: String, producer: AddressProducer, score: Int?, lastModified: Date) {
this.deviceId = deviceId
this.instanceId = instanceId
this.address = address
@@ -147,11 +141,11 @@ class DeviceAddress private constructor(val deviceId: String, private val instan
return this
}
fun getDeviceId(): String? {
fun getDeviceId(): DeviceId? {
return deviceId
}
fun setDeviceId(deviceId: String): Builder {
fun setDeviceId(deviceId: DeviceId): Builder {
this.deviceId = deviceId
return this
}
@@ -200,8 +194,7 @@ class DeviceAddress private constructor(val deviceId: String, private val instan
companion object {
private val DEFAULT_PORT_BY_PROTOCOL = mapOf(
AddressType.TCP to 22000,
AddressType.RELAY to 22067,
AddressType.HTTP_RELAY to 80,
AddressType.HTTPS_RELAY to 443)
AddressType.RELAY to 22067
)
}
}
@@ -77,7 +77,7 @@ class KeystoreHandler private constructor(private val keyStore: KeyStore) {
}
@Throws(CryptoException::class, IOException::class)
private fun wrapSocket(socket: Socket, isServerSocket: Boolean, protocol: String): SSLSocket {
private fun wrapSocket(socket: Socket, isServerSocket: Boolean): SSLSocket {
try {
logger.debug("wrapping plain socket, server mode = {}", isServerSocket)
val sslSocket = socketFactory.createSocket(socket, null, socket.port, true) as SSLSocket
@@ -98,7 +98,7 @@ class KeystoreHandler private constructor(private val keyStore: KeyStore) {
}
@Throws(CryptoException::class, IOException::class)
fun createSocket(relaySocketAddress: InetSocketAddress, protocol: String): SSLSocket {
fun createSocket(relaySocketAddress: InetSocketAddress): SSLSocket {
try {
val socket = socketFactory.createSocket() as SSLSocket
socket.connect(relaySocketAddress, SOCKET_TIMEOUT)
@@ -115,8 +115,8 @@ class KeystoreHandler private constructor(private val keyStore: KeyStore) {
}
@Throws(CryptoException::class, IOException::class)
fun wrapSocket(relayConnection: RelayConnection, protocol: String): SSLSocket {
return wrapSocket(relayConnection.getSocket(), relayConnection.isServerSocket(), protocol)
fun wrapSocket(relayConnection: RelayConnection): SSLSocket {
return wrapSocket(relayConnection.getSocket(), relayConnection.isServerSocket())
}
class Loader {
@@ -269,10 +269,15 @@ class KeystoreHandler private constructor(private val keyStore: KeyStore) {
@Throws(SSLPeerUnverifiedException::class, CertificateException::class)
fun assertSocketCertificateValid(certificate: Certificate, deviceId: DeviceId) {
NetworkUtils.assertProtocol(certificate is X509Certificate)
val derData = certificate.encoded
val deviceIdFromCertificate = derDataToDeviceId(derData)
logger.trace("remote pem certificate =\n{}", derToPem(derData))
NetworkUtils.assertProtocol(deviceIdFromCertificate == deviceId, {"device id mismatch! expected = $deviceId, got = $deviceIdFromCertificate"})
NetworkUtils.assertProtocol(deviceIdFromCertificate == deviceId) {
"device id mismatch! expected = $deviceId, got = $deviceIdFromCertificate"
}
logger.debug("remote ssl certificate match deviceId = {}", deviceId)
}
}
@@ -16,6 +16,7 @@ package net.syncthing.java.core.utils
import org.slf4j.LoggerFactory
import java.util.concurrent.ExecutorService
import java.util.concurrent.Future
import java.util.concurrent.RejectedExecutionException
import java.util.concurrent.TimeUnit
private val logger = LoggerFactory.getLogger(ExecutorService::class.java)
@@ -45,3 +46,11 @@ fun <T> ExecutorService.submitLogging(runnable: () -> T): Future<T> {
}
})
}
fun ExecutorService.trySubmitLogging(runnable: Runnable) {
try {
submitLogging(runnable)
} catch (ex: RejectedExecutionException) {
logger.warn("could not submit task", ex)
}
}
@@ -1,5 +1,6 @@
/*
* Copyright (C) 2016 Davide Imbriaco
* Copyright (C) 2018 Jonas Lochmann
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -13,36 +14,90 @@
*/
package net.syncthing.java.core.utils
import org.apache.commons.io.FilenameUtils
object PathUtils {
val ROOT_PATH = ""
val PATH_SEPARATOR = "/"
val PARENT_PATH = ".."
private fun normalizePath(path: String): String {
return FilenameUtils.normalizeNoEndSeparator(path, true).replaceFirst(("^" + PATH_SEPARATOR).toRegex(), "")
}
const val ROOT_PATH = ""
const val PATH_SEPARATOR = "/"
const val PATH_SEPARATOR_WIN = "\\"
const val PARENT_PATH = ".."
const val CURRENT_PATH = "."
fun isRoot(path: String): Boolean {
return path.isEmpty()
}
private fun containsRelativeElements(path: String): Boolean {
val pathSegments = path.split(PATH_SEPARATOR)
return pathSegments.contains(PARENT_PATH) or pathSegments.contains(CURRENT_PATH)
}
private fun isTrimmed(value: String) = value.trim() == value
private fun containsWindowsPathSeparator(path: String) = path.contains(PATH_SEPARATOR_WIN)
private fun startsWithPathSeperator(path: String) = path.startsWith(PATH_SEPARATOR)
private fun isValidPath(path: String) = (!containsRelativeElements(path)) and
(!containsWindowsPathSeparator(path)) and
path.isNotEmpty() and
(!startsWithPathSeperator(path)) and
isTrimmed(path)
private fun containsPathSeparator(file: String) = file.contains(PATH_SEPARATOR) or file.contains(PATH_SEPARATOR_WIN)
private fun isFilenameValid(file: String) = file.isNotBlank() and
(!containsPathSeparator(file)) and
isTrimmed(file)
private fun assertPathValid(path: String) {
if (!isValidPath(path)) {
throw IllegalArgumentException("provided path is invalid")
}
}
private fun assertFilenameValid(filename: String) {
if (!isFilenameValid(filename)) {
throw IllegalArgumentException("provided filename is invalid")
}
}
fun isParent(path: String): Boolean {
return path == PARENT_PATH
}
fun getParentPath(path: String): String {
assert(!isRoot(path), {"cannot get parent of root path"})
return normalizePath(path + PATH_SEPARATOR + PARENT_PATH)
assertPathValid(path)
val pathWithoutSuffix = path.removeSuffix(PATH_SEPARATOR)
val previousSeparator = pathWithoutSuffix.lastIndexOf(PATH_SEPARATOR)
return if (previousSeparator == -1) {
ROOT_PATH
} else {
pathWithoutSuffix.substring(0, previousSeparator)
}
}
fun getFileName(path: String): String {
return FilenameUtils.getName(path)
if (path.isEmpty()) {
// this is required for IndexHandler.ROOT_FILE_INFO
return ""
}
assertPathValid(path)
val pathWithoutSuffix = path.removeSuffix(PATH_SEPARATOR)
val previousSeparator = pathWithoutSuffix.lastIndexOf(PATH_SEPARATOR)
return if (previousSeparator == -1) {
// the file is in the root directory
pathWithoutSuffix
} else {
pathWithoutSuffix.substring(previousSeparator + 1)
}
}
fun buildPath(dir: String, file: String): String {
return normalizePath(dir + PATH_SEPARATOR + file)
assertPathValid(dir)
assertFilenameValid(file)
return dir.removeSuffix(PATH_SEPARATOR) + file
}
}
@@ -19,16 +19,27 @@ import net.syncthing.java.core.beans.DeviceAddress
import net.syncthing.java.core.beans.DeviceId
class DeviceAddressesManager (val deviceId: DeviceId) {
companion object {
private const val MAX_ADDRESSES_PER_TYPE = 16
}
private val lock = Object()
private val deviceAddressesCache = mutableListOf<DeviceAddress>()
private val listeners = mutableListOf<(DeviceAddress) -> Unit>()
fun putAddress(address: DeviceAddress) {
if (address.deviceIdObject != deviceId) {
if (address.deviceId != deviceId) {
throw IllegalArgumentException()
}
synchronized(lock) {
val otherAddressesOfSameType = deviceAddressesCache.filter { it.type == address.type }
if (otherAddressesOfSameType.size == MAX_ADDRESSES_PER_TYPE) {
// forget the oldest one of the same type
deviceAddressesCache.remove(otherAddressesOfSameType.first())
}
deviceAddressesCache.add(address)
listeners.forEach { it(address) }
}
@@ -15,6 +15,7 @@
package net.syncthing.java.discovery
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.channels.consumeEach
import kotlinx.coroutines.launch
import net.syncthing.java.core.beans.DeviceAddress
import net.syncthing.java.core.beans.DeviceId
@@ -39,7 +40,7 @@ class DiscoveryHandler(private val configuration: Configuration) : Closeable {
}, { deviceId ->
onMessageFromUnknownDeviceListeners.forEach { listener -> listener(deviceId) }
})
private val devicesAddressesManager = DevicesAddressesManager()
val devicesAddressesManager = DevicesAddressesManager()
private var isClosed = false
private val onMessageFromUnknownDeviceListeners = Collections.synchronizedSet(HashSet<(DeviceId) -> Unit>())
@@ -74,17 +75,18 @@ class DiscoveryHandler(private val configuration: Configuration) : Closeable {
val peers = configuration.peerIds
//do not process address already processed
list.filter { deviceAddress ->
!peers.contains(deviceAddress.deviceIdObject)
!peers.contains(deviceAddress.deviceId)
}
AddressRanker.pingAddresses(list)
.forEach { putDeviceAddress(it) }
AddressRanker.pingAddressesChannel(list).consumeEach {
putDeviceAddress(it)
}
}
}
private fun putDeviceAddress(deviceAddress: DeviceAddress) {
devicesAddressesManager.getDeviceAddressManager(
deviceId = deviceAddress.deviceIdObject
deviceId = deviceAddress.deviceId
).putAddress(deviceAddress)
}
@@ -77,7 +77,7 @@ object LocalDiscoveryUtil {
// discovery announcement is to be used.
DeviceAddress.Builder()
.setAddress(address.replaceFirst("tcp://(0.0.0.0|):".toRegex(), "tcp://$sourceAddress:"))
.setDeviceId(deviceId.deviceId)
.setDeviceId(deviceId)
.setInstanceId(announce.instanceId)
.setProducer(DeviceAddress.AddressProducer.LOCAL_DISCOVERY)
.build()
@@ -135,7 +135,7 @@ object LocalDiscoveryUtil {
data class LocalDiscoveryMessage(val deviceId: DeviceId, val addresses: List<DeviceAddress>) {
init {
addresses.forEach { address ->
if (address.deviceIdObject != deviceId) {
if (address.deviceId != deviceId) {
throw IllegalArgumentException()
}
}
@@ -15,6 +15,8 @@
package net.syncthing.java.discovery.utils
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.channels.toList
import net.syncthing.java.core.beans.DeviceAddress
import net.syncthing.java.core.beans.DeviceAddress.AddressType
import org.slf4j.LoggerFactory
@@ -26,49 +28,47 @@ object AddressRanker {
private const val TCP_CONNECTION_TIMEOUT = 5000
private val BASE_SCORE_MAP = mapOf(
AddressType.TCP to 0,
AddressType.RELAY to 2000,
AddressType.HTTP_RELAY to 1000 * 2000,
AddressType.HTTPS_RELAY to 1000 * 2000
AddressType.RELAY to 2000
)
private val ACCEPTED_ADDRESS_TYPES = BASE_SCORE_MAP.keys
private val logger = LoggerFactory.getLogger(javaClass)
suspend fun pingAddresses(sourceAddresses: List<DeviceAddress>) = coroutineScope {
addHttpRelays(sourceAddresses)
.filter { ACCEPTED_ADDRESS_TYPES.contains(it.getType()) }
.toList() // the following should happen parallel
.map {
fun pingAddressesChannel(sourceAddresses: List<DeviceAddress>) = GlobalScope.produce<DeviceAddress> {
sourceAddresses
.filter { ACCEPTED_ADDRESS_TYPES.contains(it.type) }
.toList()
.map { address ->
async {
try {
withTimeout(TCP_CONNECTION_TIMEOUT * 2L) {
val addressWithScore = withTimeout(TCP_CONNECTION_TIMEOUT * 2L) {
// this nested async ensures that cancelling/ the timeout has got an effect without delay
GlobalScope.async (Dispatchers.IO) {
pingAddressSync(it)
pingAddressSync(address)
}.await()
}
if (addressWithScore != null) {
send(addressWithScore)
}
} catch (ex: Exception) {
logger.warn("Failed to ping device", ex)
null
}
null
}
}
.map { it.await() }
.filterNotNull()
.sortedBy { it.score }
close()
}
private fun getHttpRelays(list: List<DeviceAddress>) = list
.asSequence()
.filter { address ->
address.getType() == AddressType.RELAY && address.containsUriParamValue("httpUrl")
}
.map { address ->
val httpUrl = address.getUriParam("httpUrl")
address.copyBuilder().setAddress("relay-" + httpUrl!!).build()
}
private fun addHttpRelays(list: List<DeviceAddress>) = getHttpRelays(list) + list
@Deprecated(
message = "This is slower than the version which returns the channel",
replaceWith = ReplaceWith("pingAddressesChannel")
)
suspend fun pingAddressesReturnAllResultsAtOnce(sourceAddresses: List<DeviceAddress>) = pingAddressesChannel(sourceAddresses)
.toList()
.sortedBy { it.score }
private fun pingAddressSync(deviceAddress: DeviceAddress): DeviceAddress? {
val startTime = System.currentTimeMillis()
@@ -84,7 +84,7 @@ object AddressRanker {
}
val ping = (System.currentTimeMillis() - startTime).toInt()
val baseScore = BASE_SCORE_MAP[deviceAddress.getType()] ?: 0
val baseScore = BASE_SCORE_MAP[deviceAddress.type] ?: 0
return deviceAddress.copyBuilder().setScore(ping + baseScore).build()
}
-37
View File
@@ -1,37 +0,0 @@
apply plugin: 'java-library'
apply plugin: 'kotlin'
apply plugin: 'com.google.protobuf'
dependencies {
compile project(':syncthing-relay-client')
implementation "org.jetbrains.kotlin:kotlin-stdlib:$kotlin_version"
implementation "com.google.protobuf:protobuf-lite:$protobuf_lite_version"
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.5.1-1"
}
plugins {
javalite {
// The codegen for lite comes as a separate artifact
artifact = "com.google.protobuf:protoc-gen-javalite:3.0.0"
}
}
generateProtoTasks {
all().each { task ->
task.builtins {
// In most cases you don't need the full Java output
// if you use the lite output.
remove java
}
task.plugins {
javalite { }
}
}
}
}
// Workaround for https://github.com/google/protobuf-gradle-plugin/issues/100
compileKotlin.dependsOn('generateProto')
sourceSets.main.kotlin.srcDirs += file("${protobuf.generatedFilesBaseDir}/main/javalite")
@@ -1,31 +0,0 @@
/*
* Copyright (C) 2016 Davide Imbriaco
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.httprelay
import net.syncthing.java.core.beans.DeviceAddress
import net.syncthing.java.core.beans.DeviceAddress.AddressType
import org.slf4j.LoggerFactory
class HttpRelayClient {
private val logger = LoggerFactory.getLogger(javaClass)
fun openRelayConnection(deviceAddress: DeviceAddress): HttpRelayConnection {
assert(setOf(AddressType.HTTP_RELAY, AddressType.HTTPS_RELAY).contains(deviceAddress.getType()))
val httpRelayServerUrl = deviceAddress.address.replaceFirst("^relay-".toRegex(), "")
val deviceId = deviceAddress.deviceId
logger.info("open http relay connection, relay url = {}, target device id = {}", httpRelayServerUrl, deviceId)
return HttpRelayConnection(httpRelayServerUrl, deviceId)
}
}
@@ -1,304 +0,0 @@
/*
* Copyright (C) 2016 Davide Imbriaco
*
* This Java file is subject to the terms of the Mozilla Public
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.syncthing.java.httprelay
import com.google.protobuf.ByteString
import net.syncthing.java.core.interfaces.RelayConnection
import net.syncthing.java.core.utils.NetworkUtils
import net.syncthing.java.core.utils.submitLogging
import org.apache.http.HttpStatus
import org.apache.http.client.methods.HttpPost
import org.apache.http.entity.ByteArrayEntity
import org.apache.http.impl.client.HttpClients
import org.apache.http.util.EntityUtils
import org.slf4j.LoggerFactory
import java.io.*
import java.net.*
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
class HttpRelayConnection internal constructor(private val httpRelayServerUrl: String, deviceId: String) : RelayConnection, Closeable {
private val logger = LoggerFactory.getLogger(javaClass)
private val outgoingExecutorService = Executors.newSingleThreadExecutor()
private val incomingExecutorService = Executors.newSingleThreadExecutor()
private val flusherStreamService = Executors.newSingleThreadScheduledExecutor()
private var peerToRelaySequence: Long = 0
private var relayToPeerSequence: Long = 0
private val sessionId: String
private val incomingDataQueue = LinkedBlockingQueue<ByteArray>()
private val socket: Socket
private val isServerSocket: Boolean
private val inputStream: InputStream
private val outputStream: OutputStream
var isClosed = false
private set
override fun getSocket() = socket
override fun isServerSocket() = isServerSocket
init {
val serverMessage = sendMessage(HttpRelayProtos.HttpRelayPeerMessage.newBuilder()
.setMessageType(HttpRelayProtos.HttpRelayPeerMessageType.CONNECT)
.setDeviceId(deviceId))
assert(serverMessage.messageType == HttpRelayProtos.HttpRelayServerMessageType.PEER_CONNECTED)
assert(!serverMessage.sessionId.isNullOrEmpty())
sessionId = serverMessage.sessionId
isServerSocket = serverMessage.isServerSocket
outputStream = object : OutputStream() {
private var buffer = ByteArrayOutputStream()
private var lastFlush = System.currentTimeMillis()
init {
flusherStreamService.scheduleWithFixedDelay({
if (System.currentTimeMillis() - lastFlush > 1000) {
try {
flush()
} catch (ex: IOException) {
logger.warn("", ex)
}
}
}, 1, 1, TimeUnit.SECONDS)
}
@Synchronized
@Throws(IOException::class)
override fun write(i: Int) {
NetworkUtils.assertProtocol(!this@HttpRelayConnection.isClosed)
buffer.write(i)
}
@Synchronized
@Throws(IOException::class)
override fun write(bytes: ByteArray, offset: Int, size: Int) {
NetworkUtils.assertProtocol(!this@HttpRelayConnection.isClosed)
buffer.write(bytes, offset, size)
}
@Synchronized
@Throws(IOException::class)
override fun flush() {
val data = buffer.toByteArray().copyOf().toList()
buffer = ByteArrayOutputStream()
try {
if (!data.isEmpty()) {
outgoingExecutorService.submit {
sendMessage(HttpRelayProtos.HttpRelayPeerMessage.newBuilder()
.setMessageType(HttpRelayProtos.HttpRelayPeerMessageType.PEER_TO_RELAY)
.setSequence(++peerToRelaySequence)
.setData(data as ByteString))
}.get()
}
lastFlush = System.currentTimeMillis()
} catch (ex: InterruptedException) {
logger.error("error", ex)
closeBg()
throw IOException(ex)
} catch (ex: ExecutionException) {
logger.error("error", ex)
closeBg()
throw IOException(ex)
}
}
@Synchronized
@Throws(IOException::class)
override fun write(bytes: ByteArray) {
NetworkUtils.assertProtocol(!this@HttpRelayConnection.isClosed)
buffer.write(bytes)
}
}
incomingExecutorService.submitLogging {
while (!isClosed) {
val serverMessage1 =
try {
sendMessage(HttpRelayProtos.HttpRelayPeerMessage.newBuilder().setMessageType(HttpRelayProtos.HttpRelayPeerMessageType.WAIT_FOR_DATA))
} catch (e: IOException) {
logger.warn("Failed to send relay message", e)
return@submitLogging
}
if (isClosed) {
return@submitLogging
}
NetworkUtils.assertProtocol(serverMessage1.messageType == HttpRelayProtos.HttpRelayServerMessageType.RELAY_TO_PEER)
NetworkUtils.assertProtocol(serverMessage1.sequence == relayToPeerSequence + 1)
if (!serverMessage1.data.isEmpty) {
incomingDataQueue.add(serverMessage1.data.toByteArray())
}
relayToPeerSequence = serverMessage1.sequence
}
}
inputStream = object : InputStream() {
private var noMoreData = false
private var byteArrayInputStream = ByteArrayInputStream(ByteArray(0))
@Throws(IOException::class)
override fun read(): Int {
NetworkUtils.assertProtocol(!this@HttpRelayConnection.isClosed)
if (noMoreData) {
return -1
}
var bite = -1
while (bite == -1) {
bite = byteArrayInputStream.read()
try {
val data = incomingDataQueue.poll(1, TimeUnit.SECONDS)
if (data == null) {
//continue
} else if (data.contentEquals(STREAM_CLOSED)) {
noMoreData = true
return -1
} else {
byteArrayInputStream = ByteArrayInputStream(data)
}
} catch (ex: InterruptedException) {
logger.warn("", ex)
}
}
return bite
}
}
socket = object : Socket() {
override fun isClosed(): Boolean {
return this@HttpRelayConnection.isClosed
}
override fun isConnected(): Boolean {
return !isClosed
}
@Throws(IOException::class)
override fun shutdownOutput() {
logger.debug("shutdownOutput")
outputStream.flush()
}
@Throws(IOException::class)
override fun shutdownInput() {
logger.debug("shutdownInput")
//do nothing
}
@Synchronized
@Throws(IOException::class)
override fun close() {
logger.debug("received close on socket adapter")
this@HttpRelayConnection.close()
}
@Throws(IOException::class)
override fun getOutputStream(): OutputStream {
return this@HttpRelayConnection.outputStream
}
@Throws(IOException::class)
override fun getInputStream(): InputStream {
return this@HttpRelayConnection.inputStream
}
@Throws(UnknownHostException::class)
override fun getRemoteSocketAddress(): SocketAddress {
return InetSocketAddress(inetAddress, port)
}
override fun getPort(): Int {
return 22067
}
@Throws(UnknownHostException::class)
override fun getInetAddress(): InetAddress {
return InetAddress.getByName(URI.create(this@HttpRelayConnection.httpRelayServerUrl).host)
}
}
}
private fun closeBg() {
Thread { close() }.start()
}
@Throws(IOException::class)
private fun sendMessage(peerMessageBuilder: HttpRelayProtos.HttpRelayPeerMessage.Builder): HttpRelayProtos.HttpRelayServerMessage {
if (!sessionId.isEmpty()) {
peerMessageBuilder.sessionId = sessionId
}
logger.debug("send http relay peer message = {} session id = {} sequence = {}", peerMessageBuilder.messageType, peerMessageBuilder.sessionId, peerMessageBuilder.sequence)
val httpClient = HttpClients.custom()
// .setSSLSocketFactory(new SSLConnectionSocketFactory(new SSLContextBuilder().loadTrustMaterial(null, new TrustSelfSignedStrategy()).build(), SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER))
.build()
val httpPost = HttpPost(httpRelayServerUrl)
httpPost.entity = ByteArrayEntity(peerMessageBuilder.build().toByteArray())
val serverMessage = httpClient.execute(httpPost) { response ->
NetworkUtils.assertProtocol(response.statusLine.statusCode == HttpStatus.SC_OK, {"http error ${response.statusLine}"})
HttpRelayProtos.HttpRelayServerMessage.parseFrom(EntityUtils.toByteArray(response.entity))
}
logger.debug("received http relay server message = {}", serverMessage.messageType)
NetworkUtils.assertProtocol(serverMessage.messageType != HttpRelayProtos.HttpRelayServerMessageType.ERROR, {"server error : ${serverMessage.data.toStringUtf8()}"})
return serverMessage
}
override fun close() {
if (!isClosed) {
isClosed = true
logger.info("closing http relay connection {} : {}", httpRelayServerUrl, sessionId)
flusherStreamService.shutdown()
if (!sessionId.isEmpty()) {
try {
outputStream.flush()
sendMessage(HttpRelayProtos.HttpRelayPeerMessage.newBuilder().setMessageType(HttpRelayProtos.HttpRelayPeerMessageType.PEER_CLOSING))
} catch (ex: IOException) {
logger.warn("error closing http relay connection", ex)
}
}
incomingExecutorService.shutdown()
outgoingExecutorService.shutdown()
try {
incomingExecutorService.awaitTermination(1, TimeUnit.SECONDS)
} catch (ex: InterruptedException) {
logger.warn("", ex)
}
try {
outgoingExecutorService.awaitTermination(1, TimeUnit.SECONDS)
} catch (ex: InterruptedException) {
logger.warn("", ex)
}
try {
flusherStreamService.awaitTermination(1, TimeUnit.SECONDS)
} catch (ex: InterruptedException) {
logger.warn("", ex)
}
incomingDataQueue.add(STREAM_CLOSED)
}
}
companion object {
private val STREAM_CLOSED = "STREAM_CLOSED".toByteArray()
}
}
@@ -1,34 +0,0 @@
package net.syncthing.java.httprelay;
option optimize_for = LITE_RUNTIME;
message HttpRelayPeerMessage{
optional HttpRelayPeerMessageType message_type = 1;
optional string session_id = 2;
optional string device_id = 3;
optional int64 sequence = 4;
optional bytes data = 5;
}
message HttpRelayServerMessage{
optional HttpRelayServerMessageType message_type = 1;
optional string session_id = 2;
optional bool is_server_socket = 3;
optional int64 sequence = 4;
optional bytes data = 5;
}
enum HttpRelayPeerMessageType {
CONNECT = 0;
PEER_TO_RELAY = 1;
WAIT_FOR_DATA = 2;
PEER_CLOSING = 3;
}
enum HttpRelayServerMessageType {
PEER_CONNECTED = 0;
DATA_ACCEPTED = 1;
RELAY_TO_PEER = 2;
SERVER_CLOSING = 3;
ERROR = 4;
}
@@ -36,8 +36,8 @@ class RelayClient(configuration: Configuration) {
@Throws(IOException::class, KeystoreHandler.CryptoException::class)
fun openRelayConnection(address: DeviceAddress): RelayConnection {
assert(address.getType() == AddressType.RELAY)
val sessionInvitation = getSessionInvitation(address.getSocketAddress(), address.deviceId())
assert(address.type == AddressType.RELAY)
val sessionInvitation = getSessionInvitation(address.getSocketAddress(), address.deviceId)
return openConnectionSessionMode(sessionInvitation)
}
@@ -80,7 +80,7 @@ class RelayClient(configuration: Configuration) {
@Throws(IOException::class, KeystoreHandler.CryptoException::class)
fun getSessionInvitation(relaySocketAddress: InetSocketAddress, deviceId: DeviceId): SessionInvitation {
logger.debug("connecting to relay = {} (temporary protocol mode)", relaySocketAddress)
keystoreHandler.createSocket(relaySocketAddress, KeystoreHandler.RELAY).use { socket ->
keystoreHandler.createSocket(relaySocketAddress).use { socket ->
RelayDataInputStream(socket.getInputStream()).use { `in` ->
RelayDataOutputStream(socket.getOutputStream()).use { out ->
run {
@@ -49,7 +49,6 @@ dependencies {
implementation (project(':syncthing-client')) {
exclude group: 'commons-logging', module:'commons-logging'
exclude group: 'org.apache.httpcomponents', module:'httpclient'
exclude group: 'org.slf4j'
exclude group: 'ch.qos.logback'
}
@@ -26,7 +26,6 @@ import net.syncthing.java.core.interfaces.IndexRepository
import net.syncthing.java.core.interfaces.Sequencer
import net.syncthing.java.core.interfaces.TempRepository
import org.apache.commons.lang3.tuple.Pair
import org.apache.http.util.TextUtils.isBlank
import org.bouncycastle.util.encoders.Hex
import org.slf4j.LoggerFactory
import java.io.Closeable
@@ -310,21 +309,21 @@ class SqlRepository(databaseFolder: File) : Closeable, IndexRepository, TempRepo
}
@Throws(SQLException::class)
override fun updateFileInfo(newFileInfo: FileInfo, newFileBlocks: FileBlocks?) {
val version = newFileInfo.versionList.last()
override fun updateFileInfo(fileInfo: FileInfo, fileBlocks: FileBlocks?) {
val version = fileInfo.versionList.last()
//TODO open transsaction, rollback
getConnection().use { connection ->
if (newFileBlocks != null) {
FileInfo.checkBlocks(newFileInfo, newFileBlocks)
if (fileBlocks != null) {
FileInfo.checkBlocks(fileInfo, fileBlocks)
connection.prepareStatement("MERGE INTO file_blocks"
+ " (folder,path,hash,size,blocks)"
+ " VALUES (?,?,?,?,?)").use { prepareStatement ->
prepareStatement.setString(1, newFileBlocks.folder)
prepareStatement.setString(2, newFileBlocks.path)
prepareStatement.setString(3, newFileBlocks.hash)
prepareStatement.setLong(4, newFileBlocks.size)
prepareStatement.setString(1, fileBlocks.folder)
prepareStatement.setString(2, fileBlocks.path)
prepareStatement.setString(3, fileBlocks.hash)
prepareStatement.setLong(4, fileBlocks.size)
prepareStatement.setBytes(5, BlockExchangeExtraProtos.Blocks.newBuilder()
.addAllBlocks(newFileBlocks.blocks.map { input ->
.addAllBlocks(fileBlocks.blocks.map { input ->
BlockExchangeProtos.BlockInfo.newBuilder()
.setOffset(input.offset)
.setSize(input.size)
@@ -334,25 +333,25 @@ class SqlRepository(databaseFolder: File) : Closeable, IndexRepository, TempRepo
prepareStatement.executeUpdate()
}
}
val oldFileInfo = findFileInfo(newFileInfo.folder, newFileInfo.path)
val oldFileInfo = findFileInfo(fileInfo.folder, fileInfo.path)
connection.prepareStatement("MERGE INTO file_info"
+ " (folder,path,file_name,parent,size,hash,last_modified,file_type,version_id,version_value,is_deleted)"
+ " VALUES (?,?,?,?,?,?,?,?,?,?,?)").use { prepareStatement ->
prepareStatement.setString(1, newFileInfo.folder)
prepareStatement.setString(2, newFileInfo.path)
prepareStatement.setString(3, newFileInfo.fileName)
prepareStatement.setString(4, newFileInfo.parent)
prepareStatement.setLong(7, newFileInfo.lastModified.time)
prepareStatement.setString(8, newFileInfo.type.name)
prepareStatement.setString(1, fileInfo.folder)
prepareStatement.setString(2, fileInfo.path)
prepareStatement.setString(3, fileInfo.fileName)
prepareStatement.setString(4, fileInfo.parent)
prepareStatement.setLong(7, fileInfo.lastModified.time)
prepareStatement.setString(8, fileInfo.type.name)
prepareStatement.setLong(9, version.id)
prepareStatement.setLong(10, version.value)
prepareStatement.setBoolean(11, newFileInfo.isDeleted)
if (newFileInfo.isDirectory()) {
prepareStatement.setBoolean(11, fileInfo.isDeleted)
if (fileInfo.isDirectory()) {
prepareStatement.setNull(5, Types.BIGINT)
prepareStatement.setNull(6, Types.VARCHAR)
} else {
prepareStatement.setLong(5, newFileInfo.size!!)
prepareStatement.setString(6, newFileInfo.hash)
prepareStatement.setLong(5, fileInfo.size!!)
prepareStatement.setString(6, fileInfo.hash)
}
prepareStatement.executeUpdate()
}
@@ -361,14 +360,14 @@ class SqlRepository(databaseFolder: File) : Closeable, IndexRepository, TempRepo
var deltaDirCount: Long = 0
var deltaSize: Long = 0
val oldMissing = oldFileInfo == null || oldFileInfo.isDeleted
val newMissing = newFileInfo.isDeleted
val newMissing = fileInfo.isDeleted
val oldSizeMissing = oldMissing || !oldFileInfo!!.isFile()
val newSizeMissing = newMissing || !newFileInfo.isFile()
val newSizeMissing = newMissing || !fileInfo.isFile()
if (!oldSizeMissing) {
deltaSize -= oldFileInfo!!.size!!
}
if (!newSizeMissing) {
deltaSize += newFileInfo.size!!
deltaSize += fileInfo.size!!
}
if (!oldMissing) {
if (oldFileInfo!!.isFile()) {
@@ -378,13 +377,13 @@ class SqlRepository(databaseFolder: File) : Closeable, IndexRepository, TempRepo
}
}
if (!newMissing) {
if (newFileInfo.isFile()) {
if (fileInfo.isFile()) {
deltaFileCount++
} else if (newFileInfo.isDirectory()) {
} else if (fileInfo.isDirectory()) {
deltaDirCount++
}
}
val folderStats = updateFolderStats(connection, newFileInfo.folder, deltaFileCount, deltaDirCount, deltaSize, newFileInfo.lastModified)
val folderStats = updateFolderStats(connection, fileInfo.folder, deltaFileCount, deltaDirCount, deltaSize, fileInfo.lastModified)
onFolderStatsUpdatedListener?.invoke(object : IndexRepository.FolderStatsUpdatedEvent() {
override fun getFolderStats(): List<FolderStats> {
@@ -412,7 +411,7 @@ class SqlRepository(databaseFolder: File) : Closeable, IndexRepository, TempRepo
@Throws(SQLException::class)
override fun findFileInfoBySearchTerm(query: String): List<FileInfo> {
assert(!isBlank(query))
assert(query.isNotBlank())
// checkArgument(maxResult > 0);
// try (Connection connection = getConnection(); PreparedStatement preparedStatement = connection.prepareStatement("SELECT * FROM file_info WHERE LOWER(file_name) LIKE ? AND is_deleted=FALSE LIMIT ?")) {
getConnection().use { connection ->
@@ -433,7 +432,7 @@ class SqlRepository(databaseFolder: File) : Closeable, IndexRepository, TempRepo
@Throws(SQLException::class)
override fun countFileInfoBySearchTerm(query: String): Long {
assert(!isBlank(query))
assert(query.isNotBlank())
getConnection().use { connection ->
connection.prepareStatement("SELECT COUNT(*) FROM file_info WHERE LOWER(file_name) REGEXP ? AND is_deleted=FALSE").use { preparedStatement ->
// try (Connection connection = getConnection(); PreparedStatement preparedStatement = connection.prepareStatement("SELECT COUNT(*) FROM file_info")) {
@@ -704,7 +703,7 @@ class SqlRepository(databaseFolder: File) : Closeable, IndexRepository, TempRepo
val instanceId = resultSet.getLong("instance_id")
return DeviceAddress.Builder()
.setAddress(resultSet.getString("address_url"))
.setDeviceId(resultSet.getString("device_id"))
.setDeviceId(DeviceId(resultSet.getString("device_id")))
.setInstanceId(if (instanceId == 0L) null else instanceId)
.setProducer(DeviceAddress.AddressProducer.valueOf(resultSet.getString("address_producer")))
.setScore(resultSet.getInt("address_score"))