# 第十五章:OPA — 通用策略引擎

> "策略应该与代码分离,就像数据应该与逻辑分离一样。"

```{mermaid}
mindmap
  root((OPA))
    核心概念
      Policy
      Data
      Query
      Decision
    Rego 语言
      规则
      函数
      集合操作
      测试
    部署模式
      Sidecar
      独立服务
      库模式
      WASM
    应用场景
      K8s Admission
      API Gateway
      微服务授权
      Terraform
```

## 15.1 OPA 是什么

OPA(Open Policy Agent)是 CNCF 毕业项目,提供通用的策略引擎。它将策略决策从应用代码中解耦出来。

```{mermaid}
flowchart LR
    subgraph 应用层
        A1[Web 应用]
        A2[API Gateway]
        A3[微服务]
        A4[CI/CD Pipeline]
    end

    subgraph OPA
        direction TB
        RE[Rego 评估引擎]
        P[策略<br/>Rego Files]
        D[数据<br/>JSON]
        P --> RE
        D --> RE
    end

    subgraph 策略来源
        B[Bundle Server]
        G[Git Repository]
    end

    A1 -->|"POST /v1/data/authz"| RE
    A2 -->|查询决策| RE
    A3 -->|查询决策| RE
    A4 -->|合规检查| RE
    RE -->|"{ allow: true }"| A1
    B -->|定期拉取| P
    B -->|定期拉取| D
    G -.->|CI/CD 构建| B
```

### OPA 决策流程

```{mermaid}
sequenceDiagram
    participant App as 应用
    participant OPA as OPA Server
    participant Bundle as Bundle Server

    Note over OPA,Bundle: 启动时加载策略
    OPA->>Bundle: GET /bundles/authz.tar.gz
    Bundle-->>OPA: 策略 + 数据

    App->>OPA: POST /v1/data/authz/allow<br/>{"input": {"user": "alice", "action": "read"}}
    OPA->>OPA: 评估 Rego 策略
    OPA->>OPA: 合并 input + data
    OPA-->>App: {"result": true}

    Note over OPA,Bundle: 定期轮询更新
    OPA->>Bundle: GET /bundles/authz.tar.gz<br/>If-None-Match: etag
    Bundle-->>OPA: 304 Not Modified / 新版本
```

### OPA 部署模式对比

```{mermaid}
flowchart TB
    subgraph Sidecar["Sidecar 模式"]
        direction LR
        subgraph Pod1["Kubernetes Pod"]
            APP1[应用容器] -->|localhost| OPA1[OPA Sidecar]
        end
    end

    subgraph Library["Library 模式(Go 嵌入)"]
        direction LR
        subgraph Process["应用进程"]
            APP2[应用代码] -->|函数调用| OPA2[OPA 库]
        end
    end

    subgraph Central["Central 模式"]
        direction LR
        APP3[应用 A] -->|HTTP/gRPC| OPAC[OPA 集群]
        APP4[应用 B] -->|HTTP/gRPC| OPAC
        APP5[应用 C] -->|HTTP/gRPC| OPAC
    end

    style Sidecar fill:#e8f5e9
    style Library fill:#e1f5fe
    style Central fill:#fff3e0
```

## 15.2 Rego 语言

Rego 是 OPA 的策略语言,声明式风格:

### 基本规则

```rego
package authz

import rego.v1

# 默认拒绝
default allow := false

# 管理员允许所有操作
allow if {
    input.user.role == "admin"
}

# 用户可以读取自己的数据
allow if {
    input.action == "read"
    input.resource.owner == input.user.id
}

# 编辑者可以编辑非机密文档
allow if {
    input.action == "edit"
    input.user.role == "editor"
    input.resource.classification != "confidential"
}
```

### 输入和查询

```json
// 输入(input)
{
  "user": {
    "id": "alice",
    "role": "editor",
    "department": "engineering"
  },
  "action": "edit",
  "resource": {
    "type": "document",
    "id": "doc-123",
    "owner": "bob",
    "classification": "internal"
  }
}

// 查询:data.authz.allow → true
```

### 高级特性

```rego
package authz

import rego.v1

# 集合推导:获取用户的所有权限
user_permissions contains perm if {
    some role in input.user.roles
    some perm in data.role_permissions[role]
}

# 函数
is_owner(resource) if {
    resource.owner == input.user.id
}

# 条件组合
allow if {
    input.action == "delete"
    is_owner(input.resource)
    input.resource.status != "archived"
}

# 基于时间的策略
allow if {
    input.action == "access"
    time.weekday(time.now_ns()) in ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
    hour := time.clock(time.now_ns())[0]
    hour >= 9
    hour < 18
}
```

### RBAC 策略

