第二十八章:云原生安全

“云原生不是更安全,而是安全的方式不同了。”

        mindmap
  root((云原生安全))
    4C 模型
      Cloud
      Cluster
      Container
      Code
    容器安全
      镜像扫描
      运行时安全
      镜像签名
      SBOM
    K8s 安全
      RBAC
      PSS
      Network Policy
      Admission Control
    合规
      SOC 2
      ISO 27001
      CIS Benchmarks
    

28.1 4C 安全模型

        flowchart TB
    subgraph Cloud["☁️ Cloud(云平台)"]
        direction TB
        CloudDesc["IAM · VPC · 加密 · 审计日志"]
        subgraph Cluster["🏗️ Cluster(集群)"]
            direction TB
            ClusterDesc["RBAC · PSS · Network Policy · Admission Control"]
            subgraph Container["📦 Container(容器)"]
                direction TB
                ContainerDesc["镜像扫描 · 最小基础镜像 · 非 root · 签名"]
                subgraph Code["💻 Code(代码)"]
                    CodeDesc["SAST · SCA · 安全编码 · 密钥管理"]
                end
            end
        end
    end

    style Cloud fill:#1a237e,stroke:#333,color:#fff
    style Cluster fill:#283593,stroke:#333,color:#fff
    style Container fill:#3949ab,stroke:#333,color:#fff
    style Code fill:#5c6bc0,stroke:#333,color:#fff
    

每一层的安全职责:

层级

安全措施

工具

Cloud

IAM 最小权限、VPC 隔离、静态加密、审计日志

AWS IAM, CloudTrail, VPC

Cluster

RBAC、Pod Security Standards、Network Policy

K8s RBAC, OPA, Cilium

Container

镜像扫描、签名验证、非 root、只读文件系统

Trivy, Cosign, Falco

Code

SAST、SCA、密钥管理、安全依赖

Semgrep, Snyk, Vault

28.2 容器安全

容器安全扫描流程

        flowchart LR
    A["👨‍💻 开发者<br/>提交代码"] --> B["🔨 CI 构建<br/>Docker 镜像"]
    B --> C["🔍 镜像扫描<br/>Trivy/Grype"]
    C --> D{"漏洞等级?"}

    D -->|"CRITICAL/HIGH"| E["❌ 构建失败<br/>通知开发者"]
    E --> F["🔧 修复漏洞"]
    F --> A

    D -->|"MEDIUM/LOW"| G["⚠️ 记录告警<br/>继续流水线"]
    D -->|"无漏洞"| H["✅ 通过"]

    G --> I["✍️ 镜像签名<br/>Cosign"]
    H --> I
    I --> J["📦 推送到<br/>私有仓库"]
    J --> K["🚀 部署到<br/>K8s 集群"]
    K --> L["🛡️ Admission<br/>验证签名"]

    style E fill:#F44336,stroke:#333,color:#fff
    style H fill:#4CAF50,stroke:#333,color:#fff
    style I fill:#FF9800,stroke:#333,color:#fff
    style L fill:#2196F3,stroke:#333,color:#fff
    

镜像扫描

# Trivy — 容器镜像漏洞扫描
trivy image myapp:latest
trivy image --severity HIGH,CRITICAL myapp:latest

# Grype — 另一个流行的扫描工具
grype myapp:latest

安全 Dockerfile — Python

# ✅ Python 安全最佳实践 Dockerfile
# 多阶段构建 + 非 root + 最小镜像

# ── 阶段 1:构建依赖 ──────────────────────────────────────────
FROM python:3.12-slim AS builder
WORKDIR /build

# 只复制依赖文件(利用 Docker 缓存)
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

# ── 阶段 2:运行时镜像 ────────────────────────────────────────
FROM python:3.12-slim

# ✅ 安装安全更新
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

WORKDIR /app

# ✅ 创建非 root 用户
RUN groupadd -r appuser && \
    useradd -r -g appuser -d /app -s /sbin/nologin appuser

# ✅ 从构建阶段复制依赖(不包含构建工具)
COPY --from=builder /install /usr/local
COPY --chown=appuser:appuser . .

# ✅ 以非 root 用户运行
USER appuser

# ✅ 健康检查
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1

# ✅ 只暴露必要端口
EXPOSE 8000

# ✅ 使用 exec 形式(PID 1 信号处理)
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]

# 运行时安全加固:
# docker run --read-only --tmpfs /tmp --cap-drop ALL --security-opt no-new-privileges myapp:latest

安全 Dockerfile — Java

# ✅ Java 安全最佳实践 Dockerfile
# 多阶段构建 + JLink 最小 JRE + 非 root

# ── 阶段 1:构建 ──────────────────────────────────────────────
FROM eclipse-temurin:21-jdk-jammy AS builder
WORKDIR /build

# ✅ 只复制构建文件(利用缓存)
COPY gradle/ gradle/
COPY gradlew build.gradle settings.gradle ./
RUN ./gradlew dependencies --no-daemon

COPY src/ src/
RUN ./gradlew bootJar --no-daemon -x test

# ✅ 使用 jlink 创建最小 JRE(减少攻击面)
RUN jlink \
    --add-modules java.base,java.logging,java.sql,java.naming,java.management,java.instrument,java.security.jgss,java.desktop \
    --strip-debug \
    --no-man-pages \
    --no-header-files \
    --compress=zip-6 \
    --output /custom-jre

# ── 阶段 2:运行时镜像 ────────────────────────────────────────
FROM debian:bookworm-slim

# ✅ 安装安全更新
RUN apt-get update && \
    apt-get upgrade -y && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# ✅ 复制最小 JRE
COPY --from=builder /custom-jre /opt/java
ENV PATH="/opt/java/bin:${PATH}"

WORKDIR /app

# ✅ 创建非 root 用户
RUN groupadd -r appuser && \
    useradd -r -g appuser -d /app -s /sbin/nologin appuser

