architecture
Репликация и кэширование
Стратегии репликации данных и кэширования для повышения производительности
•
#scaling
#replication
#caching
#databases
#redis
Репликация и кэширование
Репликация и кэширование — ключевые техники для масштабирования систем чтения и повышения отказоустойчивости.
Репликация баз данных
Master-Slave репликация
interface DatabaseConfig {
master: {
host: string;
port: number;
};
slaves: Array<{
host: string;
port: number;
}>;
}
class DatabaseRouter {
private slaveIndex = 0;
constructor(private config: DatabaseConfig) {}
// Запись всегда идет в master
async write(query: string, params: any[]): Promise<any> {
const connection = await this.connectToMaster();
return connection.query(query, params);
}
// Чтение распределяется по slave
async read(query: string, params: any[]): Promise<any> {
const connection = await this.connectToSlave();
return connection.query(query, params);
}
private async connectToMaster() {
return createConnection(this.config.master);
}
private async connectToSlave() {
const slave = this.config.slaves[this.slaveIndex];
this.slaveIndex = (this.slaveIndex + 1) % this.config.slaves.length;
return createConnection(slave);
}
}
Использование репликации
class UserService {
constructor(private db: DatabaseRouter) {}
// Запись в master
async createUser(userData: UserData): Promise<User> {
const result = await this.db.write(
'INSERT INTO users (name, email) VALUES (?, ?)',
[userData.name, userData.email]
);
return { id: result.insertId, ...userData };
}
// Чтение из slave
async getUser(userId: string): Promise<User> {
const result = await this.db.read(
'SELECT * FROM users WHERE id = ?',
[userId]
);
return result[0];
}
// Обновление в master
async updateUser(userId: string, updates: Partial<UserData>): Promise<void> {
await this.db.write(
'UPDATE users SET name = ?, email = ? WHERE id = ?',
[updates.name, updates.email, userId]
);
}
}
Проблема Replication Lag
class ReplicationAwareService {
constructor(
private db: DatabaseRouter,
private cache: Cache
) {}
async createOrder(orderData: OrderData): Promise<Order> {
// 1. Создаем заказ в master
const order = await this.db.write(
'INSERT INTO orders (user_id, total) VALUES (?, ?)',
[orderData.userId, orderData.total]
);
// 2. Кэшируем, чтобы избежать replication lag
await this.cache.set(
`order:${order.id}`,
order,
60 // TTL 60 секунд
);
return order;
}
async getOrder(orderId: string): Promise<Order> {
// 1. Проверяем кэш (свежие данные)
const cached = await this.cache.get(`order:${orderId}`);
if (cached) return cached;
// 2. Читаем из slave (может быть lag)
const order = await this.db.read(
'SELECT * FROM orders WHERE id = ?',
[orderId]
);
return order;
}
// Критичные операции читают из master
async getOrderForPayment(orderId: string): Promise<Order> {
return this.db.write( // используем master для чтения
'SELECT * FROM orders WHERE id = ? FOR UPDATE',
[orderId]
);
}
}
Стратегии кэширования
Cache-Aside (Lazy Loading)
class CacheAsidePattern {
constructor(
private cache: Redis,
private database: Database
) {}
async get(key: string): Promise<any> {
// 1. Проверяем кэш
const cached = await this.cache.get(key);
if (cached) {
return JSON.parse(cached);
}
// 2. Загружаем из БД
const data = await this.database.query(
'SELECT * FROM items WHERE id = ?',
[key]
);
// 3. Сохраняем в кэш
if (data) {
await this.cache.set(
key,
JSON.stringify(data),
'EX',
3600 // 1 час
);
}
return data;
}
async update(key: string, value: any): Promise<void> {
// 1. Обновляем БД
await this.database.query(
'UPDATE items SET data = ? WHERE id = ?',
[JSON.stringify(value), key]
);
// 2. Инвалидируем кэш
await this.cache.del(key);
}
}
Write-Through Cache
class WriteThroughCache {
constructor(
private cache: Redis,
private database: Database
) {}
async set(key: string, value: any): Promise<void> {
// 1. Пишем в БД (синхронно)
await this.database.query(
'INSERT INTO items (id, data) VALUES (?, ?) ON DUPLICATE KEY UPDATE data = ?',
[key, JSON.stringify(value), JSON.stringify(value)]
);
// 2. Обновляем кэш
await this.cache.set(
key,
JSON.stringify(value),
'EX',
3600
);
}
async get(key: string): Promise<any> {
// Читаем из кэша (всегда актуальный)
const cached = await this.cache.get(key);
if (cached) {
return JSON.parse(cached);
}
// Если нет в кэше - загружаем из БД
const data = await this.database.query(
'SELECT * FROM items WHERE id = ?',
[key]
);
if (data) {
await this.cache.set(key, JSON.stringify(data), 'EX', 3600);
}
return data;
}
}
Write-Behind (Write-Back) Cache
class WriteBehindCache {
private writeQueue: Map<string, any> = new Map();
private flushInterval: NodeJS.Timeout;
constructor(
private cache: Redis,
private database: Database,
private flushIntervalMs: number = 5000
) {
this.startFlushTimer();
}
async set(key: string, value: any): Promise<void> {
// 1. Сразу пишем в кэш
await this.cache.set(
key,
JSON.stringify(value),
'EX',
3600
);
// 2. Добавляем в очередь на запись в БД
this.writeQueue.set(key, value);
}
async get(key: string): Promise<any> {
const cached = await this.cache.get(key);
return cached ? JSON.parse(cached) : null;
}
private startFlushTimer(): void {
this.flushInterval = setInterval(
() => this.flushToDatabase(),
this.flushIntervalMs
);
}
private async flushToDatabase(): Promise<void> {
if (this.writeQueue.size === 0) return;
const batch = Array.from(this.writeQueue.entries());
this.writeQueue.clear();
try {
// Батчевая запись в БД
await this.database.transaction(async (tx) => {
for (const [key, value] of batch) {
await tx.query(
'INSERT INTO items (id, data) VALUES (?, ?) ON DUPLICATE KEY UPDATE data = ?',
[key, JSON.stringify(value), JSON.stringify(value)]
);
}
});
} catch (error) {
// При ошибке возвращаем в очередь
batch.forEach(([key, value]) => {
this.writeQueue.set(key, value);
});
throw error;
}
}
async shutdown(): Promise<void> {
clearInterval(this.flushInterval);
await this.flushToDatabase();
}
}
Многоуровневое кэширование
class MultiLevelCache {
constructor(
private l1Cache: Map<string, any>, // In-memory
private l2Cache: Redis, // Redis
private database: Database // Database
) {}
async get(key: string): Promise<any> {
// Level 1: In-memory cache
if (this.l1Cache.has(key)) {
return this.l1Cache.get(key);
}
// Level 2: Redis cache
const l2Data = await this.l2Cache.get(key);
if (l2Data) {
const parsed = JSON.parse(l2Data);
this.l1Cache.set(key, parsed);
return parsed;
}
// Level 3: Database
const dbData = await this.database.query(
'SELECT * FROM items WHERE id = ?',
[key]
);
if (dbData) {
// Заполняем оба уровня кэша
this.l1Cache.set(key, dbData);
await this.l2Cache.set(
key,
JSON.stringify(dbData),
'EX',
3600
);
}
return dbData;
}
async set(key: string, value: any): Promise<void> {
// Обновляем все уровни
this.l1Cache.set(key, value);
await this.l2Cache.set(key, JSON.stringify(value), 'EX', 3600);
await this.database.query(
'INSERT INTO items (id, data) VALUES (?, ?) ON DUPLICATE KEY UPDATE data = ?',
[key, JSON.stringify(value), JSON.stringify(value)]
);
}
async invalidate(key: string): Promise<void> {
this.l1Cache.delete(key);
await this.l2Cache.del(key);
}
}
Cache Stampede Protection
class StampedeProtectedCache {
private locks = new Map<string, Promise<any>>();
constructor(
private cache: Redis,
private database: Database
) {}
async get(key: string): Promise<any> {
// Проверяем кэш
const cached = await this.cache.get(key);
if (cached) {
return JSON.parse(cached);
}
// Проверяем, не загружает ли уже кто-то эти данные
if (this.locks.has(key)) {
return this.locks.get(key);
}
// Создаем промис для загрузки
const loadPromise = this.loadAndCache(key);
this.locks.set(key, loadPromise);
try {
return await loadPromise;
} finally {
this.locks.delete(key);
}
}
private async loadAndCache(key: string): Promise<any> {
const data = await this.database.query(
'SELECT * FROM items WHERE id = ?',
[key]
);
if (data) {
await this.cache.set(
key,
JSON.stringify(data),
'EX',
3600
);
}
return data;
}
}
Стратегии инвалидации кэша
Time-based (TTL)
class TTLCache {
async set(key: string, value: any, ttlSeconds: number): Promise<void> {
await redis.set(
key,
JSON.stringify(value),
'EX',
ttlSeconds
);
}
}
Event-based
class EventBasedCache {
constructor(
private cache: Redis,
private eventBus: EventEmitter
) {
this.setupInvalidationListeners();
}
private setupInvalidationListeners(): void {
this.eventBus.on('user:updated', async (userId: string) => {
await this.cache.del(`user:${userId}`);
await this.cache.del(`user:${userId}:orders`);
});
this.eventBus.on('order:created', async (order: Order) => {
await this.cache.del(`user:${order.userId}:orders`);
});
}
}
Cache Tags
class TaggedCache {
constructor(private cache: Redis) {}
async set(key: string, value: any, tags: string[]): Promise<void> {
// Сохраняем данные
await this.cache.set(key, JSON.stringify(value), 'EX', 3600);
// Связываем с тегами
for (const tag of tags) {
await this.cache.sadd(`tag:${tag}`, key);
}
}
async invalidateByTag(tag: string): Promise<void> {
// Получаем все ключи с этим тегом
const keys = await this.cache.smembers(`tag:${tag}`);
if (keys.length > 0) {
// Удаляем все ключи
await this.cache.del(...keys);
// Удаляем сам тег
await this.cache.del(`tag:${tag}`);
}
}
}
// Использование
const cache = new TaggedCache(redis);
await cache.set(
'user:123',
userData,
['user', 'user:123', 'premium-users']
);
// Инвалидируем все данные премиум пользователей
await cache.invalidateByTag('premium-users');
Распределенный кэш
Redis Cluster
class RedisClusterCache {
private cluster: RedisCluster;
constructor(nodes: Array<{ host: string; port: number }>) {
this.cluster = new RedisCluster(nodes, {
redisOptions: {
password: process.env.REDIS_PASSWORD
}
});
}
async get(key: string): Promise<any> {
const data = await this.cluster.get(key);
return data ? JSON.parse(data) : null;
}
async set(key: string, value: any, ttl: number): Promise<void> {
await this.cluster.set(
key,
JSON.stringify(value),
'EX',
ttl
);
}
// Операции с хэш-тегами для multi-key операций
async mget(keys: string[]): Promise<any[]> {
// Группируем ключи по слотам
const pipeline = this.cluster.pipeline();
keys.forEach(key => pipeline.get(key));
const results = await pipeline.exec();
return results.map(([err, data]) =>
data ? JSON.parse(data) : null
);
}
}
Мониторинг кэша
class CacheMonitor {
private hits = 0;
private misses = 0;
private errors = 0;
recordHit(): void {
this.hits++;
}
recordMiss(): void {
this.misses++;
}
recordError(): void {
this.errors++;
}
getMetrics() {
const total = this.hits + this.misses;
return {
hits: this.hits,
misses: this.misses,
errors: this.errors,
hitRate: total > 0 ? this.hits / total : 0,
total
};
}
reset(): void {
this.hits = 0;
this.misses = 0;
this.errors = 0;
}
}
class MonitoredCache {
private monitor = new CacheMonitor();
constructor(
private cache: Redis,
private database: Database
) {
// Периодически отправляем метрики
setInterval(() => {
const metrics = this.monitor.getMetrics();
console.log('Cache metrics:', metrics);
this.monitor.reset();
}, 60000);
}
async get(key: string): Promise<any> {
try {
const cached = await this.cache.get(key);
if (cached) {
this.monitor.recordHit();
return JSON.parse(cached);
}
this.monitor.recordMiss();
const data = await this.database.query(
'SELECT * FROM items WHERE id = ?',
[key]
);
if (data) {
await this.cache.set(key, JSON.stringify(data), 'EX', 3600);
}
return data;
} catch (error) {
this.monitor.recordError();
throw error;
}
}
}
Best Practices
1. Выбор TTL
const CacheTTL = {
STATIC_CONTENT: 86400, // 24 часа
USER_PROFILE: 3600, // 1 час
PRODUCT_LIST: 300, // 5 минут
SHOPPING_CART: 1800, // 30 минут
SESSION: 7200, // 2 часа
REAL_TIME_DATA: 10 // 10 секунд
};
2. Размер кэшируемых данных
class SmartCache {
private readonly MAX_CACHE_SIZE = 1024 * 1024; // 1MB
async set(key: string, value: any): Promise<void> {
const serialized = JSON.stringify(value);
if (serialized.length > this.MAX_CACHE_SIZE) {
console.warn(`Value too large to cache: ${key}`);
return;
}
await redis.set(key, serialized, 'EX', 3600);
}
}
3. Graceful Degradation
class ResilientCache {
async get(key: string): Promise<any> {
try {
const cached = await redis.get(key);
if (cached) return JSON.parse(cached);
} catch (error) {
console.error('Cache error, falling back to DB:', error);
}
// Всегда возвращаемся к БД при проблемах с кэшем
return this.database.query('SELECT * FROM items WHERE id = ?', [key]);
}
}
Заключение
Репликация и кэширование — мощные инструменты масштабирования:
- Репликация увеличивает пропускную способность чтения
- Кэширование снижает нагрузку на БД и улучшает latency
- Важно правильно выбрать стратегию кэширования
- Необходим мониторинг эффективности кэша
- Система должна работать при отказе кэша