【.NET Orleans入門】第2回:ステート管理と永続化 - 信頼性の高いステートフルサービスの構築

はじめに

前回は.NET Orleansの基本概念と開発環境の構築について学びました。今回は、Orleansの最も強力な機能の一つである「ステート管理と永続化」について深く掘り下げます。

分散システムにおいて、状態の管理は最も難しい課題の一つです。サーバーの障害、ネットワークの分断、同時更新の競合...これらの問題をOrleansがどのように解決するのか、実践的なコード例とともに解説します。

Orleansのステート管理の仕組み

従来の課題とOrleansの解決策

// 従来の方法:複雑な状態管理
public class TraditionalService
{
    private readonly IDatabase _db;
    private readonly IDistributedCache _cache;
    private readonly IDistributedLock _lock;
    
    public async Task UpdateUserScoreAsync(string userId, int points)
    {
        using var lockHandle = await _lock.AcquireAsync($"user:{userId}");
        
        // DBから現在のスコアを取得
        var currentScore = await _db.GetUserScoreAsync(userId);
        
        // スコアを更新
        var newScore = currentScore + points;
        
        // DBに保存
        await _db.UpdateUserScoreAsync(userId, newScore);
        
        // キャッシュを更新
        await _cache.SetAsync($"user:score:{userId}", newScore);
        
        // 他のキャッシュも無効化
        await _cache.RemoveAsync($"leaderboard:*");
    }
}

// Orleansの方法:シンプルで確実
public class UserGrain : Grain, IUserGrain
{
    private readonly IPersistentState<UserState> _state;
    
    public UserGrain(
        [PersistentState("userState", "userStore")] 
        IPersistentState<UserState> state)
    {
        _state = state;
    }
    
    public async Task UpdateScoreAsync(int points)
    {
        _state.State.Score += points;
        await _state.WriteStateAsync(); // 自動的に永続化
    }
}

基本的なステート管理

ステートの定義

// UserState.cs
[Serializable]
public class UserState
{
    public string UserId { get; set; }
    public string Name { get; set; }
    public int Score { get; set; }
    public DateTime LastActive { get; set; }
    public List<Achievement> Achievements { get; set; } = new();
    public Dictionary<string, object> Metadata { get; set; } = new();
}

[Serializable]
public class Achievement
{
    public string Id { get; set; }
    public string Name { get; set; }
    public DateTime UnlockedAt { get; set; }
}

PersistentStateを使った実装

public interface IUserGrain : IGrainWithStringKey
{
    Task<UserInfo> GetUserInfoAsync();
    Task UpdateProfileAsync(string name);
    Task AddAchievementAsync(Achievement achievement);
    Task<int> IncrementScoreAsync(int points);
}

public class UserGrain : Grain, IUserGrain
{
    private readonly IPersistentState<UserState> _state;
    private readonly ILogger<UserGrain> _logger;
    
    public UserGrain(
        [PersistentState("userState", "userStore")] 
        IPersistentState<UserState> state,
        ILogger<UserGrain> logger)
    {
        _state = state;
        _logger = logger;
    }
    
    public override Task OnActivateAsync()
    {
        _logger.LogInformation($"UserGrain {this.GetPrimaryKeyString()} activated");
        
        // 初回アクティベーション時の初期化
        if (!_state.RecordExists)
        {
            _state.State = new UserState
            {
                UserId = this.GetPrimaryKeyString(),
                LastActive = DateTime.UtcNow
            };
        }
        
        return base.OnActivateAsync();
    }
    
    public Task<UserInfo> GetUserInfoAsync()
    {
        return Task.FromResult(new UserInfo
        {
            UserId = _state.State.UserId,
            Name = _state.State.Name,
            Score = _state.State.Score,
            AchievementCount = _state.State.Achievements.Count
        });
    }
    
    public async Task UpdateProfileAsync(string name)
    {
        _state.State.Name = name;
        _state.State.LastActive = DateTime.UtcNow;
        await _state.WriteStateAsync();
        
        _logger.LogInformation($"Profile updated for user {_state.State.UserId}");
    }
    
    public async Task AddAchievementAsync(Achievement achievement)
    {
        _state.State.Achievements.Add(achievement);
        await _state.WriteStateAsync();
        
        // イベントを発行(他のGrainに通知)
        var streamProvider = GetStreamProvider("SMS");
        var stream = streamProvider.GetStream<AchievementUnlocked>(
            this.GetPrimaryKeyString(), "achievements");
        
        await stream.OnNextAsync(new AchievementUnlocked
        {
            UserId = _state.State.UserId,
            Achievement = achievement
        });
    }
    
