architecture

Архитектура без ведущего узла

Построение отказоустойчивых систем без единой точки отказа

#scaling #architecture #distributed-systems #high-availability

Архитектура без ведущего узла

Leaderless архитектура — подход к построению распределенных систем, где нет единого ведущего узла, что повышает отказоустойчивость.

Проблема Master-Slave

Единая точка отказа

// Традиционная Master-Slave архитектура
class MasterSlaveDatabase {
  private master: DatabaseConnection;
  private slaves: DatabaseConnection[];
  
  async write(query: string): Promise<void> {
    // Все записи идут в master
    // Если master падает - система не может писать
    await this.master.query(query);
  }
  
  async read(query: string): Promise<any> {
    // Читаем из slave
    const slave = this.selectSlave();
    return slave.query(query);
  }
}

Проблемы:

  • Master — единая точка отказа для записи
  • Failover занимает время
  • Split-brain при проблемах с сетью
  • Ограничение пропускной способности записи

Leaderless Replication

Quorum Reads and Writes

interface QuorumConfig {
  nodes: Node[];
  replicationFactor: number; // N - количество реплик
  writeQuorum: number;       // W - минимум для записи
  readQuorum: number;        // R - минимум для чтения
}

class LeaderlessDatabase {
  constructor(private config: QuorumConfig) {
    // Правило: W + R > N гарантирует консистентность
    this.validateQuorums();
  }
  
  private validateQuorums(): void {
    const { replicationFactor, writeQuorum, readQuorum } = this.config;
    
    if (writeQuorum + readQuorum <= replicationFactor) {
      throw new Error('W + R must be > N for strong consistency');
    }
  }
  
  async write(key: string, value: any): Promise<void> {
    const nodes = this.selectNodes(key, this.config.replicationFactor);
    
    // Пишем параллельно на все узлы
    const promises = nodes.map(node => 
      this.writeToNode(node, key, value)
    );
    
    // Ждем подтверждения от W узлов
    const results = await Promise.allSettled(promises);
    const successful = results.filter(r => r.status === 'fulfilled').length;
    
    if (successful < this.config.writeQuorum) {
      throw new Error(`Write failed: only ${successful}/${this.config.writeQuorum} nodes confirmed`);
    }
  }
  
  async read(key: string): Promise<any> {
    const nodes = this.selectNodes(key, this.config.replicationFactor);
    
    // Читаем параллельно с нескольких узлов
    const promises = nodes.map(node => 
      this.readFromNode(node, key)
    );
    
    const results = await Promise.allSettled(promises);
    const successful = results
      .filter((r): r is PromiseFulfilledResult<any> => r.status === 'fulfilled')
      .map(r => r.value);
    
    if (successful.length < this.config.readQuorum) {
      throw new Error(`Read failed: only ${successful.length}/${this.config.readQuorum} nodes responded`);
    }
    
    // Выбираем самую свежую версию
    return this.resolveConflicts(successful);
  }
  
  private selectNodes(key: string, count: number): Node[] {
    // Consistent hashing для выбора узлов
    const hash = this.hash(key);
    return this.config.nodes
      .sort((a, b) => this.hash(a.id) - this.hash(b.id))
      .filter((_, i) => i < count);
  }
  
  private async writeToNode(node: Node, key: string, value: any): Promise<void> {
    const timestamp = Date.now();
    await node.connection.query(
      'INSERT INTO data (key, value, timestamp) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE value = ?, timestamp = ?',
      [key, JSON.stringify(value), timestamp, JSON.stringify(value), timestamp]
    );
  }
  
  private async readFromNode(node: Node, key: string): Promise<VersionedValue> {
    const result = await node.connection.query(
      'SELECT value, timestamp FROM data WHERE key = ?',
      [key]
    );
    
    if (result.length === 0) {
      return null;
    }
    
    return {
      value: JSON.parse(result[0].value),
      timestamp: result[0].timestamp
    };
  }
  
  private resolveConflicts(values: VersionedValue[]): any {
    // Last-Write-Wins (LWW)
    return values.reduce((latest, current) => 
      current.timestamp > latest.timestamp ? current : latest
    ).value;
  }
  
  private hash(key: string): number {
    let hash = 0;
    for (let i = 0; i < key.length; i++) {
      hash = Math.imul(hash ^ key.charCodeAt(i), 2654435761);
    }
    return Math.abs(hash);
  }
}

