第二十七章:密钥与凭证管理

“密钥管理是安全的最后一公里 — 再好的加密算法,密钥泄露了也白搭。”

        mindmap
  root((密钥管理))
    HashiCorp Vault
      Secret Engines
      Auth Methods
      动态密钥
      PKI
    云密钥服务
      AWS KMS
      Azure Key Vault
      GCP Secret Manager
    K8s Secrets
      External Secrets
      Sealed Secrets
      CSI Driver
    密钥轮换
      零停机
      自动化
    泄露检测
      TruffleHog
      GitLeaks
    

27.1 密钥管理挑战

常见的密钥管理反模式:
❌ 硬编码在代码中:password = "admin123"
❌ 存储在环境变量:容器重启后可能丢失
❌ 提交到 Git:即使删除,历史中仍然存在
❌ 共享密钥:多个服务使用同一个密钥
❌ 永不轮换:密钥一用就是几年
❌ 明文传输:通过 Slack/邮件分享密钥

密钥生命周期管理

        flowchart LR
    A["🔑 生成<br/>Generate"] --> B["📦 分发<br/>Distribute"]
    B --> C["🔒 使用<br/>Use"]
    C --> D["🔄 轮换<br/>Rotate"]
    D --> C
    D --> E["🗑️ 销毁<br/>Destroy"]

    style A fill:#4CAF50,stroke:#333,color:#fff
    style B fill:#2196F3,stroke:#333,color:#fff
    style C fill:#FF9800,stroke:#333,color:#fff
    style D fill:#9C27B0,stroke:#333,color:#fff
    style E fill:#F44336,stroke:#333,color:#fff
    

每个阶段的关键要求:

阶段

关键要求

工具/方法

生成

使用 CSPRNG,足够的密钥长度

HSM、Vault、KMS

分发

加密传输,最小权限

TLS、Envelope Encryption

使用

内存保护,访问审计

Vault Transit、KMS API

轮换

零停机,自动化

双密钥策略、Vault 自动轮换

销毁

安全擦除,不可恢复

密钥版本过期、HSM 销毁

27.2 HashiCorp Vault

Vault 是最流行的密钥管理工具:

        flowchart TB
    subgraph Clients["客户端"]
        CLI["CLI"]
        API["HTTP API"]
        UI["Web UI"]
    end

    subgraph VaultServer["Vault Server"]
        direction TB
        subgraph AuthMethods["Auth Methods"]
            Token["Token"]
            AppRole["AppRole"]
            K8sAuth["Kubernetes"]
            AWSAuth["AWS IAM"]
            OIDC["OIDC"]
        end

        subgraph SecretEngines["Secret Engines"]
            KV["KV v2"]
            PKI["PKI"]
            Transit["Transit"]
            Database["Database"]
            SSH["SSH"]
        end

        subgraph Core["核心组件"]
            Policy["Policy Engine"]
            Audit["Audit Log"]
            Seal["Seal/Unseal"]
        end
    end

    subgraph Storage["Storage Backend"]
        Consul["Consul"]
        Raft["Integrated Raft"]
        PG["PostgreSQL"]
    end

    Clients --> AuthMethods
    AuthMethods --> Policy
    Policy --> SecretEngines
    Core --> Storage

    style VaultServer fill:#000,stroke:#FFD700,color:#fff
    style AuthMethods fill:#1a1a2e,stroke:#e94560,color:#fff
    style SecretEngines fill:#1a1a2e,stroke:#0f3460,color:#fff
    style Core fill:#1a1a2e,stroke:#16213e,color:#fff
    style Storage fill:#162447,stroke:#e94560,color:#fff
    

Secret Engines

Engine

用途

特点

KV

键值存储

版本化、审计

PKI

证书管理

动态签发 X.509 证书

Transit

加密即服务

不暴露密钥,只提供加解密 API

Database

数据库凭证

动态生成、自动过期

SSH

SSH 证书

签发短期 SSH 证书

AWS/GCP/Azure

云凭证

动态生成云平台凭证

AppRole 认证

import hvac

# 使用 AppRole 认证
client = hvac.Client(url='https://vault.example.com:8200')
client.auth.approle.login(
    role_id='your-role-id',
    secret_id='your-secret-id',
)

# 读取密钥
secret = client.secrets.kv.v2.read_secret_version(
    path='myapp/database',
    mount_point='secret',
)
db_password = secret['data']['data']['password']

# 动态数据库凭证
creds = client.secrets.database.generate_credentials(
    name='my-role',
    mount_point='database',
)
print(f"Username: {creds['data']['username']}")
print(f"Password: {creds['data']['password']}")
print(f"TTL: {creds['lease_duration']}s")  # 自动过期!

# PKI — 签发证书
cert = client.secrets.pki.generate_certificate(
    name='web-server',
    common_name='api.example.com',
    mount_point='pki',
)
print(f"Certificate: {cert['data']['certificate']}")
print(f"Private Key: {cert['data']['private_key']}")

Python: HashiCorp Vault 完整客户端

