【.NET Orleans入門】第3回:Grain間通信とストリーミング - リアルタイムデータ処理の実装
.NET / Orleans / ストリーミング / リアルタイム / WebSocket / イベント駆動
.NET OrleansのGrain間通信とストリーミング機能を使って、リアルタイムチャット、IoTデータ処理、イベント駆動システムなどを実装する方法を解説します。
はじめに
前回はOrleansのステート管理と永続化について学びました。今回は、Orleansが真価を発揮する「Grain間通信とストリーミング」について解説します。
数百万のユーザーが同時に参加するライブチャット、毎秒数万件のIoTデータをリアルタイム処理するシステム、複雑なワークフローを持つビジネスプロセス...これらを効率的に実装する方法を、実践的なコード例とともに学んでいきます。
Grain間通信の基本
1. 直接通信パターン
// インターフェース定義
/// <summary>
/// Grain contract for a single order, keyed by a <see cref="Guid"/> (<c>IGrainWithGuidKey</c>).
/// </summary>
public interface IOrderGrain : IGrainWithGuidKey
{
/// <summary>Submits an order request and returns the resulting <c>OrderStatus</c>.</summary>
/// <param name="request">The order to place.</param>
Task<OrderStatus> PlaceOrderAsync(OrderRequest request);
/// <summary>Returns an <c>OrderInfo</c> for this order.</summary>
// NOTE(review): no implementation of this member is visible in OrderGrain in this file — confirm it exists elsewhere.
Task<OrderInfo> GetOrderInfoAsync();
}
/// <summary>
/// Grain contract for inventory reservations, keyed by a string (<c>IGrainWithStringKey</c>).
/// <c>OrderGrain</c> in this file resolves it with the key "main-inventory".
/// </summary>
public interface IInventoryGrain : IGrainWithStringKey
{
/// <summary>Attempts to reserve the given items on behalf of an order.</summary>
/// <param name="items">Items to reserve.</param>
/// <param name="orderId">Identifier of the order the reservation belongs to.</param>
/// <returns>A boolean result; <c>OrderGrain</c> treats <c>false</c> as a failed reservation and fails the order.</returns>
Task<bool> ReserveItemsAsync(List<OrderItem> items, Guid orderId);
/// <summary>Releases the reservation associated with <paramref name="orderId"/>.</summary>
/// <param name="orderId">Identifier of the order whose reservation is released.</param>
Task ReleaseReservationAsync(Guid orderId);
}
/// <summary>
/// Grain contract for payment processing, keyed by a string (<c>IGrainWithStringKey</c>).
/// <c>OrderGrain</c> in this file resolves it with the customer id of the order request.
/// </summary>
public interface IPaymentGrain : IGrainWithStringKey
{
/// <summary>Processes a payment of <paramref name="amount"/> using <paramref name="method"/>.</summary>
/// <param name="amount">Amount to charge.</param>
/// <param name="method">Payment method to use.</param>
/// <returns>
/// A <c>PaymentResult</c>; <c>OrderGrain</c> reads its <c>Success</c>, <c>Error</c> and
/// <c>TransactionId</c> members.
/// </returns>
Task<PaymentResult> ProcessPaymentAsync(decimal amount, PaymentMethod method);
}
// OrderGrainの実装
/// <summary>
/// Grain that drives one order through inventory reservation, payment and
/// delivery scheduling by calling other grains directly.
/// </summary>
public class OrderGrain : Grain, IOrderGrain
{
    private readonly IPersistentState<OrderState> _state;
    private readonly IGrainFactory _grainFactory;

    /// <summary>Creates the grain with its persistent order state and a grain factory.</summary>
    public OrderGrain(
        [PersistentState("order", "orderStore")] IPersistentState<OrderState> state,
        IGrainFactory grainFactory)
    {
        _grainFactory = grainFactory;
        _state = state;
    }

    /// <summary>
    /// Places the order: reserves stock, charges the customer, persists the
    /// confirmed order and finally asks the shipping grain to schedule delivery.
    /// </summary>
    /// <param name="request">Customer, items and payment method of the order.</param>
    /// <returns>
    /// <c>OrderStatus.Failed</c> when the reservation or the payment is rejected,
    /// otherwise <c>OrderStatus.Confirmed</c>.
    /// </returns>
    public async Task<OrderStatus> PlaceOrderAsync(OrderRequest request)
    {
        // Seed the in-memory state from the request; the order id is this grain's key.
        _state.State.OrderId = this.GetPrimaryKey();
        _state.State.CustomerId = request.CustomerId;
        _state.State.Items = request.Items;
        _state.State.Status = OrderStatus.Pending;

        // Step 1: check and reserve stock.
        var inventory = _grainFactory.GetGrain<IInventoryGrain>("main-inventory");
        var reserved = await inventory.ReserveItemsAsync(request.Items, _state.State.OrderId);
        if (!reserved)
        {
            return await FailOrderAsync("在庫不足");
        }

        // Step 2: charge the customer for the order total.
        var payments = _grainFactory.GetGrain<IPaymentGrain>(request.CustomerId);
        var orderTotal = request.Items.Sum(i => i.Price * i.Quantity);
        var payment = await payments.ProcessPaymentAsync(orderTotal, request.PaymentMethod);
        if (!payment.Success)
        {
            // Give the reserved stock back before recording the failure.
            await inventory.ReleaseReservationAsync(_state.State.OrderId);
            return await FailOrderAsync(payment.Error);
        }

        // Step 3: confirm and persist the order.
        _state.State.Status = OrderStatus.Confirmed;
        _state.State.PaymentId = payment.TransactionId;
        _state.State.ConfirmedAt = DateTime.UtcNow;
        await _state.WriteStateAsync();

        // Step 4: notify the shipping grain (keyed by the order id).
        var shipping = _grainFactory.GetGrain<IShippingGrain>(_state.State.OrderId);
        await shipping.ScheduleDeliveryAsync(_state.State);

        return OrderStatus.Confirmed;
    }

