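# Chapter 12: API Authentication Patterns
> "In the age of microservices, API authentication is no longer optional: every request has to pass through it."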
```{mermaid}
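mindmap
  root((API Authentication))
    Authentication methods
      API Key
      Bearer Token
      mTLS
      HMAC signature
    Gateway patterns
      Centralized authentication
      Distributed authentication
      Token introspection
    Service-to-service authentication
      Service Account
      Service Mesh
      SPIFFE/SPIRE
    Best practices
      Lifecycle management
      Rate limiting
      Audit logging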
```
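## 12.1 What Makes API Authentication Different
API authentication differs from traditional web authentication in several fundamental ways:
| Dimension | Web authentication | API authentication |
|------|---------|---------|
| Interaction | Interactive user (browser) | Programmatic (HTTP client) |
| State | Usually stateful (Session) | Usually stateless (Token) |
| Credential type | Cookie | Authorization Header |
| Caller | Humans | Humans + machines |
| Request frequency | Low (per page) | High (per API call) |
### Decision Tree for Choosing an API Authentication Method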
```{mermaid}
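flowchart TD
    A[Choose an API authentication method] --> B{Who is the caller?}
    B -->|Service-to-service| C{Network environment?}
    B -->|Third-party developer| D{Security requirement?}
    B -->|End-user app| E[OAuth2 Bearer Token]
    C -->|"Same cluster / Service Mesh"| F[mTLS]
    C -->|"Cross-network / cross-cloud"| G{Request signing needed?}
    G -->|Yes| H[HMAC signature]
    G -->|No| I[mTLS or Bearer Token]
    D -->|"Low: public data"| J[API Key]
    D -->|"Medium: user data"| E
    D -->|"High: payments / finance"| K[HMAC signature + mTLS]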
```
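The decision tree can also be read as a small lookup function. The sketch below is only an illustration of the branches in the diagram; the names `Caller`, `choose_auth_method`, and the `sensitivity` values are assumptions introduced here, not part of any library.

```python
from enum import Enum


class Caller(Enum):
    """Who is calling the API (the first branch of the decision tree)."""
    SERVICE = "service"            # service-to-service traffic
    THIRD_PARTY = "third_party"    # external developers
    END_USER_APP = "end_user_app"  # an app acting for an end user


def choose_auth_method(
    caller: Caller,
    *,
    same_cluster: bool = False,
    needs_request_signing: bool = False,
    sensitivity: str = "low",  # "low" | "medium" | "high"; third-party callers only
) -> str:
    """Walk the decision tree and return the suggested method."""
    if caller is Caller.END_USER_APP:
        return "OAuth2 Bearer Token"
    if caller is Caller.SERVICE:
        if same_cluster:
            return "mTLS"
        return "HMAC signature" if needs_request_signing else "mTLS or Bearer Token"
    # Third-party developers: decide by how sensitive the data is.
    by_sensitivity = {
        "low": "API Key",
        "medium": "OAuth2 Bearer Token",
        "high": "HMAC signature + mTLS",
    }
    if sensitivity not in by_sensitivity:
        raise ValueError(f"unknown sensitivity: {sensitivity!r}")
    return by_sensitivity[sensitivity]
```

Real selection usually weighs more factors (operations cost, existing infrastructure), so treat the function as a reading aid for the diagram rather than a policy engine.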
## 12.2 API Key Authentication
The simplest form of API authentication, suited to scenarios with low security requirements:
### API Key Authentication Flow
```{mermaid}
sequenceDiagram
    participant C as Client
    participant G as API Gateway
    participant S as Backend service
    participant DB as Key store
    C->>G: GET /api/data<br/>X-API-Key: key-abc123
    G->>DB: Look up API Key
    alt Key is valid
        DB->>G: Return key info (permissions, quota)
        G->>G: Check rate limit
        G->>S: Forward request + inject client info
        S->>G: Response data
        G->>C: 200 OK + data
    else Key is invalid or expired
        DB->>G: Key not found
        G->>C: 401 Unauthorized
    end
```
### Python: FastAPI API Key Dependency
```python
import hashlib
import secrets
from typing import Optional

from fastapi import Depends, FastAPI, HTTPException
from fastapi.security import APIKeyHeader, APIKeyQuery

app = FastAPI(title="API Key Auth Demo")

# --- API Key store (use a database in production) ---
API_KEYS = {
    # Store a hash of each key rather than the plaintext
    hashlib.sha256(b"sk-live-abc123def456").hexdigest(): {
        "name": "Service A",
        "permissions": ["read"],
        "rate_limit": 1000,
        "created_at": "2025-01-01",
    },
    hashlib.sha256(b"sk-live-xyz789ghi012").hexdigest(): {
        "name": "Service B",
        "permissions": ["read", "write"],
        "rate_limit": 5000,
        "created_at": "2025-06-01",
    },
}

# Two ways to pass the key: header and query parameter.
# Prefer the header; query strings tend to end up in access logs.
api_key_header = APIKeyHeader(name="X-API-Key", auto_error=False)
api_key_query = APIKeyQuery(name="api_key", auto_error=False)


async def verify_api_key(
    header_key: Optional[str] = Depends(api_key_header),
    query_key: Optional[str] = Depends(api_key_query),
) -> dict:
    """Validate the API Key (accepted from the header or the query string)."""
    api_key = header_key or query_key
    if not api_key:
        raise HTTPException(
            status_code=401,
            detail="Missing API Key: pass it in the X-API-Key header or the api_key query parameter",
        )
    # Look up by hash so the plaintext key is never stored
    key_hash = hashlib.sha256(api_key.encode()).hexdigest()
    client = API_KEYS.get(key_hash)
    if not client:
        raise HTTPException(status_code=401, detail="Invalid API Key")
    return client


@app.get("/api/data")
async def get_data(client: dict = Depends(verify_api_key)):
    if "read" not in client["permissions"]:
        raise HTTPException(status_code=403, detail="Insufficient permissions")
    return {"data": "sensitive information", "client": client["name"]}


@app.post("/api/data")
async def create_data(client: dict = Depends(verify_api_key)):
    if "write" not in client["permissions"]:
        raise HTTPException(status_code=403, detail="Insufficient permissions: write is required")
    return {"message": "Data created", "client": client["name"]}


# --- API Key generation helper ---
def generate_api_key(prefix: str = "sk-live") -> str:
    """Generate a cryptographically secure API Key."""
    random_part = secrets.token_urlsafe(32)
    return f"{prefix}-{random_part}"
```
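The flow diagram includes a rate-limit check and each key record carries a `rate_limit` value, but the handler above never enforces it. A minimal sketch of one way to do so is a fixed-window counter. `FixedWindowLimiter` is a hypothetical helper, and treating `rate_limit` as "requests per window" is an assumption, since the source does not state the unit.

```python
import time
from dataclasses import dataclass, field
from typing import Dict, Optional, Tuple


@dataclass
class FixedWindowLimiter:
    """Per-client fixed-window counter (in-memory, single process only)."""

    window_seconds: int = 3600
    # client name -> (window start, requests counted in that window)
    _counters: Dict[str, Tuple[float, int]] = field(default_factory=dict)

    def allow(self, client_name: str, limit: int, now: Optional[float] = None) -> bool:
        """Return True and count the request if the client is under its limit."""
        now = time.time() if now is None else now
        window_start = now - (now % self.window_seconds)
        start, count = self._counters.get(client_name, (window_start, 0))
        if start != window_start:  # a new window began: reset the counter
            start, count = window_start, 0
        if count >= limit:
            return False
        self._counters[client_name] = (start, count + 1)
        return True
```

Inside `verify_api_key`, a call such as `limiter.allow(client["name"], client["rate_limit"])` returning `False` could be mapped to a 429 response. A multi-instance deployment would need shared state (for example a central counter store) instead of a process-local dictionary.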
### Java: Spring Boot API Key Filter
```java
import jakarta.servlet.*;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import org.springframework.boot.web.servlet.FilterRegistrationBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * API Key authentication filter
 */
@Component
public class ApiKeyFilter implements Filter {

    private static final String API_KEY_HEADER = "X-API-Key";
    private static final String API_KEY_QUERY = "api_key";

    // Key hash -> client info
    private final Map<String, ApiClient> apiKeys = new ConcurrentHashMap<>();

    public ApiKeyFilter() {
        // Seed data (load from a database in production)
        apiKeys.put(sha256("sk-live-abc123def456"),
            new ApiClient("Service A", Set.of("read")));
        apiKeys.put(sha256("sk-live-xyz789ghi012"),
            new ApiClient("Service B", Set.of("read", "write")));
    }

    @Override
    public void doFilter(ServletRequest req, ServletResponse res,
                         FilterChain chain) throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) req;
        HttpServletResponse response = (HttpServletResponse) res;

        // Read the API Key from the header, falling back to the query string
        String apiKey = request.getHeader(API_KEY_HEADER);
        if (apiKey == null || apiKey.isBlank()) {
            apiKey = request.getParameter(API_KEY_QUERY);
        }
        if (apiKey == null || apiKey.isBlank()) {
            response.setStatus(401);
            response.setContentType("application/json");
            response.getWriter().write("{\"error\":\"Missing API Key\"}");
            return;
        }

        ApiClient client = apiKeys.get(sha256(apiKey));
        if (client == null) {
            response.setStatus(401);
            response.setContentType("application/json");
            response.getWriter().write("{\"error\":\"Invalid API Key\"}");
            return;
        }

        // Expose the client info to downstream handlers as a request attribute
        request.setAttribute("api_client", client);
        chain.doFilter(request, response);
    }

    private String sha256(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : hash) hex.append(String.format("%02x", b));
            return hex.toString();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public record ApiClient(String name, Set<String> permissions) {}
}

/**
 * Filter registration (in a real project this class lives in its own file)
 */
@Configuration
public class FilterConfig {

    @Bean
    public FilterRegistrationBean<ApiKeyFilter> apiKeyFilterRegistration(
            ApiKeyFilter filter) {
        FilterRegistrationBean<ApiKeyFilter> reg = new FilterRegistrationBean<>();
        reg.setFilter(filter);
        reg.addUrlPatterns("/api/*");
        reg.setOrder(1);
        return reg;
    }
}
```
### Go: Gin API Key Middleware
```go
package middleware

import (
	"crypto/sha256"
	"encoding/hex"
	"net/http"
	"sync"

	"github.com/gin-gonic/gin"
)

// ApiClient holds information about an API client.
type ApiClient struct {
	Name        string
	Permissions map[string]bool
}

// ApiKeyStore is an in-memory API Key store.
type ApiKeyStore struct {
	mu   sync.RWMutex
	keys map[string]*ApiClient // sha256(key) -> client
}

func NewApiKeyStore() *ApiKeyStore {
	store := &ApiKeyStore{keys: make(map[string]*ApiClient)}
	// Seed data (load from a database in production)
	store.Register("sk-live-abc123def456", &ApiClient{
		Name:        "Service A",
		Permissions: map[string]bool{"read": true},
	})
	store.Register("sk-live-xyz789ghi012", &ApiClient{
		Name:        "Service B",
		Permissions: map[string]bool{"read": true, "write": true},
	})
	return store
}

func (s *ApiKeyStore) Register(key string, client *ApiClient) {
	hash := sha256Hex(key)
	s.mu.Lock()
	defer s.mu.Unlock()
	s.keys[hash] = client
}

func (s *ApiKeyStore) Lookup(key string) (*ApiClient, bool) {
	hash := sha256Hex(key)
	s.mu.RLock()
	defer s.mu.RUnlock()
	client, ok := s.keys[hash]
	return client, ok
}

func sha256Hex(s string) string {
	h := sha256.Sum256([]byte(s))
	return hex.EncodeToString(h[:])
}

// ApiKeyAuth is a Gin middleware that authenticates requests by API Key.
func ApiKeyAuth(store *ApiKeyStore) gin.HandlerFunc {
	return func(c *gin.Context) {
		// Read the API Key from the header, falling back to the query string
		apiKey := c.GetHeader("X-API-Key")
		if apiKey == "" {
			apiKey = c.Query("api_key")
		}
		if apiKey == "" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Missing API Key",
			})
			return
		}

		client, ok := store.Lookup(apiKey)
		if !ok {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Invalid API Key",
			})
			return
		}

		// Expose the client info to downstream handlers
		c.Set("api_client", client)
		c.Set("client_name", client.Name)
		c.Next()
	}
}

// RequirePermission is a middleware that checks a single permission.
func RequirePermission(perm string) gin.HandlerFunc {
	return func(c *gin.Context) {
		client, exists := c.Get("api_client")
		if !exists {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Not authenticated",
			})
			return
		}
		apiClient := client.(*ApiClient)
		if !apiClient.Permissions[perm] {
			c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
				"error": "Insufficient permissions: " + perm + " is required",
			})
			return
		}
		c.Next()
	}
}
```
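**Problems with API Keys**:
- ❌ No expiry time (unless you manage it yourself)
- ❌ Cannot carry user identity
- ❌ Easy to leak (logs, URLs, code repositories)
- ❌ No standard rotation mechanism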
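Expiry and rotation can be layered on by hand. The sketch below shows one possible shape: every key gets an expiry, and rotation issues a new key while the old one keeps working for a short grace period. `RotatingKeyStore`, `KeyRecord`, and the `sk-live-` prefix handling are hypothetical names chosen for this illustration, and the store is in-memory only.

```python
import hashlib
import secrets
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def _hash(key: str) -> str:
    return hashlib.sha256(key.encode()).hexdigest()


@dataclass
class KeyRecord:
    client: str
    expires_at: datetime


class RotatingKeyStore:
    """Hypothetical store: keys expire, and rotation keeps the old key briefly."""

    def __init__(self) -> None:
        self._records: Dict[str, KeyRecord] = {}  # sha256(key) -> record

    def issue(self, client: str, ttl: timedelta, now: Optional[datetime] = None) -> str:
        now = now or datetime.now(timezone.utc)
        key = f"sk-live-{secrets.token_urlsafe(32)}"
        self._records[_hash(key)] = KeyRecord(client, now + ttl)
        return key  # returned once; only the hash is kept

    def rotate(self, old_key: str, ttl: timedelta, grace: timedelta,
               now: Optional[datetime] = None) -> str:
        now = now or datetime.now(timezone.utc)
        record = self._records.get(_hash(old_key))
        if record is None or record.expires_at <= now:
            raise KeyError("unknown or expired key")
        # Shorten the old key's life to the grace period, then issue a new key.
        record.expires_at = min(record.expires_at, now + grace)
        return self.issue(record.client, ttl, now)

    def lookup(self, key: str, now: Optional[datetime] = None) -> Optional[str]:
        now = now or datetime.now(timezone.utc)
        record = self._records.get(_hash(key))
        if record is None or record.expires_at <= now:
            return None
        return record.client
```

The grace period lets clients switch to the new key without downtime; how long it should be depends on how quickly callers can redeploy.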
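## 12.3 Bearer Token Authentication
Bearer Token authentication uses an OAuth2 Access Token and is the generally recommended default for API authentication:
### OAuth2 Bearer Token Authentication Flow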
```{mermaid}
sequenceDiagram
    participant C as Client
    participant AS as Authorization server
    participant RS as Resource server (API)
    participant JWKS as JWKS endpoint
    Note over C,AS: Obtain a token
    C->>AS: POST /oauth/token<br/>{grant_type, client_id, client_secret}
    AS->>AS: Verify client credentials
    AS->>C: {access_token: "eyJ...", expires_in: 3600}
    Note over C,RS: Call the API with the token
    C->>RS: GET /api/resource<br/>Authorization: Bearer eyJ...
    RS->>RS: Parse the JWT header
    RS->>JWKS: Fetch public keys (cached)
    JWKS->>RS: Return the JWK Set
    RS->>RS: Verify signature + expiry + audience + issuer
    alt Token is valid
        RS->>C: 200 OK + resource data
    else Token is invalid or expired
        RS->>C: 401 Unauthorized
    end
```
### Python: FastAPI Bearer Token Validation
```python
import time
from typing import Optional

import httpx
import jwt
from fastapi import Depends, FastAPI, HTTPException, Security
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

app = FastAPI(title="Bearer Token Auth Demo")
security = HTTPBearer()

# --- Configuration ---
AUTH_SERVER = "https://auth.example.com"
JWKS_URL = f"{AUTH_SERVER}/.well-known/jwks.json"
EXPECTED_AUDIENCE = "my-api"
EXPECTED_ISSUER = AUTH_SERVER

# --- JWKS public key cache ---
_jwks_cache: Optional[dict] = None
_jwks_cache_time: float = 0


async def get_jwks() -> dict:
    """Fetch and cache the JWKS public key set."""
    global _jwks_cache, _jwks_cache_time
    now = time.time()
    # Cache for 1 hour
    if _jwks_cache and (now - _jwks_cache_time) < 3600:
        return _jwks_cache
    async with httpx.AsyncClient() as client:
        resp = await client.get(JWKS_URL)
        resp.raise_for_status()
        _jwks_cache = resp.json()
        _jwks_cache_time = now
    return _jwks_cache


def get_public_key(token: str, jwks: dict):
    """Find the public key in the JWKS that matches the token's kid."""
    header = jwt.get_unverified_header(token)
    kid = header.get("kid")
    for key in jwks.get("keys", []):
        if key.get("kid") == kid:
            return jwt.algorithms.RSAAlgorithm.from_jwk(key)
    # Raise a PyJWT error so the caller answers 401 rather than 500
    raise jwt.InvalidTokenError(f"no public key found for kid={kid}")


async def verify_bearer_token(
    credentials: HTTPAuthorizationCredentials = Security(security),
) -> dict:
    """Validate the Bearer Token and return its payload."""
    token = credentials.credentials
    try:
        jwks = await get_jwks()
        public_key = get_public_key(token, jwks)
        payload = jwt.decode(
            token,
            public_key,
            algorithms=["RS256"],
            audience=EXPECTED_AUDIENCE,
            issuer=EXPECTED_ISSUER,
            options={"require": ["exp", "iss", "aud", "sub"]},
        )
        return payload
    except httpx.HTTPError:
        # The JWKS endpoint is unreachable: a server-side problem, not a bad token
        raise HTTPException(status_code=503, detail="Unable to fetch signing keys")
    except jwt.ExpiredSignatureError:
        raise HTTPException(status_code=401, detail="Token has expired")
    except jwt.InvalidAudienceError:
        raise HTTPException(status_code=401, detail="Token audience mismatch")
    except jwt.InvalidIssuerError:
        raise HTTPException(status_code=401, detail="Token issuer mismatch")
    except jwt.InvalidTokenError as e:
        raise HTTPException(status_code=401, detail=f"Invalid token: {e}")


# --- Scope check ---
def require_scope(required: str):
    """Check that the token carries the required scope."""
    async def checker(payload: dict = Depends(verify_bearer_token)):
        scopes = payload.get("scope", "").split()
        if required not in scopes:
            raise HTTPException(
                status_code=403,
                detail=f"Required scope: {required}",
            )
        return payload
    return checker


@app.get("/api/profile")
async def get_profile(user: dict = Depends(verify_bearer_token)):
    return {
        "user_id": user["sub"],
        "roles": user.get("roles", []),
        "scopes": user.get("scope", "").split(),
    }


@app.get("/api/admin")
async def admin_endpoint(user: dict = Depends(require_scope("admin"))):
    return {"message": "Admin endpoint", "user_id": user["sub"]}
```
### Java: Spring Resource Server
```java
// application.yml
// spring:
//   security:
//     oauth2:
//       resourceserver:
//         jwt:
//           issuer-uri: https://auth.example.com
//           audiences: my-api
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter;
import org.springframework.security.oauth2.server.resource.authentication.JwtGrantedAuthoritiesConverter;
import org.springframework.security.web.SecurityFilterChain;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;

@Configuration
@EnableWebSecurity
@EnableMethodSecurity
public class ResourceServerConfig {

    @Bean
    public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
        http
            .sessionManagement(s -> s.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
            .authorizeHttpRequests(auth -> auth
                .requestMatchers("/api/public/**").permitAll()
                .requestMatchers("/api/admin/**").hasAuthority("SCOPE_admin")
                .requestMatchers("/api/**").authenticated()
            )
            .oauth2ResourceServer(oauth2 -> oauth2
                .jwt(jwt -> jwt.jwtAuthenticationConverter(jwtAuthConverter()))
            );
        return http.build();
    }

    @Bean
    public JwtAuthenticationConverter jwtAuthConverter() {
        JwtGrantedAuthoritiesConverter grantedAuthorities =
            new JwtGrantedAuthoritiesConverter();
        // Derive authorities from the JWT's "scope" claim
        grantedAuthorities.setAuthoritiesClaimName("scope");
        grantedAuthorities.setAuthorityPrefix("SCOPE_");
        JwtAuthenticationConverter converter = new JwtAuthenticationConverter();
        converter.setJwtGrantedAuthoritiesConverter(grantedAuthorities);
        return converter;
    }
}

/**
 * API controller (in a real project this class lives in its own file)
 */
@RestController
@RequestMapping("/api")
public class ApiController {

    @GetMapping("/profile")
    public Map<String, Object> getProfile(
            @AuthenticationPrincipal Jwt jwt) {
        // Map.of rejects null values, so default a missing scope claim to ""
        String scope = jwt.getClaimAsString("scope");
        return Map.of(
            "user_id", jwt.getSubject(),
            "claims", jwt.getClaims(),
            "scopes", scope != null ? scope : ""
        );
    }

    @GetMapping("/admin")
    @PreAuthorize("hasAuthority('SCOPE_admin')")
    public Map<String, Object> adminEndpoint(
            @AuthenticationPrincipal Jwt jwt) {
        return Map.of(
            "message", "Admin endpoint",
            "user_id", jwt.getSubject()
        );
    }
}
```
### Go: Gin JWT Bearer Middleware
```go
package middleware

import (
	"context"
	"fmt"
	"net/http"
	"strings"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/golang-jwt/jwt/v5"
	"github.com/lestrrat-go/jwx/v2/jwk"
)

// BearerConfig configures Bearer Token validation.
type BearerConfig struct {
	JwksURL  string
	Issuer   string
	Audience string
}

// JWKSCache caches the JWKS public key set.
type JWKSCache struct {
	mu      sync.RWMutex
	keySet  jwk.Set
	fetched time.Time
	ttl     time.Duration
	url     string
}

func NewJWKSCache(url string, ttl time.Duration) *JWKSCache {
	return &JWKSCache{url: url, ttl: ttl}
}

func (c *JWKSCache) GetKeySet(ctx context.Context) (jwk.Set, error) {
	c.mu.RLock()
	if c.keySet != nil && time.Since(c.fetched) < c.ttl {
		defer c.mu.RUnlock()
		return c.keySet, nil
	}
	c.mu.RUnlock()

	c.mu.Lock()
	defer c.mu.Unlock()
	// Re-check: another goroutine may have refreshed the cache
	// while this one was waiting for the write lock
	if c.keySet != nil && time.Since(c.fetched) < c.ttl {
		return c.keySet, nil
	}
	set, err := jwk.Fetch(ctx, c.url)
	if err != nil {
		return nil, fmt.Errorf("failed to fetch JWKS: %w", err)
	}
	c.keySet = set
	c.fetched = time.Now()
	return set, nil
}

// BearerAuth is a Gin middleware that authenticates requests by Bearer Token.
func BearerAuth(cfg BearerConfig) gin.HandlerFunc {
	cache := NewJWKSCache(cfg.JwksURL, 1*time.Hour)
	return func(c *gin.Context) {
		// Extract the Bearer Token
		authHeader := c.GetHeader("Authorization")
		if !strings.HasPrefix(authHeader, "Bearer ") {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Missing Bearer Token",
			})
			return
		}
		tokenStr := strings.TrimPrefix(authHeader, "Bearer ")

		// Get the JWKS public keys
		keySet, err := cache.GetKeySet(c.Request.Context())
		if err != nil {
			c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{
				"error": "Unable to fetch public keys",
			})
			return
		}

		// Parse and validate the JWT
		token, err := jwt.Parse(tokenStr, func(t *jwt.Token) (interface{}, error) {
			kid, ok := t.Header["kid"].(string)
			if !ok {
				return nil, fmt.Errorf("missing kid")
			}
			key, found := keySet.LookupKeyID(kid)
			if !found {
				return nil, fmt.Errorf("no public key found for kid=%s", kid)
			}
			var rawKey interface{}
			if err := key.Raw(&rawKey); err != nil {
				return nil, err
			}
			return rawKey, nil
		},
			jwt.WithIssuer(cfg.Issuer),
			jwt.WithAudience(cfg.Audience),
			jwt.WithValidMethods([]string{"RS256"}),
			jwt.WithExpirationRequired(), // reject tokens that carry no exp claim
		)
		if err != nil || !token.Valid {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": fmt.Sprintf("Invalid token: %v", err),
			})
			return
		}

		claims, ok := token.Claims.(jwt.MapClaims)
		if !ok {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Unable to parse claims",
			})
			return
		}

		// Expose the user info to downstream handlers
		c.Set("user_id", claims["sub"])
		c.Set("claims", claims)
		c.Next()
	}
}

// RequireScope checks that the token carries the required scope.
func RequireScope(required string) gin.HandlerFunc {
	return func(c *gin.Context) {
		claims, exists := c.Get("claims")
		if !exists {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Not authenticated",
			})
			return
		}
		mapClaims := claims.(jwt.MapClaims)
		scopeStr, _ := mapClaims["scope"].(string)
		scopes := strings.Fields(scopeStr)
		for _, s := range scopes {
			if s == required {
				c.Next()
				return
			}
		}
		c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
			"error": fmt.Sprintf("Required scope: %s", required),
		})
	}
}
```
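## 12.4 mTLS Client Certificate Authentication
One of the strongest API authentication methods, well suited to service-to-service communication:
### mTLS Authentication Flow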
```{mermaid}
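sequenceDiagram
    participant C as Client
    participant S as Server
    participant CA as Certificate authority
    Note over C,CA: Preparation (one-off)
    C->>CA: Request a client certificate (CSR)
    CA->>C: Issue the client certificate (the private key stays with the client)
    S->>CA: Request a server certificate (CSR)
    CA->>S: Issue the server certificate (the private key stays with the server)
    Note over C,S: TLS handshake (mutual verification)
    C->>S: ClientHello
    S->>C: ServerHello + server certificate
    S->>C: CertificateRequest (asks for a client certificate)
    C->>C: Verify the server certificate (with the CA public key)
    C->>S: Client certificate + CertificateVerify
    S->>S: Verify the client certificate (with the CA public key)
    Note over C,S: Mutual verification complete ✅ encrypted channel established
    C->>S: HTTP request (inside the encrypted channel, no extra token needed)
    S->>S: Extract the client identity from the certificate CN/SAN
    S->>C: HTTP response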
```
### Python: mTLS Server and Client
```python
# --- mTLS client ---
import httpx


async def call_service_with_mtls():
    """Call another service over mTLS."""
    # Note: recent httpx releases prefer an ssl.SSLContext passed via verify=
    async with httpx.AsyncClient(
        cert=("client.crt", "client.key"),  # client certificate + private key
        verify="ca.crt",                    # CA certificate (verifies the server)
    ) as client:
        response = await client.get("https://service-b:8443/api/data")
        return response.json()


# --- mTLS server (FastAPI + uvicorn) ---
# Start command:
# uvicorn main:app --host 0.0.0.0 --port 8443 \
#     --ssl-keyfile server.key \
#     --ssl-certfile server.crt \
#     --ssl-ca-certs ca.crt \
#     --ssl-cert-reqs 2   # 2 = CERT_REQUIRED (client certificate is mandatory)
from fastapi import FastAPI, Request

app = FastAPI(title="mTLS Service")


@app.middleware("http")
async def extract_client_identity(request: Request, call_next):
    """Extract the client identity from the client certificate."""
    # Behind an Nginx/Envoy proxy, certificate details usually arrive as headers.
    # Only trust these headers if the proxy strips any client-supplied copies.
    client_cn = request.headers.get("X-Client-CN", "")
    client_serial = request.headers.get("X-Client-Serial", "")
    if not client_cn:
        # On a direct TLS connection, read the certificate from the transport.
        # Whether "transport" is present in the scope depends on the ASGI server.
        transport = request.scope.get("transport")
        if transport and hasattr(transport, "get_extra_info"):
            peercert = transport.get_extra_info("peercert")
            if peercert:
                for rdn in peercert.get("subject", ()):
                    for attr_type, attr_value in rdn:
                        if attr_type == "commonName":
                            client_cn = attr_value
    request.state.client_cn = client_cn
    request.state.client_serial = client_serial
    response = await call_next(request)
    return response


@app.get("/api/data")
async def get_data(request: Request):
    # An empty CN means no identity was found; report it as "unknown"
    client = getattr(request.state, "client_cn", "") or "unknown"
    return {"data": "sensitive", "authenticated_client": client}
```
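The flow above extracts the identity from the certificate's CN or SAN, while the middleware only reads the CN. A minimal sketch of a SAN-first lookup follows, assuming the certificate is available in the dictionary format returned by Python's `ssl.SSLSocket.getpeercert()`. The function name `identity_from_peercert` and the preference order (URI, then DNS, then CN) are choices made for this illustration.

```python
from typing import Optional


def identity_from_peercert(peercert: dict) -> Optional[str]:
    """Pick a client identity from an ssl.getpeercert()-style dictionary."""
    sans = peercert.get("subjectAltName", ())
    # 1. Prefer a URI SAN (for example a SPIFFE ID), then a DNS SAN.
    for wanted in ("URI", "DNS"):
        for kind, value in sans:
            if kind == wanted:
                return value
    # 2. Fall back to the subject's commonName.
    for rdn in peercert.get("subject", ()):
        for attr_type, attr_value in rdn:
            if attr_type == "commonName":
                return attr_value
    return None
```

Preferring the SAN matches how modern TLS stacks identify peers, since the CN field is free-form text and many issuers no longer populate it meaningfully.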
### Java: mTLS Server Configuration
```java
// application.yml
// server:
//   port: 8443
//   ssl:
//     enabled: true
//     key-store: classpath:server-keystore.p12
//     key-store-password: changeit
//     key-store-type: PKCS12
//     trust-store: classpath:truststore.p12
//     trust-store-password: changeit
//     trust-store-type: PKCS12
//     client-auth: need   # need = client certificate is mandatory
import org.springframework.web.bind.annotation.*;
import jakarta.servlet.http.HttpServletRequest;
import java.security.cert.X509Certificate;
import java.util.Map;

@RestController
@RequestMapping("/api")
public class MtlsController {

    @GetMapping("/data")
    public Map<String, String> getData(HttpServletRequest request) {
        // Read the client certificate chain from the request
        X509Certificate[] certs = (X509Certificate[])
            request.getAttribute("jakarta.servlet.request.X509Certificate");
        String clientCN = "unknown";
        if (certs != null && certs.length > 0) {
            // getName() returns the full subject DN (e.g. "CN=service-a,O=..."),
            // not just the CN; parse the DN if only the CN is needed
            clientCN = certs[0].getSubjectX500Principal().getName();
        }
        return Map.of(
            "data", "sensitive information",
            "authenticated_client", clientCN
        );
    }
}

// --- mTLS client (RestTemplate) ---
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.hc.client5.http.ssl.SSLConnectionSocketFactory;
import org.apache.hc.core5.ssl.SSLContextBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.web.client.RestTemplate;
import javax.net.ssl.SSLContext;
import java.io.File;

@Configuration
public class MtlsClientConfig {

    @Bean
    public RestTemplate mtlsRestTemplate() throws Exception {
        SSLContext sslContext = SSLContextBuilder.create()
            .loadKeyMaterial(
                new File("client-keystore.p12"),
                "changeit".toCharArray(),
                "changeit".toCharArray())
            .loadTrustMaterial(
                new File("truststore.p12"),
                "changeit".toCharArray())
            .build();
        SSLConnectionSocketFactory socketFactory =
            new SSLConnectionSocketFactory(sslContext);
        // HttpClient 5 configures TLS on the connection manager,
        // not on the client builder
        HttpClientConnectionManager connectionManager =
            PoolingHttpClientConnectionManagerBuilder.create()
                .setSSLSocketFactory(socketFactory)
                .build();
        CloseableHttpClient httpClient = HttpClients.custom()
            .setConnectionManager(connectionManager)
            .build();
        return new RestTemplate(
            new HttpComponentsClientHttpRequestFactory(httpClient));
    }
}
```
### Go: mTLS Server and Client
```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"

	"github.com/gin-gonic/gin"
)

// --- mTLS server ---
func StartMTLSServer() {
	// Load the CA certificate (used to verify client certificates)
	caCert, err := os.ReadFile("ca.crt")
	if err != nil {
		log.Fatal(err)
	}
	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		log.Fatal("ca.crt contains no valid PEM certificate")
	}

	// TLS configuration: require a client certificate
	tlsConfig := &tls.Config{
		ClientCAs:  caCertPool,
		ClientAuth: tls.RequireAndVerifyClientCert,
		MinVersion: tls.VersionTLS13,
	}

	r := gin.Default()
	// Extract the identity from the client certificate
	r.Use(func(c *gin.Context) {
		if c.Request.TLS != nil && len(c.Request.TLS.PeerCertificates) > 0 {
			cert := c.Request.TLS.PeerCertificates[0]
			c.Set("client_cn", cert.Subject.CommonName)
			c.Set("client_serial", cert.SerialNumber.String())
		}
		c.Next()
	})
	r.GET("/api/data", func(c *gin.Context) {
		clientCN := c.GetString("client_cn")
		c.JSON(http.StatusOK, gin.H{
			"data":                 "sensitive information",
			"authenticated_client": clientCN,
		})
	})

	server := &http.Server{
		Addr:      ":8443",
		Handler:   r,
		TLSConfig: tlsConfig,
	}
	log.Println("mTLS server starting on :8443")
	log.Fatal(server.ListenAndServeTLS("server.crt", "server.key"))
}

// --- mTLS client ---
func CallWithMTLS(url string) ([]byte, error) {
	// Load the client certificate
	cert, err := tls.LoadX509KeyPair("client.crt", "client.key")
	if err != nil {
		return nil, fmt.Errorf("failed to load client certificate: %w", err)
	}
	// Load the CA certificate
	caCert, err := os.ReadFile("ca.crt")
	if err != nil {
		return nil, fmt.Errorf("failed to load CA certificate: %w", err)
	}
	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		return nil, fmt.Errorf("ca.crt contains no valid PEM certificate")
	}

	tlsConfig := &tls.Config{
		Certificates: []tls.Certificate{cert},
		RootCAs:      caCertPool,
		MinVersion:   tls.VersionTLS13,
	}
	client := &http.Client{
		Transport: &http.Transport{TLSClientConfig: tlsConfig},
	}
	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	return io.ReadAll(resp.Body)
}
```
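## 12.5 HMAC Signature Authentication
Similar in spirit to AWS Signature Version 4: the client signs the request content so the server can verify both the caller's identity and the request's integrity:
### HMAC Signature Authentication Flow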
```{mermaid}
sequenceDiagram
    participant C as Client
    participant S as Server
    participant DB as Secret store
    Note over C: Build the signature
    C->>C: 1. Build the string to sign<br/>METHOD + PATH + TIMESTAMP + BODY_HASH
    C->>C: 2. HMAC-SHA256(secret_key, string_to_sign)
    C->>S: Request + headers:<br/>X-Access-Key: AK001<br/>X-Timestamp: 1711036800<br/>X-Signature: a3f2...
    S->>DB: 3. Look up the Secret Key by Access Key
    DB->>S: Return the Secret Key
    S->>S: 4. Check the timestamp (±5 minutes, limits replay)
    S->>S: 5. Recompute the signature with the same algorithm
    S->>S: 6. Compare signatures in constant time
    alt Signatures match
        S->>C: 200 OK
    else Signatures differ
        S->>C: 401 Unauthorized
    end
```
### Python: HMAC Signature Generation and Verification
```python
import hashlib
import hmac
import time

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

app = FastAPI(title="HMAC Auth Demo")

# --- Secret store ---
SECRET_KEYS = {
    "AK001": "sk-very-secret-key-001",
    "AK002": "sk-very-secret-key-002",
}
REPLAY_WINDOW = 300  # 5-minute replay window


# --- Client side: signing ---
def sign_request(
    method: str,
    path: str,
    body: str,
    access_key: str,
    secret_key: str,
) -> dict:
    """Sign an API request with HMAC and return the auth headers."""
    timestamp = str(int(time.time()))
    body_hash = hashlib.sha256(body.encode()).hexdigest()
    # Build the canonical string to sign
    # (the query string is not covered; add it here if it must be protected)
    string_to_sign = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}"
    # Compute the HMAC-SHA256 signature
    signature = hmac.new(
        secret_key.encode(),
        string_to_sign.encode(),
        hashlib.sha256,
    ).hexdigest()
    return {
        "X-Access-Key": access_key,
        "X-Timestamp": timestamp,
        "X-Signature": signature,
    }


# --- Server side: verification ---
def verify_signature(
    method: str,
    path: str,
    body: str,
    headers: dict,
) -> str:
    """Verify the HMAC signature and return the access_key."""
    access_key = headers.get("x-access-key", "")
    timestamp = headers.get("x-timestamp", "")
    signature = headers.get("x-signature", "")
    # 1. Check that the required headers are present
    if not all([access_key, timestamp, signature]):
        raise ValueError("Missing signature headers")
    # 2. Look up the Secret Key
    secret_key = SECRET_KEYS.get(access_key)
    if not secret_key:
        raise ValueError("Invalid Access Key")
    # 3. Check the timestamp. This only bounds the replay window:
    #    a captured request can still be replayed within it.
    try:
        ts = int(timestamp)
    except ValueError:
        raise ValueError("Invalid timestamp")
    if abs(time.time() - ts) > REPLAY_WINDOW:
        raise ValueError("Request expired (timestamp outside the window)")
    # 4. Recompute the signature
    body_hash = hashlib.sha256(body.encode()).hexdigest()
    string_to_sign = f"{method.upper()}\n{path}\n{timestamp}\n{body_hash}"
    expected = hmac.new(
        secret_key.encode(),
        string_to_sign.encode(),
        hashlib.sha256,
    ).hexdigest()
    # 5. Constant-time comparison (guards against timing attacks)
    if not hmac.compare_digest(signature, expected):
        raise ValueError("Signature mismatch")
    return access_key


# --- FastAPI middleware ---
@app.middleware("http")
async def hmac_auth_middleware(request: Request, call_next):
    if request.url.path.startswith("/api/"):
        try:
            # UnicodeDecodeError is a ValueError, so a non-UTF-8 body yields 401
            body = (await request.body()).decode()
            access_key = verify_signature(
                method=request.method,
                path=request.url.path,
                body=body,
                headers=dict(request.headers),
            )
            request.state.access_key = access_key
        except ValueError as e:
            # An HTTPException raised inside middleware bypasses FastAPI's
            # exception handlers, so return the 401 response directly
            return JSONResponse(status_code=401, content={"detail": str(e)})
    response = await call_next(request)
    return response


@app.post("/api/payment")
async def create_payment(request: Request):
    return {
        "message": "Payment created",
        "authenticated_by": request.state.access_key,
    }
```
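A timestamp check alone still allows a captured request to be replayed inside the window. One common way to close that gap is a per-request nonce that the server remembers until the request could no longer pass the timestamp check. The sketch below assumes the client sends a random value in a hypothetical `X-Nonce` header and includes it in the string to sign; `NonceCache` is an illustrative in-memory helper, not part of the code above.

```python
import time
from typing import Dict, Optional, Tuple


class NonceCache:
    """Remembers (access_key, nonce) pairs for as long as a replay could succeed."""

    def __init__(self, window_seconds: int = 300) -> None:
        # A timestamp is accepted from window_seconds before to window_seconds
        # after the server clock, so a nonce must be kept for twice the window.
        self.retention = 2 * window_seconds
        self._seen: Dict[Tuple[str, str], float] = {}  # (access_key, nonce) -> expiry

    def check_and_store(self, access_key: str, nonce: str,
                        now: Optional[float] = None) -> bool:
        """Return True the first time a nonce is seen, False on a replay."""
        now = time.time() if now is None else now
        # Drop entries that can no longer be replayed so the cache stays bounded.
        self._seen = {k: exp for k, exp in self._seen.items() if exp > now}
        key = (access_key, nonce)
        if key in self._seen:
            return False
        self._seen[key] = now + self.retention
        return True
```

The check belongs after signature verification, so that unauthenticated callers cannot fill the cache. With several server instances the cache would have to live in shared storage.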
### Java: HMAC Signature Filter
```java
import jakarta.servlet.*;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Component;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@Component
public class HmacAuthFilter implements Filter {

    private static final long REPLAY_WINDOW = 300; // 5 minutes

    // Access Key -> Secret Key
    private final Map<String, String> secretKeys = new ConcurrentHashMap<>();

    public HmacAuthFilter() {
        secretKeys.put("AK001", "sk-very-secret-key-001");
        secretKeys.put("AK002", "sk-very-secret-key-002");
    }

    @Override
    public void doFilter(ServletRequest req, ServletResponse res,
                         FilterChain chain) throws IOException, ServletException {
        HttpServletRequest request = (HttpServletRequest) req;
        HttpServletResponse response = (HttpServletResponse) res;

        if (!request.getRequestURI().startsWith("/api/")) {
            chain.doFilter(req, res);
            return;
        }

        String accessKey = request.getHeader("X-Access-Key");
        String timestamp = request.getHeader("X-Timestamp");
        String signature = request.getHeader("X-Signature");

        // 1. Check that the required headers are present
        if (accessKey == null || timestamp == null || signature == null) {
            sendError(response, 401, "Missing signature headers");
            return;
        }

        // 2. Look up the Secret Key
        String secretKey = secretKeys.get(accessKey);
        if (secretKey == null) {
            sendError(response, 401, "Invalid Access Key");
            return;
        }

        // 3. Check the timestamp
        long ts;
        try {
            ts = Long.parseLong(timestamp);
        } catch (NumberFormatException e) {
            sendError(response, 401, "Invalid timestamp");
            return;
        }
        if (Math.abs(Instant.now().getEpochSecond() - ts) > REPLAY_WINDOW) {
            sendError(response, 401, "Request expired");
            return;
        }

        // 4. Read the body and compute the signature.
        // CachedBodyHttpServletRequest is not shown here: it is an
        // application-provided HttpServletRequestWrapper that buffers the
        // body so it can be read again downstream.
        CachedBodyHttpServletRequest cachedRequest =
            new CachedBodyHttpServletRequest(request);
        String body = new String(cachedRequest.getCachedBody(),
            StandardCharsets.UTF_8);
        String bodyHash = sha256(body);
        String stringToSign = String.join("\n",
            request.getMethod(), request.getRequestURI(), timestamp, bodyHash);
        String expected = hmacSha256(secretKey, stringToSign);

        // 5. Constant-time comparison
        if (!MessageDigest.isEqual(
                signature.getBytes(StandardCharsets.UTF_8),
                expected.getBytes(StandardCharsets.UTF_8))) {
            sendError(response, 401, "Signature mismatch");
            return;
        }

        cachedRequest.setAttribute("access_key", accessKey);
        chain.doFilter(cachedRequest, response);
    }

    private String sha256(String input) {
        try {
            MessageDigest digest = MessageDigest.getInstance("SHA-256");
            byte[] hash = digest.digest(input.getBytes(StandardCharsets.UTF_8));
            return bytesToHex(hash);
        } catch (Exception e) { throw new RuntimeException(e); }
    }

    private String hmacSha256(String key, String data) {
        try {
            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(
                key.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            return bytesToHex(
                mac.doFinal(data.getBytes(StandardCharsets.UTF_8)));
        } catch (Exception e) { throw new RuntimeException(e); }
    }

    private String bytesToHex(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) sb.append(String.format("%02x", b));
        return sb.toString();
    }

    private void sendError(HttpServletResponse response, int status,
                           String message) throws IOException {
        response.setStatus(status);
        response.setContentType("application/json");
        response.getWriter().write(
            String.format("{\"error\":\"%s\"}", message));
    }
}
```
### Go: HMAC Signature Middleware
```go
package middleware

import (
	"bytes"
	"crypto/hmac"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"io"
	"math"
	"net/http"
	"strconv"
	"time"

	"github.com/gin-gonic/gin"
)

// HmacConfig configures HMAC signature authentication.
type HmacConfig struct {
	SecretKeys   map[string]string // access_key -> secret_key
	ReplayWindow time.Duration     // replay window
}

// HmacAuth is a Gin middleware that authenticates requests by HMAC signature.
func HmacAuth(cfg HmacConfig) gin.HandlerFunc {
	return func(c *gin.Context) {
		accessKey := c.GetHeader("X-Access-Key")
		timestamp := c.GetHeader("X-Timestamp")
		signature := c.GetHeader("X-Signature")

		// 1. Check that the required headers are present
		if accessKey == "" || timestamp == "" || signature == "" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Missing signature headers",
			})
			return
		}

		// 2. Look up the Secret Key
		secretKey, ok := cfg.SecretKeys[accessKey]
		if !ok {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Invalid Access Key",
			})
			return
		}

		// 3. Check the timestamp
		ts, err := strconv.ParseInt(timestamp, 10, 64)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Invalid timestamp",
			})
			return
		}
		diff := math.Abs(float64(time.Now().Unix() - ts))
		if diff > cfg.ReplayWindow.Seconds() {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Request expired",
			})
			return
		}

		// 4. Read the body
		bodyBytes, err := io.ReadAll(c.Request.Body)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusBadRequest, gin.H{
				"error": "Unable to read the request body",
			})
			return
		}
		// Restore the body so downstream handlers can read it
		c.Request.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))

		// 5. Compute the signature
		bodyHash := sha256Hex(string(bodyBytes))
		stringToSign := fmt.Sprintf("%s\n%s\n%s\n%s",
			c.Request.Method, c.Request.URL.Path, timestamp, bodyHash)
		mac := hmac.New(sha256.New, []byte(secretKey))
		mac.Write([]byte(stringToSign))
		expected := hex.EncodeToString(mac.Sum(nil))

		// 6. Constant-time comparison
		if !hmac.Equal([]byte(signature), []byte(expected)) {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "Signature mismatch",
			})
			return
		}

		c.Set("access_key", accessKey)
		c.Next()
	}
}

// sha256Hex returns the hex-encoded SHA-256 of s. If this file shares a
// package with the API Key middleware, keep only one copy of this helper.
func sha256Hex(s string) string {
	h := sha256.Sum256([]byte(s))
	return hex.EncodeToString(h[:])
}
```
## 12.6 API Gateway Authentication Patterns
### Centralized vs. Distributed Authentication
```{mermaid}
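flowchart LR
    subgraph central["Centralized authentication (recommended)"]
        C1[Client] --> GW1["API Gateway<br/>① Validate token<br/>② Rate limiting<br/>③ Inject user info"]
        GW1 --> MS1["Microservice<br/>trusts what the gateway forwards"]
    end
    subgraph distributed["Distributed authentication"]
        C2[Client] --> GW2["API Gateway<br/>routing only"]
        GW2 --> MS2["Microservice<br/>validates the token itself"]
    end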
```
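In the centralized pattern, the microservices trust whatever identity the gateway forwards, so the gateway must never pass through identity headers supplied by the client. The sketch below shows that step in isolation. The header names `x-user-id` and `x-user-scopes` and the function `build_upstream_headers` are assumptions for illustration; the chapter does not prescribe specific names.

```python
from typing import Dict, Mapping

# Hypothetical names for the identity headers the gateway injects.
IDENTITY_HEADERS = ("x-user-id", "x-user-scopes")


def build_upstream_headers(
    incoming: Mapping[str, str],
    verified_claims: Mapping[str, str],
) -> Dict[str, str]:
    """Return the headers the gateway forwards to a backend service."""
    # 1. Drop identity headers supplied by the client: they are untrusted.
    forwarded = {
        name: value
        for name, value in incoming.items()
        if name.lower() not in IDENTITY_HEADERS
    }
    # 2. Inject identity taken only from the token the gateway just verified.
    forwarded["x-user-id"] = verified_claims["sub"]
    forwarded["x-user-scopes"] = verified_claims.get("scope", "")
    return forwarded
```

This only holds if the backend services are unreachable except through the gateway, for example by pairing the pattern with mTLS between gateway and services.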
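### Token Introspection vs. Local Validation
| Aspect | Token introspection | Local validation (JWT) |
|------|---------------------------|----------------|
| How it works | Calls the authorization server to validate the token | Verifies the signature locally with a public key |
| Latency | High (network call) | Low (local computation) |
| Freshness | Real time (revocation can be checked) | Delayed (relies on the expiry time) |
| Dependencies | The authorization server must be available | Only the public keys (JWKS) |
| Best for | Opaque Token | JWT |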
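The latency cost of introspection is often softened with a short-lived cache, at the price of a small delay in noticing revocation. The following is a minimal sketch under stated assumptions: `IntrospectionCache` is a hypothetical helper, the `introspect` callable stands in for the HTTP call to the authorization server, and the response is assumed to follow the usual introspection shape with an `active` flag and an optional `exp` in epoch seconds.

```python
import hashlib
import time
from typing import Callable, Dict, Optional, Tuple


class IntrospectionCache:
    """Caches introspection results briefly to avoid one network call per request."""

    def __init__(self, introspect: Callable[[str], dict], max_ttl: float = 30.0) -> None:
        self._introspect = introspect  # performs the real call to the authorization server
        self._max_ttl = max_ttl        # upper bound on how stale a result may be
        self._cache: Dict[str, Tuple[float, dict]] = {}  # token hash -> (cached_until, response)

    def is_active(self, token: str, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        key = hashlib.sha256(token.encode()).hexdigest()  # avoid keeping raw tokens in memory
        cached = self._cache.get(key)
        if cached and cached[0] > now:
            return bool(cached[1].get("active"))
        response = self._introspect(token)
        # Never cache past the token's own expiry.
        cached_until = min(now + self._max_ttl,
                           float(response.get("exp", now + self._max_ttl)))
        self._cache[key] = (cached_until, response)
        return bool(response.get("active"))
```

The `max_ttl` value is the trade-off knob: a revoked token may still be accepted for up to that many seconds, so it should stay well below the token lifetime.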
## 12.7 Service-to-Service Authentication
```{mermaid}
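flowchart TB
    L1["Level 1: No authentication<br/>Trusts the internal network ❌ insecure"]
    L2["Level 2: Service Account + Token<br/>Static credentials, rotated manually"]
    L3["Level 3: Service Mesh mTLS<br/>Automatic certificate management, transparent encryption"]
    L4["Level 4: SPIFFE/SPIRE<br/>Unified workload identity, cross-platform, automatic rotation"]
    L1 -->|Evolves to| L2
    L2 -->|Evolves to| L3
    L3 -->|Evolves to| L4
    style L1 fill:#ffcccc
    style L2 fill:#ffffcc
    style L3 fill:#ccffcc
    style L4 fill:#ccccff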
```
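At Level 4, a workload's identity is a SPIFFE ID, a URI of the form `spiffe://<trust domain>/<path>` carried in the certificate's URI SAN. A minimal sketch of parsing such an ID and checking it against an allow-list follows. The function names, the example trust domain `example.org`, and the path layout are illustrative assumptions; a production system would typically rely on a SPIFFE library and the Workload API rather than hand-written parsing.

```python
from typing import Set, Tuple
from urllib.parse import urlsplit


def parse_spiffe_id(spiffe_id: str) -> Tuple[str, str]:
    """Split a SPIFFE ID into (trust_domain, workload_path)."""
    parts = urlsplit(spiffe_id)
    if parts.scheme != "spiffe" or not parts.netloc:
        raise ValueError(f"not a SPIFFE ID: {spiffe_id!r}")
    if parts.query or parts.fragment:
        raise ValueError("a SPIFFE ID must not carry a query or fragment")
    return parts.netloc, parts.path


def is_allowed(spiffe_id: str, trust_domain: str, allowed_paths: Set[str]) -> bool:
    """Authorize a caller by trust domain and exact workload path."""
    try:
        domain, path = parse_spiffe_id(spiffe_id)
    except ValueError:
        return False
    return domain == trust_domain and path in allowed_paths
```

Matching on the exact path keeps authorization explicit; prefix matching is possible but makes it easier to grant access more broadly than intended.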
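## 12.8 Comparison of API Authentication Methods
| Method | Security | Complexity | Replay protection | Tamper protection | Carries identity | Typical use |
|------|--------|--------|--------|--------|---------|---------|
| **API Key** | ⭐⭐ | Low | ❌ | ❌ | ❌ | Public APIs, low security requirements |
| **Bearer Token (JWT)** | ⭐⭐⭐⭐ | Medium | ⚠️ (expiry only bounds the window) | ✅ (token signature) | ✅ | Web/Mobile APIs |
| **HMAC signature** | ⭐⭐⭐⭐ | High | ✅ (timestamp) | ✅ (signature) | ❌ | High-security APIs (payments) |
| **mTLS** | ⭐⭐⭐⭐⭐ | High | ✅ (TLS) | ✅ (TLS) | ✅ (certificate) | Service-to-service communication |
| **SPIFFE SVID** | ⭐⭐⭐⭐⭐ | Medium | ✅ | ✅ | ✅ | Cloud-native service-to-service communication |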
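## 12.9 Selection Guide
### Choosing by Scenario
| Scenario | Recommended method | Rationale |
|------|---------|------|
| Public data APIs (weather, exchange rates) | API Key | Simple; used only for identification and rate limiting |
| User data APIs (SaaS) | OAuth2 Bearer Token | Standardized; carries user identity and permissions |
| Payment/financial APIs | HMAC signature + TLS | Verifies request integrity and prevents tampering |
| Microservice communication (same cluster) | mTLS (Service Mesh) | Zero trust, automatic certificate management |
| Microservice communication (cross-cloud) | SPIFFE/SPIRE | Unified identity across platforms |
| Third-party webhooks | HMAC signature | Verifies that the sender is genuine |
| Mobile APIs | OAuth2 + PKCE | Secure authorization for public clients |
### Combining Methods
Real production environments usually combine several authentication methods: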
```{mermaid}
flowchart TD
    EXT[External request] --> GW[API Gateway]
    GW -->|Bearer Token validation| AUTH[Auth service]
    GW -->|Passed| MESH[Service Mesh]
    MESH -->|mTLS| SVC_A[Service A]
    MESH -->|mTLS| SVC_B[Service B]
    SVC_A -->|HMAC signature| PAY[Payment service]
    SVC_B -->|mTLS| DB[Database]
    style GW fill:#f9f,stroke:#333
    style MESH fill:#bbf,stroke:#333
```
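**Typical architecture**:
- **External → Gateway**: OAuth2 Bearer Token (user authentication)
- **Gateway → microservices**: mTLS (managed automatically by the Service Mesh)
- **Microservice → sensitive service**: HMAC signature (extra request-integrity protection)
## 12.10 Summary
- **Bearer Token (JWT)** is the most general-purpose API authentication method
- **mTLS** offers the strongest security guarantees of the methods covered here and suits service-to-service communication
- **Centralized authentication at the API Gateway** simplifies the authentication logic in each microservice
- **HMAC signatures** suit high-security scenarios that need request-integrity verification
- **SPIFFE/SPIRE** is the recommended practice for service-to-service authentication in cloud-native environments (see Part IV)
- Choosing a method means weighing security, complexity, performance, and operational cost
- Production environments usually **combine** several methods to build defense in depth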