perf(yandex): сузил инвалидацию кэша Disk API и добавил счётчик запросов
Инвалидирую list/get по префиксу пути вместо полной очистки, учитываю вызовы в cloudApiCallCount для замеров.
This commit is contained in:
@@ -25,6 +25,7 @@ import java.io.FilterInputStream
|
|||||||
import java.io.IOException
|
import java.io.IOException
|
||||||
import java.io.InputStream
|
import java.io.InputStream
|
||||||
import java.util.concurrent.ConcurrentHashMap
|
import java.util.concurrent.ConcurrentHashMap
|
||||||
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
|
||||||
class YandexDiskRepository(
|
class YandexDiskRepository(
|
||||||
private val api: YandexDiskApi,
|
private val api: YandexDiskApi,
|
||||||
@@ -38,6 +39,13 @@ class YandexDiskRepository(
|
|||||||
|
|
||||||
private val listCache = ConcurrentHashMap<ListCacheKey, ResourceDto>()
|
private val listCache = ConcurrentHashMap<ListCacheKey, ResourceDto>()
|
||||||
private val getCache = ConcurrentHashMap<String, ResourceDto>()
|
private val getCache = ConcurrentHashMap<String, ResourceDto>()
|
||||||
|
private val cloudApiCallCount = AtomicLong(0)
|
||||||
|
|
||||||
|
fun cloudApiCallCount(): Long = cloudApiCallCount.get()
|
||||||
|
|
||||||
|
fun resetCloudApiCallCount() {
|
||||||
|
cloudApiCallCount.set(0)
|
||||||
|
}
|
||||||
|
|
||||||
suspend fun diskInfo(): DiskInfoDto = withContext(ioDispatcher) {
|
suspend fun diskInfo(): DiskInfoDto = withContext(ioDispatcher) {
|
||||||
val now = System.currentTimeMillis()
|
val now = System.currentTimeMillis()
|
||||||
@@ -93,7 +101,7 @@ class YandexDiskRepository(
|
|||||||
suspend fun createFolder(path: String): Unit = withContext(ioDispatcher) {
|
suspend fun createFolder(path: String): Unit = withContext(ioDispatcher) {
|
||||||
val resp = wrapAuth { api.createFolder(path) }
|
val resp = wrapAuth { api.createFolder(path) }
|
||||||
when (resp.code()) {
|
when (resp.code()) {
|
||||||
201, 409 -> invalidateDiskMetaCaches()
|
201, 409 -> invalidateDiskMetaCaches(path)
|
||||||
else -> throw failure("createFolder", resp)
|
else -> throw failure("createFolder", resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -101,14 +109,14 @@ class YandexDiskRepository(
|
|||||||
suspend fun delete(path: String, permanently: Boolean = true): Unit = withContext(ioDispatcher) {
|
suspend fun delete(path: String, permanently: Boolean = true): Unit = withContext(ioDispatcher) {
|
||||||
val resp = wrapAuth { api.deleteResource(path, permanently) }
|
val resp = wrapAuth { api.deleteResource(path, permanently) }
|
||||||
when (resp.code()) {
|
when (resp.code()) {
|
||||||
204 -> invalidateDiskMetaCaches()
|
204 -> invalidateDiskMetaCaches(path)
|
||||||
202 -> {
|
202 -> {
|
||||||
val link = resp.body()?.use { body -> parseLink(body) }
|
val link = resp.body()?.use { body -> parseLink(body) }
|
||||||
?: throw IOException("DELETE 202 without body")
|
?: throw IOException("DELETE 202 without body")
|
||||||
awaitOperation(link.href)
|
awaitOperation(link.href)
|
||||||
invalidateDiskMetaCaches()
|
invalidateDiskMetaCaches(path)
|
||||||
}
|
}
|
||||||
404 -> invalidateDiskMetaCaches()
|
404 -> invalidateDiskMetaCaches(path)
|
||||||
else -> throw failure("delete", resp)
|
else -> throw failure("delete", resp)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -122,7 +130,7 @@ class YandexDiskRepository(
|
|||||||
throw failure("patch", resp)
|
throw failure("patch", resp)
|
||||||
}
|
}
|
||||||
resp.body()?.close()
|
resp.body()?.close()
|
||||||
invalidateDiskMetaCaches()
|
invalidateDiskMetaCaches(path)
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun uploadBytes(path: String, bytes: ByteArray, overwrite: Boolean = true): Unit =
|
suspend fun uploadBytes(path: String, bytes: ByteArray, overwrite: Boolean = true): Unit =
|
||||||
@@ -134,10 +142,11 @@ class YandexDiskRepository(
|
|||||||
val body = bytes.toRequestBody(OCTET_STREAM)
|
val body = bytes.toRequestBody(OCTET_STREAM)
|
||||||
val req = Request.Builder().url(link.href).put(body).build()
|
val req = Request.Builder().url(link.href).put(body).build()
|
||||||
repeat(LOCKED_RETRY_MAX) { attempt ->
|
repeat(LOCKED_RETRY_MAX) { attempt ->
|
||||||
|
recordCloudApiCall()
|
||||||
rawHttp.newCall(req).execute().use { resp ->
|
rawHttp.newCall(req).execute().use { resp ->
|
||||||
when {
|
when {
|
||||||
resp.isSuccessful -> {
|
resp.isSuccessful -> {
|
||||||
invalidateDiskMetaCaches()
|
invalidateDiskMetaCaches(path)
|
||||||
return@withContext
|
return@withContext
|
||||||
}
|
}
|
||||||
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
|
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
|
||||||
@@ -158,10 +167,11 @@ class YandexDiskRepository(
|
|||||||
val body = file.asRequestBody(OCTET_STREAM)
|
val body = file.asRequestBody(OCTET_STREAM)
|
||||||
val req = Request.Builder().url(link.href).put(body).build()
|
val req = Request.Builder().url(link.href).put(body).build()
|
||||||
repeat(LOCKED_RETRY_MAX) { attempt ->
|
repeat(LOCKED_RETRY_MAX) { attempt ->
|
||||||
|
recordCloudApiCall()
|
||||||
rawHttp.newCall(req).execute().use { resp ->
|
rawHttp.newCall(req).execute().use { resp ->
|
||||||
when {
|
when {
|
||||||
resp.isSuccessful -> {
|
resp.isSuccessful -> {
|
||||||
invalidateDiskMetaCaches()
|
invalidateDiskMetaCaches(path)
|
||||||
return@withContext
|
return@withContext
|
||||||
}
|
}
|
||||||
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
|
resp.code == 423 && attempt < LOCKED_RETRY_MAX - 1 ->
|
||||||
@@ -186,6 +196,7 @@ class YandexDiskRepository(
|
|||||||
}
|
}
|
||||||
val req = Request.Builder().url(link.href).get().build()
|
val req = Request.Builder().url(link.href).get().build()
|
||||||
repeat(LOCKED_RETRY_MAX) { attempt ->
|
repeat(LOCKED_RETRY_MAX) { attempt ->
|
||||||
|
recordCloudApiCall()
|
||||||
val resp = rawHttp.newCall(req).execute()
|
val resp = rawHttp.newCall(req).execute()
|
||||||
when {
|
when {
|
||||||
resp.isSuccessful -> {
|
resp.isSuccessful -> {
|
||||||
@@ -281,18 +292,47 @@ class YandexDiskRepository(
|
|||||||
getCache[path] = value
|
getCache[path] = value
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun invalidateDiskMetaCaches() {
|
private fun invalidateDiskMetaCaches(changedDiskPath: String? = null) {
|
||||||
synchronized(diskCacheLock) {
|
synchronized(diskCacheLock) {
|
||||||
diskInfoCached = null
|
diskInfoCached = null
|
||||||
diskInfoCachedUntilMs = 0L
|
diskInfoCachedUntilMs = 0L
|
||||||
}
|
}
|
||||||
|
if (changedDiskPath == null) {
|
||||||
listCache.clear()
|
listCache.clear()
|
||||||
getCache.clear()
|
getCache.clear()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
val prefixes = cachePrefixesForPath(changedDiskPath)
|
||||||
|
listCache.keys.removeAll { key ->
|
||||||
|
prefixes.any { prefix ->
|
||||||
|
key.path.startsWith(prefix) || prefix.startsWith(key.path.trimEnd('/'))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
getCache.keys.removeAll { cachedPath ->
|
||||||
|
prefixes.any { prefix ->
|
||||||
|
cachedPath.startsWith(prefix) || prefix.startsWith(cachedPath.trimEnd('/'))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private fun cachePrefixesForPath(diskPath: String): List<String> {
|
||||||
|
val normalized = diskPath.trimEnd('/')
|
||||||
|
val out = mutableListOf<String>()
|
||||||
|
var current = normalized
|
||||||
|
while (current.isNotEmpty()) {
|
||||||
|
out.add(current)
|
||||||
|
out.add("$current/")
|
||||||
|
val slash = current.lastIndexOf('/')
|
||||||
|
if (slash <= 0) break
|
||||||
|
current = current.substring(0, slash)
|
||||||
|
}
|
||||||
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend inline fun <T> wrapAuth(crossinline block: suspend () -> T): T {
|
private suspend inline fun <T> wrapAuth(crossinline block: suspend () -> T): T {
|
||||||
repeat(LOCKED_RETRY_MAX) { attempt ->
|
repeat(LOCKED_RETRY_MAX) { attempt ->
|
||||||
try {
|
try {
|
||||||
|
recordCloudApiCall()
|
||||||
return block()
|
return block()
|
||||||
} catch (e: HttpException) {
|
} catch (e: HttpException) {
|
||||||
when (e.code()) {
|
when (e.code()) {
|
||||||
@@ -313,6 +353,10 @@ class YandexDiskRepository(
|
|||||||
error("unreachable")
|
error("unreachable")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private fun recordCloudApiCall() {
|
||||||
|
cloudApiCallCount.incrementAndGet()
|
||||||
|
}
|
||||||
|
|
||||||
private fun failure(op: String, resp: Response<ResponseBody>): IOException {
|
private fun failure(op: String, resp: Response<ResponseBody>): IOException {
|
||||||
val msg = resp.errorBody()?.string() ?: resp.message()
|
val msg = resp.errorBody()?.string() ?: resp.message()
|
||||||
return IOException("$op failed: HTTP ${resp.code()} $msg")
|
return IOException("$op failed: HTTP ${resp.code()} $msg")
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ class YandexDiskRepositoryTest {
|
|||||||
|
|
||||||
@Test
|
@Test
|
||||||
fun diskInfoParsesResponse() = runBlocking {
|
fun diskInfoParsesResponse() = runBlocking {
|
||||||
|
repository.resetCloudApiCallCount()
|
||||||
server.enqueue(
|
server.enqueue(
|
||||||
MockResponse()
|
MockResponse()
|
||||||
.setResponseCode(200)
|
.setResponseCode(200)
|
||||||
@@ -42,6 +43,7 @@ class YandexDiskRepositoryTest {
|
|||||||
val info = repository.diskInfo()
|
val info = repository.diskInfo()
|
||||||
assertEquals(1000L, info.totalSpace)
|
assertEquals(1000L, info.totalSpace)
|
||||||
assertEquals(200L, info.usedSpace)
|
assertEquals(200L, info.usedSpace)
|
||||||
|
assertEquals(1L, repository.cloudApiCallCount())
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|||||||
Reference in New Issue
Block a user