architecture

Репликация и кэширование

Стратегии репликации данных и кэширования для повышения производительности

#scaling #replication #caching #databases #redis

Репликация и кэширование

Репликация и кэширование — ключевые техники для масштабирования систем чтения и повышения отказоустойчивости.

Репликация баз данных

Master-Slave репликация

interface DatabaseConfig {
  master: {
    host: string;
    port: number;
  };
  slaves: Array<{
    host: string;
    port: number;
  }>;
}

class DatabaseRouter {
  private slaveIndex = 0;
  
  constructor(private config: DatabaseConfig) {}
  
  // Запись всегда идет в master
  async write(query: string, params: any[]): Promise<any> {
    const connection = await this.connectToMaster();
    return connection.query(query, params);
  }
  
  // Чтение распределяется по slave
  async read(query: string, params: any[]): Promise<any> {
    const connection = await this.connectToSlave();
    return connection.query(query, params);
  }
  
  private async connectToMaster() {
    return createConnection(this.config.master);
  }
  
  private async connectToSlave() {
    const slave = this.config.slaves[this.slaveIndex];
    this.slaveIndex = (this.slaveIndex + 1) % this.config.slaves.length;
    return createConnection(slave);
  }
}

Использование репликации

class UserService {
  constructor(private db: DatabaseRouter) {}
  
  // Запись в master
  async createUser(userData: UserData): Promise<User> {
    const result = await this.db.write(
      'INSERT INTO users (name, email) VALUES (?, ?)',
      [userData.name, userData.email]
    );
    
    return { id: result.insertId, ...userData };
  }
  
  // Чтение из slave
  async getUser(userId: string): Promise<User> {
    const result = await this.db.read(
      'SELECT * FROM users WHERE id = ?',
      [userId]
    );
    
    return result[0];
  }
  
  // Обновление в master
  async updateUser(userId: string, updates: Partial<UserData>): Promise<void> {
    await this.db.write(
      'UPDATE users SET name = ?, email = ? WHERE id = ?',
      [updates.name, updates.email, userId]
    );
  }
}

Проблема Replication Lag

class ReplicationAwareService {
  constructor(
    private db: DatabaseRouter,
    private cache: Cache
  ) {}
  
  async createOrder(orderData: OrderData): Promise<Order> {
    // 1. Создаем заказ в master
    const order = await this.db.write(
      'INSERT INTO orders (user_id, total) VALUES (?, ?)',
      [orderData.userId, orderData.total]
    );
    
    // 2. Кэшируем, чтобы избежать replication lag
    await this.cache.set(
      `order:${order.id}`,
      order,
      60 // TTL 60 секунд
    );
    
    return order;
  }
  
  async getOrder(orderId: string): Promise<Order> {
    // 1. Проверяем кэш (свежие данные)
    const cached = await this.cache.get(`order:${orderId}`);
    if (cached) return cached;
    
    // 2. Читаем из slave (может быть lag)
    const order = await this.db.read(
      'SELECT * FROM orders WHERE id = ?',
      [orderId]
    );
    
    return order;
  }
  
  // Критичные операции читают из master
  async getOrderForPayment(orderId: string): Promise<Order> {
    return this.db.write( // используем master для чтения
      'SELECT * FROM orders WHERE id = ? FOR UPDATE',
      [orderId]
    );
  }
}

Стратегии кэширования

Cache-Aside (Lazy Loading)

class CacheAsidePattern {
  constructor(
    private cache: Redis,
    private database: Database
  ) {}
  
  async get(key: string): Promise<any> {
    // 1. Проверяем кэш
    const cached = await this.cache.get(key);
    if (cached) {
      return JSON.parse(cached);
    }
    
    // 2. Загружаем из БД
    const data = await this.database.query(
      'SELECT * FROM items WHERE id = ?',
      [key]
    );
    
    // 3. Сохраняем в кэш
    if (data) {
      await this.cache.set(
        key,
        JSON.stringify(data),
        'EX',
        3600 // 1 час
      );
    }
    
    return data;
  }
  