    /// <summary>Marks the order as failed with the given reason and persists the state.</summary>
    private async Task<OrderStatus> FailOrderAsync(string reason)
    {
        _state.State.Status = OrderStatus.Failed;
        _state.State.FailureReason = reason;
        await _state.WriteStateAsync();
        return OrderStatus.Failed;
    }
}
2. Observer パターン
// 通知インターフェース
/// <summary>
/// Observer contract (<c>IGrainObserver</c>) for receiving stock price change notifications.
/// </summary>
public interface IStockPriceObserver : IGrainObserver
{
/// <summary>Called when the price of a stock changes.</summary>
/// <param name="symbol">Symbol of the stock; <c>StockGrain</c> passes its primary key string.</param>
/// <param name="newPrice">The new price.</param>
/// <param name="change">Price delta; <c>StockGrain</c> passes the new price minus the previously stored price.</param>
void PriceChanged(string symbol, decimal newPrice, decimal change);
}
/// <summary>
/// Grain contract for one stock, keyed by a string (<c>IGrainWithStringKey</c>), which
/// lets observers subscribe to price updates.
/// </summary>
public interface IStockGrain : IGrainWithStringKey
{
/// <summary>Registers <paramref name="observer"/> for price notifications.</summary>
/// <param name="observer">Observer to register.</param>
Task SubscribeAsync(IStockPriceObserver observer);
/// <summary>Removes <paramref name="observer"/> from the registered observers.</summary>
/// <param name="observer">Observer to remove.</param>
Task UnsubscribeAsync(IStockPriceObserver observer);
/// <summary>Sets a new price for this stock.</summary>
/// <param name="newPrice">The new price.</param>
Task UpdatePriceAsync(decimal newPrice);
}
// StockGrainの実装
/// <summary>
/// Grain holding the current price of one stock and notifying subscribed
/// <see cref="IStockPriceObserver"/> instances whenever the price is updated.
/// </summary>
public class StockGrain : Grain, IStockGrain
{
    private readonly ObserverManager<IStockPriceObserver> _observers;
    private decimal _currentPrice;
    private string _symbol;

    /// <summary>Creates the grain and its observer registry.</summary>
    /// <param name="logger">
    /// Logger handed to the observer manager. It is injected rather than obtained from the
    /// grain inside the constructor, where the grain is not yet activated.
    /// </param>
    public StockGrain(ILogger<StockGrain> logger)
    {
        // Observers that do not re-subscribe within five minutes are dropped by the manager.
        _observers = new ObserverManager<IStockPriceObserver>(
            TimeSpan.FromMinutes(5),
            logger);
    }

    /// <summary>Captures the stock symbol from the grain's string key on activation.</summary>
    public override Task OnActivateAsync()
    {
        _symbol = this.GetPrimaryKeyString();
        return base.OnActivateAsync();
    }

    /// <summary>Registers (or refreshes) an observer.</summary>
    /// <param name="observer">Observer reference to register.</param>
    public Task SubscribeAsync(IStockPriceObserver observer)
    {
        // ObserverManager keys subscriptions by address; the observer reference is its own address.
        _observers.Subscribe(observer, observer);
        return Task.CompletedTask;
    }

    /// <summary>Removes an observer.</summary>
    /// <param name="observer">Observer reference to remove.</param>
    public Task UnsubscribeAsync(IStockPriceObserver observer)
    {
        _observers.Unsubscribe(observer);
        return Task.CompletedTask;
    }

    /// <summary>Stores the new price and notifies every registered observer.</summary>
    /// <param name="newPrice">The new price.</param>
    /// <remarks>
    /// The first update after activation reports the full price as the change, because the
    /// stored price starts at zero.
    /// </remarks>
    public Task UpdatePriceAsync(decimal newPrice)
    {
        var change = newPrice - _currentPrice;
        _currentPrice = newPrice;

        // PriceChanged returns void, so this is the synchronous Notify overload:
        // there is nothing to await.
        _observers.Notify(observer => observer.PriceChanged(_symbol, newPrice, change));
        return Task.CompletedTask;
    }
}
// クライアント側の実装
/// <summary>
/// Client-side observer that logs every price change and raises an alert for large moves.
/// </summary>
public class StockPriceMonitor : IStockPriceObserver
{
    private readonly ILogger<StockPriceMonitor> _logger;

    /// <summary>Creates the monitor with the logger used for price-change messages.</summary>
    public StockPriceMonitor(ILogger<StockPriceMonitor> logger) => _logger = logger;

