第二十三章:零信任架构

“永不信任,始终验证 — 这不是偏执,而是现代安全的基本原则。”

        mindmap
  root((零信任架构))
    核心原则
      永不信任
      假设入侵
      最小权限
      显式验证
    NIST 架构
      PE 策略引擎
      PA 策略管理
      PEP 执行点
    五大支柱
      身份
      设备
      网络
      应用
      数据
    实践
      BeyondCorp
      ZTNA
      成熟度模型
    

23.1 零信任的起源

2010 年,Forrester 的 John Kindervag 提出零信任模型,核心洞察:

传统的”内网可信、外网不可信”的假设已经过时。

        flowchart TB
    subgraph traditional["传统模型(城堡与护城河)"]
        direction TB
        FW["🔥 防火墙"]
        EXT["外网 = 不可信 ❌"]
        subgraph intranet["内网 = 可信 ✅"]
            DB1["🗄️ DB"]
            APP1["📦 App"]
            API1["🔌 API"]
            DB1 <--> APP1
            APP1 <--> API1
            DB1 <--> API1
        end
        EXT -->|"突破防火墙"| FW --> intranet
    end

    subgraph zt["零信任模型"]
        direction LR
        API2["🔌 API"] -->|"mTLS + AuthZ"| APP2["📦 App"]
        APP2 -->|"mTLS + AuthZ"| DB2["🗄️ DB"]
    end

    style traditional fill:#ffe0e0,stroke:#cc0000
    style zt fill:#e0ffe0,stroke:#00cc00
    style intranet fill:#fff0f0,stroke:#cc0000
    

23.2 核心原则

原则

含义

实践

Never Trust, Always Verify

不因位置或网络信任任何请求

每次请求都验证身份和权限

Assume Breach

假设攻击者已在网络内部

微分段、最小权限、监控

Least Privilege

只授予完成任务所需的最小权限

RBAC/ABAC、短期凭证

Verify Explicitly

基于所有可用数据做决策

身份+设备+位置+行为

23.3 NIST SP 800-207 架构

        flowchart TB
    subgraph subject["主体"]
        USER["👤 用户"]
        DEVICE["💻 设备"]
        WORKLOAD["⚙️ 工作负载"]
    end

    PEP["🛡️ PEP<br/>策略执行点<br/>Gateway / Sidecar"]
    PA["📋 PA<br/>策略管理员<br/>建立/断开通信路径"]
    PE["🧠 PE<br/>策略引擎<br/>做出访问决策"]

    subgraph resources["企业资源"]
        APP["📦 应用"]
        DATA["🗄️ 数据"]
    end

    subgraph datasources["数据源"]
        IDP["🔑 身份系统"]
        DEV_STATUS["📱 设备状态"]
        THREAT["🔍 威胁情报"]
        BEHAVIOR["📊 行为分析"]
        COMPLIANCE["📜 合规要求"]
    end

    USER --> PEP
    DEVICE --> PEP
    WORKLOAD --> PEP
    PEP -->|"允许/拒绝"| resources
    PEP <-->|"请求决策"| PA
    PA <-->|"获取策略"| PE
    datasources -->|"输入信号"| PE

    style PEP fill:#ff9900,stroke:#cc6600,color:#fff
    style PE fill:#0066cc,stroke:#003366,color:#fff
    style PA fill:#009933,stroke:#006622,color:#fff
    

零信任决策流程

        flowchart TD
    REQ["📨 请求到达 PEP"] --> AUTH{"身份验证<br/>通过?"}
    AUTH -->|"否"| DENY1["❌ 拒绝访问"]
    AUTH -->|"是"| DEV{"设备信任<br/>评估通过?"}
    DEV -->|"否"| DENY2["❌ 拒绝 / 降级访问"]
    DEV -->|"是"| CTX{"上下文风险<br/>评估"}
    CTX -->|"高风险"| MFA["🔐 要求 Step-up MFA"]
    CTX -->|"中风险"| LIMITED["⚠️ 有限访问"]
    CTX -->|"低风险"| AUTHZ{"授权检查<br/>RBAC/ABAC"}
    MFA -->|"通过"| AUTHZ
    MFA -->|"失败"| DENY3["❌ 拒绝访问"]
    AUTHZ -->|"允许"| ALLOW["✅ 允许访问"]
    AUTHZ -->|"拒绝"| DENY4["❌ 拒绝访问"]
    ALLOW --> AUDIT["📝 记录审计日志"]
    DENY1 --> AUDIT
    DENY2 --> AUDIT
    DENY3 --> AUDIT
    DENY4 --> AUDIT

    style ALLOW fill:#00cc00,stroke:#009900,color:#fff
    style DENY1 fill:#cc0000,stroke:#990000,color:#fff
    style DENY2 fill:#cc0000,stroke:#990000,color:#fff
    style DENY3 fill:#cc0000,stroke:#990000,color:#fff
    style DENY4 fill:#cc0000,stroke:#990000,color:#fff
    

23.4 零信任五大支柱

身份(Identity)

        flowchart LR
    subgraph identity["身份类型"]
        HUMAN["👤 人类身份<br/>OIDC + MFA + Passkey"]
        WORKLOAD["⚙️ 工作负载身份<br/>SPIFFE / SPIRE"]
        DEVICE["📱 设备身份<br/>MDM + 设备证书"]
    end

    subgraph verification["持续验证"]
        LOGIN["🔑 登录时验证"]
        REQUEST["🔄 每次请求验证"]
        ANOMALY["⚠️ 异常行为重新验证"]
    end

    identity --> LOGIN --> REQUEST --> ANOMALY
    ANOMALY -->|"重新认证"| LOGIN
    

设备(Device)

信任等级

条件

允许访问

完全信任

公司设备 + 合规 + 最新补丁

所有资源

部分信任

公司设备 + 部分合规

非敏感资源

低信任

个人设备 + MDM

公开资源

不信任

未知设备

仅公开信息

网络(Network)

        flowchart TB
    subgraph flat["传统扁平网络"]
        direction LR
        SA1["Service A"] <-->|"明文"| SB1["Service B"]
        SA1 <-->|"明文"| DB1["Database"]
        SB1 <-->|"明文"| DB1
    end

    subgraph micro["微分段网络"]
        direction TB
        SA2["Service A"] -->|"mTLS"| SB2["Service B"]
        SB2 -->|"mTLS"| DB2["Database"]
        SA2 -.-x|"❌ 禁止直连"| DB2
    end

    style flat fill:#ffe0e0,stroke:#cc0000
    style micro fill:#e0ffe0,stroke:#00cc00
    

