Google ColaboでQdrant用のベクトルデータを生成してQdrantを試す

# 前準備
### jsonデータ生成
ライブドアニュースコーパスをDL

ダウンロード


ダウンロード(通常テキスト):ldcc-20140209.tar.gz

### コーパスの取り込み

import json
import datetime
from typing import List, Dict
from pathlib import Path
import random

CORPUS_DIR = './livedoor-corpus'  # ライブドアコーパスをここにおく
QDRANT_JSON = 'livedoor.json'
SAMPLE_TEXT_LEN: int = 500  # ドキュメントを500文字でトランケート


def read_document(path: Path) -> Dict[str, str]:
    """1ドキュメントの処理"""
    with open(path, 'r') as f:
        lines: List[any] = f.readlines(SAMPLE_TEXT_LEN)
        lines = list(map(lambda x: x.rstrip(), lines))

        d = datetime.datetime.strptime(lines[1], "%Y-%m-%dT%H:%M:%S%z")
        created_at = int(round(d.timestamp()))  # 数値(UNIXエポックタイプ)に変換

        return {
            "url": lines[0],
            "publisher": path.parts[1],  # ['livedoor-corpus', 'it-life-hack', 'it-life-hack-12345.txt']
            "created_at": created_at,
            "body": ' '.join(lines[2:])  # 初めの2行をスキップし、各行をスペースで連結し、1行にする。
        }


def load_dataset_from_livedoor_files() -> (List[List[float]], List[str]):
    # NB. exclude LICENSE.txt, README.txt, CHANGES.txt
    corpus: List[Path] = list(Path(CORPUS_DIR).rglob('*-*.txt'))
    random.shuffle(corpus)  # 記事をシャッフルします

    with open(QDRANT_JSON, 'w') as fp:
        for x in corpus:
            doc: Dict[str, str] = read_document(x)
            json.dump(doc, fp)  # 1行分
            fp.write('\n')


if __name__ == '__main__':
    load_dataset_from_livedoor_files()

$ python3 corpus.py
$ ls
corpus.py livedoor-corpus livedoor.json

このlivedoor.jsonをGoogle collaboで使います。

### Google colabo
!pip install -U ginza spacy
!pip install -U numpy pandas ja_ginza

colaboで文章をベクトル化 … 約10分

import numpy as np
import pandas as pd
import spacy
# from multiprocessing import Pool, cpu_count  <- マルチプロセス関連は不要

# GiNZAモデルのロード (インストールが完了している前提)
try:
    nlp: spacy.Language = spacy.load('ja_ginza', exclude=["tagger", "parser", "ner", "lemmatizer", "textcat", "custom"])
    print("✅ GiNZAモデルのロードに成功しました。")
except OSError:
    print("❌ GiNZAモデルが見つかりません。再度インストール手順を確認してください。")
    # ここでエラーになる場合は、!pip install -U ginza を実行してください。

QDRANT_NPY = 'vectors-livedoor-ginza.npy'  # 出力ファイル名

def f(x):
    # NaNやNone値のチェック (エラー回避のため)
    if pd.isna(x):
        # 空のベクトルを返す、または処理をスキップ
        return np.zeros(nlp.vocab.vectors_length)
        
    doc: spacy.tokens.doc.Doc = nlp(x)  # GiNZAでベクトル化
    return doc.vector

def main():
    try:
        df = pd.read_json('livedoor.json', lines=True)
    except FileNotFoundError:
        print("❌ livedoor.json が見つかりません。ファイルが /content/ にアップロードされているか確認してください。")
        return

    print("\nデータフレームの先頭5行:")
    print(df.head())
    print(f"\n合計 {len(df)} 件の文書をベクトル化中... (シングルプロセス)")
    
    # 修正箇所: df.body.apply(f) を使用してシングルプロセスでベクトル化
    vectors_list = df.body.apply(f).tolist()

    print("ベクトル化完了。NumPyファイルに保存中...")

    # リストをNumPy配列に変換して保存
    vectors_array = np.array(vectors_list)
    np.save(QDRANT_NPY, vectors_array, allow_pickle=False)
    
    print(f"\n========================================================")
    print(f"✅ 保存完了: {QDRANT_NPY}")
    print(f"配列の形状 (Shape): {vectors_array.shape}")
    print(f"========================================================")