    /// <summary>Logs the change and sends an alert when its magnitude exceeds 10.</summary>
    /// <param name="symbol">Symbol of the stock.</param>
    /// <param name="newPrice">The new price.</param>
    /// <param name="change">Signed price delta.</param>
    public void PriceChanged(string symbol, decimal newPrice, decimal change)
    {
        var message = $"Stock {symbol} price changed to {newPrice:C} ({change:+0.00;-0.00})";
        _logger.LogInformation(message);

        // Small moves need no further action (UI updates, alerts, ...).
        var magnitude = Math.Abs(change);
        if (magnitude <= 10)
        {
            return;
        }

        SendPriceAlert(symbol, newPrice, change);
    }
}
Orleans Streams - 強力なストリーミング機能
ストリームプロバイダーの設定
// Silo設定
// Silo configuration (fragment): registers grain storage and stream providers on siloBuilder.
siloBuilder
// In-memory grain storage registered under the name "PubSubStore".
.AddMemoryGrainStorage("PubSubStore")
// Simple message stream provider named "SMS"; the grains in this file call GetStreamProvider("SMS").
.AddSimpleMessageStreamProvider("SMS")
// NOTE(review): despite its name, "AzureQueue" is registered here with AddSimpleMessageStreamProvider,
// i.e. as a second simple-message provider — confirm whether an Azure Queue backed provider was intended.
.AddSimpleMessageStreamProvider("AzureQueue", options =>
{
options.FireAndForgetDelivery = false;
options.OptimizeForImmutableData = true;
});
1. チャットシステムの実装
// チャットルームGrain
/// <summary>
/// Grain contract for one chat room, keyed by a string (<c>IGrainWithStringKey</c>).
/// </summary>
public interface IChatRoomGrain : IGrainWithStringKey
{
/// <summary>Adds a user to the room.</summary>
/// <param name="userId">Identifier of the user; <c>ChatHub</c> passes the SignalR connection id.</param>
/// <param name="userName">Display name of the user.</param>
Task JoinAsync(string userId, string userName);
/// <summary>Removes a user from the room.</summary>
/// <param name="userId">Identifier of the user leaving.</param>
Task LeaveAsync(string userId);
/// <summary>Sends a message to the room on behalf of a user.</summary>
/// <param name="userId">Identifier of the sending user.</param>
/// <param name="message">Message text.</param>
Task SendMessageAsync(string userId, string message);
/// <summary>Returns the users currently in the room.</summary>
Task<List<ChatUser>> GetActiveUsersAsync();
}
/// <summary>
/// Message payload published on the chat stream and forwarded to SignalR clients.
/// </summary>
[Serializable]
public class ChatMessage
{
/// <summary>Message identifier; <c>ChatRoomGrain</c> fills it with a new GUID string.</summary>
public string MessageId { get; set; }
/// <summary>Identifier of the user the message relates to.</summary>
public string UserId { get; set; }
/// <summary>Display name of that user.</summary>
public string UserName { get; set; }
/// <summary>Message text.</summary>
public string Message { get; set; }
/// <summary>Creation time; <c>ChatRoomGrain</c> sets it from <c>DateTime.UtcNow</c>.</summary>
public DateTime Timestamp { get; set; }
/// <summary>
/// Message category; <c>ChatRoomGrain</c> uses <c>MessageType.System</c> for join notices
/// and <c>MessageType.User</c> for user messages.
/// </summary>
public MessageType Type { get; set; }
}
/// <summary>
/// Grain representing one chat room. It tracks the active users in memory and
/// publishes every chat and system message on the room's stream.
/// </summary>
public class ChatRoomGrain : Grain, IChatRoomGrain
{
    private readonly Dictionary<string, ChatUser> _activeUsers = new();
    private IAsyncStream<ChatMessage> _stream;

    /// <summary>Resolves the room's message stream from the "SMS" provider.</summary>
    public override Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("SMS");
        var roomId = this.GetPrimaryKeyString();

