第十三章:访问控制模型

“授权的本质是回答一个问题:主体 S 能否对客体 O 执行操作 A?”

        mindmap
  root((访问控制模型))
    DAC
      文件权限
      ACL
    MAC
      Bell-LaPadula
      SELinux
    RBAC
      角色
      层次
      约束
    ABAC
      属性
      策略
      XACML
    ReBAC
      Zanzibar
      关系元组
      图遍历
    

13.1 访问控制基本概念

访问控制的核心要素:

        flowchart LR
    S["🧑 主体(Subject)<br/>用户/服务"]
    A["⚙️ 操作(Action)<br/>读/写/删"]
    O["📄 客体(Object)<br/>文件/API"]
    P["📋 策略(Policy)<br/>允许/拒绝"]

    S -->|发起| A
    A -->|作用于| O
    A -->|评估| P
    P -->|决策| A
    

DAC / MAC / RBAC / ABAC / ReBAC 对比关系

        flowchart TB
    AC["访问控制模型"]

    DAC["DAC<br/>自主访问控制<br/>资源拥有者决定"]
    MAC["MAC<br/>强制访问控制<br/>系统策略强制"]
    RBAC["RBAC<br/>基于角色<br/>用户→角色→权限"]
    ABAC["ABAC<br/>基于属性<br/>多维属性决策"]
    ReBAC["ReBAC<br/>基于关系<br/>实体关系图"]

    AC --> DAC
    AC --> MAC
    AC --> RBAC
    AC --> ABAC
    AC --> ReBAC

    RBAC -->|"角色可作为属性"| ABAC
    ABAC -->|"属性可描述关系"| ReBAC
    RBAC -->|"角色可建模为关系"| ReBAC

    style DAC fill:#fce4ec
    style MAC fill:#e8eaf6
    style RBAC fill:#e0f2f1
    style ABAC fill:#fff3e0
    style ReBAC fill:#f3e5f5
    

13.2 DAC — 自主访问控制

DAC(Discretionary Access Control)由资源拥有者决定谁可以访问。最典型的例子是 Unix 文件权限:

# Unix 文件权限
-rwxr-xr-- 1 alice developers 4096 Jan 1 00:00 report.txt
│├─┤├─┤├─┤
│     └── 其他用户:只读
│   └───── (developers):读+执行
│ └──────── 拥有者(alice):读+写+执行
└────────── 文件类型

# ACL(访问控制列表)— 更细粒度
setfacl -m u:bob:rw report.txt    # 给 bob 读写权限
setfacl -m g:audit:r report.txt   # 给 audit 组只读权限
getfacl report.txt                 # 查看 ACL

优点:简单直观、灵活 缺点:难以集中管理、容易权限蔓延

13.3 MAC — 强制访问控制

MAC(Mandatory Access Control)由系统强制执行安全策略,用户无法更改。

Bell-LaPadula 模型(保密性)

        flowchart TB
    subgraph BLP["Bell-LaPadula 安全级别"]
        direction TB
        TS["🔴 绝密 Top Secret"]
        S["🟠 机密 Secret"]
        C["🟡 秘密 Confidential"]
        U["🟢 公开 Unclassified"]
        TS --- S --- C --- U
    end

    NRU["🚫 No Read Up<br/>不能读高于自己级别的数据<br/>(简单安全属性)"]
    NWD["🚫 No Write Down<br/>不能写低于自己级别的数据<br/>(*属性)"]

    BLP --> NRU
    BLP --> NWD

    style TS fill:#ffcdd2
    style S fill:#ffe0b2
    style C fill:#fff9c4
    style U fill:#c8e6c9
    

SELinux

# 查看 SELinux 上下文
ls -Z /var/www/html/index.html
# system_u:object_r:httpd_sys_content_t:s0 index.html

# SELinux 策略:httpd 进程只能访问 httpd_sys_content_t 类型的文件
# 即使 httpd 被攻破,也无法访问其他类型的文件

13.4 RBAC — 基于角色的访问控制

RBAC(Role-Based Access Control)通过角色间接授权:

