Skip to content

34. Simple OAuth2 with Password and Bearer


1. Simple OAuth2 with Password and Bearer

  • 이전 예제에서 누락된 부분을 추가하여 더 나은 보안 flow를 완성해 보자.


2. Get the username and password

  • FastAPI 보안 유틸리티를 사용하여 usernamepassword를 가져올 것이다.
  • OAuth2는 "password flow"를 사용할 때 클라이언트가 usernamepassword field를 form 데이터로서 보내야 한다고 지정한다.
  • 그리고 사양에는 꼭 usernamepassword와 같은 것으로 field 이름을 지어야 한다고 언급한다.
  • 즉, user-name 또는 email과 같은 field 이름은 허용되지 않는다.
  • 또한 로그인 path operation의 경우, 사양과 호환되도록 usernamepassword와 같은 것으로 이름을 지어야 한다.


1) scope

  • 또한 사양은 클라이언트가 다른 form field인 scope을 보낼 수 있다고 언급한다.
  • form field의 이름은 scope이지만, 실제로는 공백으로 나눠진 "scopes"의 문자열이다.
  • 그러므로 자세히 표현하자면 각 "scope"은 공백을 제외한 문자열을 의미한다.
  • 각 scope는 일반적으로 다음과 같은 특정 보안 권한을 선언하는 데 사용된다.


1] users:read 또는 users:write

  • 일반적으로 사용되는 것이다.

2] instagram_basic

  • Facebook/Instagram에서 사용한다.

3] https://www.googleapis.com/auth/drive

  • Google에서 사용한다.


3. Code to get the username and password

  • FastAPI에서 제공하는 유틸리티를 사용해 보자.


1) OAuth2PasswordRequestForm

  • 먼저, OAuth2PasswordRequestForm을 임포트한 후, /token에 대한 path operation에서 Depends를 통해 dependency로 사용한다.


from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

def fake_hash_password(password: str):
    return "fakehashed" + password

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]

        return UserInDB(user_dict)

def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)

    return user

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    return current_user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)

    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    user = UserInDB(user_dict)
    hashed_password = fake_hash_password(form_data.password)

    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


  • OAuth2PasswordRequestForm은 다음과 같은 것을 사용하여 form body를 선언하는 dependency 클래스이다.


1] username

2] password

3] optional scope field

4] optional grant_type

5] optional client_id

6] optional client_secret


2) Use the form data

  • form field의 username을 사용하여 데이터베이스에서 사용자 데이터를 가져온다.


from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

def fake_hash_password(password: str):
    return "fakehashed" + password

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]

        return UserInDB(user_dict)

def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)

    return user

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    return current_user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)

    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    user = UserInDB(user_dict)
    hashed_password = fake_hash_password(form_data.password)

    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


3) Check the password

  • 데이터베이스의 사용자 데이터는 있지만 아직 비밀번호는 확인하지 않았다.


  • 다음과 같이 해당 데이터를 Pydantic UserInDB 모델에 넣고, 일반 텍스트 비밀번호를 저장해서는 안 되므로 비밀번호 해싱 시스템을 사용한다.


from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

def fake_hash_password(password: str):
    return "fakehashed" + password

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]

        return UserInDB(user_dict)

def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)

    return user

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    return current_user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)

    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    user = UserInDB(user_dict)
    hashed_password = fake_hash_password(form_data.password)

    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


4. Return the token

  • token의 response는 JSON 객체이어야 한다.
  • 또한 token_type이 있어야 하는데, "Bearer" token을 사용하므로 token 타입은 bearer가 된다.
  • 그리고 액세스 token이 포함된 문자열인 access_token이 있어야 한다.


  • 다음과 같이 token으로서 같은 username을 반환하도록 작성할 수 있다.


from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

def fake_hash_password(password: str):
    return "fakehashed" + password

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]

        return UserInDB(user_dict)

def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)

    return user

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    return current_user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)

    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    user = UserInDB(user_dict)
    hashed_password = fake_hash_password(form_data.password)

    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


5. Update the dependencies

  • dependencies를 업데이트해야 하는데, 사용자가 활성화되어 있는 경우에만 current_user를 얻을 것이다.
  • 따라서 get_current_user를 dependency로 사용하는 추가적인 dependency인 get_current_active_user를 만든다.
  • 이러한 dependencies는 모두 사용자가 존재하지 않거나 비활성화된 경우 HTTP 에러를 반환한다.
  • 따라서 프론트엔드에서는 사용자가 존재하고 올바르게 인증되었으며, 활성 상태인 경우에만 사용자를 얻게 된다.


  • 다음과 같이 작성할 수 있다.


from typing import Optional
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from pydantic import BaseModel

app = FastAPI()

fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "fakehashedsecret",
        "disabled": False,
    },
    "alice": {
        "username": "alice",
        "full_name": "Alice Wonderson",
        "email": "alice@example.com",
        "hashed_password": "fakehashedsecret2",
        "disabled": True,
    },
}

def fake_hash_password(password: str):
    return "fakehashed" + password

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None

class UserInDB(User):
    hashed_password: str

def get_user(db, username: str):
    if username in db:
        user_dict = db[username]

        return UserInDB(user_dict)

def fake_decode_token(token):
    # This doesn't provide any security at all
    # Check the next version
    user = get_user(fake_users_db, token)

    return user

async def get_current_user(token: str = Depends(oauth2_scheme)):
    user = fake_decode_token(token)

    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    return user

async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")

    return current_user

@app.post("/token")
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user_dict = fake_users_db.get(form_data.username)

    if not user_dict:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    user = UserInDB(user_dict)
    hashed_password = fake_hash_password(form_data.password)

    if not hashed_password == user.hashed_password:
        raise HTTPException(status_code=400, detail="Incorrect username or password")

    return {"access_token": user.username, "token_type": "bearer"}

@app.get("/users/me")
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


6. See it in action

  • http://127.0.0.1:8000/docs로 이동해 보자.


1) Authenticate

  • "Authorize" 버튼을 클릭한 후 credentials를 다음과 같이 작성한다.


1] User

  • johndoe

2] Password

  • secret


001


  • 시스템에서 인증하고 나면 다음과 같이 표시된다.


002


2) Get your own user data

  • /users/me path로 GET operation을 사용해 보자.


  • 다음과 같은 사용자 데이터를 얻을 수 있다.


{
  "username": "johndoe",
  "email": "johndoe@example.com",
  "full_name": "John Doe",
  "disabled": false,
  "hashed_password": "fakehashedsecret"
}


003


  • 만약 잠금 아이콘을 클릭하고, 로그아웃한 다음 동일한 작업을 다시 시도하면 다음과 같은 HTTP 401 에러가 발생한다.


{
  "detail": "Not authenticated"
}


4) Inactivate user

  • 다음과 같이 비활성화된 사용자로 다시 한 번 시도해 보자.


1] User

  • alice

2] Password

  • secret2


  • /users/me path로 GET operation을 사용해 보자.


  • 다음과 같은 비활성화된 사용자 에러를 얻게 된다.


{
  "detail": "Inactive user"
}

References