cloud
Шарды и реплики в геораспределённой среде
Организация данных в географически распределенных системах
•
#distributed-systems
#geo-distribution
#sharding
#replication
Шарды и реплики в геораспределённой среде
Геораспределённые системы требуют особого подхода к организации данных для обеспечения низкой задержки и высокой доступности.
Проблемы геораспределения
Задержка сети
// Typical latencies between regions, in milliseconds (per the constant's name).
const LATENCY_MS = {
  sameDatacenter: 1,
  sameRegion: 5,
  crossRegion: 50,
  crossContinent: 150,
  crossOcean: 300
};
/**
 * Routes a user to the API endpoint of the region nearest to them.
 */
class LatencyAwareRouter {
  /** Region used when the caller's region has no explicit mapping. */
  private static readonly DEFAULT_REGION = 'us-east-1';

  /**
   * Coarse user region -> deployment region.
   * A Map is used instead of an object literal so that inherited keys such as
   * "toString" or "constructor" can never be mistaken for a mapping.
   */
  private static readonly REGION_MAPPING: ReadonlyMap<string, string> = new Map([
    ['us-east', 'us-east-1'],
    ['us-west', 'us-west-1'],
    ['eu', 'eu-west-1'],
    ['asia', 'ap-southeast-1']
  ]);

  /**
   * Builds the base URL of the API endpoint the user should talk to.
   *
   * @param userId - Identifier of the user (currently not used for routing).
   * @param userRegion - Coarse region of the user, e.g. "eu" or "us-west".
   * @returns The HTTPS base URL of the selected regional endpoint.
   */
  async route(userId: string, userRegion: string): Promise<string> {
    // Pick the nearest region
    const region = this.selectNearestRegion(userRegion);
    return `https://${region}.api.example.com`;
  }

  /** Maps a coarse user region to a deployment region, defaulting to us-east-1. */
  private selectNearestRegion(userRegion: string): string {
    return (
      LatencyAwareRouter.REGION_MAPPING.get(userRegion) ??
      LatencyAwareRouter.DEFAULT_REGION
    );
  }
}
CAP-теорема в контексте геораспределения
/**
 * Describes the CAP trade-off chosen for a geo-distributed deployment.
 */
interface GeoDistributedConfig {
  /** Consistency model: strong, eventual or causal. */
  consistency: 'strong' | 'eventual' | 'causal';
  /** Availability level. */
  availability: 'high' | 'medium';
  /** Whether partition tolerance is enabled. */
  partitionTolerance: boolean;
}
// Strong consistency - high latency
const strongConsistency: GeoDistributedConfig = {
  consistency: 'strong',
  availability: 'medium',
  partitionTolerance: true
};
// Eventual consistency - low latency
const eventualConsistency: GeoDistributedConfig = {
  consistency: 'eventual',
  availability: 'high',
  partitionTolerance: true
};
Стратегии геораспределения
1. Geographic Sharding
Данные пользователей хранятся в их регионе.
/**
 * Regions used throughout the examples; each member's value is the region's
 * string identifier.
 */
enum Region {
  US_EAST = 'us-east-1',
  US_WEST = 'us-west-1',
  EU_WEST = 'eu-west-1',
  AP_SOUTHEAST = 'ap-southeast-1'
}
/**
 * A shard that holds the data of one region.
 */
interface GeoShard {
  /** Region this shard belongs to. */
  region: Region;
  /** Connection that GeographicSharding writes to and reads from. */
  database: DatabaseConnection;
  /** Additional connections for the region (presumably read replicas - TODO confirm usage). */
  replicas: DatabaseConnection[];
}
/**
 * Geographic sharding: each user's data is written to the shard of the user's
 * own region and then copied, best-effort, to every other region.
 */
class GeographicSharding {
  private readonly shards = new Map<Region, GeoShard>();

  /**
   * @param shards - One shard per region; a later entry for the same region
   *   replaces an earlier one.
   */
  constructor(shards: GeoShard[]) {
    for (const shard of shards) {
      this.shards.set(shard.region, shard);
    }
  }

  /**
   * Stores `data` for the user in the user's region and starts background
   * replication to the other regions.
   *
   * The statement is an upsert so that writing the same user twice updates the
   * row instead of failing with a duplicate-key error (the replication path
   * already used an upsert).
   *
   * @throws Error if no shard is registered for the user's region.
   */
  async write(userId: string, data: any): Promise<void> {
    const userRegion = await this.getUserRegion(userId);
    const shard = this.getShard(userRegion);
    const payload = JSON.stringify(data);

    // Write to the region's primary connection
    await shard.database.query(
      'INSERT INTO users (id, data, region) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE data = ?',
      [userId, payload, userRegion, payload]
    );

    // Replicate asynchronously: the caller is not blocked, and a failure is
    // logged rather than surfacing as an unhandled rejection.
    this.replicateToOtherRegions(userId, data, userRegion).catch(error => {
      console.error(`Replication from ${userRegion} failed:`, error);
    });
  }