  async update(key: string, value: any): Promise<void> {
    // 1. Обновляем БД
    await this.database.query(
      'UPDATE items SET data = ? WHERE id = ?',
      [JSON.stringify(value), key]
    );
    
    // 2. Инвалидируем кэш
    await this.cache.del(key);
  }
}

Write-Through Cache

class WriteThroughCache {
  constructor(
    private cache: Redis,
    private database: Database
  ) {}
  
  async set(key: string, value: any): Promise<void> {
    // 1. Пишем в БД (синхронно)
    await this.database.query(
      'INSERT INTO items (id, data) VALUES (?, ?) ON DUPLICATE KEY UPDATE data = ?',
      [key, JSON.stringify(value), JSON.stringify(value)]
    );
    
    // 2. Обновляем кэш
    await this.cache.set(
      key,
      JSON.stringify(value),
      'EX',
      3600
    );
  }
  
  async get(key: string): Promise<any> {
    // Читаем из кэша (всегда актуальный)
    const cached = await this.cache.get(key);
    if (cached) {
      return JSON.parse(cached);
    }
    
    // Если нет в кэше - загружаем из БД
    const data = await this.database.query(
      'SELECT * FROM items WHERE id = ?',
      [key]
    );
    
    if (data) {
      await this.cache.set(key, JSON.stringify(data), 'EX', 3600);
    }
    
    return data;
  }
}

Write-Behind (Write-Back) Cache

class WriteBehindCache {
  private writeQueue: Map<string, any> = new Map();
  private flushInterval: NodeJS.Timeout;
  
  constructor(
    private cache: Redis,
    private database: Database,
    private flushIntervalMs: number = 5000
  ) {
    this.startFlushTimer();
  }
  
  async set(key: string, value: any): Promise<void> {
    // 1. Сразу пишем в кэш
    await this.cache.set(
      key,
      JSON.stringify(value),
      'EX',
      3600
    );
    
    // 2. Добавляем в очередь на запись в БД
    this.writeQueue.set(key, value);
  }
  
  async get(key: string): Promise<any> {
    const cached = await this.cache.get(key);
    return cached ? JSON.parse(cached) : null;
  }
  
  private startFlushTimer(): void {
    this.flushInterval = setInterval(
      () => this.flushToDatabase(),
      this.flushIntervalMs
    );
  }
  
  private async flushToDatabase(): Promise<void> {
    if (this.writeQueue.size === 0) return;
    
    const batch = Array.from(this.writeQueue.entries());
    this.writeQueue.clear();
    
    try {
      // Батчевая запись в БД
      await this.database.transaction(async (tx) => {
        for (const [key, value] of batch) {
          await tx.query(
            'INSERT INTO items (id, data) VALUES (?, ?) ON DUPLICATE KEY UPDATE data = ?',
            [key, JSON.stringify(value), JSON.stringify(value)]
          );
        }
      });
    } catch (error) {
      // При ошибке возвращаем в очередь
      batch.forEach(([key, value]) => {
        this.writeQueue.set(key, value);
      });
      throw error;
    }
  }
  
  async shutdown(): Promise<void> {
    clearInterval(this.flushInterval);
    await this.flushToDatabase();
  }
}

Многоуровневое кэширование

class MultiLevelCache {
  constructor(
    private l1Cache: Map<string, any>, // In-memory
    private l2Cache: Redis,              // Redis
    private database: Database           // Database
  ) {}
  
  async get(key: string): Promise<any> {
    // Level 1: In-memory cache
    if (this.l1Cache.has(key)) {
      return this.l1Cache.get(key);
    }
    
    // Level 2: Redis cache
    const l2Data = await this.l2Cache.get(key);
    if (l2Data) {
      const parsed = JSON.parse(l2Data);
      this.l1Cache.set(key, parsed);
      return parsed;
    }
    
    // Level 3: Database
    const dbData = await this.database.query(
      'SELECT * FROM items WHERE id = ?',
      [key]
    );
    
    if (dbData) {
      // Заполняем оба уровня кэша
      this.l1Cache.set(key, dbData);
      await this.l2Cache.set(
        key,
        JSON.stringify(dbData),
        'EX',
        3600
      );
    }
    
    return dbData;
  }
  
