# 第十五章:OPA — 通用策略引擎
> "策略应该与代码分离,就像数据应该与逻辑分离一样。"
```{mermaid}
mindmap
  root((OPA))
    核心概念
      Policy
      Data
      Query
      Decision
    Rego 语言
      规则
      函数
      集合操作
      测试
    部署模式
      Sidecar
      独立服务
      库模式
      WASM
    应用场景
      K8s Admission
      API Gateway
      微服务授权
      Terraform
```
## 15.1 OPA 是什么
OPA(Open Policy Agent)是 CNCF 毕业项目,提供通用的策略引擎。它将策略决策从应用代码中解耦出来。
```{mermaid}
flowchart LR
subgraph 应用层
A1[Web 应用]
A2[API Gateway]
A3[微服务]
A4[CI/CD Pipeline]
end
subgraph OPA
direction TB
RE[Rego 评估引擎]
P["策略<br/>Rego Files"]
D["数据<br/>JSON"]
P --> RE
D --> RE
end
subgraph 策略来源
B[Bundle Server]
G[Git Repository]
end
A1 -->|"POST /v1/data/authz"| RE
A2 -->|查询决策| RE
A3 -->|查询决策| RE
A4 -->|合规检查| RE
RE -->|"{ allow: true }"| A1
B -->|定期拉取| P
B -->|定期拉取| D
G -.->|CI/CD 构建| B
```
### OPA 决策流程
```{mermaid}
sequenceDiagram
participant App as 应用
participant OPA as OPA Server
participant Bundle as Bundle Server
Note over OPA,Bundle: 启动时加载策略
OPA->>Bundle: GET /bundles/authz.tar.gz
Bundle-->>OPA: 策略 + 数据
App->>OPA: POST /v1/data/authz/allow<br/>{"input": {"user": "alice", "action": "read"}}
OPA->>OPA: 评估 Rego 策略
OPA->>OPA: 合并 input + data
OPA-->>App: {"result": true}
Note over OPA,Bundle: 定期轮询更新
OPA->>Bundle: GET /bundles/authz.tar.gz<br/>If-None-Match: etag
Bundle-->>OPA: 304 Not Modified / 新版本
```
### OPA 部署模式对比
```{mermaid}
flowchart TB
subgraph Sidecar["Sidecar 模式"]
direction LR
subgraph Pod1["Kubernetes Pod"]
APP1[应用容器] -->|localhost| OPA1[OPA Sidecar]
end
end
subgraph Library["Library 模式(Go 嵌入)"]
direction LR
subgraph Process["应用进程"]
APP2[应用代码] -->|函数调用| OPA2[OPA 库]
end
end
subgraph Central["Central 模式"]
direction LR
APP3[应用 A] -->|HTTP/gRPC| OPAC[OPA 集群]
APP4[应用 B] -->|HTTP/gRPC| OPAC
APP5[应用 C] -->|HTTP/gRPC| OPAC
end
style Sidecar fill:#e8f5e9
style Library fill:#e1f5fe
style Central fill:#fff3e0
```
## 15.2 Rego 语言
Rego 是 OPA 的策略语言,声明式风格:
### 基本规则
```rego
package authz

import rego.v1

# Deny unless one of the allow rules below matches.
default allow := false

# Administrators may perform any action.
allow if input.user.role == "admin"

# A user may read a resource they own.
allow if {
	input.resource.owner == input.user.id
	input.action == "read"
}

# Editors may edit any document that is not classified as confidential.
allow if {
	input.user.role == "editor"
	input.action == "edit"
	input.resource.classification != "confidential"
}
```
### 输入和查询
```json
// 输入(input)
{
"user": {
"id": "alice",
"role": "editor",
"department": "engineering"
},
"action": "edit",
"resource": {
"type": "document",
"id": "doc-123",
"owner": "bob",
"classification": "internal"
}
}
// 查询:data.authz.allow → true
```
### 高级特性
```rego
package authz

import rego.v1

# Partial set rule: collects every permission granted by any of the user's roles.
user_permissions contains perm if {
	some role in input.user.roles
	some perm in data.role_permissions[role]
}

# Function: holds when the given resource is owned by the requesting user.
is_owner(resource) if resource.owner == input.user.id

# Combined conditions: an owner may delete a resource unless it is archived.
allow if {
	input.action == "delete"
	input.resource.status != "archived"
	is_owner(input.resource)
}

# Time-based rule: "access" is allowed Monday to Friday from 09:00 up to (not
# including) 18:00. No timezone argument is passed to time.weekday/time.clock,
# so the check is evaluated in UTC.
allow if {
	input.action == "access"
	now := time.now_ns()
	time.weekday(now) in ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
	[hour, _, _] := time.clock(now)
	hour >= 9
	hour < 18
}
```
### RBAC 策略
```rego
package rbac

import rego.v1

# Role/permission mapping and role hierarchy are supplied as external data.
# data.json:
# {
#   "role_permissions": {
#     "admin": ["read", "write", "delete", "admin"],
#     "editor": ["read", "write"],
#     "viewer": ["read"]
#   },
#   "role_hierarchy": {
#     "admin": ["editor"],
#     "editor": ["viewer"]
#   }
# }

# Deny unless the requested action is among the user's effective permissions.
default allow := false

# Permissions granted directly by one of the user's roles.
effective_permissions contains perm if {
	some role in input.user.roles
	some perm in data.role_permissions[role]
}

# Permissions inherited from the immediate parents listed in role_hierarchy.
# Only one level of the hierarchy is followed; grandparents are not resolved.
effective_permissions contains perm if {
	some role in input.user.roles
	some parent in data.role_hierarchy[role]
	some perm in data.role_permissions[parent]
}

# The action is allowed when it is a direct or inherited permission.
allow if input.action in effective_permissions
```
### ABAC 策略(基于时间、IP、部门)
```rego
package abac

import rego.v1

# Only ip_allowlist is imported: an unused import is rejected by
# `opa check --strict`.
import data.ip_allowlist

# Deny unless one of the allow rules below matches.
default allow := false

# Rule 1: sensitive resources require business hours, an allow-listed source
# IP, and a user from the resource's own department.
allow if {
	input.resource.classification == "sensitive"
	is_business_hours
	is_allowed_ip
	is_same_department
}

# Rule 2: anything not classified as sensitive only requires authentication.
allow if {
	input.resource.classification != "sensitive"
	input.user.authenticated == true
}

# Rule 3: authenticated VPN users from the resource's department are allowed
# without the business-hours and source-IP checks.
allow if {
	input.network.vpn == true
	input.user.authenticated == true
	is_same_department
}

# ── Helper rules ─────────────────────────────────────────────

# Monday to Friday, from 09:00 up to (not including) 18:00. No timezone is
# passed to time.weekday/time.clock, so the check is evaluated in UTC.
is_business_hours if {
	now := time.now_ns()
	time.weekday(now) in ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
	hour := time.clock(now)[0]
	hour >= 9
	hour < 18
}

# The source IP falls inside at least one allow-listed CIDR.
is_allowed_ip if {
	some cidr in ip_allowlist
	net.cidr_contains(cidr, input.network.source_ip)
}

# The user and the resource belong to the same department.
is_same_department if input.user.department == input.resource.department

# Denial reasons, exposed as a set to help debugging.
reasons contains "outside business hours" if {
	input.resource.classification == "sensitive"
	not is_business_hours
}

reasons contains "IP not in allowlist" if not is_allowed_ip

reasons contains "department mismatch" if not is_same_department
```
### API 路由授权策略
```rego
package api_authz

import rego.v1

# Deny unless one of the allow rules below matches.
default allow := false

# Public endpoints need no token.
allow if is_public_endpoint

# Authenticated callers are checked against the route rules.
allow if {
	input.token.valid
	is_allowed_route
}

# ── Public endpoints ─────────────────────────────────────────

is_public_endpoint if input.path in ["/health", "/metrics", "/docs", "/openapi.json"]

is_public_endpoint if {
	input.method == "POST"
	input.path in ["/auth/login", "/auth/register"]
}

# ── Route permission matrix ──────────────────────────────────

# Exact "METHOD path" keys mapped to the roles allowed to call them.
route_permissions := {
	"GET /api/v1/users": {"roles": ["admin", "viewer"]},
	"POST /api/v1/users": {"roles": ["admin"]},
	"GET /api/v1/documents": {"roles": ["admin", "editor", "viewer"]},
	"POST /api/v1/documents": {"roles": ["admin", "editor"]},
	"DELETE /api/v1/documents": {"roles": ["admin"]},
}

# Exact match against the matrix.
is_allowed_route if {
	route_key := sprintf("%s %s", [input.method, input.path])
	required := route_permissions[route_key]
	some role in input.token.roles
	role in required.roles
}

# Wildcard match for single documents (/api/v1/documents/*): read access.
is_allowed_route if {
	input.method == "GET"
	glob.match("/api/v1/documents/*", ["/"], input.path)
	some role in input.token.roles
	role in ["admin", "editor", "viewer"]
}

# Wildcard match for single documents: update access.
is_allowed_route if {
	input.method in ["PUT", "PATCH"]
	glob.match("/api/v1/documents/*", ["/"], input.path)
	some role in input.token.roles
	role in ["admin", "editor"]
}

# Wildcard match for single documents: delete access. Without this rule
# DELETE /api/v1/documents/{id} matches nothing and is denied even for
# administrators; the role list mirrors the matrix's DELETE entry.
is_allowed_route if {
	input.method == "DELETE"
	glob.match("/api/v1/documents/*", ["/"], input.path)
	"admin" in input.token.roles
}

# ── Rate limiting ────────────────────────────────────────────

# 1000 for callers holding the "premium" role, 100 for everyone else.
rate_limit := 1000 if {
	some role in input.token.roles
	role == "premium"
} else := 100
```
### Rego 测试
```rego
package authz_test

import rego.v1

import data.authz

# An admin may delete a document.
test_admin_allowed if {
	request := {
		"user": {"role": "admin"},
		"action": "delete",
		"resource": {"id": "doc-1"},
	}
	authz.allow with input as request
}

# A viewer may not delete a document.
test_viewer_cannot_delete if {
	request := {
		"user": {"role": "viewer"},
		"action": "delete",
		"resource": {"id": "doc-1"},
	}
	not authz.allow with input as request
}

# A user may read a resource they own, whatever their role.
test_owner_can_read if {
	request := {
		"user": {"id": "alice", "role": "viewer"},
		"action": "read",
		"resource": {"owner": "alice"},
	}
	authz.allow with input as request
}
```
```bash
# 运行测试
opa test . -v
```
## 15.3 OPA 策略测试最佳实践
### 目录结构
```
policies/
├── authz.rego # 主策略
├── authz_test.rego # 主策略测试
├── rbac.rego # RBAC 策略
├── rbac_test.rego # RBAC 测试
├── abac.rego # ABAC 策略
├── abac_test.rego # ABAC 测试
├── data.json # 外部数据
└── Makefile
```
### 完整测试示例
```rego
package rbac_test

import rego.v1

import data.rbac

# ── Test fixtures ────────────────────────────────────────────

mock_role_permissions := {
	"admin": ["read", "write", "delete"],
	"editor": ["read", "write"],
	"viewer": ["read"],
}

mock_role_hierarchy := {
	"admin": ["editor"],
	"editor": ["viewer"],
}

# ── Positive tests ───────────────────────────────────────────

test_admin_can_delete if {
	rbac.allow with input as {"user": {"roles": ["admin"]}, "action": "delete"}
		with data.role_permissions as mock_role_permissions
		with data.role_hierarchy as mock_role_hierarchy
}

test_editor_can_write if {
	rbac.allow with input as {"user": {"roles": ["editor"]}, "action": "write"}
		with data.role_permissions as mock_role_permissions
		with data.role_hierarchy as mock_role_hierarchy
}

# Role inheritance: editor obtains "read" only through viewer. The permission
# table used here deliberately gives editor no direct "read", so the test
# fails if the hierarchy rule is broken.
test_editor_inherits_read if {
	rbac.allow with input as {"user": {"roles": ["editor"]}, "action": "read"}
		with data.role_permissions as {"editor": ["write"], "viewer": ["read"]}
		with data.role_hierarchy as mock_role_hierarchy
}

# ── Negative tests ───────────────────────────────────────────

test_viewer_cannot_write if {
	not rbac.allow with input as {"user": {"roles": ["viewer"]}, "action": "write"}
		with data.role_permissions as mock_role_permissions
		with data.role_hierarchy as mock_role_hierarchy
}

test_viewer_cannot_delete if {
	not rbac.allow with input as {"user": {"roles": ["viewer"]}, "action": "delete"}
		with data.role_permissions as mock_role_permissions
		with data.role_hierarchy as mock_role_hierarchy
}

test_no_roles_denied if {
	not rbac.allow with input as {"user": {"roles": []}, "action": "read"}
		with data.role_permissions as mock_role_permissions
		with data.role_hierarchy as mock_role_hierarchy
}

# ── Effective permission set ─────────────────────────────────

test_admin_effective_permissions if {
	perms := rbac.effective_permissions with input as {"user": {"roles": ["admin"]}}
		with data.role_permissions as mock_role_permissions
		with data.role_hierarchy as mock_role_hierarchy
	perms == {"read", "write", "delete"}
}
```
```bash
# 运行所有测试(详细输出)
opa test . -v
# 运行测试并生成覆盖率报告
opa test . --coverage --format=json
# 仅运行匹配的测试
opa test . -v --run "test_admin"
# 性能基准测试
opa bench 'data.authz.allow' -i input.json -d policies/
```
```makefile
# Makefile
# Every target is a command, not a file; coverage is listed too so a stray
# file named "coverage" cannot shadow it.
.PHONY: test coverage lint fmt check

# Run all policy unit tests with verbose output.
test:
	opa test policies/ -v

# Run the tests and print only the coverage section of the JSON report.
coverage:
	opa test policies/ --coverage --format=json | jq '.coverage'

# Static checks in strict mode.
lint:
	opa check policies/ --strict

# Rewrite policy files in canonical format.
fmt:
	opa fmt policies/ --write

# Format, lint and test in one go.
check: fmt lint test
	@echo "All checks passed!"
```
## 15.4 OPA 部署模式
| 模式 | 描述 | 延迟 | 适用场景 |
|------|------|------|---------|
| Sidecar | 与应用同 Pod | 极低(本地) | Kubernetes 微服务 |
| 独立服务 | 独立部署的 OPA 服务 | 低(网络) | 集中策略管理 |
| 库模式 | 嵌入应用进程 | 极低 | Go 应用 |
| WASM | 编译为 WebAssembly | 极低 | 边缘计算、浏览器 |
## 15.5 应用场景
### Kubernetes Admission Control(Gatekeeper)
```yaml
# 禁止使用 latest 标签的镜像
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sDisallowedTags
metadata:
  name: no-latest-tag
spec:
  match:
    kinds:
      - apiGroups: [""]
        kinds: ["Pod"]
  parameters:
    tags: ["latest"]
```
### API Gateway 授权
```rego
package gateway

import rego.v1

# Deny unless one of the allow rules below matches.
default allow := false

# Public endpoints need no authentication.
allow if input.path in ["/health", "/metrics", "/docs"]

# Callers with a valid token may issue GET requests under /api/v1/.
allow if {
	input.token.valid
	startswith(input.path, "/api/v1/")
	input.method == "GET"
}

# Administrators with a valid token may reach the /admin/ endpoints.
allow if {
	input.token.valid
	startswith(input.path, "/admin/")
	"admin" in input.token.roles
}

# Rate limit: 1000 for callers holding the "premium" role, 100 otherwise.
rate_limit := 1000 if {
	"premium" in input.token.roles
} else := 100
```
### Terraform 策略检查(Conftest)
```rego
package terraform

import rego.v1

# Reject S3 buckets whose ACL makes them publicly readable.
deny contains msg if {
	some resource in input.resource_changes
	resource.type == "aws_s3_bucket"
	resource.change.after.acl == "public-read"
	msg := sprintf("S3 bucket %s must not be public", [resource.address])
}

# Require an Environment tag on every resource that still exists after the plan.
# A resource being destroyed has `change.after` set to null; without the
# is_object guard the `not` below would hold for it and report a missing tag
# on something that is being removed.
deny contains msg if {
	some resource in input.resource_changes
	is_object(resource.change.after)
	not resource.change.after.tags.Environment
	msg := sprintf("Resource %s must have Environment tag", [resource.address])
}
```
## 15.6 Python 集成
### OPA REST API 客户端
```python
"""OPA REST API 客户端 — 支持策略查询、数据管理、健康检查"""
import httpx
from typing import Any, Optional
from dataclasses import dataclass
@dataclass
class OPADecision:
    """Outcome of a policy query as returned by ``OPAClient.check``."""

    # Value of the "result" key in OPA's response body (None when absent).
    result: Any
    # Value of the "decision_id" key in OPA's response body, if present.
    decision_id: Optional[str] = None


class OPAClient:
    """Async client for the OPA REST API (decisions, queries, data, policies, health)."""

    def __init__(self, base_url: str = "http://localhost:8181", timeout: float = 5.0):
        """Create the underlying HTTP client.

        Args:
            base_url: Root URL of the OPA server; trailing slashes are removed.
            timeout: Timeout in seconds applied to every request.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            timeout=timeout,
            headers={"Content-Type": "application/json"},
        )

    async def close(self):
        """Close the underlying HTTP client."""
        await self._client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()

    async def check(self, policy_path: str, input_data: dict) -> OPADecision:
        """Query a policy decision.

        Args:
            policy_path: Policy path below ``/v1/data``, e.g. ``"authz/allow"``.
            input_data: Document sent to OPA as ``input``.

        Returns:
            The ``result`` and ``decision_id`` fields of the response.

        Raises:
            httpx.HTTPStatusError: OPA answered with a 4xx/5xx status.
        """
        response = await self._client.post(
            f"/v1/data/{policy_path}", json={"input": input_data}
        )
        response.raise_for_status()
        body = response.json()
        return OPADecision(
            result=body.get("result"),
            decision_id=body.get("decision_id"),
        )

    async def query(self, query: str, input_data: Optional[dict] = None) -> Any:
        """Run an ad-hoc Rego query through ``/v1/query``.

        Args:
            query: Rego query text.
            input_data: Optional ``input`` document. It is omitted only when
                it is ``None``; an explicit empty dict is sent as ``{}``.

        Returns:
            The ``result`` field of the response (None when absent).

        Raises:
            httpx.HTTPStatusError: OPA answered with a 4xx/5xx status.
        """
        payload: dict = {"query": query}
        # Compare against None rather than truthiness: {} is a legitimate
        # input and must not be silently dropped.
        if input_data is not None:
            payload["input"] = input_data
        response = await self._client.post("/v1/query", json=payload)
        response.raise_for_status()
        return response.json().get("result")

    async def put_data(self, path: str, data: Any) -> None:
        """Upload an external data document to ``/v1/data/{path}``."""
        response = await self._client.put(f"/v1/data/{path}", json=data)
        response.raise_for_status()

    async def get_data(self, path: str) -> Any:
        """Fetch the document stored at ``/v1/data/{path}``."""
        response = await self._client.get(f"/v1/data/{path}")
        response.raise_for_status()
        return response.json().get("result")

    async def put_policy(self, policy_id: str, rego_code: str) -> None:
        """Upload a Rego module as plain text to ``/v1/policies/{policy_id}``."""
        response = await self._client.put(
            f"/v1/policies/{policy_id}",
            content=rego_code.encode(),
            headers={"Content-Type": "text/plain"},
        )
        response.raise_for_status()

    async def health(self) -> bool:
        """Return True when ``GET /health`` answers 200, False otherwise."""
        try:
            response = await self._client.get("/health")
            return response.status_code == 200
        except Exception:
            # Best-effort probe: any failure is reported as "unhealthy".
            return False
# ── 使用示例 ─────────────────────────────────────────────────
async def demo():
    """Walk through the client: health probe, data upload, policy upload, two decisions."""
    permissions_by_role = {
        "admin": ["read", "write", "delete"],
        "editor": ["read", "write"],
        "viewer": ["read"],
    }
    policy_source = """
package authz
import rego.v1
default allow := false
allow if {
some role in input.user.roles
some perm in data.role_permissions[role]
perm == input.action
}
"""
    editor_write = {"user": {"roles": ["editor"]}, "action": "write"}
    viewer_delete = {"user": {"roles": ["viewer"]}, "action": "delete"}

    async with OPAClient("http://localhost:8181") as opa:
        # Health probe.
        healthy = await opa.health()
        print(f"OPA healthy: {healthy}")

        # Upload the role/permission data, then the policy that reads it.
        await opa.put_data("role_permissions", permissions_by_role)
        await opa.put_policy("authz", policy_source)

        # Query decisions.
        decision = await opa.check("authz/allow", editor_write)
        print(f"Editor can write: {decision.result}")  # True

        decision = await opa.check("authz/allow", viewer_delete)
        print(f"Viewer can delete: {decision.result}")  # False
```
### FastAPI + OPA 授权中间件
```python
"""FastAPI + OPA 授权中间件完整示例"""
from contextlib import asynccontextmanager
from typing import Optional
import httpx
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# ── OPA 客户端 ───────────────────────────────────────────────
OPA_URL = "http://localhost:8181"
# Shared HTTP client for OPA; created and closed by lifespan() below.
opa_http: Optional[httpx.AsyncClient] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Open the shared OPA HTTP client for the app's lifetime and close it afterwards."""
    global opa_http
    opa_http = httpx.AsyncClient(base_url=OPA_URL, timeout=5.0)
    try:
        yield
    finally:
        # Close the client even when an exception is raised at the yield point.
        await opa_http.aclose()


app = FastAPI(title="OPA Demo", lifespan=lifespan)
security = HTTPBearer()
# ── 用户解析 ─────────────────────────────────────────────────
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """Resolve the caller from the bearer credentials.

    Placeholder only: the raw token is used as the user id and the roles and
    department are fixed. A real project must validate the JWT here.
    """
    return {
        "id": credentials.credentials,
        "roles": ["editor"],
        "department": "engineering",
    }
# ── OPA 授权依赖 ─────────────────────────────────────────────
class OPAAuthorize:
    """Reusable FastAPI dependency that asks OPA whether the request is allowed."""

    # Headers that carry credentials; they are not forwarded to OPA so that
    # secrets do not end up in the policy input.
    _SENSITIVE_HEADERS = frozenset({"authorization", "cookie", "proxy-authorization"})

    def __init__(self, policy_path: str = "authz/allow"):
        """Remember which policy (path below ``/v1/data``) to query."""
        self.policy_path = policy_path

    async def __call__(
        self,
        request: Request,
        user: dict = Depends(get_current_user),
    ) -> dict:
        """Query OPA for the current request.

        Returns:
            The resolved user when the policy result is exactly ``True``.

        Raises:
            HTTPException: 503 when OPA cannot be reached or does not answer
                200; 403 when the policy does not allow the request.
        """
        roles = user.get("roles", [])
        forwarded_headers = {
            name: value
            for name, value in request.headers.items()
            if name.lower() not in self._SENSITIVE_HEADERS
        }
        opa_input = {
            "input": {
                "user": user,
                # The api_authz policy reads input.token.valid and
                # input.token.roles. The user was resolved by
                # get_current_user, so the token is reported as valid.
                "token": {"valid": True, "roles": roles},
                "method": request.method,
                "path": request.url.path,
                "headers": forwarded_headers,
            }
        }
        try:
            response = await opa_http.post(
                f"/v1/data/{self.policy_path}",
                json=opa_input,
            )
        except httpx.HTTPError as exc:
            # Connection failures and timeouts are an unavailable engine, not
            # an internal server error.
            raise HTTPException(status_code=503, detail="Policy engine unavailable") from exc
        if response.status_code != 200:
            raise HTTPException(status_code=503, detail="Policy engine unavailable")
        # Fail closed: only a literal boolean true allows the request.
        if response.json().get("result") is not True:
            raise HTTPException(status_code=403, detail="Denied by policy")
        return user
# ── 路由 ─────────────────────────────────────────────────────
# Every route below is guarded by the api_authz/allow policy.
authorize = OPAAuthorize("api_authz/allow")


@app.get("/api/v1/documents")
async def list_documents(user: dict = Depends(authorize)):
    """List documents (placeholder: always an empty list)."""
    # An empty list is returned instead of the `[...]` placeholder, which is a
    # list holding Ellipsis and cannot be serialised to JSON.
    return {"documents": [], "user": user["id"]}


@app.post("/api/v1/documents")
async def create_document(user: dict = Depends(authorize)):
    """Create a document (placeholder)."""
    return {"created": True, "user": user["id"]}


@app.delete("/api/v1/documents/{doc_id}")
async def delete_document(doc_id: str, user: dict = Depends(authorize)):
    """Delete the document with the given id (placeholder)."""
    return {"deleted": doc_id, "user": user["id"]}
# ── 管理端点:动态更新策略数据 ────────────────────────────────
@app.put("/admin/opa/data/{path:path}")
async def update_opa_data(
    path: str,
    data: dict,
    user: dict = Depends(get_current_user),
):
    """Replace the OPA external data document stored at ``path``.

    The data drives authorization decisions, so the caller must be
    authenticated and hold the "admin" role.

    Raises:
        HTTPException: 403 when the caller is not an admin; 500 when OPA
            cannot be reached or rejects the update.
    """
    if "admin" not in user.get("roles", []):
        raise HTTPException(status_code=403, detail="Admin role required")
    try:
        response = await opa_http.put(f"/v1/data/{path}", json=data)
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=500, detail="Failed to update OPA data") from exc
    if response.status_code not in (200, 204):
        raise HTTPException(status_code=500, detail="Failed to update OPA data")
    return {"updated": path}
```
## 15.7 Java 集成
### OPA REST API 客户端(OkHttp)
```java
// build.gradle
// dependencies {
// implementation 'com.squareup.okhttp3:okhttp:4.12.0'
// implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.0'
// }
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
/**
 * Blocking client for the OPA REST API built on OkHttp and Jackson.
 */
public class OPAClient implements AutoCloseable {
    private static final MediaType JSON_TYPE = MediaType.get("application/json");
    private static final MediaType TEXT_TYPE = MediaType.get("text/plain");

    private final String baseUrl;
    private final OkHttpClient httpClient;
    private final ObjectMapper mapper;

    /**
     * @param baseUrl root URL of the OPA server; trailing slashes are removed
     */
    public OPAClient(String baseUrl) {
        this.baseUrl = baseUrl.replaceAll("/+$", "");
        this.httpClient = new OkHttpClient.Builder()
                .connectTimeout(5, TimeUnit.SECONDS)
                .readTimeout(5, TimeUnit.SECONDS)
                .build();
        this.mapper = new ObjectMapper();
    }

    /**
     * Queries a policy decision.
     *
     * @param policyPath path below {@code /v1/data}, e.g. {@code "authz/allow"}
     * @param input      document sent to OPA as {@code input}
     * @return {@code true} only when the result is the JSON boolean {@code true};
     *         missing or non-boolean results are treated as a denial
     * @throws IOException on transport failure or a non-2xx response
     */
    public boolean check(String policyPath, Map<String, ?> input) throws IOException {
        JsonNode result = query(policyPath, input);
        // Strict check: asBoolean() would coerce numbers and strings.
        return result.isBoolean() && result.booleanValue();
    }

    /**
     * Queries a policy and returns the complete {@code result} node.
     *
     * @return the {@code result} node, or a missing node when OPA returned none
     * @throws IOException on transport failure or a non-2xx response
     */
    public JsonNode query(String policyPath, Map<String, ?> input) throws IOException {
        String json = mapper.writeValueAsString(Map.of("input", input));
        Request request = new Request.Builder()
                .url(baseUrl + "/v1/data/" + policyPath)
                .post(RequestBody.create(json, JSON_TYPE))
                .build();
        return mapper.readTree(execute(request, "OPA request failed: ")).path("result");
    }

    /**
     * Uploads an external data document to {@code /v1/data/{path}}.
     *
     * @throws IOException on transport failure or a non-2xx response
     */
    public void putData(String path, Object data) throws IOException {
        String json = mapper.writeValueAsString(data);
        Request request = new Request.Builder()
                .url(baseUrl + "/v1/data/" + path)
                .put(RequestBody.create(json, JSON_TYPE))
                .build();
        execute(request, "Failed to put data: ");
    }

    /**
     * Uploads a Rego module as plain text to {@code /v1/policies/{policyId}}.
     *
     * @throws IOException on transport failure or a non-2xx response
     */
    public void putPolicy(String policyId, String regoCode) throws IOException {
        Request request = new Request.Builder()
                .url(baseUrl + "/v1/policies/" + policyId)
                .put(RequestBody.create(regoCode, TEXT_TYPE))
                .build();
        execute(request, "Failed to put policy: ");
    }

    /**
     * Health probe.
     *
     * @return {@code true} when {@code GET /health} answers 2xx, {@code false}
     *         on any other status or on an I/O error
     */
    public boolean isHealthy() {
        Request request = new Request.Builder()
                .url(baseUrl + "/health")
                .get()
                .build();
        try (Response response = httpClient.newCall(request).execute()) {
            return response.isSuccessful();
        } catch (IOException e) {
            return false;
        }
    }

    @Override
    public void close() {
        httpClient.dispatcher().executorService().shutdown();
        httpClient.connectionPool().evictAll();
    }

    /**
     * Executes the request, closes the response, and returns its body text.
     *
     * @param failurePrefix message prefix used when the status is not 2xx
     * @return the body text, or an empty string when the response has no body
     */
    private String execute(Request request, String failurePrefix) throws IOException {
        try (Response response = httpClient.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                throw new IOException(failurePrefix + response.code());
            }
            ResponseBody body = response.body();
            return body == null ? "" : body.string();
        }
    }

    // ── Usage example ────────────────────────────────────────
    public static void main(String[] args) throws Exception {
        try (var opa = new OPAClient("http://localhost:8181")) {
            // Health probe
            System.out.println("OPA healthy: " + opa.isHealthy());

            // Upload data
            opa.putData("role_permissions", Map.of(
                    "admin", java.util.List.of("read", "write", "delete"),
                    "editor", java.util.List.of("read", "write"),
                    "viewer", java.util.List.of("read")
            ));

            // Query a decision
            boolean allowed = opa.check("authz/allow", Map.of(
                    "user", Map.of("roles", java.util.List.of("editor")),
                    "action", "write"
            ));
            System.out.println("Editor can write: " + allowed);

            // Query the complete result of the package
            JsonNode result = opa.query("authz", Map.of(
                    "user", Map.of("roles", java.util.List.of("admin")),
                    "action", "delete"
            ));
            System.out.println("Full result: " + result.toPrettyString());
        }
    }
}
```
### Spring Boot + OPA 集成
```java
// ── application.yml ─────────────────────────────────────────
// opa:
// base-url: http://localhost:8181
// policy-path: api_authz/allow
// timeout: 5s
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestClient;
import java.time.Duration;
@ConfigurationProperties(prefix = "opa")
record OPAProperties(String baseUrl, String policyPath, Duration timeout) {}
@Configuration
class OPAConfig {
@Bean
RestClient opaRestClient(OPAProperties props) {
return RestClient.builder()
.baseUrl(props.baseUrl())
.build();
}
}
// ── OPA 授权服务 ────────────────────────────────────────────
import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClient;
import java.util.Map;
@Service
class OPAAuthorizationService {
    private final RestClient restClient;
    private final OPAProperties props;

    OPAAuthorizationService(RestClient opaRestClient, OPAProperties props) {
        this.restClient = opaRestClient;
        this.props = props;
    }

    /** Request envelope: the input document is sent under the "input" key. */
    record OPARequest(Map<String, Object> input) {}

    /** Response envelope: the decision is read from the "result" key. */
    record OPAResponse(@JsonProperty("result") Object result) {}

    /**
     * Evaluates the globally configured policy path.
     *
     * @return {@code true} only when OPA answers with the boolean {@code true}
     */
    public boolean isAllowed(Map<String, Object> input) {
        return isAllowed(props.policyPath(), input);
    }

    /**
     * Evaluates the given policy path.
     *
     * @return {@code true} only when OPA answers with the boolean {@code true};
     *         any failure is treated as a denial
     */
    public boolean isAllowed(String policyPath, Map<String, Object> input) {
        try {
            OPAResponse response = post(policyPath, input);
            return response != null && Boolean.TRUE.equals(response.result());
        } catch (Exception e) {
            // Fail closed: deny when the policy engine is unavailable.
            return false;
        }
    }

    /**
     * Evaluates a policy whose result is an object.
     *
     * @return the result object, or an empty map when the result is not an object
     */
    @SuppressWarnings("unchecked")
    public Map<String, Object> evaluate(String policyPath, Map<String, Object> input) {
        OPAResponse response = post(policyPath, input);
        if (response != null && response.result() instanceof Map) {
            return (Map<String, Object>) response.result();
        }
        return Map.of();
    }

    /** POSTs the input to {@code /v1/data/<policyPath>} and decodes the response. */
    private OPAResponse post(String policyPath, Map<String, Object> input) {
        return restClient.post()
                // Append the path through the builder rather than a {path} URI
                // variable: variable values are percent-encoded, which would
                // turn "api_authz/allow" into a single "api_authz%2Fallow" segment.
                .uri(builder -> builder.path("/v1/data/" + policyPath).build())
                .body(new OPARequest(input))
                .retrieve()
                .body(OPAResponse.class);
    }
}
// ── Custom annotation + interceptor ─────────────────────────
import java.lang.annotation.*;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@interface OPAPolicy {
    /** OPA policy path; empty means "use the globally configured path". */
    String value() default "";
}
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.http.HttpStatus;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerInterceptor;
import java.util.Map;
@Component
class OPAInterceptor implements HandlerInterceptor {
    private final OPAAuthorizationService opaService;

    OPAInterceptor(OPAAuthorizationService opaService) {
        this.opaService = opaService;
    }

    /**
     * Authorizes handler methods annotated with {@link OPAPolicy}.
     * Handlers without the annotation pass through unchecked.
     *
     * @return {@code true} to continue the chain, {@code false} after writing
     *         a 401 (no authentication) or 403 (denied by policy) response
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
                             Object handler) throws Exception {
        if (!(handler instanceof HandlerMethod method)) {
            return true;
        }
        OPAPolicy annotation = method.getMethodAnnotation(OPAPolicy.class);
        if (annotation == null) {
            return true; // not annotated: nothing to enforce
        }

        var auth = SecurityContextHolder.getContext().getAuthentication();
        if (auth == null) {
            // No authentication in the security context: reject instead of
            // failing with a NullPointerException below.
            reject(response, HttpStatus.UNAUTHORIZED, "{\"error\": \"Authentication required\"}");
            return false;
        }

        var roles = auth.getAuthorities().stream().map(Object::toString).toList();
        Map<String, Object> input = Map.of(
                "user", Map.of("id", auth.getName(), "roles", roles),
                // The api_authz policy reads input.token.valid and input.token.roles.
                // NOTE(review): authority strings may carry a "ROLE_" prefix;
                // confirm they match the role names used by the policy.
                "token", Map.of("valid", auth.isAuthenticated(), "roles", roles),
                "method", request.getMethod(),
                "path", request.getRequestURI()
        );

        // Honour the per-method policy path; an empty value selects the global one.
        String policyPath = annotation.value();
        boolean allowed = policyPath.isEmpty()
                ? opaService.isAllowed(input)
                : opaService.isAllowed(policyPath, input);
        if (!allowed) {
            reject(response, HttpStatus.FORBIDDEN, "{\"error\": \"Denied by OPA policy\"}");
            return false;
        }
        return true;
    }

    /** Writes a JSON error body with the given status. */
    private static void reject(HttpServletResponse response, HttpStatus status, String json)
            throws java.io.IOException {
        response.setStatus(status.value());
        response.setContentType("application/json");
        response.getWriter().write(json);
    }
}
// ── 注册拦截器 ──────────────────────────────────────────────
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
/** Registers the OPA interceptor for every request path under /api/. */
@Configuration
class WebConfig implements WebMvcConfigurer {
    private final OPAInterceptor opaInterceptor;

    WebConfig(OPAInterceptor opaInterceptor) {
        this.opaInterceptor = opaInterceptor;
    }

    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        var registration = registry.addInterceptor(opaInterceptor);
        registration.addPathPatterns("/api/**");
    }
}
// ── Controller 使用示例 ─────────────────────────────────────
import org.springframework.web.bind.annotation.*;
import java.util.Map;
/** Sample controller whose handlers are guarded by {@code @OPAPolicy}. */
@RestController
@RequestMapping("/api/v1/documents")
class DocumentController {
    /** Lists documents (placeholder: always empty); uses the default policy path. */
    @GetMapping
    @OPAPolicy
    public Map<String, Object> listDocuments() {
        return Map.of("documents", java.util.List.of());
    }

    /** Creates a document (placeholder); uses the default policy path. */
    @PostMapping
    @OPAPolicy
    public Map<String, Object> createDocument(@RequestBody Map<String, Object> body) {
        return Map.of("created", true);
    }

    /** Deletes a document (placeholder); names a dedicated policy path. */
    @DeleteMapping("/{docId}")
    @OPAPolicy("api_authz/allow_delete")
    public Map<String, Object> deleteDocument(@PathVariable String docId) {
        return Map.of("deleted", docId);
    }
}
```
## 15.8 Go 集成
### OPA Go 库(嵌入式 OPA)
```go
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/open-policy-agent/opa/v1/ast"
"github.com/open-policy-agent/opa/v1/rego"
"github.com/open-policy-agent/opa/v1/storage/inmem"
)
// ── 嵌入式 OPA 评估器 ──────────────────────────────────────
// OPAEvaluator wraps a prepared Rego query so it can be evaluated repeatedly.
type OPAEvaluator struct {
	// query is the prepared query that Evaluate and EvaluateFull run.
	query rego.PreparedEvalQuery
}
// NewOPAEvaluator prepares queryStr against the given policy module and
// external data, and returns an evaluator holding the prepared query.
// The data is loaded into an in-memory store; preparation happens once here so
// that later evaluations reuse it.
func NewOPAEvaluator(policy string, queryStr string, data map[string]interface{}) (*OPAEvaluator, error) {
	opts := []func(*rego.Rego){
		rego.Query(queryStr),
		rego.Module("policy.rego", policy),
		rego.Store(inmem.NewFromObject(data)),
	}

	pq, err := rego.New(opts...).PrepareForEval(context.Background())
	if err != nil {
		return nil, fmt.Errorf("failed to prepare query: %w", err)
	}
	return &OPAEvaluator{query: pq}, nil
}
// Evaluate runs the prepared query with the given input and reports whether
// the first expression of the first result is the boolean true. An empty
// result set or a non-boolean value yields false with a nil error.
func (e *OPAEvaluator) Evaluate(ctx context.Context, input map[string]interface{}) (bool, error) {
	value, err := e.EvaluateFull(ctx, input)
	if err != nil {
		return false, err
	}
	verdict, isBool := value.(bool)
	return isBool && verdict, nil
}
// EvaluateFull runs the prepared query with the given input and returns the
// value of the first expression of the first result, or nil when the result
// set is empty.
func (e *OPAEvaluator) EvaluateFull(ctx context.Context, input map[string]interface{}) (interface{}, error) {
	rs, err := e.query.Eval(ctx, rego.EvalInput(input))
	switch {
	case err != nil:
		return nil, fmt.Errorf("evaluation failed: %w", err)
	case len(rs) == 0, len(rs[0].Expressions) == 0:
		// Case expressions are tried left to right, so rs[0] is only
		// inspected once rs is known to be non-empty.
		return nil, nil
	}
	return rs[0].Expressions[0].Value, nil
}
// main builds an evaluator from an inline policy and data set, prints the
// outcome of four sample requests, then evaluates one request 1000 times.
func main() {
	ctx := context.Background()

	// Policy definition.
	policy := `
