Difyを使ったRAGシステム構築完全ガイド:社内ナレッジベースの構築から運用まで

はじめに

大規模言語モデル(LLM)の登場により、企業の知識管理は新たな局面を迎えています。RAG(Retrieval-Augmented Generation)は、企業固有の情報を LLM に組み込む最も効果的な手法の一つです。本記事では、Dify を使用して実践的な RAG システムを構築する方法を詳しく解説します。

RAG(検索拡張生成)とは

RAG は、外部の知識ベースから関連情報を検索し、その情報を基に LLM が回答を生成する技術です。これにより、最新の情報や企業固有のデータに基づいた正確な回答が可能になります。

RAG のメリット

  1. 最新情報への対応: LLM の学習データに含まれない最新情報を活用
  2. ハルシネーション削減: 事実に基づいた回答生成
  3. カスタマイズ性: 企業固有の知識を活用
  4. コスト効率: ファインチューニング不要

Dify プラットフォームの概要

Dify は、LLM アプリケーション開発のためのオープンソースプラットフォームです。直感的な UI で RAG システムを構築できます。

Dify の主要機能

  • ビジュアルワークフロー設計
  • 複数の LLM 対応(OpenAI、Anthropic、Azure等)
  • ベクターデータベース統合
  • API エンドポイント自動生成

環境構築

1. Dify のインストール

# Docker Compose を使用したインストール
git clone https://github.com/langgenius/dify.git
cd dify
cp .env.example .env

# 環境変数の設定
nano .env

# 起動
docker-compose up -d

2. 初期設定

# docker-compose.yml の主要設定
services:
  api:
    image: langgenius/dify-api:latest
    environment:
      MODE: api
      LOG_LEVEL: info
      SECRET_KEY: your-secret-key
      INIT_PASSWORD: your-admin-password
      
  web:
    image: langgenius/dify-web:latest
    environment:
      EDITION: SELF_HOSTED
      CONSOLE_API_URL: http://api:5001

ナレッジベースの構築

1. データの準備

import os
import json
from pathlib import Path

class KnowledgeBasePreparator:
    def __init__(self, source_dir: str):
        self.source_dir = Path(source_dir)
    
    def prepare_documents(self):
        """ドキュメントを Dify 用に整形"""
        documents = []
        
        for file_path in self.source_dir.glob("**/*.md"):
            with open(file_path, 'r', encoding='utf-8') as f:
                content = f.read()
                
            # メタデータの抽出
            metadata = {
                'source': str(file_path),
                'category': file_path.parent.name,
                'title': file_path.stem
            }
            
            documents.append({
                'content': content,
                'metadata': metadata
            })
        
        return documents
    
    def chunk_documents(self, documents, chunk_size=1000, overlap=200):
        """ドキュメントをチャンクに分割"""
        chunked_docs = []
        
        for doc in documents:
            text = doc['content']
            chunks = []
            
            for i in range(0, len(text), chunk_size - overlap):
                chunk = text[i:i + chunk_size]
                chunks.append({
                    'content': chunk,
                    'metadata': {
                        **doc['metadata'],
                        'chunk_index': len(chunks)
                    }
                })
            
            chunked_docs.extend(chunks)
        
        return chunked_docs

# 使用例
preparator = KnowledgeBasePreparator('./company-docs')
documents = preparator.prepare_documents()
chunks = preparator.chunk_documents(documents)

2. ベクターデータベースの設定

from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Chroma
import chromadb

class VectorStoreManager:
    def __init__(self, persist_directory="./chroma_db"):
        self.persist_directory = persist_directory
        self.embeddings = OpenAIEmbeddings(
            openai_api_key=os.environ['OPENAI_API_KEY']
        )
        
    def create_vector_store(self, documents):
        """ベクターストアの作成"""
        # ChromaDB クライアントの初期化
        client = chromadb.PersistentClient(path=self.persist_directory)
        
        # コレクションの作成
        collection = client.create_collection(
            name="company_knowledge",
            metadata={"hnsw:space": "cosine"}
        )
        
        # ドキュメントの追加
        texts = [doc['content'] for doc in documents]
        metadatas = [doc['metadata'] for doc in documents]
        
        vector_store = Chroma.from_texts(
            texts=texts,
            embedding=self.embeddings,
            metadatas=metadatas,
            persist_directory=self.persist_directory
        )
        
        return vector_store
    
    def search(self, query: str, k: int = 5):
        """類似度検索"""
        vector_store = Chroma(
            persist_directory=self.persist_directory,
            embedding_function=self.embeddings
        )
        
        results = vector_store.similarity_search_with_score(query, k=k)
        return results

Dify でのワークフロー構築

1. アプリケーションの作成