  /**
   * Loads the user's data from the shard of the user's region.
   *
   * @returns The parsed data, or `null` when the user has no row.
   * @throws Error if no shard is registered for the user's region.
   */
  async read(userId: string): Promise<any> {
    const userRegion = await this.getUserRegion(userId);
    const shard = this.getShard(userRegion);

    // NOTE: this reads through `shard.database`; `shard.replicas` is not used here.
    const result = await shard.database.query(
      'SELECT data FROM users WHERE id = ?',
      [userId]
    );
    return result.length > 0 ? JSON.parse(result[0].data) : null;
  }

  /** Returns the shard for `region` or throws when none is registered. */
  private getShard(region: Region): GeoShard {
    const shard = this.shards.get(region);
    if (!shard) {
      throw new Error(`No shard for region ${region}`);
    }
    return shard;
  }

  /**
   * Upserts the user's data into every shard except `sourceRegion`.
   * Per-region failures are logged and do not reject the returned promise.
   */
  private async replicateToOtherRegions(
    userId: string,
    data: any,
    sourceRegion: Region
  ): Promise<void> {
    const payload = JSON.stringify(data);
    const otherShards = Array.from(this.shards.values())
      .filter(shard => shard.region !== sourceRegion);

    await Promise.all(
      otherShards.map(shard =>
        shard.database.query(
          'INSERT INTO users (id, data, region) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE data = ?',
          [userId, payload, sourceRegion, payload]
        ).catch(error => {
          console.error(`Replication to ${shard.region} failed:`, error);
        })
      )
    );
  }

  /**
   * Looks up the user's region in the cache, defaulting to US_EAST when the
   * cache has no (or an empty) entry.
   */
  private async getUserRegion(userId: string): Promise<Region> {
    // Could be kept in a cache or derived from the client IP
    const cached = await redis.get(`user:${userId}:region`);
    return (cached as Region) || Region.US_EAST;
  }
}
2. Multi-Region Replication
Данные реплицируются во все регионы.
/**
 * Multi-region replication: a write goes synchronously to one primary region
 * and is then queued for delivery to every other region.
 */
class MultiRegionReplication {
  private readonly regions = new Map<Region, DatabaseConnection>();

  /**
   * @param connections - One connection per region.
   */
  constructor(connections: Array<{ region: Region; connection: DatabaseConnection }>) {
    for (const { region, connection } of connections) {
      this.regions.set(region, connection);
    }
  }

  /**
   * Writes `value` to the primary region and queues replication to the others.
   *
   * @throws Error if `primaryRegion` has no registered connection.
   */
  async write(key: string, value: any, primaryRegion: Region): Promise<void> {
    const timestamp = Date.now();

    // Synchronous write to the primary region
    const primary = this.regions.get(primaryRegion);
    if (!primary) {
      throw new Error(`Primary region ${primaryRegion} not found`);
    }
    await primary.query(
      'INSERT INTO data (key, value, timestamp, region) VALUES (?, ?, ?, ?)',
      [key, JSON.stringify(value), timestamp, primaryRegion]
    );

    // Asynchronous replication to the other regions. The promise is
    // deliberately not awaited, so it needs its own rejection handler;
    // otherwise a queue failure becomes an unhandled rejection.
    this.replicateAsync(key, value, timestamp, primaryRegion).catch(error => {
      console.error(`Queuing replication of key ${key} failed:`, error);
    });
  }

  /**
   * Reads `key` from the user's region, falling back to the first registered
   * region when the user's region has no connection.
   *
   * @returns The parsed value, or `null` when the key is absent.
   * @throws Error if no region is registered at all.
   */
  async read(key: string, userRegion: Region): Promise<any> {
    // Read from the nearest region
    const connection =
      this.regions.get(userRegion) ?? this.regions.values().next().value;
    if (!connection) {
      throw new Error('No regions configured');
    }
    const result = await connection.query(
      'SELECT value, timestamp FROM data WHERE key = ?',
      [key]
    );
    return result.length > 0 ? JSON.parse(result[0].value) : null;
  }

  /**
   * Publishes one replication message per region other than `sourceRegion`.
   * Messages are published one at a time, in registration order.
   */
  private async replicateAsync(
    key: string,
    value: any,
    timestamp: number,
    sourceRegion: Region
  ): Promise<void> {
    for (const region of this.regions.keys()) {
      if (region === sourceRegion) continue;
      // A queue is used for reliable replication
      await messageQueue.publish('replication', {
        targetRegion: region,
        key,
        value,
        timestamp,
        sourceRegion
      });
    }
  }
}
3. Active-Active Multi-Master
Запись возможна в любом регионе.
/**
 * Active-active (multi-master) replication: any region accepts writes, every
 * stored value carries a vector clock, and conflicting versions are detected
 * and merged on demand.
 */
class ActiveActiveReplication {
  private regions: Map<Region, DatabaseConnection> = new Map();

