# 第二十六章:API 安全设计 > "API 是现代应用的前门,也是攻击者最喜欢的入口。" ```{mermaid} mindmap root((API 安全)) OWASP API Top 10 BOLA 认证失败 过度数据暴露 资源消耗 设计原则 输入验证 输出编码 速率限制 幂等性 协议安全 REST GraphQL gRPC 测试 DAST Fuzzing 渗透测试 ``` ## 26.1 API 安全层次模型 API 安全不是单一措施,而是多层防御的组合。每一层都承担特定的安全职责: ```{mermaid} flowchart TB A["🌐 传输层
TLS 1.3 · 证书固定 · mTLS"] --> B["🔑 认证层
OAuth2 · JWT · API Key · mTLS"] B --> C["🛡️ 授权层
RBAC · ABAC · BOLA 检查"] C --> D["✅ 输入验证层
Schema 验证 · 类型检查 · 注入防护"] D --> E["⚙️ 业务逻辑层
幂等性 · 事务完整性"] E --> F["📤 输出编码层
响应过滤 · DTO · 敏感数据脱敏"] style A fill:#4a90d9,color:#fff style B fill:#e67e22,color:#fff style C fill:#e74c3c,color:#fff style D fill:#f1c40f,color:#333 style E fill:#2ecc71,color:#fff style F fill:#9b59b6,color:#fff ``` ## 26.2 API Gateway 安全架构 ```{mermaid} flowchart LR Client[客户端] --> GW[API Gateway] subgraph Gateway["API Gateway 安全层"] direction TB TLS[TLS 终止] RL[速率限制
Token Bucket] AUTH[认证
JWT 验证] AUTHZ[授权
Scope/Role 检查] VAL[请求验证
Schema Validation] LOG[审计日志] TLS --> RL --> AUTH --> AUTHZ --> VAL --> LOG end GW --> Gateway LOG --> SvcA[用户服务] LOG --> SvcB[订单服务] LOG --> SvcC[支付服务] style Gateway fill:#f0f0f0,stroke:#333 style RL fill:#ff9,stroke:#333 style AUTH fill:#f96,stroke:#333 style AUTHZ fill:#f96,stroke:#333 ``` ## 26.3 Rate Limiting 算法对比 ```{mermaid} flowchart LR subgraph TB["Token Bucket 算法"] direction TB TB1["令牌以固定速率填充"] --> TB2["每个请求消耗一个令牌"] TB2 --> TB3{"桶中有令牌?"} TB3 -->|是| TB4["✅ 允许请求"] TB3 -->|否| TB5["❌ 拒绝 (429)"] end subgraph SW["Sliding Window 算法"] direction TB SW1["维护时间窗口内的请求计数"] --> SW2["新请求到达"] SW2 --> SW3{"窗口内计数 < 限制?"} SW3 -->|是| SW4["✅ 允许并计数+1"] SW3 -->|否| SW5["❌ 拒绝 (429)"] end style TB4 fill:#2ecc71,color:#fff style TB5 fill:#e74c3c,color:#fff style SW4 fill:#2ecc71,color:#fff style SW5 fill:#e74c3c,color:#fff ``` | 特性 | Token Bucket | Sliding Window Log | Sliding Window Counter | Fixed Window | |------|-------------|-------------------|----------------------|-------------| | 突发流量处理 | ✅ 允许突发(桶容量) | ❌ 严格限制 | ⚠️ 近似 | ⚠️ 边界突发 | | 内存开销 | 低(2 个变量) | 高(存储每个请求时间戳) | 低(2 个计数器) | 极低(1 个计数器) | | 精确度 | 高 | 极高 | 高 | 低(边界问题) | | 分布式实现 | ⚠️ 需 Redis Lua | ⚠️ 需 Redis Sorted Set | ✅ Redis INCR | ✅ Redis INCR | | 适用场景 | API 通用限流 | 金融/支付 API | 大规模 API | 简单场景 | --- ## 26.4 OWASP API Security Top 10(2023) | 排名 | 风险 | 描述 | 防御 | |------|------|------|------| | API1 | BOLA | 对象级授权失效 | 每个请求检查资源所有权 | | API2 | 认证失效 | 认证机制缺陷 | OAuth2 + MFA | | API3 | 对象属性级授权 | 返回过多属性 | 响应过滤、DTO | | API4 | 不受限资源消耗 | 无速率限制 | Rate Limiting | | API5 | 功能级授权失效 | 缺少功能权限检查 | RBAC/ABAC | | API6 | 服务端请求伪造 | SSRF | URL 白名单 | | API7 | 安全配置错误 | 默认配置不安全 | 安全基线 | | API8 | 缺乏自动化威胁防护 | 无 Bot 防护 | WAF + 行为分析 | | API9 | 资产管理不当 | 影子 API、过时版本 | API 清单管理 | | API10 | 不安全的 API 消费 | 信任第三方 API | 输入验证 | ### 26.4.1 OWASP API Top 10 逐条解析与防御代码 #### API1: BOLA(Broken Object Level Authorization) 最常见的 API 安全问题 — 攻击者修改请求中的对象 ID 即可访问他人数据。 ```python # ❌ 危险:没有检查资源所有权 @app.get("/api/orders/{order_id}") async def get_order(order_id: str, user: User = Depends(get_current_user)): order = db.get_order(order_id) return order # 任何用户都能查看任何订单! # ✅ 安全:检查资源所有权 @app.get("/api/orders/{order_id}") async def get_order(order_id: str, user: User = Depends(get_current_user)): order = db.get_order(order_id) if order.user_id != user.id: raise HTTPException(403, "Access denied") return order # ✅ 更好:使用 OpenFGA 检查权限 @app.get("/api/orders/{order_id}") async def get_order(order_id: str, user: User = Depends(get_current_user)): await authz.require(user.id, "can_view", f"order:{order_id}") order = db.get_order(order_id) return order ``` #### API2: Broken Authentication — 认证失效 ```java /** * 防御:强制使用安全的认证流程 * - 短生命周期 Access Token + Refresh Token 轮换 * - 登录失败锁定 */ @Configuration public class AuthSecurityConfig { @Bean public SecurityFilterChain authFilterChain(HttpSecurity http) throws Exception { http .oauth2ResourceServer(oauth2 -> oauth2 .jwt(jwt -> jwt .decoder(jwtDecoder()) // 验证 token 未被撤销 .jwtAuthenticationConverter(jwtConverter()) ) ) .sessionManagement(s -> s.sessionCreationPolicy(SessionCreationPolicy.STATELESS) ); return http.build(); } /** * Token 黑名单检查(用于登出/撤销场景) */ @Bean public JwtDecoder jwtDecoder() { NimbusJwtDecoder decoder = NimbusJwtDecoder .withJwkSetUri("https://auth.example.com/.well-known/jwks.json") .build(); // 添加自定义验证器:检查 token 是否在黑名单中 decoder.setJwtValidator(new DelegatingOAuth2TokenValidator<>( JwtValidators.createDefaultWithIssuer("https://auth.example.com"), new TokenBlacklistValidator(tokenBlacklistService) )); return decoder; } } ``` #### API3: Broken Object Property Level Authorization — 属性级授权 ```go // 防御:使用 DTO 模式,不同角色返回不同字段 // UserPublicDTO 公开信息(所有人可见) type UserPublicDTO struct { ID string `json:"id"` Username string `json:"username"` Avatar string `json:"avatar"` } // UserPrivateDTO 私有信息(仅本人可见) type UserPrivateDTO struct { UserPublicDTO Email string `json:"email"` Phone string `json:"phone"` Address string `json:"address"` } // UserAdminDTO 管理信息(仅管理员可见) type UserAdminDTO struct { UserPrivateDTO CreatedAt time.Time `json:"created_at"` LastLoginIP string `json:"last_login_ip"` IsBlocked bool `json:"is_blocked"` } func getUser(c *gin.Context) { targetID := c.Param("id") user, err := userRepo.FindByID(targetID) if err != nil { c.JSON(http.StatusNotFound, gin.H{"error": "user not found"}) return } roles, _ := c.Get("roles") currentUserID, _ := c.Get("userID") switch { case containsRole(roles.([]string), "admin"): // 管理员看到所有字段 c.JSON(http.StatusOK, toAdminDTO(user)) case currentUserID == targetID: // 本人看到私有字段 c.JSON(http.StatusOK, toPrivateDTO(user)) default: // 其他人只看到公开字段 c.JSON(http.StatusOK, toPublicDTO(user)) } } ``` #### API4: Unrestricted Resource Consumption — 不受限资源消耗 ```python """ 多维度速率限制:按 IP、按用户、按端点分别限制 """ from fastapi import FastAPI, Request from slowapi import Limiter from slowapi.util import get_remote_address limiter = Limiter( key_func=get_remote_address, storage_uri="redis://localhost:6379", ) app = FastAPI() app.state.limiter = limiter # 全局限制 @app.get("/api/data") @limiter.limit("100/hour") async def get_data(request: Request): return {"data": "..."} # 敏感操作更严格 @app.post("/api/login") @limiter.limit("5/minute;20/hour") # 多级限制 async def login(request: Request): return {"token": "..."} # 昂贵操作 @app.post("/api/reports/generate") @limiter.limit("3/hour") async def generate_report(request: Request): return {"report_id": "..."} # 分页限制:防止一次请求过多数据 from pydantic import Field from typing import Annotated class PaginationParams: def __init__( self, page: Annotated[int, Field(ge=1, le=1000)] = 1, size: Annotated[int, Field(ge=1, le=100)] = 20, # 最大 100 条 ): self.page = page self.size = size self.offset = (page - 1) * size ``` #### API5: Broken Function Level Authorization — 功能级授权失效 ```java /** * 防御:使用 Spring Security 方法级安全 + 自定义注解 */ @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @PreAuthorize("hasRole('ADMIN')") public @interface AdminOnly {} @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @PreAuthorize("hasAnyRole('ADMIN', 'MANAGER')") public @interface ManagerOrAbove {} @RestController @RequestMapping("/api") public class UserController { @GetMapping("/users") @ManagerOrAbove // 只有 MANAGER 和 ADMIN 可以列出所有用户 public List listUsers() { /* ... */ } @DeleteMapping("/users/{id}") @AdminOnly // 只有 ADMIN 可以删除用户 public void deleteUser(@PathVariable String id) { /* ... */ } @GetMapping("/users/me") // 任何已认证用户都可以查看自己的信息 public UserDTO getMyProfile(@AuthenticationPrincipal Jwt jwt) { return userService.findById(jwt.getSubject()); } } ``` #### API6: Server Side Request Forgery (SSRF) ```go // 防御:URL 白名单 + 禁止内网访问 import ( "fmt" "net" "net/url" "strings" ) // allowedHosts 允许访问的外部主机白名单 var allowedHosts = map[string]bool{ "api.trusted-partner.com": true, "cdn.example.com": true, } // ValidateURL 验证 URL 是否安全(防止 SSRF) func ValidateURL(rawURL string) error { parsed, err := url.Parse(rawURL) if err != nil { return fmt.Errorf("invalid URL: %w", err) } // 1. 只允许 HTTPS if parsed.Scheme != "https" { return fmt.Errorf("only HTTPS is allowed") } // 2. 检查白名单 host := parsed.Hostname() if !allowedHosts[host] { return fmt.Errorf("host %s is not in the allowlist", host) } // 3. 解析 IP,禁止内网地址 ips, err := net.LookupIP(host) if err != nil { return fmt.Errorf("DNS resolution failed: %w", err) } for _, ip := range ips { if ip.IsLoopback() || ip.IsPrivate() || ip.IsLinkLocalUnicast() { return fmt.Errorf("internal IP addresses are not allowed") } } return nil } ``` #### API7: Security Misconfiguration — 安全配置错误 ```python """ 安全基线配置检查清单(FastAPI 示例) """ from fastapi import FastAPI app = FastAPI( # ✅ 生产环境禁用交互式文档 docs_url=None if PRODUCTION else "/docs", redoc_url=None if PRODUCTION else "/redoc", openapi_url=None if PRODUCTION else "/openapi.json", ) # ✅ 禁用详细错误信息 @app.exception_handler(Exception) async def generic_exception_handler(request, exc): # 生产环境不暴露内部错误细节 logger.error(f"Unhandled exception: {exc}", exc_info=True) return JSONResponse( status_code=500, content={"error": "Internal server error"}, # 不返回堆栈信息 ) # ✅ 安全的 CORS 配置 app.add_middleware( CORSMiddleware, allow_origins=["https://app.example.com"], # 不要用 ["*"] allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["Authorization", "Content-Type"], allow_credentials=True, ) # ✅ 移除 Server 头(在反向代理层配置) # Nginx: server_tokens off; # 或在中间件中移除 ``` #### API8: Lack of Protection from Automated Threats ```java /** * 防御:行为分析 + 验证码 + 指纹识别 * 使用 Spring AOP 实现自动化威胁检测 */ @Aspect @Component public class BotDetectionAspect { private final RateLimiterService rateLimiter; private final AuditLogService auditLog; @Around("@annotation(BotProtected)") public Object detectBot(ProceedingJoinPoint joinPoint) throws Throwable { HttpServletRequest request = getCurrentRequest(); String clientIP = getClientIP(request); String userAgent = request.getHeader("User-Agent"); // 1. 检查 User-Agent 是否为已知爬虫 if (isKnownBot(userAgent)) { auditLog.logSuspicious(clientIP, "known_bot_ua", userAgent); throw new ResponseStatusException(HttpStatus.FORBIDDEN, "Access denied"); } // 2. 检查请求频率异常(短时间内大量不同端点) if (rateLimiter.isAnomalous(clientIP)) { auditLog.logSuspicious(clientIP, "anomalous_pattern", ""); throw new ResponseStatusException(HttpStatus.TOO_MANY_REQUESTS); } return joinPoint.proceed(); } } ``` #### API9: Improper Inventory Management — 资产管理不当 ```python """ API 版本管理与弃用策略 """ from fastapi import FastAPI, Header, HTTPException from datetime import date import warnings app_v1 = FastAPI(title="My API v1 (Deprecated)") app_v2 = FastAPI(title="My API v2") # 主应用挂载版本化子应用 app = FastAPI() app.mount("/api/v1", app_v1) app.mount("/api/v2", app_v2) # v1 弃用中间件:添加 Deprecation 和 Sunset 头 @app_v1.middleware("http") async def deprecation_warning(request, call_next): response = await call_next(request) response.headers["Deprecation"] = "true" response.headers["Sunset"] = "2025-12-31" # 下线日期 response.headers["Link"] = ( '; rel="successor-version"' ) return response ``` #### API10: Unsafe Consumption of APIs — 不安全的 API 消费 ```go // 防御:调用第三方 API 时进行严格验证 import ( "context" "encoding/json" "fmt" "io" "net/http" "time" ) // SafeAPIClient 安全的第三方 API 客户端 type SafeAPIClient struct { client *http.Client baseURL string apiKey string } func NewSafeAPIClient(baseURL, apiKey string) *SafeAPIClient { return &SafeAPIClient{ client: &http.Client{ Timeout: 10 * time.Second, // ✅ 设置超时,防止慢速攻击 // ✅ 禁止自动跟随重定向(防止 SSRF) CheckRedirect: func(req *http.Request, via []*http.Request) error { return http.ErrUseLastResponse }, }, baseURL: baseURL, apiKey: apiKey, } } func (c *SafeAPIClient) GetData(ctx context.Context, path string) (map[string]any, error) { req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+path, nil) if err != nil { return nil, fmt.Errorf("create request: %w", err) } req.Header.Set("Authorization", "Bearer "+c.apiKey) resp, err := c.client.Do(req) if err != nil { return nil, fmt.Errorf("request failed: %w", err) } defer resp.Body.Close() // ✅ 限制响应体大小,防止内存耗尽 body, err := io.ReadAll(io.LimitReader(resp.Body, 1<<20)) // 最大 1MB if err != nil { return nil, fmt.Errorf("read body: %w", err) } if resp.StatusCode != http.StatusOK { return nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, body) } // ✅ 严格解析 JSON,拒绝未知字段 var result map[string]any if err := json.Unmarshal(body, &result); err != nil { return nil, fmt.Errorf("invalid JSON response: %w", err) } return result, nil } ``` --- ## 26.5 输入验证 ```python from pydantic import BaseModel, validator, Field from typing import Annotated import re class CreateUserRequest(BaseModel): username: Annotated[str, Field(min_length=3, max_length=50, pattern=r'^[a-zA-Z0-9_]+$')] email: Annotated[str, Field(max_length=255)] age: Annotated[int, Field(ge=0, le=150)] @validator('email') def validate_email(cls, v): if not re.match(r'^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$', v): raise ValueError('Invalid email format') return v.lower() @validator('username') def no_sql_injection(cls, v): dangerous_patterns = ["'", '"', ';', '--', '/*', '*/', 'DROP', 'DELETE', 'UPDATE'] for pattern in dangerous_patterns: if pattern.upper() in v.upper(): raise ValueError('Invalid characters in username') return v ``` --- ## 26.6 速率限制 ```python from fastapi import FastAPI, Request from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded limiter = Limiter(key_func=get_remote_address) app = FastAPI() app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) @app.get("/api/search") @limiter.limit("30/minute") # 每分钟 30 次 async def search(request: Request, q: str): return {"results": [...]} @app.post("/api/login") @limiter.limit("5/minute") # 登录更严格 async def login(request: Request): return {"token": "..."} ``` --- ## 26.7 三语言 API 安全完整模板 ### 26.7.1 Python — FastAPI 完整安全 API 模板 ```python """ FastAPI 完整安全 API 模板 功能:认证 + 授权 + 输入验证 + Rate Limit + 审计日志 + API Key 管理 依赖: pip install fastapi uvicorn python-jose[cryptography] slowapi pydantic sqlalchemy """ from fastapi import FastAPI, Depends, HTTPException, Request, Security, Header from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials, APIKeyHeader from fastapi.middleware.cors import CORSMiddleware from jose import jwt, JWTError from pydantic import BaseModel, Field, field_validator from slowapi import Limiter, _rate_limit_exceeded_handler from slowapi.util import get_remote_address from slowapi.errors import RateLimitExceeded from starlette.middleware.base import BaseHTTPMiddleware from typing import Annotated from datetime import datetime, timezone from enum import Enum import hashlib import hmac import logging import secrets import time import re # ── 应用初始化 ── app = FastAPI( title="Secure API", version="2.0.0", docs_url="/docs", # 生产环境设为 None ) # 日志 logger = logging.getLogger("audit") logging.basicConfig(level=logging.INFO) # 速率限制 limiter = Limiter(key_func=get_remote_address, default_limits=["200/hour"]) app.state.limiter = limiter app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler) # CORS app.add_middleware( CORSMiddleware, allow_origins=["https://app.example.com"], allow_methods=["GET", "POST", "PUT", "DELETE"], allow_headers=["Authorization", "Content-Type", "X-API-Key"], allow_credentials=True, max_age=3600, ) # ── 审计日志中间件 ── class AuditLogMiddleware(BaseHTTPMiddleware): """记录所有 API 请求的审计日志""" async def dispatch(self, request: Request, call_next): start_time = time.time() response = await call_next(request) duration = time.time() - start_time # 安全响应头 response.headers["X-Content-Type-Options"] = "nosniff" response.headers["X-Frame-Options"] = "DENY" response.headers["Strict-Transport-Security"] = ( "max-age=31536000; includeSubDomains" ) # 审计日志 logger.info( "API_REQUEST", extra={ "method": request.method, "path": request.url.path, "status": response.status_code, "duration_ms": round(duration * 1000, 2), "client_ip": request.client.host if request.client else "unknown", "user_agent": request.headers.get("user-agent", ""), }, ) return response app.add_middleware(AuditLogMiddleware) # ── 认证:JWT ── JWT_SECRET = "your-256-bit-secret" # 生产环境从环境变量读取 JWT_ALGORITHM = "HS256" bearer_scheme = HTTPBearer() class UserRole(str, Enum): USER = "user" ADMIN = "admin" SUPERADMIN = "superadmin" class CurrentUser(BaseModel): id: str email: str | None = None roles: list[str] = [] async def get_current_user( credentials: HTTPAuthorizationCredentials = Security(bearer_scheme), ) -> CurrentUser: """从 JWT 中提取并验证用户信息""" try: payload = jwt.decode( credentials.credentials, JWT_SECRET, algorithms=[JWT_ALGORITHM], options={"require_exp": True, "require_sub": True}, ) return CurrentUser( id=payload["sub"], email=payload.get("email"), roles=payload.get("roles", []), ) except JWTError as e: raise HTTPException(status_code=401, detail=f"Invalid token: {e}") def require_role(*roles: str): """角色授权依赖""" async def checker(user: CurrentUser = Depends(get_current_user)): if not any(r in user.roles for r in roles): raise HTTPException(403, f"Requires role: {roles}") return user return checker # ── 认证:API Key ── API_KEY_HEADER = APIKeyHeader(name="X-API-Key", auto_error=False) # 模拟 API Key 存储(生产环境用数据库 + 哈希存储) API_KEYS_DB: dict[str, dict] = {} def generate_api_key() -> tuple[str, str]: """生成 API Key 和对应的哈希""" key = secrets.token_urlsafe(32) key_hash = hashlib.sha256(key.encode()).hexdigest() return key, key_hash async def verify_api_key( api_key: str | None = Security(API_KEY_HEADER), ) -> dict: """验证 API Key""" if not api_key: raise HTTPException(401, "Missing API Key") key_hash = hashlib.sha256(api_key.encode()).hexdigest() key_info = API_KEYS_DB.get(key_hash) if not key_info: raise HTTPException(401, "Invalid API Key") if key_info.get("revoked"): raise HTTPException(401, "API Key has been revoked") if key_info.get("expires_at") and key_info["expires_at"] < datetime.now(timezone.utc): raise HTTPException(401, "API Key has expired") return key_info # ── 输入验证模型 ── class CreateOrderRequest(BaseModel): product_id: Annotated[str, Field( min_length=1, max_length=50, pattern=r"^[a-zA-Z0-9_-]+$", )] quantity: Annotated[int, Field(ge=1, le=1000)] shipping_address: Annotated[str, Field(min_length=10, max_length=500)] @field_validator("shipping_address") @classmethod def sanitize_address(cls, v: str) -> str: # 移除潜在的 XSS 内容 v = re.sub(r"<[^>]*>", "", v) return v.strip() # ── API 端点 ── @app.get("/api/v2/health") async def health(): return {"status": "ok", "timestamp": datetime.now(timezone.utc).isoformat()} @app.get("/api/v2/orders") @limiter.limit("60/minute") async def list_orders( request: Request, user: CurrentUser = Depends(get_current_user), page: Annotated[int, Field(ge=1, le=1000)] = 1, size: Annotated[int, Field(ge=1, le=100)] = 20, ): """列出当前用户的订单(带分页)""" logger.info(f"User {user.id} listing orders, page={page}") return { "orders": [], "page": page, "size": size, "user_id": user.id, } @app.post("/api/v2/orders", status_code=201) @limiter.limit("10/minute") async def create_order( request: Request, order: CreateOrderRequest, user: CurrentUser = Depends(get_current_user), ): """创建订单""" logger.info(f"User {user.id} creating order for product {order.product_id}") return { "order_id": secrets.token_hex(8), "product_id": order.product_id, "quantity": order.quantity, "created_by": user.id, } @app.get("/api/v2/admin/users") @limiter.limit("30/minute") async def admin_list_users( request: Request, user: CurrentUser = Depends(require_role("admin", "superadmin")), ): """管理员:列出所有用户""" return {"users": [], "requested_by": user.id} # ── API Key 管理端点 ── @app.post("/api/v2/api-keys", status_code=201) async def create_api_key( user: CurrentUser = Depends(require_role("admin")), ): """创建新的 API Key(仅管理员)""" key, key_hash = generate_api_key() API_KEYS_DB[key_hash] = { "owner": user.id, "created_at": datetime.now(timezone.utc).isoformat(), "revoked": False, "expires_at": None, } return { "api_key": key, # ⚠️ 仅在创建时返回一次 "message": "Store this key securely. It will not be shown again.", } @app.get("/api/v2/external/data") async def external_data(key_info: dict = Depends(verify_api_key)): """使用 API Key 认证的外部接口""" return {"data": "external data", "key_owner": key_info["owner"]} ``` ### 26.7.2 Java — Spring Boot API 安全模板 ```java package com.example.secureapi; import com.fasterxml.jackson.annotation.JsonProperty; import jakarta.validation.Valid; import jakarta.validation.constraints.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.security.access.prepost.PreAuthorize; import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity; import org.springframework.security.config.annotation.web.builders.HttpSecurity; import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; import org.springframework.security.config.http.SessionCreationPolicy; import org.springframework.security.core.annotation.AuthenticationPrincipal; import org.springframework.security.oauth2.jwt.Jwt; import org.springframework.security.web.SecurityFilterChain; import org.springframework.web.bind.annotation.*; import org.springframework.web.filter.OncePerRequestFilter; import jakarta.servlet.FilterChain; import jakarta.servlet.ServletException; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import java.io.IOException; import java.time.Instant; import java.util.*; @SpringBootApplication public class SecureApiApplication { public static void main(String[] args) { SpringApplication.run(SecureApiApplication.class, args); } } // ── 安全配置 ── @Configuration @EnableWebSecurity @EnableMethodSecurity class ApiSecurityConfig { @Bean public SecurityFilterChain filterChain(HttpSecurity http) throws Exception { http .sessionManagement(s -> s.sessionCreationPolicy(SessionCreationPolicy.STATELESS)) .csrf(csrf -> csrf.disable()) .cors(cors -> cors.configurationSource(request -> { var config = new org.springframework.web.cors.CorsConfiguration(); config.setAllowedOrigins(List.of("https://app.example.com")); config.setAllowedMethods(List.of("GET", "POST", "PUT", "DELETE")); config.setAllowedHeaders(List.of("Authorization", "Content-Type")); config.setAllowCredentials(true); config.setMaxAge(3600L); return config; })) .headers(headers -> headers .contentTypeOptions(cto -> {}) .frameOptions(fo -> fo.deny()) .httpStrictTransportSecurity(hsts -> hsts .includeSubDomains(true) .maxAgeInSeconds(31536000)) ) .authorizeHttpRequests(auth -> auth .requestMatchers("/api/v2/health").permitAll() .requestMatchers("/api/v2/admin/**").hasRole("ADMIN") .anyRequest().authenticated() ) .oauth2ResourceServer(oauth2 -> oauth2 .jwt(jwt -> {}) ) // 审计日志过滤器 .addFilterBefore(new AuditLogFilter(), org.springframework.security.web.authentication.UsernamePasswordAuthenticationFilter.class); return http.build(); } } // ── 审计日志过滤器 ── class AuditLogFilter extends OncePerRequestFilter { private static final Logger auditLog = LoggerFactory.getLogger("AUDIT"); @Override protected void doFilterInternal( HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException { long startTime = System.currentTimeMillis(); filterChain.doFilter(request, response); long duration = System.currentTimeMillis() - startTime; auditLog.info("API_REQUEST method={} path={} status={} duration_ms={} client_ip={} user_agent={}", request.getMethod(), request.getRequestURI(), response.getStatus(), duration, request.getRemoteAddr(), request.getHeader("User-Agent") ); } } // ── 输入验证 DTO ── record CreateOrderRequest( @NotBlank @Size(min = 1, max = 50) @Pattern(regexp = "^[a-zA-Z0-9_-]+$", message = "Invalid product ID format") @JsonProperty("product_id") String productId, @NotNull @Min(1) @Max(1000) Integer quantity, @NotBlank @Size(min = 10, max = 500) @JsonProperty("shipping_address") String shippingAddress ) {} record OrderResponse( @JsonProperty("order_id") String orderId, @JsonProperty("product_id") String productId, Integer quantity, @JsonProperty("created_by") String createdBy, @JsonProperty("created_at") Instant createdAt ) {} // ── API 控制器 ── @RestController @RequestMapping("/api/v2") class SecureApiController { private static final Logger log = LoggerFactory.getLogger(SecureApiController.class); @GetMapping("/health") public Map health() { return Map.of("status", "ok", "timestamp", Instant.now().toString()); } /** * 列出当前用户的订单 */ @GetMapping("/orders") public Map listOrders( @AuthenticationPrincipal Jwt jwt, @RequestParam(defaultValue = "1") @Min(1) @Max(1000) int page, @RequestParam(defaultValue = "20") @Min(1) @Max(100) int size) { String userId = jwt.getSubject(); log.info("User {} listing orders, page={}", userId, page); return Map.of( "orders", List.of(), "page", page, "size", size, "user_id", userId ); } /** * 创建订单 */ @PostMapping("/orders") public ResponseEntity createOrder( @Valid @RequestBody CreateOrderRequest request, @AuthenticationPrincipal Jwt jwt) { String userId = jwt.getSubject(); log.info("User {} creating order for product {}", userId, request.productId()); OrderResponse response = new OrderResponse( UUID.randomUUID().toString(), request.productId(), request.quantity(), userId, Instant.now() ); return ResponseEntity.status(HttpStatus.CREATED).body(response); } /** * 管理员:列出所有用户 */ @GetMapping("/admin/users") @PreAuthorize("hasRole('ADMIN')") public Map adminListUsers(@AuthenticationPrincipal Jwt jwt) { return Map.of("users", List.of(), "requested_by", jwt.getSubject()); } } // ── API 版本管理与安全 ── @Configuration class ApiVersionConfig { /** * v1 弃用过滤器:为旧版 API 添加 Deprecation 头 */ @Bean public OncePerRequestFilter apiVersionFilter() { return new OncePerRequestFilter() { @Override protected void doFilterInternal( HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException { filterChain.doFilter(request, response); // 为 v1 API 添加弃用警告头 if (request.getRequestURI().startsWith("/api/v1/")) { response.setHeader("Deprecation", "true"); response.setHeader("Sunset", "2025-12-31"); response.setHeader("Link", "; rel=\"successor-version\""); } } }; } } // ── API Key + HMAC 签名验证 Filter ── /** * HmacSignatureFilter — 验证 API Key + HMAC 请求签名 * * 请求头: * X-API-Key: 客户端 API Key * X-Timestamp: RFC3339 时间戳(防重放,允许 ±5 分钟偏差) * X-Signature: HMAC-SHA256(apiKey + "\n" + timestamp + "\n" + method + "\n" + path + "\n" + bodyHash) * * 签名密钥从 API Key 关联的 secret 中获取。 */ @Component class HmacSignatureFilter extends OncePerRequestFilter { private static final Logger log = LoggerFactory.getLogger(HmacSignatureFilter.class); private static final long MAX_CLOCK_SKEW_SECONDS = 300; // 5 分钟 // 生产环境应注入 ApiKeyRepository private final Map apiKeyStore = new ConcurrentHashMap<>(); record ApiKeyRecord(String owner, String hmacSecret, Instant createdAt, boolean revoked) {} @Override protected boolean shouldNotFilter(HttpServletRequest request) { // 仅对 /api/v2/external/** 路径启用 HMAC 验证 return !request.getRequestURI().startsWith("/api/v2/external/"); } @Override protected void doFilterInternal( HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException { String apiKey = request.getHeader("X-API-Key"); String timestamp = request.getHeader("X-Timestamp"); String signature = request.getHeader("X-Signature"); if (apiKey == null || timestamp == null || signature == null) { sendError(response, HttpStatus.UNAUTHORIZED, "Missing X-API-Key, X-Timestamp, or X-Signature header"); return; } // 1. 时间戳校验(防重放攻击) Instant ts; try { ts = Instant.parse(timestamp); } catch (Exception e) { sendError(response, HttpStatus.BAD_REQUEST, "Invalid timestamp format, expected ISO-8601 / RFC3339"); return; } if (Math.abs(Duration.between(Instant.now(), ts).getSeconds()) > MAX_CLOCK_SKEW_SECONDS) { sendError(response, HttpStatus.UNAUTHORIZED, "Request timestamp too old or too far in the future"); return; } // 2. 查找 API Key(按哈希查找) String keyHash = sha256Hex(apiKey); ApiKeyRecord record = apiKeyStore.get(keyHash); if (record == null || record.revoked()) { sendError(response, HttpStatus.UNAUTHORIZED, "Invalid or revoked API key"); return; } // 3. 读取请求体并计算 body hash CachedBodyHttpServletRequest wrappedRequest = new CachedBodyHttpServletRequest(request); String bodyHash = sha256Hex(new String(wrappedRequest.getCachedBody(), StandardCharsets.UTF_8)); // 4. 构造签名消息并计算预期签名 String message = String.join("\n", apiKey, timestamp, request.getMethod(), request.getRequestURI(), bodyHash); String expectedSig = hmacSha256Hex(message, record.hmacSecret()); // 5. 常量时间比较(防时序攻击) if (!MessageDigest.isEqual( hexDecode(signature), hexDecode(expectedSig))) { log.warn("HMAC signature mismatch for key owner={}", record.owner()); sendError(response, HttpStatus.UNAUTHORIZED, "Invalid signature"); return; } // 签名验证通过,设置属性供下游使用 request.setAttribute("apiKeyOwner", record.owner()); filterChain.doFilter(wrappedRequest, response); } // ── 工具方法 ── private static String sha256Hex(String data) { try { byte[] hash = MessageDigest.getInstance("SHA-256") .digest(data.getBytes(StandardCharsets.UTF_8)); return HexFormat.of().formatHex(hash); } catch (NoSuchAlgorithmException e) { throw new RuntimeException(e); } } private static String hmacSha256Hex(String data, String secret) { try { Mac mac = Mac.getInstance("HmacSHA256"); mac.init(new SecretKeySpec(secret.getBytes(StandardCharsets.UTF_8), "HmacSHA256")); byte[] sig = mac.doFinal(data.getBytes(StandardCharsets.UTF_8)); return HexFormat.of().formatHex(sig); } catch (Exception e) { throw new RuntimeException(e); } } private static byte[] hexDecode(String hex) { return HexFormat.of().parseHex(hex); } private void sendError(HttpServletResponse response, HttpStatus status, String message) throws IOException { response.setStatus(status.value()); response.setContentType("application/json"); response.getWriter().write("{\"error\":\"" + message + "\"}"); } } /** * CachedBodyHttpServletRequest — 缓存请求体以便多次读取 * (签名验证需要读取 body,后续 Controller 也需要读取) */ class CachedBodyHttpServletRequest extends HttpServletRequestWrapper { private final byte[] cachedBody; public CachedBodyHttpServletRequest(HttpServletRequest request) throws IOException { super(request); this.cachedBody = request.getInputStream().readAllBytes(); } public byte[] getCachedBody() { return cachedBody; } @Override public ServletInputStream getInputStream() { ByteArrayInputStream bais = new ByteArrayInputStream(cachedBody); return new ServletInputStream() { @Override public int read() { return bais.read(); } @Override public boolean isFinished() { return bais.available() == 0; } @Override public boolean isReady() { return true; } @Override public void setReadListener(ReadListener listener) {} }; } @Override public BufferedReader getReader() { return new BufferedReader(new InputStreamReader(getInputStream(), StandardCharsets.UTF_8)); } } ``` ### 26.7.3 Go — Gin API 安全模板 ```go /* Gin 完整安全 API 模板 功能:JWT 认证 + RBAC 授权 + 输入验证 + Rate Limit + 审计日志 + API Key + HMAC 签名 依赖: go get github.com/gin-gonic/gin github.com/go-jose/go-jose/v3 golang.org/x/time/rate */ package main import ( "crypto/hmac" "crypto/sha256" "crypto/subtle" "encoding/hex" "fmt" "log" "net/http" "strings" "sync" "time" "github.com/gin-gonic/gin" "github.com/gin-gonic/gin/binding" "github.com/go-playground/validator/v10" "golang.org/x/time/rate" ) // ── 配置 ── type Config struct { JWTSecret string HMACSecret string AllowedOrigin string } var cfg = Config{ JWTSecret: "your-256-bit-secret", // 生产环境从环境变量读取 HMACSecret: "hmac-signing-secret", AllowedOrigin: "https://app.example.com", } // ── 审计日志中间件 ── func AuditLogMiddleware() gin.HandlerFunc { return func(c *gin.Context) { start := time.Now() c.Next() duration := time.Since(start) userID := "anonymous" if id, exists := c.Get("userID"); exists { userID = id.(string) } log.Printf("AUDIT method=%s path=%s status=%d duration=%v user=%s ip=%s ua=%s", c.Request.Method, c.Request.URL.Path, c.Writer.Status(), duration, userID, c.ClientIP(), c.Request.UserAgent(), ) } } // ── 安全响应头中间件 ── func SecurityHeaders() gin.HandlerFunc { return func(c *gin.Context) { h := c.Writer.Header() h.Set("X-Content-Type-Options", "nosniff") h.Set("X-Frame-Options", "DENY") h.Set("Strict-Transport-Security", "max-age=31536000; includeSubDomains") h.Set("Content-Security-Policy", "default-src 'self'") h.Set("Referrer-Policy", "strict-origin-when-cross-origin") h.Set("Cache-Control", "no-store") c.Next() } } // ── CORS 中间件 ── func CORSMiddleware(allowedOrigin string) gin.HandlerFunc { return func(c *gin.Context) { h := c.Writer.Header() h.Set("Access-Control-Allow-Origin", allowedOrigin) h.Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") h.Set("Access-Control-Allow-Headers", "Authorization, Content-Type, X-API-Key, X-Signature") h.Set("Access-Control-Allow-Credentials", "true") h.Set("Access-Control-Max-Age", "3600") if c.Request.Method == http.MethodOptions { c.AbortWithStatus(http.StatusNoContent) return } c.Next() } } // ── 速率限制中间件 ── type IPRateLimiter struct { limiters map[string]*rate.Limiter mu sync.Mutex rate rate.Limit burst int } func NewIPRateLimiter(r rate.Limit, burst int) *IPRateLimiter { return &IPRateLimiter{ limiters: make(map[string]*rate.Limiter), rate: r, burst: burst, } } func (rl *IPRateLimiter) GetLimiter(ip string) *rate.Limiter { rl.mu.Lock() defer rl.mu.Unlock() limiter, exists := rl.limiters[ip] if !exists { limiter = rate.NewLimiter(rl.rate, rl.burst) rl.limiters[ip] = limiter } return limiter } func RateLimitMiddleware(rl *IPRateLimiter) gin.HandlerFunc { return func(c *gin.Context) { limiter := rl.GetLimiter(c.ClientIP()) if !limiter.Allow() { c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{ "error": "rate limit exceeded", "retry_after": "1s", }) return } c.Next() } } // ── JWT 认证中间件 ── func JWTAuthMiddleware(secret string) gin.HandlerFunc { return func(c *gin.Context) { authHeader := c.GetHeader("Authorization") if authHeader == "" { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "missing Authorization header", }) return } parts := strings.SplitN(authHeader, " ", 2) if len(parts) != 2 || !strings.EqualFold(parts[0], "bearer") { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "invalid Authorization format, expected: Bearer ", }) return } // 简化示例:实际应使用 go-jose 验证 RS256 签名 // 这里演示 HS256 对称签名验证 claims, err := parseAndValidateJWT(parts[1], secret) if err != nil { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "invalid token: " + err.Error(), }) return } c.Set("userID", claims.Subject) c.Set("email", claims.Email) c.Set("roles", claims.Roles) c.Next() } } // SimpleClaims 简化的 JWT Claims type SimpleClaims struct { Subject string `json:"sub"` Email string `json:"email"` Roles []string `json:"roles"` Exp int64 `json:"exp"` } func parseAndValidateJWT(tokenStr, secret string) (*SimpleClaims, error) { // 生产环境应使用 go-jose 进行完整的 JWT 验证 // 此处为简化示例 _ = secret return &SimpleClaims{ Subject: "user-123", Email: "user@example.com", Roles: []string{"user"}, }, nil } // ── RBAC 授权中间件 ── func RequireRole(roles ...string) gin.HandlerFunc { return func(c *gin.Context) { userRoles, exists := c.Get("roles") if !exists { c.AbortWithStatusJSON(http.StatusForbidden, gin.H{ "error": "no roles in context", }) return } roleList, ok := userRoles.([]string) if !ok { c.AbortWithStatusJSON(http.StatusForbidden, gin.H{ "error": "invalid roles format", }) return } for _, required := range roles { for _, has := range roleList { if has == required { c.Next() return } } } c.AbortWithStatusJSON(http.StatusForbidden, gin.H{ "error": fmt.Sprintf("requires one of roles: %v", roles), }) } } // ── API Key + HMAC 签名验证中间件 ── // apiKeyStore 模拟 API Key 存储(生产环境用数据库) var apiKeyStore = map[string]APIKeyInfo{ // key hash -> info } type APIKeyInfo struct { Owner string Secret string // 用于 HMAC 签名的密钥 CreatedAt time.Time Revoked bool } // HMACSignatureMiddleware 验证 API Key + HMAC 请求签名 // 签名算法: HMAC-SHA256(api_key + timestamp + method + path + body_hash) func HMACSignatureMiddleware() gin.HandlerFunc { return func(c *gin.Context) { apiKey := c.GetHeader("X-API-Key") signature := c.GetHeader("X-Signature") timestamp := c.GetHeader("X-Timestamp") if apiKey == "" || signature == "" || timestamp == "" { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "missing X-API-Key, X-Signature, or X-Timestamp header", }) return } // 1. 检查时间戳(防止重放攻击,允许 5 分钟偏差) ts, err := time.Parse(time.RFC3339, timestamp) if err != nil { c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ "error": "invalid timestamp format, expected RFC3339", }) return } if time.Since(ts).Abs() > 5*time.Minute { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "request timestamp too old or too far in the future", }) return } // 2. 查找 API Key keyHash := sha256Hash(apiKey) keyInfo, exists := apiKeyStore[keyHash] if !exists || keyInfo.Revoked { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "invalid or revoked API key", }) return } // 3. 计算预期签名 message := fmt.Sprintf("%s\n%s\n%s\n%s", apiKey, timestamp, c.Request.Method, c.Request.URL.Path) expectedSig := computeHMAC(message, keyInfo.Secret) // 4. 常量时间比较(防止时序攻击) sigBytes, err := hex.DecodeString(signature) if err != nil { c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ "error": "invalid signature encoding", }) return } expectedBytes, _ := hex.DecodeString(expectedSig) if subtle.ConstantTimeCompare(sigBytes, expectedBytes) != 1 { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "invalid signature", }) return } c.Set("apiKeyOwner", keyInfo.Owner) c.Next() } } func sha256Hash(data string) string { h := sha256.Sum256([]byte(data)) return hex.EncodeToString(h[:]) } func computeHMAC(message, secret string) string { mac := hmac.New(sha256.New, []byte(secret)) mac.Write([]byte(message)) return hex.EncodeToString(mac.Sum(nil)) } // ── 输入验证模型 ── type CreateOrderRequest struct { ProductID string `json:"product_id" binding:"required,min=1,max=50,alphanumdash"` Quantity int `json:"quantity" binding:"required,min=1,max=1000"` ShippingAddress string `json:"shipping_address" binding:"required,min=10,max=500"` } type OrderResponse struct { OrderID string `json:"order_id"` ProductID string `json:"product_id"` Quantity int `json:"quantity"` CreatedBy string `json:"created_by"` CreatedAt time.Time `json:"created_at"` } // ── 主函数 ── func main() { // 注册自定义验证器 if v, ok := binding.Validator.Engine().(*validator.Validate); ok { v.RegisterValidation("alphanumdash", func(fl validator.FieldLevel) bool { for _, c := range fl.Field().String() { if !((c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') || c == '_' || c == '-') { return false } } return true }) } r := gin.New() r.Use(gin.Recovery()) // 全局中间件 rateLimiter := NewIPRateLimiter(10, 20) // 每秒 10 请求,突发 20 r.Use(SecurityHeaders()) r.Use(CORSMiddleware(cfg.AllowedOrigin)) r.Use(RateLimitMiddleware(rateLimiter)) r.Use(AuditLogMiddleware()) // 公开端点 r.GET("/api/v2/health", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{ "status": "ok", "timestamp": time.Now().UTC().Format(time.RFC3339), }) }) // JWT 认证路由组 auth := r.Group("/api/v2") auth.Use(JWTAuthMiddleware(cfg.JWTSecret)) { auth.GET("/orders", listOrders) auth.POST("/orders", createOrder) } // 管理员路由组 admin := r.Group("/api/v2/admin") admin.Use(JWTAuthMiddleware(cfg.JWTSecret)) admin.Use(RequireRole("admin", "superadmin")) { admin.GET("/users", adminListUsers) } // HMAC 签名认证路由组(用于服务间调用) external := r.Group("/api/v2/external") external.Use(HMACSignatureMiddleware()) { external.GET("/data", func(c *gin.Context) { owner, _ := c.Get("apiKeyOwner") c.JSON(http.StatusOK, gin.H{ "data": "external data", "key_owner": owner, }) }) } log.Fatal(r.RunTLS(":8443", "server.crt", "server.key")) } func listOrders(c *gin.Context) { userID, _ := c.Get("userID") page := c.DefaultQuery("page", "1") size := c.DefaultQuery("size", "20") c.JSON(http.StatusOK, gin.H{ "orders": []any{}, "page": page, "size": size, "user_id": userID, }) } func createOrder(c *gin.Context) { var req CreateOrderRequest if err := c.ShouldBindJSON(&req); err != nil { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) return } userID, _ := c.Get("userID") c.JSON(http.StatusCreated, OrderResponse{ OrderID: fmt.Sprintf("%x", time.Now().UnixNano()), ProductID: req.ProductID, Quantity: req.Quantity, CreatedBy: userID.(string), CreatedAt: time.Now().UTC(), }) } func adminListUsers(c *gin.Context) { userID, _ := c.Get("userID") c.JSON(http.StatusOK, gin.H{ "users": []any{}, "requested_by": userID, }) } ``` ### 26.7.4 Go — 请求签名验证(类似 AWS Signature V4) ```go /* AWS Signature V4 风格的请求签名验证中间件 签名流程(客户端): 1. 创建规范请求 (Canonical Request) CanonicalRequest = Method + "\n" + Path + "\n" + QueryString + "\n" + CanonicalHeaders + "\n" + SignedHeaders + "\n" + SHA256(Body) 2. 创建待签字符串 (String to Sign) StringToSign = Algorithm + "\n" + Timestamp + "\n" + CredentialScope + "\n" + SHA256(CanonicalRequest) 3. 派生签名密钥 (Signing Key) DateKey = HMAC-SHA256("MYAPI" + SecretKey, Date) RegionKey = HMAC-SHA256(DateKey, Region) ServiceKey = HMAC-SHA256(RegionKey, Service) SigningKey = HMAC-SHA256(ServiceKey, "myapi_request") 4. 计算签名 Signature = Hex(HMAC-SHA256(SigningKey, StringToSign)) Authorization 头格式: MYAPI4-HMAC-SHA256 Credential=////myapi_request, SignedHeaders=, Signature= */ package sigv4 import ( "bytes" "crypto/hmac" "crypto/sha256" "encoding/hex" "fmt" "io" "net/http" "sort" "strings" "time" "github.com/gin-gonic/gin" ) const ( algorithm = "MYAPI4-HMAC-SHA256" maxClockSkew = 5 * time.Minute signingTrailer = "myapi_request" ) // Credential 存储访问凭证(生产环境从数据库/Vault 加载) type Credential struct { AccessKey string SecretKey string Owner string Active bool } // CredentialStore 凭证存储接口 type CredentialStore interface { Lookup(accessKey string) (*Credential, error) } // SigV4Middleware 验证类似 AWS Signature V4 的请求签名 func SigV4Middleware(store CredentialStore) gin.HandlerFunc { return func(c *gin.Context) { authHeader := c.GetHeader("Authorization") if authHeader == "" { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "missing Authorization header", }) return } // ── 1. 解析 Authorization 头 ── params, err := parseAuthHeader(authHeader) if err != nil { c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ "error": "malformed Authorization header: " + err.Error(), }) return } // ── 2. 解析 Credential scope ── // 格式: AccessKey/Date/Region/Service/myapi_request scopeParts := strings.Split(params.credential, "/") if len(scopeParts) != 5 || scopeParts[4] != signingTrailer { c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ "error": "invalid credential scope", }) return } accessKey := scopeParts[0] dateStr := scopeParts[1] // 20260321 region := scopeParts[2] // e.g. "cn-east-1" service := scopeParts[3] // e.g. "order-api" // ── 3. 查找凭证 ── cred, err := store.Lookup(accessKey) if err != nil || cred == nil || !cred.Active { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "invalid or inactive access key", }) return } // ── 4. 时间戳校验 ── reqTimestamp := c.GetHeader("X-Myapi-Date") reqTime, err := time.Parse("20060102T150405Z", reqTimestamp) if err != nil { c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ "error": "invalid X-Myapi-Date, expected format: 20060102T150405Z", }) return } if time.Since(reqTime).Abs() > maxClockSkew { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "request timestamp outside allowed skew", }) return } // ── 5. 读取并缓存请求体 ── bodyBytes, err := io.ReadAll(io.LimitReader(c.Request.Body, 1<<20)) // 最大 1MB if err != nil { c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ "error": "failed to read request body", }) return } c.Request.Body = io.NopCloser(bytes.NewReader(bodyBytes)) // ── 6. 构造规范请求 ── canonicalHeaders, signedHeaders := buildCanonicalHeaders(c.Request, params.signedHeaders) bodyHash := sha256Hex(bodyBytes) canonicalRequest := strings.Join([]string{ c.Request.Method, c.Request.URL.Path, c.Request.URL.RawQuery, canonicalHeaders, signedHeaders, bodyHash, }, "\n") // ── 7. 构造待签字符串 ── credentialScope := fmt.Sprintf("%s/%s/%s/%s", dateStr, region, service, signingTrailer) stringToSign := strings.Join([]string{ algorithm, reqTimestamp, credentialScope, sha256Hex([]byte(canonicalRequest)), }, "\n") // ── 8. 派生签名密钥 ── signingKey := deriveSigningKey(cred.SecretKey, dateStr, region, service) // ── 9. 计算并比较签名 ── expectedSig := hmacSHA256Hex(signingKey, []byte(stringToSign)) if !hmac.Equal([]byte(params.signature), []byte(expectedSig)) { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "signature mismatch", }) return } // 签名验证通过 c.Set("sigv4Owner", cred.Owner) c.Set("sigv4AccessKey", accessKey) c.Next() } } // ── 内部类型与辅助函数 ── type authParams struct { credential string signedHeaders []string signature string } func parseAuthHeader(header string) (*authParams, error) { if !strings.HasPrefix(header, algorithm+" ") { return nil, fmt.Errorf("unsupported algorithm, expected %s", algorithm) } parts := strings.TrimPrefix(header, algorithm+" ") params := &authParams{} for _, segment := range strings.Split(parts, ",") { kv := strings.SplitN(strings.TrimSpace(segment), "=", 2) if len(kv) != 2 { continue } switch kv[0] { case "Credential": params.credential = kv[1] case "SignedHeaders": params.signedHeaders = strings.Split(kv[1], ";") case "Signature": params.signature = kv[1] } } if params.credential == "" || params.signature == "" || len(params.signedHeaders) == 0 { return nil, fmt.Errorf("missing required fields") } return params, nil } func buildCanonicalHeaders(req *http.Request, signed []string) (string, string) { sort.Strings(signed) var canonical strings.Builder for _, h := range signed { val := strings.TrimSpace(req.Header.Get(h)) canonical.WriteString(strings.ToLower(h) + ":" + val + "\n") } return canonical.String(), strings.Join(signed, ";") } func deriveSigningKey(secret, dateStr, region, service string) []byte { dateKey := hmacSHA256([]byte("MYAPI"+secret), []byte(dateStr)) regionKey := hmacSHA256(dateKey, []byte(region)) serviceKey := hmacSHA256(regionKey, []byte(service)) return hmacSHA256(serviceKey, []byte(signingTrailer)) } func hmacSHA256(key, data []byte) []byte { h := hmac.New(sha256.New, key) h.Write(data) return h.Sum(nil) } func hmacSHA256Hex(key, data []byte) string { return hex.EncodeToString(hmacSHA256(key, data)) } func sha256Hex(data []byte) string { h := sha256.Sum256(data) return hex.EncodeToString(h[:]) } ``` --- ## 26.8 GraphQL 安全 ```python # GraphQL 特有的安全问题和防御 # 1. 查询深度限制 # 防止:{ user { friends { friends { friends { ... } } } } } MAX_DEPTH = 5 # 2. 查询复杂度限制 # 防止:{ users(first: 1000000) { posts { comments { ... } } } } MAX_COMPLEXITY = 1000 # 3. 禁用内省(生产环境) # 防止攻击者发现所有 API 端点 INTROSPECTION_ENABLED = False # 生产环境禁用 ``` --- ## 26.9 gRPC 安全 ```python import grpc # mTLS gRPC 服务端 server_credentials = grpc.ssl_server_credentials( [(server_key, server_cert)], root_certificates=ca_cert, require_client_auth=True, # 要求客户端证书 ) server = grpc.server(futures.ThreadPoolExecutor()) server.add_secure_port('[::]:50051', server_credentials) # Token 拦截器 class AuthInterceptor(grpc.ServerInterceptor): def intercept_service(self, continuation, handler_call_details): metadata = dict(handler_call_details.invocation_metadata) token = metadata.get('authorization', '') if not verify_token(token): return grpc.unary_unary_rpc_method_handler( lambda req, ctx: ctx.abort(grpc.StatusCode.UNAUTHENTICATED, 'Invalid token') ) return continuation(handler_call_details) ``` --- ## 26.10 API 安全测试清单 ### 认证测试 | # | 测试项 | 方法 | 预期结果 | |---|--------|------|----------| | 1 | 无 Token 访问受保护端点 | `curl /api/orders` (无 Authorization) | 401 Unauthorized | | 2 | 过期 Token | 使用 exp 已过期的 JWT | 401 Unauthorized | | 3 | 篡改 Token payload | 修改 JWT payload 但不重签 | 401 Unauthorized | | 4 | 使用错误算法 | 将 RS256 Token 改为 none | 401 Unauthorized | | 5 | 暴力破解 | 连续发送 10 次错误密码 | 429 或账户锁定 | ### 授权测试 | # | 测试项 | 方法 | 预期结果 | |---|--------|------|----------| | 6 | BOLA:访问他人资源 | 用户 A 的 Token 访问用户 B 的订单 | 403 Forbidden | | 7 | 越权:普通用户访问管理端点 | 用 user 角色 Token 访问 /api/admin/* | 403 Forbidden | | 8 | 水平越权:修改他人数据 | PUT /api/users/other-user-id | 403 Forbidden | | 9 | 属性级越权 | 普通用户尝试修改 role 字段 | 字段被忽略或 400 | ### 输入验证测试 | # | 测试项 | 方法 | 预期结果 | |---|--------|------|----------| | 10 | SQL 注入 | `username: "admin' OR '1'='1"` | 400 Bad Request | | 11 | XSS | `name: ""` | 输入被拒绝或转义 | | 12 | 超长输入 | 发送 1MB 的 username | 400 或 413 | | 13 | 类型混淆 | `age: "not_a_number"` | 422 Validation Error | | 14 | 路径遍历 | `file: "../../etc/passwd"` | 400 Bad Request | ### 速率限制测试 | # | 测试项 | 方法 | 预期结果 | |---|--------|------|----------| | 15 | 超过速率限制 | 1 秒内发送 100 个请求 | 429 Too Many Requests | | 16 | 登录限流 | 连续 6 次登录请求 | 第 6 次返回 429 | | 17 | 分布式限流 | 从多个 IP 发送请求 | 按用户 ID 聚合限流 | ### 传输安全测试 | # | 测试项 | 方法 | 预期结果 | |---|--------|------|----------| | 18 | HTTP 访问 | `curl http://api.example.com/...` | 301 重定向到 HTTPS | | 19 | HSTS 头 | 检查响应头 | 包含 Strict-Transport-Security | | 20 | TLS 版本 | `nmap --script ssl-enum-ciphers` | 仅 TLS 1.2+ | ### 自动化测试脚本示例 ```bash #!/bin/bash # api_security_test.sh — API 安全快速检查脚本 BASE_URL="https://api.example.com" TOKEN="eyJhbGciOiJSUzI1NiIs..." # 有效的测试 Token echo "=== API Security Test Suite ===" # 1. 无认证访问 echo -n "[TEST 1] No auth: " STATUS=$(curl -s -o /dev/null -w "%{http_code}" "$BASE_URL/api/v2/orders") [ "$STATUS" = "401" ] && echo "PASS (401)" || echo "FAIL ($STATUS)" # 2. 安全头检查 echo -n "[TEST 2] Security headers: " HEADERS=$(curl -s -I "$BASE_URL/api/v2/health") echo "$HEADERS" | grep -q "X-Content-Type-Options: nosniff" && \ echo "$HEADERS" | grep -q "X-Frame-Options: DENY" && \ echo "$HEADERS" | grep -q "Strict-Transport-Security" && \ echo "PASS" || echo "FAIL" # 3. CORS 检查 echo -n "[TEST 3] CORS origin: " ORIGIN=$(curl -s -I -H "Origin: https://evil.com" "$BASE_URL/api/v2/health" | \ grep -i "Access-Control-Allow-Origin") echo "$ORIGIN" | grep -q "evil.com" && echo "FAIL (allows evil origin)" || echo "PASS" # 4. 速率限制 echo -n "[TEST 4] Rate limiting: " for i in $(seq 1 50); do STATUS=$(curl -s -o /dev/null -w "%{http_code}" \ -H "Authorization: Bearer $TOKEN" "$BASE_URL/api/v2/orders") if [ "$STATUS" = "429" ]; then echo "PASS (429 at request $i)" break fi done # 5. SQL 注入 echo -n "[TEST 5] SQL injection: " STATUS=$(curl -s -o /dev/null -w "%{http_code}" \ -H "Authorization: Bearer $TOKEN" \ "$BASE_URL/api/v2/users/search?q=admin'%20OR%20'1'='1") [ "$STATUS" = "400" ] || [ "$STATUS" = "422" ] && echo "PASS ($STATUS)" || echo "FAIL ($STATUS)" echo "=== Tests Complete ===" ``` --- ## 26.11 小结 - **BOLA** 是最常见的 API 安全问题,必须在每个请求中检查资源所有权 - **输入验证** 使用 Pydantic / Bean Validation / go-playground/validator 进行严格的类型和格式检查 - **速率限制** 防止暴力破解和资源耗尽,Token Bucket 适合大多数场景 - **GraphQL** 需要额外的深度限制、复杂度限制和内省控制 - **gRPC** 通过 mTLS 和拦截器实现安全通信 - **OWASP API Top 10** 是 API 安全的最低防御基线,每一条都需要对应的技术措施 - **API 安全测试** 应纳入 CI/CD 流水线,自动化执行