package authz
import rego.v1
default allow := false
allow if {
some role in input.user.roles
some perm in data.role_permissions[role]
perm == input.action
}
allow if {
input.user.id == input.resource.owner
}
`

	// External data: permissions granted to each role.
	data := map[string]interface{}{
		"role_permissions": map[string]interface{}{
			"admin":  []string{"read", "write", "delete"},
			"editor": []string{"read", "write"},
			"viewer": []string{"read"},
		},
	}

	evaluator, err := NewOPAEvaluator(policy, "data.authz.allow", data)
	if err != nil {
		log.Fatalf("创建评估器失败: %v", err)
	}

	// request assembles the input document for one user acting on doc-1.
	request := func(id, role, action, owner string) map[string]interface{} {
		return map[string]interface{}{
			"user":     map[string]interface{}{"id": id, "roles": []string{role}},
			"action":   action,
			"resource": map[string]interface{}{"id": "doc-1", "owner": owner},
		}
	}

	// Sample requests with the decision each one is expected to produce.
	testCases := []struct {
		name  string
		input map[string]interface{}
		want  bool
	}{
		{name: "admin can delete", input: request("alice", "admin", "delete", "bob"), want: true},
		{name: "editor can write", input: request("bob", "editor", "write", "alice"), want: true},
		{name: "viewer cannot delete", input: request("carol", "viewer", "delete", "alice"), want: false},
		{name: "owner can do anything", input: request("alice", "viewer", "delete", "alice"), want: true},
	}

	for _, tc := range testCases {
		got, evalErr := evaluator.Evaluate(ctx, tc.input)
		if evalErr != nil {
			log.Printf("❌ %s: error: %v", tc.name, evalErr)
			continue
		}
		mark := "❌"
		if got == tc.want {
			mark = "✅"
		}
		fmt.Printf("%s %s: allowed=%v (want=%v)\n", mark, tc.name, got, tc.want)
	}

	// Repeated evaluation of one request against the prepared query.
	input := map[string]interface{}{
		"user":     map[string]interface{}{"id": "alice", "roles": []string{"admin"}},
		"action":   "read",
		"resource": map[string]interface{}{"id": "doc-1"},
	}

	_ = json.RawMessage{}    // suppress unused import
	_ = ast.MustParseModule // suppress unused import

	// Results and errors are deliberately discarded: the loop only exercises
	// the evaluator and does not measure or check anything.
	for n := 0; n < 1000; n++ {
		_, _ = evaluator.Evaluate(ctx, input)
	}
	fmt.Println("1000 次评估完成")
}
```
### Gin + OPA 授权中间件
```go
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"github.com/gin-gonic/gin"
"github.com/open-policy-agent/opa/v1/rego"
"github.com/open-policy-agent/opa/v1/storage/inmem"
)
// ── 嵌入式 OPA 引擎 ────────────────────────────────────────
// PolicyEngine wraps a prepared Rego query used for authorization decisions.
type PolicyEngine struct {
	// query is the prepared query that IsAllowed evaluates.
	query rego.PreparedEvalQuery
}
// NewPolicyEngine loads the policies found under policyDir, places data in an
// in-memory store, and prepares the query data.api_authz.allow.
func NewPolicyEngine(policyDir string, data map[string]interface{}) (*PolicyEngine, error) {
	r := rego.New(
		rego.Query("data.api_authz.allow"),
		rego.Load([]string{policyDir}, nil),
		rego.Store(inmem.NewFromObject(data)),
	)

	pq, err := r.PrepareForEval(context.Background())
	if err != nil {
		return nil, fmt.Errorf("failed to prepare policy engine: %w", err)
	}
	return &PolicyEngine{query: pq}, nil
}
// IsAllowed evaluates the prepared query with the given input and reports
// whether the decision is the boolean true. Evaluation errors are logged and
// treated as a denial (fail-closed), as are empty or non-boolean results.
func (pe *PolicyEngine) IsAllowed(ctx context.Context, input map[string]interface{}) bool {
	rs, err := pe.query.Eval(ctx, rego.EvalInput(input))
	if err != nil {
		log.Printf("OPA evaluation error: %v", err)
		return false
	}
	if len(rs) > 0 && len(rs[0].Expressions) > 0 {
		if verdict, isBool := rs[0].Expressions[0].Value.(bool); isBool {
			return verdict
		}
	}
	return false
}
// ── 全局策略引擎 ────────────────────────────────────────────
// engine is the process-wide policy engine; main assigns it before the router
// starts and OPAMiddleware reads it on every request.
var engine *PolicyEngine
// ── OPA 授权中间件 ──────────────────────────────────────────
// OPAMiddleware returns a Gin handler that asks the global policy engine
// whether the current request is allowed. It reads "userID" and "userRoles"
// from the Gin context and aborts with 403 when the policy denies the request,
// or with 503 when the engine has not been initialised.
func OPAMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		// Fail closed rather than panic on a nil engine.
		if engine == nil {
			c.AbortWithStatusJSON(http.StatusServiceUnavailable, gin.H{
				"error": "Policy engine unavailable",
			})
			return
		}

		userID, hasID := c.Get("userID")
		userRoles, hasRoles := c.Get("userRoles")

		input := map[string]interface{}{
			"user": map[string]interface{}{
				"id":    userID,
				"roles": userRoles,
			},
			"method": c.Request.Method,
			"path":   c.Request.URL.Path,
			"token": map[string]interface{}{
				// The token counts as valid only when both identity keys were
				// set earlier in the chain; it is not assumed valid
				// unconditionally.
				"valid": hasID && hasRoles,
				"roles": userRoles,
			},
		}

		if !engine.IsAllowed(c.Request.Context(), input) {
			c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
				"error": "Access denied by policy",
			})
			return
		}
		c.Next()
	}
}
// ── 认证中间件(简化) ─────────────────────────────────────
// AuthMiddleware returns a simplified authentication handler: a request with
// no Authorization header is rejected with 401; otherwise the header value is
// stored as "userID" together with a fixed "editor" role list.
// A real project must validate the JWT here.
func AuthMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		bearer := c.GetHeader("Authorization")
		if bearer != "" {
			c.Set("userID", bearer)
			c.Set("userRoles", []string{"editor"})
			c.Next()
			return
		}
		c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing token"})
	}
}
// main initialises the global policy engine from OPA_POLICY_DIR (default
// "./policies"), registers the public and protected routes, and serves on
// :9090. It exits with a fatal log when the engine or the server fails.
func main() {
	// External data: permissions granted to each role.
	data := map[string]interface{}{
		"role_permissions": map[string]interface{}{
			"admin":  []string{"read", "write", "delete"},
			"editor": []string{"read", "write"},
			"viewer": []string{"read"},
		},
	}

	// Initialise the policy engine.
	policyDir := os.Getenv("OPA_POLICY_DIR")
	if policyDir == "" {
		policyDir = "./policies"
	}
	var err error
	engine, err = NewPolicyEngine(policyDir, data)
	if err != nil {
		log.Fatalf("初始化策略引擎失败: %v", err)
	}

	r := gin.Default()

	// Public endpoint.
	r.GET("/health", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{"status": "ok"})
	})

	// Protected API: authentication first, then the OPA decision.
	api := r.Group("/api/v1", AuthMiddleware(), OPAMiddleware())
	{
		api.GET("/documents", listDocuments)
		api.POST("/documents", createDocument)
		api.GET("/documents/:id", getDocument)
		api.PUT("/documents/:id", updateDocument)
		api.DELETE("/documents/:id", deleteDocument)
	}

	// Run returns only on failure (for example when the port is already in
	// use); report it instead of exiting silently.
	if err := r.Run(":9090"); err != nil {
		log.Fatalf("启动 HTTP 服务失败: %v", err)
	}
}
// listDocuments answers 200 with an empty "documents" list (placeholder handler).
func listDocuments(c *gin.Context) {
	body := gin.H{"documents": []string{}}
	c.JSON(http.StatusOK, body)
}
// createDocument answers 200 with {"created": true} (placeholder handler).
func createDocument(c *gin.Context) {
	body := gin.H{"created": true}
	c.JSON(http.StatusOK, body)
}
// getDocument answers 200 echoing the ":id" path parameter alongside
// placeholder content.
func getDocument(c *gin.Context) {
	docID := c.Param("id")
	body := gin.H{"id": docID, "content": "..."}
	c.JSON(http.StatusOK, body)
}
// updateDocument answers 200 echoing the ":id" path parameter with
// "updated": true (placeholder handler).
func updateDocument(c *gin.Context) {
	docID := c.Param("id")
	body := gin.H{"id": docID, "updated": true}
	c.JSON(http.StatusOK, body)
}
// deleteDocument answers 200 echoing the ":id" path parameter with
// "deleted": true (placeholder handler).
func deleteDocument(c *gin.Context) {
	docID := c.Param("id")
	body := gin.H{"id": docID, "deleted": true}
	c.JSON(http.StatusOK, body)
}
```
## 15.9 OPA 性能优化
### Partial Evaluation(部分评估)
```go
// Partial Evaluation 将策略预编译为更高效的形式
// 适用于:已知部分输入(如用户角色),未知部分输入(如资源属性)
package main
import (
"context"
"fmt"
"log"
"github.com/open-policy-agent/opa/v1/ast"
"github.com/open-policy-agent/opa/v1/rego"
)
// partialEvalDemo partially evaluates data.authz.allow with the user role
// fixed to "editor" and input.resource left unknown, then prints the
// residual queries and any support modules OPA produced.
//
// It terminates the process via log.Fatalf if partial evaluation fails.
func partialEvalDemo() {
	ctx := context.Background()
	policy := `
