-- DataMate data annotation module - generic operator orchestration refactor (step 1: DDL)
-- Notes:
--   1) Alters t_dm_auto_annotation_tasks: adds orchestration-related columns and indexes.
--   2) Creates t_dm_annotation_task_operator_instance to record the operator instances of a task.
--   3) The script is written to be idempotent and can be executed repeatedly.
--
-- How each guarded step works: a statement is chosen by probing information_schema and
-- stored in @ddl -- either the real DDL or a no-op `SELECT 'skip: ...'` -- and is then run
-- through PREPARE / EXECUTE / DEALLOCATE PREPARE.
-- The ADD COLUMN steps use AFTER <column>, so the referenced column must already exist
-- (either in the original table or added by an earlier step of this script).

USE datamate;

-- Schema name used by every information_schema probe below.
SET @db_name = DATABASE();

-- =====================================================
-- 1) Alter table t_dm_auto_annotation_tasks
-- =====================================================

-- task_mode: task mode (legacy_yolo / pipeline)
SET @ddl = (
    SELECT CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND COLUMN_NAME = 'task_mode'
        )
            THEN 'SELECT ''skip: column task_mode exists'''
        ELSE 'ALTER TABLE t_dm_auto_annotation_tasks ADD COLUMN task_mode VARCHAR(32) NOT NULL DEFAULT ''legacy_yolo'' COMMENT ''任务模式: legacy_yolo/pipeline'' AFTER status'
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- executor_type: executor type
SET @ddl = (
    SELECT CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND COLUMN_NAME = 'executor_type'
        )
            THEN 'SELECT ''skip: column executor_type exists'''
        ELSE 'ALTER TABLE t_dm_auto_annotation_tasks ADD COLUMN executor_type VARCHAR(32) NOT NULL DEFAULT ''annotation_local'' COMMENT ''执行器类型'' AFTER task_mode'
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- pipeline: operator orchestration definition (JSON)
SET @ddl = (
    SELECT CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND COLUMN_NAME = 'pipeline'
        )
            THEN 'SELECT ''skip: column pipeline exists'''
        ELSE 'ALTER TABLE t_dm_auto_annotation_tasks ADD COLUMN pipeline JSON NULL COMMENT ''算子编排定义'' AFTER executor_type'
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- output_dataset_id: output dataset ID
SET @ddl = (
    SELECT CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND COLUMN_NAME = 'output_dataset_id'
        )
            THEN 'SELECT ''skip: column output_dataset_id exists'''
        ELSE 'ALTER TABLE t_dm_auto_annotation_tasks ADD COLUMN output_dataset_id VARCHAR(36) NULL COMMENT ''输出数据集ID'' AFTER output_path'
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- created_by: task creator
SET @ddl = (
    SELECT CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND COLUMN_NAME = 'created_by'
        )
            THEN 'SELECT ''skip: column created_by exists'''
        ELSE 'ALTER TABLE t_dm_auto_annotation_tasks ADD COLUMN created_by VARCHAR(255) NULL COMMENT ''任务创建人'' AFTER dataset_name'
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- stop_requested: stop-request flag (0 = no, 1 = yes)
SET @ddl = (
    SELECT CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND COLUMN_NAME = 'stop_requested'
        )
            THEN 'SELECT ''skip: column stop_requested exists'''
        ELSE 'ALTER TABLE t_dm_auto_annotation_tasks ADD COLUMN stop_requested TINYINT(1) NOT NULL DEFAULT 0 COMMENT ''是否请求停止: 0否/1是'' AFTER progress'
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- started_at: task start time
SET @ddl = (
    SELECT CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND COLUMN_NAME = 'started_at'
        )
            THEN 'SELECT ''skip: column started_at exists'''
        ELSE 'ALTER TABLE t_dm_auto_annotation_tasks ADD COLUMN started_at TIMESTAMP NULL COMMENT ''任务启动时间'' AFTER updated_at'
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- heartbeat_at: worker heartbeat time
SET @ddl = (
    SELECT CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND COLUMN_NAME = 'heartbeat_at'
        )
            THEN 'SELECT ''skip: column heartbeat_at exists'''
        ELSE 'ALTER TABLE t_dm_auto_annotation_tasks ADD COLUMN heartbeat_at TIMESTAMP NULL COMMENT ''worker心跳时间'' AFTER started_at'
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- run_token: run token (used for task claim)
SET @ddl = (
    SELECT CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND COLUMN_NAME = 'run_token'
        )
            THEN 'SELECT ''skip: column run_token exists'''
        ELSE 'ALTER TABLE t_dm_auto_annotation_tasks ADD COLUMN run_token VARCHAR(64) NULL COMMENT ''运行令牌'' AFTER heartbeat_at'
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- status: extend the column comment with `stopped` (only if the column exists).
-- Three outcomes:
--   * column missing                      -> no-op
--   * column already matches the target   -> no-op, so a re-run does not issue another ALTER
--   * anything else                       -> MODIFY COLUMN to the target definition
-- The "already matches" probe is deliberately strict (type, nullability, default and
-- comment); any mismatch or unexpected metadata format falls through to the ALTER.
SET @ddl = (
    SELECT CASE
        WHEN NOT EXISTS (
            SELECT 1
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND COLUMN_NAME = 'status'
        )
            THEN 'SELECT ''skip: column status not found'''
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.COLUMNS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND COLUMN_NAME = 'status'
              AND COLUMN_TYPE = 'varchar(50)'
              AND IS_NULLABLE = 'NO'
              AND COLUMN_DEFAULT = 'pending'
              AND COLUMN_COMMENT = '任务状态: pending/running/completed/failed/stopped'
        )
            THEN 'SELECT ''skip: column status already up to date'''
        ELSE 'ALTER TABLE t_dm_auto_annotation_tasks MODIFY COLUMN status VARCHAR(50) NOT NULL DEFAULT ''pending'' COMMENT ''任务状态: pending/running/completed/failed/stopped'''
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- Index: query tasks by status + creation time
SET @ddl = (
    SELECT CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.STATISTICS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND INDEX_NAME = 'idx_status_created'
        )
            THEN 'SELECT ''skip: index idx_status_created exists'''
        ELSE 'CREATE INDEX idx_status_created ON t_dm_auto_annotation_tasks (status, created_at)'
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- Index: filter tasks by creator
SET @ddl = (
    SELECT CASE
        WHEN EXISTS (
            SELECT 1
            FROM information_schema.STATISTICS
            WHERE TABLE_SCHEMA = @db_name
              AND TABLE_NAME = 't_dm_auto_annotation_tasks'
              AND INDEX_NAME = 'idx_created_by'
        )
            THEN 'SELECT ''skip: index idx_created_by exists'''
        ELSE 'CREATE INDEX idx_created_by ON t_dm_auto_annotation_tasks (created_by)'
    END
);
PREPARE stmt FROM @ddl;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;