  /**
   * @param connections - One connection per region. Optional so that existing
   *   `new ActiveActiveReplication()` call sites keep compiling; without
   *   connections every operation reports the region as not found.
   */
  constructor(connections: Array<{ region: Region; connection: DatabaseConnection }> = []) {
    for (const { region, connection } of connections) {
      this.regions.set(region, connection);
    }
  }

  /**
   * Writes `value` in `region` and then pushes it to every other region.
   *
   * Side effect: `vectorClock` is incremented in place for `region`.
   *
   * @throws Error if `region` has no registered connection.
   */
  async write(
    key: string,
    value: any,
    region: Region,
    vectorClock: VectorClock
  ): Promise<void> {
    const connection = this.regions.get(region);
    if (!connection) {
      throw new Error(`Region ${region} not found`);
    }

    // Increment the vector clock
    vectorClock.increment(region);

    const serializedValue = JSON.stringify(value);
    const serializedClock = JSON.stringify(vectorClock.toJSON());

    // Write locally
    await connection.query(
      'INSERT INTO data (key, value, vector_clock) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE value = ?, vector_clock = ?',
      [key, serializedValue, serializedClock, serializedValue, serializedClock]
    );

    // Replicate to the other regions
    await this.replicateToAll(key, value, vectorClock, region);
  }

  /**
   * Reads the value and vector clock stored for `key` in `region`.
   *
   * @returns `{ value, vectorClock }`, or `null` when the key is absent.
   * @throws Error if `region` has no registered connection.
   */
  async read(key: string, region: Region): Promise<any> {
    const connection = this.regions.get(region);
    if (!connection) {
      throw new Error(`Region ${region} not found`);
    }
    const result = await connection.query(
      'SELECT value, vector_clock FROM data WHERE key = ?',
      [key]
    );
    if (result.length === 0) return null;
    return {
      value: JSON.parse(result[0].value),
      vectorClock: VectorClock.fromJSON(JSON.parse(result[0].vector_clock))
    };
  }

  /**
   * Reads `key` from every region and returns the resolved value.
   *
   * - no region has the key: `null`;
   * - exactly one distinct non-dominated version: that version's value;
   * - several concurrent versions: the result of `mergeValues`.
   */
  async resolveConflicts(key: string): Promise<any> {
    // Read from all regions
    const lookups = Array.from(this.regions.entries()).map(
      async ([region, connection]) => {
        const rows = await connection.query(
          'SELECT value, vector_clock FROM data WHERE key = ?',
          [key]
        );
        if (rows.length === 0) return null;
        return {
          region,
          value: JSON.parse(rows[0].value),
          vectorClock: VectorClock.fromJSON(JSON.parse(rows[0].vector_clock))
        };
      }
    );

    const versions: Array<{ region: Region; value: any; vectorClock: VectorClock }> = [];
    for (const version of await Promise.all(lookups)) {
      if (version !== null) versions.push(version);
    }
    if (versions.length === 0) return null;

    // Find the competing versions
    const concurrent = this.findConcurrentVersions(versions);
    if (concurrent.length === 1) {
      return concurrent[0].value;
    }

    // Resolve the conflict at the application level
    return this.mergeValues(concurrent.map(c => c.value));
  }

  /**
   * Returns the versions that no other version supersedes, with versions that
   * carry an identical vector clock collapsed into one.
   *
   * Identical clocks describe the same write that has already reached several
   * regions; treating them as a conflict would push fully replicated data
   * through `mergeValues` and could drop fields the merge does not know about.
   */
  private findConcurrentVersions(
    results: Array<{ region: Region; value: any; vectorClock: VectorClock }>
  ): Array<{ region: Region; value: any; vectorClock: VectorClock }> {
    const concurrent: typeof results = [];
    for (const candidate of results) {
      const isDominated = results.some(
        other =>
          other !== candidate &&
          candidate.vectorClock.compare(other.vectorClock) === 'before'
      );
      if (isDominated) continue;

      const isDuplicate = concurrent.some(kept =>
        this.clocksEqual(kept.vectorClock, candidate.vectorClock)
      );
      if (!isDuplicate) {
        concurrent.push(candidate);
      }
    }
    return concurrent;
  }

  /** True when both clocks hold the same counter for every node (missing = 0). */
  private clocksEqual(a: VectorClock, b: VectorClock): boolean {
    const left = a.toJSON();
    const right = b.toJSON();
    const nodeIds = new Set([...Object.keys(left), ...Object.keys(right)]);
    for (const nodeId of nodeIds) {
      if ((left[nodeId] ?? 0) !== (right[nodeId] ?? 0)) return false;
    }
    return true;
  }

  /**
   * Application-specific merge of concurrent values.
   *
   * Values shaped like a shopping cart (`{ items: [...] }`) are merged by item
   * id; when the same id occurs in several values, the one from the later
   * array element wins. Any other value falls back to the first candidate.
   */
  private mergeValues(values: any[]): any {
    // For example, for a shopping cart we union the items
    if (values[0]?.items) {
      const allItems = new Map();
      for (const value of values) {
        for (const item of value.items) {
          allItems.set(item.id, item);
        }
      }
      return { items: Array.from(allItems.values()) };
    }
    // Fallback: keep the first candidate. NOTE: this is an arbitrary pick, not
    // last-write-wins - no timestamps are available at this point.
    return values[0];
  }

