第二十五章:安全框架与库实战

“不要重新发明轮子 — 使用经过审计的安全库,站在巨人的肩膀上。”

        mindmap
  root((安全框架))
    Python
      FastAPI Security
      Authlib
      PyJWT
    Java
      Spring Security
      Keycloak Adapter
    Go
      go-jose
      casbin-go
    Node.js
      Passport.js
      Helmet
    选型原则
      社区活跃度
      安全审计
      维护状态
    

25.1 安全框架总体架构

一个典型的 Web 应用安全框架需要覆盖从请求入口到业务逻辑的多个层次:

        flowchart TB
    A[客户端请求] --> B[TLS 终止]
    B --> C[速率限制]
    C --> D[安全响应头]
    D --> E[CORS 检查]
    E --> F[认证 Authentication]
    F --> G[授权 Authorization]
    G --> H[输入验证]
    H --> I[业务逻辑]
    I --> J[输出编码/过滤]
    J --> K[审计日志]
    K --> L[响应返回]

    style F fill:#f96,stroke:#333,stroke-width:2px
    style G fill:#f96,stroke:#333,stroke-width:2px
    style H fill:#ff9,stroke:#333,stroke-width:2px
    

25.2 Python — FastAPI 安全实战

25.2.1 完整的认证授权中间件

from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from jose import jwt, JWTError
from pydantic import BaseModel
from functools import wraps
from typing import Callable
import httpx

app = FastAPI()
security = HTTPBearer()

# 配置
JWKS_URL = "https://auth.example.com/.well-known/jwks.json"
ISSUER = "https://auth.example.com"
AUDIENCE = "my-api"

class User(BaseModel):
    id: str
    email: str | None = None
    roles: list[str] = []

# JWKS 缓存
_jwks_cache = None

async def get_jwks():
    global _jwks_cache
    if _jwks_cache is None:
        async with httpx.AsyncClient() as client:
            resp = await client.get(JWKS_URL)
            _jwks_cache = resp.json()
    return _jwks_cache

async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Security(security)
) -> User:
    """从 JWT Token 中提取用户信息"""
    token = credentials.credentials
    try:
        jwks = await get_jwks()
        # 简化示例:实际应根据 kid 选择正确的密钥
        payload = jwt.decode(
            token,
            jwks["keys"][0],
            algorithms=["RS256"],
            audience=AUDIENCE,
            issuer=ISSUER,
        )
        return User(
            id=payload["sub"],
            email=payload.get("email"),
            roles=payload.get("roles", []),
        )
    except JWTError as e:
        raise HTTPException(status_code=401, detail=f"Invalid token: {e}")

def require_role(*roles: str):
    """角色授权装饰器"""
    async def role_checker(user: User = Depends(get_current_user)):
        if not any(role in user.roles for role in roles):
            raise HTTPException(
                status_code=403,
                detail=f"Requires one of roles: {roles}"
            )
        return user
    return role_checker

# API 端点
@app.get("/api/public")
async def public_endpoint():
    return {"message": "Public data"}

@app.get("/api/profile")
async def get_profile(user: User = Depends(get_current_user)):
    return {"user_id": user.id, "email": user.email}

@app.get("/api/admin/users")
async def list_users(user: User = Depends(require_role("admin"))):
    return {"users": [...], "requested_by": user.id}

@app.delete("/api/admin/users/{user_id}")
async def delete_user(
    user_id: str,
    user: User = Depends(require_role("admin", "superadmin"))
):
    return {"message": f"User {user_id} deleted by {user.id}"}

25.2.2 安全中间件

from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware

# CORS
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://app.example.com"],  # 不要用 *
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["Authorization", "Content-Type"],
    allow_credentials=True,
    max_age=3600,
)

# 可信主机
app.add_middleware(
    TrustedHostMiddleware,
    allowed_hosts=["api.example.com", "*.example.com"]
)

# 安全响应头
from starlette.middleware.base import BaseHTTPMiddleware

class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    async def dispatch(self, request, call_next):
        response = await call_next(request)
        response.headers["X-Content-Type-Options"] = "nosniff"
        response.headers["X-Frame-Options"] = "DENY"
        response.headers["X-XSS-Protection"] = "1; mode=block"
        response.headers["Strict-Transport-Security"] = (
            "max-age=31536000; includeSubDomains"
        )
        response.headers["Content-Security-Policy"] = "default-src 'self'"
        return response

app.add_middleware(SecurityHeadersMiddleware)

25.2.3 FastAPI + Authlib OIDC 集成

"""
FastAPI + Authlib 实现 OpenID Connect 登录流程。
依赖: pip install fastapi authlib httpx itsdangerous uvicorn
"""
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import RedirectResponse
from authlib.integrations.starlette_client import OAuth
from starlette.middleware.sessions import SessionMiddleware
import os

app = FastAPI(title="OIDC Demo")

