architecture

Распределённое кэширование

Стратегии и паттерны распределенного кэширования в микросервисах

#caching #distributed-systems #redis #performance

Распределённое кэширование

Распределенное кэширование — критически важный компонент для масштабирования приложений и снижения нагрузки на базы данных.

Архитектура распределенного кэша

Centralized Cache

class CentralizedCache {
  private redis: Redis;
  
  constructor(redisConfig: RedisConfig) {
    this.redis = new Redis(redisConfig);
  }
  
  async get(key: string): Promise<any> {
    const data = await this.redis.get(key);
    return data ? JSON.parse(data) : null;
  }
  
  async set(key: string, value: any, ttl: number): Promise<void> {
    await this.redis.set(
      key,
      JSON.stringify(value),
      'EX',
      ttl
    );
  }
  
  async delete(key: string): Promise<void> {
    await this.redis.del(key);
  }
  
  async invalidatePattern(pattern: string): Promise<void> {
    const keys = await this.redis.keys(pattern);
    if (keys.length > 0) {
      await this.redis.del(...keys);
    }
  }
}

// Использование
const cache = new CentralizedCache({
  host: 'redis.example.com',
  port: 6379,
  password: process.env.REDIS_PASSWORD
});

// Кэширование результата запроса
async function getUser(userId: string): Promise<User> {
  const cacheKey = `user:${userId}`;
  
  // Проверяем кэш
  const cached = await cache.get(cacheKey);
  if (cached) return cached;
  
  // Загружаем из БД
  const user = await db.query('SELECT * FROM users WHERE id = ?', [userId]);
  
  // Сохраняем в кэш
  await cache.set(cacheKey, user, 3600); // 1 час
  
  return user;
}

Distributed Cache (Redis Cluster)

import { Cluster } from 'ioredis';

class DistributedCache {
  private cluster: Cluster;
  
  constructor() {
    this.cluster = new Cluster([
      { host: 'redis-1.example.com', port: 6379 },
      { host: 'redis-2.example.com', port: 6379 },
      { host: 'redis-3.example.com', port: 6379 }
    ], {
      redisOptions: {
        password: process.env.REDIS_PASSWORD
      },
      clusterRetryStrategy: (times) => {
        return Math.min(times * 100, 2000);
      }
    });
  }
  
  async get(key: string): Promise<any> {
    const data = await this.cluster.get(key);
    return data ? JSON.parse(data) : null;
  }
  
  async set(key: string, value: any, ttl: number): Promise<void> {
    await this.cluster.set(
      key,
      JSON.stringify(value),
      'EX',
      ttl
    );
  }
  
  async mget(keys: string[]): Promise<any[]> {
    // Группируем ключи по слотам для эффективности
    const pipeline = this.cluster.pipeline();
    keys.forEach(key => pipeline.get(key));
    
    const results = await pipeline.exec();
    return results!.map(([err, data]) => 
      data ? JSON.parse(data as string) : null
    );
  }
  
  async mset(entries: Array<{ key: string; value: any; ttl: number }>): Promise<void> {
    const pipeline = this.cluster.pipeline();
    
    entries.forEach(({ key, value, ttl }) => {
      pipeline.set(key, JSON.stringify(value), 'EX', ttl);
    });
    
    await pipeline.exec();
  }
}

Многоуровневое кэширование

L1 (In-Memory) + L2 (Redis)

class MultiLevelCache {
  private l1Cache: Map<string, CacheEntry>;
  private l2Cache: Redis;
  private l1MaxSize: number;
  private l1TTL: number;
  
  constructor(
    redis: Redis,
    options: { maxSize: number; ttl: number }
  ) {
    this.l1Cache = new Map();
    this.l2Cache = redis;
    this.l1MaxSize = options.maxSize;
    this.l1TTL = options.ttl;
    
    // Периодически очищаем устаревшие записи L1
    setInterval(() => this.cleanupL1(), 60000);
  }
  
  async get(key: string): Promise<any> {
    // Level 1: In-memory cache
    const l1Entry = this.l1Cache.get(key);
    if (l1Entry && !this.isExpired(l1Entry)) {
      return l1Entry.value;
    }
    
    // Level 2: Redis cache
    const l2Data = await this.l2Cache.get(key);
    if (l2Data) {
      const value = JSON.parse(l2Data);
      this.setL1(key, value);
      return value;
    }
    
    return null;
  }
  
  async set(key: string, value: any, ttl: number): Promise<void> {
    // Сохраняем в оба уровня
    this.setL1(key, value);
    await this.l2Cache.set(key, JSON.stringify(value), 'EX', ttl);
  }
  
