microservices

Взаимодействие микросервисов с использованием Kafka

Асинхронное взаимодействие микросервисов через Apache Kafka

#microservices #kafka #event-driven #messaging

Взаимодействие микросервисов с использованием Kafka

Apache Kafka — распределенная платформа потоковой передачи данных, идеально подходящая для построения event-driven архитектуры микросервисов.

Основные концепции

Topics и Partitions

// Создание топика
class KafkaTopicManager {
  async createTopic(name: string, partitions: number = 3) {
    await this.admin.createTopics({
      topics: [{
        topic: name,
        numPartitions: partitions,
        replicationFactor: 3
      }]
    });
  }
}

// Топики для разных событий
const TOPICS = {
  ORDER_CREATED: 'orders.created',
  ORDER_COMPLETED: 'orders.completed',
  PAYMENT_PROCESSED: 'payments.processed',
  INVENTORY_UPDATED: 'inventory.updated'
};

Producer

Отправка событий

class OrderService {
  constructor(private kafka: Kafka) {
    this.producer = kafka.producer();
  }
  
  async createOrder(order: Order) {
    // Сохраняем заказ
    await this.repository.save(order);
    
    // Публикуем событие
    await this.producer.send({
      topic: TOPICS.ORDER_CREATED,
      messages: [{
        key: order.id,
        value: JSON.stringify({
          orderId: order.id,
          customerId: order.customerId,
          items: order.items,
          total: order.total,
          timestamp: new Date().toISOString()
        })
      }]
    });
  }
}

Consumer

Обработка событий

class InventoryService {
  constructor(private kafka: Kafka) {
    this.consumer = kafka.consumer({ groupId: 'inventory-service' });
  }
  
  async start() {
    await this.consumer.connect();
    await this.consumer.subscribe({ 
      topic: TOPICS.ORDER_CREATED,
      fromBeginning: false 
    });
    
    await this.consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
        const order = JSON.parse(message.value.toString());
        await this.reserveInventory(order);
      }
    });
  }
  
  private async reserveInventory(order: any) {
    for (const item of order.items) {
      await this.updateStock(item.productId, -item.quantity);
    }
    
    // Публикуем событие об обновлении
    await this.publishInventoryUpdated(order.orderId);
  }
}

Event Sourcing

Хранение событий

class EventStore {
  async appendEvent(event: DomainEvent) {
    await this.producer.send({
      topic: `events.${event.aggregateType}`,
      messages: [{
        key: event.aggregateId,
        value: JSON.stringify(event),
        headers: {
          'event-type': event.type,
          'event-version': '1'
        }
      }]
    });
  }
  
  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    const events: DomainEvent[] = [];
    
    await this.consumer.run({
      eachMessage: async ({ message }) => {
        if (message.key?.toString() === aggregateId) {
          events.push(JSON.parse(message.value.toString()));
        }
      }
    });
    
    return events;
  }
}

Saga Pattern с Kafka

class OrderSaga {
  async handleOrderCreated(order: Order) {
    try {
      // Шаг 1: Резервируем инвентарь
      await this.publishCommand('inventory.reserve', order);
      
      // Ждем подтверждения
      const inventoryReserved = await this.waitForEvent('inventory.reserved');
      
      // Шаг 2: Обрабатываем платеж
      await this.publishCommand('payment.process', order);
      
      const paymentProcessed = await this.waitForEvent('payment.processed');
      
      // Шаг 3: Создаем доставку
      await this.publishCommand('shipping.create', order);
      
    } catch (error) {
      // Компенсирующие транзакции
      await this.publishCommand('inventory.release', order);
      await this.publishCommand('payment.refund', order);
    }
  }
}

Обработка ошибок

class ResilientConsumer {
  async processMessage(message: KafkaMessage) {
    const maxRetries = 3;
    let attempt = 0;
    
    while (attempt < maxRetries) {
      try {
        await this.handleMessage(message);
        return;
      } catch (error) {
        attempt++;
        
        if (attempt >= maxRetries) {
          // Отправляем в Dead Letter Queue
          await this.sendToDLQ(message, error);
        } else {
          // Экспоненциальная задержка
          await this.delay(Math.pow(2, attempt) * 1000);
        }
      }
    }
  }
  
  private async sendToDLQ(message: KafkaMessage, error: Error) {
    await this.producer.send({
      topic: 'dlq.orders',
      messages: [{
        ...message,
        headers: {
          ...message.headers,
          'error': error.message,
          'failed-at': new Date().toISOString()
        }
      }]
    });
  }
}

Мониторинг

class KafkaMonitor {
  async getConsumerLag(groupId: string) {
    const offsets = await this.admin.fetchOffsets({ 
      groupId 
    });
    
    return offsets.map(offset => ({
      topic: offset.topic,
      partition: offset.partition,
      lag: offset.high - offset.offset
    }));
  }
}

Best Practices

  1. Используйте ключи сообщений — для гарантии порядка
  2. Идемпотентность — обработка одного события несколько раз должна быть безопасной
  3. Schema Registry — для версионирования форматов событий
  4. Мониторинг lag — отслеживайте отставание консьюмеров
  5. Dead Letter Queue — для обработки проблемных сообщений

Заключение

Kafka обеспечивает надежное асинхронное взаимодействие между микросервисами, позволяя строить масштабируемые event-driven системы.