# Session 中间件(用于存储 OAuth state)
app.add_middleware(
    SessionMiddleware,
    secret_key=os.environ.get("SESSION_SECRET", "change-me-in-production"),
    max_age=1800,  # 30 分钟
    https_only=True,
    same_site="lax",
)

# 配置 OAuth / OIDC 提供者
oauth = OAuth()
oauth.register(
    name="keycloak",
    client_id=os.environ.get("OIDC_CLIENT_ID", "my-app"),
    client_secret=os.environ.get("OIDC_CLIENT_SECRET", "secret"),
    server_metadata_url=(
        "https://auth.example.com/realms/myrealm/"
        ".well-known/openid-configuration"
    ),
    client_kwargs={"scope": "openid email profile"},
)


@app.get("/login")
async def login(request: Request):
    """重定向到 OIDC 提供者进行登录"""
    redirect_uri = request.url_for("auth_callback")
    return await oauth.keycloak.authorize_redirect(request, redirect_uri)


@app.get("/auth/callback")
async def auth_callback(request: Request):
    """OIDC 回调:交换 code 获取 token,提取用户信息"""
    try:
        token = await oauth.keycloak.authorize_access_token(request)
    except Exception as e:
        raise HTTPException(status_code=401, detail=f"OAuth error: {e}")

    userinfo = token.get("userinfo")
    if userinfo is None:
        # 如果 userinfo 不在 token 中,手动获取
        userinfo = await oauth.keycloak.userinfo(token=token)

    # 将用户信息存入 session
    request.session["user"] = {
        "sub": userinfo["sub"],
        "email": userinfo.get("email"),
        "name": userinfo.get("name"),
    }
    return RedirectResponse(url="/me")


@app.get("/me")
async def me(request: Request):
    """返回当前登录用户信息"""
    user = request.session.get("user")
    if not user:
        raise HTTPException(status_code=401, detail="Not logged in")
    return user


@app.get("/logout")
async def logout(request: Request):
    """清除 session 并重定向到 OIDC 提供者的登出端点"""
    request.session.clear()
    return RedirectResponse(
        url="https://auth.example.com/realms/myrealm/"
        "protocol/openid-connect/logout?redirect_uri=https://app.example.com"
    )

25.2.4 Rate Limiting(slowapi)

"""
基于 slowapi 的速率限制。
依赖: pip install fastapi slowapi
"""
from fastapi import FastAPI, Request, Depends
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from slowapi.middleware import SlowAPIMiddleware

# 支持按用户 ID 限流(已认证)或按 IP 限流(未认证)
def get_rate_limit_key(request: Request) -> str:
    """优先使用用户 ID,否则使用客户端 IP"""
    user = getattr(request.state, "user", None)
    if user and hasattr(user, "id"):
        return f"user:{user.id}"
    return get_remote_address(request)

limiter = Limiter(
    key_func=get_rate_limit_key,
    default_limits=["200/hour"],          # 全局默认
    storage_uri="redis://localhost:6379",  # 生产环境用 Redis
)

app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
app.add_middleware(SlowAPIMiddleware)


@app.get("/api/search")
@limiter.limit("30/minute")
async def search(request: Request, q: str):
    """搜索接口:每分钟 30 次"""
    return {"results": [], "query": q}


@app.post("/api/login")
@limiter.limit("5/minute")
async def login(request: Request):
    """登录接口:更严格的限制"""
    return {"token": "..."}


@app.post("/api/password-reset")
@limiter.limit("3/hour")
async def password_reset(request: Request):
    """密码重置:每小时仅 3 次"""
    return {"message": "Reset email sent"}

25.2.5 输入验证与 SQL 注入防护

"""
使用 Pydantic 严格验证 + SQLAlchemy 参数化查询防止注入。
依赖: pip install fastapi pydantic sqlalchemy asyncpg
"""
from fastapi import FastAPI, HTTPException, Depends
from pydantic import BaseModel, Field, field_validator
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.ext.asyncio import async_sessionmaker
from sqlalchemy import text
from typing import Annotated
import re

app = FastAPI()

# ── Pydantic 模型:严格输入验证 ──

class CreateUserRequest(BaseModel):
    username: Annotated[str, Field(
        min_length=3, max_length=50,
        pattern=r"^[a-zA-Z0-9_]+$",
        description="仅允许字母、数字、下划线",
    )]
    email: Annotated[str, Field(max_length=255)]
    age: Annotated[int, Field(ge=0, le=150)]

    @field_validator("email")
    @classmethod
    def validate_email(cls, v: str) -> str:
        pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
        if not re.match(pattern, v):
            raise ValueError("Invalid email format")
        return v.lower()

    @field_validator("username")
    @classmethod
    def reject_injection(cls, v: str) -> str:
        """拒绝包含 SQL 注入特征的输入"""
        dangerous = ["'", '"', ";", "--", "/*", "*/", "DROP", "DELETE", "UPDATE"]
        for pat in dangerous:
            if pat.upper() in v.upper():
                raise ValueError("Invalid characters in username")
        return v


