2. Nginx + TorchServe
1. Mission
2. TorchServe
1) Dependencies 설치
- 다음과 같이 Dependencies를 설치한다.
$(torchserve) pip install transformers
2) 모델 준비
.
└── voiceprint
    └── m2m100_418M
        ├── config.json
        ├── pytorch_model.bin
        ├── sentencepiece.bpe.model
        ├── special_tokens_map.json
        ├── tokenizer_config.json
        └── vocab.json
3) test.py 작성
- 다음과 같이 Python 스크립트를 작성한다.
import ast
import json
import logging
import os, time
from abc import ABC

import torch
from transformers import M2M100ForConditionalGeneration, AutoTokenizer
from ts.torch_handler.base_handler import BaseHandler
# This module only runs inference, so gradient tracking is switched off globally.
torch.set_grad_enabled(False)
# Cap the number of intra-op CPU threads PyTorch uses in this process.
torch.set_num_threads(4)
# Module-level logger used by the handler for all of its log lines.
logger = logging.getLogger(__name__)
class InferenceHandler(BaseHandler, ABC):
    """TorchServe handler that translates text with a quantized M2M100 model.

    A request body is a JSON object with the keys ``src_lang``, ``src_text``
    and ``tgt_lang``.  The response is ``[{"translated": <text>}]``.

    Only the first request of the batch passed in by TorchServe is
    processed, so the model must be served with a batch size of 1.
    """

    def __init__(self):
        super().__init__()
        self.initialized = False

    def initialize(self, ctx):
        """Load the model and tokenizer from the archive and quantize the model.

        Args:
            ctx: TorchServe context; ``ctx.system_properties["model_dir"]`` is
                the directory the model archive was extracted to.
        """
        self.PID = os.getpid()
        self.manifest = ctx.manifest
        properties = ctx.system_properties
        model_dir = properties.get("model_dir")
        self.device = "cpu"
        self.model = M2M100ForConditionalGeneration.from_pretrained(model_dir)
        self.model.to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(model_dir)
        # Dynamic quantization: Linear layers are converted to int8.
        self.quantized_model = torch.quantization.quantize_dynamic(
            self.model, {torch.nn.Linear}, dtype=torch.qint8
        )
        # Only the quantized copy is used for inference; drop the reference
        # to the full-precision model.
        del self.model
        logger.info(f"PID: {self.PID} / Transformer model from path: {model_dir} loaded successfully")
        self.initialized = True

    def preprocess(self, data):
        """Log the raw request batch and return it unchanged."""
        logger.info(f"PID: {self.PID} / Received Data: {data}")
        return data

    @staticmethod
    def _parse_payload(request):
        """Return the request payload as a dict.

        The payload is read from ``request["data"]`` or, if that is missing,
        ``request["body"]``.  Bytes are decoded as UTF-8 and text is parsed
        as JSON.  Text that is not valid JSON falls back to
        ``ast.literal_eval`` so clients that send a Python-literal dict
        (e.g. single-quoted strings) keep working.
        """
        payload = request.get("data")
        if payload is None:
            payload = request.get("body")
        if isinstance(payload, (bytes, bytearray)):
            payload = payload.decode("utf-8")
        if isinstance(payload, str):
            try:
                payload = json.loads(payload)
            except json.JSONDecodeError:
                payload = ast.literal_eval(payload)
        return payload

    def inference(self, data):
        """Translate the first request of the batch.

        Args:
            data: list of request dicts as passed in by TorchServe.

        Returns:
            The translated text as a plain string.

        Raises:
            KeyError: if ``src_lang``, ``src_text`` or ``tgt_lang`` is missing.
        """
        p = self._parse_payload(data[0])
        src_lang = p["src_lang"]
        src_text = p["src_text"]
        tgt_lang = p["tgt_lang"]
        logger.info(f"PID: {self.PID} / Received Src_Lang: {src_lang}")
        logger.info(f"PID: {self.PID} / Received Src_Text: {src_text}")
        logger.info(f"PID: {self.PID} / Received Tgt_Lang: {tgt_lang}")
        self.tokenizer.src_lang = src_lang
        encoded_text = self.tokenizer(src_text, return_tensors="pt").to(self.device)
        generated_tokens = self.quantized_model.generate(
            **encoded_text,
            forced_bos_token_id=self.tokenizer.get_lang_id(tgt_lang),
            num_beams=2,
        )
        decoded = self.tokenizer.batch_decode(generated_tokens, skip_special_tokens=True)
        # batch_decode returns a list with one string per generated sequence.
        # Use the strings directly rather than the list's repr, which wraps
        # the text in quotes and escapes characters.
        translated = " ".join(decoded)
        logger.info(f"PID: {self.PID} / Translated: {translated}")
        return translated

    def postprocess(self, data):
        """Return the inference result unchanged."""
        return data

    def handle(self, data, context):
        """Entry point called by TorchServe for every request batch.

        Runs preprocess -> inference -> postprocess, logs the elapsed time
        and returns a single-element list ``[{"translated": <text>}]``.
        """
        start = time.time()
        self.context = context
        data = self.preprocess(data)
        data = self.inference(data)
        translated = self.postprocess(data)
        logger.info(f"PID: {self.PID} / Elapsed: {time.time() - start:.3f} sec.")
        return [{"translated": translated}]