RBAC 模型 ER 图

        erDiagram
    USER ||--o{ USER_ROLE : "拥有"
    ROLE ||--o{ USER_ROLE : "分配给"
    ROLE ||--o{ ROLE_PERMISSION : "包含"
    PERMISSION ||--o{ ROLE_PERMISSION : "授予"
    ROLE ||--o| ROLE : "继承自(父角色)"

    USER {
        int id PK
        string username
        string email
        datetime created_at
    }
    ROLE {
        int id PK
        string name
        string description
        int parent_role_id FK
    }
    PERMISSION {
        int id PK
        string resource
        string action
        string description
    }
    USER_ROLE {
        int user_id FK
        int role_id FK
        datetime assigned_at
        datetime expires_at
    }
    ROLE_PERMISSION {
        int role_id FK
        int permission_id FK
    }
    

RBAC 层次

from enum import Enum
from dataclasses import dataclass, field

class Permission(Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"

@dataclass
class Role:
    name: str
    permissions: set[Permission]
    parent: 'Role | None' = None  # 层次 RBAC

    def has_permission(self, perm: Permission) -> bool:
        if perm in self.permissions:
            return True
        if self.parent:
            return self.parent.has_permission(perm)
        return False

# 角色层次:Admin > Editor > Viewer
viewer = Role("viewer", {Permission.READ})
editor = Role("editor", {Permission.WRITE}, parent=viewer)
admin = Role("admin", {Permission.DELETE, Permission.ADMIN}, parent=editor)

assert admin.has_permission(Permission.READ)    # 继承自 viewer
assert admin.has_permission(Permission.WRITE)   # 继承自 editor
assert admin.has_permission(Permission.DELETE)   # 自身权限
assert not viewer.has_permission(Permission.WRITE)  # 无此权限

RBAC 的局限性

问题

描述

角色爆炸

细粒度需求导致角色数量指数增长

缺乏上下文

无法表达”只在工作时间访问”

资源级控制弱

难以表达”只能访问自己的文档”

动态权限难

权限变更需要修改角色定义

Python: SQLAlchemy 模型 + FastAPI RBAC 中间件

"""
RBAC 实现:SQLAlchemy 模型 + FastAPI 中间件
依赖: pip install fastapi sqlalchemy uvicorn python-jose[cryptography]
"""
from __future__ import annotations
import enum
from datetime import datetime, timezone
from typing import Optional

from fastapi import FastAPI, Depends, HTTPException, status, Request
from sqlalchemy import (
    create_engine, Column, Integer, String, DateTime,
    ForeignKey, Enum as SAEnum, Table
)
from sqlalchemy.orm import (
    DeclarativeBase, Session, relationship, sessionmaker
)

# ── SQLAlchemy 模型 ──────────────────────────────────────────

class Base(DeclarativeBase):
    pass

# 用户-角色 多对多关联表
user_role_table = Table(
    "user_roles", Base.metadata,
    Column("user_id", Integer, ForeignKey("users.id"), primary_key=True),
    Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
    Column("assigned_at", DateTime, default=lambda: datetime.now(timezone.utc)),
)

# 角色-权限 多对多关联表
role_permission_table = Table(
    "role_permissions", Base.metadata,
    Column("role_id", Integer, ForeignKey("roles.id"), primary_key=True),
    Column("permission_id", Integer, ForeignKey("permissions.id"), primary_key=True),
)


class ActionEnum(str, enum.Enum):
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    ADMIN = "admin"


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(64), unique=True, nullable=False)
    email = Column(String(128), unique=True)
    roles = relationship("Role", secondary=user_role_table, back_populates="users")


class Role(Base):
    __tablename__ = "roles"
    id = Column(Integer, primary_key=True)
    name = Column(String(64), unique=True, nullable=False)
    parent_id = Column(Integer, ForeignKey("roles.id"), nullable=True)
    parent = relationship("Role", remote_side=[id])
    users = relationship("User", secondary=user_role_table, back_populates="roles")
    permissions = relationship(
        "Permission", secondary=role_permission_table, back_populates="roles"
    )

    def all_permissions(self) -> set[str]:
        """递归收集本角色及所有父角色的权限"""
        perms = {f"{p.resource}:{p.action}" for p in self.permissions}
        if self.parent:
            perms |= self.parent.all_permissions()
        return perms


class Permission(Base):
    __tablename__ = "permissions"
    id = Column(Integer, primary_key=True)
    resource = Column(String(64), nullable=False)
    action = Column(SAEnum(ActionEnum), nullable=False)
    roles = relationship("Role", secondary=role_permission_table, back_populates="permissions")


# ── 数据库初始化 ─────────────────────────────────────────────

engine = create_engine("sqlite:///rbac.db", echo=False)
Base.metadata.create_all(engine)
SessionLocal = sessionmaker(bind=engine)


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# ── FastAPI 应用 + RBAC 中间件 ───────────────────────────────

app = FastAPI(title="RBAC Demo")


def get_current_user(request: Request, db: Session = Depends(get_db)) -> User:
    """从请求头中提取用户(简化版,实际应验证 JWT)"""
    username = request.headers.get("X-User")
    if not username:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="缺少用户标识")
    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="用户不存在")
    return user


def require_permission(resource: str, action: ActionEnum):
    """RBAC 权限检查装饰器工厂"""
    def checker(user: User = Depends(get_current_user)):
        required = f"{resource}:{action.value}"
        # 收集用户所有角色的全部权限(含继承)
        user_perms: set[str] = set()
        for role in user.roles:
            user_perms |= role.all_permissions()
        if required not in user_perms:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"缺少权限: {required}",
            )
        return user
    return checker


@app.get("/api/documents")
def list_documents(user: User = Depends(require_permission("document", ActionEnum.READ))):
    return {"message": f"{user.username} 可以读取文档列表"}


@app.delete("/api/documents/{doc_id}")
def delete_document(
    doc_id: int,
    user: User = Depends(require_permission("document", ActionEnum.DELETE)),
):
    return {"message": f"{user.username} 删除了文档 {doc_id}"}

Python: ABAC 策略引擎

"""
ABAC 策略引擎:基于属性的决策函数
支持主体属性、资源属性、操作属性、环境属性四维决策
"""
from __future__ import annotations
import ipaddress
from dataclasses import dataclass, field
from datetime import datetime, time, timezone
from enum import Enum
from typing import Any, Callable


class Decision(Enum):
    ALLOW = "allow"
    DENY = "deny"
    NOT_APPLICABLE = "not_applicable"


@dataclass
class Attributes:
    """通用属性容器"""
    data: dict[str, Any] = field(default_factory=dict)

    def get(self, key: str, default: Any = None) -> Any:
        return self.data.get(key, default)


@dataclass
class AccessRequest:
    """访问请求,包含四维属性"""
    subject: Attributes    # 主体属性:角色、部门、安全等级等
    resource: Attributes   # 资源属性:类型、密级、拥有者等
    action: Attributes     # 操作属性:动作类型
    environment: Attributes  # 环境属性:时间、IP、设备等


# 条件函数类型:接收 AccessRequest,返回 bool
Condition = Callable[[AccessRequest], bool]


@dataclass
class Policy:
    """ABAC 策略"""
    name: str
    description: str
    effect: Decision  # ALLOW 或 DENY
    conditions: list[Condition] = field(default_factory=list)

    def evaluate(self, request: AccessRequest) -> Decision:
        """评估策略:所有条件都满足时返回 effect,否则返回 NOT_APPLICABLE"""
        try:
            if all(cond(request) for cond in self.conditions):
                return self.effect
            return Decision.NOT_APPLICABLE
        except Exception:
            # 条件评估出错时,安全地返回 NOT_APPLICABLE
            return Decision.NOT_APPLICABLE


class ABACEngine:
    """ABAC 策略引擎,支持 deny-override 合并策略"""

    def __init__(self) -> None:
        self._policies: list[Policy] = []

    def add_policy(self, policy: Policy) -> None:
        self._policies.append(policy)

    def evaluate(self, request: AccessRequest) -> Decision:
        """
        评估所有策略(deny-override):
        - 任何一条 DENY → 最终 DENY
        - 至少一条 ALLOW 且无 DENY → 最终 ALLOW
        - 全部 NOT_APPLICABLE → 最终 DENY(默认拒绝)
        """
        results = [p.evaluate(request) for p in self._policies]
        if Decision.DENY in results:
            return Decision.DENY
        if Decision.ALLOW in results:
            return Decision.ALLOW
        return Decision.DENY  # 默认拒绝


# ── 常用条件构造器 ───────────────────────────────────────────

def subject_has_role(role: str) -> Condition:
    return lambda req: role in req.subject.get("roles", [])

def resource_type_is(rtype: str) -> Condition:
    return lambda req: req.resource.get("type") == rtype

def action_is(action: str) -> Condition:
    return lambda req: req.action.get("name") == action

def during_business_hours() -> Condition:
    def check(req: AccessRequest) -> bool:
        now = req.environment.get("time", datetime.now(timezone.utc))
        return time(9, 0) <= now.time() <= time(18, 0)
    return check

def from_internal_network() -> Condition:
    def check(req: AccessRequest) -> bool:
        ip = req.environment.get("ip", "0.0.0.0")
        return ipaddress.ip_address(ip) in ipaddress.ip_network("10.0.0.0/8")
    return check

def resource_classification_at_most(level: int) -> Condition:
    """资源密级不超过指定级别(0=公开, 1=内部, 2=机密, 3=绝密)"""
    return lambda req: req.resource.get("classification", 0) <= level


# ── 使用示例 ─────────────────────────────────────────────────

engine = ABACEngine()

# 策略1: 编辑者在工作时间从内网可以编辑内部文档
engine.add_policy(Policy(
    name="editor-internal-docs",
    description="编辑者在工作时间从内网可以编辑内部文档",
    effect=Decision.ALLOW,
    conditions=[
        subject_has_role("editor"),
        resource_type_is("document"),
        action_is("edit"),
        resource_classification_at_most(1),  # 内部及以下
        during_business_hours(),
        from_internal_network(),
    ],
))

# 策略2: 禁止任何人删除绝密文档
engine.add_policy(Policy(
    name="deny-delete-top-secret",
    description="禁止删除绝密文档",
    effect=Decision.DENY,
    conditions=[
        resource_type_is("document"),
        action_is("delete"),
        lambda req: req.resource.get("classification", 0) >= 3,
    ],
))

# 测试
request = AccessRequest(
    subject=Attributes({"roles": ["editor"], "department": "engineering"}),
    resource=Attributes({"type": "document", "classification": 1}),
    action=Attributes({"name": "edit"}),
    environment=Attributes({
        "time": datetime(2025, 6, 15, 14, 30, tzinfo=timezone.utc),
        "ip": "10.1.2.100",
    }),
)
result = engine.evaluate(request)
print(f"决策: {result.value}")  # 输出: 决策: allow

Python: Casbin 集成

"""
Casbin Python 集成示例
依赖: pip install casbin
"""
import casbin
import os
import tempfile


def create_casbin_example():
    """创建 Casbin RBAC 示例"""

    # ── 模型定义(PERM 元模型)──────────────────────────────
    model_text = """
[request_definition]
r = sub, obj, act

[policy_definition]
p = sub, obj, act

[role_definition]
g = _, _

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act
"""

    # ── 策略定义 ─────────────────────────────────────────────
    policy_text = """
p, admin, document, read
p, admin, document, write
p, admin, document, delete
p, editor, document, read
p, editor, document, write
p, viewer, document, read

g, alice, admin
g, bob, editor
g, carol, viewer
"""

    # 写入临时文件
    model_path = os.path.join(tempfile.gettempdir(), "model.conf")
    policy_path = os.path.join(tempfile.gettempdir(), "policy.csv")

    with open(model_path, "w") as f:
        f.write(model_text)
    with open(policy_path, "w") as f:
        f.write(policy_text)

    # 创建 enforcer
    enforcer = casbin.Enforcer(model_path, policy_path)

    # ── 权限检查 ─────────────────────────────────────────────
    test_cases = [
        ("alice", "document", "delete", True),   # admin 可以删除
        ("bob", "document", "write", True),       # editor 可以写
        ("bob", "document", "delete", False),     # editor 不能删除
        ("carol", "document", "read", True),      # viewer 可以读
        ("carol", "document", "write", False),    # viewer 不能写
    ]

    for sub, obj, act, expected in test_cases:
        result = enforcer.enforce(sub, obj, act)
        status = "✅" if result == expected else "❌"
        print(f"{status} {sub} -> {obj}:{act} = {result}")

    # ── 动态添加策略 ─────────────────────────────────────────
    enforcer.add_policy("editor", "config", "read")
    enforcer.add_grouping_policy("dave", "editor")
    print(f"\n动态添加后: dave -> config:read = {enforcer.enforce('dave', 'config', 'read')}")

    # 清理
    os.unlink(model_path)
    os.unlink(policy_path)


if __name__ == "__main__":
    create_casbin_example()

Java: Spring Security RBAC

/**
 * Spring Security RBAC 实现
 * 使用 @PreAuthorize + 自定义 PermissionEvaluator
 *
 * 依赖 (build.gradle):
 *   implementation 'org.springframework.boot:spring-boot-starter-security'
 *   implementation 'org.springframework.boot:spring-boot-starter-web'
 *   implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
 */
package com.example.rbac;

import org.springframework.security.access.PermissionEvaluator;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.core.Authentication;
import org.springframework.security.core.GrantedAuthority;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;

import jakarta.persistence.*;
import java.io.Serializable;
import java.util.*;

// ── JPA 实体 ────────────────────────────────────────────────

@Entity
@Table(name = "users")
class AppUser {
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(unique = true, nullable = false)
    private String username;

    @ManyToMany(fetch = FetchType.EAGER)
    @JoinTable(
        name = "user_roles",
        joinColumns = @JoinColumn(name = "user_id"),
        inverseJoinColumns = @JoinColumn(name = "role_id")
    )
    private Set<AppRole> roles = new HashSet<>();

    // getters/setters 省略
    public Long getId() { return id; }
    public String getUsername() { return username; }
    public Set<AppRole> getRoles() { return roles; }
}

@Entity
@Table(name = "roles")
class AppRole {
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(unique = true, nullable = false)
    private String name;

    @ManyToMany(fetch = FetchType.EAGER)
    @JoinTable(
        name = "role_permissions",
        joinColumns = @JoinColumn(name = "role_id"),
        inverseJoinColumns = @JoinColumn(name = "permission_id")
    )
    private Set<AppPermission> permissions = new HashSet<>();

    @ManyToOne
    @JoinColumn(name = "parent_id")
    private AppRole parent;

    /** 递归收集所有权限(含父角色继承) */
    public Set<String> allPermissions() {
        Set<String> perms = new HashSet<>();
        for (AppPermission p : permissions) {
            perms.add(p.getResource() + ":" + p.getAction());
        }
        if (parent != null) {
            perms.addAll(parent.allPermissions());
        }
        return perms;
    }

    public String getName() { return name; }
    public Set<AppPermission> getPermissions() { return permissions; }
}

@Entity
@Table(name = "permissions")
class AppPermission {
    @Id @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Column(nullable = false)
    private String resource;

    @Column(nullable = false)
    private String action;

    public String getResource() { return resource; }
    public String getAction() { return action; }
}

// ── 自定义 PermissionEvaluator ──────────────────────────────

@Component
class RbacPermissionEvaluator implements PermissionEvaluator {

    /**
     * 检查用户是否拥有指定权限
     * @param authentication 当前认证信息
     * @param targetDomainObject 资源标识(如 "document")
     * @param permission 操作(如 "read")
     */
    @Override
    public boolean hasPermission(
            Authentication authentication,
            Object targetDomainObject,
            Object permission) {

        if (authentication == null || !authentication.isAuthenticated()) {
            return false;
        }

        String required = targetDomainObject + ":" + permission;

        // 从 GrantedAuthority 中检查权限
        return authentication.getAuthorities().stream()
                .map(GrantedAuthority::getAuthority)
                .anyMatch(auth -> auth.equals(required) || auth.equals("ROLE_ADMIN"));
    }

    @Override
    public boolean hasPermission(
            Authentication authentication,
            Serializable targetId,
            String targetType,
            Object permission) {
        // 支持基于 ID 的权限检查(可扩展为资源级别控制)
        return hasPermission(authentication, targetType, permission);
    }
}

// ── 业务服务层 ──────────────────────────────────────────────

@Service
class DocumentService {

    @PreAuthorize("hasPermission('document', 'read')")
    public List<String> listDocuments() {
        return List.of("doc-1", "doc-2", "doc-3");
    }

    @PreAuthorize("hasPermission('document', 'write')")
    public String createDocument(String title) {
        return "Created: " + title;
    }

    @PreAuthorize("hasPermission('document', 'delete')")
    public String deleteDocument(Long id) {
        return "Deleted: " + id;
    }
}

// ── REST 控制器 ─────────────────────────────────────────────

@RestController
@RequestMapping("/api/documents")
@EnableMethodSecurity
class DocumentController {

    private final DocumentService documentService;

    DocumentController(DocumentService documentService) {
        this.documentService = documentService;
    }

    @GetMapping
    public List<String> list() {
        return documentService.listDocuments();
    }

    @PostMapping
    public String create(@RequestParam String title) {
        return documentService.createDocument(title);
    }

    @DeleteMapping("/{id}")
    public String delete(@PathVariable Long id) {
        return documentService.deleteDocument(id);
    }
}

Java: ABAC 实现(SpEL 表达式)

/**
 * Spring Security ABAC 实现
 * 使用 SpEL 表达式进行基于属性的访问控制
 */
package com.example.abac;

import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.Authentication;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import org.springframework.web.bind.annotation.*;

import jakarta.servlet.http.HttpServletRequest;
import java.time.LocalTime;
import java.util.*;

// ── 属性上下文 ──────────────────────────────────────────────

/** 安全属性上下文,供 SpEL 表达式引用 */
@Component("abac")
class AbacContext {

    /** 检查当前是否在工作时间 */
    public boolean isDuringBusinessHours() {
        LocalTime now = LocalTime.now();
        return now.isAfter(LocalTime.of(9, 0)) && now.isBefore(LocalTime.of(18, 0));
    }

    /** 检查请求 IP 是否来自内网 */
    public boolean isFromInternalNetwork(HttpServletRequest request) {
        String ip = request.getRemoteAddr();
        return ip != null && (ip.startsWith("10.") || ip.startsWith("192.168.")
                || ip.equals("127.0.0.1") || ip.equals("0:0:0:0:0:0:0:1"));
    }

    /** 检查用户部门是否匹配 */
    public boolean isInDepartment(Authentication auth, String department) {
        // 实际项目中从 JWT claims 或用户属性中获取
        Object details = auth.getDetails();
        if (details instanceof Map) {
            return department.equals(((Map<?, ?>) details).get("department"));
        }
        return false;
    }

    /** 检查资源密级是否在用户可访问范围内 */
    public boolean canAccessClassification(Authentication auth, int classification) {
        // 从用户属性中获取安全等级
        Object details = auth.getDetails();
        if (details instanceof Map) {
            Integer userLevel = (Integer) ((Map<?, ?>) details).get("clearance_level");
            return userLevel != null && userLevel >= classification;
        }
        return false;
    }
}

// ── 使用 SpEL 表达式的服务 ──────────────────────────────────

@Service
class SecureDocumentService {

    /**
     * 仅在工作时间、内网环境下,拥有 editor 角色的用户可以编辑
     * SpEL 表达式引用了 @abac bean 的方法
     */
    @PreAuthorize(
        "hasRole('EDITOR') " +
        "and @abac.isDuringBusinessHours() " +
        "and @abac.isFromInternalNetwork(#request)"
    )
    public String editDocument(Long docId, String content, HttpServletRequest request) {
        return "Document " + docId + " updated";
    }

    /**
     * 仅 engineering 部门的用户可以访问技术文档
     */
    @PreAuthorize(
        "@abac.isInDepartment(authentication, 'engineering') " +
        "and @abac.canAccessClassification(authentication, #classification)"
    )
    public String readTechnicalDoc(Long docId, int classification) {
        return "Technical document " + docId;
    }
}

Java: Apache Shiro 示例

/**
 * Apache Shiro RBAC 示例
 *
 * 依赖 (build.gradle):
 *   implementation 'org.apache.shiro:shiro-core:2.0.1'
 */
package com.example.shiro;

import org.apache.shiro.SecurityUtils;
import org.apache.shiro.authc.*;
import org.apache.shiro.authz.*;
import org.apache.shiro.mgt.DefaultSecurityManager;
import org.apache.shiro.realm.SimpleAccountRealm;
import org.apache.shiro.subject.Subject;

public class ShiroRbacExample {

    public static void main(String[] args) {
        // ── 配置 Realm(用户/角色/权限存储)────────────────
        SimpleAccountRealm realm = new SimpleAccountRealm();

        // 添加用户及其角色
        realm.addAccount("alice", "password123", "admin", "editor");
        realm.addAccount("bob", "password456", "editor");
        realm.addAccount("carol", "password789", "viewer");

        // 添加角色权限
        realm.addRole("admin");
        realm.addRole("editor");
        realm.addRole("viewer");

        // ── 初始化 SecurityManager ──────────────────────────
        DefaultSecurityManager securityManager = new DefaultSecurityManager(realm);
        SecurityUtils.setSecurityManager(securityManager);

        // ── 认证 + 授权 ────────────────────────────────────
        Subject subject = SecurityUtils.getSubject();

        try {
            // 登录
            subject.login(new UsernamePasswordToken("alice", "password123"));
            System.out.println("✅ alice 登录成功");

            // 角色检查
            System.out.println("是 admin? " + subject.hasRole("admin"));       // true
            System.out.println("是 viewer? " + subject.hasRole("viewer"));     // false

            // 权限检查(Shiro 通配符权限)
            // 需要自定义 Realm 来支持细粒度权限,这里演示角色检查
            subject.checkRole("admin");  // 通过
            System.out.println("✅ admin 角色检查通过");

            try {
                subject.checkRole("superadmin");  // 失败
            } catch (AuthorizationException e) {
                System.out.println("❌ superadmin 角色检查失败: " + e.getMessage());
            }

            // 登出
            subject.logout();
            System.out.println("✅ alice 已登出");

        } catch (AuthenticationException e) {
            System.err.println("认证失败: " + e.getMessage());
        }
    }
}

Go: Casbin RBAC/ABAC 完整示例

/*
 * Casbin Go RBAC/ABAC 完整示例
 * 依赖: go get github.com/casbin/casbin/v2
 */
package main

import (
	"fmt"
	"log"
	"os"
	"path/filepath"

	"github.com/casbin/casbin/v2"
)

// ── RBAC 示例 ───────────────────────────────────────────────

func rbacExample() {
	dir, _ := os.MkdirTemp("", "casbin")
	defer os.RemoveAll(dir)

	// RBAC 模型
	modelPath := filepath.Join(dir, "rbac_model.conf")
	os.WriteFile(modelPath, []byte(`
[request_definition]
r = sub, obj, act

[policy_definition]
p = sub, obj, act

[role_definition]
g = _, _

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = g(r.sub, p.sub) && r.obj == p.obj && r.act == p.act
`), 0644)

	// 策略文件
	policyPath := filepath.Join(dir, "rbac_policy.csv")
	os.WriteFile(policyPath, []byte(`
p, admin, document, read
p, admin, document, write
p, admin, document, delete
p, editor, document, read
p, editor, document, write
p, viewer, document, read

g, alice, admin
g, bob, editor
g, carol, viewer
`), 0644)

	e, err := casbin.NewEnforcer(modelPath, policyPath)
	if err != nil {
		log.Fatalf("创建 enforcer 失败: %v", err)
	}

	// 权限检查
	tests := []struct {
		sub, obj, act string
		expected      bool
	}{
		{"alice", "document", "delete", true},
		{"bob", "document", "write", true},
		{"bob", "document", "delete", false},
		{"carol", "document", "read", true},
		{"carol", "document", "write", false},
	}

	fmt.Println("=== RBAC 示例 ===")
	for _, t := range tests {
		ok, err := e.Enforce(t.sub, t.obj, t.act)
		if err != nil {
			log.Printf("检查失败: %v", err)
			continue
		}
		status := "✅"
		if ok != t.expected {
			status = "❌"
		}
		fmt.Printf("%s %s -> %s:%s = %v\n", status, t.sub, t.obj, t.act, ok)
	}

	// 动态添加策略
	_, _ = e.AddPolicy("editor", "config", "read")
	_, _ = e.AddGroupingPolicy("dave", "editor")
	ok, _ := e.Enforce("dave", "config", "read")
	fmt.Printf("\n动态添加后: dave -> config:read = %v\n", ok)
}

// ── ABAC 示例 ───────────────────────────────────────────────

// ABACRequest 表示 ABAC 请求的属性
type ABACRequest struct {
	Age        int
	Department string
}

func abacExample() {
	dir, _ := os.MkdirTemp("", "casbin-abac")
	defer os.RemoveAll(dir)

	// ABAC 模型(使用 eval() 函数评估属性表达式)
	modelPath := filepath.Join(dir, "abac_model.conf")
	os.WriteFile(modelPath, []byte(`
[request_definition]
r = sub, obj, act

[policy_definition]
p = sub_rule, obj, act

[policy_effect]
e = some(where (p.eft == allow))

[matchers]
m = eval(p.sub_rule) && r.obj == p.obj && r.act == p.act
`), 0644)

	policyPath := filepath.Join(dir, "abac_policy.csv")
	os.WriteFile(policyPath, []byte(`
p, r.sub.Age > 18 && r.sub.Department == "engineering", document, read
p, r.sub.Department == "engineering", code, write
`), 0644)

	e, err := casbin.NewEnforcer(modelPath, policyPath)
	if err != nil {
		log.Fatalf("创建 ABAC enforcer 失败: %v", err)
	}

	fmt.Println("\n=== ABAC 示例 ===")

	// 满足条件的请求
	user1 := ABACRequest{Age: 25, Department: "engineering"}
	ok, _ := e.Enforce(user1, "document", "read")
	fmt.Printf("25岁工程师读文档: %v\n", ok) // true

	// 不满足年龄条件
	user2 := ABACRequest{Age: 16, Department: "engineering"}
	ok, _ = e.Enforce(user2, "document", "read")
	fmt.Printf("16岁工程师读文档: %v\n", ok) // false

	// 不满足部门条件
	user3 := ABACRequest{Age: 30, Department: "marketing"}
	ok, _ = e.Enforce(user3, "document", "read")
	fmt.Printf("30岁市场部读文档: %v\n", ok) // false
}

func main() {
	rbacExample()
	abacExample()
}

Go: 自定义 RBAC 中间件(Gin)

/*
 * Gin 框架自定义 RBAC 中间件
 * 依赖:
 *   go get github.com/gin-gonic/gin
 *   go get github.com/patrickmn/go-cache
 */
package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"

	"github.com/gin-gonic/gin"
	"github.com/patrickmn/go-cache"
)