interface Node {
  id: string;
  connection: DatabaseConnection;
}

interface VersionedValue {
  value: any;
  timestamp: number;
}

Примеры конфигураций Quorum

// Сильная консистентность
const strongConsistency: QuorumConfig = {
  nodes: nodes,
  replicationFactor: 3,  // N = 3
  writeQuorum: 2,        // W = 2
  readQuorum: 2          // R = 2
  // W + R = 4 > N = 3 ✓
};

// Оптимизация для чтения
const readOptimized: QuorumConfig = {
  nodes: nodes,
  replicationFactor: 3,  // N = 3
  writeQuorum: 3,        // W = 3 (все узлы)
  readQuorum: 1          // R = 1 (быстрое чтение)
  // W + R = 4 > N = 3 ✓
};

// Оптимизация для записи
const writeOptimized: QuorumConfig = {
  nodes: nodes,
  replicationFactor: 3,  // N = 3
  writeQuorum: 1,        // W = 1 (быстрая запись)
  readQuorum: 3          // R = 3 (читаем все)
  // W + R = 4 > N = 3 ✓
};

// Eventual consistency
const eventualConsistency: QuorumConfig = {
  nodes: nodes,
  replicationFactor: 3,  // N = 3
  writeQuorum: 1,        // W = 1
  readQuorum: 1          // R = 1
  // W + R = 2 < N = 3 - может быть несогласованность
};

Hinted Handoff

Механизм для обработки временных отказов узлов.

class HintedHandoff {
  private hints = new Map<string, Hint[]>();
  
  async write(key: string, value: any, targetNodes: Node[]): Promise<void> {
    const timestamp = Date.now();
    const promises = targetNodes.map(async (node) => {
      try {
        await this.writeToNode(node, key, value, timestamp);
      } catch (error) {
        // Узел недоступен - сохраняем hint
        await this.storeHint(node.id, key, value, timestamp);
        
        // Пишем на другой узел временно
        const fallbackNode = this.selectFallbackNode(node);
        await this.writeToNode(fallbackNode, key, value, timestamp);
      }
    });
    
    await Promise.all(promises);
  }
  
  private async storeHint(
    nodeId: string,
    key: string,
    value: any,
    timestamp: number
  ): Promise<void> {
    const hints = this.hints.get(nodeId) || [];
    hints.push({ key, value, timestamp });
    this.hints.set(nodeId, hints);
  }
  
  async replayHints(nodeId: string): Promise<void> {
    const hints = this.hints.get(nodeId) || [];
    
    for (const hint of hints) {
      try {
        const node = this.getNode(nodeId);
        await this.writeToNode(node, hint.key, hint.value, hint.timestamp);
      } catch (error) {
        console.error(`Failed to replay hint for node ${nodeId}:`, error);
        // Оставляем hint для следующей попытки
        continue;
      }
    }
    
    // Очищаем успешно воспроизведенные hints
    this.hints.delete(nodeId);
  }
  
  private selectFallbackNode(failedNode: Node): Node {
    // Выбираем следующий узел в кольце consistent hashing
    return this.nodes.find(n => n.id !== failedNode.id)!;
  }
  
  private async writeToNode(
    node: Node,
    key: string,
    value: any,
    timestamp: number
  ): Promise<void> {
    await node.connection.query(
      'INSERT INTO data (key, value, timestamp) VALUES (?, ?, ?)',
      [key, JSON.stringify(value), timestamp]
    );
  }
  
  private getNode(nodeId: string): Node {
    return this.nodes.find(n => n.id === nodeId)!;
  }
}

interface Hint {
  key: string;
  value: any;
  timestamp: number;
}

Read Repair

Исправление несогласованности при чтении.

class ReadRepair {
  async read(key: string, nodes: Node[]): Promise<any> {
    // Читаем со всех узлов
    const promises = nodes.map(node => 
      this.readFromNode(node, key)
    );
    
    const results = await Promise.allSettled(promises);
    const successful = results
      .filter((r): r is PromiseFulfilledResult<VersionedValue> => 
        r.status === 'fulfilled' && r.value !== null
      )
      .map(r => r.value);
    
    if (successful.length === 0) {
      return null;
    }
    
    // Находим самую свежую версию
    const latest = successful.reduce((max, current) => 
      current.timestamp > max.timestamp ? current : max
    );
    
    // Исправляем устаревшие узлы
    await this.repairStaleNodes(key, latest, successful, nodes);
    
    return latest.value;
  }
  