package authz
import rego.v1
default allow := false
allow if {
input.user.role == "admin"
}
allow if {
input.user.role == "editor"
input.resource.classification != "confidential"
}
`
	// Known input goes through the rego.Input option: (*Rego).Partial takes
	// only a context, so per-call EvalInput options cannot be passed to it.
	r := rego.New(
		rego.Query("data.authz.allow"),
		rego.Module("policy.rego", policy),
		rego.Input(map[string]interface{}{
			"user": map[string]interface{}{"role": "editor"},
		}),
		// Everything under input.resource is treated as unknown at this stage.
		rego.Unknowns([]string{"input.resource"}),
	)

	pq, err := r.Partial(ctx)
	if err != nil {
		log.Fatalf("Partial evaluation failed: %v", err)
	}

	// Residual queries: the user.role checks have already been resolved.
	for _, q := range pq.Queries {
		fmt.Printf("Simplified query: %v\n", q)
	}
	// NOTE(review): because the policy declares a default rule, OPA may keep
	// the residual rules in support modules and reference them from the query
	// instead of inlining a condition such as
	// input.resource.classification != "confidential". Print them as well so
	// the demo output is complete; confirm the exact shape by running it.
	for _, m := range pq.Support {
		fmt.Printf("Support module:\n%v\n", m)
	}

	_ = ast.MustParseModule // keeps the file-level ast import in use
}
```
### Bundle 优化
```
# Bundle 目录结构
bundle/
├── authz/
│ ├── policy.rego # 策略文件
│ └── data.json # 外部数据
├── rbac/
│ ├── policy.rego
│ └── data.json
├── .manifest # Bundle 元数据
└── .signatures.json # 签名(可选)
```
```json
// .manifest
{
"revision": "v1.2.3",
"roots": ["authz", "rbac"],
"metadata": {
"created_at": "2025-01-15T10:00:00Z"
}
}
```
```bash
# 构建 Bundle
opa build -b bundle/ -o authz-bundle.tar.gz
# 验证 Bundle
opa inspect authz-bundle.tar.gz
# 签名 Bundle(生产环境推荐):opa sign 生成 .signatures.json,需将其放入 bundle/ 后再执行 opa build,签名才会被打入归档
opa sign --signing-key key.pem --bundle bundle/
```
### 性能调优建议
| 优化手段 | 效果 | 适用场景 |
|---------|------|---------|
| **PreparedEvalQuery** | 避免重复编译,提升 10-100x | 所有场景 |
| **Partial Evaluation** | 减少运行时计算量 | 已知部分输入 |
| **Bundle** | 原子更新、减少 I/O | 生产部署 |
| **内存存储** | 避免外部数据查询延迟 | 数据量 < 100MB |
| **Decision Log 采样** | 减少日志开销 | 高 QPS 场景 |
| **WASM 编译** | 跨语言、低延迟 | 非 Go 应用 |
## 15.10 OPA 部署
```yaml
# docker-compose.yml — OPA + Bundle Server
services:
  # Nginx acting as the bundle server.
  bundle-server:
    image: nginx:alpine
    volumes:
      - ./bundles:/usr/share/nginx/html/bundles:ro
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
    ports:
      - "8888:80"
    healthcheck:
      test: ["CMD", "wget", "-q", "--spider", "http://localhost:80/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  opa:
    # The -debug variant ships busybox; the default OPA image has no shell or
    # wget, so the wget-based healthcheck below could never succeed with it.
    image: openpolicyagent/opa:latest-debug
    command:
      - "run"
      - "--server"
      - "--addr=0.0.0.0:8181"
      - "--set=decision_logs.console=true"
      - "--set=bundles.authz.service=bundle-server"
      - "--set=bundles.authz.resource=/bundles/authz.tar.gz"
      - "--set=bundles.authz.polling.min_delay_seconds=10"
      - "--set=bundles.authz.polling.max_delay_seconds=30"
      - "--set=services.bundle-server.url=http://bundle-server:80"
    ports:
      - "8181:8181"
    depends_on:
      bundle-server:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "wget", "-q", "--spider", "http://localhost:8181/health"]
      interval: 10s
      timeout: 5s
      retries: 3
```
```nginx
# nginx.conf
server {
    listen 80;

    # Liveness endpoint used by the container healthcheck.
    location /health {
        # Set the type through default_type; add_header would emit a second
        # Content-Type header next to the one nginx already sends.
        default_type text/plain;
        return 200 'ok';
    }

    # Serve bundle archives from the mounted directory.
    location /bundles/ {
        alias /usr/share/nginx/html/bundles/;
        # nginx generates the ETag for static files itself. $upstream_http_etag
        # is only populated for proxied responses, so it is empty here.
        etag on;
    }
}
```
```bash
# 构建并部署 Bundle
mkdir -p bundles
opa build -b policies/ -o bundles/authz.tar.gz
# 启动
docker compose up -d
# 验证 OPA 健康
curl http://localhost:8181/health
# 测试策略
curl -X POST http://localhost:8181/v1/data/authz/allow \
-H 'Content-Type: application/json' \
-d '{
"input": {
"user": {"roles": ["admin"]},
"action": "delete"
}
}'
# 更新策略(重新构建 Bundle,OPA 自动拉取)
opa build -b policies/ -o bundles/authz.tar.gz
```
## 15.11 OPA vs OpenFGA
| 维度 | OPA | OpenFGA |
|------|-----|---------|
| 模型 | ABAC(策略驱动) | ReBAC(关系驱动) |
| 语言 | Rego | DSL |
| 擅长 | 复杂条件逻辑 | 资源级权限继承 |
| 数据 | 策略 + JSON 数据 | 关系元组图 |
| 查询 | "允许吗?" | "允许吗?" + "列出所有" |
| 适用 | K8s、API 策略、合规 | 文档协作、SaaS 权限 |
| 组合 | ✅ 可以组合使用 | ✅ 可以组合使用 |
**最佳实践**:用 OpenFGA 管理"谁与什么资源有什么关系",用 OPA 管理"在什么条件下允许什么操作"。
## 15.12 小结
- OPA 是 CNCF 毕业的**通用策略引擎**,将策略从代码中解耦
- **Rego** 是声明式策略语言,支持复杂的条件逻辑
- OPA 支持多种部署模式:Sidecar、独立服务、库、WASM
- 广泛应用于 **Kubernetes Admission**、**API 授权**、**IaC 合规**
- OPA 和 OpenFGA **互补**:OPA 擅长策略逻辑,OpenFGA 擅长关系管理
- 提供 **Python、Java、Go** 等多语言集成方案,Go 可嵌入式使用获得极致性能
- **策略测试**(`opa test`)和 **Bundle 机制**是生产环境的关键实践