【.NET Aspire入門】第3回:メッセージングとイベント駆動アーキテクチャ

はじめに

マイクロサービスアーキテクチャでは、サービス間の疎結合な通信が重要です。前回はデータベースとキャッシングの統合について学びました。今回は、メッセージングシステムを使った非同期通信とイベント駆動アーキテクチャの実装方法を解説します。

.NET Aspireは、RabbitMQ、Azure Service Bus、Apache Kafkaなどの主要なメッセージングプラットフォームをシームレスに統合し、開発者が複雑な設定に悩むことなく、ビジネスロジックの実装に集中できる環境を提供します。

メッセージングの基礎概念

同期通信 vs 非同期通信

graph LR
    A[Order Service] -->|同期HTTP| B[Inventory Service]
    A -->|同期HTTP| C[Payment Service]
    D[Order Service] -->|非同期| E[Message Queue]
    E -->|非同期| F[Inventory Service]
    E -->|非同期| G[Payment Service]

非同期通信の利点:

  • 疎結合: サービス間の依存関係を最小化
  • スケーラビリティ: 負荷に応じた柔軟なスケーリング
  • 耐障害性: 一時的な障害に対する回復力
  • パフォーマンス: 非ブロッキング処理による応答性向上

RabbitMQの統合

基本的なセットアップ

// AppHost/Program.cs
var builder = DistributedApplication.CreateBuilder(args);

// RabbitMQ resource named "messaging", with a named data volume and the
// management plugin (admin UI) enabled.
var rabbit = builder
    .AddRabbitMQ("messaging")
    .WithDataVolume("rabbitmq-data")
    .WithManagementPlugin();

// Every service project references the same RabbitMQ resource.
builder.AddProject<Projects.OrderService>("order-service").WithReference(rabbit);
builder.AddProject<Projects.InventoryService>("inventory-service").WithReference(rabbit);
builder.AddProject<Projects.NotificationService>("notification-service").WithReference(rabbit);

builder.Build().Run();

プロデューサーの実装(注文サービス)

// OrderService/Program.cs
using MassTransit;

var builder = WebApplication.CreateBuilder(args);

builder.AddServiceDefaults();

// RabbitMQ client and MassTransit, both bound to the "messaging" resource
builder.AddRabbitMQClient("messaging");

builder.Services.AddMassTransit(x =>
{
    x.SetKebabCaseEndpointNameFormatter();

    x.UsingRabbitMq((context, cfg) =>
    {
        var configuration = context.GetRequiredService<IConfiguration>();

        // Fail fast with an actionable message instead of passing null to the Uri constructor.
        var connectionString = configuration.GetConnectionString("messaging")
            ?? throw new InvalidOperationException(
                "Connection string 'messaging' is not configured.");
        cfg.Host(new Uri(connectionString));

        cfg.ConfigureEndpoints(context);
    });
});

var app = builder.Build();

// Order creation API: validates the request, publishes an OrderCreated event and
// returns 201 with the new order id. Invalid requests are rejected with 400.
app.MapPost("/api/orders", async (
    CreateOrderRequest request,
    IPublishEndpoint publishEndpoint,
    ILogger<Program> logger,
    CancellationToken cancellationToken) =>
{
    // An order without line items would otherwise be published with a total of 0
    // (or fail with a NullReferenceException when Items is missing from the body).
    if (request.Items is null || request.Items.Count == 0)
    {
        return Results.BadRequest(new { error = "An order must contain at least one item." });
    }

    if (request.Items.Any(i => i.Quantity <= 0 || i.Price < 0))
    {
        return Results.BadRequest(new
        {
            error = "Each item must have a positive quantity and a non-negative price."
        });
    }

    var orderId = Guid.NewGuid();

    // Build the OrderCreated event from the request
    var orderCreatedEvent = new OrderCreated
    {
        OrderId = orderId,
        CustomerId = request.CustomerId,
        Items = request.Items.Select(i => new OrderItem
        {
            ProductId = i.ProductId,
            Quantity = i.Quantity,
            Price = i.Price
        }).ToList(),
        TotalAmount = request.Items.Sum(i => i.Quantity * i.Price),
        CreatedAt = DateTime.UtcNow
    };

    // Flow the request's cancellation token so an aborted request stops the publish.
    await publishEndpoint.Publish(orderCreatedEvent, cancellationToken);

    logger.LogInformation("Order created: {OrderId}", orderId);

    return Results.Created($"/api/orders/{orderId}", new { orderId });
});

app.Run();

// Request/response models
/// <summary>Request body bound by the <c>POST /api/orders</c> endpoint.</summary>
/// <param name="CustomerId">Identifier of the customer placing the order.</param>
/// <param name="Items">Line items of the order.</param>
public record CreateOrderRequest(
    Guid CustomerId,
    List<OrderItemRequest> Items
);