"""
HashiCorp Vault 客户端封装 — 支持 KV 读写、动态数据库凭证、Transit 加密
依赖: pip install hvac
"""
import hvac
import base64
import logging
from contextlib import contextmanager
from typing import Any, Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class VaultClient:
    """HashiCorp Vault 客户端封装"""

    def __init__(self, url: str, role_id: str, secret_id: str,
                 namespace: Optional[str] = None):
        """
        初始化 Vault 客户端并通过 AppRole 认证

        Args:
            url: Vault 服务器地址
            role_id: AppRole 的 Role ID
            secret_id: AppRole 的 Secret ID
            namespace: Vault Enterprise 命名空间(可选)
        """
        self.client = hvac.Client(url=url, namespace=namespace)
        try:
            resp = self.client.auth.approle.login(
                role_id=role_id,
                secret_id=secret_id,
            )
            logger.info("Vault 认证成功, accessor=%s", resp['auth']['accessor'])
        except hvac.exceptions.VaultError as e:
            logger.error("Vault 认证失败: %s", e)
            raise

    # ── KV v2 操作 ──────────────────────────────────────────────

    def read_secret(self, path: str, mount_point: str = "secret") -> dict:
        """
        从 KV v2 引擎读取密钥

        Args:
            path: 密钥路径,如 "myapp/database"
            mount_point: KV 引擎挂载点
        Returns:
            密钥数据字典
        """
        try:
            resp = self.client.secrets.kv.v2.read_secret_version(
                path=path,
                mount_point=mount_point,
            )
            return resp['data']['data']
        except hvac.exceptions.InvalidPath:
            logger.error("密钥路径不存在: %s", path)
            raise
        except hvac.exceptions.Forbidden:
            logger.error("无权限读取: %s", path)
            raise

    def write_secret(self, path: str, data: dict,
                     mount_point: str = "secret") -> dict:
        """
        写入密钥到 KV v2 引擎(自动版本化)

        Args:
            path: 密钥路径
            data: 密钥数据
            mount_point: KV 引擎挂载点
        Returns:
            写入响应(包含版本号)
        """
        try:
            resp = self.client.secrets.kv.v2.create_or_update_secret(
                path=path,
                secret=data,
                mount_point=mount_point,
            )
            version = resp['data']['version']
            logger.info("密钥写入成功: %s (version=%d)", path, version)
            return resp
        except hvac.exceptions.VaultError as e:
            logger.error("密钥写入失败: %s", e)
            raise

    # ── 动态数据库凭证 ──────────────────────────────────────────

    def get_db_credentials(self, role_name: str,
                           mount_point: str = "database") -> dict:
        """
        获取动态数据库凭证(自动过期)

        Args:
            role_name: 数据库角色名
            mount_point: Database 引擎挂载点
        Returns:
            包含 username, password, lease_id, lease_duration 的字典
        """
        try:
            resp = self.client.secrets.database.generate_credentials(
                name=role_name,
                mount_point=mount_point,
            )
            creds = {
                'username': resp['data']['username'],
                'password': resp['data']['password'],
                'lease_id': resp['lease_id'],
                'lease_duration': resp['lease_duration'],
            }
            logger.info(
                "动态凭证已生成: user=%s, ttl=%ds",
                creds['username'], creds['lease_duration']
            )
            return creds
        except hvac.exceptions.VaultError as e:
            logger.error("获取数据库凭证失败: %s", e)
            raise

    # ── Transit 加密即服务 ──────────────────────────────────────

    def transit_encrypt(self, key_name: str, plaintext: str,
                        mount_point: str = "transit") -> str:
        """
        使用 Transit 引擎加密数据(密钥永远不离开 Vault)

        Args:
            key_name: Transit 密钥名称
            plaintext: 明文字符串
            mount_point: Transit 引擎挂载点
        Returns:
            Vault 密文(vault:v1:... 格式)
        """
        try:
            # Transit 要求 base64 编码的明文
            encoded = base64.b64encode(plaintext.encode()).decode()
            resp = self.client.secrets.transit.encrypt_data(
                name=key_name,
                plaintext=encoded,
                mount_point=mount_point,
            )
            ciphertext = resp['data']['ciphertext']
            logger.info("Transit 加密成功, key=%s", key_name)
            return ciphertext
        except hvac.exceptions.VaultError as e:
            logger.error("Transit 加密失败: %s", e)
            raise

    def transit_decrypt(self, key_name: str, ciphertext: str,
                        mount_point: str = "transit") -> str:
        """
        使用 Transit 引擎解密数据

        Args:
            key_name: Transit 密钥名称
            ciphertext: Vault 密文
            mount_point: Transit 引擎挂载点
        Returns:
            解密后的明文字符串
        """
        try:
            resp = self.client.secrets.transit.decrypt_data(
                name=key_name,
                ciphertext=ciphertext,
                mount_point=mount_point,
            )
            plaintext = base64.b64decode(resp['data']['plaintext']).decode()
            logger.info("Transit 解密成功, key=%s", key_name)
            return plaintext
        except hvac.exceptions.VaultError as e:
            logger.error("Transit 解密失败: %s", e)
            raise


# ── 使用示例 ────────────────────────────────────────────────────
if __name__ == "__main__":
    vault = VaultClient(
        url="https://vault.example.com:8200",
        role_id="your-role-id",
        secret_id="your-secret-id",
    )

    # KV 读写
    vault.write_secret("myapp/config", {"api_key": "sk-xxxx", "db_host": "db.internal"})
    config = vault.read_secret("myapp/config")
    print(f"API Key: {config['api_key']}")

    # 动态数据库凭证
    db_creds = vault.get_db_credentials("readonly-role")
    print(f"DB User: {db_creds['username']}, TTL: {db_creds['lease_duration']}s")

    # Transit 加密/解密
    encrypted = vault.transit_encrypt("my-key", "sensitive-data-here")
    print(f"Encrypted: {encrypted}")
    decrypted = vault.transit_decrypt("my-key", encrypted)
    print(f"Decrypted: {decrypted}")

Python: AWS KMS / Envelope Encryption

"""
AWS KMS Envelope Encryption 实现
依赖: pip install boto3 cryptography
"""
import boto3
import base64
import json
import os
import logging
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from typing import Tuple

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class EnvelopeEncryption:
    """
    Envelope Encryption(信封加密):
    1. 用 KMS 主密钥(CMK)生成数据密钥(DEK)
    2. 用 DEK 在本地加密数据(高性能)
    3. 用 CMK 加密 DEK 本身
    4. 存储加密后的 DEK + 加密后的数据
    """

    def __init__(self, kms_key_id: str, region: str = "us-east-1"):
        """
        Args:
            kms_key_id: AWS KMS 密钥 ID 或 ARN
            region: AWS 区域
        """
        self.kms_client = boto3.client('kms', region_name=region)
        self.kms_key_id = kms_key_id

    def encrypt(self, plaintext: bytes,
                encryption_context: dict = None) -> dict:
        """
        使用 Envelope Encryption 加密数据

        Args:
            plaintext: 要加密的数据
            encryption_context: 加密上下文(用于 AAD,防止密文被挪用)
        Returns:
            包含 encrypted_key, nonce, ciphertext 的字典
        """
        try:
            # 步骤 1: 向 KMS 请求数据密钥
            params = {
                'KeyId': self.kms_key_id,
                'KeySpec': 'AES_256',
            }
            if encryption_context:
                params['EncryptionContext'] = encryption_context

            response = self.kms_client.generate_data_key(**params)
            plaintext_key = response['Plaintext']       # 明文 DEK(仅在内存中)
            encrypted_key = response['CiphertextBlob']  # 加密后的 DEK

            # 步骤 2: 用明文 DEK 在本地加密数据(AES-256-GCM)
            nonce = os.urandom(12)
            aesgcm = AESGCM(plaintext_key)
            aad = json.dumps(encryption_context).encode() if encryption_context else None
            ciphertext = aesgcm.encrypt(nonce, plaintext, aad)

            # 步骤 3: 立即清除内存中的明文密钥
            # (Python 无法保证立即清除,生产环境建议使用 C 扩展)
            del plaintext_key

            logger.info("Envelope 加密完成, 数据大小=%d bytes", len(plaintext))

            return {
                'encrypted_key': base64.b64encode(encrypted_key).decode(),
                'nonce': base64.b64encode(nonce).decode(),
                'ciphertext': base64.b64encode(ciphertext).decode(),
                'encryption_context': encryption_context,
            }

        except Exception as e:
            logger.error("Envelope 加密失败: %s", e)
            raise

    def decrypt(self, envelope: dict) -> bytes:
        """
        解密 Envelope Encryption 数据

        Args:
            envelope: encrypt() 返回的字典
        Returns:
            解密后的明文数据
        """
        try:
            encrypted_key = base64.b64decode(envelope['encrypted_key'])
            nonce = base64.b64decode(envelope['nonce'])
            ciphertext = base64.b64decode(envelope['ciphertext'])
            encryption_context = envelope.get('encryption_context')

            # 步骤 1: 用 KMS 解密数据密钥
            params = {'CiphertextBlob': encrypted_key}
            if encryption_context:
                params['EncryptionContext'] = encryption_context

            response = self.kms_client.decrypt(**params)
            plaintext_key = response['Plaintext']

            # 步骤 2: 用明文 DEK 解密数据
            aesgcm = AESGCM(plaintext_key)
            aad = (json.dumps(encryption_context).encode()
                   if encryption_context else None)
            plaintext = aesgcm.decrypt(nonce, ciphertext, aad)

            del plaintext_key
            logger.info("Envelope 解密完成")
            return plaintext

        except Exception as e:
            logger.error("Envelope 解密失败: %s", e)
            raise


