Chapter 26: API Security Design

"APIs are the front door of modern applications, and the entry point attackers like best."

        mindmap
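  root((API Security))
    OWASP API Top 10
      BOLA
      Broken Authentication
      Excessive Data Exposure
      Resource Consumption
    Design Principles
      Input Validation
      Output Encoding
      Rate Limiting
      Idempotency
    Protocol Security
      REST
      GraphQL
      gRPC
    Testing
      DAST
      Fuzzing
      Penetration Testing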
    

26.1 API Security Layer Model

API security is not a single control but a combination of defensive layers, each with its own security responsibility:

        flowchart TB
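    A["🌐 Transport layer<br/>TLS 1.3 · certificate pinning · mTLS"] --> B["🔑 Authentication layer<br/>OAuth2 · JWT · API Key · mTLS"]
    B --> C["🛡️ Authorization layer<br/>RBAC · ABAC · BOLA checks"]
    C --> D["✅ Input validation layer<br/>schema validation · type checks · injection defense"]
    D --> E["⚙️ Business logic layer<br/>idempotency · transactional integrity"]
    E --> F["📤 Output encoding layer<br/>response filtering · DTOs · sensitive-data masking"]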

    style A fill:#4a90d9,color:#fff
    style B fill:#e67e22,color:#fff
    style C fill:#e74c3c,color:#fff
    style D fill:#f1c40f,color:#333
    style E fill:#2ecc71,color:#fff
    style F fill:#9b59b6,color:#fff
    

26.2 API Gateway Security Architecture

        flowchart LR
    Client[Client] --> GW[API Gateway]

    subgraph Gateway["API Gateway security layers"]
        direction TB
        TLS[TLS termination]
        RL[Rate limiting<br/>Token Bucket]
        AUTH[Authentication<br/>JWT validation]
        AUTHZ[Authorization<br/>Scope/Role checks]
        VAL[Request validation<br/>Schema Validation]
        LOG[Audit log]
        TLS --> RL --> AUTH --> AUTHZ --> VAL --> LOG
    end

    GW --> Gateway

    LOG --> SvcA[User service]
    LOG --> SvcB[Order service]
    LOG --> SvcC[Payment service]

    style Gateway fill:#f0f0f0,stroke:#333
    style RL fill:#ff9,stroke:#333
    style AUTH fill:#f96,stroke:#333
    style AUTHZ fill:#f96,stroke:#333
    

26.3 Rate Limiting Algorithm Comparison

        flowchart LR
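    subgraph TB["Token Bucket algorithm"]
        direction TB
        TB1["Tokens are added at a fixed rate"] --> TB2["Each request consumes one token"]
        TB2 --> TB3{"Token available in the bucket?"}
        TB3 -->|Yes| TB4["✅ Allow the request"]
        TB3 -->|No| TB5["❌ Reject (429)"]
    end

    subgraph SW["Sliding Window algorithm"]
        direction TB
        SW1["Track the request count inside the time window"] --> SW2["A new request arrives"]
        SW2 --> SW3{"Count in window < limit?"}
        SW3 -->|Yes| SW4["✅ Allow and increment the count"]
        SW3 -->|No| SW5["❌ Reject (429)"]
    end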

    style TB4 fill:#2ecc71,color:#fff
    style TB5 fill:#e74c3c,color:#fff
    style SW4 fill:#2ecc71,color:#fff
    style SW5 fill:#e74c3c,color:#fff
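
The Token Bucket flow in the diagram above can be sketched in a few lines of Python. This is a minimal single-process illustration, not a production limiter: the class name `TokenBucket`, its parameters, and the injectable `now` argument (there so the behavior can be checked deterministically) are assumptions made for this example.

```python
import time
from typing import Optional


class TokenBucket:
    """Single-process token bucket: state is just a token count and a timestamp."""

    def __init__(self, rate: float, capacity: float, now: Optional[float] = None):
        self.rate = rate          # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity    # start with a full bucket
        self.updated = time.monotonic() if now is None else now

    def allow(self, now: Optional[float] = None, cost: float = 1.0) -> bool:
        """Return True if the request may proceed, False if it should get a 429."""
        now = time.monotonic() if now is None else now
        elapsed = max(0.0, now - self.updated)
        # Refill lazily, based on the time elapsed since the last call
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


bucket = TokenBucket(rate=1.0, capacity=3, now=0.0)
burst = [bucket.allow(now=0.0) for _ in range(4)]   # three pass, the fourth is rejected
later = [bucket.allow(now=2.0) for _ in range(3)]   # two tokens refilled after 2 seconds
```

The bucket capacity sets how large a burst is tolerated, while the refill rate sets the sustained request rate. A distributed deployment would keep the two state values in shared storage and update them atomically.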
    

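| Property | Token Bucket | Sliding Window Log | Sliding Window Counter | Fixed Window |
|---|---|---|---|---|
| Burst handling | ✅ Allows bursts (up to bucket capacity) | ❌ Strict limit | ⚠️ Approximate | ⚠️ Bursts at window boundaries |
| Memory overhead | Low (2 variables) | High (stores a timestamp per request) | Low (2 counters) | Very low (1 counter) |
| Accuracy | | Very high | | Low (boundary problem) |
| Distributed implementation | ⚠️ Needs Redis Lua | ⚠️ Needs Redis Sorted Set | ✅ Redis INCR | ✅ Redis INCR |
| Typical use | General-purpose API throttling | Financial/payment APIs | Large-scale APIs | Simple scenarios |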

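26.4 OWASP API Security Top 10 (2023)

| Rank | Risk | Description | Defense |
|---|---|---|---|
| API1 | BOLA | Broken object level authorization | Check resource ownership on every request |
| API2 | Broken Authentication | Flaws in the authentication mechanism | OAuth2 + MFA |
| API3 | Broken Object Property Level Authorization | Responses expose too many properties | Response filtering, DTOs |
| API4 | Unrestricted Resource Consumption | No rate limiting | Rate Limiting |
| API5 | Broken Function Level Authorization | Missing function-level permission checks | RBAC/ABAC |
| API6 | Server Side Request Forgery | SSRF | URL allowlist |
| API7 | Security Misconfiguration | Insecure default configuration | Security baseline |
| API8 | Lack of Protection from Automated Threats | No bot protection | WAF + behavioral analysis |
| API9 | Improper Inventory Management | Shadow APIs, outdated versions | API inventory management |
| API10 | Unsafe Consumption of APIs | Trusting third-party APIs | Input validation |

Note: the numbering of API6 to API8 used in this chapter follows the 2023 release candidate. In the published 2023 list, API6 is Unrestricted Access to Sensitive Business Flows, API7 is Server Side Request Forgery, and API8 is Security Misconfiguration.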

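26.4.1 OWASP API Top 10: Item-by-Item Analysis and Defensive Code

API1: BOLA (Broken Object Level Authorization)

The most common API security problem: an attacker only has to change the object ID in a request to reach someone else's data.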

# ❌ Dangerous: resource ownership is never checked
@app.get("/api/orders/{order_id}")
async def get_order(order_id: str, user: User = Depends(get_current_user)):
    order = db.get_order(order_id)
    return order  # any user can read any order!

# ✅ Safe: check resource ownership
@app.get("/api/orders/{order_id}")
async def get_order(order_id: str, user: User = Depends(get_current_user)):
    order = db.get_order(order_id)
    # Assumes db.get_order returns None for an unknown ID
    if order is None:
        raise HTTPException(404, "Order not found")
    if order.user_id != user.id:
        raise HTTPException(403, "Access denied")
    return order

# ✅ Better: delegate the permission check to OpenFGA
# (authz.require is an application-level wrapper around the authorization service)
@app.get("/api/orders/{order_id}")
async def get_order(order_id: str, user: User = Depends(get_current_user)):
    await authz.require(user.id, "can_view", f"order:{order_id}")
    order = db.get_order(order_id)
    return order

API2: Broken Authentication

/**
 * Defense: enforce a secure authentication flow
 * - Short-lived access tokens plus refresh-token rotation
 * - Lockout after repeated login failures
 */
@Configuration
public class AuthSecurityConfig {

    private final TokenBlacklistService tokenBlacklistService;

    public AuthSecurityConfig(TokenBlacklistService tokenBlacklistService) {
        this.tokenBlacklistService = tokenBlacklistService;
    }

    @Bean
    public SecurityFilterChain authFilterChain(HttpSecurity http) throws Exception {
        http
            .oauth2ResourceServer(oauth2 -> oauth2
                .jwt(jwt -> jwt
                    // The decoder verifies signature, issuer, expiry, and revocation
                    .decoder(jwtDecoder())
                    // The converter maps JWT claims to granted authorities
                    .jwtAuthenticationConverter(jwtConverter())
                )
            )
            .sessionManagement(s ->
                s.sessionCreationPolicy(SessionCreationPolicy.STATELESS)
            );
        return http.build();
    }

    /**
     * Token blacklist check (for logout and revocation)
     */
    @Bean
    public JwtDecoder jwtDecoder() {
        NimbusJwtDecoder decoder = NimbusJwtDecoder
            .withJwkSetUri("https://auth.example.com/.well-known/jwks.json")
            .build();

        // Add a custom validator that rejects blacklisted tokens
        decoder.setJwtValidator(new DelegatingOAuth2TokenValidator<>(
            JwtValidators.createDefaultWithIssuer("https://auth.example.com"),
            new TokenBlacklistValidator(tokenBlacklistService)
        ));
        return decoder;
    }
}

API3: Broken Object Property Level Authorization

// Defense: use the DTO pattern so that each role receives a different set of fields

// UserPublicDTO holds public information (visible to everyone)
type UserPublicDTO struct {
	ID       string `json:"id"`
	Username string `json:"username"`
	Avatar   string `json:"avatar"`
}

// UserPrivateDTO holds private information (visible only to the user themselves)
type UserPrivateDTO struct {
	UserPublicDTO
	Email       string `json:"email"`
	Phone       string `json:"phone"`
	Address     string `json:"address"`
}

// UserAdminDTO holds administrative information (visible only to admins)
type UserAdminDTO struct {
	UserPrivateDTO
	CreatedAt   time.Time `json:"created_at"`
	LastLoginIP string    `json:"last_login_ip"`
	IsBlocked   bool      `json:"is_blocked"`
}

func getUser(c *gin.Context) {
	targetID := c.Param("id")
	user, err := userRepo.FindByID(targetID)
	if err != nil {
		c.JSON(http.StatusNotFound, gin.H{"error": "user not found"})
		return
	}

	// Comma-ok assertion: a missing or mistyped "roles" value yields nil instead of a panic
	rolesVal, _ := c.Get("roles")
	roles, _ := rolesVal.([]string)
	currentUserID := c.GetString("userID")

	switch {
	case containsRole(roles, "admin"):
		// Admins see every field
		c.JSON(http.StatusOK, toAdminDTO(user))
	case currentUserID != "" && currentUserID == targetID:
		// The user themselves sees the private fields
		c.JSON(http.StatusOK, toPrivateDTO(user))
	default:
		// Everyone else sees only the public fields
		c.JSON(http.StatusOK, toPublicDTO(user))
	}
}

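API4: Unrestricted Resource Consumption

"""
Rate limiting at several granularities. Every limit below is keyed on the
client IP and applied per endpoint; per-user limits would need a key_func
that returns the authenticated user ID.
"""
from typing import Annotated

from fastapi import FastAPI, Query, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(
    key_func=get_remote_address,
    storage_uri="redis://localhost:6379",
)

app = FastAPI()
app.state.limiter = limiter
# Register slowapi's handler so rejected requests get its standard 429 response
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# Baseline limit for an ordinary endpoint
@app.get("/api/data")
@limiter.limit("100/hour")
async def get_data(request: Request):
    return {"data": "..."}

# Sensitive operations get stricter limits
@app.post("/api/login")
@limiter.limit("5/minute;20/hour")  # two limits apply at once
async def login(request: Request):
    return {"token": "..."}

# Expensive operations
@app.post("/api/reports/generate")
@limiter.limit("3/hour")
async def generate_report(request: Request):
    return {"report_id": "..."}

# Pagination caps: stop a single request from pulling too much data.
# Query is FastAPI's documented way to constrain query parameters.
class PaginationParams:
    def __init__(
        self,
        page: Annotated[int, Query(ge=1, le=1000)] = 1,
        size: Annotated[int, Query(ge=1, le=100)] = 20,  # at most 100 rows
    ):
        self.page = page
        self.size = size
        self.offset = (page - 1) * size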

API5: Broken Function Level Authorization

/**
 * Defense: Spring Security method-level security plus custom annotations
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@PreAuthorize("hasRole('ADMIN')")
public @interface AdminOnly {}

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@PreAuthorize("hasAnyRole('ADMIN', 'MANAGER')")
public @interface ManagerOrAbove {}

@RestController
@RequestMapping("/api")
public class UserController {

    @GetMapping("/users")
    @ManagerOrAbove  // only MANAGER and ADMIN may list all users
    public List<UserDTO> listUsers() { /* ... */ }

    @DeleteMapping("/users/{id}")
    @AdminOnly  // only ADMIN may delete a user
    public void deleteUser(@PathVariable String id) { /* ... */ }

    @GetMapping("/users/me")
    // Any authenticated user may view their own profile
    public UserDTO getMyProfile(@AuthenticationPrincipal Jwt jwt) {
        return userService.findById(jwt.getSubject());
    }
}

API6: Server Side Request Forgery (SSRF)

// Defense: URL allowlist plus a ban on internal network addresses

import (
	"fmt"
	"net"
	"net/url"
)

// allowedHosts is the allowlist of external hosts that may be contacted
var allowedHosts = map[string]bool{
	"api.trusted-partner.com": true,
	"cdn.example.com":         true,
}

// ValidateURL checks that a URL is safe to fetch (SSRF protection)
func ValidateURL(rawURL string) error {
	parsed, err := url.Parse(rawURL)
	if err != nil {
		return fmt.Errorf("invalid URL: %w", err)
	}

	// 1. Allow HTTPS only
	if parsed.Scheme != "https" {
		return fmt.Errorf("only HTTPS is allowed")
	}

	// 2. Check the allowlist
	host := parsed.Hostname()
	if !allowedHosts[host] {
		return fmt.Errorf("host %s is not in the allowlist", host)
	}

	// 3. Resolve the host and reject internal addresses.
	// Note: the HTTP client resolves the name again when it connects, so this
	// check alone does not stop DNS rebinding; pin the resolved IP for the request.
	ips, err := net.LookupIP(host)
	if err != nil {
		return fmt.Errorf("DNS resolution failed: %w", err)
	}
	for _, ip := range ips {
		if ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() {
			return fmt.Errorf("internal IP addresses are not allowed")
		}
	}

	return nil
}
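
For comparison, the internal-address check in step 3 can be illustrated with Python's standard `ipaddress` module. The helper name `is_internal` is an assumption for this sketch, and Python's `is_private` covers a wider set of reserved ranges than Go's `IsPrivate`, so the two checks are not exact equivalents.

```python
import ipaddress


def is_internal(ip_text: str) -> bool:
    """Return True for addresses an SSRF filter should refuse to contact."""
    ip = ipaddress.ip_address(ip_text)
    return ip.is_loopback or ip.is_private or ip.is_link_local


checks = {
    "127.0.0.1": is_internal("127.0.0.1"),              # loopback
    "10.0.0.5": is_internal("10.0.0.5"),                # RFC 1918 private range
    "169.254.169.254": is_internal("169.254.169.254"),  # link-local, used by cloud metadata services
    "::1": is_internal("::1"),                          # IPv6 loopback
    "8.8.8.8": is_internal("8.8.8.8"),                  # public address
}
```

The link-local case matters in practice because cloud metadata endpoints commonly live at 169.254.169.254.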

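API7: Security Misconfiguration

"""
Security baseline checklist (FastAPI example)
"""
import logging
import os

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse

logger = logging.getLogger(__name__)
# Assumption: the deployment sets APP_ENV=production in production
PRODUCTION = os.getenv("APP_ENV") == "production"

app = FastAPI(
    # ✅ Disable the interactive docs in production
    docs_url=None if PRODUCTION else "/docs",
    redoc_url=None if PRODUCTION else "/redoc",
    openapi_url=None if PRODUCTION else "/openapi.json",
)

# ✅ Suppress detailed error messages
@app.exception_handler(Exception)
async def generic_exception_handler(request, exc):
    # Log the details server-side, never expose them to the client
    logger.error(f"Unhandled exception: {exc}", exc_info=True)
    return JSONResponse(
        status_code=500,
        content={"error": "Internal server error"},  # no stack trace in the response
    )

# ✅ Restrictive CORS configuration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # do not use ["*"]
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
    allow_credentials=True,
)

# ✅ Remove the Server header (configure this at the reverse proxy)
# Nginx: server_tokens off;
# or strip it in a middleware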

API8: Lack of Protection from Automated Threats

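/**
 * Defense: behavioral analysis + CAPTCHA + fingerprinting
 * Automated-threat detection implemented with Spring AOP.
 * getCurrentRequest, getClientIP and isKnownBot are helper methods omitted here.
 */
@Aspect
@Component
public class BotDetectionAspect {

    private final RateLimiterService rateLimiter;
    private final AuditLogService auditLog;

    // Constructor injection: final fields must be assigned
    public BotDetectionAspect(RateLimiterService rateLimiter, AuditLogService auditLog) {
        this.rateLimiter = rateLimiter;
        this.auditLog = auditLog;
    }

    @Around("@annotation(BotProtected)")
    public Object detectBot(ProceedingJoinPoint joinPoint) throws Throwable {
        HttpServletRequest request = getCurrentRequest();
        String clientIP = getClientIP(request);
        String userAgent = request.getHeader("User-Agent");

        // 1. Check whether the User-Agent belongs to a known crawler
        if (isKnownBot(userAgent)) {
            auditLog.logSuspicious(clientIP, "known_bot_ua", userAgent);
            throw new ResponseStatusException(HttpStatus.FORBIDDEN, "Access denied");
        }

        // 2. Check for anomalous request patterns (many different endpoints in a short time)
        if (rateLimiter.isAnomalous(clientIP)) {
            auditLog.logSuspicious(clientIP, "anomalous_pattern", "");
            throw new ResponseStatusException(HttpStatus.TOO_MANY_REQUESTS);
        }

        return joinPoint.proceed();
    }
}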

API9: Improper Inventory Management

"""
API versioning and deprecation policy
"""
from fastapi import FastAPI

app_v1 = FastAPI(title="My API v1 (Deprecated)")
app_v2 = FastAPI(title="My API v2")

# The main application mounts the versioned sub-applications
app = FastAPI()
app.mount("/api/v1", app_v1)
app.mount("/api/v2", app_v2)

# v1 deprecation middleware: adds the Deprecation and Sunset headers
@app_v1.middleware("http")
async def deprecation_warning(request, call_next):
    response = await call_next(request)
    response.headers["Deprecation"] = "true"
    # Shutdown date; RFC 8594 requires the Sunset value to be an HTTP-date
    response.headers["Sunset"] = "Wed, 31 Dec 2025 23:59:59 GMT"
    response.headers["Link"] = (
        '</api/v2>; rel="successor-version"'
    )
    return response
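
RFC 8594 defines the `Sunset` header value as an HTTP-date rather than a bare ISO date. A small sketch of producing that format with the standard library follows; the helper name `http_date` is an assumption for this example.

```python
from datetime import datetime, timezone
from email.utils import format_datetime


def http_date(dt: datetime) -> str:
    """Format an aware datetime as an HTTP-date (IMF-fixdate) string."""
    # usegmt=True requires a UTC datetime and emits "GMT" instead of "+0000"
    return format_datetime(dt.astimezone(timezone.utc), usegmt=True)


sunset = http_date(datetime(2025, 12, 31, 23, 59, 59, tzinfo=timezone.utc))
# sunset == "Wed, 31 Dec 2025 23:59:59 GMT"
```

Generating the value from a `datetime` avoids hand-writing the weekday, which is easy to get wrong.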

API10: Unsafe Consumption of APIs

// Defense: validate strictly when calling third-party APIs

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"
)

// SafeAPIClient is a hardened client for third-party APIs
type SafeAPIClient struct {
	client  *http.Client
	baseURL string
	apiKey  string
}

func NewSafeAPIClient(baseURL, apiKey string) *SafeAPIClient {
	return &SafeAPIClient{
		client: &http.Client{
			Timeout: 10 * time.Second, // ✅ set a timeout to guard against slow responses
			// ✅ do not follow redirects automatically (SSRF protection)
			CheckRedirect: func(req *http.Request, via []*http.Request) error {
				return http.ErrUseLastResponse
			},
		},
		baseURL: baseURL,
		apiKey:  apiKey,
	}
}

func (c *SafeAPIClient) GetData(ctx context.Context, path string) (map[string]any, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil)
	if err != nil {
		return nil, fmt.Errorf("create request: %w", err)
	}
	req.Header.Set("Authorization", "Bearer "+c.apiKey)

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, fmt.Errorf("request failed: %w", err)
	}
	defer resp.Body.Close()

	// ✅ Cap the response body size to prevent memory exhaustion
	body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) // at most 1 MB
	if err != nil {
		return nil, fmt.Errorf("read body: %w", err)
	}

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, body)
	}

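	// ✅ Parse the body as JSON. Decoding into a map accepts any field names; to
	// reject unknown fields, decode into a struct with json.Decoder.DisallowUnknownFields.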
	var result map[string]any
	if err := json.Unmarshal(body, &result); err != nil {
		return nil, fmt.Errorf("invalid JSON response: %w", err)
	}

	return result, nil
}

26.5 Input Validation

from pydantic import BaseModel, Field, field_validator
from typing import Annotated
import re

class CreateUserRequest(BaseModel):
    username: Annotated[str, Field(min_length=3, max_length=50, pattern=r'^[a-zA-Z0-9_]+$')]
    email: Annotated[str, Field(max_length=255)]
    age: Annotated[int, Field(ge=0, le=150)]

    # Pydantic v2 API (Field(pattern=...) above is also v2-only)
    @field_validator('email')
    @classmethod
    def validate_email(cls, v: str) -> str:
        if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', v):
            raise ValueError('Invalid email format')
        return v.lower()

    # The username pattern is an allowlist (letters, digits, underscore), so quotes,
    # semicolons, and comment markers are already rejected. A keyword blocklist
    # (DROP, DELETE, UPDATE) would add nothing and would reject legitimate names
    # such as "updater". SQL injection is prevented by parameterized queries in
    # the data layer, not by filtering input.

26.6 Rate Limiting

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get("/api/search")
@limiter.limit("30/minute")  # 30 requests per minute
async def search(request: Request, q: str):
    return {"results": [...]}

@app.post("/api/login")
@limiter.limit("5/minute")  # login is stricter
async def login(request: Request):
    return {"token": "..."}

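26.7 Complete API Security Templates in Three Languages

26.7.1 Python: FastAPI Complete Secure API Template

"""
FastAPI complete secure API template
Features: authentication + authorization + input validation + rate limiting + audit logging + API key management
Dependencies: pip install fastapi uvicorn python-jose[cryptography] slowapi pydantic sqlalchemy
"""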
from fastapi import FastAPI, Depends, HTTPException, Request, Security, Header
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader
from fastapi.middleware.cors import CORSMiddleware
from jose import jwt, JWTError
from pydantic import BaseModel, Field, field_validator
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from starlette.middleware.base import BaseHTTPMiddleware
from typing import Annotated
from datetime import datetime, timezone
from enum import Enum
import hashlib
import hmac
import logging
import secrets
import time
import re

# ── Application setup ──

app = FastAPI(
    title="Secure API",
    version="2.0.0",
    docs_url="/docs",  # set to None in production
)

# Logging
logger = logging.getLogger("audit")
logging.basicConfig(level=logging.INFO)

# Rate limiting
limiter = Limiter(key_func=get_remote_address, default_limits=["200/hour"])
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type", "X-API-Key"],
    allow_credentials=True,
    max_age=3600,
)


# ── Audit log middleware ──

class AuditLogMiddleware(BaseHTTPMiddleware):
    """Writes an audit log entry for every API request."""

    async def dispatch(self, request: Request, call_next):
        start_time = time.time()
        response = await call_next(request)
        duration = time.time() - start_time

        # Security response headers
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["Strict-Transport-Security"] = (
            "max-age=31536000; includeSubDomains"
        )

        # Audit log entry
        logger.info(
            "API_REQUEST",
            extra={
                "method": request.method,
                "path": request.url.path,
                "status": response.status_code,
                "duration_ms": round(duration * 1000, 2),
                "client_ip": request.client.host if request.client else "unknown",
                "user_agent": request.headers.get("user-agent", ""),
            },
        )
        return response

app.add_middleware(AuditLogMiddleware)


# ── Authentication: JWT ──

JWT_SECRET = "your-256-bit-secret"  # placeholder only; in production read this from an environment variable
JWT_ALGORITHM = "HS256"
bearer_scheme = HTTPBearer()


class UserRole(str, Enum):
    USER = "user"
    ADMIN = "admin"
    SUPERADMIN = "superadmin"


class CurrentUser(BaseModel):
    id: str
    email: str | None = None
    roles: list[str] = []


async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(bearer_scheme),
) -> CurrentUser:
    """Extract and validate the user identity from the JWT."""
    try:
        payload = jwt.decode(
            credentials.credentials,
            JWT_SECRET,
            algorithms=[JWT_ALGORITHM],
            options={"require_exp": True, "require_sub": True},
        )
        return CurrentUser(
            id=payload["sub"],
            email=payload.get("email"),
            roles=payload.get("roles", []),
        )
    except JWTError:
        # Do not echo parser details back to the client
        raise HTTPException(status_code=401, detail="Invalid or expired token")


def require_role(*roles: str):
    """Dependency that authorizes by role."""
    async def checker(user: CurrentUser = Depends(get_current_user)):
        if not any(r in user.roles for r in roles):
            raise HTTPException(403, f"Requires role: {roles}")
        return user
    return checker


# ── Authentication: API Key ──

API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False)

# Simulated API key store (in production use a database and store only hashes)
API_KEYS_DB: dict[str, dict] = {}


def generate_api_key() -> tuple[str, str]:
    """Generate an API key and its SHA-256 hash."""
    key = secrets.token_urlsafe(32)
    key_hash = hashlib.sha256(key.encode()).hexdigest()
    return key, key_hash


async def verify_api_key(
    api_key: str | None = Security(API_KEY_HEADER),
) -> dict:
    """Verify an API key."""
    if not api_key:
        raise HTTPException(401, "Missing API Key")

    key_hash = hashlib.sha256(api_key.encode()).hexdigest()
    key_info = API_KEYS_DB.get(key_hash)
    if not key_info:
        raise HTTPException(401, "Invalid API Key")

    if key_info.get("revoked"):
        raise HTTPException(401, "API Key has been revoked")

    if key_info.get("expires_at") and key_info["expires_at"] < datetime.now(timezone.utc):
        raise HTTPException(401, "API Key has expired")

    return key_info


# ── Input validation models ──

class CreateOrderRequest(BaseModel):
    product_id: Annotated[str, Field(
        min_length=1, max_length=50,
        pattern=r"^[a-zA-Z0-9_-]+$",
    )]
    quantity: Annotated[int, Field(ge=1, le=1000)]
    shipping_address: Annotated[str, Field(min_length=10, max_length=500)]

    @field_validator("shipping_address")
    @classmethod
    def sanitize_address(cls, v: str) -> str:
        # Strip HTML tags as a first line of defense against XSS; output encoding is still required
        v = re.sub(r"<[^>]*>", "", v)
        return v.strip()


# ── API endpoints ──

@app.get("/api/v2/health")
async def health():
    return {"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()}


@app.get("/api/v2/orders")
@limiter.limit("60/minute")
async def list_orders(
    request: Request,
    user: CurrentUser = Depends(get_current_user),
    page: Annotated[int, Field(ge=1, le=1000)] = 1,
    size: Annotated[int, Field(ge=1, le=100)] = 20,
):
    """List the current user's orders (paginated)."""
    logger.info(f"User {user.id} listing orders, page={page}")
    return {
        "orders": [],
        "page": page,
        "size": size,
        "user_id": user.id,
    }


@app.post("/api/v2/orders", status_code=201)
@limiter.limit("10/minute")
async def create_order(
    request: Request,
    order: CreateOrderRequest,
    user: CurrentUser = Depends(get_current_user),
):
    """Create an order."""
    logger.info(f"User {user.id} creating order for product {order.product_id}")
    return {
        "order_id": secrets.token_hex(8),
        "product_id": order.product_id,
        "quantity": order.quantity,
        "created_by": user.id,
    }


@app.get("/api/v2/admin/users")
@limiter.limit("30/minute")
async def admin_list_users(
    request: Request,
    user: CurrentUser = Depends(require_role("admin", "superadmin")),
):
    """Admin only: list all users."""
    return {"users": [], "requested_by": user.id}


# ── API key management endpoints ──

@app.post("/api/v2/api-keys", status_code=201)
async def create_api_key(
    user: CurrentUser = Depends(require_role("admin")),
):
    """Create a new API key (admins only)."""
    key, key_hash = generate_api_key()
    API_KEYS_DB[key_hash] = {
        "owner": user.id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "revoked": False,
        "expires_at": None,
    }
    return {
        "api_key": key,  # ⚠️ returned only once, at creation time
        "message": "Store this key securely. It will not be shown again.",
    }


@app.get("/api/v2/external/data")
async def external_data(key_info: dict = Depends(verify_api_key)):
    """External endpoint authenticated with an API key."""
    return {"data": "external data", "key_owner": key_info["owner"]}
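
The API key handling in the template stores only a SHA-256 hash of each key and looks records up by that hash. The following standalone sketch shows the round trip outside FastAPI; the `lookup` helper and the in-memory `store` dictionary are assumptions for illustration, standing in for `verify_api_key` and a real database.

```python
import hashlib
import secrets
from typing import Optional


def generate_api_key() -> tuple:
    """Return (plaintext key, SHA-256 hex digest); only the digest is persisted."""
    key = secrets.token_urlsafe(32)
    return key, hashlib.sha256(key.encode()).hexdigest()


def lookup(presented: str, store: dict) -> Optional[dict]:
    """Hash the presented key and return its record, or None if unknown or revoked."""
    record = store.get(hashlib.sha256(presented.encode()).hexdigest())
    if record is None or record["revoked"]:
        return None
    return record


store = {}
key, key_hash = generate_api_key()
store[key_hash] = {"owner": "alice", "revoked": False}

valid = lookup(key, store)            # the record for "alice"
invalid = lookup("not-a-key", store)  # None
```

If the key store leaks, an attacker obtains only digests of high-entropy random keys, which cannot be replayed as credentials.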

26.7.2 Java: Spring Boot API Security Template

package com.example.secureapi;

import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.Valid;
import jakarta.validation.constraints.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.security.web.SecurityFilterChain;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.filter.OncePerRequestFilter;

import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
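import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import org.springframework.stereotype.Component;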

@SpringBootApplication
public class SecureApiApplication {
    public static void main(String[] args) {
        SpringApplication.run(SecureApiApplication.class, args);
    }
}

// ── Security configuration ──

@Configuration
@EnableWebSecurity
@EnableMethodSecurity
class ApiSecurityConfig {

    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        http
            .sessionManagement(s ->
                s.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
            .csrf(csrf -> csrf.disable())
            .cors(cors -> cors.configurationSource(request -> {
                var config = new org.springframework.web.cors.CorsConfiguration();
                config.setAllowedOrigins(List.of("https://app.example.com"));
                config.setAllowedMethods(List.of("GET", "POST", "PUT", "DELETE"));
                config.setAllowedHeaders(List.of("Authorization", "Content-Type"));
                config.setAllowCredentials(true);
                config.setMaxAge(3600L);
                return config;
            }))
            .headers(headers -> headers
                .contentTypeOptions(cto -> {})
                .frameOptions(fo -> fo.deny())
                .httpStrictTransportSecurity(hsts -> hsts
                    .includeSubDomains(true)
                    .maxAgeInSeconds(31536000))
            )
            .authorizeHttpRequests(auth -> auth
                .requestMatchers("/api/v2/health").permitAll()
                .requestMatchers("/api/v2/admin/**").hasRole("ADMIN")
                .anyRequest().authenticated()
            )
            .oauth2ResourceServer(oauth2 -> oauth2
                .jwt(jwt -> {})
            )
            // Audit log filter
            .addFilterBefore(new AuditLogFilter(), 
                org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter.class);

        return http.build();
    }
}

// ── Audit log filter ──

class AuditLogFilter extends OncePerRequestFilter {
    private static final Logger auditLog = LoggerFactory.getLogger("AUDIT");

    @Override
    protected void doFilterInternal(
            HttpServletRequest request,
            HttpServletResponse response,
            FilterChain filterChain) throws ServletException, IOException {

        long startTime = System.currentTimeMillis();
        filterChain.doFilter(request, response);
        long duration = System.currentTimeMillis() - startTime;

        auditLog.info("API_REQUEST method={} path={} status={} duration_ms={} client_ip={} user_agent={}",
            request.getMethod(),
            request.getRequestURI(),
            response.getStatus(),
            duration,
            request.getRemoteAddr(),
            request.getHeader("User-Agent")
        );
    }
}

// ── Input validation DTOs ──

record CreateOrderRequest(
    @NotBlank
    @Size(min = 1, max = 50)
    @Pattern(regexp = "^[a-zA-Z0-9_-]+$", message = "Invalid product ID format")
    @JsonProperty("product_id")
    String productId,

    @NotNull
    @Min(1) @Max(1000)
    Integer quantity,

    @NotBlank
    @Size(min = 10, max = 500)
    @JsonProperty("shipping_address")
    String shippingAddress
) {}

record OrderResponse(
    @JsonProperty("order_id") String orderId,
    @JsonProperty("product_id") String productId,
    Integer quantity,
    @JsonProperty("created_by") String createdBy,
    @JsonProperty("created_at") Instant createdAt
) {}

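// ── API controller ──

@RestController
@RequestMapping("/api/v2")
// On Spring versions before 6.1, @Validated is required for the @Min/@Max
// constraints on request parameters to be enforced
@org.springframework.validation.annotation.Validated
class SecureApiController {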

    private static final Logger log = LoggerFactory.getLogger(SecureApiController.class);

    @GetMapping("/health")
    public Map<String, Object> health() {
        return Map.of("status", "ok", "timestamp", Instant.now().toString());
    }

    /**
     * List the current user's orders
     */
    @GetMapping("/orders")
    public Map<String, Object> listOrders(
            @AuthenticationPrincipal Jwt jwt,
            @RequestParam(defaultValue = "1") @Min(1) @Max(1000) int page,
            @RequestParam(defaultValue = "20") @Min(1) @Max(100) int size) {

        String userId = jwt.getSubject();
        log.info("User {} listing orders, page={}", userId, page);

        return Map.of(
            "orders", List.of(),
            "page", page,
            "size", size,
            "user_id", userId
        );
    }

    /**
     * Create an order
     */
    @PostMapping("/orders")
    public ResponseEntity<OrderResponse> createOrder(
            @Valid @RequestBody CreateOrderRequest request,
            @AuthenticationPrincipal Jwt jwt) {

        String userId = jwt.getSubject();
        log.info("User {} creating order for product {}", userId, request.productId());

        OrderResponse response = new OrderResponse(
            UUID.randomUUID().toString(),
            request.productId(),
            request.quantity(),
            userId,
            Instant.now()
        );
        return ResponseEntity.status(HttpStatus.CREATED).body(response);
    }

    /**
     * Admin only: list all users
     */
    @GetMapping("/admin/users")
    @PreAuthorize("hasRole('ADMIN')")
    public Map<String, Object> adminListUsers(@AuthenticationPrincipal Jwt jwt) {
        return Map.of("users", List.of(), "requested_by", jwt.getSubject());
    }
}

// ── API versioning and security ──

@Configuration
class ApiVersionConfig {

    /**
     * v1 deprecation filter: adds Deprecation headers to responses from the old API version
     */
    @Bean
    public OncePerRequestFilter apiVersionFilter() {
        return new OncePerRequestFilter() {
            @Override
            protected void doFilterInternal(
                    HttpServletRequest request,
                    HttpServletResponse response,
                    FilterChain filterChain) throws ServletException, IOException {

                // Set the headers before the chain runs: once the response has been
                // committed, later setHeader calls are ignored
                if (request.getRequestURI().startsWith("/api/v1/")) {
                    response.setHeader("Deprecation", "true");
                    // RFC 8594 requires the Sunset value to be an HTTP-date
                    response.setHeader("Sunset", "Wed, 31 Dec 2025 23:59:59 GMT");
                    response.setHeader("Link", "</api/v2>; rel=\"successor-version\"");
                }

                filterChain.doFilter(request, response);
            }
        };
    }
}

// ── API Key + HMAC signature verification filter ──

/**
 * HmacSignatureFilter: verifies the API key and the HMAC request signature
 *
 * Request headers:
 *   X-API-Key:    the client's API key
 *   X-Timestamp:  RFC3339 timestamp (replay protection, ±5 minutes of skew allowed)
 *   X-Signature:  HMAC-SHA256(apiKey + "\n" + timestamp + "\n" + method + "\n" + path + "\n" + bodyHash)
 *
 * The signing key is the secret associated with the API key.
 */
@Component
class HmacSignatureFilter extends OncePerRequestFilter {

    private static final Logger log = LoggerFactory.getLogger(HmacSignatureFilter.class);
    private static final long MAX_CLOCK_SKEW_SECONDS = 300; // 5 minutes

    // In production, inject an ApiKeyRepository instead
    private final Map<String, ApiKeyRecord> apiKeyStore = new ConcurrentHashMap<>();

    record ApiKeyRecord(String owner, String hmacSecret, Instant createdAt, boolean revoked) {}

    @Override
    protected boolean shouldNotFilter(HttpServletRequest request) {
        // Apply HMAC verification only to /api/v2/external/** paths
        return !request.getRequestURI().startsWith("/api/v2/external/");
    }

    @Override
    protected void doFilterInternal(
            HttpServletRequest request,
            HttpServletResponse response,
            FilterChain filterChain) throws ServletException, IOException {

        String apiKey    = request.getHeader("X-API-Key");
        String timestamp = request.getHeader("X-Timestamp");
        String signature = request.getHeader("X-Signature");

        if (apiKey == null || timestamp == null || signature == null) {
            sendError(response, HttpStatus.UNAUTHORIZED,
                "Missing X-API-Key, X-Timestamp, or X-Signature header");
            return;
        }

        // 1. Timestamp check (replay protection)
        Instant ts;
        try {
            ts = Instant.parse(timestamp);
        } catch (Exception e) {
            sendError(response, HttpStatus.BAD_REQUEST,
                "Invalid timestamp format, expected ISO-8601 / RFC3339");
            return;
        }
        if (Math.abs(Duration.between(Instant.now(), ts).getSeconds()) > MAX_CLOCK_SKEW_SECONDS) {
            sendError(response, HttpStatus.UNAUTHORIZED,
                "Request timestamp too old or too far in the future");
            return;
        }

        // 2. Look up the API key (by its hash)
        String keyHash = sha256Hex(apiKey);
        ApiKeyRecord record = apiKeyStore.get(keyHash);
        if (record == null || record.revoked()) {
            sendError(response, HttpStatus.UNAUTHORIZED, "Invalid or revoked API key");
            return;
        }

        // 3. 读取请求体并计算 body hash
        CachedBodyHttpServletRequest wrappedRequest = new CachedBodyHttpServletRequest(request);
        String bodyHash = sha256Hex(new String(wrappedRequest.getCachedBody(), StandardCharsets.UTF_8));

        // 4. 构造签名消息并计算预期签名
        String message = String.join("\n",
            apiKey, timestamp, request.getMethod(), request.getRequestURI(), bodyHash);
        String expectedSig = hmacSha256Hex(message, record.hmacSecret());

        // 5. 常量时间比较(防时序攻击)
        if (!MessageDigest.isEqual(
                hexDecode(signature), hexDecode(expectedSig))) {
            log.warn("HMAC signature mismatch for key owner={}", record.owner());
            sendError(response, HttpStatus.UNAUTHORIZED, "Invalid signature");
            return;
        }

        // 签名验证通过,设置属性供下游使用
        request.setAttribute("apiKeyOwner", record.owner());
        filterChain.doFilter(wrappedRequest, response);
    }

    // ── 工具方法 ──

    private static String sha256Hex(String data) {
        try {
            byte[] hash = MessageDigest.getInstance("SHA-256")
                .digest(data.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(hash);
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException(e);
        }
    }

    private static String hmacSha256Hex(String data, String secret) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            byte[] sig = mac.doFinal(data.getBytes(StandardCharsets.UTF_8));
            return HexFormat.of().formatHex(sig);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private static byte[] hexDecode(String hex) {
        return HexFormat.of().parseHex(hex);
    }

    private void sendError(HttpServletResponse response, HttpStatus status, String message)
            throws IOException {
        response.setStatus(status.value());
        response.setContentType("application/json");
        response.getWriter().write("{\"error\":\"" + message + "\"}");
    }
}

/**
 * CachedBodyHttpServletRequest: caches the request body so it can be read more than once
 * (signature verification reads the body first, and the controller reads it again later)
 */
class CachedBodyHttpServletRequest extends HttpServletRequestWrapper {
    private final byte[] cachedBody;

    public CachedBodyHttpServletRequest(HttpServletRequest request) throws IOException {
        super(request);
        this.cachedBody = request.getInputStream().readAllBytes();
    }

    public byte[] getCachedBody() { return cachedBody; }

    @Override
    public ServletInputStream getInputStream() {
        ByteArrayInputStream bais = new ByteArrayInputStream(cachedBody);
        return new ServletInputStream() {
            @Override public int read() { return bais.read(); }
            @Override public boolean isFinished() { return bais.available() == 0; }
            @Override public boolean isReady() { return true; }
            @Override public void setReadListener(ReadListener listener) {}
        };
    }

    @Override
    public BufferedReader getReader() {
        return new BufferedReader(new InputStreamReader(getInputStream(), StandardCharsets.UTF_8));
    }
}
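
A client has to build exactly the same message as the filter, or verification fails. The sketch below shows one way a Python client might produce the three headers, assuming the message layout used by HmacSignatureFilter: API key, timestamp, method, path, and the hex SHA-256 of the body, joined by newlines. The function name sign_request and its parameters are illustrative and do not come from the original code.

```python
import hashlib
import hmac
from datetime import datetime, timezone
from typing import Optional


def sign_request(api_key: str, secret: str, method: str, path: str,
                 body: bytes, now: Optional[datetime] = None) -> dict:
    """Return the X-API-Key, X-Timestamp, and X-Signature headers for one request."""
    now = now or datetime.now(timezone.utc)
    # Instant.parse on the server accepts ISO-8601 UTC timestamps like 2026-03-21T08:00:00Z.
    timestamp = now.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    # The body hash is the lowercase hex SHA-256 of the raw body bytes.
    body_hash = hashlib.sha256(body).hexdigest()
    # Same field order and separator as the server-side String.join("\n", ...).
    message = "\n".join([api_key, timestamp, method.upper(), path, body_hash])
    signature = hmac.new(secret.encode("utf-8"), message.encode("utf-8"),
                         hashlib.sha256).hexdigest()
    return {
        "X-API-Key": api_key,
        "X-Timestamp": timestamp,
        "X-Signature": signature,
    }
```

The path passed in should be the request path without the query string, since the filter signs getRequestURI(). Any change to the body after signing changes the body hash and therefore invalidates the signature.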

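26.7.3 Go: Gin API Security Template

/*
Gin secure API template
Features: JWT authentication + RBAC authorization + input validation + rate limiting + audit logging + API key + HMAC signature
Dependencies: go get github.com/gin-gonic/gin github.com/go-playground/validator/v10 golang.org/x/time/rate
(JWT validation is stubbed out below; production code should add github.com/go-jose/go-jose/v3)
*/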
package main

import (
	"crypto/hmac"
	"crypto/sha256"
	"crypto/subtle"
	"encoding/hex"
	"fmt"
	"log"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/gin-gonic/gin/binding"
	"github.com/go-playground/validator/v10"
	"golang.org/x/time/rate"
)

// ── Configuration ──

type Config struct {
	JWTSecret     string
	HMACSecret    string
	AllowedOrigin string
}

var cfg = Config{
	JWTSecret:     "your-256-bit-secret", // placeholder; read from an environment variable in production
	HMACSecret:    "hmac-signing-secret",
	AllowedOrigin: "https://app.example.com",
}

// ── Audit logging middleware ──

func AuditLogMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		start := time.Now()
		c.Next()
		duration := time.Since(start)

		userID := "anonymous"
		if id, exists := c.Get("userID"); exists {
			userID = id.(string)
		}

		log.Printf("AUDIT method=%s path=%s status=%d duration=%v user=%s ip=%s ua=%s",
			c.Request.Method,
			c.Request.URL.Path,
			c.Writer.Status(),
			duration,
			userID,
			c.ClientIP(),
			c.Request.UserAgent(),
		)
	}
}

// ── Security response headers middleware ──

func SecurityHeaders() gin.HandlerFunc {
	return func(c *gin.Context) {
		h := c.Writer.Header()
		h.Set("X-Content-Type-Options", "nosniff")
		h.Set("X-Frame-Options", "DENY")
		h.Set("Strict-Transport-Security", "max-age=31536000; includeSubDomains")
		h.Set("Content-Security-Policy", "default-src 'self'")
		h.Set("Referrer-Policy", "strict-origin-when-cross-origin")
		h.Set("Cache-Control", "no-store")
		c.Next()
	}
}

// ── CORS middleware ──

func CORSMiddleware(allowedOrigin string) gin.HandlerFunc {
	return func(c *gin.Context) {
		h := c.Writer.Header()
		h.Set("Access-Control-Allow-Origin", allowedOrigin)
		h.Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
		// X-Timestamp is listed because HMACSignatureMiddleware requires it
		h.Set("Access-Control-Allow-Headers", "Authorization, Content-Type, X-API-Key, X-Signature, X-Timestamp")
		h.Set("Access-Control-Allow-Credentials", "true")
		h.Set("Access-Control-Max-Age", "3600")

		if c.Request.Method == http.MethodOptions {
			c.AbortWithStatus(http.StatusNoContent)
			return
		}
		c.Next()
	}
}

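// ── Rate limiting middleware ──

// IPRateLimiter keeps one token-bucket limiter per client IP. Entries are never
// evicted in this example, so a long-running service should expire idle IPs.
type IPRateLimiter struct {
	limiters map[string]*rate.Limiter
	mu       sync.Mutex
	rate     rate.Limit
	burst    int
}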

func NewIPRateLimiter(r rate.Limit, burst int) *IPRateLimiter {
	return &IPRateLimiter{
		limiters: make(map[string]*rate.Limiter),
		rate:     r,
		burst:    burst,
	}
}

func (rl *IPRateLimiter) GetLimiter(ip string) *rate.Limiter {
	rl.mu.Lock()
	defer rl.mu.Unlock()
	limiter, exists := rl.limiters[ip]
	if !exists {
		limiter = rate.NewLimiter(rl.rate, rl.burst)
		rl.limiters[ip] = limiter
	}
	return limiter
}

func RateLimitMiddleware(rl *IPRateLimiter) gin.HandlerFunc {
	return func(c *gin.Context) {
		limiter := rl.GetLimiter(c.ClientIP())
		if !limiter.Allow() {
			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
				"error":       "rate limit exceeded",
				"retry_after": "1s",
			})
			return
		}
		c.Next()
	}
}

// ── JWT authentication middleware ──

func JWTAuthMiddleware(secret string) gin.HandlerFunc {
	return func(c *gin.Context) {
		authHeader := c.GetHeader("Authorization")
		if authHeader == "" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "missing Authorization header",
			})
			return
		}

		parts := strings.SplitN(authHeader, " ", 2)
		if len(parts) != 2 || !strings.EqualFold(parts[0], "bearer") {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "invalid Authorization format, expected: Bearer <token>",
			})
			return
		}

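		// Simplified example: parseAndValidateJWT below is a placeholder and does not
		// verify anything yet. Production code should verify the signature (for example
		// RS256 via go-jose), pin the expected algorithm, and check the exp claim.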
		claims, err := parseAndValidateJWT(parts[1], secret)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "invalid token: " + err.Error(),
			})
			return
		}

		c.Set("userID", claims.Subject)
		c.Set("email", claims.Email)
		c.Set("roles", claims.Roles)
		c.Next()
	}
}

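// SimpleClaims is a simplified set of JWT claims
type SimpleClaims struct {
	Subject string   `json:"sub"`
	Email   string   `json:"email"`
	Roles   []string `json:"roles"`
	Exp     int64    `json:"exp"`
}

// parseAndValidateJWT is a PLACEHOLDER: it ignores both arguments and returns
// fixed claims, so every bearer token is accepted. Replace it with real
// validation (for example go-jose: verify the signature, pin the algorithm, and
// reject expired tokens) before exposing any route that depends on it.
func parseAndValidateJWT(tokenStr, secret string) (*SimpleClaims, error) {
	_ = tokenStr
	_ = secret
	return &SimpleClaims{
		Subject: "user-123",
		Email:   "user@example.com",
		Roles:   []string{"user"},
	}, nil
}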

// ── RBAC authorization middleware ──

func RequireRole(roles ...string) gin.HandlerFunc {
	return func(c *gin.Context) {
		userRoles, exists := c.Get("roles")
		if !exists {
			c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
				"error": "no roles in context",
			})
			return
		}

		roleList, ok := userRoles.([]string)
		if !ok {
			c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
				"error": "invalid roles format",
			})
			return
		}

		for _, required := range roles {
			for _, has := range roleList {
				if has == required {
					c.Next()
					return
				}
			}
		}

		c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
			"error": fmt.Sprintf("requires one of roles: %v", roles),
		})
	}
}

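// ── API key + HMAC signature middleware ──

// apiKeyStore simulates API key storage (use a database in production)
var apiKeyStore = map[string]APIKeyInfo{
	// SHA-256 hash of the key -> info
}

type APIKeyInfo struct {
	Owner     string
	Secret    string // secret used to compute the HMAC signature
	CreatedAt time.Time
	Revoked   bool
}

// HMACSignatureMiddleware verifies an API key plus an HMAC request signature.
// Signed message: api_key, timestamp, method, and path joined by "\n", then
// HMAC-SHA256 with the key's secret. Unlike the Java filter above, this
// simplified version does not include a body hash, so add one to the message
// before using it on routes that accept a request body.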
func HMACSignatureMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		apiKey := c.GetHeader("X-API-Key")
		signature := c.GetHeader("X-Signature")
		timestamp := c.GetHeader("X-Timestamp")

		if apiKey == "" || signature == "" || timestamp == "" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "missing X-API-Key, X-Signature, or X-Timestamp header",
			})
			return
		}

		// 1. Check the timestamp (5 minutes of skew allowed; limits how long a captured request can be replayed)
		ts, err := time.Parse(time.RFC3339, timestamp)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
				"error": "invalid timestamp format, expected RFC3339",
			})
			return
		}
		if time.Since(ts).Abs() > 5*time.Minute {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "request timestamp too old or too far in the future",
			})
			return
		}

		// 2. Look up the API key by its SHA-256 hash
		keyHash := sha256Hash(apiKey)
		keyInfo, exists := apiKeyStore[keyHash]
		if !exists || keyInfo.Revoked {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "invalid or revoked API key",
			})
			return
		}

		// 3. Compute the expected signature
		message := fmt.Sprintf("%s\n%s\n%s\n%s",
			apiKey, timestamp, c.Request.Method, c.Request.URL.Path)
		expectedSig := computeHMAC(message, keyInfo.Secret)

		// 4. Constant-time comparison (guards against timing attacks)
		sigBytes, err := hex.DecodeString(signature)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
				"error": "invalid signature encoding",
			})
			return
		}
		expectedBytes, _ := hex.DecodeString(expectedSig)
		if subtle.ConstantTimeCompare(sigBytes, expectedBytes) != 1 {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "invalid signature",
			})
			return
		}

		c.Set("apiKeyOwner", keyInfo.Owner)
		c.Next()
	}
}

func sha256Hash(data string) string {
	h := sha256.Sum256([]byte(data))
	return hex.EncodeToString(h[:])
}

func computeHMAC(message, secret string) string {
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write([]byte(message))
	return hex.EncodeToString(mac.Sum(nil))
}

// ── Input validation models ──

type CreateOrderRequest struct {
	ProductID       string `json:"product_id" binding:"required,min=1,max=50,alphanumdash"`
	Quantity        int    `json:"quantity" binding:"required,min=1,max=1000"`
	ShippingAddress string `json:"shipping_address" binding:"required,min=10,max=500"`
}

type OrderResponse struct {
	OrderID   string    `json:"order_id"`
	ProductID string    `json:"product_id"`
	Quantity  int       `json:"quantity"`
	CreatedBy string    `json:"created_by"`
	CreatedAt time.Time `json:"created_at"`
}

// ── Main ──

func main() {
	// Register the custom validator
	if v, ok := binding.Validator.Engine().(*validator.Validate); ok {
		v.RegisterValidation("alphanumdash", func(fl validator.FieldLevel) bool {
			for _, c := range fl.Field().String() {
				if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') ||
					(c >= '0' && c <= '9') || c == '_' || c == '-') {
					return false
				}
			}
			return true
		})
	}

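	r := gin.New()
	r.Use(gin.Recovery())

	// Gin trusts X-Forwarded-For from every proxy by default, which lets a client
	// spoof c.ClientIP() and sidestep per-IP limits. Trust none unless a known
	// reverse proxy sits in front of the service.
	if err := r.SetTrustedProxies(nil); err != nil {
		log.Fatal(err)
	}

	// Global middleware. The audit log is registered before the rate limiter so
	// that rejected (429) requests are logged too.
	rateLimiter := NewIPRateLimiter(10, 20) // 10 requests per second, burst of 20
	r.Use(SecurityHeaders())
	r.Use(CORSMiddleware(cfg.AllowedOrigin))
	r.Use(AuditLogMiddleware())
	r.Use(RateLimitMiddleware(rateLimiter))

	// Public endpoints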
	r.GET("/api/v2/health", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{
			"status":    "ok",
			"timestamp": time.Now().UTC().Format(time.RFC3339),
		})
	})

	// JWT-authenticated route group
	auth := r.Group("/api/v2")
	auth.Use(JWTAuthMiddleware(cfg.JWTSecret))
	{
		auth.GET("/orders", listOrders)
		auth.POST("/orders", createOrder)
	}

	// Admin route group
	admin := r.Group("/api/v2/admin")
	admin.Use(JWTAuthMiddleware(cfg.JWTSecret))
	admin.Use(RequireRole("admin", "superadmin"))
	{
		admin.GET("/users", adminListUsers)
	}

	// HMAC-signed route group (for service-to-service calls)
	external := r.Group("/api/v2/external")
	external.Use(HMACSignatureMiddleware())
	{
		external.GET("/data", func(c *gin.Context) {
			owner, _ := c.Get("apiKeyOwner")
			c.JSON(http.StatusOK, gin.H{
				"data":      "external data",
				"key_owner": owner,
			})
		})
	}

	log.Fatal(r.RunTLS(":8443", "server.crt", "server.key"))
}

func listOrders(c *gin.Context) {
	userID, _ := c.Get("userID")
	page := c.DefaultQuery("page", "1")
	size := c.DefaultQuery("size", "20")

	c.JSON(http.StatusOK, gin.H{
		"orders":  []any{},
		"page":    page,
		"size":    size,
		"user_id": userID,
	})
}

func createOrder(c *gin.Context) {
	var req CreateOrderRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

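	// GetString returns "" instead of panicking if the middleware did not set userID
	userID := c.GetString("userID")
	c.JSON(http.StatusCreated, OrderResponse{
		OrderID:   fmt.Sprintf("%x", time.Now().UnixNano()), // demo only; predictable IDs are easy to enumerate
		ProductID: req.ProductID,
		Quantity:  req.Quantity,
		CreatedBy: userID,
		CreatedAt: time.Now().UTC(),
	})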
}

func adminListUsers(c *gin.Context) {
	userID, _ := c.Get("userID")
	c.JSON(http.StatusOK, gin.H{
		"users":        []any{},
		"requested_by": userID,
	})
}

26.7.4 Go: Request Signature Verification (AWS Signature V4 Style)

/*
Request signature verification middleware in the style of AWS Signature V4

Signing flow (client side); SHA256(x) below means the lowercase hex digest:
  1. Build the canonical request
     CanonicalRequest = Method + "\n" + Path + "\n" + QueryString + "\n"
                      + CanonicalHeaders + "\n" + SignedHeaders + "\n" + SHA256(Body)
  2. Build the string to sign
     StringToSign = Algorithm + "\n" + Timestamp + "\n" + CredentialScope + "\n" + SHA256(CanonicalRequest)
  3. Derive the signing key
     DateKey    = HMAC-SHA256("MYAPI" + SecretKey, Date)
     RegionKey  = HMAC-SHA256(DateKey, Region)
     ServiceKey = HMAC-SHA256(RegionKey, Service)
     SigningKey = HMAC-SHA256(ServiceKey, "myapi_request")
  4. Compute the signature
     Signature  = Hex(HMAC-SHA256(SigningKey, StringToSign))

Authorization header format:
  MYAPI4-HMAC-SHA256 Credential=<AccessKey>/<Date>/<Region>/<Service>/myapi_request,
  SignedHeaders=<headers>, Signature=<signature>
*/
package sigv4

import (
	"bytes"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"net/http"
	"sort"
	"strings"
	"time"

	"github.com/gin-gonic/gin"
)

const (
	algorithm      = "MYAPI4-HMAC-SHA256"
	maxClockSkew   = 5 * time.Minute
	signingTrailer = "myapi_request"
)

// Credential holds an access credential (load it from a database or Vault in production)
type Credential struct {
	AccessKey string
	SecretKey string
	Owner     string
	Active    bool
}

// CredentialStore is the credential lookup interface
type CredentialStore interface {
	Lookup(accessKey string) (*Credential, error)
}

// SigV4Middleware verifies request signatures modeled on AWS Signature V4
func SigV4Middleware(store CredentialStore) gin.HandlerFunc {
	return func(c *gin.Context) {
		authHeader := c.GetHeader("Authorization")
		if authHeader == "" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "missing Authorization header",
			})
			return
		}

		// ── 1. Parse the Authorization header ──
		params, err := parseAuthHeader(authHeader)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
				"error": "malformed Authorization header: " + err.Error(),
			})
			return
		}

		// ── 2. Parse the credential scope ──
		// Format: AccessKey/Date/Region/Service/myapi_request
		scopeParts := strings.Split(params.credential, "/")
		if len(scopeParts) != 5 || scopeParts[4] != signingTrailer {
			c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
				"error": "invalid credential scope",
			})
			return
		}
		accessKey := scopeParts[0]
		dateStr := scopeParts[1]   // 20260321
		region := scopeParts[2]    // e.g. "cn-east-1"
		service := scopeParts[3]   // e.g. "order-api"

		// ── 3. Look up the credential ──
		cred, err := store.Lookup(accessKey)
		if err != nil || cred == nil || !cred.Active {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "invalid or inactive access key",
			})
			return
		}

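		// ── 4. Validate the timestamp ──
		reqTimestamp := c.GetHeader("X-Myapi-Date")
		reqTime, err := time.Parse("20060102T150405Z", reqTimestamp)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
				"error": "invalid X-Myapi-Date, expected format: 20060102T150405Z",
			})
			return
		}
		if time.Since(reqTime).Abs() > maxClockSkew {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "request timestamp outside allowed skew",
			})
			return
		}
		// The scope date feeds key derivation, so it must agree with the signed timestamp.
		if reqTime.Format("20060102") != dateStr {
			c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
				"error": "credential scope date does not match X-Myapi-Date",
			})
			return
		}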

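		// ── 5. Read and cache the request body (at most 1 MiB) ──
		// http.MaxBytesReader makes ReadAll fail on an oversized body instead of
		// silently truncating it the way io.LimitReader would.
		c.Request.Body = http.MaxBytesReader(c.Writer, c.Request.Body, 1<<20)
		bodyBytes, err := io.ReadAll(c.Request.Body)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
				"error": "failed to read request body (limit 1 MiB)",
			})
			return
		}
		c.Request.Body = io.NopCloser(bytes.NewReader(bodyBytes))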

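		// ── 6. Build the canonical request ──
		// URL.Path is the decoded path and RawQuery is not re-sorted, so the client
		// must sign the decoded path and send query parameters in the order it signed.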
		canonicalHeaders, signedHeaders := buildCanonicalHeaders(c.Request, params.signedHeaders)
		bodyHash := sha256Hex(bodyBytes)

		canonicalRequest := strings.Join([]string{
			c.Request.Method,
			c.Request.URL.Path,
			c.Request.URL.RawQuery,
			canonicalHeaders,
			signedHeaders,
			bodyHash,
		}, "\n")

		// ── 7. Build the string to sign ──
		credentialScope := fmt.Sprintf("%s/%s/%s/%s", dateStr, region, service, signingTrailer)
		stringToSign := strings.Join([]string{
			algorithm,
			reqTimestamp,
			credentialScope,
			sha256Hex([]byte(canonicalRequest)),
		}, "\n")

		// ── 8. Derive the signing key ──
		signingKey := deriveSigningKey(cred.SecretKey, dateStr, region, service)

		// ── 9. Compute and compare the signature (hmac.Equal runs in constant time) ──
		expectedSig := hmacSHA256Hex(signingKey, []byte(stringToSign))
		if !hmac.Equal([]byte(params.signature), []byte(expectedSig)) {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "signature mismatch",
			})
			return
		}

		// Signature verified
		c.Set("sigv4Owner", cred.Owner)
		c.Set("sigv4AccessKey", accessKey)
		c.Next()
	}
}

// ── Internal types and helpers ──

type authParams struct {
	credential    string
	signedHeaders []string
	signature     string
}

func parseAuthHeader(header string) (*authParams, error) {
	if !strings.HasPrefix(header, algorithm+" ") {
		return nil, fmt.Errorf("unsupported algorithm, expected %s", algorithm)
	}
	parts := strings.TrimPrefix(header, algorithm+" ")
	params := &authParams{}
	for _, segment := range strings.Split(parts, ",") {
		kv := strings.SplitN(strings.TrimSpace(segment), "=", 2)
		if len(kv) != 2 {
			continue
		}
		switch kv[0] {
		case "Credential":
			params.credential = kv[1]
		case "SignedHeaders":
			params.signedHeaders = strings.Split(kv[1], ";")
		case "Signature":
			params.signature = kv[1]
		}
	}
	if params.credential == "" || params.signature == "" || len(params.signedHeaders) == 0 {
		return nil, fmt.Errorf("missing required fields")
	}
	return params, nil
}

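func buildCanonicalHeaders(req *http.Request, signed []string) (string, string) {
	// Work on a lowercased copy so the caller's slice is not reordered in place.
	names := make([]string, len(signed))
	for i, h := range signed {
		names[i] = strings.ToLower(strings.TrimSpace(h))
	}
	sort.Strings(names)
	var canonical strings.Builder
	for _, h := range names {
		val := req.Header.Get(h)
		if h == "host" {
			// On the server side net/http moves the Host header into req.Host.
			val = req.Host
		}
		canonical.WriteString(h + ":" + strings.TrimSpace(val) + "\n")
	}
	return canonical.String(), strings.Join(names, ";")
}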

func deriveSigningKey(secret, dateStr, region, service string) []byte {
	dateKey := hmacSHA256([]byte("MYAPI"+secret), []byte(dateStr))
	regionKey := hmacSHA256(dateKey, []byte(region))
	serviceKey := hmacSHA256(regionKey, []byte(service))
	return hmacSHA256(serviceKey, []byte(signingTrailer))
}

func hmacSHA256(key, data []byte) []byte {
	h := hmac.New(sha256.New, key)
	h.Write(data)
	return h.Sum(nil)
}

func hmacSHA256Hex(key, data []byte) string {
	return hex.EncodeToString(hmacSHA256(key, data))
}

func sha256Hex(data []byte) string {
	h := sha256.Sum256(data)
	return hex.EncodeToString(h[:])
}
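
The client-side flow described at the top of this listing can be sketched in Python with the standard library only. This is an illustration under stated assumptions rather than a reference client: the function name sign_request_v4 and its parameters are hypothetical, header names are lowercased before signing, and the caller passes the path and raw query string exactly as they will be sent.

```python
import hashlib
import hmac
from datetime import datetime, timezone

ALGORITHM = "MYAPI4-HMAC-SHA256"
TRAILER = "myapi_request"


def _hmac_sha256(key: bytes, data: str) -> bytes:
    return hmac.new(key, data.encode("utf-8"), hashlib.sha256).digest()


def sign_request_v4(access_key: str, secret_key: str, region: str, service: str,
                    method: str, path: str, raw_query: str,
                    headers: dict, body: bytes, now: datetime) -> dict:
    """Return the X-Myapi-Date and Authorization headers for one request."""
    timestamp = now.astimezone(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    date = timestamp[:8]

    # 1. Canonical request: every signed header as "name:value\n", sorted by name.
    to_sign = {k.lower(): v.strip() for k, v in headers.items()}
    to_sign["x-myapi-date"] = timestamp
    names = sorted(to_sign)
    canonical_headers = "".join(f"{n}:{to_sign[n]}\n" for n in names)
    signed_headers = ";".join(names)
    canonical_request = "\n".join([
        method, path, raw_query, canonical_headers, signed_headers,
        hashlib.sha256(body).hexdigest(),
    ])

    # 2. String to sign.
    scope = f"{date}/{region}/{service}/{TRAILER}"
    string_to_sign = "\n".join([
        ALGORITHM, timestamp, scope,
        hashlib.sha256(canonical_request.encode("utf-8")).hexdigest(),
    ])

    # 3. Derive the signing key: date -> region -> service -> trailer.
    key = _hmac_sha256(("MYAPI" + secret_key).encode("utf-8"), date)
    for part in (region, service, TRAILER):
        key = _hmac_sha256(key, part)

    # 4. Signature and Authorization header.
    signature = hmac.new(key, string_to_sign.encode("utf-8"), hashlib.sha256).hexdigest()
    authorization = (f"{ALGORITHM} Credential={access_key}/{scope}, "
                     f"SignedHeaders={signed_headers}, Signature={signature}")
    return {"X-Myapi-Date": timestamp, "Authorization": authorization}
```

Because the signing key is derived from the date, region, and service, a leaked derived key is only useful for that one scope, while the long-term secret never leaves the client's key store.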

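26.8 GraphQL Security

# GraphQL-specific security issues and defenses

# 1. Query depth limit
# Blocks deeply nested queries: { user { friends { friends { friends { ... } } } } }
MAX_DEPTH = 5

# 2. Query complexity limit
# Blocks expensive queries: { users(first: 1000000) { posts { comments { ... } } } }
MAX_COMPLEXITY = 1000

# 3. Disable introspection in production
# Keeps attackers from enumerating the full schema (types, fields, arguments)
INTROSPECTION_ENABLED = False  # disable in production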
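
To make the depth limit concrete, here is a minimal sketch that measures selection-set nesting by counting braces in the raw query text. It is an approximation under stated assumptions: it ignores braces inside string literals, comments, and argument lists, but it does not expand fragments, so a production server should enforce the limit on the parsed AST through its GraphQL library's validation hooks. The function names query_depth and enforce_depth_limit are illustrative.

```python
MAX_DEPTH = 5


def query_depth(query: str) -> int:
    """Return the deepest selection-set nesting level found in the query text."""
    depth = max_depth = paren_depth = 0
    in_string = False
    i = 0
    while i < len(query):
        ch = query[i]
        if in_string:
            if ch == "\\":
                i += 1  # skip the escaped character
            elif ch == '"':
                in_string = False
        elif ch == '"':
            in_string = True
        elif ch == "#":
            # Comments run to the end of the line.
            while i < len(query) and query[i] != "\n":
                i += 1
        elif ch == "(":
            paren_depth += 1
        elif ch == ")":
            paren_depth -= 1
        elif paren_depth == 0 and ch == "{":
            # Braces inside parentheses are input objects, not selection sets.
            depth += 1
            max_depth = max(max_depth, depth)
        elif paren_depth == 0 and ch == "}":
            depth -= 1
        i += 1
    return max_depth


def enforce_depth_limit(query: str, max_depth: int = MAX_DEPTH) -> None:
    """Raise ValueError when the query nests deeper than max_depth."""
    depth = query_depth(query)
    if depth > max_depth:
        raise ValueError(f"query depth {depth} exceeds limit {max_depth}")
```

Rejecting the query before execution matters here: once resolvers start running, a deeply nested query has already begun consuming database and CPU time.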

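26.9 gRPC Security

from concurrent import futures

import grpc

# Token interceptor: rejects calls whose authorization metadata fails verification.
# verify_token is application-provided and not shown here.
class AuthInterceptor(grpc.ServerInterceptor):
    def intercept_service(self, continuation, handler_call_details):
        metadata = dict(handler_call_details.invocation_metadata)
        token = metadata.get('authorization', '')
        if not verify_token(token):
            return grpc.unary_unary_rpc_method_handler(
                lambda req, ctx: ctx.abort(grpc.StatusCode.UNAUTHENTICATED, 'Invalid token')
            )
        return continuation(handler_call_details)

# mTLS gRPC server. server_key, server_cert, and ca_cert hold PEM-encoded bytes
# loaded elsewhere (for example, read from files).
server_credentials = grpc.ssl_server_credentials(
    [(server_key, server_cert)],
    root_certificates=ca_cert,
    require_client_auth=True,  # require a client certificate
)
server = grpc.server(
    futures.ThreadPoolExecutor(),
    interceptors=[AuthInterceptor()],  # without this the interceptor never runs
)
server.add_secure_port('[::]:50051', server_credentials)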
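
The interceptor in this section calls verify_token without defining it. One possible shape for that function, assuming opaque bearer tokens whose SHA-256 digests are stored server-side, is sketched below. The in-memory set _TOKEN_DIGESTS and the demo token are placeholders; a real service would query its token store or validate a signed token instead.

```python
import hashlib
import hmac

# Hypothetical store: only digests of issued tokens are kept, never the tokens.
_TOKEN_DIGESTS = {hashlib.sha256(b"demo-token").hexdigest()}


def verify_token(header_value: str) -> bool:
    """Check an 'authorization' metadata value of the form 'Bearer <token>'."""
    scheme, _, token = header_value.partition(" ")
    if scheme.lower() != "bearer" or not token:
        return False
    digest = hashlib.sha256(token.encode("utf-8")).hexdigest()
    matched = False
    for known in _TOKEN_DIGESTS:
        # compare_digest avoids leaking how many leading characters matched.
        matched |= hmac.compare_digest(digest, known)
    return matched
```

Storing digests rather than raw tokens means a leaked token table cannot be replayed directly against the service.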

26.10 API Security Testing Checklist

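Authentication tests

| # | Test | Method | Expected result |
|---|------|--------|-----------------|
| 1 | Access a protected endpoint without a token | curl /api/orders (no Authorization header) | 401 Unauthorized |
| 2 | Expired token | Use a JWT whose exp has passed | 401 Unauthorized |
| 3 | Tampered token payload | Modify the JWT payload without re-signing | 401 Unauthorized |
| 4 | Wrong algorithm | Change an RS256 token's alg to none | 401 Unauthorized |
| 5 | Brute force | Send 10 wrong passwords in a row | 429 or account lockout |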

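Authorization tests

| # | Test | Method | Expected result |
|---|------|--------|-----------------|
| 6 | BOLA: access another user's resource | Use user A's token to request user B's order | 403 Forbidden |
| 7 | Privilege escalation: regular user calls an admin endpoint | Call /api/admin/* with a token that has the user role | 403 Forbidden |
| 8 | Horizontal privilege escalation: modify another user's data | PUT /api/users/other-user-id | 403 Forbidden |
| 9 | Property-level authorization | Regular user tries to change the role field | Field ignored, or 400 |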

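Input validation tests

| # | Test | Method | Expected result |
|---|------|--------|-----------------|
| 10 | SQL injection | username: "admin' OR '1'='1" | 400 Bad Request |
| 11 | XSS | name: "<script>alert(1)</script>" | Input rejected or escaped |
| 12 | Oversized input | Send a 1 MB username | 400 or 413 |
| 13 | Type confusion | age: "not_a_number" | 422 Validation Error |
| 14 | Path traversal | file: "../../etc/passwd" | 400 Bad Request |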

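Rate limiting tests

| # | Test | Method | Expected result |
|---|------|--------|-----------------|
| 15 | Exceed the rate limit | Send 100 requests within 1 second | 429 Too Many Requests |
| 16 | Login throttling | Send 6 login requests in a row | 6th request returns 429 |
| 17 | Distributed rate limiting | Send requests from multiple IPs | Limit is aggregated per user ID |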

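Transport security tests

| # | Test | Method | Expected result |
|---|------|--------|-----------------|
| 18 | Plain HTTP access | curl http://api.example.com/... | 301 redirect to HTTPS |
| 19 | HSTS header | Inspect the response headers | Includes Strict-Transport-Security |
| 20 | TLS version | nmap --script ssl-enum-ciphers | TLS 1.2+ only |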

Example automated test script

#!/bin/bash
# api_security_test.sh: quick API security check script

BASE_URL="https://api.example.com"
TOKEN="eyJhbGciOiJSUzI1NiIs..."  # a valid test token

echo "=== API Security Test Suite ==="

# 1. Unauthenticated access
echo -n "[TEST 1] No auth: "
STATUS=$(curl -s -o /dev/null -w "%{http_code}" "$BASE_URL/api/v2/orders")
[ "$STATUS" = "401" ] && echo "PASS (401)" || echo "FAIL ($STATUS)"

# 2. Security headers (grep -i because HTTP/2 sends header names in lowercase)
echo -n "[TEST 2] Security headers: "
HEADERS=$(curl -s -I "$BASE_URL/api/v2/health")
echo "$HEADERS" | grep -qi "X-Content-Type-Options: nosniff" && \
echo "$HEADERS" | grep -qi "X-Frame-Options: DENY" && \
echo "$HEADERS" | grep -qi "Strict-Transport-Security" && \
echo "PASS" || echo "FAIL"

# 3. CORS check
echo -n "[TEST 3] CORS origin: "
ORIGIN=$(curl -s -I -H "Origin: https://evil.com" "$BASE_URL/api/v2/health" | \
    grep -i "Access-Control-Allow-Origin")
echo "$ORIGIN" | grep -q "evil.com" && echo "FAIL (allows evil origin)" || echo "PASS"

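# 4. Rate limiting (reports FAIL if no request is throttled)
echo -n "[TEST 4] Rate limiting: "
LIMITED=0
for i in $(seq 1 50); do
    STATUS=$(curl -s -o /dev/null -w "%{http_code}" \
        -H "Authorization: Bearer $TOKEN" "$BASE_URL/api/v2/orders")
    if [ "$STATUS" = "429" ]; then
        echo "PASS (429 at request $i)"
        LIMITED=1
        break
    fi
done
[ "$LIMITED" = "1" ] || echo "FAIL (no 429 after 50 requests)"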

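# 5. SQL injection (quotes and "=" are percent-encoded so the URL is well-formed).
#    This check only accepts an explicit validation rejection; an API that uses
#    parameterized queries and returns 200 with no match needs a manual review.
echo -n "[TEST 5] SQL injection: "
STATUS=$(curl -s -o /dev/null -w "%{http_code}" \
    -H "Authorization: Bearer $TOKEN" \
    "$BASE_URL/api/v2/users/search?q=admin%27%20OR%20%271%27%3D%271")
if [ "$STATUS" = "400" ] || [ "$STATUS" = "422" ]; then echo "PASS ($STATUS)"; else echo "FAIL ($STATUS)"; fi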

echo "=== Tests Complete ==="
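
Header checks like TEST 2 are easier to run in CI when the comparison logic is a pure function that can be unit-tested without a live server. The sketch below is one way to do that; the function name missing_security_headers and the REQUIRED_HEADERS table are illustrative, and the expected values mirror the headers the script checks.

```python
# Expected value per header; None means "any value, presence is enough".
REQUIRED_HEADERS = {
    "x-content-type-options": "nosniff",
    "x-frame-options": "DENY",
    "strict-transport-security": None,
}


def missing_security_headers(headers: dict) -> list:
    """Return a list of problems; an empty list means every check passed."""
    # Header names are case-insensitive, so normalize before comparing.
    normalized = {name.lower(): value.strip() for name, value in headers.items()}
    problems = []
    for name, expected in REQUIRED_HEADERS.items():
        actual = normalized.get(name)
        if actual is None:
            problems.append(f"{name}: missing")
        elif expected is not None and actual.lower() != expected.lower():
            problems.append(f"{name}: expected {expected!r}, got {actual!r}")
    return problems
```

A test runner can feed it the headers from any HTTP client response and fail the build when the returned list is not empty.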

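26.11 Summary

  • BOLA ranks first in the OWASP API Security Top 10; check resource ownership on every request

  • Input validation: use Pydantic / Bean Validation / go-playground/validator for strict type and format checks

  • Rate limiting guards against brute force and resource exhaustion; Token Bucket suits most scenarios

  • GraphQL needs additional depth limits, complexity limits, and introspection control

  • gRPC secures communication through mTLS and interceptors

  • The OWASP API Top 10 is the minimum defensive baseline for API security; each item needs a matching technical control

  • API security testing belongs in the CI/CD pipeline and should run automatically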