关于OAuth2的密码式令牌

想写个不需要登录,无需维护Session或Cookie等状态的纯RESTful API,但是又需要有身份认证和权限鉴别功能。比如我有一套爬来的金融数据API,想让指定的用户使用,其他人访问就报401。最近用FastAPI,里面提供了OAuth2以及scopes的密码式令牌鉴权机制,十分方便,但是部分场景也容易造成信息泄露。

OAuth2的密码式令牌应在可信任网络中使用。

OAuth2密码式令牌应用场景

众所周知OAuth2有四种应用场景各不相同的鉴权方式,其中,密码式可以说是在互联网环境中最不安全的一种了,因为你需要将你在诸如微信的用户名密码一类的信息告诉一个第三方应用,而后第三方应用拿着你的用户名密码去向微信申请数据。如果是在互联网上使用这种验证方式,你必须极度信任这个第三方应用,不然就是社会性自杀。

那么,这样一个危险的鉴权方式有什么实际作用呢?那当然是可控网络环境中,对各业务系统API进行集中访问控制了——毕竟现在微服务这么火,就算是内网应用也应该试试开放几个API吐出点数据——这样PPT上就可以写打破数据孤岛了✌️。

比如,我有很多业务系统,以系统A为例。a.com开放了两个API:

  • 查看当前用户的信息http://a.com/me
  • 查看当前用户拥有的数据条目http://a.com/me/items

这时候有人想调用这俩API,而A系统管理员只希望指定的人用,此外,还有XYZ的各种业务系统,都有这种需求,如何在不使用session、cookie这种有状态身份认证的情况下(调用API还要先认证个session太麻烦了)直接在请求中附带一个通行证就调用API,还要保证安全和权限分级呢?

OAuth2密码式令牌应用环境

密码式令牌提供了一个简单的解决方案:

  1. 在A系统上开一个发通行证access_token的窗口:http://a.com/token
  • 系统中有个加解密的秘钥SECRET_KEY
  • 通行证窗口是一个可以被A系统全信任的窗口,不私存用户名密码,就是个工具窗口。
  • 通行证窗口接受用户认证信息,返回一个jwt库使用秘钥SECRET_KEY加密的通行证。
  • 通行证窗口接受方式规定为HTTP POST方式。接受如下的表单,其中scope为表示权限的字符串,以空格为分隔符:
    1
    2
    3
    4
    grant_type: password
    scope: me items
    username: johndoe
    password: secret
  • 通行证窗口拿到用户名密码后确认身份,并取出库中的权限列表与请求中的权限列表进行核对,如果请求的权限多于在册的权限,则http-401,反之则放行。
  • 通行证窗口返回的通行证为json格式,通行证的内容包含认证用户ID、权限列表、通行证有效期,例如
    1
    2
    3
    {"access_token":"eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJqb2huZG9lIiwic2NvcGVzIjpbIm1lIiwiaXRlbXMiXSwiZXhwIjoxNTk0MTI2MjkxfQ.AimqzTZg1t7XHstWi-048prkRAPv6-m-IM9OfGYio8A","token_type":"bearer"}
    // 其中access_token解密后的明文形为
    {'sub': 'johndoe', 'scopes': ['me', 'items'], 'exp': 1593831859}
  1. A系统提供了很多API:
  • 查看当前用户的信息http://a.com/me
  • 查看当前用户拥有的数据条目http://a.com/me/items
  • 系统中有个加解密的秘钥SECRET_KEY,与发通行证的窗口用的一样;
  • 这些API都有身份验证机制,具有独立的权限列表要求,需要提供OAuth2通行证令牌,jwt库使用秘钥SECRET_KEY对令牌解密,以验明正身。
  1. A系统管理员维护着一个用户及权限表,包括用户名username(如johndoe)、密码hashed_password、权限列表scope(如['b/me', 'b/items'])。

OAuth2密码式令牌调用流程

此时,第三方系统B想要调用业务系统A的API,比如有人想要调用http://a.com/me,那么A系统如何认证用户和权限呢?

  1. B需要获取A库表中的某个用户名密码:
  • 这个动作可以是A管理员给B的,比如我A系统就开一个API专用账户,你B拿着这个账号就随便临幸我A吧。(这种通常权限开的尺度比较大)
  • 也可以是A的一个用户给B的,用户aaa表示你B系统可以去A系统拿我的某些数据。
  1. B拿着用户名密码去A的通行证窗口申请通行证:
  • 通行证窗口就是负责接收用户名密码,确认在A的库表中后,发通行证。
  • 查无此人直接http-401。
  1. B拿到通行证,在每一次对A系统的API请求Header中附带这个通行证:
  • 例如,在访问a.com/me时加上Authorization头部字段,其值形为Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJqb2huZG9lIiwic2NvcGVzIjpbIm1lIiwiaXRlbXMiXSwiZXhwIjoxNTk0MTI2MjkxfQ.AimqzTZg1t7XHstWi-048prkRAPv6-m-IM9OfGYio8A
  1. A的API会拿到Authorization头部字段,取出其中的通行证,解密,获取其中的用户名、权限、有效期:
  • 若令牌可解密,且在有效期内,比对API所需的权限请求通行证提供的权限,若提供满足所需,则API响应并返回,反之则http-401。