  async delete(key: string): Promise<void> {
    this.l1Cache.delete(key);
    await this.l2Cache.del(key);
  }
  
  private setL1(key: string, value: any): void {
    // Проверяем размер кэша
    if (this.l1Cache.size >= this.l1MaxSize) {
      this.evictL1();
    }
    
    this.l1Cache.set(key, {
      value,
      timestamp: Date.now()
    });
  }
  
  private evictL1(): void {
    // LRU eviction: удаляем самую старую запись
    const oldestKey = Array.from(this.l1Cache.entries())
      .sort((a, b) => a[1].timestamp - b[1].timestamp)[0][0];
    
    this.l1Cache.delete(oldestKey);
  }
  
  private isExpired(entry: CacheEntry): boolean {
    return Date.now() - entry.timestamp > this.l1TTL;
  }
  
  private cleanupL1(): void {
    const now = Date.now();
    
    for (const [key, entry] of this.l1Cache.entries()) {
      if (now - entry.timestamp > this.l1TTL) {
        this.l1Cache.delete(key);
      }
    }
  }
}

interface CacheEntry {
  value: any;
  timestamp: number;
}

Паттерны кэширования

Cache-Aside (Lazy Loading)

class CacheAsideRepository {
  constructor(
    private cache: DistributedCache,
    private database: Database
  ) {}
  
  async getProduct(productId: string): Promise<Product> {
    const cacheKey = `product:${productId}`;
    
    // 1. Проверяем кэш
    const cached = await this.cache.get(cacheKey);
    if (cached) {
      return cached;
    }
    
    // 2. Загружаем из БД
    const product = await this.database.query(
      'SELECT * FROM products WHERE id = ?',
      [productId]
    );
    
    if (!product) {
      throw new Error('Product not found');
    }
    
    // 3. Сохраняем в кэш
    await this.cache.set(cacheKey, product, 3600);
    
    return product;
  }
  
  async updateProduct(productId: string, updates: Partial<Product>): Promise<void> {
    // 1. Обновляем в БД
    await this.database.query(
      'UPDATE products SET name = ?, price = ? WHERE id = ?',
      [updates.name, updates.price, productId]
    );
    
    // 2. Инвалидируем кэш
    await this.cache.delete(`product:${productId}`);
  }
}

Write-Through Cache

class WriteThroughCache {
  constructor(
    private cache: DistributedCache,
    private database: Database
  ) {}
  
  async saveProduct(product: Product): Promise<void> {
    const cacheKey = `product:${product.id}`;
    
    // 1. Пишем в БД (синхронно)
    await this.database.query(
      'INSERT INTO products (id, name, price) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE name = ?, price = ?',
      [product.id, product.name, product.price, product.name, product.price]
    );
    
    // 2. Обновляем кэш
    await this.cache.set(cacheKey, product, 3600);
  }
  
  async getProduct(productId: string): Promise<Product> {
    const cacheKey = `product:${productId}`;
    
    // Читаем из кэша (всегда актуальный)
    const cached = await this.cache.get(cacheKey);
    if (cached) {
      return cached;
    }
    
    // Если нет в кэше - загружаем из БД
    const product = await this.database.query(
      'SELECT * FROM products WHERE id = ?',
      [productId]
    );
    
    if (product) {
      await this.cache.set(cacheKey, product, 3600);
    }
    
    return product;
  }
}

Write-Behind (Write-Back) Cache

class WriteBehindCache {
  private writeQueue: Map<string, QueuedWrite> = new Map();
  private flushInterval: NodeJS.Timeout;
  
  constructor(
    private cache: DistributedCache,
    private database: Database,
    private flushIntervalMs: number = 5000
  ) {
    this.startFlushTimer();
  }
  
  async set(key: string, value: any, ttl: number): Promise<void> {
    // 1. Сразу пишем в кэш
    await this.cache.set(key, value, ttl);
    
    // 2. Добавляем в очередь на запись в БД
    this.writeQueue.set(key, {
      value,
      timestamp: Date.now()
    });
  }
  
  async get(key: string): Promise<any> {
    return this.cache.get(key);
  }
  
  private startFlushTimer(): void {
    this.flushInterval = setInterval(
      () => this.flushToDatabase(),
      this.flushIntervalMs
    );
  }
  