// ── 数据模型 ────────────────────────────────────────────────

// Permission 表示一个权限
type Permission struct {
	Resource string `json:"resource"`
	Action   string `json:"action"`
}

func (p Permission) String() string {
	return p.Resource + ":" + p.Action
}

// Role 表示一个角色
type Role struct {
	Name        string       `json:"name"`
	Permissions []Permission `json:"permissions"`
	ParentName  string       `json:"parent_name,omitempty"` // 父角色名称
}

// User 表示一个用户
type User struct {
	Username string   `json:"username"`
	Roles    []string `json:"roles"`
}

// ── RBAC 管理器(带缓存)────────────────────────────────────

// RBACManager 管理角色和权限,带权限缓存
type RBACManager struct {
	mu    sync.RWMutex
	roles map[string]*Role
	users map[string]*User
	cache *cache.Cache // 权限缓存
}

// NewRBACManager 创建新的 RBAC 管理器
func NewRBACManager() *RBACManager {
	return &RBACManager{
		roles: make(map[string]*Role),
		users: make(map[string]*User),
		// 缓存过期时间 5 分钟,清理间隔 10 分钟
		cache: cache.New(5*time.Minute, 10*time.Minute),
	}
}

// AddRole 添加角色
func (m *RBACManager) AddRole(role *Role) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.roles[role.Name] = role
	m.cache.Flush() // 角色变更时清空缓存
}

