# 第二十三章:零信任架构
> "永不信任,始终验证 — 这不是偏执,而是现代安全的基本原则。"
```{mermaid}
mindmap
root((零信任架构))
核心原则
永不信任
假设入侵
最小权限
显式验证
NIST 架构
PE 策略引擎
PA 策略管理
PEP 执行点
五大支柱
身份
设备
网络
应用
数据
实践
BeyondCorp
ZTNA
成熟度模型
```
## 23.1 零信任的起源
2010 年,Forrester 的 John Kindervag 提出零信任模型,核心洞察:
> **传统的"内网可信、外网不可信"的假设已经过时。**
```{mermaid}
flowchart TB
subgraph traditional["传统模型(城堡与护城河)"]
direction TB
FW["🔥 防火墙"]
EXT["外网 = 不可信 ❌"]
subgraph intranet["内网 = 可信 ✅"]
DB1["🗄️ DB"]
APP1["📦 App"]
API1["🔌 API"]
DB1 <--> APP1
APP1 <--> API1
DB1 <--> API1
end
EXT -->|"突破防火墙"| FW --> intranet
end
subgraph zt["零信任模型"]
direction LR
API2["🔌 API"] -->|"mTLS + AuthZ"| APP2["📦 App"]
APP2 -->|"mTLS + AuthZ"| DB2["🗄️ DB"]
end
style traditional fill:#ffe0e0,stroke:#cc0000
style zt fill:#e0ffe0,stroke:#00cc00
style intranet fill:#fff0f0,stroke:#cc0000
```
## 23.2 核心原则
| 原则 | 含义 | 实践 |
|------|------|------|
| Never Trust, Always Verify | 不因位置或网络信任任何请求 | 每次请求都验证身份和权限 |
| Assume Breach | 假设攻击者已在网络内部 | 微分段、最小权限、监控 |
| Least Privilege | 只授予完成任务所需的最小权限 | RBAC/ABAC、短期凭证 |
| Verify Explicitly | 基于所有可用数据做决策 | 身份+设备+位置+行为 |
## 23.3 NIST SP 800-207 架构
```{mermaid}
flowchart TB
subgraph subject["主体"]
USER["👤 用户"]
DEVICE["💻 设备"]
WORKLOAD["⚙️ 工作负载"]
end
PEP["🛡️ PEP
策略执行点
Gateway / Sidecar"]
PA["📋 PA
策略管理员
建立/断开通信路径"]
PE["🧠 PE
策略引擎
做出访问决策"]
subgraph resources["企业资源"]
APP["📦 应用"]
DATA["🗄️ 数据"]
end
subgraph datasources["数据源"]
IDP["🔑 身份系统"]
DEV_STATUS["📱 设备状态"]
THREAT["🔍 威胁情报"]
BEHAVIOR["📊 行为分析"]
COMPLIANCE["📜 合规要求"]
end
USER --> PEP
DEVICE --> PEP
WORKLOAD --> PEP
PEP -->|"允许/拒绝"| resources
PEP <-->|"请求决策"| PA
PA <-->|"获取策略"| PE
datasources -->|"输入信号"| PE
style PEP fill:#ff9900,stroke:#cc6600,color:#fff
style PE fill:#0066cc,stroke:#003366,color:#fff
style PA fill:#009933,stroke:#006622,color:#fff
```
### 零信任决策流程
```{mermaid}
flowchart TD
REQ["📨 请求到达 PEP"] --> AUTH{"身份验证
通过?"}
AUTH -->|"否"| DENY1["❌ 拒绝访问"]
AUTH -->|"是"| DEV{"设备信任
评估通过?"}
DEV -->|"否"| DENY2["❌ 拒绝 / 降级访问"]
DEV -->|"是"| CTX{"上下文风险
评估"}
CTX -->|"高风险"| MFA["🔐 要求 Step-up MFA"]
CTX -->|"中风险"| LIMITED["⚠️ 有限访问"]
CTX -->|"低风险"| AUTHZ{"授权检查
RBAC/ABAC"}
MFA -->|"通过"| AUTHZ
MFA -->|"失败"| DENY3["❌ 拒绝访问"]
AUTHZ -->|"允许"| ALLOW["✅ 允许访问"]
AUTHZ -->|"拒绝"| DENY4["❌ 拒绝访问"]
ALLOW --> AUDIT["📝 记录审计日志"]
DENY1 --> AUDIT
DENY2 --> AUDIT
DENY3 --> AUDIT
DENY4 --> AUDIT
style ALLOW fill:#00cc00,stroke:#009900,color:#fff
style DENY1 fill:#cc0000,stroke:#990000,color:#fff
style DENY2 fill:#cc0000,stroke:#990000,color:#fff
style DENY3 fill:#cc0000,stroke:#990000,color:#fff
style DENY4 fill:#cc0000,stroke:#990000,color:#fff
```
## 23.4 零信任五大支柱
### 身份(Identity)
```{mermaid}
flowchart LR
subgraph identity["身份类型"]
HUMAN["👤 人类身份
OIDC + MFA + Passkey"]
WORKLOAD["⚙️ 工作负载身份
SPIFFE / SPIRE"]
DEVICE["📱 设备身份
MDM + 设备证书"]
end
subgraph verification["持续验证"]
LOGIN["🔑 登录时验证"]
REQUEST["🔄 每次请求验证"]
ANOMALY["⚠️ 异常行为重新验证"]
end
identity --> LOGIN --> REQUEST --> ANOMALY
ANOMALY -->|"重新认证"| LOGIN
```
### 设备(Device)
| 信任等级 | 条件 | 允许访问 |
|---------|------|---------|
| 完全信任 | 公司设备 + 合规 + 最新补丁 | 所有资源 |
| 部分信任 | 公司设备 + 部分合规 | 非敏感资源 |
| 低信任 | 个人设备 + MDM | 公开资源 |
| 不信任 | 未知设备 | 仅公开信息 |
### 网络(Network)
```{mermaid}
flowchart TB
subgraph flat["传统扁平网络"]
direction LR
SA1["Service A"] <-->|"明文"| SB1["Service B"]
SA1 <-->|"明文"| DB1["Database"]
SB1 <-->|"明文"| DB1
end
subgraph micro["微分段网络"]
direction TB
SA2["Service A"] -->|"mTLS"| SB2["Service B"]
SB2 -->|"mTLS"| DB2["Database"]
SA2 -.-x|"❌ 禁止直连"| DB2
end
style flat fill:#ffe0e0,stroke:#cc0000
style micro fill:#e0ffe0,stroke:#00cc00
```
## 23.5 BeyondCorp — Google 的零信任实践
```{mermaid}
flowchart LR
EMP["👤 员工
任意网络"]
AP["🛡️ Access Proxy
(IAP)"]
PE["🧠 策略引擎"]
RES["📦 特定资源"]
EMP -->|"HTTPS 请求"| AP
AP -->|"请求决策"| PE
PE -->|"允许"| AP
AP -->|"转发"| RES
subgraph checks["Access Proxy 检查"]
C1["🔑 用户身份"]
C2["📱 设备状态"]
C3["🌐 网络位置"]
C4["📋 请求上下文"]
end
subgraph decisions["策略引擎决策依据"]
D1["👥 用户角色"]
D2["📊 设备信任等级"]
D3["🔒 资源敏感度"]
D4["⏰ 时间 / 行为"]
end
AP --- checks
PE --- decisions
style AP fill:#ff9900,stroke:#cc6600,color:#fff
style PE fill:#0066cc,stroke:#003366,color:#fff
```
## 23.6 ZTNA vs VPN
| 维度 | 传统 VPN | ZTNA |
|------|---------|------|
| 信任模型 | 连接后信任整个网络 | 每次请求验证 |
| 访问粒度 | 网络级(IP/端口) | 应用级 |
| 攻击面 | 大(整个内网暴露) | 小(仅授权的应用) |
| 用户体验 | 需要客户端、延迟高 | 透明、低延迟 |
| 横向移动 | 容易 | 困难 |
| 可见性 | 低 | 高(每次访问都有日志) |
## 23.7 零信任成熟度模型
| 等级 | 名称 | 身份 | 设备 | 网络 | 应用 | 数据 | 可见性与分析 |
|------|------|------|------|------|------|------|-------------|
| **Level 0** | 无意识 | 密码认证,无 MFA | 无设备管理 | 扁平网络 | 无应用安全 | 无分类 | 无集中日志 |
| **Level 1** | 传统 | 密码 + 基本 MFA | 基本 MDM | 边界防火墙 | WAF | 基本分类 | 基本日志 |
| **Level 2** | 初级零信任 | SSO + MFA + RBAC | MDM + 合规检查 | 网络分段 | API Gateway | 加密存储 | SIEM 集中分析 |
| **Level 3** | 高级零信任 | ABAC + 持续验证 + SPIFFE | 实时设备评估 | 微分段 + mTLS | 服务网格 + OPA | DLP + 标签化 | 行为分析 + UEBA |
| **Level 4** | 最优化 | AI 风险评估 + 自适应认证 | 自动隔离风险设备 | 软件定义边界 | 自动策略生成 | 自动分类 + 追踪 | AI 驱动 + 自动响应 |
## 23.8 零信任实施路线图
```{mermaid}
gantt
title 零信任实施路线图(24个月)
dateFormat YYYY-MM
axisFormat %Y-%m
section 第一阶段:身份统一
部署 OIDC IdP(Keycloak) :a1, 2025-01, 2m
全面启用 MFA :a2, after a1, 2m
部署 SPIFFE/SPIRE(工作负载身份) :a3, after a1, 3m
统一身份目录 :a4, after a2, 2m
section 第二阶段:微分段
部署 Service Mesh(Istio) :b1, 2025-07, 3m
启用自动 mTLS :b2, after b1, 2m
实施 Network Policy :b3, after b1, 2m
最小权限网络策略 :b4, after b2, 2m
section 第三阶段:持续验证
部署细粒度授权(OpenFGA + OPA) :c1, 2026-01, 3m
设备信任评估 :c2, after c1, 2m
行为分析和异常检测 :c3, after c1, 3m
自适应认证 :c4, after c2, 2m
section 第四阶段:自动化响应
SOAR 自动化响应 :d1, 2026-07, 2m
AI 驱动的风险评估 :d2, after d1, 2m
实时策略调整 :d3, after d1, 3m
持续合规监控 :d4, after d2, 2m
```
## 23.9 代码示例
### Python: 基于 FastAPI 的零信任 API Gateway
```python
"""
零信任 API Gateway — 基于 FastAPI
功能:
- 设备信任评估(设备指纹 + 合规状态)
- 持续验证中间件(每次请求验证身份 + 上下文)
- 风险评分计算
- 审计日志
依赖安装:
pip install fastapi uvicorn pyjwt httpx cryptography
"""
import hashlib
import json
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
import httpx
import jwt
from fastapi import FastAPI, Request, Response, HTTPException, Depends
from fastapi.middleware.base import BaseHTTPMiddleware
from starlette.middleware.base import RequestResponseEndpoint
# ─── 日志配置 ───────────────────────────────────────────────
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("zt-gateway")
app = FastAPI(title="Zero Trust API Gateway", version="1.0.0")
# ─── 数据模型 ───────────────────────────────────────────────
class TrustLevel(str, Enum):
"""设备信任等级"""
FULL = "full" # 完全信任:公司设备 + 合规 + 最新补丁
PARTIAL = "partial" # 部分信任:公司设备 + 部分合规
LOW = "low" # 低信任:个人设备 + MDM
UNTRUSTED = "untrusted" # 不信任:未知设备
class RiskLevel(str, Enum):
"""风险等级"""
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
@dataclass
class DeviceInfo:
"""设备信息"""
device_id: str
fingerprint: str
os: str = ""
os_version: str = ""
is_managed: bool = False
is_compliant: bool = False
last_patch_date: Optional[str] = None
trust_level: TrustLevel = TrustLevel.UNTRUSTED
@dataclass
class RequestContext:
"""请求上下文 — 用于零信任决策"""
user_id: str = ""
user_roles: list = field(default_factory=list)
device: Optional[DeviceInfo] = None
source_ip: str = ""
geo_location: str = ""
request_time: str = ""
risk_score: float = 0.0
risk_level: RiskLevel = RiskLevel.LOW
# ─── 设备信任评估 ───────────────────────────────────────────
class DeviceTrustEvaluator:
"""
设备信任评估器
根据设备属性计算信任等级
"""
# 已知的公司设备 ID 列表(生产环境应从 MDM 系统查询)
MANAGED_DEVICES = {"device-001", "device-002", "device-003"}
@classmethod
def evaluate(cls, request: Request) -> DeviceInfo:
"""从请求头中提取设备信息并评估信任等级"""
# 提取设备信息(实际场景中来自 MDM agent 或设备证书)
device_id = request.headers.get("X-Device-ID", "unknown")
user_agent = request.headers.get("User-Agent", "")
device_cert = request.headers.get("X-Device-Cert", "")
# 计算设备指纹
fingerprint_data = f"{device_id}:{user_agent}:{device_cert}"
fingerprint = hashlib.sha256(fingerprint_data.encode()).hexdigest()[:16]
device = DeviceInfo(
device_id=device_id,
fingerprint=fingerprint,
os=request.headers.get("X-Device-OS", "unknown"),
os_version=request.headers.get("X-Device-OS-Version", ""),
is_managed=device_id in cls.MANAGED_DEVICES,
is_compliant=request.headers.get("X-Device-Compliant", "false") == "true",
last_patch_date=request.headers.get("X-Device-Last-Patch", None),
)
# 评估信任等级
device.trust_level = cls._calculate_trust_level(device)
return device
@classmethod
def _calculate_trust_level(cls, device: DeviceInfo) -> TrustLevel:
"""根据设备属性计算信任等级"""
if not device.is_managed:
return TrustLevel.UNTRUSTED
if device.is_compliant and device.last_patch_date:
# 检查补丁是否在 30 天内
try:
patch_date = datetime.fromisoformat(device.last_patch_date)
days_since_patch = (datetime.now(timezone.utc) - patch_date).days
if days_since_patch <= 30:
return TrustLevel.FULL
except ValueError:
pass
return TrustLevel.PARTIAL
if device.is_compliant:
return TrustLevel.PARTIAL
return TrustLevel.LOW
# ─── 风险评分计算 ───────────────────────────────────────────
class RiskScorer:
"""
风险评分计算器
综合多个信号计算请求的风险分数(0.0 - 1.0)
"""
# 高风险地理位置(示例)
HIGH_RISK_GEOS = {"CN-UNKNOWN", "RU-UNKNOWN", "KP"}
# 高风险时间段(UTC 小时)
UNUSUAL_HOURS = set(range(0, 6)) # 凌晨 0-6 点
@classmethod
def calculate(cls, ctx: RequestContext) -> tuple[float, RiskLevel]:
"""计算风险分数和风险等级"""
score = 0.0
# 1. 设备信任等级(权重 0.3)
device_scores = {
TrustLevel.FULL: 0.0,
TrustLevel.PARTIAL: 0.1,
TrustLevel.LOW: 0.2,
TrustLevel.UNTRUSTED: 0.3,
}
if ctx.device:
score += device_scores.get(ctx.device.trust_level, 0.3)
# 2. 地理位置风险(权重 0.25)
if ctx.geo_location in cls.HIGH_RISK_GEOS:
score += 0.25
elif not ctx.geo_location:
score += 0.15 # 无法确定位置也有风险
# 3. 时间异常(权重 0.2)
try:
req_hour = datetime.fromisoformat(ctx.request_time).hour
if req_hour in cls.UNUSUAL_HOURS:
score += 0.2
except (ValueError, AttributeError):
score += 0.1
# 4. 无角色信息(权重 0.15)
if not ctx.user_roles:
score += 0.15
# 5. 未知用户(权重 0.1)
if not ctx.user_id:
score += 0.1
# 归一化到 [0, 1]
score = min(score, 1.0)
# 确定风险等级
if score >= 0.7:
level = RiskLevel.CRITICAL
elif score >= 0.5:
level = RiskLevel.HIGH
elif score >= 0.3:
level = RiskLevel.MEDIUM
else:
level = RiskLevel.LOW
return score, level
# ─── JWT 验证 ──────────────────────────────────────────────
class JWTVerifier:
"""JWT 令牌验证器"""
# 生产环境应从 JWKS 端点动态获取公钥
SECRET_KEY = "your-256-bit-secret-for-demo-only"
ALGORITHM = "HS256"
ISSUER = "https://auth.example.com"
@classmethod
def verify(cls, token: str) -> dict:
"""
验证 JWT 令牌并返回 claims
Raises: HTTPException 如果验证失败
"""
try:
payload = jwt.decode(
token,
cls.SECRET_KEY,
algorithms=[cls.ALGORITHM],
issuer=cls.ISSUER,
options={"require": ["exp", "sub", "iss"]},
)
return payload
except jwt.ExpiredSignatureError:
raise HTTPException(status_code=401, detail="Token has expired")
except jwt.InvalidIssuerError:
raise HTTPException(status_code=401, detail="Invalid token issuer")
except jwt.InvalidTokenError as e:
raise HTTPException(status_code=401, detail=f"Invalid token: {e}")
# ─── 审计日志 ──────────────────────────────────────────────
class AuditLogger:
"""零信任审计日志记录器"""
@staticmethod
def log_access(
ctx: RequestContext,
path: str,
method: str,
decision: str,
reason: str = "",
status_code: int = 200,
):
"""记录访问审计日志(结构化 JSON)"""
entry = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"event_type": "access_decision",
"user_id": ctx.user_id,
"source_ip": ctx.source_ip,
"device_id": ctx.device.device_id if ctx.device else "unknown",
"device_trust": ctx.device.trust_level.value if ctx.device else "unknown",
"path": path,
"method": method,
"risk_score": ctx.risk_score,
"risk_level": ctx.risk_level.value,
"decision": decision,
"reason": reason,
"status_code": status_code,
}
logger.info(f"AUDIT: {json.dumps(entry)}")
# ─── 零信任中间件 ──────────────────────────────────────────
class ZeroTrustMiddleware(BaseHTTPMiddleware):
"""
零信任持续验证中间件
每个请求都经过:身份验证 → 设备评估 → 风险计算 → 授权决策
"""
# 不需要认证的路径
PUBLIC_PATHS = {"/health", "/metrics", "/docs", "/openapi.json"}
# 资源敏感度映射
RESOURCE_SENSITIVITY = {
"/api/admin": "high",
"/api/users": "medium",
"/api/public": "low",
}
async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
start_time = time.time()
# 1. 公开路径直接放行
if request.url.path in self.PUBLIC_PATHS:
return await call_next(request)
# 2. 构建请求上下文
ctx = RequestContext(
source_ip=request.client.host if request.client else "unknown",
geo_location=request.headers.get("X-Geo-Location", ""),
request_time=datetime.now(timezone.utc).isoformat(),
)
# 3. 身份验证(JWT)
auth_header = request.headers.get("Authorization", "")
if not auth_header.startswith("Bearer "):
AuditLogger.log_access(
ctx, request.url.path, request.method,
decision="DENY", reason="missing_token", status_code=401,
)
raise HTTPException(status_code=401, detail="Missing Bearer token")
token = auth_header[7:]
claims = JWTVerifier.verify(token)
ctx.user_id = claims.get("sub", "")
ctx.user_roles = claims.get("roles", [])
# 4. 设备信任评估
ctx.device = DeviceTrustEvaluator.evaluate(request)
# 5. 风险评分计算
ctx.risk_score, ctx.risk_level = RiskScorer.calculate(ctx)
# 6. 基于风险的访问决策
sensitivity = self._get_resource_sensitivity(request.url.path)
if ctx.risk_level == RiskLevel.CRITICAL:
AuditLogger.log_access(
ctx, request.url.path, request.method,
decision="DENY", reason="critical_risk", status_code=403,
)
raise HTTPException(
status_code=403,
detail="Access denied: risk level too high",
)
if ctx.risk_level == RiskLevel.HIGH and sensitivity == "high":
AuditLogger.log_access(
ctx, request.url.path, request.method,
decision="DENY", reason="high_risk_sensitive_resource",
status_code=403,
)
raise HTTPException(
status_code=403,
detail="Access denied: high risk for sensitive resource. "
"Please use a managed device or contact IT.",
)
if (
ctx.device
and ctx.device.trust_level == TrustLevel.UNTRUSTED
and sensitivity != "low"
):
AuditLogger.log_access(
ctx, request.url.path, request.method,
decision="DENY", reason="untrusted_device", status_code=403,
)
raise HTTPException(
status_code=403,
detail="Access denied: untrusted device cannot access this resource",
)
# 7. 将上下文注入请求 state,供下游使用
request.state.zt_context = ctx
# 8. 执行请求
response = await call_next(request)
# 9. 记录审计日志
duration_ms = (time.time() - start_time) * 1000
AuditLogger.log_access(
ctx, request.url.path, request.method,
decision="ALLOW", reason="passed_all_checks",
status_code=response.status_code,
)
# 10. 添加零信任响应头
response.headers["X-ZT-Risk-Score"] = str(round(ctx.risk_score, 2))
response.headers["X-ZT-Risk-Level"] = ctx.risk_level.value
response.headers["X-ZT-Device-Trust"] = (
ctx.device.trust_level.value if ctx.device else "unknown"
)
response.headers["X-ZT-Duration-Ms"] = str(round(duration_ms, 2))
return response
def _get_resource_sensitivity(self, path: str) -> str:
"""获取资源敏感度"""
for prefix, sensitivity in self.RESOURCE_SENSITIVITY.items():
if path.startswith(prefix):
return sensitivity
return "medium" # 默认中等敏感度
# ─── 注册中间件 ────────────────────────────────────────────
app.add_middleware(ZeroTrustMiddleware)
# ─── API 端点 ──────────────────────────────────────────────
@app.get("/health")
async def health():
"""健康检查(公开端点)"""
return {"status": "healthy", "service": "zt-gateway"}
@app.get("/api/public/info")
async def public_info(request: Request):
"""公开信息(低敏感度)"""
ctx: RequestContext = request.state.zt_context
return {
"message": "Public information",
"your_risk_score": ctx.risk_score,
"device_trust": ctx.device.trust_level.value if ctx.device else "unknown",
}
@app.get("/api/users/me")
async def get_current_user(request: Request):
"""获取当前用户信息(中等敏感度)"""
ctx: RequestContext = request.state.zt_context
return {
"user_id": ctx.user_id,
"roles": ctx.user_roles,
"device_trust": ctx.device.trust_level.value if ctx.device else "unknown",
"risk_level": ctx.risk_level.value,
}
@app.get("/api/admin/dashboard")
async def admin_dashboard(request: Request):
"""管理面板(高敏感度,需要 admin 角色)"""
ctx: RequestContext = request.state.zt_context
if "admin" not in ctx.user_roles:
AuditLogger.log_access(
ctx, request.url.path, request.method,
decision="DENY", reason="insufficient_role", status_code=403,
)
raise HTTPException(status_code=403, detail="Admin role required")
return {
"message": "Welcome to admin dashboard",
"user_id": ctx.user_id,
"risk_score": ctx.risk_score,
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8080)
```
### Java: Spring Cloud Gateway 零信任过滤器
```java
/**
* 零信任 Spring Cloud Gateway 过滤器
* 功能:JWT 验证 + 设备指纹 + 风险评分
*
* 依赖(build.gradle):
* implementation 'org.springframework.cloud:spring-cloud-starter-gateway'
* implementation 'io.jsonwebtoken:jjwt-api:0.12.5'
* runtimeOnly 'io.jsonwebtoken:jjwt-impl:0.12.5'
* runtimeOnly 'io.jsonwebtoken:jjwt-jackson:0.12.5'
*/
package com.example.zerotrust.gateway;
import io.jsonwebtoken.Claims;
import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.security.Keys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import javax.crypto.SecretKey;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.LocalTime;
import java.util.*;
/**
* 零信任全局过滤器
* 对每个请求执行:身份验证 → 设备评估 → 风险计算 → 访问决策
*/
@Component
public class ZeroTrustGatewayFilter implements GlobalFilter, Ordered {
private static final Logger log = LoggerFactory.getLogger(ZeroTrustGatewayFilter.class);
// 公开路径(不需要认证)
private static final Set PUBLIC_PATHS = Set.of(
"/health", "/metrics", "/actuator/health"
);
// JWT 密钥(生产环境应使用 RSA/EC 公钥,从 JWKS 端点获取)
private static final SecretKey JWT_KEY = Keys.hmacShaKeyFor(
"your-256-bit-secret-key-for-demo-only!!".getBytes(StandardCharsets.UTF_8)
);
// 已知的公司管理设备
private static final Set MANAGED_DEVICES = Set.of(
"device-001", "device-002", "device-003"
);
@Override
public int getOrder() {
return -1; // 最高优先级
}
@Override
public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
String path = request.getURI().getPath();
String method = request.getMethod().name();
// 1. 公开路径直接放行
if (PUBLIC_PATHS.contains(path)) {
return chain.filter(exchange);
}
// 2. 提取并验证 JWT
String authHeader = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION);
if (authHeader == null || !authHeader.startsWith("Bearer ")) {
log.warn("AUDIT: DENY path={} reason=missing_token ip={}",
path, getClientIp(request));
return rejectRequest(exchange, HttpStatus.UNAUTHORIZED, "Missing Bearer token");
}
Claims claims;
try {
claims = Jwts.parser()
.verifyWith(JWT_KEY)
.requireIssuer("https://auth.example.com")
.build()
.parseSignedClaims(authHeader.substring(7))
.getPayload();
} catch (JwtException e) {
log.warn("AUDIT: DENY path={} reason=invalid_token error={}", path, e.getMessage());
return rejectRequest(exchange, HttpStatus.UNAUTHORIZED, "Invalid token: " + e.getMessage());
}
String userId = claims.getSubject();
@SuppressWarnings("unchecked")
List roles = claims.get("roles", List.class);
if (roles == null) roles = Collections.emptyList();
// 3. 设备信任评估
DeviceInfo device = evaluateDevice(request);
// 4. 风险评分计算
RiskAssessment risk = calculateRisk(device, request, roles);
// 5. 访问决策
String sensitivity = getResourceSensitivity(path);
if (risk.score >= 0.7) {
log.warn("AUDIT: DENY user={} path={} risk={} reason=critical_risk device={}",
userId, path, risk.score, device.deviceId);
return rejectRequest(exchange, HttpStatus.FORBIDDEN,
"Access denied: risk level too high");
}
if (risk.score >= 0.5 && "high".equals(sensitivity)) {
log.warn("AUDIT: DENY user={} path={} risk={} reason=high_risk_sensitive",
userId, path, risk.score);
return rejectRequest(exchange, HttpStatus.FORBIDDEN,
"Access denied: high risk for sensitive resource");
}
if (device.trustLevel == TrustLevel.UNTRUSTED && !"low".equals(sensitivity)) {
log.warn("AUDIT: DENY user={} path={} reason=untrusted_device device={}",
userId, path, device.deviceId);
return rejectRequest(exchange, HttpStatus.FORBIDDEN,
"Access denied: untrusted device");
}
// 6. 记录审计日志
log.info("AUDIT: ALLOW user={} path={} method={} risk={} device_trust={} ip={}",
userId, path, method, risk.score, device.trustLevel, getClientIp(request));
// 7. 将零信任上下文注入下游请求头
ServerHttpRequest mutatedRequest = request.mutate()
.header("X-ZT-User-Id", userId)
.header("X-ZT-User-Roles", String.join(",", roles))
.header("X-ZT-Device-Trust", device.trustLevel.name())
.header("X-ZT-Device-Fingerprint", device.fingerprint)
.header("X-ZT-Risk-Score", String.valueOf(risk.score))
.header("X-ZT-Risk-Level", risk.level)
.build();
return chain.filter(exchange.mutate().request(mutatedRequest).build());
}
// ─── 设备信任评估 ──────────────────────────────────────
private DeviceInfo evaluateDevice(ServerHttpRequest request) {
String deviceId = getHeader(request, "X-Device-ID", "unknown");
String userAgent = getHeader(request, "User-Agent", "");
String deviceCert = getHeader(request, "X-Device-Cert", "");
boolean isCompliant = "true".equals(getHeader(request, "X-Device-Compliant", "false"));
boolean isManaged = MANAGED_DEVICES.contains(deviceId);
// 计算设备指纹
String fingerprint = computeFingerprint(deviceId, userAgent, deviceCert);
// 确定信任等级
TrustLevel trustLevel;
if (!isManaged) {
trustLevel = TrustLevel.UNTRUSTED;
} else if (isCompliant) {
trustLevel = TrustLevel.FULL;
} else {
trustLevel = TrustLevel.PARTIAL;
}
return new DeviceInfo(deviceId, fingerprint, isManaged, isCompliant, trustLevel);
}
// ─── 风险评分 ──────────────────────────────────────────
private RiskAssessment calculateRisk(
DeviceInfo device, ServerHttpRequest request, List roles) {
double score = 0.0;
// 设备信任(权重 0.3)
switch (device.trustLevel) {
case UNTRUSTED -> score += 0.3;
case LOW -> score += 0.2;
case PARTIAL -> score += 0.1;
case FULL -> score += 0.0;
}
// 时间异常(权重 0.2)— 凌晨 0-6 点
int hour = LocalTime.now().getHour();
if (hour >= 0 && hour < 6) {
score += 0.2;
}
// 无角色(权重 0.15)
if (roles.isEmpty()) {
score += 0.15;
}
// 地理位置风险(权重 0.25)
String geo = getHeader(request, "X-Geo-Location", "");
if (geo.isEmpty()) {
score += 0.15;
}
score = Math.min(score, 1.0);
String level;
if (score >= 0.7) level = "critical";
else if (score >= 0.5) level = "high";
else if (score >= 0.3) level = "medium";
else level = "low";
return new RiskAssessment(Math.round(score * 100.0) / 100.0, level);
}
// ─── 辅助方法 ──────────────────────────────────────────
private String getResourceSensitivity(String path) {
if (path.startsWith("/api/admin")) return "high";
if (path.startsWith("/api/users")) return "medium";
if (path.startsWith("/api/public")) return "low";
return "medium";
}
private Mono rejectRequest(
ServerWebExchange exchange, HttpStatus status, String message) {
exchange.getResponse().setStatusCode(status);
exchange.getResponse().getHeaders().add("Content-Type", "application/json");
byte[] body = ("{\"error\":\"" + message + "\"}").getBytes(StandardCharsets.UTF_8);
return exchange.getResponse().writeWith(
Mono.just(exchange.getResponse().bufferFactory().wrap(body))
);
}
private String getClientIp(ServerHttpRequest request) {
String xff = request.getHeaders().getFirst("X-Forwarded-For");
if (xff != null && !xff.isEmpty()) {
return xff.split(",")[0].trim();
}
return request.getRemoteAddress() != null
? request.getRemoteAddress().getAddress().getHostAddress()
: "unknown";
}
private String getHeader(ServerHttpRequest request, String name, String defaultValue) {
String value = request.getHeaders().getFirst(name);
return value != null ? value : defaultValue;
}
private String computeFingerprint(String deviceId, String userAgent, String cert) {
try {
MessageDigest md = MessageDigest.getInstance("SHA-256");
md.update((deviceId + ":" + userAgent + ":" + cert).getBytes(StandardCharsets.UTF_8));
byte[] hash = md.digest();
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 8; i++) {
sb.append(String.format("%02x", hash[i]));
}
return sb.toString();
} catch (NoSuchAlgorithmException e) {
return "unknown";
}
}
// ─── 内部数据类 ────────────────────────────────────────
enum TrustLevel { FULL, PARTIAL, LOW, UNTRUSTED }
record DeviceInfo(
String deviceId,
String fingerprint,
boolean isManaged,
boolean isCompliant,
TrustLevel trustLevel
) {}
record RiskAssessment(double score, String level) {}
}
```
### Go: 零信任 Proxy(mTLS + OPA 策略检查 + 审计日志)
```go
/*
零信任反向代理 — Go 实现
功能:
- mTLS 双向认证(验证客户端证书)
- OPA 策略检查(通过 HTTP API 调用 OPA)
- 结构化审计日志(JSON 格式)
- 设备信任评估
- 请求上下文传播
运行方式:
go run main.go \
--listen=:8443 \
--backend=http://localhost:8080 \
--opa-url=http://localhost:8181 \
--tls-cert=server.crt \
--tls-key=server.key \
--ca-cert=ca.crt
*/
package main
import (
"bytes"
"context"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"encoding/json"
"flag"
"fmt"
"io"
"log/slog"
"net/http"
"net/http/httputil"
"net/url"
"os"
"os/signal"
"strings"
"syscall"
"time"
)
// ─── 配置 ──────────────────────────────────────────────────
type Config struct {
ListenAddr string
BackendURL string
OPAUrl string
TLSCert string
TLSKey string
CACert string
}
func parseFlags() *Config {
cfg := &Config{}
flag.StringVar(&cfg.ListenAddr, "listen", ":8443", "Listen address")
flag.StringVar(&cfg.BackendURL, "backend", "http://localhost:8080", "Backend URL")
flag.StringVar(&cfg.OPAUrl, "opa-url", "http://localhost:8181", "OPA server URL")
flag.StringVar(&cfg.TLSCert, "tls-cert", "server.crt", "TLS certificate file")
flag.StringVar(&cfg.TLSKey, "tls-key", "server.key", "TLS private key file")
flag.StringVar(&cfg.CACert, "ca-cert", "ca.crt", "CA certificate for client verification")
flag.Parse()
return cfg
}
// ─── 数据模型 ──────────────────────────────────────────────
// TrustLevel 表示设备信任等级
type TrustLevel string
const (
TrustFull TrustLevel = "full"
TrustPartial TrustLevel = "partial"
TrustLow TrustLevel = "low"
TrustUntrusted TrustLevel = "untrusted"
)
// RequestContext 零信任请求上下文
type RequestContext struct {
UserID string `json:"user_id"`
ClientCertSAN string `json:"client_cert_san"`
DeviceID string `json:"device_id"`
DeviceFingerprint string `json:"device_fingerprint"`
DeviceTrust TrustLevel `json:"device_trust"`
SourceIP string `json:"source_ip"`
Path string `json:"path"`
Method string `json:"method"`
RiskScore float64 `json:"risk_score"`
Timestamp time.Time `json:"timestamp"`
}
// AuditEntry 审计日志条目
type AuditEntry struct {
Timestamp string `json:"timestamp"`
EventType string `json:"event_type"`
UserID string `json:"user_id"`
CertSAN string `json:"cert_san"`
DeviceID string `json:"device_id"`
DeviceTrust TrustLevel `json:"device_trust"`
SourceIP string `json:"source_ip"`
Path string `json:"path"`
Method string `json:"method"`
Decision string `json:"decision"`
Reason string `json:"reason"`
RiskScore float64 `json:"risk_score"`
DurationMs float64 `json:"duration_ms"`
StatusCode int `json:"status_code"`
}
// OPAInput OPA 策略查询输入
type OPAInput struct {
Input struct {
User string `json:"user"`
CertSAN string `json:"cert_san"`
Path string `json:"path"`
Method string `json:"method"`
DeviceTrust TrustLevel `json:"device_trust"`
SourceIP string `json:"source_ip"`
RiskScore float64 `json:"risk_score"`
} `json:"input"`
}
// OPAResult OPA 策略查询结果
type OPAResult struct {
Result struct {
Allow bool `json:"allow"`
Reason string `json:"reason,omitempty"`
} `json:"result"`
}
// ─── 审计日志记录器 ────────────────────────────────────────
type AuditLogger struct {
logger *slog.Logger
}
func NewAuditLogger() *AuditLogger {
// 使用 JSON 格式输出到 stdout
handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
})
return &AuditLogger{
logger: slog.New(handler),
}
}
func (a *AuditLogger) Log(entry AuditEntry) {
a.logger.Info("access_decision",
"event_type", entry.EventType,
"user_id", entry.UserID,
"cert_san", entry.CertSAN,
"device_id", entry.DeviceID,
"device_trust", entry.DeviceTrust,
"source_ip", entry.SourceIP,
"path", entry.Path,
"method", entry.Method,
"decision", entry.Decision,
"reason", entry.Reason,
"risk_score", entry.RiskScore,
"duration_ms", entry.DurationMs,
"status_code", entry.StatusCode,
)
}
// ─── 设备信任评估 ──────────────────────────────────────────
// 已知的公司管理设备(生产环境应查询 MDM 系统)
var managedDevices = map[string]bool{
"device-001": true,
"device-002": true,
"device-003": true,
}
func evaluateDeviceTrust(r *http.Request) (string, string, TrustLevel) {
deviceID := r.Header.Get("X-Device-ID")
if deviceID == "" {
deviceID = "unknown"
}
userAgent := r.Header.Get("User-Agent")
isCompliant := r.Header.Get("X-Device-Compliant") == "true"
// 计算设备指纹
h := sha256.New()
h.Write([]byte(fmt.Sprintf("%s:%s", deviceID, userAgent)))
fingerprint := fmt.Sprintf("%x", h.Sum(nil))[:16]
// 评估信任等级
isManaged := managedDevices[deviceID]
var trust TrustLevel
switch {
case !isManaged:
trust = TrustUntrusted
case isManaged && isCompliant:
trust = TrustFull
case isManaged:
trust = TrustPartial
default:
trust = TrustLow
}
return deviceID, fingerprint, trust
}
// ─── OPA 策略检查 ──────────────────────────────────────────
type OPAChecker struct {
baseURL string
httpClient *http.Client
}
func NewOPAChecker(baseURL string) *OPAChecker {
return &OPAChecker{
baseURL: baseURL,
httpClient: &http.Client{
Timeout: 2 * time.Second,
},
}
}
// Check 向 OPA 发送策略查询
func (o *OPAChecker) Check(ctx context.Context, reqCtx *RequestContext) (bool, string, error) {
// 构建 OPA 输入
input := OPAInput{}
input.Input.User = reqCtx.UserID
input.Input.CertSAN = reqCtx.ClientCertSAN
input.Input.Path = reqCtx.Path
input.Input.Method = reqCtx.Method
input.Input.DeviceTrust = reqCtx.DeviceTrust
input.Input.SourceIP = reqCtx.SourceIP
input.Input.RiskScore = reqCtx.RiskScore
body, err := json.Marshal(input)
if err != nil {
return false, "marshal_error", fmt.Errorf("failed to marshal OPA input: %w", err)
}
// 发送请求到 OPA
opaURL := fmt.Sprintf("%s/v1/data/zerotrust/authz", o.baseURL)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, opaURL, bytes.NewReader(body))
if err != nil {
return false, "request_error", fmt.Errorf("failed to create OPA request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := o.httpClient.Do(req)
if err != nil {
// OPA 不可用时,默认拒绝(fail-close)
return false, "opa_unavailable", fmt.Errorf("OPA request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
respBody, _ := io.ReadAll(resp.Body)
return false, "opa_error", fmt.Errorf("OPA returned status %d: %s", resp.StatusCode, respBody)
}
var result OPAResult
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return false, "decode_error", fmt.Errorf("failed to decode OPA response: %w", err)
}
return result.Result.Allow, result.Result.Reason, nil
}
// ─── 零信任代理 ────────────────────────────────────────────
type ZeroTrustProxy struct {
proxy *httputil.ReverseProxy
opaChecker *OPAChecker
audit *AuditLogger
}
func NewZeroTrustProxy(backendURL string, opaURL string) (*ZeroTrustProxy, error) {
target, err := url.Parse(backendURL)
if err != nil {
return nil, fmt.Errorf("invalid backend URL: %w", err)
}
proxy := httputil.NewSingleHostReverseProxy(target)
// 自定义错误处理
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
slog.Error("proxy error", "error", err, "path", r.URL.Path)
http.Error(w, `{"error":"backend unavailable"}`, http.StatusBadGateway)
}
return &ZeroTrustProxy{
proxy: proxy,
opaChecker: NewOPAChecker(opaURL),
audit: NewAuditLogger(),
}, nil
}
// ServeHTTP 处理每个请求的零信任验证
func (ztp *ZeroTrustProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
start := time.Now()
// 1. 公开路径直接放行
if r.URL.Path == "/health" || r.URL.Path == "/metrics" {
ztp.proxy.ServeHTTP(w, r)
return
}
// 2. 提取客户端证书信息(mTLS)
var certSAN string
var userID string
if r.TLS != nil && len(r.TLS.PeerCertificates) > 0 {
cert := r.TLS.PeerCertificates[0]
// 使用 SPIFFE ID 或 DNS SAN 作为身份
for _, uri := range cert.URIs {
if strings.HasPrefix(uri.String(), "spiffe://") {
certSAN = uri.String()
break
}
}
if certSAN == "" && len(cert.DNSNames) > 0 {
certSAN = cert.DNSNames[0]
}
userID = cert.Subject.CommonName
} else {
// 没有客户端证书 → 拒绝
ztp.audit.Log(AuditEntry{
Timestamp: time.Now().UTC().Format(time.RFC3339),
EventType: "access_decision",
SourceIP: getClientIP(r),
Path: r.URL.Path,
Method: r.Method,
Decision: "DENY",
Reason: "no_client_certificate",
})
http.Error(w, `{"error":"mTLS required: no client certificate"}`, http.StatusUnauthorized)
return
}
// 3. 设备信任评估
deviceID, fingerprint, deviceTrust := evaluateDeviceTrust(r)
// 4. 构建请求上下文
reqCtx := &RequestContext{
UserID: userID,
ClientCertSAN: certSAN,
DeviceID: deviceID,
DeviceFingerprint: fingerprint,
DeviceTrust: deviceTrust,
SourceIP: getClientIP(r),
Path: r.URL.Path,
Method: r.Method,
Timestamp: time.Now().UTC(),
}
// 5. OPA 策略检查
allowed, reason, err := ztp.opaChecker.Check(r.Context(), reqCtx)
if err != nil {
slog.Warn("OPA check failed, defaulting to deny",
"error", err, "path", r.URL.Path, "user", userID)
// fail-close: OPA 不可用时拒绝请求
ztp.audit.Log(AuditEntry{
Timestamp: time.Now().UTC().Format(time.RFC3339),
EventType: "access_decision",
UserID: userID,
CertSAN: certSAN,
DeviceID: deviceID,
DeviceTrust: deviceTrust,
SourceIP: getClientIP(r),
Path: r.URL.Path,
Method: r.Method,
Decision: "DENY",
Reason: fmt.Sprintf("opa_error: %v", err),
DurationMs: float64(time.Since(start).Milliseconds()),
})
http.Error(w, `{"error":"authorization service unavailable"}`, http.StatusForbidden)
return
}
if !allowed {
ztp.audit.Log(AuditEntry{
Timestamp: time.Now().UTC().Format(time.RFC3339),
EventType: "access_decision",
UserID: userID,
CertSAN: certSAN,
DeviceID: deviceID,
DeviceTrust: deviceTrust,
SourceIP: getClientIP(r),
Path: r.URL.Path,
Method: r.Method,
Decision: "DENY",
Reason: reason,
DurationMs: float64(time.Since(start).Milliseconds()),
StatusCode: http.StatusForbidden,
})
http.Error(w, fmt.Sprintf(`{"error":"access denied","reason":"%s"}`, reason),
http.StatusForbidden)
return
}
// 6. 注入零信任上下文头到下游请求
r.Header.Set("X-ZT-User-Id", userID)
r.Header.Set("X-ZT-Cert-SAN", certSAN)
r.Header.Set("X-ZT-Device-ID", deviceID)
r.Header.Set("X-ZT-Device-Fingerprint", fingerprint)
r.Header.Set("X-ZT-Device-Trust", string(deviceTrust))
r.Header.Set("X-ZT-Source-IP", getClientIP(r))
// 7. 转发请求到后端
recorder := &statusRecorder{ResponseWriter: w, statusCode: http.StatusOK}
ztp.proxy.ServeHTTP(recorder, r)
// 8. 记录审计日志
duration := time.Since(start)
ztp.audit.Log(AuditEntry{
Timestamp: time.Now().UTC().Format(time.RFC3339),
EventType: "access_decision",
UserID: userID,
CertSAN: certSAN,
DeviceID: deviceID,
DeviceTrust: deviceTrust,
SourceIP: getClientIP(r),
Path: r.URL.Path,
Method: r.Method,
Decision: "ALLOW",
Reason: reason,
RiskScore: reqCtx.RiskScore,
DurationMs: float64(duration.Milliseconds()),
StatusCode: recorder.statusCode,
})
}
// statusRecorder 用于捕获响应状态码
type statusRecorder struct {
http.ResponseWriter
statusCode int
}
func (sr *statusRecorder) WriteHeader(code int) {
sr.statusCode = code
sr.ResponseWriter.WriteHeader(code)
}
// getClientIP 提取客户端真实 IP
func getClientIP(r *http.Request) string {
if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
parts := strings.SplitN(xff, ",", 2)
return strings.TrimSpace(parts[0])
}
if host, _, err := strings.Cut(r.RemoteAddr, ":"); err {
return host
}
return r.RemoteAddr
}
// ─── 主函数 ────────────────────────────────────────────────
func main() {
cfg := parseFlags()
// 创建零信任代理
proxy, err := NewZeroTrustProxy(cfg.BackendURL, cfg.OPAUrl)
if err != nil {
slog.Error("failed to create proxy", "error", err)
os.Exit(1)
}
// 加载 CA 证书(用于验证客户端证书)
caCert, err := os.ReadFile(cfg.CACert)
if err != nil {
slog.Error("failed to read CA cert", "error", err, "path", cfg.CACert)
os.Exit(1)
}
caCertPool := x509.NewCertPool()
if !caCertPool.AppendCertsFromPEM(caCert) {
slog.Error("failed to parse CA cert")
os.Exit(1)
}
// 配置 mTLS
tlsConfig := &tls.Config{
ClientCAs: caCertPool,
ClientAuth: tls.RequireAndVerifyClientCert, // 强制客户端证书
MinVersion: tls.VersionTLS13, // 最低 TLS 1.3
CipherSuites: []uint16{
tls.TLS_AES_256_GCM_SHA384,
tls.TLS_CHACHA20_POLY1305_SHA256,
},
}
server := &http.Server{
Addr: cfg.ListenAddr,
Handler: proxy,
TLSConfig: tlsConfig,
ReadTimeout: 30 * time.Second,
WriteTimeout: 30 * time.Second,
IdleTimeout: 120 * time.Second,
}
// 优雅关闭
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigCh
slog.Info("received signal, shutting down", "signal", sig)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := server.Shutdown(ctx); err != nil {
slog.Error("shutdown error", "error", err)
}
}()
slog.Info("Zero Trust Proxy starting",
"listen", cfg.ListenAddr,
"backend", cfg.BackendURL,
"opa", cfg.OPAUrl,
)
if err := server.ListenAndServeTLS(cfg.TLSCert, cfg.TLSKey); err != http.ErrServerClosed {
slog.Error("server error", "error", err)
os.Exit(1)
}
slog.Info("server stopped gracefully")
}
```
**配套 OPA 策略文件** (`zerotrust/authz.rego`):
```rego
# OPA 零信任授权策略
# 路径: zerotrust/authz.rego
package zerotrust.authz
import rego.v1
default allow := false
default reason := "denied_by_default"
# 规则 1: 完全信任的设备可以访问所有资源
allow if {
input.device_trust == "full"
input.risk_score < 0.7
}
reason := "full_trust_device" if {
input.device_trust == "full"
input.risk_score < 0.7
}
# 规则 2: 部分信任的设备只能访问非管理端点
allow if {
input.device_trust == "partial"
not startswith(input.path, "/api/admin")
input.risk_score < 0.5
}
reason := "partial_trust_non_admin" if {
input.device_trust == "partial"
not startswith(input.path, "/api/admin")
input.risk_score < 0.5
}
# 规则 3: 只读操作对低信任设备开放公开端点
allow if {
input.device_trust == "low"
startswith(input.path, "/api/public")
input.method == "GET"
}
reason := "low_trust_public_readonly" if {
input.device_trust == "low"
startswith(input.path, "/api/public")
input.method == "GET"
}
# 规则 4: 不信任的设备一律拒绝
reason := "untrusted_device" if {
input.device_trust == "untrusted"
}
# 规则 5: 高风险分数一律拒绝
reason := "high_risk_score" if {
input.risk_score >= 0.7
}
```
## 23.9 小结
- **零信任** 的核心是"永不信任,始终验证"
- **NIST SP 800-207** 定义了标准的零信任架构(PE/PA/PEP)
- 零信任有**五大支柱**:身份、设备、网络、应用、数据
- **BeyondCorp** 是 Google 的零信任实践,用 Access Proxy 替代 VPN
- **ZTNA** 比 VPN 更安全、更细粒度、更好的用户体验
- 零信任实施是**渐进式**的,从身份统一开始,逐步深入