# ✅ 复制应用 JAR
COPY --from=builder --chown=appuser:appuser /build/build/libs/*.jar app.jar

# ✅ 以非 root 用户运行
USER appuser

# ✅ JVM 安全参数
ENV JAVA_OPTS="-XX:+UseContainerSupport \
    -XX:MaxRAMPercentage=75.0 \
    -Djava.security.egd=file:/dev/urandom \
    -Dserver.port=8080"

HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
    CMD java -cp app.jar com.example.HealthCheck || exit 1

EXPOSE 8080

ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]

安全 Dockerfile — Go

# ✅ Go 安全最佳实践 Dockerfile
# 多阶段构建 + scratch/distroless + 静态编译

# ── 阶段 1:构建 ──────────────────────────────────────────────
FROM golang:1.22-bookworm AS builder
WORKDIR /build

# ✅ 只复制依赖文件(利用缓存)
COPY go.mod go.sum ./
RUN go mod download && go mod verify

COPY . .

# ✅ 静态编译(CGO_ENABLED=0),禁用符号表和调试信息
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
    -ldflags="-w -s -X main.version=$(git describe --tags --always)" \
    -trimpath \
    -o /app/server ./cmd/server

# ── 阶段 2:运行时镜像(使用 distroless,极小攻击面)────────
FROM gcr.io/distroless/static-debian12:nonroot

# ✅ 复制二进制文件
COPY --from=builder /app/server /server

# ✅ 复制 CA 证书(用于 HTTPS 请求)
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/

# ✅ distroless:nonroot 默认以 UID 65532 运行
USER nonroot:nonroot

EXPOSE 8080

ENTRYPOINT ["/server"]

# 镜像大小对比:
# golang:1.22       ~800MB
# debian:slim        ~80MB
# distroless/static  ~2MB  ← 极小攻击面
# scratch            ~0MB  (但没有 CA 证书和时区数据)

镜像签名(Cosign)

# 签名镜像
cosign sign --key cosign.key myregistry.com/myapp:latest

# 验证签名
cosign verify --key cosign.pub myregistry.com/myapp:latest

供应链安全(SLSA)流程

        flowchart TB
    subgraph Source["📝 源代码"]
        S1["代码审查"]
        S2["签名提交"]
        S3["分支保护"]
    end

    subgraph Build["🔨 构建"]
        B1["隔离构建环境"]
        B2["可重现构建"]
        B3["构建来源证明<br/>(Provenance)"]
    end

    subgraph Verify["✅ 验证"]
        V1["SBOM 生成"]
        V2["漏洞扫描"]
        V3["签名验证"]
    end

    subgraph Deploy["🚀 部署"]
        D1["Admission Webhook<br/>验证签名+来源"]
        D2["策略引擎<br/>(OPA/Kyverno)"]
        D3["运行时监控<br/>(Falco)"]
    end

    Source --> Build --> Verify --> Deploy

    style Source fill:#E8F5E9,stroke:#4CAF50
    style Build fill:#E3F2FD,stroke:#2196F3
    style Verify fill:#FFF3E0,stroke:#FF9800
    style Deploy fill:#F3E5F5,stroke:#9C27B0
    

SLSA 级别

级别

要求

说明

SLSA 1

构建过程有文档

基本的构建来源记录

SLSA 2

使用托管构建服务

构建服务生成来源证明

SLSA 3

构建环境加固

隔离构建、不可篡改的来源证明

SLSA 4

双人审查 + 密封构建

最高安全级别

28.3 Kubernetes 安全

Pod Security 决策流程

        flowchart TD
    A["Pod 创建请求"] --> B{"命名空间 PSS 级别?"}

    B -->|"privileged"| C["✅ 允许所有配置"]
    B -->|"baseline"| D{"检查基线策略"}
    B -->|"restricted"| E{"检查受限策略"}

    D --> D1{"hostNetwork?"}
    D1 -->|"true"| DENY["❌ 拒绝"]
    D1 -->|"false"| D2{"privileged?"}
    D2 -->|"true"| DENY
    D2 -->|"false"| D3{"hostPID/hostIPC?"}
    D3 -->|"true"| DENY
    D3 -->|"false"| ALLOW_B["✅ 允许"]

    E --> E1{"runAsNonRoot?"}
    E1 -->|"false/未设置"| DENY
    E1 -->|"true"| E2{"allowPrivilegeEscalation?"}
    E2 -->|"true/未设置"| DENY
    E2 -->|"false"| E3{"capabilities 只有<br/>NET_BIND_SERVICE?"}
    E3 -->|"否"| DENY
    E3 -->|"是"| E4{"seccompProfile<br/>= RuntimeDefault?"}
    E4 -->|"否"| DENY
    E4 -->|"是"| ALLOW_R["✅ 允许"]

    style DENY fill:#F44336,stroke:#333,color:#fff
    style ALLOW_B fill:#4CAF50,stroke:#333,color:#fff
    style ALLOW_R fill:#4CAF50,stroke:#333,color:#fff
    style A fill:#2196F3,stroke:#333,color:#fff
    

RBAC

# 最小权限 Role
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: app-reader
  namespace: production
rules:
  - apiGroups: [""]
    resources: ["pods", "services"]
    verbs: ["get", "list", "watch"]
  - apiGroups: ["apps"]
    resources: ["deployments"]
    verbs: ["get", "list"]

---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: app-reader-binding
  namespace: production
subjects:
  - kind: ServiceAccount
    name: my-app
    namespace: production
roleRef:
  kind: Role
  name: app-reader
  apiGroup: rbac.authorization.k8s.io

Pod Security Standards(PSS)— Restricted

# ── Namespace 级别强制 Restricted PSS ──────────────────────────
apiVersion: v1
kind: Namespace
metadata:
  name: production
  labels:
    pod-security.kubernetes.io/enforce: restricted
    pod-security.kubernetes.io/enforce-version: latest
    pod-security.kubernetes.io/audit: restricted
    pod-security.kubernetes.io/audit-version: latest
    pod-security.kubernetes.io/warn: restricted
    pod-security.kubernetes.io/warn-version: latest

---
# ── 符合 Restricted PSS 的 Pod 示例 ───────────────────────────
apiVersion: v1
kind: Pod
metadata:
  name: secure-app
  namespace: production
spec:
  # ✅ 不使用 host 命名空间
  hostNetwork: false
  hostPID: false
  hostIPC: false

  # ✅ Pod 级别安全上下文
  securityContext:
    runAsNonRoot: true
    runAsUser: 65534
    runAsGroup: 65534
    fsGroup: 65534
    seccompProfile:
      type: RuntimeDefault

  containers:
    - name: app
      image: myregistry.com/myapp:v1.2.3  # ✅ 不使用 latest
      ports:
        - containerPort: 8080

      # ✅ 容器级别安全上下文
      securityContext:
        allowPrivilegeEscalation: false
        readOnlyRootFilesystem: true
        runAsNonRoot: true
        runAsUser: 65534
        capabilities:
          drop:
            - ALL
        seccompProfile:
          type: RuntimeDefault

      # ✅ 资源限制(防止资源耗尽攻击)
      resources:
        requests:
          cpu: 100m
          memory: 128Mi
        limits:
          cpu: 500m
          memory: 512Mi

      # ✅ 健康检查
      livenessProbe:
        httpGet:
          path: /health
          port: 8080
        initialDelaySeconds: 10
        periodSeconds: 30
      readinessProbe:
        httpGet:
          path: /ready
          port: 8080
        initialDelaySeconds: 5
        periodSeconds: 10

      # ✅ 临时目录(只读根文件系统需要)
      volumeMounts:
        - name: tmp
          mountPath: /tmp

  volumes:
    - name: tmp
      emptyDir:
        sizeLimit: 100Mi

  # ✅ 服务账户(最小权限)
  serviceAccountName: my-app
  automountServiceAccountToken: false  # 不需要时禁用

NetworkPolicy 示例

# ── 默认拒绝所有入站流量 ──────────────────────────────────────
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-ingress
  namespace: production
spec:
  podSelector: {}  # 匹配所有 Pod
  policyTypes:
    - Ingress

---
# ── 默认拒绝所有出站流量 ──────────────────────────────────────
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: default-deny-egress
  namespace: production
spec:
  podSelector: {}
  policyTypes:
    - Egress

---
# ── 允许 frontend → backend 的流量 ────────────────────────────
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-frontend-to-backend
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: backend
  policyTypes:
    - Ingress
  ingress:
    - from:
        - podSelector:
            matchLabels:
              app: frontend
      ports:
        - protocol: TCP
          port: 8080

---
# ── 允许 backend → database 的流量 ────────────────────────────
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
  name: allow-backend-to-db
  namespace: production
spec:
  podSelector:
    matchLabels:
      app: backend
  policyTypes:
    - Egress
  egress:
    - to:
        - podSelector:
            matchLabels:
              app: database
      ports:
        - protocol: TCP
          port: 5432
    # ✅ 允许 DNS 查询
    - to:
        - namespaceSelector: {}
          podSelector:
            matchLabels:
              k8s-app: kube-dns
      ports:
        - protocol: UDP
          port: 53

Admission Control(OPA Gatekeeper)

# ── ConstraintTemplate:禁止使用指定镜像标签 ──────────────────
apiVersion: templates.gatekeeper.sh/v1
kind: ConstraintTemplate
metadata:
  name: k8sdisallowedtags
spec:
  crd:
    spec:
      names:
        kind: K8sDisallowedTags
      validation:
        openAPIV3Schema:
          type: object
          properties:
            tags:
              type: array
              description: "禁止使用的镜像标签列表"
              items:
                type: string
            exemptImages:
              type: array
              description: "豁免的镜像列表(支持通配符)"
              items:
                type: string
  targets:
    - target: admission.k8s.gatekeeper.sh
      rego: |
        package k8sdisallowedtags

        # 检查所有容器(包括 init 容器)
        violation[{"msg": msg}] {
            container := input_containers[_]
            tags := {t | t := input.parameters.tags[_]}
            tag := get_tag(container.image)
            tags[tag]
            not is_exempt(container.image)
            msg := sprintf(
                "容器 '%s' 使用了禁止的镜像标签 '%s'(镜像: %s)",
                [container.name, tag, container.image]
            )
        }

        # 没有标签等同于 latest
        violation[{"msg": msg}] {
            container := input_containers[_]
            tags := {t | t := input.parameters.tags[_]}
            not contains(container.image, ":")
            tags["latest"]
            not is_exempt(container.image)
            msg := sprintf(
                "容器 '%s' 未指定镜像标签(等同于 latest,镜像: %s)",
                [container.name, container.image]
            )
        }

        input_containers[c] {
            c := input.review.object.spec.containers[_]
        }
        input_containers[c] {
            c := input.review.object.spec.initContainers[_]
        }

        get_tag(image) = tag {
            parts := split(image, ":")
            count(parts) == 2
            tag := parts[1]
        }

        is_exempt(image) {
            exempt := input.parameters.exemptImages[_]
            glob.match(exempt, [], image)
        }

---
# ── Constraint:禁止使用 latest 标签 ──────────────────────────
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sDisallowedTags
metadata:
  name: no-latest-tag
spec:
  enforcementAction: deny
  match:
    kinds:
      - apiGroups: [""]
        kinds: ["Pod"]
    namespaces:
      - production
      - staging
  parameters:
    tags: ["latest"]
    exemptImages:
      - "gcr.io/distroless/*"  # 豁免 distroless 镜像

---
# ── ConstraintTemplate:要求资源限制 ──────────────────────────
apiVersion: templates.gatekeeper.sh/v1
kind: ConstraintTemplate
metadata:
  name: k8srequiredresources
spec:
  crd:
    spec:
      names:
        kind: K8sRequiredResources
  targets:
    - target: admission.k8s.gatekeeper.sh
      rego: |
        package k8srequiredresources

        violation[{"msg": msg}] {
            container := input.review.object.spec.containers[_]
            not container.resources.limits
            msg := sprintf(
                "容器 '%s' 必须设置资源限制 (resources.limits)",
                [container.name]
            )
        }

        violation[{"msg": msg}] {
            container := input.review.object.spec.containers[_]
            not container.resources.requests
            msg := sprintf(
                "容器 '%s' 必须设置资源请求 (resources.requests)",
                [container.name]
            )
        }

---
# ── Constraint:要求所有容器设置资源限制 ──────────────────────
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sRequiredResources
metadata:
  name: require-resource-limits
spec:
  enforcementAction: deny
  match:
    kinds:
      - apiGroups: [""]
        kinds: ["Pod"]
    namespaces:
      - production

Go: 容器镜像签名验证(Cosign/Sigstore)

/*
容器镜像签名验证 — 使用 cosign/sigstore 库
依赖:
  go get github.com/sigstore/cosign/v2
  go get github.com/sigstore/sigstore/pkg/signature
*/
package main