23.5 BeyondCorp — Google 的零信任实践

        flowchart LR
    EMP["👤 员工<br/>任意网络"]
    AP["🛡️ Access Proxy<br/>(IAP)"]
    PE["🧠 策略引擎"]
    RES["📦 特定资源"]

    EMP -->|"HTTPS 请求"| AP
    AP -->|"请求决策"| PE
    PE -->|"允许"| AP
    AP -->|"转发"| RES

    subgraph checks["Access Proxy 检查"]
        C1["🔑 用户身份"]
        C2["📱 设备状态"]
        C3["🌐 网络位置"]
        C4["📋 请求上下文"]
    end

    subgraph decisions["策略引擎决策依据"]
        D1["👥 用户角色"]
        D2["📊 设备信任等级"]
        D3["🔒 资源敏感度"]
        D4["⏰ 时间 / 行为"]
    end

    AP --- checks
    PE --- decisions

    style AP fill:#ff9900,stroke:#cc6600,color:#fff
    style PE fill:#0066cc,stroke:#003366,color:#fff
    

23.6 ZTNA vs VPN

维度

传统 VPN

ZTNA

信任模型

连接后信任整个网络

每次请求验证

访问粒度

网络级(IP/端口)

应用级

攻击面

大(整个内网暴露)

小(仅授权的应用)

用户体验

需要客户端、延迟高

透明、低延迟

横向移动

容易

困难

可见性

高(每次访问都有日志)

23.7 零信任成熟度模型

等级

名称

身份

设备

网络

应用

数据

可见性与分析

Level 0

无意识

密码认证,无 MFA

无设备管理

扁平网络

无应用安全

无分类

无集中日志

Level 1

传统

密码 + 基本 MFA

基本 MDM

边界防火墙

WAF

基本分类

基本日志

Level 2

初级零信任

SSO + MFA + RBAC

MDM + 合规检查

网络分段

API Gateway

加密存储

SIEM 集中分析

Level 3

高级零信任

ABAC + 持续验证 + SPIFFE

实时设备评估

微分段 + mTLS

服务网格 + OPA

DLP + 标签化

行为分析 + UEBA

Level 4

最优化

AI 风险评估 + 自适应认证

自动隔离风险设备

软件定义边界

自动策略生成

自动分类 + 追踪

AI 驱动 + 自动响应

23.8 零信任实施路线图

        gantt
    title 零信任实施路线图(24个月)
    dateFormat  YYYY-MM
    axisFormat  %Y-%m

    section 第一阶段:身份统一
    部署 OIDC IdP(Keycloak)        :a1, 2025-01, 2m
    全面启用 MFA                      :a2, after a1, 2m
    部署 SPIFFE/SPIRE(工作负载身份)  :a3, after a1, 3m
    统一身份目录                      :a4, after a2, 2m

    section 第二阶段:微分段
    部署 Service Mesh(Istio)        :b1, 2025-07, 3m
    启用自动 mTLS                     :b2, after b1, 2m
    实施 Network Policy               :b3, after b1, 2m
    最小权限网络策略                   :b4, after b2, 2m

    section 第三阶段:持续验证
    部署细粒度授权(OpenFGA + OPA)    :c1, 2026-01, 3m
    设备信任评估                      :c2, after c1, 2m
    行为分析和异常检测                 :c3, after c1, 3m
    自适应认证                        :c4, after c2, 2m

    section 第四阶段:自动化响应
    SOAR 自动化响应                   :d1, 2026-07, 2m
    AI 驱动的风险评估                  :d2, after d1, 2m
    实时策略调整                      :d3, after d1, 3m
    持续合规监控                      :d4, after d2, 2m
    

23.9 代码示例

Python: 基于 FastAPI 的零信任 API Gateway

"""
零信任 API Gateway — 基于 FastAPI
功能:
  - 设备信任评估(设备指纹 + 合规状态)
  - 持续验证中间件(每次请求验证身份 + 上下文)
  - 风险评分计算
  - 审计日志

依赖安装:
  pip install fastapi uvicorn pyjwt httpx cryptography
"""

import hashlib
import json
import logging
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

import httpx
import jwt
from fastapi import FastAPI, Request, Response, HTTPException, Depends
from fastapi.middleware.base import BaseHTTPMiddleware
from starlette.middleware.base import RequestResponseEndpoint

# ─── 日志配置 ───────────────────────────────────────────────
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
)
logger = logging.getLogger("zt-gateway")

app = FastAPI(title="Zero Trust API Gateway", version="1.0.0")


# ─── 数据模型 ───────────────────────────────────────────────
class TrustLevel(str, Enum):
    """设备信任等级"""
    FULL = "full"           # 完全信任:公司设备 + 合规 + 最新补丁
    PARTIAL = "partial"     # 部分信任:公司设备 + 部分合规
    LOW = "low"             # 低信任:个人设备 + MDM
    UNTRUSTED = "untrusted" # 不信任:未知设备


class RiskLevel(str, Enum):
    """风险等级"""
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"


@dataclass
class DeviceInfo:
    """设备信息"""
    device_id: str
    fingerprint: str
    os: str = ""
    os_version: str = ""
    is_managed: bool = False
    is_compliant: bool = False
    last_patch_date: Optional[str] = None
    trust_level: TrustLevel = TrustLevel.UNTRUSTED


@dataclass
class RequestContext:
    """请求上下文 — 用于零信任决策"""
    user_id: str = ""
    user_roles: list = field(default_factory=list)
    device: Optional[DeviceInfo] = None
    source_ip: str = ""
    geo_location: str = ""
    request_time: str = ""
    risk_score: float = 0.0
    risk_level: RiskLevel = RiskLevel.LOW