-- =====================================================
-- 2) Create table t_dm_annotation_task_operator_instance
-- =====================================================
-- One row per operator instance of a task; (task_id, op_index) is unique.
-- NOTE(review): idx_task_id covers only the leading column of uk_task_op_index, so it
-- looks redundant; it is kept here so the resulting schema is unchanged -- confirm
-- before dropping it in a follow-up migration.
CREATE TABLE IF NOT EXISTS t_dm_annotation_task_operator_instance (
    id                BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '自增主键',      -- auto-increment primary key
    task_id           VARCHAR(36) NOT NULL COMMENT '自动标注任务ID',             -- auto-annotation task ID
    op_index          INT NOT NULL COMMENT '算子顺序(从1开始)',                -- operator order (starting from 1)
    operator_id       VARCHAR(64) NOT NULL COMMENT '算子ID(raw_id)',           -- operator ID (raw_id)
    settings_override JSON NULL COMMENT '任务级算子参数覆盖',                    -- task-level operator parameter overrides
    inputs            VARCHAR(64) NULL COMMENT '输入模态',                       -- input modality
    outputs           VARCHAR(64) NULL COMMENT '输出模态',                       -- output modality
    created_at        TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',                             -- creation time
    updated_at        TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间', -- last update time
    UNIQUE KEY uk_task_op_index (task_id, op_index),
    KEY idx_task_id (task_id),
    KEY idx_operator_id (operator_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='标注任务算子实例表';