第九章:JWT 深入解析
“JWT 不是银弹,但用对了就是利器。”
mindmap
root((JWT))
结构
Header
Payload
Signature
签名算法
HS256
RS256
ES256
EdDSA
安全陷阱
alg none
密钥混淆
无法撤销
最佳实践
短过期
JWKS 轮换
Audience 验证
9.1 JWT 结构
JWT(JSON Web Token)由三部分组成,用点号分隔:
eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiJ1c2VyLTEyMyIsIm5hbWUiOiJXYWx0ZXIiLCJyb2xlcyI6WyJhZG1pbiJdLCJleHAiOjE3MDk1MTA0MDB9.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
flowchart LR
subgraph JWT["JWT = Header . Payload . Signature"]
direction LR
H["🔵 Header<br/>(Base64URL)<br/>算法 + 类型 + kid"]
P["🟢 Payload<br/>(Base64URL)<br/>Claims 声明"]
S["🔴 Signature<br/>(Base64URL)<br/>数字签名"]
H --- |"."| P --- |"."| S
end
subgraph Header_Detail["Header 详情"]
H1["alg: RS256"]
H2["typ: JWT"]
H3["kid: key-2024"]
end
subgraph Payload_Detail["Payload 详情"]
P1["iss: 签发者"]
P2["sub: 主体"]
P3["aud: 受众"]
P4["exp: 过期时间"]
P5["jti: Token ID"]
P6["自定义 Claims"]
end
subgraph Signature_Detail["Signature 生成"]
S1["RSASHA256(<br/>base64url(header) + '.' +<br/>base64url(payload),<br/>privateKey<br/>)"]
end
H --> Header_Detail
P --> Payload_Detail
S --> Signature_Detail
Header
{
"alg": "RS256", // 签名算法
"typ": "JWT", // Token 类型
"kid": "key-2024" // 密钥 ID(用于 JWKS 轮换)
}
Payload(Claims)
{
"iss": "https://auth.example.com", // 签发者
"sub": "user-123", // 主体
"aud": "my-api", // 受众
"exp": 1709510400, // 过期时间
"nbf": 1709506800, // 生效时间
"iat": 1709506800, // 签发时间
"jti": "unique-token-id", // Token ID
"name": "Walter", // 自定义 Claim
"roles": ["admin", "editor"] // 自定义 Claim
}
9.2 签名算法选择
算法 |
类型 |
密钥 |
适用场景 |
推荐 |
|---|---|---|---|---|
HS256 |
对称 |
共享密钥 |
单体应用 |
⚠️ 简单场景 |
RS256 |
非对称 |
RSA 密钥对 |
微服务 |
✅ 通用 |
ES256 |
非对称 |
ECDSA P-256 |
高性能 |
✅ 推荐 |
EdDSA |
非对称 |
Ed25519 |
最新 |
✅ 最佳性能 |
Python 示例
import jwt
import time
from cryptography.hazmat.primitives.asymmetric import rsa, ec
from cryptography.hazmat.primitives import serialization
# === RS256 签名 ===
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()
token = jwt.encode(
{"sub": "user-123", "roles": ["admin"], "exp": 1709510400},
private_key,
algorithm="RS256",
headers={"kid": "key-2024"}
)
# 验证(只需要公钥)
decoded = jwt.decode(
token, public_key,
algorithms=["RS256"],
audience="my-api", # 验证 audience
issuer="https://auth.example.com" # 验证 issuer
)
# === ES256 签名(更快、更小) ===
ec_private = ec.generate_private_key(ec.SECP256R1())
ec_public = ec_private.public_key()
ec_token = jwt.encode(
{
"sub": "user-123",
"iss": "https://auth.example.com",
"aud": "my-api",
"exp": int(time.time()) + 3600,
"iat": int(time.time()),
"jti": "unique-id-001",
},
ec_private,
algorithm="ES256",
headers={"kid": "ec-key-2024"},
)
ec_decoded = jwt.decode(
ec_token,
ec_public,
algorithms=["ES256"],
audience="my-api",
issuer="https://auth.example.com",
)
print(f"ES256 decoded: {ec_decoded}")
Java 示例(jjwt)
// build.gradle
// implementation 'io.jsonwebtoken:jjwt-api:0.12.5'
// runtimeOnly 'io.jsonwebtoken:jjwt-impl:0.12.5'
// runtimeOnly 'io.jsonwebtoken:jjwt-jackson:0.12.5'
import io.jsonwebtoken.*;
import io.jsonwebtoken.security.Keys;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.UUID;
public class JwtRS256Example {
public static void main(String[] args) throws Exception {
// --- 生成 RSA 密钥对 ---
KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
kpg.initialize(2048);
KeyPair keyPair = kpg.generateKeyPair();
RSAPrivateKey privateKey = (RSAPrivateKey) keyPair.getPrivate();
RSAPublicKey publicKey = (RSAPublicKey) keyPair.getPublic();
// --- 签发 JWT ---
Instant now = Instant.now();
String token = Jwts.builder()
.header().keyId("key-2024").and() // kid
.issuer("https://auth.example.com") // iss
.subject("user-123") // sub
.audience().add("my-api").and() // aud
.issuedAt(Date.from(now)) // iat
.expiration(Date.from(now.plusSeconds(3600))) // exp = 1 小时
.id(UUID.randomUUID().toString()) // jti
.claim("roles", List.of("admin", "editor"))
.signWith(privateKey, Jwts.SIG.RS256) // RS256 签名
.compact();
System.out.println("JWT: " + token);
// --- 验证 JWT ---
try {
Jws<Claims> jws = Jwts.parser()
.requireIssuer("https://auth.example.com")
.requireAudience("my-api")
.verifyWith(publicKey)
.build()
.parseSignedClaims(token);
Claims claims = jws.getPayload();
System.out.println("Subject : " + claims.getSubject());
System.out.println("Roles : " + claims.get("roles", List.class));
System.out.println("Expires : " + claims.getExpiration());
} catch (ExpiredJwtException e) {
System.err.println("Token 已过期: " + e.getMessage());
} catch (JwtException e) {
System.err.println("Token 验证失败: " + e.getMessage());
}
}
}
Go 示例(golang-jwt/jwt/v5)
package main
import (
"crypto/ecdsa"
"crypto/elliptic"
"crypto/rand"
"crypto/rsa"
"fmt"
"log"
"time"
"github.com/golang-jwt/jwt/v5"
"github.com/google/uuid"
)
// CustomClaims 自定义 Claims 结构
type CustomClaims struct {
Roles []string `json:"roles"`
jwt.RegisteredClaims
}
func main() {
// ========== RS256 示例 ==========
rsaKey, err := rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
log.Fatalf("生成 RSA 密钥失败: %v", err)
}
now := time.Now()
rsClaims := CustomClaims{
Roles: []string{"admin", "editor"},
RegisteredClaims: jwt.RegisteredClaims{
Issuer: "https://auth.example.com",
Subject: "user-123",
Audience: jwt.ClaimStrings{"my-api"},
ExpiresAt: jwt.NewNumericDate(now.Add(1 * time.Hour)),
IssuedAt: jwt.NewNumericDate(now),
ID: uuid.NewString(),
},
}
rsToken := jwt.NewWithClaims(jwt.SigningMethodRS256, rsClaims)
rsToken.Header["kid"] = "key-2024"
rsTokenStr, err := rsToken.SignedString(rsaKey)
if err != nil {
log.Fatalf("RS256 签名失败: %v", err)
}
fmt.Println("RS256 JWT:", rsTokenStr)
// RS256 验证
parsed, err := jwt.ParseWithClaims(rsTokenStr, &CustomClaims{},
func(t *jwt.Token) (interface{}, error) {
if _, ok := t.Method.(*jwt.SigningMethodRSA); !ok {
return nil, fmt.Errorf("不支持的签名算法: %v", t.Header["alg"])
}
return &rsaKey.PublicKey, nil
},
jwt.WithIssuer("https://auth.example.com"),
jwt.WithAudience("my-api"),
jwt.WithExpirationRequired(),
)
if err != nil {
log.Fatalf("RS256 验证失败: %v", err)
}
if claims, ok := parsed.Claims.(*CustomClaims); ok && parsed.Valid {
fmt.Printf("Subject: %s, Roles: %v\n", claims.Subject, claims.Roles)
}
// ========== ES256 示例 ==========
ecKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
if err != nil {
log.Fatalf("生成 ECDSA 密钥失败: %v", err)
}
esClaims := CustomClaims{
Roles: []string{"viewer"},
RegisteredClaims: jwt.RegisteredClaims{
Issuer: "https://auth.example.com",
Subject: "user-456",
Audience: jwt.ClaimStrings{"my-api"},
ExpiresAt: jwt.NewNumericDate(now.Add(30 * time.Minute)),
IssuedAt: jwt.NewNumericDate(now),
ID: uuid.NewString(),
},
}
esToken := jwt.NewWithClaims(jwt.SigningMethodES256, esClaims)
esToken.Header["kid"] = "ec-key-2024"
esTokenStr, err := esToken.SignedString(ecKey)
if err != nil {
log.Fatalf("ES256 签名失败: %v", err)
}
fmt.Println("ES256 JWT:", esTokenStr)
// ES256 验证
parsedES, err := jwt.ParseWithClaims(esTokenStr, &CustomClaims{},
func(t *jwt.Token) (interface{}, error) {
if _, ok := t.Method.(*jwt.SigningMethodECDSA); !ok {
return nil, fmt.Errorf("不支持的签名算法: %v", t.Header["alg"])
}
return &ecKey.PublicKey, nil
},
jwt.WithIssuer("https://auth.example.com"),
jwt.WithAudience("my-api"),
)
if err != nil {
log.Fatalf("ES256 验证失败: %v", err)
}
if claims, ok := parsedES.Claims.(*CustomClaims); ok && parsedES.Valid {
fmt.Printf("ES256 Subject: %s, Roles: %v\n", claims.Subject, claims.Roles)
}
}
9.3 JWT 签发与验证流程
sequenceDiagram
autonumber
participant C as 客户端
participant Auth as 认证服务
participant API as API 服务
participant JWKS as JWKS 端点
C->>Auth: POST /login {username, password}
Auth->>Auth: 验证凭证
Auth->>Auth: 生成 JWT(私钥签名)
Auth-->>C: 200 {access_token, refresh_token, expires_in}
C->>API: GET /resource<br/>Authorization: Bearer <access_token>
API->>API: 解码 JWT Header,提取 kid
API->>JWKS: GET /.well-known/jwks.json(首次/缓存过期)
JWKS-->>API: {keys: [{kid, kty, n, e, ...}]}
API->>API: 用 kid 匹配公钥
API->>API: 验证签名 + exp + iss + aud
alt 验证通过
API-->>C: 200 {data}
else Token 过期或无效
API-->>C: 401 Unauthorized
end
9.4 JWT 安全陷阱
陷阱 1:alg:none 攻击
# ❌ 攻击者构造无签名的 JWT
malicious_header = {"alg": "none", "typ": "JWT"}
malicious_payload = {"sub": "admin", "roles": ["superadmin"]}
# ✅ 防御:始终指定允许的算法
decoded = jwt.decode(
token, public_key,
algorithms=["RS256"], # 明确指定,拒绝 "none"
)
陷阱 2:密钥混淆攻击
# ❌ 攻击者用 RS256 的公钥作为 HS256 的密钥
# 如果服务端不限制算法,攻击者可以:
# 1. 获取 RS256 公钥(公开的)
# 2. 用公钥作为 HS256 密钥签名
# 3. 服务端用同一个公钥验证 HS256 → 通过!
# ✅ 防御:严格限制算法
decoded = jwt.decode(token, public_key, algorithms=["RS256"]) # 只接受 RS256
陷阱 3:Token 无法撤销
JWT 一旦签发就无法撤销(除非等它过期)。解决方案:
Python 实现
import redis
import time
r = redis.Redis()
def revoke_token(jti: str, exp: int):
"""将 Token 加入黑名单"""
ttl = exp - int(time.time())
if ttl > 0:
r.setex(f"revoked:{jti}", ttl, "1")
def is_token_revoked(jti: str) -> bool:
"""检查 Token 是否已被撤销"""
return r.exists(f"revoked:{jti}")
def verify_token(token: str):
payload = jwt.decode(token, public_key, algorithms=["RS256"])
if is_token_revoked(payload.get("jti", "")):
raise Exception("Token has been revoked")
return payload
Java 实现(Redis 黑名单)
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.Instant;
/**
* JWT 黑名单服务 —— 使用 Redis 存储已撤销的 Token ID (jti)。
* TTL 与 Token 剩余有效期一致,过期后自动清除。
*/
@Service
public class JwtBlacklistService {
private static final String PREFIX = "revoked:";
private final StringRedisTemplate redis;
public JwtBlacklistService(StringRedisTemplate redis) {
this.redis = redis;
}
/** 撤销 Token:将 jti 写入 Redis,TTL = Token 剩余有效期 */
public void revoke(String jti, Instant expiration) {
Duration ttl = Duration.between(Instant.now(), expiration);
if (!ttl.isNegative() && !ttl.isZero()) {
redis.opsForValue().set(PREFIX + jti, "1", ttl);
}
}
/** 检查 Token 是否已被撤销 */
public boolean isRevoked(String jti) {
return Boolean.TRUE.equals(redis.hasKey(PREFIX + jti));
}
}
Go 实现(Redis 黑名单)
package auth
import (
"context"
"fmt"
"time"
"github.com/redis/go-redis/v9"
)
// JWTBlacklist 使用 Redis 管理已撤销的 JWT
type JWTBlacklist struct {
rdb *redis.Client
}
func NewJWTBlacklist(rdb *redis.Client) *JWTBlacklist {
return &JWTBlacklist{rdb: rdb}
}
// Revoke 将 jti 加入黑名单,TTL 与 Token 剩余有效期一致
func (b *JWTBlacklist) Revoke(ctx context.Context, jti string, exp time.Time) error {
ttl := time.Until(exp)
if ttl <= 0 {
return nil // 已过期,无需撤销
}
return b.rdb.Set(ctx, fmt.Sprintf("revoked:%s", jti), "1", ttl).Err()
}
// IsRevoked 检查 Token 是否已被撤销
func (b *JWTBlacklist) IsRevoked(ctx context.Context, jti string) (bool, error) {
n, err := b.rdb.Exists(ctx, fmt.Sprintf("revoked:%s", jti)).Result()
if err != nil {
return false, err
}
return n > 0, nil
}
9.5 JWKS(JSON Web Key Set)
JWKS 用于发布公钥,支持密钥轮换:
// GET https://auth.example.com/.well-known/jwks.json
{
"keys": [
{
"kty": "RSA",
"kid": "key-2024",
"use": "sig",
"alg": "RS256",
"n": "0vx7agoebGcQSuuPiLJXZptN9nndrQmbXEps2aiAFbWhM...",
"e": "AQAB"
},
{
"kty": "RSA",
"kid": "key-2023",
"use": "sig",
"alg": "RS256",
"n": "...",
"e": "AQAB"
}
]
}
JWKS 密钥轮换流程
sequenceDiagram
autonumber
participant Admin as 运维 / 自动化
participant Auth as 认证服务
participant JWKS as JWKS 端点
participant API as API 服务(验证方)
Note over Admin,API: 阶段 1 — 生成新密钥并发布
Admin->>Auth: 生成新密钥对 (kid=key-2025)
Auth->>JWKS: 发布新 JWKS(包含 key-2024 + key-2025)
Note over JWKS: 新旧密钥并存
Note over Admin,API: 阶段 2 — 切换签发密钥
Auth->>Auth: 开始使用 key-2025 签发新 JWT
API->>JWKS: 刷新缓存,获取最新 JWKS
JWKS-->>API: {keys: [key-2024, key-2025]}
API->>API: 按 kid 匹配,两把密钥均可验证
Note over Admin,API: 阶段 3 — 旧 Token 自然过期后移除旧密钥
Admin->>Auth: 等待旧 Token 全部过期(≥ max TTL)
Auth->>JWKS: 移除 key-2024,仅保留 key-2025
API->>JWKS: 刷新缓存
JWKS-->>API: {keys: [key-2025]}
Python JWKS 客户端
from jwt import PyJWKClient
# 自动从 JWKS 端点获取公钥
jwks_client = PyJWKClient("https://auth.example.com/.well-known/jwks.json")
def verify_with_jwks(token: str):
"""使用 JWKS 验证 JWT(支持密钥轮换)"""
signing_key = jwks_client.get_signing_key_from_jwt(token)
return jwt.decode(
token,
signing_key.key,
algorithms=["RS256"],
audience="my-api",
)
Java JWKS 客户端(带缓存)
import com.auth0.jwk.*;
import com.auth0.jwt.JWT;
import com.auth0.jwt.algorithms.Algorithm;
import com.auth0.jwt.interfaces.DecodedJWT;
import java.net.URL;
import java.security.interfaces.RSAPublicKey;
import java.util.concurrent.TimeUnit;
/**
* JWKS 公钥获取与缓存。
* 使用 com.auth0:jwks-rsa:0.22.1 和 com.auth0:java-jwt:4.4.0
*
* build.gradle:
* implementation 'com.auth0:java-jwt:4.4.0'
* implementation 'com.auth0:jwks-rsa:0.22.1'
*/
public class JwksVerifier {
private final JwkProvider provider;
public JwksVerifier(String jwksUrl) throws Exception {
// 缓存 10 个密钥,有效期 24 小时;限速每分钟最多 10 次请求
this.provider = new JwkProviderBuilder(new URL(jwksUrl))
.cached(10, 24, TimeUnit.HOURS)
.rateLimited(10, 1, TimeUnit.MINUTES)
.build();
}
/**
* 验证 JWT:从 JWKS 获取公钥,校验签名 + 标准 Claims
*/
public DecodedJWT verify(String token) throws Exception {
DecodedJWT unverified = JWT.decode(token);
String kid = unverified.getKeyId();
if (kid == null || kid.isBlank()) {
throw new IllegalArgumentException("JWT 缺少 kid");
}
Jwk jwk = provider.get(kid);
RSAPublicKey publicKey = (RSAPublicKey) jwk.getPublicKey();
return JWT.require(Algorithm.RSA256(publicKey, null))
.withIssuer("https://auth.example.com")
.withAudience("my-api")
.build()
.verify(token);
}
public static void main(String[] args) throws Exception {
JwksVerifier verifier = new JwksVerifier(
"https://auth.example.com/.well-known/jwks.json");
// DecodedJWT jwt = verifier.verify(someToken);
System.out.println("JwksVerifier 初始化成功");
}
}
Go JWKS 客户端(带缓存)
package auth
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/golang-jwt/jwt/v5"
"github.com/lestrrat-go/jwx/v2/jwk"
)
// JWKSClient 从远程 JWKS 端点获取公钥,内置自动刷新缓存
type JWKSClient struct {
cache *jwk.Cache
jwksURL string
mu sync.RWMutex
lastRefresh time.Time
minInterval time.Duration // 最小刷新间隔,防止频繁请求
}
// NewJWKSClient 创建 JWKS 客户端,refreshInterval 为自动刷新周期
func NewJWKSClient(ctx context.Context, jwksURL string, refreshInterval time.Duration) (*JWKSClient, error) {
c := jwk.NewCache(ctx)
err := c.Register(jwksURL, jwk.WithRefreshInterval(refreshInterval))
if err != nil {
return nil, fmt.Errorf("注册 JWKS URL 失败: %w", err)
}
// 预热缓存
_, err = c.Refresh(ctx, jwksURL)
if err != nil {
return nil, fmt.Errorf("预热 JWKS 缓存失败: %w", err)
}
return &JWKSClient{
cache: c,
jwksURL: jwksURL,
minInterval: 5 * time.Minute,
}, nil
}
// KeyFunc 返回 jwt.ParseWithClaims 所需的 KeyFunc
func (j *JWKSClient) KeyFunc() jwt.Keyfunc {
return func(t *jwt.Token) (interface{}, error) {
kid, ok := t.Header["kid"].(string)
if !ok || kid == "" {
return nil, errors.New("JWT 缺少 kid")
}
set, err := j.cache.Get(context.Background(), j.jwksURL)
if err != nil {
return nil, fmt.Errorf("获取 JWKS 失败: %w", err)
}
key, found := set.LookupKeyID(kid)
if !found {
return nil, fmt.Errorf("kid=%s 未找到", kid)
}
var rawKey interface{}
if err := key.Raw(&rawKey); err != nil {
return nil, fmt.Errorf("提取原始密钥失败: %w", err)
}
return rawKey, nil
}
}
// Verify 验证 JWT 并返回自定义 Claims
func (j *JWKSClient) Verify(tokenStr string) (*CustomClaims, error) {
token, err := jwt.ParseWithClaims(tokenStr, &CustomClaims{}, j.KeyFunc(),
jwt.WithIssuer("https://auth.example.com"),
jwt.WithAudience("my-api"),
jwt.WithExpirationRequired(),
)
if err != nil {
return nil, err
}
claims, ok := token.Claims.(*CustomClaims)
if !ok || !token.Valid {
return nil, errors.New("无效的 Token")
}
return claims, nil
}
9.6 JWT vs Session
维度 |
JWT |
Session |
|---|---|---|
存储位置 |
客户端 |
服务端 |
状态 |
无状态 |
有状态 |
扩展性 |
天然分布式 |
需要共享存储 |
撤销 |
困难(需黑名单) |
简单(删除 Session) |
大小 |
较大(含 Claims) |
小(Session ID) |
适用场景 |
微服务、API |
传统 Web 应用 |
9.7 JWT 生命周期管理
Token 刷新流程
sequenceDiagram
autonumber
participant C as 客户端
participant Auth as 认证服务
participant DB as 数据库 / Redis
C->>Auth: POST /login {username, password}
Auth->>Auth: 验证凭证
Auth->>Auth: 生成 access_token (短期, 15min)
Auth->>Auth: 生成 refresh_token (长期, 7d)
Auth->>DB: 存储 refresh_token 元数据 (jti, user_id, exp)
Auth-->>C: {access_token, refresh_token, expires_in: 900}
Note over C: access_token 过期...
C->>Auth: POST /refresh {refresh_token}
Auth->>Auth: 验证 refresh_token 签名 + exp
Auth->>DB: 检查 refresh_token 是否已撤销
alt refresh_token 有效
Auth->>Auth: 生成新 access_token
Auth->>Auth: (可选) 轮换 refresh_token
Auth->>DB: (可选) 撤销旧 refresh_token,存储新的
Auth-->>C: {access_token, refresh_token, expires_in: 900}
else refresh_token 无效或已撤销
Auth-->>C: 401 请重新登录
end
Note over C: 用户主动登出
C->>Auth: POST /logout {refresh_token}
Auth->>DB: 撤销 refresh_token
Auth->>DB: (可选) 将 access_token jti 加入黑名单
Auth-->>C: 200 OK
Python 完整实现(FastAPI)
"""
FastAPI JWT 认证完整实现
依赖: pip install fastapi uvicorn python-jose[cryptography] redis passlib[bcrypt]
"""
import time
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional
import redis
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
# ============================================================
# 配置
# ============================================================
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
ALGORITHM = "RS256"
ISSUER = "https://auth.example.com"
AUDIENCE = "my-api"
# 生产环境应从安全存储加载密钥
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization
_private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
PRIVATE_KEY = _private_key.private_bytes(
serialization.Encoding.PEM,
serialization.PrivateFormat.PKCS8,
serialization.NoEncryption(),
)
PUBLIC_KEY = _private_key.public_key().public_bytes(
serialization.Encoding.PEM,
serialization.SubjectPublicKeyInfo,
)
# ============================================================
# 基础设施
# ============================================================
app = FastAPI(title="JWT Auth Demo")
security = HTTPBearer()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)
# 模拟用户数据库
FAKE_USERS_DB = {
"walter": {
"username": "walter",
"hashed_password": pwd_context.hash("secret123"),
"roles": ["admin", "editor"],
}
}
# ============================================================
# 数据模型
# ============================================================
class LoginRequest(BaseModel):
username: str
password: str
class TokenResponse(BaseModel):
access_token: str
refresh_token: str
token_type: str = "bearer"
expires_in: int = ACCESS_TOKEN_EXPIRE_MINUTES * 60
class RefreshRequest(BaseModel):
refresh_token: str
# ============================================================
# Token 工具函数
# ============================================================
def create_access_token(sub: str, roles: list[str]) -> str:
"""签发短期 access_token"""
now = datetime.now(timezone.utc)
payload = {
"iss": ISSUER,
"sub": sub,
"aud": AUDIENCE,
"exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
"iat": now,
"jti": str(uuid.uuid4()),
"roles": roles,
"type": "access",
}
return jwt.encode(payload, PRIVATE_KEY, algorithm=ALGORITHM, headers={"kid": "key-2024"})
def create_refresh_token(sub: str) -> str:
"""签发长期 refresh_token"""
now = datetime.now(timezone.utc)
jti = str(uuid.uuid4())
payload = {
"iss": ISSUER,
"sub": sub,
"aud": AUDIENCE,
"exp": now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
"iat": now,
"jti": jti,
"type": "refresh",
}
token = jwt.encode(payload, PRIVATE_KEY, algorithm=ALGORITHM)
# 将 refresh_token 元数据存入 Redis(用于撤销检查)
ttl = REFRESH_TOKEN_EXPIRE_DAYS * 86400
r.setex(f"refresh:{jti}", ttl, sub)
return token
def revoke_token(jti: str, exp: int):
"""将 Token 加入黑名单"""
ttl = exp - int(time.time())
if ttl > 0:
r.setex(f"revoked:{jti}", ttl, "1")
def is_token_revoked(jti: str) -> bool:
"""检查 Token 是否已被撤销"""
return bool(r.exists(f"revoked:{jti}"))
# ============================================================
# JWT 中间件 / 依赖
# ============================================================
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
"""FastAPI 依赖:验证 access_token 并返回用户信息"""
token = credentials.credentials
try:
payload = jwt.decode(
token,
PUBLIC_KEY,
algorithms=[ALGORITHM],
audience=AUDIENCE,
issuer=ISSUER,
)
except JWTError as e:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail=f"Token 验证失败: {e}",
)
# 检查 Token 类型
if payload.get("type") != "access":
raise HTTPException(status_code=401, detail="需要 access_token")
# 检查黑名单
jti = payload.get("jti", "")
if is_token_revoked(jti):
raise HTTPException(status_code=401, detail="Token 已被撤销")
return payload
# ============================================================
# 路由
# ============================================================
@app.post("/login", response_model=TokenResponse)
def login(req: LoginRequest):
"""用户登录,返回 access_token + refresh_token"""
user = FAKE_USERS_DB.get(req.username)
if not user or not pwd_context.verify(req.password, user["hashed_password"]):
raise HTTPException(status_code=401, detail="用户名或密码错误")
return TokenResponse(
access_token=create_access_token(req.username, user["roles"]),
refresh_token=create_refresh_token(req.username),
)
@app.post("/refresh", response_model=TokenResponse)
def refresh(req: RefreshRequest):
"""使用 refresh_token 获取新的 access_token(含 refresh_token 轮换)"""
try:
payload = jwt.decode(
req.refresh_token,
PUBLIC_KEY,
algorithms=[ALGORITHM],
audience=AUDIENCE,
issuer=ISSUER,
)
except JWTError as e:
raise HTTPException(status_code=401, detail=f"refresh_token 无效: {e}")
if payload.get("type") != "refresh":
raise HTTPException(status_code=401, detail="需要 refresh_token")
jti = payload.get("jti", "")
sub = payload.get("sub", "")
# 检查 refresh_token 是否已被撤销
if not r.exists(f"refresh:{jti}"):
raise HTTPException(status_code=401, detail="refresh_token 已失效,请重新登录")
# 撤销旧 refresh_token(轮换)
r.delete(f"refresh:{jti}")
# 获取用户角色
user = FAKE_USERS_DB.get(sub, {})
roles = user.get("roles", [])
return TokenResponse(
access_token=create_access_token(sub, roles),
refresh_token=create_refresh_token(sub),
)
@app.post("/logout")
def logout(
req: RefreshRequest,
current_user: dict = Depends(get_current_user),
):
"""登出:撤销 refresh_token 和当前 access_token"""
# 撤销 refresh_token
try:
rt_payload = jwt.decode(
req.refresh_token, PUBLIC_KEY,
algorithms=[ALGORITHM], audience=AUDIENCE, issuer=ISSUER,
)
r.delete(f"refresh:{rt_payload.get('jti', '')}")
except JWTError:
pass # refresh_token 无效也继续
# 将当前 access_token 加入黑名单
revoke_token(current_user["jti"], current_user["exp"])
return {"message": "已登出"}
@app.get("/protected")
def protected_resource(user: dict = Depends(get_current_user)):
"""受保护的资源端点"""
return {
"message": f"Hello, {user['sub']}!",
"roles": user.get("roles", []),
}
Java 完整实现(Spring Boot JWT Filter)
// ============================================================
// JwtProperties.java — 配置属性
// ============================================================
package com.example.auth.config;
import org.springframework.boot.context.properties.ConfigurationProperties;
@ConfigurationProperties(prefix = "jwt")
public record JwtProperties(
String publicKeyPath, // RSA 公钥 PEM 路径
String privateKeyPath, // RSA 私钥 PEM 路径
String issuer, // e.g. "https://auth.example.com"
String audience, // e.g. "my-api"
long accessTtlSeconds, // access_token 有效期(秒)
long refreshTtlSeconds // refresh_token 有效期(秒)
) {}
// ============================================================
// JwtService.java — JWT 签发与验证核心服务
// ============================================================
package com.example.auth.service;
import io.jsonwebtoken.*;
import io.jsonwebtoken.security.SignatureException;
import org.springframework.stereotype.Service;
import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;
import java.time.Instant;
import java.util.*;
@Service
public class JwtService {
private final RSAPrivateKey privateKey;
private final RSAPublicKey publicKey;
private final String issuer;
private final String audience;
private final long accessTtl;
private final long refreshTtl;
public JwtService(RSAPrivateKey privateKey, RSAPublicKey publicKey,
String issuer, String audience,
long accessTtl, long refreshTtl) {
this.privateKey = privateKey;
this.publicKey = publicKey;
this.issuer = issuer;
this.audience = audience;
this.accessTtl = accessTtl;
this.refreshTtl = refreshTtl;
}
/** 签发 access_token */
public String createAccessToken(String subject, List<String> roles) {
Instant now = Instant.now();
return Jwts.builder()
.header().keyId("key-2024").and()
.issuer(issuer)
.subject(subject)
.audience().add(audience).and()
.issuedAt(Date.from(now))
.expiration(Date.from(now.plusSeconds(accessTtl)))
.id(UUID.randomUUID().toString())
.claim("roles", roles)
.claim("type", "access")
.signWith(privateKey, Jwts.SIG.RS256)
.compact();
}
/** 签发 refresh_token */
public String createRefreshToken(String subject) {
Instant now = Instant.now();
return Jwts.builder()
.issuer(issuer)
.subject(subject)
.audience().add(audience).and()
.issuedAt(Date.from(now))
.expiration(Date.from(now.plusSeconds(refreshTtl)))
.id(UUID.randomUUID().toString())
.claim("type", "refresh")
.signWith(privateKey, Jwts.SIG.RS256)
.compact();
}
/** 验证并解析 JWT,返回 Claims;失败抛出异常 */
public Claims verify(String token) {
try {
return Jwts.parser()
.requireIssuer(issuer)
.requireAudience(audience)
.verifyWith(publicKey)
.build()
.parseSignedClaims(token)
.getPayload();
} catch (ExpiredJwtException e) {
throw new RuntimeException("Token 已过期", e);
} catch (SignatureException e) {
throw new RuntimeException("签名验证失败", e);
} catch (JwtException e) {
throw new RuntimeException("Token 无效: " + e.getMessage(), e);
}
}
}
// ============================================================
// JwtAuthFilter.java — Spring Boot OncePerRequestFilter
// ============================================================
package com.example.auth.filter;
import com.example.auth.service.JwtBlacklistService;
import com.example.auth.service.JwtService;
import io.jsonwebtoken.Claims;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.http.HttpHeaders;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;
import java.io.IOException;
import java.util.List;
/**
* JWT 认证过滤器:
* 1. 从 Authorization: Bearer <token> 提取 JWT
* 2. 验证签名 + Claims
* 3. 检查黑名单
* 4. 设置 Spring Security 上下文
*/
@Component
public class JwtAuthFilter extends OncePerRequestFilter {
private final JwtService jwtService;
private final JwtBlacklistService blacklist;
public JwtAuthFilter(JwtService jwtService, JwtBlacklistService blacklist) {
this.jwtService = jwtService;
this.blacklist = blacklist;
}
@Override
protected void doFilterInternal(HttpServletRequest request,
HttpServletResponse response,
FilterChain chain)
throws ServletException, IOException {
String header = request.getHeader(HttpHeaders.AUTHORIZATION);
if (header == null || !header.startsWith("Bearer ")) {
chain.doFilter(request, response);
return;
}
String token = header.substring(7);
try {
Claims claims = jwtService.verify(token);
// 仅接受 access_token
if (!"access".equals(claims.get("type", String.class))) {
response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "需要 access_token");
return;
}
// 检查黑名单
if (blacklist.isRevoked(claims.getId())) {
response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Token 已被撤销");
return;
}
// 提取角色
@SuppressWarnings("unchecked")
List<String> roles = claims.get("roles", List.class);
var authorities = (roles != null)
? roles.stream().map(r -> new SimpleGrantedAuthority("ROLE_" + r)).toList()
: List.<SimpleGrantedAuthority>of();
var auth = new UsernamePasswordAuthenticationToken(
claims.getSubject(), null, authorities);
SecurityContextHolder.getContext().setAuthentication(auth);
} catch (Exception e) {
response.sendError(HttpServletResponse.SC_UNAUTHORIZED, e.getMessage());
return;
}
chain.doFilter(request, response);
}
}
Go 完整实现(Gin JWT 中间件 + Token 刷新)
package main
import (
"crypto/rsa"
"crypto/rand"
"errors"
"fmt"
"net/http"
"strings"
"time"
"github.com/gin-gonic/gin"
"github.com/golang-jwt/jwt/v5"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
)
// ============================================================
// 配置 & 全局变量
// ============================================================
const (
issuer = "https://auth.example.com"
audience = "my-api"
accessTokenTTL = 15 * time.Minute
refreshTokenTTL = 7 * 24 * time.Hour
)
var (
rsaPrivateKey *rsa.PrivateKey
rsaPublicKey *rsa.PublicKey
rdb *redis.Client
)
// ============================================================
// Claims
// ============================================================
type TokenClaims struct {
Roles []string `json:"roles,omitempty"`
TokenType string `json:"type"` // "access" 或 "refresh"
jwt.RegisteredClaims
}
// ============================================================
// Token 签发
// ============================================================
func createAccessToken(sub string, roles []string) (string, error) {
now := time.Now()
claims := TokenClaims{
Roles: roles,
TokenType: "access",
RegisteredClaims: jwt.RegisteredClaims{
Issuer: issuer,
Subject: sub,
Audience: jwt.ClaimStrings{audience},
ExpiresAt: jwt.NewNumericDate(now.Add(accessTokenTTL)),
IssuedAt: jwt.NewNumericDate(now),
ID: uuid.NewString(),
},
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
token.Header["kid"] = "key-2024"
return token.SignedString(rsaPrivateKey)
}
func createRefreshToken(sub string) (string, string, error) {
now := time.Now()
jti := uuid.NewString()
claims := TokenClaims{
TokenType: "refresh",
RegisteredClaims: jwt.RegisteredClaims{
Issuer: issuer,
Subject: sub,
Audience: jwt.ClaimStrings{audience},
ExpiresAt: jwt.NewNumericDate(now.Add(refreshTokenTTL)),
IssuedAt: jwt.NewNumericDate(now),
ID: jti,
},
}
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
tokenStr, err := token.SignedString(rsaPrivateKey)
if err != nil {
return "", "", err
}
// 将 refresh_token 元数据存入 Redis
ctx := rdb.Ctx
rdb.Set(ctx(), fmt.Sprintf("refresh:%s", jti), sub, refreshTokenTTL)
return tokenStr, jti, nil
}
// ============================================================
// Token 验证
// ============================================================
func parseToken(tokenStr string) (*TokenClaims, error) {
token, err := jwt.ParseWithClaims(tokenStr, &TokenClaims{},
func(t *jwt.Token) (interface{}, error) {
if _, ok := t.Method.(*jwt.SigningMethodRSA); !ok {
return nil, fmt.Errorf("不支持的签名算法: %v", t.Header["alg"])
}
return rsaPublicKey, nil
},
jwt.WithIssuer(issuer),
jwt.WithAudience(audience),
jwt.WithExpirationRequired(),
)
if err != nil {
return nil, err
}
claims, ok := token.Claims.(*TokenClaims)
if !ok || !token.Valid {
return nil, errors.New("无效的 Token")
}
return claims, nil
}
// ============================================================
// Gin JWT 中间件
// ============================================================
// JWTAuthMiddleware 验证 access_token 并将用户信息注入 Gin 上下文
func JWTAuthMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
// 1. 提取 Bearer Token
authHeader := c.GetHeader("Authorization")
if authHeader == "" || !strings.HasPrefix(authHeader, "Bearer ") {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "缺少 Authorization 头"})
return
}
tokenStr := strings.TrimPrefix(authHeader, "Bearer ")
// 2. 解析并验证
claims, err := parseToken(tokenStr)
if err != nil {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": err.Error()})
return
}
// 3. 检查 Token 类型
if claims.TokenType != "access" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "需要 access_token"})
return
}
// 4. 检查黑名单
ctx := c.Request.Context()
revoked, _ := rdb.Exists(ctx, fmt.Sprintf("revoked:%s", claims.ID)).Result()
if revoked > 0 {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Token 已被撤销"})
return
}
// 5. 注入用户信息
c.Set("user_sub", claims.Subject)
c.Set("user_roles", claims.Roles)
c.Set("token_jti", claims.ID)
c.Set("token_exp", claims.ExpiresAt.Time)
c.Next()
}
}
// ============================================================
// 路由处理
// ============================================================
type loginRequest struct {
Username string `json:"username" binding:"required"`
Password string `json:"password" binding:"required"`
}
type refreshRequest struct {
RefreshToken string `json:"refresh_token" binding:"required"`
}
func loginHandler(c *gin.Context) {
var req loginRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
// 简化:实际应查数据库验证密码
if req.Username != "walter" || req.Password != "secret123" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "用户名或密码错误"})
return
}
roles := []string{"admin", "editor"}
accessToken, err := createAccessToken(req.Username, roles)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "生成 access_token 失败"})
return
}
refreshToken, _, err := createRefreshToken(req.Username)
if err != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": "生成 refresh_token 失败"})
return
}
c.JSON(http.StatusOK, gin.H{
"access_token": accessToken,
"refresh_token": refreshToken,
"token_type": "bearer",
"expires_in": int(accessTokenTTL.Seconds()),
})
}
func refreshHandler(c *gin.Context) {
var req refreshRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
claims, err := parseToken(req.RefreshToken)
if err != nil {
c.JSON(http.StatusUnauthorized, gin.H{"error": "refresh_token 无效: " + err.Error()})
return
}
if claims.TokenType != "refresh" {
c.JSON(http.StatusUnauthorized, gin.H{"error": "需要 refresh_token"})
return
}
// 检查 refresh_token 是否仍在 Redis 中
ctx := c.Request.Context()
key := fmt.Sprintf("refresh:%s", claims.ID)
exists, _ := rdb.Exists(ctx, key).Result()
if exists == 0 {
c.JSON(http.StatusUnauthorized, gin.H{"error": "refresh_token 已失效"})
return
}
// 撤销旧 refresh_token(轮换)
rdb.Del(ctx, key)
// 签发新 Token 对
roles := []string{"admin", "editor"} // 实际应从数据库获取
accessToken, _ := createAccessToken(claims.Subject, roles)
refreshToken, _, _ := createRefreshToken(claims.Subject)
c.JSON(http.StatusOK, gin.H{
"access_token": accessToken,
"refresh_token": refreshToken,
"token_type": "bearer",
"expires_in": int(accessTokenTTL.Seconds()),
})
}
func logoutHandler(c *gin.Context) {
var req refreshRequest
if err := c.ShouldBindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
ctx := c.Request.Context()
// 撤销 refresh_token
if rtClaims, err := parseToken(req.RefreshToken); err == nil {
rdb.Del(ctx, fmt.Sprintf("refresh:%s", rtClaims.ID))
}
// 将当前 access_token 加入黑名单
jti, _ := c.Get("token_jti")
exp, _ := c.Get("token_exp")
if jtiStr, ok := jti.(string); ok {
if expTime, ok := exp.(time.Time); ok {
ttl := time.Until(expTime)
if ttl > 0 {
rdb.Set(ctx, fmt.Sprintf("revoked:%s", jtiStr), "1", ttl)
}
}
}
c.JSON(http.StatusOK, gin.H{"message": "已登出"})
}
func protectedHandler(c *gin.Context) {
sub, _ := c.Get("user_sub")
roles, _ := c.Get("user_roles")
c.JSON(http.StatusOK, gin.H{
"message": fmt.Sprintf("Hello, %s!", sub),
"roles": roles,
})
}
// ============================================================
// main
// ============================================================
func main() {
// 初始化 RSA 密钥
var err error
rsaPrivateKey, err = rsa.GenerateKey(rand.Reader, 2048)
if err != nil {
panic(err)
}
rsaPublicKey = &rsaPrivateKey.PublicKey
// 初始化 Redis
rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379"})
// 路由
r := gin.Default()
r.POST("/login", loginHandler)
r.POST("/refresh", refreshHandler)
auth := r.Group("/", JWTAuthMiddleware())
{
auth.GET("/protected", protectedHandler)
auth.POST("/logout", logoutHandler)
}
if err := r.Run(":8080"); err != nil {
panic(err)
}
}
9.8 JWT 最佳实践清单
✅ 使用非对称算法(RS256/ES256)
✅ 设置短过期时间(15分钟-1小时)
✅ 验证所有标准 Claims(iss, aud, exp)
✅ 使用 JWKS 支持密钥轮换
✅ 明确指定允许的算法列表
✅ 包含 jti 用于撤销和防重放
✅ 最小化 Payload(不放敏感数据)
✅ 使用 HTTPS 传输
❌ 不要在 URL 中传递 JWT
❌ 不要在 JWT 中存储密码或密钥
❌ 不要使用 HS256 的公开密钥
❌ 不要忽略 exp 验证
9.9 JWT 安全检查清单
以下是生产环境 JWT 部署的完整检查项:
类别 |
检查项 |
说明 |
优先级 |
|---|---|---|---|
算法 |
使用非对称签名(RS256/ES256/EdDSA) |
避免对称密钥泄露导致伪造 |
🔴 必须 |
算法 |
明确指定 |
防止 |
🔴 必须 |
算法 |
禁止 |
对称密钥无法安全分发给多个服务 |
🔴 必须 |
Claims |
验证 |
拒绝过期 Token |
🔴 必须 |
Claims |
验证 |
确保 Token 来自可信签发方 |
🔴 必须 |
Claims |
验证 |
防止 Token 被其他服务误用 |
🔴 必须 |
Claims |
包含 |
支持撤销和防重放 |
🟡 推荐 |
Claims |
包含 |
防止 Token 提前使用 |
🟢 可选 |
过期策略 |
access_token 有效期 ≤ 15 分钟 |
缩小 Token 泄露的影响窗口 |
🔴 必须 |
过期策略 |
refresh_token 有效期 ≤ 7 天 |
平衡安全性与用户体验 |
🟡 推荐 |
过期策略 |
refresh_token 轮换(Rotation) |
每次刷新签发新 refresh_token,旧的立即失效 |
🟡 推荐 |
过期策略 |
检测 refresh_token 重用 |
若旧 refresh_token 被重用,撤销该用户所有 Token |
🟡 推荐 |
密钥管理 |
使用 JWKS 端点发布公钥 |
支持密钥轮换,验证方自动获取 |
🔴 必须 |
密钥管理 |
定期轮换签名密钥(≤ 90 天) |
降低密钥泄露风险 |
🟡 推荐 |
密钥管理 |
私钥存储在 HSM / KMS / Vault |
防止私钥泄露 |
🔴 必须 |
密钥管理 |
JWKS 端点启用缓存控制头 |
|
🟢 可选 |
传输安全 |
仅通过 HTTPS 传输 JWT |
防止中间人窃取 Token |
🔴 必须 |
传输安全 |
不在 URL 查询参数中传递 JWT |
URL 会被日志、Referer 头泄露 |
🔴 必须 |
传输安全 |
使用 |
标准传输方式 |
🔴 必须 |
存储安全 |
浏览器端存储在 HttpOnly Cookie 或内存 |
避免 XSS 窃取(不要用 localStorage) |
🔴 必须 |
存储安全 |
Cookie 设置 |
防止 CSRF 和非 HTTPS 传输 |
🔴 必须 |
撤销机制 |
实现 Token 黑名单(Redis) |
支持登出和紧急撤销 |
🟡 推荐 |
撤销机制 |
黑名单 TTL 与 Token 剩余有效期一致 |
过期后自动清除,避免存储膨胀 |
🟡 推荐 |
撤销机制 |
支持按用户撤销所有 Token |
密码修改、账户锁定时使用 |
🟡 推荐 |
Payload |
不在 JWT 中存储敏感数据 |
Payload 仅 Base64 编码,非加密 |
🔴 必须 |
Payload |
最小化 Payload 大小 |
减少网络开销,避免超出 Header 大小限制 |
🟡 推荐 |
Payload |
敏感数据使用 JWE(加密) |
需要在 Token 中传递敏感信息时 |
🟢 可选 |
监控 |
记录 Token 验证失败事件 |
检测攻击尝试 |
🟡 推荐 |
监控 |
监控 Token 签发速率 |
检测暴力破解或滥用 |
🟡 推荐 |
监控 |
告警:异常 kid 或算法 |
检测密钥混淆攻击 |
🟡 推荐 |
测试 |
测试过期 Token 被拒绝 |
确保 exp 验证生效 |
🔴 必须 |
测试 |
测试 |
确保算法白名单生效 |
🔴 必须 |
测试 |
测试篡改 Payload 后签名失败 |
确保签名验证生效 |
🔴 必须 |
测试 |
测试错误 audience 被拒绝 |
确保 aud 验证生效 |
🔴 必须 |
9.10 小结
JWT 由 Header.Payload.Signature 三部分组成
推荐使用 RS256 或 ES256 非对称签名算法
注意安全陷阱:alg:none、密钥混淆、无法撤销
JWKS 支持密钥轮换,是生产环境必备
JWT 适合微服务和 API 场景,Session 适合传统 Web 应用
Token 生命周期管理:短期 access_token + 长期 refresh_token + 轮换策略
生产部署前务必逐项核对 安全检查清单