# ─── 设备信任评估 ───────────────────────────────────────────
class DeviceTrustEvaluator:
    """
    设备信任评估器
    根据设备属性计算信任等级
    """

    # 已知的公司设备 ID 列表(生产环境应从 MDM 系统查询)
    MANAGED_DEVICES = {"device-001", "device-002", "device-003"}

    @classmethod
    def evaluate(cls, request: Request) -> DeviceInfo:
        """从请求头中提取设备信息并评估信任等级"""
        # 提取设备信息(实际场景中来自 MDM agent 或设备证书)
        device_id = request.headers.get("X-Device-ID", "unknown")
        user_agent = request.headers.get("User-Agent", "")
        device_cert = request.headers.get("X-Device-Cert", "")

        # 计算设备指纹
        fingerprint_data = f"{device_id}:{user_agent}:{device_cert}"
        fingerprint = hashlib.sha256(fingerprint_data.encode()).hexdigest()[:16]

        device = DeviceInfo(
            device_id=device_id,
            fingerprint=fingerprint,
            os=request.headers.get("X-Device-OS", "unknown"),
            os_version=request.headers.get("X-Device-OS-Version", ""),
            is_managed=device_id in cls.MANAGED_DEVICES,
            is_compliant=request.headers.get("X-Device-Compliant", "false") == "true",
            last_patch_date=request.headers.get("X-Device-Last-Patch", None),
        )

        # 评估信任等级
        device.trust_level = cls._calculate_trust_level(device)
        return device

    @classmethod
    def _calculate_trust_level(cls, device: DeviceInfo) -> TrustLevel:
        """根据设备属性计算信任等级"""
        if not device.is_managed:
            return TrustLevel.UNTRUSTED

        if device.is_compliant and device.last_patch_date:
            # 检查补丁是否在 30 天内
            try:
                patch_date = datetime.fromisoformat(device.last_patch_date)
                days_since_patch = (datetime.now(timezone.utc) - patch_date).days
                if days_since_patch <= 30:
                    return TrustLevel.FULL
            except ValueError:
                pass
            return TrustLevel.PARTIAL

        if device.is_compliant:
            return TrustLevel.PARTIAL

        return TrustLevel.LOW


# ─── 风险评分计算 ───────────────────────────────────────────
class RiskScorer:
    """
    风险评分计算器
    综合多个信号计算请求的风险分数(0.0 - 1.0)
    """

    # 高风险地理位置(示例)
    HIGH_RISK_GEOS = {"CN-UNKNOWN", "RU-UNKNOWN", "KP"}

    # 高风险时间段(UTC 小时)
    UNUSUAL_HOURS = set(range(0, 6))  # 凌晨 0-6 点

    @classmethod
    def calculate(cls, ctx: RequestContext) -> tuple[float, RiskLevel]:
        """计算风险分数和风险等级"""
        score = 0.0

        # 1. 设备信任等级(权重 0.3)
        device_scores = {
            TrustLevel.FULL: 0.0,
            TrustLevel.PARTIAL: 0.1,
            TrustLevel.LOW: 0.2,
            TrustLevel.UNTRUSTED: 0.3,
        }
        if ctx.device:
            score += device_scores.get(ctx.device.trust_level, 0.3)

        # 2. 地理位置风险(权重 0.25)
        if ctx.geo_location in cls.HIGH_RISK_GEOS:
            score += 0.25
        elif not ctx.geo_location:
            score += 0.15  # 无法确定位置也有风险

        # 3. 时间异常(权重 0.2)
        try:
            req_hour = datetime.fromisoformat(ctx.request_time).hour
            if req_hour in cls.UNUSUAL_HOURS:
                score += 0.2
        except (ValueError, AttributeError):
            score += 0.1

        # 4. 无角色信息(权重 0.15)
        if not ctx.user_roles:
            score += 0.15

        # 5. 未知用户(权重 0.1)
        if not ctx.user_id:
            score += 0.1

        # 归一化到 [0, 1]
        score = min(score, 1.0)

        # 确定风险等级
        if score >= 0.7:
            level = RiskLevel.CRITICAL
        elif score >= 0.5:
            level = RiskLevel.HIGH
        elif score >= 0.3:
            level = RiskLevel.MEDIUM
        else:
            level = RiskLevel.LOW

        return score, level


# ─── JWT 验证 ──────────────────────────────────────────────
class JWTVerifier:
    """JWT 令牌验证器"""

    # 生产环境应从 JWKS 端点动态获取公钥
    SECRET_KEY = "your-256-bit-secret-for-demo-only"
    ALGORITHM = "HS256"
    ISSUER = "https://auth.example.com"

    @classmethod
    def verify(cls, token: str) -> dict:
        """
        验证 JWT 令牌并返回 claims
        Raises: HTTPException 如果验证失败
        """
        try:
            payload = jwt.decode(
                token,
                cls.SECRET_KEY,
                algorithms=[cls.ALGORITHM],
                issuer=cls.ISSUER,
                options={"require": ["exp", "sub", "iss"]},
            )
            return payload
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Token has expired")
        except jwt.InvalidIssuerError:
            raise HTTPException(status_code=401, detail="Invalid token issuer")
        except jwt.InvalidTokenError as e:
            raise HTTPException(status_code=401, detail=f"Invalid token: {e}")


# ─── 审计日志 ──────────────────────────────────────────────
class AuditLogger:
    """零信任审计日志记录器"""

    @staticmethod
    def log_access(
        ctx: RequestContext,
        path: str,
        method: str,
        decision: str,
        reason: str = "",
        status_code: int = 200,
    ):
        """记录访问审计日志(结构化 JSON)"""
        entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "event_type": "access_decision",
            "user_id": ctx.user_id,
            "source_ip": ctx.source_ip,
            "device_id": ctx.device.device_id if ctx.device else "unknown",
            "device_trust": ctx.device.trust_level.value if ctx.device else "unknown",
            "path": path,
            "method": method,
            "risk_score": ctx.risk_score,
            "risk_level": ctx.risk_level.value,
            "decision": decision,
            "reason": reason,
            "status_code": status_code,
        }
        logger.info(f"AUDIT: {json.dumps(entry)}")


