You've already forked DataMate
* feature: 增加data-juicer算子 * feat: 支持运行data-juicer算子 * feat: 支持data-juicer任务下发 * feat: 支持data-juicer结果数据集归档
150 lines
5.3 KiB
Python
150 lines
5.3 KiB
Python
import base64
|
|
import time
|
|
from json import dumps as jdumps
|
|
from json import loads as jloads
|
|
from typing import Dict, Optional
|
|
from urllib.parse import urljoin
|
|
|
|
import ray
|
|
import requests
|
|
import yaml
|
|
from jsonargparse import ArgumentParser
|
|
from loguru import logger
|
|
|
|
from datamate.core.base_op import FileExporter, SUCCESS_STATUS
|
|
from datamate.core.constant import Fields
|
|
from datamate.wrappers.executor import RayExecutor
|
|
|
|
# NOTE(review): not referenced anywhere in this file's visible code — presumably
# the Data-Juicer output directory name; confirm against callers before removing.
DJ_OUTPUT = "outputs"
|
|
|
|
|
|
class DataJuicerClient:
    """Thin HTTP client for a Data-Juicer service.

    Wraps the service's config-initialization and executor-run endpoints.
    """

    # (connect, read) timeout in seconds. The run endpoint may block while a
    # whole processing job executes, so the read timeout is generous; without
    # any timeout a request could hang forever.
    _TIMEOUT = (10, 3600)

    def __init__(self, base_url: str):
        """
        Args:
            base_url: Root URL of the Data-Juicer service,
                e.g. ``http://datamate-data-juicer:8000``.
        """
        self.base_url = base_url

    def call_data_juicer_api(self, path: str, params: Optional[Dict] = None, json: Optional[Dict] = None):
        """Call a Data-Juicer endpoint and return the decoded JSON body.

        Issues a POST when a JSON body is supplied, otherwise a GET.

        Args:
            path: Endpoint path, joined onto ``base_url``.
            params: Optional query-string parameters.
            json: Optional JSON request body; its presence selects POST.

        Returns:
            The response body parsed as JSON.

        Raises:
            requests.HTTPError: If the service returns a 4xx/5xx status.
        """
        url = urljoin(self.base_url, path)

        if json is not None:
            response = requests.post(url, params=params, json=json, timeout=self._TIMEOUT)
        else:
            response = requests.get(url, params=params, timeout=self._TIMEOUT)

        # Fail fast on HTTP errors instead of trying to JSON-parse an error page.
        response.raise_for_status()
        return jloads(response.text)

    def init_config(self, dataset_path: str, export_path, process):
        """Initialize a full Data-Juicer config on the service side.

        Args:
            :param dataset_path: The input dataset path.
            :param process: The ops
            :param export_path: The export path.

        Returns:
            The initialized config returned by the service (``result`` field).

        Raises:
            RuntimeError: If the service call fails.
        """
        dj_config = {
            "dataset_path": dataset_path,
            "export_path": export_path,
            "process": process,
            "executor_type": "default",
        }
        url_path = "/data_juicer/config/get_init_configs"
        try:
            res = self.call_data_juicer_api(url_path, params={"cfg": jdumps(dj_config)})
        except Exception as e:
            error_msg = f"An unexpected error occurred in calling {url_path}:\n{e}"
            # Chain the original cause so the root failure stays visible.
            raise RuntimeError(error_msg) from e
        return res["result"]

    def execute_config(self, dj_config: Dict):
        """Execute data-juicer data process.

        Args:
            dj_config: configs of data-juicer

        Returns:
            The export path of the processed dataset.

        Raises:
            RuntimeError: If the call fails or the service reports failure.
        """
        url_path = "/data_juicer/core/DefaultExecutor/run"
        try:
            res = self.call_data_juicer_api(url_path, params={"skip_return": True}, json={"cfg": jdumps(dj_config)})
        except Exception as e:
            error_msg = f"An unexpected error occurred in calling {url_path}:\n{e}"
            raise RuntimeError(error_msg) from e
        # Status check sits OUTSIDE the try block: the original version raised
        # inside it, so its own except clause caught and re-wrapped the error.
        if res.get("status") != "success":
            raise RuntimeError(f"An error occurred in calling {url_path}:\n{res}")
        return dj_config["export_path"]