/// <summary>A single line item inside a <see cref="CreateOrderRequest"/>.</summary>
/// <param name="ProductId">Identifier of the ordered product.</param>
/// <param name="Quantity">Number of units ordered.</param>
/// <param name="Price">Price per unit; the endpoint multiplies it by <paramref name="Quantity"/> to compute the order total.</param>
public record OrderItemRequest(
    Guid ProductId,
    int Quantity,
    decimal Price
);

// Event definitions (ideally placed in a shared library)
/// <summary>
/// Event published when an order has been created. Consumed by
/// <c>OrderCreatedConsumer</c> in the inventory service.
/// </summary>
public record OrderCreated
{
    /// <summary>Identifier of the created order.</summary>
    public Guid OrderId { get; init; }
    /// <summary>Identifier of the customer who placed the order.</summary>
    public Guid CustomerId { get; init; }
    /// <summary>Line items of the order; defaults to an empty list.</summary>
    public List<OrderItem> Items { get; init; } = new();
    /// <summary>Order total as computed by the publisher.</summary>
    public decimal TotalAmount { get; init; }
    /// <summary>Creation timestamp; publishers in this article set it from <c>DateTime.UtcNow</c>.</summary>
    public DateTime CreatedAt { get; init; }
}

/// <summary>A line item carried by the <see cref="OrderCreated"/> event.</summary>
public record OrderItem
{
    /// <summary>Identifier of the ordered product.</summary>
    public Guid ProductId { get; init; }
    /// <summary>Number of units ordered.</summary>
    public int Quantity { get; init; }
    /// <summary>Price per unit.</summary>
    public decimal Price { get; init; }
}

コンシューマーの実装(在庫サービス)

// InventoryService/Consumers/OrderCreatedConsumer.cs
using MassTransit;

/// <summary>
/// Consumes <see cref="OrderCreated"/> events, attempts to reserve stock for every line item
/// and publishes either <c>InventoryReserved</c> or <c>InventoryReservationFailed</c>.
/// </summary>
/// <remarks>
/// Reservations made while handling a message are tracked so they can be released both when
/// an item is unavailable and when an exception aborts processing. Without the latter, a
/// retried delivery would reserve the same items a second time.
/// </remarks>
public class OrderCreatedConsumer : IConsumer<OrderCreated>
{
    private readonly IInventoryRepository _inventoryRepository;
    private readonly IPublishEndpoint _publishEndpoint;
    private readonly ILogger<OrderCreatedConsumer> _logger;

    public OrderCreatedConsumer(
        IInventoryRepository inventoryRepository,
        IPublishEndpoint publishEndpoint,
        ILogger<OrderCreatedConsumer> logger)
    {
        _inventoryRepository = inventoryRepository;
        _publishEndpoint = publishEndpoint;
        _logger = logger;
    }

    /// <summary>
    /// Checks and reserves stock for each item of the order, then publishes the outcome.
    /// </summary>
    /// <param name="context">Consume context carrying the <see cref="OrderCreated"/> message.</param>
    /// <exception cref="Exception">
    /// Any failure is logged and rethrown after a best-effort release of the reservations
    /// made so far, so that the configured retry/error handling can take over.
    /// </exception>
    public async Task Consume(ConsumeContext<OrderCreated> context)
    {
        var message = context.Message;
        _logger.LogInformation("Processing order {OrderId}", message.OrderId);

        // Product ids reserved by this invocation and not yet released.
        var activeReservations = new List<Guid>();

        try
        {
            // Check availability and reserve, item by item
            var reservationResults = new List<InventoryReservationResult>();

            foreach (var item in message.Items)
            {
                var available = await _inventoryRepository.CheckAvailabilityAsync(
                    item.ProductId, item.Quantity);

                if (available)
                {
                    await _inventoryRepository.ReserveAsync(
                        item.ProductId, item.Quantity, message.OrderId);
                    activeReservations.Add(item.ProductId);

                    reservationResults.Add(new InventoryReservationResult
                    {
                        ProductId = item.ProductId,
                        Quantity = item.Quantity,
                        Reserved = true
                    });
                }
                else
                {
                    reservationResults.Add(new InventoryReservationResult
                    {
                        ProductId = item.ProductId,
                        Quantity = item.Quantity,
                        Reserved = false,
                        Reason = "Insufficient stock"
                    });
                }
            }

            // Publish an event that reflects the outcome
            if (reservationResults.All(r => r.Reserved))
            {
                await _publishEndpoint.Publish(new InventoryReserved
                {
                    OrderId = message.OrderId,
                    Reservations = reservationResults,
                    ReservedAt = DateTime.UtcNow
                }, context.CancellationToken);

                _logger.LogInformation("Inventory reserved for order {OrderId}",
                    message.OrderId);
            }
            else
            {
                // Roll back the partial reservation; a failure here propagates to the catch below.
                await ReleaseReservationsAsync(activeReservations, message.OrderId);

                await _publishEndpoint.Publish(new InventoryReservationFailed
                {
                    OrderId = message.OrderId,
                    FailedItems = reservationResults
                        .Where(r => !r.Reserved)
                        .ToList(),
                    FailedAt = DateTime.UtcNow
                }, context.CancellationToken);

                _logger.LogWarning("Inventory reservation failed for order {OrderId}",
                    message.OrderId);
            }
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Error processing order {OrderId}", message.OrderId);

            // Compensate before rethrowing so a retry starts from a clean slate.
            try
            {
                await ReleaseReservationsAsync(activeReservations, message.OrderId);
            }
            catch (Exception releaseEx)
            {
                // Best effort only: the original exception is the one that must surface.
                _logger.LogError(releaseEx,
                    "Failed to release reservations for order {OrderId}", message.OrderId);
            }

            throw;
        }
    }

