cloud

Шарды и реплики в геораспределённой среде

Организация данных в географически распределенных системах

#distributed-systems #geo-distribution #sharding #replication

Шарды и реплики в геораспределённой среде

Геораспределённые системы требуют особого подхода к организации данных для обеспечения низкой задержки и высокой доступности.

Проблемы геораспределения

Задержка сети

// Typical latencies between regions, in milliseconds (unit per the constant's name).
// Keys run from the closest hop (same datacenter) to the farthest (across an ocean).
const LATENCY_MS = {
  sameDatacenter: 1,
  sameRegion: 5,
  crossRegion: 50,
  crossContinent: 150,
  crossOcean: 300
};

/**
 * Routes a user to the API endpoint of the deployment region nearest to them.
 */
class LatencyAwareRouter {
  /** Deployment region used when the caller's region is not in the routing table. */
  private static readonly DEFAULT_REGION = 'us-east-1';

  /**
   * Coarse user region -> concrete deployment region.
   *
   * A Map is used rather than an object literal so that arbitrary input such
   * as "constructor" or "toString" cannot resolve to a member inherited from
   * Object.prototype and end up interpolated into the URL.
   */
  private static readonly REGION_MAPPING: ReadonlyMap<string, string> = new Map([
    ['us-east', 'us-east-1'],
    ['us-west', 'us-west-1'],
    ['eu', 'eu-west-1'],
    ['asia', 'ap-southeast-1']
  ]);

  /**
   * Builds the base URL of the API endpoint the user should talk to.
   *
   * @param userId Identifier of the user; not used by the current routing logic.
   * @param userRegion Coarse region of the user (e.g. 'eu', 'asia').
   * @returns `https://<deployment-region>.api.example.com`.
   */
  async route(userId: string, userRegion: string): Promise<string> {
    // Pick the nearest deployment region
    const region = this.selectNearestRegion(userRegion);
    return `https://${region}.api.example.com`;
  }

  /**
   * Maps a coarse user region to a deployment region, falling back to
   * DEFAULT_REGION for anything that is not an exact key of the table.
   */
  private selectNearestRegion(userRegion: string): string {
    return (
      LatencyAwareRouter.REGION_MAPPING.get(userRegion) ??
      LatencyAwareRouter.DEFAULT_REGION
    );
  }
}

CAP-теорема в контексте геораспределения

/**
 * Describes where a geo-distributed configuration sits with respect to the
 * three CAP properties.
 */
interface GeoDistributedConfig {
  /** Consistency model. */
  consistency: 'strong' | 'eventual' | 'causal';
  /** Availability level. */
  availability: 'high' | 'medium';
  /** Partition-tolerance flag (presumably the "P" of CAP). */
  partitionTolerance: boolean;
}

// Strong consistency - high latency.
// This preset pairs 'strong' consistency with 'medium' availability.
const strongConsistency: GeoDistributedConfig = {
  consistency: 'strong',
  availability: 'medium',
  partitionTolerance: true
};

// Eventual consistency - low latency.
// This preset pairs 'eventual' consistency with 'high' availability.
const eventualConsistency: GeoDistributedConfig = {
  consistency: 'eventual',
  availability: 'high',
  partitionTolerance: true
};

Стратегии геораспределения

1. Geographic Sharding

Данные пользователей хранятся в их регионе.

/**
 * Deployment regions. Each member's value is the region identifier string
 * that is stored in the database and used as a map key.
 */
enum Region {
  US_EAST = 'us-east-1',
  US_WEST = 'us-west-1',
  EU_WEST = 'eu-west-1',
  AP_SOUTHEAST = 'ap-southeast-1'
}

/** One shard of the data set, bound to a single region. */
interface GeoShard {
  /** Region this shard belongs to; used as the lookup key. */
  region: Region;
  /** Main connection of the shard. */
  database: DatabaseConnection;
  /** Additional connections of the shard (presumably read replicas). */
  replicas: DatabaseConnection[];
}

/**
 * Geographic sharding: a user's record is written to and read from the shard
 * of the user's region, and copied to every other shard in the background.
 */
class GeographicSharding {
  private shards: Map<Region, GeoShard> = new Map();

  /**
   * @param shards Shards to register; a later entry for the same region
   *   replaces an earlier one.
   */
  constructor(shards: GeoShard[]) {
    for (const shard of shards) {
      this.shards.set(shard.region, shard);
    }
  }

  /**
   * Stores `data` for the user in the shard of the user's region and starts
   * background replication to all other shards.
   *
   * The returned promise settles once the write to the user's own shard is
   * done; it does not wait for replication.
   *
   * NOTE(review): the statement against the user's own shard is a plain
   * INSERT while replication upserts. A second write for the same id will
   * presumably fail on the primary key there - confirm that `write` is meant
   * to be create-only.
   *
   * @throws Error if no shard is registered for the user's region.
   */
  async write(userId: string, data: any): Promise<void> {
    const userRegion = await this.getUserRegion(userId);
    const shard = this.getShard(userRegion);

    // Write to the region's main database
    await shard.database.query(
      'INSERT INTO users (id, data, region) VALUES (?, ?, ?)',
      [userId, JSON.stringify(data), userRegion]
    );

    // Replicate to the other regions without blocking the caller. `void`
    // marks the fire-and-forget intent, and the catch guarantees that an
    // unexpected failure is logged instead of surfacing as an unhandled
    // promise rejection.
    void this.replicateToOtherRegions(userId, data, userRegion).catch(error => {
      console.error(`Replication from ${userRegion} failed:`, error);
    });
  }

