fix(sync): стабилизировал синхронизацию, Yandex I/O и вёрстку карточки storage
Добавил TRASH вместо DELETE для moveToTrash, компакцию журналов и отчёт об ошибках apply. Исправил проброс ошибок upload Yandex при close, CAS lock и загрузку OAuth-токена. Упростил совместимость sync-групп (только encInfo), поправил растягивание StorageTree при недоступных meta.
This commit is contained in:
@@ -54,28 +54,26 @@ class YandexDiskApiFactory(
|
||||
}
|
||||
|
||||
/**
|
||||
* Провайдер токена читает [YandexAccountRepository] на IO-диспетчере (как и остальной data-слой).
|
||||
* OAuth-токен загружается один раз при создании API (не в OkHttp interceptor).
|
||||
* При 401 см. [YandexDiskAuthException] и повторную привязку vault.
|
||||
*/
|
||||
fun createApiForVault(vaultUuid: UUID): YandexDiskApi {
|
||||
val id = vaultUuid.toString()
|
||||
val token = runBlocking(ioDispatcher) {
|
||||
accountRepository.getByVaultUuid(id)?.oauthToken
|
||||
} ?: throw java.io.IOException("Yandex OAuth token is missing")
|
||||
oauthTokenCache[id] = System.currentTimeMillis() to token
|
||||
return createAuthenticatedApi {
|
||||
val now = System.currentTimeMillis()
|
||||
val hit = oauthTokenCache[id]
|
||||
if (hit != null && now - hit.first < OAUTH_TOKEN_CACHE_TTL_MS) {
|
||||
return@createAuthenticatedApi hit.second
|
||||
}
|
||||
val token = runBlocking(ioDispatcher) {
|
||||
accountRepository.getByVaultUuid(id)?.oauthToken
|
||||
} ?: throw java.io.IOException("Yandex OAuth token is missing")
|
||||
oauthTokenCache[id] = now to token
|
||||
token
|
||||
oauthTokenCache[id]?.second ?: token
|
||||
}
|
||||
}
|
||||
|
||||
fun invalidateTokenCache(vaultUuid: UUID) {
|
||||
oauthTokenCache.remove(vaultUuid.toString())
|
||||
}
|
||||
|
||||
companion object {
|
||||
const val BASE_URL = "https://cloud-api.yandex.net/"
|
||||
private const val OAUTH_TOKEN_CACHE_TTL_MS = 120_000L
|
||||
|
||||
fun createRepositoryWithToken(
|
||||
oauthToken: String,
|
||||
ioDispatcher: CoroutineDispatcher,
|
||||
|
||||
@@ -46,6 +46,7 @@ class EncryptedStorageAccessor(
|
||||
private val scope: CoroutineScope
|
||||
) : IStorageAccessor, DisposableHandle {
|
||||
private val syncActorId = UUID.randomUUID().toString()
|
||||
private val syncLockMutex = Mutex()
|
||||
private val _size = MutableStateFlow<Long?>(null)
|
||||
override val size: StateFlow<Long?> = _size
|
||||
|
||||
@@ -63,7 +64,6 @@ class EncryptedStorageAccessor(
|
||||
private val dataEncryptor = Encryptor(key.toAesKey())
|
||||
private val pathEncryptor: EncryptorWithStaticIv? = if(pathIv != null) EncryptorWithStaticIv(key.toAesKey(), pathIv) else null
|
||||
|
||||
private val syncLockMutex = Mutex()
|
||||
private var systemHiddenFilesIsActual = false
|
||||
|
||||
init {
|
||||
@@ -266,7 +266,7 @@ class EncryptedStorageAccessor(
|
||||
|
||||
override suspend fun moveToTrash(path: String) {
|
||||
source.moveToTrash(encryptPath(path))
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
}
|
||||
|
||||
override fun dispose() {
|
||||
|
||||
@@ -547,7 +547,7 @@ class LocalStorageAccessor(
|
||||
?: throw WallencException.Storage.FileNotFound()
|
||||
val newMeta = pair.meta.copy(isDeleted = true)
|
||||
writeMeta(pair.metaFile, newMeta)
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
}
|
||||
|
||||
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
|
||||
|
||||
@@ -532,34 +532,8 @@ class YandexStorageAccessor(
|
||||
val tmp = File.createTempFile("wallenc-yandex-", ".upload")
|
||||
val fos = FileOutputStream(tmp)
|
||||
fos.onClosed {
|
||||
runBlocking(ioDispatcher) {
|
||||
try {
|
||||
val diskPath = toDiskPath(path)
|
||||
val prior = guard { repo.getOrNull(diskPath) }
|
||||
if (prior?.type == "dir") {
|
||||
throw WallencException.Storage.CannotWriteOverDirectory()
|
||||
}
|
||||
val hadFile = prior?.type == "file"
|
||||
val priorSize = if (prior?.type == "file") prior.size ?: 0L else 0L
|
||||
guard { repo.uploadFile(diskPath, tmp, overwrite = true) }
|
||||
val after = guard { getMetadataAfterWrite(diskPath) }
|
||||
if (after.type != "file") {
|
||||
throw WallencException.Storage.UnexpectedState()
|
||||
}
|
||||
val newSize = after.size ?: 0L
|
||||
_size.value = ((_size.value ?: 0L) + newSize - priorSize).coerceAtLeast(0L)
|
||||
if (!hadFile) {
|
||||
_numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1
|
||||
}
|
||||
persistStatsImmediate()
|
||||
val info = runCatching { after.toCommonFile(path) }.getOrNull()
|
||||
info?.let {
|
||||
_filesUpdates.emit(DataPage(listOf(it), pageLength = 1, pageIndex = 0))
|
||||
}
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||
} finally {
|
||||
tmp.delete()
|
||||
}
|
||||
runCommitAfterStreamClosed {
|
||||
commitUploadedFile(path, tmp)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -570,7 +544,7 @@ class YandexStorageAccessor(
|
||||
|
||||
override suspend fun moveToTrash(path: String) = withContext(ioDispatcher) {
|
||||
patchCustomProps(path, mapOf(PROP_DELETED to "true"))
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.DELETE)
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.TRASH)
|
||||
}
|
||||
|
||||
override suspend fun openReadSystemFile(name: String): InputStream = withContext(ioDispatcher) {
|
||||
@@ -589,10 +563,10 @@ class YandexStorageAccessor(
|
||||
val rel = "/$SYSTEM_HIDDEN_DIRNAME/$name"
|
||||
val uploadBuffer = ByteArrayOutputStream()
|
||||
uploadBuffer.onClosed {
|
||||
runBlocking(ioDispatcher) {
|
||||
guard {
|
||||
repo.uploadBytes(toDiskPath(rel), uploadBuffer.toByteArray(), overwrite = true)
|
||||
}
|
||||
val bytes = uploadBuffer.toByteArray()
|
||||
val diskPath = toDiskPath(rel)
|
||||
runCommitAfterStreamClosed {
|
||||
guard { repo.uploadBytes(diskPath, bytes, overwrite = true) }
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -641,8 +615,25 @@ class YandexStorageAccessor(
|
||||
}.getOrNull()
|
||||
}
|
||||
|
||||
/**
|
||||
* Best-effort lock на Диске (read-modify-write без CAS на стороне API).
|
||||
* Межустройственная координация опирается на [StorageSyncEngine] mutex в процессе.
|
||||
*/
|
||||
override suspend fun tryAcquireSyncLock(holderId: String, leaseUntil: Instant): Boolean = withContext(ioDispatcher) {
|
||||
return@withContext syncLockMutex.withLock {
|
||||
repeat(SYNC_LOCK_CAS_RETRIES) { attempt ->
|
||||
val acquired = tryAcquireSyncLockOnce(holderId, leaseUntil)
|
||||
if (acquired) {
|
||||
return@withContext true
|
||||
}
|
||||
if (attempt < SYNC_LOCK_CAS_RETRIES - 1) {
|
||||
delay(SYNC_LOCK_CAS_DELAY_MS)
|
||||
}
|
||||
}
|
||||
return@withContext false
|
||||
}
|
||||
|
||||
private suspend fun tryAcquireSyncLockOnce(holderId: String, leaseUntil: Instant): Boolean {
|
||||
return syncLockMutex.withLock {
|
||||
val current = readSyncLock()
|
||||
val now = Instant.now()
|
||||
val foreignLockActive = current != null &&
|
||||
@@ -684,6 +675,45 @@ class YandexStorageAccessor(
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Выполняется из [OutputStream.close]; ошибки upload пробрасываются вызывающему коду.
|
||||
*/
|
||||
private fun runCommitAfterStreamClosed(block: suspend () -> Unit) {
|
||||
runBlocking(ioDispatcher) {
|
||||
block()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun commitUploadedFile(path: String, tmp: File) {
|
||||
try {
|
||||
val diskPath = toDiskPath(path)
|
||||
val prior = guard { repo.getOrNull(diskPath) }
|
||||
if (prior?.type == "dir") {
|
||||
throw WallencException.Storage.CannotWriteOverDirectory()
|
||||
}
|
||||
val hadFile = prior?.type == "file"
|
||||
val priorSize = if (prior?.type == "file") prior.size ?: 0L else 0L
|
||||
guard { repo.uploadFile(diskPath, tmp, overwrite = true) }
|
||||
val after = guard { getMetadataAfterWrite(diskPath) }
|
||||
if (after.type != "file") {
|
||||
throw WallencException.Storage.UnexpectedState()
|
||||
}
|
||||
val newSize = after.size ?: 0L
|
||||
_size.value = ((_size.value ?: 0L) + newSize - priorSize).coerceAtLeast(0L)
|
||||
if (!hadFile) {
|
||||
_numberOfFiles.value = (_numberOfFiles.value ?: 0) + 1
|
||||
}
|
||||
persistStatsImmediate()
|
||||
val info = runCatching { after.toCommonFile(path) }.getOrNull()
|
||||
info?.let {
|
||||
_filesUpdates.emit(DataPage(listOf(it), pageLength = 1, pageIndex = 0))
|
||||
}
|
||||
appendSyncEntry(path = path, operation = StorageSyncOperation.UPSERT)
|
||||
} finally {
|
||||
tmp.delete()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun appendSyncEntry(path: String, operation: StorageSyncOperation) {
|
||||
val cleanedPath = if (path.startsWith("/")) path else "/$path"
|
||||
val entries = readSyncJournal()
|
||||
@@ -726,6 +756,8 @@ class YandexStorageAccessor(
|
||||
private const val SYNC_JOURNAL_FILENAME = "sync-journal.json"
|
||||
private const val SYNC_LOCK_FILENAME = "sync-lock.json"
|
||||
private const val SYNC_LOCK_STALE_TIMEOUT_SECONDS: Long = 60 * 60
|
||||
private const val SYNC_LOCK_CAS_RETRIES = 3
|
||||
private const val SYNC_LOCK_CAS_DELAY_MS = 80L
|
||||
private const val STATS_DEBOUNCE_MS = 450L
|
||||
private const val DATA_PAGE_LENGTH = 10
|
||||
private const val API_LIST_LIMIT = 1000
|
||||
|
||||
@@ -27,11 +27,19 @@ private class CloseHandledOutputStream(
|
||||
|
||||
override fun close() {
|
||||
onClosing()
|
||||
var streamFailure: Throwable? = null
|
||||
try {
|
||||
stream.close()
|
||||
} finally {
|
||||
onClose()
|
||||
} catch (t: Throwable) {
|
||||
streamFailure = t
|
||||
}
|
||||
try {
|
||||
onClose()
|
||||
} catch (afterCloseFailure: Throwable) {
|
||||
streamFailure?.let { afterCloseFailure.addSuppressed(it) }
|
||||
throw afterCloseFailure
|
||||
}
|
||||
streamFailure?.let { throw it }
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user