```rego
package rbac

import rego.v1

# 角色-权限映射(外部数据)
# data.json:
# {
#   "role_permissions": {
#     "admin": ["read", "write", "delete", "admin"],
#     "editor": ["read", "write"],
#     "viewer": ["read"]
#   },
#   "role_hierarchy": {
# "admin": ["editor"], # "editor": ["viewer"] # } # } default allow := false # 直接权限检查 allow if { some role in input.user.roles some perm in data.role_permissions[role] perm == input.action } # 通过角色层次继承权限 allow if { some role in input.user.roles some inherited_role in data.role_hierarchy[role] some perm in data.role_permissions[inherited_role] perm == input.action } # 获取用户的有效权限集合(含继承) effective_permissions contains perm if { some role in input.user.roles some perm in data.role_permissions[role] } effective_permissions contains perm if { some role in input.user.roles some inherited in data.role_hierarchy[role] some perm in data.role_permissions[inherited] } ``` ### ABAC 策略(基于时间、IP、部门) ```rego package abac import rego.v1 import data.ip_allowlist import data.department_resources default allow := false # 规则 1:仅在工作时间允许访问敏感资源 allow if { input.resource.classification == "sensitive" is_business_hours is_allowed_ip is_same_department } # 规则 2:非敏感资源仅需认证 allow if { input.resource.classification != "sensitive" input.user.authenticated == true } # 规则 3:VPN 用户可以在非工作时间访问 allow if { input.network.vpn == true input.user.authenticated == true is_same_department } # ── 辅助规则 ───────────────────────────────────────────────── is_business_hours if { day := time.weekday(time.now_ns()) day in ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"] hour := time.clock(time.now_ns())[0] hour >= 9 hour < 18 } is_allowed_ip if { net.cidr_contains(ip_allowlist[_], input.network.source_ip) } is_same_department if { input.user.department == input.resource.department } # 返回拒绝原因(便于调试) reasons contains "outside business hours" if { input.resource.classification == "sensitive" not is_business_hours } reasons contains "IP not in allowlist" if { not is_allowed_ip } reasons contains "department mismatch" if { not is_same_department } ``` ### API 路由授权策略 ```rego package api_authz import rego.v1 default allow := false # 公开端点 allow if { is_public_endpoint } # 已认证用户的 API 访问 allow if { input.token.valid 
is_allowed_route } # ── 公开端点列表 ───────────────────────────────────────────── is_public_endpoint if { input.path in ["/health", "/metrics", "/docs", "/openapi.json"] } is_public_endpoint if { input.method == "POST" input.path in ["/auth/login", "/auth/register"] } # ── 路由权限矩阵 ──────────────────────────────────────────── route_permissions := { "GET /api/v1/users": {"roles": ["admin", "viewer"]}, "POST /api/v1/users": {"roles": ["admin"]}, "GET /api/v1/documents": {"roles": ["admin", "editor", "viewer"]}, "POST /api/v1/documents": {"roles": ["admin", "editor"]}, "DELETE /api/v1/documents": {"roles": ["admin"]}, } is_allowed_route if { route_key := sprintf("%s %s", [input.method, input.path]) required := route_permissions[route_key] some role in input.token.roles role in required.roles } # 通配符路由匹配(/api/v1/documents/*) is_allowed_route if { input.method == "GET" glob.match("/api/v1/documents/*", ["/"], input.path) some role in input.token.roles role in ["admin", "editor", "viewer"] } is_allowed_route if { input.method in ["PUT", "PATCH"] glob.match("/api/v1/documents/*", ["/"], input.path) some role in input.token.roles role in ["admin", "editor"] } # ── 速率限制 ───────────────────────────────────────────────── rate_limit := 1000 if { some role in input.token.roles role == "premium" } else := 100 ``` ### Rego 测试 ```rego package authz_test import rego.v1 import data.authz test_admin_allowed if { authz.allow with input as { "user": {"role": "admin"}, "action": "delete", "resource": {"id": "doc-1"} } } test_viewer_cannot_delete if { not authz.allow with input as { "user": {"role": "viewer"}, "action": "delete", "resource": {"id": "doc-1"} } } test_owner_can_read if { authz.allow with input as { "user": {"id": "alice", "role": "viewer"}, "action": "read", "resource": {"owner": "alice"} } } ``` ```bash # 运行测试 opa test . 
-v ``` ## 15.3 OPA 策略测试最佳实践 ### 目录结构 ``` policies/ ├── authz.rego # 主策略 ├── authz_test.rego # 主策略测试 ├── rbac.rego # RBAC 策略 ├── rbac_test.rego # RBAC 测试 ├── abac.rego # ABAC 策略 ├── abac_test.rego # ABAC 测试 ├── data.json # 外部数据 └── Makefile ``` ### 完整测试示例 ```rego package rbac_test import rego.v1 import data.rbac # ── 测试数据 ───────────────────────────────────────────────── mock_role_permissions := { "admin": ["read", "write", "delete"], "editor": ["read", "write"], "viewer": ["read"], } mock_role_hierarchy := { "admin": ["editor"], "editor": ["viewer"], } # ── 正向测试 ───────────────────────────────────────────────── test_admin_can_delete if { rbac.allow with input as { "user": {"roles": ["admin"]}, "action": "delete", } with data.role_permissions as mock_role_permissions with data.role_hierarchy as mock_role_hierarchy } test_editor_can_write if { rbac.allow with input as { "user": {"roles": ["editor"]}, "action": "write", } with data.role_permissions as mock_role_permissions with data.role_hierarchy as mock_role_hierarchy } # 测试角色继承:editor 继承 viewer 的 read 权限 test_editor_inherits_read if { rbac.allow with input as { "user": {"roles": ["editor"]}, "action": "read", } with data.role_permissions as mock_role_permissions with data.role_hierarchy as mock_role_hierarchy } # ── 反向测试 ───────────────────────────────────────────────── test_viewer_cannot_write if { not rbac.allow with input as { "user": {"roles": ["viewer"]}, "action": "write", } with data.role_permissions as mock_role_permissions with data.role_hierarchy as mock_role_hierarchy } test_viewer_cannot_delete if { not rbac.allow with input as { "user": {"roles": ["viewer"]}, "action": "delete", } with data.role_permissions as mock_role_permissions with data.role_hierarchy as mock_role_hierarchy } test_no_roles_denied if { not rbac.allow with input as { "user": {"roles": []}, "action": "read", } with data.role_permissions as mock_role_permissions with data.role_hierarchy as mock_role_hierarchy } # ── 有效权限集合测试 
───────────────────────────────────────── test_admin_effective_permissions if { perms := rbac.effective_permissions with input as { "user": {"roles": ["admin"]}, } with data.role_permissions as mock_role_permissions with data.role_hierarchy as mock_role_hierarchy perms == {"read", "write", "delete"} } ``` ```bash # 运行所有测试(详细输出) opa test . -v # 运行测试并生成覆盖率报告 opa test . --coverage --format=json # 仅运行匹配的测试 opa test . -v --run "test_admin" # 性能基准测试 opa bench 'data.authz.allow' -i input.json -d policies/ ``` ```makefile # Makefile .PHONY: test lint fmt check test: opa test policies/ -v coverage: opa test policies/ --coverage --format=json | jq '.coverage' lint: opa check policies/ --strict fmt: opa fmt policies/ --write check: fmt lint test @echo "All checks passed!" ``` ## 15.4 OPA 部署模式 | 模式 | 描述 | 延迟 | 适用场景 | |------|------|------|---------| | Sidecar | 与应用同 Pod | 极低(本地) | Kubernetes 微服务 | | 独立服务 | 独立部署的 OPA 服务 | 低(网络) | 集中策略管理 | | 库模式 | 嵌入应用进程 | 极低 | Go 应用 | | WASM | 编译为 WebAssembly | 极低 | 边缘计算、浏览器 | ## 15.5 应用场景 ### Kubernetes Admission Control(Gatekeeper) ```yaml # 禁止使用 latest 标签的镜像 apiVersion: constraints.gatekeeper.sh/v1beta1 kind: K8sDisallowedTags metadata: name: no-latest-tag spec: match: kinds: - apiGroups: [""] kinds: ["Pod"] parameters: tags: ["latest"] ``` ### API Gateway 授权 ```rego package gateway import rego.v1 default allow := false # 公开端点无需认证 allow if { input.path in ["/health", "/metrics", "/docs"] } # 已认证用户可以访问 API allow if { input.token.valid input.method == "GET" startswith(input.path, "/api/v1/") } # 管理员可以访问管理端点 allow if { input.token.valid "admin" in input.token.roles startswith(input.path, "/admin/") } # 速率限制 rate_limit := limit if { "premium" in input.token.roles limit := 1000 } else := limit if { limit := 100 } ``` ### Terraform 策略检查(Conftest) ```rego package terraform import rego.v1 # 禁止公开的 S3 存储桶 deny contains msg if { some resource in input.resource_changes resource.type == "aws_s3_bucket" resource.change.after.acl == "public-read" msg := 
sprintf("S3 bucket %s must not be public", [resource.address]) } # 要求所有资源有标签 deny contains msg if { some resource in input.resource_changes not resource.change.after.tags.Environment msg := sprintf("Resource %s must have Environment tag", [resource.address]) } ``` ## 15.6 Python 集成 ### OPA REST API 客户端 ```python """OPA REST API 客户端 — 支持策略查询、数据管理、健康检查""" import httpx from typing import Any, Optional from dataclasses import dataclass @dataclass class OPADecision: result: Any decision_id: Optional[str] = None class OPAClient: """OPA REST API 客户端""" def __init__(self, base_url: str = "http://localhost:8181", timeout: float = 5.0): self.base_url = base_url.rstrip("/") self.timeout = timeout self._client = httpx.AsyncClient( base_url=self.base_url, timeout=timeout, headers={"Content-Type": "application/json"}, ) async def close(self): await self._client.aclose() async def __aenter__(self): return self async def __aexit__(self, *args): await self.close() async def check(self, policy_path: str, input_data: dict) -> OPADecision: """查询策略决策 Args: policy_path: 策略路径,如 "authz/allow" input_data: 输入数据 """ url = f"/v1/data/{policy_path}" response = await self._client.post(url, json={"input": input_data}) response.raise_for_status() body = response.json() return OPADecision( result=body.get("result"), decision_id=body.get("decision_id"), ) async def query(self, query: str, input_data: Optional[dict] = None) -> Any: """执行任意 Rego 查询""" payload: dict = {"query": query} if input_data: payload["input"] = input_data response = await self._client.post("/v1/query", json=payload) response.raise_for_status() return response.json().get("result") async def put_data(self, path: str, data: Any) -> None: """上传外部数据""" response = await self._client.put(f"/v1/data/{path}", json=data) response.raise_for_status() async def get_data(self, path: str) -> Any: """获取外部数据""" response = await self._client.get(f"/v1/data/{path}") response.raise_for_status() return response.json().get("result") async def 
put_policy(self, policy_id: str, rego_code: str) -> None: """上传策略""" response = await self._client.put( f"/v1/policies/{policy_id}", content=rego_code.encode(), headers={"Content-Type": "text/plain"}, ) response.raise_for_status() async def health(self) -> bool: """健康检查""" try: response = await self._client.get("/health") return response.status_code == 200 except Exception: return False # ── 使用示例 ───────────────────────────────────────────────── async def demo(): async with OPAClient("http://localhost:8181") as opa: # 健康检查 healthy = await opa.health() print(f"OPA healthy: {healthy}") # 上传角色权限数据 await opa.put_data("role_permissions", { "admin": ["read", "write", "delete"], "editor": ["read", "write"], "viewer": ["read"], }) # 上传策略 await opa.put_policy("authz", """ package authz import rego.v1 default allow := false allow if { some role in input.user.roles some perm in data.role_permissions[role] perm == input.action } """) # 查询决策 decision = await opa.check("authz/allow", { "user": {"roles": ["editor"]}, "action": "write", }) print(f"Editor can write: {decision.result}") # True decision = await opa.check("authz/allow", { "user": {"roles": ["viewer"]}, "action": "delete", }) print(f"Viewer can delete: {decision.result}") # False ``` ### FastAPI + OPA 授权中间件 ```python """FastAPI + OPA 授权中间件完整示例""" from contextlib import asynccontextmanager from typing import Optional import httpx from fastapi import FastAPI, Depends, HTTPException, Request from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials # ── OPA 客户端 ─────────────────────────────────────────────── OPA_URL = "http://localhost:8181" opa_http: Optional[httpx.AsyncClient] = None @asynccontextmanager async def lifespan(app: FastAPI): global opa_http opa_http = httpx.AsyncClient(base_url=OPA_URL, timeout=5.0) yield await opa_http.aclose() app = FastAPI(title="OPA Demo", lifespan=lifespan) security = HTTPBearer() # ── 用户解析 ───────────────────────────────────────────────── async def get_current_user( 
credentials: HTTPAuthorizationCredentials = Depends(security), ) -> dict: # 实际项目中应验证 JWT token = credentials.credentials return {"id": token, "roles": ["editor"], "department": "engineering"} # ── OPA 授权依赖 ───────────────────────────────────────────── class OPAAuthorize: """可复用的 OPA 授权依赖""" def __init__(self, policy_path: str = "authz/allow"): self.policy_path = policy_path async def __call__( self, request: Request, user: dict = Depends(get_current_user), ) -> dict: opa_input = { "input": { "user": user, "method": request.method, "path": request.url.path, "headers": dict(request.headers), } } response = await opa_http.post( f"/v1/data/{self.policy_path}", json=opa_input, ) if response.status_code != 200: raise HTTPException(status_code=503, detail="Policy engine unavailable") result = response.json().get("result", False) if not result: raise HTTPException(status_code=403, detail="Denied by policy") return user # ── 路由 ───────────────────────────────────────────────────── authorize = OPAAuthorize("api_authz/allow") @app.get("/api/v1/documents") async def list_documents(user: dict = Depends(authorize)): return {"documents": [...], "user": user["id"]} @app.post("/api/v1/documents") async def create_document(user: dict = Depends(authorize)): return {"created": True, "user": user["id"]} @app.delete("/api/v1/documents/{doc_id}") async def delete_document(doc_id: str, user: dict = Depends(authorize)): return {"deleted": doc_id, "user": user["id"]} # ── 管理端点:动态更新策略数据 ──────────────────────────────── @app.put("/admin/opa/data/{path:path}") async def update_opa_data(path: str, data: dict): """动态更新 OPA 外部数据""" response = await opa_http.put(f"/v1/data/{path}", json=data) if response.status_code not in (200, 204): raise HTTPException(status_code=500, detail="Failed to update OPA data") return {"updated": path} ``` ## 15.7 Java 集成 ### OPA REST API 客户端(OkHttp) ```java // build.gradle // dependencies { // implementation 'com.squareup.okhttp3:okhttp:4.12.0' // implementation 
'com.fasterxml.jackson.core:jackson-databind:2.17.0' // } import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import okhttp3.*; import java.io.IOException; import java.util.Map; import java.util.concurrent.TimeUnit; public class OPAClient implements AutoCloseable { private static final MediaType JSON_TYPE = MediaType.get("application/json"); private static final MediaType TEXT_TYPE = MediaType.get("text/plain"); private final String baseUrl; private final OkHttpClient httpClient; private final ObjectMapper mapper; public OPAClient(String baseUrl) { this.baseUrl = baseUrl.replaceAll("/+$", ""); this.httpClient = new OkHttpClient.Builder() .connectTimeout(5, TimeUnit.SECONDS) .readTimeout(5, TimeUnit.SECONDS) .build(); this.mapper = new ObjectMapper(); } /** * 查询策略决策 */ public boolean check(String policyPath, Map input) throws IOException { Map body = Map.of("input", input); String json = mapper.writeValueAsString(body); Request request = new Request.Builder() .url(baseUrl + "/v1/data/" + policyPath) .post(RequestBody.create(json, JSON_TYPE)) .build(); try (Response response = httpClient.newCall(request).execute()) { if (!response.isSuccessful()) { throw new IOException("OPA request failed: " + response.code()); } JsonNode node = mapper.readTree(response.body().string()); return node.path("result").asBoolean(false); } } /** * 查询策略并返回完整结果 */ public JsonNode query(String policyPath, Map input) throws IOException { Map body = Map.of("input", input); String json = mapper.writeValueAsString(body); Request request = new Request.Builder() .url(baseUrl + "/v1/data/" + policyPath) .post(RequestBody.create(json, JSON_TYPE)) .build(); try (Response response = httpClient.newCall(request).execute()) { if (!response.isSuccessful()) { throw new IOException("OPA request failed: " + response.code()); } return mapper.readTree(response.body().string()).path("result"); } } /** * 上传外部数据 */ public void putData(String path, Object data) 
throws IOException { String json = mapper.writeValueAsString(data); Request request = new Request.Builder() .url(baseUrl + "/v1/data/" + path) .put(RequestBody.create(json, JSON_TYPE)) .build(); try (Response response = httpClient.newCall(request).execute()) { if (!response.isSuccessful()) { throw new IOException("Failed to put data: " + response.code()); } } } /** * 上传策略 */ public void putPolicy(String policyId, String regoCode) throws IOException { Request request = new Request.Builder() .url(baseUrl + "/v1/policies/" + policyId) .put(RequestBody.create(regoCode, TEXT_TYPE)) .build(); try (Response response = httpClient.newCall(request).execute()) { if (!response.isSuccessful()) { throw new IOException("Failed to put policy: " + response.code()); } } } /** * 健康检查 */ public boolean isHealthy() { Request request = new Request.Builder() .url(baseUrl + "/health") .get() .build(); try (Response response = httpClient.newCall(request).execute()) { return response.isSuccessful(); } catch (IOException e) { return false; } } @Override public void close() { httpClient.dispatcher().executorService().shutdown(); httpClient.connectionPool().evictAll(); } // ── 使用示例 ───────────────────────────────────────────── public static void main(String[] args) throws Exception { try (var opa = new OPAClient("http://localhost:8181")) { // 健康检查 System.out.println("OPA healthy: " + opa.isHealthy()); // 上传数据 opa.putData("role_permissions", Map.of( "admin", java.util.List.of("read", "write", "delete"), "editor", java.util.List.of("read", "write"), "viewer", java.util.List.of("read") )); // 查询决策 boolean allowed = opa.check("authz/allow", Map.of( "user", Map.of("roles", java.util.List.of("editor")), "action", "write" )); System.out.println("Editor can write: " + allowed); // 查询完整结果 JsonNode result = opa.query("authz", Map.of( "user", Map.of("roles", java.util.List.of("admin")), "action", "delete" )); System.out.println("Full result: " + result.toPrettyString()); } } } ``` ### Spring Boot + OPA 
集成 ```java // ── application.yml ───────────────────────────────────────── // opa: // base-url: http://localhost:8181 // policy-path: api_authz/allow // timeout: 5s import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.client.RestClient; import java.time.Duration; @ConfigurationProperties(prefix = "opa") record OPAProperties(String baseUrl, String policyPath, Duration timeout) {} @Configuration class OPAConfig { @Bean RestClient opaRestClient(OPAProperties props) { return RestClient.builder() .baseUrl(props.baseUrl()) .build(); } } // ── OPA 授权服务 ──────────────────────────────────────────── import com.fasterxml.jackson.annotation.JsonProperty; import org.springframework.stereotype.Service; import org.springframework.web.client.RestClient; import java.util.Map; @Service class OPAAuthorizationService { private final RestClient restClient; private final OPAProperties props; OPAAuthorizationService(RestClient opaRestClient, OPAProperties props) { this.restClient = opaRestClient; this.props = props; } record OPARequest(Map input) {} record OPAResponse(@JsonProperty("result") Object result) {} public boolean isAllowed(Map input) { try { OPAResponse response = restClient.post() .uri("/v1/data/{path}", props.policyPath()) .body(new OPARequest(input)) .retrieve() .body(OPAResponse.class); return response != null && Boolean.TRUE.equals(response.result()); } catch (Exception e) { // 策略引擎不可用时默认拒绝(fail-closed) return false; } } @SuppressWarnings("unchecked") public Map evaluate(String policyPath, Map input) { OPAResponse response = restClient.post() .uri("/v1/data/{path}", policyPath) .body(new OPARequest(input)) .retrieve() .body(OPAResponse.class); if (response != null && response.result() instanceof Map) { return (Map) response.result(); } return Map.of(); } } // ── 自定义注解 + 拦截器 
───────────────────────────────────── import java.lang.annotation.*; @Target(ElementType.METHOD) @Retention(RetentionPolicy.RUNTIME) @Documented @interface OPAPolicy { /** OPA 策略路径,默认使用全局配置 */ String value() default ""; } import jakarta.servlet.http.HttpServletRequest; import jakarta.servlet.http.HttpServletResponse; import org.springframework.http.HttpStatus; import org.springframework.security.core.context.SecurityContextHolder; import org.springframework.stereotype.Component; import org.springframework.web.method.HandlerMethod; import org.springframework.web.servlet.HandlerInterceptor; import java.util.Map; @Component class OPAInterceptor implements HandlerInterceptor { private final OPAAuthorizationService opaService; OPAInterceptor(OPAAuthorizationService opaService) { this.opaService = opaService; } @Override public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception { if (!(handler instanceof HandlerMethod method)) { return true; } OPAPolicy annotation = method.getMethodAnnotation(OPAPolicy.class); if (annotation == null) { return true; // 无注解则跳过 } var auth = SecurityContextHolder.getContext().getAuthentication(); Map input = Map.of( "user", Map.of( "id", auth.getName(), "roles", auth.getAuthorities().stream() .map(Object::toString).toList() ), "method", request.getMethod(), "path", request.getRequestURI() ); boolean allowed = opaService.isAllowed(input); if (!allowed) { response.setStatus(HttpStatus.FORBIDDEN.value()); response.getWriter().write("{\"error\": \"Denied by OPA policy\"}"); return false; } return true; } } // ── 注册拦截器 ────────────────────────────────────────────── import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.InterceptorRegistry; import org.springframework.web.servlet.config.annotation.WebMvcConfigurer; @Configuration class WebConfig implements WebMvcConfigurer { private final OPAInterceptor opaInterceptor; 
WebConfig(OPAInterceptor opaInterceptor) { this.opaInterceptor = opaInterceptor; } @Override public void addInterceptors(InterceptorRegistry registry) { registry.addInterceptor(opaInterceptor) .addPathPatterns("/api/**"); } } // ── Controller 使用示例 ───────────────────────────────────── import org.springframework.web.bind.annotation.*; import java.util.Map; @RestController @RequestMapping("/api/v1/documents") class DocumentController { @GetMapping @OPAPolicy public Map listDocuments() { return Map.of("documents", java.util.List.of()); } @PostMapping @OPAPolicy public Map createDocument(@RequestBody Map body) { return Map.of("created", true); } @DeleteMapping("/{docId}") @OPAPolicy("api_authz/allow_delete") public Map deleteDocument(@PathVariable String docId) { return Map.of("deleted", docId); } } ``` ## 15.8 Go 集成 ### OPA Go 库(嵌入式 OPA) ```go package main import ( "context" "encoding/json" "fmt" "log" "github.com/open-policy-agent/opa/v1/ast" "github.com/open-policy-agent/opa/v1/rego" "github.com/open-policy-agent/opa/v1/storage/inmem" ) // ── 嵌入式 OPA 评估器 ────────────────────────────────────── type OPAEvaluator struct { query rego.PreparedEvalQuery } // NewOPAEvaluator 创建一个预编译的 OPA 评估器 func NewOPAEvaluator(policy string, queryStr string, data map[string]interface{}) (*OPAEvaluator, error) { ctx := context.Background() // 创建内存存储(外部数据) store := inmem.NewFromObject(data) // 预编译查询(性能优化:只编译一次) prepared, err := rego.New( rego.Query(queryStr), rego.Module("policy.rego", policy), rego.Store(store), ).PrepareForEval(ctx) if err != nil { return nil, fmt.Errorf("failed to prepare query: %w", err) } return &OPAEvaluator{query: prepared}, nil } // Evaluate 评估策略 func (e *OPAEvaluator) Evaluate(ctx context.Context, input map[string]interface{}) (bool, error) { results, err := e.query.Eval(ctx, rego.EvalInput(input)) if err != nil { return false, fmt.Errorf("evaluation failed: %w", err) } if len(results) == 0 || len(results[0].Expressions) == 0 { return false, nil } allowed, ok := 
results[0].Expressions[0].Value.(bool) return ok && allowed, nil } // EvaluateFull 评估策略并返回完整结果 func (e *OPAEvaluator) EvaluateFull(ctx context.Context, input map[string]interface{}) (interface{}, error) { results, err := e.query.Eval(ctx, rego.EvalInput(input)) if err != nil { return nil, fmt.Errorf("evaluation failed: %w", err) } if len(results) == 0 || len(results[0].Expressions) == 0 { return nil, nil } return results[0].Expressions[0].Value, nil } func main() { ctx := context.Background() // 策略定义 policy := ` package authz import rego.v1 default allow := false allow if { some role in input.user.roles some perm in data.role_permissions[role] perm == input.action } allow if { input.user.id == input.resource.owner } ` // 外部数据 data := map[string]interface{}{ "role_permissions": map[string]interface{}{ "admin": []string{"read", "write", "delete"}, "editor": []string{"read", "write"}, "viewer": []string{"read"}, }, } // 创建评估器 evaluator, err := NewOPAEvaluator(policy, "data.authz.allow", data) if err != nil { log.Fatalf("创建评估器失败: %v", err) } // 测试用例 testCases := []struct { name string input map[string]interface{} want bool }{ { name: "admin can delete", input: map[string]interface{}{ "user": map[string]interface{}{"id": "alice", "roles": []string{"admin"}}, "action": "delete", "resource": map[string]interface{}{"id": "doc-1", "owner": "bob"}, }, want: true, }, { name: "editor can write", input: map[string]interface{}{ "user": map[string]interface{}{"id": "bob", "roles": []string{"editor"}}, "action": "write", "resource": map[string]interface{}{"id": "doc-1", "owner": "alice"}, }, want: true, }, { name: "viewer cannot delete", input: map[string]interface{}{ "user": map[string]interface{}{"id": "carol", "roles": []string{"viewer"}}, "action": "delete", "resource": map[string]interface{}{"id": "doc-1", "owner": "alice"}, }, want: false, }, { name: "owner can do anything", input: map[string]interface{}{ "user": map[string]interface{}{"id": "alice", "roles": 
[]string{"viewer"}}, "action": "delete", "resource": map[string]interface{}{"id": "doc-1", "owner": "alice"}, }, want: true, }, } for _, tc := range testCases { allowed, err := evaluator.Evaluate(ctx, tc.input) if err != nil { log.Printf("❌ %s: error: %v", tc.name, err) continue } status := "✅" if allowed != tc.want { status = "❌" } fmt.Printf("%s %s: allowed=%v (want=%v)\n", status, tc.name, allowed, tc.want) } // 性能测试:预编译查询非常快 input := map[string]interface{}{ "user": map[string]interface{}{"id": "alice", "roles": []string{"admin"}}, "action": "read", "resource": map[string]interface{}{"id": "doc-1"}, } _ = json.RawMessage{} // suppress unused import _ = ast.MustParseModule // suppress unused import // 基准:预编译后每次评估通常 < 0.1ms for i := 0; i < 1000; i++ { _, _ = evaluator.Evaluate(ctx, input) } fmt.Println("1000 次评估完成") } ``` ### Gin + OPA 授权中间件 ```go package main import ( "context" "fmt" "log" "net/http" "os" "github.com/gin-gonic/gin" "github.com/open-policy-agent/opa/v1/rego" "github.com/open-policy-agent/opa/v1/storage/inmem" ) // ── 嵌入式 OPA 引擎 ──────────────────────────────────────── type PolicyEngine struct { query rego.PreparedEvalQuery } func NewPolicyEngine(policyDir string, data map[string]interface{}) (*PolicyEngine, error) { ctx := context.Background() store := inmem.NewFromObject(data) // 从目录加载所有 .rego 文件 prepared, err := rego.New( rego.Query("data.api_authz.allow"), rego.Load([]string{policyDir}, nil), rego.Store(store), ).PrepareForEval(ctx) if err != nil { return nil, fmt.Errorf("failed to prepare policy engine: %w", err) } return &PolicyEngine{query: prepared}, nil } func (pe *PolicyEngine) IsAllowed(ctx context.Context, input map[string]interface{}) bool { results, err := pe.query.Eval(ctx, rego.EvalInput(input)) if err != nil { log.Printf("OPA evaluation error: %v", err) return false // fail-closed } if len(results) == 0 || len(results[0].Expressions) == 0 { return false } allowed, ok := results[0].Expressions[0].Value.(bool) return ok && allowed } 
// ── 全局策略引擎 ──────────────────────────────────────────── var engine *PolicyEngine // ── OPA 授权中间件 ────────────────────────────────────────── func OPAMiddleware() gin.HandlerFunc { return func(c *gin.Context) { userID, _ := c.Get("userID") userRoles, _ := c.Get("userRoles") input := map[string]interface{}{ "user": map[string]interface{}{ "id": userID, "roles": userRoles, }, "method": c.Request.Method, "path": c.Request.URL.Path, "token": map[string]interface{}{ "valid": true, "roles": userRoles, }, } if !engine.IsAllowed(c.Request.Context(), input) { c.AbortWithStatusJSON(http.StatusForbidden, gin.H{ "error": "Access denied by policy", }) return } c.Next() } } // ── 认证中间件(简化) ───────────────────────────────────── func AuthMiddleware() gin.HandlerFunc { return func(c *gin.Context) { token := c.GetHeader("Authorization") if token == "" { c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing token"}) return } // 实际项目中应验证 JWT c.Set("userID", token) c.Set("userRoles", []string{"editor"}) c.Next() } } func main() { // 外部数据 data := map[string]interface{}{ "role_permissions": map[string]interface{}{ "admin": []string{"read", "write", "delete"}, "editor": []string{"read", "write"}, "viewer": []string{"read"}, }, } // 初始化策略引擎 var err error policyDir := os.Getenv("OPA_POLICY_DIR") if policyDir == "" { policyDir = "./policies" } engine, err = NewPolicyEngine(policyDir, data) if err != nil { log.Fatalf("初始化策略引擎失败: %v", err) } r := gin.Default() // 公开端点 r.GET("/health", func(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"status": "ok"}) }) // 受保护的 API api := r.Group("/api/v1", AuthMiddleware(), OPAMiddleware()) { api.GET("/documents", listDocuments) api.POST("/documents", createDocument) api.GET("/documents/:id", getDocument) api.PUT("/documents/:id", updateDocument) api.DELETE("/documents/:id", deleteDocument) } r.Run(":9090") } func listDocuments(c *gin.Context) { c.JSON(http.StatusOK, gin.H{"documents": []string{}}) } func createDocument(c *gin.Context) { 
c.JSON(http.StatusOK, gin.H{"created": true})
}