  /**
   * Reads the user's data from the shard of the user's region.
   *
   * The query goes to `shard.database`; `shard.replicas` are not consulted.
   *
   * @returns The parsed `data` column, or null when no row exists.
   * @throws Error if no shard is registered for the user's region.
   */
  async read(userId: string): Promise<any> {
    const userRegion = await this.getUserRegion(userId);
    const shard = this.getShard(userRegion);

    const result = await shard.database.query(
      'SELECT data FROM users WHERE id = ?',
      [userId]
    );

    return result.length > 0 ? JSON.parse(result[0].data) : null;
  }

  /** Returns the shard registered for `region` or throws if there is none. */
  private getShard(region: Region): GeoShard {
    const shard = this.shards.get(region);

    if (!shard) {
      throw new Error(`No shard for region ${region}`);
    }

    return shard;
  }

  /**
   * Upserts the user's data into every shard except `sourceRegion`.
   *
   * Failures are logged per target region and never rethrown, so one
   * unreachable region does not prevent replication to the others.
   */
  private async replicateToOtherRegions(
    userId: string,
    data: any,
    sourceRegion: Region
  ): Promise<void> {
    const payload = JSON.stringify(data);
    const otherShards = Array.from(this.shards.values())
      .filter(shard => shard.region !== sourceRegion);

    // An async callback turns a synchronous throw from `query` into a
    // rejection as well, so the try/catch covers both failure modes.
    await Promise.all(otherShards.map(async shard => {
      try {
        await shard.database.query(
          'INSERT INTO users (id, data, region) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE data = ?',
          [userId, payload, sourceRegion, payload]
        );
      } catch (error) {
        console.error(`Replication to ${shard.region} failed:`, error);
      }
    }));
  }

  /**
   * Looks up the user's region in the cache, defaulting to US_EAST when the
   * cache has no (or an empty) entry. The cached string is trusted as-is.
   */
  private async getUserRegion(userId: string): Promise<Region> {
    // Could also be derived from the client IP
    const cached = await redis.get(`user:${userId}:region`);
    return (cached as Region) || Region.US_EAST;
  }
}

2. Multi-Region Replication

Данные реплицируются во все регионы.

/**
 * Multi-region replication: every key is written synchronously to a primary
 * region and then handed to a message queue for delivery to all other regions.
 */
class MultiRegionReplication {
  private regions: Map<Region, DatabaseConnection> = new Map();

  /** @param connections One database connection per region. */
  constructor(connections: Array<{ region: Region; connection: DatabaseConnection }>) {
    for (const { region, connection } of connections) {
      this.regions.set(region, connection);
    }
  }

  /**
   * Writes the value to `primaryRegion` and schedules replication elsewhere.
   *
   * The returned promise settles after the primary write; replication
   * messages are published in the background.
   *
   * NOTE(review): `key` is a reserved word in MySQL; if that is the target
   * dialect the column name needs quoting - confirm.
   *
   * @throws Error if `primaryRegion` has no registered connection.
   */
  async write(key: string, value: any, primaryRegion: Region): Promise<void> {
    const timestamp = Date.now();

    // Synchronous write to the primary region
    const primary = this.regions.get(primaryRegion);
    if (!primary) {
      throw new Error(`Primary region ${primaryRegion} not found`);
    }

    await primary.query(
      'INSERT INTO data (key, value, timestamp, region) VALUES (?, ?, ?, ?)',
      [key, JSON.stringify(value), timestamp, primaryRegion]
    );

    // Asynchronous replication to the other regions. The catch keeps an
    // unexpected failure from becoming an unhandled promise rejection.
    void this.replicateAsync(key, value, timestamp, primaryRegion).catch(error => {
      console.error(`Scheduling replication for key ${key} failed:`, error);
    });
  }

  /**
   * Reads the value from the connection of `userRegion`, or from the first
   * registered region when `userRegion` has no connection.
   *
   * @returns The parsed value, or null when the key is absent.
   * @throws Error if no region is registered at all.
   */
  async read(key: string, userRegion: Region): Promise<any> {
    const connection =
      this.regions.get(userRegion) ?? this.regions.values().next().value;

    if (!connection) {
      // Without this guard an empty region map fails with an opaque
      // "cannot read properties of undefined" TypeError.
      throw new Error('No regions configured');
    }

    const result = await connection.query(
      'SELECT value, timestamp FROM data WHERE key = ?',
      [key]
    );

    return result.length > 0 ? JSON.parse(result[0].value) : null;
  }

  /**
   * Publishes one 'replication' message per region other than `sourceRegion`.
   *
   * Messages are published one after another. A failed publish is logged and
   * the remaining regions are still attempted.
   */
  private async replicateAsync(
    key: string,
    value: any,
    timestamp: number,
    sourceRegion: Region
  ): Promise<void> {
    const targetRegions = Array.from(this.regions.keys())
      .filter(region => region !== sourceRegion);

    for (const region of targetRegions) {
      try {
        // A queue is used for reliable replication
        await messageQueue.publish('replication', {
          targetRegion: region,
          key,
          value,
          timestamp,
          sourceRegion
        });
      } catch (error) {
        console.error(`Queueing replication to ${region} failed:`, error);
      }
    }
  }
}