        // NOTE(review): a string stream id and the (id, namespace) argument order depend on the
        // Orleans version in use — confirm against the GetStream overloads of your package.
        _stream = streamProvider.GetStream<ChatMessage>(roomId, "chat");
        return base.OnActivateAsync();
    }

    /// <summary>Adds (or replaces) the user and announces the arrival on the stream.</summary>
    /// <param name="userId">Identifier of the joining user.</param>
    /// <param name="userName">Display name of the joining user.</param>
    public async Task JoinAsync(string userId, string userName)
    {
        _activeUsers[userId] = new ChatUser { UserId = userId, UserName = userName };

        // Publish the join notice.
        await _stream.OnNextAsync(new ChatMessage
        {
            MessageId = Guid.NewGuid().ToString(),
            UserId = userId,
            UserName = userName,
            Message = $"{userName} が入室しました",
            Timestamp = DateTime.UtcNow,
            Type = MessageType.System
        });
    }

    /// <summary>
    /// Removes the user and announces the departure on the stream. Does nothing when the
    /// user is not in the room, so repeated calls are harmless.
    /// </summary>
    /// <param name="userId">Identifier of the leaving user.</param>
    public async Task LeaveAsync(string userId)
    {
        if (!_activeUsers.Remove(userId, out var user))
        {
            return;
        }

        // Publish the leave notice, mirroring the join notice.
        await _stream.OnNextAsync(new ChatMessage
        {
            MessageId = Guid.NewGuid().ToString(),
            UserId = userId,
            UserName = user.UserName,
            Message = $"{user.UserName} が退室しました",
            Timestamp = DateTime.UtcNow,
            Type = MessageType.System
        });
    }

    /// <summary>Publishes a user message on the stream and stores it in the history.</summary>
    /// <param name="userId">Identifier of the sending user.</param>
    /// <param name="message">Message text.</param>
    /// <exception cref="InvalidOperationException">The user has not joined this room.</exception>
    public async Task SendMessageAsync(string userId, string message)
    {
        // Single lookup instead of ContainsKey followed by the indexer.
        if (!_activeUsers.TryGetValue(userId, out var sender))
        {
            throw new InvalidOperationException("ユーザーはこのチャットルームに参加していません");
        }

        var chatMessage = new ChatMessage
        {
            MessageId = Guid.NewGuid().ToString(),
            UserId = userId,
            UserName = sender.UserName,
            Message = message,
            Timestamp = DateTime.UtcNow,
            Type = MessageType.User
        };

        // Deliver the message to subscribers.
        await _stream.OnNextAsync(chatMessage);

        // Persist the message (optional).
        await SaveMessageToHistoryAsync(chatMessage);
    }

    /// <summary>Returns a snapshot of the users currently in the room.</summary>
    public Task<List<ChatUser>> GetActiveUsersAsync()
    {
        // Copy so callers never observe later mutations of the internal dictionary.
        return Task.FromResult(new List<ChatUser>(_activeUsers.Values));
    }
}
// クライアント側(SignalR Hub)
/// <summary>
/// SignalR hub bridging browser clients and chat room grains: hub methods call the
/// room grain, and messages published on the room stream are pushed back to the client.
/// </summary>
/// <remarks>
/// Hub instances are created per invocation and disposed afterwards, so nothing that must
/// outlive a call is kept in instance fields. Subscription handles live in a static
/// registry, and the stream callback sends through <see cref="IHubContext{THub}"/> instead
/// of the hub's own <c>Clients</c>.
/// </remarks>
public class ChatHub : Hub
{
    // connection id -> (room id -> stream subscription). Static because hubs are transient.
    // The registry is per server process.
    private static readonly System.Collections.Concurrent.ConcurrentDictionary<
        string,
        System.Collections.Concurrent.ConcurrentDictionary<string, StreamSubscriptionHandle<ChatMessage>>>
        s_subscriptions = new();

    private readonly IClusterClient _clusterClient;
    private readonly IHubContext<ChatHub> _hubContext;

    /// <summary>Creates the hub.</summary>
    /// <param name="clusterClient">
    /// Orleans client. It is an <c>IGrainFactory</c> and additionally exposes
    /// <c>GetStreamProvider</c>, which a plain grain factory does not.
    /// </param>
    /// <param name="hubContext">Hub context that stays valid after this hub instance is disposed.</param>
    public ChatHub(IClusterClient clusterClient, IHubContext<ChatHub> hubContext)
    {
        _clusterClient = clusterClient;
        _hubContext = hubContext;
    }

    /// <summary>Joins the calling connection to a room and starts relaying room messages to it.</summary>
    /// <param name="roomId">Room to join.</param>
    /// <param name="userName">Display name of the caller.</param>
    public async Task JoinRoom(string roomId, string userName)
    {
        var connectionId = Context.ConnectionId;
        var chatRoom = _clusterClient.GetGrain<IChatRoomGrain>(roomId);

        // Join the chat room.
        await chatRoom.JoinAsync(connectionId, userName);

        // Subscribe to the room's stream.
        var stream = _clusterClient
            .GetStreamProvider("SMS")
            .GetStream<ChatMessage>(roomId, "chat");

        // Each connection owns one subscription, so each callback delivers only to its own
        // connection. Broadcasting to the whole group from every subscription would send
        // each message once per member.
        var hubContext = _hubContext;
        var handle = await stream.SubscribeAsync((message, token) =>
            hubContext.Clients.Client(connectionId).SendAsync("ReceiveMessage", message));

        var rooms = s_subscriptions.GetOrAdd(
            connectionId,
            _ => new System.Collections.Concurrent.ConcurrentDictionary<string, StreamSubscriptionHandle<ChatMessage>>());
        if (!rooms.TryAdd(roomId, handle))
        {
            // The connection already relays this room; drop the redundant subscription.
            await handle.UnsubscribeAsync();
        }

        // Add the connection to the SignalR group for the room.
        await Groups.AddToGroupAsync(connectionId, roomId);
    }

    /// <summary>Sends a message to a room on behalf of the calling connection.</summary>
    /// <param name="roomId">Target room.</param>
    /// <param name="message">Message text.</param>
    public async Task SendMessage(string roomId, string message)
    {
        var userId = Context.ConnectionId;
        var chatRoom = _clusterClient.GetGrain<IChatRoomGrain>(roomId);
        await chatRoom.SendMessageAsync(userId, message);
    }

