【.NET Orleans入門】第3回:Grain間通信とストリーミング - リアルタイムデータ処理の実装

57分で読めます
エンハンスド技術チーム
.NETOrleansストリーミングリアルタイムWebSocketイベント駆動
.NET OrleansのGrain間通信とストリーミング機能を使って、リアルタイムチャット、IoTデータ処理、イベント駆動システムなどを実装する方法を解説します。

はじめに

前回はOrleansのステート管理と永続化について学びました。今回は、Orleansの真価を発揮する「Grain間通信とストリーミング」について解説します。

数百万のユーザーが同時に参加するライブチャット、毎秒数万件のIoTデータをリアルタイム処理するシステム、複雑なワークフローを持つビジネスプロセス...これらを効率的に実装する方法を、実践的なコード例とともに学んでいきます。

Grain間通信の基本

1. 直接通信パターン

// インターフェース定義
public interface IOrderGrain : IGrainWithGuidKey
{
    Task<OrderStatus> PlaceOrderAsync(OrderRequest request);
    Task<OrderInfo> GetOrderInfoAsync();
}

public interface IInventoryGrain : IGrainWithStringKey
{
    Task<bool> ReserveItemsAsync(List<OrderItem> items, Guid orderId);
    Task ReleaseReservationAsync(Guid orderId);
}

public interface IPaymentGrain : IGrainWithStringKey
{
    Task<PaymentResult> ProcessPaymentAsync(decimal amount, PaymentMethod method);
}

// OrderGrainの実装
public class OrderGrain : Grain, IOrderGrain
{
    private readonly IPersistentState<OrderState> _state;
    private readonly IGrainFactory _grainFactory;
    
    public OrderGrain(
        [PersistentState("order", "orderStore")] IPersistentState<OrderState> state,
        IGrainFactory grainFactory)
    {
        _state = state;
        _grainFactory = grainFactory;
    }
    
    public async Task<OrderStatus> PlaceOrderAsync(OrderRequest request)
    {
        // 注文IDを設定
        _state.State.OrderId = this.GetPrimaryKey();
        _state.State.CustomerId = request.CustomerId;
        _state.State.Items = request.Items;
        _state.State.Status = OrderStatus.Pending;
        
        // 在庫の確認と予約
        var inventoryGrain = _grainFactory.GetGrain<IInventoryGrain>("main-inventory");
        var reservationSuccess = await inventoryGrain.ReserveItemsAsync(
            request.Items, 
            _state.State.OrderId);
        
        if (!reservationSuccess)
        {
            _state.State.Status = OrderStatus.Failed;
            _state.State.FailureReason = "在庫不足";
            await _state.WriteStateAsync();
            return OrderStatus.Failed;
        }
        
        // 支払い処理
        var paymentGrain = _grainFactory.GetGrain<IPaymentGrain>(request.CustomerId);
        var totalAmount = request.Items.Sum(i => i.Price * i.Quantity);
        var paymentResult = await paymentGrain.ProcessPaymentAsync(
            totalAmount, 
            request.PaymentMethod);
        
        if (!paymentResult.Success)
        {
            // 在庫予約を解放
            await inventoryGrain.ReleaseReservationAsync(_state.State.OrderId);
            
            _state.State.Status = OrderStatus.Failed;
            _state.State.FailureReason = paymentResult.Error;
            await _state.WriteStateAsync();
            return OrderStatus.Failed;
        }
        
        // 注文確定
        _state.State.Status = OrderStatus.Confirmed;
        _state.State.PaymentId = paymentResult.TransactionId;
        _state.State.ConfirmedAt = DateTime.UtcNow;
        await _state.WriteStateAsync();
        
        // 配送Grainに通知
        var shippingGrain = _grainFactory.GetGrain<IShippingGrain>(_state.State.OrderId);
        await shippingGrain.ScheduleDeliveryAsync(_state.State);
        
        return OrderStatus.Confirmed;
    }
}

2. Observer パターン