3. Active-Active Multi-Master

Запись возможна в любом регионе.

/**
 * Active-active (multi-master) replication: any region accepts writes, each
 * version carries a vector clock, and diverging versions are reconciled on
 * demand by `resolveConflicts`.
 */
class ActiveActiveReplication {
  // `key` is a reserved word in MySQL, whose ON DUPLICATE KEY UPDATE syntax
  // these statements use, so the column name is quoted with backticks.
  private static readonly UPSERT_SQL =
    'INSERT INTO data (`key`, value, vector_clock) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE value = ?, vector_clock = ?';
  private static readonly SELECT_SQL =
    'SELECT value, vector_clock FROM data WHERE `key` = ?';

  private regions: Map<Region, DatabaseConnection> = new Map();

  /**
   * @param connections One database connection per region. Optional so that
   *   existing `new ActiveActiveReplication()` call sites keep compiling;
   *   without it the region map stays empty and every operation throws.
   */
  constructor(
    connections: Array<{ region: Region; connection: DatabaseConnection }> = []
  ) {
    for (const { region, connection } of connections) {
      this.regions.set(region, connection);
    }
  }

  /**
   * Writes the value in `region` and then replicates it to all other regions.
   *
   * The caller-supplied `vectorClock` is incremented in place for `region`
   * before being stored. The returned promise settles only after replication
   * to every other region has been attempted.
   *
   * @throws Error if `region` has no registered connection.
   */
  async write(
    key: string,
    value: any,
    region: Region,
    vectorClock: VectorClock
  ): Promise<void> {
    const connection = this.getConnection(region);

    // Advance the vector clock for the writing region
    vectorClock.increment(region);

    // Write locally
    await connection.query(
      ActiveActiveReplication.UPSERT_SQL,
      this.upsertParams(key, value, vectorClock)
    );

    // Replicate to the other regions
    await this.replicateToAll(key, value, vectorClock, region);
  }

  /**
   * Reads the version stored in `region`.
   *
   * @returns `{ value, vectorClock }`, or null when the key is absent.
   * @throws Error if `region` has no registered connection.
   */
  async read(key: string, region: Region): Promise<any> {
    const connection = this.getConnection(region);

    const result = await connection.query(
      ActiveActiveReplication.SELECT_SQL,
      [key]
    );

    if (result.length === 0) return null;

    return {
      value: JSON.parse(result[0].value),
      vectorClock: VectorClock.fromJSON(JSON.parse(result[0].vector_clock))
    };
  }

  /**
   * Reads the key from every region and returns a single reconciled value.
   *
   * - no region has the key: null
   * - exactly one version is not dominated by another: that version's value
   * - several non-dominated versions: the result of `mergeValues`
   *
   * Rejects if the query against any region fails.
   */
  async resolveConflicts(key: string): Promise<any> {
    // Read from all regions
    const promises = Array.from(this.regions.entries()).map(
      async ([region, connection]) => {
        const result = await connection.query(
          ActiveActiveReplication.SELECT_SQL,
          [key]
        );

        if (result.length === 0) return null;

        return {
          region,
          value: JSON.parse(result[0].value),
          vectorClock: VectorClock.fromJSON(JSON.parse(result[0].vector_clock))
        };
      }
    );

    // The type predicate lets the compiler drop `null` from the element type.
    const results = (await Promise.all(promises)).filter(
      (r): r is NonNullable<typeof r> => r !== null
    );

    if (results.length === 0) return null;
    if (results.length === 1) return results[0].value;

    // Find the competing versions
    const concurrent = this.findConcurrentVersions(results);

    if (concurrent.length === 1) {
      return concurrent[0].value;
    }

    // Resolve the conflict at application level
    return this.mergeValues(concurrent.map(c => c.value));
  }

  /** Returns the connection registered for `region` or throws. */
  private getConnection(region: Region): DatabaseConnection {
    const connection = this.regions.get(region);
    if (!connection) {
      throw new Error(`Region ${region} not found`);
    }
    return connection;
  }

  /** Builds the five positional parameters expected by UPSERT_SQL. */
  private upsertParams(key: string, value: any, vectorClock: VectorClock): string[] {
    const serializedValue = JSON.stringify(value);
    const serializedClock = JSON.stringify(vectorClock.toJSON());
    return [key, serializedValue, serializedClock, serializedValue, serializedClock];
  }

  /**
   * Keeps every version whose clock is not strictly 'before' some other
   * version's clock. Versions whose clocks compare as 'concurrent' with each
   * other are all kept.
   */
  private findConcurrentVersions(
    results: Array<{ region: Region; value: any; vectorClock: VectorClock }>
  ): Array<{ region: Region; value: any; vectorClock: VectorClock }> {
    return results.filter(candidate =>
      !results.some(other =>
        other !== candidate &&
        candidate.vectorClock.compare(other.vectorClock) === 'before'
      )
    );
  }

