# 第二十八章:云原生安全
> "云原生不是更安全,而是安全的方式不同了。"
```{mermaid}
mindmap
root((云原生安全))
4C 模型
Cloud
Cluster
Container
Code
容器安全
镜像扫描
运行时安全
镜像签名
SBOM
K8s 安全
RBAC
PSS
Network Policy
Admission Control
合规
SOC 2
ISO 27001
CIS Benchmarks
```
## 28.1 4C 安全模型
```{mermaid}
flowchart TB
subgraph Cloud["☁️ Cloud(云平台)"]
direction TB
CloudDesc["IAM · VPC · 加密 · 审计日志"]
subgraph Cluster["🏗️ Cluster(集群)"]
direction TB
ClusterDesc["RBAC · PSS · Network Policy · Admission Control"]
subgraph Container["📦 Container(容器)"]
direction TB
ContainerDesc["镜像扫描 · 最小基础镜像 · 非 root · 签名"]
subgraph Code["💻 Code(代码)"]
CodeDesc["SAST · SCA · 安全编码 · 密钥管理"]
end
end
end
end
style Cloud fill:#1a237e,stroke:#333,color:#fff
style Cluster fill:#283593,stroke:#333,color:#fff
style Container fill:#3949ab,stroke:#333,color:#fff
style Code fill:#5c6bc0,stroke:#333,color:#fff
```
每一层的安全职责:
| 层级 | 安全措施 | 工具 |
|------|---------|------|
| Cloud | IAM 最小权限、VPC 隔离、静态加密、审计日志 | AWS IAM, CloudTrail, VPC |
| Cluster | RBAC、Pod Security Standards、Network Policy | K8s RBAC, OPA, Cilium |
| Container | 镜像扫描、签名验证、非 root、只读文件系统 | Trivy, Cosign, Falco |
| Code | SAST、SCA、密钥管理、安全依赖 | Semgrep, Snyk, Vault |
## 28.2 容器安全
### 容器安全扫描流程
```{mermaid}
flowchart LR
A["👨💻 开发者
提交代码"] --> B["🔨 CI 构建
Docker 镜像"]
B --> C["🔍 镜像扫描
Trivy/Grype"]
C --> D{"漏洞等级?"}
D -->|"CRITICAL/HIGH"| E["❌ 构建失败
通知开发者"]
E --> F["🔧 修复漏洞"]
F --> A
D -->|"MEDIUM/LOW"| G["⚠️ 记录告警
继续流水线"]
D -->|"无漏洞"| H["✅ 通过"]
G --> I["✍️ 镜像签名
Cosign"]
H --> I
I --> J["📦 推送到
私有仓库"]
J --> K["🚀 部署到
K8s 集群"]
K --> L["🛡️ Admission
验证签名"]
style E fill:#F44336,stroke:#333,color:#fff
style H fill:#4CAF50,stroke:#333,color:#fff
style I fill:#FF9800,stroke:#333,color:#fff
style L fill:#2196F3,stroke:#333,color:#fff
```
### 镜像扫描
```bash
# Trivy — 容器镜像漏洞扫描
trivy image myapp:latest
trivy image --severity HIGH,CRITICAL myapp:latest
# Grype — 另一个流行的扫描工具
grype myapp:latest
```
### 安全 Dockerfile — Python
```dockerfile
# ✅ Python 安全最佳实践 Dockerfile
# 多阶段构建 + 非 root + 最小镜像
# ── 阶段 1:构建依赖 ──────────────────────────────────────────
FROM python:3.12-slim AS builder
WORKDIR /build
# 只复制依赖文件(利用 Docker 缓存)
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# ── 阶段 2:运行时镜像 ────────────────────────────────────────
FROM python:3.12-slim
# ✅ 安装安全更新
RUN apt-get update && \
apt-get upgrade -y && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
WORKDIR /app
# ✅ 创建非 root 用户
RUN groupadd -r appuser && \
useradd -r -g appuser -d /app -s /sbin/nologin appuser
# ✅ 从构建阶段复制依赖(不包含构建工具)
COPY --from=builder /install /usr/local
COPY --chown=appuser:appuser . .
# ✅ 以非 root 用户运行
USER appuser
# ✅ 健康检查
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1
# ✅ 只暴露必要端口
EXPOSE 8000
# ✅ 使用 exec 形式(PID 1 信号处理)
CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"]
# 运行时安全加固:
# docker run --read-only --tmpfs /tmp --cap-drop ALL --security-opt no-new-privileges myapp:latest
```
### 安全 Dockerfile — Java
```dockerfile
# ✅ Java 安全最佳实践 Dockerfile
# 多阶段构建 + JLink 最小 JRE + 非 root
# ── 阶段 1:构建 ──────────────────────────────────────────────
FROM eclipse-temurin:21-jdk-jammy AS builder
WORKDIR /build
# ✅ 只复制构建文件(利用缓存)
COPY gradle/ gradle/
COPY gradlew build.gradle settings.gradle ./
RUN ./gradlew dependencies --no-daemon
COPY src/ src/
RUN ./gradlew bootJar --no-daemon -x test
# ✅ 使用 jlink 创建最小 JRE(减少攻击面)
RUN jlink \
--add-modules java.base,java.logging,java.sql,java.naming,java.management,java.instrument,java.security.jgss,java.desktop \
--strip-debug \
--no-man-pages \
--no-header-files \
--compress=zip-6 \
--output /custom-jre
# ── 阶段 2:运行时镜像 ────────────────────────────────────────
FROM debian:bookworm-slim
# ✅ 安装安全更新
RUN apt-get update && \
apt-get upgrade -y && \
apt-get clean && \
rm -rf /var/lib/apt/lists/*
# ✅ 复制最小 JRE
COPY --from=builder /custom-jre /opt/java
ENV PATH="/opt/java/bin:${PATH}"
WORKDIR /app
# ✅ 创建非 root 用户
RUN groupadd -r appuser && \
useradd -r -g appuser -d /app -s /sbin/nologin appuser
# ✅ 复制应用 JAR
COPY --from=builder --chown=appuser:appuser /build/build/libs/*.jar app.jar
# ✅ 以非 root 用户运行
USER appuser
# ✅ JVM 安全参数
ENV JAVA_OPTS="-XX:+UseContainerSupport \
-XX:MaxRAMPercentage=75.0 \
-Djava.security.egd=file:/dev/urandom \
-Dserver.port=8080"
HEALTHCHECK --interval=30s --timeout=3s --retries=3 \
CMD java -cp app.jar com.example.HealthCheck || exit 1
EXPOSE 8080
ENTRYPOINT ["sh", "-c", "java $JAVA_OPTS -jar app.jar"]
```
### 安全 Dockerfile — Go
```dockerfile
# ✅ Go 安全最佳实践 Dockerfile
# 多阶段构建 + scratch/distroless + 静态编译
# ── 阶段 1:构建 ──────────────────────────────────────────────
FROM golang:1.22-bookworm AS builder
WORKDIR /build
# ✅ 只复制依赖文件(利用缓存)
COPY go.mod go.sum ./
RUN go mod download && go mod verify
COPY . .
# ✅ 静态编译(CGO_ENABLED=0),禁用符号表和调试信息
RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \
-ldflags="-w -s -X main.version=$(git describe --tags --always)" \
-trimpath \
-o /app/server ./cmd/server
# ── 阶段 2:运行时镜像(使用 distroless,极小攻击面)────────
FROM gcr.io/distroless/static-debian12:nonroot
# ✅ 复制二进制文件
COPY --from=builder /app/server /server
# ✅ 复制 CA 证书(用于 HTTPS 请求)
COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# ✅ distroless:nonroot 默认以 UID 65532 运行
USER nonroot:nonroot
EXPOSE 8080
ENTRYPOINT ["/server"]
# 镜像大小对比:
# golang:1.22 ~800MB
# debian:slim ~80MB
# distroless/static ~2MB ← 极小攻击面
# scratch ~0MB (但没有 CA 证书和时区数据)
```
### 镜像签名(Cosign)
```bash
# 签名镜像
cosign sign --key cosign.key myregistry.com/myapp:latest
# 验证签名
cosign verify --key cosign.pub myregistry.com/myapp:latest
```
### 供应链安全(SLSA)流程
```{mermaid}
flowchart TB
subgraph Source["📝 源代码"]
S1["代码审查"]
S2["签名提交"]
S3["分支保护"]
end
subgraph Build["🔨 构建"]
B1["隔离构建环境"]
B2["可重现构建"]
B3["构建来源证明
(Provenance)"]
end
subgraph Verify["✅ 验证"]
V1["SBOM 生成"]
V2["漏洞扫描"]
V3["签名验证"]
end
subgraph Deploy["🚀 部署"]
D1["Admission Webhook
验证签名+来源"]
D2["策略引擎
(OPA/Kyverno)"]
D3["运行时监控
(Falco)"]
end
Source --> Build --> Verify --> Deploy
style Source fill:#E8F5E9,stroke:#4CAF50
style Build fill:#E3F2FD,stroke:#2196F3
style Verify fill:#FFF3E0,stroke:#FF9800
style Deploy fill:#F3E5F5,stroke:#9C27B0
```
**SLSA 级别**:
| 级别 | 要求 | 说明 |
|------|------|------|
| SLSA 1 | 构建过程有文档 | 基本的构建来源记录 |
| SLSA 2 | 使用托管构建服务 | 构建服务生成来源证明 |
| SLSA 3 | 构建环境加固 | 隔离构建、不可篡改的来源证明 |
| SLSA 4 | 双人审查 + 密封构建 | 最高安全级别 |
## 28.3 Kubernetes 安全
### Pod Security 决策流程
```{mermaid}
flowchart TD
A["Pod 创建请求"] --> B{"命名空间 PSS 级别?"}
B -->|"privileged"| C["✅ 允许所有配置"]
B -->|"baseline"| D{"检查基线策略"}
B -->|"restricted"| E{"检查受限策略"}
D --> D1{"hostNetwork?"}
D1 -->|"true"| DENY["❌ 拒绝"]
D1 -->|"false"| D2{"privileged?"}
D2 -->|"true"| DENY
D2 -->|"false"| D3{"hostPID/hostIPC?"}
D3 -->|"true"| DENY
D3 -->|"false"| ALLOW_B["✅ 允许"]
E --> E1{"runAsNonRoot?"}
E1 -->|"false/未设置"| DENY
E1 -->|"true"| E2{"allowPrivilegeEscalation?"}
E2 -->|"true/未设置"| DENY
E2 -->|"false"| E3{"capabilities 只有
NET_BIND_SERVICE?"}
E3 -->|"否"| DENY
E3 -->|"是"| E4{"seccompProfile
= RuntimeDefault?"}
E4 -->|"否"| DENY
E4 -->|"是"| ALLOW_R["✅ 允许"]
style DENY fill:#F44336,stroke:#333,color:#fff
style ALLOW_B fill:#4CAF50,stroke:#333,color:#fff
style ALLOW_R fill:#4CAF50,stroke:#333,color:#fff
style A fill:#2196F3,stroke:#333,color:#fff
```
### RBAC
```yaml
# 最小权限 Role
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: app-reader
namespace: production
rules:
- apiGroups: [""]
resources: ["pods", "services"]
verbs: ["get", "list", "watch"]
- apiGroups: ["apps"]
resources: ["deployments"]
verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: app-reader-binding
namespace: production
subjects:
- kind: ServiceAccount
name: my-app
namespace: production
roleRef:
kind: Role
name: app-reader
apiGroup: rbac.authorization.k8s.io
```
### Pod Security Standards(PSS)— Restricted
```yaml
# ── Namespace 级别强制 Restricted PSS ──────────────────────────
apiVersion: v1
kind: Namespace
metadata:
name: production
labels:
pod-security.kubernetes.io/enforce: restricted
pod-security.kubernetes.io/enforce-version: latest
pod-security.kubernetes.io/audit: restricted
pod-security.kubernetes.io/audit-version: latest
pod-security.kubernetes.io/warn: restricted
pod-security.kubernetes.io/warn-version: latest
---
# ── 符合 Restricted PSS 的 Pod 示例 ───────────────────────────
apiVersion: v1
kind: Pod
metadata:
name: secure-app
namespace: production
spec:
# ✅ 不使用 host 命名空间
hostNetwork: false
hostPID: false
hostIPC: false
# ✅ Pod 级别安全上下文
securityContext:
runAsNonRoot: true
runAsUser: 65534
runAsGroup: 65534
fsGroup: 65534
seccompProfile:
type: RuntimeDefault
containers:
- name: app
image: myregistry.com/myapp:v1.2.3 # ✅ 不使用 latest
ports:
- containerPort: 8080
# ✅ 容器级别安全上下文
securityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
runAsNonRoot: true
runAsUser: 65534
capabilities:
drop:
- ALL
seccompProfile:
type: RuntimeDefault
# ✅ 资源限制(防止资源耗尽攻击)
resources:
requests:
cpu: 100m
memory: 128Mi
limits:
cpu: 500m
memory: 512Mi
# ✅ 健康检查
livenessProbe:
httpGet:
path: /health
port: 8080
initialDelaySeconds: 10
periodSeconds: 30
readinessProbe:
httpGet:
path: /ready
port: 8080
initialDelaySeconds: 5
periodSeconds: 10
# ✅ 临时目录(只读根文件系统需要)
volumeMounts:
- name: tmp
mountPath: /tmp
volumes:
- name: tmp
emptyDir:
sizeLimit: 100Mi
# ✅ 服务账户(最小权限)
serviceAccountName: my-app
automountServiceAccountToken: false # 不需要时禁用
```
### NetworkPolicy 示例
```yaml
# ── 默认拒绝所有入站流量 ──────────────────────────────────────
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-ingress
namespace: production
spec:
podSelector: {} # 匹配所有 Pod
policyTypes:
- Ingress
---
# ── 默认拒绝所有出站流量 ──────────────────────────────────────
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: default-deny-egress
namespace: production
spec:
podSelector: {}
policyTypes:
- Egress
---
# ── 允许 frontend → backend 的流量 ────────────────────────────
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-frontend-to-backend
namespace: production
spec:
podSelector:
matchLabels:
app: backend
policyTypes:
- Ingress
ingress:
- from:
- podSelector:
matchLabels:
app: frontend
ports:
- protocol: TCP
port: 8080
---
# ── 允许 backend → database 的流量 ────────────────────────────
apiVersion: networking.k8s.io/v1
kind: NetworkPolicy
metadata:
name: allow-backend-to-db
namespace: production
spec:
podSelector:
matchLabels:
app: backend
policyTypes:
- Egress
egress:
- to:
- podSelector:
matchLabels:
app: database
ports:
- protocol: TCP
port: 5432
# ✅ 允许 DNS 查询
- to:
- namespaceSelector: {}
podSelector:
matchLabels:
k8s-app: kube-dns
ports:
- protocol: UDP
port: 53
```
### Admission Control(OPA Gatekeeper)
```yaml
# ── ConstraintTemplate:禁止使用指定镜像标签 ──────────────────
apiVersion: templates.gatekeeper.sh/v1
kind: ConstraintTemplate
metadata:
name: k8sdisallowedtags
spec:
crd:
spec:
names:
kind: K8sDisallowedTags
validation:
openAPIV3Schema:
type: object
properties:
tags:
type: array
description: "禁止使用的镜像标签列表"
items:
type: string
exemptImages:
type: array
description: "豁免的镜像列表(支持通配符)"
items:
type: string
targets:
- target: admission.k8s.gatekeeper.sh
rego: |
package k8sdisallowedtags
# 检查所有容器(包括 init 容器)
violation[{"msg": msg}] {
container := input_containers[_]
tags := {t | t := input.parameters.tags[_]}
tag := get_tag(container.image)
tags[tag]
not is_exempt(container.image)
msg := sprintf(
"容器 '%s' 使用了禁止的镜像标签 '%s'(镜像: %s)",
[container.name, tag, container.image]
)
}
# 没有标签等同于 latest
violation[{"msg": msg}] {
container := input_containers[_]
tags := {t | t := input.parameters.tags[_]}
not contains(container.image, ":")
tags["latest"]
not is_exempt(container.image)
msg := sprintf(
"容器 '%s' 未指定镜像标签(等同于 latest,镜像: %s)",
[container.name, container.image]
)
}
input_containers[c] {
c := input.review.object.spec.containers[_]
}
input_containers[c] {
c := input.review.object.spec.initContainers[_]
}
get_tag(image) = tag {
parts := split(image, ":")
count(parts) == 2
tag := parts[1]
}
is_exempt(image) {
exempt := input.parameters.exemptImages[_]
glob.match(exempt, [], image)
}
---
# ── Constraint:禁止使用 latest 标签 ──────────────────────────
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sDisallowedTags
metadata:
name: no-latest-tag
spec:
enforcementAction: deny
match:
kinds:
- apiGroups: [""]
kinds: ["Pod"]
namespaces:
- production
- staging
parameters:
tags: ["latest"]
exemptImages:
- "gcr.io/distroless/*" # 豁免 distroless 镜像
---
# ── ConstraintTemplate:要求资源限制 ──────────────────────────
apiVersion: templates.gatekeeper.sh/v1
kind: ConstraintTemplate
metadata:
name: k8srequiredresources
spec:
crd:
spec:
names:
kind: K8sRequiredResources
targets:
- target: admission.k8s.gatekeeper.sh
rego: |
package k8srequiredresources
violation[{"msg": msg}] {
container := input.review.object.spec.containers[_]
not container.resources.limits
msg := sprintf(
"容器 '%s' 必须设置资源限制 (resources.limits)",
[container.name]
)
}
violation[{"msg": msg}] {
container := input.review.object.spec.containers[_]
not container.resources.requests
msg := sprintf(
"容器 '%s' 必须设置资源请求 (resources.requests)",
[container.name]
)
}
---
# ── Constraint:要求所有容器设置资源限制 ──────────────────────
apiVersion: constraints.gatekeeper.sh/v1beta1
kind: K8sRequiredResources
metadata:
name: require-resource-limits
spec:
enforcementAction: deny
match:
kinds:
- apiGroups: [""]
kinds: ["Pod"]
namespaces:
- production
```
### Go: 容器镜像签名验证(Cosign/Sigstore)
```go
/*
容器镜像签名验证 — 使用 cosign/sigstore 库
依赖:
go get github.com/sigstore/cosign/v2
go get github.com/sigstore/sigstore/pkg/signature
*/
package main
import (
"context"
"crypto"
"encoding/json"
"fmt"
"log"
"os"
"github.com/google/go-containerregistry/pkg/name"
"github.com/sigstore/cosign/v2/cmd/cosign/cli/fulcio"
"github.com/sigstore/cosign/v2/cmd/cosign/cli/options"
"github.com/sigstore/cosign/v2/cmd/cosign/cli/verify"
"github.com/sigstore/cosign/v2/pkg/cosign"
ociremote "github.com/sigstore/cosign/v2/pkg/oci/remote"
"github.com/sigstore/sigstore/pkg/signature"
)
// ImageVerifier 镜像签名验证器
type ImageVerifier struct {
publicKeyPath string
}
// NewImageVerifier 创建镜像验证器
func NewImageVerifier(publicKeyPath string) *ImageVerifier {
return &ImageVerifier{publicKeyPath: publicKeyPath}
}
// VerifyImage 验证镜像签名
func (iv *ImageVerifier) VerifyImage(imageRef string) (*VerificationResult, error) {
ctx := context.Background()
// 解析镜像引用
ref, err := name.ParseReference(imageRef)
if err != nil {
return nil, fmt.Errorf("无效的镜像引用 '%s': %w", imageRef, err)
}
// 加载公钥
pubKey, err := signature.LoadPublicKeyRaw(ctx, iv.publicKeyPath, crypto.SHA256)
if err != nil {
return nil, fmt.Errorf("加载公钥失败: %w", err)
}
// 配置验证选项
checkOpts := &cosign.CheckOpts{
SigVerifier: pubKey,
}
// 执行验证
signatures, bundleVerified, err := cosign.VerifyImageSignatures(ctx, ref, checkOpts)
if err != nil {
return &VerificationResult{
Image: imageRef,
Verified: false,
Error: err.Error(),
}, nil
}
result := &VerificationResult{
Image: imageRef,
Verified: true,
BundleVerified: bundleVerified,
SignatureCount: len(signatures),
}
// 提取签名负载信息
for _, sig := range signatures {
payload, err := sig.Payload()
if err == nil {
result.Payloads = append(result.Payloads, string(payload))
}
}
log.Printf("✅ 镜像签名验证通过: %s (%d 个签名)", imageRef, len(signatures))
return result, nil
}
// VerificationResult 验证结果
type VerificationResult struct {
Image string `json:"image"`
Verified bool `json:"verified"`
BundleVerified bool `json:"bundle_verified"`
SignatureCount int `json:"signature_count"`
Payloads []string `json:"payloads,omitempty"`
Error string `json:"error,omitempty"`
}
// VerifyKeyless 使用 Keyless(Fulcio + Rekor)验证签名
func VerifyKeyless(imageRef, expectedIssuer, expectedSubject string) (*VerificationResult, error) {
ctx := context.Background()
ref, err := name.ParseReference(imageRef)
if err != nil {
return nil, fmt.Errorf("无效的镜像引用: %w", err)
}
// Keyless 验证:使用 Fulcio CA + Rekor 透明日志
checkOpts := &cosign.CheckOpts{
Identities: []cosign.Identity{
{
Issuer: expectedIssuer, // 如 "https://accounts.google.com"
Subject: expectedSubject, // 如 "user@example.com"
},
},
}
signatures, bundleVerified, err := cosign.VerifyImageSignatures(ctx, ref, checkOpts)
if err != nil {
return &VerificationResult{
Image: imageRef,
Verified: false,
Error: err.Error(),
}, nil
}
return &VerificationResult{
Image: imageRef,
Verified: true,
BundleVerified: bundleVerified,
SignatureCount: len(signatures),
}, nil
}
func main() {
// 使用公钥验证
verifier := NewImageVerifier("cosign.pub")
result, err := verifier.VerifyImage("myregistry.com/myapp:v1.2.3")
if err != nil {
log.Fatalf("验证失败: %v", err)
}
output, _ := json.MarshalIndent(result, "", " ")
fmt.Println(string(output))
if !result.Verified {
fmt.Println("❌ 镜像签名验证失败,拒绝部署!")
os.Exit(1)
}
fmt.Println("✅ 镜像签名验证通过,允许部署")
}
```
### Go: Admission Webhook 实现
```go
/*
Kubernetes Admission Webhook — 验证 Pod 安全策略
依赖:
go get k8s.io/api/admission/v1
go get k8s.io/apimachinery/pkg/runtime
*/
package main
import (
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"os"
"strings"
admissionv1 "k8s.io/api/admission/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
)
var (
scheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(scheme)
)
// SecurityPolicy 安全策略配置
type SecurityPolicy struct {
// 禁止使用的镜像标签
DisallowedTags []string `json:"disallowed_tags"`
// 允许的镜像仓库前缀
AllowedRegistries []string `json:"allowed_registries"`
// 是否要求非 root 运行
RequireNonRoot bool `json:"require_non_root"`
// 是否要求资源限制
RequireResourceLimits bool `json:"require_resource_limits"`
// 是否要求只读根文件系统
RequireReadOnlyRoot bool `json:"require_readonly_root"`
}
// WebhookServer Admission Webhook 服务器
type WebhookServer struct {
policy SecurityPolicy
}
// NewWebhookServer 创建 Webhook 服务器
func NewWebhookServer(policy SecurityPolicy) *WebhookServer {
return &WebhookServer{policy: policy}
}
// handleValidate 处理验证请求
func (ws *WebhookServer) handleValidate(w http.ResponseWriter, r *http.Request) {
// 读取请求体
body, err := io.ReadAll(r.Body)
if err != nil {
http.Error(w, "读取请求失败", http.StatusBadRequest)
return
}
// 解析 AdmissionReview
var admissionReview admissionv1.AdmissionReview
if err := json.Unmarshal(body, &admissionReview); err != nil {
http.Error(w, "解析请求失败", http.StatusBadRequest)
return
}
// 验证 Pod
response := ws.validatePod(admissionReview.Request)
// 构建响应
admissionReview.Response = response
admissionReview.Response.UID = admissionReview.Request.UID
respBytes, err := json.Marshal(admissionReview)
if err != nil {
http.Error(w, "序列化响应失败", http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
w.Write(respBytes)
}
// validatePod 验证 Pod 安全策略
func (ws *WebhookServer) validatePod(req *admissionv1.AdmissionRequest) *admissionv1.AdmissionResponse {
// 解析 Pod
var pod corev1.Pod
if err := json.Unmarshal(req.Object.Raw, &pod); err != nil {
return &admissionv1.AdmissionResponse{
Allowed: false,
Result: &metav1.Status{
Message: fmt.Sprintf("解析 Pod 失败: %v", err),
},
}
}
var violations []string
// 检查所有容器
allContainers := append(pod.Spec.Containers, pod.Spec.InitContainers...)
for _, container := range allContainers {
// 检查镜像标签
violations = append(violations, ws.checkImageTag(container)...)
// 检查镜像仓库
violations = append(violations, ws.checkRegistry(container)...)
// 检查资源限制
if ws.policy.RequireResourceLimits {
violations = append(violations, ws.checkResourceLimits(container)...)
}
// 检查安全上下文
violations = append(violations, ws.checkSecurityContext(container)...)
}
if len(violations) > 0 {
log.Printf("❌ Pod %s/%s 被拒绝: %v", pod.Namespace, pod.Name, violations)
return &admissionv1.AdmissionResponse{
Allowed: false,
Result: &metav1.Status{
Message: fmt.Sprintf("安全策略违规:\n- %s",
strings.Join(violations, "\n- ")),
},
}
}
log.Printf("✅ Pod %s/%s 验证通过", pod.Namespace, pod.Name)
return &admissionv1.AdmissionResponse{Allowed: true}
}
func (ws *WebhookServer) checkImageTag(c corev1.Container) []string {
var violations []string
for _, tag := range ws.policy.DisallowedTags {
if strings.HasSuffix(c.Image, ":"+tag) {
violations = append(violations,
fmt.Sprintf("容器 '%s' 使用了禁止的标签 '%s'", c.Name, tag))
}
}
// 没有标签等同于 latest
if !strings.Contains(c.Image, ":") {
for _, tag := range ws.policy.DisallowedTags {
if tag == "latest" {
violations = append(violations,
fmt.Sprintf("容器 '%s' 未指定标签(等同于 latest)", c.Name))
}
}
}
return violations
}
func (ws *WebhookServer) checkRegistry(c corev1.Container) []string {
if len(ws.policy.AllowedRegistries) == 0 {
return nil
}
for _, reg := range ws.policy.AllowedRegistries {
if strings.HasPrefix(c.Image, reg) {
return nil
}
}
return []string{
fmt.Sprintf("容器 '%s' 的镜像 '%s' 不在允许的仓库列表中", c.Name, c.Image),
}
}
func (ws *WebhookServer) checkResourceLimits(c corev1.Container) []string {
var violations []string
if c.Resources.Limits == nil {
violations = append(violations,
fmt.Sprintf("容器 '%s' 必须设置资源限制", c.Name))
}
if c.Resources.Requests == nil {
violations = append(violations,
fmt.Sprintf("容器 '%s' 必须设置资源请求", c.Name))
}
return violations
}
func (ws *WebhookServer) checkSecurityContext(c corev1.Container) []string {
var violations []string
sc := c.SecurityContext
if ws.policy.RequireNonRoot {
if sc == nil || sc.RunAsNonRoot == nil || !*sc.RunAsNonRoot {
violations = append(violations,
fmt.Sprintf("容器 '%s' 必须设置 runAsNonRoot: true", c.Name))
}
}
if ws.policy.RequireReadOnlyRoot {
if sc == nil || sc.ReadOnlyRootFilesystem == nil || !*sc.ReadOnlyRootFilesystem {
violations = append(violations,
fmt.Sprintf("容器 '%s' 必须设置 readOnlyRootFilesystem: true", c.Name))
}
}
return violations
}
// handleHealth 健康检查
func (ws *WebhookServer) handleHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
w.Write([]byte("ok"))
}
func main() {
policy := SecurityPolicy{
DisallowedTags: []string{"latest", "dev", "test"},
AllowedRegistries: []string{"myregistry.com/", "gcr.io/distroless/"},
RequireNonRoot: true,
RequireResourceLimits: true,
RequireReadOnlyRoot: true,
}
server := NewWebhookServer(policy)
mux := http.NewServeMux()
mux.HandleFunc("/validate", server.handleValidate)
mux.HandleFunc("/health", server.handleHealth)
// TLS 证书路径(K8s 会自动注入)
certFile := os.Getenv("TLS_CERT_FILE")
keyFile := os.Getenv("TLS_KEY_FILE")
if certFile == "" {
certFile = "/etc/webhook/certs/tls.crt"
}
if keyFile == "" {
keyFile = "/etc/webhook/certs/tls.key"
}
addr := ":8443"
log.Printf("🚀 Admission Webhook 启动: %s", addr)
log.Printf("📋 策略: %+v", policy)
if err := http.ListenAndServeTLS(addr, certFile, keyFile, mux); err != nil {
log.Fatalf("服务器启动失败: %v", err)
}
}
```
### Python: Trivy 扫描结果解析器
```python
"""
Trivy 扫描结果解析器 — 解析 JSON 输出,生成安全报告
依赖: pip install tabulate
使用: trivy image --format json -o results.json myapp:latest
"""
import json
import subprocess
import sys
import logging
from dataclasses import dataclass, field
from typing import Optional
from collections import Counter
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Vulnerability:
"""漏洞信息"""
vuln_id: str
pkg_name: str
installed_version: str
fixed_version: str
severity: str
title: str
description: str = ""
references: list = field(default_factory=list)
@dataclass
class ScanResult:
"""扫描结果"""
target: str
target_type: str
vulnerabilities: list = field(default_factory=list)
class TrivyScanner:
"""Trivy 扫描结果解析器"""
SEVERITY_ORDER = {"CRITICAL": 0, "HIGH": 1, "MEDIUM": 2, "LOW": 3, "UNKNOWN": 4}
def __init__(self):
self.results: list[ScanResult] = []
self.all_vulns: list[Vulnerability] = []
def scan_image(self, image: str, severity: str = "CRITICAL,HIGH,MEDIUM,LOW") -> dict:
"""
扫描容器镜像并返回 JSON 结果
Args:
image: 镜像名称(如 "myapp:latest")
severity: 漏洞严重级别过滤
Returns:
Trivy JSON 输出
"""
try:
cmd = [
"trivy", "image",
"--format", "json",
"--severity", severity,
"--quiet",
image,
]
logger.info("扫描镜像: %s", image)
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=300
)
if result.returncode != 0 and not result.stdout:
logger.error("Trivy 扫描失败: %s", result.stderr)
raise RuntimeError(f"Trivy scan failed: {result.stderr}")
return json.loads(result.stdout)
except FileNotFoundError:
logger.error("Trivy 未安装,请运行: brew install trivy")
raise
except subprocess.TimeoutExpired:
logger.error("Trivy 扫描超时")
raise
def parse_results(self, trivy_output: dict) -> list[ScanResult]:
"""
解析 Trivy JSON 输出
Args:
trivy_output: Trivy JSON 输出字典
Returns:
解析后的扫描结果列表
"""
self.results = []
self.all_vulns = []
results = trivy_output.get("Results", [])
for result in results:
scan_result = ScanResult(
target=result.get("Target", "unknown"),
target_type=result.get("Type", "unknown"),
)
for vuln in result.get("Vulnerabilities", []):
v = Vulnerability(
vuln_id=vuln.get("VulnerabilityID", ""),
pkg_name=vuln.get("PkgName", ""),
installed_version=vuln.get("InstalledVersion", ""),
fixed_version=vuln.get("FixedVersion", "N/A"),
severity=vuln.get("Severity", "UNKNOWN"),
title=vuln.get("Title", ""),
description=vuln.get("Description", ""),
references=vuln.get("References", []),
)
scan_result.vulnerabilities.append(v)
self.all_vulns.append(v)
self.results.append(scan_result)
logger.info("解析完成: %d 个目标, %d 个漏洞",
len(self.results), len(self.all_vulns))
return self.results
def get_summary(self) -> dict:
"""获取漏洞摘要统计"""
severity_count = Counter(v.severity for v in self.all_vulns)
fixable = sum(1 for v in self.all_vulns if v.fixed_version != "N/A")
return {
"total": len(self.all_vulns),
"critical": severity_count.get("CRITICAL", 0),
"high": severity_count.get("HIGH", 0),
"medium": severity_count.get("MEDIUM", 0),
"low": severity_count.get("LOW", 0),
"fixable": fixable,
"unfixable": len(self.all_vulns) - fixable,
}
def get_critical_vulns(self) -> list[Vulnerability]:
"""获取所有 CRITICAL 和 HIGH 漏洞"""
return sorted(
[v for v in self.all_vulns if v.severity in ("CRITICAL", "HIGH")],
key=lambda v: self.SEVERITY_ORDER.get(v.severity, 99),
)
def generate_report(self) -> str:
"""生成文本格式的安全报告"""
summary = self.get_summary()
lines = [
"=" * 70,
"容器镜像安全扫描报告",
"=" * 70,
"",
f"漏洞总数: {summary['total']}",
f" CRITICAL: {summary['critical']}",
f" HIGH: {summary['high']}",
f" MEDIUM: {summary['medium']}",
f" LOW: {summary['low']}",
f" 可修复: {summary['fixable']}",
f" 不可修复: {summary['unfixable']}",
"",
]
critical_vulns = self.get_critical_vulns()
if critical_vulns:
lines.append("-" * 70)
lines.append("⚠️ CRITICAL / HIGH 漏洞详情:")
lines.append("-" * 70)
for v in critical_vulns:
lines.append(f"\n[{v.severity}] {v.vuln_id}")
lines.append(f" 包: {v.pkg_name} ({v.installed_version})")
lines.append(f" 修复版本: {v.fixed_version}")
lines.append(f" 标题: {v.title}")
lines.append("")
lines.append("=" * 70)
return "\n".join(lines)
def should_fail_build(self, max_critical: int = 0,
max_high: int = 0) -> bool:
"""
判断是否应该使构建失败
Args:
max_critical: 允许的最大 CRITICAL 漏洞数
max_high: 允许的最大 HIGH 漏洞数
Returns:
True 表示应该失败
"""
summary = self.get_summary()
if summary["critical"] > max_critical:
logger.error("CRITICAL 漏洞数 (%d) 超过阈值 (%d)",
summary["critical"], max_critical)
return True
if summary["high"] > max_high:
logger.error("HIGH 漏洞数 (%d) 超过阈值 (%d)",
summary["high"], max_high)
return True
return False
# ── 使用示例 ────────────────────────────────────────────────────
if __name__ == "__main__":
scanner = TrivyScanner()
# 方式 1: 直接扫描镜像
image = sys.argv[1] if len(sys.argv) > 1 else "python:3.12-slim"
try:
output = scanner.scan_image(image)
scanner.parse_results(output)
except Exception as e:
logger.error("扫描失败: %s", e)
sys.exit(1)
# 生成报告
report = scanner.generate_report()
print(report)
# CI/CD 集成:检查是否应该失败
if scanner.should_fail_build(max_critical=0, max_high=5):
print("\n❌ 构建失败:漏洞数超过阈值")
sys.exit(1)
else:
print("\n✅ 安全检查通过")
```
### Python: SBOM 生成与分析
```python
"""
SBOM (Software Bill of Materials) 生成与分析
依赖: pip install packageurl-python
工具: syft (生成 SBOM), grype (漏洞扫描)
"""
import json
import subprocess
import logging
from dataclasses import dataclass, field
from typing import Optional
from collections import Counter
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class Component:
"""SBOM 组件"""
name: str
version: str
type: str # library, framework, application
purl: str = "" # Package URL
licenses: list = field(default_factory=list)
supplier: str = ""
class SBOMManager:
"""SBOM 生成与分析管理器"""
def __init__(self):
self.components: list[Component] = []
def generate_sbom(self, target: str, output_format: str = "cyclonedx-json",
output_file: Optional[str] = None) -> dict:
"""
使用 Syft 生成 SBOM
Args:
target: 扫描目标(镜像名、目录路径等)
output_format: 输出格式(cyclonedx-json, spdx-json, syft-json)
output_file: 输出文件路径(可选)
Returns:
SBOM JSON 数据
"""
try:
cmd = ["syft", target, "-o", output_format, "--quiet"]
logger.info("生成 SBOM: %s (格式: %s)", target, output_format)
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=300
)
if result.returncode != 0:
logger.error("Syft 执行失败: %s", result.stderr)
raise RuntimeError(f"Syft failed: {result.stderr}")
sbom_data = json.loads(result.stdout)
if output_file:
with open(output_file, 'w') as f:
json.dump(sbom_data, f, indent=2)
logger.info("SBOM 已保存: %s", output_file)
return sbom_data
except FileNotFoundError:
logger.error("Syft 未安装,请运行: brew install syft")
raise
def parse_cyclonedx(self, sbom: dict) -> list[Component]:
"""
解析 CycloneDX 格式的 SBOM
Args:
sbom: CycloneDX JSON 数据
Returns:
组件列表
"""
self.components = []
for comp in sbom.get("components", []):
component = Component(
name=comp.get("name", ""),
version=comp.get("version", ""),
type=comp.get("type", "library"),
purl=comp.get("purl", ""),
licenses=[
lic.get("license", {}).get("id", "unknown")
for lic in comp.get("licenses", [])
],
supplier=comp.get("supplier", {}).get("name", ""),
)
self.components.append(component)
logger.info("SBOM 解析完成: %d 个组件", len(self.components))
return self.components
def analyze_licenses(self) -> dict:
"""分析许可证分布"""
license_count = Counter()
for comp in self.components:
for lic in comp.licenses:
license_count[lic] += 1
if not comp.licenses:
license_count["UNKNOWN"] += 1
return dict(license_count.most_common())
def find_risky_licenses(self,
risky_licenses: list = None) -> list[Component]:
"""
查找使用高风险许可证的组件
Args:
risky_licenses: 高风险许可证列表
Returns:
使用高风险许可证的组件列表
"""
if risky_licenses is None:
risky_licenses = [
"GPL-2.0-only", "GPL-3.0-only",
"AGPL-3.0-only", "SSPL-1.0",
"EUPL-1.2", "CPAL-1.0",
]
risky = []
for comp in self.components:
for lic in comp.licenses:
if lic in risky_licenses:
risky.append(comp)
break
if risky:
logger.warning("发现 %d 个使用高风险许可证的组件", len(risky))
return risky
def get_statistics(self) -> dict:
"""获取 SBOM 统计信息"""
type_count = Counter(c.type for c in self.components)
return {
"total_components": len(self.components),
"by_type": dict(type_count),
"licenses": self.analyze_licenses(),
"with_purl": sum(1 for c in self.components if c.purl),
"without_version": sum(1 for c in self.components if not c.version),
}
def generate_report(self) -> str:
"""生成 SBOM 分析报告"""
stats = self.get_statistics()
risky = self.find_risky_licenses()
lines = [
"=" * 60,
"SBOM 分析报告",
"=" * 60,
f"组件总数: {stats['total_components']}",
f"有 PURL: {stats['with_purl']}",
f"无版本号: {stats['without_version']}",
"",
"组件类型分布:",
]
for t, count in stats["by_type"].items():
lines.append(f" {t}: {count}")
lines.append("\n许可证分布:")
for lic, count in stats["licenses"].items():
lines.append(f" {lic}: {count}")
if risky:
lines.append(f"\n⚠️ 高风险许可证组件 ({len(risky)}):")
for comp in risky:
lines.append(f" - {comp.name}@{comp.version} ({', '.join(comp.licenses)})")
lines.append("=" * 60)
return "\n".join(lines)
def scan_vulnerabilities(self, sbom_file: str) -> dict:
"""
使用 Grype 扫描 SBOM 中的漏洞
Args:
sbom_file: SBOM 文件路径
Returns:
Grype 扫描结果
"""
try:
cmd = ["grype", f"sbom:{sbom_file}", "-o", "json", "--quiet"]
logger.info("扫描 SBOM 漏洞: %s", sbom_file)
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=300
)
return json.loads(result.stdout) if result.stdout else {}
except FileNotFoundError:
logger.error("Grype 未安装,请运行: brew install grype")
raise
# ── 使用示例 ────────────────────────────────────────────────────
if __name__ == "__main__":
import sys
manager = SBOMManager()
target = sys.argv[1] if len(sys.argv) > 1 else "python:3.12-slim"
# 生成 SBOM
try:
sbom = manager.generate_sbom(target, output_file="/tmp/sbom.json")
except Exception as e:
logger.error("SBOM 生成失败: %s", e)
sys.exit(1)
# 解析并分析
manager.parse_cyclonedx(sbom)
report = manager.generate_report()
print(report)
# 检查高风险许可证
risky = manager.find_risky_licenses()
if risky:
print(f"\n⚠️ 发现 {len(risky)} 个高风险许可证组件,请审查!")
```
### Java: Spring Boot 容器化安全配置
```java
/**
* Spring Boot 容器化安全配置
* 适用于在 Kubernetes 中运行的 Spring Boot 应用
*/
package com.example.security;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.web.SecurityFilterChain;
import org.springframework.security.web.header.writers.ReferrerPolicyHeaderWriter;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
@SpringBootApplication
public class SecureApplication {
public static void main(String[] args) {
SpringApplication.run(SecureApplication.class, args);
}
}
/**
* 安全配置 — 适合容器化部署
*/
@Configuration
@EnableWebSecurity
class SecurityConfig {
@Bean
public SecurityFilterChain filterChain(HttpSecurity http) throws Exception {
http
// ✅ 无状态会话(容器化应用不应依赖 session)
.sessionManagement(session ->
session.sessionCreationPolicy(SessionCreationPolicy.STATELESS))
// ✅ 安全头部
.headers(headers -> headers
// 防止点击劫持
.frameOptions(frame -> frame.deny())
// 内容安全策略
.contentSecurityPolicy(csp ->
csp.policyDirectives("default-src 'self'; script-src 'self'"))
// 防止 MIME 类型嗅探
.contentTypeOptions(content -> {})
// 引用策略
.referrerPolicy(referrer ->
referrer.policy(ReferrerPolicyHeaderWriter.ReferrerPolicy.STRICT_ORIGIN_WHEN_CROSS_ORIGIN))
// HSTS
.httpStrictTransportSecurity(hsts ->
hsts.includeSubDomains(true).maxAgeInSeconds(31536000))
)
// ✅ 路径权限
.authorizeHttpRequests(auth -> auth
// 健康检查端点(K8s 探针需要)
.requestMatchers("/health", "/ready", "/metrics").permitAll()
// 其他请求需要认证
.anyRequest().authenticated()
)
// ✅ 禁用不需要的功能
.csrf(csrf -> csrf.disable()) // 无状态 API 不需要 CSRF
.formLogin(form -> form.disable())
.httpBasic(basic -> basic.disable());
return http.build();
}
}
/**
* 健康检查控制器 — K8s 探针使用
*/
@RestController
class HealthController {
@GetMapping("/health")
public Map health() {
return Map.of("status", "UP");
}
@GetMapping("/ready")
public Map ready() {
// 检查依赖服务是否就绪
return Map.of("status", "READY");
}
}
```
**application.yaml** — 容器化安全配置:
```yaml
# Spring Boot 容器化安全配置
server:
port: 8080
# ✅ 安全头部
servlet:
session:
cookie:
secure: true
http-only: true
same-site: strict
# ✅ 错误处理(不暴露堆栈信息)
error:
include-stacktrace: never
include-message: never
# ✅ Actuator 安全配置
management:
endpoints:
web:
exposure:
include: health,ready,metrics
base-path: /
endpoint:
health:
show-details: never # 不暴露详细健康信息
server:
port: 8081 # 管理端口与业务端口分离
# ✅ 日志配置(不记录敏感信息)
logging:
level:
root: INFO
org.springframework.security: WARN
pattern:
console: "%d{ISO8601} [%thread] %-5level %logger{36} - %msg%n"
# ✅ 优雅关闭(K8s 需要)
spring:
lifecycle:
timeout-per-shutdown-phase: 30s
```
### Java: Jib 安全构建配置
```java
/**
* Jib 安全构建配置 — 无需 Docker daemon 构建安全镜像
*
* build.gradle:
*/
// ── build.gradle ────────────────────────────────────────────────
/*
plugins {
id 'org.springframework.boot' version '3.3.0'
id 'com.google.cloud.tools.jib' version '3.4.1'
}
jib {
// ✅ 使用 distroless 基础镜像(极小攻击面)
from {
image = 'gcr.io/distroless/java21-debian12:nonroot'
// 或使用 Eclipse Temurin:
// image = 'eclipse-temurin:21-jre-jammy'
}
to {
image = "${System.getenv('REGISTRY') ?: 'myregistry.com'}/myapp"
tags = [version, 'latest']
// ✅ 镜像签名(需要配置 cosign)
}
container {
// ✅ 非 root 用户运行
user = '65532:65532'
// ✅ JVM 安全参数
jvmFlags = [
'-XX:+UseContainerSupport',
'-XX:MaxRAMPercentage=75.0',
'-Djava.security.egd=file:/dev/urandom',
'-Dfile.encoding=UTF-8',
// 禁用不安全的序列化
'-Djdk.serialFilter=!*',
]
// ✅ 只暴露必要端口
ports = ['8080']
// ✅ 标签
labels = [
'maintainer': 'security-team@example.com',
'org.opencontainers.image.source': 'https://github.com/org/repo',
'org.opencontainers.image.description': 'Secure Spring Boot App',
]
// ✅ 创建时间(可重现构建)
creationTime = 'USE_CURRENT_TIMESTAMP'
// ✅ 环境变量
environment = [
'JAVA_TOOL_OPTIONS': '-XX:+UseContainerSupport',
'TZ': 'UTC',
]
}
// ✅ 额外目录(如配置文件)
extraDirectories {
paths {
path {
from = 'src/main/resources/docker'
into = '/app/config'
}
}
permissions = [
'/app/config/*': '444', // 只读
]
}
}
*/
// ── Jib 构建命令 ────────────────────────────────────────────────
// 构建并推送到仓库(无需 Docker daemon):
// ./gradlew jib
//
// 构建到本地 Docker:
// ./gradlew jibDockerBuild
//
// 构建为 tar 文件:
// ./gradlew jibBuildTar
```
## 28.4 容器安全扫描工具对比
| 特性 | Trivy | Grype | Snyk Container |
|------|-------|-------|----------------|
| **开源** | ✅ Apache 2.0 | ✅ Apache 2.0 | ⚠️ 免费层 + 商业 |
| **漏洞数据库** | NVD, GitHub Advisory, 多源 | NVD, GitHub Advisory | Snyk 自有数据库 |
| **扫描速度** | ⚡ 快(本地缓存) | ⚡ 快 | 🐢 较慢(云端分析) |
| **OS 包扫描** | ✅ | ✅ | ✅ |
| **语言包扫描** | ✅ (pip, npm, go, maven...) | ✅ | ✅ |
| **IaC 扫描** | ✅ (K8s, Terraform, Docker) | ❌ | ✅ |
| **SBOM 生成** | ✅ CycloneDX, SPDX | ❌ (用 Syft) | ✅ |
| **许可证扫描** | ✅ | ❌ | ✅ |
| **密钥扫描** | ✅ | ❌ | ❌ |
| **CI/CD 集成** | ✅ GitHub Actions, GitLab CI | ✅ GitHub Actions | ✅ 原生集成 |
| **IDE 插件** | ✅ VS Code | ❌ | ✅ VS Code, IntelliJ |
| **修复建议** | ⚠️ 基本 | ⚠️ 基本 | ✅ 详细(含 PR) |
| **安装** | `brew install trivy` | `brew install grype` | `npm install -g snyk` |
**选型建议**:
- **全能型** → **Trivy**(漏洞 + IaC + SBOM + 密钥,一个工具搞定)
- **专注漏洞扫描** → **Grype** + **Syft**(Anchore 生态)
- **企业级 + 修复建议** → **Snyk**(商业支持,自动修复 PR)
- **最佳实践**:CI 中同时使用 Trivy + Grype,交叉验证
## 28.5 Kubernetes 安全加固清单
### 集群级别
| 类别 | 检查项 | 优先级 | 工具/方法 |
|------|--------|--------|----------|
| **API Server** | 禁用匿名认证 | 🔴 高 | `--anonymous-auth=false` |
| **API Server** | 启用审计日志 | 🔴 高 | `--audit-log-path` |
| **API Server** | 启用 RBAC | 🔴 高 | `--authorization-mode=RBAC` |
| **etcd** | 启用静态加密 | 🔴 高 | `EncryptionConfiguration` |
| **etcd** | 启用 TLS | 🔴 高 | `--cert-file`, `--key-file` |
| **Kubelet** | 禁用匿名认证 | 🔴 高 | `--anonymous-auth=false` |
| **Kubelet** | 启用 Webhook 认证 | 🟡 中 | `--authentication-token-webhook` |
| **网络** | 启用 Network Policy | 🔴 高 | Calico / Cilium |
| **网络** | 加密 Pod 间通信 | 🟡 中 | WireGuard / Istio mTLS |
### 工作负载级别
| 类别 | 检查项 | 优先级 | 说明 |
|------|--------|--------|------|
| **PSS** | 生产命名空间使用 `restricted` | 🔴 高 | 强制非 root、只读 FS |
| **镜像** | 禁止使用 `latest` 标签 | 🔴 高 | OPA/Kyverno 策略 |
| **镜像** | 只允许可信仓库 | 🔴 高 | Admission Webhook |
| **镜像** | 启用镜像签名验证 | 🟡 中 | Cosign + Kyverno |
| **资源** | 设置 CPU/内存限制 | 🔴 高 | 防止资源耗尽 |
| **Secrets** | 使用 External Secrets | 🔴 高 | 不在 YAML 中硬编码 |
| **SA** | 禁用自动挂载 SA Token | 🟡 中 | `automountServiceAccountToken: false` |
| **容器** | 只读根文件系统 | 🟡 中 | `readOnlyRootFilesystem: true` |
| **容器** | 丢弃所有 capabilities | 🔴 高 | `drop: [ALL]` |
### 运行时安全
| 类别 | 检查项 | 优先级 | 工具 |
|------|--------|--------|------|
| **监控** | 异常行为检测 | 🔴 高 | Falco / Tetragon |
| **扫描** | 定期镜像漏洞扫描 | 🔴 高 | Trivy Operator |
| **日志** | 集中式日志收集 | 🔴 高 | Fluentd / Vector |
| **审计** | K8s 审计日志分析 | 🟡 中 | ELK / Loki |
| **备份** | etcd 定期备份 | 🔴 高 | Velero |
### CIS Benchmark 自动检查
```bash
# 使用 kube-bench 检查 CIS Kubernetes Benchmark
# 安装: brew install kube-bench
kube-bench run --targets master,node,policies
# 使用 Trivy 检查 K8s 安全配置
trivy k8s --report summary cluster
```
## 28.6 小结
- **4C 模型** 从云平台到代码,每一层都需要安全措施
- **容器安全**:最小镜像 + 非 root + 漏洞扫描 + 签名
- **K8s 安全**:RBAC + PSS + Network Policy + Admission Control
- **OPA Gatekeeper** 在 Admission 阶段强制执行安全策略
- **供应链安全**(SLSA)确保从源代码到部署的完整性
- **SBOM** 提供软件组成透明度,便于漏洞追踪和许可证合规
- **Admission Webhook** 可自定义安全策略,在部署前拦截违规配置
- 安全是**分层**的,每一层都不能缺少