4) config.properties 작성
- 다음과 같이 모델 서버의 구성을 지정할 수 있다.
# TorchServe configuration, loaded by torchserve.sh via
# `--ts-config ./config.properties`.
# Configure TorchServe listening address and port
# (every endpoint is bound to the loopback interface only)
inference_address=http://127.0.0.1:9999
management_address=http://127.0.0.1:8081
metrics_address=http://127.0.0.1:8082
# Limit GPU usage
# (0: no GPU is used; the handler in test.py also pins the model to the CPU)
number_of_gpu=0
# Load models at startup
model_store=./model_store
#load_models=en.mar
# Other properties
# Worker / Capacity
default_workers_per_model=1
job_queue_size=4096
# NOTE(review): per the TorchServe docs this value is in seconds, so
# 86400 would be 24 hours -- confirm that such a long timeout is intended.
default_response_timeout=86400
5) 자동 실행 코드 작성
- 다음과 같이 torchserve.sh를 작성하여 간단하게 실행할 수 있도록 한다.
#!/bin/bash
# Build the TorchServe model archive (only when called with "new") and then
# start TorchServe with it.
#
# Usage:
#   bash torchserve.sh new   # rebuild ./model_store/<model_name>.mar, then start
#   bash torchserve.sh       # start TorchServe with the existing archive
model_dir=../model/voiceprint/m2m100_418M
model_name=test
model_ver=1.0
handler_file=./test.py

