microservices
Взаимодействие микросервисов с использованием Kafka
Асинхронное взаимодействие микросервисов через Apache Kafka
•
#microservices
#kafka
#event-driven
#messaging
Взаимодействие микросервисов с использованием Kafka
Apache Kafka — распределенная платформа потоковой передачи данных, идеально подходящая для построения event-driven архитектуры микросервисов.
Основные концепции
Topics и Partitions
// Создание топика
class KafkaTopicManager {
async createTopic(name: string, partitions: number = 3) {
await this.admin.createTopics({
topics: [{
topic: name,
numPartitions: partitions,
replicationFactor: 3
}]
});
}
}
// Топики для разных событий
const TOPICS = {
ORDER_CREATED: 'orders.created',
ORDER_COMPLETED: 'orders.completed',
PAYMENT_PROCESSED: 'payments.processed',
INVENTORY_UPDATED: 'inventory.updated'
};
Producer
Отправка событий
class OrderService {
constructor(private kafka: Kafka) {
this.producer = kafka.producer();
}
async createOrder(order: Order) {
// Сохраняем заказ
await this.repository.save(order);
// Публикуем событие
await this.producer.send({
topic: TOPICS.ORDER_CREATED,
messages: [{
key: order.id,
value: JSON.stringify({
orderId: order.id,
customerId: order.customerId,
items: order.items,
total: order.total,
timestamp: new Date().toISOString()
})
}]
});
}
}
Consumer
Обработка событий
class InventoryService {
constructor(private kafka: Kafka) {
this.consumer = kafka.consumer({ groupId: 'inventory-service' });
}
async start() {
await this.consumer.connect();
await this.consumer.subscribe({
topic: TOPICS.ORDER_CREATED,
fromBeginning: false
});
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const order = JSON.parse(message.value.toString());
await this.reserveInventory(order);
}
});
}
private async reserveInventory(order: any) {
for (const item of order.items) {
await this.updateStock(item.productId, -item.quantity);
}
// Публикуем событие об обновлении
await this.publishInventoryUpdated(order.orderId);
}
}
Event Sourcing
Хранение событий
class EventStore {
async appendEvent(event: DomainEvent) {
await this.producer.send({
topic: `events.${event.aggregateType}`,
messages: [{
key: event.aggregateId,
value: JSON.stringify(event),
headers: {
'event-type': event.type,
'event-version': '1'
}
}]
});
}
async getEvents(aggregateId: string): Promise<DomainEvent[]> {
const events: DomainEvent[] = [];
await this.consumer.run({
eachMessage: async ({ message }) => {
if (message.key?.toString() === aggregateId) {
events.push(JSON.parse(message.value.toString()));
}
}
});
return events;
}
}
Saga Pattern с Kafka
class OrderSaga {
async handleOrderCreated(order: Order) {
try {
// Шаг 1: Резервируем инвентарь
await this.publishCommand('inventory.reserve', order);
// Ждем подтверждения
const inventoryReserved = await this.waitForEvent('inventory.reserved');
// Шаг 2: Обрабатываем платеж
await this.publishCommand('payment.process', order);
const paymentProcessed = await this.waitForEvent('payment.processed');
// Шаг 3: Создаем доставку
await this.publishCommand('shipping.create', order);
} catch (error) {
// Компенсирующие транзакции
await this.publishCommand('inventory.release', order);
await this.publishCommand('payment.refund', order);
}
}
}
Обработка ошибок
class ResilientConsumer {
async processMessage(message: KafkaMessage) {
const maxRetries = 3;
let attempt = 0;
while (attempt < maxRetries) {
try {
await this.handleMessage(message);
return;
} catch (error) {
attempt++;
if (attempt >= maxRetries) {
// Отправляем в Dead Letter Queue
await this.sendToDLQ(message, error);
} else {
// Экспоненциальная задержка
await this.delay(Math.pow(2, attempt) * 1000);
}
}
}
}
private async sendToDLQ(message: KafkaMessage, error: Error) {
await this.producer.send({
topic: 'dlq.orders',
messages: [{
...message,
headers: {
...message.headers,
'error': error.message,
'failed-at': new Date().toISOString()
}
}]
});
}
}
Мониторинг
class KafkaMonitor {
async getConsumerLag(groupId: string) {
const offsets = await this.admin.fetchOffsets({
groupId
});
return offsets.map(offset => ({
topic: offset.topic,
partition: offset.partition,
lag: offset.high - offset.offset
}));
}
}
Best Practices
- Используйте ключи сообщений — для гарантии порядка
- Идемпотентность — обработка одного события несколько раз должна быть безопасной
- Schema Registry — для версионирования форматов событий
- Мониторинг lag — отслеживайте отставание консьюмеров
- Dead Letter Queue — для обработки проблемных сообщений
Заключение
Kafka обеспечивает надежное асинхронное взаимодействие между микросервисами, позволяя строить масштабируемые event-driven системы.