# ── Envelope Encryption 原理图 ──────────────────────────────────
# 见下方 Mermaid 图表

# ── 使用示例 ────────────────────────────────────────────────────
if __name__ == "__main__":
    envelope_enc = EnvelopeEncryption(
        kms_key_id="arn:aws:kms:us-east-1:123456789:key/your-key-id",
        region="us-east-1",
    )

    # 加密
    data = b"This is highly sensitive data"
    context = {"purpose": "user-data", "tenant": "acme-corp"}
    envelope = envelope_enc.encrypt(data, encryption_context=context)
    print(f"Encrypted envelope: {json.dumps(envelope, indent=2)}")

    # 解密
    decrypted = envelope_enc.decrypt(envelope)
    print(f"Decrypted: {decrypted.decode()}")

Envelope Encryption 原理图

        flowchart TB
    subgraph KMS["☁️ AWS KMS"]
        CMK["主密钥 CMK<br/>(永远不离开 KMS)"]
    end

    subgraph Encrypt["加密流程"]
        direction TB
        E1["1️⃣ 请求 GenerateDataKey"] --> E2["2️⃣ KMS 返回<br/>明文 DEK + 加密 DEK"]
        E2 --> E3["3️⃣ 用明文 DEK<br/>本地 AES-GCM 加密数据"]
        E3 --> E4["4️⃣ 丢弃明文 DEK"]
        E4 --> E5["5️⃣ 存储:加密 DEK + 密文"]
    end

    subgraph Decrypt["解密流程"]
        direction TB
        D1["1️⃣ 读取加密 DEK"] --> D2["2️⃣ 发送加密 DEK 到 KMS"]
        D2 --> D3["3️⃣ KMS 返回明文 DEK"]
        D3 --> D4["4️⃣ 用明文 DEK<br/>本地 AES-GCM 解密"]
        D4 --> D5["5️⃣ 丢弃明文 DEK"]
    end

    CMK -.->|生成 DEK| E1
    CMK -.->|解密 DEK| D2

    style KMS fill:#FF9900,stroke:#333,color:#fff
    style Encrypt fill:#e8f5e9,stroke:#4CAF50
    style Decrypt fill:#e3f2fd,stroke:#2196F3
    

Python: 本地密钥安全存储(keyring 库)

"""
使用 keyring 库安全存储本地凭证
依赖: pip install keyring
- macOS: 使用 Keychain
- Linux: 使用 Secret Service (GNOME Keyring / KWallet)
- Windows: 使用 Windows Credential Locker
"""
import keyring
import logging
import json
from typing import Optional

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# 服务名称(用于在系统密钥链中分组)
SERVICE_NAME = "myapp-credentials"


def store_credential(key: str, value: str) -> None:
    """
    安全存储凭证到系统密钥链

    Args:
        key: 凭证名称(如 "api_key", "db_password")
        value: 凭证值
    """
    try:
        keyring.set_password(SERVICE_NAME, key, value)
        logger.info("凭证已安全存储: %s", key)
    except keyring.errors.KeyringError as e:
        logger.error("存储凭证失败: %s", e)
        raise


def get_credential(key: str) -> Optional[str]:
    """
    从系统密钥链读取凭证

    Args:
        key: 凭证名称
    Returns:
        凭证值,不存在则返回 None
    """
    try:
        value = keyring.get_password(SERVICE_NAME, key)
        if value is None:
            logger.warning("凭证不存在: %s", key)
        return value
    except keyring.errors.KeyringError as e:
        logger.error("读取凭证失败: %s", e)
        raise


def delete_credential(key: str) -> None:
    """
    从系统密钥链删除凭证

    Args:
        key: 凭证名称
    """
    try:
        keyring.delete_password(SERVICE_NAME, key)
        logger.info("凭证已删除: %s", key)
    except keyring.errors.PasswordDeleteError:
        logger.warning("凭证不存在,无需删除: %s", key)
    except keyring.errors.KeyringError as e:
        logger.error("删除凭证失败: %s", e)
        raise


def store_json_credential(key: str, data: dict) -> None:
    """存储 JSON 格式的复合凭证"""
    store_credential(key, json.dumps(data))


def get_json_credential(key: str) -> Optional[dict]:
    """读取 JSON 格式的复合凭证"""
    value = get_credential(key)
    return json.loads(value) if value else None


# ── 使用示例 ────────────────────────────────────────────────────
if __name__ == "__main__":
    # 存储 API 密钥
    store_credential("github_token", "ghp_xxxxxxxxxxxxxxxxxxxx")

    # 读取 API 密钥
    token = get_credential("github_token")
    print(f"GitHub Token: {token[:10]}..." if token else "Not found")

    # 存储复合凭证
    store_json_credential("db_config", {
        "host": "db.example.com",
        "port": 5432,
        "username": "app_user",
        "password": "s3cret!",
    })

    # 读取复合凭证
    db_config = get_json_credential("db_config")
    if db_config:
        print(f"DB Host: {db_config['host']}")

    # 清理
    delete_credential("github_token")

Java: Spring Vault 集成

/**
 * Spring Vault 集成 — 使用 VaultTemplate 读写密钥
 * 依赖: spring-vault-core
 *
 * build.gradle:
 *   implementation 'org.springframework.vault:spring-vault-core:3.1.1'
 */
package com.example.vault;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.vault.core.VaultTemplate;
import org.springframework.vault.core.VaultTransitOperations;
import org.springframework.vault.support.Ciphertext;
import org.springframework.vault.support.Plaintext;
import org.springframework.vault.support.VaultResponseSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Objects;

@Service
public class VaultSecretService {

    private static final Logger logger = LoggerFactory.getLogger(VaultSecretService.class);

    private final VaultTemplate vaultTemplate;

    @Autowired
    public VaultSecretService(VaultTemplate vaultTemplate) {
        this.vaultTemplate = vaultTemplate;
    }

    // ── KV v2 读写 ──────────────────────────────────────────────

    /**
     * 写入密钥到 KV v2 引擎
     *
     * @param path 密钥路径,如 "myapp/database"
     * @param secrets 密钥键值对
     */
    public void writeSecret(String path, Map<String, String> secrets) {
        try {
            vaultTemplate.opsForKeyValue("secret").put(path, secrets);
            logger.info("密钥写入成功: {}", path);
        } catch (Exception e) {
            logger.error("密钥写入失败: {}", path, e);
            throw new RuntimeException("Failed to write secret: " + path, e);
        }
    }

    /**
     * 从 KV v2 引擎读取密钥
     *
     * @param path 密钥路径
     * @param clazz 目标类型
     * @return 密钥数据对象
     */
    public <T> T readSecret(String path, Class<T> clazz) {
        try {
            VaultResponseSupport<T> response =
                vaultTemplate.opsForKeyValue("secret").get(path, clazz);
            if (response == null || response.getData() == null) {
                logger.warn("密钥不存在: {}", path);
                return null;
            }
            logger.info("密钥读取成功: {}", path);
            return response.getData();
        } catch (Exception e) {
            logger.error("密钥读取失败: {}", path, e);
            throw new RuntimeException("Failed to read secret: " + path, e);
        }
    }