  private async flushToDatabase(): Promise<void> {
    if (this.writeQueue.size === 0) return;
    
    const batch = Array.from(this.writeQueue.entries());
    this.writeQueue.clear();
    
    try {
      // Батчевая запись в БД
      await this.database.transaction(async (tx) => {
        for (const [key, write] of batch) {
          const [table, id] = key.split(':');
          await tx.query(
            `INSERT INTO ${table} (id, data) VALUES (?, ?) ON DUPLICATE KEY UPDATE data = ?`,
            [id, JSON.stringify(write.value), JSON.stringify(write.value)]
          );
        }
      });
    } catch (error) {
      // При ошибке возвращаем в очередь
      batch.forEach(([key, write]) => {
        this.writeQueue.set(key, write);
      });
      console.error('Failed to flush to database:', error);
    }
  }
  
  async shutdown(): Promise<void> {
    clearInterval(this.flushInterval);
    await this.flushToDatabase();
  }
}

interface QueuedWrite {
  value: any;
  timestamp: number;
}

Cache Stampede Protection

Locking Pattern

class StampedeProtectedCache {
  private locks = new Map<string, Promise<any>>();
  
  constructor(
    private cache: DistributedCache,
    private database: Database
  ) {}
  
  async get(key: string): Promise<any> {
    // Проверяем кэш
    const cached = await this.cache.get(key);
    if (cached) {
      return cached;
    }
    
    // Проверяем, не загружает ли уже кто-то эти данные
    if (this.locks.has(key)) {
      return this.locks.get(key);
    }
    
    // Создаем промис для загрузки
    const loadPromise = this.loadAndCache(key);
    this.locks.set(key, loadPromise);
    
    try {
      return await loadPromise;
    } finally {
      this.locks.delete(key);
    }
  }
  
  private async loadAndCache(key: string): Promise<any> {
    const [table, id] = key.split(':');
    
    const data = await this.database.query(
      `SELECT * FROM ${table} WHERE id = ?`,
      [id]
    );
    
    if (data) {
      await this.cache.set(key, data, 3600);
    }
    
    return data;
  }
}

Probabilistic Early Expiration

class ProbabilisticCache {
  constructor(
    private cache: DistributedCache,
    private database: Database
  ) {}
  
  async get(key: string, ttl: number, beta: number = 1.0): Promise<any> {
    const cached = await this.cache.getWithTTL(key);
    
    if (!cached) {
      return this.loadAndCache(key, ttl);
    }
    
    const { value, remainingTTL } = cached;
    
    // Вероятностное раннее обновление
    const delta = Date.now() - (ttl - remainingTTL);
    const shouldRefresh = delta * beta * Math.log(Math.random()) >= 0;
    
    if (shouldRefresh) {
      // Асинхронно обновляем кэш
      this.loadAndCache(key, ttl).catch(console.error);
    }
    
    return value;
  }
  
  private async loadAndCache(key: string, ttl: number): Promise<any> {
    const [table, id] = key.split(':');
    
    const data = await this.database.query(
      `SELECT * FROM ${table} WHERE id = ?`,
      [id]
    );
    
    if (data) {
      await this.cache.set(key, data, ttl);
    }
    
    return data;
  }
}

Инвалидация кэша

Event-Based Invalidation

class EventBasedInvalidation {
  constructor(
    private cache: DistributedCache,
    private eventBus: EventEmitter
  ) {
    this.setupInvalidationListeners();
  }
  
  private setupInvalidationListeners(): void {
    // Инвалидация при обновлении пользователя
    this.eventBus.on('user:updated', async (userId: string) => {
      await this.cache.delete(`user:${userId}`);
      await this.cache.invalidatePattern(`user:${userId}:*`);
    });
    
    // Инвалидация при создании заказа
    this.eventBus.on('order:created', async (order: Order) => {
      await this.cache.delete(`user:${order.userId}:orders`);
      await this.cache.delete(`user:${order.userId}:stats`);
    });
    
    // Инвалидация при обновлении продукта
    this.eventBus.on('product:updated', async (productId: string) => {
      await this.cache.delete(`product:${productId}`);
      await this.cache.invalidatePattern(`category:*:products`);
    });
  }
}

Cache Tags

class TaggedCache {
  constructor(private cache: DistributedCache) {}
  
  async set(key: string, value: any, ttl: number, tags: string[]): Promise<void> {
    // Сохраняем данные
    await this.cache.set(key, value, ttl);
    
    // Связываем с тегами
    const pipeline = this.cache.cluster.pipeline();
    
    for (const tag of tags) {
      pipeline.sadd(`tag:${tag}`, key);
      pipeline.expire(`tag:${tag}`, ttl);
    }
    
    await pipeline.exec();
  }
  