# ─── 零信任中间件 ──────────────────────────────────────────
class ZeroTrustMiddleware(BaseHTTPMiddleware):
    """
    零信任持续验证中间件
    每个请求都经过:身份验证 → 设备评估 → 风险计算 → 授权决策
    """

    # 不需要认证的路径
    PUBLIC_PATHS = {"/health", "/metrics", "/docs", "/openapi.json"}

    # 资源敏感度映射
    RESOURCE_SENSITIVITY = {
        "/api/admin": "high",
        "/api/users": "medium",
        "/api/public": "low",
    }

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        start_time = time.time()

        # 1. 公开路径直接放行
        if request.url.path in self.PUBLIC_PATHS:
            return await call_next(request)

        # 2. 构建请求上下文
        ctx = RequestContext(
            source_ip=request.client.host if request.client else "unknown",
            geo_location=request.headers.get("X-Geo-Location", ""),
            request_time=datetime.now(timezone.utc).isoformat(),
        )

        # 3. 身份验证(JWT)
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            AuditLogger.log_access(
                ctx, request.url.path, request.method,
                decision="DENY", reason="missing_token", status_code=401,
            )
            raise HTTPException(status_code=401, detail="Missing Bearer token")

        token = auth_header[7:]
        claims = JWTVerifier.verify(token)
        ctx.user_id = claims.get("sub", "")
        ctx.user_roles = claims.get("roles", [])

        # 4. 设备信任评估
        ctx.device = DeviceTrustEvaluator.evaluate(request)

        # 5. 风险评分计算
        ctx.risk_score, ctx.risk_level = RiskScorer.calculate(ctx)

        # 6. 基于风险的访问决策
        sensitivity = self._get_resource_sensitivity(request.url.path)

        if ctx.risk_level == RiskLevel.CRITICAL:
            AuditLogger.log_access(
                ctx, request.url.path, request.method,
                decision="DENY", reason="critical_risk", status_code=403,
            )
            raise HTTPException(
                status_code=403,
                detail="Access denied: risk level too high",
            )

        if ctx.risk_level == RiskLevel.HIGH and sensitivity == "high":
            AuditLogger.log_access(
                ctx, request.url.path, request.method,
                decision="DENY", reason="high_risk_sensitive_resource",
                status_code=403,
            )
            raise HTTPException(
                status_code=403,
                detail="Access denied: high risk for sensitive resource. "
                       "Please use a managed device or contact IT.",
            )

        if (
            ctx.device
            and ctx.device.trust_level == TrustLevel.UNTRUSTED
            and sensitivity != "low"
        ):
            AuditLogger.log_access(
                ctx, request.url.path, request.method,
                decision="DENY", reason="untrusted_device", status_code=403,
            )
            raise HTTPException(
                status_code=403,
                detail="Access denied: untrusted device cannot access this resource",
            )

        # 7. 将上下文注入请求 state,供下游使用
        request.state.zt_context = ctx

        # 8. 执行请求
        response = await call_next(request)

        # 9. 记录审计日志
        duration_ms = (time.time() - start_time) * 1000
        AuditLogger.log_access(
            ctx, request.url.path, request.method,
            decision="ALLOW", reason="passed_all_checks",
            status_code=response.status_code,
        )

        # 10. 添加零信任响应头
        response.headers["X-ZT-Risk-Score"] = str(round(ctx.risk_score, 2))
        response.headers["X-ZT-Risk-Level"] = ctx.risk_level.value
        response.headers["X-ZT-Device-Trust"] = (
            ctx.device.trust_level.value if ctx.device else "unknown"
        )
        response.headers["X-ZT-Duration-Ms"] = str(round(duration_ms, 2))

        return response

    def _get_resource_sensitivity(self, path: str) -> str:
        """获取资源敏感度"""
        for prefix, sensitivity in self.RESOURCE_SENSITIVITY.items():
            if path.startswith(prefix):
                return sensitivity
        return "medium"  # 默认中等敏感度


# ─── 注册中间件 ────────────────────────────────────────────
app.add_middleware(ZeroTrustMiddleware)


# ─── API 端点 ──────────────────────────────────────────────
@app.get("/health")
async def health():
    """健康检查(公开端点)"""
    return {"status": "healthy", "service": "zt-gateway"}


@app.get("/api/public/info")
async def public_info(request: Request):
    """公开信息(低敏感度)"""
    ctx: RequestContext = request.state.zt_context
    return {
        "message": "Public information",
        "your_risk_score": ctx.risk_score,
        "device_trust": ctx.device.trust_level.value if ctx.device else "unknown",
    }


@app.get("/api/users/me")
async def get_current_user(request: Request):
    """获取当前用户信息(中等敏感度)"""
    ctx: RequestContext = request.state.zt_context
    return {
        "user_id": ctx.user_id,
        "roles": ctx.user_roles,
        "device_trust": ctx.device.trust_level.value if ctx.device else "unknown",
        "risk_level": ctx.risk_level.value,
    }


@app.get("/api/admin/dashboard")
async def admin_dashboard(request: Request):
    """管理面板(高敏感度,需要 admin 角色)"""
    ctx: RequestContext = request.state.zt_context
    if "admin" not in ctx.user_roles:
        AuditLogger.log_access(
            ctx, request.url.path, request.method,
            decision="DENY", reason="insufficient_role", status_code=403,
        )
        raise HTTPException(status_code=403, detail="Admin role required")
    return {
        "message": "Welcome to admin dashboard",
        "user_id": ctx.user_id,
        "risk_score": ctx.risk_score,
    }


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8080)

Java: Spring Cloud Gateway 零信任过滤器

/**
 * 零信任 Spring Cloud Gateway 过滤器
 * 功能:JWT 验证 + 设备指纹 + 风险评分
 *
 * 依赖(build.gradle):
 *   implementation 'org.springframework.cloud:spring-cloud-starter-gateway'
 *   implementation 'io.jsonwebtoken:jjwt-api:0.12.5'
 *   runtimeOnly 'io.jsonwebtoken:jjwt-impl:0.12.5'
 *   runtimeOnly 'io.jsonwebtoken:jjwt-jackson:0.12.5'
 */
package com.example.zerotrust.gateway;

import io.jsonwebtoken.Claims;
import io.jsonwebtoken.JwtException;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.security.Keys;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.core.Ordered;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;

import javax.crypto.SecretKey;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.time.Instant;
import java.time.LocalTime;
import java.util.*;

/**
 * 零信任全局过滤器
 * 对每个请求执行:身份验证 → 设备评估 → 风险计算 → 访问决策
 */
@Component
public class ZeroTrustGatewayFilter implements GlobalFilter, Ordered {

    private static final Logger log = LoggerFactory.getLogger(ZeroTrustGatewayFilter.class);

    // 公开路径(不需要认证)
    private static final Set<String> PUBLIC_PATHS = Set.of(
        "/health", "/metrics", "/actuator/health"
    );

    // JWT 密钥(生产环境应使用 RSA/EC 公钥,从 JWKS 端点获取)
    private static final SecretKey JWT_KEY = Keys.hmacShaKeyFor(
        "your-256-bit-secret-key-for-demo-only!!".getBytes(StandardCharsets.UTF_8)
    );

    // 已知的公司管理设备
    private static final Set<String> MANAGED_DEVICES = Set.of(
        "device-001", "device-002", "device-003"
    );

    @Override
    public int getOrder() {
        return -1; // 最高优先级
    }

    @Override
    public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
        ServerHttpRequest request = exchange.getRequest();
        String path = request.getURI().getPath();
        String method = request.getMethod().name();

        // 1. 公开路径直接放行
        if (PUBLIC_PATHS.contains(path)) {
            return chain.filter(exchange);
        }