// 通知インターフェース
public interface IStockPriceObserver : IGrainObserver
{
    void PriceChanged(string symbol, decimal newPrice, decimal change);
}

public interface IStockGrain : IGrainWithStringKey
{
    Task SubscribeAsync(IStockPriceObserver observer);
    Task UnsubscribeAsync(IStockPriceObserver observer);
    Task UpdatePriceAsync(decimal newPrice);
}

// StockGrainの実装
public class StockGrain : Grain, IStockGrain
{
    private readonly ObserverManager<IStockPriceObserver> _observers;
    private decimal _currentPrice;
    private string _symbol;
    
    public StockGrain()
    {
        _observers = new ObserverManager<IStockPriceObserver>(
            TimeSpan.FromMinutes(5), // タイムアウト
            this.GetLogger());
    }
    
    public override Task OnActivateAsync()
    {
        _symbol = this.GetPrimaryKeyString();
        return base.OnActivateAsync();
    }
    
    public Task SubscribeAsync(IStockPriceObserver observer)
    {
        _observers.Subscribe(observer);
        return Task.CompletedTask;
    }
    
    public Task UnsubscribeAsync(IStockPriceObserver observer)
    {
        _observers.Unsubscribe(observer);
        return Task.CompletedTask;
    }
    
    public async Task UpdatePriceAsync(decimal newPrice)
    {
        var oldPrice = _currentPrice;
        _currentPrice = newPrice;
        var change = newPrice - oldPrice;
        
        // すべてのオブザーバーに通知
        await _observers.Notify(observer => 
            observer.PriceChanged(_symbol, newPrice, change));
    }
}

// クライアント側の実装
public class StockPriceMonitor : IStockPriceObserver
{
    private readonly ILogger<StockPriceMonitor> _logger;
    
    public StockPriceMonitor(ILogger<StockPriceMonitor> logger)
    {
        _logger = logger;
    }
    
    public void PriceChanged(string symbol, decimal newPrice, decimal change)
    {
        _logger.LogInformation(
            $"Stock {symbol} price changed to {newPrice:C} ({change:+0.00;-0.00})");
        
        // UIを更新、アラートを送信など
        if (Math.Abs(change) > 10)
        {
            SendPriceAlert(symbol, newPrice, change);
        }
    }
}

Orleans Streams - 強力なストリーミング機能

ストリームプロバイダーの設定

// Silo設定
siloBuilder
    .AddMemoryGrainStorage("PubSubStore")
    .AddSimpleMessageStreamProvider("SMS")
    .AddSimpleMessageStreamProvider("AzureQueue", options =>
    {
        options.FireAndForgetDelivery = false;
        options.OptimizeForImmutableData = true;
    });

1. チャットシステムの実装

// チャットルームGrain
public interface IChatRoomGrain : IGrainWithStringKey
{
    Task JoinAsync(string userId, string userName);
    Task LeaveAsync(string userId);
    Task SendMessageAsync(string userId, string message);
    Task<List<ChatUser>> GetActiveUsersAsync();
}

[Serializable]
public class ChatMessage
{
    public string MessageId { get; set; }
    public string UserId { get; set; }
    public string UserName { get; set; }
    public string Message { get; set; }
    public DateTime Timestamp { get; set; }
    public MessageType Type { get; set; }
}

public class ChatRoomGrain : Grain, IChatRoomGrain
{
    private readonly Dictionary<string, ChatUser> _activeUsers = new();
    private IAsyncStream<ChatMessage> _stream;
    
    public override Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("SMS");
        var roomId = this.GetPrimaryKeyString();
        _stream = streamProvider.GetStream<ChatMessage>(roomId, "chat");
        
