Files
DataMate/runtime/python-executor/datamate/wrappers/data_juicer_executor.py
hhhhsc701 6a1eb85e8e feat: 支持运行data-juicer算子 (#215)
* feature: 增加data-juicer算子

* feat: 支持运行data-juicer算子

* feat: 支持data-juicer任务下发

* feat: 支持data-juicer结果数据集归档

* feat: 支持data-juicer结果数据集归档
2025-12-31 09:20:41 +08:00

150 lines
5.3 KiB
Python

import base64
import time
from json import dumps as jdumps
from json import loads as jloads
from typing import Dict, Optional
from urllib.parse import urljoin
import ray
import requests
import yaml
from jsonargparse import ArgumentParser
from loguru import logger
from datamate.core.base_op import FileExporter, SUCCESS_STATUS
from datamate.core.constant import Fields
from datamate.wrappers.executor import RayExecutor
DJ_OUTPUT = "outputs"
class DataJuicerClient:
def __init__(self, base_url):
self.base_url = base_url
def call_data_juicer_api(self, path: str, params: Optional[Dict] = None, json: Optional[Dict] = None):
url = urljoin(self.base_url, path)
if json is not None:
response = requests.post(url, params=params, json=json)
else:
response = requests.get(url, params=params)
return jloads(response.text)
def init_config(self, dataset_path: str, export_path, process):
"""
Initialize Data-Juicer config.
Args:
:param dataset_path: The input dataset path.
:param process: The ops
:param export_path: The export path.
"""
dj_config = {
"dataset_path": dataset_path,
"export_path": export_path,
"process": process,
"executor_type": "default",
}
url_path = "/data_juicer/config/get_init_configs"
try:
res = self.call_data_juicer_api(url_path, params={"cfg": jdumps(dj_config)})
except Exception as e:
error_msg = f"An unexpected error occurred in calling {url_path}:\n{e}"
raise RuntimeError(error_msg)
return res["result"]
def execute_config(self, dj_config: Dict):
"""
Execute data-juicer data process.
Args:
dj_config: configs of data-juicer
"""
url_path = "/data_juicer/core/DefaultExecutor/run"
try:
res = self.call_data_juicer_api(url_path, params={"skip_return": True}, json={"cfg": jdumps(dj_config)})
if res.get("status") != "success":
raise RuntimeError(f"An error occurred in calling {url_path}:\n{res}")
return dj_config["export_path"]
except Exception as e:
error_msg = f"An unexpected error occurred in calling {url_path}:\n{e}"
raise RuntimeError(error_msg)
class DataJuicerExecutor(RayExecutor):
def __init__(self, cfg = None, meta = None):
super().__init__(cfg, meta)
self.client = DataJuicerClient(base_url="http://datamate-data-juicer:8000")
self.dataset_path = f"/flow/{self.cfg.instance_id}/dataset_on_dj.jsonl"
self.export_path = f"/flow/{self.cfg.instance_id}/processed_dataset.jsonl"
def add_column(self, batch):
batch_size = len(batch["filePath"])
batch["execute_status"] = [SUCCESS_STATUS] * batch_size
batch[Fields.instance_id] = [self.cfg.instance_id] * batch_size
batch[Fields.export_path] = [self.cfg.export_path] * batch_size
return batch
def run(self):
# 1. 加载数据集
logger.info('Loading dataset with Ray...')
if self.meta:
file_content = base64.b64decode(self.meta)
lines = file_content.splitlines()
dataset = ray.data.from_items([jloads(line) for line in lines])
else:
dataset = self.load_dataset()
logger.info('Read data...')
dataset = dataset.map(FileExporter().read_file, num_cpus=0.05)
with open(self.dataset_path, "w", encoding="utf-8") as f:
for batch_df in dataset.iter_batches(batch_format="pandas", batch_size=2048):
batch_df.to_json(f, orient="records", lines=True, force_ascii=False)
logger.info('Processing data...')
tstart = time.time()
try:
dj_config = self.client.init_config(self.dataset_path, self.export_path, self.cfg.process)
result_path = self.client.execute_config(dj_config)
processed_dataset = self.load_dataset(result_path)
processed_dataset = processed_dataset.map_batches(self.add_column, num_cpus=0.05)
processed_dataset = processed_dataset.map(FileExporter().save_file_and_db, num_cpus=0.05)
for _ in processed_dataset.iter_batches():
pass
except Exception as e:
logger.error(f"An unexpected error occurred.", e)
raise e
tend = time.time()
logger.info(f'All Ops are done in {tend - tstart:.3f}s.')
if __name__ == '__main__':
parser = ArgumentParser(description="Create API for Submitting Job to Data-juicer")
parser.add_argument("--config_path", type=str, required=False, default="../configs/demo.yaml")
parser.add_argument("--flow_config", type=str, required=False, default=None)
args = parser.parse_args()
config_path = args.config_path
flow_config = args.flow_config
if flow_config:
m_cfg = yaml.safe_load(base64.b64decode(flow_config))
else:
with open(config_path, "r", encoding='utf-8') as f:
m_cfg = yaml.safe_load(f)
executor = DataJuicerExecutor(m_cfg)
try:
executor.run()
except Exception as e:
executor.update_db("FAILED")
raise e
executor.update_db("COMPLETED")