    /// <summary>
    /// Releases every stream subscription of the disconnected connection and removes the
    /// user from the rooms it had joined.
    /// </summary>
    /// <param name="exception">Error that caused the disconnect, if any.</param>
    public override async Task OnDisconnectedAsync(Exception exception)
    {
        var connectionId = Context.ConnectionId;
        try
        {
            if (s_subscriptions.TryRemove(connectionId, out var rooms))
            {
                foreach (var entry in rooms)
                {
                    await entry.Value.UnsubscribeAsync();
                    await _clusterClient.GetGrain<IChatRoomGrain>(entry.Key).LeaveAsync(connectionId);
                }
            }
        }
        finally
        {
            await base.OnDisconnectedAsync(exception);
        }
    }
}
2. IoTデータストリーミング
// IoTデバイスGrain
/// <summary>
/// Grain contract for one IoT device, keyed by a string (<c>IGrainWithStringKey</c>).
/// </summary>
// NOTE(review): IoTDeviceGrain in this file only shows ReportTelemetryAsync — confirm the
// other two members are implemented elsewhere.
public interface IIoTDeviceGrain : IGrainWithStringKey
{
/// <summary>Reports a telemetry sample for the device.</summary>
/// <param name="data">The telemetry sample.</param>
Task ReportTelemetryAsync(TelemetryData data);
/// <summary>Returns the device's <c>DeviceStatus</c>.</summary>
Task<DeviceStatus> GetStatusAsync();
/// <summary>Sets the threshold for a metric.</summary>
/// <param name="metric">Name of the metric.</param>
/// <param name="threshold">Threshold value for that metric.</param>
Task SetThresholdAsync(string metric, double threshold);
}
/// <summary>
/// One telemetry sample reported by a device and published on its telemetry stream.
/// </summary>
[Serializable]
public class TelemetryData
{
/// <summary>Identifier of the reporting device.</summary>
public string DeviceId { get; set; }
/// <summary>Metric values keyed by metric name; <c>IoTDeviceGrain</c> compares each value with the threshold stored under the same key.</summary>
public Dictionary<string, double> Metrics { get; set; }
/// <summary>Time of the sample.</summary>
public DateTime Timestamp { get; set; }
/// <summary>Location associated with the sample.</summary>
public GeoLocation Location { get; set; }
}
/// <summary>
/// Grain representing one IoT device. Each telemetry report is stored as the latest
/// state, published on the device's telemetry stream and checked against thresholds;
/// every exceeded threshold produces an alert on the shared alert stream.
/// </summary>
public class IoTDeviceGrain : Grain, IIoTDeviceGrain
{
    private readonly IPersistentState<DeviceState> _state;
    private IAsyncStream<TelemetryData> _telemetryStream;
    private IAsyncStream<Alert> _alertStream;

    /// <summary>Creates the grain with its persistent device state.</summary>
    public IoTDeviceGrain(
        [PersistentState("device", "deviceStore")] IPersistentState<DeviceState> state)
    {
        _state = state;
    }

    /// <summary>Resolves the per-device telemetry stream and the shared alert stream.</summary>
    public override Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("SMS");
        var deviceId = this.GetPrimaryKeyString();
        _telemetryStream = streamProvider.GetStream<TelemetryData>(deviceId, "telemetry");
        _alertStream = streamProvider.GetStream<Alert>("alerts", "system");
        return base.OnActivateAsync();
    }

    /// <summary>Records a telemetry sample, publishes it and raises threshold alerts.</summary>
    /// <param name="data">The telemetry sample.</param>
    /// <exception cref="ArgumentNullException"><paramref name="data"/> is null.</exception>
    public async Task ReportTelemetryAsync(TelemetryData data)
    {
        if (data == null)
        {
            throw new ArgumentNullException(nameof(data));
        }

        // Update the state.
        _state.State.LastTelemetry = data;
        _state.State.LastSeen = DateTime.UtcNow;

        // Publish the sample on the stream.
        await _telemetryStream.OnNextAsync(data);

        // Threshold check. A sample without metrics, or a device whose thresholds were
        // never set, has nothing to compare, so skip instead of dereferencing null.
        var thresholds = _state.State.Thresholds;
        if (data.Metrics != null && thresholds != null)
        {
            foreach (var metric in data.Metrics)
            {
                if (!thresholds.TryGetValue(metric.Key, out var threshold) || !(metric.Value > threshold))
                {
                    continue;
                }

                await _alertStream.OnNextAsync(new Alert
                {
                    AlertId = Guid.NewGuid().ToString(),
                    DeviceId = data.DeviceId,
                    Metric = metric.Key,
                    Value = metric.Value,
                    Threshold = threshold,
                    Timestamp = DateTime.UtcNow,
                    Severity = CalculateSeverity(metric.Value, threshold)
                });
            }
        }

        await _state.WriteStateAsync();
    }
}
// アグリゲーターGrain(複数デバイスのデータを集約)
/// <summary>
/// Grain that subscribes to the telemetry streams of the devices it manages, keeps
/// per-device statistics and publishes aggregated metrics every ten seconds.
/// </summary>
public class TelemetryAggregatorGrain : Grain, ITelemetryAggregatorGrain
{
    private readonly Dictionary<string, TelemetryStatistics> _statistics = new();
    private IAsyncStream<AggregatedMetrics> _aggregatedStream;