  /**
   * Application-specific merge of competing values.
   *
   * When every value carries an `items` array (e.g. a shopping cart) the
   * items are unioned by `id`; for a repeated id the item from the later
   * value in the list is kept. Otherwise the first value is returned as-is.
   */
  private mergeValues(values: any[]): any {
    const allHaveItems =
      values.length > 0 && values.every(v => Array.isArray(v?.items));

    if (allHaveItems) {
      const itemsById = new Map();

      for (const value of values) {
        for (const item of value.items) {
          itemsById.set(item.id, item);
        }
      }

      return { items: Array.from(itemsById.values()) };
    }

    // Fallback: arbitrarily keep the first candidate. This is NOT
    // last-write-wins - no timestamp is available at this point.
    return values[0];
  }

  /**
   * Upserts the version into every region except `sourceRegion`.
   *
   * Failures are logged per region and not rethrown; the promise resolves
   * once every region has been attempted.
   */
  private async replicateToAll(
    key: string,
    value: any,
    vectorClock: VectorClock,
    sourceRegion: Region
  ): Promise<void> {
    const params = this.upsertParams(key, value, vectorClock);
    const otherRegions = Array.from(this.regions.entries())
      .filter(([region]) => region !== sourceRegion);

    const promises = otherRegions.map(([region, connection]) =>
      connection.query(ActiveActiveReplication.UPSERT_SQL, params).catch(error => {
        console.error(`Replication to ${region} failed:`, error);
      })
    );

    await Promise.allSettled(promises);
  }
}

/**
 * Vector clock: one logical counter per node id, used to order versions that
 * were written on different nodes.
 */
class VectorClock {
  private clock: Map<string, number>;

  /** Starts a clock that knows a single node, with its counter at zero. */
  constructor(nodeId: string) {
    this.clock = new Map<string, number>();
    this.clock.set(nodeId, 0);
  }

  /** Adds one to the counter of `nodeId`; an unknown node starts from zero. */
  increment(nodeId: string): void {
    const previous = this.clock.get(nodeId) || 0;
    this.clock.set(nodeId, previous + 1);
  }

  /**
   * Orders this clock relative to `other`.
   *
   * @returns 'before' when at least one counter is smaller and none is
   *   larger, 'after' for the mirror case, and 'concurrent' otherwise -
   *   which includes two clocks whose counters are all equal.
   */
  compare(other: VectorClock): 'before' | 'after' | 'concurrent' {
    let behind = false;
    let ahead = false;

    // Visiting a node id twice is harmless: the flags only ever flip to true.
    for (const id of [...this.clock.keys(), ...other.clock.keys()]) {
      const mine = this.clock.get(id) || 0;
      const theirs = other.clock.get(id) || 0;

      if (mine < theirs) {
        behind = true;
      } else if (mine > theirs) {
        ahead = true;
      }
    }

    // Both flags set, or neither: no strict ordering exists.
    if (behind === ahead) {
      return 'concurrent';
    }
    return behind ? 'before' : 'after';
  }

  /** Serializes the clock as a plain `{ nodeId: counter }` object. */
  toJSON(): Record<string, number> {
    return Object.fromEntries(this.clock.entries());
  }

  /** Rebuilds a clock from the object produced by `toJSON`. */
  static fromJSON(data: Record<string, number>): VectorClock {
    const restored = new VectorClock('');
    // Drop the placeholder entry created by the constructor.
    restored.clock.clear();
    for (const [id, counter] of Object.entries(data)) {
      restored.clock.set(id, counter);
    }
    return restored;
  }
}

Паттерны данных

1. Home Region Pattern

/**
 * Home-region pattern: every user has one home region that accepts writes;
 * the other regions receive copies through the message queue.
 */
class HomeRegionPattern {
  /**
   * Creates the user in `region` (its home region) and schedules creation of
   * copies in all other regions.
   *
   * @returns The user as written, including the generated id.
   */
  async createUser(userData: UserData, region: Region): Promise<User> {
    const user: User = {
      // Spread first so that a stray runtime `id`/`homeRegion`/`createdAt`
      // on `userData` can never override the generated fields.
      ...userData,
      id: generateId(),
      homeRegion: region,
      createdAt: new Date()
    };

    // Write to the home region
    await this.writeToRegion(region, user);

    // Create the copies in the other regions
    await this.createReadReplicas(user);

    return user;
  }

  /**
   * Applies `updates` to the user in its home region and schedules the same
   * update for the other regions.
   *
   * @throws Error if the user cannot be found.
   */
  async updateUser(userId: string, updates: Partial<UserData>): Promise<void> {
    // Find the user's home region
    const homeRegion = await this.getUserHomeRegion(userId);

    // Update in the home region
    await this.updateInRegion(homeRegion, userId, updates);

    // Replicate the change
    await this.replicateUpdates(userId, updates, homeRegion);
  }

  /**
   * Reads the user from `requestRegion`, falling back to the user's home
   * region when the requested region has no row yet.
   *
   * @throws Error if the user is found in neither place.
   */
  async getUser(userId: string, requestRegion: Region): Promise<User> {
    // Read from the nearest copy
    const local = await this.readFromRegion(requestRegion, userId);
    if (local) {
      return local;
    }

    // Fallback: read from the home region
    const homeRegion = await this.getUserHomeRegion(userId);
    const fromHome = await this.readFromRegion(homeRegion, userId);

    if (!fromHome) {
      // The declared return type is `User`, so a missing row must not be
      // passed back to the caller as null.
      throw new Error(`User ${userId} not found`);
    }

    return fromHome;
  }

