# 第二十八章:云原生安全 > "云原生不是更安全,而是安全的方式不同了。" ```{mermaid} mindmap root((云原生安全)) 4C 模型 Cloud Cluster Container Code 容器安全 镜像扫描 运行时安全 镜像签名 SBOM K8s 安全 RBAC PSS Network Policy Admission Control 合规 SOC 2 ISO 27001 CIS Benchmarks ``` ## 28.1 4C 安全模型 ```{mermaid} flowchart TB subgraph Cloud["☁️ Cloud(云平台)"] direction TB CloudDesc["IAM · VPC · 加密 · 审计日志"] subgraph Cluster["🏗️ Cluster(集群)"] direction TB ClusterDesc["RBAC · PSS · Network Policy · Admission Control"] subgraph Container["📦 Container(容器)"] direction TB ContainerDesc["镜像扫描 · 最小基础镜像 · 非 root · 签名"] subgraph Code["💻 Code(代码)"] CodeDesc["SAST · SCA · 安全编码 · 密钥管理"] end end end end style Cloud fill:#1a237e,stroke:#333,color:#fff style Cluster fill:#283593,stroke:#333,color:#fff style Container fill:#3949ab,stroke:#333,color:#fff style Code fill:#5c6bc0,stroke:#333,color:#fff ``` 每一层的安全职责: | 层级 | 安全措施 | 工具 | |------|---------|------| | Cloud | IAM 最小权限、VPC 隔离、静态加密、审计日志 | AWS IAM, CloudTrail, VPC | | Cluster | RBAC、Pod Security Standards、Network Policy | K8s RBAC, OPA, Cilium | | Container | 镜像扫描、签名验证、非 root、只读文件系统 | Trivy, Cosign, Falco | | Code | SAST、SCA、密钥管理、安全依赖 | Semgrep, Snyk, Vault | ## 28.2 容器安全 ### 容器安全扫描流程 ```{mermaid} flowchart LR A["👨‍💻 开发者
提交代码"] --> B["🔨 CI 构建
Docker 镜像"] B --> C["🔍 镜像扫描
Trivy/Grype"] C --> D{"漏洞等级?"} D -->|"CRITICAL/HIGH"| E["❌ 构建失败
通知开发者"] E --> F["🔧 修复漏洞"] F --> A D -->|"MEDIUM/LOW"| G["⚠️ 记录告警
继续流水线"] D -->|"无漏洞"| H["✅ 通过"] G --> I["✍️ 镜像签名
Cosign"] H --> I I --> J["📦 推送到
私有仓库"] J --> K["🚀 部署到
K8s 集群"] K --> L["🛡️ Admission
验证签名"] style E fill:#F44336,stroke:#333,color:#fff style H fill:#4CAF50,stroke:#333,color:#fff style I fill:#FF9800,stroke:#333,color:#fff style L fill:#2196F3,stroke:#333,color:#fff ``` ### 镜像扫描 ```bash # Trivy — 容器镜像漏洞扫描 trivy image myapp:latest trivy image --severity HIGH,CRITICAL myapp:latest # Grype — 另一个流行的扫描工具 grype myapp:latest ``` ### 安全 Dockerfile — Python ```dockerfile # ✅ Python 安全最佳实践 Dockerfile # 多阶段构建 + 非 root + 最小镜像 # ── 阶段 1:构建依赖 ────────────────────────────────────────── FROM python:3.12-slim AS builder WORKDIR /build # 只复制依赖文件(利用 Docker 缓存) COPY requirements.txt . RUN pip install --no-cache-dir --prefix=/install -r requirements.txt # ── 阶段 2:运行时镜像 ──────────────────────────────────────── FROM python:3.12-slim # ✅ 安装安全更新 RUN apt-get update && \ apt-get upgrade -y && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* WORKDIR /app # ✅ 创建非 root 用户 RUN groupadd -r appuser && \ useradd -r -g appuser -d /app -s /sbin/nologin appuser # ✅ 从构建阶段复制依赖(不包含构建工具) COPY --from=builder /install /usr/local COPY --chown=appuser:appuser . . # ✅ 以非 root 用户运行 USER appuser # ✅ 健康检查 HEALTHCHECK --interval=30s --timeout=3s --retries=3 \ CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1 # ✅ 只暴露必要端口 EXPOSE 8000 # ✅ 使用 exec 形式(PID 1 信号处理) CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] # 运行时安全加固: # docker run --read-only --tmpfs /tmp --cap-drop ALL --security-opt no-new-privileges myapp:latest ``` ### 安全 Dockerfile — Java ```dockerfile # ✅ Java 安全最佳实践 Dockerfile # 多阶段构建 + JLink 最小 JRE + 非 root # ── 阶段 1:构建 ────────────────────────────────────────────── FROM eclipse-temurin:21-jdk-jammy AS builder WORKDIR /build # ✅ 只复制构建文件(利用缓存) COPY gradle/ gradle/ COPY gradlew build.gradle settings.gradle ./ RUN ./gradlew dependencies --no-daemon COPY src/ src/ RUN ./gradlew bootJar --no-daemon -x test # ✅ 使用 jlink 创建最小 JRE(减少攻击面) RUN jlink \ --add-modules java.base,java.logging,java.sql,java.naming,java.management,java.instrument,java.security.jgss,java.desktop \ --strip-debug \ --no-man-pages \ --no-header-files \ --compress=zip-6 \ --output /custom-jre # ── 阶段 2:运行时镜像 ──────────────────────────────────────── FROM debian:bookworm-slim # ✅ 安装安全更新 RUN apt-get update && \ apt-get upgrade -y && \ apt-get clean && \ rm -rf /var/lib/apt/lists/* # ✅ 复制最小 JRE COPY --from=builder /custom-jre /opt/java ENV PATH="/opt/java/bin:${PATH}" WORKDIR /app # ✅ 创建非 root 用户 RUN groupadd -r appuser && \ useradd -r -g appuser -d /app -s /sbin/nologin appuser # ✅ 复制应用 JAR COPY --from=builder --chown=appuser:appuser /build/build/libs/*.jar app.jar # ✅ 以非 root 用户运行 USER appuser # ✅ JVM 安全参数 ENV JAVA_OPTS="-XX:+UseContainerSupport \ -XX:MaxRAMPercentage=75.0 \ -Djava.security.egd=file:/dev/urandom \ -Dserver.port=8080" HEALTHCHECK --interval=30s --timeout=3s --retries=3 \ CMD java -cp app.jar com.example.HealthCheck || exit 1 EXPOSE 8080 ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"] ``` ### 安全 Dockerfile — Go ```dockerfile # ✅ Go 安全最佳实践 Dockerfile # 多阶段构建 + scratch/distroless + 静态编译 # ── 阶段 1:构建 ────────────────────────────────────────────── FROM golang:1.22-bookworm AS builder WORKDIR /build # ✅ 只复制依赖文件(利用缓存) COPY go.mod go.sum ./ RUN go mod download && go mod verify COPY . . # ✅ 静态编译(CGO_ENABLED=0),禁用符号表和调试信息 RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ -ldflags="-w -s -X main.version=$(git describe --tags --always)" \ -trimpath \ -o /app/server ./cmd/server # ── 阶段 2:运行时镜像(使用 distroless,极小攻击面)──────── FROM gcr.io/distroless/static-debian12:nonroot # ✅ 复制二进制文件 COPY --from=builder /app/server /server # ✅ 复制 CA 证书(用于 HTTPS 请求) COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ # ✅ distroless:nonroot 默认以 UID 65532 运行 USER nonroot:nonroot EXPOSE 8080 ENTRYPOINT ["/server"] # 镜像大小对比: # golang:1.22 ~800MB # debian:slim ~80MB # distroless/static ~2MB ← 极小攻击面 # scratch ~0MB (但没有 CA 证书和时区数据) ``` ### 镜像签名(Cosign) ```bash # 签名镜像 cosign sign --key cosign.key myregistry.com/myapp:latest # 验证签名 cosign verify --key cosign.pub myregistry.com/myapp:latest ``` ### 供应链安全(SLSA)流程 ```{mermaid} flowchart TB subgraph Source["📝 源代码"] S1["代码审查"] S2["签名提交"] S3["分支保护"] end subgraph Build["🔨 构建"] B1["隔离构建环境"] B2["可重现构建"] B3["构建来源证明
(Provenance)"] end subgraph Verify["✅ 验证"] V1["SBOM 生成"] V2["漏洞扫描"] V3["签名验证"] end subgraph Deploy["🚀 部署"] D1["Admission Webhook
验证签名+来源"] D2["策略引擎
(OPA/Kyverno)"] D3["运行时监控
(Falco)"] end Source --> Build --> Verify --> Deploy style Source fill:#E8F5E9,stroke:#4CAF50 style Build fill:#E3F2FD,stroke:#2196F3 style Verify fill:#FFF3E0,stroke:#FF9800 style Deploy fill:#F3E5F5,stroke:#9C27B0 ``` **SLSA 级别**: | 级别 | 要求 | 说明 | |------|------|------| | SLSA 1 | 构建过程有文档 | 基本的构建来源记录 | | SLSA 2 | 使用托管构建服务 | 构建服务生成来源证明 | | SLSA 3 | 构建环境加固 | 隔离构建、不可篡改的来源证明 | | SLSA 4 | 双人审查 + 密封构建 | 最高安全级别 | ## 28.3 Kubernetes 安全 ### Pod Security 决策流程 ```{mermaid} flowchart TD A["Pod 创建请求"] --> B{"命名空间 PSS 级别?"} B -->|"privileged"| C["✅ 允许所有配置"] B -->|"baseline"| D{"检查基线策略"} B -->|"restricted"| E{"检查受限策略"} D --> D1{"hostNetwork?"} D1 -->|"true"| DENY["❌ 拒绝"] D1 -->|"false"| D2{"privileged?"} D2 -->|"true"| DENY D2 -->|"false"| D3{"hostPID/hostIPC?"} D3 -->|"true"| DENY D3 -->|"false"| ALLOW_B["✅ 允许"] E --> E1{"runAsNonRoot?"} E1 -->|"false/未设置"| DENY E1 -->|"true"| E2{"allowPrivilegeEscalation?"} E2 -->|"true/未设置"| DENY E2 -->|"false"| E3{"capabilities 只有
NET_BIND_SERVICE?"} E3 -->|"否"| DENY E3 -->|"是"| E4{"seccompProfile
= RuntimeDefault?"} E4 -->|"否"| DENY E4 -->|"是"| ALLOW_R["✅ 允许"] style DENY fill:#F44336,stroke:#333,color:#fff style ALLOW_B fill:#4CAF50,stroke:#333,color:#fff style ALLOW_R fill:#4CAF50,stroke:#333,color:#fff style A fill:#2196F3,stroke:#333,color:#fff ``` ### RBAC ```yaml # 最小权限 Role apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: name: app-reader namespace: production rules: - apiGroups: [""] resources: ["pods", "services"] verbs: ["get", "list", "watch"] - apiGroups: ["apps"] resources: ["deployments"] verbs: ["get", "list"] --- apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: name: app-reader-binding namespace: production subjects: - kind: ServiceAccount name: my-app namespace: production roleRef: kind: Role name: app-reader apiGroup: rbac.authorization.k8s.io ``` ### Pod Security Standards(PSS)— Restricted ```yaml # ── Namespace 级别强制 Restricted PSS ────────────────────────── apiVersion: v1 kind: Namespace metadata: name: production labels: pod-security.kubernetes.io/enforce: restricted pod-security.kubernetes.io/enforce-version: latest pod-security.kubernetes.io/audit: restricted pod-security.kubernetes.io/audit-version: latest pod-security.kubernetes.io/warn: restricted pod-security.kubernetes.io/warn-version: latest --- # ── 符合 Restricted PSS 的 Pod 示例 ─────────────────────────── apiVersion: v1 kind: Pod metadata: name: secure-app namespace: production spec: # ✅ 不使用 host 命名空间 hostNetwork: false hostPID: false hostIPC: false # ✅ Pod 级别安全上下文 securityContext: runAsNonRoot: true runAsUser: 65534 runAsGroup: 65534 fsGroup: 65534 seccompProfile: type: RuntimeDefault containers: - name: app image: myregistry.com/myapp:v1.2.3 # ✅ 不使用 latest ports: - containerPort: 8080 # ✅ 容器级别安全上下文 securityContext: allowPrivilegeEscalation: false readOnlyRootFilesystem: true runAsNonRoot: true runAsUser: 65534 capabilities: drop: - ALL seccompProfile: type: RuntimeDefault # ✅ 资源限制(防止资源耗尽攻击) resources: requests: cpu: 100m memory: 128Mi limits: cpu: 500m memory: 512Mi # ✅ 健康检查 livenessProbe: httpGet: path: /health port: 8080 initialDelaySeconds: 10 periodSeconds: 30 readinessProbe: httpGet: path: /ready port: 8080 initialDelaySeconds: 5 periodSeconds: 10 # ✅ 临时目录(只读根文件系统需要) volumeMounts: - name: tmp mountPath: /tmp volumes: - name: tmp emptyDir: sizeLimit: 100Mi # ✅ 服务账户(最小权限) serviceAccountName: my-app automountServiceAccountToken: false # 不需要时禁用 ``` ### NetworkPolicy 示例 ```yaml # ── 默认拒绝所有入站流量 ────────────────────────────────────── apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: name: default-deny-ingress namespace: production spec: podSelector: {} # 匹配所有 Pod policyTypes: - Ingress --- # ── 默认拒绝所有出站流量 ────────────────────────────────────── apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: name: default-deny-egress namespace: production spec: podSelector: {} policyTypes: - Egress --- # ── 允许 frontend → backend 的流量 ──────────────────────────── apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: name: allow-frontend-to-backend namespace: production spec: podSelector: matchLabels: app: backend policyTypes: - Ingress ingress: - from: - podSelector: matchLabels: app: frontend ports: - protocol: TCP port: 8080 --- # ── 允许 backend → database 的流量 ──────────────────────────── apiVersion: networking.k8s.io/v1 kind: NetworkPolicy metadata: name: allow-backend-to-db namespace: production spec: podSelector: matchLabels: app: backend policyTypes: - Egress egress: - to: - podSelector: matchLabels: app: database ports: - protocol: TCP port: 5432 # ✅ 允许 DNS 查询 - to: - namespaceSelector: {} podSelector: matchLabels: k8s-app: kube-dns ports: - protocol: UDP port: 53 ``` ### Admission Control(OPA Gatekeeper) ```yaml # ── ConstraintTemplate:禁止使用指定镜像标签 ────────────────── apiVersion: templates.gatekeeper.sh/v1 kind: ConstraintTemplate metadata: name: k8sdisallowedtags spec: crd: spec: names: kind: K8sDisallowedTags validation: openAPIV3Schema: type: object properties: tags: type: array description: "禁止使用的镜像标签列表" items: type: string exemptImages: type: array description: "豁免的镜像列表(支持通配符)" items: type: string targets: - target: admission.k8s.gatekeeper.sh rego: | package k8sdisallowedtags # 检查所有容器(包括 init 容器) violation[{"msg": msg}] { container := input_containers[_] tags := {t | t := input.parameters.tags[_]} tag := get_tag(container.image) tags[tag] not is_exempt(container.image) msg := sprintf( "容器 '%s' 使用了禁止的镜像标签 '%s'(镜像: %s)", [container.name, tag, container.image] ) } # 没有标签等同于 latest violation[{"msg": msg}] { container := input_containers[_] tags := {t | t := input.parameters.tags[_]} not contains(container.image, ":") tags["latest"] not is_exempt(container.image) msg := sprintf( "容器 '%s' 未指定镜像标签(等同于 latest,镜像: %s)", [container.name, container.image] ) } input_containers[c] { c := input.review.object.spec.containers[_] } input_containers[c] { c := input.review.object.spec.initContainers[_] } get_tag(image) = tag { parts := split(image, ":") count(parts) == 2 tag := parts[1] } is_exempt(image) { exempt := input.parameters.exemptImages[_] glob.match(exempt, [], image) } --- # ── Constraint:禁止使用 latest 标签 ────────────────────────── apiVersion: constraints.gatekeeper.sh/v1beta1 kind: K8sDisallowedTags metadata: name: no-latest-tag spec: enforcementAction: deny match: kinds: - apiGroups: [""] kinds: ["Pod"] namespaces: - production - staging parameters: tags: ["latest"] exemptImages: - "gcr.io/distroless/*" # 豁免 distroless 镜像 --- # ── ConstraintTemplate:要求资源限制 ────────────────────────── apiVersion: templates.gatekeeper.sh/v1 kind: ConstraintTemplate metadata: name: k8srequiredresources spec: crd: spec: names: kind: K8sRequiredResources targets: - target: admission.k8s.gatekeeper.sh rego: | package k8srequiredresources violation[{"msg": msg}] { container := input.review.object.spec.containers[_] not container.resources.limits msg := sprintf( "容器 '%s' 必须设置资源限制 (resources.limits)", [container.name] ) } violation[{"msg": msg}] { container := input.review.object.spec.containers[_] not container.resources.requests msg := sprintf( "容器 '%s' 必须设置资源请求 (resources.requests)", [container.name] ) } --- # ── Constraint:要求所有容器设置资源限制 ────────────────────── apiVersion: constraints.gatekeeper.sh/v1beta1 kind: K8sRequiredResources metadata: name: require-resource-limits spec: enforcementAction: deny match: kinds: - apiGroups: [""] kinds: ["Pod"] namespaces: - production ``` ### Go: 容器镜像签名验证(Cosign/Sigstore) ```go /* 容器镜像签名验证 — 使用 cosign/sigstore 库 依赖: go get github.com/sigstore/cosign/v2 go get github.com/sigstore/sigstore/pkg/signature */ package main import ( "context" "crypto" "encoding/json" "fmt" "log" "os" "github.com/google/go-containerregistry/pkg/name" "github.com/sigstore/cosign/v2/cmd/cosign/cli/fulcio" "github.com/sigstore/cosign/v2/cmd/cosign/cli/options" "github.com/sigstore/cosign/v2/cmd/cosign/cli/verify" "github.com/sigstore/cosign/v2/pkg/cosign" ociremote "github.com/sigstore/cosign/v2/pkg/oci/remote" "github.com/sigstore/sigstore/pkg/signature" ) // ImageVerifier 镜像签名验证器 type ImageVerifier struct { publicKeyPath string } // NewImageVerifier 创建镜像验证器 func NewImageVerifier(publicKeyPath string) *ImageVerifier { return &ImageVerifier{publicKeyPath: publicKeyPath} } // VerifyImage 验证镜像签名 func (iv *ImageVerifier) VerifyImage(imageRef string) (*VerificationResult, error) { ctx := context.Background() // 解析镜像引用 ref, err := name.ParseReference(imageRef) if err != nil { return nil, fmt.Errorf("无效的镜像引用 '%s': %w", imageRef, err) } // 加载公钥 pubKey, err := signature.LoadPublicKeyRaw(ctx, iv.publicKeyPath, crypto.SHA256) if err != nil { return nil, fmt.Errorf("加载公钥失败: %w", err) } // 配置验证选项 checkOpts := &cosign.CheckOpts{ SigVerifier: pubKey, } // 执行验证 signatures, bundleVerified, err := cosign.VerifyImageSignatures(ctx, ref, checkOpts) if err != nil { return &VerificationResult{ Image: imageRef, Verified: false, Error: err.Error(), }, nil } result := &VerificationResult{ Image: imageRef, Verified: true, BundleVerified: bundleVerified, SignatureCount: len(signatures), } // 提取签名负载信息 for _, sig := range signatures { payload, err := sig.Payload() if err == nil { result.Payloads = append(result.Payloads, string(payload)) } } log.Printf("✅ 镜像签名验证通过: %s (%d 个签名)", imageRef, len(signatures)) return result, nil } // VerificationResult 验证结果 type VerificationResult struct { Image string `json:"image"` Verified bool `json:"verified"` BundleVerified bool `json:"bundle_verified"` SignatureCount int `json:"signature_count"` Payloads []string `json:"payloads,omitempty"` Error string `json:"error,omitempty"` } // VerifyKeyless 使用 Keyless(Fulcio + Rekor)验证签名 func VerifyKeyless(imageRef, expectedIssuer, expectedSubject string) (*VerificationResult, error) { ctx := context.Background() ref, err := name.ParseReference(imageRef) if err != nil { return nil, fmt.Errorf("无效的镜像引用: %w", err) } // Keyless 验证:使用 Fulcio CA + Rekor 透明日志 checkOpts := &cosign.CheckOpts{ Identities: []cosign.Identity{ { Issuer: expectedIssuer, // 如 "https://accounts.google.com" Subject: expectedSubject, // 如 "user@example.com" }, }, } signatures, bundleVerified, err := cosign.VerifyImageSignatures(ctx, ref, checkOpts) if err != nil { return &VerificationResult{ Image: imageRef, Verified: false, Error: err.Error(), }, nil } return &VerificationResult{ Image: imageRef, Verified: true, BundleVerified: bundleVerified, SignatureCount: len(signatures), }, nil } func main() { // 使用公钥验证 verifier := NewImageVerifier("cosign.pub") result, err := verifier.VerifyImage("myregistry.com/myapp:v1.2.3") if err != nil { log.Fatalf("验证失败: %v", err) } output, _ := json.MarshalIndent(result, "", " ") fmt.Println(string(output)) if !result.Verified { fmt.Println("❌ 镜像签名验证失败,拒绝部署!") os.Exit(1) } fmt.Println("✅ 镜像签名验证通过,允许部署") } ``` ### Go: Admission Webhook 实现 ```go /* Kubernetes Admission Webhook — 验证 Pod 安全策略 依赖: go get k8s.io/api/admission/v1 go get k8s.io/apimachinery/pkg/runtime */ package main import ( "encoding/json" "fmt" "io" "log" "net/http" "os" "strings" admissionv1 "k8s.io/api/admission/v1" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" ) var ( scheme = runtime.NewScheme() codecs = serializer.NewCodecFactory(scheme) ) // SecurityPolicy 安全策略配置 type SecurityPolicy struct { // 禁止使用的镜像标签 DisallowedTags []string `json:"disallowed_tags"` // 允许的镜像仓库前缀 AllowedRegistries []string `json:"allowed_registries"` // 是否要求非 root 运行 RequireNonRoot bool `json:"require_non_root"` // 是否要求资源限制 RequireResourceLimits bool `json:"require_resource_limits"` // 是否要求只读根文件系统 RequireReadOnlyRoot bool `json:"require_readonly_root"` } // WebhookServer Admission Webhook 服务器 type WebhookServer struct { policy SecurityPolicy } // NewWebhookServer 创建 Webhook 服务器 func NewWebhookServer(policy SecurityPolicy) *WebhookServer { return &WebhookServer{policy: policy} } // handleValidate 处理验证请求 func (ws *WebhookServer) handleValidate(w http.ResponseWriter, r *http.Request) { // 读取请求体 body, err := io.ReadAll(r.Body) if err != nil { http.Error(w, "读取请求失败", http.StatusBadRequest) return } // 解析 AdmissionReview var admissionReview admissionv1.AdmissionReview if err := json.Unmarshal(body, &admissionReview); err != nil { http.Error(w, "解析请求失败", http.StatusBadRequest) return } // 验证 Pod response := ws.validatePod(admissionReview.Request) // 构建响应 admissionReview.Response = response admissionReview.Response.UID = admissionReview.Request.UID respBytes, err := json.Marshal(admissionReview) if err != nil { http.Error(w, "序列化响应失败", http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") w.Write(respBytes) } // validatePod 验证 Pod 安全策略 func (ws *WebhookServer) validatePod(req *admissionv1.AdmissionRequest) *admissionv1.AdmissionResponse { // 解析 Pod var pod corev1.Pod if err := json.Unmarshal(req.Object.Raw, &pod); err != nil { return &admissionv1.AdmissionResponse{ Allowed: false, Result: &metav1.Status{ Message: fmt.Sprintf("解析 Pod 失败: %v", err), }, } } var violations []string // 检查所有容器 allContainers := append(pod.Spec.Containers, pod.Spec.InitContainers...) for _, container := range allContainers { // 检查镜像标签 violations = append(violations, ws.checkImageTag(container)...) // 检查镜像仓库 violations = append(violations, ws.checkRegistry(container)...) // 检查资源限制 if ws.policy.RequireResourceLimits { violations = append(violations, ws.checkResourceLimits(container)...) } // 检查安全上下文 violations = append(violations, ws.checkSecurityContext(container)...) } if len(violations) > 0 { log.Printf("❌ Pod %s/%s 被拒绝: %v", pod.Namespace, pod.Name, violations) return &admissionv1.AdmissionResponse{ Allowed: false, Result: &metav1.Status{ Message: fmt.Sprintf("安全策略违规:\n- %s", strings.Join(violations, "\n- ")), }, } } log.Printf("✅ Pod %s/%s 验证通过", pod.Namespace, pod.Name) return &admissionv1.AdmissionResponse{Allowed: true} } func (ws *WebhookServer) checkImageTag(c corev1.Container) []string { var violations []string for _, tag := range ws.policy.DisallowedTags { if strings.HasSuffix(c.Image, ":"+tag) { violations = append(violations, fmt.Sprintf("容器 '%s' 使用了禁止的标签 '%s'", c.Name, tag)) } } // 没有标签等同于 latest if !strings.Contains(c.Image, ":") { for _, tag := range ws.policy.DisallowedTags { if tag == "latest" { violations = append(violations, fmt.Sprintf("容器 '%s' 未指定标签(等同于 latest)", c.Name)) } } } return violations } func (ws *WebhookServer) checkRegistry(c corev1.Container) []string { if len(ws.policy.AllowedRegistries) == 0 { return nil } for _, reg := range ws.policy.AllowedRegistries { if strings.HasPrefix(c.Image, reg) { return nil } } return []string{ fmt.Sprintf("容器 '%s' 的镜像 '%s' 不在允许的仓库列表中", c.Name, c.Image), } } func (ws *WebhookServer) checkResourceLimits(c corev1.Container) []string { var violations []string if c.Resources.Limits == nil { violations = append(violations, fmt.Sprintf("容器 '%s' 必须设置资源限制", c.Name)) } if c.Resources.Requests == nil { violations = append(violations, fmt.Sprintf("容器 '%s' 必须设置资源请求", c.Name)) } return violations } func (ws *WebhookServer) checkSecurityContext(c corev1.Container) []string { var violations []string sc := c.SecurityContext if ws.policy.RequireNonRoot { if sc == nil || sc.RunAsNonRoot == nil || !*sc.RunAsNonRoot { violations = append(violations, fmt.Sprintf("容器 '%s' 必须设置 runAsNonRoot: true", c.Name)) } } if ws.policy.RequireReadOnlyRoot { if sc == nil || sc.ReadOnlyRootFilesystem == nil || !*sc.ReadOnlyRootFilesystem { violations = append(violations, fmt.Sprintf("容器 '%s' 必须设置 readOnlyRootFilesystem: true", c.Name)) } } return violations } // handleHealth 健康检查 func (ws *WebhookServer) handleHealth(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) w.Write([]byte("ok")) } func main() { policy := SecurityPolicy{ DisallowedTags: []string{"latest", "dev", "test"}, AllowedRegistries: []string{"myregistry.com/", "gcr.io/distroless/"}, RequireNonRoot: true, RequireResourceLimits: true, RequireReadOnlyRoot: true, } server := NewWebhookServer(policy) mux := http.NewServeMux() mux.HandleFunc("/validate", server.handleValidate) mux.HandleFunc("/health", server.handleHealth) // TLS 证书路径(K8s 会自动注入) certFile := os.Getenv("TLS_CERT_FILE") keyFile := os.Getenv("TLS_KEY_FILE") if certFile == "" { certFile = "/etc/webhook/certs/tls.crt" } if keyFile == "" { keyFile = "/etc/webhook/certs/tls.key" } addr := ":8443" log.Printf("🚀 Admission Webhook 启动: %s", addr) log.Printf("📋 策略: %+v", policy) if err := http.ListenAndServeTLS(addr, certFile, keyFile, mux); err != nil { log.Fatalf("服务器启动失败: %v", err) } } ``` ### Python: Trivy 扫描结果解析器 ```python """ Trivy 扫描结果解析器 — 解析 JSON 输出,生成安全报告 依赖: pip install tabulate 使用: trivy image --format json -o results.json myapp:latest """ import json import subprocess import sys import logging from dataclasses import dataclass, field from typing import Optional from collections import Counter logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class Vulnerability: """漏洞信息""" vuln_id: str pkg_name: str installed_version: str fixed_version: str severity: str title: str description: str = "" references: list = field(default_factory=list) @dataclass class ScanResult: """扫描结果""" target: str target_type: str vulnerabilities: list = field(default_factory=list) class TrivyScanner: """Trivy 扫描结果解析器""" SEVERITY_ORDER = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "UNKNOWN": 4} def __init__(self): self.results: list[ScanResult] = [] self.all_vulns: list[Vulnerability] = [] def scan_image(self, image: str, severity: str = "CRITICAL,HIGH,MEDIUM,LOW") -> dict: """ 扫描容器镜像并返回 JSON 结果 Args: image: 镜像名称(如 "myapp:latest") severity: 漏洞严重级别过滤 Returns: Trivy JSON 输出 """ try: cmd = [ "trivy", "image", "--format", "json", "--severity", severity, "--quiet", image, ] logger.info("扫描镜像: %s", image) result = subprocess.run( cmd, capture_output=True, text=True, timeout=300 ) if result.returncode != 0 and not result.stdout: logger.error("Trivy 扫描失败: %s", result.stderr) raise RuntimeError(f"Trivy scan failed: {result.stderr}") return json.loads(result.stdout) except FileNotFoundError: logger.error("Trivy 未安装,请运行: brew install trivy") raise except subprocess.TimeoutExpired: logger.error("Trivy 扫描超时") raise def parse_results(self, trivy_output: dict) -> list[ScanResult]: """ 解析 Trivy JSON 输出 Args: trivy_output: Trivy JSON 输出字典 Returns: 解析后的扫描结果列表 """ self.results = [] self.all_vulns = [] results = trivy_output.get("Results", []) for result in results: scan_result = ScanResult( target=result.get("Target", "unknown"), target_type=result.get("Type", "unknown"), ) for vuln in result.get("Vulnerabilities", []): v = Vulnerability( vuln_id=vuln.get("VulnerabilityID", ""), pkg_name=vuln.get("PkgName", ""), installed_version=vuln.get("InstalledVersion", ""), fixed_version=vuln.get("FixedVersion", "N/A"), severity=vuln.get("Severity", "UNKNOWN"), title=vuln.get("Title", ""), description=vuln.get("Description", ""), references=vuln.get("References", []), ) scan_result.vulnerabilities.append(v) self.all_vulns.append(v) self.results.append(scan_result) logger.info("解析完成: %d 个目标, %d 个漏洞", len(self.results), len(self.all_vulns)) return self.results def get_summary(self) -> dict: """获取漏洞摘要统计""" severity_count = Counter(v.severity for v in self.all_vulns) fixable = sum(1 for v in self.all_vulns if v.fixed_version != "N/A") return { "total": len(self.all_vulns), "critical": severity_count.get("CRITICAL", 0), "high": severity_count.get("HIGH", 0), "medium": severity_count.get("MEDIUM", 0), "low": severity_count.get("LOW", 0), "fixable": fixable, "unfixable": len(self.all_vulns) - fixable, } def get_critical_vulns(self) -> list[Vulnerability]: """获取所有 CRITICAL 和 HIGH 漏洞""" return sorted( [v for v in self.all_vulns if v.severity in ("CRITICAL", "HIGH")], key=lambda v: self.SEVERITY_ORDER.get(v.severity, 99), ) def generate_report(self) -> str: """生成文本格式的安全报告""" summary = self.get_summary() lines = [ "=" * 70, "容器镜像安全扫描报告", "=" * 70, "", f"漏洞总数: {summary['total']}", f" CRITICAL: {summary['critical']}", f" HIGH: {summary['high']}", f" MEDIUM: {summary['medium']}", f" LOW: {summary['low']}", f" 可修复: {summary['fixable']}", f" 不可修复: {summary['unfixable']}", "", ] critical_vulns = self.get_critical_vulns() if critical_vulns: lines.append("-" * 70) lines.append("⚠️ CRITICAL / HIGH 漏洞详情:") lines.append("-" * 70) for v in critical_vulns: lines.append(f"\n[{v.severity}] {v.vuln_id}") lines.append(f" 包: {v.pkg_name} ({v.installed_version})") lines.append(f" 修复版本: {v.fixed_version}") lines.append(f" 标题: {v.title}") lines.append("") lines.append("=" * 70) return "\n".join(lines) def should_fail_build(self, max_critical: int = 0, max_high: int = 0) -> bool: """ 判断是否应该使构建失败 Args: max_critical: 允许的最大 CRITICAL 漏洞数 max_high: 允许的最大 HIGH 漏洞数 Returns: True 表示应该失败 """ summary = self.get_summary() if summary["critical"] > max_critical: logger.error("CRITICAL 漏洞数 (%d) 超过阈值 (%d)", summary["critical"], max_critical) return True if summary["high"] > max_high: logger.error("HIGH 漏洞数 (%d) 超过阈值 (%d)", summary["high"], max_high) return True return False # ── 使用示例 ──────────────────────────────────────────────────── if __name__ == "__main__": scanner = TrivyScanner() # 方式 1: 直接扫描镜像 image = sys.argv[1] if len(sys.argv) > 1 else "python:3.12-slim" try: output = scanner.scan_image(image) scanner.parse_results(output) except Exception as e: logger.error("扫描失败: %s", e) sys.exit(1) # 生成报告 report = scanner.generate_report() print(report) # CI/CD 集成:检查是否应该失败 if scanner.should_fail_build(max_critical=0, max_high=5): print("\n❌ 构建失败:漏洞数超过阈值") sys.exit(1) else: print("\n✅ 安全检查通过") ``` ### Python: SBOM 生成与分析 ```python """ SBOM (Software Bill of Materials) 生成与分析 依赖: pip install packageurl-python 工具: syft (生成 SBOM), grype (漏洞扫描) """ import json import subprocess import logging from dataclasses import dataclass, field from typing import Optional from collections import Counter logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @dataclass class Component: """SBOM 组件""" name: str version: str type: str # library, framework, application purl: str = "" # Package URL licenses: list = field(default_factory=list) supplier: str = "" class SBOMManager: """SBOM 生成与分析管理器""" def __init__(self): self.components: list[Component] = [] def generate_sbom(self, target: str, output_format: str = "cyclonedx-json", output_file: Optional[str] = None) -> dict: """ 使用 Syft 生成 SBOM Args: target: 扫描目标(镜像名、目录路径等) output_format: 输出格式(cyclonedx-json, spdx-json, syft-json) output_file: 输出文件路径(可选) Returns: SBOM JSON 数据 """ try: cmd = ["syft", target, "-o", output_format, "--quiet"] logger.info("生成 SBOM: %s (格式: %s)", target, output_format) result = subprocess.run( cmd, capture_output=True, text=True, timeout=300 ) if result.returncode != 0: logger.error("Syft 执行失败: %s", result.stderr) raise RuntimeError(f"Syft failed: {result.stderr}") sbom_data = json.loads(result.stdout) if output_file: with open(output_file, 'w') as f: json.dump(sbom_data, f, indent=2) logger.info("SBOM 已保存: %s", output_file) return sbom_data except FileNotFoundError: logger.error("Syft 未安装,请运行: brew install syft") raise def parse_cyclonedx(self, sbom: dict) -> list[Component]: """ 解析 CycloneDX 格式的 SBOM Args: sbom: CycloneDX JSON 数据 Returns: 组件列表 """ self.components = [] for comp in sbom.get("components", []): component = Component( name=comp.get("name", ""), version=comp.get("version", ""), type=comp.get("type", "library"), purl=comp.get("purl", ""), licenses=[ lic.get("license", {}).get("id", "unknown") for lic in comp.get("licenses", []) ], supplier=comp.get("supplier", {}).get("name", ""), ) self.components.append(component) logger.info("SBOM 解析完成: %d 个组件", len(self.components)) return self.components def analyze_licenses(self) -> dict: """分析许可证分布""" license_count = Counter() for comp in self.components: for lic in comp.licenses: license_count[lic] += 1 if not comp.licenses: license_count["UNKNOWN"] += 1 return dict(license_count.most_common()) def find_risky_licenses(self, risky_licenses: list = None) -> list[Component]: """ 查找使用高风险许可证的组件 Args: risky_licenses: 高风险许可证列表 Returns: 使用高风险许可证的组件列表 """ if risky_licenses is None: risky_licenses = [ "GPL-2.0-only", "GPL-3.0-only", "AGPL-3.0-only", "SSPL-1.0", "EUPL-1.2", "CPAL-1.0", ] risky = [] for comp in self.components: for lic in comp.licenses: if lic in risky_licenses: risky.append(comp) break if risky: logger.warning("发现 %d 个使用高风险许可证的组件", len(risky)) return risky def get_statistics(self) -> dict: """获取 SBOM 统计信息""" type_count = Counter(c.type for c in self.components) return { "total_components": len(self.components), "by_type": dict(type_count), "licenses": self.analyze_licenses(), "with_purl": sum(1 for c in self.components if c.purl), "without_version": sum(1 for c in self.components if not c.version), } def generate_report(self) -> str: """生成 SBOM 分析报告""" stats = self.get_statistics() risky = self.find_risky_licenses() lines = [ "=" * 60, "SBOM 分析报告", "=" * 60, f"组件总数: {stats['total_components']}", f"有 PURL: {stats['with_purl']}", f"无版本号: {stats['without_version']}", "", "组件类型分布:", ] for t, count in stats["by_type"].items(): lines.append(f" {t}: {count}") lines.append("\n许可证分布:") for lic, count in stats["licenses"].items(): lines.append(f" {lic}: {count}") if risky: lines.append(f"\n⚠️ 高风险许可证组件 ({len(risky)}):") for comp in risky: lines.append(f" - {comp.name}@{comp.version} ({', '.join(comp.licenses)})") lines.append("=" * 60) return "\n".join(lines) def scan_vulnerabilities(self, sbom_file: str) -> dict: """ 使用 Grype 扫描 SBOM 中的漏洞 Args: sbom_file: SBOM 文件路径 Returns: Grype 扫描结果 """ try: cmd = ["grype", f"sbom:{sbom_file}", "-o", "json", "--quiet"] logger.info("扫描 SBOM 漏洞: %s", sbom_file) result = subprocess.run( cmd, capture_output=True, text=True, timeout=300 ) return json.loads(result.stdout) if result.stdout else {} except FileNotFoundError: logger.error("Grype 未安装,请运行: brew install grype") raise # ── 使用示例 ──────────────────────────────────────────────────── if __name__ == "__main__": import sys manager = SBOMManager() target = sys.argv[1] if len(sys.argv) > 1 else "python:3.12-slim" # 生成 SBOM try: sbom = manager.generate_sbom(target, output_file="/tmp/sbom.json") except Exception as e: logger.error("SBOM 生成失败: %s", e) sys.exit(1) # 解析并分析 manager.parse_cyclonedx(sbom) report = manager.generate_report() print(report) # 检查高风险许可证 risky = manager.find_risky_licenses() if risky: print(f"\n⚠️ 发现 {len(risky)} 个高风险许可证组件,请审查!") ``` ### Java: Spring Boot 容器化安全配置 ```java /** * Spring Boot 容器化安全配置 * 适用于在 Kubernetes 中运行的 Spring Boot 应用 */ package com.example.security; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.security.config.annotation.web.builders.HttpSecurity; import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity; import org.springframework.security.config.http.SessionCreationPolicy; import org.springframework.security.web.SecurityFilterChain; import org.springframework.security.web.header.writers.ReferrerPolicyHeaderWriter; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Map; @SpringBootApplication public class SecureApplication { public static void main(String[] args) { SpringApplication.run(SecureApplication.class, args); } } /** * 安全配置 — 适合容器化部署 */ @Configuration @EnableWebSecurity class SecurityConfig { @Bean public SecurityFilterChain filterChain(HttpSecurity http) throws Exception { http // ✅ 无状态会话(容器化应用不应依赖 session) .sessionManagement(session -> session.sessionCreationPolicy(SessionCreationPolicy.STATELESS)) // ✅ 安全头部 .headers(headers -> headers // 防止点击劫持 .frameOptions(frame -> frame.deny()) // 内容安全策略 .contentSecurityPolicy(csp -> csp.policyDirectives("default-src 'self'; script-src 'self'")) // 防止 MIME 类型嗅探 .contentTypeOptions(content -> {}) // 引用策略 .referrerPolicy(referrer -> referrer.policy(ReferrerPolicyHeaderWriter.ReferrerPolicy.STRICT_ORIGIN_WHEN_CROSS_ORIGIN)) // HSTS .httpStrictTransportSecurity(hsts -> hsts.includeSubDomains(true).maxAgeInSeconds(31536000)) ) // ✅ 路径权限 .authorizeHttpRequests(auth -> auth // 健康检查端点(K8s 探针需要) .requestMatchers("/health", "/ready", "/metrics").permitAll() // 其他请求需要认证 .anyRequest().authenticated() ) // ✅ 禁用不需要的功能 .csrf(csrf -> csrf.disable()) // 无状态 API 不需要 CSRF .formLogin(form -> form.disable()) .httpBasic(basic -> basic.disable()); return http.build(); } } /** * 健康检查控制器 — K8s 探针使用 */ @RestController class HealthController { @GetMapping("/health") public Map health() { return Map.of("status", "UP"); } @GetMapping("/ready") public Map ready() { // 检查依赖服务是否就绪 return Map.of("status", "READY"); } } ``` **application.yaml** — 容器化安全配置: ```yaml # Spring Boot 容器化安全配置 server: port: 8080 # ✅ 安全头部 servlet: session: cookie: secure: true http-only: true same-site: strict # ✅ 错误处理(不暴露堆栈信息) error: include-stacktrace: never include-message: never # ✅ Actuator 安全配置 management: endpoints: web: exposure: include: health,ready,metrics base-path: / endpoint: health: show-details: never # 不暴露详细健康信息 server: port: 8081 # 管理端口与业务端口分离 # ✅ 日志配置(不记录敏感信息) logging: level: root: INFO org.springframework.security: WARN pattern: console: "%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n" # ✅ 优雅关闭(K8s 需要) spring: lifecycle: timeout-per-shutdown-phase: 30s ``` ### Java: Jib 安全构建配置 ```java /** * Jib 安全构建配置 — 无需 Docker daemon 构建安全镜像 * * build.gradle: */ // ── build.gradle ──────────────────────────────────────────────── /* plugins { id 'org.springframework.boot' version '3.3.0' id 'com.google.cloud.tools.jib' version '3.4.1' } jib { // ✅ 使用 distroless 基础镜像(极小攻击面) from { image = 'gcr.io/distroless/java21-debian12:nonroot' // 或使用 Eclipse Temurin: // image = 'eclipse-temurin:21-jre-jammy' } to { image = "${System.getenv('REGISTRY') ?: 'myregistry.com'}/myapp" tags = [version, 'latest'] // ✅ 镜像签名(需要配置 cosign) } container { // ✅ 非 root 用户运行 user = '65532:65532' // ✅ JVM 安全参数 jvmFlags = [ '-XX:+UseContainerSupport', '-XX:MaxRAMPercentage=75.0', '-Djava.security.egd=file:/dev/urandom', '-Dfile.encoding=UTF-8', // 禁用不安全的序列化 '-Djdk.serialFilter=!*', ] // ✅ 只暴露必要端口 ports = ['8080'] // ✅ 标签 labels = [ 'maintainer': 'security-team@example.com', 'org.opencontainers.image.source': 'https://github.com/org/repo', 'org.opencontainers.image.description': 'Secure Spring Boot App', ] // ✅ 创建时间(可重现构建) creationTime = 'USE_CURRENT_TIMESTAMP' // ✅ 环境变量 environment = [ 'JAVA_TOOL_OPTIONS': '-XX:+UseContainerSupport', 'TZ': 'UTC', ] } // ✅ 额外目录(如配置文件) extraDirectories { paths { path { from = 'src/main/resources/docker' into = '/app/config' } } permissions = [ '/app/config/*': '444', // 只读 ] } } */ // ── Jib 构建命令 ──────────────────────────────────────────────── // 构建并推送到仓库(无需 Docker daemon): // ./gradlew jib // // 构建到本地 Docker: // ./gradlew jibDockerBuild // // 构建为 tar 文件: // ./gradlew jibBuildTar ``` ## 28.4 容器安全扫描工具对比 | 特性 | Trivy | Grype | Snyk Container | |------|-------|-------|----------------| | **开源** | ✅ Apache 2.0 | ✅ Apache 2.0 | ⚠️ 免费层 + 商业 | | **漏洞数据库** | NVD, GitHub Advisory, 多源 | NVD, GitHub Advisory | Snyk 自有数据库 | | **扫描速度** | ⚡ 快(本地缓存) | ⚡ 快 | 🐢 较慢(云端分析) | | **OS 包扫描** | ✅ | ✅ | ✅ | | **语言包扫描** | ✅ (pip, npm, go, maven...) | ✅ | ✅ | | **IaC 扫描** | ✅ (K8s, Terraform, Docker) | ❌ | ✅ | | **SBOM 生成** | ✅ CycloneDX, SPDX | ❌ (用 Syft) | ✅ | | **许可证扫描** | ✅ | ❌ | ✅ | | **密钥扫描** | ✅ | ❌ | ❌ | | **CI/CD 集成** | ✅ GitHub Actions, GitLab CI | ✅ GitHub Actions | ✅ 原生集成 | | **IDE 插件** | ✅ VS Code | ❌ | ✅ VS Code, IntelliJ | | **修复建议** | ⚠️ 基本 | ⚠️ 基本 | ✅ 详细(含 PR) | | **安装** | `brew install trivy` | `brew install grype` | `npm install -g snyk` | **选型建议**: - **全能型** → **Trivy**(漏洞 + IaC + SBOM + 密钥,一个工具搞定) - **专注漏洞扫描** → **Grype** + **Syft**(Anchore 生态) - **企业级 + 修复建议** → **Snyk**(商业支持,自动修复 PR) - **最佳实践**:CI 中同时使用 Trivy + Grype,交叉验证 ## 28.5 Kubernetes 安全加固清单 ### 集群级别 | 类别 | 检查项 | 优先级 | 工具/方法 | |------|--------|--------|----------| | **API Server** | 禁用匿名认证 | 🔴 高 | `--anonymous-auth=false` | | **API Server** | 启用审计日志 | 🔴 高 | `--audit-log-path` | | **API Server** | 启用 RBAC | 🔴 高 | `--authorization-mode=RBAC` | | **etcd** | 启用静态加密 | 🔴 高 | `EncryptionConfiguration` | | **etcd** | 启用 TLS | 🔴 高 | `--cert-file`, `--key-file` | | **Kubelet** | 禁用匿名认证 | 🔴 高 | `--anonymous-auth=false` | | **Kubelet** | 启用 Webhook 认证 | 🟡 中 | `--authentication-token-webhook` | | **网络** | 启用 Network Policy | 🔴 高 | Calico / Cilium | | **网络** | 加密 Pod 间通信 | 🟡 中 | WireGuard / Istio mTLS | ### 工作负载级别 | 类别 | 检查项 | 优先级 | 说明 | |------|--------|--------|------| | **PSS** | 生产命名空间使用 `restricted` | 🔴 高 | 强制非 root、只读 FS | | **镜像** | 禁止使用 `latest` 标签 | 🔴 高 | OPA/Kyverno 策略 | | **镜像** | 只允许可信仓库 | 🔴 高 | Admission Webhook | | **镜像** | 启用镜像签名验证 | 🟡 中 | Cosign + Kyverno | | **资源** | 设置 CPU/内存限制 | 🔴 高 | 防止资源耗尽 | | **Secrets** | 使用 External Secrets | 🔴 高 | 不在 YAML 中硬编码 | | **SA** | 禁用自动挂载 SA Token | 🟡 中 | `automountServiceAccountToken: false` | | **容器** | 只读根文件系统 | 🟡 中 | `readOnlyRootFilesystem: true` | | **容器** | 丢弃所有 capabilities | 🔴 高 | `drop: [ALL]` | ### 运行时安全 | 类别 | 检查项 | 优先级 | 工具 | |------|--------|--------|------| | **监控** | 异常行为检测 | 🔴 高 | Falco / Tetragon | | **扫描** | 定期镜像漏洞扫描 | 🔴 高 | Trivy Operator | | **日志** | 集中式日志收集 | 🔴 高 | Fluentd / Vector | | **审计** | K8s 审计日志分析 | 🟡 中 | ELK / Loki | | **备份** | etcd 定期备份 | 🔴 高 | Velero | ### CIS Benchmark 自动检查 ```bash # 使用 kube-bench 检查 CIS Kubernetes Benchmark # 安装: brew install kube-bench kube-bench run --targets master,node,policies # 使用 Trivy 检查 K8s 安全配置 trivy k8s --report summary cluster ``` ## 28.6 小结 - **4C 模型** 从云平台到代码,每一层都需要安全措施 - **容器安全**:最小镜像 + 非 root + 漏洞扫描 + 签名 - **K8s 安全**:RBAC + PSS + Network Policy + Admission Control - **OPA Gatekeeper** 在 Admission 阶段强制执行安全策略 - **供应链安全**(SLSA)确保从源代码到部署的完整性 - **SBOM** 提供软件组成透明度,便于漏洞追踪和许可证合规 - **Admission Webhook** 可自定义安全策略,在部署前拦截违规配置 - 安全是**分层**的,每一层都不能缺少