Улучшена фоновая синхронизация, обработана ошибка
This commit is contained in:
@@ -29,7 +29,9 @@ private fun mapVaultIo(e: IOException): WallencException {
|
||||
WallencException.Auth.TokenMissing()
|
||||
msg.contains("HTTP 423", ignoreCase = true) || msg.contains("423 after retries", ignoreCase = true) ->
|
||||
WallencException.Network.ResourceLocked()
|
||||
msg.contains("async operation timed out", ignoreCase = true) ->
|
||||
msg.equals("timeout", ignoreCase = true) ||
|
||||
msg.contains("timed out", ignoreCase = true) ||
|
||||
msg.contains("async operation timed out", ignoreCase = true) ->
|
||||
WallencException.Network.OperationTimedOut()
|
||||
msg.contains("async operation failed", ignoreCase = true) ->
|
||||
WallencException.Network.OperationFailed()
|
||||
|
||||
@@ -10,6 +10,7 @@ import retrofit2.Retrofit
|
||||
import retrofit2.converter.jackson.JacksonConverterFactory
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* Фабрика REST-клиента Яндекс.Диска: отдельный [OkHttpClient] с OAuth на каждый vault,
|
||||
@@ -27,14 +28,14 @@ class YandexDiskApiFactory(
|
||||
|
||||
/** Без авторизации — только для одноразовых ссылок upload/download. */
|
||||
val rawHttpClient: OkHttpClient by lazy {
|
||||
OkHttpClient.Builder().build()
|
||||
newHttpClientBuilder().build()
|
||||
}
|
||||
|
||||
/**
|
||||
* [tokenProvider] вызывается на каждый HTTP-запрос к cloud-api (свежий токен из БД).
|
||||
*/
|
||||
fun createAuthenticatedApi(tokenProvider: () -> String?): YandexDiskApi {
|
||||
val client = OkHttpClient.Builder()
|
||||
val client = newHttpClientBuilder()
|
||||
.addInterceptor { chain ->
|
||||
val token = tokenProvider()
|
||||
?: throw java.io.IOException("Yandex OAuth token is missing")
|
||||
@@ -74,11 +75,21 @@ class YandexDiskApiFactory(
|
||||
|
||||
companion object {
|
||||
const val BASE_URL = "https://cloud-api.yandex.net/"
|
||||
|
||||
private const val CONNECT_TIMEOUT_SEC = 30L
|
||||
private const val READ_TIMEOUT_SEC = 120L
|
||||
private const val WRITE_TIMEOUT_SEC = 120L
|
||||
|
||||
fun newHttpClientBuilder(): OkHttpClient.Builder =
|
||||
OkHttpClient.Builder()
|
||||
.connectTimeout(CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS)
|
||||
.readTimeout(READ_TIMEOUT_SEC, TimeUnit.SECONDS)
|
||||
.writeTimeout(WRITE_TIMEOUT_SEC, TimeUnit.SECONDS)
|
||||
fun createRepositoryWithToken(
|
||||
oauthToken: String,
|
||||
ioDispatcher: CoroutineDispatcher,
|
||||
): YandexDiskRepository {
|
||||
val client = OkHttpClient.Builder()
|
||||
val client = newHttpClientBuilder()
|
||||
.addInterceptor { chain ->
|
||||
chain.proceed(
|
||||
chain.request().newBuilder()
|
||||
|
||||
@@ -8,9 +8,10 @@ import com.github.nullptroma.wallenc.domain.encrypt.Encryptor
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorage
|
||||
import com.github.nullptroma.wallenc.domain.interfaces.IStorageAccessor
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineExceptionHandler
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.DisposableHandle
|
||||
import kotlinx.coroutines.Job
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.withContext
|
||||
import java.util.UUID
|
||||
|
||||
@@ -25,8 +26,13 @@ class EncryptedStorage private constructor(
|
||||
metaInfoFilePostfix = STORAGE_INFO_FILE_POSTFIX,
|
||||
), DisposableHandle {
|
||||
|
||||
private val job = Job()
|
||||
private val scope = CoroutineScope(ioDispatcher + job)
|
||||
private val job = SupervisorJob()
|
||||
private val scope = CoroutineScope(
|
||||
ioDispatcher + job + CoroutineExceptionHandler { _, throwable ->
|
||||
System.err.println("EncryptedStorage: uncaught coroutine failure: ${throwable.message}")
|
||||
throwable.printStackTrace()
|
||||
},
|
||||
)
|
||||
|
||||
private val encInfo =
|
||||
source.metaInfo.value.encInfo ?: throw WallencException.Storage.NotEncrypted()
|
||||
|
||||
@@ -82,34 +82,40 @@ class EncryptedStorageAccessor(
|
||||
private fun collectSourceState() {
|
||||
scope.launch {
|
||||
launch {
|
||||
source.filesUpdates.collect { page ->
|
||||
val files = page.data.map(::decryptEntity).filterSystemHiddenFiles()
|
||||
_filesUpdates.emit(
|
||||
DataPage(
|
||||
list = files,
|
||||
isLoading = page.isLoading,
|
||||
isError = page.isError,
|
||||
hasNext = page.hasNext,
|
||||
pageLength = page.pageLength,
|
||||
pageIndex = page.pageIndex,
|
||||
try {
|
||||
source.filesUpdates.collect { page ->
|
||||
val files = page.data.map(::decryptEntity).filterSystemHiddenFiles()
|
||||
_filesUpdates.emit(
|
||||
DataPage(
|
||||
list = files,
|
||||
isLoading = page.isLoading,
|
||||
isError = page.isError,
|
||||
hasNext = page.hasNext,
|
||||
pageLength = page.pageLength,
|
||||
pageIndex = page.pageIndex,
|
||||
),
|
||||
)
|
||||
)
|
||||
}
|
||||
} catch (_: Exception) {
|
||||
}
|
||||
}
|
||||
|
||||
launch {
|
||||
source.dirsUpdates.collect { page ->
|
||||
val dirs = page.data.map(::decryptEntity).filterSystemHiddenDirs()
|
||||
_dirsUpdates.emit(
|
||||
DataPage(
|
||||
list = dirs,
|
||||
isLoading = page.isLoading,
|
||||
isError = page.isError,
|
||||
hasNext = page.hasNext,
|
||||
pageLength = page.pageLength,
|
||||
pageIndex = page.pageIndex,
|
||||
try {
|
||||
source.dirsUpdates.collect { page ->
|
||||
val dirs = page.data.map(::decryptEntity).filterSystemHiddenDirs()
|
||||
_dirsUpdates.emit(
|
||||
DataPage(
|
||||
list = dirs,
|
||||
isLoading = page.isLoading,
|
||||
isError = page.isError,
|
||||
hasNext = page.hasNext,
|
||||
pageLength = page.pageLength,
|
||||
pageIndex = page.pageIndex,
|
||||
),
|
||||
)
|
||||
)
|
||||
}
|
||||
} catch (_: Exception) {
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -31,6 +31,7 @@ import kotlinx.coroutines.delay
|
||||
import kotlin.coroutines.coroutineContext
|
||||
import kotlinx.coroutines.ensureActive
|
||||
import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.FlowCollector
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.SharingStarted
|
||||
@@ -113,8 +114,14 @@ class YandexStorageAccessor(
|
||||
_size.value = persisted.totalBytes
|
||||
_numberOfFiles.value = persisted.fileCount
|
||||
} else {
|
||||
scanSizeAndNumOfFiles()
|
||||
writePersistedStatsInternal()
|
||||
try {
|
||||
scanSizeAndNumOfFiles()
|
||||
writePersistedStatsInternal()
|
||||
} catch (e: CancellationException) {
|
||||
throw e
|
||||
} catch (_: Exception) {
|
||||
// Полный обход дерева не обязателен для работы; при таймауте сети storage остаётся доступным.
|
||||
}
|
||||
}
|
||||
_storageReady.value = true
|
||||
} catch (e: YandexDiskAuthException) {
|
||||
@@ -233,8 +240,8 @@ class YandexStorageAccessor(
|
||||
} catch (e: YandexDiskAuthException) {
|
||||
reportAuthFailure()
|
||||
throw e
|
||||
} catch (_: IOException) {
|
||||
// Запись stats — best-effort; сетевые сбои не роняем процесс (ошибки в лог UI не выводятся).
|
||||
} catch (_: Exception) {
|
||||
// Запись stats — best-effort; сетевые сбои не роняем процесс.
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -378,35 +385,15 @@ class YandexStorageAccessor(
|
||||
}
|
||||
|
||||
override fun getFilesFlow(path: String): Flow<DataPage<IFile>> = flow {
|
||||
val all = withContext(ioDispatcher) { listImmediateChildren(path).first }
|
||||
var pageIndex = 0
|
||||
var i = 0
|
||||
while (i < all.size) {
|
||||
val chunk = all.subList(i, kotlin.math.min(i + DATA_PAGE_LENGTH, all.size)).toList()
|
||||
emit(
|
||||
DataPage(
|
||||
list = chunk,
|
||||
isLoading = false,
|
||||
isError = false,
|
||||
hasNext = i + DATA_PAGE_LENGTH < all.size,
|
||||
pageLength = DATA_PAGE_LENGTH,
|
||||
pageIndex = pageIndex++,
|
||||
),
|
||||
)
|
||||
i += DATA_PAGE_LENGTH
|
||||
}
|
||||
if (all.isEmpty()) {
|
||||
emit(
|
||||
DataPage(
|
||||
list = emptyList(),
|
||||
isLoading = false,
|
||||
isError = false,
|
||||
hasNext = false,
|
||||
pageLength = DATA_PAGE_LENGTH,
|
||||
pageIndex = 0,
|
||||
),
|
||||
)
|
||||
val all = try {
|
||||
withContext(ioDispatcher) { listImmediateChildren(path).first }
|
||||
} catch (e: CancellationException) {
|
||||
throw e
|
||||
} catch (_: Exception) {
|
||||
emit(filesFlowErrorPage())
|
||||
return@flow
|
||||
}
|
||||
emitAllFilesPages(all)
|
||||
}.flowOn(ioDispatcher)
|
||||
|
||||
override suspend fun getAllDirs(): List<IDirectory> = withContext(ioDispatcher) {
|
||||
@@ -432,35 +419,15 @@ class YandexStorageAccessor(
|
||||
}
|
||||
|
||||
override fun getDirsFlow(path: String): Flow<DataPage<IDirectory>> = flow {
|
||||
val all = withContext(ioDispatcher) { listImmediateChildren(path).second }
|
||||
var pageIndex = 0
|
||||
var i = 0
|
||||
while (i < all.size) {
|
||||
val chunk = all.subList(i, kotlin.math.min(i + DATA_PAGE_LENGTH, all.size)).toList()
|
||||
emit(
|
||||
DataPage(
|
||||
list = chunk,
|
||||
isLoading = false,
|
||||
isError = false,
|
||||
hasNext = i + DATA_PAGE_LENGTH < all.size,
|
||||
pageLength = DATA_PAGE_LENGTH,
|
||||
pageIndex = pageIndex++,
|
||||
),
|
||||
)
|
||||
i += DATA_PAGE_LENGTH
|
||||
}
|
||||
if (all.isEmpty()) {
|
||||
emit(
|
||||
DataPage(
|
||||
list = emptyList(),
|
||||
isLoading = false,
|
||||
isError = false,
|
||||
hasNext = false,
|
||||
pageLength = DATA_PAGE_LENGTH,
|
||||
pageIndex = 0,
|
||||
),
|
||||
)
|
||||
val all = try {
|
||||
withContext(ioDispatcher) { listImmediateChildren(path).second }
|
||||
} catch (e: CancellationException) {
|
||||
throw e
|
||||
} catch (_: Exception) {
|
||||
emit(dirsFlowErrorPage())
|
||||
return@flow
|
||||
}
|
||||
emitAllDirsPages(all)
|
||||
}.flowOn(ioDispatcher)
|
||||
|
||||
override suspend fun getFileInfo(path: String): IFile = withContext(ioDispatcher) {
|
||||
@@ -753,6 +720,88 @@ class YandexStorageAccessor(
|
||||
guard { repo.setCustomProperties(toDiskPath(path), props) }
|
||||
}
|
||||
|
||||
private suspend fun FlowCollector<DataPage<IFile>>.emitAllFilesPages(all: List<IFile>) {
|
||||
var pageIndex = 0
|
||||
var i = 0
|
||||
while (i < all.size) {
|
||||
val chunk = all.subList(i, kotlin.math.min(i + DATA_PAGE_LENGTH, all.size)).toList()
|
||||
emit(
|
||||
DataPage(
|
||||
list = chunk,
|
||||
isLoading = false,
|
||||
isError = false,
|
||||
hasNext = i + DATA_PAGE_LENGTH < all.size,
|
||||
pageLength = DATA_PAGE_LENGTH,
|
||||
pageIndex = pageIndex++,
|
||||
),
|
||||
)
|
||||
i += DATA_PAGE_LENGTH
|
||||
}
|
||||
if (all.isEmpty()) {
|
||||
emit(
|
||||
DataPage(
|
||||
list = emptyList(),
|
||||
isLoading = false,
|
||||
isError = false,
|
||||
hasNext = false,
|
||||
pageLength = DATA_PAGE_LENGTH,
|
||||
pageIndex = 0,
|
||||
),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun FlowCollector<DataPage<IDirectory>>.emitAllDirsPages(all: List<IDirectory>) {
|
||||
var pageIndex = 0
|
||||
var i = 0
|
||||
while (i < all.size) {
|
||||
val chunk = all.subList(i, kotlin.math.min(i + DATA_PAGE_LENGTH, all.size)).toList()
|
||||
emit(
|
||||
DataPage(
|
||||
list = chunk,
|
||||
isLoading = false,
|
||||
isError = false,
|
||||
hasNext = i + DATA_PAGE_LENGTH < all.size,
|
||||
pageLength = DATA_PAGE_LENGTH,
|
||||
pageIndex = pageIndex++,
|
||||
),
|
||||
)
|
||||
i += DATA_PAGE_LENGTH
|
||||
}
|
||||
if (all.isEmpty()) {
|
||||
emit(
|
||||
DataPage(
|
||||
list = emptyList(),
|
||||
isLoading = false,
|
||||
isError = false,
|
||||
hasNext = false,
|
||||
pageLength = DATA_PAGE_LENGTH,
|
||||
pageIndex = 0,
|
||||
),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
private fun filesFlowErrorPage(): DataPage<IFile> =
|
||||
DataPage(
|
||||
list = emptyList(),
|
||||
isLoading = false,
|
||||
isError = true,
|
||||
hasNext = false,
|
||||
pageLength = DATA_PAGE_LENGTH,
|
||||
pageIndex = 0,
|
||||
)
|
||||
|
||||
private fun dirsFlowErrorPage(): DataPage<IDirectory> =
|
||||
DataPage(
|
||||
list = emptyList(),
|
||||
isLoading = false,
|
||||
isError = true,
|
||||
hasNext = false,
|
||||
pageLength = DATA_PAGE_LENGTH,
|
||||
pageIndex = 0,
|
||||
)
|
||||
|
||||
companion object {
|
||||
private val statsMapper = jacksonObjectMapper().apply { findAndRegisterModules() }
|
||||
|
||||
|
||||
@@ -15,6 +15,7 @@ import com.github.nullptroma.wallenc.domain.interfaces.IVaultsManager
|
||||
import com.github.nullptroma.wallenc.vault.contract.VaultRegistrar
|
||||
import com.github.nullptroma.wallenc.vault.contract.VaultRegistration
|
||||
import kotlinx.coroutines.CoroutineDispatcher
|
||||
import kotlinx.coroutines.CoroutineExceptionHandler
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.ExperimentalCoroutinesApi
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
@@ -38,7 +39,14 @@ class VaultsManager(
|
||||
private val yandexDiskRepositoryFactory: YandexDiskRepositoryFactory,
|
||||
) : IVaultsManager, VaultRegistrar {
|
||||
|
||||
private val scope = CoroutineScope(SupervisorJob() + ioDispatcher)
|
||||
private val scope = CoroutineScope(
|
||||
SupervisorJob() +
|
||||
ioDispatcher +
|
||||
CoroutineExceptionHandler { _, throwable ->
|
||||
System.err.println("VaultsManager: uncaught coroutine failure: ${throwable.message}")
|
||||
throwable.printStackTrace()
|
||||
},
|
||||
)
|
||||
|
||||
private val yandexVaults: StateFlow<List<IVault>> = yandexAccountStore.observeAll()
|
||||
.map { rows ->
|
||||
|
||||
@@ -10,6 +10,7 @@ import retrofit2.HttpException
|
||||
import retrofit2.Response
|
||||
import java.io.FileNotFoundException
|
||||
import java.io.IOException
|
||||
import java.net.SocketTimeoutException
|
||||
|
||||
class VaultThrowableMappingTest {
|
||||
|
||||
@@ -33,6 +34,12 @@ class VaultThrowableMappingTest {
|
||||
assertTrue(mapped is WallencException.Auth.TokenMissing)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun mapsSocketTimeoutToOperationTimedOut() {
|
||||
val mapped = SocketTimeoutException("timeout").toVaultWallencException()
|
||||
assertTrue(mapped is WallencException.Network.OperationTimedOut)
|
||||
}
|
||||
|
||||
@Test
|
||||
fun mapsFileNotFoundToStorageFileNotFound() {
|
||||
val mapped = FileNotFoundException("x").toVaultWallencException()
|
||||
|
||||
Reference in New Issue
Block a user