        // 2. 提取并验证 JWT
        String authHeader = request.getHeaders().getFirst(HttpHeaders.AUTHORIZATION);
        if (authHeader == null || !authHeader.startsWith("Bearer ")) {
            log.warn("AUDIT: DENY path={} reason=missing_token ip={}",
                path, getClientIp(request));
            return rejectRequest(exchange, HttpStatus.UNAUTHORIZED, "Missing Bearer token");
        }

        Claims claims;
        try {
            claims = Jwts.parser()
                .verifyWith(JWT_KEY)
                .requireIssuer("https://auth.example.com")
                .build()
                .parseSignedClaims(authHeader.substring(7))
                .getPayload();
        } catch (JwtException e) {
            log.warn("AUDIT: DENY path={} reason=invalid_token error={}", path, e.getMessage());
            return rejectRequest(exchange, HttpStatus.UNAUTHORIZED, "Invalid token: " + e.getMessage());
        }

        String userId = claims.getSubject();
        @SuppressWarnings("unchecked")
        List<String> roles = claims.get("roles", List.class);
        if (roles == null) roles = Collections.emptyList();

        // 3. 设备信任评估
        DeviceInfo device = evaluateDevice(request);

        // 4. 风险评分计算
        RiskAssessment risk = calculateRisk(device, request, roles);

        // 5. 访问决策
        String sensitivity = getResourceSensitivity(path);

        if (risk.score >= 0.7) {
            log.warn("AUDIT: DENY user={} path={} risk={} reason=critical_risk device={}",
                userId, path, risk.score, device.deviceId);
            return rejectRequest(exchange, HttpStatus.FORBIDDEN,
                "Access denied: risk level too high");
        }

        if (risk.score >= 0.5 && "high".equals(sensitivity)) {
            log.warn("AUDIT: DENY user={} path={} risk={} reason=high_risk_sensitive",
                userId, path, risk.score);
            return rejectRequest(exchange, HttpStatus.FORBIDDEN,
                "Access denied: high risk for sensitive resource");
        }

        if (device.trustLevel == TrustLevel.UNTRUSTED && !"low".equals(sensitivity)) {
            log.warn("AUDIT: DENY user={} path={} reason=untrusted_device device={}",
                userId, path, device.deviceId);
            return rejectRequest(exchange, HttpStatus.FORBIDDEN,
                "Access denied: untrusted device");
        }

        // 6. 记录审计日志
        log.info("AUDIT: ALLOW user={} path={} method={} risk={} device_trust={} ip={}",
            userId, path, method, risk.score, device.trustLevel, getClientIp(request));

        // 7. 将零信任上下文注入下游请求头
        ServerHttpRequest mutatedRequest = request.mutate()
            .header("X-ZT-User-Id", userId)
            .header("X-ZT-User-Roles", String.join(",", roles))
            .header("X-ZT-Device-Trust", device.trustLevel.name())
            .header("X-ZT-Device-Fingerprint", device.fingerprint)
            .header("X-ZT-Risk-Score", String.valueOf(risk.score))
            .header("X-ZT-Risk-Level", risk.level)
            .build();

        return chain.filter(exchange.mutate().request(mutatedRequest).build());
    }

    // ─── 设备信任评估 ──────────────────────────────────────
    private DeviceInfo evaluateDevice(ServerHttpRequest request) {
        String deviceId = getHeader(request, "X-Device-ID", "unknown");
        String userAgent = getHeader(request, "User-Agent", "");
        String deviceCert = getHeader(request, "X-Device-Cert", "");
        boolean isCompliant = "true".equals(getHeader(request, "X-Device-Compliant", "false"));
        boolean isManaged = MANAGED_DEVICES.contains(deviceId);

        // 计算设备指纹
        String fingerprint = computeFingerprint(deviceId, userAgent, deviceCert);

        // 确定信任等级
        TrustLevel trustLevel;
        if (!isManaged) {
            trustLevel = TrustLevel.UNTRUSTED;
        } else if (isCompliant) {
            trustLevel = TrustLevel.FULL;
        } else {
            trustLevel = TrustLevel.PARTIAL;
        }

        return new DeviceInfo(deviceId, fingerprint, isManaged, isCompliant, trustLevel);
    }

    // ─── 风险评分 ──────────────────────────────────────────
    private RiskAssessment calculateRisk(
            DeviceInfo device, ServerHttpRequest request, List<String> roles) {
        double score = 0.0;

        // 设备信任(权重 0.3)
        switch (device.trustLevel) {
            case UNTRUSTED -> score += 0.3;
            case LOW -> score += 0.2;
            case PARTIAL -> score += 0.1;
            case FULL -> score += 0.0;
        }

        // 时间异常(权重 0.2)— 凌晨 0-6 点
        int hour = LocalTime.now().getHour();
        if (hour >= 0 && hour < 6) {
            score += 0.2;
        }

        // 无角色(权重 0.15)
        if (roles.isEmpty()) {
            score += 0.15;
        }

        // 地理位置风险(权重 0.25)
        String geo = getHeader(request, "X-Geo-Location", "");
        if (geo.isEmpty()) {
            score += 0.15;
        }

        score = Math.min(score, 1.0);

        String level;
        if (score >= 0.7) level = "critical";
        else if (score >= 0.5) level = "high";
        else if (score >= 0.3) level = "medium";
        else level = "low";

        return new RiskAssessment(Math.round(score * 100.0) / 100.0, level);
    }

    // ─── 辅助方法 ──────────────────────────────────────────
    private String getResourceSensitivity(String path) {
        if (path.startsWith("/api/admin")) return "high";
        if (path.startsWith("/api/users")) return "medium";
        if (path.startsWith("/api/public")) return "low";
        return "medium";
    }

    private Mono<Void> rejectRequest(
            ServerWebExchange exchange, HttpStatus status, String message) {
        exchange.getResponse().setStatusCode(status);
        exchange.getResponse().getHeaders().add("Content-Type", "application/json");
        byte[] body = ("{\"error\":\"" + message + "\"}").getBytes(StandardCharsets.UTF_8);
        return exchange.getResponse().writeWith(
            Mono.just(exchange.getResponse().bufferFactory().wrap(body))
        );
    }

    private String getClientIp(ServerHttpRequest request) {
        String xff = request.getHeaders().getFirst("X-Forwarded-For");
        if (xff != null && !xff.isEmpty()) {
            return xff.split(",")[0].trim();
        }
        return request.getRemoteAddress() != null
            ? request.getRemoteAddress().getAddress().getHostAddress()
            : "unknown";
    }

    private String getHeader(ServerHttpRequest request, String name, String defaultValue) {
        String value = request.getHeaders().getFirst(name);
        return value != null ? value : defaultValue;
    }

    private String computeFingerprint(String deviceId, String userAgent, String cert) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            md.update((deviceId + ":" + userAgent + ":" + cert).getBytes(StandardCharsets.UTF_8));
            byte[] hash = md.digest();
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < 8; i++) {
                sb.append(String.format("%02x", hash[i]));
            }
            return sb.toString();
        } catch (NoSuchAlgorithmException e) {
            return "unknown";
        }
    }

    // ─── 内部数据类 ────────────────────────────────────────
    enum TrustLevel { FULL, PARTIAL, LOW, UNTRUSTED }

    record DeviceInfo(
        String deviceId,
        String fingerprint,
        boolean isManaged,
        boolean isCompliant,
        TrustLevel trustLevel
    ) {}

    record RiskAssessment(double score, String level) {}
}

