architecture
Распределённое кэширование
Стратегии и паттерны распределенного кэширования в микросервисах
•
#caching
#distributed-systems
#redis
#performance
Распределённое кэширование
Распределенное кэширование — критически важный компонент для масштабирования приложений и снижения нагрузки на базы данных.
Архитектура распределенного кэша
Centralized Cache
class CentralizedCache {
private redis: Redis;
constructor(redisConfig: RedisConfig) {
this.redis = new Redis(redisConfig);
}
async get(key: string): Promise<any> {
const data = await this.redis.get(key);
return data ? JSON.parse(data) : null;
}
async set(key: string, value: any, ttl: number): Promise<void> {
await this.redis.set(
key,
JSON.stringify(value),
'EX',
ttl
);
}
async delete(key: string): Promise<void> {
await this.redis.del(key);
}
async invalidatePattern(pattern: string): Promise<void> {
const keys = await this.redis.keys(pattern);
if (keys.length > 0) {
await this.redis.del(...keys);
}
}
}
// Использование
const cache = new CentralizedCache({
host: 'redis.example.com',
port: 6379,
password: process.env.REDIS_PASSWORD
});
// Кэширование результата запроса
async function getUser(userId: string): Promise<User> {
const cacheKey = `user:${userId}`;
// Проверяем кэш
const cached = await cache.get(cacheKey);
if (cached) return cached;
// Загружаем из БД
const user = await db.query('SELECT * FROM users WHERE id = ?', [userId]);
// Сохраняем в кэш
await cache.set(cacheKey, user, 3600); // 1 час
return user;
}
Distributed Cache (Redis Cluster)
import { Cluster } from 'ioredis';
class DistributedCache {
private cluster: Cluster;
constructor() {
this.cluster = new Cluster([
{ host: 'redis-1.example.com', port: 6379 },
{ host: 'redis-2.example.com', port: 6379 },
{ host: 'redis-3.example.com', port: 6379 }
], {
redisOptions: {
password: process.env.REDIS_PASSWORD
},
clusterRetryStrategy: (times) => {
return Math.min(times * 100, 2000);
}
});
}
async get(key: string): Promise<any> {
const data = await this.cluster.get(key);
return data ? JSON.parse(data) : null;
}
async set(key: string, value: any, ttl: number): Promise<void> {
await this.cluster.set(
key,
JSON.stringify(value),
'EX',
ttl
);
}
async mget(keys: string[]): Promise<any[]> {
// Группируем ключи по слотам для эффективности
const pipeline = this.cluster.pipeline();
keys.forEach(key => pipeline.get(key));
const results = await pipeline.exec();
return results!.map(([err, data]) =>
data ? JSON.parse(data as string) : null
);
}
async mset(entries: Array<{ key: string; value: any; ttl: number }>): Promise<void> {
const pipeline = this.cluster.pipeline();
entries.forEach(({ key, value, ttl }) => {
pipeline.set(key, JSON.stringify(value), 'EX', ttl);
});
await pipeline.exec();
}
}
Многоуровневое кэширование
L1 (In-Memory) + L2 (Redis)
class MultiLevelCache {
private l1Cache: Map<string, CacheEntry>;
private l2Cache: Redis;
private l1MaxSize: number;
private l1TTL: number;
constructor(
redis: Redis,
options: { maxSize: number; ttl: number }
) {
this.l1Cache = new Map();
this.l2Cache = redis;
this.l1MaxSize = options.maxSize;
this.l1TTL = options.ttl;
// Периодически очищаем устаревшие записи L1
setInterval(() => this.cleanupL1(), 60000);
}
async get(key: string): Promise<any> {
// Level 1: In-memory cache
const l1Entry = this.l1Cache.get(key);
if (l1Entry && !this.isExpired(l1Entry)) {
return l1Entry.value;
}
// Level 2: Redis cache
const l2Data = await this.l2Cache.get(key);
if (l2Data) {
const value = JSON.parse(l2Data);
this.setL1(key, value);
return value;
}
return null;
}
async set(key: string, value: any, ttl: number): Promise<void> {
// Сохраняем в оба уровня
this.setL1(key, value);
await this.l2Cache.set(key, JSON.stringify(value), 'EX', ttl);
}
async delete(key: string): Promise<void> {
this.l1Cache.delete(key);
await this.l2Cache.del(key);
}
private setL1(key: string, value: any): void {
// Проверяем размер кэша
if (this.l1Cache.size >= this.l1MaxSize) {
this.evictL1();
}
this.l1Cache.set(key, {
value,
timestamp: Date.now()
});
}
private evictL1(): void {
// LRU eviction: удаляем самую старую запись
const oldestKey = Array.from(this.l1Cache.entries())
.sort((a, b) => a[1].timestamp - b[1].timestamp)[0][0];
this.l1Cache.delete(oldestKey);
}
private isExpired(entry: CacheEntry): boolean {
return Date.now() - entry.timestamp > this.l1TTL;
}
private cleanupL1(): void {
const now = Date.now();
for (const [key, entry] of this.l1Cache.entries()) {
if (now - entry.timestamp > this.l1TTL) {
this.l1Cache.delete(key);
}
}
}
}
interface CacheEntry {
value: any;
timestamp: number;
}
Паттерны кэширования
Cache-Aside (Lazy Loading)
class CacheAsideRepository {
constructor(
private cache: DistributedCache,
private database: Database
) {}
async getProduct(productId: string): Promise<Product> {
const cacheKey = `product:${productId}`;
// 1. Проверяем кэш
const cached = await this.cache.get(cacheKey);
if (cached) {
return cached;
}
// 2. Загружаем из БД
const product = await this.database.query(
'SELECT * FROM products WHERE id = ?',
[productId]
);
if (!product) {
throw new Error('Product not found');
}
// 3. Сохраняем в кэш
await this.cache.set(cacheKey, product, 3600);
return product;
}
async updateProduct(productId: string, updates: Partial<Product>): Promise<void> {
// 1. Обновляем в БД
await this.database.query(
'UPDATE products SET name = ?, price = ? WHERE id = ?',
[updates.name, updates.price, productId]
);
// 2. Инвалидируем кэш
await this.cache.delete(`product:${productId}`);
}
}
Write-Through Cache
class WriteThroughCache {
constructor(
private cache: DistributedCache,
private database: Database
) {}
async saveProduct(product: Product): Promise<void> {
const cacheKey = `product:${product.id}`;
// 1. Пишем в БД (синхронно)
await this.database.query(
'INSERT INTO products (id, name, price) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE name = ?, price = ?',
[product.id, product.name, product.price, product.name, product.price]
);
// 2. Обновляем кэш
await this.cache.set(cacheKey, product, 3600);
}
async getProduct(productId: string): Promise<Product> {
const cacheKey = `product:${productId}`;
// Читаем из кэша (всегда актуальный)
const cached = await this.cache.get(cacheKey);
if (cached) {
return cached;
}
// Если нет в кэше - загружаем из БД
const product = await this.database.query(
'SELECT * FROM products WHERE id = ?',
[productId]
);
if (product) {
await this.cache.set(cacheKey, product, 3600);
}
return product;
}
}
Write-Behind (Write-Back) Cache
class WriteBehindCache {
private writeQueue: Map<string, QueuedWrite> = new Map();
private flushInterval: NodeJS.Timeout;
constructor(
private cache: DistributedCache,
private database: Database,
private flushIntervalMs: number = 5000
) {
this.startFlushTimer();
}
async set(key: string, value: any, ttl: number): Promise<void> {
// 1. Сразу пишем в кэш
await this.cache.set(key, value, ttl);
// 2. Добавляем в очередь на запись в БД
this.writeQueue.set(key, {
value,
timestamp: Date.now()
});
}
async get(key: string): Promise<any> {
return this.cache.get(key);
}
private startFlushTimer(): void {
this.flushInterval = setInterval(
() => this.flushToDatabase(),
this.flushIntervalMs
);
}
private async flushToDatabase(): Promise<void> {
if (this.writeQueue.size === 0) return;
const batch = Array.from(this.writeQueue.entries());
this.writeQueue.clear();
try {
// Батчевая запись в БД
await this.database.transaction(async (tx) => {
for (const [key, write] of batch) {
const [table, id] = key.split(':');
await tx.query(
`INSERT INTO ${table} (id, data) VALUES (?, ?) ON DUPLICATE KEY UPDATE data = ?`,
[id, JSON.stringify(write.value), JSON.stringify(write.value)]
);
}
});
} catch (error) {
// При ошибке возвращаем в очередь
batch.forEach(([key, write]) => {
this.writeQueue.set(key, write);
});
console.error('Failed to flush to database:', error);
}
}
async shutdown(): Promise<void> {
clearInterval(this.flushInterval);
await this.flushToDatabase();
}
}
interface QueuedWrite {
value: any;
timestamp: number;
}
Cache Stampede Protection
Locking Pattern
class StampedeProtectedCache {
private locks = new Map<string, Promise<any>>();
constructor(
private cache: DistributedCache,
private database: Database
) {}
async get(key: string): Promise<any> {
// Проверяем кэш
const cached = await this.cache.get(key);
if (cached) {
return cached;
}
// Проверяем, не загружает ли уже кто-то эти данные
if (this.locks.has(key)) {
return this.locks.get(key);
}
// Создаем промис для загрузки
const loadPromise = this.loadAndCache(key);
this.locks.set(key, loadPromise);
try {
return await loadPromise;
} finally {
this.locks.delete(key);
}
}
private async loadAndCache(key: string): Promise<any> {
const [table, id] = key.split(':');
const data = await this.database.query(
`SELECT * FROM ${table} WHERE id = ?`,
[id]
);
if (data) {
await this.cache.set(key, data, 3600);
}
return data;
}
}
Probabilistic Early Expiration
class ProbabilisticCache {
constructor(
private cache: DistributedCache,
private database: Database
) {}
async get(key: string, ttl: number, beta: number = 1.0): Promise<any> {
const cached = await this.cache.getWithTTL(key);
if (!cached) {
return this.loadAndCache(key, ttl);
}
const { value, remainingTTL } = cached;
// Вероятностное раннее обновление
const delta = Date.now() - (ttl - remainingTTL);
const shouldRefresh = delta * beta * Math.log(Math.random()) >= 0;
if (shouldRefresh) {
// Асинхронно обновляем кэш
this.loadAndCache(key, ttl).catch(console.error);
}
return value;
}
private async loadAndCache(key: string, ttl: number): Promise<any> {
const [table, id] = key.split(':');
const data = await this.database.query(
`SELECT * FROM ${table} WHERE id = ?`,
[id]
);
if (data) {
await this.cache.set(key, data, ttl);
}
return data;
}
}
Инвалидация кэша
Event-Based Invalidation
class EventBasedInvalidation {
constructor(
private cache: DistributedCache,
private eventBus: EventEmitter
) {
this.setupInvalidationListeners();
}
private setupInvalidationListeners(): void {
// Инвалидация при обновлении пользователя
this.eventBus.on('user:updated', async (userId: string) => {
await this.cache.delete(`user:${userId}`);
await this.cache.invalidatePattern(`user:${userId}:*`);
});
// Инвалидация при создании заказа
this.eventBus.on('order:created', async (order: Order) => {
await this.cache.delete(`user:${order.userId}:orders`);
await this.cache.delete(`user:${order.userId}:stats`);
});
// Инвалидация при обновлении продукта
this.eventBus.on('product:updated', async (productId: string) => {
await this.cache.delete(`product:${productId}`);
await this.cache.invalidatePattern(`category:*:products`);
});
}
}
Cache Tags
class TaggedCache {
constructor(private cache: DistributedCache) {}
async set(key: string, value: any, ttl: number, tags: string[]): Promise<void> {
// Сохраняем данные
await this.cache.set(key, value, ttl);
// Связываем с тегами
const pipeline = this.cache.cluster.pipeline();
for (const tag of tags) {
pipeline.sadd(`tag:${tag}`, key);
pipeline.expire(`tag:${tag}`, ttl);
}
await pipeline.exec();
}
async invalidateByTag(tag: string): Promise<void> {
// Получаем все ключи с этим тегом
const keys = await this.cache.cluster.smembers(`tag:${tag}`);
if (keys.length > 0) {
const pipeline = this.cache.cluster.pipeline();
// Удаляем все ключи
keys.forEach(key => pipeline.del(key));
// Удаляем сам тег
pipeline.del(`tag:${tag}`);
await pipeline.exec();
}
}
async invalidateByTags(tags: string[]): Promise<void> {
await Promise.all(tags.map(tag => this.invalidateByTag(tag)));
}
}
// Использование
const taggedCache = new TaggedCache(cache);
// Сохраняем с тегами
await taggedCache.set(
'product:123',
product,
3600,
['product', 'category:electronics', 'brand:apple']
);
// Инвалидируем все продукты категории
await taggedCache.invalidateByTag('category:electronics');
// Инвалидируем все продукты бренда
await taggedCache.invalidateByTag('brand:apple');
Time-Based Invalidation with Refresh
class RefreshingCache {
constructor(
private cache: DistributedCache,
private database: Database
) {}
async get(key: string, loader: () => Promise<any>, ttl: number): Promise<any> {
const cached = await this.cache.get(key);
if (cached) {
// Проверяем, не пора ли обновить
const refreshKey = `${key}:refresh`;
const shouldRefresh = await this.cache.get(refreshKey);
if (!shouldRefresh) {
// Устанавливаем флаг обновления (раньше основного TTL)
const refreshTTL = Math.floor(ttl * 0.8);
await this.cache.set(refreshKey, true, refreshTTL);
// Асинхронно обновляем кэш
this.refreshCache(key, loader, ttl).catch(console.error);
}
return cached;
}
// Кэш пуст - загружаем синхронно
return this.refreshCache(key, loader, ttl);
}
private async refreshCache(
key: string,
loader: () => Promise<any>,
ttl: number
): Promise<any> {
const data = await loader();
await this.cache.set(key, data, ttl);
return data;
}
}
// Использование
const refreshingCache = new RefreshingCache(cache, database);
async function getPopularProducts(): Promise<Product[]> {
return refreshingCache.get(
'products:popular',
async () => {
return database.query(
'SELECT * FROM products ORDER BY sales DESC LIMIT 10'
);
},
3600 // 1 час
);
}
Распределенная блокировка
Redis-based Distributed Lock
class DistributedLock {
constructor(private redis: Redis) {}
async acquire(
lockKey: string,
ttl: number,
retries: number = 3
): Promise<string | null> {
const lockValue = generateId();
for (let i = 0; i < retries; i++) {
// Пытаемся установить блокировку
const result = await this.redis.set(
lockKey,
lockValue,
'NX', // Только если ключ не существует
'EX', // С TTL
ttl
);
if (result === 'OK') {
return lockValue;
}
// Ждем перед следующей попыткой
await sleep(100 * (i + 1));
}
return null;
}
async release(lockKey: string, lockValue: string): Promise<boolean> {
// Lua скрипт для атомарного удаления
const script = `
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
`;
const result = await this.redis.eval(script, 1, lockKey, lockValue);
return result === 1;
}
async withLock<T>(
lockKey: string,
ttl: number,
fn: () => Promise<T>
): Promise<T> {
const lockValue = await this.acquire(lockKey, ttl);
if (!lockValue) {
throw new Error('Failed to acquire lock');
}
try {
return await fn();
} finally {
await this.release(lockKey, lockValue);
}
}
}
// Использование
const lock = new DistributedLock(redis);
async function updateInventory(productId: string, quantity: number): Promise<void> {
await lock.withLock(
`inventory:${productId}`,
10, // 10 секунд
async () => {
const current = await getInventory(productId);
const newQuantity = current + quantity;
await setInventory(productId, newQuantity);
}
);
}
Мониторинг кэша
class CacheMonitoring {
private metrics = {
hits: 0,
misses: 0,
errors: 0,
latency: [] as number[]
};
recordHit(): void {
this.metrics.hits++;
}
recordMiss(): void {
this.metrics.misses++;
}
recordError(): void {
this.metrics.errors++;
}
recordLatency(ms: number): void {
this.metrics.latency.push(ms);
// Храним только последние 1000 измерений
if (this.metrics.latency.length > 1000) {
this.metrics.latency.shift();
}
}
getMetrics(): CacheMetrics {
const total = this.metrics.hits + this.metrics.misses;
const hitRate = total > 0 ? this.metrics.hits / total : 0;
const avgLatency = this.metrics.latency.length > 0
? this.metrics.latency.reduce((sum, l) => sum + l, 0) / this.metrics.latency.length
: 0;
const p95Latency = this.calculatePercentile(this.metrics.latency, 0.95);
const p99Latency = this.calculatePercentile(this.metrics.latency, 0.99);
return {
hits: this.metrics.hits,
misses: this.metrics.misses,
errors: this.metrics.errors,
hitRate,
avgLatency,
p95Latency,
p99Latency
};
}
private calculatePercentile(values: number[], percentile: number): number {
if (values.length === 0) return 0;
const sorted = [...values].sort((a, b) => a - b);
const index = Math.ceil(sorted.length * percentile) - 1;
return sorted[index];
}
reset(): void {
this.metrics = {
hits: 0,
misses: 0,
errors: 0,
latency: []
};
}
}
interface CacheMetrics {
hits: number;
misses: number;
errors: number;
hitRate: number;
avgLatency: number;
p95Latency: number;
p99Latency: number;
}
// Интеграция с кэшем
class MonitoredCache {
private monitoring = new CacheMonitoring();
constructor(private cache: DistributedCache) {
// Периодически отправляем метрики
setInterval(() => {
const metrics = this.monitoring.getMetrics();
console.log('Cache metrics:', metrics);
// Отправляем в систему мониторинга
sendToMonitoring(metrics);
this.monitoring.reset();
}, 60000); // Каждую минуту
}
async get(key: string): Promise<any> {
const start = Date.now();
try {
const value = await this.cache.get(key);
if (value) {
this.monitoring.recordHit();
} else {
this.monitoring.recordMiss();
}
this.monitoring.recordLatency(Date.now() - start);
return value;
} catch (error) {
this.monitoring.recordError();
throw error;
}
}
getMetrics(): CacheMetrics {
return this.monitoring.getMetrics();
}
}
Best Practices
1. Правильный выбор TTL
const CacheTTL = {
STATIC_CONTENT: 86400, // 24 часа
USER_PROFILE: 3600, // 1 час
PRODUCT_CATALOG: 1800, // 30 минут
SHOPPING_CART: 900, // 15 минут
SESSION: 7200, // 2 часа
REAL_TIME_DATA: 60, // 1 минута
API_RESPONSE: 300 // 5 минут
};
2. Namespace для ключей
class CacheKeyBuilder {
static user(userId: string): string {
return `user:${userId}`;
}
static userOrders(userId: string): string {
return `user:${userId}:orders`;
}
static product(productId: string): string {
return `product:${productId}`;
}
static categoryProducts(categoryId: string, page: number): string {
return `category:${categoryId}:products:page:${page}`;
}
static searchResults(query: string, filters: any): string {
const filterHash = hashObject(filters);
return `search:${query}:${filterHash}`;
}
}
3. Graceful Degradation
class ResilientCache {
constructor(
private cache: DistributedCache,
private database: Database
) {}
async get(key: string): Promise<any> {
try {
const cached = await this.cache.get(key);
if (cached) return cached;
} catch (error) {
console.error('Cache error, falling back to database:', error);
// Продолжаем работу без кэша
}
// Всегда возвращаемся к БД при проблемах с кэшем
const [table, id] = key.split(':');
return this.database.query(
`SELECT * FROM ${table} WHERE id = ?`,
[id]
);
}
}
Заключение
Распределенное кэширование требует:
- Правильную стратегию — Cache-Aside, Write-Through, Write-Behind
- Защиту от stampede — блокировки или probabilistic expiration
- Эффективную инвалидацию — event-based или tags
- Мониторинг — hit rate, latency, errors
- Graceful degradation — работа при отказе кэша
- Правильные TTL — баланс между свежестью и нагрузкой