// Dify API を使用したアプリケーション作成
const createRAGApplication = async () => {
  const response = await fetch('https://api.dify.ai/v1/apps', {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${process.env.DIFY_API_KEY}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({
      name: '社内ナレッジベース RAG',
      mode: 'chat',
      icon: '📚',
      description: '社内ドキュメントを検索して回答するAIアシスタント'
    })
  });
  
  return response.json();
};

2. ワークフローの設計

# ワークフロー定義(YAML形式)
workflow:
  name: "RAG Knowledge Base Workflow"
  
  nodes:
    - id: "user_input"
      type: "start"
      outputs:
        - query
    
    - id: "query_rewrite"
      type: "llm"
      inputs:
        - user_input.query
      prompt: |
        以下のユーザーの質問を、検索に適した形に書き換えてください:
        質問: {{query}}
        
        検索クエリ:
    
    - id: "knowledge_retrieval"
      type: "knowledge_retrieval"
      inputs:
        - query_rewrite.output
      config:
        top_k: 5
        score_threshold: 0.7
    
    - id: "answer_generation"
      type: "llm"
      inputs:
        - user_input.query
        - knowledge_retrieval.documents
      prompt: |
        以下の情報を基に、ユーザーの質問に答えてください。
        
        関連情報:
        {{documents}}
        
        質問: {{query}}
        
        回答:

3. プロンプトエンジニアリング

class RAGPromptTemplate:
    @staticmethod
    def create_system_prompt():
        return """
        あなたは社内ナレッジベースの AI アシスタントです。
        以下のルールに従って回答してください:
        
        1. 提供された情報のみを使用して回答する
        2. 情報が不足している場合は、その旨を明確に伝える
        3. 専門用語は分かりやすく説明する
        4. 回答は構造化して提示する
        5. 参照元を明記する
        """
    
    @staticmethod
    def create_qa_prompt(context: str, question: str):
        return f"""
        以下のコンテキストを参考に質問に答えてください。
        
        コンテキスト:
        {context}
        
        質問: {question}
        
        回答形式:
        1. 要約(1-2文)
        2. 詳細説明
        3. 参照元
        
        回答:
        """

高度な実装テクニック

1. ハイブリッド検索の実装

class HybridSearcher:
    def __init__(self, vector_store, bm25_index):
        self.vector_store = vector_store
        self.bm25_index = bm25_index
    
    def search(self, query: str, k: int = 5, alpha: float = 0.5):
        """
        ベクター検索と BM25 を組み合わせたハイブリッド検索
        alpha: ベクター検索の重み(0-1)
        """
        # ベクター検索
        vector_results = self.vector_store.similarity_search_with_score(
            query, k=k*2
        )
        
        # BM25 検索
        bm25_results = self.bm25_index.search(query, k=k*2)
        
        # スコアの正規化と結合
        combined_results = self._combine_results(
            vector_results, 
            bm25_results, 
            alpha
        )
        
        # 上位 k 件を返す
        return sorted(combined_results, key=lambda x: x[1], reverse=True)[:k]
    
    def _combine_results(self, vector_results, bm25_results, alpha):
        """検索結果の結合"""
        combined = {}
        
        # ベクター検索結果の処理
        for doc, score in vector_results:
            doc_id = doc.metadata.get('id')
            combined[doc_id] = {
                'doc': doc,
                'vector_score': score,
                'bm25_score': 0
            }
        
        # BM25 検索結果の追加
        for doc, score in bm25_results:
            doc_id = doc.metadata.get('id')
            if doc_id in combined:
                combined[doc_id]['bm25_score'] = score
            else:
                combined[doc_id] = {
                    'doc': doc,
                    'vector_score': 0,
                    'bm25_score': score
                }
        
        # 最終スコアの計算
        results = []
        for doc_id, scores in combined.items():
            final_score = (
                alpha * scores['vector_score'] + 
                (1 - alpha) * scores['bm25_score']
            )
            results.append((scores['doc'], final_score))
        
        return results

2. 回答品質の向上

class AnswerQualityEnhancer:
    def __init__(self, llm_client):
        self.llm = llm_client
    
    def enhance_answer(self, initial_answer: str, context: str):
        """回答の品質を向上させる"""
        
        # 事実確認
        fact_check_prompt = f"""
        以下の回答が提供されたコンテキストと一致しているか確認してください。
        
        コンテキスト: {context}
        回答: {initial_answer}
        
        不正確な部分があれば指摘してください。
        """
        
        fact_check_result = self.llm.generate(fact_check_prompt)
        
        # 回答の改善
        if "不正確" in fact_check_result:
            improvement_prompt = f"""
            以下の回答を、指摘された問題を修正して改善してください。
            
            元の回答: {initial_answer}
            指摘事項: {fact_check_result}
            
            改善された回答:
            """
            
            improved_answer = self.llm.generate(improvement_prompt)
            return improved_answer
        
        return initial_answer
    
    def add_citations(self, answer: str, sources: list):
        """引用元の追加"""
        citations = "\n\n参照元:\n"
        for i, source in enumerate(sources, 1):
            citations += f"{i}. {source['title']} - {source['source']}\n"
        
        return answer + citations

