# 第七章:OAuth 2.0 授权框架
> "OAuth 2.0 不是认证协议,它是授权框架。这个区别至关重要。"
```{mermaid}
mindmap
root((OAuth 2.0))
四个角色
Resource Owner
Client
Auth Server
Resource Server
授权模式
Authorization Code
Client Credentials
Device Code
PKCE
Token
Access Token
Refresh Token
Scope
安全
PKCE
Token 存储
OAuth 2.1
```
## 7.1 OAuth 2.0 解决什么问题
OAuth 2.0 解决的核心问题是**委托授权**:允许第三方应用在不获取用户密码的情况下,访问用户在另一个服务上的资源。
```
没有 OAuth 的世界:
用户:"这是我的 GitHub 密码,请帮我读取仓库列表"
第三方应用:拿到密码后可以做任何事情 😱
有了 OAuth:
用户:"我授权这个应用只读取我的仓库列表"
第三方应用:只能做用户授权的事情 ✅
```
## 7.2 四个角色
| 角色 | 说明 | 示例 |
|------|------|------|
| Resource Owner | 资源拥有者(通常是用户) | GitHub 用户 |
| Client | 第三方应用 | CI/CD 工具 |
| Authorization Server | 授权服务器 | GitHub OAuth |
| Resource Server | 资源服务器 | GitHub API |
## 7.3 授权码模式(Authorization Code)
最安全、最推荐的模式:
```{mermaid}
sequenceDiagram
participant U as 用户 (浏览器)
participant C as Client (后端)
participant AS as 授权服务器
participant RS as 资源服务器
U->>AS: 1. 重定向到授权端点
(client_id, redirect_uri, scope, state)
AS->>U: 2. 显示登录 & 授权页面
U->>AS: 3. 用户登录并同意授权
AS->>U: 4. 重定向回 Client
(携带 authorization_code + state)
U->>C: 5. 将 code 发送给 Client 后端
C->>AS: 6. 用 code + client_secret 换取 token
AS->>C: 7. 返回 access_token + refresh_token
C->>RS: 8. 用 access_token 访问资源
RS->>C: 9. 返回受保护的资源
```
### PKCE(Proof Key for Code Exchange)
PKCE 防止授权码被拦截:
```{mermaid}
sequenceDiagram
participant U as 用户 (浏览器)
participant C as Client (公开客户端)
participant AS as 授权服务器
participant RS as 资源服务器
Note over C: 生成 code_verifier (随机字符串)
计算 code_challenge = SHA256(code_verifier)
C->>AS: 1. 授权请求
(client_id, redirect_uri, scope, state,
code_challenge, code_challenge_method=S256)
AS->>U: 2. 显示登录 & 授权页面
U->>AS: 3. 用户登录并同意授权
AS->>U: 4. 重定向回 Client (携带 code + state)
U->>C: 5. 将 code 传递给 Client
C->>AS: 6. Token 请求
(code + code_verifier)
Note over AS: 验证 SHA256(code_verifier)
== 之前收到的 code_challenge
AS->>C: 7. 返回 access_token + refresh_token
C->>RS: 8. 用 access_token 访问资源
RS->>C: 9. 返回受保护的资源
```
**Python PKCE 示例:**
```python
import hashlib
import base64
import secrets
# 1. Client 生成 code_verifier(随机字符串)
code_verifier = secrets.token_urlsafe(32)
# 2. 计算 code_challenge
code_challenge = base64.urlsafe_b64encode(
hashlib.sha256(code_verifier.encode()).digest()
).rstrip(b'=').decode()
# 3. 授权请求携带 code_challenge
auth_url = (
f"https://auth.example.com/authorize?"
f"response_type=code&"
f"client_id=my_app&"
f"redirect_uri=https://myapp.com/callback&"
f"scope=read:repos&"
f"code_challenge={code_challenge}&"
f"code_challenge_method=S256"
)
# 4. Token 请求携带 code_verifier
token_request = {
"grant_type": "authorization_code",
"code": "received_auth_code",
"redirect_uri": "https://myapp.com/callback",
"client_id": "my_app",
"code_verifier": code_verifier, # 授权服务器验证这个
}
```
## 7.4 客户端凭证模式(Client Credentials)
适用于**机器对机器**(M2M)通信,没有用户参与:
```{mermaid}
sequenceDiagram
participant C as Client (服务端应用)
participant AS as 授权服务器
participant RS as 资源服务器
C->>AS: 1. Token 请求
(grant_type=client_credentials,
client_id, client_secret, scope)
AS->>AS: 2. 验证客户端凭证
AS->>C: 3. 返回 access_token
C->>RS: 4. 用 access_token 访问资源
RS->>C: 5. 返回受保护的资源
```
```python
import httpx
async def get_m2m_token():
"""机器对机器认证"""
async with httpx.AsyncClient() as client:
response = await client.post(
"https://auth.example.com/oauth/token",
data={
"grant_type": "client_credentials",
"client_id": "service-a",
"client_secret": "secret",
"scope": "read:data write:data",
}
)
token_data = response.json()
return token_data["access_token"]
```
## 7.5 设备码模式(Device Code)
适用于输入受限的设备(智能电视、CLI 工具):
```{mermaid}
sequenceDiagram
participant D as 设备 (无浏览器)
participant AS as 授权服务器
participant U as 用户 (手机/电脑)
D->>AS: 1. 请求 device_code
(client_id, scope)
AS->>D: 2. 返回 device_code + user_code
+ verification_uri
Note over D: 显示: 请访问 xxx.com
输入代码 ABCD-1234
loop 轮询 (每隔 interval 秒)
D->>AS: 3. 轮询 Token 端点
(device_code, grant_type=
urn:ietf:params:oauth:grant-type:device_code)
AS->>D: 4. 返回 authorization_pending
end
U->>AS: 5. 用户在浏览器中访问 verification_uri
输入 user_code 并授权
D->>AS: 6. 再次轮询 Token 端点
AS->>D: 7. 返回 access_token + refresh_token
```
## 7.6 Token 安全
### Access Token vs Refresh Token
| 特性 | Access Token | Refresh Token |
|------|-------------|---------------|
| 用途 | 访问资源 | 获取新的 Access Token |
| 生命周期 | 短(15分钟-1小时) | 长(天-月) |
| 存储位置 | 内存 | 安全存储(HttpOnly Cookie) |
| 传输方式 | Authorization Header | Token 端点 |
| 泄露影响 | 有限(短期) | 严重(可持续获取访问) |
### Token 刷新流程
```{mermaid}
sequenceDiagram
participant C as Client
participant AS as 授权服务器
participant RS as 资源服务器
C->>RS: 1. 用 access_token 访问资源
RS->>C: 2. 401 Unauthorized (token 已过期)
C->>AS: 3. 刷新请求
(grant_type=refresh_token,
refresh_token, client_id, client_secret)
AS->>AS: 4. 验证 refresh_token 有效性
AS->>C: 5. 返回新的 access_token
+ 新的 refresh_token (轮换)
Note over C: 废弃旧的 refresh_token
安全存储新的 refresh_token
C->>RS: 6. 用新的 access_token 访问资源
RS->>C: 7. 返回受保护的资源
```
### Token 存储最佳实践
| 存储方式 | 安全性 | 推荐场景 |
|---------|--------|---------|
| HttpOnly Cookie | ✅ 高 | Web 应用首选 |
| 内存(变量) | ✅ 高 | SPA 应用 |
| SessionStorage | ⚠️ 中 | 可接受 |
| LocalStorage | ❌ 低 | 不推荐 |
| URL 参数 | ❌ 极低 | 禁止 |
## 7.7 OAuth 2.1
OAuth 2.1 是 OAuth 2.0 的整合更新,主要变化:
- ✅ **PKCE 成为必须**(所有公开客户端)
- ❌ **移除 Implicit 模式**(不安全)
- ❌ **移除 Password 模式**(不安全)
- ✅ **Refresh Token 轮换**(每次使用后失效旧的)
- ✅ **精确的 redirect_uri 匹配**(禁止通配符)
## 7.8 FastAPI OAuth2 实现
```python
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from datetime import datetime, timedelta, timezone
app = FastAPI()
SECRET_KEY = "your-secret-key"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
expire = datetime.now(timezone.utc) + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
return {"username": username, "roles": payload.get("roles", [])}
except JWTError:
raise credentials_exception
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
# 验证用户凭证(简化示例)
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect credentials")
access_token = create_access_token(
data={"sub": user["username"], "roles": user["roles"]},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: dict = Depends(get_current_user)):
return current_user
```
## 7.9 实战:三种语言实现 OAuth 2.0 客户端
本节提供 Python、Java、Go 三种语言的完整 OAuth 2.0 客户端实现,涵盖 Authorization Code Flow(含 PKCE)、Client Credentials Flow 和 Token 刷新。
### 7.9.1 Python 实现(Authlib + HTTPX)
#### Authorization Code Flow(含 PKCE)
```python
"""
OAuth 2.0 Authorization Code Flow with PKCE — Python 实现
依赖: pip install authlib httpx starlette uvicorn
"""
import secrets
import hashlib
import base64
import json
from pathlib import Path
import httpx
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import RedirectResponse, JSONResponse
from starlette.routing import Route
# ============================================================
# 配置 — 实际项目中应从环境变量或密钥管理服务读取
# ============================================================
OAUTH_CONFIG = {
"client_id": "my-python-app",
"client_secret": "my-client-secret", # 机密客户端才需要
"authorization_endpoint": "https://auth.example.com/authorize",
"token_endpoint": "https://auth.example.com/oauth/token",
"redirect_uri": "http://localhost:8000/callback",
"scope": "openid profile read:repos",
}
# 简易内存 session 存储 — 生产环境应使用 Redis 等安全存储
_sessions: dict[str, dict] = {}
def _generate_pkce_pair() -> tuple[str, str]:
"""生成 PKCE code_verifier 和 code_challenge"""
# code_verifier: 43-128 字符的高熵随机字符串
code_verifier = secrets.token_urlsafe(32)
# code_challenge: SHA-256 哈希后 Base64url 编码(去除填充)
digest = hashlib.sha256(code_verifier.encode("ascii")).digest()
code_challenge = base64.urlsafe_b64encode(digest).rstrip(b"=").decode("ascii")
return code_verifier, code_challenge
async def login(request: Request):
"""步骤 1: 将用户重定向到授权服务器"""
# 生成防 CSRF 的 state 参数
state = secrets.token_urlsafe(16)
# 生成 PKCE 对
code_verifier, code_challenge = _generate_pkce_pair()
# 将 state 和 code_verifier 存入 session(服务端保存)
_sessions[state] = {"code_verifier": code_verifier}
# 构造授权 URL
params = {
"response_type": "code",
"client_id": OAUTH_CONFIG["client_id"],
"redirect_uri": OAUTH_CONFIG["redirect_uri"],
"scope": OAUTH_CONFIG["scope"],
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
auth_url = (
OAUTH_CONFIG["authorization_endpoint"]
+ "?"
+ "&".join(f"{k}={v}" for k, v in params.items())
)
return RedirectResponse(url=auth_url)
async def callback(request: Request):
"""步骤 2: 处理授权服务器的回调,用 code 换取 token"""
code = request.query_params.get("code")
state = request.query_params.get("state")
error = request.query_params.get("error")
# 错误处理: 授权服务器返回错误
if error:
return JSONResponse(
{"error": error, "description": request.query_params.get("error_description")},
status_code=400,
)
# 安全检查: 验证 state 参数防止 CSRF
if state not in _sessions:
return JSONResponse({"error": "invalid_state", "detail": "State 不匹配,可能遭受 CSRF 攻击"}, status_code=403)
session = _sessions.pop(state) # 一次性使用
# 用 authorization_code + code_verifier 换取 token
async with httpx.AsyncClient() as client:
try:
resp = await client.post(
OAUTH_CONFIG["token_endpoint"],
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": OAUTH_CONFIG["redirect_uri"],
"client_id": OAUTH_CONFIG["client_id"],
"client_secret": OAUTH_CONFIG["client_secret"],
"code_verifier": session["code_verifier"], # PKCE 验证
},
headers={"Accept": "application/json"},
)
resp.raise_for_status()
except httpx.HTTPStatusError as exc:
return JSONResponse(
{"error": "token_exchange_failed", "detail": exc.response.text},
status_code=502,
)
token_data = resp.json()
# 安全最佳实践: 不要将 token 暴露给前端,存储在服务端 session
session_id = secrets.token_urlsafe(16)
_sessions[session_id] = {
"access_token": token_data["access_token"],
"refresh_token": token_data.get("refresh_token"),
"expires_in": token_data.get("expires_in"),
}
response = JSONResponse({"message": "登录成功", "session_id": session_id})
# 通过 HttpOnly Cookie 传递 session_id
response.set_cookie(
key="session_id",
value=session_id,
httponly=True,
secure=True, # 仅 HTTPS
samesite="lax", # 防止 CSRF
max_age=3600,
)
return response
# 启动应用: uvicorn ch07_python_oauth:app --port 8000
app = Starlette(routes=[
Route("/login", login),
Route("/callback", callback),
])
```
#### Client Credentials Flow
```python
"""
OAuth 2.0 Client Credentials Flow — Python 实现
适用于服务间 (M2M) 通信,无用户参与
依赖: pip install httpx
"""
import asyncio
import time
import logging
import httpx
logger = logging.getLogger(__name__)
class OAuth2ClientCredentials:
"""带自动刷新的 Client Credentials Token 管理器"""
def __init__(
self,
token_endpoint: str,
client_id: str,
client_secret: str,
scope: str = "",
):
self.token_endpoint = token_endpoint
self.client_id = client_id
self.client_secret = client_secret
self.scope = scope
self._access_token: str | None = None
self._expires_at: float = 0 # UNIX 时间戳
async def get_token(self) -> str:
"""获取有效的 access_token,过期时自动刷新"""
# 提前 30 秒刷新,避免边界情况
if self._access_token and time.time() < (self._expires_at - 30):
return self._access_token
logger.info("正在获取新的 Client Credentials token...")
async with httpx.AsyncClient() as client:
try:
resp = await client.post(
self.token_endpoint,
data={
"grant_type": "client_credentials",
"client_id": self.client_id,
"client_secret": self.client_secret,
"scope": self.scope,
},
headers={"Accept": "application/json"},
)
resp.raise_for_status()
except httpx.HTTPStatusError as exc:
logger.error("Token 请求失败: %s", exc.response.text)
raise RuntimeError(f"无法获取 token: {exc.response.status_code}") from exc
data = resp.json()
self._access_token = data["access_token"]
self._expires_at = time.time() + data.get("expires_in", 3600)
logger.info("Token 获取成功,有效期 %d 秒", data.get("expires_in", 3600))
return self._access_token
async def call_api(self, method: str, url: str, **kwargs) -> httpx.Response:
"""使用 access_token 调用受保护的 API"""
token = await self.get_token()
headers = kwargs.pop("headers", {})
headers["Authorization"] = f"Bearer {token}"
async with httpx.AsyncClient() as client:
resp = await client.request(method, url, headers=headers, **kwargs)
# 如果收到 401,尝试刷新 token 后重试一次
if resp.status_code == 401:
logger.warning("Token 被拒绝,尝试刷新...")
self._access_token = None
token = await self.get_token()
headers["Authorization"] = f"Bearer {token}"
resp = await client.request(method, url, headers=headers, **kwargs)
return resp
# 使用示例
async def main():
oauth = OAuth2ClientCredentials(
token_endpoint="https://auth.example.com/oauth/token",
client_id="service-a",
client_secret="service-a-secret",
scope="read:data write:data",
)
# 调用受保护的 API
resp = await oauth.call_api("GET", "https://api.example.com/internal/data")
print(f"状态码: {resp.status_code}")
print(f"响应: {resp.json()}")
if __name__ == "__main__":
asyncio.run(main())
```
### 7.9.2 Java 实现(Spring Security OAuth2)
#### Authorization Code Flow 配置
```java
/*
* Spring Boot OAuth2 Client — Authorization Code Flow
*
* 依赖 (build.gradle):
* implementation 'org.springframework.boot:spring-boot-starter-oauth2-client'
* implementation 'org.springframework.boot:spring-boot-starter-web'
* implementation 'org.springframework.boot:spring-boot-starter-security'
*
* application.yml 配置见下方
*/
// ============================================================
// application.yml
// ============================================================
/*
spring:
security:
oauth2:
client:
registration:
github:
client-id: ${OAUTH_CLIENT_ID}
client-secret: ${OAUTH_CLIENT_SECRET}
scope: read:user,repo
redirect-uri: "{baseUrl}/login/oauth2/code/{registrationId}"
authorization-grant-type: authorization_code
client-authentication-method: client_secret_basic
provider:
github:
authorization-uri: https://github.com/login/oauth/authorize
token-uri: https://github.com/login/oauth/access_token
user-info-uri: https://api.github.com/user
user-name-attribute: login
*/
package com.example.oauth2demo;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.web.SecurityFilterChain;
@Configuration
@EnableWebSecurity
public class SecurityConfig {
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http
// 所有请求都需要认证
.authorizeHttpRequests(auth -> auth
.requestMatchers("/", "/public/**").permitAll()
.anyRequest().authenticated()
)
// 启用 OAuth2 登录 (Authorization Code Flow)
.oauth2Login(oauth2 -> oauth2
.loginPage("/login")
// 登录成功后的处理
.defaultSuccessUrl("/dashboard", true)
// 自定义用户信息映射
.userInfoEndpoint(userInfo -> userInfo
.userService(customOAuth2UserService())
)
)
// 登出配置
.logout(logout -> logout
.logoutSuccessUrl("/")
.invalidateHttpSession(true)
.deleteCookies("JSESSIONID")
);
return http.build();
}
@Bean
public CustomOAuth2UserService customOAuth2UserService() {
return new CustomOAuth2UserService();
}
}
```
```java
package com.example.oauth2demo;
import org.springframework.security.oauth2.client.userinfo.DefaultOAuth2UserService;
import org.springframework.security.oauth2.client.userinfo.OAuth2UserRequest;
import org.springframework.security.oauth2.core.OAuth2AuthenticationException;
import org.springframework.security.oauth2.core.user.OAuth2User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* 自定义 OAuth2 用户服务 — 在用户登录后执行额外逻辑
*/
public class CustomOAuth2UserService extends DefaultOAuth2UserService {
private static final Logger logger = LoggerFactory.getLogger(CustomOAuth2UserService.class);
@Override
public OAuth2User loadUser(OAuth2UserRequest userRequest) throws OAuth2AuthenticationException {
OAuth2User user = super.loadUser(userRequest);
// 记录登录事件(安全审计)
String provider = userRequest.getClientRegistration().getRegistrationId();
String username = user.getAttribute("login");
logger.info("用户通过 {} 登录: {}", provider, username);
// 可在此处同步用户信息到本地数据库
// userRepository.findOrCreate(provider, username, user.getAttributes());
return user;
}
}
```
```java
package com.example.oauth2demo;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.core.user.OAuth2User;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
/**
* 受保护的 API 端点 — 需要 OAuth2 登录后才能访问
*/
@RestController
public class UserController {
@GetMapping("/dashboard")
public Map dashboard(@AuthenticationPrincipal OAuth2User principal) {
return Map.of(
"name", principal.getAttribute("login"),
"avatar", principal.getAttribute("avatar_url"),
"message", "欢迎回来!"
);
}
@GetMapping("/api/profile")
public Map profile(@AuthenticationPrincipal OAuth2User principal) {
return Map.of(
"username", principal.getAttribute("login"),
"email", principal.getAttribute("email"),
"attributes", principal.getAttributes()
);
}
}
```
#### Resource Server JWT 验证
```java
/*
* Spring Boot OAuth2 Resource Server — JWT 验证
*
* 依赖 (build.gradle):
* implementation 'org.springframework.boot:spring-boot-starter-oauth2-resource-server'
* implementation 'org.springframework.boot:spring-boot-starter-web'
*
* application.yml:
* spring:
* security:
* oauth2:
* resourceserver:
* jwt:
* issuer-uri: https://auth.example.com/
* # 或直接指定 JWKS 端点:
* # jwk-set-uri: https://auth.example.com/.well-known/jwks.json
*/
package com.example.resourceserver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.config.annotation.method.configuration.EnableMethodSecurity;
import org.springframework.security.config.annotation.web.builders.HttpSecurity;
import org.springframework.security.config.annotation.web.configuration.EnableWebSecurity;
import org.springframework.security.config.http.SessionCreationPolicy;
import org.springframework.security.oauth2.server.resource.authentication.JwtAuthenticationConverter;
import org.springframework.security.oauth2.server.resource.authentication.JwtGrantedAuthoritiesConverter;
import org.springframework.security.web.SecurityFilterChain;
@Configuration
@EnableWebSecurity
@EnableMethodSecurity // 启用 @PreAuthorize 注解
public class ResourceServerConfig {
@Bean
public SecurityFilterChain securityFilterChain(HttpSecurity http) throws Exception {
http
// Resource Server 不需要 session(无状态)
.sessionManagement(session ->
session.sessionCreationPolicy(SessionCreationPolicy.STATELESS)
)
.authorizeHttpRequests(auth -> auth
.requestMatchers("/health").permitAll()
.requestMatchers("/api/admin/**").hasAuthority("SCOPE_admin")
.requestMatchers("/api/**").hasAuthority("SCOPE_read:data")
.anyRequest().authenticated()
)
// 配置 JWT 验证
.oauth2ResourceServer(oauth2 -> oauth2
.jwt(jwt -> jwt
.jwtAuthenticationConverter(jwtAuthenticationConverter())
)
);
return http.build();
}
/**
* 自定义 JWT 到 Spring Security Authority 的映射
* 将 JWT 中的 scope claim 转换为 SCOPE_ 前缀的权限
*/
@Bean
public JwtAuthenticationConverter jwtAuthenticationConverter() {
JwtGrantedAuthoritiesConverter grantedAuthoritiesConverter =
new JwtGrantedAuthoritiesConverter();
grantedAuthoritiesConverter.setAuthoritiesClaimName("scope");
grantedAuthoritiesConverter.setAuthorityPrefix("SCOPE_");
JwtAuthenticationConverter converter = new JwtAuthenticationConverter();
converter.setJwtGrantedAuthoritiesConverter(grantedAuthoritiesConverter);
return converter;
}
}
```
```java
package com.example.resourceserver;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.jwt.Jwt;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;
@RestController
@RequestMapping("/api")
public class ProtectedResourceController {
/**
* 需要 read:data scope 才能访问
*/
@GetMapping("/data")
public Map getData(@AuthenticationPrincipal Jwt jwt) {
return Map.of(
"subject", jwt.getSubject(),
"issuer", jwt.getIssuer().toString(),
"scopes", jwt.getClaimAsString("scope"),
"data", "这是受保护的数据"
);
}
/**
* 需要 admin scope 才能访问(方法级别安全)
*/
@PreAuthorize("hasAuthority('SCOPE_admin')")
@GetMapping("/admin/settings")
public Map adminSettings(@AuthenticationPrincipal Jwt jwt) {
return Map.of(
"admin", jwt.getSubject(),
"settings", Map.of("feature_flag", true, "max_users", 1000)
);
}
}
```
#### Client Credentials Flow 调用
```java
/*
* Spring Boot OAuth2 Client Credentials — 服务间调用
*
* 依赖 (build.gradle):
* implementation 'org.springframework.boot:spring-boot-starter-oauth2-client'
* implementation 'org.springframework.boot:spring-boot-starter-webflux' // WebClient
*
* application.yml:
* spring:
* security:
* oauth2:
* client:
* registration:
* internal-service:
* client-id: ${SERVICE_CLIENT_ID}
* client-secret: ${SERVICE_CLIENT_SECRET}
* authorization-grant-type: client_credentials
* scope: read:data,write:data
* provider:
* internal-service:
* token-uri: https://auth.example.com/oauth/token
*/
package com.example.m2m;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.security.oauth2.client.*;
import org.springframework.security.oauth2.client.registration.ClientRegistrationRepository;
import org.springframework.security.oauth2.client.web.reactive.function.client.ServletOAuth2AuthorizedClientExchangeFilterFunction;
import org.springframework.web.reactive.function.client.WebClient;
@Configuration
public class WebClientConfig {
/**
* 配置带 OAuth2 Client Credentials 自动注入的 WebClient
* Spring Security 会自动管理 token 的获取和刷新
*/
@Bean
public WebClient oauth2WebClient(
ClientRegistrationRepository clientRegistrationRepository,
OAuth2AuthorizedClientService authorizedClientService) {
// 使用 AuthorizedClientManager 管理 token 生命周期
AuthorizedClientServiceOAuth2AuthorizedClientManager clientManager =
new AuthorizedClientServiceOAuth2AuthorizedClientManager(
clientRegistrationRepository, authorizedClientService);
// 配置 Client Credentials 提供者
clientManager.setAuthorizedClientProvider(
OAuth2AuthorizedClientProviderBuilder.builder()
.clientCredentials() // 启用 Client Credentials 授权
.build()
);
// 创建带 OAuth2 过滤器的 WebClient
ServletOAuth2AuthorizedClientExchangeFilterFunction oauth2Filter =
new ServletOAuth2AuthorizedClientExchangeFilterFunction(clientManager);
oauth2Filter.setDefaultClientRegistrationId("internal-service");
return WebClient.builder()
.apply(oauth2Filter.oauth2Configuration())
.baseUrl("https://api.example.com")
.build();
}
}
```
```java
package com.example.m2m;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import java.util.Map;
/**
* 使用 Client Credentials 调用内部服务的示例
*/
@Service
public class InternalApiService {
private static final Logger logger = LoggerFactory.getLogger(InternalApiService.class);
private final WebClient oauth2WebClient;
public InternalApiService(WebClient oauth2WebClient) {
this.oauth2WebClient = oauth2WebClient;
}
/**
* 调用内部 API — WebClient 自动附加 Bearer token
*/
public Map fetchInternalData() {
try {
@SuppressWarnings("unchecked")
Map result = oauth2WebClient
.get()
.uri("/internal/data")
.retrieve()
.bodyToMono(Map.class)
.block();
logger.info("成功获取内部数据");
return result;
} catch (WebClientResponseException.Unauthorized e) {
logger.error("认证失败: {}", e.getResponseBodyAsString());
throw new RuntimeException("服务间认证失败", e);
} catch (WebClientResponseException e) {
logger.error("API 调用失败 [{}]: {}", e.getStatusCode(), e.getResponseBodyAsString());
throw new RuntimeException("内部 API 调用失败: " + e.getStatusCode(), e);
}
}
}
```
### 7.9.3 Go 实现(golang.org/x/oauth2)
#### Authorization Code Flow(含 PKCE)
```go
/*
* OAuth 2.0 Authorization Code Flow with PKCE — Go 实现
*
* 依赖:
* go get golang.org/x/oauth2
*
* 运行:
* go run main.go
* 访问 http://localhost:8080/login
*/
package main
import (
"context"
"crypto/rand"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"log"
"net/http"
"sync"
"time"
"golang.org/x/oauth2"
)
// OAuthConfig 存储 OAuth2 配置
var oauthConfig = &oauth2.Config{
ClientID: "my-go-app",
ClientSecret: "my-client-secret", // 机密客户端需要
Scopes: []string{"openid", "profile", "read:repos"},
Endpoint: oauth2.Endpoint{
AuthURL: "https://auth.example.com/authorize",
TokenURL: "https://auth.example.com/oauth/token",
},
RedirectURL: "http://localhost:8080/callback",
}
// Session 存储 — 生产环境应使用 Redis 等安全存储
type SessionStore struct {
mu sync.RWMutex
sessions map[string]*SessionData
}
type SessionData struct {
State string
CodeVerifier string
Token *oauth2.Token
CreatedAt time.Time
}
var store = &SessionStore{sessions: make(map[string]*SessionData)}
func (s *SessionStore) Set(key string, data *SessionData) {
s.mu.Lock()
defer s.mu.Unlock()
s.sessions[key] = data
}
func (s *SessionStore) Get(key string) (*SessionData, bool) {
s.mu.RLock()
defer s.mu.RUnlock()
data, ok := s.sessions[key]
return data, ok
}
func (s *SessionStore) Delete(key string) {
s.mu.Lock()
defer s.mu.Unlock()
delete(s.sessions, key)
}
// generateRandomString 生成密码学安全的随机字符串
func generateRandomString(n int) (string, error) {
b := make([]byte, n)
if _, err := rand.Read(b); err != nil {
return "", fmt.Errorf("生成随机字符串失败: %w", err)
}
return base64.RawURLEncoding.EncodeToString(b), nil
}
// generatePKCE 生成 PKCE code_verifier 和 code_challenge
func generatePKCE() (verifier, challenge string, err error) {
verifier, err = generateRandomString(32)
if err != nil {
return "", "", err
}
h := sha256.Sum256([]byte(verifier))
challenge = base64.RawURLEncoding.EncodeToString(h[:])
return verifier, challenge, nil
}
// handleLogin 处理登录请求 — 重定向到授权服务器
func handleLogin(w http.ResponseWriter, r *http.Request) {
// 生成防 CSRF 的 state 参数
state, err := generateRandomString(16)
if err != nil {
http.Error(w, "内部错误", http.StatusInternalServerError)
log.Printf("生成 state 失败: %v", err)
return
}
// 生成 PKCE 对
codeVerifier, codeChallenge, err := generatePKCE()
if err != nil {
http.Error(w, "内部错误", http.StatusInternalServerError)
log.Printf("生成 PKCE 失败: %v", err)
return
}
// 保存 state 和 code_verifier 到服务端 session
store.Set(state, &SessionData{
State: state,
CodeVerifier: codeVerifier,
CreatedAt: time.Now(),
})
// 构造授权 URL(包含 PKCE 参数)
url := oauthConfig.AuthCodeURL(
state,
oauth2.SetAuthURLParam("code_challenge", codeChallenge),
oauth2.SetAuthURLParam("code_challenge_method", "S256"),
)
http.Redirect(w, r, url, http.StatusTemporaryRedirect)
}
// handleCallback 处理授权回调 — 用 code 换取 token
func handleCallback(w http.ResponseWriter, r *http.Request) {
// 检查授权服务器是否返回错误
if errParam := r.URL.Query().Get("error"); errParam != "" {
desc := r.URL.Query().Get("error_description")
http.Error(w, fmt.Sprintf("授权失败: %s - %s", errParam, desc), http.StatusBadRequest)
return
}
state := r.URL.Query().Get("state")
code := r.URL.Query().Get("code")
// 安全检查: 验证 state 参数防止 CSRF
session, ok := store.Get(state)
if !ok {
http.Error(w, "无效的 state 参数,可能遭受 CSRF 攻击", http.StatusForbidden)
return
}
store.Delete(state) // state 一次性使用
// 检查 session 是否过期(防止重放攻击)
if time.Since(session.CreatedAt) > 10*time.Minute {
http.Error(w, "授权请求已过期", http.StatusBadRequest)
return
}
// 用 authorization_code + code_verifier 换取 token
ctx := context.Background()
token, err := oauthConfig.Exchange(
ctx,
code,
oauth2.SetAuthURLParam("code_verifier", session.CodeVerifier), // PKCE 验证
)
if err != nil {
http.Error(w, fmt.Sprintf("Token 交换失败: %v", err), http.StatusBadGateway)
log.Printf("Token 交换失败: %v", err)
return
}
// 安全存储 token(服务端 session)
sessionID, _ := generateRandomString(16)
store.Set(sessionID, &SessionData{
Token: token,
CreatedAt: time.Now(),
})
// 通过 HttpOnly Cookie 传递 session ID
http.SetCookie(w, &http.Cookie{
Name: "session_id",
Value: sessionID,
HttpOnly: true,
Secure: true,
SameSite: http.SameSiteLaxMode,
MaxAge: 3600,
Path: "/",
})
// 返回成功信息
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(map[string]interface{}{
"message": "登录成功",
"token_type": token.TokenType,
"expiry": token.Expiry,
})
}
func main() {
http.HandleFunc("/login", handleLogin)
http.HandleFunc("/callback", handleCallback)
log.Println("OAuth2 客户端启动在 http://localhost:8080")
log.Println("访问 http://localhost:8080/login 开始授权流程")
if err := http.ListenAndServe(":8080", nil); err != nil {
log.Fatalf("服务器启动失败: %v", err)
}
}
```
#### Client Credentials Flow
```go
/*
* OAuth 2.0 Client Credentials Flow — Go 实现
* 适用于服务间 (M2M) 通信
*/
package main
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"sync"
"time"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
// M2MClient 封装了带自动 Token 管理的 HTTP 客户端
type M2MClient struct {
config *clientcredentials.Config
httpClient *http.Client
mu sync.RWMutex
}
// NewM2MClient 创建新的 M2M 客户端
func NewM2MClient(tokenURL, clientID, clientSecret string, scopes []string) *M2MClient {
config := &clientcredentials.Config{
ClientID: clientID,
ClientSecret: clientSecret,
TokenURL: tokenURL,
Scopes: scopes,
}
return &M2MClient{
config: config,
// golang.org/x/oauth2 自动管理 token 的获取和刷新
httpClient: config.Client(context.Background()),
}
}
// CallAPI 调用受保护的 API — token 自动附加到请求头
func (c *M2MClient) CallAPI(ctx context.Context, method, url string) (map[string]interface{}, error) {
req, err := http.NewRequestWithContext(ctx, method, url, nil)
if err != nil {
return nil, fmt.Errorf("创建请求失败: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("请求失败: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("读取响应失败: %w", err)
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return nil, fmt.Errorf("API 返回错误 [%d]: %s", resp.StatusCode, string(body))
}
var result map[string]interface{}
if err := json.Unmarshal(body, &result); err != nil {
return nil, fmt.Errorf("解析 JSON 失败: %w", err)
}
return result, nil
}
// GetToken 手动获取当前 token(用于调试或日志)
func (c *M2MClient) GetToken(ctx context.Context) (*oauth2.Token, error) {
token, err := c.config.Token(ctx)
if err != nil {
return nil, fmt.Errorf("获取 token 失败: %w", err)
}
return token, nil
}
func main() {
client := NewM2MClient(
"https://auth.example.com/oauth/token",
"service-a",
"service-a-secret",
[]string{"read:data", "write:data"},
)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
// 调用受保护的 API
data, err := client.CallAPI(ctx, "GET", "https://api.example.com/internal/data")
if err != nil {
log.Fatalf("API 调用失败: %v", err)
}
fmt.Printf("响应数据: %+v\n", data)
}
```
#### Token 刷新
```go
/*
* OAuth 2.0 Token 刷新 — Go 实现
* 演示如何安全地管理和刷新 token
*/
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"sync"
"time"
"golang.org/x/oauth2"
)
// TokenManager 管理 OAuth2 token 的生命周期,包括自动刷新
type TokenManager struct {
config *oauth2.Config
token *oauth2.Token
mu sync.RWMutex
// onTokenRefresh 回调 — 用于持久化新 token
onTokenRefresh func(newToken *oauth2.Token) error
}
// NewTokenManager 创建 Token 管理器
func NewTokenManager(
config *oauth2.Config,
initialToken *oauth2.Token,
onRefresh func(*oauth2.Token) error,
) *TokenManager {
return &TokenManager{
config: config,
token: initialToken,
onTokenRefresh: onRefresh,
}
}
// GetValidToken 获取有效的 token,过期时自动刷新
func (tm *TokenManager) GetValidToken(ctx context.Context) (*oauth2.Token, error) {
tm.mu.RLock()
token := tm.token
tm.mu.RUnlock()
if token == nil {
return nil, fmt.Errorf("没有可用的 token,请先完成授权")
}
// 检查 token 是否即将过期(提前 30 秒刷新)
if time.Until(token.Expiry) > 30*time.Second {
return token, nil
}
// Token 已过期或即将过期,尝试刷新
log.Println("Token 即将过期,正在刷新...")
return tm.refreshToken(ctx)
}
// refreshToken 使用 refresh_token 获取新的 access_token
func (tm *TokenManager) refreshToken(ctx context.Context) (*oauth2.Token, error) {
tm.mu.Lock()
defer tm.mu.Unlock()
// 双重检查 — 可能其他 goroutine 已经刷新了
if time.Until(tm.token.Expiry) > 30*time.Second {
return tm.token, nil
}
// 使用 TokenSource 自动刷新
tokenSource := tm.config.TokenSource(ctx, tm.token)
newToken, err := tokenSource.Token()
if err != nil {
return nil, fmt.Errorf("刷新 token 失败: %w", err)
}
log.Printf("Token 刷新成功,新 token 有效期至: %s", newToken.Expiry.Format(time.RFC3339))
// 更新内存中的 token
tm.token = newToken
// 持久化新 token(安全存储)
if tm.onTokenRefresh != nil {
if err := tm.onTokenRefresh(newToken); err != nil {
log.Printf("警告: 持久化 token 失败: %v", err)
// 不返回错误 — token 本身是有效的
}
}
return newToken, nil
}
// GetHTTPClient 返回自动附加 token 的 HTTP 客户端
func (tm *TokenManager) GetHTTPClient(ctx context.Context) *http.Client {
return oauth2.NewClient(ctx, tm)
}
// Token 实现 oauth2.TokenSource 接口
func (tm *TokenManager) Token() (*oauth2.Token, error) {
return tm.GetValidToken(context.Background())
}
func main() {
config := &oauth2.Config{
ClientID: "my-go-app",
ClientSecret: "my-client-secret",
Endpoint: oauth2.Endpoint{
AuthURL: "https://auth.example.com/authorize",
TokenURL: "https://auth.example.com/oauth/token",
},
Scopes: []string{"openid", "profile"},
}
// 假设已通过 Authorization Code Flow 获取了初始 token
initialToken := &oauth2.Token{
AccessToken: "initial-access-token",
RefreshToken: "initial-refresh-token",
TokenType: "Bearer",
Expiry: time.Now().Add(5 * time.Minute),
}
// 创建 Token 管理器,注册刷新回调
tm := NewTokenManager(config, initialToken, func(newToken *oauth2.Token) error {
// 安全持久化 token — 生产环境应加密存储
data, _ := json.Marshal(map[string]string{
"access_token": newToken.AccessToken,
"refresh_token": newToken.RefreshToken,
"expiry": newToken.Expiry.Format(time.RFC3339),
})
log.Printf("持久化新 token: %s", string(data))
// 实际应写入加密文件、数据库或密钥管理服务
return nil
})
// 使用 Token 管理器获取有效 token
ctx := context.Background()
token, err := tm.GetValidToken(ctx)
if err != nil {
log.Fatalf("获取 token 失败: %v", err)
}
fmt.Printf("当前 token 有效期至: %s\n", token.Expiry.Format(time.RFC3339))
}
```
## 7.10 小结
- OAuth 2.0 是**授权框架**,不是认证协议
- **授权码 + PKCE** 是最安全的模式,OAuth 2.1 将其设为必须
- **Client Credentials** 适合机器对机器通信
- Token 应存储在安全位置,Access Token 保持短生命周期
- OAuth 2.1 移除了不安全的 Implicit 和 Password 模式
- 三种语言实现要点:
- **Python**:使用 `authlib` / `httpx`,适合快速原型和微服务
- **Java**:Spring Security OAuth2 提供声明式配置,适合企业级应用
- **Go**:`golang.org/x/oauth2` 提供轻量级但完整的 OAuth2 支持,适合云原生服务