    /// <summary>Subscribes to all managed device streams and starts the publish timer.</summary>
    public override async Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("SMS");

        // Subscribe to the telemetry stream of every managed device.
        var devices = await GetManagedDevicesAsync();
        foreach (var deviceId in devices)
        {
            var stream = streamProvider.GetStream<TelemetryData>(deviceId, "telemetry");
            await stream.SubscribeAsync(ProcessTelemetryAsync);
        }

        // Stream carrying the aggregated data.
        _aggregatedStream = streamProvider.GetStream<AggregatedMetrics>(
            this.GetPrimaryKeyString(), "aggregated");

        // Publish the aggregated data periodically.
        RegisterTimer(PublishAggregatedMetricsAsync, null,
            TimeSpan.FromSeconds(10), TimeSpan.FromSeconds(10));

        // An async Task method cannot `return` a task expression; await the base call instead.
        await base.OnActivateAsync();
    }

    /// <summary>Folds one telemetry sample into the statistics of its device.</summary>
    private Task ProcessTelemetryAsync(TelemetryData data, StreamSequenceToken token)
    {
        // No lock: stream callbacks and timer ticks run as turns of this grain's activation,
        // never in parallel with each other.
        if (!_statistics.TryGetValue(data.DeviceId, out var stats))
        {
            stats = new TelemetryStatistics();
            _statistics[data.DeviceId] = stats;
        }

        stats.UpdateWithNewData(data);
        return Task.CompletedTask;
    }

    /// <summary>Timer callback: publishes the current aggregate on the output stream.</summary>
    private async Task PublishAggregatedMetricsAsync(object state)
    {
        var aggregated = new AggregatedMetrics
        {
            Timestamp = DateTime.UtcNow,
            DeviceCount = _statistics.Count,
            Metrics = CalculateAggregatedMetrics()
        };
        await _aggregatedStream.OnNextAsync(aggregated);
    }
}
3. イベント駆動ワークフロー
// ワークフローエンジン
/// <summary>
/// Grain contract for one workflow instance, keyed by a <see cref="Guid"/> (<c>IGrainWithGuidKey</c>).
/// </summary>
public interface IWorkflowGrain : IGrainWithGuidKey
{
/// <summary>Starts the workflow described by <paramref name="definition"/>.</summary>
/// <param name="definition">Workflow definition; <c>WorkflowGrain</c> starts with the first entry of its <c>Steps</c>.</param>
/// <param name="inputs">Initial values; <c>WorkflowGrain</c> stores them as the workflow context.</param>
Task StartWorkflowAsync(WorkflowDefinition definition, Dictionary<string, object> inputs);
/// <summary>Returns the workflow's <c>WorkflowStatus</c>.</summary>
Task<WorkflowStatus> GetStatusAsync();
}
/// <summary>
/// Grain executing one workflow instance step by step. Task and wait steps continue
/// when an event arrives on a stream; parallel and condition steps continue immediately.
/// </summary>
public class WorkflowGrain : Grain, IWorkflowGrain
{
    private readonly IPersistentState<WorkflowState> _state;

    // NOTE(review): no handle is ever stored here, so the stream subscriptions created in
    // ExecuteStepAsync are never released and their handlers may fire again on later events.
    private readonly Dictionary<string, StreamSubscriptionHandle<WorkflowEvent>> _subscriptions = new();

    /// <summary>Creates the grain with its persistent workflow state.</summary>
    /// <param name="state">
    /// Injected persistent state. Without this constructor the readonly field stayed null
    /// and the first state access threw.
    /// </param>
    // NOTE(review): the state/storage names follow the "<name>", "<name>Store" pattern used by
    // the other grains in this file — adjust them to the storage provider actually configured.
    public WorkflowGrain(
        [PersistentState("workflow", "workflowStore")] IPersistentState<WorkflowState> state)
    {
        _state = state;
    }

    /// <summary>Persists the initial workflow state and runs the first step.</summary>
    /// <param name="definition">Workflow definition; its first step is executed.</param>
    /// <param name="inputs">Initial workflow context.</param>
    /// <exception cref="ArgumentNullException"><paramref name="definition"/> is null.</exception>
    public async Task StartWorkflowAsync(
        WorkflowDefinition definition,
        Dictionary<string, object> inputs)
    {
        if (definition == null)
        {
            throw new ArgumentNullException(nameof(definition));
        }

        _state.State.WorkflowId = this.GetPrimaryKey();
        _state.State.Definition = definition;
        _state.State.Status = WorkflowStatus.Running;
        _state.State.CurrentStep = definition.Steps.First();
        _state.State.Context = inputs;
        await _state.WriteStateAsync();

        // Run the first step.
        await ExecuteStepAsync(_state.State.CurrentStep);
    }

    /// <summary>Returns the status currently held in the workflow state.</summary>
    public Task<WorkflowStatus> GetStatusAsync() => Task.FromResult(_state.State.Status);