import (
	"context"
	"crypto"
	"encoding/json"
	"fmt"
	"log"
	"os"

	"github.com/google/go-containerregistry/pkg/name"
	"github.com/sigstore/cosign/v2/cmd/cosign/cli/fulcio"
	"github.com/sigstore/cosign/v2/cmd/cosign/cli/options"
	"github.com/sigstore/cosign/v2/cmd/cosign/cli/verify"
	"github.com/sigstore/cosign/v2/pkg/cosign"
	ociremote "github.com/sigstore/cosign/v2/pkg/oci/remote"
	"github.com/sigstore/sigstore/pkg/signature"
)

// ImageVerifier 镜像签名验证器
type ImageVerifier struct {
	publicKeyPath string
}

// NewImageVerifier 创建镜像验证器
func NewImageVerifier(publicKeyPath string) *ImageVerifier {
	return &ImageVerifier{publicKeyPath: publicKeyPath}
}

// VerifyImage 验证镜像签名
func (iv *ImageVerifier) VerifyImage(imageRef string) (*VerificationResult, error) {
	ctx := context.Background()

	// 解析镜像引用
	ref, err := name.ParseReference(imageRef)
	if err != nil {
		return nil, fmt.Errorf("无效的镜像引用 '%s': %w", imageRef, err)
	}

	// 加载公钥
	pubKey, err := signature.LoadPublicKeyRaw(ctx, iv.publicKeyPath, crypto.SHA256)
	if err != nil {
		return nil, fmt.Errorf("加载公钥失败: %w", err)
	}

	// 配置验证选项
	checkOpts := &cosign.CheckOpts{
		SigVerifier: pubKey,
	}

	// 执行验证
	signatures, bundleVerified, err := cosign.VerifyImageSignatures(ctx, ref, checkOpts)
	if err != nil {
		return &VerificationResult{
			Image:    imageRef,
			Verified: false,
			Error:    err.Error(),
		}, nil
	}

	result := &VerificationResult{
		Image:          imageRef,
		Verified:       true,
		BundleVerified: bundleVerified,
		SignatureCount: len(signatures),
	}

	// 提取签名负载信息
	for _, sig := range signatures {
		payload, err := sig.Payload()
		if err == nil {
			result.Payloads = append(result.Payloads, string(payload))
		}
	}

	log.Printf("✅ 镜像签名验证通过: %s (%d 个签名)", imageRef, len(signatures))
	return result, nil
}