    // ── Transit 加密即服务 ──────────────────────────────────────

    /**
     * 使用 Transit 引擎加密
     *
     * @param keyName Transit 密钥名称
     * @param data 明文数据
     * @return 密文
     */
    public String transitEncrypt(String keyName, String data) {
        try {
            VaultTransitOperations transit = vaultTemplate.opsForTransit();
            Ciphertext ciphertext = transit.encrypt(keyName, Plaintext.of(data));
            logger.info("Transit 加密成功, key={}", keyName);
            return Objects.requireNonNull(ciphertext).getCiphertext();
        } catch (Exception e) {
            logger.error("Transit 加密失败, key={}", keyName, e);
            throw new RuntimeException("Transit encryption failed", e);
        }
    }

    /**
     * 使用 Transit 引擎解密
     *
     * @param keyName Transit 密钥名称
     * @param ciphertext 密文
     * @return 明文
     */
    public String transitDecrypt(String keyName, String ciphertext) {
        try {
            VaultTransitOperations transit = vaultTemplate.opsForTransit();
            Plaintext plaintext = transit.decrypt(keyName, Ciphertext.of(ciphertext));
            logger.info("Transit 解密成功, key={}", keyName);
            return Objects.requireNonNull(plaintext).asString();
        } catch (Exception e) {
            logger.error("Transit 解密失败, key={}", keyName, e);
            throw new RuntimeException("Transit decryption failed", e);
        }
    }
}

Java: KeyStore 管理(PKCS12)

/**
 * Java KeyStore 管理 — PKCS12 创建、读取、密钥轮换
 */
package com.example.keystore;

import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.io.*;
import java.security.*;
import java.security.cert.CertificateException;
import java.time.Instant;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KeyStoreManager {

    private static final Logger logger = LoggerFactory.getLogger(KeyStoreManager.class);
    private static final String KEYSTORE_TYPE = "PKCS12";

    private final String keystorePath;
    private final char[] keystorePassword;
    private KeyStore keyStore;

    public KeyStoreManager(String keystorePath, String keystorePassword)
            throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
        this.keystorePath = keystorePath;
        this.keystorePassword = keystorePassword.toCharArray();
        this.keyStore = KeyStore.getInstance(KEYSTORE_TYPE);

        File file = new File(keystorePath);
        if (file.exists()) {
            // 加载已有 KeyStore
            try (FileInputStream fis = new FileInputStream(file)) {
                keyStore.load(fis, this.keystorePassword);
                logger.info("KeyStore 已加载: {}", keystorePath);
            }
        } else {
            // 创建新 KeyStore
            keyStore.load(null, this.keystorePassword);
            save();
            logger.info("KeyStore 已创建: {}", keystorePath);
        }
    }

    /**
     * 生成并存储 AES-256 密钥
     *
     * @param alias 密钥别名
     * @return 生成的密钥
     */
    public SecretKey generateAndStoreKey(String alias)
            throws NoSuchAlgorithmException, KeyStoreException, IOException,
                   CertificateException {
        KeyGenerator keyGen = KeyGenerator.getInstance("AES");
        keyGen.init(256, new SecureRandom());
        SecretKey secretKey = keyGen.generateKey();

        KeyStore.SecretKeyEntry entry = new KeyStore.SecretKeyEntry(secretKey);
        KeyStore.ProtectionParameter protection =
            new KeyStore.PasswordProtection(keystorePassword);
        keyStore.setEntry(alias, entry, protection);
        save();

        logger.info("AES-256 密钥已生成并存储: alias={}", alias);
        return secretKey;
    }

    /**
     * 读取密钥
     *
     * @param alias 密钥别名
     * @return 密钥,不存在返回 null
     */
    public SecretKey getKey(String alias)
            throws UnrecoverableKeyException, KeyStoreException, NoSuchAlgorithmException {
        Key key = keyStore.getKey(alias, keystorePassword);
        if (key instanceof SecretKey) {
            logger.info("密钥已读取: alias={}", alias);
            return (SecretKey) key;
        }
        logger.warn("密钥不存在或类型不匹配: alias={}", alias);
        return null;
    }

    /**
     * 密钥轮换 — 生成新密钥,保留旧密钥用于解密
     *
     * @param alias 密钥别名
     * @return 新密钥
     */
    public SecretKey rotateKey(String alias)
            throws Exception {
        // 备份旧密钥(添加时间戳后缀)
        SecretKey oldKey = getKey(alias);
        if (oldKey != null) {
            String backupAlias = alias + "-rotated-" + Instant.now().getEpochSecond();
            KeyStore.SecretKeyEntry entry = new KeyStore.SecretKeyEntry(oldKey);
            KeyStore.ProtectionParameter protection =
                new KeyStore.PasswordProtection(keystorePassword);
            keyStore.setEntry(backupAlias, entry, protection);
            logger.info("旧密钥已备份: {} -> {}", alias, backupAlias);
        }

        // 生成新密钥
        SecretKey newKey = generateAndStoreKey(alias);
        logger.info("密钥轮换完成: alias={}", alias);
        return newKey;
    }

    /**
     * 列出所有密钥别名
     */
    public List<String> listAliases() throws KeyStoreException {
        List<String> aliases = Collections.list(keyStore.aliases());
        logger.info("KeyStore 中共有 {} 个条目", aliases.size());
        return aliases;
    }

    private void save() throws IOException, KeyStoreException,
            CertificateException, NoSuchAlgorithmException {
        try (FileOutputStream fos = new FileOutputStream(keystorePath)) {
            keyStore.store(fos, keystorePassword);
        }
    }

    // ── 使用示例 ────────────────────────────────────────────────
    public static void main(String[] args) throws Exception {
        KeyStoreManager ksm = new KeyStoreManager(
            "/tmp/myapp-keys.p12", "changeit"
        );

        // 生成密钥
        SecretKey key = ksm.generateAndStoreKey("encryption-key-v1");
        System.out.println("Key algorithm: " + key.getAlgorithm());
        System.out.println("Key format: " + key.getFormat());

        // 读取密钥
        SecretKey loaded = ksm.getKey("encryption-key-v1");
        System.out.println("Keys match: " + Arrays.equals(key.getEncoded(), loaded.getEncoded()));

        // 密钥轮换
        SecretKey rotated = ksm.rotateKey("encryption-key-v1");
        System.out.println("Rotated key differs: " +
            !Arrays.equals(key.getEncoded(), rotated.getEncoded()));

        // 列出所有密钥
        System.out.println("All aliases: " + ksm.listAliases());
    }
}

Java: AWS KMS 客户端

/**
 * AWS KMS 客户端 — 加密/解密/密钥轮换
 * 依赖: software.amazon.awssdk:kms
 *
 * build.gradle:
 *   implementation 'software.amazon.awssdk:kms:2.25.0'
 */
package com.example.kms;

import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;

public class AwsKmsService {

