Skip to content

1. Translation Ko-En with Papago Translation API


1. Papago 번역 API 불러오기

  • Papago 번역 API를 불러와 사용하려면 다음의 링크를 따라 등록해야 한다.


https://developers.naver.com/main/


  • 그리고 다음과 같이 Papago 번역 API에 대한 클라이언트 ID와 클라이언트 Secret을 발급받는다.


001


  • 다음은 네이버에서 제공하는 Papago 번역 API에 대한 Python 구현 예제이다.


import os
import sys
import urllib.request

client_id = "YOUR_CLIENT_ID" # 개발자센터에서 발급받은 Client ID 값
client_secret = "YOUR_CLIENT_SECRET" # 개발자센터에서 발급받은 Client Secret 값
encText = urllib.parse.quote("반갑습니다")
data = "source=ko&target=en&text=" + encText
url = "https://openapi.naver.com/v1/papago/n2mt"

request = urllib.request.Request(url)
request.add_header("X-Naver-Client-Id",client_id)
request.add_header("X-Naver-Client-Secret",client_secret)
response = urllib.request.urlopen(request, data=data.encode("utf-8"))
rescode = response.getcode()

if(rescode==200):
    response_body = response.read()
    print(response_body.decode('utf-8'))
else:
    print("Error Code:" + rescode)


  • 위의 구현 예제를 용도에 맞게 수정해야 한다.


2. 디렉터리 구조

.
├── Original_Data
├── Processed_Data
├── Translated_Data
├── api_crawler.py
└── utils.py


  • 만들고자 하는 프로젝트의 최종적인 디렉터리 구조는 다음과 같다.


1] Original_Data 디렉터리

  • json이나 csv로 작성되어 있는 한국어 파일이 들어 있다.

2] Processed_Data 디렉터리

  • 한국어 파일은 Papago 번역 API를 거친 후 이곳으로 옮겨지게 된다.

3] Translated_Data 디렉터리

  • 영어로 번역된 파일이 이곳으로 저장된다.

4] api_crawler.py

  • Multi-Processing을 통해 Papago 번역 API를 불러올 메인 스크립트 파일

5] utils.py

  • 데이터를 로드하거나 저장하는 등의 부수적인 기능을 하는 스크립트 파일


3. utils.py

1) 라이브러리

import os, shutil
import pandas as pd


2) _make_directory()

def _make_directory(directory: str):
    if not os.path.isdir(directory):
        os.makedirs(directory)


3) load_data()

def load_data(path: str):
    extension = os.path.splitext(path)[-1][1:]

    if extension == "json":
        data = pd.read_json(path)
    elif extension == "csv":
        data = pd.read_csv(path)
    else:
        print(f"File Extension Error: {extension}")
        data = None

    return data, extension


4) save_csv()

def save_csv(path: str, obj: dict, index=False):
    try:
        obj.to_csv(path, index=index, encoding="utf-8-sig")
        message = f"\n{'csv saved:':<15} {path}"
    except Exception as e:
        message = f"\n{'Failed to save:':<15} {e}"
    print(message)

    return message


5) check_default_directory()

def check_default_directory(directory: str):
    name_list = ["Original_Data", "Processed_Data", "Translated_Data"]
    original_dir, processed_dir, translated_dir = list(
        map(lambda name: os.path.join(directory, name), name_list)
    )

    if original_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(original_dir)

    if processed_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(processed_dir)

    if translated_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(translated_dir)

    return original_dir, processed_dir, translated_dir


6) check_data()

def check_data(directory: str):
    data_path_list = []

    for path, direct, files in os.walk(directory):
        data_list = [
            data
            for data in files
            if os.path.splitext(data)[-1][1:] == "json"
            or os.path.splitext(data)[-1][1:] == "csv"
        ]
        data_path = [os.path.join(path, data) for data in data_list]

        data_path_list.extend(data_path)

    print(f"\n{'Data Directory:':<15} {directory}\n")

    for data in data_path_list:
        print(data)

    return data_path_list


7) move_file()

def move_file(source_path: str, target_path: str):
    try:
        shutil.move(source_path, target_path)
        message = f"\n{'file moved:':<15} {target_path}"
    except Exception as e:
        message = f"\n{'Failed to save:':<15} {e}"
    print(message)

    return message


8) get_batch()