// AddUser 添加用户
func (m *RBACManager) AddUser(user *User) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.users[user.Username] = user
	m.cache.Flush()
}

// GetUser 获取用户
func (m *RBACManager) GetUser(username string) (*User, bool) {
	m.mu.RLock()
	defer m.mu.RUnlock()
	u, ok := m.users[username]
	return u, ok
}

// collectPermissions 递归收集角色的所有权限(含继承)
func (m *RBACManager) collectPermissions(roleName string, visited map[string]bool) map[string]bool {
	if visited[roleName] {
		return nil // 防止循环继承
	}
	visited[roleName] = true

	perms := make(map[string]bool)
	role, ok := m.roles[roleName]
	if !ok {
		return perms
	}

	for _, p := range role.Permissions {
		perms[p.String()] = true
	}

	// 递归收集父角色权限
	if role.ParentName != "" {
		for k, v := range m.collectPermissions(role.ParentName, visited) {
			perms[k] = v
		}
	}
	return perms
}

// HasPermission 检查用户是否拥有指定权限(带缓存)
func (m *RBACManager) HasPermission(username, resource, action string) bool {
	cacheKey := fmt.Sprintf("%s:%s:%s", username, resource, action)

	// 先查缓存
	if cached, found := m.cache.Get(cacheKey); found {
		return cached.(bool)
	}

	m.mu.RLock()
	defer m.mu.RUnlock()

	user, ok := m.users[username]
	if !ok {
		m.cache.Set(cacheKey, false, cache.DefaultExpiration)
		return false
	}

	required := resource + ":" + action
	for _, roleName := range user.Roles {
		perms := m.collectPermissions(roleName, make(map[string]bool))
		if perms[required] {
			m.cache.Set(cacheKey, true, cache.DefaultExpiration)
			return true
		}
	}

	m.cache.Set(cacheKey, false, cache.DefaultExpiration)
	return false
}