    private static final Logger logger = LoggerFactory.getLogger(AwsKmsService.class);

    private final KmsClient kmsClient;
    private final String keyId;

    public AwsKmsService(String keyId, Region region) {
        this.kmsClient = KmsClient.builder()
            .region(region)
            .build();
        this.keyId = keyId;
    }

    /**
     * 使用 KMS 加密数据(适合小数据 < 4KB)
     *
     * @param plaintext 明文
     * @param context 加密上下文(可选,用于 AAD)
     * @return Base64 编码的密文
     */
    public String encrypt(String plaintext, Map<String, String> context) {
        try {
            EncryptRequest.Builder builder = EncryptRequest.builder()
                .keyId(keyId)
                .plaintext(SdkBytes.fromUtf8String(plaintext));

            if (context != null && !context.isEmpty()) {
                builder.encryptionContext(context);
            }

            EncryptResponse response = kmsClient.encrypt(builder.build());
            String ciphertext = Base64.getEncoder()
                .encodeToString(response.ciphertextBlob().asByteArray());

            logger.info("KMS 加密成功, keyId={}", keyId);
            return ciphertext;

        } catch (KmsException e) {
            logger.error("KMS 加密失败: {}", e.getMessage());
            throw new RuntimeException("KMS encryption failed", e);
        }
    }

    /**
     * 使用 KMS 解密数据
     *
     * @param ciphertextBase64 Base64 编码的密文
     * @param context 加密上下文(必须与加密时一致)
     * @return 明文
     */
    public String decrypt(String ciphertextBase64, Map<String, String> context) {
        try {
            byte[] ciphertextBytes = Base64.getDecoder().decode(ciphertextBase64);

            DecryptRequest.Builder builder = DecryptRequest.builder()
                .ciphertextBlob(SdkBytes.fromByteArray(ciphertextBytes));

            if (context != null && !context.isEmpty()) {
                builder.encryptionContext(context);
            }

            DecryptResponse response = kmsClient.decrypt(builder.build());
            String plaintext = response.plaintext().asUtf8String();

            logger.info("KMS 解密成功");
            return plaintext;

        } catch (KmsException e) {
            logger.error("KMS 解密失败: {}", e.getMessage());
            throw new RuntimeException("KMS decryption failed", e);
        }
    }

    /**
     * 触发 KMS 密钥自动轮换(启用年度自动轮换)
     */
    public void enableAutoRotation() {
        try {
            kmsClient.enableKeyRotation(
                EnableKeyRotationRequest.builder()
                    .keyId(keyId)
                    .build()
            );
            logger.info("KMS 自动轮换已启用: keyId={}", keyId);
        } catch (KmsException e) {
            logger.error("启用自动轮换失败: {}", e.getMessage());
            throw new RuntimeException("Failed to enable key rotation", e);
        }
    }

    /**
     * 手动触发密钥轮换
     */
    public void manualRotation() {
        try {
            kmsClient.rotateKeyOnDemand(
                RotateKeyOnDemandRequest.builder()
                    .keyId(keyId)
                    .build()
            );
            logger.info("KMS 手动轮换已触发: keyId={}", keyId);
        } catch (KmsException e) {
            logger.error("手动轮换失败: {}", e.getMessage());
            throw new RuntimeException("Failed to rotate key", e);
        }
    }

    /**
     * 查询密钥轮换状态
     */
    public boolean isRotationEnabled() {
        try {
            GetKeyRotationStatusResponse response = kmsClient.getKeyRotationStatus(
                GetKeyRotationStatusRequest.builder()
                    .keyId(keyId)
                    .build()
            );
            return response.keyRotationEnabled();
        } catch (KmsException e) {
            logger.error("查询轮换状态失败: {}", e.getMessage());
            throw new RuntimeException("Failed to get rotation status", e);
        }
    }

    // ── 使用示例 ────────────────────────────────────────────────
    public static void main(String[] args) {
        AwsKmsService kms = new AwsKmsService(
            "arn:aws:kms:us-east-1:123456789:key/your-key-id",
            Region.US_EAST_1
        );

        Map<String, String> context = Map.of(
            "purpose", "user-data",
            "tenant", "acme-corp"
        );

        // 加密
        String encrypted = kms.encrypt("sensitive-data", context);
        System.out.println("Encrypted: " + encrypted);

        // 解密
        String decrypted = kms.decrypt(encrypted, context);
        System.out.println("Decrypted: " + decrypted);

        // 启用自动轮换
        kms.enableAutoRotation();
        System.out.println("Rotation enabled: " + kms.isRotationEnabled());
    }
}

Go: HashiCorp Vault 客户端

/*
HashiCorp Vault Go 客户端 — KV 读写、Transit 加密、动态凭证
依赖: go get github.com/hashicorp/vault/api
*/
package main

import (
	"context"
	"encoding/base64"
	"fmt"
	"log"
	"os"
	"time"

	vault "github.com/hashicorp/vault/api"
)

// VaultClient 封装 Vault 操作
type VaultClient struct {
	client *vault.Client
	ctx    context.Context
}

// NewVaultClient 创建并认证 Vault 客户端
func NewVaultClient(addr, roleID, secretID string) (*VaultClient, error) {
	config := vault.DefaultConfig()
	config.Address = addr
	// 生产环境应配置 TLS
	// config.ConfigureTLS(&vault.TLSConfig{CACert: "/path/to/ca.pem"})

	client, err := vault.NewClient(config)
	if err != nil {
		return nil, fmt.Errorf("创建 Vault 客户端失败: %w", err)
	}

	// AppRole 认证
	resp, err := client.Logical().Write("auth/approle/login", map[string]interface{}{
		"role_id":   roleID,
		"secret_id": secretID,
	})
	if err != nil {
		return nil, fmt.Errorf("Vault 认证失败: %w", err)
	}
	if resp.Auth == nil {
		return nil, fmt.Errorf("Vault 认证响应为空")
	}

	client.SetToken(resp.Auth.ClientToken)
	log.Printf("Vault 认证成功, accessor=%s", resp.Auth.Accessor)

	return &VaultClient{
		client: client,
		ctx:    context.Background(),
	}, nil
}

// ReadSecret 从 KV v2 读取密钥
func (vc *VaultClient) ReadSecret(path string) (map[string]interface{}, error) {
	secret, err := vc.client.KVv2("secret").Get(vc.ctx, path)
	if err != nil {
		return nil, fmt.Errorf("读取密钥失败 [%s]: %w", path, err)
	}
	if secret == nil || secret.Data == nil {
		return nil, fmt.Errorf("密钥不存在: %s", path)
	}
	log.Printf("密钥读取成功: %s (version=%d)", path, secret.VersionMetadata.Version)
	return secret.Data, nil
}

// WriteSecret 写入密钥到 KV v2
func (vc *VaultClient) WriteSecret(path string, data map[string]interface{}) error {
	_, err := vc.client.KVv2("secret").Put(vc.ctx, path, data)
	if err != nil {
		return fmt.Errorf("写入密钥失败 [%s]: %w", path, err)
	}
	log.Printf("密钥写入成功: %s", path)
	return nil
}