Go: 零信任 Proxy(mTLS + OPA 策略检查 + 审计日志)

/*
零信任反向代理 — Go 实现
功能:
  - mTLS 双向认证(验证客户端证书)
  - OPA 策略检查(通过 HTTP API 调用 OPA)
  - 结构化审计日志(JSON 格式)
  - 设备信任评估
  - 请求上下文传播

运行方式:
  go run main.go \
    --listen=:8443 \
    --backend=http://localhost:8080 \
    --opa-url=http://localhost:8181 \
    --tls-cert=server.crt \
    --tls-key=server.key \
    --ca-cert=ca.crt
*/
package main

import (
	"bytes"
	"context"
	"crypto/sha256"
	"crypto/tls"
	"crypto/x509"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"net/http/httputil"
	"net/url"
	"os"
	"os/signal"
	"strings"
	"syscall"
	"time"
)

// ─── 配置 ──────────────────────────────────────────────────
type Config struct {
	ListenAddr string
	BackendURL string
	OPAUrl     string
	TLSCert    string
	TLSKey     string
	CACert     string
}

func parseFlags() *Config {
	cfg := &Config{}
	flag.StringVar(&cfg.ListenAddr, "listen", ":8443", "Listen address")
	flag.StringVar(&cfg.BackendURL, "backend", "http://localhost:8080", "Backend URL")
	flag.StringVar(&cfg.OPAUrl, "opa-url", "http://localhost:8181", "OPA server URL")
	flag.StringVar(&cfg.TLSCert, "tls-cert", "server.crt", "TLS certificate file")
	flag.StringVar(&cfg.TLSKey, "tls-key", "server.key", "TLS private key file")
	flag.StringVar(&cfg.CACert, "ca-cert", "ca.crt", "CA certificate for client verification")
	flag.Parse()
	return cfg
}

// ─── 数据模型 ──────────────────────────────────────────────
// TrustLevel 表示设备信任等级
type TrustLevel string

const (
	TrustFull      TrustLevel = "full"
	TrustPartial   TrustLevel = "partial"
	TrustLow       TrustLevel = "low"
	TrustUntrusted TrustLevel = "untrusted"
)

// RequestContext 零信任请求上下文
type RequestContext struct {
	UserID          string     `json:"user_id"`
	ClientCertSAN   string     `json:"client_cert_san"`
	DeviceID        string     `json:"device_id"`
	DeviceFingerprint string   `json:"device_fingerprint"`
	DeviceTrust     TrustLevel `json:"device_trust"`
	SourceIP        string     `json:"source_ip"`
	Path            string     `json:"path"`
	Method          string     `json:"method"`
	RiskScore       float64    `json:"risk_score"`
	Timestamp       time.Time  `json:"timestamp"`
}

// AuditEntry 审计日志条目
type AuditEntry struct {
	Timestamp   string     `json:"timestamp"`
	EventType   string     `json:"event_type"`
	UserID      string     `json:"user_id"`
	CertSAN     string     `json:"cert_san"`
	DeviceID    string     `json:"device_id"`
	DeviceTrust TrustLevel `json:"device_trust"`
	SourceIP    string     `json:"source_ip"`
	Path        string     `json:"path"`
	Method      string     `json:"method"`
	Decision    string     `json:"decision"`
	Reason      string     `json:"reason"`
	RiskScore   float64    `json:"risk_score"`
	DurationMs  float64    `json:"duration_ms"`
	StatusCode  int        `json:"status_code"`
}

// OPAInput OPA 策略查询输入
type OPAInput struct {
	Input struct {
		User        string     `json:"user"`
		CertSAN     string     `json:"cert_san"`
		Path        string     `json:"path"`
		Method      string     `json:"method"`
		DeviceTrust TrustLevel `json:"device_trust"`
		SourceIP    string     `json:"source_ip"`
		RiskScore   float64    `json:"risk_score"`
	} `json:"input"`
}

// OPAResult OPA 策略查询结果
type OPAResult struct {
	Result struct {
		Allow  bool   `json:"allow"`
		Reason string `json:"reason,omitempty"`
	} `json:"result"`
}

// ─── 审计日志记录器 ────────────────────────────────────────
type AuditLogger struct {
	logger *slog.Logger
}

func NewAuditLogger() *AuditLogger {
	// 使用 JSON 格式输出到 stdout
	handler := slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	})
	return &AuditLogger{
		logger: slog.New(handler),
	}
}

func (a *AuditLogger) Log(entry AuditEntry) {
	a.logger.Info("access_decision",
		"event_type", entry.EventType,
		"user_id", entry.UserID,
		"cert_san", entry.CertSAN,
		"device_id", entry.DeviceID,
		"device_trust", entry.DeviceTrust,
		"source_ip", entry.SourceIP,
		"path", entry.Path,
		"method", entry.Method,
		"decision", entry.Decision,
		"reason", entry.Reason,
		"risk_score", entry.RiskScore,
		"duration_ms", entry.DurationMs,
		"status_code", entry.StatusCode,
	)
}

// ─── 设备信任评估 ──────────────────────────────────────────
// 已知的公司管理设备(生产环境应查询 MDM 系统)
var managedDevices = map[string]bool{
	"device-001": true,
	"device-002": true,
	"device-003": true,
}

func evaluateDeviceTrust(r *http.Request) (string, string, TrustLevel) {
	deviceID := r.Header.Get("X-Device-ID")
	if deviceID == "" {
		deviceID = "unknown"
	}
	userAgent := r.Header.Get("User-Agent")
	isCompliant := r.Header.Get("X-Device-Compliant") == "true"

	// 计算设备指纹
	h := sha256.New()
	h.Write([]byte(fmt.Sprintf("%s:%s", deviceID, userAgent)))
	fingerprint := fmt.Sprintf("%x", h.Sum(nil))[:16]

	// 评估信任等级
	isManaged := managedDevices[deviceID]
	var trust TrustLevel
	switch {
	case !isManaged:
		trust = TrustUntrusted
	case isManaged && isCompliant:
		trust = TrustFull
	case isManaged:
		trust = TrustPartial
	default:
		trust = TrustLow
	}

	return deviceID, fingerprint, trust
}