    public async Task<int> IncrementScoreAsync(int points)
    {
        _state.State.Score += points;
        _state.State.LastActive = DateTime.UtcNow;
        await _state.WriteStateAsync();
        
        return _state.State.Score;
    }
}

ストレージプロバイダーの設定

メモリストレージ(開発用)

// Program.cs (Silo)
siloBuilder.AddMemoryGrainStorage("userStore");

Azure Storage

siloBuilder.AddAzureTableGrainStorage("userStore", options =>
{
    options.ConnectionString = "DefaultEndpointsProtocol=https;...";
    options.TableName = "OrleansUserState";
    options.UseJson = true;
});

Azure Cosmos DB

siloBuilder.AddCosmosDBGrainStorage("userStore", options =>
{
    options.AccountEndpoint = "https://myaccount.documents.azure.com:443/";
    options.AccountKey = "myAccountKey";
    options.DB = "OrleansDB";
    options.Collection = "UserState";
    options.CanCreateResources = true;
});

PostgreSQL/MySQL

siloBuilder.AddAdoNetGrainStorage("userStore", options =>
{
    options.Invariant = "Npgsql"; // PostgreSQL
    options.ConnectionString = "Host=localhost;Database=orleans;Username=orleans;Password=orleans";
    options.UseJsonFormat = true;
});

高度なステート管理パターン

1. イベントソーシング

public interface IBankAccountGrain : IGrainWithStringKey
{
    Task<decimal> GetBalanceAsync();
    Task<TransactionResult> DepositAsync(decimal amount);
    Task<TransactionResult> WithdrawAsync(decimal amount);
    Task<List<Transaction>> GetTransactionHistoryAsync();
}

[Serializable]
public class BankAccountState
{
    public string AccountId { get; set; }
    public List<Transaction> Transactions { get; set; } = new();
    public decimal Balance => Transactions.Sum(t => t.Amount);
}

public class BankAccountGrain : Grain, IBankAccountGrain
{
    private readonly IPersistentState<BankAccountState> _state;
    
    public BankAccountGrain(
        [PersistentState("bankAccount", "eventStore")] 
        IPersistentState<BankAccountState> state)
    {
        _state = state;
    }
    
    public Task<decimal> GetBalanceAsync()
    {
        return Task.FromResult(_state.State.Balance);
    }
    
    public async Task<TransactionResult> DepositAsync(decimal amount)
    {
        if (amount <= 0)
            return new TransactionResult { Success = false, Error = "Invalid amount" };
        
        var transaction = new Transaction
        {
            Id = Guid.NewGuid().ToString(),
            Type = TransactionType.Deposit,
            Amount = amount,
            Timestamp = DateTime.UtcNow,
            Balance = _state.State.Balance + amount
        };
        
        _state.State.Transactions.Add(transaction);
        await _state.WriteStateAsync();
        
        return new TransactionResult
        {
            Success = true,
            TransactionId = transaction.Id,
            NewBalance = transaction.Balance
        };
    }
    
    public async Task<TransactionResult> WithdrawAsync(decimal amount)
    {
        if (amount <= 0)
            return new TransactionResult { Success = false, Error = "Invalid amount" };
        
        if (_state.State.Balance < amount)
            return new TransactionResult { Success = false, Error = "Insufficient funds" };
        
        var transaction = new Transaction
        {
            Id = Guid.NewGuid().ToString(),
            Type = TransactionType.Withdrawal,
            Amount = -amount,
            Timestamp = DateTime.UtcNow,
            Balance = _state.State.Balance - amount
        };
        
        _state.State.Transactions.Add(transaction);
        await _state.WriteStateAsync();
        
        return new TransactionResult
        {
            Success = true,
            TransactionId = transaction.Id,
            NewBalance = transaction.Balance
        };
    }
    
    public Task<List<Transaction>> GetTransactionHistoryAsync()
    {
        return Task.FromResult(
            _state.State.Transactions
                .OrderByDescending(t => t.Timestamp)
                .Take(100)
                .ToList()
        );
    }
}

2. スナップショット付きイベントソーシング

public class GameSessionGrain : Grain, IGameSessionGrain
{
    private readonly IPersistentState<GameSessionSnapshot> _snapshot;
    private readonly IPersistentState<GameEventLog> _eventLog;
    private GameSession _currentSession;
    
    public GameSessionGrain(
        [PersistentState("snapshot", "snapshotStore")] 
        IPersistentState<GameSessionSnapshot> snapshot,
        [PersistentState("eventLog", "eventStore")] 
        IPersistentState<GameEventLog> eventLog)
    {
        _snapshot = snapshot;
        _eventLog = eventLog;
    }
    
