You've already forked DataMate
* feature: 增加data-juicer算子 * feat: 支持运行data-juicer算子 * feat: 支持data-juicer任务下发 * feat: 支持data-juicer结果数据集归档 * feat: 支持data-juicer结果数据集归档
80 lines
2.6 KiB
Python
80 lines
2.6 KiB
Python
import json
|
|
import time
|
|
from typing import Dict
|
|
|
|
import ray
|
|
from jsonargparse import dict_to_namespace
|
|
from loguru import logger
|
|
|
|
from datamate.common.utils import check_valid_path
|
|
from datamate.sql_manager.persistence_atction import TaskInfoPersistence
|
|
|
|
|
|
class RayExecutor:
    """
    Ray-based executor.

    1. Currently only Mapper and Filter type operators are supported.
    2. Currently only datasets stored as JSON files are loaded.
    """

    # How many times load_dataset re-checks the dataset path after the first
    # failed check before giving up (the wait grows by one second per retry).
    MAX_LOAD_RETRIES = 5

    # Keys of an input record that are mirrored under a "source*" key.
    _SOURCE_KEYS = {
        "fileId": "sourceFileId",
        "fileName": "sourceFileName",
        "fileType": "sourceFileType",
        "fileSize": "sourceFileSize",
    }

    def __init__(self, cfg=None, meta=None):
        """Build the executor from a configuration dict and start Ray.

        Args:
            cfg: Configuration as a plain ``dict``; it must contain the key
                ``'process'``. It is converted with ``dict_to_namespace`` and
                stored as ``self.cfg``.
            meta: Stored unchanged as ``self.meta``.

        Raises:
            TypeError: If ``cfg`` is not a ``dict``.
            KeyError: If ``cfg`` has no ``'process'`` entry.
        """
        if not isinstance(cfg, dict):
            logger.error(f"Please set param: cfg as type Dict, but given cfg as type {type(cfg).__name__}")
            raise TypeError(f"To params cfg, Dict type is required, but type {type(cfg).__name__} is given!")

        self.cfg = dict_to_namespace(cfg)
        # Overwrite `process` with the raw value from the input dict.
        # NOTE(review): presumably so the operator list keeps its original
        # dict/list structure instead of the converted one - confirm.
        self.cfg.process = cfg['process']
        self.meta = meta

        # init ray
        # ray.init() raises RuntimeError when Ray is already running in this
        # process, so only initialise it once.
        if not ray.is_initialized():
            logger.info('Initing Ray ...')
            ray.init()

    def load_meta(self, line):
        """Parse one JSON line into a record dict and normalise its fields.

        ``fileId``, ``fileName``, ``fileType`` and ``fileSize`` are copied to
        ``sourceFileId``, ``sourceFileName``, ``sourceFileType`` and
        ``sourceFileSize`` when present. A falsy ``totalPageNum`` becomes
        ``0``; falsy ``extraFilePath`` / ``extraFileType`` become ``None``;
        ``dataset_id`` is always set from ``self.cfg.dataset_id``.

        Args:
            line: A string holding a single JSON object.

        Returns:
            The parsed and normalised ``dict``.

        Raises:
            json.JSONDecodeError: If ``line`` is not valid JSON.
        """
        meta = json.loads(line)

        for key, source_key in self._SOURCE_KEYS.items():
            value = meta.get(key)
            # Compare against None rather than truthiness so that legitimate
            # falsy values (e.g. a fileSize of 0) are still propagated.
            if value is not None:
                meta[source_key] = value

        if not meta.get("totalPageNum"):
            meta["totalPageNum"] = 0
        for key in ("extraFilePath", "extraFileType"):
            if not meta.get(key):
                meta[key] = None

        meta["dataset_id"] = self.cfg.dataset_id
        return meta

    def run(self):
        """Do nothing.

        NOTE(review): presumably a hook that subclasses override - confirm.
        """
        pass

    def load_dataset(self, jsonl_file_path=None):
        """Load a JSON-lines file into a Ray dataset.

        The path is checked with ``check_valid_path``; when the check fails it
        is repeated up to ``MAX_LOAD_RETRIES`` more times, sleeping 1, 2, ...
        seconds before each new check.

        Args:
            jsonl_file_path: Path of the JSON-lines file. Defaults to
                ``self.cfg.dataset_path`` when ``None``.

        Returns:
            The result of ``ray.data.from_items`` over the records produced by
            ``load_meta`` for every non-blank line of the file.

        Raises:
            RuntimeError: If the path never passes ``check_valid_path``.
        """
        if jsonl_file_path is None:
            jsonl_file_path = self.cfg.dataset_path

        for attempt in range(self.MAX_LOAD_RETRIES + 1):
            if attempt:
                # Linear back-off: wait `attempt` seconds before re-checking.
                time.sleep(attempt)
            if check_valid_path(jsonl_file_path):
                with open(jsonl_file_path, "r", encoding='utf-8') as meta_file:
                    # Skip blank lines (e.g. a trailing empty line), which
                    # json.loads would reject.
                    records = [self.load_meta(line) for line in meta_file if line.strip()]
                return ray.data.from_items(records)

        # Report the path that was actually tried, which may differ from
        # self.cfg.dataset_path when the caller passed one explicitly.
        logger.error(f"can not load dataset from dataset_path: {jsonl_file_path}")
        raise RuntimeError(f"Load dataset Failed!, dataset_path: {jsonl_file_path}.")

    def update_db(self, status):
        """Record ``status`` for this task through ``TaskInfoPersistence``.

        Args:
            status: Passed to ``TaskInfoPersistence.update_result`` together
                with ``self.cfg.dataset_id`` and ``self.cfg.instance_id``.
        """
        task_info = TaskInfoPersistence()
        task_info.update_result(self.cfg.dataset_id, self.cfg.instance_id, status)