34. Simple OAuth2 with Password and Bearer
1. Simple OAuth2 with Password and Bearer
- 이전 예제에서 누락된 부분을 추가하여 더 나은 보안 flow를 완성해 보자.
2. Get the username
and password
- FastAPI 보안 유틸리티를 사용하여
username
과password
를 가져올 것이다. - OAuth2는 "password flow"를 사용할 때 클라이언트가
username
과password
field를 form 데이터로서 보내야 한다고 지정한다. - 그리고 사양에는 꼭
username
과password
와 같은 것으로 field 이름을 지어야 한다고 언급한다. - 즉,
user-name
또는email
과 같은 field 이름은 허용되지 않는다. - 또한 로그인 path operation의 경우, 사양과 호환되도록
username
과password
와 같은 것으로 이름을 지어야 한다.
1) scope
- 또한 사양은 클라이언트가 다른 form field인
scope
을 보낼 수 있다고 언급한다. - form field의 이름은
scope
이지만, 실제로는 공백으로 나눠진 "scopes"의 문자열이다. - 그러므로 자세히 표현하자면 각 "scope"은 공백을 제외한 문자열을 의미한다.
- 각 scope는 일반적으로 다음과 같은 특정 보안 권한을 선언하는 데 사용된다.
1] users:read
또는 users:write
- 일반적으로 사용되는 것이다.
2] instagram_basic
- Facebook/Instagram에서 사용한다.
3] https://www.googleapis.com/auth/drive
- Google에서 사용한다.
3. Code to get the username
and password
- FastAPI에서 제공하는 유틸리티를 사용해 보자.
1) OAuth2PasswordRequestForm
- 먼저,
OAuth2PasswordRequestForm
을 임포트한 후,/token
에 대한 path operation에서Depends
를 통해 dependency로 사용한다.
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
app = FastAPI()
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
OAuth2PasswordRequestForm
은 다음과 같은 것을 사용하여 form body를 선언하는 dependency 클래스이다.
1] username
2] password
3] optional scope
field
4] optional grant_type
5] optional client_id
6] optional client_secret
2) Use the form data
- form field의
username
을 사용하여 데이터베이스에서 사용자 데이터를 가져온다.
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
app = FastAPI()
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
3) Check the password
- 데이터베이스의 사용자 데이터는 있지만 아직 비밀번호는 확인하지 않았다.
- 다음과 같이 해당 데이터를 Pydantic
UserInDB
모델에 넣고, 일반 텍스트 비밀번호를 저장해서는 안 되므로 비밀번호 해싱 시스템을 사용한다.
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
app = FastAPI()
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
4. Return the token
token
의 response는 JSON 객체이어야 한다.- 또한
token_type
이 있어야 하는데, "Bearer" token을 사용하므로 token 타입은bearer
가 된다. - 그리고 액세스 token이 포함된 문자열인
access_token
이 있어야 한다.
- 다음과 같이 token으로서 같은
username
을 반환하도록 작성할 수 있다.
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
app = FastAPI()
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
5. Update the dependencies
- dependencies를 업데이트해야 하는데, 사용자가 활성화되어 있는 경우에만
current_user
를 얻을 것이다. - 따라서
get_current_user
를 dependency로 사용하는 추가적인 dependency인get_current_active_user
를 만든다. - 이러한 dependencies는 모두 사용자가 존재하지 않거나 비활성화된 경우 HTTP 에러를 반환한다.
- 따라서 프론트엔드에서는 사용자가 존재하고 올바르게 인증되었으며, 활성 상태인 경우에만 사용자를 얻게 된다.
- 다음과 같이 작성할 수 있다.
from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel
app = FastAPI()
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "fakehashedsecret",
"disabled": False,
},
"alice": {
"username": "alice",
"full_name": "Alice Wonderson",
"email": "alice@example.com",
"hashed_password": "fakehashedsecret2",
"disabled": True,
},
}
def fake_hash_password(password: str):
return "fakehashed" + password
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
class User(BaseModel):
username: str
email: Optional[str] = None
full_name: Optional[str] = None
disabled: Optional[bool] = None
class UserInDB(User):
hashed_password: str
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(user_dict)
def fake_decode_token(token):
# This doesn't provide any security at all
# Check the next version
user = get_user(fake_users_db, token)
return user
async def get_current_user(token: str = Depends(oauth2_scheme)):
user = fake_decode_token(token)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
return user
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user_dict = fake_users_db.get(form_data.username)
if not user_dict:
raise HTTPException(status_code=400, detail="Incorrect username or password")
user = UserInDB(user_dict)
hashed_password = fake_hash_password(form_data.password)
if not hashed_password == user.hashed_password:
raise HTTPException(status_code=400, detail="Incorrect username or password")
return {"access_token": user.username, "token_type": "bearer"}
@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
6. See it in action
http://127.0.0.1:8000/docs
로 이동해 보자.
1) Authenticate
- "Authorize" 버튼을 클릭한 후 credentials를 다음과 같이 작성한다.
1] User
johndoe
2] Password
secret
- 시스템에서 인증하고 나면 다음과 같이 표시된다.
2) Get your own user data
/users/me
path로GET
operation을 사용해 보자.
- 다음과 같은 사용자 데이터를 얻을 수 있다.
{
"username": "johndoe",
"email": "johndoe@example.com",
"full_name": "John Doe",
"disabled": false,
"hashed_password": "fakehashedsecret"
}
- 만약 잠금 아이콘을 클릭하고, 로그아웃한 다음 동일한 작업을 다시 시도하면 다음과 같은 HTTP 401 에러가 발생한다.
4) Inactivate user
- 다음과 같이 비활성화된 사용자로 다시 한 번 시도해 보자.
1] User
alice
2] Password
secret2
/users/me
path로GET
operation을 사용해 보자.
- 다음과 같은 비활성화된 사용자 에러를 얻게 된다.