  /**
   * Upserts the value and clock into every region except `sourceRegion`.
   * Per-region failures are logged and do not fail the write.
   */
  private async replicateToAll(
    key: string,
    value: any,
    vectorClock: VectorClock,
    sourceRegion: Region
  ): Promise<void> {
    const serializedValue = JSON.stringify(value);
    const serializedClock = JSON.stringify(vectorClock.toJSON());
    const otherRegions = Array.from(this.regions.entries())
      .filter(([region]) => region !== sourceRegion);

    const promises = otherRegions.map(([region, connection]) =>
      connection.query(
        'INSERT INTO data (key, value, vector_clock) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE value = ?, vector_clock = ?',
        [key, serializedValue, serializedClock, serializedValue, serializedClock]
      ).catch(error => {
        console.error(`Replication to ${region} failed:`, error);
      })
    );
    await Promise.allSettled(promises);
  }
}
/**
 * Vector clock: one monotonically increasing counter per node, used to decide
 * whether one version happened before another or the two are concurrent.
 */
class VectorClock {
  private clock: Map<string, number>;

  /** Creates a clock that knows only `nodeId`, with its counter at zero. */
  constructor(nodeId: string) {
    this.clock = new Map<string, number>();
    this.clock.set(nodeId, 0);
  }

  /** Advances the counter of `nodeId` by one (an unknown node starts at 0). */
  increment(nodeId: string): void {
    const previous = this.clock.get(nodeId) || 0;
    this.clock.set(nodeId, previous + 1);
  }

  /**
   * Orders this clock relative to `other`.
   *
   * @returns `'before'` if this clock is behind on at least one node and ahead
   *   on none, `'after'` for the mirror case, and `'concurrent'` otherwise -
   *   which includes the case where both clocks are identical.
   */
  compare(other: VectorClock): 'before' | 'after' | 'concurrent' {
    const nodeIds = new Set<string>(this.clock.keys());
    for (const id of other.clock.keys()) {
      nodeIds.add(id);
    }

    let behind = false;
    let ahead = false;
    for (const id of nodeIds) {
      const mine = this.clock.get(id) || 0;
      const theirs = other.clock.get(id) || 0;
      if (mine < theirs) {
        behind = true;
      } else if (mine > theirs) {
        ahead = true;
      }
    }

    if (behind && !ahead) {
      return 'before';
    }
    if (ahead && !behind) {
      return 'after';
    }
    return 'concurrent';
  }

  /** Serializes the clock as a plain `{ nodeId: counter }` object. */
  toJSON(): Record<string, number> {
    return Object.fromEntries(this.clock.entries());
  }

  /** Rebuilds a clock from the object produced by `toJSON`. */
  static fromJSON(data: Record<string, number>): VectorClock {
    const restored = new VectorClock('');
    // Replace the placeholder entry created by the constructor.
    restored.clock = new Map<string, number>(Object.entries(data));
    return restored;
  }
}
Паттерны данных
1. Home Region Pattern
/**
 * Home-region pattern: every user has one home region that takes all writes;
 * the other regions receive copies through the message queue and serve reads.
 */
class HomeRegionPattern {
  /**
   * Creates a user in `region` (which becomes the home region) and queues the
   * creation of copies in all other regions.
   *
   * @returns The created user, including the generated id.
   */
  async createUser(userData: UserData, region: Region): Promise<User> {
    const user: User = {
      // Spread first so caller-supplied extra properties can never override
      // the generated id, the home region or the creation time.
      ...userData,
      id: generateId(),
      homeRegion: region,
      createdAt: new Date()
    };

    // Write to the home region
    await this.writeToRegion(region, user);
    // Create read replicas in the other regions
    await this.createReadReplicas(user);
    return user;
  }

  /**
   * Applies `updates` in the user's home region and queues them for the
   * other regions.
   *
   * @throws Error if the user cannot be found.
   */
  async updateUser(userId: string, updates: Partial<UserData>): Promise<void> {
    // Find the user's home region
    const homeRegion = await this.getUserHomeRegion(userId);
    // Update in the home region
    await this.updateInRegion(homeRegion, userId, updates);
    // Replicate the changes
    await this.replicateUpdates(userId, updates, homeRegion);
  }

  /**
   * Reads the user from `requestRegion`, falling back to the home region when
   * the requesting region has no copy yet.
   *
   * @throws Error if the user is found in neither region. The declared return
   *   type promises a `User`, so a missing row is reported as an error rather
   *   than leaking `null` to the caller.
   */
  async getUser(userId: string, requestRegion: Region): Promise<User> {
    // Read from the nearest replica
    const local = await this.readFromRegion(requestRegion, userId);
    if (local) {
      return local;
    }

    // Fallback: read from the home region
    const homeRegion = await this.getUserHomeRegion(userId);
    const fromHome = await this.readFromRegion(homeRegion, userId);
    if (!fromHome) {
      throw new Error(`User ${userId} not found`);
    }
    return fromHome;
  }