// getDocument echoes the requested document ID with placeholder content.
func getDocument(c *gin.Context) {
	c.JSON(http.StatusOK, gin.H{"id": c.Param("id"), "content": "..."})
}

// updateDocument echoes the document ID and reports it as updated.
func updateDocument(c *gin.Context) {
	c.JSON(http.StatusOK, gin.H{"id": c.Param("id"), "updated": true})
}

// deleteDocument echoes the document ID and reports it as deleted.
func deleteDocument(c *gin.Context) {
	c.JSON(http.StatusOK, gin.H{"id": c.Param("id"), "deleted": true})
}
```

## 15.9 OPA 性能优化

### Partial Evaluation(部分评估)

```go
// Partial evaluation precompiles a policy into a more efficient form.
// It applies when part of the input is known (e.g. the user's role) and part
// is unknown (e.g. resource attributes).
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/open-policy-agent/opa/v1/ast"
	"github.com/open-policy-agent/opa/v1/rego"
)

// partialEvalDemo partially evaluates data.authz.allow with user.role fixed to
// "editor" and input.resource left unknown, then prints the residual queries
// and the support modules returned alongside them. It exits the process via
// log.Fatalf if partial evaluation fails.
func partialEvalDemo() {
	ctx := context.Background()

	policy := `
package authz

import rego.v1

default allow := false

allow if {
	input.user.role == "admin"
}

allow if {
	input.user.role == "editor"
	input.resource.classification != "confidential"
}
`

	// Partial evaluation: user.role is fixed to "editor", resource attributes
	// are unknown. The known input is supplied with rego.Input when the Rego
	// object is built; Rego.Partial itself accepts only a context
	// (rego.EvalInput is an option for prepared queries, not for Partial).
	r := rego.New(
		rego.Query("data.authz.allow"),
		rego.Module("policy.rego", policy),
		rego.Input(map[string]interface{}{
			"user": map[string]interface{}{"role": "editor"},
		}),
		rego.Unknowns([]string{"input.resource"}),
	)

	pq, err := r.Partial(ctx)
	if err != nil {
		log.Fatalf("Partial evaluation failed: %v", err)
	}

	// Print the simplified queries (the user.role checks are already resolved).
	for _, q := range pq.Queries {
		fmt.Printf("Simplified query: %v\n", q)
	}

	// The residual may refer to rules that OPA emits as separate support
	// modules, so print those too; otherwise the output can look incomplete.
	var support []*ast.Module = pq.Support
	for _, mod := range support {
		fmt.Printf("Support module:\n%v\n", mod)
	}

	// Expected residual condition: input.resource.classification != "confidential"
	// At request time only the resource attribute needs checking; the user-role
	// check is skipped.
	// NOTE(review): whether that condition appears inline in the query or inside
	// a support module depends on how OPA handles the default rule — confirm
	// against the OPA version in use.
}
```

### Bundle 优化

```
# Bundle 目录结构
bundle/
├── authz/
│   ├── policy.rego       # 策略文件
│   └── data.json         # 外部数据
├── rbac/
│   ├── policy.rego
│   └── data.json
├── .manifest             # Bundle 元数据
└── .signatures.json      # 签名(可选)
```

```json
// .manifest
{
  "revision": "v1.2.3",
  "roots": ["authz", "rbac"],
  "metadata": {
    "created_at": "2025-01-15T10:00:00Z"
  }
}
```

```bash
# 构建 Bundle
opa build -b bundle/ -o authz-bundle.tar.gz