def get_batch(text_list: list, cores_num: int):
    div, mod = divmod(len(text_list), cores_num)
    batch = [None] * cores_num

    if mod > 0:
        start, end = 0, div + 1
        mod -= 1
    else:
        start, end = 0, div

    for i in range(cores_num):
        batch[i] = text_list[start:end]
        start = end

        if mod > 0:
            end += div + 1
            mod -= 1
        else:
            end += div

    return batch


9) data_parser()

def data_parser(data: dict, data_extension: str):

    # 항상 데이터는 [(index, text), (index, text), (index, text), ... (index, text)]로 만들어 줘야 함

    if data_extension == "json":
        text_list = []
        idx = 0

        for i in range(len(data)):
            temp_list = []

            for j in range(len(data["data"][i]["body"]["dialogue"])):
                temp_list.append(
                    (idx, data["data"][i]["body"]["dialogue"][j]["utterance"])
                )

                idx += 1

            text_list.extend(temp_list)
    elif data_extension == "csv":
        text_list = [(i, data["한국어"][i]) for i in range(len(data))]
    else:
        text_list = None

    return text_list


10) 전체 코드

import os, shutil
import pandas as pd

def _make_directory(directory: str):
    if not os.path.isdir(directory):
        os.makedirs(directory)

def load_data(path: str):
    extension = os.path.splitext(path)[-1][1:]

    if extension == "json":
        data = pd.read_json(path)
    elif extension == "csv":
        data = pd.read_csv(path)
    else:
        print(f"File Extension Error: {extension}")
        data = None

    return data, extension

def save_csv(path: str, obj: dict, index=False):
    try:
        obj.to_csv(path, index=index, encoding="utf-8-sig")
        message = f"\n{'csv saved:':<15} {path}"
    except Exception as e:
        message = f"\n{'Failed to save:':<15} {e}"
    print(message)

    return message

def check_default_directory(directory: str):
    name_list = ["Original_Data", "Processed_Data", "Translated_Data"]
    original_dir, processed_dir, translated_dir = list(
        map(lambda name: os.path.join(directory, name), name_list)
    )

    if original_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(original_dir)

    if processed_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(processed_dir)

    if translated_dir.split("/")[-1] not in os.listdir(directory):
        _make_directory(translated_dir)

    return original_dir, processed_dir, translated_dir

def check_data(directory: str):
    data_path_list = []

    for path, direct, files in os.walk(directory):
        data_list = [
            data
            for data in files
            if os.path.splitext(data)[-1][1:] == "json"
            or os.path.splitext(data)[-1][1:] == "csv"
        ]
        data_path = [os.path.join(path, data) for data in data_list]

        data_path_list.extend(data_path)

    print(f"\n{'Data Directory:':<15} {directory}\n")

    for data in data_path_list:
        print(data)

    return data_path_list

def move_file(source_path: str, target_path: str):
    try:
        shutil.move(source_path, target_path)
        message = f"\n{'file moved:':<15} {target_path}"
    except Exception as e:
        message = f"\n{'Failed to save:':<15} {e}"
    print(message)

    return message

def get_batch(text_list: list, cores_num: int):
    div, mod = divmod(len(text_list), cores_num)
    batch = [None] * cores_num

    if mod > 0:
        start, end = 0, div + 1
        mod -= 1
    else:
        start, end = 0, div

    for i in range(cores_num):
        batch[i] = text_list[start:end]
        start = end

        if mod > 0:
            end += div + 1
            mod -= 1
        else:
            end += div

    return batch

def data_parser(data: dict, data_extension: str):

    # 항상 데이터는 [(index, text), (index, text), (index, text), ... (index, text)]로 만들어 줘야 함

    if data_extension == "json":
        text_list = []
        idx = 0

        for i in range(len(data)):
            temp_list = []

            for j in range(len(data["data"][i]["body"]["dialogue"])):
                temp_list.append(
                    (idx, data["data"][i]["body"]["dialogue"][j]["utterance"])
                )

                idx += 1

            text_list.extend(temp_list)
    elif data_extension == "csv":
        text_list = [(i, data["한국어"][i]) for i in range(len(data))]
    else:
        text_list = None

    return text_list


4. api_crawler.py

1) 라이브러리

import utils
import os, time, re
import pandas as pd
import urllib.request
from multiprocessing import Process, Queue, cpu_count


2) process_info()

def process_info():
    print(f"{'Module Name:':<15} {__name__}")
    print(f"{'Parent PID:':<15} {os.getppid()}")
    print(f"{'Current PID:':<15} {os.getpid()}")