    /// <summary>Executes one step according to its <c>StepType</c>.</summary>
    private async Task ExecuteStepAsync(WorkflowStep step)
    {
        var streamProvider = GetStreamProvider("SMS");
        switch (step.Type)
        {
            case StepType.Task:
                // Delegate the work to a fresh task grain.
                var taskGrain = GrainFactory.GetGrain<ITaskGrain>(Guid.NewGuid());
                await taskGrain.ExecuteAsync(step.TaskDefinition, _state.State.Context);

                // Wait for the completion event.
                // NOTE(review): the subscription is created only after ExecuteAsync returns;
                // confirm a completion event published before this point cannot be missed.
                var taskStream = streamProvider.GetStream<TaskCompleted>(
                    taskGrain.GetPrimaryKey(), "task-events");
                await taskStream.SubscribeAsync(async (evt, token) =>
                {
                    _state.State.Context.Merge(evt.Results);
                    await MoveToNextStepAsync();
                });
                break;

            case StepType.Parallel:
                // Run all parallel tasks and continue once every one has finished.
                var tasks = step.ParallelTasks.Select(task =>
                    ExecuteParallelTaskAsync(task)).ToList();
                await Task.WhenAll(tasks);
                await MoveToNextStepAsync();
                break;

            case StepType.Condition:
                // Branch on the evaluated condition.
                var condition = EvaluateCondition(step.Condition, _state.State.Context);
                var nextStep = condition ? step.TrueBranch : step.FalseBranch;
                _state.State.CurrentStep = nextStep;
                await ExecuteStepAsync(nextStep);
                break;

            case StepType.Wait:
                // Wait for a matching external event.
                var eventStream = streamProvider.GetStream<WorkflowEvent>(
                    step.WaitEvent.StreamId, step.WaitEvent.Namespace);
                await eventStream.SubscribeAsync(async (evt, token) =>
                {
                    if (MatchesCondition(evt, step.WaitEvent.Condition))
                    {
                        _state.State.Context.Merge(evt.Data);
                        await MoveToNextStepAsync();
                    }
                });
                break;
        }
    }
}
高度なストリーミングパターン
1. ストリームの集約とウィンドウ処理
/// <summary>
/// Grain keeping a one-minute sliding window of sensor readings and publishing
/// count/average/min/max/standard deviation of that window once per window length.
/// </summary>
public class StreamAggregatorGrain : Grain, IStreamAggregatorGrain
{
    private readonly TimeSpan _windowSize = TimeSpan.FromMinutes(1);
    private readonly Queue<TimestampedData> _dataWindow = new();
    private IAsyncStream<AggregatedData> _outputStream;

    /// <summary>Subscribes to the input stream and starts the window timer.</summary>
    public override async Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("SMS");

        // Resolve the output stream once instead of on every timer tick.
        _outputStream = streamProvider.GetStream<AggregatedData>("aggregated", "output");

        // Subscribe to the input stream.
        var inputStream = streamProvider.GetStream<SensorData>("sensors", "input");
        await inputStream.SubscribeAsync(ProcessDataAsync);

        // Timer driving the window processing.
        RegisterTimer(ProcessWindowAsync, null, _windowSize, _windowSize);

        // An async Task method cannot `return` a task expression; await the base call instead.
        await base.OnActivateAsync();
    }

    /// <summary>Appends a reading to the window and drops readings older than the window.</summary>
    private Task ProcessDataAsync(SensorData data, StreamSequenceToken token)
    {
        // No lock: stream callbacks and timer ticks run as turns of this grain's activation,
        // never in parallel with each other.
        var now = DateTime.UtcNow;
        _dataWindow.Enqueue(new TimestampedData
        {
            Data = data,
            Timestamp = now
        });
        EvictExpired(now);
        return Task.CompletedTask;
    }

    /// <summary>Timer callback: aggregates the current window and publishes the result.</summary>
    private async Task ProcessWindowAsync(object state)
    {
        // Evict here as well. Otherwise readings stay in the window for as long as no new
        // data arrives and would be re-published on every tick.
        var now = DateTime.UtcNow;
        EvictExpired(now);

        var windowData = _dataWindow.ToList();
        if (windowData.Count == 0)
        {
            return;
        }

        // Aggregate the readings inside the window.
        var aggregated = new AggregatedData
        {
            WindowStart = windowData.First().Timestamp,
            WindowEnd = now,
            Count = windowData.Count,
            Average = windowData.Average(d => d.Data.Value),
            Min = windowData.Min(d => d.Data.Value),
            Max = windowData.Max(d => d.Data.Value),
            StandardDeviation = CalculateStandardDeviation(windowData)
        };

        // Publish the result.
        await _outputStream.OnNextAsync(aggregated);
    }

    /// <summary>Removes readings whose timestamp is older than one window length before <paramref name="now"/>.</summary>
    private void EvictExpired(DateTime now)
    {
        var cutoff = now - _windowSize;
        while (_dataWindow.Count > 0 && _dataWindow.Peek().Timestamp < cutoff)
        {
            _dataWindow.Dequeue();
        }
    }
}
2. ストリームの分岐とフィルタリング
/// <summary>
/// Grain that reads events from one input stream and forwards each event to the
/// output stream of every routing rule whose condition matches.
/// </summary>
public class StreamRouterGrain : Grain, IStreamRouterGrain
{
    private readonly Dictionary<string, IAsyncStream<Event>> _outputStreams = new();
    private readonly List<RoutingRule> _rules = new();