|
|
|
|
|
|
class DataJuicerExecutor(RayExecutor):
    """Ray-based executor that delegates data processing to a Data-Juicer service."""

    def __init__(self, cfg=None, meta=None):
        """
        Args:
            cfg: Flow configuration; must provide ``instance_id``, ``process``
                and ``export_path`` (read by ``run``/``add_column``).
            meta: Optional base64-encoded JSONL dataset content.
        """
        super().__init__(cfg, meta)
        self.client = DataJuicerClient(base_url="http://datamate-data-juicer:8000")
        # Per-instance scratch paths on the shared /flow volume: input handed
        # to Data-Juicer, and where Data-Juicer writes the processed result.
        self.dataset_path = f"/flow/{self.cfg.instance_id}/dataset_on_dj.jsonl"
        self.export_path = f"/flow/{self.cfg.instance_id}/processed_dataset.jsonl"

    def add_column(self, batch):
        """Attach bookkeeping columns (status, instance id, export path) to a batch."""
        batch_size = len(batch["filePath"])
        batch["execute_status"] = [SUCCESS_STATUS] * batch_size
        batch[Fields.instance_id] = [self.cfg.instance_id] * batch_size
        batch[Fields.export_path] = [self.cfg.export_path] * batch_size
        return batch

    def run(self):
        """Load the dataset, process it via Data-Juicer, and archive the result."""
        # 1. Load the dataset.
        logger.info('Loading dataset with Ray...')

        if self.meta:
            # Inline dataset shipped as base64-encoded JSONL; one JSON object per line.
            file_content = base64.b64decode(self.meta)
            lines = file_content.splitlines()
            dataset = ray.data.from_items([jloads(line) for line in lines])
        else:
            dataset = self.load_dataset()

        logger.info('Read data...')
        dataset = dataset.map(FileExporter().read_file, num_cpus=0.05)

        # 2. Materialize to a JSONL file so the Data-Juicer service can read it.
        with open(self.dataset_path, "w", encoding="utf-8") as f:
            for batch_df in dataset.iter_batches(batch_format="pandas", batch_size=2048):
                batch_df.to_json(f, orient="records", lines=True, force_ascii=False)

        logger.info('Processing data...')
        tstart = time.time()
        try:
            # 3. Submit the job to Data-Juicer, then archive the processed output.
            dj_config = self.client.init_config(self.dataset_path, self.export_path, self.cfg.process)
            result_path = self.client.execute_config(dj_config)

            processed_dataset = self.load_dataset(result_path)
            processed_dataset = processed_dataset.map_batches(self.add_column, num_cpus=0.05)
            processed_dataset = processed_dataset.map(FileExporter().save_file_and_db, num_cpus=0.05)
            # Drain the lazy pipeline so every save actually executes.
            for _ in processed_dataset.iter_batches():
                pass
        except Exception:
            # logger.exception records the traceback; the original call passed
            # the exception as an unused positional formatting argument.
            logger.exception("An unexpected error occurred.")
            # Bare raise keeps the original traceback intact.
            raise
        tend = time.time()
        logger.info(f'All Ops are done in {tend - tstart:.3f}s.')
|
|
|
|
|
|
if __name__ == '__main__':
    parser = ArgumentParser(description="Create API for Submitting Job to Data-juicer")
    parser.add_argument("--config_path", type=str, required=False, default="../configs/demo.yaml")
    parser.add_argument("--flow_config", type=str, required=False, default=None)

    args = parser.parse_args()

    config_path = args.config_path
    flow_config = args.flow_config

    # An inline base64-encoded config takes precedence over the config file.
    if flow_config:
        m_cfg = yaml.safe_load(base64.b64decode(flow_config))
    else:
        with open(config_path, "r", encoding='utf-8') as f:
            m_cfg = yaml.safe_load(f)

    executor = DataJuicerExecutor(m_cfg)
    try:
        executor.run()
    except Exception:
        # Record the failure in the DB, then re-raise with the original
        # traceback (bare raise, instead of `raise e`, preserves it).
        executor.update_db("FAILED")
        raise
    executor.update_db("COMPLETED")