{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "# AI Infra Tiny LLM 全流程 Colab 实验\n",
        "\n",
        "这个 notebook 用一个很小的 Transformer 语言模型跑完整闭环：\n",
        "\n",
        "1. 数据与 tokenizer\n",
        "2. 从零预训练 causal LM\n",
        "3. SFT 指令微调\n",
        "4. DPO 风格偏好优化\n",
        "5. 评估与对比\n",
        "6. 推理生成\n",
        "7. 动态量化\n",
        "8. 最小 API 服务\n",
        "9. 导出模型工件\n",
        "\n",
        "为了保证 Google Colab 免费环境也能学习，本实验使用小数据、小模型和少量 step。它不是为了得到高质量模型，而是为了把 AI infra 的生命周期跑通。\n",
        "\n",
        "建议：菜单选择 `Runtime -> Change runtime type -> T4 GPU`。没有 GPU 也能跑，只是慢一点。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "!pip -q install tokenizers pandas matplotlib tqdm fastapi uvicorn"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import copy\n",
        "import json\n",
        "import math\n",
        "import os\n",
        "import random\n",
        "import shutil\n",
        "import time\n",
        "from pathlib import Path\n",
        "\n",
        "import matplotlib.pyplot as plt\n",
        "import pandas as pd\n",
        "import torch\n",
        "import torch.nn as nn\n",
        "import torch.nn.functional as F\n",
        "from tqdm.auto import tqdm\n",
        "\n",
        "SEED = 42\n",
        "random.seed(SEED)\n",
        "torch.manual_seed(SEED)\n",
        "device = \"cuda\" if torch.cuda.is_available() else \"cpu\"\n",
        "print(\"device:\", device)"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 1. 构造一个可控训练语料\n",
        "\n",
        "真实预训练会处理 TB/PB 级网页、代码、论文、书籍、日志和合成数据。这里用几十条小语料模拟：\n",
        "\n",
        "- `pretrain_corpus`: 连续文本，用于 next-token prediction\n",
        "- `sft_examples`: 指令-回答样本，用于 SFT\n",
        "- `preference_pairs`: chosen/rejected，用于偏好优化\n",
        "\n",
        "工程上你需要额外记录 license、来源、去重版本、PII 过滤、污染检查、质量分、语言分布和数据哈希。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "pretrain_corpus = [\n",
        "    \"AI infra begins with data. Clean data, stable training, and careful evaluation create reliable models.\",\n",
        "    \"Pretraining optimizes next token prediction over large corpora. The model learns syntax, facts, style, and reasoning patterns.\",\n",
        "    \"Post-training adapts a base model with supervised examples, preference data, safety policy, and domain feedback.\",\n",
        "    \"Inference systems manage prefill, decode, KV cache, batching, routing, quantization, and monitoring.\",\n",
        "    \"A production model needs reproducible data, versioned checkpoints, eval gates, rollback plans, and cost controls.\",\n",
        "    \"Tokenization turns text into integer tokens. Vocabulary design changes sequence length, multilingual quality, and cost.\",\n",
        "    \"Distributed training uses data parallelism, tensor parallelism, pipeline parallelism, and sharded optimizers.\",\n",
        "    \"LoRA updates low-rank adapter matrices. QLoRA combines quantized base weights with trainable adapters.\",\n",
        "    \"DPO compares chosen and rejected answers to make the policy prefer better completions without a separate reward model.\",\n",
        "    \"vLLM improves serving throughput with paged KV cache management and continuous batching.\",\n",
        "    \"TensorRT-LLM builds optimized inference engines for NVIDIA GPUs. llama.cpp and GGUF are common for local CPU and edge usage.\",\n",
        "    \"Evaluation should include automatic benchmarks, private task tests, safety tests, latency tests, and human review.\",\n",
        "    \"Observability tracks tokens per second, latency percentiles, GPU memory, request errors, prompt categories, and quality drift.\",\n",
        "    \"模型训练不是只看 loss，还要看数据质量、吞吐、显存、checkpoint 恢复、评估集泄漏和线上回滚。\",\n",
        "    \"预训练学习通用模式，后训练塑造指令跟随、风格、安全边界和业务偏好，推理系统决定成本和体验。\",\n",
        "    \"一个完整 AI infra 团队通常包括数据工程、训练平台、模型算法、评估、安全、推理服务和产品集成。\",\n",
        "]\n",
        "\n",
        "sft_examples = [\n",
        "    {\n",
        "        \"prompt\": \"用三句话解释 AI infra 的三大阶段。\",\n",
        "        \"response\": \"第一，预训练用大规模语料学习通用语言模式。第二，后训练用指令、偏好和安全数据把模型变成可用助手。第三，推理系统把模型稳定、低延迟、低成本地服务给用户。\",\n",
        "    },\n",
        "    {\n",
        "        \"prompt\": \"什么时候选择 LoRA 而不是全量微调？\",\n",
        "        \"response\": \"当数据量较小、预算有限、希望快速迭代或需要为多个场景维护轻量适配器时，优先选择 LoRA。全量微调更适合需要深度改变模型能力且有足够算力和评估体系的场景。\",\n",
        "    },\n",
        "    {\n",
        "        \"prompt\": \"预训练数据管线要检查什么？\",\n",
        "        \"response\": \"要检查来源许可、去重、PII、毒性、语言比例、质量分、格式一致性、评估集污染和数据版本。每一步都要能复现并留下审计记录。\",\n",
        "    },\n",
        "    {\n",
        "        \"prompt\": \"推理服务为什么需要批处理？\",\n",
        "        \"response\": \"批处理能把多个请求合并到一次 GPU 计算中，提高吞吐和利用率。LLM 服务通常还需要连续批处理，让短请求和长请求动态进入解码队列。\",\n",
        "    },\n",
        "    {\n",
        "        \"prompt\": \"DPO 和 SFT 的区别是什么？\",\n",
        "        \"response\": \"SFT 学习给定问题的目标答案，主要模仿示例。DPO 学习同一提示下 chosen 优于 rejected 的偏好信号，用来进一步对齐风格、质量和安全边界。\",\n",
        "    },\n",
        "    {\n",
        "        \"prompt\": \"上线一个模型前最小评估集应包含什么？\",\n",
        "        \"response\": \"至少包含核心业务任务、失败案例回归集、安全与拒答测试、延迟和吞吐测试、成本估算、长上下文测试以及人工抽检样本。\",\n",
        "    },\n",
        "]\n",
        "\n",
        "preference_pairs = [\n",
        "    {\n",
        "        \"prompt\": \"如何降低 LLM 推理成本？\",\n",
        "        \"chosen\": \"可以从模型尺寸、量化、KV cache、连续批处理、prefix cache、请求路由、投机解码、缓存命中率和 autoscaling 入手，并用线上指标验证质量没有明显下降。\",\n",
        "        \"rejected\": \"直接买更多 GPU 就可以降低成本。\",\n",
        "    },\n",
        "    {\n",
        "        \"prompt\": \"训练 loss 降低就能上线吗？\",\n",
        "        \"chosen\": \"不能。loss 只是训练信号，还要通过任务评估、安全评估、泛化测试、污染检查、延迟成本测试和灰度验证。\",\n",
        "        \"rejected\": \"可以，只要 loss 降低说明模型一定更好。\",\n",
        "    },\n",
        "    {\n",
        "        \"prompt\": \"为什么要保存 checkpoint？\",\n",
        "        \"chosen\": \"checkpoint 用于故障恢复、实验对比、回滚、继续训练和审计。大规模训练必须定期保存并验证可恢复。\",\n",
        "        \"rejected\": \"checkpoint 只是在最后导出模型时才需要。\",\n",
        "    },\n",
        "    {\n",
        "        \"prompt\": \"RAG 可以替代微调吗？\",\n",
        "        \"chosen\": \"不完全。RAG 适合注入可更新知识，微调适合改变行为、格式、风格和任务策略。生产系统经常组合使用。\",\n",
        "        \"rejected\": \"可以，RAG 永远比微调好，不需要评估。\",\n",
        "    },\n",
        "]\n",
        "\n",
        "pd.DataFrame(sft_examples)"
      ]
    },
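    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "A minimal sketch of the dataset-hash idea mentioned above: hash the raw corpus so a training run can record exactly which data version it saw. Real pipelines hash per shard and store hashes in a data catalog; this single-string version is only illustrative."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "import hashlib\n",
        "\n",
        "def corpus_fingerprint(texts):\n",
        "    # Order-sensitive sha256 over the joined corpus; any edit changes the hash.\n",
        "    return hashlib.sha256(\"\\n\".join(texts).encode(\"utf-8\")).hexdigest()\n",
        "\n",
        "print(\"pretrain corpus sha256:\", corpus_fingerprint(pretrain_corpus)[:16], \"...\")"
      ]
    },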
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 2. 训练 tokenizer\n",
        "\n",
        "真实项目常见选择：\n",
        "\n",
        "- BPE / Unigram / WordPiece / SentencePiece\n",
        "- Hugging Face Tokenizers\n",
        "- tiktoken 风格 tokenizer\n",
        "- 需要定义 BOS/EOS/PAD、chat template、工具调用 token、多模态占位 token\n",
        "\n",
        "这里用 Hugging Face `tokenizers` 训练一个 Byte-level BPE，避免中英文混合时过度依赖空格。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from tokenizers import Tokenizer\n",
        "from tokenizers.decoders import ByteLevel as ByteLevelDecoder\n",
        "from tokenizers.models import BPE\n",
        "from tokenizers.pre_tokenizers import ByteLevel\n",
        "from tokenizers.trainers import BpeTrainer\n",
        "\n",
        "SPECIAL_TOKENS = [\"[PAD]\", \"[UNK]\", \"[BOS]\", \"[EOS]\", \"[USER]\", \"[ASSISTANT]\"]\n",
        "tokenizer = Tokenizer(BPE(unk_token=\"[UNK]\"))\n",
        "tokenizer.pre_tokenizer = ByteLevel(add_prefix_space=False)\n",
        "tokenizer.decoder = ByteLevelDecoder()\n",
        "\n",
        "all_texts = pretrain_corpus[:]\n",
        "all_texts += [f\"[USER] {x['prompt']}\\n[ASSISTANT] {x['response']}\" for x in sft_examples]\n",
        "all_texts += [f\"[USER] {x['prompt']}\\n[ASSISTANT] {x['chosen']}\" for x in preference_pairs]\n",
        "all_texts += [f\"[USER] {x['prompt']}\\n[ASSISTANT] {x['rejected']}\" for x in preference_pairs]\n",
        "\n",
        "trainer = BpeTrainer(vocab_size=500, min_frequency=1, special_tokens=SPECIAL_TOKENS)\n",
        "tokenizer.train_from_iterator(all_texts, trainer=trainer)\n",
        "\n",
        "PAD = tokenizer.token_to_id(\"[PAD]\")\n",
        "BOS = tokenizer.token_to_id(\"[BOS]\")\n",
        "EOS = tokenizer.token_to_id(\"[EOS]\")\n",
        "vocab_size = tokenizer.get_vocab_size()\n",
        "print(\"vocab_size:\", vocab_size, \"PAD/BOS/EOS:\", PAD, BOS, EOS)\n",
        "\n",
        "def encode(text, add_bos=True, add_eos=True):\n",
        "    ids = tokenizer.encode(text).ids\n",
        "    if add_bos:\n",
        "        ids = [BOS] + ids\n",
        "    if add_eos:\n",
        "        ids = ids + [EOS]\n",
        "    return ids\n",
        "\n",
        "def decode(ids):\n",
        "    ids = [int(x) for x in ids if int(x) not in (PAD, BOS, EOS)]\n",
        "    return tokenizer.decode(ids, skip_special_tokens=True)\n",
        "\n",
        "sample = \"[USER] 什么是 AI infra?\\n[ASSISTANT]\"\n",
        "ids = encode(sample, add_eos=False)\n",
        "print(ids[:30])\n",
        "print(decode(ids))"
      ]
    },
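    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "A quick look at how vocabulary interacts with cost, as noted above: tokens per character differ between the English and Chinese lines of this corpus, and with a 500-token vocab the Chinese ratio is high. This is only a sketch on tiny data, but the same measurement drives real vocab-size decisions."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "for text in [pretrain_corpus[0], pretrain_corpus[13]]:\n",
        "    n_tokens = len(tokenizer.encode(text).ids)\n",
        "    print(f\"chars={len(text):4d} tokens={n_tokens:4d} tokens/char={n_tokens/len(text):.2f}  {text[:30]}...\")"
      ]
    },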
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 3. 定义 tiny GPT 模型\n",
        "\n",
        "生产中你通常不会自己手写 Transformer，而会使用：\n",
        "\n",
        "- `transformers.AutoModelForCausalLM`\n",
        "- Megatron-LM / NeMo / DeepSpeed / FSDP\n",
        "- 自研 kernel、FlashAttention、FP8、checkpointing、tensor/pipeline parallel\n",
        "\n",
        "但为了看清楚完整链路，这里手写一个 2 层 decoder-only causal LM。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "class TinyGPT(nn.Module):\n",
        "    def __init__(self, vocab_size, block_size=96, n_embd=96, n_head=4, n_layer=2, dropout=0.10):\n",
        "        super().__init__()\n",
        "        self.block_size = block_size\n",
        "        self.tok_emb = nn.Embedding(vocab_size, n_embd)\n",
        "        self.pos_emb = nn.Embedding(block_size, n_embd)\n",
        "        layer = nn.TransformerEncoderLayer(\n",
        "            d_model=n_embd,\n",
        "            nhead=n_head,\n",
        "            dim_feedforward=4 * n_embd,\n",
        "            dropout=dropout,\n",
        "            activation=\"gelu\",\n",
        "            batch_first=True,\n",
        "            norm_first=True,\n",
        "        )\n",
        "        self.blocks = nn.TransformerEncoder(layer, num_layers=n_layer)\n",
        "        self.ln_f = nn.LayerNorm(n_embd)\n",
        "        self.head = nn.Linear(n_embd, vocab_size, bias=False)\n",
        "        self.head.weight = self.tok_emb.weight\n",
        "\n",
        "    def forward(self, idx, targets=None, loss_mask=None):\n",
        "        B, T = idx.shape\n",
        "        if T > self.block_size:\n",
        "            idx = idx[:, -self.block_size :]\n",
        "            if targets is not None:\n",
        "                targets = targets[:, -self.block_size :]\n",
        "            if loss_mask is not None:\n",
        "                loss_mask = loss_mask[:, -self.block_size :]\n",
        "            B, T = idx.shape\n",
        "\n",
        "        pos = torch.arange(0, T, device=idx.device).unsqueeze(0)\n",
        "        x = self.tok_emb(idx) + self.pos_emb(pos)\n",
        "        causal_mask = torch.triu(\n",
        "            torch.full((T, T), float(\"-inf\"), device=idx.device),\n",
        "            diagonal=1,\n",
        "        )\n",
        "        x = self.blocks(x, mask=causal_mask)\n",
        "        logits = self.head(self.ln_f(x))\n",
        "\n",
        "        loss = None\n",
        "        if targets is not None:\n",
        "            losses = F.cross_entropy(\n",
        "                logits.reshape(-1, logits.size(-1)),\n",
        "                targets.reshape(-1),\n",
        "                reduction=\"none\",\n",
        "            )\n",
        "            if loss_mask is None:\n",
        "                loss_mask = (targets != PAD).float()\n",
        "            losses = losses.reshape_as(targets)\n",
        "            denom = loss_mask.sum().clamp_min(1.0)\n",
        "            loss = (losses * loss_mask).sum() / denom\n",
        "        return logits, loss\n",
        "\n",
        "\n",
        "block_size = 96\n",
        "model = TinyGPT(vocab_size=vocab_size, block_size=block_size).to(device)\n",
        "print(f\"parameters: {sum(p.numel() for p in model.parameters())/1e6:.3f}M\")"
      ]
    },
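    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "Before spending any training budget, a cheap infra habit is a one-batch smoke test: run forward and backward once on dummy data to catch shape, mask, and device bugs early. A minimal version:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# One forward/backward pass on random tokens; fails fast if shapes or masks are wrong.\n",
        "smoke_x = torch.randint(0, vocab_size, (2, 16), device=device)\n",
        "logits, loss = model(smoke_x, smoke_x)\n",
        "loss.backward()\n",
        "model.zero_grad(set_to_none=True)\n",
        "print(\"logits shape:\", tuple(logits.shape), \"smoke loss:\", round(float(loss), 3))"
      ]
    },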
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 4. 数据 batch：预训练与 SFT mask\n",
        "\n",
        "预训练：所有非 PAD token 都参与 next-token loss。\n",
        "\n",
        "SFT：只让 assistant 回答部分参与 loss。否则模型会浪费容量去预测用户提示。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def pad_to_block(ids, block_size):\n",
        "    ids = ids[: block_size + 1]\n",
        "    if len(ids) < block_size + 1:\n",
        "        ids = ids + [PAD] * (block_size + 1 - len(ids))\n",
        "    return ids\n",
        "\n",
        "def make_lm_batch(texts, batch_size=8):\n",
        "    xs, ys, masks = [], [], []\n",
        "    for text in random.choices(texts, k=batch_size):\n",
        "        ids = pad_to_block(encode(text), block_size)\n",
        "        x = ids[:-1]\n",
        "        y = ids[1:]\n",
        "        mask = [1.0 if tok != PAD else 0.0 for tok in y]\n",
        "        xs.append(x); ys.append(y); masks.append(mask)\n",
        "    return (\n",
        "        torch.tensor(xs, device=device, dtype=torch.long),\n",
        "        torch.tensor(ys, device=device, dtype=torch.long),\n",
        "        torch.tensor(masks, device=device, dtype=torch.float32),\n",
        "    )\n",
        "\n",
        "def chat_prefix(prompt):\n",
        "    return f\"[USER] {prompt}\\n[ASSISTANT] \"\n",
        "\n",
        "def chat_full(prompt, response):\n",
        "    return f\"{chat_prefix(prompt)}{response}\"\n",
        "\n",
        "def make_sft_batch(examples, batch_size=4):\n",
        "    xs, ys, masks = [], [], []\n",
        "    for ex in random.choices(examples, k=batch_size):\n",
        "        prompt_ids = encode(chat_prefix(ex[\"prompt\"]), add_bos=True, add_eos=False)\n",
        "        full_ids = pad_to_block(encode(chat_full(ex[\"prompt\"], ex[\"response\"])), block_size)\n",
        "        x = full_ids[:-1]\n",
        "        y = full_ids[1:]\n",
        "        # y[i] predicts full_ids[i+1]. First answer token appears at y[len(prompt_ids)-1].\n",
        "        loss_mask = [1.0 if i >= len(prompt_ids) - 1 and y[i] != PAD else 0.0 for i in range(len(y))]\n",
        "        xs.append(x); ys.append(y); masks.append(loss_mask)\n",
        "    return (\n",
        "        torch.tensor(xs, device=device, dtype=torch.long),\n",
        "        torch.tensor(ys, device=device, dtype=torch.long),\n",
        "        torch.tensor(masks, device=device, dtype=torch.float32),\n",
        "    )"
      ]
    },
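    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "To make the masking concrete, the cell below pulls one SFT batch and counts how many target positions actually carry loss; only the assistant answer (and its EOS) should be active."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "x_dbg, y_dbg, m_dbg = make_sft_batch(sft_examples, batch_size=1)\n",
        "print(f\"loss-bearing targets: {int(m_dbg.sum())} of {m_dbg.numel()}\")\n",
        "# The active targets decode back to (roughly) the assistant answer.\n",
        "print(decode(y_dbg[0][m_dbg[0] > 0].tolist()))"
      ]
    },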
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 5. 预训练：next-token prediction\n",
        "\n",
        "真实大模型预训练会把这个循环放大到数万 GPU、数万亿 token，并加上：\n",
        "\n",
        "- 分布式并行：DDP / FSDP / DeepSpeed ZeRO / Megatron TP+PP+SP\n",
        "- 混合精度：BF16 / FP8\n",
        "- 激活重计算、梯度累积、学习率 warmup 和 cosine decay\n",
        "- checkpoint/resume、故障容错、数据 shuffle 状态恢复\n",
        "- 训练吞吐、MFU、tokens/sec、OOM 和 loss spike 监控"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def train_loop(model, batch_fn, steps, lr, label):\n",
        "    model.train()\n",
        "    opt = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=0.01)\n",
        "    history = []\n",
        "    pbar = tqdm(range(steps), desc=label)\n",
        "    for step in pbar:\n",
        "        x, y, mask = batch_fn()\n",
        "        _, loss = model(x, y, mask)\n",
        "        opt.zero_grad(set_to_none=True)\n",
        "        loss.backward()\n",
        "        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)\n",
        "        opt.step()\n",
        "        history.append(float(loss.detach().cpu()))\n",
        "        pbar.set_postfix(loss=f\"{history[-1]:.3f}\")\n",
        "    return history\n",
        "\n",
        "pretrain_loss = train_loop(\n",
        "    model,\n",
        "    batch_fn=lambda: make_lm_batch(pretrain_corpus, batch_size=8),\n",
        "    steps=50,\n",
        "    lr=3e-3,\n",
        "    label=\"pretrain\",\n",
        ")"
      ]
    },
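    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "A minimal sketch of the warmup + cosine decay schedule mentioned in the list above. It is not wired into `train_loop` (the tiny runs here use a constant LR); it only makes the shape of the schedule concrete. The `peak_lr`/`warmup` numbers are illustrative."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def lr_at(step, max_steps, peak_lr=3e-3, warmup=10, min_lr=3e-4):\n",
        "    # Linear warmup to peak_lr, then cosine decay down to min_lr.\n",
        "    if step < warmup:\n",
        "        return peak_lr * (step + 1) / warmup\n",
        "    progress = (step - warmup) / max(1, max_steps - warmup)\n",
        "    return min_lr + 0.5 * (peak_lr - min_lr) * (1 + math.cos(math.pi * progress))\n",
        "\n",
        "plt.figure(figsize=(6, 2.5))\n",
        "plt.plot([lr_at(s, 200) for s in range(200)])\n",
        "plt.xlabel(\"step\"); plt.ylabel(\"lr\"); plt.title(\"warmup + cosine decay (sketch)\")\n",
        "plt.show()"
      ]
    },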
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 6. SFT：让 base model 学会回答指令\n",
        "\n",
        "SFT 的核心不是“训练更多”，而是“训练正确格式和行为”：\n",
        "\n",
        "- 明确 chat template\n",
        "- 只对 assistant tokens 计算 loss\n",
        "- 控制样本质量，少而精通常比脏数据更有效\n",
        "- 保留验证集，避免只看训练 loss"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "sft_loss = train_loop(\n",
        "    model,\n",
        "    batch_fn=lambda: make_sft_batch(sft_examples, batch_size=4),\n",
        "    steps=60,\n",
        "    lr=8e-4,\n",
        "    label=\"sft\",\n",
        ")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 7. DPO 风格偏好优化\n",
        "\n",
        "DPO 使用同一 prompt 下的 `chosen` 和 `rejected`。直觉是：\n",
        "\n",
        "> 当前模型应比参考模型更偏好 chosen，而不是 rejected。\n",
        "\n",
        "生产中可用 TRL 的 `DPOTrainer`、OpenRLHF、NeMo-Aligner、LLaMA-Factory 或 Axolotl。这里手写一个极简版本，帮助理解 loss 的结构。"
      ]
    },
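    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "For reference, the standard DPO loss for a prompt $x$ with chosen answer $y_w$ and rejected answer $y_l$ is\n",
        "\n",
        "$$\\mathcal{L}_{\\text{DPO}} = -\\log \\sigma\\Big(\\beta\\big[\\log \\tfrac{\\pi_\\theta(y_w \\mid x)}{\\pi_{\\text{ref}}(y_w \\mid x)} - \\log \\tfrac{\\pi_\\theta(y_l \\mid x)}{\\pi_{\\text{ref}}(y_l \\mid x)}\\big]\\Big)$$\n",
        "\n",
        "The code below matches this structure, with one deliberate simplification: `response_logprob` returns a length-normalized (mean per-token) log-probability rather than the sum."
      ]
    },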
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def response_logprob(model, prompt, response):\n",
        "    prompt_ids = encode(chat_prefix(prompt), add_bos=True, add_eos=False)\n",
        "    full_ids = encode(chat_full(prompt, response), add_bos=True, add_eos=True)\n",
        "    full_ids = full_ids[: block_size + 1]\n",
        "    x = torch.tensor([full_ids[:-1]], device=device, dtype=torch.long)\n",
        "    y = torch.tensor([full_ids[1:]], device=device, dtype=torch.long)\n",
        "    logits, _ = model(x)\n",
        "    logp = F.log_softmax(logits, dim=-1)\n",
        "    token_logp = logp[0, torch.arange(y.size(1), device=device), y[0]]\n",
        "    mask = torch.zeros_like(token_logp)\n",
        "    start = min(len(prompt_ids) - 1, token_logp.numel() - 1)\n",
        "    mask[start:] = 1.0\n",
        "    return (token_logp * mask).sum() / mask.sum().clamp_min(1.0)\n",
        "\n",
        "ref_model = copy.deepcopy(model).eval()\n",
        "for p in ref_model.parameters():\n",
        "    p.requires_grad_(False)\n",
        "\n",
        "def dpo_step(model, ref_model, pairs, beta=0.2):\n",
        "    losses = []\n",
        "    for ex in pairs:\n",
        "        pi_chosen = response_logprob(model, ex[\"prompt\"], ex[\"chosen\"])\n",
        "        pi_rejected = response_logprob(model, ex[\"prompt\"], ex[\"rejected\"])\n",
        "        with torch.no_grad():\n",
        "            ref_chosen = response_logprob(ref_model, ex[\"prompt\"], ex[\"chosen\"])\n",
        "            ref_rejected = response_logprob(ref_model, ex[\"prompt\"], ex[\"rejected\"])\n",
        "        pi_delta = pi_chosen - pi_rejected\n",
        "        ref_delta = ref_chosen - ref_rejected\n",
        "        losses.append(-F.logsigmoid(beta * (pi_delta - ref_delta)))\n",
        "    return torch.stack(losses).mean()\n",
        "\n",
        "model.train()\n",
        "opt = torch.optim.AdamW(model.parameters(), lr=4e-4, weight_decay=0.01)\n",
        "dpo_loss = []\n",
        "pbar = tqdm(range(40), desc=\"dpo\")\n",
        "for step in pbar:\n",
        "    batch = random.choices(preference_pairs, k=2)\n",
        "    loss = dpo_step(model, ref_model, batch)\n",
        "    opt.zero_grad(set_to_none=True)\n",
        "    loss.backward()\n",
        "    torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)\n",
        "    opt.step()\n",
        "    dpo_loss.append(float(loss.detach().cpu()))\n",
        "    pbar.set_postfix(loss=f\"{dpo_loss[-1]:.3f}\")"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 8. 评估：loss、perplexity、偏好准确率和人工样例\n",
        "\n",
        "生产评估建议分层：\n",
        "\n",
        "- base benchmark：MMLU、GSM8K、HumanEval、MT-Bench 等\n",
        "- 私有业务集：最重要，必须版本化\n",
        "- 安全集：越狱、PII、歧视、错误医疗/法律/金融建议等\n",
        "- 线上指标：延迟、错误率、成本、人工反馈、回滚率"
      ]
    },
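    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "The perplexity reported below is just the exponential of the mean token cross-entropy over the non-PAD targets:\n",
        "\n",
        "$$\\mathrm{ppl} = \\exp\\Big(-\\tfrac{1}{N}\\textstyle\\sum_{i=1}^{N} \\log p_\\theta(t_i \\mid t_{<i})\\Big)$$"
      ]
    },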
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "@torch.no_grad()\n",
        "def eval_lm_loss(model, texts):\n",
        "    model.eval()\n",
        "    x, y, mask = make_lm_batch(texts, batch_size=len(texts))\n",
        "    _, loss = model(x, y, mask)\n",
        "    return float(loss.cpu()), float(torch.exp(loss).cpu())\n",
        "\n",
        "@torch.no_grad()\n",
        "def preference_accuracy(model, pairs):\n",
        "    model.eval()\n",
        "    rows = []\n",
        "    for ex in pairs:\n",
        "        chosen_lp = response_logprob(model, ex[\"prompt\"], ex[\"chosen\"])\n",
        "        rejected_lp = response_logprob(model, ex[\"prompt\"], ex[\"rejected\"])\n",
        "        ok = chosen_lp > rejected_lp\n",
        "        rows.append({\n",
        "            \"prompt\": ex[\"prompt\"],\n",
        "            \"chosen_logp\": float(chosen_lp.cpu()),\n",
        "            \"rejected_logp\": float(rejected_lp.cpu()),\n",
        "            \"model_prefers_chosen\": bool(ok),\n",
        "        })\n",
        "    return pd.DataFrame(rows)\n",
        "\n",
        "eval_loss, ppl = eval_lm_loss(model, pretrain_corpus)\n",
        "pref_df = preference_accuracy(model, preference_pairs)\n",
        "print(\"eval_loss:\", round(eval_loss, 3), \"ppl:\", round(ppl, 2))\n",
        "display(pref_df)\n",
        "\n",
        "plt.figure(figsize=(10, 4))\n",
        "plt.plot(pretrain_loss, label=\"pretrain\")\n",
        "plt.plot(range(len(pretrain_loss), len(pretrain_loss) + len(sft_loss)), sft_loss, label=\"sft\")\n",
        "offset = len(pretrain_loss) + len(sft_loss)\n",
        "plt.plot(range(offset, offset + len(dpo_loss)), dpo_loss, label=\"dpo\")\n",
        "plt.xlabel(\"step\")\n",
        "plt.ylabel(\"loss\")\n",
        "plt.title(\"Tiny LLM training phases\")\n",
        "plt.legend()\n",
        "plt.grid(alpha=0.25)\n",
        "plt.show()"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 9. 推理：采样、temperature、top-k\n",
        "\n",
        "推理时，模型服务真正消耗来自两个阶段：\n",
        "\n",
        "- prefill：处理 prompt，建立 KV cache\n",
        "- decode：逐 token 生成，通常受内存带宽和 KV cache 管理影响\n",
        "\n",
        "生产系统要同时优化首 token 延迟、总延迟、吞吐、成本和质量。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "@torch.no_grad()\n",
        "def generate(model, prompt, max_new_tokens=80, temperature=0.8, top_k=40):\n",
        "    model.eval()\n",
        "    prefix = chat_prefix(prompt)\n",
        "    ids = encode(prefix, add_bos=True, add_eos=False)\n",
        "    idx = torch.tensor([ids], device=next(model.parameters()).device, dtype=torch.long)\n",
        "\n",
        "    for _ in range(max_new_tokens):\n",
        "        idx_cond = idx[:, -block_size:]\n",
        "        logits, _ = model(idx_cond)\n",
        "        logits = logits[:, -1, :] / max(temperature, 1e-5)\n",
        "        if top_k is not None:\n",
        "            values, _ = torch.topk(logits, min(top_k, logits.size(-1)))\n",
        "            logits[logits < values[:, [-1]]] = -float(\"inf\")\n",
        "        probs = F.softmax(logits, dim=-1)\n",
        "        next_id = torch.multinomial(probs, num_samples=1)\n",
        "        idx = torch.cat([idx, next_id], dim=1)\n",
        "        if int(next_id.item()) == EOS:\n",
        "            break\n",
        "\n",
        "    answer_ids = idx[0, len(ids):].detach().cpu().tolist()\n",
        "    return decode(answer_ids).strip()\n",
        "\n",
        "for prompt in [\n",
        "    \"用三句话解释 AI infra 的三大阶段。\",\n",
        "    \"如何降低 LLM 推理成本？\",\n",
        "    \"训练 loss 降低就能上线吗？\",\n",
        "]:\n",
        "    print(\"\\nPROMPT:\", prompt)\n",
        "    print(\"ANSWER:\", generate(model, prompt, temperature=0.7))"
      ]
    },
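    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "`generate` above uses top-k filtering. A common alternative is nucleus (top-p) sampling: keep the smallest set of tokens whose cumulative probability exceeds `p`. A minimal sketch of the filter, not wired into `generate`:"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def top_p_filter(logits, top_p=0.9):\n",
        "    # Sort descending and keep the smallest prefix whose cumulative prob exceeds top_p.\n",
        "    sorted_logits, sorted_idx = torch.sort(logits, descending=True, dim=-1)\n",
        "    probs = F.softmax(sorted_logits, dim=-1)\n",
        "    cum = torch.cumsum(probs, dim=-1)\n",
        "    # Mask tokens whose cumulative prob *excluding themselves* already exceeds top_p,\n",
        "    # so the first token crossing the threshold is always kept.\n",
        "    sorted_logits[cum - probs > top_p] = -float(\"inf\")\n",
        "    filtered = torch.full_like(logits, -float(\"inf\"))\n",
        "    return filtered.scatter_(dim=-1, index=sorted_idx, src=sorted_logits)\n",
        "\n",
        "demo_logits = torch.randn(1, vocab_size, device=device)\n",
        "kept = int((top_p_filter(demo_logits) > -float(\"inf\")).sum())\n",
        "print(\"tokens kept by top-p:\", kept, \"of\", vocab_size)"
      ]
    },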
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 10. 量化：用更少内存换取可接受质量\n",
        "\n",
        "常见量化方向：\n",
        "\n",
        "- 训练/微调用 bitsandbytes 4-bit/8-bit、QLoRA\n",
        "- 推理用 GPTQ、AWQ、SmoothQuant、FP8、GGUF、TensorRT-LLM engine\n",
        "- 评估不仅看速度，也要看准确率、安全、长上下文和特定格式输出\n",
        "\n",
        "这里用 PyTorch dynamic quantization 演示概念。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "cpu_model = copy.deepcopy(model).cpu().eval()\n",
        "q_model = torch.quantization.quantize_dynamic(cpu_model, {nn.Linear}, dtype=torch.qint8)\n",
        "\n",
        "def object_size_bytes(obj):\n",
        "    if torch.is_tensor(obj):\n",
        "        return obj.numel() * obj.element_size()\n",
        "    if isinstance(obj, (list, tuple)):\n",
        "        return sum(object_size_bytes(x) for x in obj)\n",
        "    if isinstance(obj, dict):\n",
        "        return sum(object_size_bytes(x) for x in obj.values())\n",
        "    return 0\n",
        "\n",
        "def parameter_size_mb(m):\n",
        "    return sum(object_size_bytes(v) for v in m.state_dict().values()) / 1024 / 1024\n",
        "\n",
        "print(\"fp model MB:\", round(parameter_size_mb(cpu_model), 3))\n",
        "print(\"dynamic quantized state MB:\", round(parameter_size_mb(q_model), 3))\n",
        "print(generate(q_model, \"如何降低 LLM 推理成本？\", temperature=0.8))\n",
        "model = model.to(device)"
      ]
    },
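    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "A rough CPU latency comparison between the float and dynamically quantized models. On a model this small the gap is noisy and may even invert; the point is the measurement habit, not the number."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "def time_generate(m, prompt, n_tokens=30):\n",
        "    # Wall-clock time for one short CPU generation; crude but comparable.\n",
        "    t0 = time.perf_counter()\n",
        "    generate(m, prompt, max_new_tokens=n_tokens, temperature=0.8)\n",
        "    return time.perf_counter() - t0\n",
        "\n",
        "bench_prompt = \"如何降低 LLM 推理成本？\"\n",
        "print(f\"fp32 cpu: {time_generate(cpu_model, bench_prompt):.3f}s\")\n",
        "print(f\"int8 cpu: {time_generate(q_model, bench_prompt):.3f}s\")"
      ]
    },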
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 11. 最小 API 服务\n",
        "\n",
        "生产服务通常会使用 vLLM、TGI、TensorRT-LLM、SGLang、Ray Serve、KServe 或 Triton。这里用 FastAPI 测试一个最小 `/generate` 接口。\n",
        "\n",
        "如果要在 Colab 暴露公网，需要额外使用 ngrok/cloudflared，这里不默认开启。"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "from fastapi import FastAPI\n",
        "from fastapi.testclient import TestClient\n",
        "from pydantic import BaseModel\n",
        "\n",
        "app = FastAPI(title=\"Tiny AI Infra LLM\")\n",
        "\n",
        "class GenerateRequest(BaseModel):\n",
        "    prompt: str\n",
        "    max_new_tokens: int = 60\n",
        "    temperature: float = 0.8\n",
        "\n",
        "@app.post(\"/generate\")\n",
        "def generate_endpoint(req: GenerateRequest):\n",
        "    return {\n",
        "        \"prompt\": req.prompt,\n",
        "        \"text\": generate(model, req.prompt, max_new_tokens=req.max_new_tokens, temperature=req.temperature),\n",
        "    }\n",
        "\n",
        "client = TestClient(app)\n",
        "response = client.post(\"/generate\", json={\"prompt\": \"DPO 和 SFT 的区别是什么？\", \"max_new_tokens\": 50})\n",
        "response.json()"
      ]
    },
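    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "`TestClient` exercises the app in-process. To serve it for real you would run uvicorn (which is why it is in the pip install); the cell below is left commented out because it blocks the notebook and, on Colab, is unreachable from outside without a tunnel such as ngrok or cloudflared."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "# import uvicorn\n",
        "# uvicorn.run(app, host=\"0.0.0.0\", port=8000)  # blocks until interrupted"
      ]
    },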
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 12. 导出工件\n",
        "\n",
        "真实导出通常包括：\n",
        "\n",
        "- checkpoint / adapter / merged model\n",
        "- tokenizer 与 chat template\n",
        "- config、训练参数、数据版本\n",
        "- eval report、model card、license\n",
        "- serving image、推理引擎配置、回滚版本"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "artifact_dir = Path(\"ai_infra_tiny_artifacts\")\n",
        "artifact_dir.mkdir(exist_ok=True)\n",
        "\n",
        "torch.save(cpu_model.state_dict(), artifact_dir / \"tiny_gpt_state_dict.pt\")\n",
        "tokenizer.save(str(artifact_dir / \"tokenizer.json\"))\n",
        "\n",
        "config = {\n",
        "    \"model_type\": \"TinyGPT\",\n",
        "    \"vocab_size\": vocab_size,\n",
        "    \"block_size\": block_size,\n",
        "    \"n_embd\": 96,\n",
        "    \"n_head\": 4,\n",
        "    \"n_layer\": 2,\n",
        "    \"special_tokens\": SPECIAL_TOKENS,\n",
        "    \"purpose\": \"educational end-to-end AI infra demo, not production quality\",\n",
        "}\n",
        "(artifact_dir / \"config.json\").write_text(json.dumps(config, ensure_ascii=False, indent=2), encoding=\"utf-8\")\n",
        "\n",
        "model_card = f\"\"\"# Tiny AI Infra LLM\n",
        "\n",
        "This is an educational tiny language model trained in a Colab notebook.\n",
        "\n",
        "## Training phases\n",
        "- Pretraining steps: {len(pretrain_loss)}\n",
        "- SFT steps: {len(sft_loss)}\n",
        "- DPO-style steps: {len(dpo_loss)}\n",
        "\n",
        "## Intended use\n",
        "Learning AI infra workflow only. Do not use for factual answers or production.\n",
        "\n",
        "## Evaluation\n",
        "- eval_loss: {eval_loss:.4f}\n",
        "- perplexity: {ppl:.4f}\n",
        "- preference_accuracy: {pref_df['model_prefers_chosen'].mean():.2f}\n",
        "\"\"\"\n",
        "(artifact_dir / \"MODEL_CARD.md\").write_text(model_card, encoding=\"utf-8\")\n",
        "\n",
        "zip_path = shutil.make_archive(\"ai_infra_tiny_artifacts\", \"zip\", artifact_dir)\n",
        "print(\"created:\", zip_path)\n",
        "\n",
        "try:\n",
        "    from google.colab import files\n",
        "    files.download(zip_path)\n",
        "except Exception:\n",
        "    print(\"Not running in Colab, download skipped.\")"
      ]
    },
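    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "A restore check, echoing the checkpoint discussion earlier: reload the exported state dict into a fresh `TinyGPT` and confirm it still generates. Real pipelines automate exactly this kind of \"is the artifact actually loadable\" gate."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {},
      "outputs": [],
      "source": [
        "reloaded = TinyGPT(vocab_size=config[\"vocab_size\"], block_size=config[\"block_size\"])\n",
        "reloaded.load_state_dict(torch.load(artifact_dir / \"tiny_gpt_state_dict.pt\", map_location=\"cpu\"))\n",
        "reloaded.eval()\n",
        "print(generate(reloaded, \"为什么要保存 checkpoint？\", temperature=0.7))"
      ]
    },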
    {
      "cell_type": "markdown",
      "metadata": {},
      "source": [
        "## 13. 把这个 tiny demo 映射到生产工具\n",
        "\n",
        "| 阶段 | notebook 中的动作 | 生产常用替代 |\n",
        "|---|---|---|\n",
        "| 数据 | Python list + tokenizer | Spark/Ray Data/Datatrove/DVC/lakeFS/对象存储 |\n",
        "| tokenizer | Hugging Face tokenizers | SentencePiece/HF tokenizers/tiktoken/chat template |\n",
        "| 预训练 | TinyGPT + AdamW | Megatron-LM/NeMo/DeepSpeed/PyTorch FSDP/JAX |\n",
        "| SFT | masked assistant loss | TRL SFTTrainer/Axolotl/LLaMA-Factory/torchtune/Unsloth |\n",
        "| 偏好优化 | 手写 DPO loss | TRL DPOTrainer/OpenRLHF/NeMo-Aligner |\n",
        "| 评估 | loss + preference accuracy | lm-eval-harness/OpenAI Evals/DeepEval/Ragas/private evals |\n",
        "| 推理 | generate() | vLLM/TGI/TensorRT-LLM/SGLang/llama.cpp/Ollama |\n",
        "| 服务 | FastAPI TestClient | Kubernetes/KServe/Ray Serve/Triton/API Gateway |\n",
        "| 观测 | pandas/plots | MLflow/W&B/Prometheus/Grafana/OpenTelemetry |\n",
        "\n",
        "下一步练习：\n",
        "\n",
        "1. 把 `steps` 增大 5 倍，观察 loss 和生成质量。\n",
        "2. 增加 20 条高质量 SFT 样本，看格式稳定性是否变好。\n",
        "3. 把 rejected 写得更像真实错误，观察 DPO 是否更有效。\n",
        "4. 把模型换成 Hugging Face 的小模型，用 TRL `SFTTrainer` 和 `DPOTrainer` 重写训练阶段。\n",
        "5. 用 vLLM 或 llama.cpp 部署一个真实开源小模型，测吞吐和延迟。"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "name": "AI Infra Tiny LLM Complete Pipeline.ipynb",
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "language": "python",
      "name": "python3"
    },
    "language_info": {
      "name": "python",
      "pygments_lexer": "ipython3"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 5
}