第十章:多因素认证(MFA)

“密码是安全的最薄弱环节,MFA 是加固它的最有效手段。”

        mindmap
  root((多因素认证))
    认证三要素
      你知道的
      你拥有的
      你是谁
    密码安全
      bcrypt
      Argon2
      scrypt
    MFA 方式
      TOTP
      FIDO2/WebAuthn
      Passkey
      推送通知
    高级
      自适应认证
      MFA 疲劳攻击
    

10.1 认证三要素

多因素认证基于三类不同的认证因素:

因素

类型

示例

优点

缺点

你知道的

知识因素

密码、PIN、安全问题

简单、成本低

可被猜测、钓鱼、泄露

你拥有的

持有因素

手机、硬件密钥、智能卡

物理持有难以远程窃取

可丢失、被盗

你是谁

生物因素

指纹、面部、虹膜

难以伪造

隐私问题、无法更换

MFA = 使用两种或以上不同类型的因素

        flowchart LR
    subgraph 单因素认证
        U1[用户] -->|密码| P1[通过 ✅]
    end
    subgraph 双因素认证 2FA
        U2[用户] -->|密码<br/>知识因素| S2[第一关 ✅]
        S2 -->|TOTP 验证码<br/>持有因素| P2[通过 ✅]
    end
    subgraph 三因素认证
        U3[用户] -->|密码<br/>知识因素| S3a[第一关 ✅]
        S3a -->|硬件密钥<br/>持有因素| S3b[第二关 ✅]
        S3b -->|指纹<br/>生物因素| P3[通过 ✅]
    end
    

10.2 密码认证的问题

常见攻击方式

攻击方式

描述

防御

暴力破解

尝试所有可能的密码组合

账户锁定、速率限制

字典攻击

使用常见密码列表

密码复杂度要求

撞库攻击

用泄露的密码尝试其他网站

密码唯一性教育

钓鱼攻击

伪造登录页面骗取密码

FIDO2/WebAuthn

彩虹表

预计算哈希值反查密码

加盐哈希

键盘记录

恶意软件记录键盘输入

硬件密钥

密码存储

永远不要明文存储密码! 使用专门的密码哈希函数:

# ❌ 错误:明文存储
password_db["user1"] = "my_password"

# ❌ 错误:普通哈希(太快,容易暴力破解)
import hashlib
password_db["user1"] = hashlib.sha256(b"my_password").hexdigest()

# ✅ 正确:使用 Argon2(现代首选)
from argon2 import PasswordHasher

ph = PasswordHasher(
    time_cost=3,        # 迭代次数
    memory_cost=65536,  # 内存使用 64MB
    parallelism=4,      # 并行线程数
)

# 存储
hashed = ph.hash("my_password")
# $argon2id$v=19$m=65536,t=3,p=4$...

# 验证
try:
    ph.verify(hashed, "my_password")
    print("密码正确 ✅")
    # 检查是否需要重新哈希(参数升级时)
    if ph.check_needs_rehash(hashed):
        new_hash = ph.hash("my_password")
except Exception:
    print("密码错误 ❌")

密码哈希函数对比

算法

年份

特点

推荐

MD5/SHA

通用哈希,太快

❌ 禁止

bcrypt

1999

经典,CPU 密集

✅ 遗留系统

scrypt

2009

CPU + 内存密集

✅ 可用

Argon2id

2015

竞赛冠军,最安全

✅ 首选

10.3 TOTP — 基于时间的一次性密码

TOTP(Time-based One-Time Password,RFC 6238)是最常用的 MFA 方式:

TOTP 验证流程

        sequenceDiagram
    participant U as 用户
    participant App as 认证器 App
    participant S as 服务端

    Note over App,S: 注册阶段(一次性)
    S->>S: 生成共享密钥 secret
    S->>U: 展示二维码(含 secret)
    U->>App: 扫描二维码,存储 secret

    Note over App,S: 验证阶段(每次登录)
    U->>S: 提交用户名 + 密码
    S->>S: 验证密码 ✅
    S->>U: 请求 TOTP 验证码
    App->>App: HMAC-SHA1(secret, time/30) → 6位数字
    U->>S: 输入 6 位验证码
    S->>S: HMAC-SHA1(secret, time/30) → 6位数字
    S->>S: 比较验证码(允许 ±1 窗口)
    S->>U: 登录成功 ✅
    