  async set(key: string, value: any): Promise<void> {
    // Обновляем все уровни
    this.l1Cache.set(key, value);
    await this.l2Cache.set(key, JSON.stringify(value), 'EX', 3600);
    await this.database.query(
      'INSERT INTO items (id, data) VALUES (?, ?) ON DUPLICATE KEY UPDATE data = ?',
      [key, JSON.stringify(value), JSON.stringify(value)]
    );
  }
  
  async invalidate(key: string): Promise<void> {
    this.l1Cache.delete(key);
    await this.l2Cache.del(key);
  }
}

Cache Stampede Protection

class StampedeProtectedCache {
  private locks = new Map<string, Promise<any>>();
  
  constructor(
    private cache: Redis,
    private database: Database
  ) {}
  
  async get(key: string): Promise<any> {
    // Проверяем кэш
    const cached = await this.cache.get(key);
    if (cached) {
      return JSON.parse(cached);
    }
    
    // Проверяем, не загружает ли уже кто-то эти данные
    if (this.locks.has(key)) {
      return this.locks.get(key);
    }
    
    // Создаем промис для загрузки
    const loadPromise = this.loadAndCache(key);
    this.locks.set(key, loadPromise);
    
    try {
      return await loadPromise;
    } finally {
      this.locks.delete(key);
    }
  }
  
  private async loadAndCache(key: string): Promise<any> {
    const data = await this.database.query(
      'SELECT * FROM items WHERE id = ?',
      [key]
    );
    
    if (data) {
      await this.cache.set(
        key,
        JSON.stringify(data),
        'EX',
        3600
      );
    }
    
    return data;
  }
}

Стратегии инвалидации кэша

Time-based (TTL)

class TTLCache {
  async set(key: string, value: any, ttlSeconds: number): Promise<void> {
    await redis.set(
      key,
      JSON.stringify(value),
      'EX',
      ttlSeconds
    );
  }
}

Event-based

class EventBasedCache {
  constructor(
    private cache: Redis,
    private eventBus: EventEmitter
  ) {
    this.setupInvalidationListeners();
  }
  
  private setupInvalidationListeners(): void {
    this.eventBus.on('user:updated', async (userId: string) => {
      await this.cache.del(`user:${userId}`);
      await this.cache.del(`user:${userId}:orders`);
    });
    
    this.eventBus.on('order:created', async (order: Order) => {
      await this.cache.del(`user:${order.userId}:orders`);
    });
  }
}

Cache Tags

class TaggedCache {
  constructor(private cache: Redis) {}
  
  async set(key: string, value: any, tags: string[]): Promise<void> {
    // Сохраняем данные
    await this.cache.set(key, JSON.stringify(value), 'EX', 3600);
    
    // Связываем с тегами
    for (const tag of tags) {
      await this.cache.sadd(`tag:${tag}`, key);
    }
  }
  
  async invalidateByTag(tag: string): Promise<void> {
    // Получаем все ключи с этим тегом
    const keys = await this.cache.smembers(`tag:${tag}`);
    
    if (keys.length > 0) {
      // Удаляем все ключи
      await this.cache.del(...keys);
      
      // Удаляем сам тег
      await this.cache.del(`tag:${tag}`);
    }
  }
}

// Использование
const cache = new TaggedCache(redis);

await cache.set(
  'user:123',
  userData,
  ['user', 'user:123', 'premium-users']
);

// Инвалидируем все данные премиум пользователей
await cache.invalidateByTag('premium-users');

Распределенный кэш

Redis Cluster

class RedisClusterCache {
  private cluster: RedisCluster;
  
