Chapter 9: JWT Deep Dive

"JWT is not a silver bullet, but used correctly it is a powerful tool."

        mindmap
  root((JWT))
    Structure
      Header
      Payload
      Signature
    Signing algorithms
      HS256
      RS256
      ES256
      EdDSA
    Security pitfalls
      alg none
      Key confusion
      No revocation
    Best practices
      Short expiry
      JWKS rotation
      Audience validation
    

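9.1 JWT Structure

A JWT (JSON Web Token) consists of three Base64URL-encoded parts separated by dots. The token below is wrapped across three lines for readability; a real token contains no line breaks: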

eyJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.
eyJzdWIiOiJ1c2VyLTEyMyIsIm5hbWUiOiJXYWx0ZXIiLCJyb2xlcyI6WyJhZG1pbiJdLCJleHAiOjE3MDk1MTA0MDB9.
SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
        flowchart LR
    subgraph JWT["JWT = Header . Payload . Signature"]
        direction LR
        H["🔵 Header<br/>(Base64URL)<br/>algorithm + type + kid"]
        P["🟢 Payload<br/>(Base64URL)<br/>Claims"]
        S["🔴 Signature<br/>(Base64URL)<br/>digital signature"]
        H --- |"."| P --- |"."| S
    end

    subgraph Header_Detail["Header details"]
        H1["alg: RS256"]
        H2["typ: JWT"]
        H3["kid: key-2024"]
    end

    subgraph Payload_Detail["Payload details"]
        P1["iss: issuer"]
        P2["sub: subject"]
        P3["aud: audience"]
        P4["exp: expiration time"]
        P5["jti: token ID"]
        P6["custom claims"]
    end

    subgraph Signature_Detail["Signature generation"]
        S1["RSASHA256(<br/>base64url(header) + '.' +<br/>base64url(payload),<br/>privateKey<br/>)"]
    end

    H --> Header_Detail
    P --> Payload_Detail
    S --> Signature_Detail
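
The three-part layout can be inspected with nothing but the standard library. The following is a minimal sketch, not a verifier: `split_unverified` and the demo token are illustrative names introduced here, and the signature bytes are a placeholder, so nothing in the output should be trusted for authorization.

```python
import base64
import json


def b64url_encode(raw: bytes) -> str:
    """Base64URL-encode without '=' padding, as JWT segments require."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(segment: str) -> bytes:
    """Base64URL-decode a segment, restoring the padding that JWT strips."""
    padding = "=" * (-len(segment) % 4)
    return base64.urlsafe_b64decode(segment + padding)


def split_unverified(token: str):
    """Split a compact JWT into (header, payload, signature) WITHOUT verifying it."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("a signed JWT must have exactly three segments")
    header = json.loads(b64url_decode(parts[0]))
    payload = json.loads(b64url_decode(parts[1]))
    return header, payload, b64url_decode(parts[2])


# Build a demo token by hand; the signature is a placeholder, not a real RS256 signature.
demo_header = {"alg": "RS256", "typ": "JWT", "kid": "key-2024"}
demo_payload = {"sub": "user-123", "roles": ["admin"]}
demo_token = ".".join([
    b64url_encode(json.dumps(demo_header).encode()),
    b64url_encode(json.dumps(demo_payload).encode()),
    b64url_encode(b"placeholder-signature"),
])

header, payload, signature = split_unverified(demo_token)
print(header["alg"], payload["sub"])
```

Because the header and payload are only encoded, not encrypted, anyone holding the token can read them; secrets do not belong in claims.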
    

Payload (Claims)

{
  "iss": "https://auth.example.com",  // issuer
  "sub": "user-123",                   // subject
  "aud": "my-api",                     // audience
  "exp": 1709510400,                   // expiration time
  "nbf": 1709506800,                   // not valid before
  "iat": 1709506800,                   // issued at
  "jti": "unique-token-id",            // token ID
  "name": "Walter",                    // custom claim
  "roles": ["admin", "editor"]         // custom claim
}
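
A verifier has to check these registered claims itself once the signature is valid. The sketch below shows one way to express those checks with the standard library; `validate_claims`, its parameters, and the `leeway` handling are assumptions made for illustration, and JWT libraries usually perform equivalent checks for you.

```python
import time
from typing import Optional


def validate_claims(payload: dict, *, issuer: str, audience: str,
                    now: Optional[float] = None, leeway: int = 0) -> None:
    """Raise ValueError if a registered claim fails its check."""
    now = time.time() if now is None else now
    if "exp" not in payload:
        raise ValueError("exp is required")
    # The token is valid only strictly before exp (plus optional clock-skew leeway).
    if now >= payload["exp"] + leeway:
        raise ValueError("token expired")
    if "nbf" in payload and now < payload["nbf"] - leeway:
        raise ValueError("token not yet valid")
    if payload.get("iss") != issuer:
        raise ValueError("unexpected issuer")
    # aud may be a single string or a list of strings.
    aud = payload.get("aud")
    audiences = [aud] if isinstance(aud, str) else list(aud or [])
    if audience not in audiences:
        raise ValueError("audience mismatch")


claims = {
    "iss": "https://auth.example.com",
    "sub": "user-123",
    "aud": "my-api",
    "exp": 1709510400,
    "nbf": 1709506800,
}
# Passes silently: the supplied time lies between nbf and exp.
validate_claims(claims, issuer="https://auth.example.com",
                audience="my-api", now=1709507000)
```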

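9.2 Choosing a Signing Algorithm

| Algorithm | Type | Key | Typical use | Recommendation |
|---|---|---|---|---|
| HS256 | Symmetric | Shared secret | Monolithic application | ⚠️ Simple scenarios only |
| RS256 | Asymmetric | RSA key pair | Microservices | ✅ General purpose |
| ES256 | Asymmetric | ECDSA P-256 | High performance | ✅ Recommended |
| EdDSA | Asymmetric | Ed25519 | Newest option | ✅ Best performance |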
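
To make the symmetric row concrete, here is a small standard-library sketch of HS256. The helper names (`sign_hs256`, `verify_hs256`) and the demo secret are hypothetical; the point is that the same secret both signs and verifies, so every service able to verify a token can also mint one.

```python
import base64
import hashlib
import hmac
import json


def _b64url(raw: bytes) -> str:
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def sign_hs256(payload: dict, secret: bytes) -> str:
    """Produce a compact JWT signed with HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = (
        _b64url(json.dumps(header, separators=(",", ":")).encode())
        + "."
        + _b64url(json.dumps(payload, separators=(",", ":")).encode())
    )
    sig = hmac.new(secret, signing_input.encode("utf-8"), hashlib.sha256).digest()
    return signing_input + "." + _b64url(sig)


def verify_hs256(token: str, secret: bytes) -> bool:
    """Recompute the HMAC over header.payload and compare in constant time."""
    if token.count(".") != 2:
        return False
    signing_input, sig_segment = token.rsplit(".", 1)
    expected = hmac.new(secret, signing_input.encode("utf-8"), hashlib.sha256).digest()
    return hmac.compare_digest(_b64url(expected).encode(), sig_segment.encode("utf-8"))


shared_secret = b"demo-secret-do-not-use-in-production"
hs_token = sign_hs256({"sub": "user-123"}, shared_secret)
print(verify_hs256(hs_token, shared_secret))
```

With RS256, ES256, or EdDSA the verifier holds only the public key, which is why the asymmetric rows suit multi-service deployments better.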

Python example

import jwt
import time
from cryptography.hazmat.primitives.asymmetric import rsa, ec
from cryptography.hazmat.primitives import serialization

# === RS256 signing ===
private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
public_key = private_key.public_key()

now = int(time.time())
token = jwt.encode(
    {
        "sub": "user-123",
        "roles": ["admin"],
        # iss and aud must be present because decode() below checks them
        "iss": "https://auth.example.com",
        "aud": "my-api",
        "iat": now,
        "exp": now + 3600,  # a fixed past timestamp would fail verification
    },
    private_key,
    algorithm="RS256",
    headers={"kid": "key-2024"}
)

# Verification (only the public key is needed)
decoded = jwt.decode(
    token, public_key,
    algorithms=["RS256"],
    audience="my-api",  # validate audience
    issuer="https://auth.example.com"  # validate issuer
)

# === ES256 signing (faster, smaller) ===
ec_private = ec.generate_private_key(ec.SECP256R1())
ec_public = ec_private.public_key()

ec_token = jwt.encode(
    {
        "sub": "user-123",
        "iss": "https://auth.example.com",
        "aud": "my-api",
        "exp": int(time.time()) + 3600,
        "iat": int(time.time()),
        "jti": "unique-id-001",
    },
    ec_private,
    algorithm="ES256",
    headers={"kid": "ec-key-2024"},
)

ec_decoded = jwt.decode(
    ec_token,
    ec_public,
    algorithms=["ES256"],
    audience="my-api",
    issuer="https://auth.example.com",
)
print(f"ES256 decoded: {ec_decoded}")

Java example (jjwt)

// build.gradle
// implementation 'io.jsonwebtoken:jjwt-api:0.12.5'
// runtimeOnly  'io.jsonwebtoken:jjwt-impl:0.12.5'
// runtimeOnly  'io.jsonwebtoken:jjwt-jackson:0.12.5'

import io.jsonwebtoken.*;
import io.jsonwebtoken.security.Keys;

import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;
import java.time.Instant;
import java.util.Date;
import java.util.List;
import java.util.UUID;

public class JwtRS256Example {

    public static void main(String[] args) throws Exception {
        // --- Generate an RSA key pair ---
        KeyPairGenerator kpg = KeyPairGenerator.getInstance("RSA");
        kpg.initialize(2048);
        KeyPair keyPair = kpg.generateKeyPair();
        RSAPrivateKey privateKey = (RSAPrivateKey) keyPair.getPrivate();
        RSAPublicKey publicKey = (RSAPublicKey) keyPair.getPublic();

        // --- Issue the JWT ---
        Instant now = Instant.now();
        String token = Jwts.builder()
                .header().keyId("key-2024").and()       // kid
                .issuer("https://auth.example.com")      // iss
                .subject("user-123")                     // sub
                .audience().add("my-api").and()           // aud
                .issuedAt(Date.from(now))                // iat
                .expiration(Date.from(now.plusSeconds(3600))) // exp = 1 hour
                .id(UUID.randomUUID().toString())        // jti
                .claim("roles", List.of("admin", "editor"))
                .signWith(privateKey, Jwts.SIG.RS256)    // RS256 signature
                .compact();

        System.out.println("JWT: " + token);

        // --- Verify the JWT ---
        try {
            Jws<Claims> jws = Jwts.parser()
                    .requireIssuer("https://auth.example.com")
                    .requireAudience("my-api")
                    .verifyWith(publicKey)
                    .build()
                    .parseSignedClaims(token);

            Claims claims = jws.getPayload();
            System.out.println("Subject : " + claims.getSubject());
            System.out.println("Roles   : " + claims.get("roles", List.class));
            System.out.println("Expires : " + claims.getExpiration());
        } catch (ExpiredJwtException e) {
            System.err.println("Token expired: " + e.getMessage());
        } catch (JwtException e) {
            System.err.println("Token verification failed: " + e.getMessage());
        }
    }
}

Go example (golang-jwt/jwt/v5)

package main

import (
	"crypto/ecdsa"
	"crypto/elliptic"
	"crypto/rand"
	"crypto/rsa"
	"fmt"
	"log"
	"time"

	"github.com/golang-jwt/jwt/v5"
	"github.com/google/uuid"
)

// CustomClaims is the custom claims structure
type CustomClaims struct {
	Roles []string `json:"roles"`
	jwt.RegisteredClaims
}

func main() {
	// ========== RS256 example ==========
	rsaKey, err := rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		log.Fatalf("failed to generate RSA key: %v", err)
	}

	now := time.Now()
	rsClaims := CustomClaims{
		Roles: []string{"admin", "editor"},
		RegisteredClaims: jwt.RegisteredClaims{
			Issuer:    "https://auth.example.com",
			Subject:   "user-123",
			Audience:  jwt.ClaimStrings{"my-api"},
			ExpiresAt: jwt.NewNumericDate(now.Add(1 * time.Hour)),
			IssuedAt:  jwt.NewNumericDate(now),
			ID:        uuid.NewString(),
		},
	}

	rsToken := jwt.NewWithClaims(jwt.SigningMethodRS256, rsClaims)
	rsToken.Header["kid"] = "key-2024"

	rsTokenStr, err := rsToken.SignedString(rsaKey)
	if err != nil {
		log.Fatalf("RS256 signing failed: %v", err)
	}
	fmt.Println("RS256 JWT:", rsTokenStr)

	// RS256 verification
	parsed, err := jwt.ParseWithClaims(rsTokenStr, &CustomClaims{},
		func(t *jwt.Token) (interface{}, error) {
			if _, ok := t.Method.(*jwt.SigningMethodRSA); !ok {
				return nil, fmt.Errorf("unsupported signing algorithm: %v", t.Header["alg"])
			}
			return &rsaKey.PublicKey, nil
		},
		jwt.WithIssuer("https://auth.example.com"),
		jwt.WithAudience("my-api"),
		jwt.WithExpirationRequired(),
	)
	if err != nil {
		log.Fatalf("RS256 verification failed: %v", err)
	}
	if claims, ok := parsed.Claims.(*CustomClaims); ok && parsed.Valid {
		fmt.Printf("Subject: %s, Roles: %v\n", claims.Subject, claims.Roles)
	}

	// ========== ES256 example ==========
	ecKey, err := ecdsa.GenerateKey(elliptic.P256(), rand.Reader)
	if err != nil {
		log.Fatalf("failed to generate ECDSA key: %v", err)
	}

	esClaims := CustomClaims{
		Roles: []string{"viewer"},
		RegisteredClaims: jwt.RegisteredClaims{
			Issuer:    "https://auth.example.com",
			Subject:   "user-456",
			Audience:  jwt.ClaimStrings{"my-api"},
			ExpiresAt: jwt.NewNumericDate(now.Add(30 * time.Minute)),
			IssuedAt:  jwt.NewNumericDate(now),
			ID:        uuid.NewString(),
		},
	}

	esToken := jwt.NewWithClaims(jwt.SigningMethodES256, esClaims)
	esToken.Header["kid"] = "ec-key-2024"

	esTokenStr, err := esToken.SignedString(ecKey)
	if err != nil {
		log.Fatalf("ES256 signing failed: %v", err)
	}
	fmt.Println("ES256 JWT:", esTokenStr)

	// ES256 verification
	parsedES, err := jwt.ParseWithClaims(esTokenStr, &CustomClaims{},
		func(t *jwt.Token) (interface{}, error) {
			if _, ok := t.Method.(*jwt.SigningMethodECDSA); !ok {
				return nil, fmt.Errorf("unsupported signing algorithm: %v", t.Header["alg"])
			}
			return &ecKey.PublicKey, nil
		},
		jwt.WithIssuer("https://auth.example.com"),
		jwt.WithAudience("my-api"),
	)
	if err != nil {
		log.Fatalf("ES256 verification failed: %v", err)
	}
	if claims, ok := parsedES.Claims.(*CustomClaims); ok && parsedES.Valid {
		fmt.Printf("ES256 Subject: %s, Roles: %v\n", claims.Subject, claims.Roles)
	}
}

9.3 JWT Issuance and Verification Flow

        sequenceDiagram
    autonumber
    participant C as Client
    participant Auth as Auth service
    participant API as API service
    participant JWKS as JWKS endpoint

    C->>Auth: POST /login {username, password}
    Auth->>Auth: Verify credentials
    Auth->>Auth: Generate JWT (signed with private key)
    Auth-->>C: 200 {access_token, refresh_token, expires_in}

    C->>API: GET /resource<br/>Authorization: Bearer <access_token>
    API->>API: Decode JWT header, extract kid
    API->>JWKS: GET /.well-known/jwks.json (first request or cache expired)
    JWKS-->>API: {keys: [{kid, kty, n, e, ...}]}
    API->>API: Match public key by kid
    API->>API: Verify signature + exp + iss + aud
    alt Verification succeeds
        API-->>C: 200 {data}
    else Token expired or invalid
        API-->>C: 401 Unauthorized
    end
    

9.4 JWT Security Pitfalls

Pitfall 1: the alg:none attack

# ❌ The attacker crafts an unsigned JWT
malicious_header = {"alg": "none", "typ": "JWT"}
malicious_payload = {"sub": "admin", "roles": ["superadmin"]}

# ✅ Defense: always pin the allowed algorithms
decoded = jwt.decode(
    token, public_key,
    algorithms=["RS256"],  # explicit allowlist; "none" is rejected
)

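Pitfall 2: key confusion attack

# ❌ The attacker uses the RS256 public key as an HS256 secret
# If the server does not restrict the algorithm, the attacker can:
# 1. Obtain the RS256 public key (it is public)
# 2. Sign a token with HS256, using the public key as the HMAC secret
# 3. The server verifies the HS256 token with that same public key → accepted!

# ✅ Defense: strictly restrict the algorithm
decoded = jwt.decode(token, public_key, algorithms=["RS256"])  # accept RS256 only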

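Pitfall 3: tokens cannot be revoked

Once issued, a JWT stays valid until it expires; the signature alone gives the server no way to revoke it. A common workaround is a blacklist of revoked token IDs (jti) whose entries expire together with the token:

Python implementation

import jwt
import redis
import time

r = redis.Redis()

def revoke_token(jti: str, exp: int):
    """Add a token to the blacklist until its own expiry."""
    ttl = exp - int(time.time())
    if ttl > 0:
        r.setex(f"revoked:{jti}", ttl, "1")

def is_token_revoked(jti: str) -> bool:
    """Return True if the token has been revoked."""
    return bool(r.exists(f"revoked:{jti}"))  # EXISTS returns a count, not a bool

def verify_token(token: str):
    # public_key is the RSA public key from the RS256 example in 9.2
    payload = jwt.decode(token, public_key, algorithms=["RS256"])
    jti = payload.get("jti")
    # A token without a jti can never be blacklisted, so reject it as well
    if not jti or is_token_revoked(jti):
        raise Exception("Token has been revoked or has no jti")
    return payload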
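
For unit tests or a single-process prototype, the same TTL logic can be sketched without Redis. `InMemoryDenylist` and its injectable `clock` are assumptions made for this illustration; unlike Redis, the state is neither shared between processes nor persistent.

```python
import time


class InMemoryDenylist:
    """Stand-in for the Redis blacklist: jti -> token expiry (Unix seconds)."""

    def __init__(self, clock=time.time):
        self._expiry_by_jti = {}
        self._clock = clock

    def revoke(self, jti: str, exp: float) -> None:
        # An already-expired token needs no entry: verification rejects it anyway.
        if exp > self._clock():
            self._expiry_by_jti[jti] = exp

    def is_revoked(self, jti: str) -> bool:
        exp = self._expiry_by_jti.get(jti)
        if exp is None:
            return False
        if exp <= self._clock():
            # Mimic Redis TTL expiry: drop the entry once the token itself is dead.
            del self._expiry_by_jti[jti]
            return False
        return True


fake_now = [1000.0]
denylist = InMemoryDenylist(clock=lambda: fake_now[0])
denylist.revoke("token-a", exp=1900.0)
print(denylist.is_revoked("token-a"))
```

Tying each entry's lifetime to the token's `exp` keeps the blacklist bounded: it never holds more entries than there are unexpired revoked tokens.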

Java implementation (Redis blacklist)

import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.Instant;

/**
 * JWT blacklist service: stores revoked token IDs (jti) in Redis.
 * The TTL matches the token's remaining lifetime, so entries are removed automatically on expiry.
 */
@Service
public class JwtBlacklistService {

    private static final String PREFIX = "revoked:";
    private final StringRedisTemplate redis;

    public JwtBlacklistService(StringRedisTemplate redis) {
        this.redis = redis;
    }

    /** Revoke a token: write its jti to Redis with TTL = the token's remaining lifetime */
    public void revoke(String jti, Instant expiration) {
        Duration ttl = Duration.between(Instant.now(), expiration);
        if (!ttl.isNegative() && !ttl.isZero()) {
            redis.opsForValue().set(PREFIX + jti, "1", ttl);
        }
    }

    /** Check whether a token has been revoked */
    public boolean isRevoked(String jti) {
        return Boolean.TRUE.equals(redis.hasKey(PREFIX + jti));
    }
}

Go implementation (Redis blacklist)

package auth

import (
	"context"
	"fmt"
	"time"

	"github.com/redis/go-redis/v9"
)

// JWTBlacklist manages revoked JWTs in Redis
type JWTBlacklist struct {
	rdb *redis.Client
}

func NewJWTBlacklist(rdb *redis.Client) *JWTBlacklist {
	return &JWTBlacklist{rdb: rdb}
}

// Revoke adds a jti to the blacklist; the TTL matches the token's remaining lifetime
func (b *JWTBlacklist) Revoke(ctx context.Context, jti string, exp time.Time) error {
	ttl := time.Until(exp)
	if ttl <= 0 {
		return nil // already expired, nothing to revoke
	}
	return b.rdb.Set(ctx, fmt.Sprintf("revoked:%s", jti), "1", ttl).Err()
}

// IsRevoked reports whether a token has been revoked
func (b *JWTBlacklist) IsRevoked(ctx context.Context, jti string) (bool, error) {
	n, err := b.rdb.Exists(ctx, fmt.Sprintf("revoked:%s", jti)).Result()
	if err != nil {
		return false, err
	}
	return n > 0, nil
}

9.5 JWKS (JSON Web Key Set)

A JWKS endpoint publishes the issuer's public keys and makes key rotation possible:

// GET https://auth.example.com/.well-known/jwks.json
{
  "keys": [
    {
      "kty": "RSA",
      "kid": "key-2024",
      "use": "sig",
      "alg": "RS256",
      "n": "0vx7agoebGcQSuuPiLJXZptN9nndrQmbXEps2aiAFbWhM...",
      "e": "AQAB"
    },
    {
      "kty": "RSA",
      "kid": "key-2023",
      "use": "sig",
      "alg": "RS256",
      "n": "...",
      "e": "AQAB"
    }
  ]
}
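
On the verifier side, handling this document comes down to two steps: pick the entry whose `kid` matches the token header, then decode its Base64URL fields. The sketch below uses hypothetical helpers (`find_signing_key`, `b64url_to_int`) and a trimmed demo set without the `n` values, which are elided above.

```python
import base64


def b64url_to_int(value: str) -> int:
    """Decode a Base64URL big-endian integer such as the JWK 'n' or 'e' field."""
    padded = value + "=" * (-len(value) % 4)
    return int.from_bytes(base64.urlsafe_b64decode(padded), "big")


def find_signing_key(jwks: dict, kid: str, alg: str = "RS256") -> dict:
    """Return the JWK matching kid that is usable for signature verification."""
    for key in jwks.get("keys", []):
        if (key.get("kid") == kid
                and key.get("use", "sig") == "sig"
                and key.get("alg", alg) == alg):
            return key
    raise KeyError(f"no signing key with kid={kid!r}")


# Trimmed demo set: the modulus 'n' is left out because it is elided in the text.
demo_jwks = {
    "keys": [
        {"kty": "RSA", "kid": "key-2024", "use": "sig", "alg": "RS256", "e": "AQAB"},
        {"kty": "RSA", "kid": "key-2023", "use": "sig", "alg": "RS256", "e": "AQAB"},
    ]
}

jwk_entry = find_signing_key(demo_jwks, "key-2023")
print(jwk_entry["kid"], b64url_to_int(jwk_entry["e"]))  # prints "key-2023 65537"
```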

JWKS key rotation flow

        sequenceDiagram
    autonumber
    participant Admin as Ops / automation
    participant Auth as Auth service
    participant JWKS as JWKS endpoint
    participant API as API service (verifier)

    Note over Admin,API: Phase 1: generate and publish the new key
    Admin->>Auth: Generate new key pair (kid=key-2025)
    Auth->>JWKS: Publish new JWKS (key-2024 + key-2025)
    Note over JWKS: Old and new keys coexist

    Note over Admin,API: Phase 2: switch the signing key
    Auth->>Auth: Start signing new JWTs with key-2025
    API->>JWKS: Refresh cache, fetch latest JWKS
    JWKS-->>API: {keys: [key-2024, key-2025]}
    API->>API: Match by kid, either key verifies

    Note over Admin,API: Phase 3: remove the old key after old tokens expire
    Admin->>Auth: Wait until all old tokens have expired (≥ max TTL)
    Auth->>JWKS: Remove key-2024, keep only key-2025
    API->>JWKS: Refresh cache
    JWKS-->>API: {keys: [key-2025]}
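
From the verifier's point of view, rotation mostly means "refetch when an unknown kid shows up, but not too often". The following is a rough sketch of that policy; `RotatingKeyCache`, the injected `fetch_jwks` callable, and the 300-second floor are assumptions for illustration rather than any particular library's behavior.

```python
import time
from typing import Callable


class RotatingKeyCache:
    """Caches JWKS entries by kid and refetches when an unknown kid appears."""

    def __init__(self, fetch_jwks: Callable[[], dict],
                 min_refresh_seconds: float = 300.0, clock=time.monotonic):
        self._fetch = fetch_jwks          # hypothetical: returns the parsed JWKS dict
        self._min_refresh = min_refresh_seconds
        self._clock = clock
        self._keys = {}
        self._last_refresh = None

    def _refresh(self) -> None:
        self._keys = {k["kid"]: k for k in self._fetch().get("keys", [])}
        self._last_refresh = self._clock()

    def get(self, kid: str) -> dict:
        if kid not in self._keys:
            # Rate-limit refetches so forged kids cannot hammer the JWKS endpoint.
            due = (self._last_refresh is None
                   or self._clock() - self._last_refresh >= self._min_refresh)
            if due:
                self._refresh()
        if kid not in self._keys:
            raise KeyError(f"unknown kid {kid!r}")
        return self._keys[kid]


published = {"keys": [{"kid": "key-2024"}]}
fetch_calls = []
fake_now = [0.0]


def fake_fetch() -> dict:
    fetch_calls.append(1)
    return published


cache = RotatingKeyCache(fake_fetch, min_refresh_seconds=300, clock=lambda: fake_now[0])
print(cache.get("key-2024")["kid"])
```

The refresh floor is also why phase 1 publishes the new key before phase 2 starts signing with it: verifiers need time to pick it up.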
    

Python JWKS client

import jwt
from jwt import PyJWKClient

# Fetch public keys from the JWKS endpoint automatically
jwks_client = PyJWKClient("https://auth.example.com/.well-known/jwks.json")

def verify_with_jwks(token: str):
    """Verify a JWT against the JWKS (works across key rotation)."""
    signing_key = jwks_client.get_signing_key_from_jwt(token)
    return jwt.decode(
        token,
        signing_key.key,
        algorithms=["RS256"],
        audience="my-api",
        issuer="https://auth.example.com",
    )

Java JWKS client (with caching)

import com.auth0.jwk.*;
import com.auth0.jwt.JWT;
import com.auth0.jwt.algorithms.Algorithm;
import com.auth0.jwt.interfaces.DecodedJWT;

import java.net.URL;
import java.security.interfaces.RSAPublicKey;
import java.util.concurrent.TimeUnit;

/**
 * Fetches and caches JWKS public keys.
 * Uses com.auth0:jwks-rsa:0.22.1 and com.auth0:java-jwt:4.4.0
 *
 * build.gradle:
 *   implementation 'com.auth0:java-jwt:4.4.0'
 *   implementation 'com.auth0:jwks-rsa:0.22.1'
 */
public class JwksVerifier {

    private final JwkProvider provider;

    public JwksVerifier(String jwksUrl) throws Exception {
        // Cache up to 10 keys for 24 hours; rate-limit to at most 10 requests per minute
        this.provider = new JwkProviderBuilder(new URL(jwksUrl))
                .cached(10, 24, TimeUnit.HOURS)
                .rateLimited(10, 1, TimeUnit.MINUTES)
                .build();
    }

    /**
     * Verify a JWT: fetch the public key from JWKS, then check the signature and standard claims
     */
    public DecodedJWT verify(String token) throws Exception {
        DecodedJWT unverified = JWT.decode(token);
        String kid = unverified.getKeyId();
        if (kid == null || kid.isBlank()) {
            throw new IllegalArgumentException("JWT is missing kid");
        }

        Jwk jwk = provider.get(kid);
        RSAPublicKey publicKey = (RSAPublicKey) jwk.getPublicKey();

        return JWT.require(Algorithm.RSA256(publicKey, null))
                .withIssuer("https://auth.example.com")
                .withAudience("my-api")
                .build()
                .verify(token);
    }

    public static void main(String[] args) throws Exception {
        JwksVerifier verifier = new JwksVerifier(
                "https://auth.example.com/.well-known/jwks.json");
        // DecodedJWT jwt = verifier.verify(someToken);
        System.out.println("JwksVerifier initialized");
    }
}

Go JWKS client (with caching)

package auth

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"

	"github.com/golang-jwt/jwt/v5"
	"github.com/lestrrat-go/jwx/v2/jwk"
)

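// JWKSClient fetches public keys from a remote JWKS endpoint with a built-in auto-refreshing cache
type JWKSClient struct {
	cache       *jwk.Cache
	jwksURL     string
	mu          sync.RWMutex
	lastRefresh time.Time
	minInterval time.Duration // minimum refresh interval, guards against excessive requests
}

// NewJWKSClient creates a JWKS client; refreshInterval is the auto-refresh period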
func NewJWKSClient(ctx context.Context, jwksURL string, refreshInterval time.Duration) (*JWKSClient, error) {
	c := jwk.NewCache(ctx)
	err := c.Register(jwksURL, jwk.WithRefreshInterval(refreshInterval))
	if err != nil {
		return nil, fmt.Errorf("failed to register JWKS URL: %w", err)
	}
	// Warm up the cache
	_, err = c.Refresh(ctx, jwksURL)
	if err != nil {
		return nil, fmt.Errorf("failed to warm up JWKS cache: %w", err)
	}
	return &JWKSClient{
		cache:       c,
		jwksURL:     jwksURL,
		minInterval: 5 * time.Minute,
	}, nil
}

// KeyFunc returns the jwt.Keyfunc required by jwt.ParseWithClaims
func (j *JWKSClient) KeyFunc() jwt.Keyfunc {
	return func(t *jwt.Token) (interface{}, error) {
		kid, ok := t.Header["kid"].(string)
		if !ok || kid == "" {
			return nil, errors.New("JWT is missing kid")
		}

		set, err := j.cache.Get(context.Background(), j.jwksURL)
		if err != nil {
			return nil, fmt.Errorf("failed to fetch JWKS: %w", err)
		}

		key, found := set.LookupKeyID(kid)
		if !found {
			return nil, fmt.Errorf("kid=%s not found", kid)
		}

		var rawKey interface{}
		if err := key.Raw(&rawKey); err != nil {
			return nil, fmt.Errorf("failed to extract raw key: %w", err)
		}
		return rawKey, nil
	}
}

// Verify validates a JWT and returns the custom claims
func (j *JWKSClient) Verify(tokenStr string) (*CustomClaims, error) {
	token, err := jwt.ParseWithClaims(tokenStr, &CustomClaims{}, j.KeyFunc(),
		jwt.WithIssuer("https://auth.example.com"),
		jwt.WithAudience("my-api"),
		jwt.WithExpirationRequired(),
	)
	if err != nil {
		return nil, err
	}
	claims, ok := token.Claims.(*CustomClaims)
	if !ok || !token.Valid {
		return nil, errors.New("invalid token")
	}
	return claims, nil
}

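9.6 JWT vs Session

| Dimension | JWT | Session |
|---|---|---|
| Storage location | Client | Server |
| State | Stateless | Stateful |
| Scalability | Naturally distributed | Requires shared storage |
| Revocation | Hard (needs a blacklist) | Easy (delete the session) |
| Size | Larger (carries claims) | Small (session ID) |
| Typical use | Microservices, APIs | Traditional web applications |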

9.7 JWT Lifecycle Management

Token refresh flow

        sequenceDiagram
    autonumber
    participant C as Client
    participant Auth as Auth service
    participant DB as Database / Redis

    C->>Auth: POST /login {username, password}
    Auth->>Auth: Verify credentials
    Auth->>Auth: Generate access_token (short-lived, 15min)
    Auth->>Auth: Generate refresh_token (long-lived, 7d)
    Auth->>DB: Store refresh_token metadata (jti, user_id, exp)
    Auth-->>C: {access_token, refresh_token, expires_in: 900}

    Note over C: access_token expires...

    C->>Auth: POST /refresh {refresh_token}
    Auth->>Auth: Verify refresh_token signature + exp
    Auth->>DB: Check whether refresh_token has been revoked
    alt refresh_token valid
        Auth->>Auth: Generate new access_token
        Auth->>Auth: (optional) Rotate refresh_token
        Auth->>DB: (optional) Revoke old refresh_token, store the new one
        Auth-->>C: {access_token, refresh_token, expires_in: 900}
    else refresh_token invalid or revoked
        Auth-->>C: 401 Please log in again
    end

    Note over C: User logs out

    C->>Auth: POST /logout {refresh_token}
    Auth->>DB: Revoke refresh_token
    Auth->>DB: (optional) Add access_token jti to the blacklist
    Auth-->>C: 200 OK
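
The rotation branch of this flow can be modeled in a few lines. The sketch below uses an in-memory dictionary in place of the database or Redis; `RefreshStore` and its method names are hypothetical, and token signing is left out so that only the one-time-use bookkeeping is visible.

```python
import uuid


class RefreshStore:
    """In-memory stand-in for the DB/Redis table of active refresh-token IDs."""

    def __init__(self):
        self._active = {}  # jti -> user id

    def issue(self, user_id: str) -> str:
        """Register a new refresh-token jti for the user and return it."""
        jti = str(uuid.uuid4())
        self._active[jti] = user_id
        return jti

    def rotate(self, old_jti: str) -> str:
        """Consume old_jti exactly once and return the replacement jti."""
        user_id = self._active.pop(old_jti, None)
        if user_id is None:
            # Unknown, revoked, or already used: force a fresh login.
            raise PermissionError("refresh token is no longer valid")
        return self.issue(user_id)

    def revoke(self, jti: str) -> None:
        """Logout path: forget the jti so it can never be rotated again."""
        self._active.pop(jti, None)


store = RefreshStore()
first_jti = store.issue("walter")
second_jti = store.rotate(first_jti)
print(first_jti != second_jti)
```

Popping the old ID and issuing the new one in a single step is what makes a replayed refresh token fail: the second presentation finds nothing to consume.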
    

Python full implementation (FastAPI)

"""
Complete FastAPI JWT authentication example
Dependencies: pip install fastapi uvicorn python-jose[cryptography] redis passlib[bcrypt]
"""
import time
import uuid
from datetime import datetime, timedelta, timezone
from typing import Optional

import redis
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# ============================================================
# Configuration
# ============================================================
ACCESS_TOKEN_EXPIRE_MINUTES = 15
REFRESH_TOKEN_EXPIRE_DAYS = 7
ALGORITHM = "RS256"
ISSUER = "https://auth.example.com"
AUDIENCE = "my-api"

# In production, load the keys from secure storage instead of generating them at startup
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.hazmat.primitives import serialization

_private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
PRIVATE_KEY = _private_key.private_bytes(
    serialization.Encoding.PEM,
    serialization.PrivateFormat.PKCS8,
    serialization.NoEncryption(),
)
PUBLIC_KEY = _private_key.public_key().public_bytes(
    serialization.Encoding.PEM,
    serialization.SubjectPublicKeyInfo,
)

# ============================================================
# Infrastructure
# ============================================================
app = FastAPI(title="JWT Auth Demo")
security = HTTPBearer()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
r = redis.Redis(host="localhost", port=6379, db=0, decode_responses=True)

# Simulated user database
FAKE_USERS_DB = {
    "walter": {
        "username": "walter",
        "hashed_password": pwd_context.hash("secret123"),
        "roles": ["admin", "editor"],
    }
}


# ============================================================
# Data models
# ============================================================
class LoginRequest(BaseModel):
    username: str
    password: str


class TokenResponse(BaseModel):
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int = ACCESS_TOKEN_EXPIRE_MINUTES * 60


class RefreshRequest(BaseModel):
    refresh_token: str


# ============================================================
# Token helper functions
# ============================================================
def create_access_token(sub: str, roles: list[str]) -> str:
    """Issue a short-lived access_token"""
    now = datetime.now(timezone.utc)
    payload = {
        "iss": ISSUER,
        "sub": sub,
        "aud": AUDIENCE,
        "exp": now + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
        "iat": now,
        "jti": str(uuid.uuid4()),
        "roles": roles,
        "type": "access",
    }
    return jwt.encode(payload, PRIVATE_KEY, algorithm=ALGORITHM, headers={"kid": "key-2024"})


def create_refresh_token(sub: str) -> str:
    """Issue a long-lived refresh_token"""
    now = datetime.now(timezone.utc)
    jti = str(uuid.uuid4())
    payload = {
        "iss": ISSUER,
        "sub": sub,
        "aud": AUDIENCE,
        "exp": now + timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS),
        "iat": now,
        "jti": jti,
        "type": "refresh",
    }
    token = jwt.encode(payload, PRIVATE_KEY, algorithm=ALGORITHM)
    # Store refresh_token metadata in Redis (used for the revocation check)
    ttl = REFRESH_TOKEN_EXPIRE_DAYS * 86400
    r.setex(f"refresh:{jti}", ttl, sub)
    return token


def revoke_token(jti: str, exp: int):
    """Add a token to the blacklist"""
    ttl = exp - int(time.time())
    if ttl > 0:
        r.setex(f"revoked:{jti}", ttl, "1")


def is_token_revoked(jti: str) -> bool:
    """Return True if the token has been revoked"""
    return bool(r.exists(f"revoked:{jti}"))


# ============================================================
# JWT middleware / dependencies
# ============================================================
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
    """FastAPI dependency: verify the access_token and return its claims"""
    token = credentials.credentials
    try:
        payload = jwt.decode(
            token,
            PUBLIC_KEY,
            algorithms=[ALGORITHM],
            audience=AUDIENCE,
            issuer=ISSUER,
        )
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Token verification failed: {e}",
        )

    # Check the token type
    if payload.get("type") != "access":
        raise HTTPException(status_code=401, detail="access_token required")

    # Check the blacklist
    jti = payload.get("jti", "")
    if is_token_revoked(jti):
        raise HTTPException(status_code=401, detail="Token has been revoked")

    return payload


# ============================================================
# Routes
# ============================================================
@app.post("/login", response_model=TokenResponse)
def login(req: LoginRequest):
    """Log in and return access_token + refresh_token"""
    user = FAKE_USERS_DB.get(req.username)
    if not user or not pwd_context.verify(req.password, user["hashed_password"]):
        raise HTTPException(status_code=401, detail="Incorrect username or password")

    return TokenResponse(
        access_token=create_access_token(req.username, user["roles"]),
        refresh_token=create_refresh_token(req.username),
    )


@app.post("/refresh", response_model=TokenResponse)
def refresh(req: RefreshRequest):
    """Exchange a refresh_token for a new access_token (with refresh_token rotation)"""
    try:
        payload = jwt.decode(
            req.refresh_token,
            PUBLIC_KEY,
            algorithms=[ALGORITHM],
            audience=AUDIENCE,
            issuer=ISSUER,
        )
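    except JWTError as e:
        raise HTTPException(status_code=401, detail=f"Invalid refresh_token: {e}")

    if payload.get("type") != "refresh":
        raise HTTPException(status_code=401, detail="refresh_token required")

    jti = payload.get("jti", "")
    sub = payload.get("sub", "")

    # Rotate: consume the old refresh_token in one atomic step. DEL returns the
    # number of keys removed, so 0 means the token was already revoked, expired,
    # or used. A separate EXISTS-then-DEL pair would let two concurrent requests
    # both pass the check.
    if r.delete(f"refresh:{jti}") == 0:
        raise HTTPException(status_code=401, detail="refresh_token is no longer valid, please log in again")

    # Look up the user's roles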
    user = FAKE_USERS_DB.get(sub, {})
    roles = user.get("roles", [])

    return TokenResponse(
        access_token=create_access_token(sub, roles),
        refresh_token=create_refresh_token(sub),
    )


@app.post("/logout")
def logout(
    req: RefreshRequest,
    current_user: dict = Depends(get_current_user),
):
    """Log out: revoke the refresh_token and the current access_token"""
    # Revoke the refresh_token
    try:
        rt_payload = jwt.decode(
            req.refresh_token, PUBLIC_KEY,
            algorithms=[ALGORITHM], audience=AUDIENCE, issuer=ISSUER,
        )
        r.delete(f"refresh:{rt_payload.get('jti', '')}")
    except JWTError:
        pass  # continue even if the refresh_token is invalid

    # Add the current access_token to the blacklist
    revoke_token(current_user["jti"], current_user["exp"])

    return {"message": "Logged out"}


@app.get("/protected")
def protected_resource(user: dict = Depends(get_current_user)):
    """Protected resource endpoint"""
    return {
        "message": f"Hello, {user['sub']}!",
        "roles": user.get("roles", []),
    }

Java full implementation (Spring Boot JWT Filter)

// ============================================================
// JwtProperties.java: configuration properties
// ============================================================
package com.example.auth.config;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties(prefix = "jwt")
public record JwtProperties(
        String publicKeyPath,   // path to the RSA public key PEM
        String privateKeyPath,  // path to the RSA private key PEM
        String issuer,          // e.g. "https://auth.example.com"
        String audience,        // e.g. "my-api"
        long accessTtlSeconds,  // access_token lifetime (seconds)
        long refreshTtlSeconds  // refresh_token lifetime (seconds)
) {}
// ============================================================
// JwtService.java: core JWT issuance and verification service
// ============================================================
package com.example.auth.service;

import io.jsonwebtoken.*;
import io.jsonwebtoken.security.SignatureException;
import org.springframework.stereotype.Service;

import java.security.interfaces.RSAPrivateKey;
import java.security.interfaces.RSAPublicKey;
import java.time.Instant;
import java.util.*;

@Service
public class JwtService {

    private final RSAPrivateKey privateKey;
    private final RSAPublicKey publicKey;
    private final String issuer;
    private final String audience;
    private final long accessTtl;
    private final long refreshTtl;

    public JwtService(RSAPrivateKey privateKey, RSAPublicKey publicKey,
                      String issuer, String audience,
                      long accessTtl, long refreshTtl) {
        this.privateKey = privateKey;
        this.publicKey = publicKey;
        this.issuer = issuer;
        this.audience = audience;
        this.accessTtl = accessTtl;
        this.refreshTtl = refreshTtl;
    }

    /** Issue an access_token */
    public String createAccessToken(String subject, List<String> roles) {
        Instant now = Instant.now();
        return Jwts.builder()
                .header().keyId("key-2024").and()
                .issuer(issuer)
                .subject(subject)
                .audience().add(audience).and()
                .issuedAt(Date.from(now))
                .expiration(Date.from(now.plusSeconds(accessTtl)))
                .id(UUID.randomUUID().toString())
                .claim("roles", roles)
                .claim("type", "access")
                .signWith(privateKey, Jwts.SIG.RS256)
                .compact();
    }

    /** Issue a refresh_token */
    public String createRefreshToken(String subject) {
        Instant now = Instant.now();
        return Jwts.builder()
                .issuer(issuer)
                .subject(subject)
                .audience().add(audience).and()
                .issuedAt(Date.from(now))
                .expiration(Date.from(now.plusSeconds(refreshTtl)))
                .id(UUID.randomUUID().toString())
                .claim("type", "refresh")
                .signWith(privateKey, Jwts.SIG.RS256)
                .compact();
    }

    /** Verify and parse a JWT, returning its Claims; throws on failure */
    public Claims verify(String token) {
        try {
            return Jwts.parser()
                    .requireIssuer(issuer)
                    .requireAudience(audience)
                    .verifyWith(publicKey)
                    .build()
                    .parseSignedClaims(token)
                    .getPayload();
        } catch (ExpiredJwtException e) {
            throw new RuntimeException("Token expired", e);
        } catch (SignatureException e) {
            throw new RuntimeException("Signature verification failed", e);
        } catch (JwtException e) {
            throw new RuntimeException("Invalid token: " + e.getMessage(), e);
        }
    }
}
// ============================================================
// JwtAuthFilter.java — Spring Boot OncePerRequestFilter
// ============================================================
package com.example.auth.filter;

import com.example.auth.service.JwtBlacklistService;
import com.example.auth.service.JwtService;
import io.jsonwebtoken.Claims;
import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.http.HttpHeaders;
import org.springframework.security.authentication.UsernamePasswordAuthenticationToken;
import org.springframework.security.core.authority.SimpleGrantedAuthority;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.OncePerRequestFilter;

import java.io.IOException;
import java.util.List;

/**
 * JWT authentication filter:
 * 1. Extract the JWT from Authorization: Bearer <token>
 * 2. Verify the signature and claims
 * 3. Check the blacklist
 * 4. Populate the Spring Security context
 */
@Component
public class JwtAuthFilter extends OncePerRequestFilter {

    private final JwtService jwtService;
    private final JwtBlacklistService blacklist;

    public JwtAuthFilter(JwtService jwtService, JwtBlacklistService blacklist) {
        this.jwtService = jwtService;
        this.blacklist = blacklist;
    }

    @Override
    protected void doFilterInternal(HttpServletRequest request,
                                    HttpServletResponse response,
                                    FilterChain chain)
            throws ServletException, IOException {

        String header = request.getHeader(HttpHeaders.AUTHORIZATION);
        if (header == null || !header.startsWith("Bearer ")) {
            chain.doFilter(request, response);
            return;
        }

        String token = header.substring(7);
        try {
            Claims claims = jwtService.verify(token);

            // Accept access_token only
            if (!"access".equals(claims.get("type", String.class))) {
                response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "access_token required");
                return;
            }

            // Check the blacklist
            if (blacklist.isRevoked(claims.getId())) {
                response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "Token has been revoked");
                return;
            }

            // Extract roles
            @SuppressWarnings("unchecked")
            List<String> roles = claims.get("roles", List.class);
            var authorities = (roles != null)
                    ? roles.stream().map(r -> new SimpleGrantedAuthority("ROLE_" + r)).toList()
                    : List.<SimpleGrantedAuthority>of();

            var auth = new UsernamePasswordAuthenticationToken(
                    claims.getSubject(), null, authorities);
            SecurityContextHolder.getContext().setAuthentication(auth);

        } catch (Exception e) {
            response.sendError(HttpServletResponse.SC_UNAUTHORIZED, e.getMessage());
            return;
        }

        chain.doFilter(request, response);
    }
}

Complete Go implementation (Gin JWT middleware + token refresh)

package main

import (
	"context"
	"crypto/rand"
	"crypto/rsa"
	"errors"
	"fmt"
	"net/http"
	"strings"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/golang-jwt/jwt/v5"
	"github.com/google/uuid"
	"github.com/redis/go-redis/v9"
)

// ============================================================
// Configuration & global variables
// ============================================================

const (
	issuer          = "https://auth.example.com"
	audience        = "my-api"
	accessTokenTTL  = 15 * time.Minute
	refreshTokenTTL = 7 * 24 * time.Hour
)

var (
	rsaPrivateKey *rsa.PrivateKey
	rsaPublicKey  *rsa.PublicKey
	rdb           *redis.Client
)

// ============================================================
// Claims
// ============================================================

type TokenClaims struct {
	Roles     []string `json:"roles,omitempty"`
	TokenType string   `json:"type"` // "access" or "refresh"
	jwt.RegisteredClaims
}

// ============================================================
// Token issuance
// ============================================================

func createAccessToken(sub string, roles []string) (string, error) {
	now := time.Now()
	claims := TokenClaims{
		Roles:     roles,
		TokenType: "access",
		RegisteredClaims: jwt.RegisteredClaims{
			Issuer:    issuer,
			Subject:   sub,
			Audience:  jwt.ClaimStrings{audience},
			ExpiresAt: jwt.NewNumericDate(now.Add(accessTokenTTL)),
			IssuedAt:  jwt.NewNumericDate(now),
			ID:        uuid.NewString(),
		},
	}
	token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
	token.Header["kid"] = "key-2024"
	return token.SignedString(rsaPrivateKey)
}

func createRefreshToken(sub string) (string, string, error) {
	now := time.Now()
	jti := uuid.NewString()
	claims := TokenClaims{
		TokenType: "refresh",
		RegisteredClaims: jwt.RegisteredClaims{
			Issuer:    issuer,
			Subject:   sub,
			Audience:  jwt.ClaimStrings{audience},
			ExpiresAt: jwt.NewNumericDate(now.Add(refreshTokenTTL)),
			IssuedAt:  jwt.NewNumericDate(now),
			ID:        jti,
		},
	}
	token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
	tokenStr, err := token.SignedString(rsaPrivateKey)
	if err != nil {
		return "", "", err
	}
	// Store the refresh_token metadata in Redis. If the write fails, return the
	// error: a refresh_token that is not recorded could never be redeemed.
	key := fmt.Sprintf("refresh:%s", jti)
	if err := rdb.Set(context.Background(), key, sub, refreshTokenTTL).Err(); err != nil {
		return "", "", err
	}
	return tokenStr, jti, nil
}

// ============================================================
// Token verification
// ============================================================

func parseToken(tokenStr string) (*TokenClaims, error) {
	token, err := jwt.ParseWithClaims(tokenStr, &TokenClaims{},
		func(t *jwt.Token) (interface{}, error) {
			// Defense in depth: refuse any non-RSA method before handing out the key
			if _, ok := t.Method.(*jwt.SigningMethodRSA); !ok {
				return nil, fmt.Errorf("unsupported signing algorithm: %v", t.Header["alg"])
			}
			return rsaPublicKey, nil
		},
		// Explicit algorithm allowlist: only RS256 is accepted
		jwt.WithValidMethods([]string{jwt.SigningMethodRS256.Alg()}),
		jwt.WithIssuer(issuer),
		jwt.WithAudience(audience),
		jwt.WithExpirationRequired(),
	)
	if err != nil {
		return nil, err
	}
	claims, ok := token.Claims.(*TokenClaims)
	if !ok || !token.Valid {
		return nil, errors.New("invalid token")
	}
	return claims, nil
}

// ============================================================
// Gin JWT middleware
// ============================================================

// JWTAuthMiddleware validates the access_token and injects the user's information into the Gin context
func JWTAuthMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		// 1. Extract the Bearer token
		authHeader := c.GetHeader("Authorization")
		if authHeader == "" || !strings.HasPrefix(authHeader, "Bearer ") {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing or malformed Authorization header"})
			return
		}
		tokenStr := strings.TrimPrefix(authHeader, "Bearer ")

		// 2. Parse and verify
		claims, err := parseToken(tokenStr)
		if err != nil {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": err.Error()})
			return
		}

		// 3. Check the token type
		if claims.TokenType != "access" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "access_token required"})
			return
		}

		// 4. Check the blacklist. Fail closed: if Redis cannot be reached,
		// a revoked token must not be let through.
		ctx := c.Request.Context()
		revoked, err := rdb.Exists(ctx, fmt.Sprintf("revoked:%s", claims.ID)).Result()
		if err != nil {
			c.AbortWithStatusJSON(http.StatusServiceUnavailable, gin.H{"error": "revocation check unavailable"})
			return
		}
		if revoked > 0 {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "Token has been revoked"})
			return
		}

		// 5. Inject the user's information
		c.Set("user_sub", claims.Subject)
		c.Set("user_roles", claims.Roles)
		c.Set("token_jti", claims.ID)
		c.Set("token_exp", claims.ExpiresAt.Time)
		c.Next()
	}
}

// ============================================================
// Route handlers
// ============================================================

type loginRequest struct {
	Username string `json:"username" binding:"required"`
	Password string `json:"password" binding:"required"`
}

type refreshRequest struct {
	RefreshToken string `json:"refresh_token" binding:"required"`
}

func loginHandler(c *gin.Context) {
	var req loginRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	// Simplified: a real implementation should verify the password against the database
	if req.Username != "walter" || req.Password != "secret123" {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "incorrect username or password"})
		return
	}

	roles := []string{"admin", "editor"}
	accessToken, err := createAccessToken(req.Username, roles)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to generate access_token"})
		return
	}
	refreshToken, _, err := createRefreshToken(req.Username)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to generate refresh_token"})
		return
	}

	c.JSON(http.StatusOK, gin.H{
		"access_token":  accessToken,
		"refresh_token": refreshToken,
		"token_type":    "bearer",
		"expires_in":    int(accessTokenTTL.Seconds()),
	})
}

func refreshHandler(c *gin.Context) {
	var req refreshRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

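	claims, err := parseToken(req.RefreshToken)
	if err != nil {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "invalid refresh_token: " + err.Error()})
		return
	}
	if claims.TokenType != "refresh" {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "refresh_token required"})
		return
	}

	// Rotation: DEL reports how many keys it removed, so a single call both
	// confirms the refresh_token was still active and revokes it atomically.
	// A separate EXISTS followed by DEL would let two concurrent requests
	// redeem the same refresh_token.
	ctx := c.Request.Context()
	key := fmt.Sprintf("refresh:%s", claims.ID)
	deleted, err := rdb.Del(ctx, key).Result()
	if err != nil {
		c.JSON(http.StatusServiceUnavailable, gin.H{"error": "token store unavailable"})
		return
	}
	if deleted == 0 {
		c.JSON(http.StatusUnauthorized, gin.H{"error": "refresh_token is no longer valid"})
		return
	}

	// Issue a new token pair
	roles := []string{"admin", "editor"} // a real implementation should load these from the database
	accessToken, err := createAccessToken(claims.Subject, roles)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to generate access_token"})
		return
	}
	refreshToken, _, err := createRefreshToken(claims.Subject)
	if err != nil {
		c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to generate refresh_token"})
		return
	}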

	c.JSON(http.StatusOK, gin.H{
		"access_token":  accessToken,
		"refresh_token": refreshToken,
		"token_type":    "bearer",
		"expires_in":    int(accessTokenTTL.Seconds()),
	})
}

func logoutHandler(c *gin.Context) {
	var req refreshRequest
	if err := c.ShouldBindJSON(&req); err != nil {
		c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
		return
	}

	ctx := c.Request.Context()

	// Revoke the refresh_token
	if rtClaims, err := parseToken(req.RefreshToken); err == nil {
		rdb.Del(ctx, fmt.Sprintf("refresh:%s", rtClaims.ID))
	}

	// Blacklist the current access_token until it would have expired anyway.
	// GetString and GetTime return zero values if the middleware did not set the keys.
	jti := c.GetString("token_jti")
	ttl := time.Until(c.GetTime("token_exp"))
	if jti != "" && ttl > 0 {
		if err := rdb.Set(ctx, fmt.Sprintf("revoked:%s", jti), "1", ttl).Err(); err != nil {
			c.JSON(http.StatusServiceUnavailable, gin.H{"error": "failed to revoke access_token"})
			return
		}
	}

	c.JSON(http.StatusOK, gin.H{"message": "logged out"})
}

func protectedHandler(c *gin.Context) {
	sub, _ := c.Get("user_sub")
	roles, _ := c.Get("user_roles")
	c.JSON(http.StatusOK, gin.H{
		"message": fmt.Sprintf("Hello, %s!", sub),
		"roles":   roles,
	})
}

// ============================================================
// main
// ============================================================

func main() {
	// Initialize the RSA key pair
	var err error
	rsaPrivateKey, err = rsa.GenerateKey(rand.Reader, 2048)
	if err != nil {
		panic(err)
	}
	rsaPublicKey = &rsaPrivateKey.PublicKey

	// Initialize Redis
	rdb = redis.NewClient(&redis.Options{Addr: "localhost:6379"})

	// Routes
	r := gin.Default()

	r.POST("/login", loginHandler)
	r.POST("/refresh", refreshHandler)

	auth := r.Group("/", JWTAuthMiddleware())
	{
		auth.GET("/protected", protectedHandler)
		auth.POST("/logout", logoutHandler)
	}

	if err := r.Run(":8080"); err != nil {
		panic(err)
	}
}

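9.8 JWT best-practices checklist

✅ Use an asymmetric algorithm (RS256/ES256)
✅ Set a short expiration time (15 minutes to 1 hour)
✅ Validate all standard claims (iss, aud, exp)
✅ Use JWKS to support key rotation
✅ Specify the list of allowed algorithms explicitly
✅ Include jti for revocation and replay protection
✅ Keep the payload minimal (no sensitive data)
✅ Transmit over HTTPS
❌ Do not pass JWTs in URLs
❌ Do not store passwords or keys in a JWT
❌ Do not use a public (non-secret) key as an HS256 secret
❌ Do not skip exp validation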
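
Several of the items above (an explicit algorithm allowlist, validating iss, aud and exp) come down to a handful of comparisons. The following is a minimal sketch in standard-library Python, not a substitute for a JWT library: it deliberately leaves out signature verification, which must happen between the two checks. The names `ALLOWED_ALGS`, `check_header` and `check_claims`, and the 30-second `leeway`, are illustrative assumptions rather than part of any library.

```python
import base64
import json
import time
from typing import Optional

# Assumption: the algorithms this service is willing to accept.
ALLOWED_ALGS = {"RS256", "ES256"}


def b64url_decode(segment: str) -> bytes:
    """Decode a Base64URL segment, restoring the padding that JWTs strip."""
    return base64.urlsafe_b64decode(segment + "=" * (-len(segment) % 4))


def b64url_encode(data: bytes) -> str:
    """Encode bytes as unpadded Base64URL, as used in JWT segments."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode("ascii")


def check_header(token: str) -> dict:
    """Reject a token whose alg is outside the allowlist, before any key is used."""
    header = json.loads(b64url_decode(token.split(".")[0]))
    if header.get("alg") not in ALLOWED_ALGS:
        raise ValueError(f"algorithm not allowed: {header.get('alg')!r}")
    return header


def check_claims(claims: dict, issuer: str, audience: str,
                 now: Optional[float] = None, leeway: int = 30) -> None:
    """Validate iss, aud and exp on claims taken from an already verified token."""
    now = time.time() if now is None else now
    if claims.get("iss") != issuer:
        raise ValueError("unexpected issuer")
    aud = claims.get("aud")
    audiences = [aud] if isinstance(aud, str) else (aud or [])
    if audience not in audiences:
        raise ValueError("unexpected audience")
    exp = claims.get("exp")
    # A missing exp is treated the same as an expired token.
    if not isinstance(exp, (int, float)) or now >= exp + leeway:
        raise ValueError("token expired or exp missing")
```

Calling `check_header` first means a token with `alg: none` or `alg: HS256` is refused before the verifier ever selects a key, which is the point of the allowlist. In practice a maintained JWT library should perform both steps; the sketch only shows what those library options enforce.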

9.9 JWT security checklist

The following is the full list of items to check for a production JWT deployment:

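| Category | Check | Description | Priority |
|---|---|---|---|
| Algorithm | Use asymmetric signatures (RS256/ES256/EdDSA) | Avoids token forgery caused by a leaked symmetric key | 🔴 Required |
| Algorithm | Specify an explicit algorithms allowlist | Prevents alg:none and key-confusion attacks | 🔴 Required |
| Algorithm | Do not use HS256 where multiple parties verify tokens | A symmetric key cannot be distributed safely to multiple services | 🔴 Required |
| Claims | Validate exp (expiration time) | Rejects expired tokens | 🔴 Required |
| Claims | Validate iss (issuer) | Ensures the token comes from a trusted issuer | 🔴 Required |
| Claims | Validate aud (audience) | Prevents a token from being misused against another service | 🔴 Required |
| Claims | Include jti (Token ID) | Supports revocation and replay protection | 🟡 Recommended |
| Claims | Include nbf (not before) | Prevents a token from being used early | 🟢 Optional |
| Expiration policy | access_token lifetime ≤ 15 minutes | Narrows the window of impact if a token leaks | 🔴 Required |
| Expiration policy | refresh_token lifetime ≤ 7 days | Balances security and user experience | 🟡 Recommended |
| Expiration policy | refresh_token rotation | Every refresh issues a new refresh_token and the old one is invalidated immediately | 🟡 Recommended |
| Expiration policy | Detect refresh_token reuse | If an old refresh_token is reused, revoke all of that user's tokens | 🟡 Recommended |
| Key management | Publish public keys through a JWKS endpoint | Supports key rotation; verifiers fetch keys automatically | 🔴 Required |
| Key management | Rotate signing keys regularly (≤ 90 days) | Reduces the risk from a compromised key | 🟡 Recommended |
| Key management | Store private keys in an HSM / KMS / Vault | Prevents private key leakage | 🔴 Required |
| Key management | Enable cache-control headers on the JWKS endpoint | Cache-Control: max-age=86400 reduces requests | 🟢 Optional |
| Transport security | Transmit JWTs over HTTPS only | Prevents man-in-the-middle token theft | 🔴 Required |
| Transport security | Do not pass JWTs in URL query parameters | URLs leak through logs and the Referer header | 🔴 Required |
| Transport security | Use Authorization: Bearer | The standard transport mechanism | 🔴 Required |
| Storage security | In the browser, store tokens in an HttpOnly cookie or in memory | Avoids theft through XSS (do not use localStorage) | 🔴 Required |
| Storage security | Set Secure; SameSite=Strict on the cookie | Prevents CSRF and transmission over plain HTTP | 🔴 Required |
| Revocation | Implement a token blacklist (Redis) | Supports logout and emergency revocation | 🟡 Recommended |
| Revocation | Set the blacklist TTL to the token's remaining lifetime | Entries expire automatically, avoiding unbounded storage growth | 🟡 Recommended |
| Revocation | Support revoking all tokens for a user | Used on password change or account lockout | 🟡 Recommended |
| Payload | Do not store sensitive data in a JWT | The payload is only Base64URL-encoded, not encrypted | 🔴 Required |
| Payload | Minimize the payload size | Reduces network overhead and avoids exceeding header size limits | 🟡 Recommended |
| Payload | Use JWE (encryption) for sensitive data | For cases where sensitive information must travel in the token | 🟢 Optional |
| Monitoring | Log token validation failures | Detects attack attempts | 🟡 Recommended |
| Monitoring | Monitor the token issuance rate | Detects brute-force attempts or abuse | 🟡 Recommended |
| Monitoring | Alert on an unexpected kid or algorithm | Detects key-confusion attacks | 🟡 Recommended |
| Testing | Test that expired tokens are rejected | Confirms exp validation is in effect | 🔴 Required |
| Testing | Test that alg:none tokens are rejected | Confirms the algorithm allowlist is in effect | 🔴 Required |
| Testing | Test that a tampered payload fails signature verification | Confirms signature verification is in effect | 🔴 Required |
| Testing | Test that a wrong audience is rejected | Confirms aud validation is in effect | 🔴 Required |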
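
The rotation and reuse-detection rows can be illustrated with a small state machine. This is a hedged sketch only: `RefreshTokenStore` is a hypothetical in-memory stand-in for Redis, it tracks token IDs (jti) rather than signed tokens, and it does not expire old entries as a real store would.

```python
import secrets


class RefreshTokenStore:
    """Hypothetical in-memory stand-in for Redis that tracks refresh token IDs."""

    def __init__(self) -> None:
        self.active = {}   # jti -> user, tokens that may still be redeemed
        self.rotated = {}  # jti -> user, already redeemed, kept to detect reuse

    def issue(self, user: str) -> str:
        """Register a new refresh token ID for the user and return it."""
        jti = secrets.token_urlsafe(16)
        self.active[jti] = user
        return jti

    def revoke_all(self, user: str) -> None:
        """Drop every active refresh token that belongs to the user."""
        self.active = {j: u for j, u in self.active.items() if u != user}

    def rotate(self, jti: str) -> str:
        """Exchange an active jti for a new one; treat reuse of an old jti as theft."""
        user = self.active.pop(jti, None)
        if user is not None:
            self.rotated[jti] = user
            return self.issue(user)
        owner = self.rotated.get(jti)
        if owner is not None:
            # The same token was presented twice: one of the two parties is an
            # attacker, so every session of that user is revoked.
            self.revoke_all(owner)
            raise PermissionError("refresh token reuse detected; all tokens revoked")
        raise PermissionError("unknown refresh token")
```

For example, after `second = store.rotate(first)`, presenting `first` again raises `PermissionError` and also invalidates `second`, so both the legitimate user and the attacker have to log in again. In a Redis-backed version the `rotated` entries would need a TTL equal to the refresh token lifetime.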

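9.10 Summary

  • A JWT consists of three parts: Header.Payload.Signature

  • Prefer the asymmetric signing algorithms RS256 or ES256

  • Watch for the security pitfalls: alg:none, key confusion, and the lack of built-in revocation

  • JWKS supports key rotation and is essential in production

  • JWTs suit microservice and API scenarios; sessions suit traditional web applications

  • Token lifecycle management: a short-lived access_token + a long-lived refresh_token + a rotation policy

  • Before deploying to production, go through the security checklist item by item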