1. Korean-to-English Translation with the Papago Translation API
1. Calling the Papago Translation API
- To use the Papago Translation API, you first need to register an application at the following link.
https://developers.naver.com/main/
- Then obtain a Client ID and a Client Secret for the Papago Translation API from the developer console.
- Below is Naver's Python sample for calling the Papago Translation API.
import os
import sys
import urllib.request
client_id = "YOUR_CLIENT_ID"  # Client ID issued by the developer center
client_secret = "YOUR_CLIENT_SECRET"  # Client Secret issued by the developer center
encText = urllib.parse.quote("반갑습니다")
data = "source=ko&target=en&text=" + encText
url = "https://openapi.naver.com/v1/papago/n2mt"
request = urllib.request.Request(url)
request.add_header("X-Naver-Client-Id",client_id)
request.add_header("X-Naver-Client-Secret",client_secret)
response = urllib.request.urlopen(request, data=data.encode("utf-8"))
rescode = response.getcode()
if rescode == 200:
    response_body = response.read()
    print(response_body.decode('utf-8'))
else:
    print("Error Code:" + str(rescode))
- The sample above has to be adapted to the intended use; one possible adaptation is sketched below.
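- For instance, a minimal sketch that wraps the sample into a reusable helper could look like this. The function name translate_ko_en is made up for illustration, the credentials are placeholders, and the JSON response layout (message.result.translatedText) is an assumption rather than something taken from the sample above.
import json
import urllib.parse
import urllib.request

def translate_ko_en(text: str, client_id: str, client_secret: str) -> str:
    # Build the same request as in Naver's sample, but for an arbitrary input text.
    data = "source=ko&target=en&text=" + urllib.parse.quote(text)
    request = urllib.request.Request("https://openapi.naver.com/v1/papago/n2mt")
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)
    response = urllib.request.urlopen(request, data=data.encode("utf-8"))
    body = json.loads(response.read().decode("utf-8"))
    # Assumed response layout: {"message": {"result": {"translatedText": ...}}}
    return body["message"]["result"]["translatedText"]

print(translate_ko_en("반갑습니다", "YOUR_CLIENT_ID", "YOUR_CLIENT_SECRET"))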
2. Directory Structure
- The final directory structure of the project is as follows (a tree view follows the list).
1] Original_Data directory
- Contains the Korean source files, written as json or csv.
2] Processed_Data directory
- Korean files are moved here after they have been run through the Papago Translation API.
3] Translated_Data directory
- The files translated into English are saved here.
4] api_crawler.py
- The main script that calls the Papago Translation API with multiprocessing.
5] utils.py
- A helper script for supporting tasks such as loading and saving data.
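- Laid out as a tree (the top-level project directory name here is arbitrary):
Project/
├── Original_Data/       # Korean source files (json/csv)
├── Processed_Data/      # originals are moved here after translation
├── Translated_Data/     # translated csv files are saved here
├── api_crawler.py
└── utils.py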
3. utils.py
1) Libraries
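- utils.py needs only the standard library and pandas; the imports below are the same ones listed in the full code at 10).
import os, shutil
import pandas as pd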
2) _make_directory()
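- A small helper, also part of the full code at 10), that creates a directory only when it does not already exist.
def _make_directory(directory: str):
    if not os.path.isdir(directory):
        os.makedirs(directory)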
3) load_data()
def load_data(path: str):
    extension = os.path.splitext(path)[-1][1:]
    if extension == "json":
        data = pd.read_json(path)
    elif extension == "csv":
        data = pd.read_csv(path)
    else:
        print(f"File Extension Error: {extension}")
        data = None
    return data, extension
4) save_csv()
def save_csv(path: str, obj: pd.DataFrame, index=False):
    try:
        obj.to_csv(path, index=index, encoding="utf-8-sig")
        message = f"\n{'csv saved:':<15} {path}"
    except Exception as e:
        message = f"\n{'Failed to save:':<15} {e}"
    print(message)
    return message
5) check_default_directory()
def check_default_directory(directory: str):
    name_list = ["Original_Data", "Processed_Data", "Translated_Data"]
    original_dir, processed_dir, translated_dir = list(
        map(lambda name: os.path.join(directory, name), name_list)
    )
    if original_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(original_dir)
    if processed_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(processed_dir)
    if translated_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(translated_dir)
    return original_dir, processed_dir, translated_dir
6) check_data()
def check_data(directory: str):
    data_path_list = []
    for path, direct, files in os.walk(directory):
        data_list = [
            data
            for data in files
            if os.path.splitext(data)[-1][1:] == "json"
            or os.path.splitext(data)[-1][1:] == "csv"
        ]
        data_path = [os.path.join(path, data) for data in data_list]
        data_path_list.extend(data_path)
    print(f"\n{'Data Directory:':<15} {directory}\n")
    for data in data_path_list:
        print(data)
    return data_path_list
7) move_file()
def move_file(source_path: str, target_path: str):
    try:
        shutil.move(source_path, target_path)
        message = f"\n{'file moved:':<15} {target_path}"
    except Exception as e:
        message = f"\n{'Failed to move:':<15} {e}"
    print(message)
    return message
8) get_batch()
def get_batch(text_list: list, cores_num: int):
    div, mod = divmod(len(text_list), cores_num)
    batch = [None] * cores_num
    if mod > 0:
        start, end = 0, div + 1
        mod -= 1
    else:
        start, end = 0, div
    for i in range(cores_num):
        batch[i] = text_list[start:end]
        start = end
        if mod > 0:
            end += div + 1
            mod -= 1
        else:
            end += div
    return batch
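- For example, splitting 10 items across 4 cores puts the remainder into the first batches:
print(get_batch(list(range(10)), 4))
# [[0, 1, 2], [3, 4, 5], [6, 7], [8, 9]]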
9) data_parser()
def data_parser(data: pd.DataFrame, data_extension: str):
    # The data must always be parsed into a list of (index, text) pairs:
    # [(index, text), (index, text), ..., (index, text)]
    if data_extension == "json":
        text_list = []
        idx = 0
        for i in range(len(data)):
            temp_list = []
            for j in range(len(data["data"][i]["body"]["dialogue"])):
                temp_list.append(
                    (idx, data["data"][i]["body"]["dialogue"][j]["utterance"])
                )
                idx += 1
            text_list.extend(temp_list)
    elif data_extension == "csv":
        text_list = [(i, data["한국어"][i]) for i in range(len(data))]
    else:
        text_list = None
    return text_list
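- As a quick illustration of the csv branch (the "한국어" column name comes from the code above; the two sample rows are made up):
df = pd.DataFrame({"한국어": ["안녕하세요", "반갑습니다"]})
print(data_parser(df, "csv"))
# [(0, '안녕하세요'), (1, '반갑습니다')]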
10) Full Code
import os, shutil
import pandas as pd
def _make_directory(directory: str):
    if not os.path.isdir(directory):
        os.makedirs(directory)

def load_data(path: str):
    extension = os.path.splitext(path)[-1][1:]
    if extension == "json":
        data = pd.read_json(path)
    elif extension == "csv":
        data = pd.read_csv(path)
    else:
        print(f"File Extension Error: {extension}")
        data = None
    return data, extension

def save_csv(path: str, obj: pd.DataFrame, index=False):
    try:
        obj.to_csv(path, index=index, encoding="utf-8-sig")
        message = f"\n{'csv saved:':<15} {path}"
    except Exception as e:
        message = f"\n{'Failed to save:':<15} {e}"
    print(message)
    return message

def check_default_directory(directory: str):
    name_list = ["Original_Data", "Processed_Data", "Translated_Data"]
    original_dir, processed_dir, translated_dir = list(
        map(lambda name: os.path.join(directory, name), name_list)
    )
    if original_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(original_dir)
    if processed_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(processed_dir)
    if translated_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(translated_dir)
    return original_dir, processed_dir, translated_dir

def check_data(directory: str):
    data_path_list = []
    for path, direct, files in os.walk(directory):
        data_list = [
            data
            for data in files
            if os.path.splitext(data)[-1][1:] == "json"
            or os.path.splitext(data)[-1][1:] == "csv"
        ]
        data_path = [os.path.join(path, data) for data in data_list]
        data_path_list.extend(data_path)
    print(f"\n{'Data Directory:':<15} {directory}\n")
    for data in data_path_list:
        print(data)
    return data_path_list

def move_file(source_path: str, target_path: str):
    try:
        shutil.move(source_path, target_path)
        message = f"\n{'file moved:':<15} {target_path}"
    except Exception as e:
        message = f"\n{'Failed to move:':<15} {e}"
    print(message)
    return message

def get_batch(text_list: list, cores_num: int):
    div, mod = divmod(len(text_list), cores_num)
    batch = [None] * cores_num
    if mod > 0:
        start, end = 0, div + 1
        mod -= 1
    else:
        start, end = 0, div
    for i in range(cores_num):
        batch[i] = text_list[start:end]
        start = end
        if mod > 0:
            end += div + 1
            mod -= 1
        else:
            end += div
    return batch

def data_parser(data: pd.DataFrame, data_extension: str):
    # The data must always be parsed into a list of (index, text) pairs:
    # [(index, text), (index, text), ..., (index, text)]
    if data_extension == "json":
        text_list = []
        idx = 0
        for i in range(len(data)):
            temp_list = []
            for j in range(len(data["data"][i]["body"]["dialogue"])):
                temp_list.append(
                    (idx, data["data"][i]["body"]["dialogue"][j]["utterance"])
                )
                idx += 1
            text_list.extend(temp_list)
    elif data_extension == "csv":
        text_list = [(i, data["한국어"][i]) for i in range(len(data))]
    else:
        text_list = None
    return text_list
4. api_crawler.py
1) Libraries
import utils
import os, time, re
import pandas as pd
import urllib.request
from multiprocessing import Process, Queue, cpu_count
2) process_info()
def process_info():
    print(f"{'Module Name:':<15} {__name__}")
    print(f"{'Parent PID:':<15} {os.getppid()}")
    print(f"{'Current PID:':<15} {os.getpid()}")
3) papago_api()
def papago_api(cores_num: int, batch: list, result_queue: object):
    print(f"\n{'API / Num:':<15} Papago / {cores_num + 1}")
    process_info()
    client_id = "YOUR_CLIENT_ID"  # Client ID issued by the developer center
    client_secret = "YOUR_CLIENT_SECRET"  # Client Secret issued by the developer center
    url = "https://openapi.naver.com/v1/papago/n2mt"
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)
    for i in range(len(batch)):
        try:
            papago_data = f"source=ko&target=en&text={urllib.parse.quote(batch[i][1])}"
            papago_response = urllib.request.urlopen(
                request, data=papago_data.encode("utf-8")
            )
            papago_response_body = papago_response.read().decode("utf-8")
            # Slice the translated text out of the raw JSON body:
            # it sits between the "translatedText" and "engineType" keys.
            start_p, end_p = re.compile("translatedText"), re.compile("engineType")
            start_idx = start_p.search(papago_response_body).span()[1] + 3
            end_idx = end_p.search(papago_response_body).span()[0] - 3
            result_queue.put((batch[i][0], papago_response_body[start_idx:end_idx]))
        except Exception:
            # On any failure, keep the original index but mark the translation as missing.
            result_queue.put((batch[i][0], "( - )"))
4) main()
if __name__ == "__main__":
    start_total_real_time, start_total_proc_time = time.time(), time.process_time()
    # Set Directory
    PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
    ORIGINAL_DIR, PROCESSED_DIR, TRANSLATED_DIR = utils.check_default_directory(
        PROJECT_DIR
    )
    DATA_PATH_LIST = utils.check_data(ORIGINAL_DIR)
    for i in range(len(DATA_PATH_LIST)):
        start_real_time, start_proc_time = time.time(), time.process_time()
        task_name = DATA_PATH_LIST[i]
        data, data_extension = utils.load_data(task_name)
        if data is not None:
            # Set Original Text List
            file_name = os.path.split(task_name)[-1]
            text_list = utils.data_parser(data, data_extension)[:32]  # remove this slicing after testing
            papago_dict = {}
            # Set Multi-Processing
            result_queue = Queue()
            cores_num = cpu_count()
            batch = utils.get_batch(text_list, cores_num)
            text_list = [e[1] for e in text_list]
            print(f"\n{'Data Num:':<15} {i + 1} of {len(DATA_PATH_LIST)}")
            print(f"{'Data path:':<15} {task_name}")
            print(f"{'Num of Cores:':<15} {cores_num}")
            # Papago Multi-Processing
            procs = list(
                map(
                    lambda cores_num: Process(
                        target=papago_api,
                        args=(cores_num, batch[cores_num], result_queue),
                    ),
                    [i for i in range(cores_num)],
                )
            )
            [proc.start() for proc in procs]
            [proc.join() for proc in procs]
            while not result_queue.empty():
                result = result_queue.get()
                papago_dict[result[0]] = result[1]
            papago_list = [e[1] for e in sorted(papago_dict.items())]
            print(f"\nPapago Finished")
            # Make DataFrame
            df = pd.DataFrame({"Korean": text_list, "Papago": papago_list})
            # Save CSV
            target_path = os.path.join(
                TRANSLATED_DIR, os.path.splitext(file_name)[0] + ".csv"
            )
            utils.save_csv(target_path, df)
            # Move File
            source_path = os.path.join(task_name)
            target_path = os.path.join(PROCESSED_DIR, file_name)
            utils.move_file(source_path, target_path)
            end_real_time, end_proc_time = time.time(), time.process_time()
            print(f"\nData {i + 1} of {len(DATA_PATH_LIST)} Finished")
            print(f"{'Real Time:':<15} {end_real_time - start_real_time:.2f} sec.")
            print(f"{'Proc Time:':<15} {end_proc_time - start_proc_time:.2f} sec.")
        else:
            print(f"\nData {i + 1} of {len(DATA_PATH_LIST)} is None\n")
    end_total_real_time, end_total_proc_time = time.time(), time.process_time()
    print(
        f"\n{'Total Real:':<15} {end_total_real_time - start_total_real_time:.2f} sec."
    )
    print(f"{'Total Proc:':<15} {end_total_proc_time - start_total_proc_time:.2f} sec.")
5) Full Code
import utils
import os, time, re
import pandas as pd
import urllib.request
from multiprocessing import Process, Queue, cpu_count
def process_info():
    print(f"{'Module Name:':<15} {__name__}")
    print(f"{'Parent PID:':<15} {os.getppid()}")
    print(f"{'Current PID:':<15} {os.getpid()}")
def papago_api(cores_num: int, batch: list, result_queue: object):
    print(f"\n{'API / Num:':<15} Papago / {cores_num + 1}")
    process_info()
    client_id = "YOUR_CLIENT_ID"  # Client ID issued by the developer center
    client_secret = "YOUR_CLIENT_SECRET"  # Client Secret issued by the developer center
    url = "https://openapi.naver.com/v1/papago/n2mt"
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)
    for i in range(len(batch)):
        try:
            papago_data = f"source=ko&target=en&text={urllib.parse.quote(batch[i][1])}"
            papago_response = urllib.request.urlopen(
                request, data=papago_data.encode("utf-8")
            )
            papago_response_body = papago_response.read().decode("utf-8")
            # Slice the translated text out of the raw JSON body:
            # it sits between the "translatedText" and "engineType" keys.
            start_p, end_p = re.compile("translatedText"), re.compile("engineType")
            start_idx = start_p.search(papago_response_body).span()[1] + 3
            end_idx = end_p.search(papago_response_body).span()[0] - 3
            result_queue.put((batch[i][0], papago_response_body[start_idx:end_idx]))
        except Exception:
            # On any failure, keep the original index but mark the translation as missing.
            result_queue.put((batch[i][0], "( - )"))
if __name__ == "__main__":
    start_total_real_time, start_total_proc_time = time.time(), time.process_time()
    # Set Directory
    PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
    ORIGINAL_DIR, PROCESSED_DIR, TRANSLATED_DIR = utils.check_default_directory(
        PROJECT_DIR
    )
    DATA_PATH_LIST = utils.check_data(ORIGINAL_DIR)
    for i in range(len(DATA_PATH_LIST)):
        start_real_time, start_proc_time = time.time(), time.process_time()
        task_name = DATA_PATH_LIST[i]
        data, data_extension = utils.load_data(task_name)
        if data is not None:
            # Set Original Text List
            file_name = os.path.split(task_name)[-1]
            text_list = utils.data_parser(data, data_extension)[:32]  # remove this slicing after testing
            papago_dict = {}
            # Set Multi-Processing
            result_queue = Queue()
            cores_num = cpu_count()
            batch = utils.get_batch(text_list, cores_num)
            text_list = [e[1] for e in text_list]
            print(f"\n{'Data Num:':<15} {i + 1} of {len(DATA_PATH_LIST)}")
            print(f"{'Data path:':<15} {task_name}")
            print(f"{'Num of Cores:':<15} {cores_num}")
            # Papago Multi-Processing
            procs = list(
                map(
                    lambda cores_num: Process(
                        target=papago_api,
                        args=(cores_num, batch[cores_num], result_queue),
                    ),
                    [i for i in range(cores_num)],
                )
            )
            [proc.start() for proc in procs]
            [proc.join() for proc in procs]
            while not result_queue.empty():
                result = result_queue.get()
                papago_dict[result[0]] = result[1]
            papago_list = [e[1] for e in sorted(papago_dict.items())]
            print(f"\nPapago Finished")
            # Make DataFrame
            df = pd.DataFrame({"Korean": text_list, "Papago": papago_list})
            # Save CSV
            target_path = os.path.join(
                TRANSLATED_DIR, os.path.splitext(file_name)[0] + ".csv"
            )
            utils.save_csv(target_path, df)
            # Move File
            source_path = os.path.join(task_name)
            target_path = os.path.join(PROCESSED_DIR, file_name)
            utils.move_file(source_path, target_path)
            end_real_time, end_proc_time = time.time(), time.process_time()
            print(f"\nData {i + 1} of {len(DATA_PATH_LIST)} Finished")
            print(f"{'Real Time:':<15} {end_real_time - start_real_time:.2f} sec.")
            print(f"{'Proc Time:':<15} {end_proc_time - start_proc_time:.2f} sec.")
        else:
            print(f"\nData {i + 1} of {len(DATA_PATH_LIST)} is None\n")
    end_total_real_time, end_total_proc_time = time.time(), time.process_time()
    print(
        f"\n{'Total Real:':<15} {end_total_real_time - start_total_real_time:.2f} sec."
    )
    print(f"{'Total Proc:':<15} {end_total_proc_time - start_total_proc_time:.2f} sec.")