  /** Inserts the serialized user into `region`. */
  private async writeToRegion(region: Region, user: User): Promise<void> {
    const connection = this.getConnection(region);
    await connection.query(
      'INSERT INTO users (id, data, home_region) VALUES (?, ?, ?)',
      [user.id, JSON.stringify(user), region]
    );
  }

  /** Publishes one `create-replica` message per region other than the home region. */
  private async createReadReplicas(user: User): Promise<void> {
    const otherRegions = this.getAllRegions()
      .filter(r => r !== user.homeRegion);
    for (const region of otherRegions) {
      await messageQueue.publish('create-replica', {
        region,
        user
      });
    }
  }

  /**
   * Resolves the user's home region: first from the cache, then by querying
   * the regions one after another. A hit is cached for an hour.
   *
   * @throws Error if no region has a row for the user.
   */
  private async getUserHomeRegion(userId: string): Promise<Region> {
    const cached = await redis.get(`user:${userId}:home-region`);
    if (cached) return cached as Region;

    // Search all regions
    for (const region of this.getAllRegions()) {
      const connection = this.getConnection(region);
      const result = await connection.query(
        'SELECT home_region FROM users WHERE id = ?',
        [userId]
      );
      if (result.length > 0) {
        const homeRegion = result[0].home_region;
        await redis.set(`user:${userId}:home-region`, homeRegion, 'EX', 3600);
        return homeRegion;
      }
    }
    throw new Error(`User ${userId} not found`);
  }

  /** Returns the connection registered for `region`. */
  private getConnection(region: Region): DatabaseConnection {
    return connections.get(region)!;
  }

  /** All regions that hold user data. */
  private getAllRegions(): Region[] {
    return [
      Region.US_EAST,
      Region.US_WEST,
      Region.EU_WEST,
      Region.AP_SOUTHEAST
    ];
  }

  /**
   * Loads and deserializes the user stored in `region`.
   *
   * @returns The user, or `null` when the region has no row for `userId`.
   */
  private async readFromRegion(region: Region, userId: string): Promise<User | null> {
    const connection = this.getConnection(region);
    const result = await connection.query(
      'SELECT data FROM users WHERE id = ?',
      [userId]
    );
    if (result.length === 0) {
      return null;
    }
    const user = JSON.parse(result[0].data) as User;
    // JSON has no date type: `createdAt` comes back as an ISO string and has
    // to be revived to honour the `User` interface.
    user.createdAt = new Date(user.createdAt);
    return user;
  }

  /**
   * Read-modify-write of the user's row in `region`.
   *
   * @throws Error if the region has no row for `userId`.
   */
  private async updateInRegion(
    region: Region,
    userId: string,
    updates: Partial<UserData>
  ): Promise<void> {
    const connection = this.getConnection(region);
    const user = await this.readFromRegion(region, userId);
    if (!user) {
      throw new Error(`User ${userId} not found in region ${region}`);
    }
    Object.assign(user, updates);
    await connection.query(
      'UPDATE users SET data = ? WHERE id = ?',
      [JSON.stringify(user), userId]
    );
  }

  /** Publishes one `replicate-update` message per region other than `sourceRegion`. */
  private async replicateUpdates(
    userId: string,
    updates: Partial<UserData>,
    sourceRegion: Region
  ): Promise<void> {
    const otherRegions = this.getAllRegions()
      .filter(r => r !== sourceRegion);
    for (const region of otherRegions) {
      await messageQueue.publish('replicate-update', {
        region,
        userId,
        updates
      });
    }
  }
}
/** User attributes supplied by the caller when creating or updating a user. */
interface UserData {
  name: string;
  email: string;
}
/** A stored user: the caller-supplied data plus system-assigned fields. */
interface User extends UserData {
  /** Identifier assigned when the user is created. */
  id: string;
  /** Region the user was created in. */
  homeRegion: Region;
  /** Creation time. */
  createdAt: Date;
}
2. Global Tables Pattern
/**
 * Global-tables pattern: a table is replicated to every region, and writes
 * are reconciled with a last-writer-wins rule on the row timestamp.
 */
class GlobalTablesPattern {
  private regions: Map<Region, DatabaseConnection> = new Map();

  /**
   * @param connections - One connection per region. Optional so that existing
   *   `new GlobalTablesPattern()` call sites keep compiling; without
   *   connections every operation reports the region as not found.
   */
  constructor(connections: Array<{ region: Region; connection: DatabaseConnection }> = []) {
    for (const { region, connection } of connections) {
      this.regions.set(region, connection);
    }
  }