MFA 注册流程

        sequenceDiagram
    participant U as 用户
    participant B as 浏览器
    participant S as 服务端
    participant DB as 数据库

    U->>B: 点击"启用 MFA"
    B->>S: POST /mfa/enroll
    S->>S: 生成 TOTP secret
    S->>S: 生成备用码(10 个)
    S->>DB: 临时存储 secret(待确认)
    S->>B: 返回二维码 URI + 备用码
    B->>U: 展示二维码 + 备用码
    U->>U: 用认证器 App 扫码
    U->>B: 输入当前 TOTP 验证码确认
    B->>S: POST /mfa/confirm {code}
    S->>S: 验证 TOTP 码
    alt 验证成功
        S->>DB: 标记 MFA 已启用
        S->>B: MFA 启用成功 ✅
    else 验证失败
        S->>DB: 删除临时 secret
        S->>B: 验证码错误,请重试 ❌
    end
    

MFA 决策流程

        flowchart TD
    A[用户登录请求] --> B{密码验证通过?}
    B -->|否| C[登录失败 ❌]
    B -->|是| D[收集上下文信息<br/>IP / 设备 / 位置 / 时间]
    D --> E{风险评估}
    E -->|低风险<br/>已知设备+常用IP| F[直接放行 ✅]
    E -->|中风险<br/>新设备或新IP| G[要求 TOTP 验证]
    E -->|高风险<br/>异地登录| H[要求 TOTP + 邮件验证]
    E -->|极高风险<br/>不可能旅行| I[要求 FIDO2 + 管理员审批]
    G --> J{MFA 验证通过?}
    H --> J
    I --> J
    J -->|是| K[登录成功 ✅<br/>记录设备指纹]
    J -->|否| L[登录失败 ❌<br/>触发告警]
    

Python TOTP 完整实现

import pyotp
import qrcode
import io
import base64
import secrets

class TOTPManager:
    """TOTP 多因素认证管理器"""

    def __init__(self, issuer: str = "MyApp"):
        self.issuer = issuer

    def generate_secret(self) -> str:
        """生成 Base32 编码的共享密钥"""
        return pyotp.random_base32()

    def generate_backup_codes(self, count: int = 10) -> list[str]:
        """生成一次性备用码"""
        return [secrets.token_hex(4).upper() for _ in range(count)]

    def get_provisioning_uri(self, secret: str, email: str) -> str:
        """生成二维码 URI(兼容 Google Authenticator / Authy 等)"""
        totp = pyotp.TOTP(secret)
        return totp.provisioning_uri(name=email, issuer_name=self.issuer)

    def generate_qr_base64(self, uri: str) -> str:
        """生成二维码的 Base64 编码(可直接嵌入 HTML <img>)"""
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data(uri)
        qr.make(fit=True)
        img = qr.make_image(fill_color="black", back_color="white")
        buffer = io.BytesIO()
        img.save(buffer, format="PNG")
        return base64.b64encode(buffer.getvalue()).decode()

    def verify_code(self, secret: str, code: str, valid_window: int = 1) -> bool:
        """
        验证 TOTP 验证码
        valid_window=1 表示允许前后各 30 秒的偏差
        """
        totp = pyotp.TOTP(secret)
        return totp.verify(code, valid_window=valid_window)

    def get_current_code(self, secret: str) -> str:
        """获取当前验证码(仅用于测试)"""
        totp = pyotp.TOTP(secret)
        return totp.now()


# --- 使用示例 ---
manager = TOTPManager(issuer="SecurityHandbook")

# 1. 注册阶段
secret = manager.generate_secret()
backup_codes = manager.generate_backup_codes()
uri = manager.get_provisioning_uri(secret, "walter@example.com")
qr_b64 = manager.generate_qr_base64(uri)

print(f"密钥: {secret}")
print(f"URI: {uri}")
print(f"备用码: {backup_codes}")

