第十五章:OPA — 通用策略引擎

“策略应该与代码分离,就像数据应该与逻辑分离一样。”

        mindmap
  root((OPA))
    核心概念
      Policy
      Data
      Query
      Decision
    Rego 语言
      规则
      函数
      集合操作
      测试
    部署模式
      Sidecar
      独立服务
      库模式
      WASM
    应用场景
      K8s Admission
      API Gateway
      微服务授权
      Terraform
    

15.1 OPA 是什么

OPA(Open Policy Agent)是 CNCF 毕业项目,提供通用的策略引擎。它将策略决策从应用代码中解耦出来。

        flowchart LR
    subgraph 应用层
        A1[Web 应用]
        A2[API Gateway]
        A3[微服务]
        A4[CI/CD Pipeline]
    end

    subgraph OPA
        direction TB
        RE[Rego 评估引擎]
        P[策略<br/>Rego Files]
        D[数据<br/>JSON]
        P --> RE
        D --> RE
    end

    subgraph 策略来源
        B[Bundle Server]
        G[Git Repository]
    end

    A1 -->|"POST /v1/data/authz"| RE
    A2 -->|查询决策| RE
    A3 -->|查询决策| RE
    A4 -->|合规检查| RE
    RE -->|"{ allow: true }"| A1

    B -->|定期拉取| P
    B -->|定期拉取| D
    G -.->|CI/CD 构建| B
    

OPA 决策流程

        sequenceDiagram
    participant App as 应用
    participant OPA as OPA Server
    participant Bundle as Bundle Server

    Note over OPA,Bundle: 启动时加载策略
    OPA->>Bundle: GET /bundles/authz.tar.gz
    Bundle-->>OPA: 策略 + 数据

    App->>OPA: POST /v1/data/authz/allow<br/>{"input": {"user": "alice", "action": "read"}}
    OPA->>OPA: 评估 Rego 策略
    OPA->>OPA: 合并 input + data
    OPA-->>App: {"result": true}

    Note over OPA,Bundle: 定期轮询更新
    OPA->>Bundle: GET /bundles/authz.tar.gz<br/>If-None-Match: etag
    Bundle-->>OPA: 304 Not Modified / 新版本
    

OPA 部署模式对比

        flowchart TB
    subgraph Sidecar["Sidecar 模式"]
        direction LR
        subgraph Pod1["Kubernetes Pod"]
            APP1[应用容器] -->|localhost| OPA1[OPA Sidecar]
        end
    end

    subgraph Library["Library 模式(Go 嵌入)"]
        direction LR
        subgraph Process["应用进程"]
            APP2[应用代码] -->|函数调用| OPA2[OPA 库]
        end
    end

    subgraph Central["Central 模式"]
        direction LR
        APP3[应用 A] -->|HTTP/gRPC| OPAC[OPA 集群]
        APP4[应用 B] -->|HTTP/gRPC| OPAC
        APP5[应用 C] -->|HTTP/gRPC| OPAC
    end

    style Sidecar fill:#e8f5e9
    style Library fill:#e1f5fe
    style Central fill:#fff3e0
    

15.2 Rego 语言

Rego 是 OPA 的策略语言,声明式风格:

基本规则

package authz

import rego.v1

# Deny unless one of the allow rules below succeeds.
default allow := false

# Administrators may perform any action.
allow if input.user.role == "admin"

# A user may read a resource they own.
allow if {
    input.resource.owner == input.user.id
    input.action == "read"
}

# Editors may edit any document that is not classified as confidential.
allow if {
    input.user.role == "editor"
    input.action == "edit"
    input.resource.classification != "confidential"
}

输入和查询

// 输入(input)
{
  "user": {
    "id": "alice",
    "role": "editor",
    "department": "engineering"
  },
  "action": "edit",
  "resource": {
    "type": "document",
    "id": "doc-123",
    "owner": "bob",
    "classification": "internal"
  }
}

// 查询:data.authz.allow → true

高级特性

package authz

import rego.v1

# Partial set rule: collects every permission granted by any of the user's
# roles, looked up in data.role_permissions.
user_permissions contains permission if {
    some role_name in input.user.roles
    some permission in data.role_permissions[role_name]
}

# Function: succeeds when the caller is the owner of the given resource.
is_owner(res) if input.user.id == res.owner

# Combined conditions: owners may delete their own resources unless the
# resource has been archived.
allow if {
    input.action == "delete"
    input.resource.status != "archived"
    is_owner(input.resource)
}

# Time-based policy: "access" is allowed Monday to Friday, from 09:00 up to
# (but not including) 18:00.
# NOTE(review): no timezone is passed to time.weekday/time.clock, so the
# window is presumably evaluated in UTC - confirm against the OPA built-in
# reference before relying on local business hours.
allow if {
    input.action == "access"
    now := time.now_ns()
    time.weekday(now) in {"Monday", "Tuesday", "Wednesday", "Thursday", "Friday"}
    [hour, _, _] := time.clock(now)
    hour >= 9
    hour < 18
}

RBAC 策略

package rbac

import rego.v1

# Role-to-permission mapping and role hierarchy are external data.
# Example data.json:
# {
#   "role_permissions": {
#     "admin": ["read", "write", "delete", "admin"],
#     "editor": ["read", "write"],
#     "viewer": ["read"]
#   },
#   "role_hierarchy": {
#     "admin": ["editor"],
#     "editor": ["viewer"]
#   }
# }

default allow := false

# A request is allowed when the requested action is one of the caller's
# effective permissions, whether granted directly or inherited.
allow if input.action in effective_permissions

# Effective permissions, part 1: permissions attached directly to one of the
# user's roles.
effective_permissions contains permission if {
    some user_role in input.user.roles
    some permission in data.role_permissions[user_role]
}

# Effective permissions, part 2: permissions of the roles listed as parents of
# one of the user's roles in data.role_hierarchy.
# NOTE(review): only one level of the hierarchy is followed here, so with the
# example data "admin" picks up "editor" permissions but not those that
# "editor" itself inherits from "viewer" - confirm this is intended.
effective_permissions contains permission if {
    some user_role in input.user.roles
    some parent_role in data.role_hierarchy[user_role]
    some permission in data.role_permissions[parent_role]
}

ABAC 策略(基于时间、IP、部门)

package abac

import rego.v1

# Only data.ip_allowlist is referenced below. The former
# `import data.department_resources` was never used, and unused imports are
# rejected by `opa check --strict`.
import data.ip_allowlist

default allow := false

# Rule 1: sensitive resources require business hours, an allow-listed source
# IP and a matching department.
allow if {
    input.resource.classification == "sensitive"
    is_business_hours
    is_allowed_ip
    is_same_department
}

# Rule 2: non-sensitive resources only require an authenticated user.
allow if {
    input.resource.classification != "sensitive"
    input.user.authenticated == true
}

# Rule 3: authenticated VPN users of the same department may access outside
# business hours. This rule checks neither the classification nor the source IP.
allow if {
    input.network.vpn == true
    input.user.authenticated == true
    is_same_department
}

# ── Helper rules ─────────────────────────────────────────────

# Monday to Friday, from 09:00 up to (but not including) 18:00.
# NOTE(review): no timezone is passed to time.weekday/time.clock, so this is
# presumably evaluated in UTC - confirm against the OPA built-in reference.
is_business_hours if {
    day := time.weekday(time.now_ns())
    day in ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday"]
    hour := time.clock(time.now_ns())[0]
    hour >= 9
    hour < 18
}

# True when the request's source IP falls inside any allow-listed CIDR.
is_allowed_ip if {
    net.cidr_contains(ip_allowlist[_], input.network.source_ip)
}

# True when the user and the resource belong to the same department.
is_same_department if {
    input.user.department == input.resource.department
}