  /** Inserts the serialized user into the `users` table of `region`. */
  private async writeToRegion(region: Region, user: User): Promise<void> {
    const connection = this.getConnection(region);
    await connection.query(
      'INSERT INTO users (id, data, home_region) VALUES (?, ?, ?)',
      [user.id, JSON.stringify(user), region]
    );
  }

  /** Publishes one 'create-replica' message per region other than the home region. */
  private async createReadReplicas(user: User): Promise<void> {
    const otherRegions = this.getAllRegions()
      .filter(r => r !== user.homeRegion);

    for (const region of otherRegions) {
      await messageQueue.publish('create-replica', {
        region,
        user
      });
    }
  }

  /**
   * Resolves the user's home region: first from the cache, then by querying
   * each region in turn. A database hit is cached for one hour.
   *
   * @throws Error if no region has a row for the user.
   */
  private async getUserHomeRegion(userId: string): Promise<Region> {
    const cached = await redis.get(`user:${userId}:home-region`);
    if (cached) return cached as Region;

    // Search all regions
    for (const region of this.getAllRegions()) {
      const connection = this.getConnection(region);
      const result = await connection.query(
        'SELECT home_region FROM users WHERE id = ?',
        [userId]
      );

      if (result.length > 0) {
        const homeRegion = result[0].home_region;
        await redis.set(`user:${userId}:home-region`, homeRegion, 'EX', 3600);
        return homeRegion;
      }
    }

    throw new Error(`User ${userId} not found`);
  }

  /**
   * Returns the connection for `region`.
   *
   * @throws Error if no connection is registered, instead of handing back
   *   `undefined` and failing later with an unrelated TypeError.
   */
  private getConnection(region: Region): DatabaseConnection {
    const connection = connections.get(region);
    if (!connection) {
      throw new Error(`No connection for region ${region}`);
    }
    return connection;
  }

  /** All regions that hold user data. */
  private getAllRegions(): Region[] {
    return [
      Region.US_EAST,
      Region.US_WEST,
      Region.EU_WEST,
      Region.AP_SOUTHEAST
    ];
  }

  /**
   * Reads and deserializes the user stored in `region`.
   *
   * @returns The user, or null when the region has no row for `userId`.
   */
  private async readFromRegion(region: Region, userId: string): Promise<User | null> {
    const connection = this.getConnection(region);
    const result = await connection.query(
      'SELECT data FROM users WHERE id = ?',
      [userId]
    );

    if (result.length === 0) {
      return null;
    }

    const user = JSON.parse(result[0].data);

    // JSON.stringify stores a Date as an ISO string; turn it back into a
    // Date so the object really matches the `User` type.
    if (typeof user.createdAt === 'string') {
      user.createdAt = new Date(user.createdAt);
    }

    return user;
  }

  /**
   * Read-modify-write of the user's JSON document in `region`.
   *
   * @throws Error if the user does not exist in `region`.
   */
  private async updateInRegion(
    region: Region,
    userId: string,
    updates: Partial<UserData>
  ): Promise<void> {
    const connection = this.getConnection(region);
    const user = await this.readFromRegion(region, userId);

    if (!user) {
      throw new Error(`User ${userId} not found in region ${region}`);
    }

    Object.assign(user, updates);

    await connection.query(
      'UPDATE users SET data = ? WHERE id = ?',
      [JSON.stringify(user), userId]
    );
  }

  /** Publishes one 'replicate-update' message per region other than `sourceRegion`. */
  private async replicateUpdates(
    userId: string,
    updates: Partial<UserData>,
    sourceRegion: Region
  ): Promise<void> {
    const otherRegions = this.getAllRegions()
      .filter(r => r !== sourceRegion);

    for (const region of otherRegions) {
      await messageQueue.publish('replicate-update', {
        region,
        userId,
        updates
      });
    }
  }
}

/** User fields supplied by the caller when creating or updating a user. */
interface UserData {
  name: string;
  email: string;
}

/** A stored user: the caller-supplied fields plus identity and placement metadata. */
interface User extends UserData {
  /** Unique user identifier. */
  id: string;
  /** Region that owns the user's record. */
  homeRegion: Region;
  /** Creation time of the user. */
  createdAt: Date;
}

2. Global Tables Pattern

/**
 * Global-tables pattern: every write goes to the source region and is then
 * applied to all other regions, with the newer timestamp winning on conflict.
 * Emulates the behaviour of DynamoDB Global Tables on top of SQL connections.
 */
class GlobalTablesPattern {
  // Plain or schema-qualified SQL identifier. Table names are interpolated
  // into the statement text (placeholders cannot stand for identifiers), so
  // anything else is rejected before it reaches the database.
  private static readonly TABLE_NAME_PATTERN =
    /^[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*)?$/;

  private regions: Map<Region, DatabaseConnection> = new Map();