        return base.OnActivateAsync();
    }
    
    public async Task JoinAsync(string userId, string userName)
    {
        _activeUsers[userId] = new ChatUser { UserId = userId, UserName = userName };
        
        // 入室通知を配信
        await _stream.OnNextAsync(new ChatMessage
        {
            MessageId = Guid.NewGuid().ToString(),
            UserId = userId,
            UserName = userName,
            Message = $"{userName} が入室しました",
            Timestamp = DateTime.UtcNow,
            Type = MessageType.System
        });
    }
    
    public async Task SendMessageAsync(string userId, string message)
    {
        if (!_activeUsers.ContainsKey(userId))
            throw new InvalidOperationException("ユーザーはこのチャットルームに参加していません");
        
        var chatMessage = new ChatMessage
        {
            MessageId = Guid.NewGuid().ToString(),
            UserId = userId,
            UserName = _activeUsers[userId].UserName,
            Message = message,
            Timestamp = DateTime.UtcNow,
            Type = MessageType.User
        };
        
        // メッセージを配信
        await _stream.OnNextAsync(chatMessage);
        
        // メッセージの永続化(オプション)
        await SaveMessageToHistoryAsync(chatMessage);
    }
}

// クライアント側(SignalR Hub)
public class ChatHub : Hub
{
    private readonly IGrainFactory _grainFactory;
    private StreamSubscriptionHandle<ChatMessage> _subscription;
    
    public ChatHub(IGrainFactory grainFactory)
    {
        _grainFactory = grainFactory;
    }
    
    public async Task JoinRoom(string roomId, string userName)
    {
        var userId = Context.ConnectionId;
        var chatRoom = _grainFactory.GetGrain<IChatRoomGrain>(roomId);
        
        // チャットルームに参加
        await chatRoom.JoinAsync(userId, userName);
        
        // ストリームを購読
        var streamProvider = _grainFactory.GetStreamProvider("SMS");
        var stream = streamProvider.GetStream<ChatMessage>(roomId, "chat");
        
        _subscription = await stream.SubscribeAsync(async (message, token) =>
        {
            // SignalRでクライアントにメッセージを送信
            await Clients.Group(roomId).SendAsync("ReceiveMessage", message);
        });
        
        // SignalRグループに追加
        await Groups.AddToGroupAsync(userId, roomId);
    }
    
    public async Task SendMessage(string roomId, string message)
    {
        var userId = Context.ConnectionId;
        var chatRoom = _grainFactory.GetGrain<IChatRoomGrain>(roomId);
        await chatRoom.SendMessageAsync(userId, message);
    }
}

2. IoTデータストリーミング

// IoTデバイスGrain
public interface IIoTDeviceGrain : IGrainWithStringKey
{
    Task ReportTelemetryAsync(TelemetryData data);
    Task<DeviceStatus> GetStatusAsync();
    Task SetThresholdAsync(string metric, double threshold);
}

[Serializable]
public class TelemetryData
{
    public string DeviceId { get; set; }
    public Dictionary<string, double> Metrics { get; set; }
    public DateTime Timestamp { get; set; }
    public GeoLocation Location { get; set; }
}

public class IoTDeviceGrain : Grain, IIoTDeviceGrain
{
    private readonly IPersistentState<DeviceState> _state;
    private IAsyncStream<TelemetryData> _telemetryStream;
    private IAsyncStream<Alert> _alertStream;
    
    public IoTDeviceGrain(
        [PersistentState("device", "deviceStore")] IPersistentState<DeviceState> state)
    {
        _state = state;
    }
    
    public override Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("SMS");
        var deviceId = this.GetPrimaryKeyString();
        
        _telemetryStream = streamProvider.GetStream<TelemetryData>(deviceId, "telemetry");
        _alertStream = streamProvider.GetStream<Alert>("alerts", "system");
        
        return base.OnActivateAsync();
    }
    
    public async Task ReportTelemetryAsync(TelemetryData data)
    {
        // 状態を更新
        _state.State.LastTelemetry = data;
        _state.State.LastSeen = DateTime.UtcNow;
        
        // ストリームに発行
        await _telemetryStream.OnNextAsync(data);
        
        // 閾値チェック
        foreach (var metric in data.Metrics)
        {
            if (_state.State.Thresholds.TryGetValue(metric.Key, out var threshold))
            {
                if (metric.Value > threshold)
                {
                    await _alertStream.OnNextAsync(new Alert
                    {
                        AlertId = Guid.NewGuid().ToString(),
                        DeviceId = data.DeviceId,
                        Metric = metric.Key,
                        Value = metric.Value,
                        Threshold = threshold,
                        Timestamp = DateTime.UtcNow,
                        Severity = CalculateSeverity(metric.Value, threshold)
                    });
                }
            }
        }
        
        await _state.WriteStateAsync();
    }
}

