architecture
Архитектура без ведущего узла
Построение отказоустойчивых систем без единой точки отказа
•
#scaling
#architecture
#distributed-systems
#high-availability
Архитектура без ведущего узла
Leaderless архитектура — подход к построению распределенных систем, где нет единого ведущего узла, что повышает отказоустойчивость.
Проблема Master-Slave
Единая точка отказа
// Традиционная Master-Slave архитектура
class MasterSlaveDatabase {
private master: DatabaseConnection;
private slaves: DatabaseConnection[];
async write(query: string): Promise<void> {
// Все записи идут в master
// Если master падает - система не может писать
await this.master.query(query);
}
async read(query: string): Promise<any> {
// Читаем из slave
const slave = this.selectSlave();
return slave.query(query);
}
}
Проблемы:
- Master — единая точка отказа для записи
- Failover занимает время
- Split-brain при проблемах с сетью
- Ограничение пропускной способности записи
Leaderless Replication
Quorum Reads and Writes
interface QuorumConfig {
nodes: Node[];
replicationFactor: number; // N - количество реплик
writeQuorum: number; // W - минимум для записи
readQuorum: number; // R - минимум для чтения
}
class LeaderlessDatabase {
constructor(private config: QuorumConfig) {
// Правило: W + R > N гарантирует консистентность
this.validateQuorums();
}
private validateQuorums(): void {
const { replicationFactor, writeQuorum, readQuorum } = this.config;
if (writeQuorum + readQuorum <= replicationFactor) {
throw new Error('W + R must be > N for strong consistency');
}
}
async write(key: string, value: any): Promise<void> {
const nodes = this.selectNodes(key, this.config.replicationFactor);
// Пишем параллельно на все узлы
const promises = nodes.map(node =>
this.writeToNode(node, key, value)
);
// Ждем подтверждения от W узлов
const results = await Promise.allSettled(promises);
const successful = results.filter(r => r.status === 'fulfilled').length;
if (successful < this.config.writeQuorum) {
throw new Error(`Write failed: only ${successful}/${this.config.writeQuorum} nodes confirmed`);
}
}
async read(key: string): Promise<any> {
const nodes = this.selectNodes(key, this.config.replicationFactor);
// Читаем параллельно с нескольких узлов
const promises = nodes.map(node =>
this.readFromNode(node, key)
);
const results = await Promise.allSettled(promises);
const successful = results
.filter((r): r is PromiseFulfilledResult<any> => r.status === 'fulfilled')
.map(r => r.value);
if (successful.length < this.config.readQuorum) {
throw new Error(`Read failed: only ${successful.length}/${this.config.readQuorum} nodes responded`);
}
// Выбираем самую свежую версию
return this.resolveConflicts(successful);
}
private selectNodes(key: string, count: number): Node[] {
// Consistent hashing для выбора узлов
const hash = this.hash(key);
return this.config.nodes
.sort((a, b) => this.hash(a.id) - this.hash(b.id))
.filter((_, i) => i < count);
}
private async writeToNode(node: Node, key: string, value: any): Promise<void> {
const timestamp = Date.now();
await node.connection.query(
'INSERT INTO data (key, value, timestamp) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE value = ?, timestamp = ?',
[key, JSON.stringify(value), timestamp, JSON.stringify(value), timestamp]
);
}
private async readFromNode(node: Node, key: string): Promise<VersionedValue> {
const result = await node.connection.query(
'SELECT value, timestamp FROM data WHERE key = ?',
[key]
);
if (result.length === 0) {
return null;
}
return {
value: JSON.parse(result[0].value),
timestamp: result[0].timestamp
};
}
private resolveConflicts(values: VersionedValue[]): any {
// Last-Write-Wins (LWW)
return values.reduce((latest, current) =>
current.timestamp > latest.timestamp ? current : latest
).value;
}
private hash(key: string): number {
let hash = 0;
for (let i = 0; i < key.length; i++) {
hash = Math.imul(hash ^ key.charCodeAt(i), 2654435761);
}
return Math.abs(hash);
}
}
interface Node {
id: string;
connection: DatabaseConnection;
}
interface VersionedValue {
value: any;
timestamp: number;
}
Примеры конфигураций Quorum
// Сильная консистентность
const strongConsistency: QuorumConfig = {
nodes: nodes,
replicationFactor: 3, // N = 3
writeQuorum: 2, // W = 2
readQuorum: 2 // R = 2
// W + R = 4 > N = 3 ✓
};
// Оптимизация для чтения
const readOptimized: QuorumConfig = {
nodes: nodes,
replicationFactor: 3, // N = 3
writeQuorum: 3, // W = 3 (все узлы)
readQuorum: 1 // R = 1 (быстрое чтение)
// W + R = 4 > N = 3 ✓
};
// Оптимизация для записи
const writeOptimized: QuorumConfig = {
nodes: nodes,
replicationFactor: 3, // N = 3
writeQuorum: 1, // W = 1 (быстрая запись)
readQuorum: 3 // R = 3 (читаем все)
// W + R = 4 > N = 3 ✓
};
// Eventual consistency
const eventualConsistency: QuorumConfig = {
nodes: nodes,
replicationFactor: 3, // N = 3
writeQuorum: 1, // W = 1
readQuorum: 1 // R = 1
// W + R = 2 < N = 3 - может быть несогласованность
};
Hinted Handoff
Механизм для обработки временных отказов узлов.
class HintedHandoff {
private hints = new Map<string, Hint[]>();
async write(key: string, value: any, targetNodes: Node[]): Promise<void> {
const timestamp = Date.now();
const promises = targetNodes.map(async (node) => {
try {
await this.writeToNode(node, key, value, timestamp);
} catch (error) {
// Узел недоступен - сохраняем hint
await this.storeHint(node.id, key, value, timestamp);
// Пишем на другой узел временно
const fallbackNode = this.selectFallbackNode(node);
await this.writeToNode(fallbackNode, key, value, timestamp);
}
});
await Promise.all(promises);
}
private async storeHint(
nodeId: string,
key: string,
value: any,
timestamp: number
): Promise<void> {
const hints = this.hints.get(nodeId) || [];
hints.push({ key, value, timestamp });
this.hints.set(nodeId, hints);
}
async replayHints(nodeId: string): Promise<void> {
const hints = this.hints.get(nodeId) || [];
for (const hint of hints) {
try {
const node = this.getNode(nodeId);
await this.writeToNode(node, hint.key, hint.value, hint.timestamp);
} catch (error) {
console.error(`Failed to replay hint for node ${nodeId}:`, error);
// Оставляем hint для следующей попытки
continue;
}
}
// Очищаем успешно воспроизведенные hints
this.hints.delete(nodeId);
}
private selectFallbackNode(failedNode: Node): Node {
// Выбираем следующий узел в кольце consistent hashing
return this.nodes.find(n => n.id !== failedNode.id)!;
}
private async writeToNode(
node: Node,
key: string,
value: any,
timestamp: number
): Promise<void> {
await node.connection.query(
'INSERT INTO data (key, value, timestamp) VALUES (?, ?, ?)',
[key, JSON.stringify(value), timestamp]
);
}
private getNode(nodeId: string): Node {
return this.nodes.find(n => n.id === nodeId)!;
}
}
interface Hint {
key: string;
value: any;
timestamp: number;
}
Read Repair
Исправление несогласованности при чтении.
class ReadRepair {
async read(key: string, nodes: Node[]): Promise<any> {
// Читаем со всех узлов
const promises = nodes.map(node =>
this.readFromNode(node, key)
);
const results = await Promise.allSettled(promises);
const successful = results
.filter((r): r is PromiseFulfilledResult<VersionedValue> =>
r.status === 'fulfilled' && r.value !== null
)
.map(r => r.value);
if (successful.length === 0) {
return null;
}
// Находим самую свежую версию
const latest = successful.reduce((max, current) =>
current.timestamp > max.timestamp ? current : max
);
// Исправляем устаревшие узлы
await this.repairStaleNodes(key, latest, successful, nodes);
return latest.value;
}
private async repairStaleNodes(
key: string,
latest: VersionedValue,
allValues: VersionedValue[],
nodes: Node[]
): Promise<void> {
const staleNodes = nodes.filter((node, i) => {
const value = allValues[i];
return value && value.timestamp < latest.timestamp;
});
// Асинхронно обновляем устаревшие узлы
const repairs = staleNodes.map(node =>
this.writeToNode(node, key, latest.value, latest.timestamp)
.catch(error => {
console.error(`Read repair failed for node ${node.id}:`, error);
})
);
// Не ждем завершения - это фоновая операция
Promise.all(repairs);
}
private async readFromNode(node: Node, key: string): Promise<VersionedValue | null> {
try {
const result = await node.connection.query(
'SELECT value, timestamp FROM data WHERE key = ?',
[key]
);
if (result.length === 0) return null;
return {
value: JSON.parse(result[0].value),
timestamp: result[0].timestamp
};
} catch (error) {
console.error(`Read failed from node ${node.id}:`, error);
return null;
}
}
private async writeToNode(
node: Node,
key: string,
value: any,
timestamp: number
): Promise<void> {
await node.connection.query(
'INSERT INTO data (key, value, timestamp) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE value = ?, timestamp = ?',
[key, JSON.stringify(value), timestamp, JSON.stringify(value), timestamp]
);
}
}
Anti-Entropy (Merkle Trees)
Периодическая синхронизация данных между узлами.
class MerkleTree {
private root: MerkleNode;
constructor(data: Array<{ key: string; value: any }>) {
this.root = this.buildTree(data);
}
private buildTree(data: Array<{ key: string; value: any }>): MerkleNode {
if (data.length === 0) {
return { hash: this.hash(''), children: [] };
}
if (data.length === 1) {
const item = data[0];
return {
hash: this.hash(JSON.stringify(item)),
key: item.key,
value: item.value,
children: []
};
}
const mid = Math.floor(data.length / 2);
const left = this.buildTree(data.slice(0, mid));
const right = this.buildTree(data.slice(mid));
return {
hash: this.hash(left.hash + right.hash),
children: [left, right]
};
}
getRootHash(): string {
return this.root.hash;
}
findDifferences(other: MerkleTree): string[] {
const differences: string[] = [];
this.compareTrees(this.root, other.root, differences);
return differences;
}
private compareTrees(
node1: MerkleNode,
node2: MerkleNode,
differences: string[]
): void {
if (node1.hash === node2.hash) {
return; // Поддеревья идентичны
}
if (node1.key && node2.key) {
// Листовые узлы различаются
differences.push(node1.key);
return;
}
// Рекурсивно сравниваем дочерние узлы
if (node1.children.length > 0 && node2.children.length > 0) {
this.compareTrees(node1.children[0], node2.children[0], differences);
this.compareTrees(node1.children[1], node2.children[1], differences);
}
}
private hash(data: string): string {
// Простой хэш для примера
let hash = 0;
for (let i = 0; i < data.length; i++) {
hash = Math.imul(hash ^ data.charCodeAt(i), 2654435761);
}
return Math.abs(hash).toString(16);
}
}
interface MerkleNode {
hash: string;
key?: string;
value?: any;
children: MerkleNode[];
}
class AntiEntropy {
async synchronize(node1: Node, node2: Node): Promise<void> {
// Получаем данные с обоих узлов
const data1 = await this.getAllData(node1);
const data2 = await this.getAllData(node2);
// Строим Merkle деревья
const tree1 = new MerkleTree(data1);
const tree2 = new MerkleTree(data2);
// Если корневые хэши совпадают - данные идентичны
if (tree1.getRootHash() === tree2.getRootHash()) {
return;
}
// Находим различия
const differences = tree1.findDifferences(tree2);
// Синхронизируем различающиеся ключи
for (const key of differences) {
await this.synchronizeKey(key, node1, node2);
}
}
private async synchronizeKey(
key: string,
node1: Node,
node2: Node
): Promise<void> {
const [value1, value2] = await Promise.all([
this.readFromNode(node1, key),
this.readFromNode(node2, key)
]);
if (!value1 && value2) {
// Ключ есть только на node2
await this.writeToNode(node1, key, value2.value, value2.timestamp);
} else if (value1 && !value2) {
// Ключ есть только на node1
await this.writeToNode(node2, key, value1.value, value1.timestamp);
} else if (value1 && value2) {
// Ключ есть на обоих - выбираем более свежий
if (value1.timestamp > value2.timestamp) {
await this.writeToNode(node2, key, value1.value, value1.timestamp);
} else if (value2.timestamp > value1.timestamp) {
await this.writeToNode(node1, key, value2.value, value2.timestamp);
}
}
}
private async getAllData(node: Node): Promise<Array<{ key: string; value: any }>> {
const result = await node.connection.query('SELECT key, value FROM data');
return result.map((row: any) => ({
key: row.key,
value: JSON.parse(row.value)
}));
}
private async readFromNode(node: Node, key: string): Promise<VersionedValue | null> {
const result = await node.connection.query(
'SELECT value, timestamp FROM data WHERE key = ?',
[key]
);
if (result.length === 0) return null;
return {
value: JSON.parse(result[0].value),
timestamp: result[0].timestamp
};
}
private async writeToNode(
node: Node,
key: string,
value: any,
timestamp: number
): Promise<void> {
await node.connection.query(
'INSERT INTO data (key, value, timestamp) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE value = ?, timestamp = ?',
[key, JSON.stringify(value), timestamp, JSON.stringify(value), timestamp]
);
}
}
Vector Clocks
Отслеживание причинно-следственных связей между событиями.
class VectorClock {
private clock: Map<string, number>;
constructor(nodeId: string) {
this.clock = new Map([[nodeId, 0]]);
}
increment(nodeId: string): void {
const current = this.clock.get(nodeId) || 0;
this.clock.set(nodeId, current + 1);
}
merge(other: VectorClock): void {
for (const [nodeId, timestamp] of other.clock.entries()) {
const current = this.clock.get(nodeId) || 0;
this.clock.set(nodeId, Math.max(current, timestamp));
}
}
compare(other: VectorClock): 'before' | 'after' | 'concurrent' {
let hasLess = false;
let hasGreater = false;
const allNodes = new Set([
...this.clock.keys(),
...other.clock.keys()
]);
for (const nodeId of allNodes) {
const thisTime = this.clock.get(nodeId) || 0;
const otherTime = other.clock.get(nodeId) || 0;
if (thisTime < otherTime) hasLess = true;
if (thisTime > otherTime) hasGreater = true;
}
if (hasLess && !hasGreater) return 'before';
if (hasGreater && !hasLess) return 'after';
return 'concurrent';
}
toJSON(): Record<string, number> {
return Object.fromEntries(this.clock);
}
static fromJSON(data: Record<string, number>): VectorClock {
const clock = new VectorClock('');
clock.clock = new Map(Object.entries(data));
return clock;
}
}
class VectorClockDatabase {
constructor(private nodeId: string) {}
async write(key: string, value: any): Promise<void> {
const clock = new VectorClock(this.nodeId);
clock.increment(this.nodeId);
await db.query(
'INSERT INTO data (key, value, vector_clock) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE value = ?, vector_clock = ?',
[key, JSON.stringify(value), JSON.stringify(clock.toJSON()), JSON.stringify(value), JSON.stringify(clock.toJSON())]
);
}
async read(key: string): Promise<any> {
const results = await this.readFromAllReplicas(key);
if (results.length === 0) return null;
if (results.length === 1) return results[0].value;
// Разрешаем конфликты
return this.resolveConflicts(results);
}
private resolveConflicts(results: VersionedValue[]): any {
// Находим все несовместимые версии
const concurrent: VersionedValue[] = [];
for (const result of results) {
const isConcurrent = concurrent.every(c => {
const comparison = result.clock.compare(c.clock);
return comparison === 'concurrent';
});
if (isConcurrent) {
concurrent.push(result);
}
}
if (concurrent.length === 1) {
return concurrent[0].value;
}
// Несколько конкурирующих версий - нужно разрешение конфликта
// Можно использовать application-level merge или LWW
return this.applicationLevelMerge(concurrent);
}
private applicationLevelMerge(versions: VersionedValue[]): any {
// Пример: объединение изменений в shopping cart
const merged = { items: [] };
for (const version of versions) {
for (const item of version.value.items) {
if (!merged.items.find((i: any) => i.id === item.id)) {
merged.items.push(item);
}
}
}
return merged;
}
private async readFromAllReplicas(key: string): Promise<VersionedValue[]> {
// Читаем со всех реплик
return [];
}
}
Практический пример: Distributed Cache
class DistributedLeaderlessCache {
private nodes: Node[];
private replicationFactor = 3;
private writeQuorum = 2;
private readQuorum = 2;
constructor(nodes: Node[]) {
this.nodes = nodes;
}
async set(key: string, value: any, ttl: number): Promise<void> {
const targetNodes = this.selectNodes(key, this.replicationFactor);
const timestamp = Date.now();
const promises = targetNodes.map(node =>
this.writeToNode(node, key, value, timestamp, ttl)
);
const results = await Promise.allSettled(promises);
const successful = results.filter(r => r.status === 'fulfilled').length;
if (successful < this.writeQuorum) {
throw new Error('Write quorum not met');
}
}
async get(key: string): Promise<any> {
const targetNodes = this.selectNodes(key, this.replicationFactor);
const promises = targetNodes.map(node =>
this.readFromNode(node, key)
);
const results = await Promise.allSettled(promises);
const successful = results
.filter((r): r is PromiseFulfilledResult<VersionedValue> =>
r.status === 'fulfilled' && r.value !== null
)
.map(r => r.value);
if (successful.length < this.readQuorum) {
throw new Error('Read quorum not met');
}
// Возвращаем самую свежую версию
const latest = successful.reduce((max, current) =>
current.timestamp > max.timestamp ? current : max
);
return latest.value;
}
private selectNodes(key: string, count: number): Node[] {
const hash = this.hash(key);
const startIndex = hash % this.nodes.length;
const selected: Node[] = [];
for (let i = 0; i < count; i++) {
const index = (startIndex + i) % this.nodes.length;
selected.push(this.nodes[index]);
}
return selected;
}
private async writeToNode(
node: Node,
key: string,
value: any,
timestamp: number,
ttl: number
): Promise<void> {
await node.redis.set(
key,
JSON.stringify({ value, timestamp }),
'EX',
ttl
);
}
private async readFromNode(node: Node, key: string): Promise<VersionedValue | null> {
const data = await node.redis.get(key);
if (!data) return null;
const parsed = JSON.parse(data);
return {
value: parsed.value,
timestamp: parsed.timestamp
};
}
private hash(key: string): number {
let hash = 0;
for (let i = 0; i < key.length; i++) {
hash = Math.imul(hash ^ key.charCodeAt(i), 2654435761);
}
return Math.abs(hash);
}
}
Заключение
Leaderless архитектура обеспечивает:
Преимущества:
- Нет единой точки отказа
- Высокая доступность
- Масштабируемость записи
- Устойчивость к сетевым разделениям
Недостатки:
- Eventual consistency
- Сложность разрешения конфликтов
- Больше сетевого трафика
- Сложность реализации
Когда использовать:
- Высокие требования к доступности
- Географически распределенные системы
- Толерантность к eventual consistency
- Необходимость масштабирования записи