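Chapter 17: Authorization Architecture Patterns

"A good authorization architecture should be like air: present everywhere, yet never noticed by the user."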

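mindmap
  root((Authorization architecture))
    Evolution
      Hard-coded
      Config files
      Policy engine
      Externalized authorization
    XACML architecture
      PEP
      PDP
      PIP
      PAP
    Microservice patterns
      Gateway
      Service Mesh
      Application layer
      Hybrid
    Multi-tenancy
      Tenant isolation
      Permission inheritance
      Cross-tenant access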

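17.1 The Evolution of Authorization Architecture

Level 0: Hard-coded
if user.role == "admin":
    allow()
→ Problem: every permission change requires a code change and a redeployment

Level 1: Configuration files
permissions = load_config("permissions.yaml")
→ Problem: no tests or audit trail, and configuration ends up scattered across services

Level 2: Policy engine
result = opa.evaluate(input)
→ Improvement: policy is separated from code and can be tested on its own

Level 3: Externalized authorization (Authorization as a Service)
result = authz_service.check(user, action, resource)
→ Best: one unified authorization service shared by all applications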

17.2 The XACML Authorization Architecture

flowchart TB
    User["👤 User"] --> PEP
    PEP["PEP<br/>Policy Enforcement Point<br/>(API Gateway / middleware)"]
    PEP -->|authorization request| PDP
    PEP --> App["🖥️ Application resource"]

    PDP["PDP<br/>Policy Decision Point<br/>(OPA / OpenFGA / Cedar)"]
    PAP["PAP<br/>Policy Administration Point<br/>(admin UI / Git)"] -->|policy update| PDP
    PDP -->|attribute query| PIP

    PIP["PIP<br/>Policy Information Point<br/>(user attributes / resource attributes)"]

    style PEP fill:#bbdefb,stroke:#1565c0
    style PDP fill:#c8e6c9,stroke:#2e7d32
    style PIP fill:#fff9c4,stroke:#f9a825
    style PAP fill:#f8bbd0,stroke:#c62828
    

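| Component | Responsibility | Typical implementation |
|-----------|----------------|------------------------|
| PEP | Intercepts requests and enforces the decision | API Gateway, middleware, sidecar |
| PDP | Evaluates policy and makes the decision | OPA, OpenFGA, Cedar |
| PIP | Supplies the attribute data a decision needs | User service, LDAP, database |
| PAP | Manages the creation and update of policies | Admin UI, Git |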
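
How the roles cooperate at request time can be sketched in a few lines. This is a minimal in-memory illustration, not XACML itself: the names `AccessRequest`, `AttributeStore`, `DecisionPoint` and `enforce`, and the two-rule policy, are assumptions invented for this example.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass(frozen=True)
class AccessRequest:
    user: str
    action: str
    resource: str


class AttributeStore:
    """PIP: answers attribute lookups (hypothetical in-memory data)."""

    def __init__(self, roles: dict[str, set[str]], owners: dict[str, str]):
        self._roles = roles
        self._owners = owners

    def roles_of(self, user: str) -> set[str]:
        return self._roles.get(user, set())

    def owner_of(self, resource: str) -> Optional[str]:
        return self._owners.get(resource)


class DecisionPoint:
    """PDP: evaluates policy; here the policy is two hard-wired rules."""

    def __init__(self, pip: AttributeStore):
        self._pip = pip

    def decide(self, req: AccessRequest) -> bool:
        if "admin" in self._pip.roles_of(req.user):
            return True  # rule 1: admins may do anything
        # rule 2: only the owner may act on a resource
        return self._pip.owner_of(req.resource) == req.user


def enforce(pdp: DecisionPoint, req: AccessRequest, handler: Callable):
    """PEP: intercepts the call and runs the handler only when permitted."""
    if not pdp.decide(req):
        return 403, "forbidden"
    return 200, handler(req)


pip = AttributeStore(
    roles={"alice": {"admin"}, "bob": {"member"}},
    owners={"doc:1": "bob"},
)
pdp = DecisionPoint(pip)
```

In this sketch the PAP has no code of its own: it corresponds to whoever edits the rules inside `DecisionPoint.decide`, which in a real deployment would be a policy repository or admin UI pushing updates to the PDP.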

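17.3 Microservice Authorization Patterns

Pattern 1: Centralized authorization at the Gateway

flowchart LR
    Client["🖥️ Client"] --> GW

    subgraph GW["API Gateway"]
        direction TB
        Auth["Authentication"]
        Authz["Authorization ✅"]
        RL["Rate limiting"]
        Auth --> Authz --> RL
    end

    GW --> MS["Microservice<br/>trusts the Gateway<br/>no authorization of its own"]

    style GW fill:#e3f2fd,stroke:#1565c0
    style MS fill:#f5f5f5,stroke:#616161

Pros: simple, centrally managed
Cons: coarse-grained; the Gateway becomes a bottleneck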

Pattern 2: Service Mesh authorization

flowchart LR
    subgraph Pod["Kubernetes Pod"]
        direction LR
        MS2["Microservice<br/>business logic"]
        subgraph Sidecar["Envoy Sidecar"]
            mTLS["mTLS"]
            AP["AuthzPolicy"]
        end
        Sidecar <--> MS2
    end

    External["External traffic"] --> Sidecar

    style Pod fill:#e8f5e9,stroke:#2e7d32
    style Sidecar fill:#fff3e0,stroke:#e65100
    

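Pattern 3: Application-layer authorization

# In-application authorization: the finest granularity
@app.put("/api/documents/{doc_id}")
async def update_document(
    doc_id: str,
    user: User = Depends(get_current_user),
):
    # Ask the external authorization service; as elsewhere in this chapter,
    # the response object carries the decision in its `allowed` field
    response = await openfga_client.check(
        user=f"user:{user.id}",
        relation="can_edit",
        object=f"document:{doc_id}",
    )
    if not response.allowed:
        raise HTTPException(403, "Cannot edit this document")

    # Business logic...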

Pattern 4: Hybrid (recommended)

flowchart LR
    Client2["🖥️ Client"] --> GW2

    subgraph GW2["API Gateway: coarse-grained"]
        direction TB
        A1["Authentication"]
        A2["Role check"]
        A3["Rate limiting"]
        A1 --> A2 --> A3
    end

    GW2 --> MS3

    subgraph MS3["Microservice: fine-grained"]
        direction TB
        B1["Resource-level permissions"]
        B2["OpenFGA / OPA"]
        B1 --> B2
    end

    style GW2 fill:#e3f2fd,stroke:#1565c0
    style MS3 fill:#fce4ec,stroke:#c62828
    

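17.4 Centralized vs. Distributed vs. Hybrid Authorization Architectures

Centralized authorization architecture

All authorization decisions are handled by a single central service.

flowchart TB
    S1["Service A"] -->|authorization request| CAS["🏛️ Central authorization service<br/>(OpenFGA / OPA)"]
    S2["Service B"] -->|authorization request| CAS
    S3["Service C"] -->|authorization request| CAS
    CAS -->|query| PS["Policy store"]
    CAS -->|query| DS["Data sources<br/>(user / resource attributes)"]

    style CAS fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
    style PS fill:#fff9c4,stroke:#f9a825
    style DS fill:#e1bee7,stroke:#7b1fa2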
    

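| Pros | Cons |
|------|------|
| Policies are managed in one place, so decisions stay consistent | Risk of a single point of failure |
| Audit logs are centralized | Network latency (every request needs a remote call) |
| Policy changes take effect immediately | Can become a bottleneck under high concurrency |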

Distributed authorization architecture

Each service runs its own authorization engine, and policies are distributed to the engines through a sync mechanism.

flowchart TB
    PAP2["PAP<br/>Policy administration"] -->|policy distribution| Bundle["Policy bundle<br/>(Git / S3)"]
    Bundle -->|pull| OPA1["OPA Sidecar A"]
    Bundle -->|pull| OPA2["OPA Sidecar B"]
    Bundle -->|pull| OPA3["OPA Sidecar C"]

    S4["Service A"] -->|local call| OPA1
    S5["Service B"] -->|local call| OPA2
    S6["Service C"] -->|local call| OPA3

    style PAP2 fill:#f8bbd0,stroke:#c62828
    style Bundle fill:#fff9c4,stroke:#f9a825
    style OPA1 fill:#c8e6c9,stroke:#2e7d32
    style OPA2 fill:#c8e6c9,stroke:#2e7d32
    style OPA3 fill:#c8e6c9,stroke:#2e7d32
    

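| Pros | Cons |
|------|------|
| No cross-network hop (decisions are made locally) | Policy sync lags behind changes (eventual consistency) |
| No single point of failure | Policy version management is complex |
| High performance and high availability | Audit logs are scattered across services |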

Hybrid authorization architecture

The Gateway performs coarse-grained authorization and each service performs fine-grained authorization, balancing performance against precision.

flowchart TB
    Client3["Client"] --> GW3

    subgraph GW3["API Gateway: coarse-grained"]
        G1["JWT validation"]
        G2["Role / scope check"]
        G1 --> G2
    end

    GW3 --> S7["Service A"]
    GW3 --> S8["Service B"]

    S7 -->|fine-grained| OPA4["OPA Sidecar"]
    S8 -->|relationship query| FGA["OpenFGA<br/>(centralized ReBAC)"]

    subgraph Audit["Audit layer"]
        AL["Centralized audit log"]
    end

    S7 --> AL
    S8 --> AL

    style GW3 fill:#e3f2fd,stroke:#1565c0
    style OPA4 fill:#c8e6c9,stroke:#2e7d32
    style FGA fill:#c8e6c9,stroke:#2e7d32
    style Audit fill:#fff3e0,stroke:#e65100
    

Caching strategy for authorization decisions

sequenceDiagram
    participant App as Application service
    participant Cache as Local cache<br/>(Caffeine/go-cache)
    participant Redis as Redis distributed cache
    participant PDP as PDP<br/>(OpenFGA/OPA)

    App->>Cache: 1. Look up local cache
    alt Local cache hit
        Cache-->>App: 2a. Return cached result ⚡
    else Local cache miss
        App->>Redis: 2b. Look up Redis
        alt Redis hit
            Redis-->>App: 3a. Return cached result
            App->>Cache: 3b. Write to local cache
        else Redis miss
            App->>PDP: 3c. Ask the PDP for a decision
            PDP-->>App: 4. Return authorization result
            App->>Redis: 5a. Write to Redis (TTL 60s)
            App->>Cache: 5b. Write to local cache (TTL 10s)
        end
    end
    

17.5 Decision Tree for Choosing an Authorization Architecture

flowchart TB
    Start["Start"] --> Q1{"How many services?"}
    Q1 -->|"Monolith / < 5"| Q2{"Need fine-grained<br/>resource-level permissions?"}
    Q1 -->|"≥ 5 microservices"| Q3{"Latency requirement?"}

    Q2 -->|No| R1["✅ In-app RBAC<br/>Spring Security / Casbin"]
    Q2 -->|Yes| R2["✅ Centralized + OpenFGA<br/>application-layer ReBAC"]

    Q3 -->|"< 5ms"| R3["✅ Distributed<br/>OPA Sidecar"]
    Q3 -->|"< 50ms acceptable"| Q4{"Need a relationship-based<br/>permission model?"}

    Q4 -->|Yes| R4["✅ Centralized OpenFGA<br/>+ Redis cache"]
    Q4 -->|No| R5["✅ Hybrid<br/>Gateway + OPA Sidecar"]

    style Start fill:#e3f2fd,stroke:#1565c0
    style R1 fill:#c8e6c9,stroke:#2e7d32
    style R2 fill:#c8e6c9,stroke:#2e7d32
    style R3 fill:#c8e6c9,stroke:#2e7d32
    style R4 fill:#c8e6c9,stroke:#2e7d32
    style R5 fill:#c8e6c9,stroke:#2e7d32
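
The same tree can be written as a small function, which makes the branch order explicit and easy to test. This is only a sketch: the function name, parameter names and returned labels are assumptions for illustration, and it treats five or more services as the microservice branch.

```python
def choose_architecture(
    service_count: int,
    needs_fine_grained: bool = False,
    latency_budget_ms: float = 50.0,
    needs_relationship_model: bool = False,
) -> str:
    """Walk the decision tree and return the recommended pattern."""
    # Branch 1: monolith or a handful of services
    if service_count < 5:
        if needs_fine_grained:
            return "centralized OpenFGA, application-layer ReBAC"
        return "in-app RBAC"

    # Branch 2: many microservices, so latency decides first
    if latency_budget_ms < 5:
        return "distributed OPA sidecar"
    if needs_relationship_model:
        return "centralized OpenFGA + Redis cache"
    return "hybrid: Gateway + OPA sidecar"
```

For example, `choose_architecture(12, latency_budget_ms=3)` lands on the distributed sidecar branch, because the latency question is asked before the permission-model question.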
    

17.6 Multi-Tenant Authorization Design

Multi-tenant authorization model (OpenFGA):

type user

type tenant
  relations
    define admin: [user]
    define member: [user] or admin

type project
  relations
    define tenant: [tenant]
    define admin: [user] or admin from tenant
    define member: [user] or admin
    define can_manage: admin
    define can_access: member or can_manage

type resource
  relations
    define project: [project]
    define owner: [user]
    define editor: [user] or owner or can_manage from project
    define viewer: [user] or editor or can_access from project
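
To see how permissions flow down from tenant to project to resource in this model, the rules can be replayed with a toy evaluator. The sketch below is not how OpenFGA resolves checks internally; the `RULES` encoding, the `check` function and the sample tuples (`tenant:acme`, `project:web`, `resource:logo`) are all assumptions made up for illustration.

```python
# Relationship tuples: (user or parent object, relation, object)
TUPLES = {
    ("user:alice", "admin", "tenant:acme"),
    ("tenant:acme", "tenant", "project:web"),
    ("project:web", "project", "resource:logo"),
    ("user:bob", "viewer", "resource:logo"),
}

# Each relation lists its alternatives:
#   ("direct",)            a tuple assigns the relation directly
#   ("computed", rel)      implied by another relation on the same object
#   ("from", rel, link)    implied by `rel` on the object reached via `link`
RULES = {
    ("tenant", "admin"): [("direct",)],
    ("tenant", "member"): [("direct",), ("computed", "admin")],
    ("project", "admin"): [("direct",), ("from", "admin", "tenant")],
    ("project", "member"): [("direct",), ("computed", "admin")],
    ("project", "can_manage"): [("computed", "admin")],
    ("project", "can_access"): [("computed", "member"), ("computed", "can_manage")],
    ("resource", "owner"): [("direct",)],
    ("resource", "editor"): [("direct",), ("computed", "owner"),
                             ("from", "can_manage", "project")],
    ("resource", "viewer"): [("direct",), ("computed", "editor"),
                             ("from", "can_access", "project")],
}


def check(user: str, relation: str, obj: str) -> bool:
    """Return True if `user` holds `relation` on `obj` under RULES and TUPLES."""
    obj_type = obj.split(":", 1)[0]
    for rule in RULES.get((obj_type, relation), []):
        if rule[0] == "direct":
            if (user, relation, obj) in TUPLES:
                return True
        elif rule[0] == "computed":
            if check(user, rule[1], obj):
                return True
        else:  # "from": follow the link tuple to the parent object
            _, parent_relation, link = rule
            for parent, rel, child in TUPLES:
                if rel == link and child == obj and check(user, parent_relation, parent):
                    return True
    return False
```

With these tuples, `alice` never appears on `resource:logo` directly, yet she can edit it because she administers the tenant that owns the project. `bob` holds only a direct `viewer` tuple, so he can view but not edit.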

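Tenant isolation strategies

| Strategy | Description | Implementation |
|----------|-------------|----------------|
| Data isolation | Each tenant's data is fully isolated | Per-tenant database schema or table prefix |
| Permission isolation | Users of tenant A cannot access tenant B | OpenFGA relationship checks |
| Administrative isolation | A tenant admin can manage only their own tenant | Tenant-level admin role |
| Cross-tenant access | Cross-tenant collaboration in special cases | Explicit cross-tenant relationship tuples |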
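
The permission-isolation and cross-tenant rows can be combined into one deny-by-default guard. The following is a minimal sketch under stated assumptions: `TenantGuard`, its fields and the sample identifiers are hypothetical, and in the model above the same effect would come from relationship tuples rather than an in-memory set.

```python
from dataclasses import dataclass, field


@dataclass
class TenantGuard:
    """Deny access across tenants unless an explicit grant exists."""

    # resource id -> id of the tenant that owns it
    resource_tenant: dict[str, str]
    # explicit (user, resource) pairs allowed to cross the tenant boundary
    cross_tenant_grants: set[tuple[str, str]] = field(default_factory=set)

    def can_reach(self, user: str, user_tenant: str, resource: str) -> bool:
        owner = self.resource_tenant.get(resource)
        if owner is None:
            return False  # unknown resource: deny
        if owner == user_tenant:
            return True  # same tenant: boundary check passes
        return (user, resource) in self.cross_tenant_grants


guard = TenantGuard(
    resource_tenant={"doc:1": "tenant:a", "doc:2": "tenant:b"},
    cross_tenant_grants={("user:bob", "doc:1")},
)
```

Passing this guard only means the request may cross the tenant boundary; the fine-grained relation check (viewer, editor and so on) still has to succeed afterwards.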

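17.7 Performance Optimization

Caching strategy

import redis.asyncio as aioredis

# Async client, so cache lookups do not block the event loop.
# openfga_client is the application's OpenFGA client wrapper, defined elsewhere.
r = aioredis.Redis()

async def check_permission_cached(user_id: str, relation: str, object_id: str) -> bool:
    """Permission check with a Redis cache in front of OpenFGA."""
    cache_key = f"authz:{user_id}:{relation}:{object_id}"

    # 1. Look in the cache (values are bytes because decode_responses is off)
    cached = await r.get(cache_key)
    if cached is not None:
        return cached == b"1"

    # 2. Cache miss: ask OpenFGA
    result = await openfga_client.check(
        user=f"user:{user_id}",
        relation=relation,
        object=object_id,
    )

    # 3. Store the decision (TTL 60 seconds)
    await r.setex(cache_key, 60, "1" if result.allowed else "0")

    return result.allowed

async def invalidate_permission_cache(object_id: str):
    """Drop cached decisions for an object when its permissions change."""
    pattern = f"authz:*:*:{object_id}"
    async for key in r.scan_iter(pattern):
        await r.delete(key)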
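
Pattern-based invalidation has to walk the keyspace with SCAN, which gets slower as the cache grows. One common alternative is to put a per-object version number into the cache key and bump it on every permission change, so old entries simply stop being read. The sketch below shows the idea with plain dictionaries standing in for Redis; the class `VersionedDecisionCache` and its methods are hypothetical names, not part of any library.

```python
from typing import Optional


class VersionedDecisionCache:
    """Invalidate by bumping a per-object version instead of scanning keys."""

    def __init__(self) -> None:
        self._versions: dict[str, int] = {}    # object -> current version
        self._decisions: dict[str, bool] = {}  # versioned key -> decision

    def _key(self, user: str, relation: str, obj: str) -> str:
        version = self._versions.get(obj, 0)
        return f"authz:v{version}:{user}:{relation}:{obj}"

    def get(self, user: str, relation: str, obj: str) -> Optional[bool]:
        # Entries written under an older version are never looked up again
        return self._decisions.get(self._key(user, relation, obj))

    def put(self, user: str, relation: str, obj: str, allowed: bool) -> None:
        self._decisions[self._key(user, relation, obj)] = allowed

    def invalidate(self, obj: str) -> None:
        # O(1): no scan, the stale entries just become unreachable
        self._versions[obj] = self._versions.get(obj, 0) + 1


cache = VersionedDecisionCache()
cache.put("user:alice", "can_edit", "document:1", True)
cache.invalidate("document:1")
```

With Redis, the version counter would typically be an `INCR` key and the decisions would be written with `SETEX`, so orphaned entries expire on their own. The trade-off is one extra read per lookup to fetch the current version.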

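Batch checks

import asyncio

async def batch_check_permissions(user_id: str, objects: list[str], relation: str) -> dict[str, bool]:
    """Check one relation against many objects concurrently."""
    # Build one check coroutine per object; gather runs them concurrently
    checks = [
        openfga_client.check(
            user=f"user:{user_id}",
            relation=relation,
            object=obj,
        )
        for obj in objects
    ]
    responses = await asyncio.gather(*checks)
    # gather preserves input order, so results line up with objects
    return {obj: resp.allowed for obj, resp in zip(objects, responses)}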
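
Launching every check at once can flood the PDP when the object list is long. A semaphore keeps the number of in-flight checks bounded. This is a sketch under assumptions: `bounded_batch_check` and its `check` callback are hypothetical names, and the callback stands in for whatever client call performs a single check.

```python
import asyncio
from typing import Awaitable, Callable


async def bounded_batch_check(
    check: Callable[[str], Awaitable[bool]],
    objects: list[str],
    limit: int = 10,
) -> dict[str, bool]:
    """Run `check` for every object with at most `limit` calls in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(obj: str) -> tuple[str, bool]:
        # Wait for a free slot before calling the PDP
        async with semaphore:
            return obj, await check(obj)

    pairs = await asyncio.gather(*(guarded(obj) for obj in objects))
    return dict(pairs)
```

A suitable `limit` depends on the PDP's capacity and the latency budget of the calling endpoint, so it is best treated as a tunable setting rather than a constant.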

17.8 Authorization Architecture in Practice

Python: Centralized authorization service (FastAPI + OpenFGA)

"""
Centralized authorization service: FastAPI + OpenFGA
Dependencies: pip install fastapi uvicorn openfga-sdk redis httpx
"""
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional

import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
from openfga_sdk import (
    ClientConfiguration,
    OpenFgaClient,
    ClientCheckRequest,
    ClientWriteRequest,
    ClientTuple,
    ClientListObjectsRequest,
)

# ── 配置 ─────────────────────────────────────────────────
OPENFGA_API_URL = "http://localhost:8080"
OPENFGA_STORE_ID = "your-store-id"
REDIS_URL = "redis://localhost:6379/0"
CACHE_TTL = 60  # 秒


# ── 生命周期管理 ─────────────────────────────────────────
fga_client: Optional[OpenFgaClient] = None
redis_client: Optional[aioredis.Redis] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    global fga_client, redis_client

    # 启动:初始化 OpenFGA 和 Redis
    config = ClientConfiguration(
        api_url=OPENFGA_API_URL,
        store_id=OPENFGA_STORE_ID,
    )
    fga_client = OpenFgaClient(config)
    redis_client = aioredis.from_url(REDIS_URL, decode_responses=True)

    yield

    # 关闭
    await fga_client.close()
    await redis_client.close()


app = FastAPI(title="Centralized Authorization Service", lifespan=lifespan)


# ── 数据模型 ─────────────────────────────────────────────
class CheckRequest(BaseModel):
    user: str       # e.g. "user:alice"
    relation: str   # e.g. "can_edit"
    object: str     # e.g. "document:readme"


class CheckResponse(BaseModel):
    allowed: bool
    cached: bool
    latency_ms: float


class GrantRequest(BaseModel):
    user: str
    relation: str
    object: str


class ListObjectsRequest(BaseModel):
    user: str
    relation: str
    type: str       # e.g. "document"


# ── 带缓存的权限检查 ─────────────────────────────────────
async def cached_check(user: str, relation: str, obj: str) -> tuple[bool, bool]:
    """返回 (allowed, cached)"""
    cache_key = f"authz:{user}:{relation}:{obj}"

    # 1. 查 Redis 缓存
    cached = await redis_client.get(cache_key)
    if cached is not None:
        return cached == "1", True

    # 2. 调用 OpenFGA
    response = await fga_client.check(ClientCheckRequest(
        user=user,
        relation=relation,
        object=obj,
    ))
    allowed = response.allowed

    # 3. 写入缓存
    await redis_client.setex(cache_key, CACHE_TTL, "1" if allowed else "0")

    return allowed, False


# ── API 端点 ─────────────────────────────────────────────
@app.post("/v1/check", response_model=CheckResponse)
async def check_permission(req: CheckRequest):
    """检查权限(带缓存)"""
    start = datetime.now()
    allowed, cached = await cached_check(req.user, req.relation, req.object)
    latency = (datetime.now() - start).total_seconds() * 1000

    return CheckResponse(allowed=allowed, cached=cached, latency_ms=round(latency, 2))


@app.post("/v1/grant")
async def grant_permission(req: GrantRequest):
    """授予权限"""
    await fga_client.write(ClientWriteRequest(
        writes=[ClientTuple(
            user=req.user,
            relation=req.relation,
            object=req.object,
        )],
    ))

    # 清除相关缓存
    pattern = f"authz:*:*:{req.object}"
    async for key in redis_client.scan_iter(pattern):
        await redis_client.delete(key)

    return {"status": "granted"}


@app.post("/v1/revoke")
async def revoke_permission(req: GrantRequest):
    """撤销权限"""
    await fga_client.write(ClientWriteRequest(
        deletes=[ClientTuple(
            user=req.user,
            relation=req.relation,
            object=req.object,
        )],
    ))

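    # Clear every cached decision for this object: relations derived from the
    # revoked one (e.g. viewer from editor) may be cached under other keys
    pattern = f"authz:*:*:{req.object}"
    async for key in redis_client.scan_iter(pattern):
        await redis_client.delete(key)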

    return {"status": "revoked"}


@app.post("/v1/list-objects")
async def list_objects(req: ListObjectsRequest):
    """列出用户有权访问的所有对象"""
    response = await fga_client.list_objects(ClientListObjectsRequest(
        user=req.user,
        relation=req.relation,
        type=req.type,
    ))
    return {"objects": response.objects}


@app.post("/v1/batch-check")
async def batch_check(checks: list[CheckRequest]):
    """批量权限检查"""
    import asyncio
    results = []
    tasks = [cached_check(c.user, c.relation, c.object) for c in checks]
    responses = await asyncio.gather(*tasks)

    for req, (allowed, cached) in zip(checks, responses):
        results.append({
            "user": req.user,
            "relation": req.relation,
            "object": req.object,
            "allowed": allowed,
            "cached": cached,
        })

    return {"results": results}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=9000)

Python: Authorization decision cache (multi-level TTL with Redis)

"""
Multi-level authorization cache: in-process TTL cache + Redis
Dependencies: pip install redis cachetools
"""
import asyncio
import time
from typing import Optional

import redis.asyncio as aioredis
from cachetools import TTLCache

# ── 多级缓存 ─────────────────────────────────────────────
class AuthzCache:
    """
    L1: 进程内 TTLCache(10s,容量 10000)
    L2: Redis(60s)
    """

    def __init__(self, redis_url: str = "redis://localhost:6379/0"):
        self.l1 = TTLCache(maxsize=10000, ttl=10)
        self.l2: Optional[aioredis.Redis] = None
        self._redis_url = redis_url
        self.l2_ttl = 60
        # 统计
        self.stats = {"l1_hit": 0, "l2_hit": 0, "miss": 0}

    async def connect(self):
        self.l2 = aioredis.from_url(self._redis_url, decode_responses=True)

    async def close(self):
        if self.l2:
            await self.l2.close()

    def _cache_key(self, user: str, relation: str, obj: str) -> str:
        return f"authz:{user}:{relation}:{obj}"

    async def get(self, user: str, relation: str, obj: str) -> Optional[bool]:
        key = self._cache_key(user, relation, obj)

        # L1
        if key in self.l1:
            self.stats["l1_hit"] += 1
            return self.l1[key]

        # L2
        if self.l2:
            val = await self.l2.get(key)
            if val is not None:
                self.stats["l2_hit"] += 1
                result = val == "1"
                self.l1[key] = result  # 回填 L1
                return result

        self.stats["miss"] += 1
        return None

    async def set(self, user: str, relation: str, obj: str, allowed: bool):
        key = self._cache_key(user, relation, obj)
        self.l1[key] = allowed
        if self.l2:
            await self.l2.setex(key, self.l2_ttl, "1" if allowed else "0")

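    async def invalidate(self, obj: str):
        """Drop every cached decision for this object after a permission change."""
        # L1: only this process's entries are removed; other instances keep
        # theirs until the 10s TTL expires
        keys_to_delete = [k for k in self.l1 if k.endswith(f":{obj}")]
        for k in keys_to_delete:
            del self.l1[k]

        # L2: remove matching keys from Redis
        if self.l2:
            pattern = f"authz:*:*:{obj}"
            async for key in self.l2.scan_iter(pattern):
                await self.l2.delete(key)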

    def get_stats(self) -> dict:
        total = sum(self.stats.values())
        return {
            **self.stats,
            "total": total,
            "l1_hit_rate": f"{self.stats['l1_hit']/max(total,1)*100:.1f}%",
            "l2_hit_rate": f"{self.stats['l2_hit']/max(total,1)*100:.1f}%",
        }


# ── 使用示例 ─────────────────────────────────────────────
async def main():
    cache = AuthzCache()
    await cache.connect()

    # 模拟权限检查
    result = await cache.get("user:alice", "can_edit", "doc:1")
    if result is None:
        # 缓存未命中,调用 PDP
        result = True  # 假设 PDP 返回 True
        await cache.set("user:alice", "can_edit", "doc:1", result)

    print(f"Allowed: {result}")
    print(f"Stats: {cache.get_stats()}")

    await cache.close()


if __name__ == "__main__":
    asyncio.run(main())

Java: Custom Spring Security AuthorizationManager

/**
 * Custom Spring Security AuthorizationManager integrated with OpenFGA
 * Dependencies: spring-boot-starter-security, spring-boot-starter-cache,
 *       com.github.ben-manes.caffeine:caffeine, dev.openfga:openfga-sdk
 */

// ── 1. OpenFGA 授权管理器 ──────────────────────────────
import dev.openfga.sdk.api.client.OpenFgaClient;
import dev.openfga.sdk.api.client.model.ClientCheckRequest;
import org.springframework.security.authorization.AuthorizationDecision;
import org.springframework.security.authorization.AuthorizationManager;
import org.springframework.security.core.Authentication;
import org.springframework.security.web.access.intercept.RequestAuthorizationContext;
import org.springframework.stereotype.Component;

import java.util.function.Supplier;

@Component
public class OpenFgaAuthorizationManager
        implements AuthorizationManager<RequestAuthorizationContext> {

    private final OpenFgaClient fgaClient;
    private final AuthzCacheService cacheService;

    public OpenFgaAuthorizationManager(OpenFgaClient fgaClient,
                                        AuthzCacheService cacheService) {
        this.fgaClient = fgaClient;
        this.cacheService = cacheService;
    }

    @Override
    public AuthorizationDecision check(
            Supplier<Authentication> authentication,
            RequestAuthorizationContext context) {

        Authentication auth = authentication.get();
        if (auth == null || !auth.isAuthenticated()) {
            return new AuthorizationDecision(false);
        }

        String user = "user:" + auth.getName();
        String method = context.getRequest().getMethod();
        String path = context.getRequest().getRequestURI();

        // 将 HTTP 方法映射为关系
        String relation = mapMethodToRelation(method);
        String object = mapPathToObject(path);

        // 带缓存的权限检查
        boolean allowed = cacheService.checkPermission(user, relation, object);
        return new AuthorizationDecision(allowed);
    }

    private String mapMethodToRelation(String method) {
        return switch (method) {
            case "GET"    -> "can_view";
            case "POST"   -> "can_create";
            case "PUT", "PATCH" -> "can_edit";
            case "DELETE" -> "can_delete";
            default       -> "can_view";
        };
    }

    private String mapPathToObject(String path) {
        // /api/documents/123 → document:123
        String[] parts = path.split("/");
        if (parts.length >= 4) {
            String type = parts[2].replaceAll("s$", ""); // documents → document
            return type + ":" + parts[3];
        }
        return "resource:" + path;
    }
}


// ── 2. Caffeine 缓存服务 ──────────────────────────────
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import dev.openfga.sdk.api.client.OpenFgaClient;
import dev.openfga.sdk.api.client.model.ClientCheckRequest;
import org.springframework.stereotype.Service;

import java.time.Duration;

@Service
public class AuthzCacheService {

    private final OpenFgaClient fgaClient;
    private final Cache<String, Boolean> cache;

    public AuthzCacheService(OpenFgaClient fgaClient) {
        this.fgaClient = fgaClient;
        this.cache = Caffeine.newBuilder()
                .maximumSize(50_000)
                .expireAfterWrite(Duration.ofSeconds(30))
                .recordStats()
                .build();
    }

    public boolean checkPermission(String user, String relation, String object) {
        String key = user + ":" + relation + ":" + object;

        Boolean cached = cache.getIfPresent(key);
        if (cached != null) {
            return cached;
        }

        try {
            var response = fgaClient.check(new ClientCheckRequest()
                    .user(user)
                    .relation(relation)
                    ._object(object)
            ).get();

            boolean allowed = response.getAllowed();
            cache.put(key, allowed);
            return allowed;
        } catch (Exception e) {
            // 授权服务不可用时,默认拒绝
            return false;
        }
    }

    public void invalidate(String object) {
        // 清除与该对象相关的所有缓存
        cache.asMap().keySet().removeIf(k -> k.endsWith(":" + object));
    }

    public com.github.benmanes.caffeine.cache.stats.CacheStats getStats() {
        return cache.stats();
    }
}


// ── 3. Security 配置 ──────────────────────────────────
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.web.SecurityFilterChain;

@Configuration
@EnableWebSecurity
public class SecurityConfig {

    private final OpenFgaAuthorizationManager authzManager;

    public SecurityConfig(OpenFgaAuthorizationManager authzManager) {
        this.authzManager = authzManager;
    }

    @Bean
    public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
        http
            .authorizeHttpRequests(auth -> auth
                .requestMatchers("/", "/health", "/public/**").permitAll()
                .requestMatchers("/api/**").access(authzManager)
                .anyRequest().authenticated()
            )
            .oauth2ResourceServer(oauth2 -> oauth2.jwt(jwt -> {}));

        return http.build();
    }
}

Java: Authorization decision cache (Caffeine Cache + audit)

/**
 * Authorization decision cache with audit logging
 */
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

@Service
public class AuditedAuthzCache {

    private static final Logger auditLog =
            LoggerFactory.getLogger("AUTHZ_AUDIT");

    private final Cache<String, AuthzDecision> cache;

    public AuditedAuthzCache() {
        this.cache = Caffeine.newBuilder()
                .maximumSize(100_000)
                .expireAfterWrite(Duration.ofSeconds(30))
                .expireAfterAccess(Duration.ofSeconds(15))
                .recordStats()
                .removalListener((key, value, cause) -> {
                    if (cause.wasEvicted()) {
                        auditLog.debug("Cache evicted: {} reason={}", key, cause);
                    }
                })
                .build();
    }

    public record AuthzDecision(
            boolean allowed,
            Instant decidedAt,
            String source  // "cache" or "pdp"
    ) {}

    public AuthzDecision checkWithAudit(
            String user, String relation, String object,
            AuthzProvider pdpProvider) {

        String key = user + ":" + relation + ":" + object;

        AuthzDecision cached = cache.getIfPresent(key);
        if (cached != null) {
            auditLog.info("AUTHZ_CHECK user={} relation={} object={} "
                            + "allowed={} source=cache",
                    user, relation, object, cached.allowed());
            return new AuthzDecision(cached.allowed(), cached.decidedAt(), "cache");
        }

        // 调用 PDP
        boolean allowed = pdpProvider.check(user, relation, object);
        var decision = new AuthzDecision(allowed, Instant.now(), "pdp");
        cache.put(key, decision);

        auditLog.info("AUTHZ_CHECK user={} relation={} object={} "
                        + "allowed={} source=pdp",
                user, relation, object, allowed);

        return decision;
    }

    /**
     * 批量检查 — 先查缓存,缓存未命中的批量调用 PDP
     */
    public Map<String, Boolean> batchCheck(
            String user, String relation, List<String> objects,
            AuthzProvider pdpProvider) {

        Map<String, Boolean> results = new ConcurrentHashMap<>();
        List<String> uncached = objects.stream()
                .filter(obj -> {
                    String key = user + ":" + relation + ":" + obj;
                    AuthzDecision d = cache.getIfPresent(key);
                    if (d != null) {
                        results.put(obj, d.allowed());
                        return false;
                    }
                    return true;
                })
                .collect(Collectors.toList());

        // 批量调用 PDP
        if (!uncached.isEmpty()) {
            Map<String, Boolean> pdpResults =
                    pdpProvider.batchCheck(user, relation, uncached);
            pdpResults.forEach((obj, allowed) -> {
                String key = user + ":" + relation + ":" + obj;
                cache.put(key, new AuthzDecision(allowed, Instant.now(), "pdp"));
                results.put(obj, allowed);
            });
        }

        return results;
    }

    @FunctionalInterface
    public interface AuthzProvider {
        boolean check(String user, String relation, String object);

        default Map<String, Boolean> batchCheck(
                String user, String relation, List<String> objects) {
            return objects.stream().collect(Collectors.toMap(
                    obj -> obj,
                    obj -> check(user, relation, obj)
            ));
        }
    }

    public String getStatsReport() {
        var stats = cache.stats();
        return String.format(
                "hits=%d misses=%d hitRate=%.2f%% evictions=%d",
                stats.hitCount(), stats.missCount(),
                stats.hitRate() * 100, stats.evictionCount()
        );
    }
}

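Go: Authorization middleware chain (authentication → authorization → audit)

/*
 * Go authorization middleware chain: authentication → authorization → audit
 * Dependencies:
 *   go get github.com/golang-jwt/jwt/v5
 *   go get github.com/openfga/go-sdk
 *   go get github.com/patrickmn/go-cache
 */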
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"strings"
	"time"

	"github.com/golang-jwt/jwt/v5"
	openfga "github.com/openfga/go-sdk/client"
	gocache "github.com/patrickmn/go-cache"
)

// ── 上下文 Key ──────────────────────────────────────────
type contextKey string

const (
	userContextKey  contextKey = "user"
	auditContextKey contextKey = "audit"
)

// ── 用户信息 ────────────────────────────────────────────
type UserInfo struct {
	ID    string   `json:"sub"`
	Name  string   `json:"name"`
	Email string   `json:"email"`
	Roles []string `json:"roles"`
}

// ── 审计记录 ────────────────────────────────────────────
type AuditEntry struct {
	Timestamp  time.Time `json:"timestamp"`
	UserID     string    `json:"user_id"`
	Action     string    `json:"action"`
	Resource   string    `json:"resource"`
	Allowed    bool      `json:"allowed"`
	LatencyMs  float64   `json:"latency_ms"`
	CacheHit   bool      `json:"cache_hit"`
	IP         string    `json:"ip"`
	UserAgent  string    `json:"user_agent"`
}

// ── 中间件:认证 ────────────────────────────────────────
func AuthenticationMiddleware(jwtSecret []byte) func(http.Handler) http.Handler {
	return func(next http.Handler) http.Handler {
		return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
			authHeader := r.Header.Get("Authorization")
			if !strings.HasPrefix(authHeader, "Bearer ") {
				http.Error(w, `{"error":"missing token"}`, http.StatusUnauthorized)
				return
			}

			tokenStr := strings.TrimPrefix(authHeader, "Bearer ")
			token, err := jwt.Parse(tokenStr, func(t *jwt.Token) (interface{}, error) {
				if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok {
					return nil, fmt.Errorf("unexpected signing method: %v",
						t.Header["alg"])
				}
				return jwtSecret, nil
			})

			if err != nil || !token.Valid {
				http.Error(w, `{"error":"invalid token"}`, http.StatusUnauthorized)
				return
			}

			claims, ok := token.Claims.(jwt.MapClaims)
			if !ok {
				http.Error(w, `{"error":"invalid claims"}`, http.StatusUnauthorized)
				return
			}

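			// Read "sub" defensively: a bare type assertion would panic on a
			// token that omits the claim or encodes it as a non-string
			userID := getStringClaim(claims, "sub")
			if userID == "" {
				http.Error(w, `{"error":"missing subject"}`, http.StatusUnauthorized)
				return
			}

			user := &UserInfo{
				ID:    userID,
				Name:  getStringClaim(claims, "name"),
				Email: getStringClaim(claims, "email"),
			}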

			// 提取角色
			if roles, ok := claims["roles"].([]interface{}); ok {
				for _, r := range roles {
					if s, ok := r.(string); ok {
						user.Roles = append(user.Roles, s)
					}
				}
			}

			ctx := context.WithValue(r.Context(), userContextKey, user)
			next.ServeHTTP(w, r.WithContext(ctx))
		})
	}
}

func getStringClaim(claims jwt.MapClaims, key string) string {
	if v, ok := claims[key].(string); ok {
		return v
	}
	return ""
}

// ── 中间件:授权(OpenFGA + go-cache) ──────────────────
type AuthzMiddleware struct {
	fgaClient *openfga.OpenFgaClient
	cache     *gocache.Cache
}

func NewAuthzMiddleware(fgaClient *openfga.OpenFgaClient) *AuthzMiddleware {
	return &AuthzMiddleware{
		fgaClient: fgaClient,
		// 默认过期 30s,清理间隔 60s
		cache: gocache.New(30*time.Second, 60*time.Second),
	}
}

func (am *AuthzMiddleware) Middleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		user, ok := r.Context().Value(userContextKey).(*UserInfo)
		if !ok || user == nil {
			http.Error(w, `{"error":"unauthenticated"}`, http.StatusUnauthorized)
			return
		}

		relation := mapHTTPMethodToRelation(r.Method)
		object := mapPathToObject(r.URL.Path)
		fgaUser := "user:" + user.ID

		start := time.Now()
		allowed, cacheHit := am.checkWithCache(r.Context(), fgaUser, relation, object)
		latency := time.Since(start)

		// 将审计信息存入上下文
		audit := &AuditEntry{
			Timestamp: time.Now(),
			UserID:    user.ID,
			Action:    relation,
			Resource:  object,
			Allowed:   allowed,
			LatencyMs: float64(latency.Microseconds()) / 1000.0,
			CacheHit:  cacheHit,
			IP:        r.RemoteAddr,
			UserAgent: r.UserAgent(),
		}
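		ctx := context.WithValue(r.Context(), auditContextKey, audit)

		if !allowed {
			// A denied request never reaches downstream handlers, so record
			// the audit entry here before rejecting it
			if auditJSON, err := json.Marshal(audit); err == nil {
				log.Printf("AUDIT: %s", auditJSON)
			}
			http.Error(w, `{"error":"forbidden"}`, http.StatusForbidden)
			return
		}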

		next.ServeHTTP(w, r.WithContext(ctx))
	})
}

func (am *AuthzMiddleware) checkWithCache(
	ctx context.Context, user, relation, object string,
) (bool, bool) {
	cacheKey := user + ":" + relation + ":" + object

	// 查缓存
	if val, found := am.cache.Get(cacheKey); found {
		return val.(bool), true
	}

	// 调用 OpenFGA
	body := openfga.ClientCheckRequest{
		User:     user,
		Relation: relation,
		Object:   object,
	}
	resp, err := am.fgaClient.Check(ctx).Body(body).Execute()
	if err != nil {
		log.Printf("OpenFGA check error: %v", err)
		return false, false // 默认拒绝
	}

	allowed := resp.GetAllowed()
	am.cache.Set(cacheKey, allowed, gocache.DefaultExpiration)
	return allowed, false
}

func mapHTTPMethodToRelation(method string) string {
	switch method {
	case "GET":
		return "can_view"
	case "POST":
		return "can_create"
	case "PUT", "PATCH":
		return "can_edit"
	case "DELETE":
		return "can_delete"
	default:
		return "can_view"
	}
}

func mapPathToObject(path string) string {
	parts := strings.Split(strings.Trim(path, "/"), "/")
	if len(parts) >= 3 {
		// /api/documents/123 → document:123
		resourceType := strings.TrimSuffix(parts[1], "s")
		return resourceType + ":" + parts[2]
	}
	return "resource:" + path
}

// ── 中间件:审计日志 ────────────────────────────────────
func AuditMiddleware(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		next.ServeHTTP(w, r)

		// 从上下文获取审计记录
		audit, ok := r.Context().Value(auditContextKey).(*AuditEntry)
		if !ok {
			return
		}

		// 输出结构化审计日志
		auditJSON, _ := json.Marshal(audit)
		log.Printf("AUDIT: %s", string(auditJSON))

		// 生产环境中可发送到 Kafka / Elasticsearch
	})
}

// ── 业务处理器 ──────────────────────────────────────────
func handleGetDocument(w http.ResponseWriter, r *http.Request) {
	user := r.Context().Value(userContextKey).(*UserInfo)
	w.Header().Set("Content-Type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"message":  "Document content",
		"user":     user.Name,
		"resource": r.URL.Path,
	})
}

// ── Main: assemble the middleware chain ─────────────────
func main() {
	// Demo-only secret; in production load it from a secret manager or the environment
	jwtSecret := []byte("your-256-bit-secret")

	// Initialize the OpenFGA client
	fgaClient, err := openfga.NewSdkClient(&openfga.ClientConfiguration{
		ApiUrl:  "http://localhost:8080",
		StoreId: "your-store-id",
	})
	if err != nil {
		log.Fatalf("Failed to create OpenFGA client: %v", err)
	}

	authzMW := NewAuthzMiddleware(fgaClient)

	// Chain order: audit → authentication → authorization → business handler
	handler := http.HandlerFunc(handleGetDocument)
	chain := AuditMiddleware(
		AuthenticationMiddleware(jwtSecret)(
			authzMW.Middleware(handler),
		),
	)

	http.Handle("/api/", chain)

	// Health check (no authentication required)
	http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		w.Write([]byte(`{"status":"ok"}`))
	})

	// Listen on :8081 because the OpenFGA API configured above already uses localhost:8080
	log.Println("Authorization middleware demo listening on :8081")
	log.Fatal(http.ListenAndServe(":8081", nil))
}
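
One detail of the chain above deserves a closer look: the audit middleware is outermost, yet it reads data produced by the inner middleware. In Go, a context derived by an inner layer with `r.WithContext` is not visible to the outer layer after `next.ServeHTTP` returns. A common way around this is for the outer layer to inject a pointer to a mutable record, which inner layers fill in place. The following is a minimal sketch of that pattern, not the chapter's implementation: `auditRecord`, `auditKey`, `withAudit`, `withAuthz`, and the `X-User` header are all hypothetical names chosen for illustration, and the "decision" is a placeholder.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
)

type ctxKey string

const auditKey ctxKey = "audit"

// auditRecord is a hypothetical stand-in for the chapter's AuditEntry.
type auditRecord struct {
	User    string
	Allowed bool
}

// withAudit is the outermost middleware. It injects a pointer to an empty
// record, runs the inner chain, then reads what the inner chain wrote.
func withAudit(next http.Handler, sink func(auditRecord)) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		rec := &auditRecord{}
		ctx := context.WithValue(r.Context(), auditKey, rec)
		next.ServeHTTP(w, r.WithContext(ctx))
		sink(*rec)
	})
}

// withAuthz is an inner middleware. It fills the shared record in place
// instead of attaching a new value to a derived context.
func withAuthz(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		user := r.Header.Get("X-User")
		allowed := user == "alice" // placeholder for a real PDP decision
		if rec, ok := r.Context().Value(auditKey).(*auditRecord); ok {
			rec.User, rec.Allowed = user, allowed
		}
		if !allowed {
			http.Error(w, "forbidden", http.StatusForbidden)
			return
		}
		next.ServeHTTP(w, r)
	})
}

// runDemo sends one request through the chain and returns the audited
// record together with the HTTP status code.
func runDemo(user string) (auditRecord, int) {
	var got auditRecord
	ok := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusOK)
	})
	chain := withAudit(withAuthz(ok), func(a auditRecord) { got = a })

	req := httptest.NewRequest(http.MethodGet, "/api/documents/123", nil)
	req.Header.Set("X-User", user)
	rr := httptest.NewRecorder()
	chain.ServeHTTP(rr, req)
	return got, rr.Code
}

func main() {
	for _, u := range []string{"alice", "bob"} {
		rec, code := runDemo(u)
		fmt.Printf("user=%s allowed=%v status=%d\n", rec.User, rec.Allowed, code)
	}
}
```

Because the record is shared by pointer, denied requests are audited too: the inner layer writes the decision before it rejects the request.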

Go: Authorization Decision Cache (Multi-Level Strategy with go-cache)

/*
 * Go multi-level authorization cache: go-cache (L1) + Redis (L2)
 * Dependencies:
 *   go get github.com/patrickmn/go-cache
 *   go get github.com/redis/go-redis/v9
 */
package authzcache

import (
	"context"
	"fmt"
	"sync/atomic"
	"time"

	"github.com/patrickmn/go-cache"
	"github.com/redis/go-redis/v9"
)

// ── Cache statistics ────────────────────────────────────
type CacheStats struct {
	L1Hits int64  `json:"l1_hits"`
	L2Hits int64  `json:"l2_hits"`
	Misses int64  `json:"misses"`
	Total  int64  `json:"total"`
	L1Rate string `json:"l1_hit_rate"`
	L2Rate string `json:"l2_hit_rate"`
}

// ── Multi-level authorization cache ─────────────────────
type MultiLevelAuthzCache struct {
	l1    *cache.Cache
	l2    *redis.Client
	l1TTL time.Duration
	l2TTL time.Duration
	stats struct {
		l1Hits atomic.Int64
		l2Hits atomic.Int64
		misses atomic.Int64
	}
}

func NewMultiLevelAuthzCache(redisAddr string) *MultiLevelAuthzCache {
	return &MultiLevelAuthzCache{
		l1:    cache.New(10*time.Second, 30*time.Second),
		l2:    redis.NewClient(&redis.Options{Addr: redisAddr}),
		l1TTL: 10 * time.Second,
		l2TTL: 60 * time.Second,
	}
}

func (c *MultiLevelAuthzCache) cacheKey(user, relation, object string) string {
	return fmt.Sprintf("authz:%s:%s:%s", user, relation, object)
}

// Check looks up a cached decision and returns (allowed, found)
func (c *MultiLevelAuthzCache) Check(
	ctx context.Context, user, relation, object string,
) (bool, bool) {
	key := c.cacheKey(user, relation, object)

	// L1: in-process go-cache
	if val, found := c.l1.Get(key); found {
		c.stats.l1Hits.Add(1)
		return val.(bool), true
	}

	// L2: Redis (a missing key or a Redis error both count as a miss)
	val, err := c.l2.Get(ctx, key).Result()
	if err == nil {
		c.stats.l2Hits.Add(1)
		allowed := val == "1"
		c.l1.Set(key, allowed, c.l1TTL) // backfill L1
		return allowed, true
	}

	c.stats.misses.Add(1)
	return false, false
}

// Set writes the decision to both cache levels
func (c *MultiLevelAuthzCache) Set(
	ctx context.Context, user, relation, object string, allowed bool,
) {
	key := c.cacheKey(user, relation, object)
	c.l1.Set(key, allowed, c.l1TTL)

	val := "0"
	if allowed {
		val = "1"
	}
	// Best effort: a failed Redis write is ignored and only costs a later miss
	c.l2.Set(ctx, key, val, c.l2TTL)
}

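// Invalidate removes every cached decision for the given object.
// Only this process's L1 is cleared; other instances keep their L1
// entries until l1TTL expires.
func (c *MultiLevelAuthzCache) Invalidate(ctx context.Context, object string) {
	// Clear L1: matching keys end with ":<object>"
	for k := range c.l1.Items() {
		if len(k) > len(object) && k[len(k)-len(object)-1:] == ":"+object {
			c.l1.Delete(k)
		}
	}

	// Clear L2: iterate with SCAN and delete each matching key
	pattern := fmt.Sprintf("authz:*:*:%s", object)
	iter := c.l2.Scan(ctx, 0, pattern, 100).Iterator()
	for iter.Next(ctx) {
		c.l2.Del(ctx, iter.Val())
	}
}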

// Stats returns a snapshot of the cache statistics
func (c *MultiLevelAuthzCache) Stats() CacheStats {
	l1 := c.stats.l1Hits.Load()
	l2 := c.stats.l2Hits.Load()
	miss := c.stats.misses.Load()
	total := l1 + l2 + miss

	l1Rate := "0.0%"
	l2Rate := "0.0%"
	if total > 0 {
		l1Rate = fmt.Sprintf("%.1f%%", float64(l1)/float64(total)*100)
		l2Rate = fmt.Sprintf("%.1f%%", float64(l2)/float64(total)*100)
	}

	return CacheStats{
		L1Hits: l1,
		L2Hits: l2,
		Misses: miss,
		Total:  total,
		L1Rate: l1Rate,
		L2Rate: l2Rate,
	}
}

// Close closes the Redis connection
func (c *MultiLevelAuthzCache) Close() error {
	return c.l2.Close()
}
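
The cache above only stores and retrieves decisions; the caller still has to decide what happens on a miss. The sketch below shows one way to wire it in as a cache-aside wrapper, under stated assumptions: `DecisionCache`, `CheckFunc`, `CachedCheck`, and `mapCache` are hypothetical names introduced here, and `mapCache` is an in-memory stand-in so the example runs without go-cache or Redis. The `DecisionCache` interface is written to match the `Check` and `Set` method shapes of `MultiLevelAuthzCache`.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// DecisionCache mirrors the Check/Set method shapes of MultiLevelAuthzCache.
type DecisionCache interface {
	Check(ctx context.Context, user, relation, object string) (allowed, found bool)
	Set(ctx context.Context, user, relation, object string, allowed bool)
}

// CheckFunc is a hypothetical stand-in for the remote PDP call.
type CheckFunc func(ctx context.Context, user, relation, object string) (bool, error)

// CachedCheck consults the cache first and falls back to the PDP on a miss.
// PDP errors deny the request and are never written to the cache.
func CachedCheck(ctx context.Context, c DecisionCache, pdp CheckFunc,
	user, relation, object string) (bool, error) {
	if allowed, found := c.Check(ctx, user, relation, object); found {
		return allowed, nil
	}
	allowed, err := pdp(ctx, user, relation, object)
	if err != nil {
		return false, err
	}
	c.Set(ctx, user, relation, object, allowed)
	return allowed, nil
}

// mapCache is an in-memory stand-in for this demo only (not goroutine-safe).
type mapCache map[string]bool

func (m mapCache) Check(_ context.Context, u, r, o string) (bool, bool) {
	v, ok := m[u+"|"+r+"|"+o]
	return v, ok
}

func (m mapCache) Set(_ context.Context, u, r, o string, allowed bool) {
	m[u+"|"+r+"|"+o] = allowed
}

// demo runs two identical checks plus one failing check. It reports both
// results, how often the PDP was called, and how many entries were cached.
func demo() (first, second bool, pdpCalls, cached int) {
	ctx := context.Background()
	c := mapCache{}
	pdp := func(_ context.Context, user, _, _ string) (bool, error) {
		pdpCalls++
		if user == "user:broken" {
			return false, errors.New("pdp unavailable")
		}
		return user == "user:alice", nil
	}
	first, _ = CachedCheck(ctx, c, pdp, "user:alice", "can_view", "document:123")
	second, _ = CachedCheck(ctx, c, pdp, "user:alice", "can_view", "document:123")
	_, _ = CachedCheck(ctx, c, pdp, "user:broken", "can_view", "document:123")
	return first, second, pdpCalls, len(c)
}

func main() {
	first, second, calls, cached := demo()
	fmt.Println(first, second, calls, cached)
}
```

Not caching errors is a deliberate choice in this sketch: a transient PDP outage should not turn into a denial that outlives the outage by a full TTL.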

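17.9 Authorization Performance Benchmarks

The table below lists performance baselines for typical authorization approaches in different scenarios (reference values only; actual numbers depend on hardware and network):

| Approach | Single-check latency (p50) | Single-check latency (p99) | Throughput (QPS) | Notes |
|---|---|---|---|---|
| In-app RBAC (in memory) | < 0.01 ms | < 0.05 ms | > 500,000 | Fastest, but least flexible |
| OPA Sidecar (local) | 0.1 - 0.5 ms | 1 - 2 ms | 50,000 - 100,000 | Rego policy complexity affects performance |
| OpenFGA (remote call) | 2 - 5 ms | 10 - 20 ms | 5,000 - 20,000 | Depends on the depth of the relationship graph |
| OpenFGA + Redis cache | 0.5 - 1 ms | 2 - 5 ms | 30,000 - 80,000 | With a cache hit rate above 90% |
| OpenFGA + local cache | < 0.1 ms | 0.5 ms | > 200,000 | Near in-memory speed on an L1 hit |
| Cedar (local) | 0.05 - 0.2 ms | 0.5 - 1 ms | 100,000+ | Open-sourced by AWS, implemented in Rust |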

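Cache Hit Rate vs. Latency

| Cache hit rate | Average latency | Typical scenario |
|---|---|---|
| 0% (no cache) | 3 - 10 ms | Permissions change frequently |
| 50% | 2 - 5 ms | General business workloads |
| 80% | 0.5 - 2 ms | Read-heavy, write-light workloads |
| 95%+ | < 0.5 ms | Production environments with stable permissions |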
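
The relationship in the table above follows from a weighted average: with hit rate h, the expected latency is h × (hit latency) + (1 − h) × (miss latency). The sketch below works through that arithmetic. The 0.05 ms hit latency and 5 ms miss latency are assumed figures picked to fall inside the ranges quoted in this section; they are not measurements.

```go
package main

import "fmt"

// ExpectedLatencyMs returns the average check latency in milliseconds for a
// given cache hit rate (0..1), cache-hit latency, and cache-miss latency.
func ExpectedLatencyMs(hitRate, hitMs, missMs float64) float64 {
	return hitRate*hitMs + (1-hitRate)*missMs
}

func main() {
	const hitMs, missMs = 0.05, 5.0 // assumed values, for illustration only
	for _, h := range []float64{0, 0.5, 0.8, 0.95} {
		fmt.Printf("hit rate %3.0f%% -> %.2f ms\n", h*100, ExpectedLatencyMs(h, hitMs, missMs))
	}
}
```

With these assumptions the averages come out at roughly 5, 2.5, 1.0, and 0.3 ms, which is why the last few percentage points of hit rate matter so much: the miss term dominates until the hit rate is very high.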

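Optimization Recommendations

| Optimization | Effect | Complexity |
|---|---|---|
| Local LRU cache (L1) | 10-100x lower latency | |
| Redis distributed cache (L2) | Shared across instances; fewer PDP calls | |
| Batch check API | Fewer network round trips | |
| Cache warm-up (at login) | No added latency on first access | |
| Policy simplification | Less PDP computation time | |
| Connection pooling + Keep-Alive | Less connection setup overhead | |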
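
The first row mentions a local LRU cache, whereas go-cache as used earlier in this chapter evicts by TTL rather than by recency. For readers who want to see what LRU eviction looks like, here is a minimal sketch built on the standard library's `container/list`. The names `LRU`, `NewLRU`, `Get`, and `Put` are hypothetical, and the sketch is intentionally incomplete for authorization use: it is not goroutine-safe and has no TTL, so a real deployment would add a mutex and an expiry to keep stale decisions from lingering.

```go
package main

import (
	"container/list"
	"fmt"
)

// entry is the value stored in each list element.
type entry struct {
	key     string
	allowed bool
}

// LRU is a fixed-capacity decision cache that evicts the least recently
// used key. It is not safe for concurrent use.
type LRU struct {
	capacity int
	order    *list.List // front = most recently used
	items    map[string]*list.Element
}

// NewLRU creates a cache holding at most capacity entries (minimum 1).
func NewLRU(capacity int) *LRU {
	if capacity < 1 {
		capacity = 1
	}
	return &LRU{
		capacity: capacity,
		order:    list.New(),
		items:    make(map[string]*list.Element),
	}
}

// Get returns the cached decision and marks the key as recently used.
func (c *LRU) Get(key string) (allowed, found bool) {
	el, ok := c.items[key]
	if !ok {
		return false, false
	}
	c.order.MoveToFront(el)
	return el.Value.(*entry).allowed, true
}

// Put stores a decision, evicting the least recently used key when full.
func (c *LRU) Put(key string, allowed bool) {
	if el, ok := c.items[key]; ok {
		el.Value.(*entry).allowed = allowed
		c.order.MoveToFront(el)
		return
	}
	c.items[key] = c.order.PushFront(&entry{key: key, allowed: allowed})
	if c.order.Len() > c.capacity {
		oldest := c.order.Back()
		c.order.Remove(oldest)
		delete(c.items, oldest.Value.(*entry).key)
	}
}

func main() {
	c := NewLRU(2)
	c.Put("authz:user:alice:can_view:document:1", true)
	c.Put("authz:user:alice:can_view:document:2", false)
	c.Get("authz:user:alice:can_view:document:1") // touch document:1
	c.Put("authz:user:alice:can_view:document:3", true)

	_, found := c.Get("authz:user:alice:can_view:document:2")
	fmt.Println("document:2 still cached:", found)
}
```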

17.10 Authorization Architecture Evolution Roadmap

Monolith → Microservices → Service Mesh

        flowchart LR
    subgraph Phase1["Phase 1: Monolith"]
        direction TB
        M1["Monolithic application"]
        M2["Built-in RBAC<br/>if/else or annotations"]
        M1 --- M2
    end

    subgraph Phase2["Phase 2: Microservices"]
        direction TB
        M3["API Gateway<br/>coarse-grained authorization"]
        M4["Each service<br/>calls a central authorization service"]
        M3 --> M4
    end

    subgraph Phase3["Phase 3: Service Mesh"]
        direction TB
        M5["Istio/Envoy<br/>AuthorizationPolicy"]
        M6["OPA Sidecar<br/>fine-grained policies"]
        M7["OpenFGA<br/>relationship-based permissions"]
        M5 --> M6
        M6 --> M7
    end

    Phase1 -->|"Split into services"| Phase2
    Phase2 -->|"Introduce a mesh"| Phase3

    style Phase1 fill:#ffccbc,stroke:#bf360c
    style Phase2 fill:#fff9c4,stroke:#f9a825
    style Phase3 fill:#c8e6c9,stroke:#2e7d32
    

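Detailed Comparison of the Phases

| Dimension | Monolithic RBAC | Microservices + centralized authorization | Service Mesh + externalized authorization |
|---|---|---|---|
| Authorization granularity | Coarse (role level) | Medium (API level) | Fine (resource level) |
| Policy management | In code | Central service | Policy as code (Git) |
| Deployment coupling | Strong (permission changes require a release) | | None (policies are hot-reloaded) |
| Performance | Very fast (in memory) | Moderate (network call) | Fast (local sidecar) |
| Auditing | Manual | Centralized logs | Automatic (mesh telemetry) |
| Suitable scale | Teams of fewer than 5 | 5 - 50 people | 50+ people / large-scale microservices |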

Migration Strategy

        flowchart TB
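    Start["Assess the current state"] --> Q1{"Current architecture?"}

    Q1 -->|Monolith| Step1["Step 1: Extract authorization logic into a standalone module"]
    Step1 --> Step2["Step 2: Introduce a policy engine (OPA/Casbin)"]
    Step2 --> Step3["Step 3: Split it out as an authorization microservice"]

    Q1 -->|Microservices| Step3
    Step3 --> Step4["Step 4: Add coarse-grained authorization at the API Gateway"]
    Step4 --> Step5["Step 5: Introduce OpenFGA for relationship-based permissions"]
    Step5 --> Step6["Step 6: Deploy OPA sidecars"]

    Q1 -->|Service Mesh| Step6
    Step6 --> Step7["Step 7: Configure AuthorizationPolicy"]
    Step7 --> Done["✅ Fully externalized authorization"]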

    style Start fill:#e3f2fd,stroke:#1565c0
    style Done fill:#c8e6c9,stroke:#2e7d32
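
The first migration step, extracting authorization logic into a standalone module, is mostly about putting an interface between business code and the decision. The sketch below illustrates the idea under stated assumptions: `Authorizer`, `Request`, `roleAuthorizer`, `DeleteDocument`, and `ErrForbidden` are hypothetical names, and the in-process RBAC implementation stands in for whatever a monolith does today. Once business code depends only on the interface, a later step can swap in a policy engine or a remote authorization service without touching callers.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Request describes one authorization question.
type Request struct {
	Subject  string
	Action   string
	Resource string
}

// Authorizer is the seam between business code and the decision logic.
type Authorizer interface {
	Authorize(ctx context.Context, req Request) (bool, error)
}

// ErrForbidden is returned when the authorizer denies a request.
var ErrForbidden = errors.New("forbidden")

// roleAuthorizer is a simple in-process RBAC implementation.
type roleAuthorizer struct {
	roles  map[string][]string        // subject -> roles
	grants map[string]map[string]bool // role -> action -> allowed
}

func (a roleAuthorizer) Authorize(_ context.Context, req Request) (bool, error) {
	for _, role := range a.roles[req.Subject] {
		if a.grants[role][req.Action] {
			return true, nil
		}
	}
	return false, nil
}

// DeleteDocument is business code that depends only on the interface.
func DeleteDocument(ctx context.Context, az Authorizer, user, docID string) error {
	ok, err := az.Authorize(ctx, Request{
		Subject:  user,
		Action:   "delete",
		Resource: "document:" + docID,
	})
	if err != nil {
		return fmt.Errorf("authorization check failed: %w", err)
	}
	if !ok {
		return ErrForbidden
	}
	return nil // the actual deletion would happen here
}

// demo returns the outcome for an admin and for a viewer.
func demo() (adminErr, viewerErr error) {
	az := roleAuthorizer{
		roles: map[string][]string{"alice": {"admin"}, "bob": {"viewer"}},
		grants: map[string]map[string]bool{
			"admin":  {"delete": true, "view": true},
			"viewer": {"view": true},
		},
	}
	ctx := context.Background()
	return DeleteDocument(ctx, az, "alice", "123"), DeleteDocument(ctx, az, "bob", "123")
}

func main() {
	adminErr, viewerErr := demo()
	fmt.Println("admin:", adminErr, "viewer:", viewerErr)
}
```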
    

17.11 In Practice: Authorization Architecture for a SaaS Platform

        flowchart TB
    subgraph Clients["Clients"]
        Web["Web/Mobile"]
    end

    subgraph Gateway["API Gateway (Kong/Envoy)"]
        JWT["JWT validation"]
        Tenant["Tenant routing"]
        Rate["Rate limiting"]
        JWT --> Tenant --> Rate
    end

    subgraph Services["Microservice cluster"]
        US["User service"]
        PS["Project service"]
        RS["Resource service"]
        US --> PS --> RS
    end

    subgraph AuthzLayer["Authorization service layer"]
        FGA["OpenFGA<br/>(ReBAC relationship permissions)"]
        OPA["OPA<br/>(ABAC conditional policies)"]
    end

    Web --> Gateway
    Gateway --> Services
    Services --> AuthzLayer

    style Gateway fill:#e3f2fd,stroke:#1565c0
    style AuthzLayer fill:#c8e6c9,stroke:#2e7d32
    style Services fill:#fff3e0,stroke:#e65100
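
The diagram places OpenFGA and OPA side by side but does not say how a service combines their answers. One possible composition, sketched below, is to check tenant isolation first, then the relationship (ReBAC), then the condition (ABAC), and to allow only when all three pass. This is an assumption for illustration, not a description of the platform's actual logic: `Request`, `RelationCheck`, `ConditionCheck`, and `Decide` are hypothetical names, and the two function types stand in for real OpenFGA and OPA clients.

```go
package main

import (
	"context"
	"fmt"
)

// Request carries everything the decision needs.
type Request struct {
	UserTenant     string
	ResourceTenant string
	User           string
	Relation       string
	Object         string
	Attrs          map[string]string
}

// RelationCheck stands in for a ReBAC engine such as OpenFGA.
type RelationCheck func(ctx context.Context, user, relation, object string) (bool, error)

// ConditionCheck stands in for an ABAC engine such as OPA.
type ConditionCheck func(ctx context.Context, req Request) (bool, error)

// Decide allows a request only if the tenant matches, the relationship
// exists, and the condition holds. Any error results in a denial.
func Decide(ctx context.Context, req Request,
	rebac RelationCheck, abac ConditionCheck) (bool, string, error) {
	if req.UserTenant != req.ResourceTenant {
		return false, "tenant mismatch", nil
	}
	ok, err := rebac(ctx, req.User, req.Relation, req.Object)
	if err != nil {
		return false, "relationship check failed", err
	}
	if !ok {
		return false, "no relationship", nil
	}
	ok, err = abac(ctx, req)
	if err != nil {
		return false, "condition check failed", err
	}
	if !ok {
		return false, "condition not met", nil
	}
	return true, "allowed", nil
}

// demo evaluates one request with stub engines: alice can edit project:42,
// and the ABAC condition requires the "mfa" attribute to be "true".
func demo(userTenant, mfa string) (bool, string) {
	rebac := func(_ context.Context, user, relation, object string) (bool, error) {
		return user == "user:alice" && relation == "can_edit" && object == "project:42", nil
	}
	abac := func(_ context.Context, req Request) (bool, error) {
		return req.Attrs["mfa"] == "true", nil
	}
	req := Request{
		UserTenant:     userTenant,
		ResourceTenant: "acme",
		User:           "user:alice",
		Relation:       "can_edit",
		Object:         "project:42",
		Attrs:          map[string]string{"mfa": mfa},
	}
	allowed, reason, _ := Decide(context.Background(), req, rebac, abac)
	return allowed, reason
}

func main() {
	fmt.Println(demo("acme", "true"))
	fmt.Println(demo("globex", "true"))
	fmt.Println(demo("acme", "false"))
}
```

Running the cheap tenant comparison before any remote call keeps cross-tenant requests from ever reaching the authorization services.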
    

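17.12 Summary

  • Authorization architecture has evolved from hard-coded checks to externalized authorization as a service

  • The XACML architecture (PEP/PDP/PIP/PAP) is the standard reference architecture for authorization systems

  • Centralized authorization suits scenarios that demand strong consistency, distributed authorization suits low-latency requirements, and a hybrid model is usually the most practical

  • Multi-tenant authorization requires data isolation, permission isolation, and management isolation

  • Multi-level caching (local L1 + Redis L2) is a key technique for authorization performance

  • The evolution path is: monolithic RBAC → centralized authorization for microservices → externalized authorization on a Service Mesh

  • Combining OpenFGA (ReBAC) with OPA (ABAC) covers most authorization needs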