    /// <summary>
    /// Releases the given reservations in the order they were made, removing each entry
    /// once released so that a later call never releases the same reservation twice.
    /// </summary>
    private async Task ReleaseReservationsAsync(List<Guid> activeReservations, Guid orderId)
    {
        while (activeReservations.Count > 0)
        {
            await _inventoryRepository.ReleaseReservationAsync(activeReservations[0], orderId);
            activeReservations.RemoveAt(0);
        }
    }
}

// Configuration in Program.cs
builder.Services.AddMassTransit(x =>
{
    x.AddConsumer<OrderCreatedConsumer>();

    x.UsingRabbitMq((context, cfg) =>
    {
        var configuration = context.GetRequiredService<IConfiguration>();

        // Fail fast with an actionable message instead of passing null to the Uri constructor.
        var connectionString = configuration.GetConnectionString("messaging")
            ?? throw new InvalidOperationException(
                "Connection string 'messaging' is not configured.");
        cfg.Host(new Uri(connectionString));

        cfg.ReceiveEndpoint("inventory-service", e =>
        {
            // Register middleware before the consumer, as the MassTransit samples do,
            // so the consumer is wrapped by the retry and outbox filters.

            // Retry policy: four attempts after 100, 200, 500 and 1000 ms
            e.UseMessageRetry(r => r.Intervals(100, 200, 500, 1000));

            // In-memory outbox: messages published by the consumer are held back until
            // it completes successfully, so failed attempts do not emit events.
            e.UseInMemoryOutbox();

            e.ConfigureConsumer<OrderCreatedConsumer>(context);
        });
    });
});

Sagaパターンの実装

注文処理のSaga

// OrderSaga/OrderStateMachine.cs
using MassTransit;

/// <summary>
/// Saga state machine that drives an order through inventory reservation and payment.
/// </summary>
/// <remarks>
/// States and events are declared as properties of the same class, so their names must be
/// unique. The state entered once inventory has been reserved is named
/// <see cref="AwaitingPayment"/> so it does not collide with the <c>InventoryReserved</c>
/// event property. No separate "payment processed" state is declared because a successful
/// payment transitions straight to <see cref="Completed"/>.
/// </remarks>
public class OrderStateMachine : MassTransitStateMachine<OrderState>
{
    // States. "= null!" silences nullable warnings; the properties are not assigned in this class.
    public State Submitted { get; private set; } = null!;
    public State AwaitingPayment { get; private set; } = null!;
    public State Completed { get; private set; } = null!;
    public State Failed { get; private set; } = null!;

    // Events, all correlated by the OrderId carried in the message.
    public Event<OrderSubmitted> OrderSubmitted { get; private set; } = null!;
    public Event<InventoryReserved> InventoryReserved { get; private set; } = null!;
    public Event<InventoryReservationFailed> InventoryReservationFailed { get; private set; } = null!;
    public Event<PaymentProcessed> PaymentProcessed { get; private set; } = null!;
    public Event<PaymentFailed> PaymentFailed { get; private set; } = null!;

