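Chapter 12: API Authentication Patterns

"In the microservices era, API authentication is no longer optional; it is a checkpoint every request must pass."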

        mindmap
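  root((API Authentication))
    Authentication methods
      API Key
      Bearer Token
      mTLS
      HMAC signature
    Gateway patterns
      Centralized authentication
      Distributed authentication
      Token introspection
    Service-to-service authentication
      Service Account
      Service Mesh
      SPIFFE/SPIRE
    Best practices
      Lifecycle management
      Rate limiting
      Audit logging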
    

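12.1 What Makes API Authentication Different

API authentication differs from traditional web authentication in several fundamental ways:

| Dimension | Web authentication | API authentication |
| --- | --- | --- |
| Interaction | Interactive user (browser) | Programmatic (HTTP client) |
| State | Stateful (session) | Stateless (token) |
| Credential type | Cookie | Authorization header |
| Caller | Humans | Humans and machines |
| Request frequency | Low (page level) | High (API-call level) |

Decision tree for choosing an API authentication method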

        flowchart TD
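    A[Choose an API authentication method] --> B{Who is the caller?}
    B -->|Service-to-service| C{Network environment?}
    B -->|Third-party developer| D{Security requirement?}
    B -->|End-user app| E[OAuth2 Bearer Token]

    C -->|Same cluster / Service Mesh| F[mTLS]
    C -->|Cross-network / cross-cloud| G{Request signing needed?}
    G -->|Yes| H[HMAC signature]
    G -->|No| I[mTLS or Bearer Token]

    D -->|Low: public data| J[API Key]
    D -->|Medium: user data| E
    D -->|High: payments / finance| K[HMAC signature + mTLS]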
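
The decision tree can also be read as a small function. The following is a minimal sketch that mirrors the branches of the diagram; the names `Caller` and `choose_auth_method` and the parameter names are hypothetical and only serve the illustration.

```python
from enum import Enum


class Caller(Enum):
    """Who is calling the API (the first branch of the decision tree)."""
    SERVICE = "service"
    THIRD_PARTY = "third_party"
    END_USER_APP = "end_user_app"


def choose_auth_method(
    caller: Caller,
    *,
    same_cluster: bool = False,   # only relevant for service-to-service calls
    needs_signing: bool = False,  # only relevant across networks or clouds
    sensitivity: str = "low",     # "low" | "medium" | "high", third parties only
) -> str:
    """Return the method the decision tree suggests for the given inputs."""
    if caller is Caller.END_USER_APP:
        return "OAuth2 Bearer Token"

    if caller is Caller.SERVICE:
        if same_cluster:
            return "mTLS"
        return "HMAC signature" if needs_signing else "mTLS or Bearer Token"

    # Third-party developers: choose by how sensitive the data is.
    by_sensitivity = {
        "low": "API Key",
        "medium": "OAuth2 Bearer Token",
        "high": "HMAC signature + mTLS",
    }
    return by_sensitivity[sensitivity]
```

For example, `choose_auth_method(Caller.THIRD_PARTY, sensitivity="high")` follows the payments branch of the tree.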
    

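12.2 API Key Authentication

The simplest form of API authentication, suited to scenarios with low security requirements:

API Key authentication flow

        sequenceDiagram
    participant C as Client
    participant G as API Gateway
    participant S as Backend service
    participant DB as Key store

    C->>G: GET /api/data<br/>X-API-Key: key-abc123
    G->>DB: Look up API Key
    alt Key valid
        DB->>G: Return key info (permissions, quota)
        G->>G: Check rate limit
        G->>S: Forward request + inject client info
        S->>G: Response data
        G->>C: 200 OK + data
    else Key invalid or expired
        DB->>G: Key not found
        G->>C: 401 Unauthorized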
    end
    

Python: FastAPI API Key dependency

from fastapi import FastAPI, Header, Query, HTTPException, Depends, Request
from fastapi.security import APIKeyHeader, APIKeyQuery
from typing import Optional
from datetime import datetime
import hashlib
import secrets

app = FastAPI(title="API Key Auth Demo")

# --- API Key store (use a database in production) ---
API_KEYS = {
    # Store the hash of each key, never the plaintext
    hashlib.sha256(b"sk-live-abc123def456").hexdigest(): {
        "name": "Service A",
        "permissions": ["read"],
        "rate_limit": 1000,
        "created_at": "2025-01-01",
    },
    hashlib.sha256(b"sk-live-xyz789ghi012").hexdigest(): {
        "name": "Service B",
        "permissions": ["read", "write"],
        "rate_limit": 5000,
        "created_at": "2025-06-01",
    },
}

# Two ways to pass the key: header or query parameter
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query = APIKeyQuery(name="api_key", auto_error=False)

async def verify_api_key(
    header_key: Optional[str] = Depends(api_key_header),
    query_key: Optional[str] = Depends(api_key_query),
) -> dict:
    """Validate the API Key (accepted via header or query parameter)."""
    api_key = header_key or query_key
    if not api_key:
        raise HTTPException(
            status_code=401,
            detail="Missing API Key: pass it in the X-API-Key header or the api_key query parameter",
        )

    # Compare hashes (keys are never stored in plaintext)
    key_hash = hashlib.sha256(api_key.encode()).hexdigest()
    client = API_KEYS.get(key_hash)
    if not client:
        raise HTTPException(status_code=401, detail="Invalid API Key")

    return client

@app.get("/api/data")
async def get_data(client: dict = Depends(verify_api_key)):
    if "read" not in client["permissions"]:
        raise HTTPException(status_code=403, detail="Insufficient permissions")
    return {"data": "sensitive information", "client": client["name"]}

@app.post("/api/data")
async def create_data(client: dict = Depends(verify_api_key)):
    if "write" not in client["permissions"]:
        raise HTTPException(status_code=403, detail="Insufficient permissions: write permission required")
    return {"message": "Data created", "client": client["name"]}

# --- API Key generation helper ---
def generate_api_key(prefix: str = "sk-live") -> str:
    """Generate a cryptographically secure API Key."""
    random_part = secrets.token_urlsafe(32)
    return f"{prefix}-{random_part}"
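
The flow diagram includes a rate-limit check, and each entry in `API_KEYS` carries a `rate_limit`, but the example above never enforces it. One possible way to do so is a fixed-window counter, sketched below. The source does not state the unit of `rate_limit`, so the one-hour default window is an assumption, and `FixedWindowLimiter` with its in-memory counter is a hypothetical helper (a shared store would be needed across several gateway instances).

```python
import time
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class FixedWindowLimiter:
    """In-memory fixed-window rate limiter, keyed by API Key hash."""
    window_seconds: int = 3600  # assumption: rate_limit means requests per hour
    _counters: Dict[str, Tuple[int, int]] = field(default_factory=dict)

    def allow(self, key_hash: str, limit: int, now: Optional[float] = None) -> bool:
        """Count one request; return False once `limit` is reached in this window."""
        now = time.time() if now is None else now
        # Align the window start to a multiple of window_seconds.
        window_start = int(now // self.window_seconds) * self.window_seconds
        start, count = self._counters.get(key_hash, (window_start, 0))
        if start != window_start:
            # A new window has begun: reset the counter.
            start, count = window_start, 0
        if count >= limit:
            self._counters[key_hash] = (start, count)
            return False
        self._counters[key_hash] = (start, count + 1)
        return True
```

Inside `verify_api_key`, a call such as `limiter.allow(key_hash, client["rate_limit"])` returning `False` would typically be answered with HTTP 429.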

Java: Spring Boot API Key Filter

import jakarta.servlet.*;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * API Key authentication filter
 */
@Component
public class ApiKeyFilter implements Filter {

    private static final String API_KEY_HEADER = "X-API-Key";
    private static final String API_KEY_QUERY  = "api_key";

    // Key hash → client info
    private final Map<String, ApiClient> apiKeys = new ConcurrentHashMap<>();

    public ApiKeyFilter() {
        // Initialization (load from a database in production)
        apiKeys.put(sha256("sk-live-abc123def456"),
                new ApiClient("Service A", Set.of("read")));
        apiKeys.put(sha256("sk-live-xyz789ghi012"),
                new ApiClient("Service B", Set.of("read", "write")));
    }

    @Override
    public void doFilter(ServletRequest req, ServletResponse res,
                         FilterChain chain) throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) req;
        HttpServletResponse response = (HttpServletResponse) res;

        // Read the API Key from the header or the query string
        String apiKey = request.getHeader(API_KEY_HEADER);
        if (apiKey == null || apiKey.isBlank()) {
            apiKey = request.getParameter(API_KEY_QUERY);
        }

        if (apiKey == null || apiKey.isBlank()) {
            response.setStatus(401);
            response.setContentType("application/json");
            response.getWriter().write("{\"error\":\"Missing API Key\"}");
            return;
        }

        ApiClient client = apiKeys.get(sha256(apiKey));
        if (client == null) {
            response.setStatus(401);
            response.setContentType("application/json");
            response.getWriter().write("{\"error\":\"Invalid API Key\"}");
            return;
        }

        // Attach the client info as a request attribute
        request.setAttribute("api_client", client);
        chain.doFilter(request, response);
    }

    private String sha256(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) hex.append(String.format("%02x", b));
            return hex.toString();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public record ApiClient(String name, Set<String> permissions) {}
}

/**
 * Filter registration
 */
@Configuration
public class FilterConfig {
    @Bean
    public FilterRegistrationBean<ApiKeyFilter> apiKeyFilterRegistration(
            ApiKeyFilter filter) {
        FilterRegistrationBean<ApiKeyFilter> reg = new FilterRegistrationBean<>();
        reg.setFilter(filter);
        reg.addUrlPatterns("/api/*");
        reg.setOrder(1);
        return reg;
    }
}

Go: Gin API Key middleware

package middleware

import (
	"crypto/sha256"
	"encoding/hex"
	"net/http"
	"sync"

	"github.com/gin-gonic/gin"
)

// ApiClient holds API client information
type ApiClient struct {
	Name        string
	Permissions map[string]bool
}

// ApiKeyStore stores API Keys
type ApiKeyStore struct {
	mu   sync.RWMutex
	keys map[string]*ApiClient // sha256(key) -> client
}

func NewApiKeyStore() *ApiKeyStore {
	store := &ApiKeyStore{keys: make(map[string]*ApiClient)}
	// Initialization (load from a database in production)
	store.Register("sk-live-abc123def456", &ApiClient{
		Name:        "Service A",
		Permissions: map[string]bool{"read": true},
	})
	store.Register("sk-live-xyz789ghi012", &ApiClient{
		Name:        "Service B",
		Permissions: map[string]bool{"read": true, "write": true},
	})
	return store
}

func (s *ApiKeyStore) Register(key string, client *ApiClient) {
	hash := sha256Hex(key)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.keys[hash] = client
}

func (s *ApiKeyStore) Lookup(key string) (*ApiClient, bool) {
	hash := sha256Hex(key)
	s.mu.RLock()
	defer s.mu.RUnlock()
	client, ok := s.keys[hash]
	return client, ok
}

func sha256Hex(s string) string {
	h := sha256.Sum256([]byte(s))
	return hex.EncodeToString(h[:])
}

// ApiKeyAuth is a Gin middleware that authenticates requests by API Key
func ApiKeyAuth(store *ApiKeyStore) gin.HandlerFunc {
	return func(c *gin.Context) {
		// Read the API Key from the header or the query string
		apiKey := c.GetHeader("X-API-Key")
		if apiKey == "" {
			apiKey = c.Query("api_key")
		}

		if apiKey == "" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "missing API Key",
			})
			return
		}

		client, ok := store.Lookup(apiKey)
		if !ok {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "invalid API Key",
			})
			return
		}

		// Inject the client info into the context
		c.Set("api_client", client)
		c.Set("client_name", client.Name)
		c.Next()
	}
}

// RequirePermission is a middleware that checks a single permission
func RequirePermission(perm string) gin.HandlerFunc {
	return func(c *gin.Context) {
		client, exists := c.Get("api_client")
		if !exists {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "not authenticated",
			})
			return
		}

		apiClient := client.(*ApiClient)
		if !apiClient.Permissions[perm] {
			c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
				"error": "insufficient permissions: " + perm + " permission required",
			})
			return
		}
		c.Next()
	}
}

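Drawbacks of API Keys

  • ❌ No expiry time (unless managed manually)

  • ❌ Cannot carry user identity information

  • ❌ Easy to leak (logs, URLs, code repositories)

  • ❌ No standard rotation mechanism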
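
Expiry and rotation therefore have to be built by hand. A minimal sketch of what that could look like follows: each key hash is stored with an expiry time, and rotation issues a new key while leaving old keys valid for a short grace period. `RotatingKeyStore`, `KeyRecord`, and the grace-period policy are assumptions made for illustration, not a standard mechanism.

```python
import hashlib
import secrets
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


@dataclass
class KeyRecord:
    client: str
    expires_at: datetime


class RotatingKeyStore:
    """Stores only key hashes, each with an explicit expiry time."""

    def __init__(self) -> None:
        self._records: Dict[str, KeyRecord] = {}

    @staticmethod
    def _hash(key: str) -> str:
        return hashlib.sha256(key.encode()).hexdigest()

    def issue(self, client: str, ttl: timedelta,
              now: Optional[datetime] = None) -> str:
        """Create a new key for `client` that expires after `ttl`."""
        now = now or datetime.now(timezone.utc)
        key = f"sk-live-{secrets.token_urlsafe(32)}"
        self._records[self._hash(key)] = KeyRecord(client, now + ttl)
        return key  # the plaintext is shown to the client once and never stored

    def rotate(self, client: str, ttl: timedelta, grace: timedelta,
               now: Optional[datetime] = None) -> str:
        """Issue a new key and cap the lifetime of the client's old keys."""
        now = now or datetime.now(timezone.utc)
        for record in self._records.values():
            if record.client == client:
                record.expires_at = min(record.expires_at, now + grace)
        return self.issue(client, ttl, now)

    def verify(self, key: str, now: Optional[datetime] = None) -> Optional[str]:
        """Return the client name, or None if the key is unknown or expired."""
        now = now or datetime.now(timezone.utc)
        record = self._records.get(self._hash(key))
        if record is None or now >= record.expires_at:
            return None
        return record.client
```

The grace period lets callers switch to the new key without an outage; a leaked key would instead be revoked immediately by deleting its record.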

12.3 Bearer Token Authentication

This method uses an OAuth2 Access Token and is the most commonly recommended way to authenticate API calls:

OAuth2 Bearer Token authentication flow

        sequenceDiagram
    participant C as Client
    participant AS as Authorization server
    participant RS as Resource server (API)
    participant JWKS as JWKS endpoint

    Note over C,AS: Obtain a token
    C->>AS: POST /oauth/token<br/>{grant_type, client_id, client_secret}
    AS->>AS: Validate client credentials
    AS->>C: {access_token: "eyJ...", expires_in: 3600}

    Note over C,RS: Call the API with the token
    C->>RS: GET /api/resource<br/>Authorization: Bearer eyJ...
    RS->>RS: Parse the JWT header
    RS->>JWKS: Fetch public keys (cached)
    JWKS->>RS: Return the JWK Set
    RS->>RS: Verify signature + expiry + audience + issuer
    alt Token valid
        RS->>C: 200 OK + resource data
    else Token invalid/expired
        RS->>C: 401 Unauthorized
    end
    

Python: FastAPI Bearer Token validation

from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import httpx
from functools import lru_cache
from typing import Optional
from datetime import datetime

app = FastAPI(title="Bearer Token Auth Demo")
security = HTTPBearer()

# --- Configuration ---
AUTH_SERVER = "https://auth.example.com"
JWKS_URL = f"{AUTH_SERVER}/.well-known/jwks.json"
EXPECTED_AUDIENCE = "my-api"
EXPECTED_ISSUER = AUTH_SERVER

# --- JWKS public key cache ---
_jwks_cache: Optional[dict] = None
_jwks_cache_time: float = 0

async def get_jwks() -> dict:
    """Fetch and cache the JWKS public key set."""
    global _jwks_cache, _jwks_cache_time
    import time
    now = time.time()
    # Cache for 1 hour
    if _jwks_cache and (now - _jwks_cache_time) < 3600:
        return _jwks_cache

    async with httpx.AsyncClient() as client:
        resp = await client.get(JWKS_URL)
        resp.raise_for_status()
        _jwks_cache = resp.json()
        _jwks_cache_time = now
        return _jwks_cache

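def get_public_key(token: str, jwks: dict):
    """Find the public key in the JWKS that matches the token's kid."""
    header = jwt.get_unverified_header(token)
    kid = header.get("kid")
    for key in jwks.get("keys", []):
        if key.get("kid") == kid:
            return jwt.algorithms.RSAAlgorithm.from_jwk(key)
    # Raise a PyJWT error so the caller's InvalidTokenError handler returns 401
    # instead of letting a ValueError surface as a 500.
    raise jwt.InvalidTokenError(f"No public key found for kid={kid}")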

async def verify_bearer_token(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> dict:
    """Validate the Bearer Token and return its payload."""
    token = credentials.credentials
    try:
        jwks = await get_jwks()
        public_key = get_public_key(token, jwks)

        payload = jwt.decode(
            token,
            public_key,
            algorithms=["RS256"],
            audience=EXPECTED_AUDIENCE,
            issuer=EXPECTED_ISSUER,
            options={"require": ["exp", "iss", "aud", "sub"]},
        )
        return payload
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token has expired")
    except jwt.InvalidAudienceError:
        raise HTTPException(status_code=401, detail="Token audience mismatch")
    except jwt.InvalidIssuerError:
        raise HTTPException(status_code=401, detail="Token issuer mismatch")
    except jwt.InvalidTokenError as e:
        raise HTTPException(status_code=401, detail=f"Invalid token: {e}")

# --- Permission check ---
def require_scope(required: str):
    """Check that the token carries the required scope."""
    async def checker(payload: dict = Depends(verify_bearer_token)):
        scopes = payload.get("scope", "").split()
        if required not in scopes:
            raise HTTPException(
                status_code=403,
                detail=f"Required scope: {required}",
            )
        return payload
    return checker

@app.get("/api/profile")
async def get_profile(user: dict = Depends(verify_bearer_token)):
    return {
        "user_id": user["sub"],
        "roles": user.get("roles", []),
        "scopes": user.get("scope", "").split(),
    }

@app.get("/api/admin")
async def admin_endpoint(user: dict = Depends(require_scope("admin"))):
    return {"message": "Admin endpoint", "user_id": user["sub"]}
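
The code above covers the resource-server half of the flow. On the client side, the `expires_in` value in the token response suggests caching the token and fetching a new one shortly before it expires. The sketch below shows one way to do that; `TokenCache`, the `skew` margin, and the injected `fetch` callable (which would wrap the `POST /oauth/token` call) are hypothetical names chosen for illustration.

```python
import time
from typing import Callable, Dict, Optional


class TokenCache:
    """Caches an access token and refreshes it shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], dict],                   # returns the token endpoint's JSON
        skew: float = 30.0,                          # refresh this many seconds early
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._fetch = fetch
        self._skew = skew
        self._clock = clock
        self._token: Optional[str] = None
        self._expires_at: float = 0.0

    def get(self) -> str:
        """Return a cached token, fetching a new one when needed."""
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._skew:
            response = self._fetch()
            self._token = response["access_token"]
            self._expires_at = now + float(response["expires_in"])
        return self._token

    def auth_header(self) -> Dict[str, str]:
        """Build the Authorization header for an API request."""
        return {"Authorization": f"Bearer {self.get()}"}
```

Injecting `fetch` and `clock` keeps the cache independent of any particular HTTP client and makes the refresh logic easy to test.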

Java: Spring Resource Server

// application.yml
// spring:
//   security:
//     oauth2:
//       resourceserver:
//         jwt:
//           issuer-uri: https://auth.example.com
//           audiences: my-api

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter;
import org.springframework.security.oauth2.server.resource.authentication.JwtGrantedAuthoritiesConverter;
import org.springframework.security.web.SecurityFilterChain;

@Configuration
@EnableWebSecurity
@EnableMethodSecurity
public class ResourceServerConfig {

    @Bean
    public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
        http
            .sessionManagement(s -> s.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
            .authorizeHttpRequests(auth -> auth
                .requestMatchers("/api/public/**").permitAll()
                .requestMatchers("/api/admin/**").hasAuthority("SCOPE_admin")
                .requestMatchers("/api/**").authenticated()
            )
            .oauth2ResourceServer(oauth2 -> oauth2
                .jwt(jwt -> jwt.jwtAuthenticationConverter(jwtAuthConverter()))
            );
        return http.build();
    }

    @Bean
    public JwtAuthenticationConverter jwtAuthConverter() {
        JwtGrantedAuthoritiesConverter grantedAuthorities =
                new JwtGrantedAuthoritiesConverter();
        // Extract authorities from the JWT "scope" claim
        grantedAuthorities.setAuthoritiesClaimName("scope");
        grantedAuthorities.setAuthorityPrefix("SCOPE_");

        JwtAuthenticationConverter converter = new JwtAuthenticationConverter();
        converter.setJwtGrantedAuthoritiesConverter(grantedAuthorities);
        return converter;
    }
}

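import java.util.Map;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * API controller
 */
@RestController
@RequestMapping("/api")
public class ApiController {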

    @GetMapping("/profile")
    public Map<String, Object> getProfile(
            @AuthenticationPrincipal Jwt jwt) {
        return Map.of(
                "user_id", jwt.getSubject(),
                "claims", jwt.getClaims(),
                "scopes", jwt.getClaimAsString("scope")
        );
    }

    @GetMapping("/admin")
    @PreAuthorize("hasAuthority('SCOPE_admin')")
    public Map<String, Object> adminEndpoint(
            @AuthenticationPrincipal Jwt jwt) {
        return Map.of(
                "message", "Admin endpoint",
                "user_id", jwt.getSubject()
        );
    }
}

Go: Gin JWT Bearer middleware

package middleware

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/golang-jwt/jwt/v5"
	"github.com/lestrrat-go/jwx/v2/jwk"
)

// BearerConfig configures Bearer Token validation
type BearerConfig struct {
	JwksURL  string
	Issuer   string
	Audience string
}

// JWKSCache caches the JWKS public keys
type JWKSCache struct {
	mu      sync.RWMutex
	keySet  jwk.Set
	fetched time.Time
	ttl     time.Duration
	url     string
}

func NewJWKSCache(url string, ttl time.Duration) *JWKSCache {
	return &JWKSCache{url: url, ttl: ttl}
}

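func (c *JWKSCache) GetKeySet(ctx context.Context) (jwk.Set, error) {
	c.mu.RLock()
	if c.keySet != nil && time.Since(c.fetched) < c.ttl {
		defer c.mu.RUnlock()
		return c.keySet, nil
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock()

	// Re-check under the write lock: another goroutine may have refreshed
	// the cache while this one was waiting.
	if c.keySet != nil && time.Since(c.fetched) < c.ttl {
		return c.keySet, nil
	}

	set, err := jwk.Fetch(ctx, c.url)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch JWKS: %w", err)
	}
	c.keySet = set
	c.fetched = time.Now()
	return set, nil
}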

// BearerAuth is a Gin middleware that authenticates requests by Bearer Token
func BearerAuth(cfg BearerConfig) gin.HandlerFunc {
	cache := NewJWKSCache(cfg.JwksURL, 1*time.Hour)

	return func(c *gin.Context) {
		// Extract the Bearer Token
		authHeader := c.GetHeader("Authorization")
		if !strings.HasPrefix(authHeader, "Bearer ") {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "missing Bearer Token",
			})
			return
		}
		tokenStr := strings.TrimPrefix(authHeader, "Bearer ")

		// Fetch the JWKS public keys
		keySet, err := cache.GetKeySet(c.Request.Context())
		if err != nil {
			c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
				"error": "unable to fetch public keys",
			})
			return
		}

		// Parse and validate the JWT
		token, err := jwt.Parse(tokenStr, func(t *jwt.Token) (interface{}, error) {
			kid, ok := t.Header["kid"].(string)
			if !ok {
				return nil, fmt.Errorf("missing kid")
			}
			key, found := keySet.LookupKeyID(kid)
			if !found {
				return nil, fmt.Errorf("no public key found for kid=%s", kid)
			}
			var rawKey interface{}
			if err := key.Raw(&rawKey); err != nil {
				return nil, err
			}
			return rawKey, nil
		},
			jwt.WithIssuer(cfg.Issuer),
			jwt.WithAudience(cfg.Audience),
			jwt.WithValidMethods([]string{"RS256"}),
		)

		if err != nil || !token.Valid {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": fmt.Sprintf("invalid token: %v", err),
			})
			return
		}

		claims, ok := token.Claims.(jwt.MapClaims)
		if !ok {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "unable to parse claims",
			})
			return
		}

		// Inject the user info into the context
		c.Set("user_id", claims["sub"])
		c.Set("claims", claims)
		c.Next()
	}
}

// RequireScope checks that the token carries the required scope
func RequireScope(required string) gin.HandlerFunc {
	return func(c *gin.Context) {
		claims, exists := c.Get("claims")
		if !exists {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "not authenticated",
			})
			return
		}

		mapClaims := claims.(jwt.MapClaims)
		scopeStr, _ := mapClaims["scope"].(string)
		scopes := strings.Fields(scopeStr)

		for _, s := range scopes {
			if s == required {
				c.Next()
				return
			}
		}

		c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
			"error": fmt.Sprintf("required scope: %s", required),
		})
	}
}

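12.4 mTLS Client Certificate Authentication

Among the strongest API authentication methods, because both sides prove their identity with certificates during the TLS handshake. It is well suited to service-to-service communication:

mTLS authentication flow

        sequenceDiagram
    participant C as Client
    participant S as Server
    participant CA as Certificate authority

    Note over C,CA: Setup (one-time)
    C->>CA: Request client certificate
    CA->>C: Issue client certificate + private key
    S->>CA: Request server certificate
    CA->>S: Issue server certificate + private key

    Note over C,S: TLS handshake (mutual verification)
    C->>S: ClientHello
    S->>C: ServerHello + server certificate
    C->>C: Verify server certificate (with the CA public key)
    S->>C: CertificateRequest (asks for the client certificate)
    C->>S: Client certificate + CertificateVerify
    S->>S: Verify client certificate (with the CA public key)
    Note over C,S: Mutual verification complete ✅ encrypted channel established

    C->>S: HTTP request (inside the encrypted channel, no extra token needed)
    S->>S: Extract client identity from the certificate CN/SAN
    S->>C: HTTP response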
    

Python: mTLS server and client

# --- mTLS client ---
import httpx

async def call_service_with_mtls():
    """Call another service over mTLS."""
    async with httpx.AsyncClient(
        cert=("client.crt", "client.key"),  # client certificate + private key
        verify="ca.crt",                     # CA certificate (verifies the server)
    ) as client:
        response = await client.get("https://service-b:8443/api/data")
        return response.json()


# --- mTLS server (FastAPI + uvicorn) ---
# Start command:
# uvicorn main:app --host 0.0.0.0 --port 8443 \
#   --ssl-keyfile server.key \
#   --ssl-certfile server.crt \
#   --ssl-ca-certs ca.crt \
#   --ssl-cert-reqs 2  # 2 = CERT_REQUIRED (client certificate is mandatory)

from fastapi import FastAPI, Request, HTTPException

app = FastAPI(title="mTLS Service")

@app.middleware("http")
async def extract_client_identity(request: Request, call_next):
    """Extract identity information from the client certificate."""
    # Behind an Nginx/Envoy proxy, certificate details usually arrive as headers.
    # Trust these headers only when the proxy sets them itself and strips any
    # copies sent by the client.
    client_cn = request.headers.get("X-Client-CN", "")
    client_serial = request.headers.get("X-Client-Serial", "")

    if not client_cn:
        # On a direct TLS connection, try to read the certificate from the transport.
        # Best effort: whether the ASGI scope exposes a transport depends on the server.
        transport = request.scope.get("transport")
        if transport and hasattr(transport, "get_extra_info"):
            peercert = transport.get_extra_info("peercert")
            if peercert:
                for rdn in peercert.get("subject", ()):
                    for attr_type, attr_value in rdn:
                        if attr_type == "commonName":
                            client_cn = attr_value

    request.state.client_cn = client_cn
    response = await call_next(request)
    return response

@app.get("/api/data")
async def get_data(request: Request):
    client = getattr(request.state, "client_cn", "unknown")
    return {"data": "sensitive", "authenticated_client": client}
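
The flow diagram mentions extracting the identity from the certificate CN or SAN, while the middleware above reads only the CN. A minimal sketch of a helper that reads both is shown below. It assumes the dictionary layout returned by Python's `ssl.SSLSocket.getpeercert()`; the function name `identity_from_peercert` and the sample certificate values are hypothetical.

```python
from typing import Dict, List, Optional


def identity_from_peercert(peercert: dict) -> Dict[str, object]:
    """Collect the CN plus DNS and URI SAN entries from a decoded certificate."""
    cn: Optional[str] = None
    # "subject" is a tuple of RDNs, each a tuple of (attribute, value) pairs.
    for rdn in peercert.get("subject", ()):
        for attr_type, attr_value in rdn:
            if attr_type == "commonName":
                cn = attr_value

    # "subjectAltName" is a tuple of (type, value) pairs, e.g. ("DNS", "svc-a").
    sans: Dict[str, List[str]] = {}
    for san_type, san_value in peercert.get("subjectAltName", ()):
        sans.setdefault(san_type, []).append(san_value)

    return {"cn": cn, "dns": sans.get("DNS", []), "uri": sans.get("URI", [])}


# Sample input in the getpeercert() layout (values are made up).
sample = {
    "subject": ((("organizationName", "Example"),), (("commonName", "service-a"),)),
    "subjectAltName": (
        ("DNS", "service-a.internal"),
        ("URI", "spiffe://example.org/ns/default/sa/service-a"),
    ),
}
identity = identity_from_peercert(sample)
```

URI-type SAN entries are where SPIFFE identities are carried, so a service mesh setup would usually key authorization on `identity["uri"]` rather than on the CN.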

Java: mTLS server configuration

// application.yml
// server:
//   port: 8443
//   ssl:
//     enabled: true
//     key-store: classpath:server-keystore.p12
//     key-store-password: changeit
//     key-store-type: PKCS12
//     trust-store: classpath:truststore.p12
//     trust-store-password: changeit
//     trust-store-type: PKCS12
//     client-auth: need  # need = client certificate is mandatory

import org.springframework.web.bind.annotation.*;
import jakarta.servlet.http.HttpServletRequest;
import java.security.cert.X509Certificate;
import java.util.Map;

@RestController
@RequestMapping("/api")
public class MtlsController {

    @GetMapping("/data")
    public Map<String, Object> getData(HttpServletRequest request) {
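        // Read the client certificate chain from the request
        X509Certificate[] certs = (X509Certificate[])
                request.getAttribute("jakarta.servlet.request.X509Certificate");

        // getName() returns the full subject DN (for example "CN=service-a,O=Example"),
        // not just the CN; parse the DN if only the CN is needed.
        String clientCN = "unknown";
        if (certs != null && certs.length > 0) {
            clientCN = certs[0].getSubjectX500Principal().getName();
        }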

        return Map.of(
                "data", "sensitive information",
                "authenticated_client", clientCN
        );
    }
}

// --- mTLS client (RestTemplate) ---
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;

import javax.net.ssl.SSLContext;
import java.io.File;

@Configuration
public class MtlsClientConfig {

    @Bean
    public RestTemplate mtlsRestTemplate() throws Exception {
        SSLContext sslContext = SSLContextBuilder.create()
                .loadKeyMaterial(
                        new File("client-keystore.p12"),
                        "changeit".toCharArray(),
                        "changeit".toCharArray())
                .loadTrustMaterial(
                        new File("truststore.p12"),
                        "changeit".toCharArray())
                .build();

        SSLConnectionSocketFactory socketFactory =
                new SSLConnectionSocketFactory(sslContext);

        // HttpClient 5 configures TLS on the connection manager,
        // not on the client builder.
        HttpClientConnectionManager connectionManager =
                PoolingHttpClientConnectionManagerBuilder.create()
                        .setSSLSocketFactory(socketFactory)
                        .build();

        CloseableHttpClient httpClient = HttpClients.custom()
                .setConnectionManager(connectionManager)
                .build();

        return new RestTemplate(
                new HttpComponentsClientHttpRequestFactory(httpClient));
    }
}

Go: mTLS server and client

package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"

	"github.com/gin-gonic/gin"
)

// --- mTLS server ---

func StartMTLSServer() {
	// Load the CA certificate (used to verify client certificates)
	caCert, err := os.ReadFile("ca.crt")
	if err != nil {
		log.Fatal(err)
	}
	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		log.Fatal("no valid CA certificate found in ca.crt")
	}

	// TLS configuration: require a client certificate
	tlsConfig := &tls.Config{
		ClientCAs:  caCertPool,
		ClientAuth: tls.RequireAndVerifyClientCert,
		MinVersion: tls.VersionTLS13,
	}

	r := gin.Default()

	// Extract the identity from the client certificate
	r.Use(func(c *gin.Context) {
		if c.Request.TLS != nil && len(c.Request.TLS.PeerCertificates) > 0 {
			cert := c.Request.TLS.PeerCertificates[0]
			c.Set("client_cn", cert.Subject.CommonName)
			c.Set("client_serial", cert.SerialNumber.String())
		}
		c.Next()
	})

	r.GET("/api/data", func(c *gin.Context) {
		clientCN := c.GetString("client_cn")
		c.JSON(http.StatusOK, gin.H{
			"data":                 "sensitive information",
			"authenticated_client": clientCN,
		})
	})

	server := &http.Server{
		Addr:      ":8443",
		Handler:   r,
		TLSConfig: tlsConfig,
	}

	log.Println("mTLS server starting on :8443")
	log.Fatal(server.ListenAndServeTLS("server.crt", "server.key"))
}

// --- mTLS client ---

func CallWithMTLS(url string) ([]byte, error) {
	// Load the client certificate
	cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
	if err != nil {
		return nil, fmt.Errorf("failed to load client certificate: %w", err)
	}

	// Load the CA certificate
	caCert, err := os.ReadFile("ca.crt")
	if err != nil {
		return nil, fmt.Errorf("failed to load CA certificate: %w", err)
	}
	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("no valid CA certificate found in ca.crt")
	}

	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
		MinVersion:   tls.VersionTLS13,
	}

	client := &http.Client{
		Transport: &http.Transport{TLSClientConfig: tlsConfig},
	}

	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}

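12.5 HMAC Signature Authentication

Similar to AWS Signature V4: the client signs the request content so that the server can verify both the caller's identity and the integrity of the request:

HMAC signature authentication flow

        sequenceDiagram
    participant C as Client
    participant S as Server
    participant DB as Secret store

    Note over C: Build the signature
    C->>C: 1. Build the string to sign<br/>METHOD + PATH + TIMESTAMP + BODY_HASH
    C->>C: 2. HMAC-SHA256(secret_key, string_to_sign)
    C->>S: Request + headers:<br/>X-Access-Key: AK001<br/>X-Timestamp: 1711036800<br/>X-Signature: a3f2...

    S->>DB: 3. Look up the Secret Key by Access Key
    DB->>S: Return the Secret Key

    S->>S: 4. Check the timestamp (±5 minutes, replay protection)
    S->>S: 5. Recompute the signature with the same algorithm
    S->>S: 6. Compare signatures in constant time

    alt Signatures match
        S->>C: 200 OK
    else Signatures do not match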
        S->>C: 401 Unauthorized
    end
    

Python: HMAC signature generation and verification

import hmac
import hashlib
import time
from fastapi import FastAPI, Request, HTTPException

app = FastAPI(title="HMAC Auth Demo")

# --- Secret store ---
SECRET_KEYS = {
    "AK001": "sk-very-secret-key-001",
    "AK002": "sk-very-secret-key-002",
}

REPLAY_WINDOW = 300  # 5-minute replay window

# --- Client side: signature generation ---
def sign_request(
    method: str,
    path: str,
    body: str,
    access_key: str,
    secret_key: str,
) -> dict:
    """Sign an API request with HMAC."""
    timestamp = str(int(time.time()))
    body_hash = hashlib.sha256(body.encode()).hexdigest()

    # Build the canonical string to sign
    string_to_sign = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}"

    # Compute the HMAC-SHA256 signature
    signature = hmac.new(
        secret_key.encode(),
        string_to_sign.encode(),
        hashlib.sha256,
    ).hexdigest()

    return {
        "X-Access-Key": access_key,
        "X-Timestamp": timestamp,
        "X-Signature": signature,
    }

# --- Server side: signature verification ---
def verify_signature(
    method: str,
    path: str,
    body: str,
    headers: dict,
) -> str:
    """Verify the HMAC signature and return the access_key."""
    access_key = headers.get("x-access-key", "")
    timestamp = headers.get("x-timestamp", "")
    signature = headers.get("x-signature", "")

    # 1. Check required headers
    if not all([access_key, timestamp, signature]):
        raise ValueError("Missing signature headers")

    # 2. Look up the Secret Key
    secret_key = SECRET_KEYS.get(access_key)
    if not secret_key:
        raise ValueError("Invalid Access Key")

    # 3. Check the timestamp (replay protection)
    try:
        ts = int(timestamp)
    except ValueError:
        raise ValueError("Invalid timestamp")

    if abs(time.time() - ts) > REPLAY_WINDOW:
        raise ValueError("Request expired (timestamp outside the allowed window)")

    # 4. Recompute the signature
    body_hash = hashlib.sha256(body.encode()).hexdigest()
    string_to_sign = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}"
    expected = hmac.new(
        secret_key.encode(),
        string_to_sign.encode(),
        hashlib.sha256,
    ).hexdigest()

    # 5. Constant-time comparison (guards against timing attacks)
    if not hmac.compare_digest(signature, expected):
        raise ValueError("Signature mismatch")

    return access_key

# --- FastAPI middleware ---
from fastapi.responses import JSONResponse

@app.middleware("http")
async def hmac_auth_middleware(request: Request, call_next):
    if request.url.path.startswith("/api/"):
        body = (await request.body()).decode()
        try:
            access_key = verify_signature(
                method=request.method,
                path=request.url.path,
                body=body,
                headers=dict(request.headers),
            )
            request.state.access_key = access_key
        except ValueError as e:
            # An HTTPException raised inside middleware bypasses FastAPI's
            # exception handlers, so return the 401 response directly.
            return JSONResponse(status_code=401, content={"detail": str(e)})

    response = await call_next(request)
    return response

@app.post("/api/payment")
async def create_payment(request: Request):
    return {
        "message": "Payment created",
        "authenticated_by": request.state.access_key,
    }
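
Note that the `string_to_sign` above covers the method, path, timestamp, and body hash, while `request.url.path` excludes the query string, so query parameters are not protected by the signature. If they matter, one option is to add a canonical query string to the signed content, in the spirit of AWS Signature V4. The sketch below is an assumed extension, not part of the scheme described in this chapter; `canonical_query` and `sign_with_query` are hypothetical helpers.

```python
import hashlib
import hmac
from typing import Dict
from urllib.parse import quote


def canonical_query(params: Dict[str, str]) -> str:
    """Percent-encode keys and values, then sort, so parameter order is irrelevant."""
    pairs = sorted(
        (quote(str(k), safe=""), quote(str(v), safe="")) for k, v in params.items()
    )
    return "&".join(f"{k}={v}" for k, v in pairs)


def sign_with_query(
    method: str,
    path: str,
    params: Dict[str, str],
    timestamp: str,
    body: str,
    secret_key: str,
) -> str:
    """HMAC-SHA256 over method, path, canonical query, timestamp, and body hash."""
    body_hash = hashlib.sha256(body.encode()).hexdigest()
    string_to_sign = "\n".join(
        [method.upper(), path, canonical_query(params), timestamp, body_hash]
    )
    return hmac.new(
        secret_key.encode(), string_to_sign.encode(), hashlib.sha256
    ).hexdigest()
```

Client and server must build the canonical query in exactly the same way; any difference in encoding or ordering produces a signature mismatch.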

Java: HMAC signature Filter

import jakarta.servlet.*;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class HmacAuthFilter implements Filter {

    private static final long REPLAY_WINDOW = 300; // 5 minutes
    private final Map<String, String> secretKeys = new ConcurrentHashMap<>();

    public HmacAuthFilter() {
        secretKeys.put("AK001", "sk-very-secret-key-001");
        secretKeys.put("AK002", "sk-very-secret-key-002");
    }

    @Override
    public void doFilter(ServletRequest req, ServletResponse res,
                         FilterChain chain) throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) req;
        HttpServletResponse response = (HttpServletResponse) res;

        if (!request.getRequestURI().startsWith("/api/")) {
            chain.doFilter(req, res);
            return;
        }

        String accessKey = request.getHeader("X-Access-Key");
        String timestamp = request.getHeader("X-Timestamp");
        String signature = request.getHeader("X-Signature");

        // 1. Check required headers
        if (accessKey == null || timestamp == null || signature == null) {
            sendError(response, 401, "Missing signature headers");
            return;
        }

        // 2. Look up the Secret Key
        String secretKey = secretKeys.get(accessKey);
        if (secretKey == null) {
            sendError(response, 401, "Invalid Access Key");
            return;
        }

        // 3. Check the timestamp
        long ts;
        try {
            ts = Long.parseLong(timestamp);
        } catch (NumberFormatException e) {
            sendError(response, 401, "Invalid timestamp");
            return;
        }
        if (Math.abs(Instant.now().getEpochSecond() - ts) > REPLAY_WINDOW) {
            sendError(response, 401, "Request expired");
            return;
        }

        // 4. Read the body and compute the signature.
        // CachedBodyHttpServletRequest is not a Servlet or Spring class; it is
        // assumed to be a custom request wrapper (not shown) that buffers the
        // body so downstream handlers can read it again.
        CachedBodyHttpServletRequest cachedRequest =
                new CachedBodyHttpServletRequest(request);
        String body = new String(cachedRequest.getCachedBody(),
                StandardCharsets.UTF_8);
        String bodyHash = sha256(body);
        String stringToSign = String.join("\n",
                request.getMethod(), request.getRequestURI(), timestamp, bodyHash);

        String expected = hmacSha256(secretKey, stringToSign);

        // 5. Constant-time comparison
        if (!MessageDigest.isEqual(
                signature.getBytes(StandardCharsets.UTF_8),
                expected.getBytes(StandardCharsets.UTF_8))) {
            sendError(response, 401, "Signature mismatch");
            return;
        }

        cachedRequest.setAttribute("access_key", accessKey);
        chain.doFilter(cachedRequest, response);
    }

    private String sha256(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            return bytesToHex(hash);
        } catch (Exception e) { throw new RuntimeException(e); }
    }

    private String hmacSha256(String key, String data) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(
                    key.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            return bytesToHex(
                    mac.doFinal(data.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) { throw new RuntimeException(e); }
    }

    private String bytesToHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) sb.append(String.format("%02x", b));
        return sb.toString();
    }

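    private void sendError(HttpServletResponse response, int status,
                           String message) throws IOException {
        response.setStatus(status);
        response.setContentType("application/json");
        // Set the charset explicitly; the servlet default (ISO-8859-1)
        // would garble non-ASCII messages.
        response.setCharacterEncoding("UTF-8");
        response.getWriter().write(
                String.format("{\"error\":\"%s\"}", message));
    }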
}

Go: HMAC Signature Middleware

package middleware

import (
	"bytes"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"math"
	"net/http"
	"strconv"
	"time"

	"github.com/gin-gonic/gin"
)

// HmacConfig holds the HMAC signature settings.
type HmacConfig struct {
	SecretKeys   map[string]string // access_key -> secret_key
	ReplayWindow time.Duration     // replay-protection window
}

// HmacAuth returns a Gin middleware that performs HMAC signature authentication.
func HmacAuth(cfg HmacConfig) gin.HandlerFunc {
	return func(c *gin.Context) {
		accessKey := c.GetHeader("X-Access-Key")
		timestamp := c.GetHeader("X-Timestamp")
		signature := c.GetHeader("X-Signature")

		// 1. Check the required headers
		if accessKey == "" || timestamp == "" || signature == "" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Missing signature headers",
			})
			return
		}

		// 2. Look up the secret key
		secretKey, ok := cfg.SecretKeys[accessKey]
		if !ok {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Invalid access key",
			})
			return
		}

		// 3. Check the timestamp (Unix seconds) against the replay window
		ts, err := strconv.ParseInt(timestamp, 10, 64)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Invalid timestamp",
			})
			return
		}
		diff := math.Abs(float64(time.Now().Unix() - ts))
		if diff > cfg.ReplayWindow.Seconds() {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Request expired",
			})
			return
		}

		// 4. Read the body
		bodyBytes, err := io.ReadAll(c.Request.Body)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
				"error": "Unable to read the request body",
			})
			return
		}
		// Restore the body so downstream handlers can read it again
		c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))

		// 5. Compute the signature
		bodyHash := sha256Hex(string(bodyBytes))
		stringToSign := fmt.Sprintf("%s\n%s\n%s\n%s",
			c.Request.Method, c.Request.URL.Path, timestamp, bodyHash)

		mac := hmac.New(sha256.New, []byte(secretKey))
		mac.Write([]byte(stringToSign))
		expected := hex.EncodeToString(mac.Sum(nil))

		// 6. Constant-time comparison
		if !hmac.Equal([]byte(signature), []byte(expected)) {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Signature mismatch",
			})
			return
		}

		c.Set("access_key", accessKey)
		c.Next()
	}
}

func sha256Hex(s string) string {
	h := sha256.Sum256([]byte(s))
	return hex.EncodeToString(h[:])
}
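
Both filters above verify the same string-to-sign: the HTTP method, the request path, the timestamp, and the hex SHA-256 of the body, joined by newlines and signed with HMAC-SHA256. A client has to build exactly that string. The following is a minimal sketch of the client side, assuming the header names used by the filters; the function name `sign_request` and the sample path are illustrative and not part of the original code.

```python
import hashlib
import hmac
import time
from typing import Dict, Optional


def sign_request(access_key: str, secret_key: str, method: str, path: str,
                 body: bytes, timestamp: Optional[int] = None) -> Dict[str, str]:
    """Build the three signature headers the filters above expect.

    `sign_request` is a hypothetical helper name used for illustration.
    """
    # Unix seconds, as parsed by Long.parseLong / strconv.ParseInt on the server
    ts = str(int(time.time()) if timestamp is None else timestamp)

    # Hex SHA-256 of the raw body bytes (an empty body hashes the empty string)
    body_hash = hashlib.sha256(body).hexdigest()

    # Same layout as the server side: METHOD \n PATH \n TIMESTAMP \n BODY_HASH
    string_to_sign = "\n".join([method, path, ts, body_hash])

    signature = hmac.new(
        secret_key.encode("utf-8"),
        string_to_sign.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()  # lowercase hex, matching bytesToHex / hex.EncodeToString

    return {
        "X-Access-Key": access_key,
        "X-Timestamp": ts,
        "X-Signature": signature,
    }


if __name__ == "__main__":
    headers = sign_request("AK001", "sk-very-secret-key-001",
                           "POST", "/api/orders", b'{"id": 1}')
    for name, value in headers.items():
        print(f"{name}: {value}")
```

The path must match what the server sees byte for byte (no query string, same encoding), and the body must be sent exactly as it was hashed; re-serializing JSON after signing is a common cause of signature mismatches.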

12.6 API Gateway Authentication Patterns

Centralized vs. Distributed Authentication

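        flowchart LR
    subgraph central["Centralized authentication (recommended)"]
        C1[Client] --> GW1[API Gateway<br/>① Verify token<br/>② Rate limiting<br/>③ Inject user info]
        GW1 --> MS1[Microservice<br/>Trusts what the gateway forwards]
    end

    subgraph distributed["Distributed authentication"]
        C2[Client] --> GW2[API Gateway<br/>Routing only]
        GW2 --> MS2[Microservice<br/>Verifies the token itself]
    end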
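
In the centralized pattern the microservice trusts whatever identity the gateway forwards, so the gateway must strip any identity headers the client sent before injecting its own. The sketch below illustrates that step as a pure function. The header names `x-user-id` and `x-user-scopes`, the function name, and the injected `verify_token` callable are assumptions made for illustration; a real gateway would plug in its own token verification.

```python
from typing import Callable, Dict, Mapping, Optional

# Hypothetical identity headers that only the gateway may set.
IDENTITY_HEADERS = ("x-user-id", "x-user-scopes")


def build_upstream_headers(
    incoming: Mapping[str, str],
    verify_token: Callable[[str], Optional[dict]],
) -> Optional[Dict[str, str]]:
    """Return the headers to forward upstream, or None to reject with 401.

    `verify_token` is an assumed callable that returns the token's claims
    (a dict with at least "sub") or None when the token is invalid.
    """
    # Drop client-supplied identity headers so they cannot be spoofed.
    headers = {
        name.lower(): value
        for name, value in incoming.items()
        if name.lower() not in IDENTITY_HEADERS
    }

    # The token stops at the gateway in this sketch; forwarding it is
    # also a valid design if services need it.
    auth = headers.pop("authorization", "")
    scheme, _, token = auth.partition(" ")
    if scheme.lower() != "bearer" or not token.strip():
        return None

    claims = verify_token(token.strip())
    if claims is None:
        return None

    # Inject the verified identity for the microservice to consume.
    headers["x-user-id"] = str(claims["sub"])
    headers["x-user-scopes"] = " ".join(claims.get("scopes", []))
    return headers
```

This trust only holds if the microservices are unreachable except through the gateway, for example via network policy or mTLS between the gateway and the services.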
    

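Token Introspection vs. Local Verification

| Aspect | Token introspection | Local verification (JWT) |
|---|---|---|
| How it works | Calls the authorization server to validate the token | Verifies the signature locally with the public key |
| Latency | High (network call) | Low (local computation) |
| Freshness | Real-time (revocation can be checked) | Delayed (relies on the expiry time) |
| Dependency | The authorization server must be available | Only the public key (JWKS) is needed |
| Best suited to | Opaque tokens | JWT |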
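
A common middle ground between the two approaches is to cache introspection results for a short time: latency drops for repeated tokens, and a revocation takes effect after at most one cache lifetime. The sketch below assumes an injected `introspect` callable that returns a dict with an `active` field, as in the OAuth 2.0 token introspection response; the class name, the TTL default, and the injectable clock are illustrative choices.

```python
import hashlib
import time
from typing import Callable, Dict, Tuple


class CachingIntrospector:
    """Cache positive introspection results for a short TTL.

    `introspect` is an assumed callable that queries the authorization
    server and returns a dict containing at least {"active": bool}.
    """

    def __init__(self, introspect: Callable[[str], dict],
                 ttl_seconds: float = 30.0,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._introspect = introspect
        self._ttl = ttl_seconds
        self._clock = clock
        # token hash -> (expiry on the clock, introspection result)
        # Unbounded here for brevity; a real cache would cap its size.
        self._cache: Dict[str, Tuple[float, dict]] = {}

    def check(self, token: str) -> dict:
        # Key the cache by a hash so raw tokens are not held in memory maps.
        key = hashlib.sha256(token.encode("utf-8")).hexdigest()
        now = self._clock()

        hit = self._cache.get(key)
        if hit is not None and hit[0] > now:
            return hit[1]

        result = self._introspect(token)
        if result.get("active"):
            # Revocation is noticed after at most `ttl_seconds`.
            self._cache[key] = (now + self._ttl, result)
        else:
            # Never cache inactive results; drop any stale entry.
            self._cache.pop(key, None)
        return result
```

The TTL is the knob that trades latency against freshness: a value of zero behaves like plain introspection, while a long TTL approaches the delayed revocation of local JWT verification.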

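12.7 Service-to-Service Authentication

        flowchart TB
    L1[Level 1: No authentication<br/>Implicit intranet trust ❌ Insecure]
    L2[Level 2: Service Account + Token<br/>Static credentials, manual rotation required]
    L3[Level 3: Service Mesh mTLS<br/>Automatic certificate management, transparent encryption]
    L4[Level 4: SPIFFE/SPIRE<br/>Unified workload identity, cross-platform, automatic rotation]

    L1 -->|evolves to| L2
    L2 -->|evolves to| L3
    L3 -->|evolves to| L4

    style L1 fill:#ffcccc
    style L2 fill:#ffffcc
    style L3 fill:#ccffcc
    style L4 fill:#ccccff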
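
At Level 4, a workload's identity is a SPIFFE ID, a URI of the form `spiffe://<trust-domain>/<path>` carried in its SVID. Once the SVID itself has been verified (which the sketch below does not do), authorization often reduces to checking the peer's SPIFFE ID against an allow-list. The following is a minimal sketch of that check; the function names, the trust domain `example.org`, and the `/ns/.../sa/...` path layout are illustrative assumptions.

```python
from typing import NamedTuple, Set
from urllib.parse import urlsplit


class SpiffeId(NamedTuple):
    trust_domain: str
    path: str


def parse_spiffe_id(uri: str) -> SpiffeId:
    """Split a SPIFFE ID into trust domain and workload path."""
    parts = urlsplit(uri)
    if parts.scheme != "spiffe":
        raise ValueError("scheme must be 'spiffe'")
    if not parts.netloc:
        raise ValueError("trust domain is missing")
    # A SPIFFE ID carries no userinfo, port, query, or fragment.
    if "@" in parts.netloc or ":" in parts.netloc:
        raise ValueError("userinfo and port are not allowed")
    if parts.query or parts.fragment:
        raise ValueError("query and fragment are not allowed")
    return SpiffeId(parts.netloc, parts.path)


def is_allowed(peer_id: str, allowed_ids: Set[str], trust_domain: str) -> bool:
    """Accept a peer only if it is in our trust domain and on the allow-list.

    Assumes the SVID that carried `peer_id` was already verified
    (certificate chain or JWT signature); this only does authorization.
    """
    try:
        parsed = parse_spiffe_id(peer_id)
    except ValueError:
        return False
    return parsed.trust_domain == trust_domain and peer_id in allowed_ids


if __name__ == "__main__":
    allowed = {"spiffe://example.org/ns/prod/sa/orders"}
    print(is_allowed("spiffe://example.org/ns/prod/sa/orders", allowed, "example.org"))
    print(is_allowed("spiffe://evil.org/ns/prod/sa/orders", allowed, "example.org"))
```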
    

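12.8 Comparison of API Authentication Methods

| Method | Security | Complexity | Replay protection | Tamper protection | Carries identity | Typical use |
|---|---|---|---|---|---|---|
| API Key | ⭐⭐ | | | | | Public APIs, low security requirements |
| Bearer Token (JWT) | ⭐⭐⭐⭐ | | ✅ (expiry) | ✅ (signature) | | Web/Mobile APIs |
| HMAC signature | ⭐⭐⭐⭐ | | ✅ (timestamp) | ✅ (signature) | | High-security APIs (payments) |
| mTLS | ⭐⭐⭐⭐⭐ | | ✅ (TLS) | ✅ (TLS) | ✅ (certificate) | Service-to-service communication |
| SPIFFE SVID | ⭐⭐⭐⭐⭐ | | | | | Cloud-native service-to-service communication |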
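
The timestamp check that gives HMAC signing its replay protection still accepts a captured request again for as long as it stays inside the window (5 minutes in the filters earlier in this chapter). One common hardening, which is not part of the original middleware, is to require a unique nonce per request and reject any nonce already seen. The sketch below assumes the client sends the nonce (for example in a hypothetical `X-Nonce` header) and includes it in the string-to-sign; the class and method names are illustrative.

```python
import time
from typing import Callable, Dict


class NonceCache:
    """Reject requests whose (access key, nonce) pair was already seen.

    In-memory and single-process for illustration; a multi-instance
    deployment would need a shared store with per-key expiry.
    """

    def __init__(self, window_seconds: int = 300,
                 clock: Callable[[], float] = time.time) -> None:
        self._window = window_seconds
        self._clock = clock
        self._seen: Dict[str, float] = {}  # "access_key:nonce" -> expiry

    def accept(self, access_key: str, nonce: str, timestamp: int) -> bool:
        now = self._clock()

        # Same freshness rule as the timestamp check in the middleware.
        if abs(now - timestamp) > self._window:
            return False

        # Forget nonces whose requests could no longer pass the check above.
        self._seen = {k: exp for k, exp in self._seen.items() if exp >= now}

        key = f"{access_key}:{nonce}"
        if key in self._seen:
            return False  # replay inside the window

        # Remember the nonce until its timestamp falls out of the window.
        self._seen[key] = timestamp + self._window
        return True
```

The nonce only helps if it is covered by the signature; otherwise an attacker could replay the request with a fresh nonce. The cache should also be consulted only after the signature has been verified, so that unauthenticated traffic cannot fill it.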

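12.9 Guide to Choosing an Authentication Method

Choosing by Scenario

| Scenario | Recommended method | Rationale |
|---|---|---|
| Public data APIs (weather, exchange rates) | API Key | Simple; used only for identification and rate limiting |
| User data APIs (SaaS) | OAuth2 Bearer Token | Standardized; carries user identity and permissions |
| Payment/financial APIs | HMAC signature + TLS | Verifies request integrity and prevents tampering |
| Microservice communication (same cluster) | mTLS (Service Mesh) | Zero trust, automatic certificate management |
| Microservice communication (cross-cloud) | SPIFFE/SPIRE | Unified identity across platforms |
| Third-party webhooks | HMAC signature | Verifies that the request comes from the claimed sender |
| Mobile APIs | OAuth2 + PKCE | Secure authorization for public clients |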

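Combining Methods

Real production environments usually need to combine several authentication methods:

        flowchart TD
    EXT[External request] --> GW[API Gateway]
    GW -->|Bearer Token verification| AUTH[Auth service]
    GW -->|Allowed| MESH[Service Mesh]
    MESH -->|mTLS| SVC_A[Service A]
    MESH -->|mTLS| SVC_B[Service B]
    SVC_A -->|HMAC signature| PAY[Payment service]
    SVC_B -->|mTLS| DB[Database]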

    style GW fill:#f9f,stroke:#333
    style MESH fill:#bbf,stroke:#333
    

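Typical architecture:

  • External → Gateway: OAuth2 Bearer Token (user authentication)

  • Gateway → microservices: mTLS (managed automatically by the Service Mesh)

  • Microservices → sensitive services: HMAC signature (additional request-integrity protection)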

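12.10 Summary

  • Bearer Token (JWT) is the most broadly applicable API authentication method

  • mTLS provides the strongest security guarantees and suits service-to-service communication

  • Centralized authentication at the API Gateway simplifies the authentication logic in each microservice

  • HMAC signatures suit high-security scenarios that need request-integrity verification

  • SPIFFE/SPIRE is the best practice for service-to-service authentication in the cloud-native era (see Part IV for details)

  • Choosing an authentication method means weighing security, complexity, performance, and operational cost

  • Production environments usually combine several authentication methods to form defense in depth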