3) papago_api()

def papago_api(cores_num: int, batch: list, result_queue: object):
    print(f"\n{'API / Num:':<15} Papago / {cores_num + 1}")
    process_info()

    client_id = "발급받은 클라이언트 ID"
    client_secret = "발급받은 클라이언트 SECRET"
    url = "https://openapi.naver.com/v1/papago/n2mt"
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)

    for i in range(len(batch)):
        try:
            papago_data = f"source=ko&target=en&text={urllib.parse.quote(batch[i][1])}"
            papago_response = urllib.request.urlopen(
                request, data=papago_data.encode("utf-8")
            )
            papago_response_body = papago_response.read().decode("utf-8")

            start_p, end_p = re.compile("translatedText"), re.compile("engineType")
            start_idx = start_p.search(papago_response_body).span()[1] + 3
            end_idx = end_p.search(papago_response_body).span()[0] - 3

            result_queue.put((batch[i][0], papago_response_body[start_idx:end_idx]))

        except:
            result_queue.put((batch[i][0], "( - )"))


4) main()

if __name__ == "__main__":
    start_total_real_time, start_total_proc_time = time.time(), time.process_time()

    # Set Directory
    PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
    ORIGINAL_DIR, PROCESSED_DIR, TRANSLATED_DIR = utils.check_default_directory(
        PROJECT_DIR
    )
    DATA_PATH_LIST = utils.check_data(ORIGINAL_DIR)

    for i in range(len(DATA_PATH_LIST)):
        start_real_time, start_proc_time = time.time(), time.process_time()

        task_name = DATA_PATH_LIST[i]
        data, data_extension = utils.load_data(task_name)

        if data is not None:
            # Set Original Text List
            file_name = os.path.split(task_name)[-1]
            text_list = utils.data_parser(data, data_extension)[:32]  # 테스트 후 슬라이싱 제거
            papago_dict = {}

            # Set Multi-Processing
            result_queue = Queue()
            cores_num = cpu_count()
            batch = utils.get_batch(text_list, cores_num)
            text_list = [e[1] for e in text_list]

            print(f"\n{'Data Num:':<15} {i + 1} of {len(DATA_PATH_LIST)}")
            print(f"{'Data path:':<15} {task_name}")
            print(f"{'Num of Cores:':<15} {cores_num}")

            # Papago Multi-Processing
            procs = list(
                map(
                    lambda cores_num: Process(
                        target=papago_api,
                        args=(cores_num, batch[cores_num], result_queue),
                    ),
                    [i for i in range(cores_num)],
                )
            )

            [proc.start() for proc in procs]
            [proc.join() for proc in procs]

            while not result_queue.empty():
                result = result_queue.get()
                papago_dict[result[0]] = result[1]

            papago_list = [e[1] for e in sorted(papago_dict.items())]
            print(f"\nPapago Finished")

            # Make DataFrame
            df = pd.DataFrame({"Korean": text_list, "Papago": papago_list})

            # Save CSV
            target_path = os.path.join(
                TRANSLATED_DIR, os.path.splitext(file_name)[0] + ".csv"
            )
            utils.save_csv(target_path, df)

            # Move File
            source_path = os.path.join(task_name)
            target_path = os.path.join(PROCESSED_DIR, file_name)
            utils.move_file(source_path, target_path)

            end_real_time, end_proc_time = time.time(), time.process_time()

            print(f"\nData {i + 1} of {len(DATA_PATH_LIST)} Finished")
            print(f"{'Real Time:':<15} {end_real_time - start_real_time:.2f} sec.")
            print(f"{'Proc Time:':<15} {end_proc_time - start_proc_time:.2f} sec.")
        else:
            print(f"\nData {i + 1} of {len(DATA_PATH_LIST)} is None\n")

    end_total_real_time, end_total_proc_time = time.time(), time.process_time()

    print(
        f"\n{'Total Real:':<15} {end_total_real_time - start_total_real_time:.2f} sec."
    )
    print(f"{'Total Proc:':<15} {end_total_proc_time - start_total_proc_time:.2f} sec.")


5) 전체 코드

import utils
import os, time, re
import pandas as pd
import urllib.request
from multiprocessing import Process, Queue, cpu_count

