# 第十三章:访问控制模型
> "授权的本质是回答一个问题:主体 S 能否对客体 O 执行操作 A?"
```{mermaid}
mindmap
root((访问控制模型))
DAC
文件权限
ACL
MAC
Bell-LaPadula
SELinux
RBAC
角色
层次
约束
ABAC
属性
策略
XACML
ReBAC
Zanzibar
关系元组
图遍历
```
## 13.1 访问控制基本概念
访问控制的核心要素:
```{mermaid}
flowchart LR
S["🧑 主体(Subject)
用户/服务"]
A["⚙️ 操作(Action)
读/写/删"]
O["📄 客体(Object)
文件/API"]
P["📋 策略(Policy)
允许/拒绝"]
S -->|发起| A
A -->|作用于| O
A -->|评估| P
P -->|决策| A
```
### DAC / MAC / RBAC / ABAC / ReBAC 对比关系
```{mermaid}
flowchart TB
AC["访问控制模型"]
DAC["DAC
自主访问控制
资源拥有者决定"]
MAC["MAC
强制访问控制
系统策略强制"]
RBAC["RBAC
基于角色
用户→角色→权限"]
ABAC["ABAC
基于属性
多维属性决策"]
ReBAC["ReBAC
基于关系
实体关系图"]
AC --> DAC
AC --> MAC
AC --> RBAC
AC --> ABAC
AC --> ReBAC
RBAC -->|"角色可作为属性"| ABAC
ABAC -->|"属性可描述关系"| ReBAC
RBAC -->|"角色可建模为关系"| ReBAC
style DAC fill:#fce4ec
style MAC fill:#e8eaf6
style RBAC fill:#e0f2f1
style ABAC fill:#fff3e0
style ReBAC fill:#f3e5f5
```
## 13.2 DAC — 自主访问控制
DAC(Discretionary Access Control)由资源拥有者决定谁可以访问。最典型的例子是 Unix 文件权限:
```bash
# Unix 文件权限
-rwxr-xr-- 1 alice developers 4096 Jan 1 00:00 report.txt
│├─┤├─┤├─┤
│ │ │ └── 其他用户:只读
│ │ └───── 组(developers):读+执行
│ └──────── 拥有者(alice):读+写+执行
└────────── 文件类型
# ACL(访问控制列表)— 更细粒度
setfacl -m u:bob:rw report.txt # 给 bob 读写权限
setfacl -m g:audit:r report.txt # 给 audit 组只读权限
getfacl report.txt # 查看 ACL
```
**优点**:简单直观、灵活
**缺点**:难以集中管理、容易权限蔓延
## 13.3 MAC — 强制访问控制
MAC(Mandatory Access Control)由系统强制执行安全策略,用户无法更改。
### Bell-LaPadula 模型(保密性)
```{mermaid}
flowchart TB
subgraph BLP["Bell-LaPadula 安全级别"]
direction TB
TS["🔴 绝密 Top Secret"]
S["🟠 机密 Secret"]
C["🟡 秘密 Confidential"]
U["🟢 公开 Unclassified"]
TS --- S --- C --- U
end
NRU["🚫 No Read Up
不能读高于自己级别的数据
(简单安全属性)"]
NWD["🚫 No Write Down
不能写低于自己级别的数据
(*属性)"]
BLP --> NRU
BLP --> NWD
style TS fill:#ffcdd2
style S fill:#ffe0b2
style C fill:#fff9c4
style U fill:#c8e6c9
```
### SELinux
```bash
# 查看 SELinux 上下文
ls -Z /var/www/html/index.html
# system_u:object_r:httpd_sys_content_t:s0 index.html
# SELinux 策略:httpd 进程只能访问 httpd_sys_content_t 类型的文件
# 即使 httpd 被攻破,也无法访问其他类型的文件
```
## 13.4 RBAC — 基于角色的访问控制
RBAC(Role-Based Access Control)通过角色间接授权:
### RBAC 模型 ER 图
```{mermaid}
erDiagram
USER ||--o{ USER_ROLE : "拥有"
ROLE ||--o{ USER_ROLE : "分配给"
ROLE ||--o{ ROLE_PERMISSION : "包含"
PERMISSION ||--o{ ROLE_PERMISSION : "授予"
ROLE ||--o| ROLE : "继承自(父角色)"
USER {
int id PK
string username
string email
datetime created_at
}
ROLE {
int id PK
string name
string description
int parent_role_id FK
}
PERMISSION {
int id PK
string resource
string action
string description
}
USER_ROLE {
int user_id FK
int role_id FK
datetime assigned_at
datetime expires_at
}
ROLE_PERMISSION {
int role_id FK
int permission_id FK
}
```
### RBAC 层次
```python
from enum import Enum
from dataclasses import dataclass, field
class Permission(Enum):
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
@dataclass
class Role:
name: str
permissions: set[Permission]
parent: 'Role | None' = None # 层次 RBAC
def has_permission(self, perm: Permission) -> bool:
if perm in self.permissions:
return True
if self.parent:
return self.parent.has_permission(perm)
return False
# 角色层次:Admin > Editor > Viewer
viewer = Role("viewer", {Permission.READ})
editor = Role("editor", {Permission.WRITE}, parent=viewer)
admin = Role("admin", {Permission.DELETE, Permission.ADMIN}, parent=editor)
assert admin.has_permission(Permission.READ) # 继承自 viewer
assert admin.has_permission(Permission.WRITE) # 继承自 editor
assert admin.has_permission(Permission.DELETE) # 自身权限
assert not viewer.has_permission(Permission.WRITE) # 无此权限
```
### RBAC 的局限性
| 问题 | 描述 |
|------|------|
| 角色爆炸 | 细粒度需求导致角色数量指数增长 |
| 缺乏上下文 | 无法表达"只在工作时间访问" |
| 资源级控制弱 | 难以表达"只能访问自己的文档" |
| 动态权限难 | 权限变更需要修改角色定义 |
### Python: SQLAlchemy 模型 + FastAPI RBAC 中间件
```python
"""
RBAC 实现:SQLAlchemy 模型 + FastAPI 中间件
依赖: pip install fastapi sqlalchemy uvicorn python-jose[cryptography]
"""
from __future__ import annotations
import enum
from datetime import datetime, timezone
from typing import Optional
from fastapi import FastAPI, Depends, HTTPException, status, Request
from sqlalchemy import (
create_engine, Column, Integer, String, DateTime,
ForeignKey, Enum as SAEnum, Table
)
from sqlalchemy.orm import (
DeclarativeBase, Session, relationship, sessionmaker
)
# ── SQLAlchemy 模型 ──────────────────────────────────────────
class Base(DeclarativeBase):
pass
# 用户-角色 多对多关联表
user_role_table = Table(
"user_roles", Base.metadata,
Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
Column("assigned_at", DateTime, default=lambda: datetime.now(timezone.utc)),
)
# 角色-权限 多对多关联表
role_permission_table = Table(
"role_permissions", Base.metadata,
Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)
class ActionEnum(str, enum.Enum):
READ = "read"
WRITE = "write"
DELETE = "delete"
ADMIN = "admin"
class User(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String(64), unique=True, nullable=False)
email = Column(String(128), unique=True)
roles = relationship("Role", secondary=user_role_table, back_populates="users")
class Role(Base):
__tablename__ = "roles"
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
parent_id = Column(Integer, ForeignKey("roles.id"), nullable=True)
parent = relationship("Role", remote_side=[id])
users = relationship("User", secondary=user_role_table, back_populates="roles")
permissions = relationship(
"Permission", secondary=role_permission_table, back_populates="roles"
)
def all_permissions(self) -> set[str]:
"""递归收集本角色及所有父角色的权限"""
perms = {f"{p.resource}:{p.action}" for p in self.permissions}
if self.parent:
perms |= self.parent.all_permissions()
return perms
class Permission(Base):
__tablename__ = "permissions"
id = Column(Integer, primary_key=True)
resource = Column(String(64), nullable=False)
action = Column(SAEnum(ActionEnum), nullable=False)
roles = relationship("Role", secondary=role_permission_table, back_populates="permissions")
# ── 数据库初始化 ─────────────────────────────────────────────
engine = create_engine("sqlite:///rbac.db", echo=False)
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# ── FastAPI 应用 + RBAC 中间件 ───────────────────────────────
app = FastAPI(title="RBAC Demo")
def get_current_user(request: Request, db: Session = Depends(get_db)) -> User:
"""从请求头中提取用户(简化版,实际应验证 JWT)"""
username = request.headers.get("X-User")
if not username:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="缺少用户标识")
user = db.query(User).filter(User.username == username).first()
if not user:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户不存在")
return user
def require_permission(resource: str, action: ActionEnum):
"""RBAC 权限检查装饰器工厂"""
def checker(user: User = Depends(get_current_user)):
required = f"{resource}:{action.value}"
# 收集用户所有角色的全部权限(含继承)
user_perms: set[str] = set()
for role in user.roles:
user_perms |= role.all_permissions()
if required not in user_perms:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"缺少权限: {required}",
)
return user
return checker
@app.get("/api/documents")
def list_documents(user: User = Depends(require_permission("document", ActionEnum.READ))):
return {"message": f"{user.username} 可以读取文档列表"}
@app.delete("/api/documents/{doc_id}")
def delete_document(
doc_id: int,
user: User = Depends(require_permission("document", ActionEnum.DELETE)),
):
return {"message": f"{user.username} 删除了文档 {doc_id}"}
```
### Python: ABAC 策略引擎
```python
"""
ABAC 策略引擎:基于属性的决策函数
支持主体属性、资源属性、操作属性、环境属性四维决策
"""
from __future__ import annotations
import ipaddress
from dataclasses import dataclass, field
from datetime import datetime, time, timezone
from enum import Enum
from typing import Any, Callable
class Decision(Enum):
ALLOW = "allow"
DENY = "deny"
NOT_APPLICABLE = "not_applicable"
@dataclass
class Attributes:
"""通用属性容器"""
data: dict[str, Any] = field(default_factory=dict)
def get(self, key: str, default: Any = None) -> Any:
return self.data.get(key, default)
@dataclass
class AccessRequest:
"""访问请求,包含四维属性"""
subject: Attributes # 主体属性:角色、部门、安全等级等
resource: Attributes # 资源属性:类型、密级、拥有者等
action: Attributes # 操作属性:动作类型
environment: Attributes # 环境属性:时间、IP、设备等
# 条件函数类型:接收 AccessRequest,返回 bool
Condition = Callable[[AccessRequest], bool]
@dataclass
class Policy:
"""ABAC 策略"""
name: str
description: str
effect: Decision # ALLOW 或 DENY
conditions: list[Condition] = field(default_factory=list)
def evaluate(self, request: AccessRequest) -> Decision:
"""评估策略:所有条件都满足时返回 effect,否则返回 NOT_APPLICABLE"""
try:
if all(cond(request) for cond in self.conditions):
return self.effect
return Decision.NOT_APPLICABLE
except Exception:
# 条件评估出错时,安全地返回 NOT_APPLICABLE
return Decision.NOT_APPLICABLE
class ABACEngine:
"""ABAC 策略引擎,支持 deny-override 合并策略"""
def __init__(self) -> None:
self._policies: list[Policy] = []
def add_policy(self, policy: Policy) -> None:
self._policies.append(policy)
def evaluate(self, request: AccessRequest) -> Decision:
"""
评估所有策略(deny-override):
- 任何一条 DENY → 最终 DENY
- 至少一条 ALLOW 且无 DENY → 最终 ALLOW
- 全部 NOT_APPLICABLE → 最终 DENY(默认拒绝)
"""
results = [p.evaluate(request) for p in self._policies]
if Decision.DENY in results:
return Decision.DENY
if Decision.ALLOW in results:
return Decision.ALLOW
return Decision.DENY # 默认拒绝
# ── 常用条件构造器 ───────────────────────────────────────────
def subject_has_role(role: str) -> Condition:
return lambda req: role in req.subject.get("roles", [])
def resource_type_is(rtype: str) -> Condition:
return lambda req: req.resource.get("type") == rtype
def action_is(action: str) -> Condition:
return lambda req: req.action.get("name") == action
def during_business_hours() -> Condition:
def check(req: AccessRequest) -> bool:
now = req.environment.get("time", datetime.now(timezone.utc))
return time(9, 0) <= now.time() <= time(18, 0)
return check
def from_internal_network() -> Condition:
def check(req: AccessRequest) -> bool:
ip = req.environment.get("ip", "0.0.0.0")
return ipaddress.ip_address(ip) in ipaddress.ip_network("10.0.0.0/8")
return check
def resource_classification_at_most(level: int) -> Condition:
"""资源密级不超过指定级别(0=公开, 1=内部, 2=机密, 3=绝密)"""
return lambda req: req.resource.get("classification", 0) <= level
# ── 使用示例 ─────────────────────────────────────────────────
engine = ABACEngine()
# 策略1: 编辑者在工作时间从内网可以编辑内部文档
engine.add_policy(Policy(
name="editor-internal-docs",
description="编辑者在工作时间从内网可以编辑内部文档",
effect=Decision.ALLOW,
conditions=[
subject_has_role("editor"),
resource_type_is("document"),
action_is("edit"),
resource_classification_at_most(1), # 内部及以下
during_business_hours(),
from_internal_network(),
],
))
# 策略2: 禁止任何人删除绝密文档
engine.add_policy(Policy(
name="deny-delete-top-secret",
description="禁止删除绝密文档",
effect=Decision.DENY,
conditions=[
resource_type_is("document"),
action_is("delete"),
lambda req: req.resource.get("classification", 0) >= 3,
],
))
# 测试
request = AccessRequest(
subject=Attributes({"roles": ["editor"], "department": "engineering"}),
resource=Attributes({"type": "document", "classification": 1}),
action=Attributes({"name": "edit"}),
environment=Attributes({
"time": datetime(2025, 6, 15, 14, 30, tzinfo=timezone.utc),
"ip": "10.1.2.100",
}),
)
result = engine.evaluate(request)
print(f"决策: {result.value}") # 输出: 决策: allow
```
### Python: Casbin 集成
```python
"""
Casbin Python 集成示例
依赖: pip install casbin
"""
import casbin
import os
import tempfile
def create_casbin_example():
"""创建 Casbin RBAC 示例"""
# ── 模型定义(PERM 元模型)──────────────────────────────
model_text = """
[request_definition]
r = sub, obj, act
[policy_definition]
p = sub, obj, act
[role_definition]
g = _, _
[policy_effect]
e = some(where (p.eft == allow))
[matchers]
m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act
"""
# ── 策略定义 ─────────────────────────────────────────────
policy_text = """
p, admin, document, read
p, admin, document, write
p, admin, document, delete
p, editor, document, read
p, editor, document, write
p, viewer, document, read
g, alice, admin
g, bob, editor
g, carol, viewer
"""
# 写入临时文件
model_path = os.path.join(tempfile.gettempdir(), "model.conf")
policy_path = os.path.join(tempfile.gettempdir(), "policy.csv")
with open(model_path, "w") as f:
f.write(model_text)
with open(policy_path, "w") as f:
f.write(policy_text)
# 创建 enforcer
enforcer = casbin.Enforcer(model_path, policy_path)
# ── 权限检查 ─────────────────────────────────────────────
test_cases = [
("alice", "document", "delete", True), # admin 可以删除
("bob", "document", "write", True), # editor 可以写
("bob", "document", "delete", False), # editor 不能删除
("carol", "document", "read", True), # viewer 可以读
("carol", "document", "write", False), # viewer 不能写
]
for sub, obj, act, expected in test_cases:
result = enforcer.enforce(sub, obj, act)
status = "✅" if result == expected else "❌"
print(f"{status} {sub} -> {obj}:{act} = {result}")
# ── 动态添加策略 ─────────────────────────────────────────
enforcer.add_policy("editor", "config", "read")
enforcer.add_grouping_policy("dave", "editor")
print(f"\n动态添加后: dave -> config:read = {enforcer.enforce('dave', 'config', 'read')}")
# 清理
os.unlink(model_path)
os.unlink(policy_path)
if __name__ == "__main__":
create_casbin_example()
```
### Java: Spring Security RBAC
```java
/**
* Spring Security RBAC 实现
* 使用 @PreAuthorize + 自定义 PermissionEvaluator
*
* 依赖 (build.gradle):
* implementation 'org.springframework.boot:spring-boot-starter-security'
* implementation 'org.springframework.boot:spring-boot-starter-web'
* implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
*/
package com.example.rbac;
import org.springframework.security.access.PermissionEvaluator;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import jakarta.persistence.*;
import java.io.Serializable;
import java.util.*;
// ── JPA 实体 ────────────────────────────────────────────────
@Entity
@Table(name = "users")
class AppUser {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(unique = true, nullable = false)
private String username;
@ManyToMany(fetch = FetchType.EAGER)
@JoinTable(
name = "user_roles",
joinColumns = @JoinColumn(name = "user_id"),
inverseJoinColumns = @JoinColumn(name = "role_id")
)
private Set roles = new HashSet<>();
// getters/setters 省略
public Long getId() { return id; }
public String getUsername() { return username; }
public Set getRoles() { return roles; }
}
@Entity
@Table(name = "roles")
class AppRole {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(unique = true, nullable = false)
private String name;
@ManyToMany(fetch = FetchType.EAGER)
@JoinTable(
name = "role_permissions",
joinColumns = @JoinColumn(name = "role_id"),
inverseJoinColumns = @JoinColumn(name = "permission_id")
)
private Set permissions = new HashSet<>();
@ManyToOne
@JoinColumn(name = "parent_id")
private AppRole parent;
/** 递归收集所有权限(含父角色继承) */
public Set allPermissions() {
Set perms = new HashSet<>();
for (AppPermission p : permissions) {
perms.add(p.getResource() + ":" + p.getAction());
}
if (parent != null) {
perms.addAll(parent.allPermissions());
}
return perms;
}
public String getName() { return name; }
public Set getPermissions() { return permissions; }
}
@Entity
@Table(name = "permissions")
class AppPermission {
@Id @GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(nullable = false)
private String resource;
@Column(nullable = false)
private String action;
public String getResource() { return resource; }
public String getAction() { return action; }
}
// ── 自定义 PermissionEvaluator ──────────────────────────────
@Component
class RbacPermissionEvaluator implements PermissionEvaluator {
/**
* 检查用户是否拥有指定权限
* @param authentication 当前认证信息
* @param targetDomainObject 资源标识(如 "document")
* @param permission 操作(如 "read")
*/
@Override
public boolean hasPermission(
Authentication authentication,
Object targetDomainObject,
Object permission) {
if (authentication == null || !authentication.isAuthenticated()) {
return false;
}
String required = targetDomainObject + ":" + permission;
// 从 GrantedAuthority 中检查权限
return authentication.getAuthorities().stream()
.map(GrantedAuthority::getAuthority)
.anyMatch(auth -> auth.equals(required) || auth.equals("ROLE_ADMIN"));
}
@Override
public boolean hasPermission(
Authentication authentication,
Serializable targetId,
String targetType,
Object permission) {
// 支持基于 ID 的权限检查(可扩展为资源级别控制)
return hasPermission(authentication, targetType, permission);
}
}
// ── 业务服务层 ──────────────────────────────────────────────
@Service
class DocumentService {
@PreAuthorize("hasPermission('document', 'read')")
public List listDocuments() {
return List.of("doc-1", "doc-2", "doc-3");
}
@PreAuthorize("hasPermission('document', 'write')")
public String createDocument(String title) {
return "Created: " + title;
}
@PreAuthorize("hasPermission('document', 'delete')")
public String deleteDocument(Long id) {
return "Deleted: " + id;
}
}
// ── REST 控制器 ─────────────────────────────────────────────
@RestController
@RequestMapping("/api/documents")
@EnableMethodSecurity
class DocumentController {
private final DocumentService documentService;
DocumentController(DocumentService documentService) {
this.documentService = documentService;
}
@GetMapping
public List list() {
return documentService.listDocuments();
}
@PostMapping
public String create(@RequestParam String title) {
return documentService.createDocument(title);
}
@DeleteMapping("/{id}")
public String delete(@PathVariable Long id) {
return documentService.deleteDocument(id);
}
}
```
### Java: ABAC 实现(SpEL 表达式)
```java
/**
* Spring Security ABAC 实现
* 使用 SpEL 表达式进行基于属性的访问控制
*/
package com.example.abac;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.Authentication;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;
import jakarta.servlet.http.HttpServletRequest;
import java.time.LocalTime;
import java.util.*;
// ── 属性上下文 ──────────────────────────────────────────────
/** 安全属性上下文,供 SpEL 表达式引用 */
@Component("abac")
class AbacContext {
/** 检查当前是否在工作时间 */
public boolean isDuringBusinessHours() {
LocalTime now = LocalTime.now();
return now.isAfter(LocalTime.of(9, 0)) && now.isBefore(LocalTime.of(18, 0));
}
/** 检查请求 IP 是否来自内网 */
public boolean isFromInternalNetwork(HttpServletRequest request) {
String ip = request.getRemoteAddr();
return ip != null && (ip.startsWith("10.") || ip.startsWith("192.168.")
|| ip.equals("127.0.0.1") || ip.equals("0:0:0:0:0:0:0:1"));
}
/** 检查用户部门是否匹配 */
public boolean isInDepartment(Authentication auth, String department) {
// 实际项目中从 JWT claims 或用户属性中获取
Object details = auth.getDetails();
if (details instanceof Map) {
return department.equals(((Map, ?>) details).get("department"));
}
return false;
}
/** 检查资源密级是否在用户可访问范围内 */
public boolean canAccessClassification(Authentication auth, int classification) {
// 从用户属性中获取安全等级
Object details = auth.getDetails();
if (details instanceof Map) {
Integer userLevel = (Integer) ((Map, ?>) details).get("clearance_level");
return userLevel != null && userLevel >= classification;
}
return false;
}
}
// ── 使用 SpEL 表达式的服务 ──────────────────────────────────
@Service
class SecureDocumentService {
/**
* 仅在工作时间、内网环境下,拥有 editor 角色的用户可以编辑
* SpEL 表达式引用了 @abac bean 的方法
*/
@PreAuthorize(
"hasRole('EDITOR') " +
"and @abac.isDuringBusinessHours() " +
"and @abac.isFromInternalNetwork(#request)"
)
public String editDocument(Long docId, String content, HttpServletRequest request) {
return "Document " + docId + " updated";
}
/**
* 仅 engineering 部门的用户可以访问技术文档
*/
@PreAuthorize(
"@abac.isInDepartment(authentication, 'engineering') " +
"and @abac.canAccessClassification(authentication, #classification)"
)
public String readTechnicalDoc(Long docId, int classification) {
return "Technical document " + docId;
}
}
```
### Java: Apache Shiro 示例
```java
/**
* Apache Shiro RBAC 示例
*
* 依赖 (build.gradle):
* implementation 'org.apache.shiro:shiro-core:2.0.1'
*/
package com.example.shiro;
import org.apache.shiro.SecurityUtils;
import org.apache.shiro.authc.*;
import org.apache.shiro.authz.*;
import org.apache.shiro.mgt.DefaultSecurityManager;
import org.apache.shiro.realm.SimpleAccountRealm;
import org.apache.shiro.subject.Subject;
public class ShiroRbacExample {
public static void main(String[] args) {
// ── 配置 Realm(用户/角色/权限存储)────────────────
SimpleAccountRealm realm = new SimpleAccountRealm();
// 添加用户及其角色
realm.addAccount("alice", "password123", "admin", "editor");
realm.addAccount("bob", "password456", "editor");
realm.addAccount("carol", "password789", "viewer");
// 添加角色权限
realm.addRole("admin");
realm.addRole("editor");
realm.addRole("viewer");
// ── 初始化 SecurityManager ──────────────────────────
DefaultSecurityManager securityManager = new DefaultSecurityManager(realm);
SecurityUtils.setSecurityManager(securityManager);
// ── 认证 + 授权 ────────────────────────────────────
Subject subject = SecurityUtils.getSubject();
try {
// 登录
subject.login(new UsernamePasswordToken("alice", "password123"));
System.out.println("✅ alice 登录成功");
// 角色检查
System.out.println("是 admin? " + subject.hasRole("admin")); // true
System.out.println("是 viewer? " + subject.hasRole("viewer")); // false
// 权限检查(Shiro 通配符权限)
// 需要自定义 Realm 来支持细粒度权限,这里演示角色检查
subject.checkRole("admin"); // 通过
System.out.println("✅ admin 角色检查通过");
try {
subject.checkRole("superadmin"); // 失败
} catch (AuthorizationException e) {
System.out.println("❌ superadmin 角色检查失败: " + e.getMessage());
}
// 登出
subject.logout();
System.out.println("✅ alice 已登出");
} catch (AuthenticationException e) {
System.err.println("认证失败: " + e.getMessage());
}
}
}
```
### Go: Casbin RBAC/ABAC 完整示例
```go
/*
* Casbin Go RBAC/ABAC 完整示例
* 依赖: go get github.com/casbin/casbin/v2
*/
package main
import (
"fmt"
"log"
"os"
"path/filepath"
"github.com/casbin/casbin/v2"
)
// ── RBAC 示例 ───────────────────────────────────────────────
func rbacExample() {
dir, _ := os.MkdirTemp("", "casbin")
defer os.RemoveAll(dir)
// RBAC 模型
modelPath := filepath.Join(dir, "rbac_model.conf")
os.WriteFile(modelPath, []byte(`
[request_definition]
r = sub, obj, act
[policy_definition]
p = sub, obj, act
[role_definition]
g = _, _
[policy_effect]
e = some(where (p.eft == allow))
[matchers]
m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act
`), 0644)
// 策略文件
policyPath := filepath.Join(dir, "rbac_policy.csv")
os.WriteFile(policyPath, []byte(`
p, admin, document, read
p, admin, document, write
p, admin, document, delete
p, editor, document, read
p, editor, document, write
p, viewer, document, read
g, alice, admin
g, bob, editor
g, carol, viewer
`), 0644)
e, err := casbin.NewEnforcer(modelPath, policyPath)
if err != nil {
log.Fatalf("创建 enforcer 失败: %v", err)
}
// 权限检查
tests := []struct {
sub, obj, act string
expected bool
}{
{"alice", "document", "delete", true},
{"bob", "document", "write", true},
{"bob", "document", "delete", false},
{"carol", "document", "read", true},
{"carol", "document", "write", false},
}
fmt.Println("=== RBAC 示例 ===")
for _, t := range tests {
ok, err := e.Enforce(t.sub, t.obj, t.act)
if err != nil {
log.Printf("检查失败: %v", err)
continue
}
status := "✅"
if ok != t.expected {
status = "❌"
}
fmt.Printf("%s %s -> %s:%s = %v\n", status, t.sub, t.obj, t.act, ok)
}
// 动态添加策略
_, _ = e.AddPolicy("editor", "config", "read")
_, _ = e.AddGroupingPolicy("dave", "editor")
ok, _ := e.Enforce("dave", "config", "read")
fmt.Printf("\n动态添加后: dave -> config:read = %v\n", ok)
}
// ── ABAC 示例 ───────────────────────────────────────────────
// ABACRequest 表示 ABAC 请求的属性
type ABACRequest struct {
Age int
Department string
}
func abacExample() {
dir, _ := os.MkdirTemp("", "casbin-abac")
defer os.RemoveAll(dir)
// ABAC 模型(使用 eval() 函数评估属性表达式)
modelPath := filepath.Join(dir, "abac_model.conf")
os.WriteFile(modelPath, []byte(`
[request_definition]
r = sub, obj, act
[policy_definition]
p = sub_rule, obj, act
[policy_effect]
e = some(where (p.eft == allow))
[matchers]
m = eval(p.sub_rule) && r.obj == p.obj && r.act == p.act
`), 0644)
policyPath := filepath.Join(dir, "abac_policy.csv")
os.WriteFile(policyPath, []byte(`
p, r.sub.Age > 18 && r.sub.Department == "engineering", document, read
p, r.sub.Department == "engineering", code, write
`), 0644)
e, err := casbin.NewEnforcer(modelPath, policyPath)
if err != nil {
log.Fatalf("创建 ABAC enforcer 失败: %v", err)
}
fmt.Println("\n=== ABAC 示例 ===")
// 满足条件的请求
user1 := ABACRequest{Age: 25, Department: "engineering"}
ok, _ := e.Enforce(user1, "document", "read")
fmt.Printf("25岁工程师读文档: %v\n", ok) // true
// 不满足年龄条件
user2 := ABACRequest{Age: 16, Department: "engineering"}
ok, _ = e.Enforce(user2, "document", "read")
fmt.Printf("16岁工程师读文档: %v\n", ok) // false
// 不满足部门条件
user3 := ABACRequest{Age: 30, Department: "marketing"}
ok, _ = e.Enforce(user3, "document", "read")
fmt.Printf("30岁市场部读文档: %v\n", ok) // false
}
func main() {
rbacExample()
abacExample()
}
```
### Go: 自定义 RBAC 中间件(Gin)
```go
/*
* Gin 框架自定义 RBAC 中间件
* 依赖:
* go get github.com/gin-gonic/gin
* go get github.com/patrickmn/go-cache
*/
package main
import (
"fmt"
"net/http"
"sync"
"time"
"github.com/gin-gonic/gin"
"github.com/patrickmn/go-cache"
)
// ── 数据模型 ────────────────────────────────────────────────
// Permission 表示一个权限
type Permission struct {
Resource string `json:"resource"`
Action string `json:"action"`
}
func (p Permission) String() string {
return p.Resource + ":" + p.Action
}
// Role 表示一个角色
type Role struct {
Name string `json:"name"`
Permissions []Permission `json:"permissions"`
ParentName string `json:"parent_name,omitempty"` // 父角色名称
}
// User 表示一个用户
type User struct {
Username string `json:"username"`
Roles []string `json:"roles"`
}
// ── RBAC 管理器(带缓存)────────────────────────────────────
// RBACManager 管理角色和权限,带权限缓存
type RBACManager struct {
mu sync.RWMutex
roles map[string]*Role
users map[string]*User
cache *cache.Cache // 权限缓存
}
// NewRBACManager 创建新的 RBAC 管理器
func NewRBACManager() *RBACManager {
return &RBACManager{
roles: make(map[string]*Role),
users: make(map[string]*User),
// 缓存过期时间 5 分钟,清理间隔 10 分钟
cache: cache.New(5*time.Minute, 10*time.Minute),
}
}
// AddRole 添加角色
func (m *RBACManager) AddRole(role *Role) {
m.mu.Lock()
defer m.mu.Unlock()
m.roles[role.Name] = role
m.cache.Flush() // 角色变更时清空缓存
}
// AddUser 添加用户
func (m *RBACManager) AddUser(user *User) {
m.mu.Lock()
defer m.mu.Unlock()
m.users[user.Username] = user
m.cache.Flush()
}
// GetUser 获取用户
func (m *RBACManager) GetUser(username string) (*User, bool) {
m.mu.RLock()
defer m.mu.RUnlock()
u, ok := m.users[username]
return u, ok
}
// collectPermissions 递归收集角色的所有权限(含继承)
func (m *RBACManager) collectPermissions(roleName string, visited map[string]bool) map[string]bool {
if visited[roleName] {
return nil // 防止循环继承
}
visited[roleName] = true
perms := make(map[string]bool)
role, ok := m.roles[roleName]
if !ok {
return perms
}
for _, p := range role.Permissions {
perms[p.String()] = true
}
// 递归收集父角色权限
if role.ParentName != "" {
for k, v := range m.collectPermissions(role.ParentName, visited) {
perms[k] = v
}
}
return perms
}
// HasPermission 检查用户是否拥有指定权限(带缓存)
func (m *RBACManager) HasPermission(username, resource, action string) bool {
cacheKey := fmt.Sprintf("%s:%s:%s", username, resource, action)
// 先查缓存
if cached, found := m.cache.Get(cacheKey); found {
return cached.(bool)
}
m.mu.RLock()
defer m.mu.RUnlock()
user, ok := m.users[username]
if !ok {
m.cache.Set(cacheKey, false, cache.DefaultExpiration)
return false
}
required := resource + ":" + action
for _, roleName := range user.Roles {
perms := m.collectPermissions(roleName, make(map[string]bool))
if perms[required] {
m.cache.Set(cacheKey, true, cache.DefaultExpiration)
return true
}
}
m.cache.Set(cacheKey, false, cache.DefaultExpiration)
return false
}
// ── Gin 中间件 ──────────────────────────────────────────────
// RequirePermission 返回 Gin 中间件,检查指定权限
func RequirePermission(manager *RBACManager, resource, action string) gin.HandlerFunc {
return func(c *gin.Context) {
// 从请求头获取用户名(实际项目中应从 JWT 中提取)
username := c.GetHeader("X-User")
if username == "" {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "缺少用户标识",
})
return
}
// 检查用户是否存在
if _, ok := manager.GetUser(username); !ok {
c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
"error": "用户不存在",
})
return
}
// 检查权限
if !manager.HasPermission(username, resource, action) {
c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
"error": "权限不足",
"required": resource + ":" + action,
})
return
}
c.Set("username", username)
c.Next()
}
}
// ── 主函数 ──────────────────────────────────────────────────
func main() {
manager := NewRBACManager()
// 配置角色层次: admin > editor > viewer
manager.AddRole(&Role{
Name: "viewer",
Permissions: []Permission{{Resource: "document", Action: "read"}},
})
manager.AddRole(&Role{
Name: "editor",
Permissions: []Permission{{Resource: "document", Action: "write"}},
ParentName: "viewer",
})
manager.AddRole(&Role{
Name: "admin",
Permissions: []Permission{
{Resource: "document", Action: "delete"},
{Resource: "config", Action: "write"},
},
ParentName: "editor",
})
// 配置用户
manager.AddUser(&User{Username: "alice", Roles: []string{"admin"}})
manager.AddUser(&User{Username: "bob", Roles: []string{"editor"}})
manager.AddUser(&User{Username: "carol", Roles: []string{"viewer"}})
// 创建 Gin 路由
r := gin.Default()
api := r.Group("/api")
{
api.GET("/documents",
RequirePermission(manager, "document", "read"),
func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"user": c.GetString("username"),
"documents": []string{"doc-1", "doc-2"},
})
},
)
api.POST("/documents",
RequirePermission(manager, "document", "write"),
func(c *gin.Context) {
c.JSON(http.StatusCreated, gin.H{
"user": c.GetString("username"),
"message": "文档已创建",
})
},
)
api.DELETE("/documents/:id",
RequirePermission(manager, "document", "delete"),
func(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"user": c.GetString("username"),
"message": "文档 " + c.Param("id") + " 已删除",
})
},
)
}
fmt.Println("RBAC 服务启动于 :8080")
r.Run(":8080")
}
```
## 13.5 ABAC — 基于属性的访问控制
ABAC 基于属性做出授权决策,更加灵活:
### ABAC 策略评估流程
```{mermaid}
flowchart TB
REQ["📨 访问请求"]
subgraph ATTRS["属性收集"]
SA["🧑 主体属性
角色=editor
部门=engineering"]
RA["📄 资源属性
类型=document
密级=internal"]
AA["⚙️ 操作属性
动作=edit"]
EA["🌍 环境属性
时间=工作日
IP=内网"]
end
PDP["🧠 策略决策点 (PDP)
评估所有匹配策略"]
PE["📋 策略集
Policy Store"]
subgraph COMBINE["策略合并算法"]
DO["deny-override
任一 DENY → 拒绝"]
PO["permit-override
任一 ALLOW → 允许"]
FA["first-applicable
第一条匹配的策略"]
end
DECISION{{"决策结果"}}
ALLOW["✅ 允许"]
DENY["❌ 拒绝"]
REQ --> ATTRS
ATTRS --> PDP
PE --> PDP
PDP --> COMBINE
COMBINE --> DECISION
DECISION -->|"ALLOW"| ALLOW
DECISION -->|"DENY"| DENY
```
### RBAC vs ABAC 对比
| 维度 | RBAC | ABAC |
|------|------|------|
| 授权依据 | 角色 | 属性(多维度) |
| 粒度 | 粗粒度 | 细粒度 |
| 灵活性 | 低 | 高 |
| 复杂度 | 低 | 高 |
| 上下文感知 | ❌ | ✅ |
| 动态策略 | 困难 | 容易 |
| 审计 | 简单 | 复杂 |
| 适用规模 | 中小型 | 大型 |
## 13.6 ReBAC — 基于关系的访问控制
ReBAC(Relationship-Based Access Control)基于实体之间的关系做出授权决策。Google 在 2019 年发表的 Zanzibar 论文奠定了 ReBAC 的基础。
### Google Zanzibar
Zanzibar 是 Google 内部的全球授权系统,服务于 Google Drive、YouTube、Cloud 等产品。
核心概念 — **关系元组(Relation Tuple)**:
```
格式:object#relation@subject
示例:
document:budget#reader@user:alice
→ alice 是 budget 文档的 reader
document:budget#reader@group:finance#member
→ finance 组的所有 member 都是 budget 文档的 reader
folder:engineering#viewer@folder:root#viewer
→ root 文件夹的 viewer 也是 engineering 文件夹的 viewer(继承)
```
### ReBAC 关系图示例
```{mermaid}
flowchart LR
alice["👤 user:alice"]
bob["👤 user:bob"]
eng["🏢 team:engineering"]
acme["🏛️ org:acme"]
shared["📁 folder:shared"]
budget["📄 document:budget"]
roadmap["📄 document:roadmap"]
alice -->|"reader"| budget
alice -->|"member"| eng
bob -->|"member"| eng
eng -->|"parent"| acme
acme -->|"viewer"| shared
shared -->|"parent"| roadmap
style alice fill:#e3f2fd
style bob fill:#e3f2fd
style eng fill:#e8f5e9
style acme fill:#fff3e0
style shared fill:#fce4ec
style budget fill:#f3e5f5
style roadmap fill:#f3e5f5
```
> **关系链推导**:Alice → member → engineering → parent → acme → viewer → shared → parent → roadmap,因此 Alice 可以查看 roadmap。
### 为什么 ReBAC 是未来趋势
```
RBAC 的问题:
"Alice 能编辑文档吗?" → 只能回答"能"或"不能"
无法区分"Alice 能编辑哪些文档"
ReBAC 的优势:
"Alice 能编辑文档 X 吗?" → 检查 Alice 与文档 X 的关系
"Alice 能编辑哪些文档?" → 遍历 Alice 的所有关系
```
## 13.7 访问控制模型对比
| 模型 | 授权依据 | 粒度 | 复杂度 | 适用场景 |
|------|---------|------|--------|---------|
| DAC | 资源拥有者决定 | 中 | 低 | 文件系统 |
| MAC | 安全标签 | 高 | 高 | 军事/政府 |
| RBAC | 角色 | 粗 | 低 | 企业应用 |
| ABAC | 属性 | 细 | 高 | 复杂策略 |
| ReBAC | 关系 | 细 | 中 | 协作应用、SaaS |
### 各模型优缺点对比表
| 模型 | 优点 | 缺点 | 典型实现 |
|------|------|------|---------|
| **DAC** | 简单直观;资源拥有者自主管理;实现成本低 | 难以集中管理;权限蔓延风险;无法防止信息泄露 | Unix 文件权限、Windows ACL |
| **MAC** | 安全性最高;防止信息泄露;系统强制执行 | 灵活性最低;管理成本高;用户体验差 | SELinux、AppArmor、MLS |
| **RBAC** | 易于理解和管理;审计简单;广泛支持 | 角色爆炸;缺乏上下文感知;资源级控制弱 | Spring Security、Keycloak、AWS IAM |
| **ABAC** | 细粒度控制;上下文感知;动态策略 | 策略管理复杂;性能开销;调试困难 | OPA/Rego、AWS Cedar、XACML |
| **ReBAC** | 直观的关系建模;细粒度资源级控制;高性能 | 关系图维护成本;学习曲线;生态尚不成熟 | OpenFGA、SpiceDB、Ory Keto |
### 选型决策树
```{mermaid}
flowchart TD
START{"你的授权需求是什么?"}
Q1{"需要上下文感知?
(时间/位置/设备)"}
Q2{"需要资源级别的
细粒度控制?"}
Q3{"资源之间有
层次/继承关系?"}
Q4{"基于属性的条件?"}
RBAC["✅ RBAC
大多数应用的起点
推荐: Spring Security / Casbin"]
ABAC["✅ ABAC
推荐: OPA / Cedar"]
ReBAC["✅ ReBAC
推荐: OpenFGA / SpiceDB"]
ABAC_ReBAC["✅ ABAC + ReBAC 组合
OPA + OpenFGA"]
COMBO["✅ ReBAC + ABAC 组合
OpenFGA + OPA"]
START -->|"简单的角色权限"| RBAC
START -->|"复杂需求"| Q1
Q1 -->|"是"| ABAC
Q1 -->|"否"| Q2
Q2 -->|"否"| RBAC
Q2 -->|"是"| Q3
Q3 -->|"是"| ReBAC
Q3 -->|"否"| Q4
Q4 -->|"是"| ABAC_ReBAC
Q4 -->|"否"| ReBAC
START -->|"以上都需要"| COMBO
style RBAC fill:#e0f2f1
style ABAC fill:#fff3e0
style ReBAC fill:#f3e5f5
style ABAC_ReBAC fill:#fce4ec
style COMBO fill:#e8eaf6
```
## 13.8 小结
- **DAC** 简单但难以集中管理,适合文件系统
- **MAC** 安全性最高但灵活性最低,适合军事/政府
- **RBAC** 是最普及的模型,但存在角色爆炸和粒度不足的问题
- **ABAC** 通过多维属性实现细粒度控制,但策略管理复杂
- **ReBAC** 基于 Google Zanzibar,通过关系图实现直观的细粒度授权
- 实际项目中,**RBAC + ReBAC** 或 **RBAC + ABAC** 的组合是最常见的选择