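# Chapter 23: Zero Trust Architecture

> "Never trust, always verify. This is not paranoia; it is a basic principle of modern security."

```{mermaid}
mindmap
  root((Zero Trust Architecture))
    Core principles
      Never trust
      Assume breach
      Least privilege
      Verify explicitly
    NIST architecture
      PE policy engine
      PA policy administrator
      PEP enforcement point
    Five pillars
      Identity
      Device
      Network
      Application
      Data
    Practice
      BeyondCorp
      ZTNA
      Maturity model
```

## 23.1 Origins of Zero Trust

In 2010, John Kindervag of Forrester proposed the zero trust model. Its core insight:

> **The traditional assumption that the internal network is trusted and the external network is not has become obsolete.**

```{mermaid}
flowchart TB
    subgraph traditional["Traditional model (castle and moat)"]
        direction TB
        FW["🔥 Firewall"]
        EXT["External network = untrusted ❌"]
        subgraph intranet["Internal network = trusted ✅"]
            DB1["🗄️ DB"]
            APP1["📦 App"]
            API1["🔌 API"]
            DB1 <--> APP1
            APP1 <--> API1
            DB1 <--> API1
        end
        EXT -->|"Breach the firewall"| FW --> intranet
    end

    subgraph zt["Zero trust model"]
        direction LR
        API2["🔌 API"] -->|"mTLS + AuthZ"| APP2["📦 App"]
        APP2 -->|"mTLS + AuthZ"| DB2["🗄️ DB"]
    end

    style traditional fill:#ffe0e0,stroke:#cc0000
    style zt fill:#e0ffe0,stroke:#00cc00
    style intranet fill:#fff0f0,stroke:#cc0000
```

## 23.2 Core Principles

| Principle | Meaning | Practice |
|-----------|---------|----------|
| Never Trust, Always Verify | No request is trusted because of its location or network | Verify identity and permissions on every request |
| Assume Breach | Assume the attacker is already inside the network | Micro-segmentation, least privilege, monitoring |
| Least Privilege | Grant only the minimum permissions a task requires | RBAC/ABAC, short-lived credentials |
| Verify Explicitly | Base decisions on all available data | Identity + device + location + behavior |

## 23.3 NIST SP 800-207 Architecture

```{mermaid}
flowchart TB
    subgraph subject["Subjects"]
        USER["👤 User"]
        DEVICE["💻 Device"]
        WORKLOAD["⚙️ Workload"]
    end

    PEP["🛡️ PEP<br/>Policy Enforcement Point<br/>Gateway / Sidecar"]
    PA["📋 PA<br/>Policy Administrator<br/>Establishes / tears down communication paths"]
    PE["🧠 PE<br/>Policy Engine<br/>Makes access decisions"]

    subgraph resources["Enterprise resources"]
        APP["📦 Application"]
        DATA["🗄️ Data"]
    end

    subgraph datasources["Data sources"]
        IDP["🔑 Identity system"]
        DEV_STATUS["📱 Device posture"]
        THREAT["🔍 Threat intelligence"]
        BEHAVIOR["📊 Behavior analytics"]
        COMPLIANCE["📜 Compliance requirements"]
    end

    USER --> PEP
    DEVICE --> PEP
    WORKLOAD --> PEP
    PEP -->|"Allow / deny"| resources
    PEP <-->|"Request decision"| PA
    PA <-->|"Fetch policy"| PE
    datasources -->|"Input signals"| PE

    style PEP fill:#ff9900,stroke:#cc6600,color:#fff
    style PE fill:#0066cc,stroke:#003366,color:#fff
    style PA fill:#009933,stroke:#006622,color:#fff
```

### Zero Trust Decision Flow

```{mermaid}
flowchart TD
    REQ["📨 Request reaches PEP"] --> AUTH{"Authentication<br/>passed?"}
    AUTH -->|"No"| DENY1["❌ Deny access"]
    AUTH -->|"Yes"| DEV{"Device trust<br/>evaluation passed?"}
    DEV -->|"No"| DENY2["❌ Deny / degraded access"]
    DEV -->|"Yes"| CTX{"Contextual risk<br/>assessment"}
    CTX -->|"High risk"| MFA["🔐 Require step-up MFA"]
    CTX -->|"Medium risk"| LIMITED["⚠️ Limited access"]
    CTX -->|"Low risk"| AUTHZ{"Authorization check<br/>RBAC/ABAC"}
    MFA -->|"Pass"| AUTHZ
    MFA -->|"Fail"| DENY3["❌ Deny access"]
    AUTHZ -->|"Allow"| ALLOW["✅ Allow access"]
    AUTHZ -->|"Deny"| DENY4["❌ Deny access"]
    ALLOW --> AUDIT["📝 Write audit log"]
    DENY1 --> AUDIT
    DENY2 --> AUDIT
    DENY3 --> AUDIT
    DENY4 --> AUDIT

    style ALLOW fill:#00cc00,stroke:#009900,color:#fff
    style DENY1 fill:#cc0000,stroke:#990000,color:#fff
    style DENY2 fill:#cc0000,stroke:#990000,color:#fff
    style DENY3 fill:#cc0000,stroke:#990000,color:#fff
    style DENY4 fill:#cc0000,stroke:#990000,color:#fff
```

## 23.4 The Five Pillars of Zero Trust

### Identity

```{mermaid}
flowchart LR
    subgraph identity["Identity types"]
        HUMAN["👤 Human identity<br/>OIDC + MFA + Passkey"]
        WORKLOAD["⚙️ Workload identity<br/>SPIFFE / SPIRE"]
        DEVICE["📱 Device identity<br/>MDM + device certificate"]
    end

    subgraph verification["Continuous verification"]
        LOGIN["🔑 Verify at login"]
        REQUEST["🔄 Verify on every request"]
        ANOMALY["⚠️ Re-verify on anomalous behavior"]
    end

    identity --> LOGIN --> REQUEST --> ANOMALY
    ANOMALY -->|"Re-authenticate"| LOGIN
```

### Device

| Trust level | Conditions | Access allowed |
|-------------|------------|----------------|
| Full trust | Corporate device + compliant + latest patches | All resources |
| Partial trust | Corporate device + partially compliant | Non-sensitive resources |
| Low trust | Personal device + MDM | Public resources |
| Untrusted | Unknown device | Public information only |

### Network

```{mermaid}
flowchart TB
    subgraph flat["Traditional flat network"]
        direction LR
        SA1["Service A"] <-->|"Plaintext"| SB1["Service B"]
        SA1 <-->|"Plaintext"| DB1["Database"]
        SB1 <-->|"Plaintext"| DB1
    end

    subgraph micro["Micro-segmented network"]
        direction TB
        SA2["Service A"] -->|"mTLS"| SB2["Service B"]
        SB2 -->|"mTLS"| DB2["Database"]
        SA2 -.-x|"❌ No direct connection"| DB2
    end

    style flat fill:#ffe0e0,stroke:#cc0000
    style micro fill:#e0ffe0,stroke:#00cc00
```

## 23.5 BeyondCorp: Google's Zero Trust Practice

```{mermaid}
flowchart LR
    EMP["👤 Employee<br/>Any network"]
    AP["🛡️ Access Proxy<br/>(IAP)"]
    PE["🧠 Policy engine"]
    RES["📦 Specific resource"]

    EMP -->|"HTTPS request"| AP
    AP -->|"Request decision"| PE
    PE -->|"Allow"| AP
    AP -->|"Forward"| RES

    subgraph checks["Access Proxy checks"]
        C1["🔑 User identity"]
        C2["📱 Device posture"]
        C3["🌐 Network location"]
        C4["📋 Request context"]
    end

    subgraph decisions["Policy engine decision inputs"]
        D1["👥 User role"]
        D2["📊 Device trust level"]
        D3["🔒 Resource sensitivity"]
        D4["⏰ Time / behavior"]
    end

    AP --- checks
    PE --- decisions

    style AP fill:#ff9900,stroke:#cc6600,color:#fff
    style PE fill:#0066cc,stroke:#003366,color:#fff
```

## 23.6 ZTNA vs VPN

| Dimension | Traditional VPN | ZTNA |
|-----------|-----------------|------|
| Trust model | The whole network is trusted once connected | Every request is verified |
| Access granularity | Network level (IP/port) | Application level |
| Attack surface | Large (the entire internal network is exposed) | Small (only authorized applications) |
| User experience | Requires a client; typically higher latency | Largely transparent; typically lower latency |
| Lateral movement | Easy | Hard |
| Visibility | Low | High (every access is logged) |

## 23.7 Zero Trust Maturity Model

| Level | Name | Identity | Device | Network | Application | Data | Visibility & Analytics |
|-------|------|----------|--------|---------|-------------|------|------------------------|
| **Level 0** | Unaware | Password authentication, no MFA | No device management | Flat network | No application security | No classification | No centralized logging |
| **Level 1** | Traditional | Password + basic MFA | Basic MDM | Perimeter firewall | WAF | Basic classification | Basic logging |
| **Level 2** | Initial zero trust | SSO + MFA + RBAC | MDM + compliance checks | Network segmentation | API Gateway | Encryption at rest | Centralized SIEM analysis |
| **Level 3** | Advanced zero trust | ABAC + continuous verification + SPIFFE | Real-time device assessment | Micro-segmentation + mTLS | Service mesh + OPA | DLP + labeling | Behavior analytics + UEBA |
| **Level 4** | Optimal | AI risk assessment + adaptive authentication | Automatic isolation of risky devices | Software-defined perimeter | Automated policy generation | Automated classification + tracking | AI-driven + automated response |

## 23.8 Zero Trust Implementation Roadmap

```{mermaid}
gantt
    title Zero Trust Implementation Roadmap (24 months)
    dateFormat YYYY-MM
    axisFormat %Y-%m

    section Phase 1 - Unified identity
    Deploy OIDC IdP (Keycloak)                 :a1, 2025-01, 2M
    Enable MFA everywhere                      :a2, after a1, 2M
    Deploy SPIFFE/SPIRE (workload identity)    :a3, after a1, 3M
    Unify the identity directory               :a4, after a2, 2M

    section Phase 2 - Micro-segmentation
    Deploy a service mesh (Istio)              :b1, 2025-07, 3M
    Enable automatic mTLS                      :b2, after b1, 2M
    Implement Network Policy                   :b3, after b1, 2M
    Least-privilege network policies           :b4, after b2, 2M

    section Phase 3 - Continuous verification
    Deploy fine-grained authorization (OpenFGA + OPA) :c1, 2026-01, 3M
    Device trust evaluation                    :c2, after c1, 2M
    Behavior analytics and anomaly detection   :c3, after c1, 3M
    Adaptive authentication                    :c4, after c2, 2M

    section Phase 4 - Automated response
    SOAR automated response                    :d1, 2026-07, 2M
    AI-driven risk assessment                  :d2, after d1, 2M
    Real-time policy adjustment                :d3, after d1, 3M
    Continuous compliance monitoring           :d4, after d2, 2M
```

## 23.9 Code Examples

### Python: Zero Trust API Gateway with FastAPI

```python
"""
Zero trust API gateway built on FastAPI.

Features: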
- Device trust evaluation (device fingerprint + compliance status)
- Continuous verification middleware (identity + context checked on every request)
- Risk score calculation
- Audit logging

Install dependencies:
    pip install fastapi uvicorn pyjwt httpx cryptography
"""

import hashlib
import json
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

import httpx
import jwt
from fastapi import FastAPI, Request, Response, HTTPException, Depends
# BaseHTTPMiddleware lives in Starlette; fastapi.middleware has no "base" module.
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint

# ─── Logging configuration ─────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("zt-gateway")

app = FastAPI(title="Zero Trust API Gateway", version="1.0.0")


# ─── Data models ───────────────────────────────────────────
class TrustLevel(str, Enum):
    """Device trust level."""
    FULL = "full"            # Corporate device + compliant + latest patches
    PARTIAL = "partial"      # Corporate device + partially compliant
    LOW = "low"              # Personal device + MDM
    UNTRUSTED = "untrusted"  # Unknown device


class RiskLevel(str, Enum):
    """Risk level."""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class DeviceInfo:
    """Device information."""
    device_id: str
    fingerprint: str
    os: str = ""
    os_version: str = ""
    is_managed: bool = False
    is_compliant: bool = False
    last_patch_date: Optional[str] = None
    trust_level: TrustLevel = TrustLevel.UNTRUSTED


@dataclass
class RequestContext:
    """Request context used for zero trust decisions."""
    user_id: str = ""
    user_roles: list = field(default_factory=list)
    device: Optional[DeviceInfo] = None
    source_ip: str = ""
    geo_location: str = ""
    request_time: str = ""
    risk_score: float = 0.0
    risk_level: RiskLevel = RiskLevel.LOW


# ─── Device trust evaluation ───────────────────────────────
class DeviceTrustEvaluator:
    """
    Device trust evaluator.

    Computes a trust level from device attributes.
    """

    # Known corporate device IDs (in production, query the MDM system instead)
    MANAGED_DEVICES = {"device-001", "device-002", "device-003"}

    @classmethod
    def evaluate(cls, request: Request) -> DeviceInfo:
        """Extract device information from request headers and evaluate trust."""
        # Extract device info. In a real deployment this comes from an MDM agent
        # or a device certificate; plain headers are client-controlled (demo only).
        device_id = request.headers.get("X-Device-ID", "unknown")
        user_agent = request.headers.get("User-Agent", "")
        device_cert = request.headers.get("X-Device-Cert", "")

        # Compute the device fingerprint
        fingerprint_data = f"{device_id}:{user_agent}:{device_cert}"
        fingerprint = hashlib.sha256(fingerprint_data.encode()).hexdigest()[:16]

        device = DeviceInfo(
            device_id=device_id,
            fingerprint=fingerprint,
            os=request.headers.get("X-Device-OS", "unknown"),
            os_version=request.headers.get("X-Device-OS-Version", ""),
            is_managed=device_id in cls.MANAGED_DEVICES,
            is_compliant=request.headers.get("X-Device-Compliant", "false") == "true",
            last_patch_date=request.headers.get("X-Device-Last-Patch", None),
        )

        # Evaluate the trust level
        device.trust_level = cls._calculate_trust_level(device)
        return device

    @classmethod
    def _calculate_trust_level(cls, device: DeviceInfo) -> TrustLevel:
        """Compute the trust level from device attributes."""
        if not device.is_managed:
            return TrustLevel.UNTRUSTED

        if device.is_compliant and device.last_patch_date:
            # FULL trust requires a patch within the last 30 days
            try:
                patch_date = datetime.fromisoformat(device.last_patch_date)
                # A value without a UTC offset parses as a naive datetime; treat
                # it as UTC so the subtraction below does not raise TypeError.
                if patch_date.tzinfo is None:
                    patch_date = patch_date.replace(tzinfo=timezone.utc)
                days_since_patch = (datetime.now(timezone.utc) - patch_date).days
                if days_since_patch <= 30:
                    return TrustLevel.FULL
            except ValueError:
                pass
            return TrustLevel.PARTIAL

        if device.is_compliant:
            return TrustLevel.PARTIAL

        return TrustLevel.LOW


# ─── Risk scoring ──────────────────────────────────────────
class RiskScorer:
    """
    Risk score calculator.

    Combines several signals into a per-request risk score (0.0 - 1.0).
    """

    # High-risk geographic locations (example values)
    HIGH_RISK_GEOS = {"CN-UNKNOWN", "RU-UNKNOWN", "KP"}

    # Unusual hours (UTC): 00:00 through 05:59
    UNUSUAL_HOURS = set(range(0, 6))

    @classmethod
    def calculate(cls, ctx: RequestContext) -> tuple[float, RiskLevel]:
        """Compute the risk score and risk level."""
        score = 0.0

        # 1. Device trust level (weight 0.3)
        device_scores = {
            TrustLevel.FULL: 0.0,
            TrustLevel.PARTIAL: 0.1,
            TrustLevel.LOW: 0.2,
            TrustLevel.UNTRUSTED: 0.3,
        }
        if ctx.device:
            score += device_scores.get(ctx.device.trust_level, 0.3)

        # 2. Geographic risk (weight 0.25)
        if ctx.geo_location in cls.HIGH_RISK_GEOS:
            score += 0.25
        elif not ctx.geo_location:
            score += 0.15  # An unknown location also carries risk

        # 3. Time anomaly (weight 0.2)
        try:
            req_hour = datetime.fromisoformat(ctx.request_time).hour
            if req_hour in cls.UNUSUAL_HOURS:
                score += 0.2
        except (ValueError, AttributeError):
            score += 0.1

        # 4. No role information (weight 0.15)
        if not ctx.user_roles:
            score += 0.15

        # 5. Unknown user (weight 0.1)
        if not ctx.user_id:
            score += 0.1

        # Clamp to [0, 1]
        score = min(score, 1.0)

        # Map the score to a risk level
        if score >= 0.7:
            level = RiskLevel.CRITICAL
        elif score >= 0.5:
            level = RiskLevel.HIGH
        elif score >= 0.3:
            level = RiskLevel.MEDIUM
        else:
            level = RiskLevel.LOW

        return score, level


# ─── JWT verification ──────────────────────────────────────
class JWTVerifier:
    """JWT token verifier."""

    # In production, fetch public keys dynamically from a JWKS endpoint
    SECRET_KEY = "your-256-bit-secret-for-demo-only"
    ALGORITHM = "HS256"
    ISSUER = "https://auth.example.com"

    @classmethod
    def verify(cls, token: str) -> dict:
        """
        Verify a JWT and return its claims.

        Raises:
            HTTPException: if verification fails.
        """
        try:
            payload = jwt.decode(
                token,
                cls.SECRET_KEY,
                algorithms=[cls.ALGORITHM],
                issuer=cls.ISSUER,
                options={"require": ["exp", "sub", "iss"]},
            )
            return payload
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Token has expired")
        except jwt.InvalidIssuerError:
            raise HTTPException(status_code=401, detail="Invalid token issuer")
        except jwt.InvalidTokenError as e:
            raise HTTPException(status_code=401, detail=f"Invalid token: {e}")


# ─── Audit logging ─────────────────────────────────────────
class AuditLogger:
    """Zero trust audit logger."""

    @staticmethod
    def log_access(
        ctx: RequestContext,
        path: str,
        method: str,
        decision: str,
        reason: str = "",
        status_code: int = 200,
    ):
        """Write an access audit record as structured JSON."""
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": "access_decision",
            "user_id": ctx.user_id,
            "source_ip": ctx.source_ip,
            "device_id": ctx.device.device_id if ctx.device else "unknown",
            "device_trust": ctx.device.trust_level.value if ctx.device else "unknown",
            "path": path,
            "method": method,
            "risk_score": ctx.risk_score,
            "risk_level": ctx.risk_level.value,
            "decision": decision,
            "reason": reason,
            "status_code": status_code,
        }
        logger.info("AUDIT: %s", json.dumps(entry))


# ─── Zero trust middleware ─────────────────────────────────
# Imported here so this section stays self-contained; it would normally sit
# with the other imports at the top of the file.
from starlette.responses import JSONResponse


class ZeroTrustMiddleware(BaseHTTPMiddleware):
    """
    Continuous-verification middleware.

    Every request passes through: authentication → device evaluation →
    risk scoring → authorization decision.

    Denials are returned as JSONResponse objects instead of being raised as
    HTTPException: an exception raised inside BaseHTTPMiddleware.dispatch is
    not routed through FastAPI's exception handlers and surfaces as a 500.
    """

    # Paths that do not require authentication
    PUBLIC_PATHS = {"/health", "/metrics", "/docs", "/openapi.json"}

    # Resource sensitivity by path prefix
    RESOURCE_SENSITIVITY = {
        "/api/admin": "high",
        "/api/users": "medium",
        "/api/public": "low",
    }

    @staticmethod
    def _deny(
        ctx: RequestContext, request: Request,
        status_code: int, reason: str, detail: str,
    ) -> JSONResponse:
        """Audit a denial and build the JSON error response."""
        AuditLogger.log_access(
            ctx, request.url.path, request.method,
            decision="DENY", reason=reason, status_code=status_code,
        )
        return JSONResponse(status_code=status_code, content={"detail": detail})

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        start_time = time.time()

        # 1. Public paths pass straight through
        if request.url.path in self.PUBLIC_PATHS:
            return await call_next(request)

        # 2. Build the request context
        ctx = RequestContext(
            source_ip=request.client.host if request.client else "unknown",
            geo_location=request.headers.get("X-Geo-Location", ""),
            request_time=datetime.now(timezone.utc).isoformat(),
        )

        # 3. Authentication (JWT)
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return self._deny(ctx, request, 401, "missing_token", "Missing Bearer token")

        try:
            claims = JWTVerifier.verify(auth_header[7:])
        except HTTPException as exc:
            return self._deny(ctx, request, exc.status_code, "invalid_token", str(exc.detail))
        ctx.user_id = claims.get("sub", "")
        ctx.user_roles = claims.get("roles", [])

        # 4. Device trust evaluation
        ctx.device = DeviceTrustEvaluator.evaluate(request)

        # 5. Risk scoring
        ctx.risk_score, ctx.risk_level = RiskScorer.calculate(ctx)

        # 6. Risk-based access decision
        sensitivity = self._get_resource_sensitivity(request.url.path)

        if ctx.risk_level == RiskLevel.CRITICAL:
            return self._deny(
                ctx, request, 403, "critical_risk",
                "Access denied: risk level too high",
            )

        if ctx.risk_level == RiskLevel.HIGH and sensitivity == "high":
            return self._deny(
                ctx, request, 403, "high_risk_sensitive_resource",
                "Access denied: high risk for sensitive resource. "
                "Please use a managed device or contact IT.",
            )

        if (
            ctx.device
            and ctx.device.trust_level == TrustLevel.UNTRUSTED
            and sensitivity != "low"
        ):
            return self._deny(
                ctx, request, 403, "untrusted_device",
                "Access denied: untrusted device cannot access this resource",
            )

        # 7. Attach the context to request.state for downstream handlers
        request.state.zt_context = ctx

        # 8. Run the request
        response = await call_next(request)

        # 9. Write the audit log
        duration_ms = (time.time() - start_time) * 1000
        AuditLogger.log_access(
            ctx, request.url.path, request.method,
            decision="ALLOW", reason="passed_all_checks",
            status_code=response.status_code,
        )

        # 10. Add zero trust response headers
        response.headers["X-ZT-Risk-Score"] = str(round(ctx.risk_score, 2))
        response.headers["X-ZT-Risk-Level"] = ctx.risk_level.value
        response.headers["X-ZT-Device-Trust"] = (
            ctx.device.trust_level.value if ctx.device else "unknown"
        )
        response.headers["X-ZT-Duration-Ms"] = str(round(duration_ms, 2))

        return response

    def _get_resource_sensitivity(self, path: str) -> str:
        """Look up the sensitivity of a resource by path prefix."""
        for prefix, sensitivity in self.RESOURCE_SENSITIVITY.items():
            if path.startswith(prefix):
                return sensitivity
        return "medium"  # Default to medium sensitivity


# ─── Register the middleware ───────────────────────────────
app.add_middleware(ZeroTrustMiddleware)


# ─── API endpoints ─────────────────────────────────────────
@app.get("/health")
async def health():
    """Health check (public endpoint)."""
    return {"status": "healthy", "service": "zt-gateway"}


@app.get("/api/public/info")
async def public_info(request: Request):
    """Public information (low sensitivity)."""
    ctx: RequestContext = request.state.zt_context
    return {
        "message": "Public information",
        "your_risk_score": ctx.risk_score,
        "device_trust": ctx.device.trust_level.value if ctx.device else "unknown",
    }


@app.get("/api/users/me")
async def get_current_user(request: Request):
    """Return the current user's information (medium sensitivity)."""
    ctx: RequestContext = request.state.zt_context
    return {
        "user_id": ctx.user_id,
        "roles": ctx.user_roles,
        "device_trust":
            ctx.device.trust_level.value if ctx.device else "unknown",
        "risk_level": ctx.risk_level.value,
    }


@app.get("/api/admin/dashboard")
async def admin_dashboard(request: Request):
    """Admin dashboard (high sensitivity, requires the admin role)."""
    ctx: RequestContext = request.state.zt_context

    if "admin" not in ctx.user_roles:
        AuditLogger.log_access(
            ctx, request.url.path, request.method,
            decision="DENY", reason="insufficient_role", status_code=403,
        )
        raise HTTPException(status_code=403, detail="Admin role required")

    return {
        "message": "Welcome to admin dashboard",
        "user_id": ctx.user_id,
        "risk_score": ctx.risk_score,
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)
```

### Java: Zero Trust Filter for Spring Cloud Gateway

```java
/**
 * Zero trust filter for Spring Cloud Gateway.
 * Features: JWT verification + device fingerprint + risk scoring
 *
 * Dependencies (build.gradle):
 *   implementation 'org.springframework.cloud:spring-cloud-starter-gateway'
 *   implementation 'io.jsonwebtoken:jjwt-api:0.12.5'
 *   runtimeOnly 'io.jsonwebtoken:jjwt-impl:0.12.5'
 *   runtimeOnly 'io.jsonwebtoken:jjwt-jackson:0.12.5'
 */
package com.example.zerotrust.gateway;

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.security.Keys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import javax.crypto.SecretKey;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.util.*;

/**
 * Zero trust global filter.
 * Per request: authentication → device evaluation
 * → risk scoring → access decision
 */
@Component
public class ZeroTrustGatewayFilter implements GlobalFilter, Ordered {

    private static final Logger log = LoggerFactory.getLogger(ZeroTrustGatewayFilter.class);

    // Public paths (no authentication required)
    private static final Set<String> PUBLIC_PATHS = Set.of(
        "/health", "/metrics", "/actuator/health"
    );

    // JWT key (in production, use an RSA/EC public key fetched from a JWKS endpoint)
    private static final SecretKey JWT_KEY = Keys.hmacShaKeyFor(
        "your-256-bit-secret-key-for-demo-only!!".getBytes(StandardCharsets.UTF_8)
    );

    // Known company-managed devices
    private static final Set<String> MANAGED_DEVICES = Set.of(
        "device-001", "device-002", "device-003"
    );

    @Override
    public int getOrder() {
        // Runs before filters with a non-negative order. This is not the absolute
        // highest precedence; that would be Ordered.HIGHEST_PRECEDENCE.
        return -1;
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String path = request.getURI().getPath();
        String method = request.getMethod().name();

        // 1. Public paths pass straight through
        if (PUBLIC_PATHS.contains(path)) {
            return chain.filter(exchange);
        }

        // 2. Extract and verify the JWT
        String authHeader = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION);
        if (authHeader == null || !authHeader.startsWith("Bearer ")) {
            log.warn("AUDIT: DENY path={} reason=missing_token ip={}", path, getClientIp(request));
            return rejectRequest(exchange, HttpStatus.UNAUTHORIZED, "Missing Bearer token");
        }

        Claims claims;
        try {
            claims = Jwts.parser()
                .verifyWith(JWT_KEY)
                .requireIssuer("https://auth.example.com")
                .build()
                .parseSignedClaims(authHeader.substring(7))
                .getPayload();
        } catch (JwtException e) {
            log.warn("AUDIT: DENY path={} reason=invalid_token error={}", path, e.getMessage());
            return rejectRequest(exchange, HttpStatus.UNAUTHORIZED, "Invalid token: " + e.getMessage());
        }

        String userId = claims.getSubject();
        @SuppressWarnings("unchecked")
        List<String> roles = claims.get("roles", List.class);
        if (roles == null) roles = Collections.emptyList();

        // 3. Device trust evaluation
        DeviceInfo device = evaluateDevice(request);

        // 4. Risk scoring
        RiskAssessment risk = calculateRisk(device, request, roles);

        // 5. Access decision
        String sensitivity = getResourceSensitivity(path);

        if (risk.score >= 0.7) {
            log.warn("AUDIT: DENY user={} path={} risk={} reason=critical_risk device={}",
                userId, path, risk.score, device.deviceId);
            return rejectRequest(exchange, HttpStatus.FORBIDDEN, "Access denied: risk level too high");
        }

        if (risk.score >= 0.5 && "high".equals(sensitivity)) {
            log.warn("AUDIT: DENY user={} path={} risk={} reason=high_risk_sensitive",
                userId, path, risk.score);
            return rejectRequest(exchange, HttpStatus.FORBIDDEN,
                "Access denied: high risk for sensitive resource");
        }

        if (device.trustLevel == TrustLevel.UNTRUSTED && !"low".equals(sensitivity)) {
            log.warn("AUDIT: DENY user={} path={} reason=untrusted_device device={}",
                userId, path, device.deviceId);
            return rejectRequest(exchange, HttpStatus.FORBIDDEN, "Access denied: untrusted device");
        }

        // 6. Write the audit log
        log.info("AUDIT: ALLOW user={} path={} method={} risk={} device_trust={} ip={}",
            userId, path, method, risk.score, device.trustLevel, getClientIp(request));

        // 7. Inject the zero trust context into downstream request headers
        ServerHttpRequest mutatedRequest = request.mutate()
            .header("X-ZT-User-Id", userId)
            .header("X-ZT-User-Roles", String.join(",", roles))
            .header("X-ZT-Device-Trust", device.trustLevel.name())
            .header("X-ZT-Device-Fingerprint", device.fingerprint)
            .header("X-ZT-Risk-Score", String.valueOf(risk.score))
            .header("X-ZT-Risk-Level", risk.level)
            .build();

        return chain.filter(exchange.mutate().request(mutatedRequest).build());
    }

    // ─── Device trust evaluation ───────────────────────────

    private DeviceInfo evaluateDevice(ServerHttpRequest request) {
        String deviceId = getHeader(request, "X-Device-ID", "unknown");
        String userAgent = getHeader(request, "User-Agent", "");
        String deviceCert = getHeader(request, "X-Device-Cert", "");
        boolean isCompliant = "true".equals(getHeader(request, "X-Device-Compliant", "false"));
        boolean isManaged = MANAGED_DEVICES.contains(deviceId);

        // Compute the device fingerprint
        String fingerprint = computeFingerprint(deviceId, userAgent, deviceCert);

        // Determine the trust level
        TrustLevel trustLevel;
        if (!isManaged) {
            trustLevel = TrustLevel.UNTRUSTED;
        } else if (isCompliant) {
            trustLevel = TrustLevel.FULL;
        } else {
            trustLevel = TrustLevel.PARTIAL;
        }

        return new DeviceInfo(deviceId, fingerprint, isManaged, isCompliant, trustLevel);
    }

    // ─── Risk scoring ──────────────────────────────────────

    private RiskAssessment calculateRisk(
            DeviceInfo device, ServerHttpRequest request, List<String> roles) {
        double score = 0.0;

        // Device trust (weight 0.3)
        switch (device.trustLevel) {
            case UNTRUSTED -> score += 0.3;
            case LOW -> score += 0.2;
            case PARTIAL -> score += 0.1;
            case FULL -> score += 0.0;
        }

        // Time anomaly (weight 0.2): 00:00 through 05:59 UTC
        int hour = LocalTime.now(ZoneOffset.UTC).getHour();
        if (hour < 6) {
            score += 0.2;
        }

        // No roles (weight 0.15)
        if (roles.isEmpty()) {
            score += 0.15;
        }

        // Geographic risk (weight 0.25); this example only penalizes a missing location
        String geo = getHeader(request, "X-Geo-Location", "");
        if (geo.isEmpty()) {
            score += 0.15;
        }

        score = Math.min(score, 1.0);

        String level;
        if (score >= 0.7) level = "critical";
        else if (score >= 0.5) level = "high";
        else if (score >= 0.3) level = "medium";
        else level = "low";

        return new RiskAssessment(Math.round(score * 100.0) / 100.0, level);
    }

    // ─── Helper methods ────────────────────────────────────

    private String getResourceSensitivity(String path) {
        if (path.startsWith("/api/admin")) return "high";
        if (path.startsWith("/api/users")) return "medium";
        if (path.startsWith("/api/public")) return "low";
        return "medium";
    }

    private Mono<Void> rejectRequest(
            ServerWebExchange exchange, HttpStatus status, String message) {
        exchange.getResponse().setStatusCode(status);
        exchange.getResponse().getHeaders().add("Content-Type", "application/json");
        // Escape backslashes and quotes so exception text cannot break the JSON body
        String safe = message.replace("\\", "\\\\").replace("\"", "\\\"");
        byte[] body = ("{\"error\":\"" + safe + "\"}").getBytes(StandardCharsets.UTF_8);
        return exchange.getResponse().writeWith(
            Mono.just(exchange.getResponse().bufferFactory().wrap(body))
        );
    }

    private String getClientIp(ServerHttpRequest request) {
        // X-Forwarded-For is client-controlled unless a trusted proxy sets it
        String xff = request.getHeaders().getFirst("X-Forwarded-For");
        if (xff != null && !xff.isEmpty()) {
            return xff.split(",")[0].trim();
        }
        return request.getRemoteAddress() != null
            ? request.getRemoteAddress().getAddress().getHostAddress()
            : "unknown";
    }

    private String getHeader(ServerHttpRequest request, String name, String defaultValue) {
        String value = request.getHeaders().getFirst(name);
        return value != null ?
            value : defaultValue;
    }

    private String computeFingerprint(String deviceId, String userAgent, String cert) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            md.update((deviceId + ":" + userAgent + ":" + cert).getBytes(StandardCharsets.UTF_8));
            byte[] hash = md.digest();
            // First 8 bytes as hex (16 characters)
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 8; i++) {
                sb.append(String.format("%02x", hash[i]));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            return "unknown";
        }
    }

    // ─── Internal data types ───────────────────────────────

    enum TrustLevel { FULL, PARTIAL, LOW, UNTRUSTED }

    record DeviceInfo(
        String deviceId,
        String fingerprint,
        boolean isManaged,
        boolean isCompliant,
        TrustLevel trustLevel
    ) {}

    record RiskAssessment(double score, String level) {}
}
```

### Go: Zero Trust Proxy (mTLS + OPA Policy Check + Audit Log)

```go
/*
Zero trust reverse proxy, Go implementation.

Features:
  - mTLS mutual authentication (client certificates are verified)
  - OPA policy checks (through OPA's HTTP API)
  - Structured audit logging (JSON)
  - Device trust evaluation
  - Request context propagation

Run:

	go run main.go \
	  --listen=:8443 \
	  --backend=http://localhost:8080 \
	  --opa-url=http://localhost:8181 \
	  --tls-cert=server.crt \
	  --tls-key=server.key \
	  --ca-cert=ca.crt
*/
package main

import (
	"bytes"
	"context"
	"crypto/sha256"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/http/httputil"
	"net/url"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"
)

// ─── Configuration ─────────────────────────────────────────

type Config struct {
	ListenAddr string
	BackendURL string
	OPAUrl     string
	TLSCert    string
	TLSKey     string
	CACert     string
}

func parseFlags() *Config {
	cfg := &Config{}
	flag.StringVar(&cfg.ListenAddr, "listen", ":8443", "Listen address")
	flag.StringVar(&cfg.BackendURL, "backend", "http://localhost:8080", "Backend URL")
	flag.StringVar(&cfg.OPAUrl, "opa-url", "http://localhost:8181", "OPA server URL")
	flag.StringVar(&cfg.TLSCert, "tls-cert", "server.crt", "TLS certificate file")
	flag.StringVar(&cfg.TLSKey, "tls-key", "server.key", "TLS private key file")
	flag.StringVar(&cfg.CACert, "ca-cert", "ca.crt", "CA certificate for client verification")
	flag.Parse()
	return cfg
}

// ─── Data models ───────────────────────────────────────────

// TrustLevel is the device trust level.
type TrustLevel string

const (
	TrustFull      TrustLevel = "full"
	TrustPartial   TrustLevel = "partial"
	TrustLow       TrustLevel = "low"
	TrustUntrusted TrustLevel = "untrusted"
)

// RequestContext is the zero trust request context.
type RequestContext struct {
	UserID            string     `json:"user_id"`
	ClientCertSAN     string     `json:"client_cert_san"`
	DeviceID          string     `json:"device_id"`
	DeviceFingerprint string     `json:"device_fingerprint"`
	DeviceTrust       TrustLevel `json:"device_trust"`
	SourceIP          string     `json:"source_ip"`
	Path              string     `json:"path"`
	Method            string     `json:"method"`
	RiskScore         float64    `json:"risk_score"`
	Timestamp         time.Time  `json:"timestamp"`
}

// AuditEntry is a single audit log record.
type AuditEntry struct {
	Timestamp   string     `json:"timestamp"`
	EventType   string     `json:"event_type"`
	UserID      string     `json:"user_id"`
	CertSAN     string     `json:"cert_san"`
	DeviceID    string     `json:"device_id"`
	DeviceTrust TrustLevel `json:"device_trust"`
	SourceIP    string     `json:"source_ip"`
	Path        string     `json:"path"`
	Method      string     `json:"method"`
	Decision    string     `json:"decision"`
	Reason      string     `json:"reason"`
	RiskScore   float64    `json:"risk_score"`
	DurationMs  float64    `json:"duration_ms"`
	StatusCode  int        `json:"status_code"`
}

// OPAInput is the input document for an OPA policy query.
type OPAInput struct {
	Input struct {
		User        string     `json:"user"`
		CertSAN     string     `json:"cert_san"`
		Path        string     `json:"path"`
		Method      string     `json:"method"`
		DeviceTrust TrustLevel `json:"device_trust"`
		SourceIP    string     `json:"source_ip"`
		RiskScore   float64    `json:"risk_score"`
	} `json:"input"`
}

// OPAResult is the result of an OPA policy query.
type OPAResult struct {
	Result struct {
		Allow  bool   `json:"allow"`
		Reason string `json:"reason,omitempty"`
	} `json:"result"`
}

// ─── Audit logger ──────────────────────────────────────────

type AuditLogger struct {
	logger *slog.Logger
}

func NewAuditLogger() *AuditLogger {
	// Emit JSON to stdout
	handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	})
	return &AuditLogger{
		logger: slog.New(handler),
	}
}

func (a *AuditLogger) Log(entry AuditEntry) {
	a.logger.Info("access_decision",
		"event_type", entry.EventType,
		"user_id", entry.UserID,
		"cert_san", entry.CertSAN,
		"device_id", entry.DeviceID,
		"device_trust", entry.DeviceTrust,
		"source_ip", entry.SourceIP,
		"path", entry.Path,
		"method", entry.Method,
		"decision", entry.Decision,
		"reason", entry.Reason,
		"risk_score", entry.RiskScore,
		"duration_ms", entry.DurationMs,
		"status_code", entry.StatusCode,
	)
}

// ─── Device trust evaluation ───────────────────────────────

// Known company-managed devices (in production, query the MDM system)
var managedDevices = map[string]bool{
	"device-001": true,
	"device-002": true,
	"device-003": true,
}

func evaluateDeviceTrust(r *http.Request) (string, string, TrustLevel) {
	deviceID := r.Header.Get("X-Device-ID")
	if deviceID == "" {
		deviceID = "unknown"
	}

	userAgent := r.Header.Get("User-Agent")
	isCompliant := r.Header.Get("X-Device-Compliant") == "true"

	// Compute the device fingerprint
	h := sha256.New()
	h.Write([]byte(fmt.Sprintf("%s:%s", deviceID, userAgent)))
	fingerprint := fmt.Sprintf("%x", h.Sum(nil))[:16]

	// Evaluate the trust level. The three cases are exhaustive, so TrustLow
	// is never assigned by this demo.
	isManaged := managedDevices[deviceID]
	var trust TrustLevel
	switch {
	case !isManaged:
		trust = TrustUntrusted
	case isCompliant:
		trust = TrustFull
	default:
		trust = TrustPartial
	}

	return deviceID, fingerprint, trust
}

// ─── OPA policy check ──────────────────────────────────────

type OPAChecker struct {
	baseURL    string
	httpClient *http.Client
}

func NewOPAChecker(baseURL string) *OPAChecker {
	return &OPAChecker{
		baseURL: baseURL,
		httpClient: &http.Client{
			Timeout: 2 * time.Second,
		},
	}
}

// Check sends a policy query to OPA.
func (o *OPAChecker) Check(ctx context.Context, reqCtx *RequestContext) (bool, string, error) {
	// Build the OPA input
	input := OPAInput{}
	input.Input.User = reqCtx.UserID
	input.Input.CertSAN = reqCtx.ClientCertSAN
	input.Input.Path = reqCtx.Path
	input.Input.Method = reqCtx.Method
	input.Input.DeviceTrust = reqCtx.DeviceTrust
	input.Input.SourceIP = reqCtx.SourceIP
	input.Input.RiskScore = reqCtx.RiskScore

	body, err := json.Marshal(input)
	if err != nil {
		return false, "marshal_error", fmt.Errorf("failed to marshal OPA input: %w", err)
	}

	// Send the request to OPA
	opaURL := fmt.Sprintf("%s/v1/data/zerotrust/authz", o.baseURL)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, opaURL, bytes.NewReader(body))
	if err != nil {
		return false, "request_error", fmt.Errorf("failed to create OPA request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := o.httpClient.Do(req)
	if err != nil {
		// Fail closed: deny when OPA is unavailable
		return false, "opa_unavailable", fmt.Errorf("OPA request failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		respBody, _ := io.ReadAll(resp.Body)
		return false, "opa_error", fmt.Errorf("OPA returned status %d: %s", resp.StatusCode, respBody)
	}

	var result OPAResult
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return false, "decode_error", fmt.Errorf("failed to decode OPA response: %w", err)
	}

	return result.Result.Allow, result.Result.Reason, nil
}

// ─── Zero trust proxy ──────────────────────────────────────

type ZeroTrustProxy struct {
	proxy      *httputil.ReverseProxy
	opaChecker *OPAChecker
	audit      *AuditLogger
}

func NewZeroTrustProxy(backendURL string, opaURL string) (*ZeroTrustProxy, error) {
	target, err := url.Parse(backendURL)
	if err != nil {
		return nil, fmt.Errorf("invalid backend URL: %w", err)
	}

	proxy := httputil.NewSingleHostReverseProxy(target)

	// Custom error handling
	proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
		slog.Error("proxy error", "error", err, "path", r.URL.Path)
		http.Error(w, `{"error":"backend unavailable"}`, http.StatusBadGateway)
	}

	return &ZeroTrustProxy{
		proxy:      proxy,
		opaChecker: NewOPAChecker(opaURL),
		audit:      NewAuditLogger(),
	}, nil
}

// ServeHTTP runs zero trust verification for every request.
func (ztp *ZeroTrustProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	start := time.Now()

	// 1. Public paths pass straight through
	if r.URL.Path == "/health" || r.URL.Path == "/metrics" {
		ztp.proxy.ServeHTTP(w, r)
		return
	}

	// 2. Extract client certificate information (mTLS)
	var certSAN string
	var userID string

	if r.TLS != nil && len(r.TLS.PeerCertificates) > 0 {
		cert := r.TLS.PeerCertificates[0]

		// Use the SPIFFE ID or a DNS SAN as the identity
		for _, uri := range cert.URIs {
			if strings.HasPrefix(uri.String(), "spiffe://") {
				certSAN = uri.String()
				break
			}
		}
		if certSAN == "" && len(cert.DNSNames) > 0 {
			certSAN = cert.DNSNames[0]
		}
		userID = cert.Subject.CommonName
	} else {
		// No client certificate: deny
		ztp.audit.Log(AuditEntry{
			Timestamp: time.Now().UTC().Format(time.RFC3339),
			EventType: "access_decision",
			SourceIP:  getClientIP(r),
			Path:      r.URL.Path,
			Method:    r.Method,
			Decision:  "DENY",
			Reason:    "no_client_certificate",
		})
		http.Error(w, `{"error":"mTLS required: no client certificate"}`, http.StatusUnauthorized)
		return
	}

	// 3. Device trust evaluation
	deviceID, fingerprint, deviceTrust := evaluateDeviceTrust(r)

	// 4. Build the request context
	reqCtx := &RequestContext{
		UserID:            userID,
		ClientCertSAN:     certSAN,
		DeviceID:          deviceID,
		DeviceFingerprint: fingerprint,
		DeviceTrust:       deviceTrust,
		SourceIP:          getClientIP(r),
		Path:              r.URL.Path,
		Method:            r.Method,
		Timestamp:         time.Now().UTC(),
	}

	// 5. OPA policy check
	allowed, reason, err := ztp.opaChecker.Check(r.Context(), reqCtx)
	if err != nil {
		slog.Warn("OPA check failed, defaulting to deny",
			"error", err, "path", r.URL.Path, "user", userID)
		// Fail closed: deny the request when OPA is unavailable
		ztp.audit.Log(AuditEntry{
			Timestamp:   time.Now().UTC().Format(time.RFC3339),
			EventType:   "access_decision",
			UserID:      userID,
			CertSAN:     certSAN,
			DeviceID:    deviceID,
			DeviceTrust: deviceTrust,
			SourceIP:    getClientIP(r),
			Path:        r.URL.Path,
			Method:      r.Method,
			Decision:    "DENY",
			Reason:      fmt.Sprintf("opa_error: %v", err),
			DurationMs:  float64(time.Since(start).Milliseconds()),
		})
		http.Error(w, `{"error":"authorization service unavailable"}`, http.StatusForbidden)
		return
	}

	if !allowed {
		ztp.audit.Log(AuditEntry{
			Timestamp:   time.Now().UTC().Format(time.RFC3339),
			EventType:   "access_decision",
			UserID:      userID,
			CertSAN:     certSAN,
			DeviceID:    deviceID,
			DeviceTrust: deviceTrust,
			SourceIP:    getClientIP(r),
			Path:        r.URL.Path,
			Method:      r.Method,
			Decision:    "DENY",
			Reason:      reason,
			DurationMs:  float64(time.Since(start).Milliseconds()),
			StatusCode:  http.StatusForbidden,
		})
		http.Error(w, fmt.Sprintf(`{"error":"access denied","reason":"%s"}`, reason), http.StatusForbidden)
		return
	}

	// 6. Inject zero trust context headers into the downstream request.
	// Set overwrites any X-ZT-* values the client may have supplied.
	r.Header.Set("X-ZT-User-Id", userID)
	r.Header.Set("X-ZT-Cert-SAN", certSAN)
	r.Header.Set("X-ZT-Device-ID", deviceID)
	r.Header.Set("X-ZT-Device-Fingerprint", fingerprint)
	r.Header.Set("X-ZT-Device-Trust", string(deviceTrust))
	r.Header.Set("X-ZT-Source-IP", getClientIP(r))

	// 7. Forward the request to the backend
	recorder := &statusRecorder{ResponseWriter: w, statusCode: http.StatusOK}
	ztp.proxy.ServeHTTP(recorder, r)

	// 8. Write the audit log
	duration := time.Since(start)
	ztp.audit.Log(AuditEntry{
		Timestamp:   time.Now().UTC().Format(time.RFC3339),
		EventType:   "access_decision",
		UserID:      userID,
		CertSAN:     certSAN,
		DeviceID:    deviceID,
		DeviceTrust: deviceTrust,
		SourceIP:    getClientIP(r),
		Path:        r.URL.Path,
		Method:      r.Method,
		Decision:    "ALLOW",
		Reason:      reason,
		RiskScore:   reqCtx.RiskScore,
		DurationMs:  float64(duration.Milliseconds()),
		StatusCode:  recorder.statusCode,
	})
}

// statusRecorder captures the response status code.
type statusRecorder struct {
	http.ResponseWriter
	statusCode int
}

func (sr *statusRecorder) WriteHeader(code int) {
	sr.statusCode = code
	sr.ResponseWriter.WriteHeader(code)
}

// getClientIP extracts the client IP. X-Forwarded-For is client-controlled
// unless a trusted proxy sets it, so treat the result as advisory.
func getClientIP(r *http.Request) string {
	if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
		parts := strings.SplitN(xff, ",", 2)
		return strings.TrimSpace(parts[0])
	}
	// RemoteAddr is "host:port"; cut at the last colon so IPv6 literals
	// such as "[::1]:443" keep their full address.
	if i := strings.LastIndex(r.RemoteAddr, ":"); i != -1 {
		return strings.Trim(r.RemoteAddr[:i], "[]")
	}
	return r.RemoteAddr
}

// ─── Main ──────────────────────────────────────────────────

func main() {
	cfg := parseFlags()

	// Create the zero trust proxy
	proxy, err := NewZeroTrustProxy(cfg.BackendURL, cfg.OPAUrl)
	if err != nil {
		slog.Error("failed to create proxy", "error", err)
		os.Exit(1)
	}

	// Load the CA certificate used to verify client certificates
	caCert, err := os.ReadFile(cfg.CACert)
	if err != nil {
		slog.Error("failed to read CA cert", "error", err, "path", cfg.CACert)
		os.Exit(1)
	}
	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		slog.Error("failed to parse CA cert")
		os.Exit(1)
	}

	// Configure mTLS. CipherSuites is omitted on purpose: Go does not allow
	// configuring TLS 1.3 cipher suites, so the field would have no effect.
	tlsConfig := &tls.Config{
		ClientCAs:  caCertPool,
		ClientAuth: tls.RequireAndVerifyClientCert, // Client certificate is mandatory
		MinVersion: tls.VersionTLS13,               // TLS 1.3 minimum
	}

	server := &http.Server{
		Addr:         cfg.ListenAddr,
		Handler:      proxy,
		TLSConfig:    tlsConfig,
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 30 * time.Second,
		IdleTimeout:  120 * time.Second,
	}

	// Graceful shutdown
	go func() {
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
sig := <-sigCh slog.Info("received signal, shutting down", "signal", sig) ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() if err := server.Shutdown(ctx); err != nil { slog.Error("shutdown error", "error", err) } }() slog.Info("Zero Trust Proxy starting", "listen", cfg.ListenAddr, "backend", cfg.BackendURL, "opa", cfg.OPAUrl, ) if err := server.ListenAndServeTLS(cfg.TLSCert, cfg.TLSKey); err != http.ErrServerClosed { slog.Error("server error", "error", err) os.Exit(1) } slog.Info("server stopped gracefully") } ``` **配套 OPA 策略文件** (`zerotrust/authz.rego`): ```rego # OPA 零信任授权策略 # 路径: zerotrust/authz.rego package zerotrust.authz import rego.v1 default allow := false default reason := "denied_by_default" # 规则 1: 完全信任的设备可以访问所有资源 allow if { input.device_trust == "full" input.risk_score < 0.7 } reason := "full_trust_device" if { input.device_trust == "full" input.risk_score < 0.7 } # 规则 2: 部分信任的设备只能访问非管理端点 allow if { input.device_trust == "partial" not startswith(input.path, "/api/admin") input.risk_score < 0.5 } reason := "partial_trust_non_admin" if { input.device_trust == "partial" not startswith(input.path, "/api/admin") input.risk_score < 0.5 } # 规则 3: 只读操作对低信任设备开放公开端点 allow if { input.device_trust == "low" startswith(input.path, "/api/public") input.method == "GET" } reason := "low_trust_public_readonly" if { input.device_trust == "low" startswith(input.path, "/api/public") input.method == "GET" } # 规则 4: 不信任的设备一律拒绝 reason := "untrusted_device" if { input.device_trust == "untrusted" } # 规则 5: 高风险分数一律拒绝 reason := "high_risk_score" if { input.risk_score >= 0.7 } ``` ## 23.9 小结 - **零信任** 的核心是"永不信任,始终验证" - **NIST SP 800-207** 定义了标准的零信任架构(PE/PA/PEP) - 零信任有**五大支柱**:身份、设备、网络、应用、数据 - **BeyondCorp** 是 Google 的零信任实践,用 Access Proxy 替代 VPN - **ZTNA** 比 VPN 更安全、更细粒度、更好的用户体验 - 零信任实施是**渐进式**的,从身份统一开始,逐步深入
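
As a closing illustration, the trust-and-risk rules of `zerotrust/authz.rego` can be mirrored in plain Go, which may be handy for table-driven tests of the expected decisions before wiring up OPA. This is only a sketch: the function name `decide` is hypothetical, and the evaluation order (untrusted device first, then high risk score) is an assumption chosen so that every input yields exactly one reason. The thresholds and reason strings are taken from the policy in this chapter.

```go
package main

import (
	"fmt"
	"strings"
)

// decide is a hypothetical helper that mirrors the intent of the Rego rules.
// Assumption: deny rules are checked first (untrusted device, then high risk),
// so an allow rule can never override them.
func decide(deviceTrust, path, method string, riskScore float64) (bool, string) {
	switch {
	case deviceTrust == "untrusted":
		return false, "untrusted_device"
	case riskScore >= 0.7:
		return false, "high_risk_score"
	case deviceTrust == "full":
		return true, "full_trust_device"
	case deviceTrust == "partial" &&
		!strings.HasPrefix(path, "/api/admin") &&
		riskScore < 0.5:
		return true, "partial_trust_non_admin"
	case deviceTrust == "low" &&
		strings.HasPrefix(path, "/api/public") &&
		method == "GET":
		return true, "low_trust_public_readonly"
	default:
		return false, "denied_by_default"
	}
}

func main() {
	// Illustrative inputs only; paths and scores are made up for the demo.
	cases := []struct {
		trust, path, method string
		risk                float64
	}{
		{"full", "/api/admin/users", "GET", 0.2},
		{"partial", "/api/admin/users", "GET", 0.2},
		{"partial", "/api/orders", "POST", 0.6},
		{"low", "/api/public/docs", "GET", 0.1},
		{"low", "/api/public/docs", "POST", 0.1},
		{"untrusted", "/api/public/docs", "GET", 0.9},
	}
	for _, c := range cases {
		allowed, reason := decide(c.trust, c.path, c.method, c.risk)
		fmt.Printf("%s %s %s risk=%.1f allow=%t reason=%s\n",
			c.trust, c.method, c.path, c.risk, allowed, reason)
	}
}
```

Checking the deny conditions before any allow condition keeps the Go mirror fail-closed in the same spirit as the proxy: an input that matches no rule falls through to `denied_by_default`.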