# 第十七章:授权架构模式
> "好的授权架构应该像空气一样:无处不在,但用户感觉不到它的存在。"
```{mermaid}
mindmap
root((授权架构))
演进
硬编码
配置文件
策略引擎
外部化授权
XACML 架构
PEP
PDP
PIP
PAP
微服务模式
Gateway
Service Mesh
应用层
混合
多租户
租户隔离
权限继承
跨租户
```
## 17.1 授权架构的演进
```
Level 0: 硬编码
if user.role == "admin":
allow()
→ 问题:修改权限需要改代码、重新部署
Level 1: 配置文件
permissions = load_config("permissions.yaml")
→ 问题:缺乏测试、审计,配置散落各处
Level 2: 策略引擎
result = opa.evaluate(input)
→ 改进:策略与代码分离,可测试
Level 3: 外部化授权(Authorization as a Service)
result = authz_service.check(user, action, resource)
→ 最佳:统一授权服务,所有应用共享
```
## 17.2 XACML 授权架构
```{mermaid}
flowchart TB
User["👤 用户"] --> PEP
PEP["PEP
策略执行点
(API Gateway / 中间件)"]
PEP -->|授权请求| PDP
PEP --> App["🖥️ 应用资源"]
PDP["PDP
策略决策点
(OPA / OpenFGA / Cedar)"]
PAP["PAP
策略管理点
(管理界面 / Git)"] -->|策略更新| PDP
PDP -->|查询属性| PIP
PIP["PIP
策略信息点
(用户属性 / 资源属性)"]
style PEP fill:#bbdefb,stroke:#1565c0
style PDP fill:#c8e6c9,stroke:#2e7d32
style PIP fill:#fff9c4,stroke:#f9a825
style PAP fill:#f8bbd0,stroke:#c62828
```
| 组件 | 职责 | 实现 |
|------|------|------|
| PEP | 拦截请求,执行决策 | API Gateway、中间件、Sidecar |
| PDP | 评估策略,做出决策 | OPA、OpenFGA、Cedar |
| PIP | 提供决策所需的属性数据 | 用户服务、LDAP、数据库 |
| PAP | 管理策略的创建和更新 | 管理界面、Git |
## 17.3 微服务授权模式
### 模式 1:Gateway 集中授权
```{mermaid}
flowchart LR
Client["🖥️ 客户端"] --> GW
subgraph GW["API Gateway"]
direction TB
Auth["认证"]
Authz["授权 ✅"]
RL["限流"]
Auth --> Authz --> RL
end
GW --> MS["微服务
信任 Gateway
不做授权"]
style GW fill:#e3f2fd,stroke:#1565c0
style MS fill:#f5f5f5,stroke:#616161
```
**优点**:简单、集中管理
**缺点**:粗粒度、Gateway 成为瓶颈
### 模式 2:Service Mesh 授权
```{mermaid}
flowchart LR
subgraph Pod["Kubernetes Pod"]
direction LR
MS2["微服务
业务逻辑"]
subgraph Sidecar["Envoy Sidecar"]
mTLS["mTLS"]
AP["AuthzPolicy"]
end
Sidecar <--> MS2
end
External["外部流量"] --> Sidecar
style Pod fill:#e8f5e9,stroke:#2e7d32
style Sidecar fill:#fff3e0,stroke:#e65100
```
### 模式 3:应用层授权
```python
# 应用内授权 — 最细粒度
@app.put("/api/documents/{doc_id}")
async def update_document(
doc_id: str,
user: User = Depends(get_current_user),
):
# 调用外部授权服务
allowed = await openfga_client.check(
user=f"user:{user.id}",
relation="can_edit",
object=f"document:{doc_id}",
)
if not allowed:
raise HTTPException(403, "Cannot edit this document")
# 业务逻辑...
```
### 模式 4:混合模式(推荐)
```{mermaid}
flowchart LR
Client2["🖥️ 客户端"] --> GW2
subgraph GW2["API Gateway — 粗粒度"]
direction TB
A1["认证"]
A2["角色检查"]
A3["速率限制"]
A1 --> A2 --> A3
end
GW2 --> MS3
subgraph MS3["微服务 — 细粒度"]
direction TB
B1["资源级权限"]
B2["OpenFGA / OPA"]
B1 --> B2
end
style GW2 fill:#e3f2fd,stroke:#1565c0
style MS3 fill:#fce4ec,stroke:#c62828
```
## 17.4 集中式 vs 分布式 vs 混合授权架构
### 集中式授权架构
所有授权决策由一个中心服务统一处理。
```{mermaid}
flowchart TB
S1["服务 A"] -->|授权请求| CAS["🏛️ 集中授权服务
(OpenFGA / OPA)"]
S2["服务 B"] -->|授权请求| CAS
S3["服务 C"] -->|授权请求| CAS
CAS -->|查询| PS["策略存储"]
CAS -->|查询| DS["数据源
(用户/资源属性)"]
style CAS fill:#c8e6c9,stroke:#2e7d32,stroke-width:2px
style PS fill:#fff9c4,stroke:#f9a825
style DS fill:#e1bee7,stroke:#7b1fa2
```
| 优点 | 缺点 |
|------|------|
| 策略统一管理,一致性强 | 单点故障风险 |
| 审计日志集中 | 网络延迟(每次请求都需远程调用) |
| 策略变更即时生效 | 高并发下可能成为瓶颈 |
### 分布式授权架构
每个服务内嵌授权引擎,策略通过同步机制分发。
```{mermaid}
flowchart TB
PAP2["PAP
策略管理"] -->|策略分发| Bundle["策略 Bundle
(Git / S3)"]
Bundle -->|拉取| OPA1["OPA Sidecar A"]
Bundle -->|拉取| OPA2["OPA Sidecar B"]
Bundle -->|拉取| OPA3["OPA Sidecar C"]
S4["服务 A"] -->|本地调用| OPA1
S5["服务 B"] -->|本地调用| OPA2
S6["服务 C"] -->|本地调用| OPA3
style PAP2 fill:#f8bbd0,stroke:#c62828
style Bundle fill:#fff9c4,stroke:#f9a825
style OPA1 fill:#c8e6c9,stroke:#2e7d32
style OPA2 fill:#c8e6c9,stroke:#2e7d32
style OPA3 fill:#c8e6c9,stroke:#2e7d32
```
| 优点 | 缺点 |
|------|------|
| 无网络延迟(本地决策) | 策略同步有延迟(最终一致) |
| 无单点故障 | 策略版本管理复杂 |
| 高性能、高可用 | 审计日志分散 |
### 混合授权架构
Gateway 做粗粒度授权,服务内做细粒度授权,兼顾性能与精度。
```{mermaid}
flowchart TB
Client3["客户端"] --> GW3
subgraph GW3["API Gateway — 粗粒度"]
G1["JWT 验证"]
G2["角色/Scope 检查"]
G1 --> G2
end
GW3 --> S7["服务 A"]
GW3 --> S8["服务 B"]
S7 -->|细粒度| OPA4["OPA Sidecar"]
S8 -->|关系查询| FGA["OpenFGA
(集中式 ReBAC)"]
subgraph Audit["审计层"]
AL["集中审计日志"]
end
S7 --> AL
S8 --> AL
style GW3 fill:#e3f2fd,stroke:#1565c0
style OPA4 fill:#c8e6c9,stroke:#2e7d32
style FGA fill:#c8e6c9,stroke:#2e7d32
style Audit fill:#fff3e0,stroke:#e65100
```
### 授权决策缓存策略
```{mermaid}
sequenceDiagram
participant App as 应用服务
participant Cache as 本地缓存
(Caffeine/go-cache)
participant Redis as Redis 分布式缓存
participant PDP as PDP
(OpenFGA/OPA)
App->>Cache: 1. 查询本地缓存
alt 本地缓存命中
Cache-->>App: 2a. 返回缓存结果 ⚡
else 本地缓存未命中
App->>Redis: 2b. 查询 Redis
alt Redis 命中
Redis-->>App: 3a. 返回缓存结果
App->>Cache: 3b. 写入本地缓存
else Redis 未命中
App->>PDP: 3c. 调用 PDP 决策
PDP-->>App: 4. 返回授权结果
App->>Redis: 5a. 写入 Redis (TTL 60s)
App->>Cache: 5b. 写入本地缓存 (TTL 10s)
end
end
```
## 17.5 授权架构选型决策树
```{mermaid}
flowchart TB
Start["开始选型"] --> Q1{"服务数量?"}
Q1 -->|"单体 / < 5 个"| Q2{"需要细粒度
资源级权限?"}
Q1 -->|"> 5 个微服务"| Q3{"延迟要求?"}
Q2 -->|否| R1["✅ 应用内 RBAC
Spring Security / Casbin"]
Q2 -->|是| R2["✅ 集中式 + OpenFGA
应用层 ReBAC"]
Q3 -->|"< 5ms"| R3["✅ 分布式
OPA Sidecar"]
Q3 -->|"< 50ms 可接受"| Q4{"需要关系型
权限模型?"}
Q4 -->|是| R4["✅ 集中式 OpenFGA
+ Redis 缓存"]
Q4 -->|否| R5["✅ 混合模式
Gateway + OPA Sidecar"]
style Start fill:#e3f2fd,stroke:#1565c0
style R1 fill:#c8e6c9,stroke:#2e7d32
style R2 fill:#c8e6c9,stroke:#2e7d32
style R3 fill:#c8e6c9,stroke:#2e7d32
style R4 fill:#c8e6c9,stroke:#2e7d32
style R5 fill:#c8e6c9,stroke:#2e7d32
```
## 17.6 多租户授权设计
```
多租户授权模型(OpenFGA):
type user
type tenant
relations
define admin: [user]
define member: [user] or admin
type project
relations
define tenant: [tenant]
define admin: [user] or admin from tenant
define member: [user] or admin
define can_manage: admin
define can_access: member or can_manage
type resource
relations
define project: [project]
define owner: [user]
define editor: [user] or owner or can_manage from project
define viewer: [user] or editor or can_access from project
```
### 租户隔离策略
| 策略 | 描述 | 实现 |
|------|------|------|
| 数据隔离 | 每个租户的数据完全隔离 | 数据库 schema/表前缀 |
| 权限隔离 | 租户 A 的用户无法访问租户 B | OpenFGA 关系检查 |
| 管理隔离 | 租户管理员只能管理自己的租户 | 租户级 admin 角色 |
| 跨租户访问 | 特殊场景下的跨租户协作 | 显式的跨租户关系元组 |
## 17.7 性能优化
### 缓存策略
```python
import redis
import json
from functools import lru_cache
r = redis.Redis()
async def check_permission_cached(user_id: str, relation: str, object_id: str) -> bool:
"""带缓存的权限检查"""
cache_key = f"authz:{user_id}:{relation}:{object_id}"
# 1. 检查缓存
cached = r.get(cache_key)
if cached is not None:
return cached == b"1"
# 2. 调用 OpenFGA
result = await openfga_client.check(
user=f"user:{user_id}",
relation=relation,
object=object_id,
)
# 3. 写入缓存(TTL 60秒)
r.setex(cache_key, 60, "1" if result.allowed else "0")
return result.allowed
def invalidate_permission_cache(object_id: str):
"""权限变更时清除相关缓存"""
pattern = f"authz:*:*:{object_id}"
for key in r.scan_iter(pattern):
r.delete(key)
```
### 批量检查
```python
async def batch_check_permissions(user_id: str, objects: list[str], relation: str) -> dict:
"""批量权限检查"""
results = {}
checks = [
openfga_client.check(
user=f"user:{user_id}",
relation=relation,
object=obj,
)
for obj in objects
]
responses = await asyncio.gather(*checks)
for obj, resp in zip(objects, responses):
results[obj] = resp.allowed
return results
```
## 17.8 授权架构实战
### Python:集中式授权服务(FastAPI + OpenFGA)
```python
"""
集中式授权服务 — FastAPI + OpenFGA
依赖: pip install fastapi uvicorn openfga-sdk redis httpx
"""
from contextlib import asynccontextmanager
from datetime import datetime
from typing import Optional
import redis.asyncio as aioredis
from fastapi import FastAPI, HTTPException, Depends, Header
from pydantic import BaseModel
from openfga_sdk import (
ClientConfiguration,
OpenFgaClient,
ClientCheckRequest,
ClientWriteRequest,
ClientTuple,
ClientListObjectsRequest,
)
# ── 配置 ─────────────────────────────────────────────────
OPENFGA_API_URL = "http://localhost:8080"
OPENFGA_STORE_ID = "your-store-id"
REDIS_URL = "redis://localhost:6379/0"
CACHE_TTL = 60 # 秒
# ── 生命周期管理 ─────────────────────────────────────────
fga_client: Optional[OpenFgaClient] = None
redis_client: Optional[aioredis.Redis] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global fga_client, redis_client
# 启动:初始化 OpenFGA 和 Redis
config = ClientConfiguration(
api_url=OPENFGA_API_URL,
store_id=OPENFGA_STORE_ID,
)
fga_client = OpenFgaClient(config)
redis_client = aioredis.from_url(REDIS_URL, decode_responses=True)
yield
# 关闭
await fga_client.close()
await redis_client.close()
app = FastAPI(title="Centralized Authorization Service", lifespan=lifespan)
# ── 数据模型 ─────────────────────────────────────────────
class CheckRequest(BaseModel):
user: str # e.g. "user:alice"
relation: str # e.g. "can_edit"
object: str # e.g. "document:readme"
class CheckResponse(BaseModel):
allowed: bool
cached: bool
latency_ms: float
class GrantRequest(BaseModel):
user: str
relation: str
object: str
class ListObjectsRequest(BaseModel):
user: str
relation: str
type: str # e.g. "document"
# ── 带缓存的权限检查 ─────────────────────────────────────
async def cached_check(user: str, relation: str, obj: str) -> tuple[bool, bool]:
"""返回 (allowed, cached)"""
cache_key = f"authz:{user}:{relation}:{obj}"
# 1. 查 Redis 缓存
cached = await redis_client.get(cache_key)
if cached is not None:
return cached == "1", True
# 2. 调用 OpenFGA
response = await fga_client.check(ClientCheckRequest(
user=user,
relation=relation,
object=obj,
))
allowed = response.allowed
# 3. 写入缓存
await redis_client.setex(cache_key, CACHE_TTL, "1" if allowed else "0")
return allowed, False
# ── API 端点 ─────────────────────────────────────────────
@app.post("/v1/check", response_model=CheckResponse)
async def check_permission(req: CheckRequest):
"""检查权限(带缓存)"""
start = datetime.now()
allowed, cached = await cached_check(req.user, req.relation, req.object)
latency = (datetime.now() - start).total_seconds() * 1000
return CheckResponse(allowed=allowed, cached=cached, latency_ms=round(latency, 2))
@app.post("/v1/grant")
async def grant_permission(req: GrantRequest):
"""授予权限"""
await fga_client.write(ClientWriteRequest(
writes=[ClientTuple(
user=req.user,
relation=req.relation,
object=req.object,
)],
))
# 清除相关缓存
pattern = f"authz:*:*:{req.object}"
async for key in redis_client.scan_iter(pattern):
await redis_client.delete(key)
return {"status": "granted"}
@app.post("/v1/revoke")
async def revoke_permission(req: GrantRequest):
"""撤销权限"""
await fga_client.write(ClientWriteRequest(
deletes=[ClientTuple(
user=req.user,
relation=req.relation,
object=req.object,
)],
))
# 清除相关缓存
pattern = f"authz:{req.user}:{req.relation}:{req.object}"
async for key in redis_client.scan_iter(pattern):
await redis_client.delete(key)
return {"status": "revoked"}
@app.post("/v1/list-objects")
async def list_objects(req: ListObjectsRequest):
"""列出用户有权访问的所有对象"""
response = await fga_client.list_objects(ClientListObjectsRequest(
user=req.user,
relation=req.relation,
type=req.type,
))
return {"objects": response.objects}
@app.post("/v1/batch-check")
async def batch_check(checks: list[CheckRequest]):
"""批量权限检查"""
import asyncio
results = []
tasks = [cached_check(c.user, c.relation, c.object) for c in checks]
responses = await asyncio.gather(*tasks)
for req, (allowed, cached) in zip(checks, responses):
results.append({
"user": req.user,
"relation": req.relation,
"object": req.object,
"allowed": allowed,
"cached": cached,
})
return {"results": results}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=9000)
```
### Python:授权决策缓存(Redis 多级 TTL)
```python
"""
多级授权缓存策略 — 本地 LRU + Redis
依赖: pip install redis cachetools
"""
import asyncio
import time
from typing import Optional
import redis.asyncio as aioredis
from cachetools import TTLCache
# ── 多级缓存 ─────────────────────────────────────────────
class AuthzCache:
"""
L1: 进程内 TTLCache(10s,容量 10000)
L2: Redis(60s)
"""
def __init__(self, redis_url: str = "redis://localhost:6379/0"):
self.l1 = TTLCache(maxsize=10000, ttl=10)
self.l2: Optional[aioredis.Redis] = None
self._redis_url = redis_url
self.l2_ttl = 60
# 统计
self.stats = {"l1_hit": 0, "l2_hit": 0, "miss": 0}
async def connect(self):
self.l2 = aioredis.from_url(self._redis_url, decode_responses=True)
async def close(self):
if self.l2:
await self.l2.close()
def _cache_key(self, user: str, relation: str, obj: str) -> str:
return f"authz:{user}:{relation}:{obj}"
async def get(self, user: str, relation: str, obj: str) -> Optional[bool]:
key = self._cache_key(user, relation, obj)
# L1
if key in self.l1:
self.stats["l1_hit"] += 1
return self.l1[key]
# L2
if self.l2:
val = await self.l2.get(key)
if val is not None:
self.stats["l2_hit"] += 1
result = val == "1"
self.l1[key] = result # 回填 L1
return result
self.stats["miss"] += 1
return None
async def set(self, user: str, relation: str, obj: str, allowed: bool):
key = self._cache_key(user, relation, obj)
self.l1[key] = allowed
if self.l2:
await self.l2.setex(key, self.l2_ttl, "1" if allowed else "0")
async def invalidate(self, obj: str):
"""权限变更时清除该对象的所有缓存"""
# 清除 L1 中匹配的 key
keys_to_delete = [k for k in self.l1 if k.endswith(f":{obj}")]
for k in keys_to_delete:
del self.l1[k]
# 清除 L2
if self.l2:
pattern = f"authz:*:*:{obj}"
async for key in self.l2.scan_iter(pattern):
await self.l2.delete(key)
def get_stats(self) -> dict:
total = sum(self.stats.values())
return {
**self.stats,
"total": total,
"l1_hit_rate": f"{self.stats['l1_hit']/max(total,1)*100:.1f}%",
"l2_hit_rate": f"{self.stats['l2_hit']/max(total,1)*100:.1f}%",
}
# ── 使用示例 ─────────────────────────────────────────────
async def main():
cache = AuthzCache()
await cache.connect()
# 模拟权限检查
result = await cache.get("user:alice", "can_edit", "doc:1")
if result is None:
# 缓存未命中,调用 PDP
result = True # 假设 PDP 返回 True
await cache.set("user:alice", "can_edit", "doc:1", result)
print(f"Allowed: {result}")
print(f"Stats: {cache.get_stats()}")
await cache.close()
if __name__ == "__main__":
asyncio.run(main())
```
### Java:Spring Security 自定义 AuthorizationManager
```java
/**
* Spring Security 自定义 AuthorizationManager — 集成 OpenFGA
* 依赖: spring-boot-starter-security, spring-boot-starter-cache,
* com.github.ben-manes.caffeine:caffeine, dev.openfga:openfga-sdk
*/
// ── 1. OpenFGA 授权管理器 ──────────────────────────────
import dev.openfga.sdk.api.client.OpenFgaClient;
import dev.openfga.sdk.api.client.model.ClientCheckRequest;
import org.springframework.security.authorization.AuthorizationDecision;
import org.springframework.security.authorization.AuthorizationManager;
import org.springframework.security.core.Authentication;
import org.springframework.security.web.access.intercept.RequestAuthorizationContext;
import org.springframework.stereotype.Component;
import java.util.function.Supplier;
@Component
public class OpenFgaAuthorizationManager
implements AuthorizationManager {
private final OpenFgaClient fgaClient;
private final AuthzCacheService cacheService;
public OpenFgaAuthorizationManager(OpenFgaClient fgaClient,
AuthzCacheService cacheService) {
this.fgaClient = fgaClient;
this.cacheService = cacheService;
}
@Override
public AuthorizationDecision check(
Supplier authentication,
RequestAuthorizationContext context) {
Authentication auth = authentication.get();
if (auth == null || !auth.isAuthenticated()) {
return new AuthorizationDecision(false);
}
String user = "user:" + auth.getName();
String method = context.getRequest().getMethod();
String path = context.getRequest().getRequestURI();
// 将 HTTP 方法映射为关系
String relation = mapMethodToRelation(method);
String object = mapPathToObject(path);
// 带缓存的权限检查
boolean allowed = cacheService.checkPermission(user, relation, object);
return new AuthorizationDecision(allowed);
}
private String mapMethodToRelation(String method) {
return switch (method) {
case "GET" -> "can_view";
case "POST" -> "can_create";
case "PUT", "PATCH" -> "can_edit";
case "DELETE" -> "can_delete";
default -> "can_view";
};
}
private String mapPathToObject(String path) {
// /api/documents/123 → document:123
String[] parts = path.split("/");
if (parts.length >= 4) {
String type = parts[2].replaceAll("s$", ""); // documents → document
return type + ":" + parts[3];
}
return "resource:" + path;
}
}
// ── 2. Caffeine 缓存服务 ──────────────────────────────
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import dev.openfga.sdk.api.client.OpenFgaClient;
import dev.openfga.sdk.api.client.model.ClientCheckRequest;
import org.springframework.stereotype.Service;
import java.time.Duration;
@Service
public class AuthzCacheService {
private final OpenFgaClient fgaClient;
private final Cache cache;
public AuthzCacheService(OpenFgaClient fgaClient) {
this.fgaClient = fgaClient;
this.cache = Caffeine.newBuilder()
.maximumSize(50_000)
.expireAfterWrite(Duration.ofSeconds(30))
.recordStats()
.build();
}
public boolean checkPermission(String user, String relation, String object) {
String key = user + ":" + relation + ":" + object;
Boolean cached = cache.getIfPresent(key);
if (cached != null) {
return cached;
}
try {
var response = fgaClient.check(new ClientCheckRequest()
.user(user)
.relation(relation)
._object(object)
).get();
boolean allowed = response.getAllowed();
cache.put(key, allowed);
return allowed;
} catch (Exception e) {
// 授权服务不可用时,默认拒绝
return false;
}
}
public void invalidate(String object) {
// 清除与该对象相关的所有缓存
cache.asMap().keySet().removeIf(k -> k.endsWith(":" + object));
}
public com.github.benmanes.caffeine.cache.stats.CacheStats getStats() {
return cache.stats();
}
}
// ── 3. Security 配置 ──────────────────────────────────
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.web.SecurityFilterChain;
@Configuration
@EnableWebSecurity
public class SecurityConfig {
private final OpenFgaAuthorizationManager authzManager;
public SecurityConfig(OpenFgaAuthorizationManager authzManager) {
this.authzManager = authzManager;
}
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http
.authorizeHttpRequests(auth -> auth
.requestMatchers("/", "/health", "/public/**").permitAll()
.requestMatchers("/api/**").access(authzManager)
.anyRequest().authenticated()
)
.oauth2ResourceServer(oauth2 -> oauth2.jwt(jwt -> {}));
return http.build();
}
}
```
### Java:授权决策缓存(Caffeine Cache + 审计)
```java
/**
* 带审计日志的授权决策缓存
*/
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
@Service
public class AuditedAuthzCache {
private static final Logger auditLog =
LoggerFactory.getLogger("AUTHZ_AUDIT");
private final Cache cache;
public AuditedAuthzCache() {
this.cache = Caffeine.newBuilder()
.maximumSize(100_000)
.expireAfterWrite(Duration.ofSeconds(30))
.expireAfterAccess(Duration.ofSeconds(15))
.recordStats()
.removalListener((key, value, cause) -> {
if (cause.wasEvicted()) {
auditLog.debug("Cache evicted: {} reason={}", key, cause);
}
})
.build();
}
public record AuthzDecision(
boolean allowed,
Instant decidedAt,
String source // "cache" or "pdp"
) {}
public AuthzDecision checkWithAudit(
String user, String relation, String object,
AuthzProvider pdpProvider) {
String key = user + ":" + relation + ":" + object;
AuthzDecision cached = cache.getIfPresent(key);
if (cached != null) {
auditLog.info("AUTHZ_CHECK user={} relation={} object={} "
+ "allowed={} source=cache",
user, relation, object, cached.allowed());
return new AuthzDecision(cached.allowed(), cached.decidedAt(), "cache");
}
// 调用 PDP
boolean allowed = pdpProvider.check(user, relation, object);
var decision = new AuthzDecision(allowed, Instant.now(), "pdp");
cache.put(key, decision);
auditLog.info("AUTHZ_CHECK user={} relation={} object={} "
+ "allowed={} source=pdp",
user, relation, object, allowed);
return decision;
}
/**
* 批量检查 — 先查缓存,缓存未命中的批量调用 PDP
*/
public Map batchCheck(
String user, String relation, List objects,
AuthzProvider pdpProvider) {
Map results = new ConcurrentHashMap<>();
List uncached = objects.stream()
.filter(obj -> {
String key = user + ":" + relation + ":" + obj;
AuthzDecision d = cache.getIfPresent(key);
if (d != null) {
results.put(obj, d.allowed());
return false;
}
return true;
})
.collect(Collectors.toList());
// 批量调用 PDP
if (!uncached.isEmpty()) {
Map pdpResults =
pdpProvider.batchCheck(user, relation, uncached);
pdpResults.forEach((obj, allowed) -> {
String key = user + ":" + relation + ":" + obj;
cache.put(key, new AuthzDecision(allowed, Instant.now(), "pdp"));
results.put(obj, allowed);
});
}
return results;
}
@FunctionalInterface
public interface AuthzProvider {
boolean check(String user, String relation, String object);
default Map batchCheck(
String user, String relation, List objects) {
return objects.stream().collect(Collectors.toMap(
obj -> obj,
obj -> check(user, relation, obj)
));
}
}
public String getStatsReport() {
var stats = cache.stats();
return String.format(
"hits=%d misses=%d hitRate=%.2f%% evictions=%d",
stats.hitCount(), stats.missCount(),
stats.hitRate() * 100, stats.evictionCount()
);
}
}
```
### Go:授权中间件链(认证 → 授权 → 审计)
```go
/*
* Go 授权中间件链 — 认证 → 授权 → 审计
* 依赖:
* go get github.com/golang-jwt/jwt/v5
* go get github.com/openfga/go-sdk
* go get github.com/patrickmn/go-cache
*/
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"net/http"
"strings"
"time"
"github.com/golang-jwt/jwt/v5"
openfga "github.com/openfga/go-sdk/client"
gocache "github.com/patrickmn/go-cache"
)
// ── 上下文 Key ──────────────────────────────────────────
type contextKey string
const (
userContextKey contextKey = "user"
auditContextKey contextKey = "audit"
)
// ── 用户信息 ────────────────────────────────────────────
type UserInfo struct {
ID string `json:"sub"`
Name string `json:"name"`
Email string `json:"email"`
Roles []string `json:"roles"`
}
// ── 审计记录 ────────────────────────────────────────────
type AuditEntry struct {
Timestamp time.Time `json:"timestamp"`
UserID string `json:"user_id"`
Action string `json:"action"`
Resource string `json:"resource"`
Allowed bool `json:"allowed"`
LatencyMs float64 `json:"latency_ms"`
CacheHit bool `json:"cache_hit"`
IP string `json:"ip"`
UserAgent string `json:"user_agent"`
}
// ── 中间件:认证 ────────────────────────────────────────
func AuthenticationMiddleware(jwtSecret []byte) func(http.Handler) http.Handler {
return func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
authHeader := r.Header.Get("Authorization")
if !strings.HasPrefix(authHeader, "Bearer ") {
http.Error(w, `{"error":"missing token"}`, http.StatusUnauthorized)
return
}
tokenStr := strings.TrimPrefix(authHeader, "Bearer ")
token, err := jwt.Parse(tokenStr, func(t *jwt.Token) (interface{}, error) {
if _, ok := t.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method: %v",
t.Header["alg"])
}
return jwtSecret, nil
})
if err != nil || !token.Valid {
http.Error(w, `{"error":"invalid token"}`, http.StatusUnauthorized)
return
}
claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
http.Error(w, `{"error":"invalid claims"}`, http.StatusUnauthorized)
return
}
user := &UserInfo{
ID: claims["sub"].(string),
Name: getStringClaim(claims, "name"),
Email: getStringClaim(claims, "email"),
}
// 提取角色
if roles, ok := claims["roles"].([]interface{}); ok {
for _, r := range roles {
if s, ok := r.(string); ok {
user.Roles = append(user.Roles, s)
}
}
}
ctx := context.WithValue(r.Context(), userContextKey, user)
next.ServeHTTP(w, r.WithContext(ctx))
})
}
}
func getStringClaim(claims jwt.MapClaims, key string) string {
if v, ok := claims[key].(string); ok {
return v
}
return ""
}
// ── 中间件:授权(OpenFGA + go-cache) ──────────────────
type AuthzMiddleware struct {
fgaClient *openfga.OpenFgaClient
cache *gocache.Cache
}
func NewAuthzMiddleware(fgaClient *openfga.OpenFgaClient) *AuthzMiddleware {
return &AuthzMiddleware{
fgaClient: fgaClient,
// 默认过期 30s,清理间隔 60s
cache: gocache.New(30*time.Second, 60*time.Second),
}
}
func (am *AuthzMiddleware) Middleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
user, ok := r.Context().Value(userContextKey).(*UserInfo)
if !ok || user == nil {
http.Error(w, `{"error":"unauthenticated"}`, http.StatusUnauthorized)
return
}
relation := mapHTTPMethodToRelation(r.Method)
object := mapPathToObject(r.URL.Path)
fgaUser := "user:" + user.ID
start := time.Now()
allowed, cacheHit := am.checkWithCache(r.Context(), fgaUser, relation, object)
latency := time.Since(start)
// 将审计信息存入上下文
audit := &AuditEntry{
Timestamp: time.Now(),
UserID: user.ID,
Action: relation,
Resource: object,
Allowed: allowed,
LatencyMs: float64(latency.Microseconds()) / 1000.0,
CacheHit: cacheHit,
IP: r.RemoteAddr,
UserAgent: r.UserAgent(),
}
ctx := context.WithValue(r.Context(), auditContextKey, audit)
if !allowed {
http.Error(w, `{"error":"forbidden"}`, http.StatusForbidden)
return
}
next.ServeHTTP(w, r.WithContext(ctx))
})
}
func (am *AuthzMiddleware) checkWithCache(
ctx context.Context, user, relation, object string,
) (bool, bool) {
cacheKey := user + ":" + relation + ":" + object
// 查缓存
if val, found := am.cache.Get(cacheKey); found {
return val.(bool), true
}
// 调用 OpenFGA
body := openfga.ClientCheckRequest{
User: user,
Relation: relation,
Object: object,
}
resp, err := am.fgaClient.Check(ctx).Body(body).Execute()
if err != nil {
log.Printf("OpenFGA check error: %v", err)
return false, false // 默认拒绝
}
allowed := resp.GetAllowed()
am.cache.Set(cacheKey, allowed, gocache.DefaultExpiration)
return allowed, false
}
func mapHTTPMethodToRelation(method string) string {
switch method {
case "GET":
return "can_view"
case "POST":
return "can_create"
case "PUT", "PATCH":
return "can_edit"
case "DELETE":
return "can_delete"
default:
return "can_view"
}
}
func mapPathToObject(path string) string {
parts := strings.Split(strings.Trim(path, "/"), "/")
if len(parts) >= 3 {
// /api/documents/123 → document:123
resourceType := strings.TrimSuffix(parts[1], "s")
return resourceType + ":" + parts[2]
}
return "resource:" + path
}
// ── 中间件:审计日志 ────────────────────────────────────
func AuditMiddleware(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
next.ServeHTTP(w, r)
// 从上下文获取审计记录
audit, ok := r.Context().Value(auditContextKey).(*AuditEntry)
if !ok {
return
}
// 输出结构化审计日志
auditJSON, _ := json.Marshal(audit)
log.Printf("AUDIT: %s", string(auditJSON))
// 生产环境中可发送到 Kafka / Elasticsearch
})
}
// ── 业务处理器 ──────────────────────────────────────────
func handleGetDocument(w http.ResponseWriter, r *http.Request) {
user := r.Context().Value(userContextKey).(*UserInfo)
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"message": "Document content",
"user": user.Name,
"resource": r.URL.Path,
})
}
// ── 主函数:组装中间件链 ────────────────────────────────
func main() {
jwtSecret := []byte("your-256-bit-secret")
// 初始化 OpenFGA Client
fgaClient, err := openfga.NewSdkClient(&openfga.ClientConfiguration{
ApiUrl: "http://localhost:8080",
StoreId: "your-store-id",
})
if err != nil {
log.Fatalf("Failed to create OpenFGA client: %v", err)
}
authzMW := NewAuthzMiddleware(fgaClient)
// 组装中间件链:审计 → 认证 → 授权 → 业务处理
handler := http.HandlerFunc(handleGetDocument)
chain := AuditMiddleware(
AuthenticationMiddleware(jwtSecret)(
authzMW.Middleware(handler),
),
)
http.Handle("/api/", chain)
// 健康检查(无需认证)
http.HandleFunc("/health", func(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(`{"status":"ok"}`))
})
log.Println("Authorization middleware demo listening on :8080")
log.Fatal(http.ListenAndServe(":8080", nil))
}
```
### Go:授权决策缓存(go-cache 多级策略)
```go
/*
* Go 多级授权缓存 — go-cache (L1) + Redis (L2)
* 依赖:
* go get github.com/patrickmn/go-cache
* go get github.com/redis/go-redis/v9
*/
package authzcache
import (
"context"
"fmt"
"sync/atomic"
"time"
"github.com/patrickmn/go-cache"
"github.com/redis/go-redis/v9"
)
// ── 缓存统计 ────────────────────────────────────────────
type CacheStats struct {
L1Hits int64 `json:"l1_hits"`
L2Hits int64 `json:"l2_hits"`
Misses int64 `json:"misses"`
Total int64 `json:"total"`
L1Rate string `json:"l1_hit_rate"`
L2Rate string `json:"l2_hit_rate"`
}
// ── 多级授权缓存 ────────────────────────────────────────
type MultiLevelAuthzCache struct {
l1 *cache.Cache
l2 *redis.Client
l1TTL time.Duration
l2TTL time.Duration
stats struct {
l1Hits atomic.Int64
l2Hits atomic.Int64
misses atomic.Int64
}
}
func NewMultiLevelAuthzCache(redisAddr string) *MultiLevelAuthzCache {
return &MultiLevelAuthzCache{
l1: cache.New(10*time.Second, 30*time.Second),
l2: redis.NewClient(&redis.Options{Addr: redisAddr}),
l1TTL: 10 * time.Second,
l2TTL: 60 * time.Second,
}
}
func (c *MultiLevelAuthzCache) cacheKey(user, relation, object string) string {
return fmt.Sprintf("authz:%s:%s:%s", user, relation, object)
}
// Check 查询缓存,返回 (allowed, found)
func (c *MultiLevelAuthzCache) Check(
ctx context.Context, user, relation, object string,
) (bool, bool) {
key := c.cacheKey(user, relation, object)
// L1: go-cache
if val, found := c.l1.Get(key); found {
c.stats.l1Hits.Add(1)
return val.(bool), true
}
// L2: Redis
val, err := c.l2.Get(ctx, key).Result()
if err == nil {
c.stats.l2Hits.Add(1)
allowed := val == "1"
c.l1.Set(key, allowed, c.l1TTL) // 回填 L1
return allowed, true
}
c.stats.misses.Add(1)
return false, false
}
// Set 写入两级缓存
func (c *MultiLevelAuthzCache) Set(
ctx context.Context, user, relation, object string, allowed bool,
) {
key := c.cacheKey(user, relation, object)
c.l1.Set(key, allowed, c.l1TTL)
val := "0"
if allowed {
val = "1"
}
c.l2.Set(ctx, key, val, c.l2TTL)
}
// Invalidate 清除指定对象的所有缓存
func (c *MultiLevelAuthzCache) Invalidate(ctx context.Context, object string) {
// 清除 L1
for k := range c.l1.Items() {
if len(k) > len(object) && k[len(k)-len(object)-1:] == ":"+object {
c.l1.Delete(k)
}
}
// 清除 L2
pattern := fmt.Sprintf("authz:*:*:%s", object)
iter := c.l2.Scan(ctx, 0, pattern, 100).Iterator()
for iter.Next(ctx) {
c.l2.Del(ctx, iter.Val())
}
}
// Stats 返回缓存统计
func (c *MultiLevelAuthzCache) Stats() CacheStats {
l1 := c.stats.l1Hits.Load()
l2 := c.stats.l2Hits.Load()
miss := c.stats.misses.Load()
total := l1 + l2 + miss
l1Rate := "0.0%"
l2Rate := "0.0%"
if total > 0 {
l1Rate = fmt.Sprintf("%.1f%%", float64(l1)/float64(total)*100)
l2Rate = fmt.Sprintf("%.1f%%", float64(l2)/float64(total)*100)
}
return CacheStats{
L1Hits: l1,
L2Hits: l2,
Misses: miss,
Total: total,
L1Rate: l1Rate,
L2Rate: l2Rate,
}
}
// Close 关闭连接
func (c *MultiLevelAuthzCache) Close() error {
return c.l2.Close()
}
```
## 17.9 授权性能基准测试
以下为典型授权方案在不同场景下的性能基准(参考值,实际取决于硬件和网络):
| 方案 | 单次检查延迟 (p50) | 单次检查延迟 (p99) | 吞吐量 (QPS) | 备注 |
|------|-------------------|-------------------|-------------|------|
| 应用内 RBAC(内存) | < 0.01ms | < 0.05ms | > 500,000 | 最快,但灵活性低 |
| OPA Sidecar(本地) | 0.1 - 0.5ms | 1 - 2ms | 50,000 - 100,000 | Rego 策略复杂度影响性能 |
| OpenFGA(远程调用) | 2 - 5ms | 10 - 20ms | 5,000 - 20,000 | 取决于关系图深度 |
| OpenFGA + Redis 缓存 | 0.5 - 1ms | 2 - 5ms | 30,000 - 80,000 | 缓存命中率 > 90% 时 |
| OpenFGA + 本地缓存 | < 0.1ms | 0.5ms | > 200,000 | L1 命中时接近内存速度 |
| Cedar(本地) | 0.05 - 0.2ms | 0.5 - 1ms | 100,000+ | AWS 开源,Rust 实现 |
### 缓存命中率与延迟关系
| 缓存命中率 | 平均延迟 | 适用场景 |
|-----------|---------|---------|
| 0%(无缓存) | 3 - 10ms | 权限频繁变更的场景 |
| 50% | 2 - 5ms | 一般业务场景 |
| 80% | 0.5 - 2ms | 读多写少的场景 |
| 95%+ | < 0.5ms | 权限稳定的生产环境 |
### 优化建议
| 优化手段 | 效果 | 复杂度 |
|---------|------|--------|
| 本地 LRU 缓存(L1) | 延迟降低 10-100x | 低 |
| Redis 分布式缓存(L2) | 跨实例共享,减少 PDP 调用 | 中 |
| 批量检查 API | 减少网络往返 | 低 |
| 预热缓存(登录时) | 首次访问无延迟 | 中 |
| 策略简化 | 减少 PDP 计算时间 | 中 |
| 连接池 + Keep-Alive | 减少连接建立开销 | 低 |
## 17.10 授权架构演进路线
### 单体 → 微服务 → Service Mesh
```{mermaid}
flowchart LR
subgraph Phase1["阶段 1:单体应用"]
direction TB
M1["单体应用"]
M2["内置 RBAC
if/else 或注解"]
M1 --- M2
end
subgraph Phase2["阶段 2:微服务"]
direction TB
M3["API Gateway
粗粒度授权"]
M4["各服务
调用集中授权服务"]
M3 --> M4
end
subgraph Phase3["阶段 3:Service Mesh"]
direction TB
M5["Istio/Envoy
AuthorizationPolicy"]
M6["OPA Sidecar
细粒度策略"]
M7["OpenFGA
关系型权限"]
M5 --> M6
M6 --> M7
end
Phase1 -->|"拆分服务"| Phase2
Phase2 -->|"引入 Mesh"| Phase3
style Phase1 fill:#ffccbc,stroke:#bf360c
style Phase2 fill:#fff9c4,stroke:#f9a825
style Phase3 fill:#c8e6c9,stroke:#2e7d32
```
### 各阶段详细对比
| 维度 | 单体 RBAC | 微服务 + 集中授权 | Service Mesh + 外部化 |
|------|----------|-----------------|---------------------|
| 授权粒度 | 粗(角色级) | 中(API 级) | 细(资源级) |
| 策略管理 | 代码内 | 集中服务 | 策略即代码(Git) |
| 部署耦合 | 强(改权限需发版) | 弱 | 无(策略热更新) |
| 性能 | 极快(内存) | 中(网络调用) | 快(Sidecar 本地) |
| 审计 | 手动 | 集中日志 | 自动(Mesh 遥测) |
| 适用规模 | < 5 人团队 | 5 - 50 人 | 50+ 人 / 大规模微服务 |
### 迁移策略
```{mermaid}
flowchart TB
Start["现状评估"] --> Q1{"当前架构?"}
Q1 -->|单体| Step1["1. 抽取授权逻辑为独立模块"]
Step1 --> Step2["2. 引入策略引擎(OPA/Casbin)"]
Step2 --> Step3["3. 拆分为授权微服务"]
Q1 -->|微服务| Step3
Step3 --> Step4["4. 添加 API Gateway 粗粒度授权"]
Step4 --> Step5["5. 引入 OpenFGA 关系型权限"]
Step5 --> Step6["6. 部署 OPA Sidecar"]
Q1 -->|Service Mesh| Step6
Step6 --> Step7["7. 配置 AuthorizationPolicy"]
Step7 --> Done["✅ 完整的外部化授权"]
style Start fill:#e3f2fd,stroke:#1565c0
style Done fill:#c8e6c9,stroke:#2e7d32
```
## 17.11 实战:SaaS 平台授权架构
```{mermaid}
flowchart TB
subgraph Clients["客户端"]
Web["Web/Mobile"]
end
subgraph Gateway["API Gateway (Kong/Envoy)"]
JWT["JWT 验证"]
Tenant["租户路由"]
Rate["速率限制"]
JWT --> Tenant --> Rate
end
subgraph Services["微服务集群"]
US["用户服务"]
PS["项目服务"]
RS["资源服务"]
US --> PS --> RS
end
subgraph AuthzLayer["授权服务层"]
FGA["OpenFGA
(ReBAC 关系权限)"]
OPA["OPA
(ABAC 条件策略)"]
end
Web --> Gateway
Gateway --> Services
Services --> AuthzLayer
style Gateway fill:#e3f2fd,stroke:#1565c0
style AuthzLayer fill:#c8e6c9,stroke:#2e7d32
style Services fill:#fff3e0,stroke:#e65100
```
## 17.12 小结
- 授权架构从**硬编码**演进到**外部化授权即服务**
- **XACML 架构**(PEP/PDP/PIP/PAP)是授权系统的标准参考架构
- **集中式**适合一致性要求高的场景,**分布式**适合低延迟要求,**混合模式**最实用
- **多租户授权**需要数据隔离、权限隔离、管理隔离
- **多级缓存**(本地 L1 + Redis L2)是授权性能优化的关键手段
- **授权架构演进**:单体 RBAC → 微服务集中授权 → Service Mesh 外部化
- OpenFGA(ReBAC)+ OPA(ABAC)的组合覆盖大多数授权需求