  private async repairStaleNodes(
    key: string,
    latest: VersionedValue,
    allValues: VersionedValue[],
    nodes: Node[]
  ): Promise<void> {
    const staleNodes = nodes.filter((node, i) => {
      const value = allValues[i];
      return value && value.timestamp < latest.timestamp;
    });
    
    // Асинхронно обновляем устаревшие узлы
    const repairs = staleNodes.map(node => 
      this.writeToNode(node, key, latest.value, latest.timestamp)
        .catch(error => {
          console.error(`Read repair failed for node ${node.id}:`, error);
        })
    );
    
    // Не ждем завершения - это фоновая операция
    Promise.all(repairs);
  }
  
  private async readFromNode(node: Node, key: string): Promise<VersionedValue | null> {
    try {
      const result = await node.connection.query(
        'SELECT value, timestamp FROM data WHERE key = ?',
        [key]
      );
      
      if (result.length === 0) return null;
      
      return {
        value: JSON.parse(result[0].value),
        timestamp: result[0].timestamp
      };
    } catch (error) {
      console.error(`Read failed from node ${node.id}:`, error);
      return null;
    }
  }
  
  private async writeToNode(
    node: Node,
    key: string,
    value: any,
    timestamp: number
  ): Promise<void> {
    await node.connection.query(
      'INSERT INTO data (key, value, timestamp) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE value = ?, timestamp = ?',
      [key, JSON.stringify(value), timestamp, JSON.stringify(value), timestamp]
    );
  }
}

Anti-Entropy (Merkle Trees)

Периодическая синхронизация данных между узлами.

class MerkleTree {
  private root: MerkleNode;
  
  constructor(data: Array<{ key: string; value: any }>) {
    this.root = this.buildTree(data);
  }
  
  private buildTree(data: Array<{ key: string; value: any }>): MerkleNode {
    if (data.length === 0) {
      return { hash: this.hash(''), children: [] };
    }
    
    if (data.length === 1) {
      const item = data[0];
      return {
        hash: this.hash(JSON.stringify(item)),
        key: item.key,
        value: item.value,
        children: []
      };
    }
    
    const mid = Math.floor(data.length / 2);
    const left = this.buildTree(data.slice(0, mid));
    const right = this.buildTree(data.slice(mid));
    
    return {
      hash: this.hash(left.hash + right.hash),
      children: [left, right]
    };
  }
  
  getRootHash(): string {
    return this.root.hash;
  }
  
  findDifferences(other: MerkleTree): string[] {
    const differences: string[] = [];
    this.compareTrees(this.root, other.root, differences);
    return differences;
  }
  
  private compareTrees(
    node1: MerkleNode,
    node2: MerkleNode,
    differences: string[]
  ): void {
    if (node1.hash === node2.hash) {
      return; // Поддеревья идентичны
    }
    
    if (node1.key && node2.key) {
      // Листовые узлы различаются
      differences.push(node1.key);
      return;
    }
    
    // Рекурсивно сравниваем дочерние узлы
    if (node1.children.length > 0 && node2.children.length > 0) {
      this.compareTrees(node1.children[0], node2.children[0], differences);
      this.compareTrees(node1.children[1], node2.children[1], differences);
    }
  }
  
  private hash(data: string): string {
    // Простой хэш для примера
    let hash = 0;
    for (let i = 0; i < data.length; i++) {
      hash = Math.imul(hash ^ data.charCodeAt(i), 2654435761);
    }
    return Math.abs(hash).toString(16);
  }
}

interface MerkleNode {
  hash: string;
  key?: string;
  value?: any;
  children: MerkleNode[];
}

class AntiEntropy {
  async synchronize(node1: Node, node2: Node): Promise<void> {
    // Получаем данные с обоих узлов
    const data1 = await this.getAllData(node1);
    const data2 = await this.getAllData(node2);
    
    // Строим Merkle деревья
    const tree1 = new MerkleTree(data1);
    const tree2 = new MerkleTree(data2);
    
    // Если корневые хэши совпадают - данные идентичны
    if (tree1.getRootHash() === tree2.getRootHash()) {
      return;
    }
    
    // Находим различия
    const differences = tree1.findDifferences(tree2);
    
    // Синхронизируем различающиеся ключи
    for (const key of differences) {
      await this.synchronizeKey(key, node1, node2);
    }
  }
  