    public OrderStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => OrderSubmitted, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => InventoryReserved, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => InventoryReservationFailed, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => PaymentProcessed, x => x.CorrelateById(m => m.Message.OrderId));
        Event(() => PaymentFailed, x => x.CorrelateById(m => m.Message.OrderId));

        // Order submitted: capture the order data and ask the inventory side to reserve stock.
        Initially(
            When(OrderSubmitted)
                .Then(context =>
                {
                    context.Saga.OrderId = context.Message.OrderId;
                    context.Saga.CustomerId = context.Message.CustomerId;
                    context.Saga.TotalAmount = context.Message.TotalAmount;
                    context.Saga.SubmittedAt = DateTime.UtcNow;
                })
                .PublishAsync(context => context.Init<OrderCreated>(new
                {
                    context.Saga.OrderId,
                    context.Saga.CustomerId,
                    Items = context.Message.Items,
                    context.Saga.TotalAmount,
                    CreatedAt = DateTime.UtcNow
                }))
                .TransitionTo(Submitted)
        );

        // Waiting for the inventory outcome.
        During(Submitted,
            When(InventoryReserved)
                .Then(context => context.Saga.InventoryReservedAt = DateTime.UtcNow)
                .PublishAsync(context => context.Init<ProcessPayment>(new
                {
                    context.Saga.OrderId,
                    context.Saga.CustomerId,
                    context.Saga.TotalAmount
                }))
                .TransitionTo(AwaitingPayment),

            When(InventoryReservationFailed)
                .Then(context => context.Saga.FailedAt = DateTime.UtcNow)
                .PublishAsync(context => context.Init<OrderFailed>(new
                {
                    context.Saga.OrderId,
                    Reason = "Inventory reservation failed",
                    FailedAt = DateTime.UtcNow
                }))
                .TransitionTo(Failed)
        );

        // Inventory is reserved; waiting for the payment outcome.
        During(AwaitingPayment,
            When(PaymentProcessed)
                .Then(context => context.Saga.PaymentProcessedAt = DateTime.UtcNow)
                .PublishAsync(context => context.Init<OrderCompleted>(new
                {
                    context.Saga.OrderId,
                    CompletedAt = DateTime.UtcNow
                }))
                .TransitionTo(Completed),

            // Payment failed: compensate by releasing the reserved inventory, then fail the order.
            When(PaymentFailed)
                .Then(context => context.Saga.FailedAt = DateTime.UtcNow)
                .PublishAsync(context => context.Init<ReleaseInventory>(new
                {
                    context.Saga.OrderId
                }))
                .PublishAsync(context => context.Init<OrderFailed>(new
                {
                    context.Saga.OrderId,
                    Reason = "Payment failed",
                    FailedAt = DateTime.UtcNow
                }))
                .TransitionTo(Failed)
        );

        // NOTE(review): no transition above calls Finalize(), so Completed/Failed instances
        // never reach the final state - confirm whether they should be finalized.
        SetCompletedWhenFinalized();
    }
}

/// <summary>
/// Saga instance data used by <c>OrderStateMachine</c>: the order's identity plus the
/// timestamps the state machine records as the order progresses.
/// </summary>
public class OrderState : SagaStateMachineInstance
{
    // Saga correlation identifier; the state machine correlates every event by the message's OrderId.
    public Guid CorrelationId { get; set; }
    // Current state; bound in the state machine via InstanceState(x => x.CurrentState).
    public string CurrentState { get; set; }
    // Copied from the OrderSubmitted message when the saga starts.
    public Guid OrderId { get; set; }
    public Guid CustomerId { get; set; }
    public decimal TotalAmount { get; set; }
    // Set to DateTime.UtcNow when OrderSubmitted is handled.
    public DateTime SubmittedAt { get; set; }
    // Set when InventoryReserved is handled; null until then.
    public DateTime? InventoryReservedAt { get; set; }
    // Set when PaymentProcessed is handled; null until then.
    public DateTime? PaymentProcessedAt { get; set; }
    // Set when inventory reservation or payment fails; null otherwise.
    public DateTime? FailedAt { get; set; }
}

Apache Kafkaの統合

イベントストリーミング

// AppHost/Program.cs
var builder = DistributedApplication.CreateBuilder(args);

// Kafka resource named "kafka", with a named data volume.
var broker = builder
    .AddKafka("kafka")
    .WithDataVolume("kafka-data");

// The event store and analytics projects both reference the Kafka resource.
builder.AddProject<Projects.EventStore>("event-store").WithReference(broker);
builder.AddProject<Projects.Analytics>("analytics").WithReference(broker);

builder.Build().Run();

プロデューサーの実装

// EventStore/Services/EventProducer.cs
using Confluent.Kafka;

/// <summary>Abstraction for publishing an event to a named topic.</summary>
public interface IEventProducer
{
    /// <summary>Produces <paramref name="value"/> to <paramref name="topic"/>.</summary>
    /// <typeparam name="T">Reference type of the event being produced.</typeparam>
    /// <param name="topic">Destination topic name.</param>
    /// <param name="key">Message key.</param>
    /// <param name="value">Event payload.</param>
    Task ProduceAsync<T>(string topic, string key, T value) where T : class;
}

/// <summary>
/// <see cref="IEventProducer"/> implementation that serializes events to camelCase JSON and
/// produces them to Kafka with <c>event-type</c> and <c>timestamp</c> headers.
/// </summary>
/// <remarks>
/// Implements <see cref="IDisposable"/> so that the dependency injection container flushes
/// and disposes the underlying producer on shutdown.
/// </remarks>
public class KafkaEventProducer : IEventProducer, IDisposable
{
    // Upper bound for draining the producer queue on shutdown, so Dispose cannot hang forever.
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    private readonly IProducer<string, string> _producer;
    private readonly ILogger<KafkaEventProducer> _logger;
    private readonly JsonSerializerOptions _jsonOptions;
    private bool _disposed;