// TransitEncrypt 使用 Transit 引擎加密
func (vc *VaultClient) TransitEncrypt(keyName, plaintext string) (string, error) {
	encoded := base64.StdEncoding.EncodeToString([]byte(plaintext))
	resp, err := vc.client.Logical().Write(
		fmt.Sprintf("transit/encrypt/%s", keyName),
		map[string]interface{}{
			"plaintext": encoded,
		},
	)
	if err != nil {
		return "", fmt.Errorf("Transit 加密失败: %w", err)
	}
	ciphertext, ok := resp.Data["ciphertext"].(string)
	if !ok {
		return "", fmt.Errorf("无效的加密响应")
	}
	log.Printf("Transit 加密成功, key=%s", keyName)
	return ciphertext, nil
}

// TransitDecrypt 使用 Transit 引擎解密
func (vc *VaultClient) TransitDecrypt(keyName, ciphertext string) (string, error) {
	resp, err := vc.client.Logical().Write(
		fmt.Sprintf("transit/decrypt/%s", keyName),
		map[string]interface{}{
			"ciphertext": ciphertext,
		},
	)
	if err != nil {
		return "", fmt.Errorf("Transit 解密失败: %w", err)
	}
	encodedPlaintext, ok := resp.Data["plaintext"].(string)
	if !ok {
		return "", fmt.Errorf("无效的解密响应")
	}
	decoded, err := base64.StdEncoding.DecodeString(encodedPlaintext)
	if err != nil {
		return "", fmt.Errorf("Base64 解码失败: %w", err)
	}
	log.Printf("Transit 解密成功, key=%s", keyName)
	return string(decoded), nil
}

// GetDatabaseCredentials 获取动态数据库凭证
func (vc *VaultClient) GetDatabaseCredentials(roleName string) (string, string, time.Duration, error) {
	resp, err := vc.client.Logical().Read(
		fmt.Sprintf("database/creds/%s", roleName),
	)
	if err != nil {
		return "", "", 0, fmt.Errorf("获取数据库凭证失败: %w", err)
	}
	username, _ := resp.Data["username"].(string)
	password, _ := resp.Data["password"].(string)
	ttl := time.Duration(resp.LeaseDuration) * time.Second

	log.Printf("动态凭证已生成: user=%s, ttl=%s", username, ttl)
	return username, password, ttl, nil
}

func main() {
	vc, err := NewVaultClient(
		os.Getenv("VAULT_ADDR"),
		os.Getenv("VAULT_ROLE_ID"),
		os.Getenv("VAULT_SECRET_ID"),
	)
	if err != nil {
		log.Fatalf("初始化失败: %v", err)
	}

	// KV 读写
	err = vc.WriteSecret("myapp/config", map[string]interface{}{
		"api_key": "sk-xxxx",
		"db_host": "db.internal",
	})
	if err != nil {
		log.Fatalf("写入失败: %v", err)
	}

	data, err := vc.ReadSecret("myapp/config")
	if err != nil {
		log.Fatalf("读取失败: %v", err)
	}
	fmt.Printf("API Key: %s\n", data["api_key"])

	// Transit 加密/解密
	encrypted, err := vc.TransitEncrypt("my-key", "sensitive-data")
	if err != nil {
		log.Fatalf("加密失败: %v", err)
	}
	fmt.Printf("Encrypted: %s\n", encrypted)

	decrypted, err := vc.TransitDecrypt("my-key", encrypted)
	if err != nil {
		log.Fatalf("解密失败: %v", err)
	}
	fmt.Printf("Decrypted: %s\n", decrypted)
}

Go: 密钥轮换自动化脚本

/*
密钥轮换自动化 — 定期轮换 Vault Transit 密钥并重新加密数据
依赖: go get github.com/hashicorp/vault/api
*/
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	vault "github.com/hashicorp/vault/api"
)

// KeyRotator 自动密钥轮换器
type KeyRotator struct {
	client   *vault.Client
	ctx      context.Context
	cancel   context.CancelFunc
	interval time.Duration
}

// NewKeyRotator 创建密钥轮换器
func NewKeyRotator(vaultAddr, token string, interval time.Duration) (*KeyRotator, error) {
	config := vault.DefaultConfig()
	config.Address = vaultAddr

	client, err := vault.NewClient(config)
	if err != nil {
		return nil, fmt.Errorf("创建 Vault 客户端失败: %w", err)
	}
	client.SetToken(token)

	ctx, cancel := context.WithCancel(context.Background())
	return &KeyRotator{
		client:   client,
		ctx:      ctx,
		cancel:   cancel,
		interval: interval,
	}, nil
}

// RotateTransitKey 轮换 Transit 密钥(创建新版本)
func (kr *KeyRotator) RotateTransitKey(keyName string) error {
	_, err := kr.client.Logical().Write(
		fmt.Sprintf("transit/keys/%s/rotate", keyName),
		nil,
	)
	if err != nil {
		return fmt.Errorf("轮换 Transit 密钥失败 [%s]: %w", keyName, err)
	}
	log.Printf("✅ Transit 密钥已轮换: %s", keyName)
	return nil
}

// UpdateMinDecryptionVersion 更新最小解密版本(淘汰旧密钥版本)
func (kr *KeyRotator) UpdateMinDecryptionVersion(keyName string, minVersion int) error {
	_, err := kr.client.Logical().Write(
		fmt.Sprintf("transit/keys/%s/config", keyName),
		map[string]interface{}{
			"min_decryption_version": minVersion,
		},
	)
	if err != nil {
		return fmt.Errorf("更新最小解密版本失败: %w", err)
	}
	log.Printf("✅ 最小解密版本已更新: %s -> v%d", keyName, minVersion)
	return nil
}

// RewrapData 使用最新密钥版本重新加密数据(无需解密)
func (kr *KeyRotator) RewrapData(keyName, ciphertext string) (string, error) {
	resp, err := kr.client.Logical().Write(
		fmt.Sprintf("transit/rewrap/%s", keyName),
		map[string]interface{}{
			"ciphertext": ciphertext,
		},
	)
	if err != nil {
		return "", fmt.Errorf("重新加密失败: %w", err)
	}
	newCiphertext, ok := resp.Data["ciphertext"].(string)
	if !ok {
		return "", fmt.Errorf("无效的重新加密响应")
	}
	log.Printf("✅ 数据已用最新密钥重新加密: %s", keyName)
	return newCiphertext, nil
}

// GetKeyInfo 获取密钥信息
func (kr *KeyRotator) GetKeyInfo(keyName string) {
	resp, err := kr.client.Logical().Read(
		fmt.Sprintf("transit/keys/%s", keyName),
	)
	if err != nil {
		log.Printf("获取密钥信息失败: %v", err)
		return
	}
	info, _ := json.MarshalIndent(resp.Data, "", "  ")
	fmt.Printf("密钥信息 [%s]:\n%s\n", keyName, string(info))
}

// StartAutoRotation 启动自动轮换(后台运行)
func (kr *KeyRotator) StartAutoRotation(keyNames []string) {
	log.Printf("🔄 自动轮换已启动, 间隔=%s, 密钥=%v", kr.interval, keyNames)

	ticker := time.NewTicker(kr.interval)
	defer ticker.Stop()

	for {
		select {
		case <-kr.ctx.Done():
			log.Println("⏹️ 自动轮换已停止")
			return
		case <-ticker.C:
			for _, keyName := range keyNames {
				if err := kr.RotateTransitKey(keyName); err != nil {
					log.Printf("❌ 轮换失败 [%s]: %v", keyName, err)
				}
			}
		}
	}
}