    /// <summary>Builds the routing rules, resolves their output streams and subscribes to the input.</summary>
    public override async Task OnActivateAsync()
    {
        // Configure the routing rules.
        _rules.Add(new RoutingRule
        {
            Name = "HighPriorityEvents",
            Condition = e => e.Priority == Priority.High,
            OutputStreamId = "high-priority"
        });
        _rules.Add(new RoutingRule
        {
            Name = "ErrorEvents",
            Condition = e => e.Type == EventType.Error,
            OutputStreamId = "errors"
        });

        // Initialise one output stream per rule, keyed by rule name.
        var streamProvider = GetStreamProvider("SMS");
        foreach (var rule in _rules)
        {
            _outputStreams[rule.Name] = streamProvider.GetStream<Event>(
                rule.OutputStreamId, "filtered");
        }

        // Subscribe to the input stream.
        var inputStream = streamProvider.GetStream<Event>("events", "input");
        await inputStream.SubscribeAsync(RouteEventAsync);

        // An async Task method cannot `return` a task expression; await the base call instead.
        await base.OnActivateAsync();
    }

    /// <summary>Forwards an event to all matching rule streams, or to the default stream when none match.</summary>
    private async Task RouteEventAsync(Event evt, StreamSequenceToken token)
    {
        var tasks = new List<Task>();
        foreach (var rule in _rules)
        {
            if (rule.Condition(evt))
            {
                tasks.Add(_outputStreams[rule.Name].OnNextAsync(evt));
            }
        }

        // Fall back to the default stream when no rule matched.
        // NOTE(review): nothing in this class registers a "default" entry, so this fallback
        // currently never sends — register one in OnActivateAsync if a catch-all is wanted.
        if (tasks.Count == 0
            && _outputStreams.TryGetValue("default", out var defaultStream)
            && defaultStream != null)
        {
            tasks.Add(defaultStream.OnNextAsync(evt));
        }

        await Task.WhenAll(tasks);
    }
}
パフォーマンス最適化
バッチ処理とバッファリング
/// <summary>
/// Grain that buffers incoming stream items and processes them in batches, either as
/// soon as the buffer reaches the batch size or when the batch timer fires.
/// </summary>
public class BatchProcessorGrain : Grain, IBatchProcessorGrain
{
    private readonly List<DataItem> _buffer = new();
    private readonly int _batchSize = 100;
    private readonly TimeSpan _batchTimeout = TimeSpan.FromSeconds(5);
    private IDisposable _batchTimer;

    /// <summary>Subscribes to the input stream and starts the batch timer.</summary>
    public override async Task OnActivateAsync()
    {
        var streamProvider = GetStreamProvider("SMS");
        var inputStream = streamProvider.GetStream<DataItem>("data", "input");
        await inputStream.SubscribeAsync(BufferDataAsync);

        // Start the batch timer.
        _batchTimer = RegisterTimer(
            ProcessBatchAsync,
            null,
            _batchTimeout,
            _batchTimeout);

        // An async Task method cannot `return` a task expression; await the base call instead.
        await base.OnActivateAsync();
    }

    /// <summary>Buffers one item and processes a batch immediately once the batch size is reached.</summary>
    private Task BufferDataAsync(DataItem item, StreamSequenceToken token)
    {
        _buffer.Add(item);
        if (_buffer.Count < _batchSize)
        {
            return Task.CompletedTask;
        }

        // Process as part of this grain turn. Task.Run would move the work onto the thread
        // pool, outside the grain's single-threaded execution, and leave its exceptions
        // unobserved.
        return ProcessBatchAsync(null);
    }

    /// <summary>Takes up to one batch from the buffer and processes it.</summary>
    /// <param name="state">Unused timer state.</param>
    private async Task ProcessBatchAsync(object state)
    {
        if (_buffer.Count == 0)
        {
            return;
        }

        // Detach the batch before the first await, so a timer tick interleaving during
        // processing cannot pick up the same items. No lock is needed because all callers
        // run as turns of this grain's activation.
        var take = Math.Min(_batchSize, _buffer.Count);
        var batch = _buffer.GetRange(0, take);
        _buffer.RemoveRange(0, take);

        // Process the batch.
        try
        {
            await ProcessBatchInternalAsync(batch);
        }
        catch (Exception ex)
        {
            // Error handling.
            await HandleBatchErrorAsync(batch, ex);
        }
    }
}
まとめ
今回は、.NET OrleansのGrain間通信とストリーミング機能について学びました。重要なポイント:
- 柔軟な通信パターン: 直接通信、Observer、ストリーミングなど多様な選択肢
- リアルタイム処理: 低レイテンシでスケーラブルなストリーム処理
- 複雑なワークフロー: イベント駆動アーキテクチャの簡単な実装
- 高度な処理パターン: ウィンドウ処理、ルーティング、バッチ処理
次回は、Orleansのクラスタリングと本番環境での運用について解説します。高可用性システムの構築方法を学びます。
次回予告:「第4回:クラスタリングと高可用性 - 本番環境でのOrleans運用」では、マルチノードクラスター、自動フェイルオーバー、ローリングアップデートなど、エンタープライズ環境での運用手法を解説します。