class SearchQuery(BaseModel):
    q: Annotated[str, Field(min_length=1, max_length=200)]
    page: Annotated[int, Field(ge=1, le=1000)] = 1
    size: Annotated[int, Field(ge=1, le=100)] = 20


# ── 数据库层:参数化查询 ──

DATABASE_URL = "postgresql+asyncpg://user:pass@localhost/mydb"
engine = create_async_engine(DATABASE_URL, echo=False)
SessionLocal = async_sessionmaker(engine, class_=AsyncSession)


async def get_db():
    async with SessionLocal() as session:
        yield session


@app.post("/api/users")
async def create_user(
    req: CreateUserRequest,
    db: AsyncSession = Depends(get_db),
):
    """创建用户 — Pydantic 已完成输入验证,SQLAlchemy 使用参数化查询"""
    # ✅ 安全:使用参数化查询,绝不拼接 SQL
    result = await db.execute(
        text("INSERT INTO users (username, email, age) VALUES (:u, :e, :a) RETURNING id"),
        {"u": req.username, "e": req.email, "a": req.age},
    )
    await db.commit()
    user_id = result.scalar_one()
    return {"id": user_id, "username": req.username}


@app.get("/api/users/search")
async def search_users(
    params: SearchQuery = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """搜索用户 — 参数化 LIKE 查询"""
    # ✅ 安全:参数化 LIKE,% 由代码控制
    offset = (params.page - 1) * params.size
    result = await db.execute(
        text(
            "SELECT id, username, email FROM users "
            "WHERE username ILIKE :pattern "
            "ORDER BY id LIMIT :limit OFFSET :offset"
        ),
        {"pattern": f"%{params.q}%", "limit": params.size, "offset": offset},
    )
    rows = result.mappings().all()
    return {"results": [dict(r) for r in rows]}

25.3 Java — Spring Security 实战

Spring Security 过滤器链

        flowchart LR
    A[HTTP 请求] --> B[SecurityContextPersistenceFilter]
    B --> C[CorsFilter]
    C --> D[CsrfFilter]
    D --> E[LogoutFilter]
    E --> F[BearerTokenAuthenticationFilter]
    F --> G[AuthorizationFilter]
    G --> H[ExceptionTranslationFilter]
    H --> I[FilterSecurityInterceptor]
    I --> J[Controller]

    style F fill:#f96,stroke:#333,stroke-width:2px
    style G fill:#f96,stroke:#333,stroke-width:2px
    

25.3.1 完整的 SecurityFilterChain 配置(JWT + CORS + CSRF + 安全头)

package com.example.security.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpMethod;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.oauth2.jwt.JwtDecoder;
import org.springframework.security.oauth2.jwt.NimbusJwtDecoder;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter;
import org.springframework.security.oauth2.server.resource.authentication.JwtGrantedAuthoritiesConverter;
import org.springframework.security.web.SecurityFilterChain;
import org.springframework.security.web.header.writers.ReferrerPolicyHeaderWriter;
import org.springframework.web.cors.CorsConfiguration;
import org.springframework.web.cors.CorsConfigurationSource;
import org.springframework.web.cors.UrlBasedCorsConfigurationSource;

import java.util.List;

@Configuration
@EnableWebSecurity
@EnableMethodSecurity  // 启用 @PreAuthorize, @Secured
public class SecurityConfig {

    /**
     * 主安全过滤器链:JWT 认证 + CORS + CSRF + 安全头
     */
    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        http
            // ── 会话管理:无状态 ──
            .sessionManagement(session ->
                session.sessionCreationPolicy(SessionCreationPolicy.STATELESS)
            )

            // ── CORS 配置 ──
            .cors(cors -> cors.configurationSource(corsConfigurationSource()))

            // ── CSRF:无状态 API 禁用,但保留对浏览器端点的保护 ──
            .csrf(csrf -> csrf.disable())

            // ── 安全响应头 ──
            .headers(headers -> headers
                .contentTypeOptions(cto -> {})                    // X-Content-Type-Options: nosniff
                .frameOptions(fo -> fo.deny())                    // X-Frame-Options: DENY
                .httpStrictTransportSecurity(hsts -> hsts
                    .includeSubDomains(true)
                    .maxAgeInSeconds(31536000)
                )
                .referrerPolicy(rp ->
                    rp.policy(ReferrerPolicyHeaderWriter.ReferrerPolicy.STRICT_ORIGIN_WHEN_CROSS_ORIGIN)
                )
                .permissionsPolicy(pp ->
                    pp.policy("camera=(), microphone=(), geolocation=()")
                )
            )

            // ── URL 级授权 ──
            .authorizeHttpRequests(auth -> auth
                .requestMatchers("/api/public/**", "/actuator/health").permitAll()
                .requestMatchers(HttpMethod.OPTIONS, "/**").permitAll()
                .requestMatchers("/api/admin/**").hasRole("ADMIN")
                .requestMatchers("/api/users/**").hasAnyRole("USER", "ADMIN")
                .anyRequest().authenticated()
            )

            // ── OAuth2 Resource Server(JWT) ──
            .oauth2ResourceServer(oauth2 -> oauth2
                .jwt(jwt -> jwt
                    .decoder(jwtDecoder())
                    .jwtAuthenticationConverter(jwtAuthenticationConverter())
                )
            );

        return http.build();
    }

    /**
     * JWT 解码器:从 JWKS 端点获取公钥
     */
    @Bean
    public JwtDecoder jwtDecoder() {
        return NimbusJwtDecoder
            .withJwkSetUri("https://auth.example.com/.well-known/jwks.json")
            .build();
    }

    /**
     * JWT → Spring Security Authority 映射
     * 将 JWT 中的 "roles" claim 映射为 ROLE_ 前缀的权限
     */
    @Bean
    public JwtAuthenticationConverter jwtAuthenticationConverter() {
        JwtGrantedAuthoritiesConverter grantedAuthoritiesConverter =
            new JwtGrantedAuthoritiesConverter();
        grantedAuthoritiesConverter.setAuthoritiesClaimName("roles");
        grantedAuthoritiesConverter.setAuthorityPrefix("ROLE_");

        JwtAuthenticationConverter converter = new JwtAuthenticationConverter();
        converter.setJwtGrantedAuthoritiesConverter(grantedAuthoritiesConverter);
        return converter;
    }

    /**
     * CORS 配置源
     */
    @Bean
    public CorsConfigurationSource corsConfigurationSource() {
        CorsConfiguration config = new CorsConfiguration();
        config.setAllowedOrigins(List.of("https://app.example.com"));
        config.setAllowedMethods(List.of("GET", "POST", "PUT", "DELETE", "OPTIONS"));
        config.setAllowedHeaders(List.of("Authorization", "Content-Type", "X-Request-ID"));
        config.setExposedHeaders(List.of("X-Request-ID"));
        config.setAllowCredentials(true);
        config.setMaxAge(3600L);

        UrlBasedCorsConfigurationSource source = new UrlBasedCorsConfigurationSource();
        source.registerCorsConfiguration("/api/**", config);
        return source;
    }
}