  /**
   * @param connections One database connection per region. Optional so that
   *   existing `new GlobalTablesPattern()` call sites keep compiling; without
   *   it the region map stays empty and every operation throws.
   */
  constructor(
    connections: Array<{ region: Region; connection: DatabaseConnection }> = []
  ) {
    for (const { region, connection } of connections) {
      this.regions.set(region, connection);
    }
  }

  /**
   * Writes the value in `sourceRegion` and then applies it to every other
   * region.
   *
   * @throws Error if `tableName` is not a valid identifier or `sourceRegion`
   *   has no registered connection.
   */
  async write(
    tableName: string,
    key: string,
    value: any,
    sourceRegion: Region
  ): Promise<void> {
    const timestamp = Date.now();
    const version = generateVersion();

    // Write to the source region
    await this.writeToRegion(
      sourceRegion,
      tableName,
      key,
      value,
      timestamp,
      version
    );

    // DynamoDB Global Tables replicate automatically;
    // that behaviour is emulated here
    await this.replicateGlobally(
      tableName,
      key,
      value,
      timestamp,
      version,
      sourceRegion
    );
  }

  /**
   * Reads the value stored for `key` in `region`.
   *
   * @returns The parsed value, or null when the key is absent.
   * @throws Error if `tableName` is not a valid identifier or `region` has
   *   no registered connection.
   */
  async read(
    tableName: string,
    key: string,
    region: Region
  ): Promise<any> {
    const connection = this.getConnection(region);
    const table = this.assertTableName(tableName);

    // `key` is a reserved word in MySQL and must be quoted.
    const result = await connection.query(
      `SELECT value, timestamp, version FROM ${table} WHERE \`key\` = ?`,
      [key]
    );

    return result.length > 0 ? JSON.parse(result[0].value) : null;
  }

  /** Returns the connection registered for `region` or throws. */
  private getConnection(region: Region): DatabaseConnection {
    const connection = this.regions.get(region);
    if (!connection) {
      throw new Error(`Region ${region} not found`);
    }
    return connection;
  }

  /** Returns `tableName` unchanged if it is a safe identifier, otherwise throws. */
  private assertTableName(tableName: string): string {
    if (!GlobalTablesPattern.TABLE_NAME_PATTERN.test(tableName)) {
      throw new Error(`Invalid table name: ${tableName}`);
    }
    return tableName;
  }

  /**
   * Upserts one row in `region`; an existing row is only overwritten when the
   * incoming timestamp is strictly newer.
   */
  private async writeToRegion(
    region: Region,
    tableName: string,
    key: string,
    value: any,
    timestamp: number,
    version: string
  ): Promise<void> {
    const connection = this.getConnection(region);
    const table = this.assertTableName(tableName);

    // MySQL evaluates these assignments left to right, and a column that has
    // already been assigned reads as its NEW value in later expressions.
    // `timestamp` therefore has to be assigned last: if it were updated
    // before `version`, the `timestamp < VALUES(timestamp)` test for
    // `version` would compare the new timestamp with itself and `version`
    // would never change.
    await connection.query(
      `INSERT INTO ${table} (\`key\`, value, timestamp, version) 
       VALUES (?, ?, ?, ?)
       ON DUPLICATE KEY UPDATE 
         value = IF(timestamp < VALUES(timestamp), VALUES(value), value),
         version = IF(timestamp < VALUES(timestamp), VALUES(version), version),
         timestamp = IF(timestamp < VALUES(timestamp), VALUES(timestamp), timestamp)`,
      [key, JSON.stringify(value), timestamp, version]
    );
  }

  /**
   * Applies the write to every region except `sourceRegion`.
   *
   * A region that fails is logged and the write is queued for retry; if
   * queueing fails as well, that is logged too. Never rejects because of a
   * single region.
   */
  private async replicateGlobally(
    tableName: string,
    key: string,
    value: any,
    timestamp: number,
    version: string,
    sourceRegion: Region
  ): Promise<void> {
    const otherRegions = Array.from(this.regions.keys())
      .filter(r => r !== sourceRegion);

    const attempts = otherRegions.map(async region => {
      try {
        await this.writeToRegion(region, tableName, key, value, timestamp, version);
      } catch (error) {
        console.error(`Replication to ${region} failed:`, error);

        try {
          // Queue the write for a later retry
          await this.queueReplication(region, tableName, key, value, timestamp, version);
        } catch (queueError) {
          // Without this the write would be lost with no trace at all.
          console.error(`Queueing replication retry for ${region} failed:`, queueError);
        }
      }
    });

    await Promise.allSettled(attempts);
  }

  /** Publishes a 'replication-retry' message describing the failed write. */
  private async queueReplication(
    region: Region,
    tableName: string,
    key: string,
    value: any,
    timestamp: number,
    version: string
  ): Promise<void> {
    await messageQueue.publish('replication-retry', {
      region,
      tableName,
      key,
      value,
      timestamp,
      version
    });
  }
}

/**
 * Generates a version tag of the form `<epoch-millis>-<random base-36 suffix>`.
 *
 * The suffix is at most nine characters taken from `Math.random()`, so the
 * tag is not cryptographically unique.
 */