// アグリゲーターGrain(複数デバイスのデータを集約)
public class TelemetryAggregatorGrain : Grain, ITelemetryAggregatorGrain
{
    private readonly Dictionary<string, TelemetryStatistics> _statistics = new();
    private IAsyncStream<AggregatedMetrics> _aggregatedStream;
    
    public override async Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("SMS");
        
        // 複数のデバイスストリームを購読
        var devices = await GetManagedDevicesAsync();
        foreach (var deviceId in devices)
        {
            var stream = streamProvider.GetStream<TelemetryData>(deviceId, "telemetry");
            await stream.SubscribeAsync(ProcessTelemetryAsync);
        }
        
        // 集約データのストリーム
        _aggregatedStream = streamProvider.GetStream<AggregatedMetrics>(
            this.GetPrimaryKeyString(), "aggregated");
        
        // 定期的に集約データを発行
        RegisterTimer(PublishAggregatedMetricsAsync, null, 
            TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10));
        
        return base.OnActivateAsync();
    }
    
    private async Task ProcessTelemetryAsync(TelemetryData data, StreamSequenceToken token)
    {
        lock (_statistics)
        {
            if (!_statistics.ContainsKey(data.DeviceId))
            {
                _statistics[data.DeviceId] = new TelemetryStatistics();
            }
            
            var stats = _statistics[data.DeviceId];
            stats.UpdateWithNewData(data);
        }
    }
    
    private async Task PublishAggregatedMetricsAsync(object state)
    {
        var aggregated = new AggregatedMetrics
        {
            Timestamp = DateTime.UtcNow,
            DeviceCount = _statistics.Count,
            Metrics = CalculateAggregatedMetrics()
        };
        
        await _aggregatedStream.OnNextAsync(aggregated);
    }
}

3. イベント駆動ワークフロー

// ワークフローエンジン
public interface IWorkflowGrain : IGrainWithGuidKey
{
    Task StartWorkflowAsync(WorkflowDefinition definition, Dictionary<string, object> inputs);
    Task<WorkflowStatus> GetStatusAsync();
}

public class WorkflowGrain : Grain, IWorkflowGrain
{
    private readonly IPersistentState<WorkflowState> _state;
    private readonly Dictionary<string, StreamSubscriptionHandle<WorkflowEvent>> _subscriptions = new();
    
    public async Task StartWorkflowAsync(
        WorkflowDefinition definition, 
        Dictionary<string, object> inputs)
    {
        _state.State.WorkflowId = this.GetPrimaryKey();
        _state.State.Definition = definition;
        _state.State.Status = WorkflowStatus.Running;
        _state.State.CurrentStep = definition.Steps.First();
        _state.State.Context = inputs;
        
        await _state.WriteStateAsync();
        
        // 最初のステップを実行
        await ExecuteStepAsync(_state.State.CurrentStep);
    }
    