25.3.2 自定义 AuthenticationProvider

package com.example.security.provider;

import org.springframework.security.authentication.AuthenticationProvider;
import org.springframework.security.authentication.BadCredentialsException;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.AuthenticationException;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.crypto.password.PasswordEncoder;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.stream.Collectors;

/**
 * 自定义认证提供者:支持多因素认证(MFA)和账户锁定。
 */
@Component
public class CustomAuthenticationProvider implements AuthenticationProvider {

    private final UserRepository userRepository;
    private final PasswordEncoder passwordEncoder;
    private final LoginAttemptService loginAttemptService;

    public CustomAuthenticationProvider(
            UserRepository userRepository,
            PasswordEncoder passwordEncoder,
            LoginAttemptService loginAttemptService) {
        this.userRepository = userRepository;
        this.passwordEncoder = passwordEncoder;
        this.loginAttemptService = loginAttemptService;
    }

    @Override
    public Authentication authenticate(Authentication authentication)
            throws AuthenticationException {

        String username = authentication.getName();
        String password = authentication.getCredentials().toString();

        // 1. 检查账户是否被锁定(连续失败 5 次锁定 15 分钟)
        if (loginAttemptService.isBlocked(username)) {
            throw new BadCredentialsException(
                "Account temporarily locked due to too many failed attempts"
            );
        }

        // 2. 查找用户
        UserEntity user = userRepository.findByUsername(username)
            .orElseThrow(() -> {
                // 即使用户不存在也执行密码哈希,防止时序攻击
                passwordEncoder.encode(password);
                loginAttemptService.recordFailure(username);
                return new BadCredentialsException("Invalid credentials");
            });

        // 3. 验证密码
        if (!passwordEncoder.matches(password, user.getPasswordHash())) {
            loginAttemptService.recordFailure(username);
            throw new BadCredentialsException("Invalid credentials");
        }

        // 4. 检查账户状态
        if (!user.isEnabled()) {
            throw new BadCredentialsException("Account is disabled");
        }

        // 5. 登录成功,重置失败计数
        loginAttemptService.resetFailures(username);

        // 6. 构建认证令牌
        List<SimpleGrantedAuthority> authorities = user.getRoles().stream()
            .map(role -> new SimpleGrantedAuthority("ROLE_" + role))
            .collect(Collectors.toList());

        return new UsernamePasswordAuthenticationToken(
            username, null, authorities
        );
    }

    @Override
    public boolean supports(Class<?> authentication) {
        return UsernamePasswordAuthenticationToken.class
            .isAssignableFrom(authentication);
    }
}