// ── Gin 中间件 ──────────────────────────────────────────────

// RequirePermission 返回 Gin 中间件,检查指定权限
func RequirePermission(manager *RBACManager, resource, action string) gin.HandlerFunc {
	return func(c *gin.Context) {
		// 从请求头获取用户名(实际项目中应从 JWT 中提取)
		username := c.GetHeader("X-User")
		if username == "" {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "缺少用户标识",
			})
			return
		}

		// 检查用户是否存在
		if _, ok := manager.GetUser(username); !ok {
			c.AbortWithStatusJSON(http.StatusUnauthorized, gin.H{
				"error": "用户不存在",
			})
			return
		}

		// 检查权限
		if !manager.HasPermission(username, resource, action) {
			c.AbortWithStatusJSON(http.StatusForbidden, gin.H{
				"error":    "权限不足",
				"required": resource + ":" + action,
			})
			return
		}

		c.Set("username", username)
		c.Next()
	}
}

// ── 主函数 ──────────────────────────────────────────────────

func main() {
	manager := NewRBACManager()

	// 配置角色层次: admin > editor > viewer
	manager.AddRole(&Role{
		Name:       "viewer",
		Permissions: []Permission{{Resource: "document", Action: "read"}},
	})
	manager.AddRole(&Role{
		Name:       "editor",
		Permissions: []Permission{{Resource: "document", Action: "write"}},
		ParentName: "viewer",
	})
	manager.AddRole(&Role{
		Name: "admin",
		Permissions: []Permission{
			{Resource: "document", Action: "delete"},
			{Resource: "config", Action: "write"},
		},
		ParentName: "editor",
	})

	// 配置用户
	manager.AddUser(&User{Username: "alice", Roles: []string{"admin"}})
	manager.AddUser(&User{Username: "bob", Roles: []string{"editor"}})
	manager.AddUser(&User{Username: "carol", Roles: []string{"viewer"}})

	// 创建 Gin 路由
	r := gin.Default()

	api := r.Group("/api")
	{
		api.GET("/documents",
			RequirePermission(manager, "document", "read"),
			func(c *gin.Context) {
				c.JSON(http.StatusOK, gin.H{
					"user":      c.GetString("username"),
					"documents": []string{"doc-1", "doc-2"},
				})
			},
		)
		api.POST("/documents",
			RequirePermission(manager, "document", "write"),
			func(c *gin.Context) {
				c.JSON(http.StatusCreated, gin.H{
					"user":    c.GetString("username"),
					"message": "文档已创建",
				})
			},
		)
		api.DELETE("/documents/:id",
			RequirePermission(manager, "document", "delete"),
			func(c *gin.Context) {
				c.JSON(http.StatusOK, gin.H{
					"user":    c.GetString("username"),
					"message": "文档 " + c.Param("id") + " 已删除",
				})
			},
		)
	}

	fmt.Println("RBAC 服务启动于 :8080")
	r.Run(":8080")
}

