第二十七章:密钥与凭证管理
“密钥管理是安全的最后一公里 — 再好的加密算法,密钥泄露了也白搭。”
mindmap
root((密钥管理))
HashiCorp Vault
Secret Engines
Auth Methods
动态密钥
PKI
云密钥服务
AWS KMS
Azure Key Vault
GCP Secret Manager
K8s Secrets
External Secrets
Sealed Secrets
CSI Driver
密钥轮换
零停机
自动化
泄露检测
TruffleHog
GitLeaks
27.1 密钥管理挑战
常见的密钥管理反模式:
❌ 硬编码在代码中:password = "admin123"
❌ 存储在环境变量:容器重启后可能丢失
❌ 提交到 Git:即使删除,历史中仍然存在
❌ 共享密钥:多个服务使用同一个密钥
❌ 永不轮换:密钥一用就是几年
❌ 明文传输:通过 Slack/邮件分享密钥
密钥生命周期管理
flowchart LR
A["🔑 生成<br/>Generate"] --> B["📦 分发<br/>Distribute"]
B --> C["🔒 使用<br/>Use"]
C --> D["🔄 轮换<br/>Rotate"]
D --> C
D --> E["🗑️ 销毁<br/>Destroy"]
style A fill:#4CAF50,stroke:#333,color:#fff
style B fill:#2196F3,stroke:#333,color:#fff
style C fill:#FF9800,stroke:#333,color:#fff
style D fill:#9C27B0,stroke:#333,color:#fff
style E fill:#F44336,stroke:#333,color:#fff
每个阶段的关键要求:
阶段 |
关键要求 |
工具/方法 |
|---|---|---|
生成 |
使用 CSPRNG,足够的密钥长度 |
HSM、Vault、KMS |
分发 |
加密传输,最小权限 |
TLS、Envelope Encryption |
使用 |
内存保护,访问审计 |
Vault Transit、KMS API |
轮换 |
零停机,自动化 |
双密钥策略、Vault 自动轮换 |
销毁 |
安全擦除,不可恢复 |
密钥版本过期、HSM 销毁 |
27.2 HashiCorp Vault
Vault 是最流行的密钥管理工具:
flowchart TB
subgraph Clients["客户端"]
CLI["CLI"]
API["HTTP API"]
UI["Web UI"]
end
subgraph VaultServer["Vault Server"]
direction TB
subgraph AuthMethods["Auth Methods"]
Token["Token"]
AppRole["AppRole"]
K8sAuth["Kubernetes"]
AWSAuth["AWS IAM"]
OIDC["OIDC"]
end
subgraph SecretEngines["Secret Engines"]
KV["KV v2"]
PKI["PKI"]
Transit["Transit"]
Database["Database"]
SSH["SSH"]
end
subgraph Core["核心组件"]
Policy["Policy Engine"]
Audit["Audit Log"]
Seal["Seal/Unseal"]
end
end
subgraph Storage["Storage Backend"]
Consul["Consul"]
Raft["Integrated Raft"]
PG["PostgreSQL"]
end
Clients --> AuthMethods
AuthMethods --> Policy
Policy --> SecretEngines
Core --> Storage
style VaultServer fill:#000,stroke:#FFD700,color:#fff
style AuthMethods fill:#1a1a2e,stroke:#e94560,color:#fff
style SecretEngines fill:#1a1a2e,stroke:#0f3460,color:#fff
style Core fill:#1a1a2e,stroke:#16213e,color:#fff
style Storage fill:#162447,stroke:#e94560,color:#fff
Secret Engines
Engine |
用途 |
特点 |
|---|---|---|
KV |
键值存储 |
版本化、审计 |
PKI |
证书管理 |
动态签发 X.509 证书 |
Transit |
加密即服务 |
不暴露密钥,只提供加解密 API |
Database |
数据库凭证 |
动态生成、自动过期 |
SSH |
SSH 证书 |
签发短期 SSH 证书 |
AWS/GCP/Azure |
云凭证 |
动态生成云平台凭证 |
AppRole 认证
import hvac
# 使用 AppRole 认证
client = hvac.Client(url='https://vault.example.com:8200')
client.auth.approle.login(
role_id='your-role-id',
secret_id='your-secret-id',
)
# 读取密钥
secret = client.secrets.kv.v2.read_secret_version(
path='myapp/database',
mount_point='secret',
)
db_password = secret['data']['data']['password']
# 动态数据库凭证
creds = client.secrets.database.generate_credentials(
name='my-role',
mount_point='database',
)
print(f"Username: {creds['data']['username']}")
print(f"Password: {creds['data']['password']}")
print(f"TTL: {creds['lease_duration']}s") # 自动过期!
# PKI — 签发证书
cert = client.secrets.pki.generate_certificate(
name='web-server',
common_name='api.example.com',
mount_point='pki',
)
print(f"Certificate: {cert['data']['certificate']}")
print(f"Private Key: {cert['data']['private_key']}")
Python: HashiCorp Vault 完整客户端
"""
HashiCorp Vault 客户端封装 — 支持 KV 读写、动态数据库凭证、Transit 加密
依赖: pip install hvac
"""
import hvac
import base64
import logging
from contextlib import contextmanager
from typing import Any, Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class VaultClient:
"""HashiCorp Vault 客户端封装"""
def __init__(self, url: str, role_id: str, secret_id: str,
namespace: Optional[str] = None):
"""
初始化 Vault 客户端并通过 AppRole 认证
Args:
url: Vault 服务器地址
role_id: AppRole 的 Role ID
secret_id: AppRole 的 Secret ID
namespace: Vault Enterprise 命名空间(可选)
"""
self.client = hvac.Client(url=url, namespace=namespace)
try:
resp = self.client.auth.approle.login(
role_id=role_id,
secret_id=secret_id,
)
logger.info("Vault 认证成功, accessor=%s", resp['auth']['accessor'])
except hvac.exceptions.VaultError as e:
logger.error("Vault 认证失败: %s", e)
raise
# ── KV v2 操作 ──────────────────────────────────────────────
def read_secret(self, path: str, mount_point: str = "secret") -> dict:
"""
从 KV v2 引擎读取密钥
Args:
path: 密钥路径,如 "myapp/database"
mount_point: KV 引擎挂载点
Returns:
密钥数据字典
"""
try:
resp = self.client.secrets.kv.v2.read_secret_version(
path=path,
mount_point=mount_point,
)
return resp['data']['data']
except hvac.exceptions.InvalidPath:
logger.error("密钥路径不存在: %s", path)
raise
except hvac.exceptions.Forbidden:
logger.error("无权限读取: %s", path)
raise
def write_secret(self, path: str, data: dict,
mount_point: str = "secret") -> dict:
"""
写入密钥到 KV v2 引擎(自动版本化)
Args:
path: 密钥路径
data: 密钥数据
mount_point: KV 引擎挂载点
Returns:
写入响应(包含版本号)
"""
try:
resp = self.client.secrets.kv.v2.create_or_update_secret(
path=path,
secret=data,
mount_point=mount_point,
)
version = resp['data']['version']
logger.info("密钥写入成功: %s (version=%d)", path, version)
return resp
except hvac.exceptions.VaultError as e:
logger.error("密钥写入失败: %s", e)
raise
# ── 动态数据库凭证 ──────────────────────────────────────────
def get_db_credentials(self, role_name: str,
mount_point: str = "database") -> dict:
"""
获取动态数据库凭证(自动过期)
Args:
role_name: 数据库角色名
mount_point: Database 引擎挂载点
Returns:
包含 username, password, lease_id, lease_duration 的字典
"""
try:
resp = self.client.secrets.database.generate_credentials(
name=role_name,
mount_point=mount_point,
)
creds = {
'username': resp['data']['username'],
'password': resp['data']['password'],
'lease_id': resp['lease_id'],
'lease_duration': resp['lease_duration'],
}
logger.info(
"动态凭证已生成: user=%s, ttl=%ds",
creds['username'], creds['lease_duration']
)
return creds
except hvac.exceptions.VaultError as e:
logger.error("获取数据库凭证失败: %s", e)
raise
# ── Transit 加密即服务 ──────────────────────────────────────
def transit_encrypt(self, key_name: str, plaintext: str,
mount_point: str = "transit") -> str:
"""
使用 Transit 引擎加密数据(密钥永远不离开 Vault)
Args:
key_name: Transit 密钥名称
plaintext: 明文字符串
mount_point: Transit 引擎挂载点
Returns:
Vault 密文(vault:v1:... 格式)
"""
try:
# Transit 要求 base64 编码的明文
encoded = base64.b64encode(plaintext.encode()).decode()
resp = self.client.secrets.transit.encrypt_data(
name=key_name,
plaintext=encoded,
mount_point=mount_point,
)
ciphertext = resp['data']['ciphertext']
logger.info("Transit 加密成功, key=%s", key_name)
return ciphertext
except hvac.exceptions.VaultError as e:
logger.error("Transit 加密失败: %s", e)
raise
def transit_decrypt(self, key_name: str, ciphertext: str,
mount_point: str = "transit") -> str:
"""
使用 Transit 引擎解密数据
Args:
key_name: Transit 密钥名称
ciphertext: Vault 密文
mount_point: Transit 引擎挂载点
Returns:
解密后的明文字符串
"""
try:
resp = self.client.secrets.transit.decrypt_data(
name=key_name,
ciphertext=ciphertext,
mount_point=mount_point,
)
plaintext = base64.b64decode(resp['data']['plaintext']).decode()
logger.info("Transit 解密成功, key=%s", key_name)
return plaintext
except hvac.exceptions.VaultError as e:
logger.error("Transit 解密失败: %s", e)
raise
# ── 使用示例 ────────────────────────────────────────────────────
if __name__ == "__main__":
vault = VaultClient(
url="https://vault.example.com:8200",
role_id="your-role-id",
secret_id="your-secret-id",
)
# KV 读写
vault.write_secret("myapp/config", {"api_key": "sk-xxxx", "db_host": "db.internal"})
config = vault.read_secret("myapp/config")
print(f"API Key: {config['api_key']}")
# 动态数据库凭证
db_creds = vault.get_db_credentials("readonly-role")
print(f"DB User: {db_creds['username']}, TTL: {db_creds['lease_duration']}s")
# Transit 加密/解密
encrypted = vault.transit_encrypt("my-key", "sensitive-data-here")
print(f"Encrypted: {encrypted}")
decrypted = vault.transit_decrypt("my-key", encrypted)
print(f"Decrypted: {decrypted}")
Python: AWS KMS / Envelope Encryption
"""
AWS KMS Envelope Encryption 实现
依赖: pip install boto3 cryptography
"""
import boto3
import base64
import json
import os
import logging
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from typing import Tuple
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class EnvelopeEncryption:
"""
Envelope Encryption(信封加密):
1. 用 KMS 主密钥(CMK)生成数据密钥(DEK)
2. 用 DEK 在本地加密数据(高性能)
3. 用 CMK 加密 DEK 本身
4. 存储加密后的 DEK + 加密后的数据
"""
def __init__(self, kms_key_id: str, region: str = "us-east-1"):
"""
Args:
kms_key_id: AWS KMS 密钥 ID 或 ARN
region: AWS 区域
"""
self.kms_client = boto3.client('kms', region_name=region)
self.kms_key_id = kms_key_id
def encrypt(self, plaintext: bytes,
encryption_context: dict = None) -> dict:
"""
使用 Envelope Encryption 加密数据
Args:
plaintext: 要加密的数据
encryption_context: 加密上下文(用于 AAD,防止密文被挪用)
Returns:
包含 encrypted_key, nonce, ciphertext 的字典
"""
try:
# 步骤 1: 向 KMS 请求数据密钥
params = {
'KeyId': self.kms_key_id,
'KeySpec': 'AES_256',
}
if encryption_context:
params['EncryptionContext'] = encryption_context
response = self.kms_client.generate_data_key(**params)
plaintext_key = response['Plaintext'] # 明文 DEK(仅在内存中)
encrypted_key = response['CiphertextBlob'] # 加密后的 DEK
# 步骤 2: 用明文 DEK 在本地加密数据(AES-256-GCM)
nonce = os.urandom(12)
aesgcm = AESGCM(plaintext_key)
aad = json.dumps(encryption_context).encode() if encryption_context else None
ciphertext = aesgcm.encrypt(nonce, plaintext, aad)
# 步骤 3: 立即清除内存中的明文密钥
# (Python 无法保证立即清除,生产环境建议使用 C 扩展)
del plaintext_key
logger.info("Envelope 加密完成, 数据大小=%d bytes", len(plaintext))
return {
'encrypted_key': base64.b64encode(encrypted_key).decode(),
'nonce': base64.b64encode(nonce).decode(),
'ciphertext': base64.b64encode(ciphertext).decode(),
'encryption_context': encryption_context,
}
except Exception as e:
logger.error("Envelope 加密失败: %s", e)
raise
def decrypt(self, envelope: dict) -> bytes:
"""
解密 Envelope Encryption 数据
Args:
envelope: encrypt() 返回的字典
Returns:
解密后的明文数据
"""
try:
encrypted_key = base64.b64decode(envelope['encrypted_key'])
nonce = base64.b64decode(envelope['nonce'])
ciphertext = base64.b64decode(envelope['ciphertext'])
encryption_context = envelope.get('encryption_context')
# 步骤 1: 用 KMS 解密数据密钥
params = {'CiphertextBlob': encrypted_key}
if encryption_context:
params['EncryptionContext'] = encryption_context
response = self.kms_client.decrypt(**params)
plaintext_key = response['Plaintext']
# 步骤 2: 用明文 DEK 解密数据
aesgcm = AESGCM(plaintext_key)
aad = (json.dumps(encryption_context).encode()
if encryption_context else None)
plaintext = aesgcm.decrypt(nonce, ciphertext, aad)
del plaintext_key
logger.info("Envelope 解密完成")
return plaintext
except Exception as e:
logger.error("Envelope 解密失败: %s", e)
raise
# ── Envelope Encryption 原理图 ──────────────────────────────────
# 见下方 Mermaid 图表
# ── 使用示例 ────────────────────────────────────────────────────
if __name__ == "__main__":
envelope_enc = EnvelopeEncryption(
kms_key_id="arn:aws:kms:us-east-1:123456789:key/your-key-id",
region="us-east-1",
)
# 加密
data = b"This is highly sensitive data"
context = {"purpose": "user-data", "tenant": "acme-corp"}
envelope = envelope_enc.encrypt(data, encryption_context=context)
print(f"Encrypted envelope: {json.dumps(envelope, indent=2)}")
# 解密
decrypted = envelope_enc.decrypt(envelope)
print(f"Decrypted: {decrypted.decode()}")
Envelope Encryption 原理图
flowchart TB
subgraph KMS["☁️ AWS KMS"]
CMK["主密钥 CMK<br/>(永远不离开 KMS)"]
end
subgraph Encrypt["加密流程"]
direction TB
E1["1️⃣ 请求 GenerateDataKey"] --> E2["2️⃣ KMS 返回<br/>明文 DEK + 加密 DEK"]
E2 --> E3["3️⃣ 用明文 DEK<br/>本地 AES-GCM 加密数据"]
E3 --> E4["4️⃣ 丢弃明文 DEK"]
E4 --> E5["5️⃣ 存储:加密 DEK + 密文"]
end
subgraph Decrypt["解密流程"]
direction TB
D1["1️⃣ 读取加密 DEK"] --> D2["2️⃣ 发送加密 DEK 到 KMS"]
D2 --> D3["3️⃣ KMS 返回明文 DEK"]
D3 --> D4["4️⃣ 用明文 DEK<br/>本地 AES-GCM 解密"]
D4 --> D5["5️⃣ 丢弃明文 DEK"]
end
CMK -.->|生成 DEK| E1
CMK -.->|解密 DEK| D2
style KMS fill:#FF9900,stroke:#333,color:#fff
style Encrypt fill:#e8f5e9,stroke:#4CAF50
style Decrypt fill:#e3f2fd,stroke:#2196F3
Python: 本地密钥安全存储(keyring 库)
"""
使用 keyring 库安全存储本地凭证
依赖: pip install keyring
- macOS: 使用 Keychain
- Linux: 使用 Secret Service (GNOME Keyring / KWallet)
- Windows: 使用 Windows Credential Locker
"""
import keyring
import logging
import json
from typing import Optional
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# 服务名称(用于在系统密钥链中分组)
SERVICE_NAME = "myapp-credentials"
def store_credential(key: str, value: str) -> None:
"""
安全存储凭证到系统密钥链
Args:
key: 凭证名称(如 "api_key", "db_password")
value: 凭证值
"""
try:
keyring.set_password(SERVICE_NAME, key, value)
logger.info("凭证已安全存储: %s", key)
except keyring.errors.KeyringError as e:
logger.error("存储凭证失败: %s", e)
raise
def get_credential(key: str) -> Optional[str]:
"""
从系统密钥链读取凭证
Args:
key: 凭证名称
Returns:
凭证值,不存在则返回 None
"""
try:
value = keyring.get_password(SERVICE_NAME, key)
if value is None:
logger.warning("凭证不存在: %s", key)
return value
except keyring.errors.KeyringError as e:
logger.error("读取凭证失败: %s", e)
raise
def delete_credential(key: str) -> None:
"""
从系统密钥链删除凭证
Args:
key: 凭证名称
"""
try:
keyring.delete_password(SERVICE_NAME, key)
logger.info("凭证已删除: %s", key)
except keyring.errors.PasswordDeleteError:
logger.warning("凭证不存在,无需删除: %s", key)
except keyring.errors.KeyringError as e:
logger.error("删除凭证失败: %s", e)
raise
def store_json_credential(key: str, data: dict) -> None:
"""存储 JSON 格式的复合凭证"""
store_credential(key, json.dumps(data))
def get_json_credential(key: str) -> Optional[dict]:
"""读取 JSON 格式的复合凭证"""
value = get_credential(key)
return json.loads(value) if value else None
# ── 使用示例 ────────────────────────────────────────────────────
if __name__ == "__main__":
# 存储 API 密钥
store_credential("github_token", "ghp_xxxxxxxxxxxxxxxxxxxx")
# 读取 API 密钥
token = get_credential("github_token")
print(f"GitHub Token: {token[:10]}..." if token else "Not found")
# 存储复合凭证
store_json_credential("db_config", {
"host": "db.example.com",
"port": 5432,
"username": "app_user",
"password": "s3cret!",
})
# 读取复合凭证
db_config = get_json_credential("db_config")
if db_config:
print(f"DB Host: {db_config['host']}")
# 清理
delete_credential("github_token")
Java: Spring Vault 集成
/**
* Spring Vault 集成 — 使用 VaultTemplate 读写密钥
* 依赖: spring-vault-core
*
* build.gradle:
* implementation 'org.springframework.vault:spring-vault-core:3.1.1'
*/
package com.example.vault;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.vault.core.VaultTemplate;
import org.springframework.vault.core.VaultTransitOperations;
import org.springframework.vault.support.Ciphertext;
import org.springframework.vault.support.Plaintext;
import org.springframework.vault.support.VaultResponseSupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
import java.util.Objects;
@Service
public class VaultSecretService {
private static final Logger logger = LoggerFactory.getLogger(VaultSecretService.class);
private final VaultTemplate vaultTemplate;
@Autowired
public VaultSecretService(VaultTemplate vaultTemplate) {
this.vaultTemplate = vaultTemplate;
}
// ── KV v2 读写 ──────────────────────────────────────────────
/**
* 写入密钥到 KV v2 引擎
*
* @param path 密钥路径,如 "myapp/database"
* @param secrets 密钥键值对
*/
public void writeSecret(String path, Map<String, String> secrets) {
try {
vaultTemplate.opsForKeyValue("secret").put(path, secrets);
logger.info("密钥写入成功: {}", path);
} catch (Exception e) {
logger.error("密钥写入失败: {}", path, e);
throw new RuntimeException("Failed to write secret: " + path, e);
}
}
/**
* 从 KV v2 引擎读取密钥
*
* @param path 密钥路径
* @param clazz 目标类型
* @return 密钥数据对象
*/
public <T> T readSecret(String path, Class<T> clazz) {
try {
VaultResponseSupport<T> response =
vaultTemplate.opsForKeyValue("secret").get(path, clazz);
if (response == null || response.getData() == null) {
logger.warn("密钥不存在: {}", path);
return null;
}
logger.info("密钥读取成功: {}", path);
return response.getData();
} catch (Exception e) {
logger.error("密钥读取失败: {}", path, e);
throw new RuntimeException("Failed to read secret: " + path, e);
}
}
// ── Transit 加密即服务 ──────────────────────────────────────
/**
* 使用 Transit 引擎加密
*
* @param keyName Transit 密钥名称
* @param data 明文数据
* @return 密文
*/
public String transitEncrypt(String keyName, String data) {
try {
VaultTransitOperations transit = vaultTemplate.opsForTransit();
Ciphertext ciphertext = transit.encrypt(keyName, Plaintext.of(data));
logger.info("Transit 加密成功, key={}", keyName);
return Objects.requireNonNull(ciphertext).getCiphertext();
} catch (Exception e) {
logger.error("Transit 加密失败, key={}", keyName, e);
throw new RuntimeException("Transit encryption failed", e);
}
}
/**
* 使用 Transit 引擎解密
*
* @param keyName Transit 密钥名称
* @param ciphertext 密文
* @return 明文
*/
public String transitDecrypt(String keyName, String ciphertext) {
try {
VaultTransitOperations transit = vaultTemplate.opsForTransit();
Plaintext plaintext = transit.decrypt(keyName, Ciphertext.of(ciphertext));
logger.info("Transit 解密成功, key={}", keyName);
return Objects.requireNonNull(plaintext).asString();
} catch (Exception e) {
logger.error("Transit 解密失败, key={}", keyName, e);
throw new RuntimeException("Transit decryption failed", e);
}
}
}
Java: KeyStore 管理(PKCS12)
/**
* Java KeyStore 管理 — PKCS12 创建、读取、密钥轮换
*/
package com.example.keystore;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.SecretKeySpec;
import java.io.*;
import java.security.*;
import java.security.cert.CertificateException;
import java.time.Instant;
import java.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class KeyStoreManager {
private static final Logger logger = LoggerFactory.getLogger(KeyStoreManager.class);
private static final String KEYSTORE_TYPE = "PKCS12";
private final String keystorePath;
private final char[] keystorePassword;
private KeyStore keyStore;
public KeyStoreManager(String keystorePath, String keystorePassword)
throws KeyStoreException, IOException, CertificateException, NoSuchAlgorithmException {
this.keystorePath = keystorePath;
this.keystorePassword = keystorePassword.toCharArray();
this.keyStore = KeyStore.getInstance(KEYSTORE_TYPE);
File file = new File(keystorePath);
if (file.exists()) {
// 加载已有 KeyStore
try (FileInputStream fis = new FileInputStream(file)) {
keyStore.load(fis, this.keystorePassword);
logger.info("KeyStore 已加载: {}", keystorePath);
}
} else {
// 创建新 KeyStore
keyStore.load(null, this.keystorePassword);
save();
logger.info("KeyStore 已创建: {}", keystorePath);
}
}
/**
* 生成并存储 AES-256 密钥
*
* @param alias 密钥别名
* @return 生成的密钥
*/
public SecretKey generateAndStoreKey(String alias)
throws NoSuchAlgorithmException, KeyStoreException, IOException,
CertificateException {
KeyGenerator keyGen = KeyGenerator.getInstance("AES");
keyGen.init(256, new SecureRandom());
SecretKey secretKey = keyGen.generateKey();
KeyStore.SecretKeyEntry entry = new KeyStore.SecretKeyEntry(secretKey);
KeyStore.ProtectionParameter protection =
new KeyStore.PasswordProtection(keystorePassword);
keyStore.setEntry(alias, entry, protection);
save();
logger.info("AES-256 密钥已生成并存储: alias={}", alias);
return secretKey;
}
/**
* 读取密钥
*
* @param alias 密钥别名
* @return 密钥,不存在返回 null
*/
public SecretKey getKey(String alias)
throws UnrecoverableKeyException, KeyStoreException, NoSuchAlgorithmException {
Key key = keyStore.getKey(alias, keystorePassword);
if (key instanceof SecretKey) {
logger.info("密钥已读取: alias={}", alias);
return (SecretKey) key;
}
logger.warn("密钥不存在或类型不匹配: alias={}", alias);
return null;
}
/**
* 密钥轮换 — 生成新密钥,保留旧密钥用于解密
*
* @param alias 密钥别名
* @return 新密钥
*/
public SecretKey rotateKey(String alias)
throws Exception {
// 备份旧密钥(添加时间戳后缀)
SecretKey oldKey = getKey(alias);
if (oldKey != null) {
String backupAlias = alias + "-rotated-" + Instant.now().getEpochSecond();
KeyStore.SecretKeyEntry entry = new KeyStore.SecretKeyEntry(oldKey);
KeyStore.ProtectionParameter protection =
new KeyStore.PasswordProtection(keystorePassword);
keyStore.setEntry(backupAlias, entry, protection);
logger.info("旧密钥已备份: {} -> {}", alias, backupAlias);
}
// 生成新密钥
SecretKey newKey = generateAndStoreKey(alias);
logger.info("密钥轮换完成: alias={}", alias);
return newKey;
}
/**
* 列出所有密钥别名
*/
public List<String> listAliases() throws KeyStoreException {
List<String> aliases = Collections.list(keyStore.aliases());
logger.info("KeyStore 中共有 {} 个条目", aliases.size());
return aliases;
}
private void save() throws IOException, KeyStoreException,
CertificateException, NoSuchAlgorithmException {
try (FileOutputStream fos = new FileOutputStream(keystorePath)) {
keyStore.store(fos, keystorePassword);
}
}
// ── 使用示例 ────────────────────────────────────────────────
public static void main(String[] args) throws Exception {
KeyStoreManager ksm = new KeyStoreManager(
"/tmp/myapp-keys.p12", "changeit"
);
// 生成密钥
SecretKey key = ksm.generateAndStoreKey("encryption-key-v1");
System.out.println("Key algorithm: " + key.getAlgorithm());
System.out.println("Key format: " + key.getFormat());
// 读取密钥
SecretKey loaded = ksm.getKey("encryption-key-v1");
System.out.println("Keys match: " + Arrays.equals(key.getEncoded(), loaded.getEncoded()));
// 密钥轮换
SecretKey rotated = ksm.rotateKey("encryption-key-v1");
System.out.println("Rotated key differs: " +
!Arrays.equals(key.getEncoded(), rotated.getEncoded()));
// 列出所有密钥
System.out.println("All aliases: " + ksm.listAliases());
}
}
Java: AWS KMS 客户端
/**
* AWS KMS 客户端 — 加密/解密/密钥轮换
* 依赖: software.amazon.awssdk:kms
*
* build.gradle:
* implementation 'software.amazon.awssdk:kms:2.25.0'
*/
package com.example.kms;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.model.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
public class AwsKmsService {
private static final Logger logger = LoggerFactory.getLogger(AwsKmsService.class);
private final KmsClient kmsClient;
private final String keyId;
public AwsKmsService(String keyId, Region region) {
this.kmsClient = KmsClient.builder()
.region(region)
.build();
this.keyId = keyId;
}
/**
* 使用 KMS 加密数据(适合小数据 < 4KB)
*
* @param plaintext 明文
* @param context 加密上下文(可选,用于 AAD)
* @return Base64 编码的密文
*/
public String encrypt(String plaintext, Map<String, String> context) {
try {
EncryptRequest.Builder builder = EncryptRequest.builder()
.keyId(keyId)
.plaintext(SdkBytes.fromUtf8String(plaintext));
if (context != null && !context.isEmpty()) {
builder.encryptionContext(context);
}
EncryptResponse response = kmsClient.encrypt(builder.build());
String ciphertext = Base64.getEncoder()
.encodeToString(response.ciphertextBlob().asByteArray());
logger.info("KMS 加密成功, keyId={}", keyId);
return ciphertext;
} catch (KmsException e) {
logger.error("KMS 加密失败: {}", e.getMessage());
throw new RuntimeException("KMS encryption failed", e);
}
}
/**
* 使用 KMS 解密数据
*
* @param ciphertextBase64 Base64 编码的密文
* @param context 加密上下文(必须与加密时一致)
* @return 明文
*/
public String decrypt(String ciphertextBase64, Map<String, String> context) {
try {
byte[] ciphertextBytes = Base64.getDecoder().decode(ciphertextBase64);
DecryptRequest.Builder builder = DecryptRequest.builder()
.ciphertextBlob(SdkBytes.fromByteArray(ciphertextBytes));
if (context != null && !context.isEmpty()) {
builder.encryptionContext(context);
}
DecryptResponse response = kmsClient.decrypt(builder.build());
String plaintext = response.plaintext().asUtf8String();
logger.info("KMS 解密成功");
return plaintext;
} catch (KmsException e) {
logger.error("KMS 解密失败: {}", e.getMessage());
throw new RuntimeException("KMS decryption failed", e);
}
}
/**
* 触发 KMS 密钥自动轮换(启用年度自动轮换)
*/
public void enableAutoRotation() {
try {
kmsClient.enableKeyRotation(
EnableKeyRotationRequest.builder()
.keyId(keyId)
.build()
);
logger.info("KMS 自动轮换已启用: keyId={}", keyId);
} catch (KmsException e) {
logger.error("启用自动轮换失败: {}", e.getMessage());
throw new RuntimeException("Failed to enable key rotation", e);
}
}
/**
* 手动触发密钥轮换
*/
public void manualRotation() {
try {
kmsClient.rotateKeyOnDemand(
RotateKeyOnDemandRequest.builder()
.keyId(keyId)
.build()
);
logger.info("KMS 手动轮换已触发: keyId={}", keyId);
} catch (KmsException e) {
logger.error("手动轮换失败: {}", e.getMessage());
throw new RuntimeException("Failed to rotate key", e);
}
}
/**
* 查询密钥轮换状态
*/
public boolean isRotationEnabled() {
try {
GetKeyRotationStatusResponse response = kmsClient.getKeyRotationStatus(
GetKeyRotationStatusRequest.builder()
.keyId(keyId)
.build()
);
return response.keyRotationEnabled();
} catch (KmsException e) {
logger.error("查询轮换状态失败: {}", e.getMessage());
throw new RuntimeException("Failed to get rotation status", e);
}
}
// ── 使用示例 ────────────────────────────────────────────────
public static void main(String[] args) {
AwsKmsService kms = new AwsKmsService(
"arn:aws:kms:us-east-1:123456789:key/your-key-id",
Region.US_EAST_1
);
Map<String, String> context = Map.of(
"purpose", "user-data",
"tenant", "acme-corp"
);
// 加密
String encrypted = kms.encrypt("sensitive-data", context);
System.out.println("Encrypted: " + encrypted);
// 解密
String decrypted = kms.decrypt(encrypted, context);
System.out.println("Decrypted: " + decrypted);
// 启用自动轮换
kms.enableAutoRotation();
System.out.println("Rotation enabled: " + kms.isRotationEnabled());
}
}
Go: HashiCorp Vault 客户端
/*
HashiCorp Vault Go 客户端 — KV 读写、Transit 加密、动态凭证
依赖: go get github.com/hashicorp/vault/api
*/
package main
import (
"context"
"encoding/base64"
"fmt"
"log"
"os"
"time"
vault "github.com/hashicorp/vault/api"
)
// VaultClient 封装 Vault 操作
type VaultClient struct {
client *vault.Client
ctx context.Context
}
// NewVaultClient 创建并认证 Vault 客户端
func NewVaultClient(addr, roleID, secretID string) (*VaultClient, error) {
config := vault.DefaultConfig()
config.Address = addr
// 生产环境应配置 TLS
// config.ConfigureTLS(&vault.TLSConfig{CACert: "/path/to/ca.pem"})
client, err := vault.NewClient(config)
if err != nil {
return nil, fmt.Errorf("创建 Vault 客户端失败: %w", err)
}
// AppRole 认证
resp, err := client.Logical().Write("auth/approle/login", map[string]interface{}{
"role_id": roleID,
"secret_id": secretID,
})
if err != nil {
return nil, fmt.Errorf("Vault 认证失败: %w", err)
}
if resp.Auth == nil {
return nil, fmt.Errorf("Vault 认证响应为空")
}
client.SetToken(resp.Auth.ClientToken)
log.Printf("Vault 认证成功, accessor=%s", resp.Auth.Accessor)
return &VaultClient{
client: client,
ctx: context.Background(),
}, nil
}
// ReadSecret 从 KV v2 读取密钥
func (vc *VaultClient) ReadSecret(path string) (map[string]interface{}, error) {
secret, err := vc.client.KVv2("secret").Get(vc.ctx, path)
if err != nil {
return nil, fmt.Errorf("读取密钥失败 [%s]: %w", path, err)
}
if secret == nil || secret.Data == nil {
return nil, fmt.Errorf("密钥不存在: %s", path)
}
log.Printf("密钥读取成功: %s (version=%d)", path, secret.VersionMetadata.Version)
return secret.Data, nil
}
// WriteSecret 写入密钥到 KV v2
func (vc *VaultClient) WriteSecret(path string, data map[string]interface{}) error {
_, err := vc.client.KVv2("secret").Put(vc.ctx, path, data)
if err != nil {
return fmt.Errorf("写入密钥失败 [%s]: %w", path, err)
}
log.Printf("密钥写入成功: %s", path)
return nil
}
// TransitEncrypt 使用 Transit 引擎加密
func (vc *VaultClient) TransitEncrypt(keyName, plaintext string) (string, error) {
encoded := base64.StdEncoding.EncodeToString([]byte(plaintext))
resp, err := vc.client.Logical().Write(
fmt.Sprintf("transit/encrypt/%s", keyName),
map[string]interface{}{
"plaintext": encoded,
},
)
if err != nil {
return "", fmt.Errorf("Transit 加密失败: %w", err)
}
ciphertext, ok := resp.Data["ciphertext"].(string)
if !ok {
return "", fmt.Errorf("无效的加密响应")
}
log.Printf("Transit 加密成功, key=%s", keyName)
return ciphertext, nil
}
// TransitDecrypt 使用 Transit 引擎解密
func (vc *VaultClient) TransitDecrypt(keyName, ciphertext string) (string, error) {
resp, err := vc.client.Logical().Write(
fmt.Sprintf("transit/decrypt/%s", keyName),
map[string]interface{}{
"ciphertext": ciphertext,
},
)
if err != nil {
return "", fmt.Errorf("Transit 解密失败: %w", err)
}
encodedPlaintext, ok := resp.Data["plaintext"].(string)
if !ok {
return "", fmt.Errorf("无效的解密响应")
}
decoded, err := base64.StdEncoding.DecodeString(encodedPlaintext)
if err != nil {
return "", fmt.Errorf("Base64 解码失败: %w", err)
}
log.Printf("Transit 解密成功, key=%s", keyName)
return string(decoded), nil
}
// GetDatabaseCredentials 获取动态数据库凭证
func (vc *VaultClient) GetDatabaseCredentials(roleName string) (string, string, time.Duration, error) {
resp, err := vc.client.Logical().Read(
fmt.Sprintf("database/creds/%s", roleName),
)
if err != nil {
return "", "", 0, fmt.Errorf("获取数据库凭证失败: %w", err)
}
username, _ := resp.Data["username"].(string)
password, _ := resp.Data["password"].(string)
ttl := time.Duration(resp.LeaseDuration) * time.Second
log.Printf("动态凭证已生成: user=%s, ttl=%s", username, ttl)
return username, password, ttl, nil
}
func main() {
vc, err := NewVaultClient(
os.Getenv("VAULT_ADDR"),
os.Getenv("VAULT_ROLE_ID"),
os.Getenv("VAULT_SECRET_ID"),
)
if err != nil {
log.Fatalf("初始化失败: %v", err)
}
// KV 读写
err = vc.WriteSecret("myapp/config", map[string]interface{}{
"api_key": "sk-xxxx",
"db_host": "db.internal",
})
if err != nil {
log.Fatalf("写入失败: %v", err)
}
data, err := vc.ReadSecret("myapp/config")
if err != nil {
log.Fatalf("读取失败: %v", err)
}
fmt.Printf("API Key: %s\n", data["api_key"])
// Transit 加密/解密
encrypted, err := vc.TransitEncrypt("my-key", "sensitive-data")
if err != nil {
log.Fatalf("加密失败: %v", err)
}
fmt.Printf("Encrypted: %s\n", encrypted)
decrypted, err := vc.TransitDecrypt("my-key", encrypted)
if err != nil {
log.Fatalf("解密失败: %v", err)
}
fmt.Printf("Decrypted: %s\n", decrypted)
}
Go: 密钥轮换自动化脚本
/*
密钥轮换自动化 — 定期轮换 Vault Transit 密钥并重新加密数据
依赖: go get github.com/hashicorp/vault/api
*/
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
vault "github.com/hashicorp/vault/api"
)
// KeyRotator 自动密钥轮换器
type KeyRotator struct {
client *vault.Client
ctx context.Context
cancel context.CancelFunc
interval time.Duration
}
// NewKeyRotator 创建密钥轮换器
func NewKeyRotator(vaultAddr, token string, interval time.Duration) (*KeyRotator, error) {
config := vault.DefaultConfig()
config.Address = vaultAddr
client, err := vault.NewClient(config)
if err != nil {
return nil, fmt.Errorf("创建 Vault 客户端失败: %w", err)
}
client.SetToken(token)
ctx, cancel := context.WithCancel(context.Background())
return &KeyRotator{
client: client,
ctx: ctx,
cancel: cancel,
interval: interval,
}, nil
}
// RotateTransitKey 轮换 Transit 密钥(创建新版本)
func (kr *KeyRotator) RotateTransitKey(keyName string) error {
_, err := kr.client.Logical().Write(
fmt.Sprintf("transit/keys/%s/rotate", keyName),
nil,
)
if err != nil {
return fmt.Errorf("轮换 Transit 密钥失败 [%s]: %w", keyName, err)
}
log.Printf("✅ Transit 密钥已轮换: %s", keyName)
return nil
}
// UpdateMinDecryptionVersion 更新最小解密版本(淘汰旧密钥版本)
func (kr *KeyRotator) UpdateMinDecryptionVersion(keyName string, minVersion int) error {
_, err := kr.client.Logical().Write(
fmt.Sprintf("transit/keys/%s/config", keyName),
map[string]interface{}{
"min_decryption_version": minVersion,
},
)
if err != nil {
return fmt.Errorf("更新最小解密版本失败: %w", err)
}
log.Printf("✅ 最小解密版本已更新: %s -> v%d", keyName, minVersion)
return nil
}
// RewrapData 使用最新密钥版本重新加密数据(无需解密)
func (kr *KeyRotator) RewrapData(keyName, ciphertext string) (string, error) {
resp, err := kr.client.Logical().Write(
fmt.Sprintf("transit/rewrap/%s", keyName),
map[string]interface{}{
"ciphertext": ciphertext,
},
)
if err != nil {
return "", fmt.Errorf("重新加密失败: %w", err)
}
newCiphertext, ok := resp.Data["ciphertext"].(string)
if !ok {
return "", fmt.Errorf("无效的重新加密响应")
}
log.Printf("✅ 数据已用最新密钥重新加密: %s", keyName)
return newCiphertext, nil
}
// GetKeyInfo 获取密钥信息
func (kr *KeyRotator) GetKeyInfo(keyName string) {
resp, err := kr.client.Logical().Read(
fmt.Sprintf("transit/keys/%s", keyName),
)
if err != nil {
log.Printf("获取密钥信息失败: %v", err)
return
}
info, _ := json.MarshalIndent(resp.Data, "", " ")
fmt.Printf("密钥信息 [%s]:\n%s\n", keyName, string(info))
}
// StartAutoRotation 启动自动轮换(后台运行)
func (kr *KeyRotator) StartAutoRotation(keyNames []string) {
log.Printf("🔄 自动轮换已启动, 间隔=%s, 密钥=%v", kr.interval, keyNames)
ticker := time.NewTicker(kr.interval)
defer ticker.Stop()
for {
select {
case <-kr.ctx.Done():
log.Println("⏹️ 自动轮换已停止")
return
case <-ticker.C:
for _, keyName := range keyNames {
if err := kr.RotateTransitKey(keyName); err != nil {
log.Printf("❌ 轮换失败 [%s]: %v", keyName, err)
}
}
}
}
}
// Stop 停止自动轮换
func (kr *KeyRotator) Stop() {
kr.cancel()
}
func main() {
rotator, err := NewKeyRotator(
os.Getenv("VAULT_ADDR"),
os.Getenv("VAULT_TOKEN"),
24*time.Hour, // 每 24 小时轮换一次
)
if err != nil {
log.Fatalf("初始化失败: %v", err)
}
// 查看当前密钥信息
rotator.GetKeyInfo("my-encryption-key")
// 手动轮换一次
if err := rotator.RotateTransitKey("my-encryption-key"); err != nil {
log.Fatalf("手动轮换失败: %v", err)
}
// 启动后台自动轮换
go rotator.StartAutoRotation([]string{"my-encryption-key", "my-signing-key"})
// 优雅关闭
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
log.Println("收到关闭信号...")
rotator.Stop()
}
Go: SOPS 集成
/*
SOPS (Secrets OPerationS) 集成 — 加密/解密配置文件
依赖:
- go get github.com/getsops/sops/v3
- 需要安装 sops CLI: brew install sops
*/
package main
import (
"encoding/json"
"fmt"
"log"
"os"
"os/exec"
"path/filepath"
"strings"
)
// SOPSManager 管理 SOPS 加密的配置文件
type SOPSManager struct {
// 加密方式:可以是 AWS KMS ARN、GCP KMS、Azure Key Vault 或 age 公钥
kmsArn string
ageKey string
configDir string
}
// NewSOPSManagerWithKMS 使用 AWS KMS 创建 SOPS 管理器
func NewSOPSManagerWithKMS(kmsArn, configDir string) *SOPSManager {
return &SOPSManager{
kmsArn: kmsArn,
configDir: configDir,
}
}
// NewSOPSManagerWithAge 使用 age 密钥创建 SOPS 管理器
func NewSOPSManagerWithAge(agePublicKey, configDir string) *SOPSManager {
return &SOPSManager{
ageKey: agePublicKey,
configDir: configDir,
}
}
// EncryptFile 加密配置文件
func (sm *SOPSManager) EncryptFile(inputPath, outputPath string) error {
args := []string{"--encrypt"}
if sm.kmsArn != "" {
args = append(args, "--kms", sm.kmsArn)
} else if sm.ageKey != "" {
args = append(args, "--age", sm.ageKey)
} else {
return fmt.Errorf("未配置加密密钥(KMS 或 age)")
}
args = append(args, "--output", outputPath, inputPath)
cmd := exec.Command("sops", args...)
cmd.Stderr = os.Stderr
output, err := cmd.Output()
if err != nil {
return fmt.Errorf("SOPS 加密失败: %w, output: %s", err, string(output))
}
log.Printf("✅ 文件已加密: %s -> %s", inputPath, outputPath)
return nil
}
// DecryptFile 解密配置文件
func (sm *SOPSManager) DecryptFile(encryptedPath string) (map[string]interface{}, error) {
cmd := exec.Command("sops", "--decrypt", encryptedPath)
cmd.Stderr = os.Stderr
output, err := cmd.Output()
if err != nil {
return nil, fmt.Errorf("SOPS 解密失败: %w", err)
}
var result map[string]interface{}
ext := strings.ToLower(filepath.Ext(encryptedPath))
switch ext {
case ".json":
if err := json.Unmarshal(output, &result); err != nil {
return nil, fmt.Errorf("JSON 解析失败: %w", err)
}
case ".yaml", ".yml":
// 对于 YAML,这里简化处理,实际项目中使用 gopkg.in/yaml.v3
if err := json.Unmarshal(output, &result); err != nil {
// 如果不是 JSON 格式,返回原始内容
return map[string]interface{}{"raw": string(output)}, nil
}
default:
return map[string]interface{}{"raw": string(output)}, nil
}
log.Printf("✅ 文件已解密: %s", encryptedPath)
return result, nil
}
// DecryptToFile 解密到文件
func (sm *SOPSManager) DecryptToFile(encryptedPath, outputPath string) error {
cmd := exec.Command("sops", "--decrypt", "--output", outputPath, encryptedPath)
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("SOPS 解密到文件失败: %w", err)
}
log.Printf("✅ 文件已解密: %s -> %s", encryptedPath, outputPath)
return nil
}
// RotateKeys 轮换 SOPS 文件的数据密钥
func (sm *SOPSManager) RotateKeys(encryptedPath string) error {
cmd := exec.Command("sops", "--rotate", "--in-place", encryptedPath)
cmd.Stderr = os.Stderr
if err := cmd.Run(); err != nil {
return fmt.Errorf("SOPS 密钥轮换失败: %w", err)
}
log.Printf("✅ 数据密钥已轮换: %s", encryptedPath)
return nil
}
// CreateSOPSConfig 生成 .sops.yaml 配置文件
func (sm *SOPSManager) CreateSOPSConfig() error {
var config string
if sm.kmsArn != "" {
config = fmt.Sprintf(`creation_rules:
- path_regex: \.enc\.(yaml|json)$
kms: '%s'
- path_regex: secrets/.*
kms: '%s'
`, sm.kmsArn, sm.kmsArn)
} else if sm.ageKey != "" {
config = fmt.Sprintf(`creation_rules:
- path_regex: \.enc\.(yaml|json)$
age: '%s'
- path_regex: secrets/.*
age: '%s'
`, sm.ageKey, sm.ageKey)
}
configPath := filepath.Join(sm.configDir, ".sops.yaml")
if err := os.WriteFile(configPath, []byte(config), 0644); err != nil {
return fmt.Errorf("写入 .sops.yaml 失败: %w", err)
}
log.Printf("✅ .sops.yaml 已创建: %s", configPath)
return nil
}
func main() {
// 使用 age 密钥(适合本地开发)
manager := NewSOPSManagerWithAge(
"age1xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx",
".",
)
// 生成 .sops.yaml 配置
if err := manager.CreateSOPSConfig(); err != nil {
log.Fatalf("创建配置失败: %v", err)
}
// 创建示例明文配置
plainConfig := map[string]interface{}{
"database": map[string]interface{}{
"host": "db.example.com",
"port": 5432,
"password": "super-secret-password",
},
"api_key": "sk-xxxxxxxxxxxx",
}
plainBytes, _ := json.MarshalIndent(plainConfig, "", " ")
os.WriteFile("/tmp/config.json", plainBytes, 0644)
// 加密
if err := manager.EncryptFile("/tmp/config.json", "/tmp/config.enc.json"); err != nil {
log.Fatalf("加密失败: %v", err)
}
// 解密
decrypted, err := manager.DecryptFile("/tmp/config.enc.json")
if err != nil {
log.Fatalf("解密失败: %v", err)
}
fmt.Printf("Decrypted config: %+v\n", decrypted)
// 轮换数据密钥
if err := manager.RotateKeys("/tmp/config.enc.json"); err != nil {
log.Fatalf("轮换失败: %v", err)
}
}
Vault + SPIRE 集成
# SPIRE Server 使用 Vault 作为上游 CA
UpstreamAuthority "vault" {
plugin_data {
vault_addr = "https://vault.example.com:8200"
pki_mount_point = "pki"
approle_auth_mount_point = "auth/approle"
approle_id = "spire-role-id"
approle_secret_id = "spire-secret-id"
}
}
27.3 Kubernetes Secrets 安全
External Secrets Operator
# 从 Vault 同步密钥到 K8s Secret
apiVersion: external-secrets.io/v1beta1
kind: ExternalSecret
metadata:
name: db-credentials
spec:
refreshInterval: 1h
secretStoreRef:
name: vault-backend
kind: SecretStore
target:
name: db-credentials
data:
- secretKey: username
remoteRef:
key: secret/data/myapp/database
property: username
- secretKey: password
remoteRef:
key: secret/data/myapp/database
property: password
27.4 密钥轮换
零停机轮换策略
sequenceDiagram
participant Admin as 管理员/自动化
participant Vault as Vault/KMS
participant App as 应用服务
participant DB as 数据库
Note over Admin,DB: 阶段 1:添加新密钥
Admin->>Vault: 生成新密钥 Key-B
Vault-->>Admin: Key-B 已就绪
Admin->>App: 更新配置:接受 Key-A 和 Key-B
App-->>Admin: 配置已更新
Note over Admin,DB: 阶段 2:切换主密钥
Admin->>App: 设置 Key-B 为主密钥(用于加密/签名)
App->>DB: 新数据使用 Key-B 加密
App->>DB: 旧数据仍可用 Key-A 解密
Note over Admin,DB: 阶段 3:重新加密历史数据(可选)
Admin->>App: 触发批量重新加密
App->>DB: 用 Key-B 重新加密旧数据
Note over Admin,DB: 阶段 4:移除旧密钥
Admin->>App: 移除 Key-A
Admin->>Vault: 标记 Key-A 为已废弃
Vault-->>Admin: Key-A 已销毁
27.5 密钥管理方案对比
特性 |
HashiCorp Vault |
AWS KMS |
Azure Key Vault |
GCP Cloud KMS |
|---|---|---|---|---|
部署方式 |
自托管 / HCP Cloud |
全托管 |
全托管 |
全托管 |
动态密钥 |
✅ 数据库、云凭证、PKI |
❌ |
❌ |
❌ |
加密即服务 |
✅ Transit Engine |
✅ Encrypt/Decrypt API |
✅ Key Operations |
✅ Encrypt/Decrypt |
密钥轮换 |
✅ 自动 + 手动 |
✅ 年度自动 + 按需 |
✅ 自动 + 手动 |
✅ 自动 + 手动 |
HSM 支持 |
✅ Seal/Unseal |
✅ CloudHSM |
✅ HSM 级别 |
✅ Cloud HSM |
PKI/证书 |
✅ 内置 PKI Engine |
❌ (用 ACM) |
❌ (用 App Service Cert) |
❌ (用 Certificate Authority) |
多云支持 |
✅ 任意环境 |
⚠️ AWS 生态 |
⚠️ Azure 生态 |
⚠️ GCP 生态 |
审计日志 |
✅ 详细审计 |
✅ CloudTrail |
✅ Azure Monitor |
✅ Cloud Audit |
开源 |
✅ BSL 许可 |
❌ |
❌ |
❌ |
K8s 集成 |
✅ CSI Driver + Injector |
✅ Secrets Manager CSI |
✅ CSI Driver |
✅ Secret Manager |
价格 |
自托管免费 / HCP 按需 |
按密钥 + API 调用 |
按密钥 + 操作 |
按密钥 + 操作 |
适用场景 |
多云、混合云、复杂需求 |
AWS 原生应用 |
Azure 原生应用 |
GCP 原生应用 |
选型建议:
多云/混合云 → HashiCorp Vault
纯 AWS → AWS KMS + Secrets Manager
纯 Azure → Azure Key Vault
纯 GCP → GCP Cloud KMS + Secret Manager
需要动态凭证 → HashiCorp Vault(独有优势)
27.6 凭证泄露应急响应
flowchart TD
A["🚨 发现凭证泄露"] --> B{"泄露渠道?"}
B -->|"Git 仓库"| C["立即轮换泄露的凭证"]
B -->|"日志/监控"| C
B -->|"第三方通知"| C
B -->|"暗网/公开"| C
C --> D["撤销/禁用旧凭证"]
D --> E["审计访问日志"]
E --> F{"是否有异常访问?"}
F -->|"是"| G["启动安全事件响应"]
G --> H["评估影响范围"]
H --> I["通知受影响方"]
I --> J["取证与根因分析"]
F -->|"否"| K["记录事件"]
J --> L["修复根因"]
K --> L
L --> M["更新安全策略"]
M --> N["加强预防措施"]
N --> O["事后复盘"]
subgraph Prevention["预防措施"]
P1["Pre-commit Hook<br/>(gitleaks)"]
P2["CI/CD 扫描<br/>(TruffleHog)"]
P3["GitHub Secret Scanning"]
P4["定期密钥轮换"]
end
O --> Prevention
style A fill:#F44336,stroke:#333,color:#fff
style G fill:#FF9800,stroke:#333,color:#fff
style O fill:#4CAF50,stroke:#333,color:#fff
style Prevention fill:#E3F2FD,stroke:#2196F3
应急响应清单
步骤 |
操作 |
时间要求 |
负责人 |
|---|---|---|---|
1 |
确认泄露范围和凭证类型 |
< 15 分钟 |
安全团队 |
2 |
轮换/撤销泄露的凭证 |
< 30 分钟 |
运维团队 |
3 |
审计访问日志(CloudTrail/Vault Audit) |
< 1 小时 |
安全团队 |
4 |
评估数据泄露影响 |
< 2 小时 |
安全团队 + 业务 |
5 |
通知受影响用户(如需要) |
< 24 小时 |
法务 + 管理层 |
6 |
根因分析与修复 |
< 48 小时 |
开发 + 安全团队 |
7 |
事后复盘与策略更新 |
< 1 周 |
全团队 |
27.7 密钥泄露检测
# TruffleHog — 扫描 Git 历史中的密钥
trufflehog git https://github.com/org/repo --only-verified
# GitLeaks — 快速扫描
gitleaks detect --source . --verbose
# Pre-commit Hook 防止提交密钥
# .pre-commit-config.yaml
repos:
- repo: https://github.com/gitleaks/gitleaks
rev: v8.18.0
hooks:
- id: gitleaks
27.8 小结
HashiCorp Vault 是最全面的密钥管理工具,支持动态密钥、PKI、加密即服务
动态密钥(数据库凭证、云凭证)自动过期,大幅降低泄露风险
External Secrets Operator 将 Vault 密钥安全同步到 Kubernetes
密钥轮换 应该自动化,使用双密钥策略实现零停机
Envelope Encryption 结合 KMS 与本地加密,兼顾安全性与性能
Pre-commit Hook + CI 扫描 防止密钥泄露到代码仓库
凭证泄露应急响应 需要预案,第一时间轮换凭证、审计日志