  async invalidateByTag(tag: string): Promise<void> {
    // Получаем все ключи с этим тегом
    const keys = await this.cache.cluster.smembers(`tag:${tag}`);
    
    if (keys.length > 0) {
      const pipeline = this.cache.cluster.pipeline();
      
      // Удаляем все ключи
      keys.forEach(key => pipeline.del(key));
      
      // Удаляем сам тег
      pipeline.del(`tag:${tag}`);
      
      await pipeline.exec();
    }
  }
  
  async invalidateByTags(tags: string[]): Promise<void> {
    await Promise.all(tags.map(tag => this.invalidateByTag(tag)));
  }
}

// Использование
const taggedCache = new TaggedCache(cache);

// Сохраняем с тегами
await taggedCache.set(
  'product:123',
  product,
  3600,
  ['product', 'category:electronics', 'brand:apple']
);

// Инвалидируем все продукты категории
await taggedCache.invalidateByTag('category:electronics');

// Инвалидируем все продукты бренда
await taggedCache.invalidateByTag('brand:apple');

Time-Based Invalidation with Refresh

class RefreshingCache {
  constructor(
    private cache: DistributedCache,
    private database: Database
  ) {}
  
  async get(key: string, loader: () => Promise<any>, ttl: number): Promise<any> {
    const cached = await this.cache.get(key);
    
    if (cached) {
      // Проверяем, не пора ли обновить
      const refreshKey = `${key}:refresh`;
      const shouldRefresh = await this.cache.get(refreshKey);
      
      if (!shouldRefresh) {
        // Устанавливаем флаг обновления (раньше основного TTL)
        const refreshTTL = Math.floor(ttl * 0.8);
        await this.cache.set(refreshKey, true, refreshTTL);
        
        // Асинхронно обновляем кэш
        this.refreshCache(key, loader, ttl).catch(console.error);
      }
      
      return cached;
    }
    
    // Кэш пуст - загружаем синхронно
    return this.refreshCache(key, loader, ttl);
  }
  
  private async refreshCache(
    key: string,
    loader: () => Promise<any>,
    ttl: number
  ): Promise<any> {
    const data = await loader();
    await this.cache.set(key, data, ttl);
    return data;
  }
}

// Использование
const refreshingCache = new RefreshingCache(cache, database);

async function getPopularProducts(): Promise<Product[]> {
  return refreshingCache.get(
    'products:popular',
    async () => {
      return database.query(
        'SELECT * FROM products ORDER BY sales DESC LIMIT 10'
      );
    },
    3600 // 1 час
  );
}

Распределенная блокировка

Redis-based Distributed Lock

class DistributedLock {
  constructor(private redis: Redis) {}
  
  async acquire(
    lockKey: string,
    ttl: number,
    retries: number = 3
  ): Promise<string | null> {
    const lockValue = generateId();
    
    for (let i = 0; i < retries; i++) {
      // Пытаемся установить блокировку
      const result = await this.redis.set(
        lockKey,
        lockValue,
        'NX', // Только если ключ не существует
        'EX', // С TTL
        ttl
      );
      
      if (result === 'OK') {
        return lockValue;
      }
      
      // Ждем перед следующей попыткой
      await sleep(100 * (i + 1));
    }
    
    return null;
  }
  
  async release(lockKey: string, lockValue: string): Promise<boolean> {
    // Lua скрипт для атомарного удаления
    const script = `
      if redis.call("get", KEYS[1]) == ARGV[1] then
        return redis.call("del", KEYS[1])
      else
        return 0
      end
    `;
    
    const result = await this.redis.eval(script, 1, lockKey, lockValue);
    return result === 1;
  }
  
  async withLock<T>(
    lockKey: string,
    ttl: number,
    fn: () => Promise<T>
  ): Promise<T> {
    const lockValue = await this.acquire(lockKey, ttl);
    
    if (!lockValue) {
      throw new Error('Failed to acquire lock');
    }
    
    try {
      return await fn();
    } finally {
      await this.release(lockKey, lockValue);
    }
  }
}

// Использование
const lock = new DistributedLock(redis);

async function updateInventory(productId: string, quantity: number): Promise<void> {
  await lock.withLock(
    `inventory:${productId}`,
    10, // 10 секунд
    async () => {
      const current = await getInventory(productId);
      const newQuantity = current + quantity;
      await setInventory(productId, newQuantity);
    }
  );
}

Мониторинг кэша

class CacheMonitoring {
  private metrics = {
    hits: 0,
    misses: 0,
    errors: 0,
    latency: [] as number[]
  };
  
  recordHit(): void {
    this.metrics.hits++;
  }
  
  recordMiss(): void {
    this.metrics.misses++;
  }
  
  recordError(): void {
    this.metrics.errors++;
  }
  
