6. Authentication
1. Mission
- 인증이 안 된 클라이언트를 제한해야 함 → Authentication (코드 구현에서는
Authorization
으로 지정함)
- 서버 내 Access Token을 생성한 후 클라이언트에게 전송
- 각각의 클라이언트는 전송 받은 Access Token을 HTTP headers에 담아 requests 전송
- Tgt_Lang Classifier 서버에서 requests headers를 확인하여 서버 내 존재하는 Access Token이면 뒷단으로 전송
2. Access Token Manager
1) bcrypt
설치
$(torchserve) pip install bcrypt
2) Access Token Manager 생성
- 다음과 같이 Access Token을 관리할 수 있도록
access_token_manager.py
를 생성한다.
import os
import json
import random
import bcrypt
class AccessTokenManager:
def __init__(self):
self.project_dir = os.path.dirname(os.path.realpath(__file__))
self.json_name = "access_token.json"
self.json_path = os.path.abspath(os.path.join(self.project_dir, self.json_name))
if not os.path.isfile(self.json_path):
self.json_data = {}
self.save()
self .load()
def save(self):
with open(self.json_path, "w") as json_file:
json.dump(self.json_data, json_file)
print(f"JSON file saved to {self.json_path}")
def load(self):
with open(self.json_path, "r") as json_file:
self.json_data = json.load(json_file)
print(f"JSON file loaded from {self.json_path}")
def generate_token(self, client_name: str):
password = str(random.SystemRandom(7)) + client_name
access_token = bcrypt.hashpw(password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
print(f"Client Name {client_name} / Access Token {access_token} has generated!")
return access_token
def create(self, client_name: str):
if client_name in self.json_data.keys():
print(f"Client Name {client_name} already exists!")
else:
self.json_data[client_name] = self.generate_token(client_name)
def read(self, option: str):
if option == "all":
print(self.json_data)
else:
if option in self.json_data.keys():
print(self.json_data[option])
else:
print("Enter Valid Client Name!")
def update(self, client_name: str):
if client_name in self.json_data.keys():
option = input("Enter Update Option\n ---> (Name / Token)").lower()
if option == "name":
new_client_name = input("Enter New Client Name\n ---> (STRING)").lower()
if new_client_name in self.json_data.keys():
print(f"Client Name {new_client_name} already exists!")
else:
self.json_data[new_client_name] = self.json_data.pop(client_name)
print(f"Client Name {client_name} has changed to {new_client_name}")
elif option == "token":
self.json_data[client_name] = self.generate_token(client_name)
else:
print("Enter Valid Option!")
else:
print("Enter Valid Client Name!")
def delete(self, client_name: str):
if client_name in self.json_data.keys():
check = input("Delete Client?\n ---> (YES / NO)").lower()
if check == "yes":
del self.json_data[client_name]
print("Client Name {client_name} has been deleted!")
else:
print("Nothing Done")
else:
print("Enter Valid Client Name!")
if __name__ == "__main__":
atm = AccessTokenManager()
while True:
command = input("1. Create (C)\n2. Read (R)\n3. Update (U)\n4. Delete (D)\n5. Save (S)\n6. Init (I)\n7. Exit (E)\n ---> (COMMAND)").lower()
if command == "c":
client_name = input("Enter Client Name\n ---> (STRING)").lower()
atm.create(client_name)
elif command == "r":
option = input("Enter Read Option\n ---> (ALL / Client Name)").lower()
atm.read(option)
elif command == "u":
client_name = input("Enter Client Name\n ---> (STRING)").lower()
atm.update(client_name)
elif command == "d":
client_name = input("Enter Client Name\n ---> (STRING)").lower()
atm.delete(client_name)
elif command == "s":
atm.save()
elif command == "i":
atm.load()
elif command == "e":
print("Bye Bye")
break
else:
print("Enter Valid Command!")
- 다음과 같이
access_token_manager.py
를 실행한 후 voiceprint의 Access Token을 등록한다.
$(torchserve) python access_token_manager.py
1. Create (C)
2. Read (R)
3. Update (U)
4. Delete (D)
5. Save (S)
6. Init (I)
7. Exit (E)
---> (COMMAND)c
Enter Client Name
---> (STRING)voiceprint
- 그럼
access_token.json
이 생성되며 다음과 같이 voiceprint의 Access Token이 등록된 것을 확인할 수 있다.
{ "voiceprint": "$2b$12$sUlMELV0B9Uj.UyvaE5Yq.c6um9hcLqdfWQTy1sNiirVCQZ8sI2ZG" }
3. Classifier 수정
- 다음과 같이
classifier.py
를 수정한다.
from flask import Flask, request
import requests
import json
import os
app = Flask(__name__)
class Classifier:
def __init__(self):
self.project_dir = os.path.dirname(os.path.abspath(__file__))
self.json_name = "access_token.json"
self.json_path = os.path.join(self.project_dir, self.json_name)
self.PID = os.getpid()
print(f"PID: {self.PID} / Loaded Classifier!")
if not os.path.isfile(self.json_path):
print(f"PID: {self.PID} / {self.json_path} does not exist!")
self.load()
def load(self):
with open(self.json_path, "r") as json_file:
self.json_data = json.load(json_file)
print(f"PID: {self.PID} / JSON file loaded from {self.json_path}")
def request_text(URL: str, body: dict):
response = requests.post(URL, data=json.dumps(body)).json()
return response
classifier = Classifier()
access_tokens = classifier.json_data.values()
@app.route("/", methods=["POST"])
def main():
URL = "http://127.0.0.1:9999/predictions/test"
headers = request.headers
body = request.get_json(force=True)
print(body)
print(headers)
print(body)
if headers["Authorization"] in access_tokens:
src_lang = body["src_lang"]
src_text = body["src_text"]
tgt_lang = body["tgt_lang"]
translated = request_text(URL, body)
print(f"\nPID: {classifier.PID} \ URL: {URL}\nSrc_Lang: {src_lang} \ Src_text: {src_text}\nTgt_Lang: {tgt_lang} \ Translated: {translated['translated']}")
return translated
else:
return "Access Token Error!"
if __name__ == "__main__":
app.run(host="0.0.0.0", port="80", debug=True)
$(base) json='{"src_lang": "ko", "src_text": "안녕", "tgt_lang": "en"}' && curl -X POST http://14.138.218.153:9002 -H 'Authorization: $2b$12$sUlMELV0B9Uj.UyvaE5Yq.c6um9hcLqdfWQTy1sNiirVCQZ8sI2ZG' -H 'Content-type: application/json' -d "$json"
{"translated":"\"What's up?\""}
4. Streamlit 수정
- 만약 클라이언트로 Streamlit을 사용하는 경우 requests를 보낼 때 headers를 추가해야 한다.
import os, sys, json, time
import requests
import streamlit as st
import pandas as pd
from multiprocessing import Process, Queue
from torchtext.data import get_tokenizer
from torchtext.data.metrics import bleu_score
def request_text(idx: int, GMT: str, URL: str, ACCESS_TOKEN: str, src_lang: str, src_text: str, tgt_lang: str, result_queue: Queue):
headers = {"Authorization": ACCESS_TOKEN}
body = {"src_lang": src_lang, "src_text": src_text, "tgt_lang": tgt_lang}
response = requests.post(URL, data=json.dumps(body), headers=headers)
result = response.json()["translated"]
print(f"\nIndex: {idx} / GMT: {GMT}\nSrc_Lang: {src_lang} / Src_Text: {src_text}\nTgt_lang: {tgt_lang} / Translated: {result}\n")
result_queue.put((idx, result))
def main():
# st.set_page_config(layout="wide")
URL = "http://127.0.0.1:9002"
ACCESS_TOKEN = "$2b$12$sUlMELV0B9Uj.UyvaE5Yq.c6um9hcLqdfWQTy1sNiirVCQZ8sI2ZG"
lang_codes = {
"Korean": "ko",
"English": "en",
"Chinese": "zh",
"Spanish": "es",
"French": "fr",
"Rusian": "ru",
"Indonesian": "id",
"Japanese": "ja",
"Vietnamese": "vi",
"German": "de",
"Italian": "it",
"Hindi": "hi",
"Arabic": "ar",
"Portuguese": "pt",
}
st.title("Many to Many Translator")
language_option = st.selectbox("Choose a Source Language", list(lang_codes.keys()))
if language_option:
src_text = st.text_area(
"Enter text", value="", height=None, max_chars=None, key=None
)
ref_text = ""
if language_option == "Korean" or language_option == "English":
bleu_button = st.checkbox("Check BLEU Score")
if bleu_button:
ref_text = st.text_area("Enter reference:", value="", height=None, max_chars=None, key=None)
col1, col2, col3, col4, col5 = st.columns(5)
if col5.button("Translate text"):
if src_text == "":
st.write("Please enter text for tanslation")
else:
with st.spinner("Running translation..."):
start = time.time()
src_lang = lang_codes.pop(language_option)
GMT = time.ctime(time.time())
print(f"GMT: {GMT}\nSource Language: {language_option}\n")
result_queue = Queue()
idx_list, translated_list, bleu_list = [], [], []
procs = []
procs = [Process(target=request_text, args=(idx, GMT, URL, ACCESS_TOKEN, src_lang, src_text, lang_code, result_queue)) for idx, (lang, lang_code) in enumerate(lang_codes.items())]
for proc in procs:
proc.start()
for proc in procs:
proc.join()
while not result_queue.empty():
idx, translated = result_queue.get()
idx_list.append(idx)
translated_list.append(translated)
translated_list = list(pd.DataFrame({"idx": idx_list, "Translated": translated_list}).sort_values(by=["idx"], axis=0)["Translated"])
if len(ref_text) > 0:
tokenizer = get_tokenizer("basic_english")
reference_corpus = [[tokenizer(ref_text)]]
translated_corpus = [tokenizer(translated_list[0])]
bleu_list.append(str(bleu_score(translated_corpus, reference_corpus, max_n=1, weights=[1.0])))
for _ in range(len(lang_codes.keys()) - 1):
bleu_list.append("")
print(translated_list)
df = pd.DataFrame({"Language": list(lang_codes.keys()), "Translated Text": translated_list, "BLEU Score": bleu_list})
else:
print(translated_list)
df = pd.DataFrame({"Language": list(lang_codes.keys()), "Translated Text": translated_list})
st.table(df)
end = time.time()
col1, col2, col3, col4, col5 = st.columns(5)
col5.write(f"{end - start:.5f} sec.")
if __name__ == "__main__":
main()