// VerificationResult 验证结果
type VerificationResult struct {
	Image          string   `json:"image"`
	Verified       bool     `json:"verified"`
	BundleVerified bool     `json:"bundle_verified"`
	SignatureCount int      `json:"signature_count"`
	Payloads       []string `json:"payloads,omitempty"`
	Error          string   `json:"error,omitempty"`
}

// VerifyKeyless 使用 Keyless(Fulcio + Rekor)验证签名
func VerifyKeyless(imageRef, expectedIssuer, expectedSubject string) (*VerificationResult, error) {
	ctx := context.Background()

	ref, err := name.ParseReference(imageRef)
	if err != nil {
		return nil, fmt.Errorf("无效的镜像引用: %w", err)
	}

	// Keyless 验证:使用 Fulcio CA + Rekor 透明日志
	checkOpts := &cosign.CheckOpts{
		Identities: []cosign.Identity{
			{
				Issuer:  expectedIssuer,  // 如 "https://accounts.google.com"
				Subject: expectedSubject, // 如 "user@example.com"
			},
		},
	}

	signatures, bundleVerified, err := cosign.VerifyImageSignatures(ctx, ref, checkOpts)
	if err != nil {
		return &VerificationResult{
			Image:    imageRef,
			Verified: false,
			Error:    err.Error(),
		}, nil
	}

	return &VerificationResult{
		Image:          imageRef,
		Verified:       true,
		BundleVerified: bundleVerified,
		SignatureCount: len(signatures),
	}, nil
}

func main() {
	// 使用公钥验证
	verifier := NewImageVerifier("cosign.pub")
	result, err := verifier.VerifyImage("myregistry.com/myapp:v1.2.3")
	if err != nil {
		log.Fatalf("验证失败: %v", err)
	}

	output, _ := json.MarshalIndent(result, "", "  ")
	fmt.Println(string(output))

	if !result.Verified {
		fmt.Println("❌ 镜像签名验证失败,拒绝部署!")
		os.Exit(1)
	}
	fmt.Println("✅ 镜像签名验证通过,允许部署")
}

Go: Admission Webhook 实现

/*
Kubernetes Admission Webhook — 验证 Pod 安全策略
依赖:
  go get k8s.io/api/admission/v1
  go get k8s.io/apimachinery/pkg/runtime
*/
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"log"
	"net/http"
	"os"
	"strings"

	admissionv1 "k8s.io/api/admission/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/runtime/serializer"
)

var (
	scheme = runtime.NewScheme()
	codecs = serializer.NewCodecFactory(scheme)
)

// SecurityPolicy 安全策略配置
type SecurityPolicy struct {
	// 禁止使用的镜像标签
	DisallowedTags []string `json:"disallowed_tags"`
	// 允许的镜像仓库前缀
	AllowedRegistries []string `json:"allowed_registries"`
	// 是否要求非 root 运行
	RequireNonRoot bool `json:"require_non_root"`
	// 是否要求资源限制
	RequireResourceLimits bool `json:"require_resource_limits"`
	// 是否要求只读根文件系统
	RequireReadOnlyRoot bool `json:"require_readonly_root"`
}

// WebhookServer Admission Webhook 服务器
type WebhookServer struct {
	policy SecurityPolicy
}

// NewWebhookServer 创建 Webhook 服务器
func NewWebhookServer(policy SecurityPolicy) *WebhookServer {
	return &WebhookServer{policy: policy}
}

// handleValidate 处理验证请求
func (ws *WebhookServer) handleValidate(w http.ResponseWriter, r *http.Request) {
	// 读取请求体
	body, err := io.ReadAll(r.Body)
	if err != nil {
		http.Error(w, "读取请求失败", http.StatusBadRequest)
		return
	}

	// 解析 AdmissionReview
	var admissionReview admissionv1.AdmissionReview
	if err := json.Unmarshal(body, &admissionReview); err != nil {
		http.Error(w, "解析请求失败", http.StatusBadRequest)
		return
	}

	// 验证 Pod
	response := ws.validatePod(admissionReview.Request)

	// 构建响应
	admissionReview.Response = response
	admissionReview.Response.UID = admissionReview.Request.UID

	respBytes, err := json.Marshal(admissionReview)
	if err != nil {
		http.Error(w, "序列化响应失败", http.StatusInternalServerError)
		return
	}

	w.Header().Set("Content-Type", "application/json")
	w.Write(respBytes)
}

// validatePod 验证 Pod 安全策略
func (ws *WebhookServer) validatePod(req *admissionv1.AdmissionRequest) *admissionv1.AdmissionResponse {
	// 解析 Pod
	var pod corev1.Pod
	if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
		return &admissionv1.AdmissionResponse{
			Allowed: false,
			Result: &metav1.Status{
				Message: fmt.Sprintf("解析 Pod 失败: %v", err),
			},
		}
	}

	var violations []string

	// 检查所有容器
	allContainers := append(pod.Spec.Containers, pod.Spec.InitContainers...)
	for _, container := range allContainers {
		// 检查镜像标签
		violations = append(violations, ws.checkImageTag(container)...)

		// 检查镜像仓库
		violations = append(violations, ws.checkRegistry(container)...)

		// 检查资源限制
		if ws.policy.RequireResourceLimits {
			violations = append(violations, ws.checkResourceLimits(container)...)
		}

		// 检查安全上下文
		violations = append(violations, ws.checkSecurityContext(container)...)
	}

	if len(violations) > 0 {
		log.Printf("❌ Pod %s/%s 被拒绝: %v", pod.Namespace, pod.Name, violations)
		return &admissionv1.AdmissionResponse{
			Allowed: false,
			Result: &metav1.Status{
				Message: fmt.Sprintf("安全策略违规:\n- %s",
					strings.Join(violations, "\n- ")),
			},
		}
	}

	log.Printf("✅ Pod %s/%s 验证通过", pod.Namespace, pod.Name)
	return &admissionv1.AdmissionResponse{Allowed: true}
}