// Stop 停止自动轮换
func (kr *KeyRotator) Stop() {
	kr.cancel()
}

func main() {
	rotator, err := NewKeyRotator(
		os.Getenv("VAULT_ADDR"),
		os.Getenv("VAULT_TOKEN"),
		24*time.Hour, // 每 24 小时轮换一次
	)
	if err != nil {
		log.Fatalf("初始化失败: %v", err)
	}

	// 查看当前密钥信息
	rotator.GetKeyInfo("my-encryption-key")

	// 手动轮换一次
	if err := rotator.RotateTransitKey("my-encryption-key"); err != nil {
		log.Fatalf("手动轮换失败: %v", err)
	}

	// 启动后台自动轮换
	go rotator.StartAutoRotation([]string{"my-encryption-key", "my-signing-key"})

	// 优雅关闭
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
	<-sigCh
	log.Println("收到关闭信号...")
	rotator.Stop()
}

Go: SOPS 集成

/*
SOPS (Secrets OPerationS) 集成 — 加密/解密配置文件
依赖:
  - go get github.com/getsops/sops/v3
  - 需要安装 sops CLI: brew install sops
*/
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/exec"
	"path/filepath"
	"strings"
)

// SOPSManager 管理 SOPS 加密的配置文件
type SOPSManager struct {
	// 加密方式:可以是 AWS KMS ARN、GCP KMS、Azure Key Vault 或 age 公钥
	kmsArn    string
	ageKey    string
	configDir string
}

// NewSOPSManagerWithKMS 使用 AWS KMS 创建 SOPS 管理器
func NewSOPSManagerWithKMS(kmsArn, configDir string) *SOPSManager {
	return &SOPSManager{
		kmsArn:    kmsArn,
		configDir: configDir,
	}
}

// NewSOPSManagerWithAge 使用 age 密钥创建 SOPS 管理器
func NewSOPSManagerWithAge(agePublicKey, configDir string) *SOPSManager {
	return &SOPSManager{
		ageKey:    agePublicKey,
		configDir: configDir,
	}
}

// EncryptFile 加密配置文件
func (sm *SOPSManager) EncryptFile(inputPath, outputPath string) error {
	args := []string{"--encrypt"}

	if sm.kmsArn != "" {
		args = append(args, "--kms", sm.kmsArn)
	} else if sm.ageKey != "" {
		args = append(args, "--age", sm.ageKey)
	} else {
		return fmt.Errorf("未配置加密密钥(KMS 或 age)")
	}

	args = append(args, "--output", outputPath, inputPath)

	cmd := exec.Command("sops", args...)
	cmd.Stderr = os.Stderr
	output, err := cmd.Output()
	if err != nil {
		return fmt.Errorf("SOPS 加密失败: %w, output: %s", err, string(output))
	}

	log.Printf("✅ 文件已加密: %s -> %s", inputPath, outputPath)
	return nil
}

// DecryptFile 解密配置文件
func (sm *SOPSManager) DecryptFile(encryptedPath string) (map[string]interface{}, error) {
	cmd := exec.Command("sops", "--decrypt", encryptedPath)
	cmd.Stderr = os.Stderr
	output, err := cmd.Output()
	if err != nil {
		return nil, fmt.Errorf("SOPS 解密失败: %w", err)
	}

	var result map[string]interface{}
	ext := strings.ToLower(filepath.Ext(encryptedPath))
	switch ext {
	case ".json":
		if err := json.Unmarshal(output, &result); err != nil {
			return nil, fmt.Errorf("JSON 解析失败: %w", err)
		}
	case ".yaml", ".yml":
		// 对于 YAML,这里简化处理,实际项目中使用 gopkg.in/yaml.v3
		if err := json.Unmarshal(output, &result); err != nil {
			// 如果不是 JSON 格式,返回原始内容
			return map[string]interface{}{"raw": string(output)}, nil
		}
	default:
		return map[string]interface{}{"raw": string(output)}, nil
	}

	log.Printf("✅ 文件已解密: %s", encryptedPath)
	return result, nil
}

// DecryptToFile 解密到文件
func (sm *SOPSManager) DecryptToFile(encryptedPath, outputPath string) error {
	cmd := exec.Command("sops", "--decrypt", "--output", outputPath, encryptedPath)
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("SOPS 解密到文件失败: %w", err)
	}
	log.Printf("✅ 文件已解密: %s -> %s", encryptedPath, outputPath)
	return nil
}

// RotateKeys 轮换 SOPS 文件的数据密钥
func (sm *SOPSManager) RotateKeys(encryptedPath string) error {
	cmd := exec.Command("sops", "--rotate", "--in-place", encryptedPath)
	cmd.Stderr = os.Stderr
	if err := cmd.Run(); err != nil {
		return fmt.Errorf("SOPS 密钥轮换失败: %w", err)
	}
	log.Printf("✅ 数据密钥已轮换: %s", encryptedPath)
	return nil
}

// CreateSOPSConfig 生成 .sops.yaml 配置文件
func (sm *SOPSManager) CreateSOPSConfig() error {
	var config string
	if sm.kmsArn != "" {
		config = fmt.Sprintf(`creation_rules:
  - path_regex: \.enc\.(yaml|json)$
    kms: '%s'
  - path_regex: secrets/.*
    kms: '%s'
`, sm.kmsArn, sm.kmsArn)
	} else if sm.ageKey != "" {
		config = fmt.Sprintf(`creation_rules:
  - path_regex: \.enc\.(yaml|json)$
    age: '%s'
  - path_regex: secrets/.*
    age: '%s'
`, sm.ageKey, sm.ageKey)
	}

	configPath := filepath.Join(sm.configDir, ".sops.yaml")
	if err := os.WriteFile(configPath, []byte(config), 0644); err != nil {
		return fmt.Errorf("写入 .sops.yaml 失败: %w", err)
	}
	log.Printf("✅ .sops.yaml 已创建: %s", configPath)
	return nil
}

func main() {
	// 使用 age 密钥(适合本地开发)
	manager := NewSOPSManagerWithAge(
		"age1xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
		".",
	)

	// 生成 .sops.yaml 配置
	if err := manager.CreateSOPSConfig(); err != nil {
		log.Fatalf("创建配置失败: %v", err)
	}

	// 创建示例明文配置
	plainConfig := map[string]interface{}{
		"database": map[string]interface{}{
			"host":     "db.example.com",
			"port":     5432,
			"password": "super-secret-password",
		},
		"api_key": "sk-xxxxxxxxxxxx",
	}
	plainBytes, _ := json.MarshalIndent(plainConfig, "", "  ")
	os.WriteFile("/tmp/config.json", plainBytes, 0644)

	// 加密
	if err := manager.EncryptFile("/tmp/config.json", "/tmp/config.enc.json"); err != nil {
		log.Fatalf("加密失败: %v", err)
	}

	// 解密
	decrypted, err := manager.DecryptFile("/tmp/config.enc.json")
	if err != nil {
		log.Fatalf("解密失败: %v", err)
	}
	fmt.Printf("Decrypted config: %+v\n", decrypted)

	// 轮换数据密钥
	if err := manager.RotateKeys("/tmp/config.enc.json"); err != nil {
		log.Fatalf("轮换失败: %v", err)
	}
}