# 2. 验证阶段
current_code = manager.get_current_code(secret)
print(f"当前验证码: {current_code}")
print(f"验证结果: {manager.verify_code(secret, current_code)}")

FastAPI MFA 端点

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import pyotp
import secrets
from datetime import datetime

app = FastAPI(title="MFA Service")
security = HTTPBearer()

# --- 模型 ---
class MFAEnrollResponse(BaseModel):
    qr_uri: str
    secret: str  # 生产环境不应返回,仅供手动输入
    backup_codes: list[str]

class MFAVerifyRequest(BaseModel):
    code: str

class BackupCodeRequest(BaseModel):
    backup_code: str

# --- 模拟数据库 ---
users_db: dict = {}

def get_current_user(cred: HTTPAuthorizationCredentials = Depends(security)) -> str:
    """从 Bearer Token 中提取用户 ID(简化示例)"""
    # 生产环境应验证 JWT
    return cred.credentials  # 此处直接用 token 当 user_id

# --- MFA 注册 ---
@app.post("/mfa/enroll", response_model=MFAEnrollResponse)
async def enroll_mfa(user_id: str = Depends(get_current_user)):
    """启用 MFA:生成密钥和备用码"""
    secret = pyotp.random_base32()
    backup_codes = [secrets.token_hex(4).upper() for _ in range(10)]

    totp = pyotp.TOTP(secret)
    uri = totp.provisioning_uri(name=user_id, issuer_name="MyApp")

    # 临时存储,等待确认
    users_db[user_id] = {
        "mfa_secret": secret,
        "mfa_enabled": False,  # 尚未确认
        "backup_codes": backup_codes,
        "enrolled_at": None,
    }

    return MFAEnrollResponse(qr_uri=uri, secret=secret, backup_codes=backup_codes)

# --- MFA 确认(用户扫码后输入验证码确认) ---
@app.post("/mfa/confirm")
async def confirm_mfa(
    req: MFAVerifyRequest,
    user_id: str = Depends(get_current_user),
):
    """确认 MFA 注册:用户输入验证码以证明已正确配置"""
    user = users_db.get(user_id)
    if not user or not user.get("mfa_secret"):
        raise HTTPException(status_code=400, detail="请先调用 /mfa/enroll")

    totp = pyotp.TOTP(user["mfa_secret"])
    if not totp.verify(req.code, valid_window=1):
        raise HTTPException(status_code=400, detail="验证码错误")

    user["mfa_enabled"] = True
    user["enrolled_at"] = datetime.utcnow().isoformat()
    return {"message": "MFA 已启用 ✅"}

# --- MFA 验证(登录时) ---
@app.post("/mfa/verify")
async def verify_mfa(
    req: MFAVerifyRequest,
    user_id: str = Depends(get_current_user),
):
    """登录时验证 TOTP 验证码"""
    user = users_db.get(user_id)
    if not user or not user.get("mfa_enabled"):
        raise HTTPException(status_code=400, detail="MFA 未启用")

    totp = pyotp.TOTP(user["mfa_secret"])
    if not totp.verify(req.code, valid_window=1):
        raise HTTPException(status_code=401, detail="验证码错误或已过期")

    return {"message": "MFA 验证通过 ✅", "user_id": user_id}

# --- 备用码验证 ---
@app.post("/mfa/backup")
async def verify_backup_code(
    req: BackupCodeRequest,
    user_id: str = Depends(get_current_user),
):
    """使用一次性备用码(丢失手机时使用)"""
    user = users_db.get(user_id)
    if not user or not user.get("mfa_enabled"):
        raise HTTPException(status_code=400, detail="MFA 未启用")

    code = req.backup_code.upper()
    if code not in user["backup_codes"]:
        raise HTTPException(status_code=401, detail="备用码无效")

    # 备用码只能使用一次
    user["backup_codes"].remove(code)
    remaining = len(user["backup_codes"])

    return {
        "message": "备用码验证通过 ✅",
        "remaining_backup_codes": remaining,
        "warning": "请尽快重新绑定认证器" if remaining < 3 else None,
    }

