第十章:多因素认证(MFA)
“密码是安全的最薄弱环节,MFA 是加固它的最有效手段。”
mindmap
root((多因素认证))
认证三要素
你知道的
你拥有的
你是谁
密码安全
bcrypt
Argon2
scrypt
MFA 方式
TOTP
FIDO2/WebAuthn
Passkey
推送通知
高级
自适应认证
MFA 疲劳攻击
10.1 认证三要素
多因素认证基于三类不同的认证因素:
因素 |
类型 |
示例 |
优点 |
缺点 |
|---|---|---|---|---|
你知道的 |
知识因素 |
密码、PIN、安全问题 |
简单、成本低 |
可被猜测、钓鱼、泄露 |
你拥有的 |
持有因素 |
手机、硬件密钥、智能卡 |
物理持有难以远程窃取 |
可丢失、被盗 |
你是谁 |
生物因素 |
指纹、面部、虹膜 |
难以伪造 |
隐私问题、无法更换 |
MFA = 使用两种或以上不同类型的因素
flowchart LR
subgraph 单因素认证
U1[用户] -->|密码| P1[通过 ✅]
end
subgraph 双因素认证 2FA
U2[用户] -->|密码<br/>知识因素| S2[第一关 ✅]
S2 -->|TOTP 验证码<br/>持有因素| P2[通过 ✅]
end
subgraph 三因素认证
U3[用户] -->|密码<br/>知识因素| S3a[第一关 ✅]
S3a -->|硬件密钥<br/>持有因素| S3b[第二关 ✅]
S3b -->|指纹<br/>生物因素| P3[通过 ✅]
end
10.2 密码认证的问题
常见攻击方式
攻击方式 |
描述 |
防御 |
|---|---|---|
暴力破解 |
尝试所有可能的密码组合 |
账户锁定、速率限制 |
字典攻击 |
使用常见密码列表 |
密码复杂度要求 |
撞库攻击 |
用泄露的密码尝试其他网站 |
密码唯一性教育 |
钓鱼攻击 |
伪造登录页面骗取密码 |
FIDO2/WebAuthn |
彩虹表 |
预计算哈希值反查密码 |
加盐哈希 |
键盘记录 |
恶意软件记录键盘输入 |
硬件密钥 |
密码存储
永远不要明文存储密码! 使用专门的密码哈希函数:
# ❌ 错误:明文存储
password_db["user1"] = "my_password"
# ❌ 错误:普通哈希(太快,容易暴力破解)
import hashlib
password_db["user1"] = hashlib.sha256(b"my_password").hexdigest()
# ✅ 正确:使用 Argon2(现代首选)
from argon2 import PasswordHasher
ph = PasswordHasher(
time_cost=3, # 迭代次数
memory_cost=65536, # 内存使用 64MB
parallelism=4, # 并行线程数
)
# 存储
hashed = ph.hash("my_password")
# $argon2id$v=19$m=65536,t=3,p=4$...
# 验证
try:
ph.verify(hashed, "my_password")
print("密码正确 ✅")
# 检查是否需要重新哈希(参数升级时)
if ph.check_needs_rehash(hashed):
new_hash = ph.hash("my_password")
except Exception:
print("密码错误 ❌")
密码哈希函数对比
算法 |
年份 |
特点 |
推荐 |
|---|---|---|---|
MD5/SHA |
— |
通用哈希,太快 |
❌ 禁止 |
bcrypt |
1999 |
经典,CPU 密集 |
✅ 遗留系统 |
scrypt |
2009 |
CPU + 内存密集 |
✅ 可用 |
Argon2id |
2015 |
竞赛冠军,最安全 |
✅ 首选 |
10.3 TOTP — 基于时间的一次性密码
TOTP(Time-based One-Time Password,RFC 6238)是最常用的 MFA 方式:
TOTP 验证流程
sequenceDiagram
participant U as 用户
participant App as 认证器 App
participant S as 服务端
Note over App,S: 注册阶段(一次性)
S->>S: 生成共享密钥 secret
S->>U: 展示二维码(含 secret)
U->>App: 扫描二维码,存储 secret
Note over App,S: 验证阶段(每次登录)
U->>S: 提交用户名 + 密码
S->>S: 验证密码 ✅
S->>U: 请求 TOTP 验证码
App->>App: HMAC-SHA1(secret, time/30) → 6位数字
U->>S: 输入 6 位验证码
S->>S: HMAC-SHA1(secret, time/30) → 6位数字
S->>S: 比较验证码(允许 ±1 窗口)
S->>U: 登录成功 ✅
MFA 注册流程
sequenceDiagram
participant U as 用户
participant B as 浏览器
participant S as 服务端
participant DB as 数据库
U->>B: 点击"启用 MFA"
B->>S: POST /mfa/enroll
S->>S: 生成 TOTP secret
S->>S: 生成备用码(10 个)
S->>DB: 临时存储 secret(待确认)
S->>B: 返回二维码 URI + 备用码
B->>U: 展示二维码 + 备用码
U->>U: 用认证器 App 扫码
U->>B: 输入当前 TOTP 验证码确认
B->>S: POST /mfa/confirm {code}
S->>S: 验证 TOTP 码
alt 验证成功
S->>DB: 标记 MFA 已启用
S->>B: MFA 启用成功 ✅
else 验证失败
S->>DB: 删除临时 secret
S->>B: 验证码错误,请重试 ❌
end
MFA 决策流程
flowchart TD
A[用户登录请求] --> B{密码验证通过?}
B -->|否| C[登录失败 ❌]
B -->|是| D[收集上下文信息<br/>IP / 设备 / 位置 / 时间]
D --> E{风险评估}
E -->|低风险<br/>已知设备+常用IP| F[直接放行 ✅]
E -->|中风险<br/>新设备或新IP| G[要求 TOTP 验证]
E -->|高风险<br/>异地登录| H[要求 TOTP + 邮件验证]
E -->|极高风险<br/>不可能旅行| I[要求 FIDO2 + 管理员审批]
G --> J{MFA 验证通过?}
H --> J
I --> J
J -->|是| K[登录成功 ✅<br/>记录设备指纹]
J -->|否| L[登录失败 ❌<br/>触发告警]
Python TOTP 完整实现
import pyotp
import qrcode
import io
import base64
import secrets
class TOTPManager:
"""TOTP 多因素认证管理器"""
def __init__(self, issuer: str = "MyApp"):
self.issuer = issuer
def generate_secret(self) -> str:
"""生成 Base32 编码的共享密钥"""
return pyotp.random_base32()
def generate_backup_codes(self, count: int = 10) -> list[str]:
"""生成一次性备用码"""
return [secrets.token_hex(4).upper() for _ in range(count)]
def get_provisioning_uri(self, secret: str, email: str) -> str:
"""生成二维码 URI(兼容 Google Authenticator / Authy 等)"""
totp = pyotp.TOTP(secret)
return totp.provisioning_uri(name=email, issuer_name=self.issuer)
def generate_qr_base64(self, uri: str) -> str:
"""生成二维码的 Base64 编码(可直接嵌入 HTML <img>)"""
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data(uri)
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
buffer = io.BytesIO()
img.save(buffer, format="PNG")
return base64.b64encode(buffer.getvalue()).decode()
def verify_code(self, secret: str, code: str, valid_window: int = 1) -> bool:
"""
验证 TOTP 验证码
valid_window=1 表示允许前后各 30 秒的偏差
"""
totp = pyotp.TOTP(secret)
return totp.verify(code, valid_window=valid_window)
def get_current_code(self, secret: str) -> str:
"""获取当前验证码(仅用于测试)"""
totp = pyotp.TOTP(secret)
return totp.now()
# --- 使用示例 ---
manager = TOTPManager(issuer="SecurityHandbook")
# 1. 注册阶段
secret = manager.generate_secret()
backup_codes = manager.generate_backup_codes()
uri = manager.get_provisioning_uri(secret, "walter@example.com")
qr_b64 = manager.generate_qr_base64(uri)
print(f"密钥: {secret}")
print(f"URI: {uri}")
print(f"备用码: {backup_codes}")
# 2. 验证阶段
current_code = manager.get_current_code(secret)
print(f"当前验证码: {current_code}")
print(f"验证结果: {manager.verify_code(secret, current_code)}")
FastAPI MFA 端点
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel
import pyotp
import secrets
from datetime import datetime
app = FastAPI(title="MFA Service")
security = HTTPBearer()
# --- 模型 ---
class MFAEnrollResponse(BaseModel):
qr_uri: str
secret: str # 生产环境不应返回,仅供手动输入
backup_codes: list[str]
class MFAVerifyRequest(BaseModel):
code: str
class BackupCodeRequest(BaseModel):
backup_code: str
# --- 模拟数据库 ---
users_db: dict = {}
def get_current_user(cred: HTTPAuthorizationCredentials = Depends(security)) -> str:
"""从 Bearer Token 中提取用户 ID(简化示例)"""
# 生产环境应验证 JWT
return cred.credentials # 此处直接用 token 当 user_id
# --- MFA 注册 ---
@app.post("/mfa/enroll", response_model=MFAEnrollResponse)
async def enroll_mfa(user_id: str = Depends(get_current_user)):
"""启用 MFA:生成密钥和备用码"""
secret = pyotp.random_base32()
backup_codes = [secrets.token_hex(4).upper() for _ in range(10)]
totp = pyotp.TOTP(secret)
uri = totp.provisioning_uri(name=user_id, issuer_name="MyApp")
# 临时存储,等待确认
users_db[user_id] = {
"mfa_secret": secret,
"mfa_enabled": False, # 尚未确认
"backup_codes": backup_codes,
"enrolled_at": None,
}
return MFAEnrollResponse(qr_uri=uri, secret=secret, backup_codes=backup_codes)
# --- MFA 确认(用户扫码后输入验证码确认) ---
@app.post("/mfa/confirm")
async def confirm_mfa(
req: MFAVerifyRequest,
user_id: str = Depends(get_current_user),
):
"""确认 MFA 注册:用户输入验证码以证明已正确配置"""
user = users_db.get(user_id)
if not user or not user.get("mfa_secret"):
raise HTTPException(status_code=400, detail="请先调用 /mfa/enroll")
totp = pyotp.TOTP(user["mfa_secret"])
if not totp.verify(req.code, valid_window=1):
raise HTTPException(status_code=400, detail="验证码错误")
user["mfa_enabled"] = True
user["enrolled_at"] = datetime.utcnow().isoformat()
return {"message": "MFA 已启用 ✅"}
# --- MFA 验证(登录时) ---
@app.post("/mfa/verify")
async def verify_mfa(
req: MFAVerifyRequest,
user_id: str = Depends(get_current_user),
):
"""登录时验证 TOTP 验证码"""
user = users_db.get(user_id)
if not user or not user.get("mfa_enabled"):
raise HTTPException(status_code=400, detail="MFA 未启用")
totp = pyotp.TOTP(user["mfa_secret"])
if not totp.verify(req.code, valid_window=1):
raise HTTPException(status_code=401, detail="验证码错误或已过期")
return {"message": "MFA 验证通过 ✅", "user_id": user_id}
# --- 备用码验证 ---
@app.post("/mfa/backup")
async def verify_backup_code(
req: BackupCodeRequest,
user_id: str = Depends(get_current_user),
):
"""使用一次性备用码(丢失手机时使用)"""
user = users_db.get(user_id)
if not user or not user.get("mfa_enabled"):
raise HTTPException(status_code=400, detail="MFA 未启用")
code = req.backup_code.upper()
if code not in user["backup_codes"]:
raise HTTPException(status_code=401, detail="备用码无效")
# 备用码只能使用一次
user["backup_codes"].remove(code)
remaining = len(user["backup_codes"])
return {
"message": "备用码验证通过 ✅",
"remaining_backup_codes": remaining,
"warning": "请尽快重新绑定认证器" if remaining < 3 else None,
}
# --- 禁用 MFA ---
@app.delete("/mfa")
async def disable_mfa(
req: MFAVerifyRequest,
user_id: str = Depends(get_current_user),
):
"""禁用 MFA(需要当前验证码确认)"""
user = users_db.get(user_id)
if not user or not user.get("mfa_enabled"):
raise HTTPException(status_code=400, detail="MFA 未启用")
totp = pyotp.TOTP(user["mfa_secret"])
if not totp.verify(req.code, valid_window=1):
raise HTTPException(status_code=401, detail="验证码错误")
user["mfa_enabled"] = False
user["mfa_secret"] = None
user["backup_codes"] = []
return {"message": "MFA 已禁用"}
Java TOTP 实现
使用 dev.samstevens.totp 库实现 Google Authenticator 兼容的 TOTP:
<!-- pom.xml 依赖 -->
<dependency>
<groupId>dev.samstevens.totp</groupId>
<artifactId>totp</artifactId>
<version>1.7.1</version>
</dependency>
import dev.samstevens.totp.code.*;
import dev.samstevens.totp.exceptions.QrGenerationException;
import dev.samstevens.totp.qr.QrData;
import dev.samstevens.totp.qr.QrGenerator;
import dev.samstevens.totp.qr.ZxingPngQrGenerator;
import dev.samstevens.totp.secret.DefaultSecretGenerator;
import dev.samstevens.totp.secret.SecretGenerator;
import dev.samstevens.totp.time.SystemTimeProvider;
import dev.samstevens.totp.time.TimeProvider;
import dev.samstevens.totp.util.Utils;
import java.security.SecureRandom;
import java.util.ArrayList;
import java.util.List;
public class TotpService {
private final SecretGenerator secretGenerator = new DefaultSecretGenerator(32);
private final QrGenerator qrGenerator = new ZxingPngQrGenerator();
private final TimeProvider timeProvider = new SystemTimeProvider();
private final String issuer;
public TotpService(String issuer) {
this.issuer = issuer;
}
/** 生成 TOTP 密钥 */
public String generateSecret() {
return secretGenerator.generate();
}
/** 生成一次性备用码 */
public List<String> generateBackupCodes(int count) {
SecureRandom random = new SecureRandom();
List<String> codes = new ArrayList<>();
for (int i = 0; i < count; i++) {
codes.add(String.format("%08X", random.nextInt()));
}
return codes;
}
/** 生成二维码的 Base64 PNG 数据(可直接嵌入 <img> 标签) */
public String generateQrCodeBase64(String secret, String email)
throws QrGenerationException {
QrData data = new QrData.Builder()
.label(email)
.secret(secret)
.issuer(issuer)
.algorithm(HashingAlgorithm.SHA1)
.digits(6)
.period(30)
.build();
byte[] imageData = qrGenerator.generate(data);
return Utils.getDataUriForImage(imageData, qrGenerator.getImageMimeType());
}
/** 验证 TOTP 验证码 */
public boolean verifyCode(String secret, String code) {
CodeVerifier verifier = new DefaultCodeVerifier(
new DefaultCodeGenerator(), timeProvider);
// 允许前后各 1 个时间窗口(±30 秒)
((DefaultCodeVerifier) verifier).setAllowedTimePeriodDiscrepancy(1);
return verifier.isValidCode(secret, code);
}
}
Spring Security MFA 集成
import org.springframework.security.authentication.AuthenticationProvider;
import org.springframework.security.authentication.BadCredentialsException;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.AuthenticationException;
import org.springframework.stereotype.Component;
/**
* 自定义 MFA 认证 Token
*/
public class MfaAuthenticationToken extends AbstractAuthenticationToken {
private final String userId;
private final String totpCode;
public MfaAuthenticationToken(String userId, String totpCode) {
super(null);
this.userId = userId;
this.totpCode = totpCode;
setAuthenticated(false);
}
@Override public Object getCredentials() { return totpCode; }
@Override public Object getPrincipal() { return userId; }
}
/**
* MFA 认证 Provider:在密码认证通过后进行 TOTP 二次验证
*/
@Component
public class MfaAuthenticationProvider implements AuthenticationProvider {
private final TotpService totpService;
private final UserRepository userRepository;
public MfaAuthenticationProvider(TotpService totpService,
UserRepository userRepository) {
this.totpService = totpService;
this.userRepository = userRepository;
}
@Override
public Authentication authenticate(Authentication auth)
throws AuthenticationException {
String userId = (String) auth.getPrincipal();
String code = (String) auth.getCredentials();
UserEntity user = userRepository.findById(userId)
.orElseThrow(() -> new BadCredentialsException("用户不存在"));
if (!user.isMfaEnabled()) {
throw new BadCredentialsException("MFA 未启用");
}
if (!totpService.verifyCode(user.getMfaSecret(), code)) {
throw new BadCredentialsException("TOTP 验证码错误");
}
MfaAuthenticationToken result =
new MfaAuthenticationToken(userId, null);
result.setAuthenticated(true);
return result;
}
@Override
public boolean supports(Class<?> authentication) {
return MfaAuthenticationToken.class.isAssignableFrom(authentication);
}
}
/**
* MFA REST 控制器
*/
@RestController
@RequestMapping("/api/mfa")
public class MfaController {
private final TotpService totpService;
private final UserRepository userRepository;
public MfaController(TotpService totpService,
UserRepository userRepository) {
this.totpService = totpService;
this.userRepository = userRepository;
}
/** 注册 MFA */
@PostMapping("/enroll")
public ResponseEntity<?> enroll(@AuthenticationPrincipal UserDetails principal)
throws Exception {
String secret = totpService.generateSecret();
List<String> backupCodes = totpService.generateBackupCodes(10);
String qrCode = totpService.generateQrCodeBase64(
secret, principal.getUsername());
// 临时存储 secret,等待确认
UserEntity user = userRepository.findByUsername(principal.getUsername());
user.setPendingMfaSecret(secret);
user.setBackupCodes(backupCodes);
userRepository.save(user);
return ResponseEntity.ok(Map.of(
"qrCode", qrCode,
"backupCodes", backupCodes
));
}
/** 确认 MFA */
@PostMapping("/confirm")
public ResponseEntity<?> confirm(
@AuthenticationPrincipal UserDetails principal,
@RequestBody Map<String, String> body) {
UserEntity user = userRepository.findByUsername(principal.getUsername());
String code = body.get("code");
if (!totpService.verifyCode(user.getPendingMfaSecret(), code)) {
return ResponseEntity.badRequest().body(Map.of("error", "验证码错误"));
}
user.setMfaSecret(user.getPendingMfaSecret());
user.setPendingMfaSecret(null);
user.setMfaEnabled(true);
userRepository.save(user);
return ResponseEntity.ok(Map.of("message", "MFA 已启用 ✅"));
}
/** 验证 MFA */
@PostMapping("/verify")
public ResponseEntity<?> verify(
@AuthenticationPrincipal UserDetails principal,
@RequestBody Map<String, String> body) {
UserEntity user = userRepository.findByUsername(principal.getUsername());
String code = body.get("code");
if (!user.isMfaEnabled()) {
return ResponseEntity.badRequest().body(Map.of("error", "MFA 未启用"));
}
if (!totpService.verifyCode(user.getMfaSecret(), code)) {
return ResponseEntity.status(401).body(Map.of("error", "验证码错误"));
}
return ResponseEntity.ok(Map.of("message", "MFA 验证通过 ✅"));
}
}
Go TOTP 实现
使用 pquerna/otp 库:
package mfa
import (
"crypto/rand"
"encoding/hex"
"fmt"
"image/png"
"net/http"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/pquerna/otp"
"github.com/pquerna/otp/totp"
)
// TOTPManager 管理 TOTP 操作
type TOTPManager struct {
Issuer string
}
// GenerateSecret 生成 TOTP 密钥和二维码
func (m *TOTPManager) GenerateSecret(email string) (*otp.Key, error) {
key, err := totp.Generate(totp.GenerateOpts{
Issuer: m.Issuer,
AccountName: email,
Period: 30,
Digits: otp.DigitsSix,
Algorithm: otp.AlgorithmSHA1,
})
return key, err
}
// GenerateBackupCodes 生成一次性备用码
func (m *TOTPManager) GenerateBackupCodes(count int) ([]string, error) {
codes := make([]string, count)
for i := 0; i < count; i++ {
b := make([]byte, 4)
if _, err := rand.Read(b); err != nil {
return nil, err
}
codes[i] = strings.ToUpper(hex.EncodeToString(b))
}
return codes, nil
}
// ValidateCode 验证 TOTP 验证码
func (m *TOTPManager) ValidateCode(secret, code string) bool {
valid, err := totp.ValidateCustom(code, secret, time.Now(), totp.ValidateOpts{
Period: 30,
Skew: 1, // 允许前后各 1 个时间窗口
Digits: otp.DigitsSix,
Algorithm: otp.AlgorithmSHA1,
})
return err == nil && valid
}
// WriteQRCodePNG 将二维码写入 HTTP 响应
func WriteQRCodePNG(w http.ResponseWriter, key *otp.Key) error {
img, err := key.Image(200, 200)
if err != nil {
return err
}
w.Header().Set("Content-Type", "image/png")
return png.Encode(w, img)
}
// ---- Gin MFA 中间件与路由 ----
// UserMFA 用户 MFA 信息(模拟数据库)
type UserMFA struct {
Secret string
Enabled bool
BackupCodes []string
}
var (
mfaStore = make(map[string]*UserMFA)
mu sync.RWMutex
)
// RegisterMFARoutes 注册 MFA 相关路由
func RegisterMFARoutes(r *gin.RouterGroup) {
manager := &TOTPManager{Issuer: "MyApp"}
// 注册 MFA
r.POST("/mfa/enroll", func(c *gin.Context) {
userID := c.GetString("user_id") // 从认证中间件获取
key, err := manager.GenerateSecret(userID)
if err != nil {
c.JSON(http.StatusInternalServerError,
gin.H{"error": "生成密钥失败"})
return
}
backupCodes, _ := manager.GenerateBackupCodes(10)
mu.Lock()
mfaStore[userID] = &UserMFA{
Secret: key.Secret(),
Enabled: false,
BackupCodes: backupCodes,
}
mu.Unlock()
c.JSON(http.StatusOK, gin.H{
"qr_uri": key.URL(),
"secret": key.Secret(),
"backup_codes": backupCodes,
})
})
// 确认 MFA
r.POST("/mfa/confirm", func(c *gin.Context) {
userID := c.GetString("user_id")
var req struct {
Code string `json:"code" binding:"required"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "缺少验证码"})
return
}
mu.RLock()
user, exists := mfaStore[userID]
mu.RUnlock()
if !exists {
c.JSON(http.StatusBadRequest, gin.H{"error": "请先注册 MFA"})
return
}
if !manager.ValidateCode(user.Secret, req.Code) {
c.JSON(http.StatusBadRequest, gin.H{"error": "验证码错误"})
return
}
mu.Lock()
user.Enabled = true
mu.Unlock()
c.JSON(http.StatusOK, gin.H{"message": "MFA 已启用 ✅"})
})
// 验证 MFA
r.POST("/mfa/verify", func(c *gin.Context) {
userID := c.GetString("user_id")
var req struct {
Code string `json:"code" binding:"required"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "缺少验证码"})
return
}
mu.RLock()
user, exists := mfaStore[userID]
mu.RUnlock()
if !exists || !user.Enabled {
c.JSON(http.StatusBadRequest, gin.H{"error": "MFA 未启用"})
return
}
if !manager.ValidateCode(user.Secret, req.Code) {
c.JSON(http.StatusUnauthorized, gin.H{"error": "验证码错误"})
return
}
c.JSON(http.StatusOK, gin.H{"message": "MFA 验证通过 ✅"})
})
// 备用码验证
r.POST("/mfa/backup", func(c *gin.Context) {
userID := c.GetString("user_id")
var req struct {
BackupCode string `json:"backup_code" binding:"required"`
}
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": "缺少备用码"})
return
}
mu.Lock()
defer mu.Unlock()
user, exists := mfaStore[userID]
if !exists || !user.Enabled {
c.JSON(http.StatusBadRequest, gin.H{"error": "MFA 未启用"})
return
}
code := strings.ToUpper(req.BackupCode)
found := -1
for i, bc := range user.BackupCodes {
if bc == code {
found = i
break
}
}
if found == -1 {
c.JSON(http.StatusUnauthorized, gin.H{"error": "备用码无效"})
return
}
// 删除已使用的备用码
user.BackupCodes = append(
user.BackupCodes[:found], user.BackupCodes[found+1:]...)
c.JSON(http.StatusOK, gin.H{
"message": "备用码验证通过 ✅",
"remaining_backup_codes": len(user.BackupCodes),
})
})
}
// MFARequiredMiddleware Gin 中间件:要求已完成 MFA 验证
func MFARequiredMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
mfaVerified, exists := c.Get("mfa_verified")
if !exists || mfaVerified != true {
c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
"error": "需要完成 MFA 验证",
})
return
}
c.Next()
}
}
10.4 FIDO2/WebAuthn — 无密码认证
FIDO2 是下一代认证标准,由 W3C WebAuthn 和 FIDO CTAP 组成:
flowchart TB
subgraph FIDO2
direction LR
WA[WebAuthn<br/>浏览器 API<br/>W3C 标准]
CTAP[CTAP<br/>客户端到认证器协议<br/>FIDO Alliance 标准]
end
注册流程
sequenceDiagram
participant U as 用户
participant B as 浏览器
participant S as 服务器 (Relying Party)
B->>S: 1. 请求注册
S->>B: 2. 返回挑战 (challenge + user info)
B->>U: 3. 提示用户验证身份
U->>B: 4. 生物识别或 PIN
B->>B: 5. 创建密钥对
B->>S: 6. 发送公钥 + 签名
S->>S: 7. 验证并存储公钥
S->>B: 8. 注册成功 ✅
Passkey
Passkey 是 FIDO2 的消费者友好版本:
特性 |
传统密码 |
TOTP |
Passkey |
|---|---|---|---|
防钓鱼 |
❌ |
❌ |
✅ |
防重放 |
❌ |
✅ |
✅ |
用户体验 |
差 |
中等 |
优秀 |
跨设备 |
✅ |
需同步 |
✅(云同步) |
无密码 |
❌ |
❌ |
✅ |
10.5 MFA 方式对比
方式 |
安全性 |
防钓鱼 |
用户体验 |
部署成本 |
离线可用 |
适用场景 |
|---|---|---|---|---|---|---|
TOTP |
⭐⭐⭐ |
❌ |
中等 |
低 |
✅ |
通用,最广泛 |
SMS 验证码 |
⭐⭐ |
❌ |
好 |
中 |
❌ |
消费者应用(不推荐高安全场景) |
推送通知 |
⭐⭐⭐ |
❌ |
优秀 |
中 |
❌ |
企业应用(需防 MFA 疲劳) |
WebAuthn/Passkey |
⭐⭐⭐⭐⭐ |
✅ |
优秀 |
中 |
✅ |
高安全场景,未来趋势 |
硬件密钥 (YubiKey) |
⭐⭐⭐⭐⭐ |
✅ |
中等 |
高 |
✅ |
高价值目标(管理员、金融) |
选型建议:
消费者应用:TOTP 为主,逐步迁移到 Passkey
企业内部:推送通知 + 号码匹配,或 WebAuthn
高安全场景(管理员、金融):硬件密钥(YubiKey)必选
避免使用 SMS:SIM 卡劫持(SIM Swapping)风险高
10.6 MFA 疲劳攻击与防御
MFA 疲劳攻击(MFA Fatigue / MFA Bombing):攻击者反复触发 MFA 推送通知,直到用户烦躁地点击”批准”。
sequenceDiagram
participant A as 攻击者
participant S as 服务器
participant U as 用户手机
A->>S: 1. 用泄露的密码尝试登录
S->>U: 推送 MFA 通知 #1
U->>U: 用户拒绝 ❌
A->>S: 2. 再次尝试登录
S->>U: 推送 MFA 通知 #2
U->>U: 用户拒绝 ❌
A->>S: 3. 反复尝试...
S->>U: 推送 MFA 通知 #N
Note over U: 用户疲劳,误点"批准"
U->>S: 批准 ✅
S->>A: 登录成功 → 攻击得手 💀
防御措施:
✅ 使用号码匹配(Number Matching):用户必须输入屏幕上显示的数字
✅ 限制推送频率
✅ 显示登录上下文(IP、位置、设备)
✅ 使用 FIDO2 替代推送通知
✅ 异常检测:短时间内多次 MFA 请求告警
10.7 自适应认证
根据风险等级动态调整认证强度:
from dataclasses import dataclass
from enum import Enum
class RiskLevel(Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class LoginContext:
ip_address: str
device_id: str
location: str
timestamp: float
user_agent: str
def assess_risk(user_id: str, context: LoginContext) -> RiskLevel:
"""评估登录风险"""
risk_score = 0.0
# 检查 IP 是否已知
if not is_known_ip(user_id, context.ip_address):
risk_score += 0.3
# 检查设备是否已知
if not is_known_device(user_id, context.device_id):
risk_score += 0.3
# 检查地理位置异常(不可能旅行)
if is_impossible_travel(user_id, context.location, context.timestamp):
risk_score += 0.4
# 检查是否在异常时间
if is_unusual_time(user_id, context.timestamp):
risk_score += 0.1
if risk_score < 0.2:
return RiskLevel.LOW
elif risk_score < 0.5:
return RiskLevel.MEDIUM
elif risk_score < 0.8:
return RiskLevel.HIGH
else:
return RiskLevel.CRITICAL
def get_required_auth(risk: RiskLevel) -> list[str]:
"""根据风险等级决定认证要求"""
requirements = {
RiskLevel.LOW: ["password"],
RiskLevel.MEDIUM: ["password", "totp"],
RiskLevel.HIGH: ["password", "totp", "email_verification"],
RiskLevel.CRITICAL: ["password", "fido2", "admin_approval"],
}
return requirements[risk]
10.8 MFA 用户体验最佳实践
注册引导
渐进式引导:不要在注册时强制启用 MFA,在首次登录后温和提示
清晰的说明:用截图或动画演示如何使用认证器 App
多种选择:提供 TOTP、推送、硬件密钥等多种方式
备用码提醒:强调保存备用码的重要性,提供下载/打印选项
日常使用
场景 |
推荐做法 |
避免 |
|---|---|---|
已知设备 |
记住设备 30 天 |
每次都要求 MFA |
敏感操作 |
即使已登录也要求 MFA |
仅在登录时验证 |
丢失设备 |
备用码 → 客服重置流程 |
无恢复途径 |
新设备 |
必须 MFA + 邮件通知 |
静默放行 |
恢复流程
flowchart TD
A[用户无法完成 MFA] --> B{有备用码?}
B -->|是| C[使用备用码登录]
C --> D[提示重新绑定认证器]
B -->|否| E{有备用邮箱/手机?}
E -->|是| F[通过备用渠道验证身份]
F --> G[临时禁用 MFA]
G --> D
E -->|否| H[联系客服]
H --> I[人工身份验证<br/>身份证/视频通话]
I --> J[管理员重置 MFA]
J --> D
10.9 小结
MFA 通过组合多种认证因素大幅提升安全性
Argon2id 是密码哈希的现代首选
TOTP 是最普及的 MFA 方式,但无法防钓鱼
FIDO2/Passkey 是无密码认证的未来,天然防钓鱼
自适应认证 根据风险动态调整认证强度,平衡安全与体验
警惕 MFA 疲劳攻击,使用号码匹配等防御措施
选择 MFA 方式时需综合考虑安全性、用户体验和部署成本
完善的 恢复流程 与 MFA 本身同样重要