OAuth2密码式令牌代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
from datetime import datetime, timedelta

import jwt
from jwt import PyJWTError
from passlib.context import CryptContext

from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (
OAuth2PasswordBearer,
OAuth2PasswordRequestForm,
SecurityScopes,
)

from typing import Optional, List
from pydantic import BaseModel, ValidationError

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "4ff135e7c8c7375a00738b8f02cc016a7983ea345fe35735900495bea0cf9ce9"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


app = FastAPI()
VERSION = '1.0'

fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Chains",
"email": "alicechains@example.com",
"hashed_password": "$2b$12$gSvqqUPvlXP2tfVFaWK1Be7DlH.PKZbv5H8KnzzVgXXbVxpva.pFm",
"disabled": True,
},
}

class Token(BaseModel):
access_token: str
token_type: str


class TokenData(BaseModel):
username: str = None
scopes: List[str] = []


class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None


class UserInDB(User):
hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="/token",
scopes={
"me": "Read information about the current user.",
"items": "Read items."
},
)


def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)


def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user


def create_access_token(data: dict, expires_delta: timedelta = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt


async def get_current_user(security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)):
# Request Headers里添加了: Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJqb2huZG9lIiwic2NvcGVzIjpbIm1lIiwiaXRlbXMiXSwiZXhwIjoxNTkzODMyMTM2fQ.Ulq6S2zg0kETKdS_3NhcFho9M4-JhzHXcXWPZQJm0Cc
# curl -X GET "http://localhost:8910/users/me/" -H "accept: application/json" -H "Authorization: Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJqb2huZG9lIiwic2NvcGVzIjpbIm1lIiwiaXRlbXMiXSwiZXhwIjoxNTkzODMyMTM2fQ.Ulq6S2zg0kETKdS_3NhcFho9M4-JhzHXcXWPZQJm0Cc"
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = f"Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) # payload解密为 {'sub': 'johndoe', 'scopes': ['me', 'items'], 'exp': 1593831859}
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_scopes = payload.get("scopes", [])
token_data = TokenData(scopes=token_scopes, username=username)
except (PyJWTError, ValidationError):
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
for scope in security_scopes.scopes: # security_scopes.scopes为当前请求所要求的scopes权限,如/me为['me'],/me/items为['items', 'me']
if scope not in token_data.scopes: # token_data.scopes为当前请求的token中持有的scopes权限,如果所需的scopes中有任何一条不在请求持有的的scopes时(即少持有任一条),均禁止访问
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Not enough permissions",
headers={"WWW-Authenticate": authenticate_value},
)
return user


async def get_current_active_user(current_user: User = Security(get_current_user, scopes=["me"])):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user


@app.post("/token", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(status_code=400, detail="Incorrect username or password")
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username, "scopes": form_data.scopes},
expires_delta=access_token_expires,
)
return {"access_token": access_token, "token_type": "bearer"}


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Security(get_current_active_user, scopes=["items"])):
return [{"item_id": "Foo", "owner": current_user.username}]


@app.get("/status/")
async def read_system_status(current_user: User = Depends(get_current_user)):
return {"status": "ok"}

警告

在152行中,示例为了方便直接给令牌赋予了请求者索要的权限,在真实情况下,此处应该将请求索要的权限与数据库中记录的权限进行比对,若索要权限大于记录权限,则应产生“未授权”的异常。

总结

在网络环境较为透明可控的场景下,使用OAuth2的密码方式对开放的API进行身份及权限验证,是一种安全、可控、简便的管理方法。除上文描述的方法外,还可以将各应用系统的权限收口统一管理,维护一个用户权限库表,将通行证的发放维护在一个入口中,再于各应用系统中约定同一个加解密秘钥,可进一步方便管理。当然,这些方法的前提必须是网络环境较为透明可控。

评论

Your browser is out-of-date!

Update your browser to view this website correctly.&npsb;Update my browser now

×