func (ws *WebhookServer) checkImageTag(c corev1.Container) []string {
	var violations []string
	for _, tag := range ws.policy.DisallowedTags {
		if strings.HasSuffix(c.Image, ":"+tag) {
			violations = append(violations,
				fmt.Sprintf("容器 '%s' 使用了禁止的标签 '%s'", c.Name, tag))
		}
	}
	// 没有标签等同于 latest
	if !strings.Contains(c.Image, ":") {
		for _, tag := range ws.policy.DisallowedTags {
			if tag == "latest" {
				violations = append(violations,
					fmt.Sprintf("容器 '%s' 未指定标签(等同于 latest)", c.Name))
			}
		}
	}
	return violations
}

func (ws *WebhookServer) checkRegistry(c corev1.Container) []string {
	if len(ws.policy.AllowedRegistries) == 0 {
		return nil
	}
	for _, reg := range ws.policy.AllowedRegistries {
		if strings.HasPrefix(c.Image, reg) {
			return nil
		}
	}
	return []string{
		fmt.Sprintf("容器 '%s' 的镜像 '%s' 不在允许的仓库列表中", c.Name, c.Image),
	}
}

func (ws *WebhookServer) checkResourceLimits(c corev1.Container) []string {
	var violations []string
	if c.Resources.Limits == nil {
		violations = append(violations,
			fmt.Sprintf("容器 '%s' 必须设置资源限制", c.Name))
	}
	if c.Resources.Requests == nil {
		violations = append(violations,
			fmt.Sprintf("容器 '%s' 必须设置资源请求", c.Name))
	}
	return violations
}

func (ws *WebhookServer) checkSecurityContext(c corev1.Container) []string {
	var violations []string
	sc := c.SecurityContext

	if ws.policy.RequireNonRoot {
		if sc == nil || sc.RunAsNonRoot == nil || !*sc.RunAsNonRoot {
			violations = append(violations,
				fmt.Sprintf("容器 '%s' 必须设置 runAsNonRoot: true", c.Name))
		}
	}

	if ws.policy.RequireReadOnlyRoot {
		if sc == nil || sc.ReadOnlyRootFilesystem == nil || !*sc.ReadOnlyRootFilesystem {
			violations = append(violations,
				fmt.Sprintf("容器 '%s' 必须设置 readOnlyRootFilesystem: true", c.Name))
		}
	}

	return violations
}

// handleHealth 健康检查
func (ws *WebhookServer) handleHealth(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("ok"))
}

func main() {
	policy := SecurityPolicy{
		DisallowedTags:        []string{"latest", "dev", "test"},
		AllowedRegistries:     []string{"myregistry.com/", "gcr.io/distroless/"},
		RequireNonRoot:        true,
		RequireResourceLimits: true,
		RequireReadOnlyRoot:   true,
	}

	server := NewWebhookServer(policy)

	mux := http.NewServeMux()
	mux.HandleFunc("/validate", server.handleValidate)
	mux.HandleFunc("/health", server.handleHealth)

	// TLS 证书路径(K8s 会自动注入)
	certFile := os.Getenv("TLS_CERT_FILE")
	keyFile := os.Getenv("TLS_KEY_FILE")
	if certFile == "" {
		certFile = "/etc/webhook/certs/tls.crt"
	}
	if keyFile == "" {
		keyFile = "/etc/webhook/certs/tls.key"
	}

	addr := ":8443"
	log.Printf("🚀 Admission Webhook 启动: %s", addr)
	log.Printf("📋 策略: %+v", policy)

	if err := http.ListenAndServeTLS(addr, certFile, keyFile, mux); err != nil {
		log.Fatalf("服务器启动失败: %v", err)
	}
}

Python: Trivy 扫描结果解析器

"""
Trivy 扫描结果解析器 — 解析 JSON 输出,生成安全报告
依赖: pip install tabulate
使用: trivy image --format json -o results.json myapp:latest
"""
import json
import subprocess
import sys
import logging
from dataclasses import dataclass, field
from typing import Optional
from collections import Counter

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Vulnerability:
    """漏洞信息"""
    vuln_id: str
    pkg_name: str
    installed_version: str
    fixed_version: str
    severity: str
    title: str
    description: str = ""
    references: list = field(default_factory=list)


@dataclass
class ScanResult:
    """扫描结果"""
    target: str
    target_type: str
    vulnerabilities: list = field(default_factory=list)