# --- 禁用 MFA ---
@app.delete("/mfa")
async def disable_mfa(
    req: MFAVerifyRequest,
    user_id: str = Depends(get_current_user),
):
    """禁用 MFA(需要当前验证码确认)"""
    user = users_db.get(user_id)
    if not user or not user.get("mfa_enabled"):
        raise HTTPException(status_code=400, detail="MFA 未启用")

    totp = pyotp.TOTP(user["mfa_secret"])
    if not totp.verify(req.code, valid_window=1):
        raise HTTPException(status_code=401, detail="验证码错误")

    user["mfa_enabled"] = False
    user["mfa_secret"] = None
    user["backup_codes"] = []
    return {"message": "MFA 已禁用"}

Java TOTP 实现

使用 dev.samstevens.totp 库实现 Google Authenticator 兼容的 TOTP:

<!-- pom.xml 依赖 -->
<dependency>
    <groupId>dev.samstevens.totp</groupId>
    <artifactId>totp</artifactId>
    <version>1.7.1</version>
</dependency>
import dev.samstevens.totp.code.*;
import dev.samstevens.totp.exceptions.QrGenerationException;
import dev.samstevens.totp.qr.QrData;
import dev.samstevens.totp.qr.QrGenerator;
import dev.samstevens.totp.qr.ZxingPngQrGenerator;
import dev.samstevens.totp.secret.DefaultSecretGenerator;
import dev.samstevens.totp.secret.SecretGenerator;
import dev.samstevens.totp.time.SystemTimeProvider;
import dev.samstevens.totp.time.TimeProvider;
import dev.samstevens.totp.util.Utils;

import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;

public class TotpService {

    private final SecretGenerator secretGenerator = new DefaultSecretGenerator(32);
    private final QrGenerator qrGenerator = new ZxingPngQrGenerator();
    private final TimeProvider timeProvider = new SystemTimeProvider();
    private final String issuer;

    public TotpService(String issuer) {
        this.issuer = issuer;
    }

    /** 生成 TOTP 密钥 */
    public String generateSecret() {
        return secretGenerator.generate();
    }

    /** 生成一次性备用码 */
    public List<String> generateBackupCodes(int count) {
        SecureRandom random = new SecureRandom();
        List<String> codes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            codes.add(String.format("%08X", random.nextInt()));
        }
        return codes;
    }

    /** 生成二维码的 Base64 PNG 数据(可直接嵌入 <img> 标签) */
    public String generateQrCodeBase64(String secret, String email)
            throws QrGenerationException {
        QrData data = new QrData.Builder()
                .label(email)
                .secret(secret)
                .issuer(issuer)
                .algorithm(HashingAlgorithm.SHA1)
                .digits(6)
                .period(30)
                .build();
        byte[] imageData = qrGenerator.generate(data);
        return Utils.getDataUriForImage(imageData, qrGenerator.getImageMimeType());
    }

    /** 验证 TOTP 验证码 */
    public boolean verifyCode(String secret, String code) {
        CodeVerifier verifier = new DefaultCodeVerifier(
                new DefaultCodeGenerator(), timeProvider);
        // 允许前后各 1 个时间窗口(±30 秒)
        ((DefaultCodeVerifier) verifier).setAllowedTimePeriodDiscrepancy(1);
        return verifier.isValidCode(secret, code);
    }
}

Spring Security MFA 集成

import org.springframework.security.authentication.AuthenticationProvider;
import org.springframework.security.authentication.BadCredentialsException;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.AuthenticationException;
import org.springframework.stereotype.Component;

/**
 * 自定义 MFA 认证 Token
 */
public class MfaAuthenticationToken extends AbstractAuthenticationToken {
    private final String userId;
    private final String totpCode;

    public MfaAuthenticationToken(String userId, String totpCode) {
        super(null);
        this.userId = userId;
        this.totpCode = totpCode;
        setAuthenticated(false);
    }

    @Override public Object getCredentials() { return totpCode; }
    @Override public Object getPrincipal()   { return userId; }
}

/**
 * MFA 认证 Provider:在密码认证通过后进行 TOTP 二次验证
 */
@Component
public class MfaAuthenticationProvider implements AuthenticationProvider {

    private final TotpService totpService;
    private final UserRepository userRepository;

