第十五章:OPA — 通用策略引擎
“策略应该与代码分离,就像数据应该与逻辑分离一样。”
mindmap
root((OPA))
核心概念
Policy
Data
Query
Decision
Rego 语言
规则
函数
集合操作
测试
部署模式
Sidecar
独立服务
库模式
WASM
应用场景
K8s Admission
API Gateway
微服务授权
Terraform
15.1 OPA 是什么
OPA(Open Policy Agent)是 CNCF 毕业项目,提供通用的策略引擎。它将策略决策从应用代码中解耦出来。
flowchart LR
subgraph 应用层
A1[Web 应用]
A2[API Gateway]
A3[微服务]
A4[CI/CD Pipeline]
end
subgraph OPA
direction TB
RE[Rego 评估引擎]
P[策略<br/>Rego Files]
D[数据<br/>JSON]
P --> RE
D --> RE
end
subgraph 策略来源
B[Bundle Server]
G[Git Repository]
end
A1 -->|"POST /v1/data/authz"| RE
A2 -->|查询决策| RE
A3 -->|查询决策| RE
A4 -->|合规检查| RE
RE -->|"{ allow: true }"| A1
B -->|定期拉取| P
B -->|定期拉取| D
G -.->|CI/CD 构建| B
OPA 决策流程
sequenceDiagram
participant App as 应用
participant OPA as OPA Server
participant Bundle as Bundle Server
Note over OPA,Bundle: 启动时加载策略
OPA->>Bundle: GET /bundles/authz.tar.gz
Bundle-->>OPA: 策略 + 数据
App->>OPA: POST /v1/data/authz/allow<br/>{"input": {"user": "alice", "action": "read"}}
OPA->>OPA: 评估 Rego 策略
OPA->>OPA: 合并 input + data
OPA-->>App: {"result": true}
Note over OPA,Bundle: 定期轮询更新
OPA->>Bundle: GET /bundles/authz.tar.gz<br/>If-None-Match: etag
Bundle-->>OPA: 304 Not Modified / 新版本
OPA 部署模式对比
flowchart TB
subgraph Sidecar["Sidecar 模式"]
direction LR
subgraph Pod1["Kubernetes Pod"]
APP1[应用容器] -->|localhost| OPA1[OPA Sidecar]
end
end
subgraph Library["Library 模式(Go 嵌入)"]
direction LR
subgraph Process["应用进程"]
APP2[应用代码] -->|函数调用| OPA2[OPA 库]
end
end
subgraph Central["Central 模式"]
direction LR
APP3[应用 A] -->|HTTP/gRPC| OPAC[OPA 集群]
APP4[应用 B] -->|HTTP/gRPC| OPAC
APP5[应用 C] -->|HTTP/gRPC| OPAC
end
style Sidecar fill:#e8f5e9
style Library fill:#e1f5fe
style Central fill:#fff3e0
15.2 Rego 语言
Rego 是 OPA 的策略语言,声明式风格:
基本规则
package authz
import rego.v1
# 默认拒绝
default allow := false
# 管理员允许所有操作
allow if {
input.user.role == "admin"
}
# 用户可以读取自己的数据
allow if {
input.action == "read"
input.resource.owner == input.user.id
}
# 编辑者可以编辑非机密文档
allow if {
input.action == "edit"
input.user.role == "editor"
input.resource.classification != "confidential"
}
输入和查询
// 输入(input)
{
"user": {
"id": "alice",
"role": "editor",
"department": "engineering"
},
"action": "edit",
"resource": {
"type": "document",
"id": "doc-123",
"owner": "bob",
"classification": "internal"
}
}
// 查询:data.authz.allow → true
高级特性
package authz
import rego.v1
# 集合推导:获取用户的所有权限
user_permissions contains perm if {
some role in input.user.roles
some perm in data.role_permissions[role]
}
# 函数
is_owner(resource) if {
resource.owner == input.user.id
}
# 条件组合
allow if {
input.action == "delete"
is_owner(input.resource)
input.resource.status != "archived"
}
# 基于时间的策略
allow if {
input.action == "access"
time.weekday(time.now_ns()) in ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
hour := time.clock(time.now_ns())[0]
hour >= 9
hour < 18
}
RBAC 策略
package rbac
import rego.v1
# 角色-权限映射(外部数据)
# data.json:
# {
# "role_permissions": {
# "admin": ["read", "write", "delete", "admin"],
# "editor": ["read", "write"],
# "viewer": ["read"]
# },
# "role_hierarchy": {
# "admin": ["editor"],
# "editor": ["viewer"]
# }
# }
default allow := false
# 直接权限检查
allow if {
some role in input.user.roles
some perm in data.role_permissions[role]
perm == input.action
}
# 通过角色层次继承权限
allow if {
some role in input.user.roles
some inherited_role in data.role_hierarchy[role]
some perm in data.role_permissions[inherited_role]
perm == input.action
}
# 获取用户的有效权限集合(含继承)
effective_permissions contains perm if {
some role in input.user.roles
some perm in data.role_permissions[role]
}
effective_permissions contains perm if {
some role in input.user.roles
some inherited in data.role_hierarchy[role]
some perm in data.role_permissions[inherited]
}
ABAC 策略(基于时间、IP、部门)
package abac
import rego.v1
import data.ip_allowlist
import data.department_resources
default allow := false
# 规则 1:仅在工作时间允许访问敏感资源
allow if {
input.resource.classification == "sensitive"
is_business_hours
is_allowed_ip
is_same_department
}
# 规则 2:非敏感资源仅需认证
allow if {
input.resource.classification != "sensitive"
input.user.authenticated == true
}
# 规则 3:VPN 用户可以在非工作时间访问
allow if {
input.network.vpn == true
input.user.authenticated == true
is_same_department
}
# ── 辅助规则 ─────────────────────────────────────────────────
is_business_hours if {
day := time.weekday(time.now_ns())
day in ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
hour := time.clock(time.now_ns())[0]
hour >= 9
hour < 18
}
is_allowed_ip if {
net.cidr_contains(ip_allowlist[_], input.network.source_ip)
}
is_same_department if {
input.user.department == input.resource.department
}
# 返回拒绝原因(便于调试)
reasons contains "outside business hours" if {
input.resource.classification == "sensitive"
not is_business_hours
}
reasons contains "IP not in allowlist" if {
not is_allowed_ip
}
reasons contains "department mismatch" if {
not is_same_department
}
API 路由授权策略
package api_authz
import rego.v1
default allow := false
# 公开端点
allow if {
is_public_endpoint
}
# 已认证用户的 API 访问
allow if {
input.token.valid
is_allowed_route
}
# ── 公开端点列表 ─────────────────────────────────────────────
is_public_endpoint if {
input.path in ["/health", "/metrics", "/docs", "/openapi.json"]
}
is_public_endpoint if {
input.method == "POST"
input.path in ["/auth/login", "/auth/register"]
}
# ── 路由权限矩阵 ────────────────────────────────────────────
route_permissions := {
"GET /api/v1/users": {"roles": ["admin", "viewer"]},
"POST /api/v1/users": {"roles": ["admin"]},
"GET /api/v1/documents": {"roles": ["admin", "editor", "viewer"]},
"POST /api/v1/documents": {"roles": ["admin", "editor"]},
"DELETE /api/v1/documents": {"roles": ["admin"]},
}
is_allowed_route if {
route_key := sprintf("%s %s", [input.method, input.path])
required := route_permissions[route_key]
some role in input.token.roles
role in required.roles
}
# 通配符路由匹配(/api/v1/documents/*)
is_allowed_route if {
input.method == "GET"
glob.match("/api/v1/documents/*", ["/"], input.path)
some role in input.token.roles
role in ["admin", "editor", "viewer"]
}
is_allowed_route if {
input.method in ["PUT", "PATCH"]
glob.match("/api/v1/documents/*", ["/"], input.path)
some role in input.token.roles
role in ["admin", "editor"]
}
# ── 速率限制 ─────────────────────────────────────────────────
rate_limit := 1000 if {
some role in input.token.roles
role == "premium"
} else := 100
Rego 测试
package authz_test
import rego.v1
import data.authz
test_admin_allowed if {
authz.allow with input as {
"user": {"role": "admin"},
"action": "delete",
"resource": {"id": "doc-1"}
}
}
test_viewer_cannot_delete if {
not authz.allow with input as {
"user": {"role": "viewer"},
"action": "delete",
"resource": {"id": "doc-1"}
}
}
test_owner_can_read if {
authz.allow with input as {
"user": {"id": "alice", "role": "viewer"},
"action": "read",
"resource": {"owner": "alice"}
}
}
# 运行测试
opa test . -v
15.3 OPA 策略测试最佳实践
目录结构
policies/
├── authz.rego # 主策略
├── authz_test.rego # 主策略测试
├── rbac.rego # RBAC 策略
├── rbac_test.rego # RBAC 测试
├── abac.rego # ABAC 策略
├── abac_test.rego # ABAC 测试
├── data.json # 外部数据
└── Makefile
完整测试示例
package rbac_test
import rego.v1
import data.rbac
# ── 测试数据 ─────────────────────────────────────────────────
mock_role_permissions := {
"admin": ["read", "write", "delete"],
"editor": ["read", "write"],
"viewer": ["read"],
}
mock_role_hierarchy := {
"admin": ["editor"],
"editor": ["viewer"],
}
# ── 正向测试 ─────────────────────────────────────────────────
test_admin_can_delete if {
rbac.allow with input as {
"user": {"roles": ["admin"]},
"action": "delete",
} with data.role_permissions as mock_role_permissions
with data.role_hierarchy as mock_role_hierarchy
}
test_editor_can_write if {
rbac.allow with input as {
"user": {"roles": ["editor"]},
"action": "write",
} with data.role_permissions as mock_role_permissions
with data.role_hierarchy as mock_role_hierarchy
}
# 测试角色继承:editor 继承 viewer 的 read 权限
test_editor_inherits_read if {
rbac.allow with input as {
"user": {"roles": ["editor"]},
"action": "read",
} with data.role_permissions as mock_role_permissions
with data.role_hierarchy as mock_role_hierarchy
}
# ── 反向测试 ─────────────────────────────────────────────────
test_viewer_cannot_write if {
not rbac.allow with input as {
"user": {"roles": ["viewer"]},
"action": "write",
} with data.role_permissions as mock_role_permissions
with data.role_hierarchy as mock_role_hierarchy
}
test_viewer_cannot_delete if {
not rbac.allow with input as {
"user": {"roles": ["viewer"]},
"action": "delete",
} with data.role_permissions as mock_role_permissions
with data.role_hierarchy as mock_role_hierarchy
}
test_no_roles_denied if {
not rbac.allow with input as {
"user": {"roles": []},
"action": "read",
} with data.role_permissions as mock_role_permissions
with data.role_hierarchy as mock_role_hierarchy
}
# ── 有效权限集合测试 ─────────────────────────────────────────
test_admin_effective_permissions if {
perms := rbac.effective_permissions with input as {
"user": {"roles": ["admin"]},
} with data.role_permissions as mock_role_permissions
with data.role_hierarchy as mock_role_hierarchy
perms == {"read", "write", "delete"}
}
# 运行所有测试(详细输出)
opa test . -v
# 运行测试并生成覆盖率报告
opa test . --coverage --format=json
# 仅运行匹配的测试
opa test . -v --run "test_admin"
# 性能基准测试
opa bench 'data.authz.allow' -i input.json -d policies/
# Makefile
.PHONY: test lint fmt check
test:
opa test policies/ -v
coverage:
opa test policies/ --coverage --format=json | jq '.coverage'
lint:
opa check policies/ --strict
fmt:
opa fmt policies/ --write
check: fmt lint test
@echo "All checks passed!"
15.4 OPA 部署模式
模式 |
描述 |
延迟 |
适用场景 |
|---|---|---|---|
Sidecar |
与应用同 Pod |
极低(本地) |
Kubernetes 微服务 |
独立服务 |
独立部署的 OPA 服务 |
低(网络) |
集中策略管理 |
库模式 |
嵌入应用进程 |
极低 |
Go 应用 |
WASM |
编译为 WebAssembly |
极低 |
边缘计算、浏览器 |
15.5 应用场景
Kubernetes Admission Control(Gatekeeper)
# 禁止使用 latest 标签的镜像
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sDisallowedTags
metadata:
name: no-latest-tag
spec:
match:
kinds:
- apiGroups: [""]
kinds: ["Pod"]
parameters:
tags: ["latest"]
API Gateway 授权
package gateway
import rego.v1
default allow := false
# 公开端点无需认证
allow if {
input.path in ["/health", "/metrics", "/docs"]
}
# 已认证用户可以访问 API
allow if {
input.token.valid
input.method == "GET"
startswith(input.path, "/api/v1/")
}
# 管理员可以访问管理端点
allow if {
input.token.valid
"admin" in input.token.roles
startswith(input.path, "/admin/")
}
# 速率限制
rate_limit := limit if {
"premium" in input.token.roles
limit := 1000
} else := limit if {
limit := 100
}
Terraform 策略检查(Conftest)
package terraform
import rego.v1
# 禁止公开的 S3 存储桶
deny contains msg if {
some resource in input.resource_changes
resource.type == "aws_s3_bucket"
resource.change.after.acl == "public-read"
msg := sprintf("S3 bucket %s must not be public", [resource.address])
}
# 要求所有资源有标签
deny contains msg if {
some resource in input.resource_changes
not resource.change.after.tags.Environment
msg := sprintf("Resource %s must have Environment tag", [resource.address])
}
15.6 Python 集成
OPA REST API 客户端
"""OPA REST API 客户端 — 支持策略查询、数据管理、健康检查"""
import httpx
from typing import Any, Optional
from dataclasses import dataclass
@dataclass
class OPADecision:
result: Any
decision_id: Optional[str] = None
class OPAClient:
"""OPA REST API 客户端"""
def __init__(self, base_url: str = "http://localhost:8181", timeout: float = 5.0):
self.base_url = base_url.rstrip("/")
self.timeout = timeout
self._client = httpx.AsyncClient(
base_url=self.base_url,
timeout=timeout,
headers={"Content-Type": "application/json"},
)
async def close(self):
await self._client.aclose()
async def __aenter__(self):
return self
async def __aexit__(self, *args):
await self.close()
async def check(self, policy_path: str, input_data: dict) -> OPADecision:
"""查询策略决策
Args:
policy_path: 策略路径,如 "authz/allow"
input_data: 输入数据
"""
url = f"/v1/data/{policy_path}"
response = await self._client.post(url, json={"input": input_data})
response.raise_for_status()
body = response.json()
return OPADecision(
result=body.get("result"),
decision_id=body.get("decision_id"),
)
async def query(self, query: str, input_data: Optional[dict] = None) -> Any:
"""执行任意 Rego 查询"""
payload: dict = {"query": query}
if input_data:
payload["input"] = input_data
response = await self._client.post("/v1/query", json=payload)
response.raise_for_status()
return response.json().get("result")
async def put_data(self, path: str, data: Any) -> None:
"""上传外部数据"""
response = await self._client.put(f"/v1/data/{path}", json=data)
response.raise_for_status()
async def get_data(self, path: str) -> Any:
"""获取外部数据"""
response = await self._client.get(f"/v1/data/{path}")
response.raise_for_status()
return response.json().get("result")
async def put_policy(self, policy_id: str, rego_code: str) -> None:
"""上传策略"""
response = await self._client.put(
f"/v1/policies/{policy_id}",
content=rego_code.encode(),
headers={"Content-Type": "text/plain"},
)
response.raise_for_status()
async def health(self) -> bool:
"""健康检查"""
try:
response = await self._client.get("/health")
return response.status_code == 200
except Exception:
return False
# ── 使用示例 ─────────────────────────────────────────────────
async def demo():
async with OPAClient("http://localhost:8181") as opa:
# 健康检查
healthy = await opa.health()
print(f"OPA healthy: {healthy}")
# 上传角色权限数据
await opa.put_data("role_permissions", {
"admin": ["read", "write", "delete"],
"editor": ["read", "write"],
"viewer": ["read"],
})
# 上传策略
await opa.put_policy("authz", """
package authz
import rego.v1
default allow := false
allow if {
some role in input.user.roles
some perm in data.role_permissions[role]
perm == input.action
}
""")
# 查询决策
decision = await opa.check("authz/allow", {
"user": {"roles": ["editor"]},
"action": "write",
})
print(f"Editor can write: {decision.result}") # True
decision = await opa.check("authz/allow", {
"user": {"roles": ["viewer"]},
"action": "delete",
})
print(f"Viewer can delete: {decision.result}") # False
FastAPI + OPA 授权中间件
"""FastAPI + OPA 授权中间件完整示例"""
from contextlib import asynccontextmanager
from typing import Optional
import httpx
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
# ── OPA 客户端 ───────────────────────────────────────────────
OPA_URL = "http://localhost:8181"
opa_http: Optional[httpx.AsyncClient] = None
@asynccontextmanager
async def lifespan(app: FastAPI):
global opa_http
opa_http = httpx.AsyncClient(base_url=OPA_URL, timeout=5.0)
yield
await opa_http.aclose()
app = FastAPI(title="OPA Demo", lifespan=lifespan)
security = HTTPBearer()
# ── 用户解析 ─────────────────────────────────────────────────
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
# 实际项目中应验证 JWT
token = credentials.credentials
return {"id": token, "roles": ["editor"], "department": "engineering"}
# ── OPA 授权依赖 ─────────────────────────────────────────────
class OPAAuthorize:
"""可复用的 OPA 授权依赖"""
def __init__(self, policy_path: str = "authz/allow"):
self.policy_path = policy_path
async def __call__(
self,
request: Request,
user: dict = Depends(get_current_user),
) -> dict:
opa_input = {
"input": {
"user": user,
"method": request.method,
"path": request.url.path,
"headers": dict(request.headers),
}
}
response = await opa_http.post(
f"/v1/data/{self.policy_path}",
json=opa_input,
)
if response.status_code != 200:
raise HTTPException(status_code=503, detail="Policy engine unavailable")
result = response.json().get("result", False)
if not result:
raise HTTPException(status_code=403, detail="Denied by policy")
return user
# ── 路由 ─────────────────────────────────────────────────────
authorize = OPAAuthorize("api_authz/allow")
@app.get("/api/v1/documents")
async def list_documents(user: dict = Depends(authorize)):
return {"documents": [...], "user": user["id"]}
@app.post("/api/v1/documents")
async def create_document(user: dict = Depends(authorize)):
return {"created": True, "user": user["id"]}
@app.delete("/api/v1/documents/{doc_id}")
async def delete_document(doc_id: str, user: dict = Depends(authorize)):
return {"deleted": doc_id, "user": user["id"]}
# ── 管理端点:动态更新策略数据 ────────────────────────────────
@app.put("/admin/opa/data/{path:path}")
async def update_opa_data(path: str, data: dict):
"""动态更新 OPA 外部数据"""
response = await opa_http.put(f"/v1/data/{path}", json=data)
if response.status_code not in (200, 204):
raise HTTPException(status_code=500, detail="Failed to update OPA data")
return {"updated": path}
15.7 Java 集成
OPA REST API 客户端(OkHttp)
// build.gradle
// dependencies {
// implementation 'com.squareup.okhttp3:okhttp:4.12.0'
// implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.0'
// }
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;
public class OPAClient implements AutoCloseable {
private static final MediaType JSON_TYPE = MediaType.get("application/json");
private static final MediaType TEXT_TYPE = MediaType.get("text/plain");
private final String baseUrl;
private final OkHttpClient httpClient;
private final ObjectMapper mapper;
public OPAClient(String baseUrl) {
this.baseUrl = baseUrl.replaceAll("/+$", "");
this.httpClient = new OkHttpClient.Builder()
.connectTimeout(5, TimeUnit.SECONDS)
.readTimeout(5, TimeUnit.SECONDS)
.build();
this.mapper = new ObjectMapper();
}
/**
* 查询策略决策
*/
public boolean check(String policyPath, Map<String, Object> input) throws IOException {
Map<String, Object> body = Map.of("input", input);
String json = mapper.writeValueAsString(body);
Request request = new Request.Builder()
.url(baseUrl + "/v1/data/" + policyPath)
.post(RequestBody.create(json, JSON_TYPE))
.build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("OPA request failed: " + response.code());
}
JsonNode node = mapper.readTree(response.body().string());
return node.path("result").asBoolean(false);
}
}
/**
* 查询策略并返回完整结果
*/
public JsonNode query(String policyPath, Map<String, Object> input) throws IOException {
Map<String, Object> body = Map.of("input", input);
String json = mapper.writeValueAsString(body);
Request request = new Request.Builder()
.url(baseUrl + "/v1/data/" + policyPath)
.post(RequestBody.create(json, JSON_TYPE))
.build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("OPA request failed: " + response.code());
}
return mapper.readTree(response.body().string()).path("result");
}
}
/**
* 上传外部数据
*/
public void putData(String path, Object data) throws IOException {
String json = mapper.writeValueAsString(data);
Request request = new Request.Builder()
.url(baseUrl + "/v1/data/" + path)
.put(RequestBody.create(json, JSON_TYPE))
.build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("Failed to put data: " + response.code());
}
}
}
/**
* 上传策略
*/
public void putPolicy(String policyId, String regoCode) throws IOException {
Request request = new Request.Builder()
.url(baseUrl + "/v1/policies/" + policyId)
.put(RequestBody.create(regoCode, TEXT_TYPE))
.build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("Failed to put policy: " + response.code());
}
}
}
/**
* 健康检查
*/
public boolean isHealthy() {
Request request = new Request.Builder()
.url(baseUrl + "/health")
.get()
.build();
try (Response response = httpClient.newCall(request).execute()) {
return response.isSuccessful();
} catch (IOException e) {
return false;
}
}
@Override
public void close() {
httpClient.dispatcher().executorService().shutdown();
httpClient.connectionPool().evictAll();
}
// ── 使用示例 ─────────────────────────────────────────────
public static void main(String[] args) throws Exception {
try (var opa = new OPAClient("http://localhost:8181")) {
// 健康检查
System.out.println("OPA healthy: " + opa.isHealthy());
// 上传数据
opa.putData("role_permissions", Map.of(
"admin", java.util.List.of("read", "write", "delete"),
"editor", java.util.List.of("read", "write"),
"viewer", java.util.List.of("read")
));
// 查询决策
boolean allowed = opa.check("authz/allow", Map.of(
"user", Map.of("roles", java.util.List.of("editor")),
"action", "write"
));
System.out.println("Editor can write: " + allowed);
// 查询完整结果
JsonNode result = opa.query("authz", Map.of(
"user", Map.of("roles", java.util.List.of("admin")),
"action", "delete"
));
System.out.println("Full result: " + result.toPrettyString());
}
}
}
Spring Boot + OPA 集成
// ── application.yml ─────────────────────────────────────────
// opa:
// base-url: http://localhost:8181
// policy-path: api_authz/allow
// timeout: 5s
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestClient;
import java.time.Duration;
@ConfigurationProperties(prefix = "opa")
record OPAProperties(String baseUrl, String policyPath, Duration timeout) {}
@Configuration
class OPAConfig {
@Bean
RestClient opaRestClient(OPAProperties props) {
return RestClient.builder()
.baseUrl(props.baseUrl())
.build();
}
}
// ── OPA 授权服务 ────────────────────────────────────────────
import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClient;
import java.util.Map;
@Service
class OPAAuthorizationService {
private final RestClient restClient;
private final OPAProperties props;
OPAAuthorizationService(RestClient opaRestClient, OPAProperties props) {
this.restClient = opaRestClient;
this.props = props;
}
record OPARequest(Map<String, Object> input) {}
record OPAResponse(@JsonProperty("result") Object result) {}
public boolean isAllowed(Map<String, Object> input) {
try {
OPAResponse response = restClient.post()
.uri("/v1/data/{path}", props.policyPath())
.body(new OPARequest(input))
.retrieve()
.body(OPAResponse.class);
return response != null && Boolean.TRUE.equals(response.result());
} catch (Exception e) {
// 策略引擎不可用时默认拒绝(fail-closed)
return false;
}
}
@SuppressWarnings("unchecked")
public Map<String, Object> evaluate(String policyPath, Map<String, Object> input) {
OPAResponse response = restClient.post()
.uri("/v1/data/{path}", policyPath)
.body(new OPARequest(input))
.retrieve()
.body(OPAResponse.class);
if (response != null && response.result() instanceof Map) {
return (Map<String, Object>) response.result();
}
return Map.of();
}
}
// ── 自定义注解 + 拦截器 ─────────────────────────────────────
import java.lang.annotation.*;
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@interface OPAPolicy {
/** OPA 策略路径,默认使用全局配置 */
String value() default "";
}
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.http.HttpStatus;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerInterceptor;
import java.util.Map;
@Component
class OPAInterceptor implements HandlerInterceptor {
private final OPAAuthorizationService opaService;
OPAInterceptor(OPAAuthorizationService opaService) {
this.opaService = opaService;
}
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
Object handler) throws Exception {
if (!(handler instanceof HandlerMethod method)) {
return true;
}
OPAPolicy annotation = method.getMethodAnnotation(OPAPolicy.class);
if (annotation == null) {
return true; // 无注解则跳过
}
var auth = SecurityContextHolder.getContext().getAuthentication();
Map<String, Object> input = Map.of(
"user", Map.of(
"id", auth.getName(),
"roles", auth.getAuthorities().stream()
.map(Object::toString).toList()
),
"method", request.getMethod(),
"path", request.getRequestURI()
);
boolean allowed = opaService.isAllowed(input);
if (!allowed) {
response.setStatus(HttpStatus.FORBIDDEN.value());
response.getWriter().write("{\"error\": \"Denied by OPA policy\"}");
return false;
}
return true;
}
}
// ── 注册拦截器 ──────────────────────────────────────────────
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;
@Configuration
class WebConfig implements WebMvcConfigurer {
private final OPAInterceptor opaInterceptor;
WebConfig(OPAInterceptor opaInterceptor) {
this.opaInterceptor = opaInterceptor;
}
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(opaInterceptor)
.addPathPatterns("/api/**");
}
}
// ── Controller 使用示例 ─────────────────────────────────────
import org.springframework.web.bind.annotation.*;
import java.util.Map;
@RestController
@RequestMapping("/api/v1/documents")
class DocumentController {
@GetMapping
@OPAPolicy
public Map<String, Object> listDocuments() {
return Map.of("documents", java.util.List.of());
}
@PostMapping
@OPAPolicy
public Map<String, Object> createDocument(@RequestBody Map<String, Object> body) {
return Map.of("created", true);
}
@DeleteMapping("/{docId}")
@OPAPolicy("api_authz/allow_delete")
public Map<String, Object> deleteDocument(@PathVariable String docId) {
return Map.of("deleted", docId);
}
}
15.8 Go 集成
OPA Go 库(嵌入式 OPA)
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/open-policy-agent/opa/v1/ast"
"github.com/open-policy-agent/opa/v1/rego"
"github.com/open-policy-agent/opa/v1/storage/inmem"
)
// ── 嵌入式 OPA 评估器 ──────────────────────────────────────
type OPAEvaluator struct {
query rego.PreparedEvalQuery
}
// NewOPAEvaluator 创建一个预编译的 OPA 评估器
func NewOPAEvaluator(policy string, queryStr string, data map[string]interface{}) (*OPAEvaluator, error) {
ctx := context.Background()
// 创建内存存储(外部数据)
store := inmem.NewFromObject(data)
// 预编译查询(性能优化:只编译一次)
prepared, err := rego.New(
rego.Query(queryStr),
rego.Module("policy.rego", policy),
rego.Store(store),
).PrepareForEval(ctx)
if err != nil {
return nil, fmt.Errorf("failed to prepare query: %w", err)
}
return &OPAEvaluator{query: prepared}, nil
}
// Evaluate 评估策略
func (e *OPAEvaluator) Evaluate(ctx context.Context, input map[string]interface{}) (bool, error) {
results, err := e.query.Eval(ctx, rego.EvalInput(input))
if err != nil {
return false, fmt.Errorf("evaluation failed: %w", err)
}
if len(results) == 0 || len(results[0].Expressions) == 0 {
return false, nil
}
allowed, ok := results[0].Expressions[0].Value.(bool)
return ok && allowed, nil
}
// EvaluateFull 评估策略并返回完整结果
func (e *OPAEvaluator) EvaluateFull(ctx context.Context, input map[string]interface{}) (interface{}, error) {
results, err := e.query.Eval(ctx, rego.EvalInput(input))
if err != nil {
return nil, fmt.Errorf("evaluation failed: %w", err)
}
if len(results) == 0 || len(results[0].Expressions) == 0 {
return nil, nil
}
return results[0].Expressions[0].Value, nil
}
func main() {
ctx := context.Background()
// 策略定义
policy := `
package authz
import rego.v1
default allow := false
allow if {
some role in input.user.roles
some perm in data.role_permissions[role]
perm == input.action
}
allow if {
input.user.id == input.resource.owner
}
`
// 外部数据
data := map[string]interface{}{
"role_permissions": map[string]interface{}{
"admin": []string{"read", "write", "delete"},
"editor": []string{"read", "write"},
"viewer": []string{"read"},
},
}
// 创建评估器
evaluator, err := NewOPAEvaluator(policy, "data.authz.allow", data)
if err != nil {
log.Fatalf("创建评估器失败: %v", err)
}
// 测试用例
testCases := []struct {
name string
input map[string]interface{}
want bool
}{
{
name: "admin can delete",
input: map[string]interface{}{
"user": map[string]interface{}{"id": "alice", "roles": []string{"admin"}},
"action": "delete",
"resource": map[string]interface{}{"id": "doc-1", "owner": "bob"},
},
want: true,
},
{
name: "editor can write",
input: map[string]interface{}{
"user": map[string]interface{}{"id": "bob", "roles": []string{"editor"}},
"action": "write",
"resource": map[string]interface{}{"id": "doc-1", "owner": "alice"},
},
want: true,
},
{
name: "viewer cannot delete",
input: map[string]interface{}{
"user": map[string]interface{}{"id": "carol", "roles": []string{"viewer"}},
"action": "delete",
"resource": map[string]interface{}{"id": "doc-1", "owner": "alice"},
},
want: false,
},
{
name: "owner can do anything",
input: map[string]interface{}{
"user": map[string]interface{}{"id": "alice", "roles": []string{"viewer"}},
"action": "delete",
"resource": map[string]interface{}{"id": "doc-1", "owner": "alice"},
},
want: true,
},
}
for _, tc := range testCases {
allowed, err := evaluator.Evaluate(ctx, tc.input)
if err != nil {
log.Printf("❌ %s: error: %v", tc.name, err)
continue
}
status := "✅"
if allowed != tc.want {
status = "❌"
}
fmt.Printf("%s %s: allowed=%v (want=%v)\n", status, tc.name, allowed, tc.want)
}
// 性能测试:预编译查询非常快
input := map[string]interface{}{
"user": map[string]interface{}{"id": "alice", "roles": []string{"admin"}},
"action": "read",
"resource": map[string]interface{}{"id": "doc-1"},
}
_ = json.RawMessage{} // suppress unused import
_ = ast.MustParseModule // suppress unused import
// 基准:预编译后每次评估通常 < 0.1ms
for i := 0; i < 1000; i++ {
_, _ = evaluator.Evaluate(ctx, input)
}
fmt.Println("1000 次评估完成")
}
Gin + OPA 授权中间件
package main
import (
"context"
"fmt"
"log"
"net/http"
"os"
"github.com/gin-gonic/gin"
"github.com/open-policy-agent/opa/v1/rego"
"github.com/open-policy-agent/opa/v1/storage/inmem"
)
// ── 嵌入式 OPA 引擎 ────────────────────────────────────────
type PolicyEngine struct {
query rego.PreparedEvalQuery
}
func NewPolicyEngine(policyDir string, data map[string]interface{}) (*PolicyEngine, error) {
ctx := context.Background()
store := inmem.NewFromObject(data)
// 从目录加载所有 .rego 文件
prepared, err := rego.New(
rego.Query("data.api_authz.allow"),
rego.Load([]string{policyDir}, nil),
rego.Store(store),
).PrepareForEval(ctx)
if err != nil {
return nil, fmt.Errorf("failed to prepare policy engine: %w", err)
}
return &PolicyEngine{query: prepared}, nil
}
func (pe *PolicyEngine) IsAllowed(ctx context.Context, input map[string]interface{}) bool {
results, err := pe.query.Eval(ctx, rego.EvalInput(input))
if err != nil {
log.Printf("OPA evaluation error: %v", err)
return false // fail-closed
}
if len(results) == 0 || len(results[0].Expressions) == 0 {
return false
}
allowed, ok := results[0].Expressions[0].Value.(bool)
return ok && allowed
}
// ── 全局策略引擎 ────────────────────────────────────────────
var engine *PolicyEngine
// ── OPA 授权中间件 ──────────────────────────────────────────
func OPAMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
userID, _ := c.Get("userID")
userRoles, _ := c.Get("userRoles")
input := map[string]interface{}{
"user": map[string]interface{}{
"id": userID,
"roles": userRoles,
},
"method": c.Request.Method,
"path": c.Request.URL.Path,
"token": map[string]interface{}{
"valid": true,
"roles": userRoles,
},
}
if !engine.IsAllowed(c.Request.Context(), input) {
c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
"error": "Access denied by policy",
})
return
}
c.Next()
}
}
// ── 认证中间件(简化) ─────────────────────────────────────
func AuthMiddleware() gin.HandlerFunc {
return func(c *gin.Context) {
token := c.GetHeader("Authorization")
if token == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing token"})
return
}
// 实际项目中应验证 JWT
c.Set("userID", token)
c.Set("userRoles", []string{"editor"})
c.Next()
}
}
func main() {
// 外部数据
data := map[string]interface{}{
"role_permissions": map[string]interface{}{
"admin": []string{"read", "write", "delete"},
"editor": []string{"read", "write"},
"viewer": []string{"read"},
},
}
// 初始化策略引擎
var err error
policyDir := os.Getenv("OPA_POLICY_DIR")
if policyDir == "" {
policyDir = "./policies"
}
engine, err = NewPolicyEngine(policyDir, data)
if err != nil {
log.Fatalf("初始化策略引擎失败: %v", err)
}
r := gin.Default()
// 公开端点
r.GET("/health", func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"status": "ok"})
})
// 受保护的 API
api := r.Group("/api/v1", AuthMiddleware(), OPAMiddleware())
{
api.GET("/documents", listDocuments)
api.POST("/documents", createDocument)
api.GET("/documents/:id", getDocument)
api.PUT("/documents/:id", updateDocument)
api.DELETE("/documents/:id", deleteDocument)
}
r.Run(":9090")
}
func listDocuments(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"documents": []string{}})
}
func createDocument(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"created": true})
}
func getDocument(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"id": c.Param("id"), "content": "..."})
}
func updateDocument(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"id": c.Param("id"), "updated": true})
}
func deleteDocument(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{"id": c.Param("id"), "deleted": true})
}
15.9 OPA 性能优化
Partial Evaluation(部分评估)
// Partial Evaluation 将策略预编译为更高效的形式
// 适用于:已知部分输入(如用户角色),未知部分输入(如资源属性)
package main
import (
"context"
"fmt"
"log"
"github.com/open-policy-agent/opa/v1/ast"
"github.com/open-policy-agent/opa/v1/rego"
)
func partialEvalDemo() {
ctx := context.Background()
policy := `
package authz
import rego.v1
default allow := false
allow if {
input.user.role == "admin"
}
allow if {
input.user.role == "editor"
input.resource.classification != "confidential"
}
`
// 部分评估:固定 user.role = "editor",资源属性未知
r := rego.New(
rego.Query("data.authz.allow"),
rego.Module("policy.rego", policy),
rego.Unknowns([]string{"input.resource"}),
)
pq, err := r.Partial(ctx, rego.EvalInput(map[string]interface{}{
"user": map[string]interface{}{"role": "editor"},
}))
if err != nil {
log.Fatalf("Partial evaluation failed: %v", err)
}
// 输出简化后的查询(已消除 user.role 检查)
for _, q := range pq.Queries {
fmt.Printf("Simplified query: %v\n", q)
}
// 输出类似:input.resource.classification != "confidential"
// 运行时只需检查资源属性,跳过用户角色检查
_ = ast.MustParseModule // suppress unused import
}
Bundle 优化
# Bundle 目录结构
bundle/
├── authz/
│ ├── policy.rego # 策略文件
│ └── data.json # 外部数据
├── rbac/
│ ├── policy.rego
│ └── data.json
├── .manifest # Bundle 元数据
└── .signatures.json # 签名(可选)
// .manifest
{
"revision": "v1.2.3",
"roots": ["authz", "rbac"],
"metadata": {
"created_at": "2025-01-15T10:00:00Z"
}
}
# 构建 Bundle
opa build -b bundle/ -o authz-bundle.tar.gz
# 验证 Bundle
opa inspect authz-bundle.tar.gz
# 签名 Bundle(生产环境推荐)
opa sign --signing-key key.pem --bundle bundle/
性能调优建议
优化手段 |
效果 |
适用场景 |
|---|---|---|
PreparedEvalQuery |
避免重复编译,提升 10-100x |
所有场景 |
Partial Evaluation |
减少运行时计算量 |
已知部分输入 |
Bundle |
原子更新、减少 I/O |
生产部署 |
内存存储 |
避免外部数据查询延迟 |
数据量 < 100MB |
Decision Log 采样 |
减少日志开销 |
高 QPS 场景 |
WASM 编译 |
跨语言、低延迟 |
非 Go 应用 |
15.10 OPA 部署
# docker-compose.yml — OPA + Bundle Server
services:
# Nginx 作为 Bundle Server
bundle-server:
image: nginx:alpine
volumes:
- ./bundles:/usr/share/nginx/html/bundles:ro
- ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
ports:
- "8888:80"
healthcheck:
test: ["CMD", "wget", "-q", "--spider", "http://localhost:80/health"]
interval: 10s
timeout: 5s
retries: 3
opa:
image: openpolicyagent/opa:latest
command:
- "run"
- "--server"
- "--addr=0.0.0.0:8181"
- "--set=decision_logs.console=true"
- "--set=bundles.authz.service=bundle-server"
- "--set=bundles.authz.resource=/bundles/authz.tar.gz"
- "--set=bundles.authz.polling.min_delay_seconds=10"
- "--set=bundles.authz.polling.max_delay_seconds=30"
- "--set=services.bundle-server.url=http://bundle-server:80"
ports:
- "8181:8181"
depends_on:
bundle-server:
condition: service_healthy
healthcheck:
test: ["CMD", "wget", "-q", "--spider", "http://localhost:8181/health"]
interval: 10s
timeout: 5s
retries: 3
# nginx.conf
server {
listen 80;
location /health {
return 200 'ok';
add_header Content-Type text/plain;
}
location /bundles/ {
alias /usr/share/nginx/html/bundles/;
add_header ETag $upstream_http_etag;
}
}
# 构建并部署 Bundle
mkdir -p bundles
opa build -b policies/ -o bundles/authz.tar.gz
# 启动
docker compose up -d
# 验证 OPA 健康
curl http://localhost:8181/health
# 测试策略
curl -X POST http://localhost:8181/v1/data/authz/allow \
-H 'Content-Type: application/json' \
-d '{
"input": {
"user": {"roles": ["admin"]},
"action": "delete"
}
}'
# 更新策略(重新构建 Bundle,OPA 自动拉取)
opa build -b policies/ -o bundles/authz.tar.gz
15.11 OPA vs OpenFGA
维度 |
OPA |
OpenFGA |
|---|---|---|
模型 |
ABAC(策略驱动) |
ReBAC(关系驱动) |
语言 |
Rego |
DSL |
擅长 |
复杂条件逻辑 |
资源级权限继承 |
数据 |
策略 + JSON 数据 |
关系元组图 |
查询 |
“允许吗?” |
“允许吗?” + “列出所有” |
适用 |
K8s、API 策略、合规 |
文档协作、SaaS 权限 |
组合 |
✅ 可以组合使用 |
✅ 可以组合使用 |
最佳实践:用 OpenFGA 管理”谁与什么资源有什么关系”,用 OPA 管理”在什么条件下允许什么操作”。
15.12 小结
OPA 是 CNCF 毕业的通用策略引擎,将策略从代码中解耦
Rego 是声明式策略语言,支持复杂的条件逻辑
OPA 支持多种部署模式:Sidecar、独立服务、库、WASM
广泛应用于 Kubernetes Admission、API 授权、IaC 合规
OPA 和 OpenFGA 互补:OPA 擅长策略逻辑,OpenFGA 擅长关系管理
提供 Python、Java、Go 等多语言集成方案,Go 可嵌入式使用获得极致性能
策略测试(
opa test)和 Bundle 机制是生产环境的关键实践