You've already forked DataMate
优化部分问题 (#126)
* feature: 支持相对路径引用
* feature: 优化本地部署命令
* feature: 优化算子编排展示
* feature: 优化清洗任务失败后重试
This commit is contained in:
@@ -16,6 +16,8 @@ from datamate.core.base_op import Filter, Mapper, Slicer
|
||||
from datamate.core.constant import Fields
|
||||
from datamate.core.base_op import OPERATORS, BaseOp
|
||||
|
||||
from core.base_op import Filter as RELATIVE_Filter, Mapper as RELATIVE_Mapper, Slicer as RELATIVE_Slicer
|
||||
|
||||
rd.DataContext.get_current().enable_progress_bars = False
|
||||
|
||||
|
||||
@@ -136,7 +138,10 @@ class RayDataset(BasicDataset):
|
||||
parent_dir = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ops")
|
||||
if parent_dir not in sys.path:
|
||||
sys.path.insert(0, parent_dir)
|
||||
registry_content = OPERATORS.modules[op_name]
|
||||
registry_content = OPERATORS.modules.get(op_name)
|
||||
if registry_content is None:
|
||||
from core.base_op import OPERATORS as RELATIVE_OPERATORS
|
||||
registry_content = RELATIVE_OPERATORS.modules.get(op_name)
|
||||
if isinstance(registry_content, str):
|
||||
# registry_content是module的路径
|
||||
submodule = importlib.import_module(registry_content)
|
||||
@@ -171,7 +176,7 @@ class RayDataset(BasicDataset):
|
||||
|
||||
kwargs.update({"ext_params": {}, "failed_reason": {}, "target_type": None})
|
||||
try:
|
||||
if issubclass(operators_cls, Mapper):
|
||||
if issubclass(operators_cls, (Mapper, RELATIVE_Mapper)):
|
||||
self.data = self.data.map(operators_cls,
|
||||
fn_constructor_kwargs=init_kwargs,
|
||||
fn_kwargs=kwargs,
|
||||
@@ -179,7 +184,7 @@ class RayDataset(BasicDataset):
|
||||
num_cpus=0.05,
|
||||
concurrency=(1, 1 if operators_cls.use_model else int(max_actor_nums)))
|
||||
|
||||
elif issubclass(operators_cls, Slicer):
|
||||
elif issubclass(operators_cls, (Slicer, RELATIVE_Slicer)):
|
||||
self.data = self.data.flat_map(operators_cls,
|
||||
fn_constructor_kwargs=init_kwargs,
|
||||
fn_kwargs=kwargs,
|
||||
@@ -187,7 +192,7 @@ class RayDataset(BasicDataset):
|
||||
num_cpus=0.05,
|
||||
concurrency=(1, int(max_actor_nums)))
|
||||
|
||||
elif issubclass(operators_cls, Filter):
|
||||
elif issubclass(operators_cls, (Filter, RELATIVE_Filter)):
|
||||
self.data = self.data.filter(operators_cls,
|
||||
fn_constructor_kwargs=init_kwargs,
|
||||
fn_kwargs=kwargs,
|
||||
|
||||
Reference in New Issue
Block a user