    public MfaAuthenticationProvider(TotpService totpService,
                                     UserRepository userRepository) {
        this.totpService = totpService;
        this.userRepository = userRepository;
    }

    @Override
    public Authentication authenticate(Authentication auth)
            throws AuthenticationException {
        String userId = (String) auth.getPrincipal();
        String code   = (String) auth.getCredentials();

        UserEntity user = userRepository.findById(userId)
                .orElseThrow(() -> new BadCredentialsException("用户不存在"));

        if (!user.isMfaEnabled()) {
            throw new BadCredentialsException("MFA 未启用");
        }

        if (!totpService.verifyCode(user.getMfaSecret(), code)) {
            throw new BadCredentialsException("TOTP 验证码错误");
        }

        MfaAuthenticationToken result =
                new MfaAuthenticationToken(userId, null);
        result.setAuthenticated(true);
        return result;
    }

    @Override
    public boolean supports(Class<?> authentication) {
        return MfaAuthenticationToken.class.isAssignableFrom(authentication);
    }
}

/**
 * MFA REST 控制器
 */
@RestController
@RequestMapping("/api/mfa")
public class MfaController {

    private final TotpService totpService;
    private final UserRepository userRepository;

    public MfaController(TotpService totpService,
                         UserRepository userRepository) {
        this.totpService = totpService;
        this.userRepository = userRepository;
    }

    /** 注册 MFA */
    @PostMapping("/enroll")
    public ResponseEntity<?> enroll(@AuthenticationPrincipal UserDetails principal)
            throws Exception {
        String secret = totpService.generateSecret();
        List<String> backupCodes = totpService.generateBackupCodes(10);
        String qrCode = totpService.generateQrCodeBase64(
                secret, principal.getUsername());

        // 临时存储 secret,等待确认
        UserEntity user = userRepository.findByUsername(principal.getUsername());
        user.setPendingMfaSecret(secret);
        user.setBackupCodes(backupCodes);
        userRepository.save(user);

        return ResponseEntity.ok(Map.of(
                "qrCode", qrCode,
                "backupCodes", backupCodes
        ));
    }

    /** 确认 MFA */
    @PostMapping("/confirm")
    public ResponseEntity<?> confirm(
            @AuthenticationPrincipal UserDetails principal,
            @RequestBody Map<String, String> body) {
        UserEntity user = userRepository.findByUsername(principal.getUsername());
        String code = body.get("code");

        if (!totpService.verifyCode(user.getPendingMfaSecret(), code)) {
            return ResponseEntity.badRequest().body(Map.of("error", "验证码错误"));
        }

        user.setMfaSecret(user.getPendingMfaSecret());
        user.setPendingMfaSecret(null);
        user.setMfaEnabled(true);
        userRepository.save(user);

        return ResponseEntity.ok(Map.of("message", "MFA 已启用 ✅"));
    }

    /** 验证 MFA */
    @PostMapping("/verify")
    public ResponseEntity<?> verify(
            @AuthenticationPrincipal UserDetails principal,
            @RequestBody Map<String, String> body) {
        UserEntity user = userRepository.findByUsername(principal.getUsername());
        String code = body.get("code");

        if (!user.isMfaEnabled()) {
            return ResponseEntity.badRequest().body(Map.of("error", "MFA 未启用"));
        }

        if (!totpService.verifyCode(user.getMfaSecret(), code)) {
            return ResponseEntity.status(401).body(Map.of("error", "验证码错误"));
        }

        return ResponseEntity.ok(Map.of("message", "MFA 验证通过 ✅"));
    }
}

Go TOTP 实现

使用 pquerna/otp 库:

package mfa

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"image/png"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/pquerna/otp"
	"github.com/pquerna/otp/totp"
)

// TOTPManager 管理 TOTP 操作
type TOTPManager struct {
	Issuer string
}

// GenerateSecret 生成 TOTP 密钥和二维码
func (m *TOTPManager) GenerateSecret(email string) (*otp.Key, error) {
	key, err := totp.Generate(totp.GenerateOpts{
		Issuer:      m.Issuer,
		AccountName: email,
		Period:      30,
		Digits:      otp.DigitsSix,
		Algorithm:   otp.AlgorithmSHA1,
	})
	return key, err
}