Vault + SPIRE 集成

# SPIRE Server 使用 Vault 作为上游 CA
UpstreamAuthority "vault" {
    plugin_data {
        vault_addr = "https://vault.example.com:8200"
        pki_mount_point = "pki"
        approle_auth_mount_point = "auth/approle"
        approle_id = "spire-role-id"
        approle_secret_id = "spire-secret-id"
    }
}

27.3 Kubernetes Secrets 安全

External Secrets Operator

# 从 Vault 同步密钥到 K8s Secret
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
  name: db-credentials
spec:
  refreshInterval: 1h
  secretStoreRef:
    name: vault-backend
    kind: SecretStore
  target:
    name: db-credentials
  data:
    - secretKey: username
      remoteRef:
        key: secret/data/myapp/database
        property: username
    - secretKey: password
      remoteRef:
        key: secret/data/myapp/database
        property: password

27.4 密钥轮换

零停机轮换策略

        sequenceDiagram
    participant Admin as 管理员/自动化
    participant Vault as Vault/KMS
    participant App as 应用服务
    participant DB as 数据库

    Note over Admin,DB: 阶段 1:添加新密钥
    Admin->>Vault: 生成新密钥 Key-B
    Vault-->>Admin: Key-B 已就绪
    Admin->>App: 更新配置:接受 Key-A 和 Key-B
    App-->>Admin: 配置已更新

    Note over Admin,DB: 阶段 2:切换主密钥
    Admin->>App: 设置 Key-B 为主密钥(用于加密/签名)
    App->>DB: 新数据使用 Key-B 加密
    App->>DB: 旧数据仍可用 Key-A 解密

    Note over Admin,DB: 阶段 3:重新加密历史数据(可选)
    Admin->>App: 触发批量重新加密
    App->>DB: 用 Key-B 重新加密旧数据

    Note over Admin,DB: 阶段 4:移除旧密钥
    Admin->>App: 移除 Key-A
    Admin->>Vault: 标记 Key-A 为已废弃
    Vault-->>Admin: Key-A 已销毁
    

27.5 密钥管理方案对比

特性

HashiCorp Vault

AWS KMS

Azure Key Vault

GCP Cloud KMS

部署方式

自托管 / HCP Cloud

全托管

全托管

全托管

动态密钥

✅ 数据库、云凭证、PKI

加密即服务

✅ Transit Engine

✅ Encrypt/Decrypt API

✅ Key Operations

✅ Encrypt/Decrypt

密钥轮换

✅ 自动 + 手动

✅ 年度自动 + 按需

✅ 自动 + 手动

✅ 自动 + 手动

HSM 支持

✅ Seal/Unseal

✅ CloudHSM

✅ HSM 级别

✅ Cloud HSM

PKI/证书

✅ 内置 PKI Engine

❌ (用 ACM)

❌ (用 App Service Cert)

❌ (用 Certificate Authority)

多云支持

✅ 任意环境

⚠️ AWS 生态

⚠️ Azure 生态

⚠️ GCP 生态

审计日志

✅ 详细审计

✅ CloudTrail

✅ Azure Monitor

✅ Cloud Audit

开源

✅ BSL 许可

K8s 集成

✅ CSI Driver + Injector

✅ Secrets Manager CSI

✅ CSI Driver

✅ Secret Manager

价格

自托管免费 / HCP 按需

按密钥 + API 调用

按密钥 + 操作

按密钥 + 操作

适用场景

多云、混合云、复杂需求

AWS 原生应用

Azure 原生应用

GCP 原生应用

选型建议

  • 多云/混合云 → HashiCorp Vault

  • 纯 AWS → AWS KMS + Secrets Manager

  • 纯 Azure → Azure Key Vault

  • 纯 GCP → GCP Cloud KMS + Secret Manager

  • 需要动态凭证 → HashiCorp Vault(独有优势)

27.6 凭证泄露应急响应

        flowchart TD
    A["🚨 发现凭证泄露"] --> B{"泄露渠道?"}

    B -->|"Git 仓库"| C["立即轮换泄露的凭证"]
    B -->|"日志/监控"| C
    B -->|"第三方通知"| C
    B -->|"暗网/公开"| C

    C --> D["撤销/禁用旧凭证"]
    D --> E["审计访问日志"]
    E --> F{"是否有异常访问?"}

    F -->|"是"| G["启动安全事件响应"]
    G --> H["评估影响范围"]
    H --> I["通知受影响方"]
    I --> J["取证与根因分析"]

    F -->|"否"| K["记录事件"]

    J --> L["修复根因"]
    K --> L

    L --> M["更新安全策略"]
    M --> N["加强预防措施"]
    N --> O["事后复盘"]

    subgraph Prevention["预防措施"]
        P1["Pre-commit Hook<br/>(gitleaks)"]
        P2["CI/CD 扫描<br/>(TruffleHog)"]
        P3["GitHub Secret Scanning"]
        P4["定期密钥轮换"]
    end

    O --> Prevention

    style A fill:#F44336,stroke:#333,color:#fff
    style G fill:#FF9800,stroke:#333,color:#fff
    style O fill:#4CAF50,stroke:#333,color:#fff
    style Prevention fill:#E3F2FD,stroke:#2196F3
    

应急响应清单

步骤

操作

时间要求

负责人

1

确认泄露范围和凭证类型

< 15 分钟

安全团队

2

轮换/撤销泄露的凭证

< 30 分钟

运维团队

3

审计访问日志(CloudTrail/Vault Audit)

< 1 小时

安全团队

4

评估数据泄露影响

< 2 小时

安全团队 + 业务

5

通知受影响用户(如需要)

< 24 小时

法务 + 管理层

6

根因分析与修复

< 48 小时

开发 + 安全团队

7

事后复盘与策略更新

< 1 周

全团队

27.7 密钥泄露检测

# TruffleHog — 扫描 Git 历史中的密钥
trufflehog git https://github.com/org/repo --only-verified

# GitLeaks — 快速扫描
gitleaks detect --source . --verbose

# Pre-commit Hook 防止提交密钥
# .pre-commit-config.yaml
repos:
  - repo: https://github.com/gitleaks/gitleaks
    rev: v8.18.0
    hooks:
      - id: gitleaks

27.8 小结

  • HashiCorp Vault 是最全面的密钥管理工具,支持动态密钥、PKI、加密即服务

  • 动态密钥(数据库凭证、云凭证)自动过期,大幅降低泄露风险

  • External Secrets Operator 将 Vault 密钥安全同步到 Kubernetes

  • 密钥轮换 应该自动化,使用双密钥策略实现零停机

  • Envelope Encryption 结合 KMS 与本地加密,兼顾安全性与性能

  • Pre-commit Hook + CI 扫描 防止密钥泄露到代码仓库

  • 凭证泄露应急响应 需要预案,第一时间轮换凭证、审计日志