if [ "$1" = "new" ]; then
    # Split the model directory into the serialized weights (*.bin) and
    # everything else; the rest is handed to the archiver as --extra-files.
    model_file=
    extra_files=()
    for path in "$model_dir"/*; do
        # An unmatched glob stays literal; skip it.
        [ -e "$path" ] || continue
        if [[ "$path" == *.bin ]]; then
            model_file=$path
        else
            extra_files+=("$path")
        fi
    done
    # Fail before anything is deleted if there are no weights to archive.
    if [ -z "$model_file" ]; then
        echo "No .bin file found in $model_dir" >&2
        exit 1
    fi
    # Comma-separated list, without a trailing comma.
    extra_files_args=$(IFS=,; echo "${extra_files[*]}")

    if [ -d ./logs ]; then
        rm -rf ./logs
        echo Directory ./logs Removed!!!
    fi
    if [ -d ./model_store ]; then
        rm -rf ./model_store
        echo Directory ./model_store Removed!!!
    fi

    echo Torch Model Archiver Start...
    rm -f "./$model_name.mar"
    time torch-model-archiver --model-name "$model_name" --version "$model_ver" \
        --serialized-file "$model_file" --handler "$handler_file" \
        --extra-files "$extra_files_args" || {
        echo "torch-model-archiver failed" >&2
        exit 1
    }

    if [ ! -d ./model_store ]; then
        mkdir ./model_store
        echo Directory ./model_store Made!!!
    fi
    mv "./$model_name.mar" ./model_store
    echo "./$model_name.mar Moved to ./model_store"
fi

if [ -f "./model_store/$model_name.mar" ]; then
    torchserve --start --model-store ./model_store --ts-config ./config.properties --models "$model_name=$model_name.mar" --ncs
else
    echo "./model_store/$model_name.mar not found; run 'bash $0 new' first" >&2
    exit 1
fi
6) TorchServe 실행
- 다음과 같이 실행하면 .mar 파일이 만들어진 후 TorchServe가 시작된다.
$(torchserve) bash torchserve.sh new
7) TorchServe 테스트
- 먼저 다음과 같이 test.txt를 작성한다.
$(torchserve) echo '{"src_lang": "ko", "src_text": "오늘 하루도 하나님의 은총과 평안과 기쁨과 축복과 행복이 가정과 일터에 가득하길 소망합니다.", "tgt_lang": "en"}' > ./test.txt
- curl 명령어를 통해 추론을 테스트할 수 있다.
$(torchserve) curl -X POST http://127.0.0.1:9999/predictions/test -T test.txt
{
"translated": "\"I hope that God's grace, peace, joy, blessing, and happiness will be filled in our homes and workplaces even today.\""
}
3. Nginx
1) nginx.conf 작성
$(torchserve) sudo vim /etc/nginx/nginx.conf
# Main (global) context and connection-processing settings.
user www-data;
worker_processes 10; # Max simultaneous connections = worker_processes * worker_connections
pid /run/nginx.pid;
include /etc/nginx/modules-enabled/*.conf;
# worker_rlimit_nofile 2048;
events {
worker_connections 4096; # Doubled because nginx is used as a reverse proxy
multi_accept on;
use epoll;
}
# HTTP context: one reverse-proxy server on port 7777 that forwards
# /gunicorn to 127.0.0.1:8888 and /torchserve to the TorchServe
# prediction endpoint on 127.0.0.1:9999.
http {
##
# Basic Settings
##
sendfile on;
tcp_nopush on;
tcp_nodelay on;
##
# Server Settings
##
#upstream gunicorns {
# server 127.0.0.1:8888 max_fails=3 fail_timeout=1s;
#}
#upstream torchserves {
# server 127.0.0.1:9999 max_fails=3 fail_timeout=1s;
#}
server {
listen 7777;
server_name localhost;
# NOTE(review): both locations below are prefix (non-regex) locations, so
# nothing in this file populates the $1 capture used in proxy_pass --
# confirm whether the trailing $1 is intended.
location /gunicorn {
proxy_pass http://127.0.0.1:8888/$1;
}
location /torchserve {
proxy_pass http://127.0.0.1:9999/predictions/test/$1;
}
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root /usr/share/nginx/html;
}
}
##
# Timeout Settings
##
# keepalive_timeout 86400; # Keep-alive connection lifetime
# keepalive_requests 100; # Max number of requests per keep-alive connection
client_body_timeout 86400; # Time allowed to receive the request body from the client
send_timeout 86400; # Time allowed to send the response body to the client
reset_timedout_connection on; # Reset connections that were dropped because of a timeout
# NOTE(review): the original note on the next line says this timeout cannot
# exceed 75 seconds, yet it is set to 86400 -- confirm the intended value.
proxy_connect_timeout 86400; # Time allowed to connect to the proxied server (cannot exceed 75 seconds)
proxy_send_timeout 86400; # Time allowed to send the request to the proxied server
proxy_read_timeout 86400; # Time allowed to receive the response from the proxied server
types_hash_max_size 2048;
# server_tokens off;
# server_names_hash_bucket_size 64;
# server_name_in_redirect off;
include /etc/nginx/mime.types;
default_type application/octet-stream;
##
# SSL Settings
##
ssl_protocols TLSv1 TLSv1.1 TLSv1.2 TLSv1.3; # Dropping SSLv3, ref: POODLE
ssl_prefer_server_ciphers on;
##
# Logging Settings
##
access_log /var/log/nginx/access.log;
error_log /var/log/nginx/error.log;
##
# Gzip Settings
##
gzip on;
# gzip_vary on;
# gzip_proxied any;
# gzip_comp_level 6;
# gzip_buffers 16 8k;
# gzip_http_version 1.1;
# gzip_types text/plain text/css application/json application/javascript text/xml application/xml application/xml+rss text/javascript;
##
# Virtual Host Configs
##
# include /etc/nginx/conf.d/*.conf;
include /etc/nginx/sites-enabled/*;
}
#mail {
# # See sample authentication script at:
# # http://wiki.nginx.org/ImapAuthenticateWithApachePhpScript
#
# # auth_http localhost/auth.php;
# # pop3_capabilities "TOP" "USER";
# # imap_capabilities "IMAP4rev1" "UIDPLUS";
#
# server {
# listen localhost:110;
# protocol pop3;
# proxy on;
# }
#
# server {
# listen localhost:143;
# protocol imap;
# proxy on;
# }
#}
2) Nginx 실행
- 구성을 변경한 후 다음과 같이 Nginx를 실행한다.
$(torchserve) sudo service nginx restart
3) Nginx 상태 확인
- 다음과 같이 Nginx가 정상적으로 작동하는지 확인한다.
$(torchserve) sudo systemctl status nginx
4. Nginx + TorchServe 테스트
1) client.py 작성
- 다음과 같이 Python 스크립트를 작성한다.
import sys
import requests
import json
import time
import pandas as pd
from multiprocessing import Process, Queue
def request_text(idx: int, URL: str, src_lang: str, src_text: str, tgt_lang: str, result_queue: Queue):
    """Send one translation request and report the result.

    The request is POSTed to ``URL`` as a JSON-encoded body.  The
    ``translated`` field of the JSON response is printed together with the
    elapsed time, then pushed onto ``result_queue`` as ``(idx, translated)``.
    """
    started_at = time.time()
    payload = json.dumps({"src_lang": src_lang, "src_text": src_text, "tgt_lang": tgt_lang})
    translated = requests.post(URL, data=payload).json()["translated"]
    elapsed = time.time() - started_at
    print(f"\nIndex: {idx} Time: {elapsed} sec.\nSrc_Lang: {src_lang} / Src_Text: {src_text}\nTgt_lang: {tgt_lang} / Translated: {translated}\n")
    result_queue.put((idx, translated))
def main(URL):
    """Translate one Korean sentence into every other language in parallel.

    One process per target language calls ``request_text`` against ``URL``.
    Results are collected from a shared queue while the workers run, and the
    total elapsed time is printed.

    Args:
        URL: endpoint that accepts the translation request.

    Returns:
        The translations ordered by worker index.  A worker that failed
        contributes no entry.
    """
    from queue import Empty  # raised by Queue.get() when the timeout expires

    start = time.time()
    lang_codes = {"Korean": "ko", "English": "en", "Chinese": "zh", "Spanish": "es", "French": "fr", "Russian": "ru", "Indonesian": "id", "Japanese": "ja", "Vietnamese": "vi", "German": "de", "Italian": "it", "Hindi": "hi", "Arabic": "ar", "Portuguese": "pt"}
    src_lang = "Korean"
    src_text = "오늘 하루도 하나님의 은총과 평안과 기쁨과 축복과 행복이 가정과 일터에 가득하길 소망합니다."
    # Remove the source language so only target languages remain.
    src_lang = lang_codes.pop(src_lang)
    result_queue = Queue()
    procs = [
        Process(target=request_text, args=(idx, URL, src_lang, src_text, lang_code, result_queue))
        for idx, lang_code in enumerate(lang_codes.values())
    ]
    for proc in procs:
        proc.start()

    # Drain the queue BEFORE joining: a process that has put items on a
    # multiprocessing queue may not terminate until they have been consumed.
    results = {}
    while len(results) < len(procs):
        # Sampled before the get(): if no worker was alive and the get still
        # times out, nothing more can arrive (a worker may have crashed).
        workers_running = any(proc.is_alive() for proc in procs)
        try:
            idx, translated = result_queue.get(timeout=0.5)
        except Empty:
            if not workers_running:
                break
            continue
        results[idx] = translated

    for proc in procs:
        proc.join()

    translated_list = [results[idx] for idx in sorted(results)]
    end = time.time()
    print(f"Time: {end - start}")
    return translated_list
if __name__ == "__main__":
    # Usage: python client.py {gunicorn|torchserve}
    # The server name is appended to the base URL as its path.
    argv = sys.argv
    URL = "http://127.0.0.1:7777/"
    if len(argv) != 2:
        print("\nEnter Server Name...\n")
        # Non-zero status so callers can detect the usage error.
        sys.exit(1)
    arg = argv[1]
    if arg not in ("gunicorn", "torchserve"):
        print("\nWrong Server Name...\n")
        sys.exit(1)
    URL = URL + arg
    main(URL)
2) client.py 실행
- 다음과 같이 Python 스크립트를 실행한다.
$(torchserve) python client.py torchserve