    private async Task ExecuteStepAsync(WorkflowStep step)
    {
        var streamProvider = GetStreamProvider("SMS");
        
        switch (step.Type)
        {
            case StepType.Task:
                // タスクGrainに処理を委譲
                var taskGrain = GrainFactory.GetGrain<ITaskGrain>(Guid.NewGuid());
                await taskGrain.ExecuteAsync(step.TaskDefinition, _state.State.Context);
                
                // 完了イベントを待機
                var taskStream = streamProvider.GetStream<TaskCompleted>(
                    taskGrain.GetPrimaryKey(), "task-events");
                
                await taskStream.SubscribeAsync(async (evt, token) =>
                {
                    _state.State.Context.Merge(evt.Results);
                    await MoveToNextStepAsync();
                });
                break;
                
            case StepType.Parallel:
                // 並列実行
                var tasks = step.ParallelTasks.Select(task =>
                    ExecuteParallelTaskAsync(task)).ToList();
                
                await Task.WhenAll(tasks);
                await MoveToNextStepAsync();
                break;
                
            case StepType.Condition:
                // 条件分岐
                var condition = EvaluateCondition(step.Condition, _state.State.Context);
                var nextStep = condition ? step.TrueBranch : step.FalseBranch;
                _state.State.CurrentStep = nextStep;
                await ExecuteStepAsync(nextStep);
                break;
                
            case StepType.Wait:
                // イベント待機
                var eventStream = streamProvider.GetStream<WorkflowEvent>(
                    step.WaitEvent.StreamId, step.WaitEvent.Namespace);
                
                await eventStream.SubscribeAsync(async (evt, token) =>
                {
                    if (MatchesCondition(evt, step.WaitEvent.Condition))
                    {
                        _state.State.Context.Merge(evt.Data);
                        await MoveToNextStepAsync();
                    }
                });
                break;
        }
    }
}

高度なストリーミングパターン

1. ストリームの集約とウィンドウ処理

public class StreamAggregatorGrain : Grain, IStreamAggregatorGrain
{
    private readonly TimeSpan _windowSize = TimeSpan.FromMinutes(1);
    private readonly Queue<TimestampedData> _dataWindow = new();
    
    public override async Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("SMS");
        var inputStream = streamProvider.GetStream<SensorData>("sensors", "input");
        
        // ストリームを購読
        await inputStream.SubscribeAsync(ProcessDataAsync);
        
        // ウィンドウ処理のタイマー
        RegisterTimer(ProcessWindowAsync, null, _windowSize, _windowSize);
        
        return base.OnActivateAsync();
    }
    
    private Task ProcessDataAsync(SensorData data, StreamSequenceToken token)
    {
        lock (_dataWindow)
        {
            _dataWindow.Enqueue(new TimestampedData
            {
                Data = data,
                Timestamp = DateTime.UtcNow
            });
            
            // 古いデータを削除
            var cutoff = DateTime.UtcNow - _windowSize;
            while (_dataWindow.Count > 0 && _dataWindow.Peek().Timestamp < cutoff)
            {
                _dataWindow.Dequeue();
            }
        }
        
        return Task.CompletedTask;
    }
    
    private async Task ProcessWindowAsync(object state)
    {
        List<TimestampedData> windowData;
        lock (_dataWindow)
        {
            windowData = _dataWindow.ToList();
        }
        
        if (windowData.Count == 0) return;
        
        // ウィンドウ内のデータを集約
        var aggregated = new AggregatedData
        {
            WindowStart = windowData.First().Timestamp,
            WindowEnd = DateTime.UtcNow,
            Count = windowData.Count,
            Average = windowData.Average(d => d.Data.Value),
            Min = windowData.Min(d => d.Data.Value),
            Max = windowData.Max(d => d.Data.Value),
            StandardDeviation = CalculateStandardDeviation(windowData)
        };
        
        // 結果をストリームに発行
        var streamProvider = GetStreamProvider("SMS");
        var outputStream = streamProvider.GetStream<AggregatedData>("aggregated", "output");
        await outputStream.OnNextAsync(aggregated);
    }
}

2. ストリームの分岐とフィルタリング

public class StreamRouterGrain : Grain, IStreamRouterGrain
{
    private readonly Dictionary<string, IAsyncStream<Event>> _outputStreams = new();
    private readonly List<RoutingRule> _rules = new();
    