    /// <summary>Builds the Kafka producer from the <c>kafka</c> connection string.</summary>
    /// <exception cref="InvalidOperationException">The <c>kafka</c> connection string is missing.</exception>
    public KafkaEventProducer(
        IConfiguration configuration,
        ILogger<KafkaEventProducer> logger)
    {
        var bootstrapServers = configuration.GetConnectionString("kafka")
            ?? throw new InvalidOperationException(
                "Connection string 'kafka' is not configured.");

        var config = new ProducerConfig
        {
            BootstrapServers = bootstrapServers,
            ClientId = "event-store",
            Acks = Acks.All,
            EnableIdempotence = true,
            MaxInFlight = 5,
            CompressionType = CompressionType.Snappy,
            LingerMs = 20,
            BatchSize = 32768
        };

        _producer = new ProducerBuilder<string, string>(config)
            .SetErrorHandler((_, e) => logger.LogError("Kafka error: {Reason}", e.Reason))
            .Build();

        _logger = logger;
        _jsonOptions = new JsonSerializerOptions
        {
            PropertyNamingPolicy = JsonNamingPolicy.CamelCase
        };
    }

    /// <summary>Serializes <paramref name="value"/> and produces it to <paramref name="topic"/>.</summary>
    /// <param name="topic">Destination topic.</param>
    /// <param name="key">Message key.</param>
    /// <param name="value">Event to publish.</param>
    /// <exception cref="ProduceException{TKey,TValue}">Logged and rethrown when the produce call fails.</exception>
    public async Task ProduceAsync<T>(string topic, string key, T value) where T : class
    {
        try
        {
            // Use the runtime type: when T is an interface or base class (for example IEvent),
            // serializing as T would drop the concrete event's properties and the header
            // would carry the interface name instead of the event name.
            var eventType = value?.GetType() ?? typeof(T);

            var message = new Message<string, string>
            {
                Key = key,
                Value = JsonSerializer.Serialize(value, eventType, _jsonOptions),
                Headers = new Headers
                {
                    { "event-type", Encoding.UTF8.GetBytes(eventType.Name) },
                    { "timestamp", Encoding.UTF8.GetBytes(DateTime.UtcNow.ToString("O")) }
                }
            };

            var result = await _producer.ProduceAsync(topic, message);

            _logger.LogDebug("Produced message to {Topic} at offset {Offset}",
                result.Topic, result.Offset);
        }
        catch (ProduceException<string, string> ex)
        {
            _logger.LogError(ex, "Failed to produce message to {Topic}", topic);
            throw;
        }
    }

    /// <summary>Flushes pending messages (bounded by a timeout) and disposes the producer.</summary>
    public void Dispose()
    {
        if (_disposed)
        {
            return;
        }

        _disposed = true;

        var remaining = _producer.Flush(FlushTimeout);
        if (remaining > 0)
        {
            _logger.LogWarning(
                "{Remaining} message(s) were still queued when the producer was disposed",
                remaining);
        }

        _producer.Dispose();
    }
}

// Program.cs
// Register the Kafka-backed producer as a singleton.
builder.Services.AddSingleton<IEventProducer, KafkaEventProducer>();

// Usage example: publish the posted event to "domain-events", keyed by its aggregate id.
app.MapPost("/api/events", async (
    DomainEvent domainEvent,
    IEventProducer eventProducer) =>
{
    const string topic = "domain-events";
    var messageKey = domainEvent.AggregateId.ToString();

    await eventProducer.ProduceAsync(topic, messageKey, domainEvent);

    return Results.Accepted();
});

コンシューマーの実装

// Analytics/Services/EventConsumer.cs
/// <summary>
/// Background service that consumes events from Kafka, dispatches each one to the matching
/// <c>IEventHandler&lt;T&gt;</c> based on its <c>event-type</c> header, and commits the
/// offset once the message has been processed.
/// </summary>
public class KafkaEventConsumer : BackgroundService
{
    // The producer writes camelCase JSON; without these options the PascalCase properties
    // of the event types would not be populated on deserialization.
    private static readonly JsonSerializerOptions JsonOptions = new()
    {
        PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
        PropertyNameCaseInsensitive = true
    };

    private readonly IConsumer<string, string> _consumer;
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<KafkaEventConsumer> _logger;