    public override async Task OnActivateAsync()
    {
        // スナップショットから復元
        if (_snapshot.RecordExists)
        {
            _currentSession = _snapshot.State.ToGameSession();
            
            // スナップショット以降のイベントを適用
            var eventsAfterSnapshot = _eventLog.State.Events
                .Where(e => e.Timestamp > _snapshot.State.Timestamp)
                .OrderBy(e => e.Timestamp);
            
            foreach (var evt in eventsAfterSnapshot)
            {
                _currentSession.ApplyEvent(evt);
            }
        }
        else
        {
            _currentSession = new GameSession(this.GetPrimaryKey());
        }
        
        await base.OnActivateAsync();
    }
    
    public async Task<MoveResult> MakeMoveAsync(string playerId, Move move)
    {
        var gameEvent = new GameEvent
        {
            Id = Guid.NewGuid().ToString(),
            Type = GameEventType.Move,
            PlayerId = playerId,
            Data = move,
            Timestamp = DateTime.UtcNow
        };
        
        // イベントを適用
        var result = _currentSession.ApplyEvent(gameEvent);
        
        if (result.IsValid)
        {
            // イベントを永続化
            _eventLog.State.Events.Add(gameEvent);
            await _eventLog.WriteStateAsync();
            
            // 100イベントごとにスナップショットを作成
            if (_eventLog.State.Events.Count % 100 == 0)
            {
                await CreateSnapshotAsync();
            }
        }
        
        return result;
    }
    
    private async Task CreateSnapshotAsync()
    {
        _snapshot.State = GameSessionSnapshot.FromGameSession(_currentSession);
        await _snapshot.WriteStateAsync();
        
        // 古いイベントをクリーンアップ
        _eventLog.State.Events.RemoveAll(e => e.Timestamp <= _snapshot.State.Timestamp);
        await _eventLog.WriteStateAsync();
    }
}

3. 複数ストレージプロバイダーの活用

public class HybridStateGrain : Grain, IHybridStateGrain
{
    // ホットデータ:高速アクセスが必要
    private readonly IPersistentState<HotData> _hotState;
    
    // コールドデータ:コスト効率重視
    private readonly IPersistentState<ColdData> _coldState;
    
    public HybridStateGrain(
        [PersistentState("hotData", "redisStore")] 
        IPersistentState<HotData> hotState,
        [PersistentState("coldData", "blobStore")] 
        IPersistentState<ColdData> coldState)
    {
        _hotState = hotState;
        _coldState = coldState;
    }
    
    public async Task<RecentActivity> GetRecentActivityAsync()
    {
        // Redisから高速に取得
        return _hotState.State.RecentActivity;
    }
    
    public async Task<HistoricalData> GetHistoricalDataAsync(DateRange range)
    {
        // Blob Storageから取得(コスト効率的)
        return _coldState.State.GetDataForRange(range);
    }
    
    public async Task RecordActivityAsync(Activity activity)
    {
        // ホットデータに追加
        _hotState.State.RecentActivity.Add(activity);
        await _hotState.WriteStateAsync();
        
        // 古いデータをコールドストレージに移動
        if (_hotState.State.RecentActivity.Count > 1000)
        {
            var toArchive = _hotState.State.RecentActivity
                .OrderBy(a => a.Timestamp)
                .Take(500)
                .ToList();
            
            _coldState.State.ArchivedActivities.AddRange(toArchive);
            await _coldState.WriteStateAsync();
            
            _hotState.State.RecentActivity.RemoveRange(0, 500);
            await _hotState.WriteStateAsync();
        }
    }
}

パフォーマンス最適化

1. Write-Through vs Write-Behind

public class OptimizedGrain : Grain, IOptimizedGrain
{
    private readonly IPersistentState<MyState> _state;
    private bool _isDirty = false;
    private IDisposable _timer;
    
    public override Task OnActivateAsync()
    {
        // Write-Behind: 5秒ごとに変更を永続化
        _timer = RegisterTimer(
            async _ => await FlushStateAsync(),
            null,
            TimeSpan.FromSeconds(5),
            TimeSpan.FromSeconds(5));
        
        return base.OnActivateAsync();
    }
    
    public Task UpdateDataAsync(string key, object value)
    {
        _state.State.Data[key] = value;
        _isDirty = true;
        
        // Write-Through を選択的に使用
        if (IsCriticalData(key))
        {
            return _state.WriteStateAsync();
        }
        
        return Task.CompletedTask;
    }
    
    private async Task FlushStateAsync()
    {
        if (_isDirty)
        {
            await _state.WriteStateAsync();
            _isDirty = false;
        }
    }
}