13.5 ABAC — 基于属性的访问控制

ABAC 基于属性做出授权决策,更加灵活:

ABAC 策略评估流程

        flowchart TB
    REQ["📨 访问请求"]

    subgraph ATTRS["属性收集"]
        SA["🧑 主体属性<br/>角色=editor<br/>部门=engineering"]
        RA["📄 资源属性<br/>类型=document<br/>密级=internal"]
        AA["⚙️ 操作属性<br/>动作=edit"]
        EA["🌍 环境属性<br/>时间=工作日<br/>IP=内网"]
    end

    PDP["🧠 策略决策点 (PDP)<br/>评估所有匹配策略"]
    PE["📋 策略集<br/>Policy Store"]

    subgraph COMBINE["策略合并算法"]
        DO["deny-override<br/>任一 DENY → 拒绝"]
        PO["permit-override<br/>任一 ALLOW → 允许"]
        FA["first-applicable<br/>第一条匹配的策略"]
    end

    DECISION{{"决策结果"}}
    ALLOW["✅ 允许"]
    DENY["❌ 拒绝"]

    REQ --> ATTRS
    ATTRS --> PDP
    PE --> PDP
    PDP --> COMBINE
    COMBINE --> DECISION
    DECISION -->|"ALLOW"| ALLOW
    DECISION -->|"DENY"| DENY
    

