Chapter 24: Service Mesh Security

“A service mesh turns zero trust from a principle into practice: every service-to-service call is encrypted, authenticated, and authorized.”

        mindmap
  root((Service Mesh Security))
    Istio
      Citadel
      Envoy
      Pilot
      SPIFFE ID
    Security features
      Automatic mTLS
      AuthorizationPolicy
      RequestAuthentication
    Integrations
      SPIRE
      OPA
      ext_authz
    Comparison
      Linkerd
      Cilium
    

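24.1 Service Mesh and Zero Trust

A service mesh implements the core zero-trust requirements (encryption, authentication, and authorization) at the infrastructure layer: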

        flowchart LR
    subgraph no_mesh["Without a service mesh"]
        A1["Service A"] -- "Plaintext HTTP<br/>no authentication, authorization, or encryption" --> B1["Service B"]
    end

    subgraph with_mesh["With a service mesh"]
        A2["Service A"] --> EA["Envoy Sidecar<br/>auto-injected"]
        EA == "mTLS<br/>encryption + authentication + authorization" ==> EB["Envoy Sidecar<br/>auto-injected"]
        EB --> B2["Service B"]
    end

    style no_mesh fill:#fee,stroke:#c00
    style with_mesh fill:#efe,stroke:#0a0
    

24.2 Istio Security Architecture

        flowchart TB
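    subgraph cp["Istio control plane"]
        direction LR
        istiod["Istiod<br/>(core)"]
        citadel["Citadel<br/>(certificate management, part of istiod)<br/>SPIFFE CA"]
        pilot["Pilot<br/>(configuration distribution, part of istiod)<br/>xDS API"]
    end

    cp -- "xDS (certificates + policies)" --> dp

    subgraph dp["Data plane"]
        subgraph podA["Pod A"]
            svcA["Service A"]
            envoyA["Envoy Sidecar<br/>• mTLS termination/origination<br/>• Authorization policy enforcement<br/>• JWT validation<br/>• Telemetry collection"]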
            svcA <--> envoyA
        end
        subgraph podB["Pod B"]
            envoyB["Envoy Sidecar"]
            svcB["Service B"]
            envoyB <--> svcB
        end
        envoyA == "mTLS" ==> envoyB
    end

    identity["Identity model<br/>Each workload receives a SPIFFE ID:<br/>spiffe://cluster.local/ns/namespace/sa/service-account"]

    dp --- identity

    style cp fill:#e8f0fe,stroke:#1a73e8
    style dp fill:#fef7e0,stroke:#f9ab00
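
The identity format shown in the diagram can be made concrete with a short Python sketch. The helper names `spiffe_id` and `parse_spiffe_id` and the `WorkloadIdentity` tuple are hypothetical and exist only for this illustration; the sketch assumes the Kubernetes-style layout `spiffe://<trust-domain>/ns/<namespace>/sa/<service-account>` used throughout this chapter.

```python
from typing import NamedTuple


class WorkloadIdentity(NamedTuple):
    trust_domain: str
    namespace: str
    service_account: str


def spiffe_id(trust_domain: str, namespace: str, service_account: str) -> str:
    """Build an Istio-style SPIFFE ID for a Kubernetes workload."""
    return f"spiffe://{trust_domain}/ns/{namespace}/sa/{service_account}"


def parse_spiffe_id(value: str) -> WorkloadIdentity:
    """Split an Istio-style SPIFFE ID into its parts; raise ValueError otherwise."""
    scheme = "spiffe://"
    if not value.startswith(scheme):
        raise ValueError(f"not a SPIFFE ID: {value!r}")
    parts = value[len(scheme):].split("/")
    # Expected layout: <trust-domain>/ns/<namespace>/sa/<service-account>
    if len(parts) != 5 or parts[1] != "ns" or parts[3] != "sa" or not all(parts):
        raise ValueError(f"unexpected SPIFFE ID layout: {value!r}")
    return WorkloadIdentity(parts[0], parts[2], parts[4])


if __name__ == "__main__":
    sid = spiffe_id("cluster.local", "prod", "svc-a")
    print(sid)
    print(parse_spiffe_id(sid))
```

Because the identity is derived from the namespace and service account rather than from an IP address, it stays stable when pods are rescheduled.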
    

24.3 Automatic mTLS

Sidecar mTLS handshake flow

        sequenceDiagram
    participant A as Service A
    participant EA as Envoy Sidecar A
    participant Istiod as Istiod (Citadel)
    participant EB as Envoy Sidecar B
    participant B as Service B

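    Note over EA, Istiod: Startup: certificate issuance
    EA ->> Istiod: CSR (Certificate Signing Request)
    Istiod -->> EA: Issue SVID certificate<br/>spiffe://cluster.local/ns/prod/sa/svc-a
    EB ->> Istiod: CSR
    Istiod -->> EB: Issue SVID certificate<br/>spiffe://cluster.local/ns/prod/sa/svc-b

    Note over A, B: Runtime: mTLS communication
    A ->> EA: HTTP request (plaintext, localhost)
    EA ->> EB: TLS ClientHello (SNI: outbound_.80_._.svc-b)
    EB -->> EA: TLS ServerHello + server certificate
    EA ->> EB: Client certificate (mutual authentication)
    EA ->> EB: Encrypted HTTP request
    EB ->> B: HTTP request (plaintext, localhost)
    B -->> EB: HTTP response
    EB -->> EA: Encrypted HTTP response
    EA -->> A: HTTP response

    Note over EA, EB: Automatic certificate rotation (default lifetime: 24 hours)
    EA ->> Istiod: CSR (renewal before expiry)
    Istiod -->> EA: New certificate (handed to Envoy via the SDS API)
    EB ->> Istiod: CSR (renewal before expiry)
    Istiod -->> EB: New certificate (handed to Envoy via the SDS API)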
    

PeerAuthentication

# Enable strict mTLS mesh-wide
apiVersion: security.istio.io/v1
kind: PeerAuthentication
metadata:
  name: default
  namespace: istio-system
spec:
  mtls:
    mode: STRICT  # all service-to-service traffic must use mTLS

---
# Allow plaintext in a specific namespace (during migration)
apiVersion: security.istio.io/v1
kind: PeerAuthentication
metadata:
  name: legacy-permissive
  namespace: legacy-apps
spec:
  mtls:
    mode: PERMISSIVE  # accept both mTLS and plaintext

---
# Relax mTLS on a specific port of one workload
apiVersion: security.istio.io/v1
kind: PeerAuthentication
metadata:
  name: db-service
  namespace: production
spec:
  selector:
    matchLabels:
      app: database
  mtls:
    mode: STRICT
  portLevelMtls:
    9090:
      mode: PERMISSIVE  # the metrics port also accepts plaintext

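mTLS mode comparison

| Mode | Behavior | When to use |
| --- | --- | --- |
| DISABLE | mTLS is not used | Not recommended |
| PERMISSIVE | Accepts both mTLS and plaintext | During migration |
| STRICT | Accepts mTLS only | Production |
| UNSET | Inherits the parent setting (treated as PERMISSIVE if no parent sets a mode) | Default |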
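
How the scopes combine can be sketched in a few lines of Python. This is a simplified model, not Istio's implementation: the function name `effective_mtls_mode` is hypothetical, and the sketch assumes the documented precedence in which a port-level setting overrides the workload-level policy, which overrides the namespace-level policy, which overrides the mesh-wide policy.

```python
from typing import Optional

UNSET = "UNSET"


def effective_mtls_mode(
    mesh: Optional[str] = None,
    namespace: Optional[str] = None,
    workload: Optional[str] = None,
    port: Optional[str] = None,
) -> str:
    """Return the mTLS mode that applies, checking the most specific scope first."""
    for mode in (port, workload, namespace, mesh):
        # A missing policy or an explicit UNSET defers to the next wider scope.
        if mode is not None and mode != UNSET:
            return mode
    # Nothing configured at any level: the sidecar behaves as PERMISSIVE.
    return "PERMISSIVE"


if __name__ == "__main__":
    # Mirrors the examples above: mesh-wide STRICT, legacy namespace PERMISSIVE,
    # and a database workload whose metrics port is relaxed.
    print(effective_mtls_mode(mesh="STRICT"))
    print(effective_mtls_mode(mesh="STRICT", namespace="PERMISSIVE"))
    print(effective_mtls_mode(mesh="STRICT", workload="STRICT", port="PERMISSIVE"))
```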

24.4 AuthorizationPolicy

AuthorizationPolicy decision flow

        flowchart TD
    req["Inbound request reaches the Envoy sidecar"] --> customCheck{"Matches a CUSTOM<br/>action policy?"}

    customCheck -- "Yes" --> extAuthz["Call the external ext_authz service"]
    extAuthz -- "Deny" --> deny403["Return 403 Forbidden"]
    extAuthz -- "Allow" --> denyCheck

    customCheck -- "No" --> denyCheck{"Matches any DENY<br/>policy rule?"}

    denyCheck -- "Yes" --> deny403
    denyCheck -- "No" --> allowCheck{"Is there any ALLOW<br/>policy?"}

    allowCheck -- "No" --> passthrough["Allow the request"]
    allowCheck -- "Yes" --> matchAllow{"Matches any ALLOW<br/>policy rule?"}

    matchAllow -- "Yes" --> passthrough
    matchAllow -- "No" --> deny403

    passthrough --> upstream["Forward to the upstream service"]

    style deny403 fill:#fdd,stroke:#c00,color:#900
    style passthrough fill:#dfd,stroke:#0a0,color:#060
    style upstream fill:#ddf,stroke:#00a,color:#006
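
The evaluation order in the flowchart can also be written as a small Python sketch. It models only the ordering (CUSTOM, then DENY, then ALLOW); the `Policy` class, the `authorize` function, and the dictionary-based request are hypothetical stand-ins rather than Istio data structures.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List

Request = Dict[str, str]


@dataclass
class Policy:
    action: str                         # "CUSTOM", "DENY" or "ALLOW"
    matches: Callable[[Request], bool]  # True if any rule of the policy matches


def authorize(
    request: Request,
    policies: List[Policy],
    ext_authz: Callable[[Request], bool] = lambda _req: True,
) -> bool:
    """Return True if the request is forwarded, False if it is rejected."""

    def matching(action: str) -> List[Policy]:
        return [p for p in policies if p.action == action and p.matches(request)]

    # 1. A matching CUSTOM policy hands the request to the external authorizer.
    if matching("CUSTOM") and not ext_authz(request):
        return False
    # 2. Any matching DENY policy rejects the request.
    if matching("DENY"):
        return False
    # 3. Without any ALLOW policy, the request passes.
    if not any(p.action == "ALLOW" for p in policies):
        return True
    # 4. Otherwise at least one ALLOW policy has to match.
    return bool(matching("ALLOW"))


if __name__ == "__main__":
    allow_api = Policy("ALLOW", lambda r: r["path"].startswith("/api/"))
    deny_untrusted = Policy("DENY", lambda r: r["namespace"] == "untrusted")
    request = {"namespace": "untrusted", "path": "/api/orders"}
    print(authorize(request, [allow_api]))
    print(authorize(request, [allow_api, deny_untrusted]))
```

The practical consequence is that a DENY policy always wins over an ALLOW policy, and that adding the first ALLOW policy to a workload switches it from allow-by-default to deny-by-default.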
    

AuthorizationPolicy YAML examples

# Allow only frontend to call backend
apiVersion: security.istio.io/v1
kind: AuthorizationPolicy
metadata:
  name: backend-policy
  namespace: production
spec:
  selector:
    matchLabels:
      app: backend
  action: ALLOW
  rules:
    - from:
        - source:
            principals:
              - "cluster.local/ns/production/sa/frontend"
      to:
        - operation:
            methods: ["GET", "POST"]
            paths: ["/api/*"]

---
# Deny requests from a specific namespace
apiVersion: security.istio.io/v1
kind: AuthorizationPolicy
metadata:
  name: deny-untrusted
  namespace: production
spec:
  action: DENY
  rules:
    - from:
        - source:
            namespaces: ["untrusted"]

---
# Authorization based on JWT claims
apiVersion: security.istio.io/v1
kind: AuthorizationPolicy
metadata:
  name: admin-only
  namespace: production
spec:
  selector:
    matchLabels:
      app: admin-panel
  action: ALLOW
  rules:
    - from:
        - source:
            requestPrincipals: ["https://auth.example.com/*"]
      when:
        - key: request.auth.claims[roles]
          values: ["admin"]

---
# Combined conditions: restrict source + method + path + JWT claims
apiVersion: security.istio.io/v1
kind: AuthorizationPolicy
metadata:
  name: fine-grained-policy
  namespace: production
spec:
  selector:
    matchLabels:
      app: order-service
  action: ALLOW
  rules:
    # Rule 1: internal services may read orders
    - from:
        - source:
            principals:
              - "cluster.local/ns/production/sa/inventory-service"
              - "cluster.local/ns/production/sa/shipping-service"
      to:
        - operation:
            methods: ["GET"]
            paths: ["/api/v1/orders/*"]
    # Rule 2: only checkout-service may create orders
    - from:
        - source:
            principals:
              - "cluster.local/ns/production/sa/checkout-service"
      to:
        - operation:
            methods: ["POST"]
            paths: ["/api/v1/orders"]
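    # Rule 3: admins may perform any operation on orders (JWT required)
    - from:
        - source:
            requestPrincipals: ["https://auth.example.com/*"]
      to:
        - operation:
            # "/api/v1/orders/*" alone would not match the collection path itself
            paths: ["/api/v1/orders", "/api/v1/orders/*"]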
      when:
        - key: request.auth.claims[roles]
          values: ["order-admin"]
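
The `paths` values in these policies rely on Istio's string matching forms: exact match, prefix match (`abc*`), suffix match (`*abc`), and presence match (`*`). The following Python sketch approximates those semantics to show why a pattern such as `/api/v1/orders/*` does not cover `/api/v1/orders`. The function `path_matches` is a hypothetical helper, not part of any Istio API.

```python
def path_matches(pattern: str, path: str) -> bool:
    """Approximate Istio's exact / prefix / suffix / presence string matching."""
    if pattern == "*":
        return bool(path)                     # presence: any non-empty value
    if pattern.endswith("*"):
        return path.startswith(pattern[:-1])  # prefix match
    if pattern.startswith("*"):
        return path.endswith(pattern[1:])     # suffix match
    return path == pattern                    # exact match


if __name__ == "__main__":
    for candidate in ("/api/v1/orders", "/api/v1/orders/42", "/api/v1/ordersX"):
        print(candidate, path_matches("/api/v1/orders/*", candidate))
```

A rule that should cover both the collection and its items therefore needs two entries, one for each form.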

24.5 RequestAuthentication

# JWT validation
apiVersion: security.istio.io/v1
kind: RequestAuthentication
metadata:
  name: jwt-auth
  namespace: production
spec:
  selector:
    matchLabels:
      app: api-server
  jwtRules:
    - issuer: "https://auth.example.com"
      jwksUri: "https://auth.example.com/.well-known/jwks.json"
      audiences:
        - "my-api"
      forwardOriginalToken: true
      outputPayloadToHeader: "x-jwt-payload"  # the verified payload is forwarded base64url-encoded

---
# JWT validation with multiple IdPs (internal and external)
apiVersion: security.istio.io/v1
kind: RequestAuthentication
metadata:
  name: multi-idp-jwt
  namespace: production
spec:
  selector:
    matchLabels:
      app: api-gateway
  jwtRules:
    # Internal IdP (Keycloak)
    - issuer: "https://keycloak.internal.example.com/realms/main"
      jwksUri: "https://keycloak.internal.example.com/realms/main/protocol/openid-connect/certs"
      audiences:
        - "internal-api"
      forwardOriginalToken: true
    # External IdP (Auth0)
    - issuer: "https://example.auth0.com/"
      jwksUri: "https://example.auth0.com/.well-known/jwks.json"
      audiences:
        - "https://api.example.com"
      forwardOriginalToken: true
      outputClaimToHeaders:
        - header: "x-user-id"
          claim: "sub"
        - header: "x-user-email"
          claim: "email"
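
A backend that receives the header configured with `outputPayloadToHeader` sees the verified JWT payload as base64url-encoded JSON, so it has to decode the value before reading claims. The sketch below shows one way to do that in Python; `decode_jwt_payload_header` is a hypothetical helper, and the claims in the usage example are invented for illustration. It assumes the sidecar has already verified the token, so no signature check happens here.

```python
import base64
import json


def decode_jwt_payload_header(value: str) -> dict:
    """Decode a base64url-encoded JWT payload forwarded in a request header."""
    # base64url values are often sent without padding; restore it if needed.
    padded = value + "=" * (-len(value) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


if __name__ == "__main__":
    # Simulate the value the sidecar would place in x-jwt-payload.
    claims = {"iss": "https://auth.example.com", "sub": "alice", "roles": ["admin"]}
    encoded = base64.urlsafe_b64encode(json.dumps(claims).encode()).rstrip(b"=").decode()
    print(decode_jwt_payload_header(encoded)["roles"])
```

Such a header should only be trusted when the request can reach the workload exclusively through the sidecar; otherwise a client could supply the header itself.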

24.6 Istio + SPIRE Integration

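SPIRE can replace Istio's built-in CA (formerly Citadel) as the issuer of workload certificates. In the integration documented by Istio, each sidecar obtains its SVID from the local SPIRE agent through Envoy's SDS API, over a Unix domain socket that the SPIFFE CSI driver mounts into the pod:

apiVersion: install.istio.io/v1alpha1
kind: IstioOperator
spec:
  meshConfig:
    trustDomain: "example.org"  # must match the SPIRE trust domain
  values:
    sidecarInjectorWebhook:
      templates:
        # Pods opt in with the annotation inject.istio.io/templates: "sidecar,spire"
        spire: |
          spec:
            containers:
              - name: istio-proxy
                volumeMounts:
                  - name: workload-socket
                    mountPath: /run/secrets/workload-spiffe-uds
                    readOnly: true
            volumes:
              - name: workload-socket
                csi:
                  driver: "csi.spiffe.io"
                  readOnly: true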

Advantages:

  • Unified identity across clusters (SPIFFE federation)

  • More flexible CA management (integration with Vault, AWS PCA)

  • Non-Kubernetes workloads can join the mesh

24.7 Envoy External Authorization (ext_authz)

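# Delegate authorization decisions to an external service (such as OPA)
apiVersion: networking.istio.io/v1alpha3
kind: EnvoyFilter
metadata:
  name: ext-authz
  namespace: istio-system
spec:
  configPatches:
    - applyTo: HTTP_FILTER
      match:
        context: SIDECAR_INBOUND
        listener:
          filterChain:
            filter:
              name: envoy.filters.network.http_connection_manager
              subFilter:
                name: envoy.filters.http.router
      patch:
        operation: INSERT_BEFORE  # place ext_authz directly before the router filter
        value:
          name: envoy.filters.http.ext_authz
          typed_config:
            "@type": type.googleapis.com/envoy.extensions.filters.http.ext_authz.v3.ExtAuthz
            grpc_service:
              envoy_grpc:
                cluster_name: opa-authz  # must name a cluster that Envoy already knows
              timeout: 0.5s
            failure_mode_allow: false  # fail closed when the authorizer is unreachable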
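
The effect of `failure_mode_allow` is easiest to see in a toy model. The Python sketch below is not Envoy code: `ext_authz_outcome` and `unreachable` are hypothetical names, and the external authorizer is reduced to a callable that either returns a decision or raises when it cannot be reached within the timeout.

```python
from typing import Callable


def ext_authz_outcome(check: Callable[[], bool], failure_mode_allow: bool = False) -> bool:
    """Return True if the request may proceed."""
    try:
        return check()
    except (TimeoutError, ConnectionError):
        # The authorizer could not be reached in time: the flag decides.
        return failure_mode_allow


def unreachable() -> bool:
    """Stand-in for an authorizer that does not answer within the timeout."""
    raise TimeoutError("authorizer did not answer within 0.5s")


if __name__ == "__main__":
    print(ext_authz_outcome(unreachable))                           # fail closed
    print(ext_authz_outcome(unreachable, failure_mode_allow=True))  # fail open
```

Failing closed, as in the configuration above, trades availability for safety: an outage of the authorizer blocks traffic instead of silently disabling authorization.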

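Go implementation: Envoy ext_authz gRPC service

The following is a complete gRPC service that implements Envoy external authorization. It decides based on caller identity, HTTP method, path, and JWT claims: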

package main

import (
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
	"net"
	"strings"

	corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
	authv3 "github.com/envoyproxy/go-control-plane/envoy/service/auth/v3"
	typev3 "github.com/envoyproxy/go-control-plane/envoy/type/v3"
	"google.golang.org/genproto/googleapis/rpc/status"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
)

// Policy defines a single authorization rule.
type Policy struct {
	Name           string              `json:"name"`
	Principals     []string            `json:"principals"`      // allowed SPIFFE IDs (empty = any identity)
	Methods        []string            `json:"methods"`         // allowed HTTP methods
	PathPrefixes   []string            `json:"path_prefixes"`   // allowed path prefixes
	RequiredClaims map[string][]string `json:"required_claims"` // required JWT claims
}

// AuthServer implements the Envoy ext_authz gRPC interface.
type AuthServer struct {
	policies []Policy
}

func NewAuthServer() *AuthServer {
	return &AuthServer{
		policies: []Policy{
			{
				Name:         "allow-frontend-read",
				Principals:   []string{"spiffe://cluster.local/ns/production/sa/frontend"},
				Methods:      []string{"GET"},
				PathPrefixes: []string{"/api/v1/products", "/api/v1/catalog"},
			},
			{
				Name:         "allow-checkout-write",
				Principals:   []string{"spiffe://cluster.local/ns/production/sa/checkout"},
				Methods:      []string{"GET", "POST", "PUT"},
				PathPrefixes: []string{"/api/v1/orders"},
			},
			{
				Name:         "allow-admin",
				Principals:   []string{}, // any identity
				Methods:      []string{"GET", "POST", "PUT", "DELETE"},
				PathPrefixes: []string{"/api/"},
				RequiredClaims: map[string][]string{
					"roles": {"admin", "super-admin"},
				},
			},
		},
	}
}

// Check implements envoy.service.auth.v3.Authorization/Check.
func (s *AuthServer) Check(
	ctx context.Context,
	req *authv3.CheckRequest,
) (*authv3.CheckResponse, error) {
	httpReq := req.GetAttributes().GetRequest().GetHttp()
	method := httpReq.GetMethod()
	path := httpReq.GetPath()
	headers := httpReq.GetHeaders()

	// Caller SPIFFE ID: prefer the peer identity that Envoy derived from the mTLS
	// client certificate, and fall back to the X-Forwarded-Client-Cert header.
	spiffeID := req.GetAttributes().GetSource().GetPrincipal()
	if spiffeID == "" {
		spiffeID = extractSPIFFEID(headers["x-forwarded-client-cert"])
	}

	// JWT claims: forwarded by RequestAuthentication via outputPayloadToHeader.
	// This requires the JWT filter to run before ext_authz on the same workload.
	claims := extractJWTClaims(headers["x-jwt-payload"])

	log.Printf("ext_authz check: method=%s path=%s spiffe_id=%s", method, path, spiffeID)

	// Evaluate the policies in order; the first match allows the request.
	for _, policy := range s.policies {
		if s.matchPolicy(policy, spiffeID, method, path, claims) {
			log.Printf("ext_authz ALLOW: matched policy %q", policy.Name)
			return &authv3.CheckResponse{
				Status: &status.Status{Code: int32(codes.OK)},
				HttpResponse: &authv3.CheckResponse_OkResponse{
					OkResponse: &authv3.OkHttpResponse{
						Headers: []*corev3.HeaderValueOption{
							{
								Header: &corev3.HeaderValue{
									Key:   "x-auth-policy",
									Value: policy.Name,
								},
							},
						},
					},
				},
			}, nil
		}
	}

	log.Printf("ext_authz DENY: no matching policy for %s %s (identity: %s)", method, path, spiffeID)
	return &authv3.CheckResponse{
		Status: &status.Status{Code: int32(codes.PermissionDenied)},
		HttpResponse: &authv3.CheckResponse_DeniedResponse{
			DeniedResponse: &authv3.DeniedHttpResponse{
				Status: &typev3.HttpStatus{Code: typev3.StatusCode_Forbidden},
				Body:   `{"error": "access denied by ext_authz policy"}`,
				Headers: []*corev3.HeaderValueOption{
					{
						Header: &corev3.HeaderValue{
							Key:   "content-type",
							Value: "application/json",
						},
					},
				},
			},
		},
	}, nil
}

func (s *AuthServer) matchPolicy(
	policy Policy, spiffeID, method, path string, claims map[string]interface{},
) bool {
	// Check the principal (an empty list accepts any identity)
	if len(policy.Principals) > 0 {
		matched := false
		for _, p := range policy.Principals {
			if p == spiffeID {
				matched = true
				break
			}
		}
		if !matched {
			return false
		}
	}

	// Check the HTTP method
	methodMatched := false
	for _, m := range policy.Methods {
		if strings.EqualFold(m, method) {
			methodMatched = true
			break
		}
	}
	if !methodMatched {
		return false
	}

	// Check the path prefix
	pathMatched := false
	for _, prefix := range policy.PathPrefixes {
		if strings.HasPrefix(path, prefix) {
			pathMatched = true
			break
		}
	}
	if !pathMatched {
		return false
	}

	// Check the JWT claims
	for claimKey, requiredValues := range policy.RequiredClaims {
		claimValue, ok := claims[claimKey]
		if !ok {
			return false
		}
		if !matchClaimValue(claimValue, requiredValues) {
			return false
		}
	}

	return true
}

func extractSPIFFEID(xfcc string) string {
	// X-Forwarded-Client-Cert format:
	// By=spiffe://...;Hash=...;Subject="";URI=spiffe://cluster.local/ns/prod/sa/frontend
	// Each hop appends its own element, so the last URI belongs to the direct peer.
	id := ""
	for _, part := range strings.Split(xfcc, ";") {
		part = strings.TrimSpace(part)
		if strings.HasPrefix(part, "URI=") {
			id = strings.Trim(strings.TrimPrefix(part, "URI="), `"`)
		}
	}
	return id
}

func extractJWTClaims(payload string) map[string]interface{} {
	claims := make(map[string]interface{})
	if payload == "" {
		return claims
	}
	// The forwarded payload is base64url-encoded JSON, usually without padding.
	raw, err := base64.RawURLEncoding.DecodeString(strings.TrimRight(payload, "="))
	if err != nil {
		log.Printf("ext_authz: cannot decode JWT payload header: %v", err)
		return claims
	}
	if err := json.Unmarshal(raw, &claims); err != nil {
		log.Printf("ext_authz: cannot parse JWT payload: %v", err)
		return make(map[string]interface{})
	}
	return claims
}

func matchClaimValue(claimValue interface{}, required []string) bool {
	switch v := claimValue.(type) {
	case string:
		for _, r := range required {
			if v == r {
				return true
			}
		}
	case []interface{}:
		for _, item := range v {
			if str, ok := item.(string); ok {
				for _, r := range required {
					if str == r {
						return true
					}
				}
			}
		}
	}
	return false
}

func main() {
	lis, err := net.Listen("tcp", ":9191")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	grpcServer := grpc.NewServer()
	authv3.RegisterAuthorizationServer(grpcServer, NewAuthServer())

	fmt.Println("ext_authz gRPC server listening on :9191")
	if err := grpcServer.Serve(lis); err != nil {
		log.Fatalf("failed to serve: %v", err)
	}
}
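
The X-Forwarded-Client-Cert header deserves more careful parsing than a plain split when it passes through several proxies: elements are comma-separated, key-value pairs are semicolon-separated, and values that contain those characters are double-quoted. The Python sketch below is one possible quote-aware parser. The helper names are hypothetical, and it assumes that each proxy appends its element to the end of the header, so the last element describes the direct peer.

```python
from typing import List, Optional


def split_outside_quotes(text: str, sep: str) -> List[str]:
    """Split text on sep, ignoring separators inside double-quoted values."""
    parts, current, in_quotes, escaped = [], [], False, False
    for ch in text:
        if escaped:
            current.append(ch)
            escaped = False
        elif ch == "\\" and in_quotes:
            current.append(ch)
            escaped = True
        elif ch == '"':
            in_quotes = not in_quotes
            current.append(ch)
        elif ch == sep and not in_quotes:
            parts.append("".join(current))
            current = []
        else:
            current.append(ch)
    parts.append("".join(current))
    return parts


def caller_spiffe_id(xfcc: str) -> Optional[str]:
    """Return the first URI field of the last XFCC element, or None."""
    elements = [e for e in split_outside_quotes(xfcc, ",") if e.strip()]
    if not elements:
        return None
    for pair in split_outside_quotes(elements[-1], ";"):
        key, _, value = pair.strip().partition("=")
        if key.lower() == "uri":
            return value.strip('"') or None
    return None


if __name__ == "__main__":
    header = (
        'By=spiffe://cluster.local/ns/prod/sa/backend;Hash=abc;'
        'Subject="CN=frontend,O=example";URI=spiffe://cluster.local/ns/prod/sa/frontend'
    )
    print(caller_spiffe_id(header))
```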

24.8 Python Implementation: AuthorizationPolicy Generator

Generating Istio AuthorizationPolicy resources from an OpenAPI spec avoids hand-writing large amounts of YAML:

#!/usr/bin/env python3
"""
Istio AuthorizationPolicy Generator

Generates Istio AuthorizationPolicy YAML from an OpenAPI (Swagger) specification.
Paths, methods, and role requirements are extracted from the spec's paths and
security definitions and mapped to Istio ALLOW rules plus a default-deny policy.

Usage:
    python authz_policy_gen.py --spec openapi.yaml \
        --namespace production \
        --app backend \
        --service-account backend \
        --output policies/
"""

import argparse
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

import yaml


@dataclass
class EndpointRule:
    """Authorization rule for a single API endpoint."""
    path: str
    methods: list[str]
    required_roles: list[str] = field(default_factory=list)
    allowed_principals: list[str] = field(default_factory=list)
    public: bool = False


@dataclass
class PolicyConfig:
    """Settings for policy generation."""
    namespace: str
    app_label: str
    service_account: str
    trust_domain: str = "cluster.local"
    issuer: str = ""


class AuthzPolicyGenerator:
    """Generates Istio AuthorizationPolicy resources from an OpenAPI spec."""

    def __init__(self, config: PolicyConfig):
        self.config = config

    def parse_openapi(self, spec_path: str) -> list[EndpointRule]:
        """Parse the OpenAPI spec and extract endpoints and security requirements."""
        with open(spec_path, "r") as f:
            spec = yaml.safe_load(f)

        # Global security requirements and security scheme definitions
        global_security = spec.get("security", [])
        security_schemes = (
            spec.get("components", {}).get("securitySchemes", {})
        )

        rules: list[EndpointRule] = []

        for path, path_item in spec.get("paths", {}).items():
            # Convert OpenAPI path parameters to an Istio wildcard
            # /api/v1/users/{id} -> /api/v1/users/*
            istio_path = self._convert_path(path)

            for method in ["get", "post", "put", "delete", "patch"]:
                operation = path_item.get(method)
                if operation is None:
                    continue

                # Security requirements (operation level overrides global level)
                security = operation.get("security", global_security)

                rule = EndpointRule(
                    path=istio_path,
                    methods=[method.upper()],
                )

                if not security or security == [{}]:
                    rule.public = True
                else:
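                    for sec_req in security:
                        for scheme_name in sec_req:
                            scheme = security_schemes.get(scheme_name, {})
                            # Roles come from the x-required-roles extension,
                            # falling back to the scheme's x-default-roles
                            roles = operation.get(
                                "x-required-roles",
                                scheme.get("x-default-roles", []),
                            )
                            # Skip duplicates when several schemes are listed
                            for role in roles:
                                if role not in rule.required_roles:
                                    rule.required_roles.append(role)
                            # Allowed calling services come from x-allowed-principals
                            principals = operation.get(
                                "x-allowed-principals", []
                            )
                            for principal in principals:
                                if principal not in rule.allowed_principals:
                                    rule.allowed_principals.append(principal)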

                rules.append(rule)

        return self._merge_rules(rules)

    def generate_policies(
        self, rules: list[EndpointRule]
    ) -> list[dict]:
        """Generate the list of Istio AuthorizationPolicy resources."""
        policies: list[dict] = []

        # 1. Public endpoints (no authentication required)
        public_rules = [r for r in rules if r.public]
        if public_rules:
            policies.append(self._build_public_policy(public_rules))

        # 2. Role-based policies (JWT required)
        role_rules = [r for r in rules if r.required_roles]
        if role_rules:
            policies.extend(self._build_role_policies(role_rules))

        # 3. Service-identity policies (service-to-service calls)
        principal_rules = [r for r in rules if r.allowed_principals]
        if principal_rules:
            policies.append(
                self._build_principal_policy(principal_rules)
            )

        # 4. Default-deny policy
        policies.append(self._build_default_deny())

        return policies

    def _build_public_policy(
        self, rules: list[EndpointRule]
    ) -> dict:
        """Build the ALLOW policy for public endpoints."""
        istio_rules = []
        for rule in rules:
            istio_rules.append(
                {
                    "to": [
                        {
                            "operation": {
                                "methods": rule.methods,
                                "paths": [rule.path],
                            }
                        }
                    ]
                }
            )

        return {
            "apiVersion": "security.istio.io/v1",
            "kind": "AuthorizationPolicy",
            "metadata": {
                "name": f"{self.config.app_label}-public",
                "namespace": self.config.namespace,
                "labels": {
                    "generated-by": "authz-policy-gen",
                    "policy-type": "public",
                },
            },
            "spec": {
                "selector": {
                    "matchLabels": {"app": self.config.app_label}
                },
                "action": "ALLOW",
                "rules": istio_rules,
            },
        }

    def _build_role_policies(
        self, rules: list[EndpointRule]
    ) -> list[dict]:
        """Build one ALLOW policy per role."""
        from collections import defaultdict

        role_groups: dict[str, list[EndpointRule]] = defaultdict(list)
        for rule in rules:
            for role in rule.required_roles:
                role_groups[role].append(rule)

        policies = []
        for role, grouped_rules in role_groups.items():
            istio_rules = []
            for rule in grouped_rules:
                istio_rules.append(
                    {
                        "from": [
                            {
                                "source": {
                                    "requestPrincipals": ["*"],
                                }
                            }
                        ],
                        "to": [
                            {
                                "operation": {
                                    "methods": rule.methods,
                                    "paths": [rule.path],
                                }
                            }
                        ],
                        "when": [
                            {
                                "key": "request.auth.claims[roles]",
                                "values": [role],
                            }
                        ],
                    }
                )

            policies.append(
                {
                    "apiVersion": "security.istio.io/v1",
                    "kind": "AuthorizationPolicy",
                    "metadata": {
                        "name": (
                            f"{self.config.app_label}-role-{role}"
                        ),
                        "namespace": self.config.namespace,
                        "labels": {
                            "generated-by": "authz-policy-gen",
                            "policy-type": "role-based",
                            "role": role,
                        },
                    },
                    "spec": {
                        "selector": {
                            "matchLabels": {
                                "app": self.config.app_label
                            }
                        },
                        "action": "ALLOW",
                        "rules": istio_rules,
                    },
                }
            )

        return policies

    def _build_principal_policy(
        self, rules: list[EndpointRule]
    ) -> dict:
        """Build the ALLOW policy based on service identity."""
        istio_rules = []
        for rule in rules:
            spiffe_ids = [
                (
                    f"{self.config.trust_domain}/ns/"
                    f"{self.config.namespace}/sa/{p}"
                )
                for p in rule.allowed_principals
            ]
            istio_rules.append(
                {
                    "from": [
                        {"source": {"principals": spiffe_ids}}
                    ],
                    "to": [
                        {
                            "operation": {
                                "methods": rule.methods,
                                "paths": [rule.path],
                            }
                        }
                    ],
                }
            )

        return {
            "apiVersion": "security.istio.io/v1",
            "kind": "AuthorizationPolicy",
            "metadata": {
                "name": f"{self.config.app_label}-service-to-service",
                "namespace": self.config.namespace,
                "labels": {
                    "generated-by": "authz-policy-gen",
                    "policy-type": "service-identity",
                },
            },
            "spec": {
                "selector": {
                    "matchLabels": {"app": self.config.app_label}
                },
                "action": "ALLOW",
                "rules": istio_rules,
            },
        }

    def _build_default_deny(self) -> dict:
        """Build the default-deny policy (an ALLOW policy without rules matches nothing)."""
        return {
            "apiVersion": "security.istio.io/v1",
            "kind": "AuthorizationPolicy",
            "metadata": {
                "name": f"{self.config.app_label}-deny-all",
                "namespace": self.config.namespace,
                "labels": {
                    "generated-by": "authz-policy-gen",
                    "policy-type": "default-deny",
                },
            },
            "spec": {
                "selector": {
                    "matchLabels": {"app": self.config.app_label}
                },
                # No action and no rules: the action defaults to ALLOW and nothing
                # matches, so requests not allowed by another policy are denied
            },
        }

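    @staticmethod
    def _convert_path(openapi_path: str) -> str:
        """Convert an OpenAPI path template into an Istio path pattern.

        Istio treats "*" as a wildcard only at the start or end of a path, so the
        template is cut at its first parameter and matched as a prefix:
        /api/v1/users/{id} -> /api/v1/users/*
        /api/v1/users/{id}/orders/{orderId} -> /api/v1/users/* (broader than the spec)
        """
        prefix, brace, _ = openapi_path.partition("{")
        return f"{prefix}*" if brace else openapi_path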

    @staticmethod
    def _merge_rules(rules: list[EndpointRule]) -> list[EndpointRule]:
        """Merge rules that share the same path and security requirements."""
        merged: dict[str, EndpointRule] = {}
        for rule in rules:
            key = (
                f"{rule.path}:"
                f"{','.join(sorted(rule.required_roles))}:"
                f"{','.join(sorted(rule.allowed_principals))}:"
                f"{rule.public}"
            )
            if key in merged:
                for m in rule.methods:
                    if m not in merged[key].methods:
                        merged[key].methods.append(m)
            else:
                merged[key] = rule
        return list(merged.values())


def main():
    parser = argparse.ArgumentParser(
        description="Generate Istio AuthorizationPolicy from OpenAPI spec"
    )
    parser.add_argument(
        "--spec", required=True, help="Path to OpenAPI spec (YAML)"
    )
    parser.add_argument(
        "--namespace", default="default", help="Target namespace"
    )
    parser.add_argument(
        "--app", required=True, help="App label for selector"
    )
    parser.add_argument(
        "--service-account", required=True, help="Service account name"
    )
    parser.add_argument(
        "--trust-domain",
        default="cluster.local",
        help="SPIFFE trust domain",
    )
    parser.add_argument(
        "--issuer", default="", help="JWT issuer URL"
    )
    parser.add_argument(
        "--output", default="-", help="Output directory or - for stdout"
    )
    args = parser.parse_args()

    config = PolicyConfig(
        namespace=args.namespace,
        app_label=args.app,
        service_account=args.service_account,
        trust_domain=args.trust_domain,
        issuer=args.issuer,
    )

    generator = AuthzPolicyGenerator(config)
    rules = generator.parse_openapi(args.spec)
    policies = generator.generate_policies(rules)

    if args.output == "-":
        for i, policy in enumerate(policies):
            if i > 0:
                print("---")
            yaml.dump(
                policy, sys.stdout, default_flow_style=False,
                allow_unicode=True,
            )
    else:
        output_dir = Path(args.output)
        output_dir.mkdir(parents=True, exist_ok=True)
        for policy in policies:
            name = policy["metadata"]["name"]
            filepath = output_dir / f"{name}.yaml"
            with open(filepath, "w") as f:
                yaml.dump(
                    policy, f, default_flow_style=False,
                    allow_unicode=True,
                )
            print(f"Generated: {filepath}")


if __name__ == "__main__":
    main()

Usage example:

# Generate policies from the OpenAPI spec
python authz_policy_gen.py \
    --spec openapi.yaml \
    --namespace production \
    --app order-service \
    --service-account order-service \
    --output policies/

# Apply the generated policies
kubectl apply -f policies/
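
To make the expected input concrete, the sketch below runs a simplified version of the generator's classification logic on a small, invented OpenAPI fragment. `SAMPLE_SPEC` and `classify` are hypothetical and written only for this illustration; the sketch reads the same `x-required-roles` and `x-allowed-principals` extension fields as the generator, but it ignores the scheme-level `x-default-roles` fallback.

```python
SAMPLE_SPEC = {
    "security": [{"bearerAuth": []}],  # global default: a token is required
    "paths": {
        "/healthz": {"get": {"security": []}},  # explicit override: public
        "/api/v1/orders": {
            "post": {"x-allowed-principals": ["checkout-service"]},
        },
        "/api/v1/orders/{id}": {
            "get": {},  # secured, but neither roles nor principals are declared
            "delete": {"x-required-roles": ["order-admin"]},
        },
    },
}


def classify(spec: dict) -> dict:
    """Map (METHOD, path) to the policy types that would be generated for it."""
    global_security = spec.get("security", [])
    result = {}
    for path, item in spec.get("paths", {}).items():
        for method, operation in item.items():
            security = operation.get("security", global_security)
            if not security or security == [{}]:
                kinds = ["public"]
            else:
                kinds = []
                if operation.get("x-required-roles"):
                    kinds.append("role-based")
                if operation.get("x-allowed-principals"):
                    kinds.append("service-identity")
            # An empty list means only the default-deny policy applies.
            result[(method.upper(), path)] = kinds
    return result


if __name__ == "__main__":
    for endpoint, kinds in classify(SAMPLE_SPEC).items():
        print(endpoint, kinds or "denied by default")
```

One consequence worth noting: an operation that requires authentication but declares neither roles nor principals ends up in no ALLOW policy, so the default-deny policy blocks it.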

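24.9 Java Implementation: Spring Boot + Istio Sidecar Integration

A Spring Boot application running in a service mesh has to handle health checks, graceful shutdown, and header propagation correctly so that it works well alongside the Istio sidecar: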

package com.example.meshapp;

import jakarta.servlet.FilterChain;
import jakarta.servlet.ServletException;
import jakarta.servlet.http.HttpServletRequest;
import jakarta.servlet.http.HttpServletResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.web.client.RestTemplateBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.HttpRequest;
import org.springframework.http.client.*;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.filter.OncePerRequestFilter;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.util.*;

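/**
 * Spring Boot application: an example of close integration with the Istio sidecar
 *
 * Key integration points:
 * 1. Health checks: work together with the readiness probe of the Istio sidecar
 * 2. Graceful shutdown: wait for Envoy to drain connections before stopping
 * 3. Header propagation: pass Istio tracing and identity headers through so that distributed traces stay complete
 * 4. Identity extraction: read the caller's SPIFFE ID from x-forwarded-client-cert
 */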
@SpringBootApplication
public class MeshApplication {

    public static void main(String[] args) {
        SpringApplication.run(MeshApplication.class, args);
    }

    // ========== Recommended application.yml settings ==========
    // server:
    //   port: 8080
    //   shutdown: graceful
    // spring:
    //   lifecycle:
    //     timeout-per-shutdown-phase: 30s
    // management:
    //   endpoints:
    //     web:
    //       exposure:
    //         include: health,info,prometheus
    //   endpoint:
    //     health:
    //       probes:
    //         enabled: true    # enables /actuator/health/liveness and /readiness
    //   health:
    //     readinessstate:
    //       enabled: true
    //     livenessstate:
    //       enabled: true

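    // ========== Istio header propagation ==========

    /**
     * Headers that applications in an Istio mesh should pass through.
     * They are used for distributed tracing (Jaeger/Zipkin) and routing decisions.
     * Without propagation, traces that span several services break apart.
     */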
    static final List<String> ISTIO_PROPAGATION_HEADERS = List.of(
            "x-request-id",
            "x-b3-traceid",
            "x-b3-spanid",
            "x-b3-parentspanid",
            "x-b3-sampled",
            "x-b3-flags",
            "b3",
            "x-ot-span-context",
            "traceparent",           // W3C Trace Context
            "tracestate",
            "x-cloud-trace-context", // GCP
            "grpc-trace-bin"
    );

    /**
     * Inbound filter: captures the Istio headers in a ThreadLocal and
     * extracts the caller identity from XFCC.
     */
    @Component
    static class IstioHeaderCapture extends OncePerRequestFilter {

        private static final Logger log =
                LoggerFactory.getLogger(IstioHeaderCapture.class);

        @Override
        protected void doFilterInternal(
                HttpServletRequest request,
                HttpServletResponse response,
                FilterChain filterChain
        ) throws ServletException, IOException {
            // Capture the headers that need to be propagated
            Map<String, String> captured = new HashMap<>();
            for (String header : ISTIO_PROPAGATION_HEADERS) {
                String value = request.getHeader(header);
                if (value != null) {
                    captured.put(header, value);
                }
            }
            IstioHeaderContext.set(captured);

            // Extract the caller's SPIFFE ID and put it into the MDC (visible in logs)
            String xfcc = request.getHeader("x-forwarded-client-cert");
            String callerId = extractSpiffeId(xfcc);
            if (callerId != null) {
                MDC.put("caller.spiffe.id", callerId);
                log.debug("Incoming request from: {}", callerId);
            }

            try {
                filterChain.doFilter(request, response);
            } finally {
                IstioHeaderContext.clear();
                MDC.remove("caller.spiffe.id");
            }
        }

        private String extractSpiffeId(String xfcc) {
            if (xfcc == null || xfcc.isEmpty()) return null;
            // Format: By=spiffe://...;Hash=...;URI=spiffe://cluster.local/ns/prod/sa/frontend
            // Each hop appends its own element, so the last URI belongs to the direct peer.
            String id = null;
            for (String part : xfcc.split(";")) {
                String trimmed = part.trim();
                if (trimmed.startsWith("URI=")) {
                    id = trimmed.substring(4);
                }
            }
            return id;
        }
    }

    /**
     * ThreadLocal holder for the current request's Istio headers,
     * so they can be attached automatically to outbound HTTP calls.
     */
    static class IstioHeaderContext {
        private static final ThreadLocal<Map<String, String>> HEADERS =
                new ThreadLocal<>();

        static void set(Map<String, String> headers) {
            HEADERS.set(headers);
        }

        static Map<String, String> get() {
            Map<String, String> h = HEADERS.get();
            return h != null ? h : Collections.emptyMap();
        }

        static void clear() {
            HEADERS.remove();
        }
    }

    /**
     * RestTemplate interceptor: attaches the captured Istio headers to outbound requests.
     */
    static class IstioPropagationInterceptor implements ClientHttpRequestInterceptor {
        @Override
        public ClientHttpResponse intercept(
                HttpRequest request, byte[] body,
                ClientHttpRequestExecution execution
        ) throws IOException {
            Map<String, String> headers = IstioHeaderContext.get();
            headers.forEach((key, value) ->
                    request.getHeaders().addIfAbsent(key, value));
            return execution.execute(request, body);
        }
    }

    /**
     * Configures a RestTemplate that propagates the Istio headers automatically.
     */
    @Configuration
    static class RestTemplateConfig {
        @Bean
        public org.springframework.web.client.RestTemplate restTemplate(
                RestTemplateBuilder builder
        ) {
            return builder
                    .interceptors(new IstioPropagationInterceptor())
                    .build();
        }
    }

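    // ========== Health check ==========

    /**
     * Custom health check: reports whether the Envoy sidecar is ready.
     * The Kubernetes readiness probe should point at /actuator/health/readiness.
     * Spring Boot's readiness group contains only the readiness state by default,
     * so this indicator must be added to that group before it can hold back
     * the ready status until the sidecar is up.
     */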
    @Component
    static class EnvoySidecarHealthIndicator
            implements org.springframework.boot.actuate.health.HealthIndicator {

        private static final Logger log =
                LoggerFactory.getLogger(EnvoySidecarHealthIndicator.class);

        @Override
        public org.springframework.boot.actuate.health.Health health() {
            try {
                // The sidecar agent serves its readiness endpoint on port 15021 (15000 is the Envoy admin API)
                var url = URI.create(
                        "http://localhost:15021/healthz/ready").toURL();
                HttpURLConnection conn =
                        (HttpURLConnection) url.openConnection();
                conn.setConnectTimeout(2000);
                conn.setReadTimeout(2000);
                int status = conn.getResponseCode();
                if (status == 200) {
                    return org.springframework.boot.actuate.health.Health
                            .up()
                            .withDetail("envoy", "ready")
                            .build();
                } else {
                    return org.springframework.boot.actuate.health.Health
                            .down()
                            .withDetail("envoy", "not ready: " + status)
                            .build();
                }
            } catch (Exception e) {
                log.warn("Envoy sidecar health check failed: {}",
                        e.getMessage());
                return org.springframework.boot.actuate.health.Health
                        .down()
                        .withDetail("envoy", "unreachable: " + e.getMessage())
                        .build();
            }
        }
    }

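    // ========== Graceful shutdown ==========

    /**
     * Listens for the Spring context-closed event and asks Envoy to drain connections.
     * It calls the same /drain_listeners admin endpoint as the preStop hook,
     * so in-flight requests can finish before the process stops.
     */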
    @Component
    static class GracefulShutdownListener
            implements org.springframework.context.ApplicationListener<
            org.springframework.context.event.ContextClosedEvent> {

        private static final Logger log =
                LoggerFactory.getLogger(GracefulShutdownListener.class);

        @Override
        public void onApplicationEvent(
                org.springframework.context.event.ContextClosedEvent event
        ) {
            log.info("Application shutting down, notifying Envoy sidecar...");
            try {
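                // Ask Envoy to start draining its inbound listeners
                var url = URI.create(
                        "http://localhost:15000/drain_listeners?inboundonly"
                ).toURL();
                HttpURLConnection conn =
                        (HttpURLConnection) url.openConnection();
                conn.setRequestMethod("POST");
                conn.setConnectTimeout(2000);
                conn.setReadTimeout(2000);
                int status = conn.getResponseCode();
                log.info("Envoy drain response: {}", status);

                // Give in-flight requests time to complete
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so the rest of the shutdown can proceed
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                log.warn("Failed to drain Envoy: {}", e.getMessage());
            }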
        }
    }

    // ========== Example controller ==========

    @RestController
    @RequestMapping("/api/v1")
    static class OrderController {

        private static final Logger log =
                LoggerFactory.getLogger(OrderController.class);

        private final org.springframework.web.client.RestTemplate restTemplate;

        OrderController(org.springframework.web.client.RestTemplate restTemplate) {
            this.restTemplate = restTemplate;
        }

        @GetMapping("/orders/{id}")
        public Map<String, Object> getOrder(
                @PathVariable String id,
                @RequestHeader(
                        value = "x-forwarded-client-cert",
                        required = false
                ) String xfcc
        ) {
            log.info("Get order {} (caller cert: {})", id, xfcc);

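            // Istio headers are propagated automatically on the downstream call.
            // The URI template lets RestTemplate encode the id as a path variable.
            String inventory = restTemplate.getForObject(
                    "http://inventory-service:8080/api/v1/stock/{id}",
                    String.class,
                    id
            );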

            return Map.of(
                    "orderId", id,
                    "status", "confirmed",
                    "inventory", Objects.requireNonNullElse(inventory, "unknown")
            );
        }
    }
}
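
The `extractSpiffeId` helper in the listing above splits the XFCC header on `;` only. Envoy documents the header as a comma-separated list of elements, one per proxy hop, and values such as `Subject` are double-quoted and may themselves contain commas. The following is a minimal sketch of a quote-aware parser under those assumptions; the class `XfccParser` and its method names are hypothetical and not part of Istio, Envoy, or Spring.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: quote-aware parser for x-forwarded-client-cert values. */
final class XfccParser {

    private XfccParser() {}

    /** Splits on sep, ignoring separators that appear inside double quotes. */
    static List<String> splitOutsideQuotes(String input, char sep) {
        List<String> out = new ArrayList<>();
        StringBuilder cur = new StringBuilder();
        boolean quoted = false;
        for (int i = 0; i < input.length(); i++) {
            char c = input.charAt(i);
            if (c == '\\' && quoted && i + 1 < input.length()) {
                // Keep an escaped character (for example \") verbatim
                cur.append(c).append(input.charAt(++i));
            } else if (c == '"') {
                quoted = !quoted;
                cur.append(c);
            } else if (c == sep && !quoted) {
                out.add(cur.toString());
                cur.setLength(0);
            } else {
                cur.append(c);
            }
        }
        out.add(cur.toString());
        return out;
    }

    /** Parses the header into one key/value map per element (one per proxy hop). */
    static List<Map<String, String>> parse(String header) {
        List<Map<String, String>> elements = new ArrayList<>();
        if (header == null || header.isBlank()) return elements;
        for (String element : splitOutsideQuotes(header, ',')) {
            Map<String, String> fields = new LinkedHashMap<>();
            for (String pair : splitOutsideQuotes(element, ';')) {
                int eq = pair.indexOf('=');
                if (eq <= 0) continue; // skip malformed pairs
                String key = pair.substring(0, eq).trim();
                String value = pair.substring(eq + 1).trim();
                // Strip the surrounding quotes from quoted values
                if (value.length() >= 2 && value.startsWith("\"") && value.endsWith("\"")) {
                    value = value.substring(1, value.length() - 1);
                }
                fields.put(key, value);
            }
            elements.add(fields);
        }
        return elements;
    }

    /** Returns the URI of the last element (the immediate caller), or null. */
    static String immediateCallerUri(String header) {
        List<Map<String, String>> elements = parse(header);
        if (elements.isEmpty()) return null;
        return elements.get(elements.size() - 1).get("URI");
    }
}
```

For a header with two elements, `immediateCallerUri` returns the `URI` of the second one, and a quoted `Subject="CN=edge,O=example"` in the first element is kept as a single value rather than being split at its comma. Whether the last element really is the peer you want to authorize depends on how each proxy in the path is configured to forward or sanitize the header, so treat the choice as an assumption to verify in your mesh.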

The corresponding Kubernetes deployment configuration:

# Kubernetes Deployment: recommended practices alongside the Istio sidecar
apiVersion: apps/v1
kind: Deployment
metadata:
  name: order-service
  namespace: production
spec:
  replicas: 3
  selector:
    matchLabels:
      app: order-service
  template:
    metadata:
      labels:
        app: order-service
        version: v1
      annotations:
        sidecar.istio.io/inject: "true"
        # Let Istio rewrite the probes so health checks are forwarded through the sidecar
        sidecar.istio.io/rewriteAppHTTPProbers: "true"
    spec:
      serviceAccountName: order-service
      terminationGracePeriodSeconds: 60
      containers:
        - name: order-service
          image: example.com/order-service:v1.2.0
          ports:
            - containerPort: 8080
              name: http
          livenessProbe:
            httpGet:
              path: /actuator/health/liveness
              port: 8080
            initialDelaySeconds: 15
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /actuator/health/readiness
              port: 8080
            initialDelaySeconds: 10
            periodSeconds: 5
          lifecycle:
            preStop:
              exec:
                # Graceful shutdown: ask Envoy to drain first, then wait
                command:
                  - /bin/sh
                  - -c
                  - |
                    curl -s -X POST "http://localhost:15000/drain_listeners?inboundonly"
                    sleep 10
          resources:
            requests:
              cpu: 100m
              memory: 256Mi
            limits:
              cpu: 500m
              memory: 512Mi

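24.10 Service Mesh Security Comparison

Istio vs Linkerd vs Consul Connect

| Feature | Istio | Linkerd | Consul Connect |
|---|---|---|---|
| Proxy | Envoy (C++) | linkerd2-proxy (Rust) | Envoy / built-in proxy |
| mTLS | ✅ Automatic, SPIFFE | ✅ Automatic | ✅ Automatic (Consul CA) |
| Certificate management | Citadel / SPIRE / external CA | Built-in CA (identity controller) | Consul CA / Vault |
| Identity model | SPIFFE ID | SPIFFE-like (ServiceAccount) | Consul Service Identity |
| Authorization policy | AuthorizationPolicy (rich) | Server / HTTPRoute (basic) | Intention (Allow/Deny) |
| JWT validation | ✅ RequestAuthentication | ❌ Handled externally | ❌ Handled externally |
| External authorization | ✅ ext_authz (OPA, etc.) | ✅ policy controller | ❌ Custom integration required |
| L7 policy granularity | Method + path + header + JWT claims | Method + path | Intention (mostly L4) |
| Multi-cluster | ✅ (complex) | ✅ (multi-cluster) | ✅ (WAN Federation) |
| Non-K8s support | ✅ (VM workload) | ❌ Kubernetes only | ✅ (natively multi-platform) |
| Performance overhead | Moderate (~2-3ms p99) | Low (~1ms p99) | Moderate |
| Resource usage | High (Envoy ~50MB/sidecar) | Low (~10MB/sidecar) | Moderate |
| Complexity | | | |
| FIPS 140-2 | ✅ (BoringSSL) | ✅ (rustls) | |
| Audit logging | ✅ Access logs + Telemetry API | ✅ tap | ✅ Consul audit log |
| Community activity | Very high (CNCF graduated) | High (CNCF graduated) | High (HashiCorp) |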

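Selection guidance

| Scenario | Recommendation | Rationale |
|---|---|---|
| Rich L7 security policies are required | Istio | AuthorizationPolicy + JWT + ext_authz is the most complete combination |
| Low latency and simple operations | Linkerd | The Rust proxy performs well and configuration is simple |
| Hybrid environments (K8s + VM + multi-cloud) | Consul Connect | Native support for non-K8s workloads |
| Existing investment in Envoy | Istio | Reuses the Envoy ecosystem directly |
| Small team that needs to get started quickly | Linkerd | Lowest learning curve |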

24.11 mTLS Debugging Guide

Common istioctl commands

# ===== Check mTLS status =====

# Describe a pod: the services, mTLS mode, and policies that apply to it
istioctl x describe pod <pod-name> -n <namespace>

# Check whether two services use mTLS with each other
# (available only in older Istio releases; later releases removed it in favor of describe)
istioctl authn tls-check <pod-name> <destination-service>

# List the PeerAuthentication policies
kubectl get peerauthentication --all-namespaces

# Show the authorization policies in effect for a pod
istioctl experimental authz check <pod-name>

# ===== Proxy status checks =====

# Show the sync status of the Envoy proxies
istioctl proxy-status

# Show the Envoy configuration of a specific pod
istioctl proxy-config cluster <pod-name> -n <namespace>
istioctl proxy-config listener <pod-name> -n <namespace>
istioctl proxy-config route <pod-name> -n <namespace>
istioctl proxy-config secret <pod-name> -n <namespace>

# Show certificate details
istioctl proxy-config secret <pod-name> -n <namespace> -o json | \
  jq '.dynamicActiveSecrets[0].secret.tlsCertificate.certificateChain.inlineBytes' -r | \
  base64 -d | openssl x509 -text -noout

# ===== Logs and debugging =====

# Enable Envoy debug logging
istioctl proxy-config log <pod-name> --level rbac:debug,connection:debug

# Inspect the Envoy access log to confirm mTLS
# (the field appears only if the access log format includes it)
kubectl logs <pod-name> -c istio-proxy | grep "downstream_peer_subject"

# Analyze configuration problems
istioctl analyze -n <namespace>
istioctl analyze --all-namespaces

Verifying mTLS with openssl

# ===== Extract certificates from the sidecar =====

# Extract the workload certificate
# (these files exist only when certificates are mounted as files; with the
#  default SDS delivery they stay in memory, so read them with
#  `istioctl proxy-config secret` instead)
kubectl exec <pod-name> -c istio-proxy -- \
  cat /var/run/secrets/workload-spiffe-credentials/cert-chain.pem > cert.pem

kubectl exec <pod-name> -c istio-proxy -- \
  cat /var/run/secrets/workload-spiffe-credentials/key.pem > key.pem

kubectl exec <pod-name> -c istio-proxy -- \
  cat /var/run/secrets/workload-spiffe-credentials/root-cert.pem > root-cert.pem

# ===== Verify the certificate contents =====

# Show certificate details (the SPIFFE ID is in the SAN)
openssl x509 -in cert.pem -text -noout | grep -A1 "Subject Alternative Name"
# Output: URI:spiffe://cluster.local/ns/production/sa/order-service

# Check the certificate validity period
openssl x509 -in cert.pem -noout -dates
# notBefore=Mar 21 00:00:00 2026 GMT
# notAfter=Mar 22 00:00:00 2026 GMT  (24 hours by default)

# Verify the certificate chain
openssl verify -CAfile root-cert.pem cert.pem

# ===== Test an mTLS connection manually =====

# Test mTLS to the target service from inside the pod
# (assumes openssl is available in the container image)
kubectl exec <pod-name> -c istio-proxy -- \
  openssl s_client \
    -connect <target-service>:80 \
    -cert /var/run/secrets/workload-spiffe-credentials/cert-chain.pem \
    -key /var/run/secrets/workload-spiffe-credentials/key.pem \
    -CAfile /var/run/secrets/workload-spiffe-credentials/root-cert.pem \
    -servername outbound_.80_._.<target-service>.<namespace>.svc.cluster.local

# ===== Troubleshooting common problems =====

# Problem: 503 UC (upstream connection termination)
# Cause: the target service requires mTLS but the client did not present a certificate
# Check:
istioctl proxy-config secret <client-pod> -n <namespace>
# Confirm that the ROOTCA and default certificates exist and have not expired

# Problem: RBAC access denied
# Cause: an AuthorizationPolicy rejected the request
# Check:
istioctl proxy-config log <server-pod> --level rbac:debug
kubectl logs <server-pod> -c istio-proxy | grep "rbac"
# See which policy rejected the request

# Problem: the certificate is not trusted
# Cause: the client and server certificates come from different trust domains
#        or root CAs (for example, workloads in different meshes)
# Check:
openssl x509 -in cert.pem -text -noout | grep "URI:spiffe"
# Confirm that the trust domains match
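
The trust-domain comparison in the last check can also be done in code once the SPIFFE IDs have been read from the certificates. The following is a minimal sketch, assuming IDs in the Istio form `spiffe://<trust-domain>/ns/<namespace>/sa/<service-account>`; the `SpiffeId` class is a hypothetical helper, not an Istio or SPIFFE library type.

```java
import java.net.URI;

/** Hypothetical value type for an Istio-style SPIFFE ID. */
final class SpiffeId {
    final String trustDomain;
    final String namespace;
    final String serviceAccount;

    private SpiffeId(String trustDomain, String namespace, String serviceAccount) {
        this.trustDomain = trustDomain;
        this.namespace = namespace;
        this.serviceAccount = serviceAccount;
    }

    /** Parses spiffe://<trust-domain>/ns/<namespace>/sa/<service-account>. */
    static SpiffeId parse(String raw) {
        URI uri = URI.create(raw);
        if (!"spiffe".equals(uri.getScheme()) || uri.getHost() == null) {
            throw new IllegalArgumentException("Not a SPIFFE ID: " + raw);
        }
        String[] seg = uri.getPath().split("/");
        // Expected segments: "", "ns", <namespace>, "sa", <service-account>
        if (seg.length != 5 || !"ns".equals(seg[1]) || !"sa".equals(seg[3])) {
            throw new IllegalArgumentException("Unexpected SPIFFE path: " + uri.getPath());
        }
        return new SpiffeId(uri.getHost(), seg[2], seg[4]);
    }

    /** True when both identities belong to the same trust domain. */
    boolean sameTrustDomain(SpiffeId other) {
        return trustDomain.equals(other.trustDomain);
    }
}
```

Parsing `spiffe://cluster.local/ns/production/sa/order-service` yields the trust domain `cluster.local`, the namespace `production`, and the service account `order-service`. Comparing it with an ID from another trust domain via `sameTrustDomain` returns `false`, which is the condition the last troubleshooting entry looks for.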

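24.12 Gradual mTLS Migration

        flowchart TD
    subgraph phase1["Phase 1: Observe (1-2 weeks)"]
        p1a["Deploy Istio and inject sidecars"]
        p1b["PeerAuthentication: PERMISSIVE"]
        p1c["Observe traffic and confirm all services work"]
        p1d["Collect mTLS coverage metrics"]
        p1a --> p1b --> p1c --> p1d
    end

    subgraph phase2["Phase 2: Enable step by step (2-4 weeks)"]
        p2a["Switch to STRICT one namespace at a time"]
        p2b["Start with non-critical services"]
        p2c["Monitor error rate and latency"]
        p2d["Handle incompatible services"]
        p2a --> p2b --> p2c --> p2d
    end

    subgraph phase3["Phase 3: Enable everywhere (1-2 weeks)"]
        p3a["Mesh-wide PeerAuthentication: STRICT"]
        p3b["Deploy AuthorizationPolicy"]
        p3c["Enable audit logging"]
        p3d["Set up alerting rules"]
        p3a --> p3b --> p3c --> p3d
    end

    subgraph phase4["Phase 4: Continuous improvement"]
        p4a["Refine authorization policies"]
        p4b["Integrate SPIRE (cross-cluster)"]
        p4c["Add ext_authz (OPA)"]
        p4d["Review policies regularly"]
        p4a --> p4b --> p4c --> p4d
    end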

    phase1 --> phase2 --> phase3 --> phase4

    style phase1 fill:#e8f5e9,stroke:#4caf50
    style phase2 fill:#fff3e0,stroke:#ff9800
    style phase3 fill:#e3f2fd,stroke:#2196f3
    style phase4 fill:#f3e5f5,stroke:#9c27b0
    

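24.13 Summary

  • Service Mesh implements zero trust at the infrastructure layer: automatic mTLS plus fine-grained authorization

  • Istio offers the richest set of security features: PeerAuthentication, AuthorizationPolicy, RequestAuthentication

  • SPIRE integration provides a unified workload identity across clusters and platforms

  • ext_authz can delegate authorization decisions to external policy engines such as OPA

  • mTLS migration should be gradual: PERMISSIVE → namespace-by-namespace STRICT → global STRICT

  • Linkerd suits scenarios that prioritize simplicity and low latency; Consul Connect suits hybrid environments

  • Knowing the istioctl and openssl debugging tools is key to troubleshooting mTLS problems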