// GenerateBackupCodes 生成一次性备用码
func (m *TOTPManager) GenerateBackupCodes(count int) ([]string, error) {
	codes := make([]string, count)
	for i := 0; i < count; i++ {
		b := make([]byte, 4)
		if _, err := rand.Read(b); err != nil {
			return nil, err
		}
		codes[i] = strings.ToUpper(hex.EncodeToString(b))
	}
	return codes, nil
}

// ValidateCode 验证 TOTP 验证码
func (m *TOTPManager) ValidateCode(secret, code string) bool {
	valid, err := totp.ValidateCustom(code, secret, time.Now(), totp.ValidateOpts{
		Period:    30,
		Skew:     1, // 允许前后各 1 个时间窗口
		Digits:   otp.DigitsSix,
		Algorithm: otp.AlgorithmSHA1,
	})
	return err == nil && valid
}

// WriteQRCodePNG 将二维码写入 HTTP 响应
func WriteQRCodePNG(w http.ResponseWriter, key *otp.Key) error {
	img, err := key.Image(200, 200)
	if err != nil {
		return err
	}
	w.Header().Set("Content-Type", "image/png")
	return png.Encode(w, img)
}

// ---- Gin MFA 中间件与路由 ----

// UserMFA 用户 MFA 信息(模拟数据库)
type UserMFA struct {
	Secret      string
	Enabled     bool
	BackupCodes []string
}

var (
	mfaStore = make(map[string]*UserMFA)
	mu       sync.RWMutex
)

// RegisterMFARoutes 注册 MFA 相关路由
func RegisterMFARoutes(r *gin.RouterGroup) {
	manager := &TOTPManager{Issuer: "MyApp"}

	// 注册 MFA
	r.POST("/mfa/enroll", func(c *gin.Context) {
		userID := c.GetString("user_id") // 从认证中间件获取

		key, err := manager.GenerateSecret(userID)
		if err != nil {
			c.JSON(http.StatusInternalServerError,
				gin.H{"error": "生成密钥失败"})
			return
		}

		backupCodes, _ := manager.GenerateBackupCodes(10)

		mu.Lock()
		mfaStore[userID] = &UserMFA{
			Secret:      key.Secret(),
			Enabled:     false,
			BackupCodes: backupCodes,
		}
		mu.Unlock()

		c.JSON(http.StatusOK, gin.H{
			"qr_uri":       key.URL(),
			"secret":       key.Secret(),
			"backup_codes": backupCodes,
		})
	})

	// 确认 MFA
	r.POST("/mfa/confirm", func(c *gin.Context) {
		userID := c.GetString("user_id")
		var req struct {
			Code string `json:"code" binding:"required"`
		}
		if err := c.ShouldBindJSON(&req); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": "缺少验证码"})
			return
		}

		mu.RLock()
		user, exists := mfaStore[userID]
		mu.RUnlock()

		if !exists {
			c.JSON(http.StatusBadRequest, gin.H{"error": "请先注册 MFA"})
			return
		}

		if !manager.ValidateCode(user.Secret, req.Code) {
			c.JSON(http.StatusBadRequest, gin.H{"error": "验证码错误"})
			return
		}

		mu.Lock()
		user.Enabled = true
		mu.Unlock()

		c.JSON(http.StatusOK, gin.H{"message": "MFA 已启用 ✅"})
	})

	// 验证 MFA
	r.POST("/mfa/verify", func(c *gin.Context) {
		userID := c.GetString("user_id")
		var req struct {
			Code string `json:"code" binding:"required"`
		}
		if err := c.ShouldBindJSON(&req); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": "缺少验证码"})
			return
		}

		mu.RLock()
		user, exists := mfaStore[userID]
		mu.RUnlock()

		if !exists || !user.Enabled {
			c.JSON(http.StatusBadRequest, gin.H{"error": "MFA 未启用"})
			return
		}

		if !manager.ValidateCode(user.Secret, req.Code) {
			c.JSON(http.StatusUnauthorized, gin.H{"error": "验证码错误"})
			return
		}

		c.JSON(http.StatusOK, gin.H{"message": "MFA 验证通过 ✅"})
	})

	// 备用码验证
	r.POST("/mfa/backup", func(c *gin.Context) {
		userID := c.GetString("user_id")
		var req struct {
			BackupCode string `json:"backup_code" binding:"required"`
		}
		if err := c.ShouldBindJSON(&req); err != nil {
			c.JSON(http.StatusBadRequest, gin.H{"error": "缺少备用码"})
			return
		}

		mu.Lock()
		defer mu.Unlock()

		user, exists := mfaStore[userID]
		if !exists || !user.Enabled {
			c.JSON(http.StatusBadRequest, gin.H{"error": "MFA 未启用"})
			return
		}

		code := strings.ToUpper(req.BackupCode)
		found := -1
		for i, bc := range user.BackupCodes {
			if bc == code {
				found = i
				break
			}
		}

		if found == -1 {
			c.JSON(http.StatusUnauthorized, gin.H{"error": "备用码无效"})
			return
		}

		// 删除已使用的备用码
		user.BackupCodes = append(
			user.BackupCodes[:found], user.BackupCodes[found+1:]...)

		c.JSON(http.StatusOK, gin.H{
			"message":                "备用码验证通过 ✅",
			"remaining_backup_codes": len(user.BackupCodes),
		})
	})
}