  recordLatency(ms: number): void {
    this.metrics.latency.push(ms);
    
    // Храним только последние 1000 измерений
    if (this.metrics.latency.length > 1000) {
      this.metrics.latency.shift();
    }
  }
  
  getMetrics(): CacheMetrics {
    const total = this.metrics.hits + this.metrics.misses;
    const hitRate = total > 0 ? this.metrics.hits / total : 0;
    
    const avgLatency = this.metrics.latency.length > 0
      ? this.metrics.latency.reduce((sum, l) => sum + l, 0) / this.metrics.latency.length
      : 0;
    
    const p95Latency = this.calculatePercentile(this.metrics.latency, 0.95);
    const p99Latency = this.calculatePercentile(this.metrics.latency, 0.99);
    
    return {
      hits: this.metrics.hits,
      misses: this.metrics.misses,
      errors: this.metrics.errors,
      hitRate,
      avgLatency,
      p95Latency,
      p99Latency
    };
  }
  
  private calculatePercentile(values: number[], percentile: number): number {
    if (values.length === 0) return 0;
    
    const sorted = [...values].sort((a, b) => a - b);
    const index = Math.ceil(sorted.length * percentile) - 1;
    return sorted[index];
  }
  
  reset(): void {
    this.metrics = {
      hits: 0,
      misses: 0,
      errors: 0,
      latency: []
    };
  }
}

interface CacheMetrics {
  hits: number;
  misses: number;
  errors: number;
  hitRate: number;
  avgLatency: number;
  p95Latency: number;
  p99Latency: number;
}

// Интеграция с кэшем
class MonitoredCache {
  private monitoring = new CacheMonitoring();
  
  constructor(private cache: DistributedCache) {
    // Периодически отправляем метрики
    setInterval(() => {
      const metrics = this.monitoring.getMetrics();
      console.log('Cache metrics:', metrics);
      
      // Отправляем в систему мониторинга
      sendToMonitoring(metrics);
      
      this.monitoring.reset();
    }, 60000); // Каждую минуту
  }
  
  async get(key: string): Promise<any> {
    const start = Date.now();
    
    try {
      const value = await this.cache.get(key);
      
      if (value) {
        this.monitoring.recordHit();
      } else {
        this.monitoring.recordMiss();
      }
      
      this.monitoring.recordLatency(Date.now() - start);
      
      return value;
    } catch (error) {
      this.monitoring.recordError();
      throw error;
    }
  }
  
  getMetrics(): CacheMetrics {
    return this.monitoring.getMetrics();
  }
}

Best Practices

1. Правильный выбор TTL

const CacheTTL = {
  STATIC_CONTENT: 86400,      // 24 часа
  USER_PROFILE: 3600,         // 1 час
  PRODUCT_CATALOG: 1800,      // 30 минут
  SHOPPING_CART: 900,         // 15 минут
  SESSION: 7200,              // 2 часа
  REAL_TIME_DATA: 60,         // 1 минута
  API_RESPONSE: 300           // 5 минут
};

2. Namespace для ключей

class CacheKeyBuilder {
  static user(userId: string): string {
    return `user:${userId}`;
  }
  
  static userOrders(userId: string): string {
    return `user:${userId}:orders`;
  }
  
  static product(productId: string): string {
    return `product:${productId}`;
  }
  
  static categoryProducts(categoryId: string, page: number): string {
    return `category:${categoryId}:products:page:${page}`;
  }
  
  static searchResults(query: string, filters: any): string {
    const filterHash = hashObject(filters);
    return `search:${query}:${filterHash}`;
  }
}

3. Graceful Degradation

class ResilientCache {
  constructor(
    private cache: DistributedCache,
    private database: Database
  ) {}
  
  async get(key: string): Promise<any> {
    try {
      const cached = await this.cache.get(key);
      if (cached) return cached;
    } catch (error) {
      console.error('Cache error, falling back to database:', error);
      // Продолжаем работу без кэша
    }
    
    // Всегда возвращаемся к БД при проблемах с кэшем
    const [table, id] = key.split(':');
    return this.database.query(
      `SELECT * FROM ${table} WHERE id = ?`,
      [id]
    );
  }
}

Заключение

Распределенное кэширование требует:

  1. Правильную стратегию — Cache-Aside, Write-Through, Write-Behind
  2. Защиту от stampede — блокировки или probabilistic expiration
  3. Эффективную инвалидацию — event-based или tags
  4. Мониторинг — hit rate, latency, errors
  5. Graceful degradation — работа при отказе кэша
  6. Правильные TTL — баланс между свежестью и нагрузкой