# 第十二章:API 认证模式 > "在微服务时代,API 认证不再是可选项,而是每个请求的必经之路。" ```{mermaid} mindmap root((API 认证)) 认证方式 API Key Bearer Token mTLS HMAC 签名 Gateway 模式 集中认证 分布式认证 Token 内省 服务间认证 Service Account Service Mesh SPIFFE/SPIRE 最佳实践 生命周期管理 速率限制 审计日志 ``` ## 12.1 API 认证的特殊性 API 认证与传统 Web 认证有本质区别: | 维度 | Web 认证 | API 认证 | |------|---------|---------| | 交互方式 | 用户交互(浏览器) | 程序化(HTTP 客户端) | | 状态 | 有状态(Session) | 无状态(Token) | | 凭证类型 | Cookie | Authorization Header | | 调用者 | 人类 | 人类 + 机器 | | 频率 | 低(页面级) | 高(API 级) | ### API 认证方式选型决策树 ```{mermaid} flowchart TD A[选择 API 认证方式] --> B{调用者是谁?} B -->|服务间通信| C{网络环境?} B -->|第三方开发者| D{安全要求?} B -->|终端用户 App| E[OAuth2 Bearer Token] C -->|同一集群/Service Mesh| F[mTLS] C -->|跨网络/跨云| G{需要请求签名?} G -->|是| H[HMAC 签名] G -->|否| I[mTLS 或 Bearer Token] D -->|低:公开数据| J[API Key] D -->|中:用户数据| E D -->|高:支付/金融| K[HMAC 签名 + mTLS] ``` ## 12.2 API Key 认证 最简单的 API 认证方式,适合低安全要求的场景: ### API Key 认证流程 ```{mermaid} sequenceDiagram participant C as 客户端 participant G as API Gateway participant S as 后端服务 participant DB as Key 存储 C->>G: GET /api/data
X-API-Key: key-abc123 G->>DB: 查询 API Key alt Key 有效 DB->>G: 返回 Key 信息(权限、配额) G->>G: 检查速率限制 G->>S: 转发请求 + 注入客户端信息 S->>G: 响应数据 G->>C: 200 OK + 数据 else Key 无效或过期 DB->>G: Key 不存在 G->>C: 401 Unauthorized end ``` ### Python: FastAPI API Key 中间件 ```python from fastapi import FastAPI, Header, Query, HTTPException, Depends, Request from fastapi.security import APIKeyHeader, APIKeyQuery from typing import Optional from datetime import datetime import hashlib import secrets app = FastAPI(title="API Key Auth Demo") # --- API Key 存储(生产环境应使用数据库) --- API_KEYS = { # 存储 Key 的哈希值,而非明文 hashlib.sha256(b"sk-live-abc123def456").hexdigest(): { "name": "Service A", "permissions": ["read"], "rate_limit": 1000, "created_at": "2025-01-01", }, hashlib.sha256(b"sk-live-xyz789ghi012").hexdigest(): { "name": "Service B", "permissions": ["read", "write"], "rate_limit": 5000, "created_at": "2025-06-01", }, } # 支持两种传递方式:Header 和 Query api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False) api_key_query = APIKeyQuery(name="api_key", auto_error=False) async def verify_api_key( header_key: Optional[str] = Depends(api_key_header), query_key: Optional[str] = Depends(api_key_query), ) -> dict: """验证 API Key(支持 Header 和 Query 两种方式)""" api_key = header_key or query_key if not api_key: raise HTTPException( status_code=401, detail="缺少 API Key,请通过 X-API-Key Header 或 api_key Query 参数传递", ) # 对比哈希值(避免明文存储) key_hash = hashlib.sha256(api_key.encode()).hexdigest() client = API_KEYS.get(key_hash) if not client: raise HTTPException(status_code=401, detail="无效的 API Key") return client @app.get("/api/data") async def get_data(client: dict = Depends(verify_api_key)): if "read" not in client["permissions"]: raise HTTPException(status_code=403, detail="权限不足") return {"data": "sensitive information", "client": client["name"]} @app.post("/api/data") async def create_data(client: dict = Depends(verify_api_key)): if "write" not in client["permissions"]: raise HTTPException(status_code=403, detail="权限不足:需要 write 权限") return {"message": "数据已创建", "client": client["name"]} # --- API Key 生成工具 --- def generate_api_key(prefix: str = "sk-live") -> str: """生成安全的 API Key""" random_part = secrets.token_urlsafe(32) return f"{prefix}-{random_part}" ``` ### Java: Spring Boot API Key Filter ```java import jakarta.servlet.*; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import org.springframework.boot.web.servlet.FilterRegistrationBean; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; /** * API Key 认证 Filter */ @Component public class ApiKeyFilter implements Filter { private static final String API_KEY_HEADER = "X-API-Key"; private static final String API_KEY_QUERY = "api_key"; // 存储 Key 哈希 → 客户端信息 private final Map apiKeys = new ConcurrentHashMap<>(); public ApiKeyFilter() { // 初始化(生产环境从数据库加载) apiKeys.put(sha256("sk-live-abc123def456"), new ApiClient("Service A", Set.of("read"))); apiKeys.put(sha256("sk-live-xyz789ghi012"), new ApiClient("Service B", Set.of("read", "write"))); } @Override public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) throws IOException, ServletException { HttpServletRequest request = (HttpServletRequest) req; HttpServletResponse response = (HttpServletResponse) res; // 从 Header 或 Query 获取 API Key String apiKey = request.getHeader(API_KEY_HEADER); if (apiKey == null || apiKey.isBlank()) { apiKey = request.getParameter(API_KEY_QUERY); } if (apiKey == null || apiKey.isBlank()) { response.setStatus(401); response.setContentType("application/json"); response.getWriter().write("{\"error\":\"缺少 API Key\"}"); return; } ApiClient client = apiKeys.get(sha256(apiKey)); if (client == null) { response.setStatus(401); response.setContentType("application/json"); response.getWriter().write("{\"error\":\"无效的 API Key\"}"); return; } // 将客户端信息注入请求属性 request.setAttribute("api_client", client); chain.doFilter(request, response); } private String sha256(String input) { try { MessageDigest digest = MessageDigest.getInstance("SHA-256"); byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8)); StringBuilder hex = new StringBuilder(); for (byte b : hash) hex.append(String.format("%02x", b)); return hex.toString(); } catch (Exception e) { throw new RuntimeException(e); } } public record ApiClient(String name, Set permissions) {} } /** * Filter 注册配置 */ @Configuration public class FilterConfig { @Bean public FilterRegistrationBean apiKeyFilterRegistration( ApiKeyFilter filter) { FilterRegistrationBean reg = new FilterRegistrationBean<>(); reg.setFilter(filter); reg.addUrlPatterns("/api/*"); reg.setOrder(1); return reg; } } ``` ### Go: Gin API Key 中间件 ```go package middleware import ( "crypto/sha256" "encoding/hex" "net/http" "sync" "github.com/gin-gonic/gin" ) // ApiClient API 客户端信息 type ApiClient struct { Name string Permissions map[string]bool } // ApiKeyStore API Key 存储 type ApiKeyStore struct { mu sync.RWMutex keys map[string]*ApiClient // sha256(key) -> client } func NewApiKeyStore() *ApiKeyStore { store := &ApiKeyStore{keys: make(map[string]*ApiClient)} // 初始化(生产环境从数据库加载) store.Register("sk-live-abc123def456", &ApiClient{ Name: "Service A", Permissions: map[string]bool{"read": true}, }) store.Register("sk-live-xyz789ghi012", &ApiClient{ Name: "Service B", Permissions: map[string]bool{"read": true, "write": true}, }) return store } func (s *ApiKeyStore) Register(key string, client *ApiClient) { hash := sha256Hex(key) s.mu.Lock() defer s.mu.Unlock() s.keys[hash] = client } func (s *ApiKeyStore) Lookup(key string) (*ApiClient, bool) { hash := sha256Hex(key) s.mu.RLock() defer s.mu.RUnlock() client, ok := s.keys[hash] return client, ok } func sha256Hex(s string) string { h := sha256.Sum256([]byte(s)) return hex.EncodeToString(h[:]) } // ApiKeyAuth Gin 中间件:API Key 认证 func ApiKeyAuth(store *ApiKeyStore) gin.HandlerFunc { return func(c *gin.Context) { // 从 Header 或 Query 获取 API Key apiKey := c.GetHeader("X-API-Key") if apiKey == "" { apiKey = c.Query("api_key") } if apiKey == "" { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "缺少 API Key", }) return } client, ok := store.Lookup(apiKey) if !ok { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "无效的 API Key", }) return } // 注入客户端信息 c.Set("api_client", client) c.Set("client_name", client.Name) c.Next() } } // RequirePermission 权限检查中间件 func RequirePermission(perm string) gin.HandlerFunc { return func(c *gin.Context) { client, exists := c.Get("api_client") if !exists { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "未认证", }) return } apiClient := client.(*ApiClient) if !apiClient.Permissions[perm] { c.AbortWithStatusJSON(http.StatusForbidden, gin.H{ "error": "权限不足:需要 " + perm + " 权限", }) return } c.Next() } } ``` **API Key 的问题**: - ❌ 无过期时间(除非手动管理) - ❌ 无法携带用户身份信息 - ❌ 容易泄露(日志、URL、代码仓库) - ❌ 无标准的轮换机制 ## 12.3 Bearer Token 认证 使用 OAuth2 Access Token,是最推荐的 API 认证方式: ### OAuth2 Bearer Token 认证流程 ```{mermaid} sequenceDiagram participant C as 客户端 participant AS as 授权服务器 participant RS as 资源服务器 (API) participant JWKS as JWKS 端点 Note over C,AS: 获取 Token C->>AS: POST /oauth/token
{grant_type, client_id, client_secret} AS->>AS: 验证客户端凭证 AS->>C: {access_token: "eyJ...", expires_in: 3600} Note over C,RS: 使用 Token 访问 API C->>RS: GET /api/resource
Authorization: Bearer eyJ... RS->>RS: 解析 JWT Header RS->>JWKS: 获取公钥(缓存) JWKS->>RS: 返回 JWK Set RS->>RS: 验证签名 + 过期时间 + audience + issuer alt Token 有效 RS->>C: 200 OK + 资源数据 else Token 无效/过期 RS->>C: 401 Unauthorized end ``` ### Python: FastAPI Bearer Token 验证 ```python from fastapi import FastAPI, Depends, HTTPException, Security from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials import jwt import httpx from functools import lru_cache from typing import Optional from datetime import datetime app = FastAPI(title="Bearer Token Auth Demo") security = HTTPBearer() # --- 配置 --- AUTH_SERVER = "https://auth.example.com" JWKS_URL = f"{AUTH_SERVER}/.well-known/jwks.json" EXPECTED_AUDIENCE = "my-api" EXPECTED_ISSUER = AUTH_SERVER # --- JWKS 公钥缓存 --- _jwks_cache: Optional[dict] = None _jwks_cache_time: float = 0 async def get_jwks() -> dict: """获取并缓存 JWKS 公钥集""" global _jwks_cache, _jwks_cache_time import time now = time.time() # 缓存 1 小时 if _jwks_cache and (now - _jwks_cache_time) < 3600: return _jwks_cache async with httpx.AsyncClient() as client: resp = await client.get(JWKS_URL) resp.raise_for_status() _jwks_cache = resp.json() _jwks_cache_time = now return _jwks_cache def get_public_key(token: str, jwks: dict): """从 JWKS 中找到匹配的公钥""" header = jwt.get_unverified_header(token) kid = header.get("kid") for key in jwks.get("keys", []): if key.get("kid") == kid: return jwt.algorithms.RSAAlgorithm.from_jwk(key) raise ValueError(f"找不到 kid={kid} 对应的公钥") async def verify_bearer_token( credentials: HTTPAuthorizationCredentials = Security(security), ) -> dict: """验证 Bearer Token 并返回 payload""" token = credentials.credentials try: jwks = await get_jwks() public_key = get_public_key(token, jwks) payload = jwt.decode( token, public_key, algorithms=["RS256"], audience=EXPECTED_AUDIENCE, issuer=EXPECTED_ISSUER, options={"require": ["exp", "iss", "aud", "sub"]}, ) return payload except jwt.ExpiredSignatureError: raise HTTPException(status_code=401, detail="Token 已过期") except jwt.InvalidAudienceError: raise HTTPException(status_code=401, detail="Token audience 不匹配") except jwt.InvalidIssuerError: raise HTTPException(status_code=401, detail="Token issuer 不匹配") except jwt.InvalidTokenError as e: raise HTTPException(status_code=401, detail=f"无效的 Token: {e}") # --- 权限检查 --- def require_scope(required: str): """检查 Token 是否包含所需的 scope""" async def checker(payload: dict = Depends(verify_bearer_token)): scopes = payload.get("scope", "").split() if required not in scopes: raise HTTPException( status_code=403, detail=f"需要 scope: {required}", ) return payload return checker @app.get("/api/profile") async def get_profile(user: dict = Depends(verify_bearer_token)): return { "user_id": user["sub"], "roles": user.get("roles", []), "scopes": user.get("scope", "").split(), } @app.get("/api/admin") async def admin_endpoint(user: dict = Depends(require_scope("admin"))): return {"message": "管理员接口", "user_id": user["sub"]} ``` ### Java: Spring Resource Server ```java // application.yml // spring: // security: // oauth2: // resourceserver: // jwt: // issuer-uri: https://auth.example.com // audiences: my-api import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity; import org.springframework.security.config.annotation.web.builders.HttpSecurity; import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; import org.springframework.security.config.http.SessionCreationPolicy; import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter; import org.springframework.security.oauth2.server.resource.authentication.JwtGrantedAuthoritiesConverter; import org.springframework.security.web.SecurityFilterChain; @Configuration @EnableWebSecurity @EnableMethodSecurity public class ResourceServerConfig { @Bean public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception { http .sessionManagement(s -> s.sessionCreationPolicy(SessionCreationPolicy.STATELESS)) .authorizeHttpRequests(auth -> auth .requestMatchers("/api/public/**").permitAll() .requestMatchers("/api/admin/**").hasAuthority("SCOPE_admin") .requestMatchers("/api/**").authenticated() ) .oauth2ResourceServer(oauth2 -> oauth2 .jwt(jwt -> jwt.jwtAuthenticationConverter(jwtAuthConverter())) ); return http.build(); } @Bean public JwtAuthenticationConverter jwtAuthConverter() { JwtGrantedAuthoritiesConverter grantedAuthorities = new JwtGrantedAuthoritiesConverter(); // 从 JWT 的 "scope" claim 提取权限 grantedAuthorities.setAuthoritiesClaimName("scope"); grantedAuthorities.setAuthorityPrefix("SCOPE_"); JwtAuthenticationConverter converter = new JwtAuthenticationConverter(); converter.setJwtGrantedAuthoritiesConverter(grantedAuthorities); return converter; } } /** * API 控制器 */ @RestController @RequestMapping("/api") public class ApiController { @GetMapping("/profile") public Map getProfile( @AuthenticationPrincipal Jwt jwt) { return Map.of( "user_id", jwt.getSubject(), "claims", jwt.getClaims(), "scopes", jwt.getClaimAsString("scope") ); } @GetMapping("/admin") @PreAuthorize("hasAuthority('SCOPE_admin')") public Map adminEndpoint( @AuthenticationPrincipal Jwt jwt) { return Map.of( "message", "管理员接口", "user_id", jwt.getSubject() ); } } ``` ### Go: Gin JWT Bearer 中间件 ```go package middleware import ( "context" "encoding/json" "fmt" "net/http" "strings" "sync" "time" "github.com/gin-gonic/gin" "github.com/golang-jwt/jwt/v5" "github.com/lestrrat-go/jwx/v2/jwk" ) // BearerConfig Bearer Token 验证配置 type BearerConfig struct { JwksURL string Issuer string Audience string } // JWKSCache JWKS 公钥缓存 type JWKSCache struct { mu sync.RWMutex keySet jwk.Set fetched time.Time ttl time.Duration url string } func NewJWKSCache(url string, ttl time.Duration) *JWKSCache { return &JWKSCache{url: url, ttl: ttl} } func (c *JWKSCache) GetKeySet(ctx context.Context) (jwk.Set, error) { c.mu.RLock() if c.keySet != nil && time.Since(c.fetched) < c.ttl { defer c.mu.RUnlock() return c.keySet, nil } c.mu.RUnlock() c.mu.Lock() defer c.mu.Unlock() set, err := jwk.Fetch(ctx, c.url) if err != nil { return nil, fmt.Errorf("获取 JWKS 失败: %w", err) } c.keySet = set c.fetched = time.Now() return set, nil } // BearerAuth Gin 中间件:Bearer Token 认证 func BearerAuth(cfg BearerConfig) gin.HandlerFunc { cache := NewJWKSCache(cfg.JwksURL, 1*time.Hour) return func(c *gin.Context) { // 提取 Bearer Token authHeader := c.GetHeader("Authorization") if !strings.HasPrefix(authHeader, "Bearer ") { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "缺少 Bearer Token", }) return } tokenStr := strings.TrimPrefix(authHeader, "Bearer ") // 获取 JWKS 公钥 keySet, err := cache.GetKeySet(c.Request.Context()) if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{ "error": "无法获取公钥", }) return } // 解析并验证 JWT token, err := jwt.Parse(tokenStr, func(t *jwt.Token) (interface{}, error) { kid, ok := t.Header["kid"].(string) if !ok { return nil, fmt.Errorf("缺少 kid") } key, found := keySet.LookupKeyID(kid) if !found { return nil, fmt.Errorf("找不到 kid=%s 的公钥", kid) } var rawKey interface{} if err := key.Raw(&rawKey); err != nil { return nil, err } return rawKey, nil }, jwt.WithIssuer(cfg.Issuer), jwt.WithAudience(cfg.Audience), jwt.WithValidMethods([]string{"RS256"}), ) if err != nil || !token.Valid { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": fmt.Sprintf("Token 无效: %v", err), }) return } claims, ok := token.Claims.(jwt.MapClaims) if !ok { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "无法解析 claims", }) return } // 注入用户信息 c.Set("user_id", claims["sub"]) c.Set("claims", claims) c.Next() } } // RequireScope 检查 Token 是否包含所需 scope func RequireScope(required string) gin.HandlerFunc { return func(c *gin.Context) { claims, exists := c.Get("claims") if !exists { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "未认证", }) return } mapClaims := claims.(jwt.MapClaims) scopeStr, _ := mapClaims["scope"].(string) scopes := strings.Fields(scopeStr) for _, s := range scopes { if s == required { c.Next() return } } c.AbortWithStatusJSON(http.StatusForbidden, gin.H{ "error": fmt.Sprintf("需要 scope: %s", required), }) } } ``` ## 12.4 mTLS 客户端证书认证 最安全的 API 认证方式,适合服务间通信: ### mTLS 认证流程 ```{mermaid} sequenceDiagram participant C as 客户端 participant S as 服务器 participant CA as 证书颁发机构 Note over C,CA: 准备阶段(一次性) C->>CA: 申请客户端证书 CA->>C: 颁发客户端证书 + 私钥 S->>CA: 申请服务器证书 CA->>S: 颁发服务器证书 + 私钥 Note over C,S: TLS 握手(双向验证) C->>S: ClientHello S->>C: ServerHello + 服务器证书 C->>C: 验证服务器证书(用 CA 公钥) S->>C: CertificateRequest(要求客户端证书) C->>S: 客户端证书 + CertificateVerify S->>S: 验证客户端证书(用 CA 公钥) Note over C,S: 双向验证完成 ✅ 建立加密通道 C->>S: HTTP 请求(加密通道内,无需额外 Token) S->>S: 从证书 CN/SAN 提取客户端身份 S->>C: HTTP 响应 ``` ### Python: mTLS 服务端与客户端 ```python # --- mTLS 客户端 --- import httpx async def call_service_with_mtls(): """使用 mTLS 调用其他服务""" async with httpx.AsyncClient( cert=("client.crt", "client.key"), # 客户端证书 + 私钥 verify="ca.crt", # CA 证书(验证服务器) ) as client: response = await client.get("https://service-b:8443/api/data") return response.json() # --- mTLS 服务端(FastAPI + uvicorn) --- # 启动命令: # uvicorn main:app --host 0.0.0.0 --port 8443 \ # --ssl-keyfile server.key \ # --ssl-certfile server.crt \ # --ssl-ca-certs ca.crt \ # --ssl-cert-reqs 2 # 2 = CERT_REQUIRED(强制客户端证书) from fastapi import FastAPI, Request, HTTPException app = FastAPI(title="mTLS Service") @app.middleware("http") async def extract_client_identity(request: Request, call_next): """从客户端证书中提取身份信息""" # Nginx/Envoy 代理时,证书信息通常通过 Header 传递 client_cn = request.headers.get("X-Client-CN", "") client_serial = request.headers.get("X-Client-Serial", "") if not client_cn: # 直接 TLS 连接时,从 transport 获取 transport = request.scope.get("transport") if transport and hasattr(transport, "get_extra_info"): peercert = transport.get_extra_info("peercert") if peercert: for rdn in peercert.get("subject", ()): for attr_type, attr_value in rdn: if attr_type == "commonName": client_cn = attr_value request.state.client_cn = client_cn response = await call_next(request) return response @app.get("/api/data") async def get_data(request: Request): client = getattr(request.state, "client_cn", "unknown") return {"data": "sensitive", "authenticated_client": client} ``` ### Java: mTLS 服务端配置 ```java // application.yml // server: // port: 8443 // ssl: // enabled: true // key-store: classpath:server-keystore.p12 // key-store-password: changeit // key-store-type: PKCS12 // trust-store: classpath:truststore.p12 // trust-store-password: changeit // trust-store-type: PKCS12 // client-auth: need # need = 强制客户端证书 import org.springframework.web.bind.annotation.*; import jakarta.servlet.http.HttpServletRequest; import java.security.cert.X509Certificate; @RestController @RequestMapping("/api") public class MtlsController { @GetMapping("/data") public Map getData(HttpServletRequest request) { // 从请求中提取客户端证书 X509Certificate[] certs = (X509Certificate[]) request.getAttribute("jakarta.servlet.request.X509Certificate"); String clientCN = "unknown"; if (certs != null && certs.length > 0) { clientCN = certs[0].getSubjectX500Principal().getName(); } return Map.of( "data", "sensitive information", "authenticated_client", clientCN ); } } // --- mTLS 客户端(RestTemplate) --- import org.apache.hc.client5.http.impl.classic.CloseableHttpClient; import org.apache.hc.client5.http.impl.classic.HttpClients; import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory; import org.apache.hc.core5.ssl.SSLContextBuilder; import org.springframework.http.client.HttpComponentsClientHttpRequestFactory; import org.springframework.web.client.RestTemplate; import javax.net.ssl.SSLContext; import java.io.File; @Configuration public class MtlsClientConfig { @Bean public RestTemplate mtlsRestTemplate() throws Exception { SSLContext sslContext = SSLContextBuilder.create() .loadKeyMaterial( new File("client-keystore.p12"), "changeit".toCharArray(), "changeit".toCharArray()) .loadTrustMaterial( new File("truststore.p12"), "changeit".toCharArray()) .build(); SSLConnectionSocketFactory socketFactory = new SSLConnectionSocketFactory(sslContext); CloseableHttpClient httpClient = HttpClients.custom() .setSSLSocketFactory(socketFactory) .build(); return new RestTemplate( new HttpComponentsClientHttpRequestFactory(httpClient)); } } ``` ### Go: mTLS 服务端与客户端 ```go package main import ( "crypto/tls" "crypto/x509" "fmt" "io" "log" "net/http" "os" "github.com/gin-gonic/gin" ) // --- mTLS 服务端 --- func StartMTLSServer() { // 加载 CA 证书(用于验证客户端证书) caCert, err := os.ReadFile("ca.crt") if err != nil { log.Fatal(err) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) // TLS 配置:要求客户端证书 tlsConfig := &tls.Config{ ClientCAs: caCertPool, ClientAuth: tls.RequireAndVerifyClientCert, MinVersion: tls.VersionTLS13, } r := gin.Default() // 从客户端证书提取身份 r.Use(func(c *gin.Context) { if c.Request.TLS != nil && len(c.Request.TLS.PeerCertificates) > 0 { cert := c.Request.TLS.PeerCertificates[0] c.Set("client_cn", cert.Subject.CommonName) c.Set("client_serial", cert.SerialNumber.String()) } c.Next() }) r.GET("/api/data", func(c *gin.Context) { clientCN := c.GetString("client_cn") c.JSON(http.StatusOK, gin.H{ "data": "sensitive information", "authenticated_client": clientCN, }) }) server := &http.Server{ Addr: ":8443", Handler: r, TLSConfig: tlsConfig, } log.Println("mTLS server starting on :8443") log.Fatal(server.ListenAndServeTLS("server.crt", "server.key")) } // --- mTLS 客户端 --- func CallWithMTLS(url string) ([]byte, error) { // 加载客户端证书 cert, err := tls.LoadX509KeyPair("client.crt", "client.key") if err != nil { return nil, fmt.Errorf("加载客户端证书失败: %w", err) } // 加载 CA 证书 caCert, err := os.ReadFile("ca.crt") if err != nil { return nil, fmt.Errorf("加载 CA 证书失败: %w", err) } caCertPool := x509.NewCertPool() caCertPool.AppendCertsFromPEM(caCert) tlsConfig := &tls.Config{ Certificates: []tls.Certificate{cert}, RootCAs: caCertPool, MinVersion: tls.VersionTLS13, } client := &http.Client{ Transport: &http.Transport{TLSClientConfig: tlsConfig}, } resp, err := client.Get(url) if err != nil { return nil, err } defer resp.Body.Close() return io.ReadAll(resp.Body) } ``` ## 12.5 HMAC 签名认证 类似 AWS Signature V4,通过对请求内容签名来验证身份和完整性: ### HMAC 签名认证流程 ```{mermaid} sequenceDiagram participant C as 客户端 participant S as 服务器 participant DB as 密钥存储 Note over C: 构造签名 C->>C: 1. 拼接签名字符串
METHOD + PATH + TIMESTAMP + BODY_HASH C->>C: 2. HMAC-SHA256(secret_key, string_to_sign) C->>S: 请求 + Headers:
X-Access-Key: AK001
X-Timestamp: 1711036800
X-Signature: a3f2... S->>DB: 3. 用 Access Key 查找 Secret Key DB->>S: 返回 Secret Key S->>S: 4. 检查时间戳(±5 分钟防重放) S->>S: 5. 用相同算法重新计算签名 S->>S: 6. 恒定时间比较签名 alt 签名匹配 S->>C: 200 OK else 签名不匹配 S->>C: 401 Unauthorized end ``` ### Python: HMAC 签名生成和验证 ```python import hmac import hashlib import time from fastapi import FastAPI, Request, HTTPException app = FastAPI(title="HMAC Auth Demo") # --- 密钥存储 --- SECRET_KEYS = { "AK001": "sk-very-secret-key-001", "AK002": "sk-very-secret-key-002", } REPLAY_WINDOW = 300 # 5 分钟防重放窗口 # --- 客户端:签名生成 --- def sign_request( method: str, path: str, body: str, access_key: str, secret_key: str, ) -> dict: """对 API 请求进行 HMAC 签名""" timestamp = str(int(time.time())) body_hash = hashlib.sha256(body.encode()).hexdigest() # 构造签名字符串(规范化) string_to_sign = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}" # 计算 HMAC-SHA256 签名 signature = hmac.new( secret_key.encode(), string_to_sign.encode(), hashlib.sha256, ).hexdigest() return { "X-Access-Key": access_key, "X-Timestamp": timestamp, "X-Signature": signature, } # --- 服务端:签名验证 --- def verify_signature( method: str, path: str, body: str, headers: dict, ) -> str: """验证 HMAC 签名,返回 access_key""" access_key = headers.get("x-access-key", "") timestamp = headers.get("x-timestamp", "") signature = headers.get("x-signature", "") # 1. 检查必要 Header if not all([access_key, timestamp, signature]): raise ValueError("缺少签名 Header") # 2. 查找 Secret Key secret_key = SECRET_KEYS.get(access_key) if not secret_key: raise ValueError("无效的 Access Key") # 3. 检查时间戳(防重放) try: ts = int(timestamp) except ValueError: raise ValueError("无效的时间戳") if abs(time.time() - ts) > REPLAY_WINDOW: raise ValueError("请求已过期(时间戳超出窗口)") # 4. 重新计算签名 body_hash = hashlib.sha256(body.encode()).hexdigest() string_to_sign = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}" expected = hmac.new( secret_key.encode(), string_to_sign.encode(), hashlib.sha256, ).hexdigest() # 5. 恒定时间比较(防时序攻击) if not hmac.compare_digest(signature, expected): raise ValueError("签名不匹配") return access_key # --- FastAPI 中间件 --- @app.middleware("http") async def hmac_auth_middleware(request: Request, call_next): if request.url.path.startswith("/api/"): body = (await request.body()).decode() try: access_key = verify_signature( method=request.method, path=request.url.path, body=body, headers=dict(request.headers), ) request.state.access_key = access_key except ValueError as e: raise HTTPException(status_code=401, detail=str(e)) response = await call_next(request) return response @app.post("/api/payment") async def create_payment(request: Request): return { "message": "支付已创建", "authenticated_by": request.state.access_key, } ``` ### Java: HMAC 签名 Filter ```java import jakarta.servlet.*; import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import org.springframework.stereotype.Component; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; import java.io.IOException; import java.nio.charset.StandardCharsets; import java.security.MessageDigest; import java.time.Instant; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @Component public class HmacAuthFilter implements Filter { private static final long REPLAY_WINDOW = 300; // 5 分钟 private final Map secretKeys = new ConcurrentHashMap<>(); public HmacAuthFilter() { secretKeys.put("AK001", "sk-very-secret-key-001"); secretKeys.put("AK002", "sk-very-secret-key-002"); } @Override public void doFilter(ServletRequest req, ServletResponse res, FilterChain chain) throws IOException, ServletException { HttpServletRequest request = (HttpServletRequest) req; HttpServletResponse response = (HttpServletResponse) res; if (!request.getRequestURI().startsWith("/api/")) { chain.doFilter(req, res); return; } String accessKey = request.getHeader("X-Access-Key"); String timestamp = request.getHeader("X-Timestamp"); String signature = request.getHeader("X-Signature"); // 1. 检查必要 Header if (accessKey == null || timestamp == null || signature == null) { sendError(response, 401, "缺少签名 Header"); return; } // 2. 查找 Secret Key String secretKey = secretKeys.get(accessKey); if (secretKey == null) { sendError(response, 401, "无效的 Access Key"); return; } // 3. 检查时间戳 long ts; try { ts = Long.parseLong(timestamp); } catch (NumberFormatException e) { sendError(response, 401, "无效的时间戳"); return; } if (Math.abs(Instant.now().getEpochSecond() - ts) > REPLAY_WINDOW) { sendError(response, 401, "请求已过期"); return; } // 4. 读取 Body 并计算签名 CachedBodyHttpServletRequest cachedRequest = new CachedBodyHttpServletRequest(request); String body = new String(cachedRequest.getCachedBody(), StandardCharsets.UTF_8); String bodyHash = sha256(body); String stringToSign = String.join("\n", request.getMethod(), request.getRequestURI(), timestamp, bodyHash); String expected = hmacSha256(secretKey, stringToSign); // 5. 恒定时间比较 if (!MessageDigest.isEqual( signature.getBytes(StandardCharsets.UTF_8), expected.getBytes(StandardCharsets.UTF_8))) { sendError(response, 401, "签名不匹配"); return; } cachedRequest.setAttribute("access_key", accessKey); chain.doFilter(cachedRequest, response); } private String sha256(String input) { try { MessageDigest digest = MessageDigest.getInstance("SHA-256"); byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8)); return bytesToHex(hash); } catch (Exception e) { throw new RuntimeException(e); } } private String hmacSha256(String key, String data) { try { Mac mac = Mac.getInstance("HmacSHA256"); mac.init(new SecretKeySpec( key.getBytes(StandardCharsets.UTF_8), "HmacSHA256")); return bytesToHex( mac.doFinal(data.getBytes(StandardCharsets.UTF_8))); } catch (Exception e) { throw new RuntimeException(e); } } private String bytesToHex(byte[] bytes) { StringBuilder sb = new StringBuilder(); for (byte b : bytes) sb.append(String.format("%02x", b)); return sb.toString(); } private void sendError(HttpServletResponse response, int status, String message) throws IOException { response.setStatus(status); response.setContentType("application/json"); response.getWriter().write( String.format("{\"error\":\"%s\"}", message)); } } ``` ### Go: HMAC 签名中间件 ```go package middleware import ( "bytes" "crypto/hmac" "crypto/sha256" "encoding/hex" "fmt" "io" "math" "net/http" "strconv" "time" "github.com/gin-gonic/gin" ) // HmacConfig HMAC 签名配置 type HmacConfig struct { SecretKeys map[string]string // access_key -> secret_key ReplayWindow time.Duration // 防重放窗口 } // HmacAuth Gin 中间件:HMAC 签名认证 func HmacAuth(cfg HmacConfig) gin.HandlerFunc { return func(c *gin.Context) { accessKey := c.GetHeader("X-Access-Key") timestamp := c.GetHeader("X-Timestamp") signature := c.GetHeader("X-Signature") // 1. 检查必要 Header if accessKey == "" || timestamp == "" || signature == "" { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "缺少签名 Header", }) return } // 2. 查找 Secret Key secretKey, ok := cfg.SecretKeys[accessKey] if !ok { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "无效的 Access Key", }) return } // 3. 检查时间戳 ts, err := strconv.ParseInt(timestamp, 10, 64) if err != nil { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "无效的时间戳", }) return } diff := math.Abs(float64(time.Now().Unix() - ts)) if diff > cfg.ReplayWindow.Seconds() { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "请求已过期", }) return } // 4. 读取 Body bodyBytes, err := io.ReadAll(c.Request.Body) if err != nil { c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{ "error": "无法读取请求体", }) return } // 重新设置 Body(供后续 Handler 使用) c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) // 5. 计算签名 bodyHash := sha256Hex(string(bodyBytes)) stringToSign := fmt.Sprintf("%s\n%s\n%s\n%s", c.Request.Method, c.Request.URL.Path, timestamp, bodyHash) mac := hmac.New(sha256.New, []byte(secretKey)) mac.Write([]byte(stringToSign)) expected := hex.EncodeToString(mac.Sum(nil)) // 6. 恒定时间比较 if !hmac.Equal([]byte(signature), []byte(expected)) { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{ "error": "签名不匹配", }) return } c.Set("access_key", accessKey) c.Next() } } func sha256Hex(s string) string { h := sha256.Sum256([]byte(s)) return hex.EncodeToString(h[:]) } ``` ## 12.6 API Gateway 认证模式 ### 集中认证 vs 分布式认证 ```{mermaid} flowchart LR subgraph 集中认证(推荐) C1[客户端] --> GW1[API Gateway
① 验证 Token
② 速率限制
③ 注入用户信息] GW1 --> MS1[微服务
信任 GW 的转发] end subgraph 分布式认证 C2[客户端] --> GW2[API Gateway
仅路由转发] GW2 --> MS2[微服务
自己验证 Token] end ``` ### Token 内省 vs 本地验证 | 方式 | Token 内省(Introspection) | 本地验证(JWT) | |------|---------------------------|----------------| | 原理 | 调用授权服务器验证 Token | 本地用公钥验证签名 | | 延迟 | 高(网络调用) | 低(本地计算) | | 实时性 | 实时(可检查撤销) | 延迟(依赖过期时间) | | 依赖 | 授权服务器必须可用 | 仅需公钥(JWKS) | | 适用 | Opaque Token | JWT | ## 12.7 Service-to-Service 认证 ```{mermaid} flowchart TB L1[Level 1: 无认证
内网信任 ❌ 不安全] L2[Level 2: Service Account + Token
静态凭证,需手动轮换] L3[Level 3: Service Mesh mTLS
自动证书管理,透明加密] L4[Level 4: SPIFFE/SPIRE
统一工作负载身份,跨平台,自动轮换] L1 -->|演进| L2 L2 -->|演进| L3 L3 -->|演进| L4 style L1 fill:#ffcccc style L2 fill:#ffffcc style L3 fill:#ccffcc style L4 fill:#ccccff ``` ## 12.8 API 认证方式对比 | 方式 | 安全性 | 复杂度 | 防重放 | 防篡改 | 携带身份 | 适用场景 | |------|--------|--------|--------|--------|---------|---------| | **API Key** | ⭐⭐ | 低 | ❌ | ❌ | ❌ | 公开 API、低安全要求 | | **Bearer Token (JWT)** | ⭐⭐⭐⭐ | 中 | ✅(过期) | ✅(签名) | ✅ | Web/Mobile API | | **HMAC 签名** | ⭐⭐⭐⭐ | 高 | ✅(时间戳) | ✅(签名) | ❌ | 高安全 API(支付) | | **mTLS** | ⭐⭐⭐⭐⭐ | 高 | ✅(TLS) | ✅(TLS) | ✅(证书) | 服务间通信 | | **SPIFFE SVID** | ⭐⭐⭐⭐⭐ | 中 | ✅ | ✅ | ✅ | 云原生服务间通信 | ## 12.9 认证方式选型指南 ### 按场景选型 | 场景 | 推荐方式 | 理由 | |------|---------|------| | 公开数据 API(天气、汇率) | API Key | 简单,仅用于标识和限流 | | 用户数据 API(SaaS) | OAuth2 Bearer Token | 标准化,携带用户身份和权限 | | 支付/金融 API | HMAC 签名 + TLS | 请求完整性验证,防篡改 | | 微服务间通信(同集群) | mTLS(Service Mesh) | 零信任,自动证书管理 | | 微服务间通信(跨云) | SPIFFE/SPIRE | 统一身份,跨平台 | | 第三方 Webhook | HMAC 签名 | 验证来源真实性 | | 移动端 API | OAuth2 + PKCE | 公共客户端安全授权 | ### 组合使用 在实际生产环境中,通常需要组合多种认证方式: ```{mermaid} flowchart TD EXT[外部请求] --> GW[API Gateway] GW -->|Bearer Token 验证| AUTH[认证服务] GW -->|通过| MESH[Service Mesh] MESH -->|mTLS| SVC_A[服务 A] MESH -->|mTLS| SVC_B[服务 B] SVC_A -->|HMAC 签名| PAY[支付服务] SVC_B -->|mTLS| DB[数据库] style GW fill:#f9f,stroke:#333 style MESH fill:#bbf,stroke:#333 ``` **典型架构**: - **外部 → Gateway**:OAuth2 Bearer Token(用户认证) - **Gateway → 微服务**:mTLS(Service Mesh 自动管理) - **微服务 → 敏感服务**:HMAC 签名(额外的请求完整性保护) ## 12.10 小结 - **Bearer Token(JWT)** 是最通用的 API 认证方式 - **mTLS** 提供最强的安全保障,适合服务间通信 - **API Gateway 集中认证** 简化了微服务的认证逻辑 - **HMAC 签名** 适合需要请求完整性验证的高安全场景 - **SPIFFE/SPIRE** 是云原生时代服务间认证的最佳实践(详见第四部分) - 选择认证方式时需权衡:安全性、复杂度、性能、运维成本 - 生产环境通常**组合使用**多种认证方式,形成纵深防御