  /**
   * Writes `value` in `sourceRegion` and then replicates it to all other regions.
   *
   * @throws Error if `tableName` is not a plain identifier or `sourceRegion`
   *   has no registered connection.
   */
  async write(
    tableName: string,
    key: string,
    value: any,
    sourceRegion: Region
  ): Promise<void> {
    const timestamp = Date.now();
    const version = generateVersion();

    // Write to the source region
    await this.writeToRegion(
      sourceRegion,
      tableName,
      key,
      value,
      timestamp,
      version
    );

    // DynamoDB Global Tables replicates automatically;
    // that behaviour is emulated here.
    await this.replicateGlobally(
      tableName,
      key,
      value,
      timestamp,
      version,
      sourceRegion
    );
  }

  /**
   * Reads `key` from `tableName` in `region`.
   *
   * @returns The parsed value, or `null` when the key is absent.
   * @throws Error if `tableName` is not a plain identifier or `region` has no
   *   registered connection.
   */
  async read(
    tableName: string,
    key: string,
    region: Region
  ): Promise<any> {
    this.assertValidTableName(tableName);
    const connection = this.regions.get(region);
    if (!connection) {
      throw new Error(`Region ${region} not found`);
    }
    const result = await connection.query(
      `SELECT value, timestamp, version FROM ${tableName} WHERE key = ?`,
      [key]
    );
    return result.length > 0 ? JSON.parse(result[0].value) : null;
  }

  /**
   * Table names are interpolated into the SQL text (identifiers cannot be
   * bound as parameters), so only plain, optionally schema-qualified
   * identifiers are accepted.
   */
  private assertValidTableName(tableName: string): void {
    if (!/^[A-Za-z_][A-Za-z0-9_$]*(\.[A-Za-z_][A-Za-z0-9_$]*)?$/.test(tableName)) {
      throw new Error(`Invalid table name: ${tableName}`);
    }
  }

  /**
   * Upserts the row in `region`, keeping the existing row when it is newer.
   */
  private async writeToRegion(
    region: Region,
    tableName: string,
    key: string,
    value: any,
    timestamp: number,
    version: string
  ): Promise<void> {
    this.assertValidTableName(tableName);
    const connection = this.regions.get(region);
    if (!connection) {
      throw new Error(`Region ${region} not found`);
    }
    // MySQL evaluates the assignments left to right and later expressions see
    // already-updated columns. `timestamp` must therefore be assigned LAST:
    // if it were updated first, the `timestamp < VALUES(timestamp)` test in
    // the following assignments would be false and they would be skipped.
    await connection.query(
      `INSERT INTO ${tableName} (key, value, timestamp, version)
       VALUES (?, ?, ?, ?)
       ON DUPLICATE KEY UPDATE
         value = IF(timestamp < VALUES(timestamp), VALUES(value), value),
         version = IF(timestamp < VALUES(timestamp), VALUES(version), version),
         timestamp = IF(timestamp < VALUES(timestamp), VALUES(timestamp), timestamp)`,
      [key, JSON.stringify(value), timestamp, version]
    );
  }

  /**
   * Writes the row to every region except `sourceRegion`. A failed region is
   * queued for retry; if queuing fails as well, that is logged so the lost
   * replication is at least visible.
   */
  private async replicateGlobally(
    tableName: string,
    key: string,
    value: any,
    timestamp: number,
    version: string,
    sourceRegion: Region
  ): Promise<void> {
    const otherRegions = Array.from(this.regions.keys())
      .filter(r => r !== sourceRegion);

    const promises = otherRegions.map(region =>
      this.writeToRegion(region, tableName, key, value, timestamp, version)
        .catch(error => {
          console.error(`Replication to ${region} failed:`, error);
          // Queue for a retry
          return this.queueReplication(region, tableName, key, value, timestamp, version);
        })
        .catch(error => {
          console.error(`Queuing replication retry for ${region} failed:`, error);
        })
    );
    await Promise.allSettled(promises);
  }

  /** Publishes a `replication-retry` message describing the failed write. */
  private async queueReplication(
    region: Region,
    tableName: string,
    key: string,
    value: any,
    timestamp: number,
    version: string
  ): Promise<void> {
    await messageQueue.publish('replication-retry', {
      region,
      tableName,
      key,
      value,
      timestamp,
      version
    });
  }
}
/**
 * Generates a version token of the form `<epoch-ms>-<9 base-36 characters>`.
 *
 * The random suffix is padded to a fixed width: `Math.random().toString(36)`
 * can yield very few digits (0.5 becomes "0.i"), which would otherwise make
 * the token length vary.
 */
function generateVersion(): string {
  // slice(2, 11) drops the leading "0." and keeps up to nine digits; it
  // replaces the deprecated String.prototype.substr(2, 9).
  const suffix = Math.random().toString(36).slice(2, 11).padEnd(9, '0');
  return `${Date.now()}-${suffix}`;
}
Мониторинг геораспределённых систем
/**
 * Collects per-region health metrics and a region-to-region latency table.
 */