    /// <summary>Builds the Kafka consumer from the <c>kafka</c> connection string.</summary>
    public KafkaEventConsumer(
        IConfiguration configuration,
        IServiceProvider serviceProvider,
        ILogger<KafkaEventConsumer> logger)
    {
        var config = new ConsumerConfig
        {
            BootstrapServers = configuration.GetConnectionString("kafka"),
            GroupId = "analytics-service",
            ClientId = "analytics-consumer",
            AutoOffsetReset = AutoOffsetReset.Earliest,
            // Offsets are committed manually after each message is processed.
            EnableAutoCommit = false,
            EnablePartitionEof = true,
            StatisticsIntervalMs = 5000,
            SessionTimeoutMs = 6000,
            MaxPollIntervalMs = 300000
        };

        _consumer = new ConsumerBuilder<string, string>(config)
            .SetErrorHandler((_, e) => logger.LogError("Kafka error: {Reason}", e.Reason))
            .SetStatisticsHandler((_, json) => logger.LogDebug("Statistics: {Json}", json))
            .SetPartitionsAssignedHandler((c, partitions) =>
            {
                logger.LogInformation("Partitions assigned: {Partitions}",
                    string.Join(", ", partitions));
            })
            .Build();

        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    /// <summary>
    /// Subscribes to the event topics and runs the consume loop until
    /// <paramref name="stoppingToken"/> is cancelled.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _consumer.Subscribe(new[] { "domain-events", "user-events", "system-events" });

        // Consume() blocks, so the loop runs on a thread-pool thread.
        await Task.Run(async () =>
        {
            while (!stoppingToken.IsCancellationRequested)
            {
                try
                {
                    var consumeResult = _consumer.Consume(stoppingToken);

                    if (consumeResult.IsPartitionEOF)
                    {
                        _logger.LogDebug("Reached end of partition {Partition}",
                            consumeResult.Partition);
                        continue;
                    }

                    await ProcessMessageAsync(consumeResult);

                    _consumer.Commit(consumeResult);
                }
                catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
                {
                    // Normal shutdown: Consume() was interrupted by the stopping token.
                    break;
                }
                catch (ConsumeException ex)
                {
                    _logger.LogError(ex, "Consume error: {Reason}", ex.Error.Reason);
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex, "Unexpected error in consumer");
                }
            }
        }, stoppingToken);
    }

    /// <summary>
    /// Resolves the handler for the message's event type in a fresh DI scope and invokes it.
    /// Messages without an <c>event-type</c> header, with an unknown type, or with a null
    /// payload are logged and skipped.
    /// </summary>
    private async Task ProcessMessageAsync(ConsumeResult<string, string> consumeResult)
    {
        using var scope = _serviceProvider.CreateScope();

        var headers = consumeResult.Message.Headers;
        if (headers is null || !headers.TryGetLastBytes("event-type", out var eventTypeBytes))
        {
            // Messages from other producers may not carry the header; skip instead of throwing.
            _logger.LogWarning(
                "Message on {Topic} at offset {Offset} has no event-type header; skipping",
                consumeResult.Topic, consumeResult.Offset);
            return;
        }

        var eventType = Encoding.UTF8.GetString(eventTypeBytes);

        _logger.LogInformation("Processing {EventType} from {Topic}",
            eventType, consumeResult.Topic);

        var payload = consumeResult.Message.Value;

        // Dispatch on the event type
        switch (eventType)
        {
            case "OrderCreated":
                var orderHandler = scope.ServiceProvider
                    .GetRequiredService<IEventHandler<OrderCreated>>();
                var orderEvent = JsonSerializer.Deserialize<OrderCreated>(payload, JsonOptions);
                if (orderEvent is null)
                {
                    _logger.LogWarning("Empty payload for event type: {EventType}", eventType);
                    break;
                }

                await orderHandler.HandleAsync(orderEvent);
                break;

            case "UserRegistered":
                var userHandler = scope.ServiceProvider
                    .GetRequiredService<IEventHandler<UserRegistered>>();
                var userEvent = JsonSerializer.Deserialize<UserRegistered>(payload, JsonOptions);
                if (userEvent is null)
                {
                    _logger.LogWarning("Empty payload for event type: {EventType}", eventType);
                    break;
                }

                await userHandler.HandleAsync(userEvent);
                break;

            default:
                _logger.LogWarning("Unknown event type: {EventType}", eventType);
                break;
        }
    }

    /// <summary>Closes and disposes the Kafka consumer.</summary>
    public override void Dispose()
    {
        _consumer.Close();
        _consumer.Dispose();
        base.Dispose();
    }
}

イベントソーシングの実装

イベントストア

/// <summary>Append-and-read access to named event streams.</summary>
public interface IEventStore
{
    /// <summary>Appends <paramref name="events"/> to the stream <paramref name="streamName"/>.</summary>
    Task AppendAsync(string streamName, IEnumerable<IEvent> events);
    /// <summary>
    /// Reads the events of <paramref name="streamName"/>, starting at
    /// <paramref name="fromVersion"/> (defaults to 0).
    /// </summary>
    Task<IEnumerable<IEvent>> ReadStreamAsync(string streamName, int fromVersion = 0);
}