# 処理の実行
main()

vectors-livedoor-ginza.npy ができます。

### Qdrantの使い方
### docker pull
$ sudo docker pull qdrant/qdrant
$ sudo docker run -p 6333:6333 \
-v $(pwd)/qdrant_storage:/qdrant/storage \
qdrant/qdrant
_ _
__ _ __| |_ __ __ _ _ __ | |_
/ _` |/ _` | ‘__/ _` | ‘_ \| __|
| (_| | (_| | | | (_| | | | | |_
\__, |\__,_|_| \__,_|_| |_|\__|
|_|

Version: 1.15.4, build: 20db14f8
Access web UI at http://localhost:6333/dashboard

2025-09-27T03:10:37.414937Z INFO storage::content_manager::consensus::persistent: Initializing new raft state at ./storage/raft_state.json
2025-09-27T03:10:37.427278Z INFO qdrant: Distributed mode disabled
2025-09-27T03:10:37.428004Z INFO qdrant: Telemetry reporting enabled, id: 4ab6c13b-1d33-4b1f-ac4a-baff31ff55ad
2025-09-27T03:10:37.460847Z INFO qdrant::actix: TLS disabled for REST API
2025-09-27T03:10:37.463591Z INFO qdrant::actix: Qdrant HTTP listening on 6333
2025-09-27T03:10:37.465943Z INFO actix_server::builder: starting 1 workers
2025-09-27T03:10:37.466167Z INFO actix_server::server: Actix runtime found; starting in Actix runtime
2025-09-27T03:10:37.466216Z INFO actix_server::server: starting service: “actix-web-service-0.0.0.0:6333”, workers: 1, listening on: 0.0.0.0:6333
2025-09-27T03:10:37.467991Z INFO qdrant::tonic: Qdrant gRPC listening on 6334
2025-09-27T03:10:37.468111Z INFO qdrant::tonic: TLS disabled for gRPC API

### SDKインストール
$ pip3 install qdrant-client

### Qdrantの説明
– コレクション: RDBテーブル
– ポイント: RDBレコード ポイントには、ペイロード(Payload)と呼ばれるメタ情報も一緒に登録できる。メタ情報はフィルター検索に使用する。

### コレクションの作成

from qdrant_client import QdrantClient
from qdrant_client.http.models import VectorParams, Distance 
collection_name = 'livedoor'
qdrant_client = QdrantClient(host='localhost', port=6333)

qdrant_client.recreate_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=300, distance=Distance.COSINE) # GiNZAは300次元
    )

### コレクションの確認

from qdrant_client import QdrantClient

# Qdrantクライアントを起動中のDockerコンテナに接続
qdrant_client = QdrantClient(host='localhost', port=6333)

# 存在する全てのコレクション名を取得
collections = qdrant_client.get_collections()
collection_names = [c.name for c in collections.collections]

print("====================================")
print("✅ Qdrantに存在するコレクション:")

if 'livedoor' in collection_names:
    print(f"   [O] 'livedoor' コレクションが見つかりました。")
    
    # 詳細情報の取得でエラーが出るため、シンプルな情報に修正
    collection_info = qdrant_client.get_collection(collection_name='livedoor')
    print(f"   - ステータス: {collection_info.status.value}")
    print(f"   - ポイント数: {collection_info.points_count} (現在は0のはずです)")
else:
    print("   [X] 'livedoor' コレクションは見つかりませんでした。")
    print(f"   現在存在するコレクション: {collection_names}")
print("====================================")

$ python3 check_collection.py
====================================
✅ Qdrantに存在するコレクション:
[O] ‘livedoor’ コレクションが見つかりました。
– ステータス: green
– ポイント数: 0 (現在は0のはずです)
====================================

### upload

import json
import numpy as np
import pandas as pd
from qdrant_client import QdrantClient

# =========================================================================
# 1. 補助関数の定義 (JSONファイルの読み込み用)
# livedoor.jsonには不要なキーが含まれている可能性があるため、
# 必要なキーだけを抽出する目的の関数です。
# =========================================================================
def hook(obj):
    """
    JSONオブジェクトから必要なペイロードデータのみを抽出するフック関数。
    """
    if 'body' in obj:
        # 必要なキー(本文、タイトル、カテゴリ)を抽出して返す
        return {
            "title": obj.get("title", ""),
            "body": obj.get("body", ""),
            "category": obj.get("category", "")
        }
    return obj

# =========================================================================
# 2. メイン処理
# =========================================================================
def main():
    # 接続情報
    collection_name = 'livedoor'
    qdrant_client = QdrantClient(host='localhost', port=6333)

    # データの読み込み
    try:
        # ベクトルデータの読み込み
        vectors = np.load('./vectors-livedoor-ginza.npy')
        
        # JSONファイルの読み込みとペイロードの準備
        print("JSONファイルを読み込んでペイロードを準備中...")
        docs = []
        with open('./livedoor.json', 'r', encoding='utf-8') as fd:
            # 各行(一つのJSONオブジェクト)を読み込み、hook関数で必要なキーを抽出
            for line in fd:
                docs.append(json.loads(line, object_hook=hook))
        
        print(f"✅ 読み込み完了。ベクトル数: {vectors.shape[0]}、文書数: {len(docs)}")

    except FileNotFoundError as e:
        print(f"❌ ファイルが見つかりません: {e.filename}")
        print("ファイル(livedoor.json, vectors-livedoor-ginza.npy)が同じディレクトリにあるか確認してください。")
        return

    # コレクションへのアップロード
    print("Qdrantコレクションにデータをアップロード中...")
    qdrant_client.upload_collection(
        collection_name=collection_name, # コレクション名
        vectors=vectors, # ベクトルデータ (NumPy配列)
        payload=iter(docs), # ペイロードデータ (ジェネレータまたはイテレータ)
        ids=None,  # IDの自動発番
        batch_size=256  # バッチサイズ
    )
    print("✅ データアップロード完了。")
    
    # 最終確認
    collection_info = qdrant_client.get_collection(collection_name='livedoor')
    print(f"最終ポイント数: {collection_info.points_count}")
    
# スクリプトの実行
if __name__ == "__main__":
    main()

$ python3 upload_data.py
JSONファイルを読み込んでペイロードを準備中…
✅ 読み込み完了。ベクトル数: 7367、文書数: 7367
Qdrantコレクションにデータをアップロード中…
✅ データアップロード完了。
最終ポイント数: 7367

$ pip3 install spacy ginza

import numpy as np
import spacy
from qdrant_client import QdrantClient
from qdrant_client.http.models import ScoredPoint

# =========================================================================
# 1. 初期設定とモデルロード
# =========================================================================
# ベクトル化に使用したモデルと同じものをロード
# 以前のステップでインストールが完了していることを前提とします
try:
    nlp: spacy.Language = spacy.load('ja_ginza', exclude=["tagger", "parser", "ner", "lemmatizer", "textcat", "custom"])
    print("✅ GiNZAモデルのロードに成功しました。")
except OSError:
    print("❌ GiNZAモデルが見つかりません。")
    exit() # 処理を中断

# Qdrant接続情報
collection_name = 'livedoor'
qdrant_client = QdrantClient(host='localhost', port=6333)

# 検索クエリ
QUERY_TEXT = "男磨きの動画を見ています"

# =========================================================================
# 2. クエリテキストのベクトル化
# =========================================================================
def get_vector_from_text(text: str) -> np.ndarray:
    """
    GiNZAを使用してテキストをベクトルに変換します。
    """
    doc: spacy.tokens.doc.Doc = nlp(text)
    # GiNZAのdoc.vectorはNumPy配列を返します
    return doc.vector

# =========================================================================
# 3. Qdrantでの検索実行
# =========================================================================
def main():
    print(f"\n========================================================")
    print(f"🔍 検索クエリ: {QUERY_TEXT}")
    print(f"========================================================")

    # クエリテキストをベクトルに変換
    query_vector = get_vector_from_text(QUERY_TEXT)

    # Qdrantで検索を実行
    hits = qdrant_client.search(
        collection_name=collection_name,
        query_vector=query_vector, # ベクトル化したクエリー
        query_filter=None,
        with_payload=True, # レスポンスにペイロードを含める
        limit=5 # 上位5件を取得
    )
    
    # 検索結果の表示
    print("\n[検索結果 - 上位 5件]")
    if not hits:
        print("類似記事は見つかりませんでした。")
        return

    for i, hit in enumerate(hits):
        h: ScoredPoint = hit
        # ペイロードからタイトルと本文を取得
        title = h.payload.get('title', 'N/A')
        body_snippet = h.payload.get('body', 'N/A')[:100] + '...' # 本文は先頭100文字を抜粋
        
        print(f"--- 順位 {i+1} (スコア: {h.score:.4f}) ---")
        print(f"タイトル: {title}")
        print(f"本文抜粋: {body_snippet}")
        
# スクリプトの実行
if __name__ == "__main__":
    main()
import numpy as np
import spacy
from qdrant_client import QdrantClient
from qdrant_client.http.models import ScoredPoint

# =========================================================================
# 1. 初期設定とモデルロード
# =========================================================================
# ベクトル化に使用したモデルと同じものをロード
# 以前のステップでインストールが完了していることを前提とします
try:
    nlp: spacy.Language = spacy.load('ja_ginza', exclude=["tagger", "parser", "ner", "lemmatizer", "textcat", "custom"])
    print("✅ GiNZAモデルのロードに成功しました。")
except OSError:
    print("❌ GiNZAモデルが見つかりません。")
    exit() # 処理を中断

# Qdrant接続情報
collection_name = 'livedoor'
qdrant_client = QdrantClient(host='localhost', port=6333)

# 検索クエリ
QUERY_TEXT = "野球情報が知りたい"

# =========================================================================
# 2. クエリテキストのベクトル化
# =========================================================================
def get_vector_from_text(text: str) -> np.ndarray:
    """
    GiNZAを使用してテキストをベクトルに変換します。
    """
    doc: spacy.tokens.doc.Doc = nlp(text)
    # GiNZAのdoc.vectorはNumPy配列を返します
    return doc.vector

# =========================================================================
# 3. Qdrantでの検索実行
# =========================================================================
def main():
    print(f"\n========================================================")
    print(f"🔍 検索クエリ: {QUERY_TEXT}")
    print(f"========================================================")

    # クエリテキストをベクトルに変換
    query_vector = get_vector_from_text(QUERY_TEXT)

    # Qdrantで検索を実行
    hits = qdrant_client.search(
        collection_name=collection_name,
        query_vector=query_vector, # ベクトル化したクエリー
        query_filter=None,
        with_payload=True, # レスポンスにペイロードを含める
        limit=5 # 上位5件を取得
    )
    
    # 検索結果の表示
    print("\n[検索結果 - 上位 5件]")
    if not hits:
        print("類似記事は見つかりませんでした。")
        return

    for i, hit in enumerate(hits):
        h: ScoredPoint = hit
        # ペイロードからタイトルと本文を取得
        title = h.payload.get('title', 'N/A')
        body_snippet = h.payload.get('body', 'N/A')[:100] + '...' # 本文は先頭100文字を抜粋
        
        print(f"--- 順位 {i+1} (スコア: {h.score:.4f}) ---")
        print(f"タイトル: {title}")
        print(f"本文抜粋: {body_snippet}")
        
# スクリプトの実行
if __name__ == "__main__":
    main()

Qdrantとは何か?

Qdrant公式サイト: https://qdrant.tech/

## Qdrantとは
オープンソースのベクトルデータベース
Rust製
クライアントはPython SDK, REST API, gRPCで接続できる
Qdrant自体は文章をベクトルにする機能はない、ベクトルを比較する機能だけになる

## ベクトル検索
セマンティック検索という方法があり、ドキュメント全体の意味を考慮する
ドキュメントをベクトルで表現することをembeddingという
ベクトルとベクトルを比較することで、ドキュメントの類似性を検証することができる(コサイン類似度, ベクトルとベクトルの類似度の尺度) …
bertなどの事前学習モデルでfine tuneもできるようになってきた
総当たりでベクトルを比較すると計算量が膨大になるため、精度を犠牲にして高速化している

### ベクトル検索の手順
– ベクトルデータの準備 (ニュースコーパス)
– コーパスをディクショナリ登録(json)
– コーパスのベクトル変換(GiNZA)

– Qdrantサーバ起動(docker)
– コレクションの作成
– ドキュメントを登録
– 類似ドキュメントの検索

ベクトルデータの準備とベクトル変換のところが肝になりそう

Pythonで話者分離して、片方の話者の発話を切り抜き

無料のブラウザツールだと、音楽などで、音声とインストラメントを分離することはできるようなのですが、二人が喋っていて、片方の音声に分離することができなかったので、Pythonで実行します。

前準備として、以下の手配が必要
1) ffmpeg, pyannote.audio のインストール
2) Hugging Faceでのaccess token発行(read)およびモデル利用のaccept
3) Hugging Faceでのaccess tokenをコードのHUGGINGFACE_TOKENに埋め込み
4) python3 speaker_separation.py の実行

import subprocess
from pyannote.audio import Pipeline
from pydub import AudioSegment
import os
from collections import defaultdict

# ===== 設定 =====
mp4_file = "video.mp4"
wav_file = "conversation.wav"
output_file = "main_speaker_only.wav"

HUGGINGFACE_TOKEN = "****"  # Hugging Face token

# ===== WAV変換 =====
if os.path.exists(wav_file):
    os.remove(wav_file)

subprocess.run([
    "ffmpeg", "-y", "-i", mp4_file,
    "-vn", "-acodec", "pcm_s16le", "-ar", "16000", "-ac", "1",
    wav_file
], check=True)

# ===== 話者分離 =====
pipeline = Pipeline.from_pretrained(
    "pyannote/speaker-diarization-3.1",
    use_auth_token=HUGGINGFACE_TOKEN
)

diarization = pipeline(wav_file)

# ===== 各話者の合計発話時間を計算 =====
speaker_durations = defaultdict(float)
for turn, _, speaker in diarization.itertracks(yield_label=True):
    speaker_durations[speaker] += turn.end - turn.start

# 発話時間が最も長い話者を自動選択
target_speaker = max(speaker_durations, key=speaker_durations.get)
print("選択された話者:", target_speaker)

# ===== その話者だけ抽出 =====
audio = AudioSegment.from_wav(wav_file)
speaker_segments = [
    audio[int(turn.start * 1000): int(turn.end * 1000)]
    for turn, _, speaker in diarization.itertracks(yield_label=True)
    if speaker == target_speaker
]

if speaker_segments:
    speaker_audio = sum(speaker_segments)
    speaker_audio.export(output_file, format="wav")
    print(f"✅ 保存しました: {output_file}")
else:
    print("⚠️ 対象話者の音声が見つかりませんでした")

Difyの短期メモリ

Difyのチャットフローで メモリをオフにした場合、プレビューで入力された「こんにちは」は 常に新しい会話セッションとして扱われます。

具体的には
メモリON
過去のやり取り(今回のセッション内の履歴)を保持
LLMは前回の「こんにちは」を覚えているので、「再度のこんにちは、どうした」など、文脈を踏まえた応答になる

メモリOFF
そのノードは前のやり取りを無視
「こんにちは」と入力しても、毎回初対面の会話のように扱われ、文脈に依存しない応答になる
つまり、オフにすると 「毎回初めて会話する状態」 と考えればOK

なるほど

Difyにgoogle spread sheetに連携したい

## Google Cloud Platform (GCP) で設定
GCPでGoogle Sheets APIをenableにします。
APIの認証情報(Credential)として、サービスアカウントを作成し、JSON形式のキーファイルをダウンロード(credentials.json)

## スプレッドシートの共有設定
スプレッドシートの右上の「共有」ボタンをクリックし、先ほど作成したサービスアカウントのメールアドレスを**編集者(Editor)**として追加します。

## Node.jsでライブラリをインストール
$ npm install google-auth-library googleapis

chat-sheet.js

import fs from "fs";
import fetch from "node-fetch";
import { google } from "googleapis";

// Google Sheets APIの設定
const sheets = google.sheets({ version: "v4" });
const auth = new google.auth.GoogleAuth({
  keyFile: "./credentials.json", // ダウンロードしたキーファイルのパス
  scopes: ["https://www.googleapis.com/auth/spreadsheets"],
});
const SPREADSHEET_ID = "***"; // スプレッドシートのURLから取得できるID

const DIFY_API_KEY = "app-***";
const API_URL = "https://api.dify.ai/v1/chat-messages";

const prompts = fs
  .readFileSync("./prompts.txt", "utf8")
  .split("\n")
  .map((line) => line.trim())
  .filter(Boolean);

// Difyに問い合わせる関数
async function callDify(prompt) {
  const response = await fetch(API_URL, {
    method: "POST",
    headers: {
      "Authorization": `Bearer ${DIFY_API_KEY}`,
      "Content-Type": "application/json",
    },
    body: JSON.stringify({
      inputs: {},
      query: prompt,
      response_mode: "blocking",
      conversation_id: "",
      user: "cli-user",
    }),
  });

  if (!response.ok) {
    const err = await response.text();
    throw new Error(`API Error ${response.status}: ${err}`);
  }

  const data = await response.json();
  return data.answer || "(No answer)";
}

// スプレッドシートに結果を保存する関数
async function saveToGoogleSheets(results) {
  const client = await auth.getClient();
  google.options({ auth: client });

  const values = results.map(row => [row.prompt, row.answer, row.error]);

  const resource = {
    values: [["質問", "回答", "エラー"], ...values], // ヘッダー行を追加
  };

  try {
    await sheets.spreadsheets.values.clear({
      spreadsheetId: SPREADSHEET_ID,
      range: "Sheet1!A:C", // 既存のデータをクリア(必要に応じて)
    });
    await sheets.spreadsheets.values.update({
      spreadsheetId: SPREADSHEET_ID,
      range: "Sheet1!A1",
      valueInputOption: "RAW",
      resource,
    });
    console.log("\n=== 回答をGoogleスプレッドシートに保存しました ===");
  } catch (error) {
    console.error("スプレッドシートへの書き込みエラー:", error.message);
  }
}

// メイン処理
(async () => {
  const results = [];

  for (const [i, prompt] of prompts.entries()) {
    try {
      const answer = await callDify(prompt);
      results.push({ prompt, answer, error: "" });

      // CLIに出力
      console.log(`Q${i + 1}: ${prompt}`);
      console.log(`A${i + 1}: ${answer}\n`);
    } catch (err) {
      results.push({ prompt, answer: "", error: err.message });
      console.error(`Error for "${prompt}": ${err.message}`);
    }
  }

  // Googleスプレッドシートに保存
  await saveToGoogleSheets(results);

  // (オプション)テキストファイルにも保存
  const textOutput = results
    .map((res) => `Q: ${res.prompt}\nA: ${res.answer || res.error}\n`)
    .join("\n");
  fs.writeFileSync("results.txt", textOutput, "utf8");
  console.log("=== 回答を results.txt にも保存しました ===");
})();

ほう、なるほど

Adamとloss

Adam(アダム)は、主にディープラーニングで広く使われている最適化アルゴリズムの一種。最適化アルゴリズムは、モデルの学習において、損失関数の値を最小化するためのパラメータの更新方法を決定する役割

import torch
import torch.nn as nn
import torch.optim as optim

X = torch.randn(100, 1) * 10
y = 2 * X + 1 + torch.randn(100, 1)

class LinearRegression(nn.Module):
    def __init__(self):
        super().__init__()
        self.linear = nn.Linear(1, 1)

    def forward(self, x):
        return self.linear(x)

model = LinearRegression()

# 損失関数とオプティマイザの定義
# 損失関数は平均二乗誤差
criterion = nn.MSELoss()

# オプティマイザはAdam
optimizer = optim.Adam(model.parameters(), lr=0.01)

# モデルの学習ループ
num_epochs = 1000

for epoch in range(num_epochs):
    
    # 順伝播
    y_pred = model(X)

    # 損失の計算
    loss = criterion(y_pred, y)
    
    # 逆伝播と最適化
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()
    
    if (epoch+1) % 100 == 0:
        print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')

# 学習後のパラメータ表示
weight, bias = model.linear.weight.item(), model.linear.bias.item()
print(f'Learned parameters: weight = {weight:.4f}, bias = {bias:.4f}')

勾配が急な方向には学習率を大きく、勾配が緩やかな方向には学習率を小さくすることで、効率的かつ安定して学習を進めることができる

LangChainを触ってみる

LangChain は 大規模言語モデル(LLM, Large Language Models)を使ったアプリ開発を効率化するためのフレームワークです。

### LangChain とは?
– Python や JavaScript で使える オープンソースのライブラリ
– OpenAI GPT, Anthropic Claude, Llama などの LLM を 組み合わせて活用できる
– 単に「プロンプトを投げて応答をもらう」以上のことを簡単に構築できる

### 主な機能
– Prompt Management(プロンプト管理)
– Chains(チェーン)
「ユーザーの質問 → LLM 応答 → 外部データ検索 → さらに LLM 応答」みたいに処理をつなげられる
– Agents(エージェント)
LLM が「どのツールを使うか」を自分で判断して実行できる
例:Web検索、計算機、SQLデータベース などを LLM が使い分ける
– Memory(メモリ)
対話の履歴を覚えて、会話に文脈を持たせられる
– Retrieval(外部知識の利用)
RAG(Retrieval Augmented Generation)を簡単に構築できる
例:PDFやドキュメントをベクトルDBに格納して、質問に応じて検索し、LLMに渡す

### 必要なインストール
$ pip3 install langchain
$ pip3 install langchain-openai

from langchain_openai import OpenAI
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
import os
from dotenv import load_dotenv

load_dotenv()

llm = OpenAI(
    temperature=0.7,
    api_key=os.getenv("OPENAI_API_KEY")  # ← ここで明示的に渡す
)

# プロンプトのテンプレートを定義
template = "質問: {question}\n答えをわかりやすく説明してください。"
prompt = PromptTemplate(template=template, input_variables=["question"])

# LLMChain を作成
chain = prompt | llm

# 実行
response = chain.invoke({"question": "LangChainとは何ですか?"})
print(response)

$ python3 test_langchain.py

LangChain(言語チェーン)とは、さまざまな言語を繋げて利用することができるシステムのことを指します。つまり、複数の言語を一つのチェーン(連鎖)のようにつなげて、それぞれの言語を柔軟に切り替えて使うことができる仕組みです。これにより、異なる言語を話す人々がコミュニケーションを取る際に、よりスムーズに相手の言語を理解することができるようになります。また、翻訳や通訳の分野にも応用されています。

Gemini APIの利用

pip install -q google-generativeai

import google.generativeai as genai  # Googleの生成AIライブラリ
from google.colab import userdata  # Google Colabのユーザーデータモジュール

GOOGLE_API_KEY=userdata.get('GOOGLE_API_KEY')
genai.configure(api_key=GOOGLE_API_KEY)

print("使用可能なGeminiのモデル一覧:")
for model in genai.list_models():
    if "generateContent" in model.supported_generation_methods:
        print(model.name)

model = genai.GenerativeModel("models/gemini-2.0-flash-001")
print(f"選択されたモデル: {model.model_name}")

config = genai.GenerationConfig(
    max_output_tokens=2048,  # 生成されるトークンの最大数
    temperature=0.8,  # 出力のランダム性を制御
)

def generate_content(model, prompt):
    response = model.generate_content(prompt, generation_config=config)
    return response.text

user_input = input("質問を入力してください: ")
response = generate_content(model, user_input)
print(f"Gemini: {response}")

LLMの内部構造: LLaMA-2

LLaMA-2の主要構造
1. Pythonコード (約1000行) : llama
2. パラメータ : llama-2-7b 次の単語予想に使用
3. トークナイザ : 前処理でテキストを分割

step1: トークナイザで入力テキストを細かく分割する
step2: Transformerとパラメータを用いて次の単語を予測
step3: 予測結果を元に次の単語を選択し結合
※予測結果から単語を選ぶプロセスをサンプリングと呼ぶ

model.py

class Transformer(nn.Module):
    def __init__(self, params: ModelArgs):
        """
        Initialize a Transformer model.

        Args:
            params (ModelArgs): Model configuration parameters.

        Attributes:
            params (ModelArgs): Model configuration parameters.
            vocab_size (int): Vocabulary size.
            n_layers (int): Number of layers in the model.
            tok_embeddings (ParallelEmbedding): Token embeddings.
            layers (torch.nn.ModuleList): List of Transformer blocks.
            norm (RMSNorm): Layer normalization for the model output.
            output (ColumnParallelLinear): Linear layer for final output.
            freqs_cis (torch.Tensor): Precomputed cosine and sine frequencies.

        """
        super().__init__()
        self.params = params
        self.vocab_size = params.vocab_size
        self.n_layers = params.n_layers

        self.tok_embeddings = ParallelEmbedding(
            params.vocab_size, params.dim, init_method=lambda x: x
        )

        self.layers = torch.nn.ModuleList()
        for layer_id in range(params.n_layers):
            self.layers.append(TransformerBlock(layer_id, params))

        self.norm = RMSNorm(params.dim, eps=params.norm_eps)
        self.output = ColumnParallelLinear(
            params.dim, params.vocab_size, bias=False, init_method=lambda x: x
        )

        self.freqs_cis = precompute_freqs_cis(
            # Note that self.params.max_seq_len is multiplied by 2 because the token limit for the Llama 2 generation of models is 4096. 
            # Adding this multiplier instead of using 4096 directly allows for dynamism of token lengths while training or fine-tuning.
            self.params.dim // self.params.n_heads, self.params.max_seq_len * 2
        )

    @torch.inference_mode()
    def forward(self, tokens: torch.Tensor, start_pos: int):
        """
        Perform a forward pass through the Transformer model.

        Args:
            tokens (torch.Tensor): Input token indices.
            start_pos (int): Starting position for attention caching.

        Returns:
            torch.Tensor: Output logits after applying the Transformer model.

        """
        _bsz, seqlen = tokens.shape
        h = self.tok_embeddings(tokens)
        self.freqs_cis = self.freqs_cis.to(h.device)
        freqs_cis = self.freqs_cis[start_pos : start_pos + seqlen]

        mask = None
        if seqlen > 1:
            mask = torch.full(
                (seqlen, seqlen), float("-inf"), device=tokens.device
            )

            mask = torch.triu(mask, diagonal=1)

            # When performing key-value caching, we compute the attention scores
            # only for the new sequence. Thus, the matrix of scores is of size
            # (seqlen, cache_len + seqlen), and the only masked entries are (i, j) for
            # j > cache_len + i, since row i corresponds to token cache_len + i.
            mask = torch.hstack([
                torch.zeros((seqlen, start_pos), device=tokens.device),
                mask
            ]).type_as(h)

        for layer in self.layers:
            h = layer(h, start_pos, freqs_cis, mask)
        h = self.norm(h)
        output = self.output(h).float()
        return output

いくつかの処理では、パラメータと呼ばれる数値を使用して演算が行われる
パラメータを繰り返し調整し、予想の精度を高める
※予想が誤っていた場合、パラメータを修正する (バックプロパゲーションと呼ばれる)
※LLMのパラメータ: 「Apple」という単語は、その単語が持つ意味、文脈、関連する概念(例えば、食べ物、会社名、色など)が、膨大な数値の集合(ベクトル)として表現されます。これらの数値の組み合わせによって、単語間の関係性が学習されています。
※単語のIDやトークンに対して数値ベクトルが割り当てられる
※モデルの大部分を占めるのは、単語間の関係性を理解し、次の単語を予測するための複雑な計算を行う、**アテンション機構やフィードフォワードネットワーク内の重み(weights)やバイアス(biases)**です。これらのパラメータが、文脈に応じた適切な単語の埋め込みベクトルを組み合わせ、最終的な出力を生成する。

### LLMの学習ステップ
学習ステップ1. 事前学習(Pre-training): 基盤モデル ただし、対話形式の学習が不足、不適切な質問にも答えてしまう
学習ステップ2. ファインチューニング(Fine-tuning): 特定のタスクや分野に特化させることができる
学習ステップ3. ヒューマンフィードバック(Human Feedback): 人間からのフィードバックを得る

### 事前学習に使われるデータ
モデルの用途によって、収集するデータ元(Webサイト、書籍、会話テキスト)の比率などが変わってくる。例えば、チャット用途の場合は、比較的会話データが多く学習される傾向にある。

### Transformer
Transformerの構造は主にEncoder(テキスト理解)とDecoder(テキスト生成)の要素から成り立つ

Attention機構の役割: 文中から関連度の高い単語を発見する