class GeoDistributedMonitoring {
  /**
   * Gathers replication lag, data size and query latency for every region
   * (regions are probed in parallel) and afterwards measures the latency table.
   */
  async collectMetrics(): Promise<GeoMetrics> {
    const monitored = [
      Region.US_EAST,
      Region.US_WEST,
      Region.EU_WEST,
      Region.AP_SOUTHEAST
    ];

    const regionMetrics = await Promise.all(
      monitored.map(region => this.collectRegionMetrics(region))
    );
    const crossRegionLatency = await this.measureCrossRegionLatency();

    return { regions: regionMetrics, crossRegionLatency };
  }

  /** Runs the three probes for one region concurrently and bundles the results. */
  private async collectRegionMetrics(region: Region) {
    const connection = this.getConnection(region);
    const probes = await Promise.all([
      this.getReplicationLag(connection),
      this.getDataSize(connection),
      this.getQueryLatency(connection)
    ]);
    return {
      region,
      replicationLag: probes[0],
      dataSize: probes[1],
      queryLatency: probes[2],
      timestamp: new Date()
    };
  }

  /** Largest lag reported by `replication_status`; 0 when no value is returned. */
  private async getReplicationLag(connection: DatabaseConnection): Promise<number> {
    const rows = await connection.query(
      'SELECT MAX(UNIX_TIMESTAMP() - timestamp) as lag FROM replication_status'
    );
    const lag = rows[0]?.lag;
    return lag || 0;
  }

  /** Sum of data and index length over all tables; 0 when no value is returned. */
  private async getDataSize(connection: DatabaseConnection): Promise<number> {
    const rows = await connection.query(
      'SELECT SUM(data_length + index_length) as size FROM information_schema.tables'
    );
    const size = rows[0]?.size;
    return size || 0;
  }

  /** Wall-clock milliseconds taken by a trivial query. */
  private async getQueryLatency(connection: DatabaseConnection): Promise<number> {
    const startedAt = Date.now();
    await connection.query('SELECT 1');
    const finishedAt = Date.now();
    return finishedAt - startedAt;
  }

  /**
   * Builds a `from -> to -> milliseconds` table, probing one pair at a time.
   * The diagonal is 0 and an unreachable target is recorded as -1.
   *
   * NOTE(review): the probe only depends on `to` - it is issued from wherever
   * this code runs, not from the `from` region.
   */
  private async measureCrossRegionLatency(): Promise<Record<string, Record<string, number>>> {
    const regions = [Region.US_EAST, Region.US_WEST, Region.EU_WEST, Region.AP_SOUTHEAST];
    const latencies: Record<string, Record<string, number>> = {};

    for (const from of regions) {
      const row: Record<string, number> = {};
      latencies[from] = row;
      for (const to of regions) {
        row[to] = from === to ? 0 : await this.timePing(to);
      }
    }
    return latencies;
  }

  /** Milliseconds a ping to `region` took, or -1 if the ping failed. */
  private async timePing(region: Region): Promise<number> {
    const startedAt = Date.now();
    try {
      await this.pingRegion(region);
      return Date.now() - startedAt;
    } catch (error) {
      return -1; // Unreachable
    }
  }

  /** Issues a trivial query against `region`. */
  private async pingRegion(region: Region): Promise<void> {
    await this.getConnection(region).query('SELECT 1');
  }

  /** Returns the connection registered for `region`. */
  private getConnection(region: Region): DatabaseConnection {
    return connections.get(region)!;
  }
}
/** Snapshot produced by GeoDistributedMonitoring.collectMetrics. */
interface GeoMetrics {
  /** One entry per monitored region. */
  regions: Array<{
    region: Region;
    /** Replication lag as reported by the region's database. */
    replicationLag: number;
    /** Combined data and index size reported by the region's database. */
    dataSize: number;
    /** Milliseconds taken by a trivial query against the region. */
    queryLatency: number;
    /** When this entry was collected. */
    timestamp: Date;
  }>;
  /** `from -> to -> milliseconds`; -1 marks an unreachable target. */
  crossRegionLatency: Record<string, Record<string, number>>;
}
Best Practices
1. Data Locality
// ✅ Keep data close to the users who access it
/**
 * Moves a user's home region to the region they access the service from most.
 */
class DataLocalityOptimizer {
  /**
   * Picks the region with the most accesses in the last 30 days and migrates
   * the user's home region there if it differs from the current one.
   *
   * A user with no recorded accesses keeps the current home region; without
   * this guard the frequency lookup would throw on the empty log.
   *
   * @returns The user's home region after optimization.
   */
  async optimizeUserPlacement(userId: string): Promise<Region> {
    // Analyse where the user connects from most often
    const accessLog = await this.getUserAccessLog(userId);
    const currentHomeRegion = await this.getUserHomeRegion(userId);
    if (accessLog.length === 0) {
      return currentHomeRegion;
    }

    const mostFrequentRegion = this.getMostFrequentRegion(accessLog);
    // Migrate the home region if needed
    if (currentHomeRegion !== mostFrequentRegion) {
      await this.migrateUserHomeRegion(userId, mostFrequentRegion);
    }
    return mostFrequentRegion;
  }

