【.NET Aspire入門】第3回:メッセージングとイベント駆動アーキテクチャ
.NETAspireメッセージングRabbitMQKafkaイベント駆動
はじめに
マイクロサービスアーキテクチャでは、サービス間の疎結合な通信が重要です。前回はデータベースとキャッシングの統合について学びました。今回は、メッセージングシステムを使った非同期通信とイベント駆動アーキテクチャの実装方法を解説します。
.NET Aspireは、RabbitMQ、Azure Service Bus、Apache Kafkaなどの主要なメッセージングプラットフォームをシームレスに統合し、開発者が複雑な設定に悩むことなく、ビジネスロジックの実装に集中できる環境を提供します。
メッセージングの基礎概念
同期通信 vs 非同期通信
graph LR
A[Order Service] -->|同期HTTP| B[Inventory Service]
A -->|同期HTTP| C[Payment Service]
D[Order Service] -->|非同期| E[Message Queue]
E -->|非同期| F[Inventory Service]
E -->|非同期| G[Payment Service]
非同期通信の利点:
- 疎結合: サービス間の依存関係を最小化
- スケーラビリティ: 負荷に応じた柔軟なスケーリング
- 耐障害性: 一時的な障害に対する回復力
- パフォーマンス: 非ブロッキング処理による応答性向上
RabbitMQの統合
基本的なセットアップ
// AppHost/Program.cs
var builder = DistributedApplication.CreateBuilder(args);
// RabbitMQの追加
var messaging = builder.AddRabbitMQ("messaging")
.WithDataVolume("rabbitmq-data")
.WithManagementPlugin(); // 管理UIの有効化
// サービスの追加
var orderService = builder.AddProject<Projects.OrderService>("order-service")
.WithReference(messaging);
var inventoryService = builder.AddProject<Projects.InventoryService>("inventory-service")
.WithReference(messaging);
var notificationService = builder.AddProject<Projects.NotificationService>("notification-service")
.WithReference(messaging);
builder.Build().Run();
プロデューサーの実装(注文サービス)
// OrderService/Program.cs
using MassTransit;
var builder = WebApplication.CreateBuilder(args);
builder.AddServiceDefaults();
// MassTransitとRabbitMQの設定
builder.AddRabbitMQClient("messaging");
builder.Services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) =>
{
var configuration = context.GetRequiredService<IConfiguration>();
var connectionString = configuration.GetConnectionString("messaging");
cfg.Host(new Uri(connectionString!));
cfg.ConfigureEndpoints(context);
});
});
var app = builder.Build();
// 注文作成API
app.MapPost("/api/orders", async (
CreateOrderRequest request,
IPublishEndpoint publishEndpoint,
ILogger<Program> logger) =>
{
var orderId = Guid.NewGuid();
// 注文作成イベントの発行
var orderCreatedEvent = new OrderCreated
{
OrderId = orderId,
CustomerId = request.CustomerId,
Items = request.Items.Select(i => new OrderItem
{
ProductId = i.ProductId,
Quantity = i.Quantity,
Price = i.Price
}).ToList(),
TotalAmount = request.Items.Sum(i => i.Quantity * i.Price),
CreatedAt = DateTime.UtcNow
};
await publishEndpoint.Publish(orderCreatedEvent);
logger.LogInformation("Order created: {OrderId}", orderId);
return Results.Created($"/api/orders/{orderId}", new { orderId });
});
app.Run();
// リクエスト/レスポンスモデル
public record CreateOrderRequest(
Guid CustomerId,
List<OrderItemRequest> Items
);
public record OrderItemRequest(
Guid ProductId,
int Quantity,
decimal Price
);
// イベント定義(共有ライブラリに配置するのが理想)
public record OrderCreated
{
public Guid OrderId { get; init; }
public Guid CustomerId { get; init; }
public List<OrderItem> Items { get; init; } = new();
public decimal TotalAmount { get; init; }
public DateTime CreatedAt { get; init; }
}
public record OrderItem
{
public Guid ProductId { get; init; }
public int Quantity { get; init; }
public decimal Price { get; init; }
}
コンシューマーの実装(在庫サービス)
// InventoryService/Consumers/OrderCreatedConsumer.cs
using MassTransit;
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
private readonly IInventoryRepository _inventoryRepository;
private readonly IPublishEndpoint _publishEndpoint;
private readonly ILogger<OrderCreatedConsumer> _logger;
public OrderCreatedConsumer(
IInventoryRepository inventoryRepository,
IPublishEndpoint publishEndpoint,
ILogger<OrderCreatedConsumer> logger)
{
_inventoryRepository = inventoryRepository;
_publishEndpoint = publishEndpoint;
_logger = logger;
}
public async Task Consume(ConsumeContext<OrderCreated> context)
{
var message = context.Message;
_logger.LogInformation("Processing order {OrderId}", message.OrderId);
try
{
// 在庫チェックと予約
var reservationResults = new List<InventoryReservationResult>();
foreach (var item in message.Items)
{
var available = await _inventoryRepository.CheckAvailabilityAsync(
item.ProductId, item.Quantity);
if (available)
{
await _inventoryRepository.ReserveAsync(
item.ProductId, item.Quantity, message.OrderId);
reservationResults.Add(new InventoryReservationResult
{
ProductId = item.ProductId,
Quantity = item.Quantity,
Reserved = true
});
}
else
{
reservationResults.Add(new InventoryReservationResult
{
ProductId = item.ProductId,
Quantity = item.Quantity,
Reserved = false,
Reason = "Insufficient stock"
});
}
}
// 結果に基づいてイベントを発行
if (reservationResults.All(r => r.Reserved))
{
await _publishEndpoint.Publish(new InventoryReserved
{
OrderId = message.OrderId,
Reservations = reservationResults,
ReservedAt = DateTime.UtcNow
});
_logger.LogInformation("Inventory reserved for order {OrderId}",
message.OrderId);
}
else
{
// 部分的な予約をロールバック
foreach (var reservation in reservationResults.Where(r => r.Reserved))
{
await _inventoryRepository.ReleaseReservationAsync(
reservation.ProductId, message.OrderId);
}
await _publishEndpoint.Publish(new InventoryReservationFailed
{
OrderId = message.OrderId,
FailedItems = reservationResults
.Where(r => !r.Reserved)
.ToList(),
FailedAt = DateTime.UtcNow
});
_logger.LogWarning("Inventory reservation failed for order {OrderId}",
message.OrderId);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing order {OrderId}", message.OrderId);
throw;
}
}
}
// Program.cs での設定
builder.Services.AddMassTransit(x =>
{
x.AddConsumer<OrderCreatedConsumer>();
x.UsingRabbitMq((context, cfg) =>
{
var configuration = context.GetRequiredService<IConfiguration>();
var connectionString = configuration.GetConnectionString("messaging");
cfg.Host(new Uri(connectionString!));
cfg.ReceiveEndpoint("inventory-service", e =>
{
e.ConfigureConsumer<OrderCreatedConsumer>(context);
// リトライポリシー
e.UseMessageRetry(r => r.Intervals(100, 200, 500, 1000));
// エラーハンドリング
e.UseInMemoryOutbox();
});
});
});
Sagaパターンの実装
注文処理のSaga
// OrderSaga/OrderStateMachine.cs
using MassTransit;
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
public State Submitted { get; private set; }
public State InventoryReserved { get; private set; }
public State PaymentProcessed { get; private set; }
public State Completed { get; private set; }
public State Failed { get; private set; }
public Event<OrderSubmitted> OrderSubmitted { get; private set; }
public Event<InventoryReserved> InventoryReserved { get; private set; }
public Event<InventoryReservationFailed> InventoryReservationFailed { get; private set; }
public Event<PaymentProcessed> PaymentProcessed { get; private set; }
public Event<PaymentFailed> PaymentFailed { get; private set; }
public OrderStateMachine()
{
InstanceState(x => x.CurrentState);
Event(() => OrderSubmitted, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => InventoryReserved, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => InventoryReservationFailed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PaymentProcessed, x => x.CorrelateById(m => m.Message.OrderId));
Event(() => PaymentFailed, x => x.CorrelateById(m => m.Message.OrderId));
Initially(
When(OrderSubmitted)
.Then(context =>
{
context.Saga.OrderId = context.Message.OrderId;
context.Saga.CustomerId = context.Message.CustomerId;
context.Saga.TotalAmount = context.Message.TotalAmount;
context.Saga.SubmittedAt = DateTime.UtcNow;
})
.PublishAsync(context => context.Init<OrderCreated>(new
{
context.Saga.OrderId,
context.Saga.CustomerId,
Items = context.Message.Items,
context.Saga.TotalAmount,
CreatedAt = DateTime.UtcNow
}))
.TransitionTo(Submitted)
);
During(Submitted,
When(InventoryReserved)
.Then(context => context.Saga.InventoryReservedAt = DateTime.UtcNow)
.PublishAsync(context => context.Init<ProcessPayment>(new
{
context.Saga.OrderId,
context.Saga.CustomerId,
context.Saga.TotalAmount
}))
.TransitionTo(InventoryReserved),
When(InventoryReservationFailed)
.Then(context => context.Saga.FailedAt = DateTime.UtcNow)
.PublishAsync(context => context.Init<OrderFailed>(new
{
context.Saga.OrderId,
Reason = "Inventory reservation failed",
FailedAt = DateTime.UtcNow
}))
.TransitionTo(Failed)
);
During(InventoryReserved,
When(PaymentProcessed)
.Then(context => context.Saga.PaymentProcessedAt = DateTime.UtcNow)
.PublishAsync(context => context.Init<OrderCompleted>(new
{
context.Saga.OrderId,
CompletedAt = DateTime.UtcNow
}))
.TransitionTo(Completed),
When(PaymentFailed)
.Then(context => context.Saga.FailedAt = DateTime.UtcNow)
.PublishAsync(context => context.Init<ReleaseInventory>(new
{
context.Saga.OrderId
}))
.PublishAsync(context => context.Init<OrderFailed>(new
{
context.Saga.OrderId,
Reason = "Payment failed",
FailedAt = DateTime.UtcNow
}))
.TransitionTo(Failed)
);
SetCompletedWhenFinalized();
}
}
public class OrderState : SagaStateMachineInstance
{
public Guid CorrelationId { get; set; }
public string CurrentState { get; set; }
public Guid OrderId { get; set; }
public Guid CustomerId { get; set; }
public decimal TotalAmount { get; set; }
public DateTime SubmittedAt { get; set; }
public DateTime? InventoryReservedAt { get; set; }
public DateTime? PaymentProcessedAt { get; set; }
public DateTime? FailedAt { get; set; }
}
Apache Kafkaの統合
イベントストリーミング
// AppHost/Program.cs
var builder = DistributedApplication.CreateBuilder(args);
// Kafkaクラスタの追加
var kafka = builder.AddKafka("kafka")
.WithDataVolume("kafka-data");
// イベントストアサービス
var eventStore = builder.AddProject<Projects.EventStore>("event-store")
.WithReference(kafka);
// アナリティクスサービス
var analytics = builder.AddProject<Projects.Analytics>("analytics")
.WithReference(kafka);
builder.Build().Run();
プロデューサーの実装
// EventStore/Services/EventProducer.cs
using Confluent.Kafka;
public interface IEventProducer
{
Task ProduceAsync<T>(string topic, string key, T value) where T : class;
}
public class KafkaEventProducer : IEventProducer
{
private readonly IProducer<string, string> _producer;
private readonly ILogger<KafkaEventProducer> _logger;
private readonly JsonSerializerOptions _jsonOptions;
public KafkaEventProducer(
IConfiguration configuration,
ILogger<KafkaEventProducer> logger)
{
var config = new ProducerConfig
{
BootstrapServers = configuration.GetConnectionString("kafka"),
ClientId = "event-store",
Acks = Acks.All,
EnableIdempotence = true,
MaxInFlight = 5,
CompressionType = CompressionType.Snappy,
LingerMs = 20,
BatchSize = 32768
};
_producer = new ProducerBuilder<string, string>(config)
.SetErrorHandler((_, e) => logger.LogError("Kafka error: {Reason}", e.Reason))
.Build();
_logger = logger;
_jsonOptions = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase
};
}
public async Task ProduceAsync<T>(string topic, string key, T value) where T : class
{
try
{
var message = new Message<string, string>
{
Key = key,
Value = JsonSerializer.Serialize(value, _jsonOptions),
Headers = new Headers
{
{ "event-type", Encoding.UTF8.GetBytes(typeof(T).Name) },
{ "timestamp", Encoding.UTF8.GetBytes(DateTime.UtcNow.ToString("O")) }
}
};
var result = await _producer.ProduceAsync(topic, message);
_logger.LogDebug("Produced message to {Topic} at offset {Offset}",
result.Topic, result.Offset);
}
catch (ProduceException<string, string> ex)
{
_logger.LogError(ex, "Failed to produce message to {Topic}", topic);
throw;
}
}
public void Dispose()
{
_producer?.Flush();
_producer?.Dispose();
}
}
// Program.cs
builder.Services.AddSingleton<IEventProducer, KafkaEventProducer>();
// 使用例
app.MapPost("/api/events", async (
DomainEvent domainEvent,
IEventProducer eventProducer) =>
{
await eventProducer.ProduceAsync(
"domain-events",
domainEvent.AggregateId.ToString(),
domainEvent);
return Results.Accepted();
});
コンシューマーの実装
// Analytics/Services/EventConsumer.cs
public class KafkaEventConsumer : BackgroundService
{
private readonly IConsumer<string, string> _consumer;
private readonly IServiceProvider _serviceProvider;
private readonly ILogger<KafkaEventConsumer> _logger;
public KafkaEventConsumer(
IConfiguration configuration,
IServiceProvider serviceProvider,
ILogger<KafkaEventConsumer> logger)
{
var config = new ConsumerConfig
{
BootstrapServers = configuration.GetConnectionString("kafka"),
GroupId = "analytics-service",
ClientId = "analytics-consumer",
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = false,
EnablePartitionEof = true,
StatisticsIntervalMs = 5000,
SessionTimeoutMs = 6000,
MaxPollIntervalMs = 300000
};
_consumer = new ConsumerBuilder<string, string>(config)
.SetErrorHandler((_, e) => logger.LogError("Kafka error: {Reason}", e.Reason))
.SetStatisticsHandler((_, json) => logger.LogDebug("Statistics: {Json}", json))
.SetPartitionsAssignedHandler((c, partitions) =>
{
logger.LogInformation("Partitions assigned: {Partitions}",
string.Join(", ", partitions));
})
.Build();
_serviceProvider = serviceProvider;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_consumer.Subscribe(new[] { "domain-events", "user-events", "system-events" });
await Task.Run(async () =>
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
var consumeResult = _consumer.Consume(stoppingToken);
if (consumeResult.IsPartitionEOF)
{
_logger.LogDebug("Reached end of partition {Partition}",
consumeResult.Partition);
continue;
}
await ProcessMessageAsync(consumeResult);
_consumer.Commit(consumeResult);
}
catch (ConsumeException ex)
{
_logger.LogError(ex, "Consume error: {Reason}", ex.Error.Reason);
}
catch (Exception ex)
{
_logger.LogError(ex, "Unexpected error in consumer");
}
}
}, stoppingToken);
}
private async Task ProcessMessageAsync(ConsumeResult<string, string> consumeResult)
{
using var scope = _serviceProvider.CreateScope();
var eventType = Encoding.UTF8.GetString(
consumeResult.Headers.GetLastBytes("event-type"));
_logger.LogInformation("Processing {EventType} from {Topic}",
eventType, consumeResult.Topic);
// イベントタイプに基づいた処理
switch (eventType)
{
case "OrderCreated":
var orderHandler = scope.ServiceProvider
.GetRequiredService<IEventHandler<OrderCreated>>();
var orderEvent = JsonSerializer.Deserialize<OrderCreated>(
consumeResult.Message.Value);
await orderHandler.HandleAsync(orderEvent!);
break;
case "UserRegistered":
var userHandler = scope.ServiceProvider
.GetRequiredService<IEventHandler<UserRegistered>>();
var userEvent = JsonSerializer.Deserialize<UserRegistered>(
consumeResult.Message.Value);
await userHandler.HandleAsync(userEvent!);
break;
default:
_logger.LogWarning("Unknown event type: {EventType}", eventType);
break;
}
}
public override void Dispose()
{
_consumer?.Close();
_consumer?.Dispose();
base.Dispose();
}
}
イベントソーシングの実装
イベントストア
public interface IEventStore
{
Task AppendAsync(string streamName, IEnumerable<IEvent> events);
Task<IEnumerable<IEvent>> ReadStreamAsync(string streamName, int fromVersion = 0);
}
public class EventStore : IEventStore
{
private readonly IEventProducer _eventProducer;
private readonly IEventRepository _eventRepository;
private readonly ILogger<EventStore> _logger;
public EventStore(
IEventProducer eventProducer,
IEventRepository eventRepository,
ILogger<EventStore> logger)
{
_eventProducer = eventProducer;
_eventRepository = eventRepository;
_logger = logger;
}
public async Task AppendAsync(string streamName, IEnumerable<IEvent> events)
{
var eventList = events.ToList();
// データベースに保存
await _eventRepository.SaveEventsAsync(streamName, eventList);
// Kafkaにパブリッシュ
foreach (var @event in eventList)
{
await _eventProducer.ProduceAsync(
"event-store",
streamName,
@event);
}
_logger.LogInformation("Appended {Count} events to stream {StreamName}",
eventList.Count, streamName);
}
public async Task<IEnumerable<IEvent>> ReadStreamAsync(
string streamName, int fromVersion = 0)
{
return await _eventRepository.GetEventsAsync(streamName, fromVersion);
}
}
// アグリゲートの例
public abstract class AggregateRoot
{
private readonly List<IEvent> _changes = new();
public Guid Id { get; protected set; }
public int Version { get; private set; }
public IEnumerable<IEvent> GetUncommittedChanges() => _changes;
public void MarkChangesAsCommitted()
{
_changes.Clear();
}
public void LoadFromHistory(IEnumerable<IEvent> history)
{
foreach (var @event in history)
{
ApplyChange(@event, false);
}
}
protected void ApplyChange(IEvent @event)
{
ApplyChange(@event, true);
}
private void ApplyChange(IEvent @event, bool isNew)
{
var method = GetType().GetMethod("Apply",
BindingFlags.NonPublic | BindingFlags.Instance,
null,
new[] { @event.GetType() },
null);
if (method == null)
{
throw new InvalidOperationException(
$"Apply method not found for {@event.GetType().Name}");
}
method.Invoke(this, new object[] { @event });
if (isNew)
{
_changes.Add(@event);
}
Version++;
}
}
// 注文アグリゲート
public class Order : AggregateRoot
{
private readonly List<OrderItem> _items = new();
public string CustomerId { get; private set; }
public OrderStatus Status { get; private set; }
public decimal TotalAmount { get; private set; }
public DateTime CreatedAt { get; private set; }
public IReadOnlyList<OrderItem> Items => _items.AsReadOnly();
private Order() { } // For reconstitution
public Order(Guid orderId, string customerId, List<OrderItem> items)
{
ApplyChange(new OrderCreatedEvent
{
OrderId = orderId,
CustomerId = customerId,
Items = items,
TotalAmount = items.Sum(i => i.Quantity * i.UnitPrice),
CreatedAt = DateTime.UtcNow
});
}
public void Cancel(string reason)
{
if (Status != OrderStatus.Pending)
{
throw new InvalidOperationException("Only pending orders can be cancelled");
}
ApplyChange(new OrderCancelledEvent
{
OrderId = Id,
Reason = reason,
CancelledAt = DateTime.UtcNow
});
}
private void Apply(OrderCreatedEvent e)
{
Id = e.OrderId;
CustomerId = e.CustomerId;
_items.AddRange(e.Items);
TotalAmount = e.TotalAmount;
Status = OrderStatus.Pending;
CreatedAt = e.CreatedAt;
}
private void Apply(OrderCancelledEvent e)
{
Status = OrderStatus.Cancelled;
}
}
監視とトラブルシューティング
メトリクスの収集
public class MessagingMetrics
{
private readonly Counter<long> _messagesProduced;
private readonly Counter<long> _messagesConsumed;
private readonly Histogram<double> _messageProcessingDuration;
private readonly Counter<long> _messageErrors;
public MessagingMetrics(IMeterFactory meterFactory)
{
var meter = meterFactory.Create("Messaging");
_messagesProduced = meter.CreateCounter<long>(
"messages_produced",
description: "Total number of messages produced");
_messagesConsumed = meter.CreateCounter<long>(
"messages_consumed",
description: "Total number of messages consumed");
_messageProcessingDuration = meter.CreateHistogram<double>(
"message_processing_duration",
unit: "ms",
description: "Duration of message processing");
_messageErrors = meter.CreateCounter<long>(
"message_errors",
description: "Total number of message processing errors");
}
public void RecordMessageProduced(string topic) =>
_messagesProduced.Add(1, new KeyValuePair<string, object?>("topic", topic));
public void RecordMessageConsumed(string topic) =>
_messagesConsumed.Add(1, new KeyValuePair<string, object?>("topic", topic));
public void RecordProcessingDuration(string messageType, double durationMs) =>
_messageProcessingDuration.Record(
durationMs,
new KeyValuePair<string, object?>("message_type", messageType));
public void RecordError(string messageType, string errorType) =>
_messageErrors.Add(1,
new KeyValuePair<string, object?>("message_type", messageType),
new KeyValuePair<string, object?>("error_type", errorType));
}
まとめ
今回は、.NET Aspireでのメッセージングとイベント駆動アーキテクチャについて学びました。重要なポイント:
- 簡単な統合: RabbitMQ、Kafka、Azure Service Busをシンプルに追加
- Sagaパターン: 分散トランザクションの実装
- イベントソーシング: イベントストアとCQRSの実装
- 非同期通信: サービス間の疎結合化
- 監視: メトリクスとトレーシングの統合
次回は、観測可能性とモニタリングについて詳しく解説します。
次回予告:「第4回:観測可能性とモニタリング」では、OpenTelemetry、Prometheus、Grafanaを使った包括的な監視システムの構築方法を解説します。