// MFARequiredMiddleware Gin 中间件:要求已完成 MFA 验证
func MFARequiredMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		mfaVerified, exists := c.Get("mfa_verified")
		if !exists || mfaVerified != true {
			c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
				"error": "需要完成 MFA 验证",
			})
			return
		}
		c.Next()
	}
}

10.4 FIDO2/WebAuthn — 无密码认证

FIDO2 是下一代认证标准,由 W3C WebAuthn 和 FIDO CTAP 组成:

        flowchart TB
    subgraph FIDO2
        direction LR
        WA[WebAuthn<br/>浏览器 API<br/>W3C 标准]
        CTAP[CTAP<br/>客户端到认证器协议<br/>FIDO Alliance 标准]
    end
    

注册流程

        sequenceDiagram
    participant U as 用户
    participant B as 浏览器
    participant S as 服务器 (Relying Party)

    B->>S: 1. 请求注册
    S->>B: 2. 返回挑战 (challenge + user info)
    B->>U: 3. 提示用户验证身份
    U->>B: 4. 生物识别或 PIN
    B->>B: 5. 创建密钥对
    B->>S: 6. 发送公钥 + 签名
    S->>S: 7. 验证并存储公钥
    S->>B: 8. 注册成功 ✅
    

Passkey

Passkey 是 FIDO2 的消费者友好版本:

特性

传统密码

TOTP

Passkey

防钓鱼

防重放

用户体验

中等

优秀

跨设备

需同步

✅(云同步)

无密码

10.5 MFA 方式对比

方式

安全性

防钓鱼

用户体验

部署成本

离线可用

适用场景

TOTP

⭐⭐⭐

中等

通用,最广泛

SMS 验证码

⭐⭐

消费者应用(不推荐高安全场景)

推送通知

⭐⭐⭐

优秀

企业应用(需防 MFA 疲劳)

WebAuthn/Passkey

⭐⭐⭐⭐⭐

优秀

高安全场景,未来趋势

硬件密钥 (YubiKey)

⭐⭐⭐⭐⭐

中等

高价值目标(管理员、金融)

选型建议

  • 消费者应用:TOTP 为主,逐步迁移到 Passkey

  • 企业内部:推送通知 + 号码匹配,或 WebAuthn

  • 高安全场景(管理员、金融):硬件密钥(YubiKey)必选

  • 避免使用 SMS:SIM 卡劫持(SIM Swapping)风险高

10.6 MFA 疲劳攻击与防御