/// <summary>
/// <see cref="IEventStore"/> that saves events through an <c>IEventRepository</c> and then
/// publishes each of them through an <c>IEventProducer</c>.
/// </summary>
public class EventStore : IEventStore
{
    private readonly IEventProducer _eventProducer;
    private readonly IEventRepository _eventRepository;
    private readonly ILogger<EventStore> _logger;

    public EventStore(
        IEventProducer eventProducer,
        IEventRepository eventRepository,
        ILogger<EventStore> logger)
    {
        (_eventProducer, _eventRepository, _logger) = (eventProducer, eventRepository, logger);
    }

    /// <summary>
    /// Saves the events for <paramref name="streamName"/> and then publishes them one by one
    /// to the "event-store" topic, using the stream name as the message key.
    /// </summary>
    /// <remarks>
    /// The repository save and the publishing are separate steps; an exception while
    /// publishing propagates after the events have already been saved.
    /// </remarks>
    public async Task AppendAsync(string streamName, IEnumerable<IEvent> events)
    {
        // Materialize once: the sequence is saved, published and counted.
        var pending = events.ToList();

        // Step 1: save via the repository
        await _eventRepository.SaveEventsAsync(streamName, pending);

        // Step 2: publish each event in order
        foreach (var item in pending)
        {
            await _eventProducer.ProduceAsync("event-store", streamName, item);
        }

        _logger.LogInformation("Appended {Count} events to stream {StreamName}",
            pending.Count, streamName);
    }

    /// <summary>Returns the repository's events for <paramref name="streamName"/> from <paramref name="fromVersion"/>.</summary>
    public async Task<IEnumerable<IEvent>> ReadStreamAsync(
        string streamName, int fromVersion = 0)
    {
        var stored = await _eventRepository.GetEventsAsync(streamName, fromVersion);
        return stored;
    }
}

// Aggregate example
/// <summary>
/// Base class for event-sourced aggregates: state changes are expressed as events that are
/// routed to a non-public <c>Apply</c> overload on the concrete aggregate.
/// </summary>
public abstract class AggregateRoot
{
    private readonly List<IEvent> _changes = new();

    public Guid Id { get; protected set; }

    /// <summary>Incremented once for every applied event, whether replayed or new.</summary>
    public int Version { get; private set; }

    /// <summary>Events applied as new changes since the last <see cref="MarkChangesAsCommitted"/>.</summary>
    public IEnumerable<IEvent> GetUncommittedChanges()
    {
        return _changes;
    }

    /// <summary>Clears the list of uncommitted changes.</summary>
    public void MarkChangesAsCommitted() => _changes.Clear();

    /// <summary>Replays <paramref name="history"/> without recording the events as new changes.</summary>
    public void LoadFromHistory(IEnumerable<IEvent> history)
    {
        foreach (var past in history)
        {
            ApplyChange(past, isNew: false);
        }
    }

    /// <summary>Applies a new event and records it as an uncommitted change.</summary>
    protected void ApplyChange(IEvent @event) => ApplyChange(@event, isNew: true);

    // Looks up the non-public instance method Apply(<runtime event type>) by reflection,
    // invokes it, optionally records the event, and bumps the version.
    private void ApplyChange(IEvent @event, bool isNew)
    {
        var eventType = @event.GetType();

        var applier = GetType().GetMethod(
            "Apply",
            BindingFlags.NonPublic | BindingFlags.Instance,
            binder: null,
            types: new[] { eventType },
            modifiers: null)
            ?? throw new InvalidOperationException(
                $"Apply method not found for {eventType.Name}");

        applier.Invoke(this, new object[] { @event });

        if (isNew)
        {
            _changes.Add(@event);
        }

        Version++;
    }
}

// Order aggregate
/// <summary>
/// Event-sourced order aggregate. State is changed only by applying
/// <c>OrderCreatedEvent</c> and <c>OrderCancelledEvent</c>.
/// </summary>
public class Order : AggregateRoot
{
    private readonly List<OrderItem> _items = new();

    public string CustomerId { get; private set; }
    public OrderStatus Status { get; private set; }
    public decimal TotalAmount { get; private set; }
    public DateTime CreatedAt { get; private set; }

    /// <summary>Read-only view of the order's line items.</summary>
    public IReadOnlyList<OrderItem> Items => _items.AsReadOnly();

    private Order() { } // For reconstitution

    /// <summary>Creates a new order by applying an <c>OrderCreatedEvent</c>.</summary>
    /// <param name="orderId">Identifier of the new order.</param>
    /// <param name="customerId">Identifier of the customer placing the order.</param>
    /// <param name="items">Line items; the list is copied, so later changes by the caller do not leak into the event.</param>
    /// <exception cref="ArgumentNullException"><paramref name="items"/> is null.</exception>
    public Order(Guid orderId, string customerId, List<OrderItem> items)
    {
        ArgumentNullException.ThrowIfNull(items);

        // Snapshot the items so the recorded event cannot be mutated through the caller's list.
        var snapshot = items.ToList();

        ApplyChange(new OrderCreatedEvent
        {
            OrderId = orderId,
            CustomerId = customerId,
            Items = snapshot,
            // OrderItem exposes the unit price as Price.
            TotalAmount = snapshot.Sum(i => i.Quantity * i.Price),
            CreatedAt = DateTime.UtcNow
        });
    }