25.3.3 Method-level Security(@PreAuthorize, @Secured)

package com.example.security.controller;

import org.springframework.security.access.annotation.Secured;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.web.bind.annotation.*;

@RestController
@RequestMapping("/api")
public class SecureController {

    /**
     * 简单角色检查:仅 ADMIN 可访问
     */
    @Secured("ROLE_ADMIN")
    @GetMapping("/admin/dashboard")
    public Map<String, Object> adminDashboard() {
        return Map.of("status", "ok", "role", "admin");
    }

    /**
     * SpEL 表达式:用户只能访问自己的数据,ADMIN 可访问所有
     */
    @PreAuthorize("#userId == authentication.name or hasRole('ADMIN')")
    @GetMapping("/users/{userId}/profile")
    public Map<String, Object> getUserProfile(
            @PathVariable String userId,
            @AuthenticationPrincipal Jwt jwt) {
        return Map.of(
            "userId", userId,
            "email", jwt.getClaimAsString("email"),
            "requestedBy", jwt.getSubject()
        );
    }

    /**
     * 多条件授权:需要特定角色 + 特定 scope
     */
    @PreAuthorize("hasRole('ADMIN') and hasAuthority('SCOPE_users:write')")
    @DeleteMapping("/users/{userId}")
    public Map<String, String> deleteUser(@PathVariable String userId) {
        // 执行删除逻辑...
        return Map.of("message", "User " + userId + " deleted");
    }

    /**
     * 自定义权限评估器:基于业务规则的授权
     */
    @PreAuthorize("@orderSecurity.canAccess(authentication, #orderId)")
    @GetMapping("/orders/{orderId}")
    public Map<String, Object> getOrder(@PathVariable String orderId) {
        // orderSecurity 是一个 Spring Bean,实现自定义授权逻辑
        return Map.of("orderId", orderId, "status", "shipped");
    }
}
package com.example.security.service;

import org.springframework.security.core.Authentication;
import org.springframework.stereotype.Component;

/**
 * 自定义安全评估器:在 @PreAuthorize 中通过 @orderSecurity 引用
 */
@Component("orderSecurity")
public class OrderSecurityEvaluator {

    private final OrderRepository orderRepository;

    public OrderSecurityEvaluator(OrderRepository orderRepository) {
        this.orderRepository = orderRepository;
    }

    /**
     * 检查当前用户是否有权访问指定订单
     */
    public boolean canAccess(Authentication auth, String orderId) {
        // ADMIN 可以访问所有订单
        if (auth.getAuthorities().stream()
                .anyMatch(a -> a.getAuthority().equals("ROLE_ADMIN"))) {
            return true;
        }
        // 普通用户只能访问自己的订单
        return orderRepository.findById(orderId)
            .map(order -> order.getUserId().equals(auth.getName()))
            .orElse(false);
    }
}

25.4 Go — Gin 安全框架实战

Go 安全中间件架构

        flowchart LR
    A[HTTP 请求] --> B[Recovery]
    B --> C[Logger]
    C --> D[RateLimiter]
    D --> E[SecurityHeaders]
    E --> F[CORS]
    F --> G[JWTAuth]
    G --> H[CasbinRBAC]
    H --> I[Handler]

    style G fill:#f96,stroke:#333,stroke-width:2px
    style H fill:#f96,stroke:#333,stroke-width:2px
    

25.4.1 JWT 认证中间件(完整实现)

// middleware/jwt.go
package middleware

import (
	"context"
	"errors"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/go-jose/go-jose/v3"
	"github.com/go-jose/go-jose/v3/jwt"
)

// Claims 定义 JWT 中的声明
type Claims struct {
	jwt.Claims
	Email string   `json:"email"`
	Roles []string `json:"roles"`
}

// JWTConfig 保存 JWT 验证配置
type JWTConfig struct {
	JWKSURL   string
	Issuer    string
	Audience  string
	jwks      *jose.JSONWebKeySet
	mu        sync.RWMutex
	lastFetch time.Time
	cacheTTL  time.Duration
}

// NewJWTConfig 创建 JWT 配置
func NewJWTConfig(jwksURL, issuer, audience string) *JWTConfig {
	return &JWTConfig{
		JWKSURL:  jwksURL,
		Issuer:   issuer,
		Audience: audience,
		cacheTTL: 1 * time.Hour,
	}
}