class TrivyScanner:
    """Trivy 扫描结果解析器"""

    SEVERITY_ORDER = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "UNKNOWN": 4}

    def __init__(self):
        self.results: list[ScanResult] = []
        self.all_vulns: list[Vulnerability] = []

    def scan_image(self, image: str, severity: str = "CRITICAL,HIGH,MEDIUM,LOW") -> dict:
        """
        扫描容器镜像并返回 JSON 结果

        Args:
            image: 镜像名称(如 "myapp:latest")
            severity: 漏洞严重级别过滤
        Returns:
            Trivy JSON 输出
        """
        try:
            cmd = [
                "trivy", "image",
                "--format", "json",
                "--severity", severity,
                "--quiet",
                image,
            ]
            logger.info("扫描镜像: %s", image)
            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=300
            )
            if result.returncode != 0 and not result.stdout:
                logger.error("Trivy 扫描失败: %s", result.stderr)
                raise RuntimeError(f"Trivy scan failed: {result.stderr}")

            return json.loads(result.stdout)

        except FileNotFoundError:
            logger.error("Trivy 未安装,请运行: brew install trivy")
            raise
        except subprocess.TimeoutExpired:
            logger.error("Trivy 扫描超时")
            raise

    def parse_results(self, trivy_output: dict) -> list[ScanResult]:
        """
        解析 Trivy JSON 输出

        Args:
            trivy_output: Trivy JSON 输出字典
        Returns:
            解析后的扫描结果列表
        """
        self.results = []
        self.all_vulns = []

        results = trivy_output.get("Results", [])
        for result in results:
            scan_result = ScanResult(
                target=result.get("Target", "unknown"),
                target_type=result.get("Type", "unknown"),
            )

            for vuln in result.get("Vulnerabilities", []):
                v = Vulnerability(
                    vuln_id=vuln.get("VulnerabilityID", ""),
                    pkg_name=vuln.get("PkgName", ""),
                    installed_version=vuln.get("InstalledVersion", ""),
                    fixed_version=vuln.get("FixedVersion", "N/A"),
                    severity=vuln.get("Severity", "UNKNOWN"),
                    title=vuln.get("Title", ""),
                    description=vuln.get("Description", ""),
                    references=vuln.get("References", []),
                )
                scan_result.vulnerabilities.append(v)
                self.all_vulns.append(v)

            self.results.append(scan_result)

        logger.info("解析完成: %d 个目标, %d 个漏洞",
                     len(self.results), len(self.all_vulns))
        return self.results

    def get_summary(self) -> dict:
        """获取漏洞摘要统计"""
        severity_count = Counter(v.severity for v in self.all_vulns)
        fixable = sum(1 for v in self.all_vulns if v.fixed_version != "N/A")

        return {
            "total": len(self.all_vulns),
            "critical": severity_count.get("CRITICAL", 0),
            "high": severity_count.get("HIGH", 0),
            "medium": severity_count.get("MEDIUM", 0),
            "low": severity_count.get("LOW", 0),
            "fixable": fixable,
            "unfixable": len(self.all_vulns) - fixable,
        }

    def get_critical_vulns(self) -> list[Vulnerability]:
        """获取所有 CRITICAL 和 HIGH 漏洞"""
        return sorted(
            [v for v in self.all_vulns if v.severity in ("CRITICAL", "HIGH")],
            key=lambda v: self.SEVERITY_ORDER.get(v.severity, 99),
        )

    def generate_report(self) -> str:
        """生成文本格式的安全报告"""
        summary = self.get_summary()
        lines = [
            "=" * 70,
            "容器镜像安全扫描报告",
            "=" * 70,
            "",
            f"漏洞总数: {summary['total']}",
            f"  CRITICAL: {summary['critical']}",
            f"  HIGH:     {summary['high']}",
            f"  MEDIUM:   {summary['medium']}",
            f"  LOW:      {summary['low']}",
            f"  可修复:   {summary['fixable']}",
            f"  不可修复: {summary['unfixable']}",
            "",
        ]

        critical_vulns = self.get_critical_vulns()
        if critical_vulns:
            lines.append("-" * 70)
            lines.append("⚠️  CRITICAL / HIGH 漏洞详情:")
            lines.append("-" * 70)
            for v in critical_vulns:
                lines.append(f"\n[{v.severity}] {v.vuln_id}")
                lines.append(f"  包: {v.pkg_name} ({v.installed_version})")
                lines.append(f"  修复版本: {v.fixed_version}")
                lines.append(f"  标题: {v.title}")

        lines.append("")
        lines.append("=" * 70)
        return "\n".join(lines)

    def should_fail_build(self, max_critical: int = 0,
                          max_high: int = 0) -> bool:
        """
        判断是否应该使构建失败

        Args:
            max_critical: 允许的最大 CRITICAL 漏洞数
            max_high: 允许的最大 HIGH 漏洞数
        Returns:
            True 表示应该失败
        """
        summary = self.get_summary()
        if summary["critical"] > max_critical:
            logger.error("CRITICAL 漏洞数 (%d) 超过阈值 (%d)",
                         summary["critical"], max_critical)
            return True
        if summary["high"] > max_high:
            logger.error("HIGH 漏洞数 (%d) 超过阈值 (%d)",
                         summary["high"], max_high)
            return True
        return False


# ── 使用示例 ────────────────────────────────────────────────────
if __name__ == "__main__":
    scanner = TrivyScanner()

    # 方式 1: 直接扫描镜像
    image = sys.argv[1] if len(sys.argv) > 1 else "python:3.12-slim"
    try:
        output = scanner.scan_image(image)
        scanner.parse_results(output)
    except Exception as e:
        logger.error("扫描失败: %s", e)
        sys.exit(1)

    # 生成报告
    report = scanner.generate_report()
    print(report)

    # CI/CD 集成:检查是否应该失败
    if scanner.should_fail_build(max_critical=0, max_high=5):
        print("\n❌ 构建失败:漏洞数超过阈值")
        sys.exit(1)
    else:
        print("\n✅ 安全检查通过")

Python: SBOM 生成与分析

"""
SBOM (Software Bill of Materials) 生成与分析
依赖: pip install packageurl-python
工具: syft (生成 SBOM), grype (漏洞扫描)
"""
import json
import subprocess
import logging
from dataclasses import dataclass, field
from typing import Optional
from collections import Counter

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@dataclass
class Component:
    """SBOM 组件"""
    name: str
    version: str
    type: str  # library, framework, application
    purl: str = ""  # Package URL
    licenses: list = field(default_factory=list)
    supplier: str = ""