def process_info():
    print(f"{'Module Name:':<15} {__name__}")
    print(f"{'Parent PID:':<15} {os.getppid()}")
    print(f"{'Current PID:':<15} {os.getpid()}")

def papago_api(cores_num: int, batch: list, result_queue: object):
    print(f"\n{'API / Num:':<15} Papago / {cores_num + 1}")
    process_info()

    client_id = "할당받은 클라이언트 ID"
    client_secret = "할당받은 클라이언트 SECRET"
    url = "https://openapi.naver.com/v1/papago/n2mt"
    request = urllib.request.Request(url)
    request.add_header("X-Naver-Client-Id", client_id)
    request.add_header("X-Naver-Client-Secret", client_secret)

    for i in range(len(batch)):
        try:
            papago_data = f"source=ko&target=en&text={urllib.parse.quote(batch[i][1])}"
            papago_response = urllib.request.urlopen(
                request, data=papago_data.encode("utf-8")
            )
            papago_response_body = papago_response.read().decode("utf-8")

            start_p, end_p = re.compile("translatedText"), re.compile("engineType")
            start_idx = start_p.search(papago_response_body).span()[1] + 3
            end_idx = end_p.search(papago_response_body).span()[0] - 3

            result_queue.put((batch[i][0], papago_response_body[start_idx:end_idx]))

        except:
            result_queue.put((batch[i][0], "( - )"))

if __name__ == "__main__":
    start_total_real_time, start_total_proc_time = time.time(), time.process_time()

    # Set Directory
    PROJECT_DIR = os.path.dirname(os.path.abspath(__file__))
    ORIGINAL_DIR, PROCESSED_DIR, TRANSLATED_DIR = utils.check_default_directory(
        PROJECT_DIR
    )
    DATA_PATH_LIST = utils.check_data(ORIGINAL_DIR)

    for i in range(len(DATA_PATH_LIST)):
        start_real_time, start_proc_time = time.time(), time.process_time()

        task_name = DATA_PATH_LIST[i]
        data, data_extension = utils.load_data(task_name)

        if data is not None:
            # Set Original Text List
            file_name = os.path.split(task_name)[-1]
            text_list = utils.data_parser(data, data_extension)[:32]  # 테스트 후 슬라이싱 제거
            papago_dict = {}

            # Set Multi-Processing
            result_queue = Queue()
            cores_num = cpu_count()
            batch = utils.get_batch(text_list, cores_num)
            text_list = [e[1] for e in text_list]

            print(f"\n{'Data Num:':<15} {i + 1} of {len(DATA_PATH_LIST)}")
            print(f"{'Data path:':<15} {task_name}")
            print(f"{'Num of Cores:':<15} {cores_num}")

            # Papago Multi-Processing
            procs = list(
                map(
                    lambda cores_num: Process(
                        target=papago_api,
                        args=(cores_num, batch[cores_num], result_queue),
                    ),
                    [i for i in range(cores_num)],
                )
            )

            [proc.start() for proc in procs]
            [proc.join() for proc in procs]

            while not result_queue.empty():
                result = result_queue.get()
                papago_dict[result[0]] = result[1]

            papago_list = [e[1] for e in sorted(papago_dict.items())]
            print(f"\nPapago Finished")

            # Make DataFrame
            df = pd.DataFrame({"Korean": text_list, "Papago": papago_list})

            # Save CSV
            target_path = os.path.join(
                TRANSLATED_DIR, os.path.splitext(file_name)[0] + ".csv"
            )
            utils.save_csv(target_path, df)

            # Move File
            source_path = os.path.join(task_name)
            target_path = os.path.join(PROCESSED_DIR, file_name)
            utils.move_file(source_path, target_path)

            end_real_time, end_proc_time = time.time(), time.process_time()

            print(f"\nData {i + 1} of {len(DATA_PATH_LIST)} Finished")
            print(f"{'Real Time:':<15} {end_real_time - start_real_time:.2f} sec.")
            print(f"{'Proc Time:':<15} {end_proc_time - start_proc_time:.2f} sec.")
        else:
            print(f"\nData {i + 1} of {len(DATA_PATH_LIST)} is None\n")

    end_total_real_time, end_total_proc_time = time.time(), time.process_time()

    print(
        f"\n{'Total Real:':<15} {end_total_real_time - start_total_real_time:.2f} sec."
    )
    print(f"{'Total Proc:':<15} {end_total_proc_time - start_total_proc_time:.2f} sec.")