  private async synchronizeKey(
    key: string,
    node1: Node,
    node2: Node
  ): Promise<void> {
    const [value1, value2] = await Promise.all([
      this.readFromNode(node1, key),
      this.readFromNode(node2, key)
    ]);
    
    if (!value1 && value2) {
      // Ключ есть только на node2
      await this.writeToNode(node1, key, value2.value, value2.timestamp);
    } else if (value1 && !value2) {
      // Ключ есть только на node1
      await this.writeToNode(node2, key, value1.value, value1.timestamp);
    } else if (value1 && value2) {
      // Ключ есть на обоих - выбираем более свежий
      if (value1.timestamp > value2.timestamp) {
        await this.writeToNode(node2, key, value1.value, value1.timestamp);
      } else if (value2.timestamp > value1.timestamp) {
        await this.writeToNode(node1, key, value2.value, value2.timestamp);
      }
    }
  }
  
  private async getAllData(node: Node): Promise<Array<{ key: string; value: any }>> {
    const result = await node.connection.query('SELECT key, value FROM data');
    return result.map((row: any) => ({
      key: row.key,
      value: JSON.parse(row.value)
    }));
  }
  
  private async readFromNode(node: Node, key: string): Promise<VersionedValue | null> {
    const result = await node.connection.query(
      'SELECT value, timestamp FROM data WHERE key = ?',
      [key]
    );
    
    if (result.length === 0) return null;
    
    return {
      value: JSON.parse(result[0].value),
      timestamp: result[0].timestamp
    };
  }
  
  private async writeToNode(
    node: Node,
    key: string,
    value: any,
    timestamp: number
  ): Promise<void> {
    await node.connection.query(
      'INSERT INTO data (key, value, timestamp) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE value = ?, timestamp = ?',
      [key, JSON.stringify(value), timestamp, JSON.stringify(value), timestamp]
    );
  }
}

Vector Clocks

Отслеживание причинно-следственных связей между событиями.

class VectorClock {
  private clock: Map<string, number>;
  
  constructor(nodeId: string) {
    this.clock = new Map([[nodeId, 0]]);
  }
  
  increment(nodeId: string): void {
    const current = this.clock.get(nodeId) || 0;
    this.clock.set(nodeId, current + 1);
  }
  
  merge(other: VectorClock): void {
    for (const [nodeId, timestamp] of other.clock.entries()) {
      const current = this.clock.get(nodeId) || 0;
      this.clock.set(nodeId, Math.max(current, timestamp));
    }
  }
  
  compare(other: VectorClock): 'before' | 'after' | 'concurrent' {
    let hasLess = false;
    let hasGreater = false;
    
    const allNodes = new Set([
      ...this.clock.keys(),
      ...other.clock.keys()
    ]);
    
    for (const nodeId of allNodes) {
      const thisTime = this.clock.get(nodeId) || 0;
      const otherTime = other.clock.get(nodeId) || 0;
      
      if (thisTime < otherTime) hasLess = true;
      if (thisTime > otherTime) hasGreater = true;
    }
    
    if (hasLess && !hasGreater) return 'before';
    if (hasGreater && !hasLess) return 'after';
    return 'concurrent';
  }
  
  toJSON(): Record<string, number> {
    return Object.fromEntries(this.clock);
  }
  
  static fromJSON(data: Record<string, number>): VectorClock {
    const clock = new VectorClock('');
    clock.clock = new Map(Object.entries(data));
    return clock;
  }
}

class VectorClockDatabase {
  constructor(private nodeId: string) {}
  
  async write(key: string, value: any): Promise<void> {
    const clock = new VectorClock(this.nodeId);
    clock.increment(this.nodeId);
    
    await db.query(
      'INSERT INTO data (key, value, vector_clock) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE value = ?, vector_clock = ?',
      [key, JSON.stringify(value), JSON.stringify(clock.toJSON()), JSON.stringify(value), JSON.stringify(clock.toJSON())]
    );
  }
  
  async read(key: string): Promise<any> {
    const results = await this.readFromAllReplicas(key);
    
    if (results.length === 0) return null;
    if (results.length === 1) return results[0].value;
    
    // Разрешаем конфликты
    return this.resolveConflicts(results);
  }
  