    public override async Task OnActivateAsync()
    {
        // ルーティングルールを設定
        _rules.Add(new RoutingRule
        {
            Name = "HighPriorityEvents",
            Condition = e => e.Priority == Priority.High,
            OutputStreamId = "high-priority"
        });
        
        _rules.Add(new RoutingRule
        {
            Name = "ErrorEvents",
            Condition = e => e.Type == EventType.Error,
            OutputStreamId = "errors"
        });
        
        // 出力ストリームを初期化
        var streamProvider = GetStreamProvider("SMS");
        foreach (var rule in _rules)
        {
            _outputStreams[rule.Name] = streamProvider.GetStream<Event>(
                rule.OutputStreamId, "filtered");
        }
        
        // 入力ストリームを購読
        var inputStream = streamProvider.GetStream<Event>("events", "input");
        await inputStream.SubscribeAsync(RouteEventAsync);
        
        return base.OnActivateAsync();
    }
    
    private async Task RouteEventAsync(Event evt, StreamSequenceToken token)
    {
        var tasks = new List<Task>();
        
        foreach (var rule in _rules)
        {
            if (rule.Condition(evt))
            {
                tasks.Add(_outputStreams[rule.Name].OnNextAsync(evt));
            }
        }
        
        // デフォルトストリームにも送信
        if (!tasks.Any())
        {
            var defaultStream = _outputStreams.GetValueOrDefault("default");
            if (defaultStream != null)
            {
                tasks.Add(defaultStream.OnNextAsync(evt));
            }
        }
        
        await Task.WhenAll(tasks);
    }
}

パフォーマンス最適化

バッチ処理とバッファリング

public class BatchProcessorGrain : Grain, IBatchProcessorGrain
{
    private readonly List<DataItem> _buffer = new();
    private readonly int _batchSize = 100;
    private readonly TimeSpan _batchTimeout = TimeSpan.FromSeconds(5);
    private IDisposable _batchTimer;
    
    public override async Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("SMS");
        var inputStream = streamProvider.GetStream<DataItem>("data", "input");
        
        await inputStream.SubscribeAsync(BufferDataAsync);
        
        // バッチタイマーを開始
        _batchTimer = RegisterTimer(
            ProcessBatchAsync, 
            null, 
            _batchTimeout, 
            _batchTimeout);
        
        return base.OnActivateAsync();
    }
    
    private async Task BufferDataAsync(DataItem item, StreamSequenceToken token)
    {
        lock (_buffer)
        {
            _buffer.Add(item);
            
            if (_buffer.Count >= _batchSize)
            {
                // バッチサイズに達したら即座に処理
                _ = Task.Run(async () => await ProcessBatchAsync(null));
            }
        }
    }
    
    private async Task ProcessBatchAsync(object state)
    {
        List<DataItem> batch;
        lock (_buffer)
        {
            if (_buffer.Count == 0) return;
            
            batch = _buffer.Take(_batchSize).ToList();
            _buffer.RemoveRange(0, batch.Count);
        }
        
        // バッチ処理
        try
        {
            await ProcessBatchInternalAsync(batch);
        }
        catch (Exception ex)
        {
            // エラーハンドリング
            await HandleBatchErrorAsync(batch, ex);
        }
    }
}

まとめ

今回は、.NET OrleansのGrain間通信とストリーミング機能について学びました。重要なポイント:

  1. 柔軟な通信パターン: 直接通信、Observer、ストリーミングなど多様な選択肢
  2. リアルタイム処理: 低レイテンシでスケーラブルなストリーム処理
  3. 複雑なワークフロー: イベント駆動アーキテクチャの簡単な実装
  4. 高度な処理パターン: ウィンドウ処理、ルーティング、バッチ処理

次回は、Orleansのクラスタリングと本番環境での運用について解説します。高可用性システムの構築方法を学びます。


次回予告:「第4回:クラスタリングと高可用性 - 本番環境でのOrleans運用」では、マルチノードクラスター、自動フェイルオーバー、ローリングアップデートなど、エンタープライズ環境での運用手法を解説します。

技術的な課題をお持ちですか?

記事でご紹介した技術や実装について、
より詳細なご相談やプロジェクトのサポートを承ります

無料技術相談を申し込む