    /// <summary>Cancels the order.</summary>
    /// <param name="reason">Reason recorded on the cancellation event.</param>
    /// <exception cref="InvalidOperationException">The order is not in the pending state.</exception>
    public void Cancel(string reason)
    {
        if (Status != OrderStatus.Pending)
        {
            throw new InvalidOperationException("Only pending orders can be cancelled");
        }

        ApplyChange(new OrderCancelledEvent
        {
            OrderId = Id,
            Reason = reason,
            CancelledAt = DateTime.UtcNow
        });
    }

    // Invoked by AggregateRoot via reflection for OrderCreatedEvent.
    private void Apply(OrderCreatedEvent e)
    {
        Id = e.OrderId;
        CustomerId = e.CustomerId;
        _items.AddRange(e.Items);
        TotalAmount = e.TotalAmount;
        Status = OrderStatus.Pending;
        CreatedAt = e.CreatedAt;
    }

    // Invoked by AggregateRoot via reflection for OrderCancelledEvent.
    private void Apply(OrderCancelledEvent e)
    {
        Status = OrderStatus.Cancelled;
    }
}

監視とトラブルシューティング

メトリクスの収集

/// <summary>
/// Messaging instruments created on the "Messaging" meter: counters for produced messages,
/// consumed messages and errors, plus a histogram of processing durations.
/// </summary>
public class MessagingMetrics
{
    private readonly Counter<long> _messagesProduced;
    private readonly Counter<long> _messagesConsumed;
    private readonly Histogram<double> _messageProcessingDuration;
    private readonly Counter<long> _messageErrors;

    /// <summary>Creates the meter and its four instruments from <paramref name="meterFactory"/>.</summary>
    public MessagingMetrics(IMeterFactory meterFactory)
    {
        var messagingMeter = meterFactory.Create("Messaging");

        _messagesProduced = messagingMeter.CreateCounter<long>(
            name: "messages_produced",
            description: "Total number of messages produced");

        _messagesConsumed = messagingMeter.CreateCounter<long>(
            name: "messages_consumed",
            description: "Total number of messages consumed");

        _messageProcessingDuration = messagingMeter.CreateHistogram<double>(
            name: "message_processing_duration",
            unit: "ms",
            description: "Duration of message processing");

        _messageErrors = messagingMeter.CreateCounter<long>(
            name: "message_errors",
            description: "Total number of message processing errors");
    }

    /// <summary>Counts one produced message, tagged with its topic.</summary>
    public void RecordMessageProduced(string topic)
    {
        var topicTag = new KeyValuePair<string, object?>("topic", topic);
        _messagesProduced.Add(1, topicTag);
    }

    /// <summary>Counts one consumed message, tagged with its topic.</summary>
    public void RecordMessageConsumed(string topic)
    {
        var topicTag = new KeyValuePair<string, object?>("topic", topic);
        _messagesConsumed.Add(1, topicTag);
    }

    /// <summary>Records a processing duration in milliseconds, tagged with the message type.</summary>
    public void RecordProcessingDuration(string messageType, double durationMs)
    {
        var typeTag = new KeyValuePair<string, object?>("message_type", messageType);
        _messageProcessingDuration.Record(durationMs, typeTag);
    }

    /// <summary>Counts one processing error, tagged with the message type and error type.</summary>
    public void RecordError(string messageType, string errorType)
    {
        var typeTag = new KeyValuePair<string, object?>("message_type", messageType);
        var errorTag = new KeyValuePair<string, object?>("error_type", errorType);
        _messageErrors.Add(1, typeTag, errorTag);
    }
}

まとめ

今回は、.NET Aspireでのメッセージングとイベント駆動アーキテクチャについて学びました。重要なポイント:

  1. 簡単な統合: RabbitMQ、Kafka、Azure Service Busをシンプルに追加
  2. Sagaパターン: 分散トランザクションの実装
  3. イベントソーシング: イベントストアとCQRSの実装
  4. 非同期通信: サービス間の疎結合化
  5. 監視: メトリクスとトレーシングの統合

次回は、観測可能性とモニタリングについて詳しく解説します。


次回予告:「第4回:観測可能性とモニタリング」では、OpenTelemetry、Prometheus、Grafanaを使った包括的な監視システムの構築方法を解説します。

技術的な課題をお持ちですか?専門チームがサポートします。

記事でご紹介した技術や実装について、
より詳細なご相談やプロジェクトのサポートを承ります。