RBAC vs ABAC 对比

维度

RBAC

ABAC

授权依据

角色

属性(多维度)

粒度

粗粒度

细粒度

灵活性

复杂度

上下文感知

动态策略

困难

容易

审计

简单

复杂

适用规模

中小型

大型

13.6 ReBAC — 基于关系的访问控制

ReBAC(Relationship-Based Access Control)基于实体之间的关系做出授权决策。Google 在 2019 年发表的 Zanzibar 论文奠定了 ReBAC 的基础。

Google Zanzibar

Zanzibar 是 Google 内部的全球授权系统,服务于 Google Drive、YouTube、Cloud 等产品。

核心概念 — 关系元组(Relation Tuple)

格式:object#relation@subject

示例:
document:budget#reader@user:alice
  → alice 是 budget 文档的 reader

document:budget#reader@group:finance#member
  → finance 组的所有 member 都是 budget 文档的 reader

folder:engineering#viewer@folder:root#viewer
  → root 文件夹的 viewer 也是 engineering 文件夹的 viewer(继承)

ReBAC 关系图示例

        flowchart LR
    alice["👤 user:alice"]
    bob["👤 user:bob"]
    eng["🏢 team:engineering"]
    acme["🏛️ org:acme"]
    shared["📁 folder:shared"]
    budget["📄 document:budget"]
    roadmap["📄 document:roadmap"]

    alice -->|"reader"| budget
    alice -->|"member"| eng
    bob -->|"member"| eng
    eng -->|"parent"| acme
    acme -->|"viewer"| shared
    shared -->|"parent"| roadmap

    style alice fill:#e3f2fd
    style bob fill:#e3f2fd
    style eng fill:#e8f5e9
    style acme fill:#fff3e0
    style shared fill:#fce4ec
    style budget fill:#f3e5f5
    style roadmap fill:#f3e5f5
    