// fetchJWKS 获取并缓存 JWKS
func (c *JWTConfig) fetchJWKS() (*jose.JSONWebKeySet, error) {
	c.mu.RLock()
	if c.jwks != nil && time.Since(c.lastFetch) < c.cacheTTL {
		defer c.mu.RUnlock()
		return c.jwks, nil
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock()

	// 双重检查
	if c.jwks != nil && time.Since(c.lastFetch) < c.cacheTTL {
		return c.jwks, nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.JWKSURL, nil)
	if err != nil {
		return nil, err
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	var jwks jose.JSONWebKeySet
	if err := json.NewDecoder(resp.Body).Decode(&jwks); err != nil {
		return nil, err
	}

	c.jwks = &jwks
	c.lastFetch = time.Now()
	return c.jwks, nil
}

// ValidateToken 验证 JWT 并返回 Claims
func (c *JWTConfig) ValidateToken(tokenStr string) (*Claims, error) {
	tok, err := jwt.ParseSigned(tokenStr)
	if err != nil {
		return nil, errors.New("malformed token")
	}

	jwks, err := c.fetchJWKS()
	if err != nil {
		return nil, errors.New("failed to fetch JWKS")
	}

	// 根据 token header 中的 kid 查找对应的密钥
	if len(tok.Headers) == 0 {
		return nil, errors.New("token has no headers")
	}
	kid := tok.Headers[0].KeyID
	keys := jwks.Key(kid)
	if len(keys) == 0 {
		return nil, errors.New("unknown signing key")
	}

	var claims Claims
	if err := tok.Claims(keys[0].Key, &claims); err != nil {
		return nil, errors.New("invalid token signature")
	}

	// 验证标准声明
	expected := jwt.Expected{
		Issuer:   c.Issuer,
		Audience: jwt.Audience{c.Audience},
		Time:     time.Now(),
	}
	if err := claims.Claims.Validate(expected); err != nil {
		return nil, err
	}

	return &claims, nil
}

// JWTAuthMiddleware 返回 Gin JWT 认证中间件
func JWTAuthMiddleware(config *JWTConfig) gin.HandlerFunc {
	return func(c *gin.Context) {
		authHeader := c.GetHeader("Authorization")
		if authHeader == "" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "missing Authorization header",
			})
			return
		}

		parts := strings.SplitN(authHeader, " ", 2)
		if len(parts) != 2 || !strings.EqualFold(parts[0], "bearer") {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "invalid Authorization header format",
			})
			return
		}

		claims, err := config.ValidateToken(parts[1])
		if err != nil {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "invalid token: " + err.Error(),
			})
			return
		}

		// 将用户信息存入上下文
		c.Set("userID", claims.Subject)
		c.Set("email", claims.Email)
		c.Set("roles", claims.Roles)
		c.Next()
	}
}

25.4.2 Casbin RBAC 授权中间件

// middleware/casbin.go
package middleware

import (
	"net/http"

	"github.com/casbin/casbin/v2"
	"github.com/gin-gonic/gin"
)

// CasbinMiddleware 基于 Casbin 的 RBAC 授权中间件
func CasbinMiddleware(enforcer *casbin.Enforcer) gin.HandlerFunc {
	return func(c *gin.Context) {
		// 从 JWT 中间件设置的上下文中获取用户角色
		rolesVal, exists := c.Get("roles")
		if !exists {
			c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
				"error": "no roles found in context",
			})
			return
		}

		roles, ok := rolesVal.([]string)
		if !ok || len(roles) == 0 {
			c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
				"error": "invalid roles",
			})
			return
		}

		obj := c.Request.URL.Path  // 资源
		act := c.Request.Method    // 操作

		// 检查任一角色是否有权限
		allowed := false
		for _, role := range roles {
			ok, err := enforcer.Enforce(role, obj, act)
			if err != nil {
				c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
					"error": "authorization check failed",
				})
				return
			}
			if ok {
				allowed = true
				break
			}
		}

		if !allowed {
			c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
				"error": "access denied",
			})
			return
		}

		c.Next()
	}
}

Casbin 策略模型文件 (model.conf):

[request_definition]
r = sub, obj, act

[policy_definition]
p = sub, obj, act

[role_definition]
g = _, _

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = g(r.sub, p.sub) && keyMatch2(r.obj, p.obj) && r.act == p.act

Casbin 策略文件 (policy.csv):