function generateVersion(): string {
  // slice(2, 11) skips the leading "0." and keeps up to nine characters;
  // it replaces the deprecated String.prototype.substr(2, 9).
  const suffix = Math.random().toString(36).slice(2, 11);
  return `${Date.now()}-${suffix}`;
}

Мониторинг геораспределённых систем

/**
 * Collects per-region database metrics and a region-to-region latency matrix.
 */
class GeoDistributedMonitoring {
  /**
   * Gathers metrics for all four regions concurrently, then measures the
   * latency matrix.
   *
   * Rejects if any per-region metric query fails.
   */
  async collectMetrics(): Promise<GeoMetrics> {
    const monitored = [
      Region.US_EAST,
      Region.US_WEST,
      Region.EU_WEST,
      Region.AP_SOUTHEAST
    ];

    const perRegion = await Promise.all(
      monitored.map(region => this.sampleRegion(region))
    );
    const crossRegionLatency = await this.measureCrossRegionLatency();

    return { regions: perRegion, crossRegionLatency };
  }

  /** Runs the three metric queries for one region concurrently. */
  private async sampleRegion(region: Region): Promise<GeoMetrics['regions'][number]> {
    const db = this.getConnection(region);

    const [replicationLag, dataSize, queryLatency] = await Promise.all([
      this.getReplicationLag(db),
      this.getDataSize(db),
      this.getQueryLatency(db)
    ]);

    return { region, replicationLag, dataSize, queryLatency, timestamp: new Date() };
  }

  /** Largest `UNIX_TIMESTAMP() - timestamp` in `replication_status`, or 0 when unavailable. */
  private async getReplicationLag(connection: DatabaseConnection): Promise<number> {
    const rows = await connection.query(
      'SELECT MAX(UNIX_TIMESTAMP() - timestamp) as lag FROM replication_status'
    );
    const first = rows[0];
    return first?.lag || 0;
  }

  /** Sum of `data_length + index_length` over `information_schema.tables`, or 0 when unavailable. */
  private async getDataSize(connection: DatabaseConnection): Promise<number> {
    const rows = await connection.query(
      'SELECT SUM(data_length + index_length) as size FROM information_schema.tables'
    );
    const first = rows[0];
    return first?.size || 0;
  }

  /** Wall-clock milliseconds taken by a trivial `SELECT 1` round trip. */
  private async getQueryLatency(connection: DatabaseConnection): Promise<number> {
    const startedAt = Date.now();
    await connection.query('SELECT 1');
    const finishedAt = Date.now();
    return finishedAt - startedAt;
  }

  /**
   * Builds a `from -> to -> milliseconds` matrix. The diagonal is 0 and an
   * unreachable target is recorded as -1. Pings run strictly one at a time.
   *
   * NOTE(review): every ping is issued from the current process via
   * `pingRegion(to)`; `from` only selects the row it is stored in.
   */
  private async measureCrossRegionLatency(): Promise<Record<string, Record<string, number>>> {
    const all = [Region.US_EAST, Region.US_WEST, Region.EU_WEST, Region.AP_SOUTHEAST];
    const matrix: Record<string, Record<string, number>> = {};

    for (const source of all) {
      const row: Record<string, number> = {};
      matrix[source] = row;

      for (const target of all) {
        row[target] = source === target ? 0 : await this.timedPing(target);
      }
    }

    return matrix;
  }

  /** Milliseconds taken to ping `region`, or -1 when the ping fails. */
  private async timedPing(region: Region): Promise<number> {
    const startedAt = Date.now();
    try {
      await this.pingRegion(region);
      return Date.now() - startedAt;
    } catch {
      return -1; // Unreachable
    }
  }

  /** Issues a `SELECT 1` against the region's connection. */
  private async pingRegion(region: Region): Promise<void> {
    await this.getConnection(region).query('SELECT 1');
  }

  /** Looks up the region's connection in the shared `connections` map. */
  private getConnection(region: Region): DatabaseConnection {
    return connections.get(region)!;
  }
}

/** Snapshot of the metrics of a geo-distributed deployment. */
interface GeoMetrics {
  /** One entry per region. */
  regions: Array<{
    region: Region;
    /** Replication lag reported for the region. */
    replicationLag: number;
    /** Data size reported for the region. */
    dataSize: number;
    /** Latency of a probe query against the region. */
    queryLatency: number;
    /** When this entry was collected. */
    timestamp: Date;
  }>;
  /** Latency matrix indexed as `[from][to]`. */
  crossRegionLatency: Record<string, Record<string, number>>;
}

Best Practices

1. Data Locality

// ✅ Keep data close to the users
/**
 * Moves a user's home region to the region the user accesses most often.
 */
class DataLocalityOptimizer {
  /**
   * Re-homes the user to their most frequently used region if it differs
   * from the current home region.
   *
   * @returns The region that is the user's home after the call. When the
   *   user has no recorded accesses in the analysed window there is nothing
   *   to base a decision on, so the current home region is kept and returned.
   */
  async optimizeUserPlacement(userId: string): Promise<Region> {
    // Analyse where the user connects from most often
    const accessLog = await this.getUserAccessLog(userId);
    const currentHomeRegion = await this.getUserHomeRegion(userId);

    if (accessLog.length === 0) {
      // An inactive user previously crashed here: reduce() without an
      // initial value throws on an empty array.
      return currentHomeRegion;
    }

    const mostFrequentRegion = this.getMostFrequentRegion(accessLog);

    // Migrate the home region if needed
    if (currentHomeRegion !== mostFrequentRegion) {
      await this.migrateUserHomeRegion(userId, mostFrequentRegion);
    }

    return mostFrequentRegion;
  }