class SBOMManager:
    """SBOM 生成与分析管理器"""

    def __init__(self):
        self.components: list[Component] = []

    def generate_sbom(self, target: str, output_format: str = "cyclonedx-json",
                      output_file: Optional[str] = None) -> dict:
        """
        使用 Syft 生成 SBOM

        Args:
            target: 扫描目标(镜像名、目录路径等)
            output_format: 输出格式(cyclonedx-json, spdx-json, syft-json)
            output_file: 输出文件路径(可选)
        Returns:
            SBOM JSON 数据
        """
        try:
            cmd = ["syft", target, "-o", output_format, "--quiet"]
            logger.info("生成 SBOM: %s (格式: %s)", target, output_format)

            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=300
            )
            if result.returncode != 0:
                logger.error("Syft 执行失败: %s", result.stderr)
                raise RuntimeError(f"Syft failed: {result.stderr}")

            sbom_data = json.loads(result.stdout)

            if output_file:
                with open(output_file, 'w') as f:
                    json.dump(sbom_data, f, indent=2)
                logger.info("SBOM 已保存: %s", output_file)

            return sbom_data

        except FileNotFoundError:
            logger.error("Syft 未安装,请运行: brew install syft")
            raise

    def parse_cyclonedx(self, sbom: dict) -> list[Component]:
        """
        解析 CycloneDX 格式的 SBOM

        Args:
            sbom: CycloneDX JSON 数据
        Returns:
            组件列表
        """
        self.components = []

        for comp in sbom.get("components", []):
            component = Component(
                name=comp.get("name", ""),
                version=comp.get("version", ""),
                type=comp.get("type", "library"),
                purl=comp.get("purl", ""),
                licenses=[
                    lic.get("license", {}).get("id", "unknown")
                    for lic in comp.get("licenses", [])
                ],
                supplier=comp.get("supplier", {}).get("name", ""),
            )
            self.components.append(component)

        logger.info("SBOM 解析完成: %d 个组件", len(self.components))
        return self.components

    def analyze_licenses(self) -> dict:
        """分析许可证分布"""
        license_count = Counter()
        for comp in self.components:
            for lic in comp.licenses:
                license_count[lic] += 1
            if not comp.licenses:
                license_count["UNKNOWN"] += 1

        return dict(license_count.most_common())

    def find_risky_licenses(self,
                            risky_licenses: list = None) -> list[Component]:
        """
        查找使用高风险许可证的组件

        Args:
            risky_licenses: 高风险许可证列表
        Returns:
            使用高风险许可证的组件列表
        """
        if risky_licenses is None:
            risky_licenses = [
                "GPL-2.0-only", "GPL-3.0-only",
                "AGPL-3.0-only", "SSPL-1.0",
                "EUPL-1.2", "CPAL-1.0",
            ]

        risky = []
        for comp in self.components:
            for lic in comp.licenses:
                if lic in risky_licenses:
                    risky.append(comp)
                    break

        if risky:
            logger.warning("发现 %d 个使用高风险许可证的组件", len(risky))
        return risky

    def get_statistics(self) -> dict:
        """获取 SBOM 统计信息"""
        type_count = Counter(c.type for c in self.components)
        return {
            "total_components": len(self.components),
            "by_type": dict(type_count),
            "licenses": self.analyze_licenses(),
            "with_purl": sum(1 for c in self.components if c.purl),
            "without_version": sum(1 for c in self.components if not c.version),
        }

    def generate_report(self) -> str:
        """生成 SBOM 分析报告"""
        stats = self.get_statistics()
        risky = self.find_risky_licenses()

        lines = [
            "=" * 60,
            "SBOM 分析报告",
            "=" * 60,
            f"组件总数: {stats['total_components']}",
            f"有 PURL:  {stats['with_purl']}",
            f"无版本号: {stats['without_version']}",
            "",
            "组件类型分布:",
        ]
        for t, count in stats["by_type"].items():
            lines.append(f"  {t}: {count}")

        lines.append("\n许可证分布:")
        for lic, count in stats["licenses"].items():
            lines.append(f"  {lic}: {count}")

        if risky:
            lines.append(f"\n⚠️  高风险许可证组件 ({len(risky)}):")
            for comp in risky:
                lines.append(f"  - {comp.name}@{comp.version} ({', '.join(comp.licenses)})")

        lines.append("=" * 60)
        return "\n".join(lines)

    def scan_vulnerabilities(self, sbom_file: str) -> dict:
        """
        使用 Grype 扫描 SBOM 中的漏洞

        Args:
            sbom_file: SBOM 文件路径
        Returns:
            Grype 扫描结果
        """
        try:
            cmd = ["grype", f"sbom:{sbom_file}", "-o", "json", "--quiet"]
            logger.info("扫描 SBOM 漏洞: %s", sbom_file)

            result = subprocess.run(
                cmd, capture_output=True, text=True, timeout=300
            )
            return json.loads(result.stdout) if result.stdout else {}

        except FileNotFoundError:
            logger.error("Grype 未安装,请运行: brew install grype")
            raise


# ── 使用示例 ────────────────────────────────────────────────────
if __name__ == "__main__":
    import sys

    manager = SBOMManager()
    target = sys.argv[1] if len(sys.argv) > 1 else "python:3.12-slim"

    # 生成 SBOM
    try:
        sbom = manager.generate_sbom(target, output_file="/tmp/sbom.json")
    except Exception as e:
        logger.error("SBOM 生成失败: %s", e)
        sys.exit(1)

    # 解析并分析
    manager.parse_cyclonedx(sbom)
    report = manager.generate_report()
    print(report)

    # 检查高风险许可证
    risky = manager.find_risky_licenses()
    if risky:
        print(f"\n⚠️  发现 {len(risky)} 个高风险许可证组件,请审查!")

Java: Spring Boot 容器化安全配置

/**
 * Spring Boot 容器化安全配置
 * 适用于在 Kubernetes 中运行的 Spring Boot 应用
 */
package com.example.security;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.web.SecurityFilterChain;
import org.springframework.security.web.header.writers.ReferrerPolicyHeaderWriter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.Map;

@SpringBootApplication
public class SecureApplication {
    public static void main(String[] args) {
        SpringApplication.run(SecureApplication.class, args);
    }
}

/**
 * 安全配置 — 适合容器化部署
 */
@Configuration
@EnableWebSecurity
class SecurityConfig {

    @Bean
    public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
        http
            // ✅ 无状态会话(容器化应用不应依赖 session)
            .sessionManagement(session ->
                session.sessionCreationPolicy(SessionCreationPolicy.STATELESS))

            // ✅ 安全头部
            .headers(headers -> headers
                // 防止点击劫持
                .frameOptions(frame -> frame.deny())
                // 内容安全策略
                .contentSecurityPolicy(csp ->
                    csp.policyDirectives("default-src 'self'; script-src 'self'"))
                // 防止 MIME 类型嗅探
                .contentTypeOptions(content -> {})
                // 引用策略
                .referrerPolicy(referrer ->
                    referrer.policy(ReferrerPolicyHeaderWriter.ReferrerPolicy.STRICT_ORIGIN_WHEN_CROSS_ORIGIN))
                // HSTS
                .httpStrictTransportSecurity(hsts ->
                    hsts.includeSubDomains(true).maxAgeInSeconds(31536000))
            )

            // ✅ 路径权限
            .authorizeHttpRequests(auth -> auth
                // 健康检查端点(K8s 探针需要)
                .requestMatchers("/health", "/ready", "/metrics").permitAll()
                // 其他请求需要认证
                .anyRequest().authenticated()
            )

            // ✅ 禁用不需要的功能
            .csrf(csrf -> csrf.disable())  // 无状态 API 不需要 CSRF
            .formLogin(form -> form.disable())
            .httpBasic(basic -> basic.disable());

        return http.build();
    }
}

/**
 * 健康检查控制器 — K8s 探针使用
 */
@RestController
class HealthController {

    @GetMapping("/health")
    public Map<String, String> health() {
        return Map.of("status", "UP");
    }

    @GetMapping("/ready")
    public Map<String, String> ready() {
        // 检查依赖服务是否就绪
        return Map.of("status", "READY");
    }
}

application.yaml — 容器化安全配置:

# Spring Boot 容器化安全配置
server:
  port: 8080
  # ✅ 安全头部
  servlet:
    session:
      cookie:
        secure: true
        http-only: true
        same-site: strict
  # ✅ 错误处理(不暴露堆栈信息)
  error:
    include-stacktrace: never
    include-message: never

# ✅ Actuator 安全配置
management:
  endpoints:
    web:
      exposure:
        include: health,ready,metrics
      base-path: /
  endpoint:
    health:
      show-details: never  # 不暴露详细健康信息
  server:
    port: 8081  # 管理端口与业务端口分离

# ✅ 日志配置(不记录敏感信息)
logging:
  level:
    root: INFO
    org.springframework.security: WARN
  pattern:
    console: "%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n"

