feat(ui): добавлены новые состояния и компоненты для отображения статуса работы

This commit is contained in:
2026-05-13 17:22:31 +03:00
parent 6c18a1d741
commit f551efe4a6
40 changed files with 1787 additions and 542 deletions

View File

@@ -8,6 +8,7 @@ import okhttp3.OkHttpClient
import retrofit2.Retrofit
import retrofit2.converter.jackson.JacksonConverterFactory
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
/**
* Фабрика REST-клиента Яндекс.Диска: отдельный [OkHttpClient] с OAuth на каждый vault,
@@ -18,6 +19,9 @@ class YandexDiskApiFactory(
private val ioDispatcher: CoroutineDispatcher,
) {
/** Кеш OAuth-токена по vault, чтобы не дергать БД на каждый HTTP-запрос к cloud-api. */
private val oauthTokenCache = ConcurrentHashMap<String, Pair<Long, String>>()
private val jackson = jacksonObjectMapper().apply { findAndRegisterModules() }
/** Без авторизации — только для одноразовых ссылок upload/download. */
@@ -54,13 +58,21 @@ class YandexDiskApiFactory(
fun createApiForVault(vaultUuid: UUID): YandexDiskApi {
val id = vaultUuid.toString()
return createAuthenticatedApi {
runBlocking(ioDispatcher) {
accountRepository.getByVaultUuid(id)?.oauthToken
val now = System.currentTimeMillis()
val hit = oauthTokenCache[id]
if (hit != null && now - hit.first < OAUTH_TOKEN_CACHE_TTL_MS) {
return@createAuthenticatedApi hit.second
}
val token = runBlocking(ioDispatcher) {
accountRepository.getByVaultUuid(id)?.oauthToken
} ?: throw java.io.IOException("Yandex OAuth token is missing")
oauthTokenCache[id] = now to token
token
}
}
companion object {
private const val BASE_URL = "https://cloud-api.yandex.net/"
private const val OAUTH_TOKEN_CACHE_TTL_MS = 120_000L
}
}

View File

@@ -20,9 +20,11 @@ import okhttp3.RequestBody.Companion.toRequestBody
import okhttp3.ResponseBody
import retrofit2.HttpException
import retrofit2.Response
import java.io.FileNotFoundException
import java.io.FilterInputStream
import java.io.IOException
import java.io.InputStream
import java.util.concurrent.ConcurrentHashMap
class YandexDiskRepository(
private val api: YandexDiskApi,
@@ -30,40 +32,68 @@ class YandexDiskRepository(
private val ioDispatcher: CoroutineDispatcher,
) {
private val diskCacheLock = Any()
private var diskInfoCached: DiskInfoDto? = null
private var diskInfoCachedUntilMs: Long = 0L
private val listCache = ConcurrentHashMap<ListCacheKey, ResourceDto>()
private val getCache = ConcurrentHashMap<String, ResourceDto>()
suspend fun diskInfo(): DiskInfoDto = withContext(ioDispatcher) {
wrapAuth { api.getDisk() }
val now = System.currentTimeMillis()
synchronized(diskCacheLock) {
val cached = diskInfoCached
if (cached != null && now < diskInfoCachedUntilMs) {
return@withContext cached
}
}
val fresh = wrapAuth { api.getDisk() }
synchronized(diskCacheLock) {
diskInfoCached = fresh
diskInfoCachedUntilMs = System.currentTimeMillis() + DISK_INFO_TTL_MS
}
fresh
}
suspend fun list(path: String, limit: Int, offset: Int, sort: String? = null): ResourceDto =
withContext(ioDispatcher) {
try {
wrapAuth { api.listResources(path, limit, offset, sort, FIELDS_LIST) }
} catch (e: HttpException) {
if (e.code() == 404) {
ResourceDto(embedded = EmbeddedResourceListDto(items = emptyList()))
} else {
throw e
val key = ListCacheKey(path = path, limit = limit, offset = offset, sort = sort)
listCache[key]?.let { return@withContext it }
suspend fun tryList(p: String): ResourceDto? =
try {
wrapAuth { api.listResources(p, limit, offset, sort, FIELDS_LIST) }
} catch (e: HttpException) {
if (e.code() == 404) null else throw e
}
}
val primary = tryList(path)
val secondary = if (!path.endsWith('/') && path != "app:/") tryList("$path/") else null
val result = primary ?: secondary
?: ResourceDto(embedded = EmbeddedResourceListDto(items = emptyList()))
putListCache(key, result)
result
}
suspend fun get(path: String): ResourceDto = withContext(ioDispatcher) {
wrapAuth { api.getResource(path, FIELDS_RESOURCE) }
getCache[path]?.let { return@withContext it }
val result = fetchResource(path)
putGetCache(path, result)
result
}
suspend fun getOrNull(path: String): ResourceDto? = withContext(ioDispatcher) {
try {
wrapAuth { api.getResource(path, FIELDS_RESOURCE) }
} catch (e: HttpException) {
if (e.code() == 404) null else throw e
getCache[path]?.let { return@withContext it }
val result = fetchResourceOrNull(path)
if (result != null) {
putGetCache(path, result)
}
result
}
suspend fun createFolder(path: String): Unit = withContext(ioDispatcher) {
val resp = wrapAuth { api.createFolder(path) }
when (resp.code()) {
201 -> Unit
409 -> Unit
201, 409 -> invalidateDiskMetaCaches()
else -> throw failure("createFolder", resp)
}
}
@@ -71,13 +101,14 @@ class YandexDiskRepository(
suspend fun delete(path: String, permanently: Boolean = true): Unit = withContext(ioDispatcher) {
val resp = wrapAuth { api.deleteResource(path, permanently) }
when (resp.code()) {
204 -> Unit
204 -> invalidateDiskMetaCaches()
202 -> {
val link = resp.body()?.use { body -> parseLink(body) }
?: throw IOException("DELETE 202 without body")
awaitOperation(link.href)
invalidateDiskMetaCaches()
}
404 -> Unit
404 -> invalidateDiskMetaCaches()
else -> throw failure("delete", resp)
}
}
@@ -91,70 +122,113 @@ class YandexDiskRepository(
throw failure("patch", resp)
}
resp.body()?.close()
invalidateDiskMetaCaches()
}
suspend fun uploadBytes(path: String, bytes: ByteArray, overwrite: Boolean = true): Unit =
withContext(ioDispatcher) {
val link = wrapAuth { api.getUploadLink(path, overwrite) }
val link = uploadLinkOrThrow(path, overwrite)
require(link.method.equals("PUT", ignoreCase = true)) {
"Unexpected upload method ${link.method}"
}
val body = bytes.toRequestBody(OCTET_STREAM)
val req = Request.Builder().url(link.href).put(body).build()
rawHttp.newCall(req).execute().use { resp ->
if (!resp.isSuccessful) {
throw IOException("Upload failed: HTTP ${resp.code}")
repeat(LOCKED_RETRY_MAX) { attempt ->
rawHttp.newCall(req).execute().use { resp ->
when {
resp.isSuccessful -> {
invalidateDiskMetaCaches()
return@withContext
}
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
delay(lockedBackoffMs(attempt))
else ->
throw IOException("Upload failed: HTTP ${resp.code}")
}
}
}
}
suspend fun uploadFile(path: String, file: java.io.File, overwrite: Boolean = true): Unit =
withContext(ioDispatcher) {
val link = wrapAuth { api.getUploadLink(path, overwrite) }
val link = uploadLinkOrThrow(path, overwrite)
require(link.method.equals("PUT", ignoreCase = true)) {
"Unexpected upload method ${link.method}"
}
val body = file.asRequestBody(OCTET_STREAM)
val req = Request.Builder().url(link.href).put(body).build()
rawHttp.newCall(req).execute().use { resp ->
if (!resp.isSuccessful) {
throw IOException("Upload failed: HTTP ${resp.code}")
repeat(LOCKED_RETRY_MAX) { attempt ->
rawHttp.newCall(req).execute().use { resp ->
when {
resp.isSuccessful -> {
invalidateDiskMetaCaches()
return@withContext
}
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
delay(lockedBackoffMs(attempt))
else ->
throw IOException("Upload failed: HTTP ${resp.code}")
}
}
}
}
/** Поток должен быть закрыт вызывающим кодом — закроет HTTP-ответ. */
suspend fun openDownloadStream(path: String): InputStream = withContext(ioDispatcher) {
val link = wrapAuth { api.getDownloadLink(path) }
val link = try {
wrapAuth { api.getDownloadLink(path) }
} catch (e: HttpException) {
if (e.code() == 404) throw FileNotFoundException(path)
throw e
}
require(link.method.equals("GET", ignoreCase = true)) {
"Unexpected download method ${link.method}"
}
val req = Request.Builder().url(link.href).get().build()
val resp = rawHttp.newCall(req).execute()
if (!resp.isSuccessful) {
resp.close()
throw IOException("Download failed: HTTP ${resp.code}")
}
val body = resp.body
val stream = body?.byteStream() ?: run {
resp.close()
throw IOException("Download failed: missing body")
}
object : FilterInputStream(stream) {
override fun close() {
try {
`in`.close()
} finally {
repeat(LOCKED_RETRY_MAX) { attempt ->
val resp = rawHttp.newCall(req).execute()
when {
resp.isSuccessful -> {
val body = resp.body
val stream = body?.byteStream() ?: run {
resp.close()
throw IOException("Download failed: missing body")
}
return@withContext object : FilterInputStream(stream) {
override fun close() {
try {
`in`.close()
} finally {
resp.close()
}
}
}
}
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 -> {
resp.close()
delay(lockedBackoffMs(attempt))
}
else -> {
val code = resp.code
resp.close()
throw IOException("Download failed: HTTP $code")
}
}
}
throw IOException("Download failed: HTTP 423 after retries")
}
private suspend fun awaitOperation(href: String) {
repeat(OPERATION_POLL_MAX) {
delay(OPERATION_POLL_DELAY_MS)
val st: OperationStatusDto = wrapAuth { api.getOperationByUrl(href) }
val st: OperationStatusDto = try {
wrapAuth { api.getOperationByUrl(href) }
} catch (e: HttpException) {
if (e.code() == 404) {
throw IOException("Disk async operation status not found (404)")
}
throw e
}
when (st.status?.lowercase()) {
"success" -> return
"failure", "failed" -> throw IOException("Disk async operation failed")
@@ -164,18 +238,83 @@ class YandexDiskRepository(
throw IOException("Disk async operation timed out")
}
private suspend inline fun <T> wrapAuth(crossinline block: suspend () -> T): T {
private suspend fun uploadLinkOrThrow(path: String, overwrite: Boolean): LinkDto {
try {
return block()
return wrapAuth { api.getUploadLink(path, overwrite) }
} catch (e: HttpException) {
when (e.code()) {
401 -> {
throw YandexDiskAuthException(e.message())
if (e.code() == 404) {
throw FileNotFoundException("Upload path or parent not found: $path")
}
throw e
}
}
private suspend fun fetchResource(path: String): ResourceDto {
suspend fun tryGet(p: String): ResourceDto? =
try {
wrapAuth { api.getResource(p, FIELDS_RESOURCE) }
} catch (e: HttpException) {
if (e.code() == 404) null else throw e
}
val primary = tryGet(path)
val secondary = if (!path.endsWith('/')) tryGet("$path/") else null
return primary ?: secondary ?: throw FileNotFoundException(path)
}
private suspend fun fetchResourceOrNull(path: String): ResourceDto? {
suspend fun tryGet(p: String): ResourceDto? =
try {
wrapAuth { api.getResource(p, FIELDS_RESOURCE) }
} catch (e: HttpException) {
if (e.code() == 404) null else throw e
}
return tryGet(path) ?: if (!path.endsWith('/')) tryGet("$path/") else null
}
private fun putListCache(key: ListCacheKey, value: ResourceDto) {
if (listCache.size >= LIST_CACHE_MAX_ENTRIES) {
listCache.clear()
}
listCache[key] = value
}
private fun putGetCache(path: String, value: ResourceDto) {
if (getCache.size >= GET_CACHE_MAX_ENTRIES) {
getCache.clear()
}
getCache[path] = value
}
private fun invalidateDiskMetaCaches() {
synchronized(diskCacheLock) {
diskInfoCached = null
diskInfoCachedUntilMs = 0L
}
listCache.clear()
getCache.clear()
}
private suspend inline fun <T> wrapAuth(crossinline block: suspend () -> T): T {
repeat(LOCKED_RETRY_MAX) { attempt ->
try {
return block()
} catch (e: HttpException) {
when (e.code()) {
401 -> throw YandexDiskAuthException(e.message())
423 -> {
if (attempt >= LOCKED_RETRY_MAX - 1) {
throw IOException(
"Yandex Disk: ресурс временно заблокирован (HTTP 423). Повторите позже.",
e,
)
}
delay(lockedBackoffMs(attempt))
}
else -> throw e
}
404 -> throw e
else -> throw e
}
}
error("unreachable")
}
private fun failure(op: String, resp: Response<ResponseBody>): IOException {
@@ -186,12 +325,29 @@ class YandexDiskRepository(
private fun parseLink(body: ResponseBody): LinkDto =
jackson.readValue(body.string())
private data class ListCacheKey(
val path: String,
val limit: Int,
val offset: Int,
val sort: String?,
)
companion object {
private val jackson = jacksonObjectMapper().apply { findAndRegisterModules() }
private val OCTET_STREAM = "application/octet-stream".toMediaType()
private const val OPERATION_POLL_DELAY_MS = 300L
private const val OPERATION_POLL_MAX = 200
private const val DISK_INFO_TTL_MS = 45_000L
private const val LIST_CACHE_MAX_ENTRIES = 384
private const val GET_CACHE_MAX_ENTRIES = 384
/** Сколько раз повторять запрос при HTTP 423 (ресурс временно заблокирован). */
private const val LOCKED_RETRY_MAX = 6
private fun lockedBackoffMs(attempt: Int): Long =
(100L * (1L shl attempt)).coerceAtMost(2000L)
/**
* Урезанный набор полей для листинга каталога (см. параметр `fields` в Disk API).
*/

View File

@@ -28,6 +28,7 @@ import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import java.io.FileNotFoundException
import java.io.InputStream
import java.io.OutputStream
import java.time.Instant
@@ -287,7 +288,13 @@ class EncryptedStorageAccessor(
override suspend fun openReadSystemFile(name: String): InputStream = scope.run {
val path = Path(systemHiddenDirName, name).pathString
return@run openRead(path)
return@run try {
openRead(path)
} catch (_: FileNotFoundException) {
// Как у Yandex/Local: системного файла ещё нет — создаём пустой и читаем снова.
openWriteSystemFile(name).use { }
openRead(path)
}
}
override suspend fun openWriteSystemFile(name: String): OutputStream = scope.run {

View File

@@ -39,11 +39,14 @@ import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.withContext
import java.io.ByteArrayOutputStream
import java.io.File
import java.io.FileNotFoundException
import java.io.FileOutputStream
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
import java.time.Instant
import java.util.UUID
import kotlin.jvm.Volatile
/**
* Реализация [IStorageAccessor] для дерева файлов `app:/<storageUuid>/…` на Яндекс.Диске.
@@ -84,6 +87,9 @@ class YandexStorageAccessor(
private var statsPersistJob: Job? = null
@Volatile
private var systemDirEnsured: Boolean = false
suspend fun init() = withContext(ioDispatcher) {
try {
val persisted = readPersistedStats()
@@ -119,7 +125,8 @@ class YandexStorageAccessor(
rel.isBlank() || rel == "/" -> ""
else -> rel.trimStart('/')
}
return if (tail.isEmpty()) diskRoot else "$diskRoot/$tail"
// Корень хранилища — каталог в app:; для list/get API стабильнее с завершающим «/».
return if (tail.isEmpty()) "$diskRoot/" else "$diskRoot/$tail"
}
private fun toRelPath(diskPath: String): String {
@@ -135,6 +142,10 @@ class YandexStorageAccessor(
val tail = diskPath.substring(i + needle.length).removeSuffix("/")
return if (tail.isEmpty()) "/" else "/$tail"
}
val needleEnd = "/$u"
if (diskPath.endsWith(needleEnd, ignoreCase = true)) {
return "/"
}
return "/"
}
@@ -148,9 +159,14 @@ class YandexStorageAccessor(
rel == "/$SYSTEM_HIDDEN_DIRNAME" || rel.startsWith("/$SYSTEM_HIDDEN_DIRNAME/")
private suspend fun ensureSystemDirExists() {
if (systemDirEnsured) return
val p = toDiskPath("/$SYSTEM_HIDDEN_DIRNAME")
if (guard { repo.getOrNull(p) }?.type == "dir") return
if (guard { repo.getOrNull(p) }?.type == "dir") {
systemDirEnsured = true
return
}
guard { repo.createFolder(p) }
systemDirEnsured = true
}
private fun statsFileRel(): String = "/$SYSTEM_HIDDEN_DIRNAME/$STATS_FILENAME"
@@ -199,6 +215,8 @@ class YandexStorageAccessor(
} catch (e: YandexDiskAuthException) {
reportAuthFailure()
throw e
} catch (_: IOException) {
// Запись stats — best-effort; сетевые сбои не роняем процесс (ошибки в лог UI не выводятся).
}
}
}
@@ -274,6 +292,19 @@ class YandexStorageAccessor(
return files to dirs
}
private suspend fun getMetadataAfterWrite(diskPath: String): ResourceDto {
val maxAttempts = 6
repeat(maxAttempts) { attempt ->
try {
return guard { repo.get(diskPath) }
} catch (e: FileNotFoundException) {
if (attempt == maxAttempts - 1) throw e
delay(200L * (attempt + 1))
}
}
error("unreachable")
}
private fun ResourceDto.toCommonFile(rel: String): CommonFile =
CommonFile(
CommonMetaInfo(
@@ -505,7 +536,7 @@ class YandexStorageAccessor(
val hadFile = prior?.type == "file"
val priorSize = if (prior?.type == "file") prior.size ?: 0L else 0L
guard { repo.uploadFile(diskPath, tmp, overwrite = true) }
val after = guard { repo.get(diskPath) }
val after = guard { getMetadataAfterWrite(diskPath) }
if (after.type != "file") {
throw IllegalStateException("Expected file after upload: $path")
}
@@ -687,7 +718,7 @@ class YandexStorageAccessor(
private const val SYNC_LOCK_FILENAME = "sync-lock.json"
private const val STATS_DEBOUNCE_MS = 450L
private const val DATA_PAGE_LENGTH = 10
private const val API_LIST_LIMIT = 200
private const val API_LIST_LIMIT = 1000
private const val PROP_HIDDEN = "wallenc.hidden"
private const val PROP_DELETED = "wallenc.deleted"
}

View File

@@ -33,6 +33,9 @@ class LocalVault(
private val _storages = MutableStateFlow<List<IStorage>>(emptyList())
override val storages: StateFlow<List<IStorage>> = _storages
private val _storagesScanInProgress = MutableStateFlow(false)
override val storagesScanInProgress: StateFlow<Boolean> = _storagesScanInProgress
private val _isAvailable = MutableStateFlow(false)
override val isAvailable: StateFlow<Boolean> = _isAvailable
@@ -46,9 +49,14 @@ class LocalVault(
init {
CoroutineScope(ioDispatcher).launch {
_isAvailable.value = path.value != null
if (path.value != null) {
readStorages()
_storagesScanInProgress.value = true
try {
_isAvailable.value = path.value != null
if (path.value != null) {
readStorages()
}
} finally {
_storagesScanInProgress.value = false
}
}
}

View File

@@ -10,6 +10,9 @@ import com.github.nullptroma.wallenc.vault.contract.DescribedVault
import com.github.nullptroma.wallenc.vault.contract.VaultDescriptor
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.launch
@@ -42,6 +45,9 @@ class YandexVault(
private val _storages = MutableStateFlow<List<IStorage>>(emptyList())
override val storages: StateFlow<List<IStorage>> = _storages
private val _storagesScanInProgress = MutableStateFlow(false)
override val storagesScanInProgress: StateFlow<Boolean> = _storagesScanInProgress
private val _totalSpace = MutableStateFlow<Long?>(null)
override val totalSpace: StateFlow<Long?> = _totalSpace
@@ -55,6 +61,7 @@ class YandexVault(
}
private suspend fun refreshFromDisk() {
_storagesScanInProgress.value = true
_vaultReachable.value = false
try {
val info = repo.diskInfo()
@@ -70,11 +77,13 @@ class YandexVault(
} catch (_: Exception) {
_vaultReachable.value = false
_storages.value = emptyList()
} finally {
_storagesScanInProgress.value = false
}
}
private suspend fun loadStoragesList(): List<IStorage> {
val out = mutableListOf<YandexStorage>()
val pending = mutableListOf<YandexStorage>()
var offset = 0
while (true) {
val root = repo.list("app:/", APP_LIST_LIMIT, offset)
@@ -83,25 +92,30 @@ class YandexVault(
if (item.type != "dir") continue
val name = item.name ?: continue
val storageUuid = runCatching { UUID.fromString(name) }.getOrNull() ?: continue
val storage = YandexStorage(
uuid = storageUuid,
repo = repo,
vaultAvailability = _vaultReachable,
ioDispatcher = ioDispatcher,
accessorScope = parentScope,
reportAuthFailure = {
parentScope.launch { _vaultReachable.value = false }
},
pending.add(
YandexStorage(
uuid = storageUuid,
repo = repo,
vaultAvailability = _vaultReachable,
ioDispatcher = ioDispatcher,
accessorScope = parentScope,
reportAuthFailure = {
parentScope.launch { _vaultReachable.value = false }
},
),
)
try {
storage.init()
out.add(storage)
} catch (_: Exception) { }
}
if (items.size < APP_LIST_LIMIT) break
offset += items.size
}
return out
if (pending.isEmpty()) return emptyList()
return coroutineScope {
pending.map { storage ->
async(ioDispatcher) {
if (runCatching { storage.init() }.isSuccess) storage else null
}
}.awaitAll().filterNotNull()
}
}
override suspend fun createStorage(): IStorage = withContext(ioDispatcher) {
@@ -135,6 +149,6 @@ class YandexVault(
}
private companion object {
private const val APP_LIST_LIMIT = 200
private const val APP_LIST_LIMIT = 1000
}
}