Appearance
第7章 本番運用とデバッグ
7.1 LangSmithを用いた実行の追跡(トレーシング)と評価
AIアプリケーションの開発において、ブラックボックス(中身が見えない状態)になりがちなLLMの挙動を可視化することは、デバッグや品質向上のために欠かせません。LangChainファミリーが提供する「LangSmith(ラングスミス)」は、本番運用時における実行のすべてを追跡(トレーシング)するためのプラットフォームです。
LangSmithを連携させると、プログラム側のコードを変更することなく、環境変数を設定するだけで、すべてのAPI呼び出し、プロンプトの具体的な値、LLMの応答時間、トークンの消費量などをWebダッシュボード上で詳細に確認できます。特にLangGraphを使った複雑なエージェントでは、「どのノードがどの順序で実行され、状態(State)がどう書き換わったか」をタイムライン形式でビジュアルに追跡できるため、バグの原因特定が劇的に早くなります。
以下は、PythonスクリプトからLangSmithのトレーシングを有効化するための環境変数設定コードです。
python
import os
# LangSmithの連携に必要な環境変数を設定します
# (APIキーは事前にLangSmithのアカウントから取得しておきます)
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "lsv2_pt_xxxxxxxxxxxxxxxxxxxxxx"
os.environ["LANGCHAIN_PROJECT"] = "my-langgraph-agent" # 管理画面でのプロジェクト名
# この状態でLangChainやLangGraphのプログラムを実行するだけで、
# 自動的にすべての呼び出し履歴がLangSmithのクラウド上に記録されます。また、蓄積された実行データをもとにテスト用データセットを作成し、プロンプトやモデルを変更した際にシステムの精度がどう変化したかを自動でテスト・評価(エバリュエーション)する機能も備えており、継続的な改善を支えるインフラとして機能します。
7.2 エラーハンドリングとリトライ処理の設計
本番環境では、APIの通信制限(レートリミット)やネットワークの一時的な瞬断、AIモデルのサーバー障害など、予測できないエラーが頻繁に発生します。信頼性の高いシステムを作るためには、これらの例外的な状況に耐えられる堅牢な「エラーハンドリング」と「リトライ(再試行)処理」を組み込んでおく必要があります。
LangGraphでは、ノードごとにリトライポリシー(再試行のルール)を簡単に設定できます。例えば、一時的なエラーが起きた際には「1秒待って再試行し、それでもダメなら待ち時間を倍(2秒、4秒…)にして最大3回まで繰り返す(指数バックオフ)」といった処理を数行の宣言的なコードで実装可能です。また、どうしてもエラーが解決しない場合には、現在の状態(State)を安全に保存した上で処理を一時停止し、管理者に通知メールを送って手動での復旧を待つような高度なフローも、グラフの設計段階で組み込むことができます。
以下は、特定のノードに対し、自動リトライポリシーを設定するコード例です。
python
from langgraph.graph import StateGraph
from langgraph.prebuilt import RetryPolicy
# リトライポリシー(再試行ルール)の定義
# エラーが発生した際、間隔を空けながら最大3回まで再実行します
custom_retry_policy = RetryPolicy(
max_attempts=3, # 最大3回実行を試みる
initial_interval=1.0, # 初回のエラーから再試行までの待機時間(秒)
backoff_factor=2.0, # 待機時間を倍々にする(1秒、2秒、4秒…)
retry_on=Exception # すべての例外エラーを対象にする
)
workflow = StateGraph(MyState)
# ノードを登録する際、retryオプションとしてポリシーを渡します
workflow.add_node("api_call_node", call_external_api, retry=custom_retry_policy)こうしたエラーへの備えを丁寧に行うことで、夜間にバックグラウンドで大量のバッチ処理を実行するようなケースでも、途中でシステムがクラッシュしてデータが消失するリスクを最小限に抑えることができます。
7.3 ストリーミング出力によるユーザー体験の向上
LLMの応答生成や、LangGraphの複雑なマルチエージェントの処理には、数秒から数分といった長い時間がかかることがあります。ユーザーに対して画面が固まったような状態を見せ続けるのはストレスを与えるため、処理の進捗をリアルタイムに伝える「ストリーミング出力」の実装が必須となります。
LangGraphは、強力なストリーミング機能を標準でサポートしています。ストリーミングには大きく分けて2つのモードがあります。一つは、LLMが言葉を組み立てるそばから1文字ずつリアルタイムに出力していく「メッセージストリーミング(トークンレベル)」です。もう一つは、エージェントの処理が進むごとに「現在、検索ノードが完了しました」「次に評価ノードを実行中です」といった、グラフの各ノードの処理状況(Stateの遷移)を順次出力する「更新ストリーミング(ノードレベル)」です。
以下は、ノードごとの実行進捗と、モデルの出力トークンをリアルタイムで出力するストリーミング実装のコード例です。
python
# 1. ノードレベルのストリーミング(どのノードが処理を終え、何を返したか)
inputs = {"messages": [("user", "LangGraphについて教えて")]}
# streamメソッドを呼び出し、更新状況を随時ループで処理します
for chunk in app.stream(inputs, config):
for node_name, state_update in chunk.items():
print(f"\n--- Node: {node_name} が処理を完了しました ---")
# 更新された状態(メッセージなど)を画面に表示
if "messages" in state_update:
print(state_update["messages"][-1].content)
# 2. トークンレベルのストリーミング(LLMが生成中の文字をリアルタイムに表示)
# astream_eventsメソッドを使用して非同期(async)でイベントを取得します
async for event in app.astream_events(inputs, config, version="v2"):
kind = event["event"]
# チャットモデルが新しい文字(トークン)を出力したイベントを検知
if kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
# 改行なしで文字をリアルタイムに出力
print(content, end="", flush=True)これら2つのストリーミングをフロントエンド(Web画面)と適切に繋ぎ込み、ローディングアニメーションや途中経過をテンポよく表示することで、ユーザーが体感する待ち時間を大幅に削減し、滑らかで使い心地の良いAIアプリケーションを提供することができます。