# 验证 Bundle
opa inspect authz-bundle.tar.gz

# 签名 Bundle(生产环境推荐)
opa sign --signing-key key.pem --bundle bundle/
```

### 性能调优建议

| 优化手段
| 效果 | 适用场景 |
|---------|------|---------|
| **PreparedEvalQuery** | 避免重复编译,提升 10-100x | 所有场景 |
| **Partial Evaluation** | 减少运行时计算量 | 已知部分输入 |
| **Bundle** | 原子更新、减少 I/O | 生产部署 |
| **内存存储** | 避免外部数据查询延迟 | 数据量 < 100MB |
| **Decision Log 采样** | 减少日志开销 | 高 QPS 场景 |
| **WASM 编译** | 跨语言、低延迟 | 非 Go 应用 |

## 15.10 OPA 部署

```yaml
# docker-compose.yml — OPA + Bundle Server
services:
  # Nginx acting as the bundle server
  bundle-server:
    image: nginx:alpine
    volumes:
      - ./bundles:/usr/share/nginx/html/bundles:ro
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
    ports:
      - "8888:80"
    healthcheck:
      test: ["CMD", "wget", "-q", "--spider", "http://localhost:80/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  opa:
    # The default OPA image has no shell and no wget, so the wget-based
    # healthcheck below can never succeed on it. The "-debug" variant bundles
    # busybox, which provides wget. Pin a concrete version tag in production.
    image: openpolicyagent/opa:latest-debug
    command:
      - "run"
      - "--server"
      - "--addr=0.0.0.0:8181"
      - "--set=decision_logs.console=true"
      - "--set=bundles.authz.service=bundle-server"
      - "--set=bundles.authz.resource=/bundles/authz.tar.gz"
      - "--set=bundles.authz.polling.min_delay_seconds=10"
      - "--set=bundles.authz.polling.max_delay_seconds=30"
      - "--set=services.bundle-server.url=http://bundle-server:80"
    ports:
      - "8181:8181"
    depends_on:
      bundle-server:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "wget", "-q", "--spider", "http://localhost:8181/health"]
      interval: 10s
      timeout: 5s
      retries: 3