  private resolveConflicts(results: VersionedValue[]): any {
    // Находим все несовместимые версии
    const concurrent: VersionedValue[] = [];
    
    for (const result of results) {
      const isConcurrent = concurrent.every(c => {
        const comparison = result.clock.compare(c.clock);
        return comparison === 'concurrent';
      });
      
      if (isConcurrent) {
        concurrent.push(result);
      }
    }
    
    if (concurrent.length === 1) {
      return concurrent[0].value;
    }
    
    // Несколько конкурирующих версий - нужно разрешение конфликта
    // Можно использовать application-level merge или LWW
    return this.applicationLevelMerge(concurrent);
  }
  
  private applicationLevelMerge(versions: VersionedValue[]): any {
    // Пример: объединение изменений в shopping cart
    const merged = { items: [] };
    
    for (const version of versions) {
      for (const item of version.value.items) {
        if (!merged.items.find((i: any) => i.id === item.id)) {
          merged.items.push(item);
        }
      }
    }
    
    return merged;
  }
  
  private async readFromAllReplicas(key: string): Promise<VersionedValue[]> {
    // Читаем со всех реплик
    return [];
  }
}

Практический пример: Distributed Cache

class DistributedLeaderlessCache {
  private nodes: Node[];
  private replicationFactor = 3;
  private writeQuorum = 2;
  private readQuorum = 2;
  
  constructor(nodes: Node[]) {
    this.nodes = nodes;
  }
  
  async set(key: string, value: any, ttl: number): Promise<void> {
    const targetNodes = this.selectNodes(key, this.replicationFactor);
    const timestamp = Date.now();
    
    const promises = targetNodes.map(node =>
      this.writeToNode(node, key, value, timestamp, ttl)
    );
    
    const results = await Promise.allSettled(promises);
    const successful = results.filter(r => r.status === 'fulfilled').length;
    
    if (successful < this.writeQuorum) {
      throw new Error('Write quorum not met');
    }
  }
  
  async get(key: string): Promise<any> {
    const targetNodes = this.selectNodes(key, this.replicationFactor);
    
    const promises = targetNodes.map(node =>
      this.readFromNode(node, key)
    );
    
    const results = await Promise.allSettled(promises);
    const successful = results
      .filter((r): r is PromiseFulfilledResult<VersionedValue> => 
        r.status === 'fulfilled' && r.value !== null
      )
      .map(r => r.value);
    
    if (successful.length < this.readQuorum) {
      throw new Error('Read quorum not met');
    }
    
    // Возвращаем самую свежую версию
    const latest = successful.reduce((max, current) =>
      current.timestamp > max.timestamp ? current : max
    );
    
    return latest.value;
  }
  
  private selectNodes(key: string, count: number): Node[] {
    const hash = this.hash(key);
    const startIndex = hash % this.nodes.length;
    
    const selected: Node[] = [];
    for (let i = 0; i < count; i++) {
      const index = (startIndex + i) % this.nodes.length;
      selected.push(this.nodes[index]);
    }
    
    return selected;
  }
  
  private async writeToNode(
    node: Node,
    key: string,
    value: any,
    timestamp: number,
    ttl: number
  ): Promise<void> {
    await node.redis.set(
      key,
      JSON.stringify({ value, timestamp }),
      'EX',
      ttl
    );
  }
  
  private async readFromNode(node: Node, key: string): Promise<VersionedValue | null> {
    const data = await node.redis.get(key);
    if (!data) return null;
    
    const parsed = JSON.parse(data);
    return {
      value: parsed.value,
      timestamp: parsed.timestamp
    };
  }
  
  private hash(key: string): number {
    let hash = 0;
    for (let i = 0; i < key.length; i++) {
      hash = Math.imul(hash ^ key.charCodeAt(i), 2654435761);
    }
    return Math.abs(hash);
  }
}

Заключение

Leaderless архитектура обеспечивает:

Преимущества:

  • Нет единой точки отказа
  • Высокая доступность
  • Масштабируемость записи
  • Устойчивость к сетевым разделениям

Недостатки:

  • Eventual consistency
  • Сложность разрешения конфликтов
  • Больше сетевого трафика
  • Сложность реализации

Когда использовать:

  • Высокие требования к доступности
  • Географически распределенные системы
  • Толерантность к eventual consistency
  • Необходимость масштабирования записи