  /** Access counts per region for the user over the last 30 days. */
  private async getUserAccessLog(userId: string): Promise<AccessLog[]> {
    return analytics.query(`
      SELECT region, COUNT(*) as count
      FROM access_logs
      WHERE user_id = ? AND timestamp > DATE_SUB(NOW(), INTERVAL 30 DAY)
      GROUP BY region
    `, [userId]);
  }

  /**
   * Region with the highest access count; on a tie the earliest entry wins.
   *
   * @throws Error if `accessLog` is empty.
   */
  private getMostFrequentRegion(accessLog: AccessLog[]): Region {
    if (accessLog.length === 0) {
      throw new Error('Cannot pick a region from an empty access log');
    }

    return accessLog.reduce((max, current) =>
      current.count > max.count ? current : max
    ).region;
  }

  /** Placeholder: always reports US_EAST as the current home region. */
  private async getUserHomeRegion(userId: string): Promise<Region> {
    // Implementation goes here
    return Region.US_EAST;
  }

  /** Placeholder: the migration itself is not implemented. */
  private async migrateUserHomeRegion(userId: string, newRegion: Region): Promise<void> {
    // Migration implementation goes here
  }
}

/** Aggregated access statistics: how many times a region was accessed. */
interface AccessLog {
  region: Region;
  /** Number of accesses counted for `region`. */
  count: number;
}

2. Conflict Resolution

// Define the conflict-resolution strategy explicitly
/**
 * Picks a single value out of several conflicting versions.
 */
class ConflictResolver {
  /**
   * Resolves the conflict by trying, in order: last-write-wins,
   * application-level merge, and finally giving up.
   *
   * NOTE(review): `isLWWApplicable` currently always returns true, so the
   * merge and manual-resolution branches are unreachable for non-empty
   * input. They are kept for when that predicate gets a real implementation.
   *
   * @throws ConflictError when `conflicts` is empty or no automatic strategy
   *   applies.
   */
  resolve(conflicts: Conflict[]): any {
    if (conflicts.length === 0) {
      // Previously this fell through to reduce() on an empty array and
      // failed with an unrelated TypeError.
      throw new ConflictError('No conflicting versions to resolve', conflicts);
    }

    // Last-Write-Wins
    if (this.isLWWApplicable(conflicts)) {
      return this.lastWriteWins(conflicts);
    }

    // Application-level merge
    if (this.isMergeable(conflicts)) {
      return this.merge(conflicts);
    }

    // Manual resolution required
    throw new ConflictError('Manual resolution required', conflicts);
  }

  /** Value of the version with the greatest timestamp; on a tie the earliest entry wins. */
  private lastWriteWins(conflicts: Conflict[]): any {
    return conflicts.reduce((latest, current) =>
      current.timestamp > latest.timestamp ? current : latest
    ).value;
  }

  /**
   * Shopping-cart style merge: unions `items` of all versions by `id`.
   *
   * Versions are applied oldest first, so for an item present in several
   * versions the copy from the newest version is kept regardless of the
   * order of the input array.
   */
  private merge(conflicts: Conflict[]): any {
    const itemsById = new Map();
    const oldestFirst = [...conflicts].sort((a, b) => a.timestamp - b.timestamp);

    for (const conflict of oldestFirst) {
      for (const item of conflict.value.items) {
        itemsById.set(item.id, item);
      }
    }

    return { items: Array.from(itemsById.values()) };
  }

  /** Placeholder: LWW is suitable for immutable data; currently always true. */
  private isLWWApplicable(conflicts: Conflict[]): boolean {
    return true;
  }

  /** True when every version carries an `items` array that `merge` can iterate. */
  private isMergeable(conflicts: Conflict[]): boolean {
    return conflicts.every(c => Array.isArray(c.value?.items));
  }
}

/** One of several competing versions of the same piece of data. */
interface Conflict {
  /** The version's payload. */
  value: any;
  /** Numeric write time of the version; larger means later. */
  timestamp: number;
  /** Region associated with this version. */
  region: Region;
}

/**
 * Thrown when conflicting versions cannot be resolved automatically.
 * Carries the versions involved so the caller can resolve them manually.
 */
class ConflictError extends Error {
  /**
   * @param message Human-readable reason.
   * @param conflicts The versions that could not be reconciled.
   */
  constructor(message: string, public conflicts: Conflict[]) {
    super(message);
    // Without this, logs and `error.name` checks report a generic "Error".
    this.name = 'ConflictError';
  }
}

Заключение

Геораспределённые системы требуют внимания к следующим аспектам:

  1. Правильный выбор стратегии — geographic sharding vs multi-region replication
  2. Управление консистентностью — strong vs eventual consistency
  3. Data locality — данные близко к пользователям
  4. Conflict resolution — стратегия разрешения конфликтов
  5. Мониторинг — replication lag, cross-region latency
  6. Failover — автоматическое переключение при отказах