```

```nginx
# nginx.conf
server {
    listen 80;

    location /health {
        # default_type sets the Content-Type of the body produced by "return".
        # add_header would append a second Content-Type header instead.
        default_type text/plain;
        return 200 'ok';
    }

    location /bundles/ {
        alias /usr/share/nginx/html/bundles/;
        # nginx generates the ETag for static files itself. $upstream_http_etag
        # is only populated for proxied responses and is empty here.
        etag on;
    }
}
```

```bash
# 构建并部署 Bundle
mkdir -p bundles
opa build -b policies/ -o bundles/authz.tar.gz

# 启动
docker compose up -d

# 验证 OPA 健康
curl http://localhost:8181/health

# 测试策略
curl -X POST http://localhost:8181/v1/data/authz/allow \
  -H 'Content-Type: application/json' \
  -d '{
    "input": {
      "user": {"roles": ["admin"]},
      "action": "delete"
    }
  }'

# 更新策略(重新构建 Bundle,OPA 自动拉取)
opa build -b policies/ -o bundles/authz.tar.gz
```

## 15.11 OPA vs OpenFGA

| 维度 | OPA | OpenFGA |
|------|-----|---------|
| 模型 | ABAC(策略驱动)
| ReBAC(关系驱动) | | 语言 | Rego | DSL | | 擅长 | 复杂条件逻辑 | 资源级权限继承 | | 数据 | 策略 + JSON 数据 | 关系元组图 | | 查询 | "允许吗?" | "允许吗?" + "列出所有" | | 适用 | K8s、API 策略、合规 | 文档协作、SaaS 权限 | | 组合 | ✅ 可以组合使用 | ✅ 可以组合使用 | **最佳实践**:用 OpenFGA 管理"谁与什么资源有什么关系",用 OPA 管理"在什么条件下允许什么操作"。 ## 15.12 小结 - OPA 是 CNCF 毕业的**通用策略引擎**,将策略从代码中解耦 - **Rego** 是声明式策略语言,支持复杂的条件逻辑 - OPA 支持多种部署模式:Sidecar、独立服务、库、WASM - 广泛应用于 **Kubernetes Admission**、**API 授权**、**IaC 合规** - OPA 和 OpenFGA **互补**:OPA 擅长策略逻辑,OpenFGA 擅长关系管理 - 提供 **Python、Java、Go** 等多语言集成方案,Go 可嵌入式使用获得极致性能 - **策略测试**(`opa test`)和 **Bundle 机制**是生产环境的关键实践