# ✅ 优雅关闭(K8s 需要)
spring:
  lifecycle:
    timeout-per-shutdown-phase: 30s

Java: Jib 安全构建配置

/**
 * Jib 安全构建配置 — 无需 Docker daemon 构建安全镜像
 *
 * build.gradle:
 */
// ── build.gradle ────────────────────────────────────────────────
/*
plugins {
    id 'org.springframework.boot' version '3.3.0'
    id 'com.google.cloud.tools.jib' version '3.4.1'
}

jib {
    // ✅ 使用 distroless 基础镜像(极小攻击面)
    from {
        image = 'gcr.io/distroless/java21-debian12:nonroot'
        // 或使用 Eclipse Temurin:
        // image = 'eclipse-temurin:21-jre-jammy'
    }

    to {
        image = "${System.getenv('REGISTRY') ?: 'myregistry.com'}/myapp"
        tags = [version, 'latest']
        // ✅ 镜像签名(需要配置 cosign)
    }

    container {
        // ✅ 非 root 用户运行
        user = '65532:65532'

        // ✅ JVM 安全参数
        jvmFlags = [
            '-XX:+UseContainerSupport',
            '-XX:MaxRAMPercentage=75.0',
            '-Djava.security.egd=file:/dev/urandom',
            '-Dfile.encoding=UTF-8',
            // 禁用不安全的序列化
            '-Djdk.serialFilter=!*',
        ]

        // ✅ 只暴露必要端口
        ports = ['8080']

        // ✅ 标签
        labels = [
            'maintainer': 'security-team@example.com',
            'org.opencontainers.image.source': 'https://github.com/org/repo',
            'org.opencontainers.image.description': 'Secure Spring Boot App',
        ]

        // ✅ 创建时间(可重现构建)
        creationTime = 'USE_CURRENT_TIMESTAMP'

        // ✅ 环境变量
        environment = [
            'JAVA_TOOL_OPTIONS': '-XX:+UseContainerSupport',
            'TZ': 'UTC',
        ]
    }

    // ✅ 额外目录(如配置文件)
    extraDirectories {
        paths {
            path {
                from = 'src/main/resources/docker'
                into = '/app/config'
            }
        }
        permissions = [
            '/app/config/*': '444',  // 只读
        ]
    }
}
*/

// ── Jib 构建命令 ────────────────────────────────────────────────
// 构建并推送到仓库(无需 Docker daemon):
//   ./gradlew jib
//
// 构建到本地 Docker:
//   ./gradlew jibDockerBuild
//
// 构建为 tar 文件:
//   ./gradlew jibBuildTar

28.4 容器安全扫描工具对比

特性

Trivy

Grype

Snyk Container

开源

✅ Apache 2.0

✅ Apache 2.0

⚠️ 免费层 + 商业

漏洞数据库

NVD, GitHub Advisory, 多源

NVD, GitHub Advisory

Snyk 自有数据库

扫描速度

⚡ 快(本地缓存)

⚡ 快

🐢 较慢(云端分析)

OS 包扫描

语言包扫描

✅ (pip, npm, go, maven…)

IaC 扫描

✅ (K8s, Terraform, Docker)

SBOM 生成

✅ CycloneDX, SPDX

❌ (用 Syft)

许可证扫描

密钥扫描

CI/CD 集成

✅ GitHub Actions, GitLab CI

✅ GitHub Actions

✅ 原生集成

IDE 插件

✅ VS Code

✅ VS Code, IntelliJ

修复建议

⚠️ 基本

⚠️ 基本

✅ 详细(含 PR)

安装

brew install trivy

brew install grype

npm install -g snyk

选型建议

  • 全能型Trivy(漏洞 + IaC + SBOM + 密钥,一个工具搞定)

  • 专注漏洞扫描Grype + Syft(Anchore 生态)

  • 企业级 + 修复建议Snyk(商业支持,自动修复 PR)

  • 最佳实践:CI 中同时使用 Trivy + Grype,交叉验证

28.5 Kubernetes 安全加固清单

集群级别

类别

检查项

优先级

工具/方法

API Server

禁用匿名认证

🔴 高

--anonymous-auth=false

API Server

启用审计日志

🔴 高

--audit-log-path

API Server

启用 RBAC

🔴 高

--authorization-mode=RBAC

etcd

启用静态加密

🔴 高

EncryptionConfiguration

etcd

启用 TLS

🔴 高

--cert-file, --key-file

Kubelet

禁用匿名认证

🔴 高

--anonymous-auth=false

Kubelet

启用 Webhook 认证

🟡 中

--authentication-token-webhook

网络

启用 Network Policy

🔴 高

Calico / Cilium

网络

加密 Pod 间通信

🟡 中

WireGuard / Istio mTLS

工作负载级别

类别

检查项

优先级

说明

PSS

生产命名空间使用 restricted

🔴 高

强制非 root、只读 FS

镜像

禁止使用 latest 标签

🔴 高

OPA/Kyverno 策略

镜像

只允许可信仓库

🔴 高

Admission Webhook

镜像

启用镜像签名验证

🟡 中

Cosign + Kyverno

资源

设置 CPU/内存限制

🔴 高

防止资源耗尽

Secrets

使用 External Secrets

🔴 高

不在 YAML 中硬编码

SA

禁用自动挂载 SA Token

🟡 中

automountServiceAccountToken: false

容器

只读根文件系统

🟡 中

readOnlyRootFilesystem: true

容器

丢弃所有 capabilities

🔴 高

drop: [ALL]

运行时安全

类别

检查项

优先级

工具

监控

异常行为检测

🔴 高

Falco / Tetragon

扫描

定期镜像漏洞扫描

🔴 高

Trivy Operator

日志

集中式日志收集

🔴 高

Fluentd / Vector

审计

K8s 审计日志分析

🟡 中

ELK / Loki

备份

etcd 定期备份

🔴 高

Velero

CIS Benchmark 自动检查

# 使用 kube-bench 检查 CIS Kubernetes Benchmark
# 安装: brew install kube-bench
kube-bench run --targets master,node,policies

# 使用 Trivy 检查 K8s 安全配置
trivy k8s --report summary cluster

28.6 小结

  • 4C 模型 从云平台到代码,每一层都需要安全措施

  • 容器安全:最小镜像 + 非 root + 漏洞扫描 + 签名

  • K8s 安全:RBAC + PSS + Network Policy + Admission Control

  • OPA Gatekeeper 在 Admission 阶段强制执行安全策略

  • 供应链安全(SLSA)确保从源代码到部署的完整性

  • SBOM 提供软件组成透明度,便于漏洞追踪和许可证合规

  • Admission Webhook 可自定义安全策略,在部署前拦截违规配置

  • 安全是分层的,每一层都不能缺少