2. ステートの圧縮

[Serializable]
public class CompressedState
{
    private byte[] _compressedData;
    
    public void SetData<T>(T data)
    {
        var json = JsonSerializer.Serialize(data);
        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionLevel.Optimal))
        {
            var bytes = Encoding.UTF8.GetBytes(json);
            gzip.Write(bytes, 0, bytes.Length);
        }
        _compressedData = output.ToArray();
    }
    
    public T GetData<T>()
    {
        using var input = new MemoryStream(_compressedData);
        using var gzip = new GZipStream(input, CompressionMode.Decompress);
        using var reader = new StreamReader(gzip);
        var json = reader.ReadToEnd();
        return JsonSerializer.Deserialize<T>(json);
    }
}

エラーハンドリングと回復性

状態の検証とリカバリー

public class ResilientGrain : Grain, IResilientGrain
{
    private readonly IPersistentState<ResilientState> _state;
    
    public override async Task OnActivateAsync()
    {
        await base.OnActivateAsync();
        
        // 状態の整合性チェック
        if (!ValidateState())
        {
            await RecoverStateAsync();
        }
    }
    
    private bool ValidateState()
    {
        try
        {
            // ビジネスルールに基づく検証
            if (_state.State.Balance < 0)
                return false;
                
            if (_state.State.Transactions.Count != _state.State.TransactionCount)
                return false;
                
            var calculatedBalance = _state.State.Transactions.Sum(t => t.Amount);
            if (Math.Abs(calculatedBalance - _state.State.Balance) > 0.01m)
                return false;
                
            return true;
        }
        catch
        {
            return false;
        }
    }
    
    private async Task RecoverStateAsync()
    {
        try
        {
            // バックアップから復元を試みる
            var backup = await LoadBackupStateAsync();
            if (backup != null && ValidateBackupState(backup))
            {
                _state.State = backup;
                await _state.WriteStateAsync();
                return;
            }
            
            // イベントログから再構築
            await RebuildFromEventLogAsync();
        }
        catch (Exception ex)
        {
            // 最終手段:初期状態にリセット
            _state.State = new ResilientState
            {
                Id = this.GetPrimaryKeyString(),
                CreatedAt = DateTime.UtcNow
            };
            await _state.WriteStateAsync();
            
            // アラートを送信
            await NotifyStateResetAsync(ex);
        }
    }
}

ベストプラクティス

1. ステートのサイズを適切に管理

// 悪い例:巨大なステート
public class BadState
{
    public List<DetailedLogEntry> AllLogs { get; set; } // 無制限に増加
}

// 良い例:サイズを制限
public class GoodState
{
    private readonly Queue<LogEntry> _recentLogs = new(1000);
    
    public void AddLog(LogEntry entry)
    {
        _recentLogs.Enqueue(entry);
        if (_recentLogs.Count > 1000)
            _recentLogs.Dequeue();
    }
}

2. 適切な粒度でGrainを設計

// 悪い例:1つのGrainに全てを詰め込む
public interface ICompanyGrain : IGrainWithStringKey
{
    Task<List<Employee>> GetAllEmployeesAsync();
    Task<List<Department>> GetAllDepartmentsAsync();
    Task<List<Project>> GetAllProjectsAsync();
}

// 良い例:責務を分離
public interface IEmployeeGrain : IGrainWithStringKey
{
    Task<EmployeeInfo> GetInfoAsync();
}

public interface IDepartmentGrain : IGrainWithStringKey
{
    Task<List<string>> GetEmployeeIdsAsync();
}

public interface IProjectGrain : IGrainWithGuidKey
{
    Task<ProjectInfo> GetInfoAsync();
}

まとめ

今回は、.NET Orleansのステート管理と永続化について学びました。重要なポイント:

  1. シンプルなAPI: IPersistentState<T>による直感的な状態管理
  2. 柔軟なストレージオプション: メモリ、Azure、SQL、NoSQLなど多様な選択肢
  3. 高度なパターン: イベントソーシング、スナップショット、ハイブリッドストレージ
  4. 信頼性: 自動的な状態の復元と整合性保証

次回は、Grainの通信パターンとストリーミングについて解説します。リアルタイムデータ処理やイベント駆動アーキテクチャの実装方法を学びます。


次回予告:「第3回:Grain間通信とストリーミング - リアルタイムデータ処理の実装」では、Orleansの強力な通信機能を使った、高度な分散処理パターンを解説します。

技術的な課題をお持ちですか専門チームがサポートします

記事でご紹介した技術や実装について、
より詳細なご相談やプロジェクトのサポートを承ります。