  constructor(nodes: Array<{ host: string; port: number }>) {
    this.cluster = new RedisCluster(nodes, {
      redisOptions: {
        password: process.env.REDIS_PASSWORD
      }
    });
  }
  
  async get(key: string): Promise<any> {
    const data = await this.cluster.get(key);
    return data ? JSON.parse(data) : null;
  }
  
  async set(key: string, value: any, ttl: number): Promise<void> {
    await this.cluster.set(
      key,
      JSON.stringify(value),
      'EX',
      ttl
    );
  }
  
  // Операции с хэш-тегами для multi-key операций
  async mget(keys: string[]): Promise<any[]> {
    // Группируем ключи по слотам
    const pipeline = this.cluster.pipeline();
    keys.forEach(key => pipeline.get(key));
    
    const results = await pipeline.exec();
    return results.map(([err, data]) => 
      data ? JSON.parse(data) : null
    );
  }
}

Мониторинг кэша

class CacheMonitor {
  private hits = 0;
  private misses = 0;
  private errors = 0;
  
  recordHit(): void {
    this.hits++;
  }
  
  recordMiss(): void {
    this.misses++;
  }
  
  recordError(): void {
    this.errors++;
  }
  
  getMetrics() {
    const total = this.hits + this.misses;
    return {
      hits: this.hits,
      misses: this.misses,
      errors: this.errors,
      hitRate: total > 0 ? this.hits / total : 0,
      total
    };
  }
  
  reset(): void {
    this.hits = 0;
    this.misses = 0;
    this.errors = 0;
  }
}

class MonitoredCache {
  private monitor = new CacheMonitor();
  
  constructor(
    private cache: Redis,
    private database: Database
  ) {
    // Периодически отправляем метрики
    setInterval(() => {
      const metrics = this.monitor.getMetrics();
      console.log('Cache metrics:', metrics);
      this.monitor.reset();
    }, 60000);
  }
  
  async get(key: string): Promise<any> {
    try {
      const cached = await this.cache.get(key);
      
      if (cached) {
        this.monitor.recordHit();
        return JSON.parse(cached);
      }
      
      this.monitor.recordMiss();
      
      const data = await this.database.query(
        'SELECT * FROM items WHERE id = ?',
        [key]
      );
      
      if (data) {
        await this.cache.set(key, JSON.stringify(data), 'EX', 3600);
      }
      
      return data;
    } catch (error) {
      this.monitor.recordError();
      throw error;
    }
  }
}

Best Practices

1. Выбор TTL

const CacheTTL = {
  STATIC_CONTENT: 86400,      // 24 часа
  USER_PROFILE: 3600,         // 1 час
  PRODUCT_LIST: 300,          // 5 минут
  SHOPPING_CART: 1800,        // 30 минут
  SESSION: 7200,              // 2 часа
  REAL_TIME_DATA: 10          // 10 секунд
};

2. Размер кэшируемых данных

class SmartCache {
  private readonly MAX_CACHE_SIZE = 1024 * 1024; // 1MB
  
  async set(key: string, value: any): Promise<void> {
    const serialized = JSON.stringify(value);
    
    if (serialized.length > this.MAX_CACHE_SIZE) {
      console.warn(`Value too large to cache: ${key}`);
      return;
    }
    
    await redis.set(key, serialized, 'EX', 3600);
  }
}

3. Graceful Degradation

class ResilientCache {
  async get(key: string): Promise<any> {
    try {
      const cached = await redis.get(key);
      if (cached) return JSON.parse(cached);
    } catch (error) {
      console.error('Cache error, falling back to DB:', error);
    }
    
    // Всегда возвращаемся к БД при проблемах с кэшем
    return this.database.query('SELECT * FROM items WHERE id = ?', [key]);
  }
}

Заключение

Репликация и кэширование — мощные инструменты масштабирования:

  • Репликация увеличивает пропускную способность чтения
  • Кэширование снижает нагрузку на БД и улучшает latency
  • Важно правильно выбрать стратегию кэширования
  • Необходим мониторинг эффективности кэша
  • Система должна работать при отказе кэша