关系链推导:Alice → member → engineering → parent → acme → viewer → shared → parent → roadmap,因此 Alice 可以查看 roadmap。

为什么 ReBAC 是未来趋势

RBAC 的问题:
"Alice 能编辑文档吗?" → 只能回答"能"或"不能"
无法区分"Alice 能编辑哪些文档"

ReBAC 的优势:
"Alice 能编辑文档 X 吗?" → 检查 Alice 与文档 X 的关系
"Alice 能编辑哪些文档?" → 遍历 Alice 的所有关系

13.7 访问控制模型对比

模型

授权依据

粒度

复杂度

适用场景

DAC

资源拥有者决定

文件系统

MAC

安全标签

军事/政府

RBAC

角色

企业应用

ABAC

属性

复杂策略

ReBAC

关系

协作应用、SaaS

各模型优缺点对比表

模型

优点

缺点

典型实现

DAC

简单直观;资源拥有者自主管理;实现成本低

难以集中管理;权限蔓延风险;无法防止信息泄露

Unix 文件权限、Windows ACL

MAC

安全性最高;防止信息泄露;系统强制执行

灵活性最低;管理成本高;用户体验差

SELinux、AppArmor、MLS

RBAC

易于理解和管理;审计简单;广泛支持

角色爆炸;缺乏上下文感知;资源级控制弱

Spring Security、Keycloak、AWS IAM

ABAC

细粒度控制;上下文感知;动态策略

策略管理复杂;性能开销;调试困难

OPA/Rego、AWS Cedar、XACML

ReBAC

直观的关系建模;细粒度资源级控制;高性能

关系图维护成本;学习曲线;生态尚不成熟

OpenFGA、SpiceDB、Ory Keto

选型决策树

        flowchart TD
    START{"你的授权需求是什么?"}

    Q1{"需要上下文感知?<br/>(时间/位置/设备)"}
    Q2{"需要资源级别的<br/>细粒度控制?"}
    Q3{"资源之间有<br/>层次/继承关系?"}
    Q4{"基于属性的条件?"}

    RBAC["✅ RBAC<br/>大多数应用的起点<br/>推荐: Spring Security / Casbin"]
    ABAC["✅ ABAC<br/>推荐: OPA / Cedar"]
    ReBAC["✅ ReBAC<br/>推荐: OpenFGA / SpiceDB"]
    ABAC_ReBAC["✅ ABAC + ReBAC 组合<br/>OPA + OpenFGA"]
    COMBO["✅ ReBAC + ABAC 组合<br/>OpenFGA + OPA"]

    START -->|"简单的角色权限"| RBAC
    START -->|"复杂需求"| Q1
    Q1 -->|"是"| ABAC
    Q1 -->|"否"| Q2
    Q2 -->|"否"| RBAC
    Q2 -->|"是"| Q3
    Q3 -->|"是"| ReBAC
    Q3 -->|"否"| Q4
    Q4 -->|"是"| ABAC_ReBAC
    Q4 -->|"否"| ReBAC
    START -->|"以上都需要"| COMBO

    style RBAC fill:#e0f2f1
    style ABAC fill:#fff3e0
    style ReBAC fill:#f3e5f5
    style ABAC_ReBAC fill:#fce4ec
    style COMBO fill:#e8eaf6
    

13.8 小结

  • DAC 简单但难以集中管理,适合文件系统

  • MAC 安全性最高但灵活性最低,适合军事/政府

  • RBAC 是最普及的模型,但存在角色爆炸和粒度不足的问题

  • ABAC 通过多维属性实现细粒度控制,但策略管理复杂

  • ReBAC 基于 Google Zanzibar,通过关系图实现直观的细粒度授权

  • 实际项目中,RBAC + ReBACRBAC + ABAC 的组合是最常见的选择