運用とモニタリング

1. パフォーマンス監視

import time
from datetime import datetime
import prometheus_client

class RAGMonitor:
    def __init__(self):
        # Prometheusメトリクスの定義
        self.response_time = prometheus_client.Histogram(
            'rag_response_time_seconds',
            'RAG システムの応答時間'
        )
        self.relevance_score = prometheus_client.Gauge(
            'rag_relevance_score',
            '検索結果の関連性スコア'
        )
        self.query_counter = prometheus_client.Counter(
            'rag_queries_total',
            'クエリの総数'
        )
    
    def log_query(self, query: str, response: str, duration: float, 
                  relevance_scores: list):
        """クエリのログ記録"""
        self.query_counter.inc()
        self.response_time.observe(duration)
        
        if relevance_scores:
            avg_score = sum(relevance_scores) / len(relevance_scores)
            self.relevance_score.set(avg_score)
        
        # 詳細ログの記録
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'response_length': len(response),
            'duration': duration,
            'relevance_scores': relevance_scores,
            'avg_relevance': avg_score if relevance_scores else 0
        }
        
        # ログファイルに記録
        with open('rag_queries.log', 'a') as f:
            f.write(json.dumps(log_entry) + '\n')

2. フィードバックループの実装

class FeedbackSystem:
    def __init__(self, db_connection):
        self.db = db_connection
    
    def collect_feedback(self, query_id: str, rating: int, comment: str = None):
        """ユーザーフィードバックの収集"""
        feedback = {
            'query_id': query_id,
            'rating': rating,
            'comment': comment,
            'timestamp': datetime.now()
        }
        
        # データベースに保存
        self.db.feedback.insert_one(feedback)
        
        # 低評価の場合はアラート
        if rating < 3:
            self._send_alert(feedback)
    
    def analyze_feedback(self, period_days: int = 30):
        """フィードバックの分析"""
        start_date = datetime.now() - timedelta(days=period_days)
        
        feedback_data = self.db.feedback.find({
            'timestamp': {'$gte': start_date}
        })
        
        # 統計情報の計算
        ratings = [f['rating'] for f in feedback_data]
        avg_rating = sum(ratings) / len(ratings) if ratings else 0
        
        # 問題のあるクエリパターンの特定
        low_rated = self.db.feedback.find({
            'timestamp': {'$gte': start_date},
            'rating': {'$lt': 3}
        })
        
        problem_patterns = self._identify_patterns(low_rated)
        
        return {
            'average_rating': avg_rating,
            'total_feedback': len(ratings),
            'problem_patterns': problem_patterns
        }

セキュリティとコンプライアンス

データセキュリティ

class SecureRAGSystem:
    def __init__(self):
        self.encryption_key = os.environ['ENCRYPTION_KEY']
    
    def encrypt_sensitive_data(self, data: str) -> str:
        """機密データの暗号化"""
        from cryptography.fernet import Fernet
        
        f = Fernet(self.encryption_key)
        encrypted = f.encrypt(data.encode())
        return encrypted.decode()
    
    def access_control(self, user_id: str, document_id: str) -> bool:
        """アクセス制御の実装"""
        # ユーザーの権限確認
        user_permissions = self.get_user_permissions(user_id)
        document_classification = self.get_document_classification(document_id)
        
        return document_classification in user_permissions
    
    def audit_log(self, user_id: str, action: str, resource: str):
        """監査ログの記録"""
        log_entry = {
            'timestamp': datetime.now().isoformat(),
            'user_id': user_id,
            'action': action,
            'resource': resource,
            'ip_address': self.get_client_ip()
        }
        
        # 監査ログの保存
        self.save_audit_log(log_entry)

トラブルシューティング

よくある問題と解決策

  1. 検索精度が低い

    • チャンクサイズの調整
    • Embedding モデルの変更
    • メタデータの追加
  2. 応答速度が遅い

    • ベクターインデックスの最適化
    • キャッシュの実装
    • バッチ処理の活用
  3. メモリ使用量が多い

    • ストリーミング処理の実装
    • 不要なデータの削除
    • インデックスの分割

まとめ

Dify を使用した RAG システムの構築は、企業の知識管理を革新的に改善します。本記事で紹介した実装方法を活用することで、高精度かつ高速な社内ナレッジベースを構築できます。

重要なポイント:

  • 適切なデータ準備とチャンキング
  • ハイブリッド検索の活用
  • 継続的な改善とモニタリング
  • セキュリティとコンプライアンスの確保

エンハンスド株式会社では、Dify を活用した RAG システムの構築支援を行っています。詳しくはお問い合わせください。


タグ: #Dify #RAG #LLM #ナレッジベース #AI #検索拡張生成

執筆者: エンハンスド株式会社 AI ソリューション部

公開日: 2024年12月20日