第十二章:API 认证模式
“在微服务时代,API 认证不再是可选项,而是每个请求的必经之路。”
mindmap
root((API 认证))
认证方式
API Key
Bearer Token
mTLS
HMAC 签名
Gateway 模式
集中认证
分布式认证
Token 内省
服务间认证
Service Account
Service Mesh
SPIFFE/SPIRE
最佳实践
生命周期管理
速率限制
审计日志
12.1 API 认证的特殊性
API 认证与传统 Web 认证有本质区别:
维度 |
Web 认证 |
API 认证 |
|---|---|---|
交互方式 |
用户交互(浏览器) |
程序化(HTTP 客户端) |
状态 |
有状态(Session) |
无状态(Token) |
凭证类型 |
Cookie |
Authorization Header |
调用者 |
人类 |
人类 + 机器 |
频率 |
低(页面级) |
高(API 级) |
API 认证方式选型决策树
flowchart TD
A[选择 API 认证方式] --> B{调用者是谁?}
B -->|服务间通信| C{网络环境?}
B -->|第三方开发者| D{安全要求?}
B -->|终端用户 App| E[OAuth2 Bearer Token]
C -->|同一集群/Service Mesh| F[mTLS]
C -->|跨网络/跨云| G{需要请求签名?}
G -->|是| H[HMAC 签名]
G -->|否| I[mTLS 或 Bearer Token]
D -->|低:公开数据| J[API Key]
D -->|中:用户数据| E
D -->|高:支付/金融| K[HMAC 签名 + mTLS]
12.2 API Key 认证
最简单的 API 认证方式,适合低安全要求的场景:
API Key 认证流程
sequenceDiagram
participant C as 客户端
participant G as API Gateway
participant S as 后端服务
participant DB as Key 存储
C->>G: GET /api/data<br/>X-API-Key: key-abc123
G->>DB: 查询 API Key
alt Key 有效
DB->>G: 返回 Key 信息(权限、配额)
G->>G: 检查速率限制
G->>S: 转发请求 + 注入客户端信息
S->>G: 响应数据
G->>C: 200 OK + 数据
else Key 无效或过期
DB->>G: Key 不存在
G->>C: 401 Unauthorized
end
Python: FastAPI API Key 中间件
from fastapi import FastAPI, Header, Query, HTTPException, Depends, Request
from fastapi.security import APIKeyHeader, APIKeyQuery
from typing import Optional
from datetime import datetime
import hashlib
import secrets
app = FastAPI(title="API Key Auth Demo")
# --- API Key 存储(生产环境应使用数据库) ---
API_KEYS = {
# 存储 Key 的哈希值,而非明文
hashlib.sha256(b"sk-live-abc123def456").hexdigest(): {
"name": "Service A",
"permissions": ["read"],
"rate_limit": 1000,
"created_at": "2025-01-01",
},
hashlib.sha256(b"sk-live-xyz789ghi012").hexdigest(): {
"name": "Service B",
"permissions": ["read", "write"],
"rate_limit": 5000,
"created_at": "2025-06-01",
},
}
# 支持两种传递方式:Header 和 Query
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query = APIKeyQuery(name="api_key", auto_error=False)
async def verify_api_key(
header_key: Optional[str] = Depends(api_key_header),
query_key: Optional[str] = Depends(api_key_query),
) -> dict:
"""验证 API Key(支持 Header 和 Query 两种方式)"""
api_key = header_key or query_key
if not api_key:
raise HTTPException(
status_code=401,
detail="缺少 API Key,请通过 X-API-Key Header 或 api_key Query 参数传递",
)
# 对比哈希值(避免明文存储)
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
client = API_KEYS.get(key_hash)
if not client:
raise HTTPException(status_code=401, detail="无效的 API Key")
return client
@app.get("/api/data")
async def get_data(client: dict = Depends(verify_api_key)):
if "read" not in client["permissions"]:
raise HTTPException(status_code=403, detail="权限不足")
return {"data": "sensitive information", "client": client["name"]}
@app.post("/api/data")
async def create_data(client: dict = Depends(verify_api_key)):
if "write" not in client["permissions"]:
raise HTTPException(status_code=403, detail="权限不足:需要 write 权限")
return {"message": "数据已创建", "client": client["name"]}
# --- API Key 生成工具 ---
def generate_api_key(prefix: str = "sk-live") -> str:
"""生成安全的 API Key"""
random_part = secrets.token_urlsafe(32)
return f"{prefix}-{random_part}"
Java: Spring Boot API Key Filter
import jakarta.servlet.*;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
/**
* API Key 认证 Filter
*/
@Component
public class ApiKeyFilter implements Filter {
private static final String API_KEY_HEADER = "X-API-Key";
private static final String API_KEY_QUERY = "api_key";
// 存储 Key 哈希 → 客户端信息
private final Map<String, ApiClient> apiKeys = new ConcurrentHashMap<>();
public ApiKeyFilter() {
// 初始化(生产环境从数据库加载)
apiKeys.put(sha256("sk-live-abc123def456"),
new ApiClient("Service A", Set.of("read")));
apiKeys.put(sha256("sk-live-xyz789ghi012"),
new ApiClient("Service B", Set.of("read", "write")));
}
@Override
public void doFilter(ServletRequest req, ServletResponse res,
FilterChain chain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) req;
HttpServletResponse response = (HttpServletResponse) res;
// 从 Header 或 Query 获取 API Key
String apiKey = request.getHeader(API_KEY_HEADER);
if (apiKey == null || apiKey.isBlank()) {
apiKey = request.getParameter(API_KEY_QUERY);
}
if (apiKey == null || apiKey.isBlank()) {
response.setStatus(401);
response.setContentType("application/json");
response.getWriter().write("{\"error\":\"缺少 API Key\"}");
return;
}
ApiClient client = apiKeys.get(sha256(apiKey));
if (client == null) {
response.setStatus(401);
response.setContentType("application/json");
response.getWriter().write("{\"error\":\"无效的 API Key\"}");
return;
}
// 将客户端信息注入请求属性
request.setAttribute("api_client", client);
chain.doFilter(request, response);
}
private String sha256(String input) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
StringBuilder hex = new StringBuilder();
for (byte b : hash) hex.append(String.format("%02x", b));
return hex.toString();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
public record ApiClient(String name, Set<String> permissions) {}
}
/**
* Filter 注册配置
*/
@Configuration
public class FilterConfig {
@Bean
public FilterRegistrationBean<ApiKeyFilter> apiKeyFilterRegistration(
ApiKeyFilter filter) {
FilterRegistrationBean<ApiKeyFilter> reg = new FilterRegistrationBean<>();
reg.setFilter(filter);
reg.addUrlPatterns("/api/*");
reg.setOrder(1);
return reg;
}
}
Go: Gin API Key 中间件
package middleware
import (
"crypto/sha256"
"encoding/hex"
"net/http"
"sync"
"github.com/gin-gonic/gin"
)
// ApiClient API 客户端信息
type ApiClient struct {
Name string
Permissions map[string]bool
}
// ApiKeyStore API Key 存储
type ApiKeyStore struct {
mu sync.RWMutex
keys map[string]*ApiClient // sha256(key) -> client
}
func NewApiKeyStore() *ApiKeyStore {
store := &ApiKeyStore{keys: make(map[string]*ApiClient)}
// 初始化(生产环境从数据库加载)
store.Register("sk-live-abc123def456", &ApiClient{
Name: "Service A",
Permissions: map[string]bool{"read": true},
})
store.Register("sk-live-xyz789ghi012", &ApiClient{
Name: "Service B",
Permissions: map[string]bool{"read": true, "write": true},
})
return store
}
func (s *ApiKeyStore) Register(key string, client *ApiClient) {
hash := sha256Hex(key)
s.mu.Lock()
defer s.mu.Unlock()
s.keys[hash] = client
}
func (s *ApiKeyStore) Lookup(key string) (*ApiClient, bool) {
hash := sha256Hex(key)
s.mu.RLock()
defer s.mu.RUnlock()
client, ok := s.keys[hash]
return client, ok
}
func sha256Hex(s string) string {
h := sha256.Sum256([]byte(s))
return hex.EncodeToString(h[:])
}
// ApiKeyAuth Gin 中间件:API Key 认证
func ApiKeyAuth(store *ApiKeyStore) gin.HandlerFunc {
return func(c *gin.Context) {
// 从 Header 或 Query 获取 API Key
apiKey := c.GetHeader("X-API-Key")
if apiKey == "" {
apiKey = c.Query("api_key")
}
if apiKey == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "缺少 API Key",
})
return
}
client, ok := store.Lookup(apiKey)
if !ok {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "无效的 API Key",
})
return
}
// 注入客户端信息
c.Set("api_client", client)
c.Set("client_name", client.Name)
c.Next()
}
}
// RequirePermission 权限检查中间件
func RequirePermission(perm string) gin.HandlerFunc {
return func(c *gin.Context) {
client, exists := c.Get("api_client")
if !exists {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "未认证",
})
return
}
apiClient := client.(*ApiClient)
if !apiClient.Permissions[perm] {
c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
"error": "权限不足:需要 " + perm + " 权限",
})
return
}
c.Next()
}
}
API Key 的问题:
❌ 无过期时间(除非手动管理)
❌ 无法携带用户身份信息
❌ 容易泄露(日志、URL、代码仓库)
❌ 无标准的轮换机制
12.3 Bearer Token 认证
使用 OAuth2 Access Token,是最推荐的 API 认证方式:
OAuth2 Bearer Token 认证流程
sequenceDiagram
participant C as 客户端
participant AS as 授权服务器
participant RS as 资源服务器 (API)
participant JWKS as JWKS 端点
Note over C,AS: 获取 Token
C->>AS: POST /oauth/token<br/>{grant_type, client_id, client_secret}
AS->>AS: 验证客户端凭证
AS->>C: {access_token: "eyJ...", expires_in: 3600}
Note over C,RS: 使用 Token 访问 API
C->>RS: GET /api/resource<br/>Authorization: Bearer eyJ...
RS->>RS: 解析 JWT Header
RS->>JWKS: 获取公钥(缓存)
JWKS->>RS: 返回 JWK Set
RS->>RS: 验证签名 + 过期时间 + audience + issuer
alt Token 有效
RS->>C: 200 OK + 资源数据
else Token 无效/过期
RS->>C: 401 Unauthorized
end
Python: FastAPI Bearer Token 验证
from fastapi import FastAPI, Depends, HTTPException, Security
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import jwt
import httpx
from functools import lru_cache
from typing import Optional
from datetime import datetime
app = FastAPI(title="Bearer Token Auth Demo")
security = HTTPBearer()
# --- 配置 ---
AUTH_SERVER = "https://auth.example.com"
JWKS_URL = f"{AUTH_SERVER}/.well-known/jwks.json"
EXPECTED_AUDIENCE = "my-api"
EXPECTED_ISSUER = AUTH_SERVER
# --- JWKS 公钥缓存 ---
_jwks_cache: Optional[dict] = None
_jwks_cache_time: float = 0
async def get_jwks() -> dict:
"""获取并缓存 JWKS 公钥集"""
global _jwks_cache, _jwks_cache_time
import time
now = time.time()
# 缓存 1 小时
if _jwks_cache and (now - _jwks_cache_time) < 3600:
return _jwks_cache
async with httpx.AsyncClient() as client:
resp = await client.get(JWKS_URL)
resp.raise_for_status()
_jwks_cache = resp.json()
_jwks_cache_time = now
return _jwks_cache
def get_public_key(token: str, jwks: dict):
"""从 JWKS 中找到匹配的公钥"""
header = jwt.get_unverified_header(token)
kid = header.get("kid")
for key in jwks.get("keys", []):
if key.get("kid") == kid:
return jwt.algorithms.RSAAlgorithm.from_jwk(key)
raise ValueError(f"找不到 kid={kid} 对应的公钥")
async def verify_bearer_token(
credentials: HTTPAuthorizationCredentials = Security(security),
) -> dict:
"""验证 Bearer Token 并返回 payload"""
token = credentials.credentials
try:
jwks = await get_jwks()
public_key = get_public_key(token, jwks)
payload = jwt.decode(
token,
public_key,
algorithms=["RS256"],
audience=EXPECTED_AUDIENCE,
issuer=EXPECTED_ISSUER,
options={"require": ["exp", "iss", "aud", "sub"]},
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token 已过期")
except jwt.InvalidAudienceError:
raise HTTPException(status_code=401, detail="Token audience 不匹配")
except jwt.InvalidIssuerError:
raise HTTPException(status_code=401, detail="Token issuer 不匹配")
except jwt.InvalidTokenError as e:
raise HTTPException(status_code=401, detail=f"无效的 Token: {e}")
# --- 权限检查 ---
def require_scope(required: str):
"""检查 Token 是否包含所需的 scope"""
async def checker(payload: dict = Depends(verify_bearer_token)):
scopes = payload.get("scope", "").split()
if required not in scopes:
raise HTTPException(
status_code=403,
detail=f"需要 scope: {required}",
)
return payload
return checker
@app.get("/api/profile")
async def get_profile(user: dict = Depends(verify_bearer_token)):
return {
"user_id": user["sub"],
"roles": user.get("roles", []),
"scopes": user.get("scope", "").split(),
}
@app.get("/api/admin")
async def admin_endpoint(user: dict = Depends(require_scope("admin"))):
return {"message": "管理员接口", "user_id": user["sub"]}
Java: Spring Resource Server
// application.yml
// spring:
// security:
// oauth2:
// resourceserver:
// jwt:
// issuer-uri: https://auth.example.com
// audiences: my-api
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter;
import org.springframework.security.oauth2.server.resource.authentication.JwtGrantedAuthoritiesConverter;
import org.springframework.security.web.SecurityFilterChain;
@Configuration
@EnableWebSecurity
@EnableMethodSecurity
public class ResourceServerConfig {
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http
.sessionManagement(s -> s.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
.authorizeHttpRequests(auth -> auth
.requestMatchers("/api/public/**").permitAll()
.requestMatchers("/api/admin/**").hasAuthority("SCOPE_admin")
.requestMatchers("/api/**").authenticated()
)
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt.jwtAuthenticationConverter(jwtAuthConverter()))
);
return http.build();
}
@Bean
public JwtAuthenticationConverter jwtAuthConverter() {
JwtGrantedAuthoritiesConverter grantedAuthorities =
new JwtGrantedAuthoritiesConverter();
// 从 JWT 的 "scope" claim 提取权限
grantedAuthorities.setAuthoritiesClaimName("scope");
grantedAuthorities.setAuthorityPrefix("SCOPE_");
JwtAuthenticationConverter converter = new JwtAuthenticationConverter();
converter.setJwtGrantedAuthoritiesConverter(grantedAuthorities);
return converter;
}
}
/**
* API 控制器
*/
@RestController
@RequestMapping("/api")
public class ApiController {
@GetMapping("/profile")
public Map<String, Object> getProfile(
@AuthenticationPrincipal Jwt jwt) {
return Map.of(
"user_id", jwt.getSubject(),
"claims", jwt.getClaims(),
"scopes", jwt.getClaimAsString("scope")
);
}
@GetMapping("/admin")
@PreAuthorize("hasAuthority('SCOPE_admin')")
public Map<String, Object> adminEndpoint(
@AuthenticationPrincipal Jwt jwt) {
return Map.of(
"message", "管理员接口",
"user_id", jwt.getSubject()
);
}
}
Go: Gin JWT Bearer 中间件
package middleware
import (
"context"
"encoding/json"
"fmt"
"net/http"
"strings"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/golang-jwt/jwt/v5"
"github.com/lestrrat-go/jwx/v2/jwk"
)
// BearerConfig Bearer Token 验证配置
type BearerConfig struct {
JwksURL string
Issuer string
Audience string
}
// JWKSCache JWKS 公钥缓存
type JWKSCache struct {
mu sync.RWMutex
keySet jwk.Set
fetched time.Time
ttl time.Duration
url string
}
func NewJWKSCache(url string, ttl time.Duration) *JWKSCache {
return &JWKSCache{url: url, ttl: ttl}
}
func (c *JWKSCache) GetKeySet(ctx context.Context) (jwk.Set, error) {
c.mu.RLock()
if c.keySet != nil && time.Since(c.fetched) < c.ttl {
defer c.mu.RUnlock()
return c.keySet, nil
}
c.mu.RUnlock()
c.mu.Lock()
defer c.mu.Unlock()
set, err := jwk.Fetch(ctx, c.url)
if err != nil {
return nil, fmt.Errorf("获取 JWKS 失败: %w", err)
}
c.keySet = set
c.fetched = time.Now()
return set, nil
}
// BearerAuth Gin 中间件:Bearer Token 认证
func BearerAuth(cfg BearerConfig) gin.HandlerFunc {
cache := NewJWKSCache(cfg.JwksURL, 1*time.Hour)
return func(c *gin.Context) {
// 提取 Bearer Token
authHeader := c.GetHeader("Authorization")
if !strings.HasPrefix(authHeader, "Bearer ") {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "缺少 Bearer Token",
})
return
}
tokenStr := strings.TrimPrefix(authHeader, "Bearer ")
// 获取 JWKS 公钥
keySet, err := cache.GetKeySet(c.Request.Context())
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
"error": "无法获取公钥",
})
return
}
// 解析并验证 JWT
token, err := jwt.Parse(tokenStr, func(t *jwt.Token) (interface{}, error) {
kid, ok := t.Header["kid"].(string)
if !ok {
return nil, fmt.Errorf("缺少 kid")
}
key, found := keySet.LookupKeyID(kid)
if !found {
return nil, fmt.Errorf("找不到 kid=%s 的公钥", kid)
}
var rawKey interface{}
if err := key.Raw(&rawKey); err != nil {
return nil, err
}
return rawKey, nil
},
jwt.WithIssuer(cfg.Issuer),
jwt.WithAudience(cfg.Audience),
jwt.WithValidMethods([]string{"RS256"}),
)
if err != nil || !token.Valid {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": fmt.Sprintf("Token 无效: %v", err),
})
return
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "无法解析 claims",
})
return
}
// 注入用户信息
c.Set("user_id", claims["sub"])
c.Set("claims", claims)
c.Next()
}
}
// RequireScope 检查 Token 是否包含所需 scope
func RequireScope(required string) gin.HandlerFunc {
return func(c *gin.Context) {
claims, exists := c.Get("claims")
if !exists {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "未认证",
})
return
}
mapClaims := claims.(jwt.MapClaims)
scopeStr, _ := mapClaims["scope"].(string)
scopes := strings.Fields(scopeStr)
for _, s := range scopes {
if s == required {
c.Next()
return
}
}
c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
"error": fmt.Sprintf("需要 scope: %s", required),
})
}
}
12.4 mTLS 客户端证书认证
最安全的 API 认证方式,适合服务间通信:
mTLS 认证流程
sequenceDiagram
participant C as 客户端
participant S as 服务器
participant CA as 证书颁发机构
Note over C,CA: 准备阶段(一次性)
C->>CA: 申请客户端证书
CA->>C: 颁发客户端证书 + 私钥
S->>CA: 申请服务器证书
CA->>S: 颁发服务器证书 + 私钥
Note over C,S: TLS 握手(双向验证)
C->>S: ClientHello
S->>C: ServerHello + 服务器证书
C->>C: 验证服务器证书(用 CA 公钥)
S->>C: CertificateRequest(要求客户端证书)
C->>S: 客户端证书 + CertificateVerify
S->>S: 验证客户端证书(用 CA 公钥)
Note over C,S: 双向验证完成 ✅ 建立加密通道
C->>S: HTTP 请求(加密通道内,无需额外 Token)
S->>S: 从证书 CN/SAN 提取客户端身份
S->>C: HTTP 响应
Python: mTLS 服务端与客户端
# --- mTLS 客户端 ---
import httpx
async def call_service_with_mtls():
"""使用 mTLS 调用其他服务"""
async with httpx.AsyncClient(
cert=("client.crt", "client.key"), # 客户端证书 + 私钥
verify="ca.crt", # CA 证书(验证服务器)
) as client:
response = await client.get("https://service-b:8443/api/data")
return response.json()
# --- mTLS 服务端(FastAPI + uvicorn) ---
# 启动命令:
# uvicorn main:app --host 0.0.0.0 --port 8443 \
# --ssl-keyfile server.key \
# --ssl-certfile server.crt \
# --ssl-ca-certs ca.crt \
# --ssl-cert-reqs 2 # 2 = CERT_REQUIRED(强制客户端证书)
from fastapi import FastAPI, Request, HTTPException
app = FastAPI(title="mTLS Service")
@app.middleware("http")
async def extract_client_identity(request: Request, call_next):
"""从客户端证书中提取身份信息"""
# Nginx/Envoy 代理时,证书信息通常通过 Header 传递
client_cn = request.headers.get("X-Client-CN", "")
client_serial = request.headers.get("X-Client-Serial", "")
if not client_cn:
# 直接 TLS 连接时,从 transport 获取
transport = request.scope.get("transport")
if transport and hasattr(transport, "get_extra_info"):
peercert = transport.get_extra_info("peercert")
if peercert:
for rdn in peercert.get("subject", ()):
for attr_type, attr_value in rdn:
if attr_type == "commonName":
client_cn = attr_value
request.state.client_cn = client_cn
response = await call_next(request)
return response
@app.get("/api/data")
async def get_data(request: Request):
client = getattr(request.state, "client_cn", "unknown")
return {"data": "sensitive", "authenticated_client": client}
Java: mTLS 服务端配置
// application.yml
// server:
// port: 8443
// ssl:
// enabled: true
// key-store: classpath:server-keystore.p12
// key-store-password: changeit
// key-store-type: PKCS12
// trust-store: classpath:truststore.p12
// trust-store-password: changeit
// trust-store-type: PKCS12
// client-auth: need # need = 强制客户端证书
import org.springframework.web.bind.annotation.*;
import jakarta.servlet.http.HttpServletRequest;
import java.security.cert.X509Certificate;
@RestController
@RequestMapping("/api")
public class MtlsController {
@GetMapping("/data")
public Map<String, Object> getData(HttpServletRequest request) {
// 从请求中提取客户端证书
X509Certificate[] certs = (X509Certificate[])
request.getAttribute("jakarta.servlet.request.X509Certificate");
String clientCN = "unknown";
if (certs != null && certs.length > 0) {
clientCN = certs[0].getSubjectX500Principal().getName();
}
return Map.of(
"data", "sensitive information",
"authenticated_client", clientCN
);
}
}
// --- mTLS 客户端(RestTemplate) ---
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
import javax.net.ssl.SSLContext;
import java.io.File;
@Configuration
public class MtlsClientConfig {
@Bean
public RestTemplate mtlsRestTemplate() throws Exception {
SSLContext sslContext = SSLContextBuilder.create()
.loadKeyMaterial(
new File("client-keystore.p12"),
"changeit".toCharArray(),
"changeit".toCharArray())
.loadTrustMaterial(
new File("truststore.p12"),
"changeit".toCharArray())
.build();
SSLConnectionSocketFactory socketFactory =
new SSLConnectionSocketFactory(sslContext);
CloseableHttpClient httpClient = HttpClients.custom()
.setSSLSocketFactory(socketFactory)
.build();
return new RestTemplate(
new HttpComponentsClientHttpRequestFactory(httpClient));
}
}
Go: mTLS 服务端与客户端
package main
import (
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"log"
"net/http"
"os"
"github.com/gin-gonic/gin"
)
// --- mTLS 服务端 ---
func StartMTLSServer() {
// 加载 CA 证书(用于验证客户端证书)
caCert, err := os.ReadFile("ca.crt")
if err != nil {
log.Fatal(err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
// TLS 配置:要求客户端证书
tlsConfig := &tls.Config{
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert,
MinVersion: tls.VersionTLS13,
}
r := gin.Default()
// 从客户端证书提取身份
r.Use(func(c *gin.Context) {
if c.Request.TLS != nil && len(c.Request.TLS.PeerCertificates) > 0 {
cert := c.Request.TLS.PeerCertificates[0]
c.Set("client_cn", cert.Subject.CommonName)
c.Set("client_serial", cert.SerialNumber.String())
}
c.Next()
})
r.GET("/api/data", func(c *gin.Context) {
clientCN := c.GetString("client_cn")
c.JSON(http.StatusOK, gin.H{
"data": "sensitive information",
"authenticated_client": clientCN,
})
})
server := &http.Server{
Addr: ":8443",
Handler: r,
TLSConfig: tlsConfig,
}
log.Println("mTLS server starting on :8443")
log.Fatal(server.ListenAndServeTLS("server.crt", "server.key"))
}
// --- mTLS 客户端 ---
func CallWithMTLS(url string) ([]byte, error) {
// 加载客户端证书
cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
if err != nil {
return nil, fmt.Errorf("加载客户端证书失败: %w", err)
}
// 加载 CA 证书
caCert, err := os.ReadFile("ca.crt")
if err != nil {
return nil, fmt.Errorf("加载 CA 证书失败: %w", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
MinVersion: tls.VersionTLS13,
}
client := &http.Client{
Transport: &http.Transport{TLSClientConfig: tlsConfig},
}
resp, err := client.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
return io.ReadAll(resp.Body)
}
12.5 HMAC 签名认证
类似 AWS Signature V4,通过对请求内容签名来验证身份和完整性:
HMAC 签名认证流程
sequenceDiagram
participant C as 客户端
participant S as 服务器
participant DB as 密钥存储
Note over C: 构造签名
C->>C: 1. 拼接签名字符串<br/>METHOD + PATH + TIMESTAMP + BODY_HASH
C->>C: 2. HMAC-SHA256(secret_key, string_to_sign)
C->>S: 请求 + Headers:<br/>X-Access-Key: AK001<br/>X-Timestamp: 1711036800<br/>X-Signature: a3f2...
S->>DB: 3. 用 Access Key 查找 Secret Key
DB->>S: 返回 Secret Key
S->>S: 4. 检查时间戳(±5 分钟防重放)
S->>S: 5. 用相同算法重新计算签名
S->>S: 6. 恒定时间比较签名
alt 签名匹配
S->>C: 200 OK
else 签名不匹配
S->>C: 401 Unauthorized
end
Python: HMAC 签名生成和验证
import hmac
import hashlib
import time
from fastapi import FastAPI, Request, HTTPException
app = FastAPI(title="HMAC Auth Demo")
# --- 密钥存储 ---
SECRET_KEYS = {
"AK001": "sk-very-secret-key-001",
"AK002": "sk-very-secret-key-002",
}
REPLAY_WINDOW = 300 # 5 分钟防重放窗口
# --- 客户端:签名生成 ---
def sign_request(
method: str,
path: str,
body: str,
access_key: str,
secret_key: str,
) -> dict:
"""对 API 请求进行 HMAC 签名"""
timestamp = str(int(time.time()))
body_hash = hashlib.sha256(body.encode()).hexdigest()
# 构造签名字符串(规范化)
string_to_sign = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}"
# 计算 HMAC-SHA256 签名
signature = hmac.new(
secret_key.encode(),
string_to_sign.encode(),
hashlib.sha256,
).hexdigest()
return {
"X-Access-Key": access_key,
"X-Timestamp": timestamp,
"X-Signature": signature,
}
# --- 服务端:签名验证 ---
def verify_signature(
method: str,
path: str,
body: str,
headers: dict,
) -> str:
"""验证 HMAC 签名,返回 access_key"""
access_key = headers.get("x-access-key", "")
timestamp = headers.get("x-timestamp", "")
signature = headers.get("x-signature", "")
# 1. 检查必要 Header
if not all([access_key, timestamp, signature]):
raise ValueError("缺少签名 Header")
# 2. 查找 Secret Key
secret_key = SECRET_KEYS.get(access_key)
if not secret_key:
raise ValueError("无效的 Access Key")
# 3. 检查时间戳(防重放)
try:
ts = int(timestamp)
except ValueError:
raise ValueError("无效的时间戳")
if abs(time.time() - ts) > REPLAY_WINDOW:
raise ValueError("请求已过期(时间戳超出窗口)")
# 4. 重新计算签名
body_hash = hashlib.sha256(body.encode()).hexdigest()
string_to_sign = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}"
expected = hmac.new(
secret_key.encode(),
string_to_sign.encode(),
hashlib.sha256,
).hexdigest()
# 5. 恒定时间比较(防时序攻击)
if not hmac.compare_digest(signature, expected):
raise ValueError("签名不匹配")
return access_key
# --- FastAPI 中间件 ---
@app.middleware("http")
async def hmac_auth_middleware(request: Request, call_next):
if request.url.path.startswith("/api/"):
body = (await request.body()).decode()
try:
access_key = verify_signature(
method=request.method,
path=request.url.path,
body=body,
headers=dict(request.headers),
)
request.state.access_key = access_key
except ValueError as e:
raise HTTPException(status_code=401, detail=str(e))
response = await call_next(request)
return response
@app.post("/api/payment")
async def create_payment(request: Request):
return {
"message": "支付已创建",
"authenticated_by": request.state.access_key,
}
Java: HMAC 签名 Filter
import jakarta.servlet.*;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@Component
public class HmacAuthFilter implements Filter {
private static final long REPLAY_WINDOW = 300; // 5 分钟
private final Map<String, String> secretKeys = new ConcurrentHashMap<>();
public HmacAuthFilter() {
secretKeys.put("AK001", "sk-very-secret-key-001");
secretKeys.put("AK002", "sk-very-secret-key-002");
}
@Override
public void doFilter(ServletRequest req, ServletResponse res,
FilterChain chain) throws IOException, ServletException {
HttpServletRequest request = (HttpServletRequest) req;
HttpServletResponse response = (HttpServletResponse) res;
if (!request.getRequestURI().startsWith("/api/")) {
chain.doFilter(req, res);
return;
}
String accessKey = request.getHeader("X-Access-Key");
String timestamp = request.getHeader("X-Timestamp");
String signature = request.getHeader("X-Signature");
// 1. 检查必要 Header
if (accessKey == null || timestamp == null || signature == null) {
sendError(response, 401, "缺少签名 Header");
return;
}
// 2. 查找 Secret Key
String secretKey = secretKeys.get(accessKey);
if (secretKey == null) {
sendError(response, 401, "无效的 Access Key");
return;
}
// 3. 检查时间戳
long ts;
try {
ts = Long.parseLong(timestamp);
} catch (NumberFormatException e) {
sendError(response, 401, "无效的时间戳");
return;
}
if (Math.abs(Instant.now().getEpochSecond() - ts) > REPLAY_WINDOW) {
sendError(response, 401, "请求已过期");
return;
}
// 4. 读取 Body 并计算签名
CachedBodyHttpServletRequest cachedRequest =
new CachedBodyHttpServletRequest(request);
String body = new String(cachedRequest.getCachedBody(),
StandardCharsets.UTF_8);
String bodyHash = sha256(body);
String stringToSign = String.join("\n",
request.getMethod(), request.getRequestURI(), timestamp, bodyHash);
String expected = hmacSha256(secretKey, stringToSign);
// 5. 恒定时间比较
if (!MessageDigest.isEqual(
signature.getBytes(StandardCharsets.UTF_8),
expected.getBytes(StandardCharsets.UTF_8))) {
sendError(response, 401, "签名不匹配");
return;
}
cachedRequest.setAttribute("access_key", accessKey);
chain.doFilter(cachedRequest, response);
}
private String sha256(String input) {
try {
MessageDigest digest = MessageDigest.getInstance("SHA-256");
byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
return bytesToHex(hash);
} catch (Exception e) { throw new RuntimeException(e); }
}
private String hmacSha256(String key, String data) {
try {
Mac mac = Mac.getInstance("HmacSHA256");
mac.init(new SecretKeySpec(
key.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
return bytesToHex(
mac.doFinal(data.getBytes(StandardCharsets.UTF_8)));
} catch (Exception e) { throw new RuntimeException(e); }
}
private String bytesToHex(byte[] bytes) {
StringBuilder sb = new StringBuilder();
for (byte b : bytes) sb.append(String.format("%02x", b));
return sb.toString();
}
private void sendError(HttpServletResponse response, int status,
String message) throws IOException {
response.setStatus(status);
response.setContentType("application/json");
response.getWriter().write(
String.format("{\"error\":\"%s\"}", message));
}
}
Go: HMAC 签名中间件
package middleware
import (
"bytes"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"fmt"
"io"
"math"
"net/http"
"strconv"
"time"
"github.com/gin-gonic/gin"
)
// HmacConfig HMAC 签名配置
type HmacConfig struct {
SecretKeys map[string]string // access_key -> secret_key
ReplayWindow time.Duration // 防重放窗口
}
// HmacAuth Gin 中间件:HMAC 签名认证
func HmacAuth(cfg HmacConfig) gin.HandlerFunc {
return func(c *gin.Context) {
accessKey := c.GetHeader("X-Access-Key")
timestamp := c.GetHeader("X-Timestamp")
signature := c.GetHeader("X-Signature")
// 1. 检查必要 Header
if accessKey == "" || timestamp == "" || signature == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "缺少签名 Header",
})
return
}
// 2. 查找 Secret Key
secretKey, ok := cfg.SecretKeys[accessKey]
if !ok {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "无效的 Access Key",
})
return
}
// 3. 检查时间戳
ts, err := strconv.ParseInt(timestamp, 10, 64)
if err != nil {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "无效的时间戳",
})
return
}
diff := math.Abs(float64(time.Now().Unix() - ts))
if diff > cfg.ReplayWindow.Seconds() {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "请求已过期",
})
return
}
// 4. 读取 Body
bodyBytes, err := io.ReadAll(c.Request.Body)
if err != nil {
c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
"error": "无法读取请求体",
})
return
}
// 重新设置 Body(供后续 Handler 使用)
c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
// 5. 计算签名
bodyHash := sha256Hex(string(bodyBytes))
stringToSign := fmt.Sprintf("%s\n%s\n%s\n%s",
c.Request.Method, c.Request.URL.Path, timestamp, bodyHash)
mac := hmac.New(sha256.New, []byte(secretKey))
mac.Write([]byte(stringToSign))
expected := hex.EncodeToString(mac.Sum(nil))
// 6. 恒定时间比较
if !hmac.Equal([]byte(signature), []byte(expected)) {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "签名不匹配",
})
return
}
c.Set("access_key", accessKey)
c.Next()
}
}
func sha256Hex(s string) string {
h := sha256.Sum256([]byte(s))
return hex.EncodeToString(h[:])
}
12.6 API Gateway 认证模式
集中认证 vs 分布式认证
flowchart LR
subgraph 集中认证(推荐)
C1[客户端] --> GW1[API Gateway<br/>① 验证 Token<br/>② 速率限制<br/>③ 注入用户信息]
GW1 --> MS1[微服务<br/>信任 GW 的转发]
end
subgraph 分布式认证
C2[客户端] --> GW2[API Gateway<br/>仅路由转发]
GW2 --> MS2[微服务<br/>自己验证 Token]
end
Token 内省 vs 本地验证
方式 |
Token 内省(Introspection) |
本地验证(JWT) |
|---|---|---|
原理 |
调用授权服务器验证 Token |
本地用公钥验证签名 |
延迟 |
高(网络调用) |
低(本地计算) |
实时性 |
实时(可检查撤销) |
延迟(依赖过期时间) |
依赖 |
授权服务器必须可用 |
仅需公钥(JWKS) |
适用 |
Opaque Token |
JWT |
12.7 Service-to-Service 认证
flowchart TB
L1[Level 1: 无认证<br/>内网信任 ❌ 不安全]
L2[Level 2: Service Account + Token<br/>静态凭证,需手动轮换]
L3[Level 3: Service Mesh mTLS<br/>自动证书管理,透明加密]
L4[Level 4: SPIFFE/SPIRE<br/>统一工作负载身份,跨平台,自动轮换]
L1 -->|演进| L2
L2 -->|演进| L3
L3 -->|演进| L4
style L1 fill:#ffcccc
style L2 fill:#ffffcc
style L3 fill:#ccffcc
style L4 fill:#ccccff
12.8 API 认证方式对比
方式 |
安全性 |
复杂度 |
防重放 |
防篡改 |
携带身份 |
适用场景 |
|---|---|---|---|---|---|---|
API Key |
⭐⭐ |
低 |
❌ |
❌ |
❌ |
公开 API、低安全要求 |
Bearer Token (JWT) |
⭐⭐⭐⭐ |
中 |
✅(过期) |
✅(签名) |
✅ |
Web/Mobile API |
HMAC 签名 |
⭐⭐⭐⭐ |
高 |
✅(时间戳) |
✅(签名) |
❌ |
高安全 API(支付) |
mTLS |
⭐⭐⭐⭐⭐ |
高 |
✅(TLS) |
✅(TLS) |
✅(证书) |
服务间通信 |
SPIFFE SVID |
⭐⭐⭐⭐⭐ |
中 |
✅ |
✅ |
✅ |
云原生服务间通信 |
12.9 认证方式选型指南
按场景选型
场景 |
推荐方式 |
理由 |
|---|---|---|
公开数据 API(天气、汇率) |
API Key |
简单,仅用于标识和限流 |
用户数据 API(SaaS) |
OAuth2 Bearer Token |
标准化,携带用户身份和权限 |
支付/金融 API |
HMAC 签名 + TLS |
请求完整性验证,防篡改 |
微服务间通信(同集群) |
mTLS(Service Mesh) |
零信任,自动证书管理 |
微服务间通信(跨云) |
SPIFFE/SPIRE |
统一身份,跨平台 |
第三方 Webhook |
HMAC 签名 |
验证来源真实性 |
移动端 API |
OAuth2 + PKCE |
公共客户端安全授权 |
组合使用
在实际生产环境中,通常需要组合多种认证方式:
flowchart TD
EXT[外部请求] --> GW[API Gateway]
GW -->|Bearer Token 验证| AUTH[认证服务]
GW -->|通过| MESH[Service Mesh]
MESH -->|mTLS| SVC_A[服务 A]
MESH -->|mTLS| SVC_B[服务 B]
SVC_A -->|HMAC 签名| PAY[支付服务]
SVC_B -->|mTLS| DB[数据库]
style GW fill:#f9f,stroke:#333
style MESH fill:#bbf,stroke:#333
典型架构:
外部 → Gateway:OAuth2 Bearer Token(用户认证)
Gateway → 微服务:mTLS(Service Mesh 自动管理)
微服务 → 敏感服务:HMAC 签名(额外的请求完整性保护)
12.10 小结
Bearer Token(JWT) 是最通用的 API 认证方式
mTLS 提供最强的安全保障,适合服务间通信
API Gateway 集中认证 简化了微服务的认证逻辑
HMAC 签名 适合需要请求完整性验证的高安全场景
SPIFFE/SPIRE 是云原生时代服务间认证的最佳实践(详见第四部分)
选择认证方式时需权衡:安全性、复杂度、性能、运维成本
生产环境通常组合使用多种认证方式,形成纵深防御