# Denial reasons, exposed to make debugging easier.
reasons contains "outside business hours" if {
    input.resource.classification == "sensitive"
    not is_business_hours
}

reasons contains "IP not in allowlist" if {
    not is_allowed_ip
}

reasons contains "department mismatch" if {
    not is_same_department
}

API 路由授权策略

package api_authz

import rego.v1

default allow := false

# Public endpoints need no token.
allow if {
    is_public_endpoint
}

# API access for callers with a valid token and a role permitted on the route.
allow if {
    input.token.valid
    is_allowed_route
}

# ── Public endpoints ─────────────────────────────────────────
is_public_endpoint if {
    input.path in ["/health", "/metrics", "/docs", "/openapi.json"]
}

is_public_endpoint if {
    input.method == "POST"
    input.path in ["/auth/login", "/auth/register"]
}

# ── Route permission matrix (exact "METHOD path" matches) ────
route_permissions := {
    "GET /api/v1/users":       {"roles": ["admin", "viewer"]},
    "POST /api/v1/users":      {"roles": ["admin"]},
    "GET /api/v1/documents":   {"roles": ["admin", "editor", "viewer"]},
    "POST /api/v1/documents":  {"roles": ["admin", "editor"]},
    "DELETE /api/v1/documents": {"roles": ["admin"]},
}

# Exact match against the matrix above.
is_allowed_route if {
    route_key := sprintf("%s %s", [input.method, input.path])
    required := route_permissions[route_key]
    some role in input.token.roles
    role in required.roles
}

# Wildcard matches for a single document (/api/v1/documents/*). With "/" as
# the glob delimiter, "*" matches exactly one path segment.
is_allowed_route if {
    input.method == "GET"
    glob.match("/api/v1/documents/*", ["/"], input.path)
    some role in input.token.roles
    role in ["admin", "editor", "viewer"]
}

is_allowed_route if {
    input.method in ["PUT", "PATCH"]
    glob.match("/api/v1/documents/*", ["/"], input.path)
    some role in input.token.roles
    role in ["admin", "editor"]
}

# Deleting a single document is admin-only, mirroring the collection-level
# DELETE entry in route_permissions. Without this rule no request of the form
# DELETE /api/v1/documents/<id> could ever be allowed.
is_allowed_route if {
    input.method == "DELETE"
    glob.match("/api/v1/documents/*", ["/"], input.path)
    some role in input.token.roles
    role == "admin"
}

# ── Rate limit ───────────────────────────────────────────────
# 1000 for tokens carrying the "premium" role, 100 otherwise.
rate_limit := 1000 if {
    some role in input.token.roles
    role == "premium"
} else := 100

Rego 测试

package authz_test

import rego.v1
import data.authz

# An admin is allowed even for a destructive action.
test_admin_allowed if {
    request := {
        "user": {"role": "admin"},
        "action": "delete",
        "resource": {"id": "doc-1"},
    }
    authz.allow with input as request
}

# A viewer who is not the owner must not be able to delete.
test_viewer_cannot_delete if {
    request := {
        "user": {"role": "viewer"},
        "action": "delete",
        "resource": {"id": "doc-1"},
    }
    not authz.allow with input as request
}

# The owner of a resource may read it regardless of role.
test_owner_can_read if {
    request := {
        "user": {"id": "alice", "role": "viewer"},
        "action": "read",
        "resource": {"owner": "alice"},
    }
    authz.allow with input as request
}
# 运行测试
opa test . -v

15.3 OPA 策略测试最佳实践

目录结构

policies/
├── authz.rego              # 主策略
├── authz_test.rego         # 主策略测试
├── rbac.rego               # RBAC 策略
├── rbac_test.rego          # RBAC 测试
├── abac.rego               # ABAC 策略
├── abac_test.rego          # ABAC 测试
├── data.json               # 外部数据
└── Makefile

完整测试示例

package rbac_test

import rego.v1
import data.rbac

# ── Test fixtures ────────────────────────────────────────────
# Stand-ins for data.role_permissions and data.role_hierarchy, injected into
# the policy with the `with` keyword.
mock_role_permissions := {
    "admin": ["read", "write", "delete"],
    "editor": ["read", "write"],
    "viewer": ["read"],
}

mock_role_hierarchy := {
    "admin": ["editor"],
    "editor": ["viewer"],
}

# ── Positive tests ───────────────────────────────────────────
test_admin_can_delete if {
    request := {"user": {"roles": ["admin"]}, "action": "delete"}
    rbac.allow with input as request
        with data.role_permissions as mock_role_permissions
        with data.role_hierarchy as mock_role_hierarchy
}

test_editor_can_write if {
    request := {"user": {"roles": ["editor"]}, "action": "write"}
    rbac.allow with input as request
        with data.role_permissions as mock_role_permissions
        with data.role_hierarchy as mock_role_hierarchy
}

# Role inheritance: editor has no direct "read" permission in this fixture, so
# the request can only be allowed through the editor -> viewer hierarchy edge.
# Dedicated inline data is used so that the test fails if the inheritance rule
# is removed from the policy.
test_editor_inherits_read if {
    rbac.allow with input as {
        "user": {"roles": ["editor"]},
        "action": "read",
    } with data.role_permissions as {"editor": ["write"], "viewer": ["read"]}
      with data.role_hierarchy as {"editor": ["viewer"]}
}

# ── Negative tests ───────────────────────────────────────────
# A viewer has neither a direct nor an inherited "write" permission.
test_viewer_cannot_write if {
    request := {"user": {"roles": ["viewer"]}, "action": "write"}
    not rbac.allow with input as request
        with data.role_permissions as mock_role_permissions
        with data.role_hierarchy as mock_role_hierarchy
}

# A viewer has neither a direct nor an inherited "delete" permission.
test_viewer_cannot_delete if {
    request := {"user": {"roles": ["viewer"]}, "action": "delete"}
    not rbac.allow with input as request
        with data.role_permissions as mock_role_permissions
        with data.role_hierarchy as mock_role_hierarchy
}

# A user without any role is denied everything, including "read".
test_no_roles_denied if {
    request := {"user": {"roles": []}, "action": "read"}
    not rbac.allow with input as request
        with data.role_permissions as mock_role_permissions
        with data.role_hierarchy as mock_role_hierarchy
}

# ── Effective permission set ─────────────────────────────────
# The set computed for an admin must equal exactly read/write/delete.
test_admin_effective_permissions if {
    expected := {"read", "write", "delete"}
    request := {"user": {"roles": ["admin"]}}

    actual := rbac.effective_permissions with input as request
        with data.role_permissions as mock_role_permissions
        with data.role_hierarchy as mock_role_hierarchy

    actual == expected
}
# 运行所有测试(详细输出)
opa test . -v

# 运行测试并生成覆盖率报告
opa test . --coverage --format=json

# 仅运行匹配的测试
opa test . -v --run "test_admin"

# 性能基准测试
opa bench 'data.authz.allow' -i input.json -d policies/
# Makefile
# Every target here is a command, not a file, so all of them are declared
# phony (including `coverage`, which was previously missing from the list).
.PHONY: test coverage lint fmt check

# Run all policy tests with verbose output.
test:
	opa test policies/ -v

# Run the tests with coverage and print the coverage figure from the JSON report.
coverage:
	opa test policies/ --coverage --format=json | jq '.coverage'

# Static checks in strict mode.
lint:
	opa check policies/ --strict

# Rewrite policy files in canonical format.
fmt:
	opa fmt policies/ --write

# Format, lint and test in one go.
check: fmt lint test
	@echo "All checks passed!"

15.4 OPA 部署模式

| 模式 | 描述 | 延迟 | 适用场景 |
| --- | --- | --- | --- |
| Sidecar | 与应用同 Pod | 极低(本地) | Kubernetes 微服务 |
| 独立服务 | 独立部署的 OPA 服务 | 低(网络) | 集中策略管理 |
| 库模式 | 嵌入应用进程 | 极低 | Go 应用 |
| WASM | 编译为 WebAssembly | 极低 | 边缘计算、浏览器 |