// ─── OPA 策略检查 ──────────────────────────────────────────
type OPAChecker struct {
	baseURL    string
	httpClient *http.Client
}

func NewOPAChecker(baseURL string) *OPAChecker {
	return &OPAChecker{
		baseURL: baseURL,
		httpClient: &http.Client{
			Timeout: 2 * time.Second,
		},
	}
}

// Check 向 OPA 发送策略查询
func (o *OPAChecker) Check(ctx context.Context, reqCtx *RequestContext) (bool, string, error) {
	// 构建 OPA 输入
	input := OPAInput{}
	input.Input.User = reqCtx.UserID
	input.Input.CertSAN = reqCtx.ClientCertSAN
	input.Input.Path = reqCtx.Path
	input.Input.Method = reqCtx.Method
	input.Input.DeviceTrust = reqCtx.DeviceTrust
	input.Input.SourceIP = reqCtx.SourceIP
	input.Input.RiskScore = reqCtx.RiskScore

	body, err := json.Marshal(input)
	if err != nil {
		return false, "marshal_error", fmt.Errorf("failed to marshal OPA input: %w", err)
	}

	// 发送请求到 OPA
	opaURL := fmt.Sprintf("%s/v1/data/zerotrust/authz", o.baseURL)
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, opaURL, bytes.NewReader(body))
	if err != nil {
		return false, "request_error", fmt.Errorf("failed to create OPA request: %w", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := o.httpClient.Do(req)
	if err != nil {
		// OPA 不可用时,默认拒绝(fail-close)
		return false, "opa_unavailable", fmt.Errorf("OPA request failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		respBody, _ := io.ReadAll(resp.Body)
		return false, "opa_error", fmt.Errorf("OPA returned status %d: %s", resp.StatusCode, respBody)
	}

	var result OPAResult
	if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return false, "decode_error", fmt.Errorf("failed to decode OPA response: %w", err)
	}

	return result.Result.Allow, result.Result.Reason, nil
}

// ─── 零信任代理 ────────────────────────────────────────────
type ZeroTrustProxy struct {
	proxy      *httputil.ReverseProxy
	opaChecker *OPAChecker
	audit      *AuditLogger
}

func NewZeroTrustProxy(backendURL string, opaURL string) (*ZeroTrustProxy, error) {
	target, err := url.Parse(backendURL)
	if err != nil {
		return nil, fmt.Errorf("invalid backend URL: %w", err)
	}

	proxy := httputil.NewSingleHostReverseProxy(target)

	// 自定义错误处理
	proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
		slog.Error("proxy error", "error", err, "path", r.URL.Path)
		http.Error(w, `{"error":"backend unavailable"}`, http.StatusBadGateway)
	}

	return &ZeroTrustProxy{
		proxy:      proxy,
		opaChecker: NewOPAChecker(opaURL),
		audit:      NewAuditLogger(),
	}, nil
}

// ServeHTTP 处理每个请求的零信任验证
func (ztp *ZeroTrustProxy) ServeHTTP(w http.ResponseWriter, r *http.Request) {
	start := time.Now()

	// 1. 公开路径直接放行
	if r.URL.Path == "/health" || r.URL.Path == "/metrics" {
		ztp.proxy.ServeHTTP(w, r)
		return
	}

	// 2. 提取客户端证书信息(mTLS)
	var certSAN string
	var userID string
	if r.TLS != nil && len(r.TLS.PeerCertificates) > 0 {
		cert := r.TLS.PeerCertificates[0]
		// 使用 SPIFFE ID 或 DNS SAN 作为身份
		for _, uri := range cert.URIs {
			if strings.HasPrefix(uri.String(), "spiffe://") {
				certSAN = uri.String()
				break
			}
		}
		if certSAN == "" && len(cert.DNSNames) > 0 {
			certSAN = cert.DNSNames[0]
		}
		userID = cert.Subject.CommonName
	} else {
		// 没有客户端证书 → 拒绝
		ztp.audit.Log(AuditEntry{
			Timestamp: time.Now().UTC().Format(time.RFC3339),
			EventType: "access_decision",
			SourceIP:  getClientIP(r),
			Path:      r.URL.Path,
			Method:    r.Method,
			Decision:  "DENY",
			Reason:    "no_client_certificate",
		})
		http.Error(w, `{"error":"mTLS required: no client certificate"}`, http.StatusUnauthorized)
		return
	}

	// 3. 设备信任评估
	deviceID, fingerprint, deviceTrust := evaluateDeviceTrust(r)

	// 4. 构建请求上下文
	reqCtx := &RequestContext{
		UserID:            userID,
		ClientCertSAN:     certSAN,
		DeviceID:          deviceID,
		DeviceFingerprint: fingerprint,
		DeviceTrust:       deviceTrust,
		SourceIP:          getClientIP(r),
		Path:              r.URL.Path,
		Method:            r.Method,
		Timestamp:         time.Now().UTC(),
	}

	// 5. OPA 策略检查
	allowed, reason, err := ztp.opaChecker.Check(r.Context(), reqCtx)
	if err != nil {
		slog.Warn("OPA check failed, defaulting to deny",
			"error", err, "path", r.URL.Path, "user", userID)
		// fail-close: OPA 不可用时拒绝请求
		ztp.audit.Log(AuditEntry{
			Timestamp:   time.Now().UTC().Format(time.RFC3339),
			EventType:   "access_decision",
			UserID:      userID,
			CertSAN:     certSAN,
			DeviceID:    deviceID,
			DeviceTrust: deviceTrust,
			SourceIP:    getClientIP(r),
			Path:        r.URL.Path,
			Method:      r.Method,
			Decision:    "DENY",
			Reason:      fmt.Sprintf("opa_error: %v", err),
			DurationMs:  float64(time.Since(start).Milliseconds()),
		})
		http.Error(w, `{"error":"authorization service unavailable"}`, http.StatusForbidden)
		return
	}

	if !allowed {
		ztp.audit.Log(AuditEntry{
			Timestamp:   time.Now().UTC().Format(time.RFC3339),
			EventType:   "access_decision",
			UserID:      userID,
			CertSAN:     certSAN,
			DeviceID:    deviceID,
			DeviceTrust: deviceTrust,
			SourceIP:    getClientIP(r),
			Path:        r.URL.Path,
			Method:      r.Method,
			Decision:    "DENY",
			Reason:      reason,
			DurationMs:  float64(time.Since(start).Milliseconds()),
			StatusCode:  http.StatusForbidden,
		})
		http.Error(w, fmt.Sprintf(`{"error":"access denied","reason":"%s"}`, reason),
			http.StatusForbidden)
		return
	}

	// 6. 注入零信任上下文头到下游请求
	r.Header.Set("X-ZT-User-Id", userID)
	r.Header.Set("X-ZT-Cert-SAN", certSAN)
	r.Header.Set("X-ZT-Device-ID", deviceID)
	r.Header.Set("X-ZT-Device-Fingerprint", fingerprint)
	r.Header.Set("X-ZT-Device-Trust", string(deviceTrust))
	r.Header.Set("X-ZT-Source-IP", getClientIP(r))

	// 7. 转发请求到后端
	recorder := &statusRecorder{ResponseWriter: w, statusCode: http.StatusOK}
	ztp.proxy.ServeHTTP(recorder, r)

	// 8. 记录审计日志
	duration := time.Since(start)
	ztp.audit.Log(AuditEntry{
		Timestamp:   time.Now().UTC().Format(time.RFC3339),
		EventType:   "access_decision",
		UserID:      userID,
		CertSAN:     certSAN,
		DeviceID:    deviceID,
		DeviceTrust: deviceTrust,
		SourceIP:    getClientIP(r),
		Path:        r.URL.Path,
		Method:      r.Method,
		Decision:    "ALLOW",
		Reason:      reason,
		RiskScore:   reqCtx.RiskScore,
		DurationMs:  float64(duration.Milliseconds()),
		StatusCode:  recorder.statusCode,
	})
}

