【.NET Orleans入門】第2回:ステート管理と永続化 - 信頼性の高いステートフルサービスの構築
.NETOrleansステート管理永続化信頼性分散システム
はじめに
前回は.NET Orleansの基本概念と開発環境の構築について学びました。今回は、Orleansの最も強力な機能の一つである「ステート管理と永続化」について深く掘り下げます。
分散システムにおいて、状態の管理は最も難しい課題の一つです。サーバーの障害、ネットワークの分断、同時更新の競合...これらの問題をOrleansがどのように解決するのか、実践的なコード例とともに解説します。
Orleansのステート管理の仕組み
従来の課題とOrleansの解決策
// 従来の方法:複雑な状態管理
public class TraditionalService
{
private readonly IDatabase _db;
private readonly IDistributedCache _cache;
private readonly IDistributedLock _lock;
public async Task UpdateUserScoreAsync(string userId, int points)
{
using var lockHandle = await _lock.AcquireAsync($"user:{userId}");
// DBから現在のスコアを取得
var currentScore = await _db.GetUserScoreAsync(userId);
// スコアを更新
var newScore = currentScore + points;
// DBに保存
await _db.UpdateUserScoreAsync(userId, newScore);
// キャッシュを更新
await _cache.SetAsync($"user:score:{userId}", newScore);
// 他のキャッシュも無効化
await _cache.RemoveAsync($"leaderboard:*");
}
}
// Orleansの方法:シンプルで確実
public class UserGrain : Grain, IUserGrain
{
private readonly IPersistentState<UserState> _state;
public UserGrain(
[PersistentState("userState", "userStore")]
IPersistentState<UserState> state)
{
_state = state;
}
public async Task UpdateScoreAsync(int points)
{
_state.State.Score += points;
await _state.WriteStateAsync(); // 自動的に永続化
}
}
基本的なステート管理
ステートの定義
// UserState.cs
[Serializable]
public class UserState
{
public string UserId { get; set; }
public string Name { get; set; }
public int Score { get; set; }
public DateTime LastActive { get; set; }
public List<Achievement> Achievements { get; set; } = new();
public Dictionary<string, object> Metadata { get; set; } = new();
}
[Serializable]
public class Achievement
{
public string Id { get; set; }
public string Name { get; set; }
public DateTime UnlockedAt { get; set; }
}
PersistentStateを使った実装
public interface IUserGrain : IGrainWithStringKey
{
Task<UserInfo> GetUserInfoAsync();
Task UpdateProfileAsync(string name);
Task AddAchievementAsync(Achievement achievement);
Task<int> IncrementScoreAsync(int points);
}
public class UserGrain : Grain, IUserGrain
{
private readonly IPersistentState<UserState> _state;
private readonly ILogger<UserGrain> _logger;
public UserGrain(
[PersistentState("userState", "userStore")]
IPersistentState<UserState> state,
ILogger<UserGrain> logger)
{
_state = state;
_logger = logger;
}
public override Task OnActivateAsync()
{
_logger.LogInformation($"UserGrain {this.GetPrimaryKeyString()} activated");
// 初回アクティベーション時の初期化
if (!_state.RecordExists)
{
_state.State = new UserState
{
UserId = this.GetPrimaryKeyString(),
LastActive = DateTime.UtcNow
};
}
return base.OnActivateAsync();
}
public Task<UserInfo> GetUserInfoAsync()
{
return Task.FromResult(new UserInfo
{
UserId = _state.State.UserId,
Name = _state.State.Name,
Score = _state.State.Score,
AchievementCount = _state.State.Achievements.Count
});
}
public async Task UpdateProfileAsync(string name)
{
_state.State.Name = name;
_state.State.LastActive = DateTime.UtcNow;
await _state.WriteStateAsync();
_logger.LogInformation($"Profile updated for user {_state.State.UserId}");
}
public async Task AddAchievementAsync(Achievement achievement)
{
_state.State.Achievements.Add(achievement);
await _state.WriteStateAsync();
// イベントを発行(他のGrainに通知)
var streamProvider = GetStreamProvider("SMS");
var stream = streamProvider.GetStream<AchievementUnlocked>(
this.GetPrimaryKeyString(), "achievements");
await stream.OnNextAsync(new AchievementUnlocked
{
UserId = _state.State.UserId,
Achievement = achievement
});
}
public async Task<int> IncrementScoreAsync(int points)
{
_state.State.Score += points;
_state.State.LastActive = DateTime.UtcNow;
await _state.WriteStateAsync();
return _state.State.Score;
}
}
ストレージプロバイダーの設定
メモリストレージ(開発用)
// Program.cs (Silo)
siloBuilder.AddMemoryGrainStorage("userStore");
Azure Storage
siloBuilder.AddAzureTableGrainStorage("userStore", options =>
{
options.ConnectionString = "DefaultEndpointsProtocol=https;...";
options.TableName = "OrleansUserState";
options.UseJson = true;
});
Azure Cosmos DB
siloBuilder.AddCosmosDBGrainStorage("userStore", options =>
{
options.AccountEndpoint = "https://myaccount.documents.azure.com:443/";
options.AccountKey = "myAccountKey";
options.DB = "OrleansDB";
options.Collection = "UserState";
options.CanCreateResources = true;
});
PostgreSQL/MySQL
siloBuilder.AddAdoNetGrainStorage("userStore", options =>
{
options.Invariant = "Npgsql"; // PostgreSQL
options.ConnectionString = "Host=localhost;Database=orleans;Username=orleans;Password=orleans";
options.UseJsonFormat = true;
});
高度なステート管理パターン
1. イベントソーシング
public interface IBankAccountGrain : IGrainWithStringKey
{
Task<decimal> GetBalanceAsync();
Task<TransactionResult> DepositAsync(decimal amount);
Task<TransactionResult> WithdrawAsync(decimal amount);
Task<List<Transaction>> GetTransactionHistoryAsync();
}
[Serializable]
public class BankAccountState
{
public string AccountId { get; set; }
public List<Transaction> Transactions { get; set; } = new();
public decimal Balance => Transactions.Sum(t => t.Amount);
}
public class BankAccountGrain : Grain, IBankAccountGrain
{
private readonly IPersistentState<BankAccountState> _state;
public BankAccountGrain(
[PersistentState("bankAccount", "eventStore")]
IPersistentState<BankAccountState> state)
{
_state = state;
}
public Task<decimal> GetBalanceAsync()
{
return Task.FromResult(_state.State.Balance);
}
public async Task<TransactionResult> DepositAsync(decimal amount)
{
if (amount <= 0)
return new TransactionResult { Success = false, Error = "Invalid amount" };
var transaction = new Transaction
{
Id = Guid.NewGuid().ToString(),
Type = TransactionType.Deposit,
Amount = amount,
Timestamp = DateTime.UtcNow,
Balance = _state.State.Balance + amount
};
_state.State.Transactions.Add(transaction);
await _state.WriteStateAsync();
return new TransactionResult
{
Success = true,
TransactionId = transaction.Id,
NewBalance = transaction.Balance
};
}
public async Task<TransactionResult> WithdrawAsync(decimal amount)
{
if (amount <= 0)
return new TransactionResult { Success = false, Error = "Invalid amount" };
if (_state.State.Balance < amount)
return new TransactionResult { Success = false, Error = "Insufficient funds" };
var transaction = new Transaction
{
Id = Guid.NewGuid().ToString(),
Type = TransactionType.Withdrawal,
Amount = -amount,
Timestamp = DateTime.UtcNow,
Balance = _state.State.Balance - amount
};
_state.State.Transactions.Add(transaction);
await _state.WriteStateAsync();
return new TransactionResult
{
Success = true,
TransactionId = transaction.Id,
NewBalance = transaction.Balance
};
}
public Task<List<Transaction>> GetTransactionHistoryAsync()
{
return Task.FromResult(
_state.State.Transactions
.OrderByDescending(t => t.Timestamp)
.Take(100)
.ToList()
);
}
}
2. スナップショット付きイベントソーシング
public class GameSessionGrain : Grain, IGameSessionGrain
{
private readonly IPersistentState<GameSessionSnapshot> _snapshot;
private readonly IPersistentState<GameEventLog> _eventLog;
private GameSession _currentSession;
public GameSessionGrain(
[PersistentState("snapshot", "snapshotStore")]
IPersistentState<GameSessionSnapshot> snapshot,
[PersistentState("eventLog", "eventStore")]
IPersistentState<GameEventLog> eventLog)
{
_snapshot = snapshot;
_eventLog = eventLog;
}
public override async Task OnActivateAsync()
{
// スナップショットから復元
if (_snapshot.RecordExists)
{
_currentSession = _snapshot.State.ToGameSession();
// スナップショット以降のイベントを適用
var eventsAfterSnapshot = _eventLog.State.Events
.Where(e => e.Timestamp > _snapshot.State.Timestamp)
.OrderBy(e => e.Timestamp);
foreach (var evt in eventsAfterSnapshot)
{
_currentSession.ApplyEvent(evt);
}
}
else
{
_currentSession = new GameSession(this.GetPrimaryKey());
}
await base.OnActivateAsync();
}
public async Task<MoveResult> MakeMoveAsync(string playerId, Move move)
{
var gameEvent = new GameEvent
{
Id = Guid.NewGuid().ToString(),
Type = GameEventType.Move,
PlayerId = playerId,
Data = move,
Timestamp = DateTime.UtcNow
};
// イベントを適用
var result = _currentSession.ApplyEvent(gameEvent);
if (result.IsValid)
{
// イベントを永続化
_eventLog.State.Events.Add(gameEvent);
await _eventLog.WriteStateAsync();
// 100イベントごとにスナップショットを作成
if (_eventLog.State.Events.Count % 100 == 0)
{
await CreateSnapshotAsync();
}
}
return result;
}
private async Task CreateSnapshotAsync()
{
_snapshot.State = GameSessionSnapshot.FromGameSession(_currentSession);
await _snapshot.WriteStateAsync();
// 古いイベントをクリーンアップ
_eventLog.State.Events.RemoveAll(e => e.Timestamp <= _snapshot.State.Timestamp);
await _eventLog.WriteStateAsync();
}
}
3. 複数ストレージプロバイダーの活用
public class HybridStateGrain : Grain, IHybridStateGrain
{
// ホットデータ:高速アクセスが必要
private readonly IPersistentState<HotData> _hotState;
// コールドデータ:コスト効率重視
private readonly IPersistentState<ColdData> _coldState;
public HybridStateGrain(
[PersistentState("hotData", "redisStore")]
IPersistentState<HotData> hotState,
[PersistentState("coldData", "blobStore")]
IPersistentState<ColdData> coldState)
{
_hotState = hotState;
_coldState = coldState;
}
public async Task<RecentActivity> GetRecentActivityAsync()
{
// Redisから高速に取得
return _hotState.State.RecentActivity;
}
public async Task<HistoricalData> GetHistoricalDataAsync(DateRange range)
{
// Blob Storageから取得(コスト効率的)
return _coldState.State.GetDataForRange(range);
}
public async Task RecordActivityAsync(Activity activity)
{
// ホットデータに追加
_hotState.State.RecentActivity.Add(activity);
await _hotState.WriteStateAsync();
// 古いデータをコールドストレージに移動
if (_hotState.State.RecentActivity.Count > 1000)
{
var toArchive = _hotState.State.RecentActivity
.OrderBy(a => a.Timestamp)
.Take(500)
.ToList();
_coldState.State.ArchivedActivities.AddRange(toArchive);
await _coldState.WriteStateAsync();
_hotState.State.RecentActivity.RemoveRange(0, 500);
await _hotState.WriteStateAsync();
}
}
}
パフォーマンス最適化
1. Write-Through vs Write-Behind
public class OptimizedGrain : Grain, IOptimizedGrain
{
private readonly IPersistentState<MyState> _state;
private bool _isDirty = false;
private IDisposable _timer;
public override Task OnActivateAsync()
{
// Write-Behind: 5秒ごとに変更を永続化
_timer = RegisterTimer(
async _ => await FlushStateAsync(),
null,
TimeSpan.FromSeconds(5),
TimeSpan.FromSeconds(5));
return base.OnActivateAsync();
}
public Task UpdateDataAsync(string key, object value)
{
_state.State.Data[key] = value;
_isDirty = true;
// Write-Through を選択的に使用
if (IsCriticalData(key))
{
return _state.WriteStateAsync();
}
return Task.CompletedTask;
}
private async Task FlushStateAsync()
{
if (_isDirty)
{
await _state.WriteStateAsync();
_isDirty = false;
}
}
}
2. ステートの圧縮
[Serializable]
public class CompressedState
{
private byte[] _compressedData;
public void SetData<T>(T data)
{
var json = JsonSerializer.Serialize(data);
using var output = new MemoryStream();
using (var gzip = new GZipStream(output, CompressionLevel.Optimal))
{
var bytes = Encoding.UTF8.GetBytes(json);
gzip.Write(bytes, 0, bytes.Length);
}
_compressedData = output.ToArray();
}
public T GetData<T>()
{
using var input = new MemoryStream(_compressedData);
using var gzip = new GZipStream(input, CompressionMode.Decompress);
using var reader = new StreamReader(gzip);
var json = reader.ReadToEnd();
return JsonSerializer.Deserialize<T>(json);
}
}
エラーハンドリングと回復性
状態の検証とリカバリー
public class ResilientGrain : Grain, IResilientGrain
{
private readonly IPersistentState<ResilientState> _state;
public override async Task OnActivateAsync()
{
await base.OnActivateAsync();
// 状態の整合性チェック
if (!ValidateState())
{
await RecoverStateAsync();
}
}
private bool ValidateState()
{
try
{
// ビジネスルールに基づく検証
if (_state.State.Balance < 0)
return false;
if (_state.State.Transactions.Count != _state.State.TransactionCount)
return false;
var calculatedBalance = _state.State.Transactions.Sum(t => t.Amount);
if (Math.Abs(calculatedBalance - _state.State.Balance) > 0.01m)
return false;
return true;
}
catch
{
return false;
}
}
private async Task RecoverStateAsync()
{
try
{
// バックアップから復元を試みる
var backup = await LoadBackupStateAsync();
if (backup != null && ValidateBackupState(backup))
{
_state.State = backup;
await _state.WriteStateAsync();
return;
}
// イベントログから再構築
await RebuildFromEventLogAsync();
}
catch (Exception ex)
{
// 最終手段:初期状態にリセット
_state.State = new ResilientState
{
Id = this.GetPrimaryKeyString(),
CreatedAt = DateTime.UtcNow
};
await _state.WriteStateAsync();
// アラートを送信
await NotifyStateResetAsync(ex);
}
}
}
ベストプラクティス
1. ステートのサイズを適切に管理
// 悪い例:巨大なステート
public class BadState
{
public List<DetailedLogEntry> AllLogs { get; set; } // 無制限に増加
}
// 良い例:サイズを制限
public class GoodState
{
private readonly Queue<LogEntry> _recentLogs = new(1000);
public void AddLog(LogEntry entry)
{
_recentLogs.Enqueue(entry);
if (_recentLogs.Count > 1000)
_recentLogs.Dequeue();
}
}
2. 適切な粒度でGrainを設計
// 悪い例:1つのGrainに全てを詰め込む
public interface ICompanyGrain : IGrainWithStringKey
{
Task<List<Employee>> GetAllEmployeesAsync();
Task<List<Department>> GetAllDepartmentsAsync();
Task<List<Project>> GetAllProjectsAsync();
}
// 良い例:責務を分離
public interface IEmployeeGrain : IGrainWithStringKey
{
Task<EmployeeInfo> GetInfoAsync();
}
public interface IDepartmentGrain : IGrainWithStringKey
{
Task<List<string>> GetEmployeeIdsAsync();
}
public interface IProjectGrain : IGrainWithGuidKey
{
Task<ProjectInfo> GetInfoAsync();
}
まとめ
今回は、.NET Orleansのステート管理と永続化について学びました。重要なポイント:
- シンプルなAPI:
IPersistentState<T>
による直感的な状態管理 - 柔軟なストレージオプション: メモリ、Azure、SQL、NoSQLなど多様な選択肢
- 高度なパターン: イベントソーシング、スナップショット、ハイブリッドストレージ
- 信頼性: 自動的な状態の復元と整合性保証
次回は、Grainの通信パターンとストリーミングについて解説します。リアルタイムデータ処理やイベント駆動アーキテクチャの実装方法を学びます。
次回予告:「第3回:Grain間通信とストリーミング - リアルタイムデータ処理の実装」では、Orleansの強力な通信機能を使った、高度な分散処理パターンを解説します。