p, admin, /api/admin/*, GET
p, admin, /api/admin/*, POST
p, admin, /api/admin/*, DELETE
p, user, /api/users/:id, GET
p, user, /api/users/:id, PUT
p, user, /api/orders/*, GET
p, user, /api/orders, POST

g, superadmin, admin

25.4.3 安全响应头中间件

// middleware/headers.go
package middleware

import "github.com/gin-gonic/gin"

// SecurityHeadersMiddleware 添加安全响应头
func SecurityHeadersMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		h := c.Writer.Header()

		// 防止 MIME 类型嗅探
		h.Set("X-Content-Type-Options", "nosniff")
		// 防止点击劫持
		h.Set("X-Frame-Options", "DENY")
		// 启用 XSS 过滤器
		h.Set("X-XSS-Protection", "1; mode=block")
		// 强制 HTTPS
		h.Set("Strict-Transport-Security",
			"max-age=31536000; includeSubDomains; preload")
		// 内容安全策略
		h.Set("Content-Security-Policy", "default-src 'self'")
		// Referrer 策略
		h.Set("Referrer-Policy", "strict-origin-when-cross-origin")
		// 权限策略
		h.Set("Permissions-Policy",
			"camera=(), microphone=(), geolocation=()")
		// 禁止缓存敏感数据
		h.Set("Cache-Control", "no-store, no-cache, must-revalidate")
		h.Set("Pragma", "no-cache")

		c.Next()
	}
}

25.4.4 Rate Limiting 中间件

// middleware/ratelimit.go
package middleware

import (
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"golang.org/x/time/rate"
)

// RateLimiterConfig 速率限制配置
type RateLimiterConfig struct {
	Rate      rate.Limit    // 每秒允许的请求数
	Burst     int           // 突发容量
	CleanupTTL time.Duration // 清理不活跃客户端的间隔
}

// clientLimiter 每个客户端的限流器
type clientLimiter struct {
	limiter  *rate.Limiter
	lastSeen time.Time
}

// RateLimiter 基于 Token Bucket 的分布式速率限制器
type RateLimiter struct {
	clients map[string]*clientLimiter
	mu      sync.Mutex
	config  RateLimiterConfig
}

// NewRateLimiter 创建速率限制器
func NewRateLimiter(config RateLimiterConfig) *RateLimiter {
	rl := &RateLimiter{
		clients: make(map[string]*clientLimiter),
		config:  config,
	}
	// 后台清理过期的客户端限流器
	go rl.cleanup()
	return rl
}

// cleanup 定期清理不活跃的客户端
func (rl *RateLimiter) cleanup() {
	ticker := time.NewTicker(rl.config.CleanupTTL)
	defer ticker.Stop()
	for range ticker.C {
		rl.mu.Lock()
		for key, cl := range rl.clients {
			if time.Since(cl.lastSeen) > rl.config.CleanupTTL {
				delete(rl.clients, key)
			}
		}
		rl.mu.Unlock()
	}
}

// getLimiter 获取或创建客户端限流器
func (rl *RateLimiter) getLimiter(key string) *rate.Limiter {
	rl.mu.Lock()
	defer rl.mu.Unlock()

	cl, exists := rl.clients[key]
	if !exists {
		cl = &clientLimiter{
			limiter: rate.NewLimiter(rl.config.Rate, rl.config.Burst),
		}
		rl.clients[key] = cl
	}
	cl.lastSeen = time.Now()
	return cl.limiter
}

// Middleware 返回 Gin 速率限制中间件
func (rl *RateLimiter) Middleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		// 优先使用用户 ID,否则使用 IP
		key := c.ClientIP()
		if userID, exists := c.Get("userID"); exists {
			key = userID.(string)
		}

		limiter := rl.getLimiter(key)
		if !limiter.Allow() {
			c.AbortWithStatusJSON(http.StatusTooManyRequests, gin.H{
				"error":       "rate limit exceeded",
				"retry_after": "1s",
			})
			return
		}
		c.Next()
	}
}

25.4.5 组装完整的 Gin 安全应用

// main.go
package main

import (
	"log"
	"net/http"

	"github.com/casbin/casbin/v2"
	"github.com/gin-gonic/gin"
	"golang.org/x/time/rate"

	"myapp/middleware"
)

func main() {
	// ── 初始化组件 ──
	jwtConfig := middleware.NewJWTConfig(
		"https://auth.example.com/.well-known/jwks.json",
		"https://auth.example.com",
		"my-api",
	)

	enforcer, err := casbin.NewEnforcer("model.conf", "policy.csv")
	if err != nil {
		log.Fatalf("Failed to create Casbin enforcer: %v", err)
	}

	rateLimiter := middleware.NewRateLimiter(middleware.RateLimiterConfig{
		Rate:       10,                    // 每秒 10 个请求
		Burst:      20,                    // 突发 20 个
		CleanupTTL: 5 * time.Minute,
	})

	// ── 创建 Gin 引擎 ──
	r := gin.New()
	r.Use(gin.Recovery())
	r.Use(gin.Logger())

	// 全局中间件
	r.Use(middleware.SecurityHeadersMiddleware())
	r.Use(rateLimiter.Middleware())

	// ── 公开路由 ──
	r.GET("/api/public/health", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{"status": "ok"})
	})

	// ── 需要认证的路由 ──
	auth := r.Group("/api")
	auth.Use(middleware.JWTAuthMiddleware(jwtConfig))
	auth.Use(middleware.CasbinMiddleware(enforcer))
	{
		auth.GET("/users/:id", getUser)
		auth.PUT("/users/:id", updateUser)
		auth.GET("/orders", listOrders)
		auth.POST("/orders", createOrder)
	}

	// ── 管理员路由 ──
	admin := r.Group("/api/admin")
	admin.Use(middleware.JWTAuthMiddleware(jwtConfig))
	admin.Use(middleware.CasbinMiddleware(enforcer))
	{
		admin.GET("/users", listAllUsers)
		admin.DELETE("/users/:id", deleteUser)
	}

	// ── 启动 HTTPS 服务 ──
	log.Fatal(r.RunTLS(":8443", "server.crt", "server.key"))
}

func getUser(c *gin.Context) {
	userID := c.Param("id")
	c.JSON(http.StatusOK, gin.H{"user_id": userID})
}

func updateUser(c *gin.Context)   { /* ... */ }
func listOrders(c *gin.Context)   { /* ... */ }
func createOrder(c *gin.Context)  { /* ... */ }
func listAllUsers(c *gin.Context) { /* ... */ }
func deleteUser(c *gin.Context)   { /* ... */ }