// statusRecorder 用于捕获响应状态码
type statusRecorder struct {
	http.ResponseWriter
	statusCode int
}

func (sr *statusRecorder) WriteHeader(code int) {
	sr.statusCode = code
	sr.ResponseWriter.WriteHeader(code)
}

// getClientIP 提取客户端真实 IP
func getClientIP(r *http.Request) string {
	if xff := r.Header.Get("X-Forwarded-For"); xff != "" {
		parts := strings.SplitN(xff, ",", 2)
		return strings.TrimSpace(parts[0])
	}
	if host, _, err := strings.Cut(r.RemoteAddr, ":"); err {
		return host
	}
	return r.RemoteAddr
}

// ─── 主函数 ────────────────────────────────────────────────
func main() {
	cfg := parseFlags()

	// 创建零信任代理
	proxy, err := NewZeroTrustProxy(cfg.BackendURL, cfg.OPAUrl)
	if err != nil {
		slog.Error("failed to create proxy", "error", err)
		os.Exit(1)
	}

	// 加载 CA 证书(用于验证客户端证书)
	caCert, err := os.ReadFile(cfg.CACert)
	if err != nil {
		slog.Error("failed to read CA cert", "error", err, "path", cfg.CACert)
		os.Exit(1)
	}
	caCertPool := x509.NewCertPool()
	if !caCertPool.AppendCertsFromPEM(caCert) {
		slog.Error("failed to parse CA cert")
		os.Exit(1)
	}

	// 配置 mTLS
	tlsConfig := &tls.Config{
		ClientCAs:  caCertPool,
		ClientAuth: tls.RequireAndVerifyClientCert, // 强制客户端证书
		MinVersion: tls.VersionTLS13,               // 最低 TLS 1.3
		CipherSuites: []uint16{
			tls.TLS_AES_256_GCM_SHA384,
			tls.TLS_CHACHA20_POLY1305_SHA256,
		},
	}

	server := &http.Server{
		Addr:         cfg.ListenAddr,
		Handler:      proxy,
		TLSConfig:    tlsConfig,
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 30 * time.Second,
		IdleTimeout:  120 * time.Second,
	}

	// 优雅关闭
	go func() {
		sigCh := make(chan os.Signal, 1)
		signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
		sig := <-sigCh
		slog.Info("received signal, shutting down", "signal", sig)

		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		if err := server.Shutdown(ctx); err != nil {
			slog.Error("shutdown error", "error", err)
		}
	}()

	slog.Info("Zero Trust Proxy starting",
		"listen", cfg.ListenAddr,
		"backend", cfg.BackendURL,
		"opa", cfg.OPAUrl,
	)

	if err := server.ListenAndServeTLS(cfg.TLSCert, cfg.TLSKey); err != http.ErrServerClosed {
		slog.Error("server error", "error", err)
		os.Exit(1)
	}

	slog.Info("server stopped gracefully")
}

配套 OPA 策略文件 (zerotrust/authz.rego):

# OPA 零信任授权策略
# 路径: zerotrust/authz.rego
package zerotrust.authz

import rego.v1

default allow := false
default reason := "denied_by_default"

# 规则 1: 完全信任的设备可以访问所有资源
allow if {
    input.device_trust == "full"
    input.risk_score < 0.7
}

reason := "full_trust_device" if {
    input.device_trust == "full"
    input.risk_score < 0.7
}

# 规则 2: 部分信任的设备只能访问非管理端点
allow if {
    input.device_trust == "partial"
    not startswith(input.path, "/api/admin")
    input.risk_score < 0.5
}

reason := "partial_trust_non_admin" if {
    input.device_trust == "partial"
    not startswith(input.path, "/api/admin")
    input.risk_score < 0.5
}

# 规则 3: 只读操作对低信任设备开放公开端点
allow if {
    input.device_trust == "low"
    startswith(input.path, "/api/public")
    input.method == "GET"
}

reason := "low_trust_public_readonly" if {
    input.device_trust == "low"
    startswith(input.path, "/api/public")
    input.method == "GET"
}

# 规则 4: 不信任的设备一律拒绝
reason := "untrusted_device" if {
    input.device_trust == "untrusted"
}

# 规则 5: 高风险分数一律拒绝
reason := "high_risk_score" if {
    input.risk_score >= 0.7
}

23.9 小结

  • 零信任 的核心是”永不信任,始终验证”

  • NIST SP 800-207 定义了标准的零信任架构(PE/PA/PEP)

  • 零信任有五大支柱:身份、设备、网络、应用、数据

  • BeyondCorp 是 Google 的零信任实践,用 Access Proxy 替代 VPN

  • ZTNA 比 VPN 更安全、更细粒度、更好的用户体验

  • 零信任实施是渐进式的,从身份统一开始,逐步深入