MFA 疲劳攻击(MFA Fatigue / MFA Bombing):攻击者反复触发 MFA 推送通知,直到用户烦躁地点击”批准”。

        sequenceDiagram
    participant A as 攻击者
    participant S as 服务器
    participant U as 用户手机

    A->>S: 1. 用泄露的密码尝试登录
    S->>U: 推送 MFA 通知 #1
    U->>U: 用户拒绝 ❌
    A->>S: 2. 再次尝试登录
    S->>U: 推送 MFA 通知 #2
    U->>U: 用户拒绝 ❌
    A->>S: 3. 反复尝试...
    S->>U: 推送 MFA 通知 #N
    Note over U: 用户疲劳,误点"批准"
    U->>S: 批准 ✅
    S->>A: 登录成功 → 攻击得手 💀
    

防御措施

  • ✅ 使用号码匹配(Number Matching):用户必须输入屏幕上显示的数字

  • ✅ 限制推送频率

  • ✅ 显示登录上下文(IP、位置、设备)

  • ✅ 使用 FIDO2 替代推送通知

  • ✅ 异常检测:短时间内多次 MFA 请求告警

10.7 自适应认证

根据风险等级动态调整认证强度:

from dataclasses import dataclass
from enum import Enum

class RiskLevel(Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

@dataclass
class LoginContext:
    ip_address: str
    device_id: str
    location: str
    timestamp: float
    user_agent: str

def assess_risk(user_id: str, context: LoginContext) -> RiskLevel:
    """评估登录风险"""
    risk_score = 0.0

    # 检查 IP 是否已知
    if not is_known_ip(user_id, context.ip_address):
        risk_score += 0.3

    # 检查设备是否已知
    if not is_known_device(user_id, context.device_id):
        risk_score += 0.3

    # 检查地理位置异常(不可能旅行)
    if is_impossible_travel(user_id, context.location, context.timestamp):
        risk_score += 0.4

    # 检查是否在异常时间
    if is_unusual_time(user_id, context.timestamp):
        risk_score += 0.1

    if risk_score < 0.2:
        return RiskLevel.LOW
    elif risk_score < 0.5:
        return RiskLevel.MEDIUM
    elif risk_score < 0.8:
        return RiskLevel.HIGH
    else:
        return RiskLevel.CRITICAL

def get_required_auth(risk: RiskLevel) -> list[str]:
    """根据风险等级决定认证要求"""
    requirements = {
        RiskLevel.LOW: ["password"],
        RiskLevel.MEDIUM: ["password", "totp"],
        RiskLevel.HIGH: ["password", "totp", "email_verification"],
        RiskLevel.CRITICAL: ["password", "fido2", "admin_approval"],
    }
    return requirements[risk]

10.8 MFA 用户体验最佳实践

注册引导

  1. 渐进式引导:不要在注册时强制启用 MFA,在首次登录后温和提示

  2. 清晰的说明:用截图或动画演示如何使用认证器 App

  3. 多种选择:提供 TOTP、推送、硬件密钥等多种方式

  4. 备用码提醒:强调保存备用码的重要性,提供下载/打印选项

日常使用

场景

推荐做法

避免

已知设备

记住设备 30 天

每次都要求 MFA

敏感操作

即使已登录也要求 MFA

仅在登录时验证

丢失设备

备用码 → 客服重置流程

无恢复途径

新设备

必须 MFA + 邮件通知

静默放行

恢复流程

        flowchart TD
    A[用户无法完成 MFA] --> B{有备用码?}
    B -->|是| C[使用备用码登录]
    C --> D[提示重新绑定认证器]
    B -->|否| E{有备用邮箱/手机?}
    E -->|是| F[通过备用渠道验证身份]
    F --> G[临时禁用 MFA]
    G --> D
    E -->|否| H[联系客服]
    H --> I[人工身份验证<br/>身份证/视频通话]
    I --> J[管理员重置 MFA]
    J --> D
    

10.9 小结

  • MFA 通过组合多种认证因素大幅提升安全性

  • Argon2id 是密码哈希的现代首选

  • TOTP 是最普及的 MFA 方式,但无法防钓鱼

  • FIDO2/Passkey 是无密码认证的未来,天然防钓鱼

  • 自适应认证 根据风险动态调整认证强度,平衡安全与体验

  • 警惕 MFA 疲劳攻击,使用号码匹配等防御措施

  • 选择 MFA 方式时需综合考虑安全性、用户体验和部署成本

  • 完善的 恢复流程 与 MFA 本身同样重要