25.5 安全框架对比矩阵

功能

Spring Security (Java)

Gin + Casbin (Go)

FastAPI Security (Python)

Express + Passport (Node.js)

认证

JWT 验证

✅ 内置 OAuth2 Resource Server

✅ go-jose

✅ python-jose / PyJWT

✅ passport-jwt

OAuth2 / OIDC

✅ 内置客户端 + 资源服务器

⚠️ 需第三方库 (coreos/go-oidc)

✅ Authlib

✅ passport-openidconnect

Session 管理

✅ 内置 Spring Session

⚠️ gin-contrib/sessions

✅ Starlette SessionMiddleware

✅ express-session

MFA / TOTP

⚠️ 需扩展

⚠️ pquerna/otp

⚠️ pyotp

⚠️ speakeasy

授权

RBAC

✅ @Secured, hasRole()

✅ Casbin 内置

⚠️ 手动实现 / Depends

⚠️ 手动实现

ABAC

✅ @PreAuthorize SpEL

✅ Casbin ABAC 模型

⚠️ 手动实现

⚠️ casl

方法级安全

✅ @PreAuthorize, @Secured

❌ 无(路由级)

⚠️ Depends 装饰器

❌ 无(路由级)

动态策略

✅ 数据库 + SpEL

✅ Casbin Adapter

⚠️ 手动实现

⚠️ 手动实现

传输安全

CORS

✅ 内置

✅ gin-contrib/cors

✅ CORSMiddleware

✅ cors 包

CSRF

✅ 内置

⚠️ 需手动实现

⚠️ Starlette CSRFMiddleware

✅ csurf

安全头

✅ 内置 headers()

⚠️ 手动中间件

⚠️ 手动中间件

✅ helmet

HTTPS 强制

✅ requiresSecure()

✅ RunTLS

✅ HTTPSRedirectMiddleware

⚠️ 手动 / 反向代理

输入验证

请求体验证

✅ Bean Validation (JSR 380)

⚠️ go-playground/validator

✅ Pydantic

⚠️ joi / zod

SQL 注入防护

✅ JPA 参数化查询

✅ database/sql 占位符

✅ SQLAlchemy 参数化

✅ Knex / Prisma

速率限制

内置支持

⚠️ 需 Bucket4j / Resilience4j

⚠️ 手动 / ulule/limiter

✅ slowapi

⚠️ express-rate-limit

分布式限流

✅ Redis + Bucket4j

✅ Redis + 自定义

✅ Redis + slowapi

✅ rate-limit-redis

审计与监控

审计日志

✅ Spring Actuator + AOP

⚠️ 手动中间件

⚠️ 手动中间件

⚠️ morgan + 手动

安全事件

✅ ApplicationEvent

⚠️ 手动实现

⚠️ 手动实现

⚠️ 手动实现

生态成熟度

⭐⭐⭐⭐⭐

⭐⭐⭐

⭐⭐⭐⭐

⭐⭐⭐⭐

学习曲线

陡峭

中等

平缓

中等

性能

极高

图例: ✅ 内置/一流支持 · ⚠️ 需第三方库或手动实现 · ❌ 不支持


25.6 安全库选型原则

原则

说明

检查项

社区活跃度

活跃的社区意味着快速的漏洞修复

GitHub Stars、Issue 响应时间

安全审计

经过专业安全审计的库更可信

审计报告、CVE 历史

维护状态

持续维护的库才能及时修复漏洞

最近提交时间、发布频率

依赖链

依赖越少,攻击面越小

依赖数量、传递依赖

标准合规

遵循标准的库互操作性更好

RFC 合规、FIPS 认证


25.7 小结

  • FastAPI 通过 Depends 和 Security 提供灵活的认证授权机制,配合 Authlib 可快速集成 OIDC

  • Spring Security 是 Java 生态最成熟的安全框架,提供从认证到方法级授权的全栈方案

  • Gin + Casbin + go-jose 是 Go 生态的主流安全组合,性能极高,策略灵活

  • 选择安全库时优先考虑:社区活跃度、安全审计、维护状态

  • 不要自己实现加密算法,使用经过审计的库

  • 安全中间件应按正确顺序组装:限流 → 安全头 → CORS → 认证 → 授权 → 业务逻辑