  /** Per-region access counts for the user over the last 30 days. */
  private async getUserAccessLog(userId: string): Promise<AccessLog[]> {
    return analytics.query(`
      SELECT region, COUNT(*) as count
      FROM access_logs
      WHERE user_id = ? AND timestamp > DATE_SUB(NOW(), INTERVAL 30 DAY)
      GROUP BY region
    `, [userId]);
  }

  /**
   * Region with the highest count; on a tie the earliest entry wins.
   *
   * @throws Error if `accessLog` is empty.
   */
  private getMostFrequentRegion(accessLog: AccessLog[]): Region {
    if (accessLog.length === 0) {
      throw new Error('Cannot pick a region from an empty access log');
    }
    return accessLog.reduce(
      (max, current) => (current.count > max.count ? current : max),
      accessLog[0]
    ).region;
  }

  private async getUserHomeRegion(userId: string): Promise<Region> {
    // Implementation placeholder
    return Region.US_EAST;
  }

  private async migrateUserHomeRegion(userId: string, newRegion: Region): Promise<void> {
    // Migration implementation placeholder
  }
}
/** Aggregated access count of one user for one region. */
interface AccessLog {
  /** Region the accesses came from. */
  region: Region;
  /** Number of accesses from that region. */
  count: number;
}
2. Conflict Resolution
// Choose a conflict-resolution strategy explicitly
/**
 * Resolves conflicting versions of a value using, in order:
 * last-write-wins, application-level merge, or manual resolution.
 */
class ConflictResolver {
  /**
   * Resolves `conflicts` to a single value.
   *
   * - a single version is returned as is;
   * - mergeable versions (every value has an `items` array) are merged;
   * - otherwise the version with the strictly newest timestamp wins;
   * - if the newest timestamp is shared, no automatic choice is possible.
   *
   * @throws ConflictError if `conflicts` is empty or manual resolution is needed.
   */
  resolve(conflicts: Conflict[]): any {
    if (conflicts.length === 0) {
      throw new ConflictError('No conflicting versions supplied', conflicts);
    }
    if (conflicts.length === 1) {
      return conflicts[0].value;
    }

    // Last-Write-Wins
    if (this.isLWWApplicable(conflicts)) {
      return this.lastWriteWins(conflicts);
    }
    // Application-level merge
    if (this.isMergeable(conflicts)) {
      return this.merge(conflicts);
    }
    // Manual resolution required
    throw new ConflictError('Manual resolution required', conflicts);
  }

  /** Value of the version with the greatest timestamp. */
  private lastWriteWins(conflicts: Conflict[]): any {
    return conflicts.reduce((latest, current) =>
      current.timestamp > latest.timestamp ? current : latest
    ).value;
  }

  /**
   * Shopping-cart style merge: the union of all items keyed by `id`.
   * Versions are applied oldest first so that, for an item present in several
   * versions, the copy from the newest version is kept.
   */
  private merge(conflicts: Conflict[]): any {
    const oldestFirst = [...conflicts].sort((a, b) => a.timestamp - b.timestamp);
    const allItems = new Map();
    for (const conflict of oldestFirst) {
      for (const item of conflict.value.items) {
        allItems.set(item.id, item);
      }
    }
    return { items: Array.from(allItems.values()) };
  }

  /**
   * LWW is used only when it is both appropriate and unambiguous: the values
   * cannot be merged (merging loses less data) and exactly one version holds
   * the newest timestamp. Previously this always returned `true`, which made
   * the merge and manual-resolution branches unreachable.
   */
  private isLWWApplicable(conflicts: Conflict[]): boolean {
    if (conflicts.length === 0 || this.isMergeable(conflicts)) {
      return false;
    }
    const newest = Math.max(...conflicts.map(c => c.timestamp));
    return conflicts.filter(c => c.timestamp === newest).length === 1;
  }

  /** True when every version carries an `items` array that can be unioned. */
  private isMergeable(conflicts: Conflict[]): boolean {
    return conflicts.every(c => Array.isArray(c.value?.items));
  }
}
/** One of several competing versions of the same value. */
interface Conflict {
  /** The competing value. */
  value: any;
  /** Timestamp of this version; ConflictResolver treats the larger one as newer. */
  timestamp: number;
  /** Region this version belongs to. */
  region: Region;
}
/**
 * Raised when conflicting versions cannot be resolved automatically.
 * The unresolved versions are available on `conflicts`.
 */
class ConflictError extends Error {
  constructor(message: string, public conflicts: Conflict[]) {
    super(message);
    // Report the subclass name in stack traces and logs instead of "Error".
    this.name = 'ConflictError';
    // Keeps `instanceof ConflictError` working when compiled to ES5.
    Object.setPrototypeOf(this, new.target.prototype);
  }
}
Заключение
Геораспределённые системы требуют внимания к следующим аспектам:
- Правильный выбор стратегии — geographic sharding vs multi-region replication
- Управление консистентностью — strong vs eventual consistency
- Data locality — данные близко к пользователям
- Conflict resolution — стратегия разрешения конфликтов
- Мониторинг — replication lag, cross-region latency
- Failover — автоматическое переключение при отказах