15.5 应用场景

Kubernetes Admission Control(Gatekeeper)

# 禁止使用 latest 标签的镜像
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sDisallowedTags
metadata:
  name: no-latest-tag
spec:
  match:
    kinds:
      - apiGroups: [""]
        kinds: ["Pod"]
  parameters:
    tags: ["latest"]

API Gateway 授权

package gateway

import rego.v1

default allow := false

# Public endpoints are reachable without authentication.
allow if input.path in ["/health", "/metrics", "/docs"]

# Callers with a valid token may issue GET requests under /api/v1/.
allow if {
    startswith(input.path, "/api/v1/")
    input.method == "GET"
    input.token.valid
}

# Callers with a valid token and the admin role may reach /admin/ endpoints.
allow if {
    startswith(input.path, "/admin/")
    input.token.valid
    "admin" in input.token.roles
}

# Rate limit: 1000 for premium tokens, 100 for everyone else.
rate_limit := 1000 if {
    "premium" in input.token.roles
} else := 100

Terraform 策略检查(Conftest)

package terraform

import rego.v1

# Forbid publicly accessible S3 buckets. Both canned ACLs that grant public
# access are rejected: "public-read" and "public-read-write".
deny contains msg if {
    some resource in input.resource_changes
    resource.type == "aws_s3_bucket"
    resource.change.after.acl in {"public-read", "public-read-write"}
    msg := sprintf("S3 bucket %s must not be public", [resource.address])
}

# Require an Environment tag on every resource that exists after the plan is
# applied. The is_object guard skips entries whose `after` state is null
# (resources being destroyed), which would otherwise always be reported.
deny contains msg if {
    some resource in input.resource_changes
    is_object(resource.change.after)
    not resource.change.after.tags.Environment
    msg := sprintf("Resource %s must have Environment tag", [resource.address])
}

15.6 Python 集成

OPA REST API 客户端

"""OPA REST API 客户端 — 支持策略查询、数据管理、健康检查"""
import httpx
from typing import Any, Optional
from dataclasses import dataclass


@dataclass
class OPADecision:
    """Outcome of a policy query: the `result` and `decision_id` fields of the response body."""

    result: Any
    decision_id: Optional[str] = None


class OPAClient:
    """Asynchronous client for the OPA REST API.

    Wraps a single ``httpx.AsyncClient`` and can be used as an async context
    manager so the underlying connection pool is closed on exit.
    """

    def __init__(self, base_url: str = "http://localhost:8181", timeout: float = 5.0):
        """Create the client.

        Args:
            base_url: Root URL of the OPA server; trailing slashes are removed.
            timeout: Timeout passed to the underlying HTTP client.
        """
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self._client = httpx.AsyncClient(
            base_url=self.base_url,
            timeout=timeout,
            headers={"Content-Type": "application/json"},
        )

    async def close(self):
        """Close the underlying HTTP client."""
        await self._client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, *args):
        await self.close()

    async def check(self, policy_path: str, input_data: dict) -> OPADecision:
        """Query a policy decision.

        Args:
            policy_path: Path below ``/v1/data``, e.g. ``"authz/allow"``.
            input_data: Document sent as the query's ``input``.

        Returns:
            An ``OPADecision`` built from the response's ``result`` and
            ``decision_id`` fields (each ``None`` when absent).

        Raises:
            httpx.HTTPStatusError: If OPA answers with an error status.
        """
        url = f"/v1/data/{policy_path}"
        response = await self._client.post(url, json={"input": input_data})
        response.raise_for_status()
        body = response.json()
        return OPADecision(
            result=body.get("result"),
            decision_id=body.get("decision_id"),
        )

    async def query(self, query: str, input_data: Optional[dict] = None) -> Any:
        """Run an ad-hoc Rego query through ``/v1/query``.

        Args:
            query: The Rego query text.
            input_data: Optional ``input`` document. Only ``None`` means "no
                input"; an explicitly supplied empty dict is still sent.

        Returns:
            The ``result`` field of the response, or ``None`` when absent.

        Raises:
            httpx.HTTPStatusError: If OPA answers with an error status.
        """
        payload: dict = {"query": query}
        # Compare against None rather than truthiness so that an empty input
        # document ({}) is forwarded instead of being silently dropped.
        if input_data is not None:
            payload["input"] = input_data
        response = await self._client.post("/v1/query", json=payload)
        response.raise_for_status()
        return response.json().get("result")

    async def put_data(self, path: str, data: Any) -> None:
        """Upload external data to ``/v1/data/{path}``.

        Raises:
            httpx.HTTPStatusError: If OPA answers with an error status.
        """
        response = await self._client.put(f"/v1/data/{path}", json=data)
        response.raise_for_status()

    async def get_data(self, path: str) -> Any:
        """Fetch the document at ``/v1/data/{path}`` and return its ``result`` field.

        Raises:
            httpx.HTTPStatusError: If OPA answers with an error status.
        """
        response = await self._client.get(f"/v1/data/{path}")
        response.raise_for_status()
        return response.json().get("result")

    async def put_policy(self, policy_id: str, rego_code: str) -> None:
        """Upload a Rego module under ``/v1/policies/{policy_id}``.

        The module is sent as raw text, so the client's default JSON
        content type is overridden for this request.

        Raises:
            httpx.HTTPStatusError: If OPA answers with an error status.
        """
        response = await self._client.put(
            f"/v1/policies/{policy_id}",
            content=rego_code.encode(),
            headers={"Content-Type": "text/plain"},
        )
        response.raise_for_status()

    async def health(self) -> bool:
        """Return True when ``GET /health`` answers 200, False otherwise.

        Any exception is deliberately mapped to False: this is a best-effort
        probe and must not raise.
        """
        try:
            response = await self._client.get("/health")
            return response.status_code == 200
        except Exception:
            return False


# ── 使用示例 ─────────────────────────────────────────────────
async def demo():
    """Walk through the client: health probe, data upload, policy upload, two decisions."""
    role_permissions = {
        "admin": ["read", "write", "delete"],
        "editor": ["read", "write"],
        "viewer": ["read"],
    }
    # (label, role, action) for each decision; with the data above the first
    # is expected to print True and the second False.
    scenarios = [
        ("Editor can write", "editor", "write"),
        ("Viewer can delete", "viewer", "delete"),
    ]

    async with OPAClient("http://localhost:8181") as opa:
        # Health probe
        print(f"OPA healthy: {await opa.health()}")

        # Upload the role -> permissions mapping used by the policy below
        await opa.put_data("role_permissions", role_permissions)

        # Upload the policy
        await opa.put_policy("authz", """
            package authz
            import rego.v1
            default allow := false
            allow if {
                some role in input.user.roles
                some perm in data.role_permissions[role]
                perm == input.action
            }
        """)

        # Query one decision per scenario
        for label, role, action in scenarios:
            decision = await opa.check("authz/allow", {
                "user": {"roles": [role]},
                "action": action,
            })
            print(f"{label}: {decision.result}")

FastAPI + OPA 授权中间件

"""FastAPI + OPA 授权中间件完整示例"""
from contextlib import asynccontextmanager
from typing import Optional

import httpx
from fastapi import FastAPI, Depends, HTTPException, Request
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials

# ── OPA 客户端 ───────────────────────────────────────────────
OPA_URL = "http://localhost:8181"
opa_http: Optional[httpx.AsyncClient] = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan: create the shared OPA HTTP client on startup and close it on shutdown."""
    global opa_http
    opa_http = httpx.AsyncClient(base_url=OPA_URL, timeout=5.0)
    try:
        yield
    finally:
        # Close the client even when an exception or cancellation is raised
        # into the generator at the yield point.
        await opa_http.aclose()


app = FastAPI(title="OPA Demo", lifespan=lifespan)
security = HTTPBearer()


# ── 用户解析 ─────────────────────────────────────────────────
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
) -> dict:
    """Resolve the calling user from the bearer credentials.

    Placeholder implementation: the raw token is used as the user id and the
    roles/department are fixed values. A real project must verify the JWT here.
    """
    user = {"id": credentials.credentials}
    user["roles"] = ["editor"]
    user["department"] = "engineering"
    return user


# ── OPA 授权依赖 ─────────────────────────────────────────────
class OPAAuthorize:
    """Reusable FastAPI dependency that asks OPA whether the request is allowed.

    Fails closed: the request proceeds only when OPA answers 200 with a
    ``result`` that is exactly ``True``.
    """

    def __init__(self, policy_path: str = "authz/allow"):
        """Remember the policy path (below ``/v1/data``) queried for each request."""
        self.policy_path = policy_path

    async def __call__(
        self,
        request: Request,
        user: dict = Depends(get_current_user),
    ) -> dict:
        """Authorize the current request.

        Returns:
            The resolved user dict, so routes can depend on this object directly.

        Raises:
            HTTPException: 503 when OPA cannot be reached or answers with a
                non-200 status or an unreadable body; 403 when the policy does
                not explicitly allow the request.
        """
        roles = user.get("roles", [])
        opa_input = {
            "input": {
                "user": user,
                # The api_authz policy reads input.token.valid and
                # input.token.roles; without this object it can never allow an
                # authenticated route. The user has already been resolved from
                # the bearer credentials at this point.
                "token": {"valid": True, "roles": roles},
                "method": request.method,
                "path": request.url.path,
                # NOTE(review): this forwards every request header, including
                # Authorization - confirm that is acceptable for OPA logs.
                "headers": dict(request.headers),
            }
        }

        if opa_http is None:
            # The lifespan handler has not created the client.
            raise HTTPException(status_code=503, detail="Policy engine unavailable")

        try:
            response = await opa_http.post(
                f"/v1/data/{self.policy_path}",
                json=opa_input,
            )
        except httpx.HTTPError as exc:
            # Connection failures and timeouts are an outage of the policy
            # engine, not an internal error of this service.
            raise HTTPException(status_code=503, detail="Policy engine unavailable") from exc

        if response.status_code != 200:
            raise HTTPException(status_code=503, detail="Policy engine unavailable")

        try:
            body = response.json()
        except ValueError as exc:
            raise HTTPException(status_code=503, detail="Policy engine unavailable") from exc

        # Only an explicit boolean true allows the request. A missing result,
        # or a non-boolean document such as an object, is treated as a denial.
        result = body.get("result") if isinstance(body, dict) else None
        if result is not True:
            raise HTTPException(status_code=403, detail="Denied by policy")

        return user


# ── 路由 ─────────────────────────────────────────────────────
authorize = OPAAuthorize("api_authz/allow")


@app.get("/api/v1/documents")
async def list_documents(user: dict = Depends(authorize)):
    """List documents for the authorized user (placeholder: always an empty list)."""
    # `[...]` is a list holding the Ellipsis object, which cannot be encoded
    # as JSON; an empty list is the serializable placeholder.
    return {"documents": [], "user": user["id"]}


@app.post("/api/v1/documents")
async def create_document(user: dict = Depends(authorize)):
    """Create a document on behalf of the authorized user (placeholder response)."""
    creator_id = user["id"]
    return {"created": True, "user": creator_id}


@app.delete("/api/v1/documents/{doc_id}")
async def delete_document(doc_id: str, user: dict = Depends(authorize)):
    """Delete the given document on behalf of the authorized user (placeholder response)."""
    requester_id = user["id"]
    return {"deleted": doc_id, "user": requester_id}


# ── 管理端点:动态更新策略数据 ────────────────────────────────
@app.put("/admin/opa/data/{path:path}")
async def update_opa_data(
    path: str,
    data: dict,
    user: dict = Depends(get_current_user),
):
    """Replace the OPA external data document at ``path``.

    This endpoint rewrites the data that authorization decisions are based on,
    so it requires an authenticated caller holding the ``admin`` role.

    Raises:
        HTTPException: 403 when the caller is not an admin, 503 when OPA cannot
            be reached, 500 when OPA rejects the update.
    """
    if "admin" not in user.get("roles", []):
        raise HTTPException(status_code=403, detail="Admin role required")

    try:
        response = await opa_http.put(f"/v1/data/{path}", json=data)
    except httpx.HTTPError as exc:
        raise HTTPException(status_code=503, detail="Policy engine unavailable") from exc

    if response.status_code not in (200, 204):
        raise HTTPException(status_code=500, detail="Failed to update OPA data")
    return {"updated": path}

15.7 Java 集成

OPA REST API 客户端(OkHttp)

// build.gradle
// dependencies {
//     implementation 'com.squareup.okhttp3:okhttp:4.12.0'
//     implementation 'com.fasterxml.jackson.core:jackson-databind:2.17.0'
// }

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * Minimal blocking client for the OPA REST API built on OkHttp and Jackson.
 *
 * <p>Supports decision queries, uploading data and policies, and a health
 * probe. Instances own their {@link OkHttpClient} and release it in
 * {@link #close()}.
 */
public class OPAClient implements AutoCloseable {

    private static final MediaType JSON_TYPE = MediaType.get("application/json");
    private static final MediaType TEXT_TYPE = MediaType.get("text/plain");

    private final String baseUrl;
    private final OkHttpClient httpClient;
    private final ObjectMapper mapper;

    /**
     * @param baseUrl root URL of the OPA server; trailing slashes are stripped
     */
    public OPAClient(String baseUrl) {
        this.baseUrl = baseUrl.replaceAll("/+$", "");
        this.httpClient = new OkHttpClient.Builder()
                .connectTimeout(5, TimeUnit.SECONDS)
                .readTimeout(5, TimeUnit.SECONDS)
                .build();
        this.mapper = new ObjectMapper();
    }

    /**
     * Queries a policy decision and interprets it as a boolean.
     *
     * <p>Only a JSON boolean {@code true} counts as allowed. A missing result
     * or any non-boolean value (number, string, object) is treated as a
     * denial rather than being coerced.
     *
     * @param policyPath path below {@code /v1/data}, e.g. {@code authz/allow}
     * @param input      document sent as the query's {@code input}
     * @return {@code true} only when the result is the boolean {@code true}
     * @throws IOException on transport failure or a non-2xx response
     */
    public boolean check(String policyPath, Map<String, Object> input) throws IOException {
        JsonNode result = query(policyPath, input);
        return result.isBoolean() && result.booleanValue();
    }

    /**
     * Queries a policy and returns the raw {@code result} node.
     *
     * @param policyPath path below {@code /v1/data}
     * @param input      document sent as the query's {@code input}
     * @return the {@code result} field, or a missing node when it is absent
     * @throws IOException on transport failure or a non-2xx response
     */
    public JsonNode query(String policyPath, Map<String, Object> input) throws IOException {
        String json = mapper.writeValueAsString(Map.of("input", input));

        Request request = new Request.Builder()
                .url(baseUrl + "/v1/data/" + policyPath)
                .post(RequestBody.create(json, JSON_TYPE))
                .build();

        try (Response response = httpClient.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                throw new IOException("OPA request failed: " + response.code());
            }
            return mapper.readTree(response.body().string()).path("result");
        }
    }

    /**
     * Uploads external data to {@code /v1/data/{path}}.
     *
     * @throws IOException on transport failure or a non-2xx response
     */
    public void putData(String path, Object data) throws IOException {
        String json = mapper.writeValueAsString(data);
        Request request = new Request.Builder()
                .url(baseUrl + "/v1/data/" + path)
                .put(RequestBody.create(json, JSON_TYPE))
                .build();
        executeOrThrow(request, "Failed to put data: ");
    }

    /**
     * Uploads a Rego module to {@code /v1/policies/{policyId}} as plain text.
     *
     * @throws IOException on transport failure or a non-2xx response
     */
    public void putPolicy(String policyId, String regoCode) throws IOException {
        Request request = new Request.Builder()
                .url(baseUrl + "/v1/policies/" + policyId)
                .put(RequestBody.create(regoCode, TEXT_TYPE))
                .build();
        executeOrThrow(request, "Failed to put policy: ");
    }

    /**
     * Health probe: {@code true} when {@code GET /health} answers with a 2xx
     * status, {@code false} on any other status or transport failure.
     */
    public boolean isHealthy() {
        Request request = new Request.Builder()
                .url(baseUrl + "/health")
                .get()
                .build();
        try (Response response = httpClient.newCall(request).execute()) {
            return response.isSuccessful();
        } catch (IOException e) {
            return false;
        }
    }

    /** Shuts down the dispatcher's executor and evicts pooled connections. */
    @Override
    public void close() {
        httpClient.dispatcher().executorService().shutdown();
        httpClient.connectionPool().evictAll();
    }

    /** Executes a request whose body is irrelevant and fails on a non-2xx status. */
    private void executeOrThrow(Request request, String failurePrefix) throws IOException {
        try (Response response = httpClient.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                throw new IOException(failurePrefix + response.code());
            }
        }
    }

    // ── Usage example ────────────────────────────────────────
    public static void main(String[] args) throws Exception {
        try (var opa = new OPAClient("http://localhost:8181")) {
            // Health probe
            System.out.println("OPA healthy: " + opa.isHealthy());

            // Upload data
            opa.putData("role_permissions", Map.of(
                    "admin", java.util.List.of("read", "write", "delete"),
                    "editor", java.util.List.of("read", "write"),
                    "viewer", java.util.List.of("read")
            ));

            // Query a decision
            boolean allowed = opa.check("authz/allow", Map.of(
                    "user", Map.of("roles", java.util.List.of("editor")),
                    "action", "write"
            ));
            System.out.println("Editor can write: " + allowed);

            // Query the full package result
            JsonNode result = opa.query("authz", Map.of(
                    "user", Map.of("roles", java.util.List.of("admin")),
                    "action", "delete"
            ));
            System.out.println("Full result: " + result.toPrettyString());
        }
    }
}

Spring Boot + OPA 集成

// ── application.yml ─────────────────────────────────────────
// opa:
//   base-url: http://localhost:8181
//   policy-path: api_authz/allow
//   timeout: 5s

import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.client.RestClient;

import java.time.Duration;

/**
 * Typed view of the {@code opa.*} configuration properties: the OPA base URL,
 * the default policy path and the request timeout.
 */
@ConfigurationProperties(prefix = "opa")
record OPAProperties(String baseUrl, String policyPath, Duration timeout) {}

@Configuration
class OPAConfig {

    @Bean
    RestClient opaRestClient(OPAProperties props) {
        return RestClient.builder()
                .baseUrl(props.baseUrl())
                .build();
    }
}

// ── OPA 授权服务 ────────────────────────────────────────────
import com.fasterxml.jackson.annotation.JsonProperty;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestClient;

import java.util.Map;

/**
 * Sends authorization queries to OPA through the shared {@link RestClient}.
 */
@Service
class OPAAuthorizationService {

    private final RestClient restClient;
    private final OPAProperties props;

    OPAAuthorizationService(RestClient opaRestClient, OPAProperties props) {
        this.restClient = opaRestClient;
        this.props = props;
    }

    record OPARequest(Map<String, Object> input) {}
    record OPAResponse(@JsonProperty("result") Object result) {}

    /**
     * Evaluates the globally configured policy path ({@code opa.policy-path}).
     *
     * @return {@code true} only when OPA returns the boolean {@code true}
     */
    public boolean isAllowed(Map<String, Object> input) {
        return isAllowed(props.policyPath(), input);
    }

    /**
     * Evaluates an explicit policy path.
     *
     * @param policyPath path below {@code /v1/data}, e.g. {@code api_authz/allow}
     * @return {@code true} only when OPA returns the boolean {@code true};
     *         {@code false} on any other result or on any failure
     */
    public boolean isAllowed(String policyPath, Map<String, Object> input) {
        try {
            OPAResponse response = post(policyPath, input);
            return response != null && Boolean.TRUE.equals(response.result());
        } catch (Exception e) {
            // Fail closed: deny when the policy engine is unavailable.
            return false;
        }
    }

    /**
     * Evaluates a policy path whose result is an object.
     *
     * @return the result as a map, or an empty map when the result is absent
     *         or not an object
     */
    @SuppressWarnings("unchecked")
    public Map<String, Object> evaluate(String policyPath, Map<String, Object> input) {
        OPAResponse response = post(policyPath, input);

        if (response != null && response.result() instanceof Map) {
            return (Map<String, Object>) response.result();
        }
        return Map.of();
    }

    /**
     * Posts the input to {@code /v1/data/<policyPath>}.
     *
     * <p>The path is concatenated into the URI instead of being passed as a
     * {@code {path}} template variable: RestClient's default URI builder
     * percent-encodes "/" inside variable values, which would turn
     * {@code api_authz/allow} into a single path segment.
     */
    private OPAResponse post(String policyPath, Map<String, Object> input) {
        return restClient.post()
                .uri("/v1/data/" + policyPath)
                .body(new OPARequest(input))
                .retrieve()
                .body(OPAResponse.class);
    }
}

// ── Custom annotation + interceptor ─────────────────────────
import java.lang.annotation.*;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@interface OPAPolicy {
    /**
     * OPA policy path for the annotated handler. An empty value means the
     * globally configured path is used. A non-empty path must refer to a rule
     * that exists in the loaded policies, otherwise every request is denied.
     */
    String value() default "";
}

import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.springframework.http.HttpStatus;
import org.springframework.security.core.context.SecurityContextHolder;
import org.springframework.stereotype.Component;
import org.springframework.web.method.HandlerMethod;
import org.springframework.web.servlet.HandlerInterceptor;

import java.util.Map;

/**
 * Enforces OPA decisions for handler methods annotated with {@link OPAPolicy}.
 */
@Component
class OPAInterceptor implements HandlerInterceptor {

    private final OPAAuthorizationService opaService;

    OPAInterceptor(OPAAuthorizationService opaService) {
        this.opaService = opaService;
    }

    /**
     * Lets un-annotated handlers through; for annotated handlers, builds the
     * OPA input from the current authentication and request and rejects the
     * request unless the policy allows it.
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response,
                             Object handler) throws Exception {
        if (!(handler instanceof HandlerMethod method)) {
            return true;
        }

        OPAPolicy annotation = method.getMethodAnnotation(OPAPolicy.class);
        if (annotation == null) {
            return true; // not annotated: nothing to enforce
        }

        var auth = SecurityContextHolder.getContext().getAuthentication();
        if (auth == null) {
            // No authentication in the security context: reject instead of
            // failing with a NullPointerException below.
            return reject(response, HttpStatus.UNAUTHORIZED, "{\"error\": \"Authentication required\"}");
        }

        var roles = auth.getAuthorities().stream()
                .map(Object::toString).toList();
        Map<String, Object> input = Map.of(
                "user", Map.of(
                        "id", auth.getName(),
                        "roles", roles
                ),
                // The api_authz policy reads input.token.valid and
                // input.token.roles, so the same facts are exposed under "token".
                "token", Map.of(
                        "valid", auth.isAuthenticated(),
                        "roles", roles
                ),
                "method", request.getMethod(),
                "path", request.getRequestURI()
        );

        // Honor the per-handler policy path; an empty value selects the global one.
        String policyPath = annotation.value();
        boolean allowed = policyPath.isEmpty()
                ? opaService.isAllowed(input)
                : opaService.isAllowed(policyPath, input);
        if (!allowed) {
            return reject(response, HttpStatus.FORBIDDEN, "{\"error\": \"Denied by OPA policy\"}");
        }
        return true;
    }

    /** Writes a JSON error body with the given status and stops request handling. */
    private boolean reject(HttpServletResponse response, HttpStatus status, String jsonBody)
            throws java.io.IOException {
        response.setStatus(status.value());
        response.setContentType("application/json");
        response.getWriter().write(jsonBody);
        return false;
    }
}

// ── 注册拦截器 ──────────────────────────────────────────────
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.InterceptorRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;

/**
 * MVC configuration that registers the OPA interceptor for every request
 * under {@code /api/}.
 */
@Configuration
class WebConfig implements WebMvcConfigurer {

    private final OPAInterceptor opaInterceptor;

    WebConfig(OPAInterceptor opaInterceptor) {
        this.opaInterceptor = opaInterceptor;
    }

    /** Adds the OPA interceptor, limited to the {@code /api/**} path pattern. */
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        var registration = registry.addInterceptor(opaInterceptor);
        registration.addPathPatterns("/api/**");
    }
}

// ── Controller 使用示例 ─────────────────────────────────────
import org.springframework.web.bind.annotation.*;

import java.util.Map;

/**
 * Example controller whose handlers are annotated with {@code @OPAPolicy}.
 * All responses are placeholders.
 */
@RestController
@RequestMapping("/api/v1/documents")
class DocumentController {

    /** Lists documents (placeholder: always an empty list). */
    @GetMapping
    @OPAPolicy
    public Map<String, Object> listDocuments() {
        java.util.List<Object> documents = java.util.List.of();
        return Map.of("documents", documents);
    }

    /** Creates a document; the request body is accepted but not used. */
    @PostMapping
    @OPAPolicy
    public Map<String, Object> createDocument(@RequestBody Map<String, Object> body) {
        Map<String, Object> result = Map.of("created", true);
        return result;
    }

    /** Deletes a document; annotated with its own policy path. */
    @DeleteMapping("/{docId}")
    @OPAPolicy("api_authz/allow_delete")
    public Map<String, Object> deleteDocument(@PathVariable String docId) {
        Map<String, Object> result = Map.of("deleted", docId);
        return result;
    }
}

15.8 Go 集成

OPA Go 库(嵌入式 OPA)

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"

	"github.com/open-policy-agent/opa/v1/ast"
	"github.com/open-policy-agent/opa/v1/rego"
	"github.com/open-policy-agent/opa/v1/storage/inmem"
)

// ── Embedded OPA evaluator ─────────────────────────────────

// OPAEvaluator wraps a prepared Rego query so it can be evaluated repeatedly
// against different inputs.
type OPAEvaluator struct {
	query rego.PreparedEvalQuery // prepared query used by Evaluate and EvaluateFull
}

// NewOPAEvaluator creates an OPAEvaluator whose query is prepared once up
// front, so later evaluations reuse the prepared form.
//
// policy is Rego source registered under the module name "policy.rego",
// queryStr is the query to prepare, and data is loaded into an in-memory
// store as external data. A nil data map is replaced by an empty one so the
// store is always seeded with an object.
//
// It returns an error when the query cannot be prepared.
func NewOPAEvaluator(policy string, queryStr string, data map[string]interface{}) (*OPAEvaluator, error) {
	ctx := context.Background()

	// Defensive: callers without external data may pass nil.
	if data == nil {
		data = map[string]interface{}{}
	}

	// In-memory store holding the external data.
	store := inmem.NewFromObject(data)

	// Prepare the query a single time.
	prepared, err := rego.New(
		rego.Query(queryStr),
		rego.Module("policy.rego", policy),
		rego.Store(store),
	).PrepareForEval(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to prepare query: %w", err)
	}

	return &OPAEvaluator{query: prepared}, nil
}

// Evaluate runs the prepared query against input and reports the decision.
//
// It returns true only when the first expression of the first result is the
// boolean true. An empty result set, a result without expressions, or a
// non-boolean value all yield false with a nil error. Evaluation errors are
// wrapped and returned.
func (e *OPAEvaluator) Evaluate(ctx context.Context, input map[string]interface{}) (bool, error) {
	rs, err := e.query.Eval(ctx, rego.EvalInput(input))
	switch {
	case err != nil:
		return false, fmt.Errorf("evaluation failed: %w", err)
	case len(rs) == 0:
		return false, nil
	case len(rs[0].Expressions) == 0:
		return false, nil
	}

	if decision, isBool := rs[0].Expressions[0].Value.(bool); isBool {
		return decision, nil
	}
	return false, nil
}

// EvaluateFull runs the prepared query against input and returns the raw
// value of the first expression of the first result.
//
// It returns nil with a nil error when there are no results or the first
// result has no expressions. Evaluation errors are wrapped and returned.
func (e *OPAEvaluator) EvaluateFull(ctx context.Context, input map[string]interface{}) (interface{}, error) {
	rs, err := e.query.Eval(ctx, rego.EvalInput(input))
	if err != nil {
		return nil, fmt.Errorf("evaluation failed: %w", err)
	}

	if len(rs) > 0 {
		if exprs := rs[0].Expressions; len(exprs) > 0 {
			return exprs[0].Value, nil
		}
	}
	return nil, nil
}

// main demonstrates the embedded evaluator: it prepares an RBAC-plus-owner
// policy, checks four table-driven cases against their expected decisions,
// and then evaluates the prepared query 1000 times in a row.
func main() {
	ctx := context.Background()

	// Policy: allow when one of the caller's roles carries the requested
	// action, or when the caller is the owner of the resource.
	policy := `
		package authz
		import rego.v1

		default allow := false

		allow if {
			some role in input.user.roles
			some perm in data.role_permissions[role]
			perm == input.action
		}

		allow if {
			input.user.id == input.resource.owner
		}
	`

	// External data: the permissions granted to each role.
	data := map[string]interface{}{
		"role_permissions": map[string]interface{}{
			"admin":  []string{"read", "write", "delete"},
			"editor": []string{"read", "write"},
			"viewer": []string{"read"},
		},
	}

	// Build the evaluator; nothing below can run without it.
	evaluator, err := NewOPAEvaluator(policy, "data.authz.allow", data)
	if err != nil {
		log.Fatalf("创建评估器失败: %v", err)
	}

	// Test cases: each pairs an input document with the expected decision.
	testCases := []struct {
		name  string
		input map[string]interface{}
		want  bool
	}{
		{
			name: "admin can delete",
			input: map[string]interface{}{
				"user":     map[string]interface{}{"id": "alice", "roles": []string{"admin"}},
				"action":   "delete",
				"resource": map[string]interface{}{"id": "doc-1", "owner": "bob"},
			},
			want: true,
		},
		{
			name: "editor can write",
			input: map[string]interface{}{
				"user":     map[string]interface{}{"id": "bob", "roles": []string{"editor"}},
				"action":   "write",
				"resource": map[string]interface{}{"id": "doc-1", "owner": "alice"},
			},
			want: true,
		},
		{
			name: "viewer cannot delete",
			input: map[string]interface{}{
				"user":     map[string]interface{}{"id": "carol", "roles": []string{"viewer"}},
				"action":   "delete",
				"resource": map[string]interface{}{"id": "doc-1", "owner": "alice"},
			},
			want: false,
		},
		{
			name: "owner can do anything",
			input: map[string]interface{}{
				"user":     map[string]interface{}{"id": "alice", "roles": []string{"viewer"}},
				"action":   "delete",
				"resource": map[string]interface{}{"id": "doc-1", "owner": "alice"},
			},
			want: true,
		},
	}

	for _, tc := range testCases {
		allowed, err := evaluator.Evaluate(ctx, tc.input)
		if err != nil {
			log.Printf("❌ %s: error: %v", tc.name, err)
			continue
		}
		status := "✅"
		if allowed != tc.want {
			status = "❌"
		}
		fmt.Printf("%s %s: allowed=%v (want=%v)\n", status, tc.name, allowed, tc.want)
	}

	// Input reused for the repeated-evaluation loop below.
	input := map[string]interface{}{
		"user":     map[string]interface{}{"id": "alice", "roles": []string{"admin"}},
		"action":   "read",
		"resource": map[string]interface{}{"id": "doc-1"},
	}
	_ = json.RawMessage{}   // keeps the encoding/json import in use
	_ = ast.MustParseModule // keeps the ast import in use

	// Repeated evaluation of the prepared query. The loop does not measure
	// time; the chapter's stated expectation is well under 0.1 ms per call.
	// Errors abort the run so the completion message below is only printed
	// when every evaluation actually succeeded.
	for i := 0; i < 1000; i++ {
		if _, err := evaluator.Evaluate(ctx, input); err != nil {
			log.Fatalf("评估失败: %v", err)
		}
	}
	fmt.Println("1000 次评估完成")
}

Gin + OPA 授权中间件

package main

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"

	"github.com/gin-gonic/gin"
	"github.com/open-policy-agent/opa/v1/rego"
	"github.com/open-policy-agent/opa/v1/storage/inmem"
)

// ── Embedded OPA engine ────────────────────────────────────

// PolicyEngine holds the prepared authorization query used to answer
// allow/deny questions.
type PolicyEngine struct {
	// query is the prepared query that every authorization check runs.
	query rego.PreparedEvalQuery
}

// NewPolicyEngine prepares the "data.api_authz.allow" query from the policy
// files found under policyDir, backed by an in-memory store seeded with data.
// Preparation failures are returned wrapped; no engine is returned then.
func NewPolicyEngine(policyDir string, data map[string]interface{}) (*PolicyEngine, error) {
	r := rego.New(
		rego.Query("data.api_authz.allow"),
		// Load whatever the directory contains; no file filter is applied.
		rego.Load([]string{policyDir}, nil),
		rego.Store(inmem.NewFromObject(data)),
	)

	pq, err := r.PrepareForEval(context.Background())
	if err != nil {
		return nil, fmt.Errorf("failed to prepare policy engine: %w", err)
	}
	return &PolicyEngine{query: pq}, nil
}

// IsAllowed evaluates the prepared query against input and reports whether
// the policy allows the request. It fails closed: an evaluation error (which
// is logged), an empty result, or a non-boolean value all produce false.
func (pe *PolicyEngine) IsAllowed(ctx context.Context, input map[string]interface{}) bool {
	rs, err := pe.query.Eval(ctx, rego.EvalInput(input))
	if err != nil {
		// Fail closed: an engine error must never grant access.
		log.Printf("OPA evaluation error: %v", err)
		return false
	}

	if len(rs) > 0 && len(rs[0].Expressions) > 0 {
		verdict, isBool := rs[0].Expressions[0].Value.(bool)
		return isBool && verdict
	}
	return false
}

// ── Global policy engine ────────────────────────────────────

// engine is the package-wide policy engine. It is assigned in main and read
// by OPAMiddleware on every request.
var engine *PolicyEngine

// ── OPA authorization middleware ────────────────────────────

// OPAMiddleware returns a gin handler that asks the global policy engine
// whether the current request is allowed.
//
// The policy input is built from the "userID" and "userRoles" values stored
// in the gin context, plus the request method and URL path. Requests the
// policy does not allow are aborted with 403 and a JSON error body; allowed
// requests continue down the handler chain.
func OPAMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		// authenticated is false when no earlier middleware stored an
		// identity, so the policy is never told that a token is valid for a
		// request that was not authenticated.
		userID, authenticated := c.Get("userID")
		// Missing roles are passed through as nil; the policy decides what
		// an absent role list means.
		userRoles, _ := c.Get("userRoles")

		input := map[string]interface{}{
			"user": map[string]interface{}{
				"id":    userID,
				"roles": userRoles,
			},
			"method": c.Request.Method,
			"path":   c.Request.URL.Path,
			"token": map[string]interface{}{
				"valid": authenticated,
				"roles": userRoles,
			},
		}

		if !engine.IsAllowed(c.Request.Context(), input) {
			c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
				"error": "Access denied by policy",
			})
			return
		}

		c.Next()
	}
}

// ── Authentication middleware (simplified) ─────────────────

// AuthMiddleware returns a gin handler that requires a non-empty
// Authorization header. Requests without one are aborted with 401 and a JSON
// error body. Otherwise the raw header value is stored as "userID" and a
// fixed "editor" role list as "userRoles" before the chain continues.
func AuthMiddleware() gin.HandlerFunc {
	return func(c *gin.Context) {
		if authz := c.GetHeader("Authorization"); authz != "" {
			// A real project should verify a JWT here instead of trusting
			// the header value as the identity.
			c.Set("userID", authz)
			c.Set("userRoles", []string{"editor"})
			c.Next()
			return
		}

		c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{"error": "missing token"})
	}
}

// main wires the demo API together: it initializes the global policy engine
// from OPA_POLICY_DIR (falling back to "./policies"), registers a public
// health endpoint and the protected document routes, and serves on :9090.
func main() {
	// External data: the permissions granted to each role.
	data := map[string]interface{}{
		"role_permissions": map[string]interface{}{
			"admin":  []string{"read", "write", "delete"},
			"editor": []string{"read", "write"},
			"viewer": []string{"read"},
		},
	}

	// Initialize the policy engine.
	policyDir := os.Getenv("OPA_POLICY_DIR")
	if policyDir == "" {
		policyDir = "./policies"
	}
	// err is declared separately so the assignment below targets the
	// package-level engine instead of shadowing it with ":=".
	var err error
	engine, err = NewPolicyEngine(policyDir, data)
	if err != nil {
		log.Fatalf("初始化策略引擎失败: %v", err)
	}

	r := gin.Default()

	// Public endpoint: no authentication or policy check.
	r.GET("/health", func(c *gin.Context) {
		c.JSON(http.StatusOK, gin.H{"status": "ok"})
	})

	// Protected API: authentication runs first, then the policy check.
	api := r.Group("/api/v1", AuthMiddleware(), OPAMiddleware())
	{
		api.GET("/documents", listDocuments)
		api.POST("/documents", createDocument)
		api.GET("/documents/:id", getDocument)
		api.PUT("/documents/:id", updateDocument)
		api.DELETE("/documents/:id", deleteDocument)
	}

	// Run blocks while serving and returns an error when the server cannot
	// start or stops; report it instead of exiting silently.
	if err := r.Run(":9090"); err != nil {
		log.Fatalf("启动服务失败: %v", err)
	}
}

// listDocuments responds with 200 and an empty "documents" list.
func listDocuments(c *gin.Context) {
	payload := gin.H{"documents": []string{}}
	c.JSON(http.StatusOK, payload)
}

// createDocument responds with 200 and {"created": true}; the request body
// is not read.
func createDocument(c *gin.Context) {
	payload := gin.H{"created": true}
	c.JSON(http.StatusOK, payload)
}

// getDocument responds with 200, echoing the ":id" path parameter together
// with placeholder content.
func getDocument(c *gin.Context) {
	docID := c.Param("id")
	c.JSON(http.StatusOK, gin.H{"id": docID, "content": "..."})
}

// updateDocument responds with 200, echoing the ":id" path parameter and
// "updated": true.
func updateDocument(c *gin.Context) {
	docID := c.Param("id")
	c.JSON(http.StatusOK, gin.H{"id": docID, "updated": true})
}

// deleteDocument responds with 200, echoing the ":id" path parameter and
// "deleted": true.
func deleteDocument(c *gin.Context) {
	docID := c.Param("id")
	c.JSON(http.StatusOK, gin.H{"id": docID, "deleted": true})
}

15.9 OPA 性能优化

Partial Evaluation(部分评估)

// Partial Evaluation 将策略预编译为更高效的形式
// 适用于:已知部分输入(如用户角色),未知部分输入(如资源属性)

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/open-policy-agent/opa/v1/ast"
	"github.com/open-policy-agent/opa/v1/rego"
)

// partialEvalDemo partially evaluates the authz policy with the user's role
// fixed to "editor" and everything under input.resource left unknown, then
// prints the residual queries and any support modules that were produced.
func partialEvalDemo() {
	ctx := context.Background()

	policy := `
		package authz
		import rego.v1
		default allow := false
		allow if {
			input.user.role == "admin"
		}
		allow if {
			input.user.role == "editor"
			input.resource.classification != "confidential"
		}
	`

	// Partial evaluation: user.role is known, resource attributes are not.
	// The known input is supplied through rego.Input because
	// (*rego.Rego).Partial accepts only a context; per-call EvalOptions such
	// as rego.EvalInput belong to PreparedPartialQuery.Partial.
	r := rego.New(
		rego.Query("data.authz.allow"),
		rego.Module("policy.rego", policy),
		rego.Input(map[string]interface{}{
			"user": map[string]interface{}{"role": "editor"},
		}),
		rego.Unknowns([]string{"input.resource"}),
	)

	pq, err := r.Partial(ctx)
	if err != nil {
		log.Fatalf("Partial evaluation failed: %v", err)
	}

	// Print the residual queries left after the known input was applied.
	for _, q := range pq.Queries {
		fmt.Printf("Simplified query: %v\n", q)
	}
	// The residual policy may also be returned as support modules that the
	// queries refer to (NOTE(review): likely here because of the default
	// rule; confirm against actual output), so print those as well.
	for _, m := range pq.Support {
		fmt.Printf("Support module: %v\n", m)
	}
	// Expected residual condition, in some form:
	//   input.resource.classification != "confidential"
	// so only the resource attribute remains to be checked at run time.
	_ = ast.MustParseModule // keeps the ast import in use
}

Bundle 优化

# Bundle 目录结构
bundle/
├── authz/
│   ├── policy.rego          # 策略文件
│   └── data.json            # 外部数据
├── rbac/
│   ├── policy.rego
│   └── data.json
├── .manifest                # Bundle 元数据
└── .signatures.json         # 签名(可选)
// .manifest
{
  "revision": "v1.2.3",
  "roots": ["authz", "rbac"],
  "metadata": {
    "created_at": "2025-01-15T10:00:00Z"
  }
}
# Build the bundle/ directory into a bundle archive
opa build -b bundle/ -o authz-bundle.tar.gz

# Inspect the built archive to verify its contents
opa inspect authz-bundle.tar.gz

# Sign the bundle (recommended for production)
# NOTE(review): this runs after the archive above was already built; confirm
# whether the signatures file must exist before `opa build` to be included.
opa sign --signing-key key.pem --bundle bundle/

性能调优建议

优化手段

效果

适用场景

PreparedEvalQuery

避免重复编译,提升 10-100x

所有场景

Partial Evaluation

减少运行时计算量

已知部分输入

Bundle

原子更新、减少 I/O

生产部署

内存存储

避免外部数据查询延迟

数据量 < 100MB

Decision Log 采样

减少日志开销

高 QPS 场景

WASM 编译

跨语言、低延迟

非 Go 应用

15.10 OPA 部署

# docker-compose.yml — OPA + Bundle Server
services:
  # Nginx serves the built bundle archives to OPA over HTTP.
  bundle-server:
    image: nginx:alpine
    volumes:
      - ./bundles:/usr/share/nginx/html/bundles:ro
      - ./nginx.conf:/etc/nginx/conf.d/default.conf:ro
    ports:
      - "8888:80"
    healthcheck:
      test: ["CMD", "wget", "-q", "--spider", "http://localhost:80/health"]
      interval: 10s
      timeout: 5s
      retries: 3

  opa:
    # The healthcheck below runs wget inside the container. The default OPA
    # image ships without a shell or utilities, so the "-debug" variant
    # (which bundles busybox) is used; otherwise the check can never pass.
    image: openpolicyagent/opa:latest-debug
    command:
      - "run"
      - "--server"
      - "--addr=0.0.0.0:8181"
      # Write decision logs to the console.
      - "--set=decision_logs.console=true"
      # Pull the "authz" bundle from the bundle-server service, polling
      # every 10-30 seconds.
      - "--set=bundles.authz.service=bundle-server"
      - "--set=bundles.authz.resource=/bundles/authz.tar.gz"
      - "--set=bundles.authz.polling.min_delay_seconds=10"
      - "--set=bundles.authz.polling.max_delay_seconds=30"
      - "--set=services.bundle-server.url=http://bundle-server:80"
    ports:
      - "8181:8181"
    depends_on:
      # Start OPA only once the bundle server reports healthy.
      bundle-server:
        condition: service_healthy
    healthcheck:
      test: ["CMD", "wget", "-q", "--spider", "http://localhost:8181/health"]
      interval: 10s
      timeout: 5s
      retries: 3
# nginx.conf
server {
    listen 80;

    # Liveness endpoint polled by the bundle-server container healthcheck.
    location /health {
        # Set the MIME type with default_type: add_header would append a
        # second Content-Type next to the one nginx already sends.
        default_type text/plain;
        return 200 'ok';
    }

    # Serve the built bundle archives as static files.
    location /bundles/ {
        alias /usr/share/nginx/html/bundles/;
        # nginx generates the ETag for static files itself. The upstream
        # variable used previously is empty without a proxied upstream, so it
        # never produced a header. The directive is on by default and is
        # spelled out here because OPA's polling sends If-None-Match.
        etag on;
    }
}
# Build the bundle into the directory that the bundle server serves
mkdir -p bundles
opa build -b policies/ -o bundles/authz.tar.gz

# Start the services in the background
docker compose up -d

# Check that OPA is healthy
curl http://localhost:8181/health

# Query the policy decision for a sample input
curl -X POST http://localhost:8181/v1/data/authz/allow \
  -H 'Content-Type: application/json' \
  -d '{
    "input": {
      "user": {"roles": ["admin"]},
      "action": "delete"
    }
  }'

# Update the policy: rebuild the bundle and OPA picks it up on its next poll
opa build -b policies/ -o bundles/authz.tar.gz

15.11 OPA vs OpenFGA

维度

OPA

OpenFGA

模型

ABAC(策略驱动)

ReBAC(关系驱动)

语言

Rego

DSL

擅长

复杂条件逻辑

资源级权限继承

数据

策略 + JSON 数据

关系元组图

查询

“允许吗?”

“允许吗?” + “列出所有”

适用

K8s、API 策略、合规

文档协作、SaaS 权限

组合

✅ 可以组合使用

✅ 可以组合使用

最佳实践:用 OpenFGA 管理“谁与什么资源有什么关系”,用 OPA 管理“在什么条件下允许什么操作”。

15.12 小结

  • OPA 是 CNCF 毕业的通用策略引擎,将策略从代码中解耦

  • Rego 是声明式策略语言,支持复杂的条件逻辑

  • OPA 支持多种部署模式:Sidecar、独立服务、库、WASM

  • 广泛应用于 Kubernetes Admission、API 授权、IaC 合规

  • OPA 和 OpenFGA 互补:OPA 擅长策略逻辑,OpenFGA 擅长关系管理

  • 提供 Python、Java、Go 等多语言集成方案,Go 可嵌入式使用获得极致性能

  • 策略测试(opa test)和 Bundle 机制是生产环境的关键实践