2023-05-06 11:46:53 +08:00

226 行
8.1 KiB
Java

package com.weilab.biology.service;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.weilab.biology.core.data.dto.AppConfigDto;
import com.weilab.biology.core.data.dto.JobDto;
import com.weilab.biology.core.data.dto.JobLessDto;
import com.weilab.biology.core.data.enums.JobStatusEnum;
import com.weilab.biology.core.data.po.AppConfig;
import com.weilab.biology.core.data.po.Job;
import com.weilab.biology.core.data.vo.result.CommonError;
import com.weilab.biology.core.data.vo.result.Result;
import com.weilab.biology.core.data.vo.result.error.JobError;
import com.weilab.biology.mapper.JobMapper;
import com.weilab.biology.util.FileUtils;
import com.weilab.biology.util.MailUtil;
import com.weilab.biology.util.TaskExecutorUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Created by skyyemperor on 2021-09-19
* Description :
*/
@Service
@Slf4j
public class JobService extends ServiceImpl<JobMapper, Job> {
@Autowired
private JobMapper jobMapper;
@Autowired
private MailUtil mailUtil;
@Autowired
private TaskExecutorUtil<?> taskExecutorUtil;
@Autowired
private AppConfigService appConfigService;
/**
* 提交job
*/
@Transactional
public Result submit(AppConfigDto appConfig, String dataStr, MultipartFile dataFile, JSONObject param, Job job) {
this.saveOrUpdate(job);
try {
// dataStr和dataFile均为蛋白质序列,两者含义相同只保留其一,dataFile的优先级大于dataStr
// 若dataFile为空,则将dataStr写入本地文件,之后向python传递文件路径参数
String dataPath = String.format(appConfig.getRequestPath() + "job-%d-dataStr.txt", job.getJobId());
if (dataFile != null) {
FileUtil.writeFromStream(dataFile.getInputStream(), dataPath);
} else if (!StrUtil.isBlank(dataStr)) {
FileUtils.writeStringToFile(dataPath, dataStr);
}
//将jobId和dataPath参数添加至param中
param.put("jobId", job.getJobId());
param.put("requestDataPath", dataPath);
param.put("resultDataPath", appConfig.getResultPath());
// 更新数据库param字段
job.setParam(JSON.toJSONString(param));
this.saveOrUpdate(job);
updateJobStatus(job, JobStatusEnum.WAITING);
} catch (Exception e) {
e.printStackTrace();
updateJobStatus(job, JobStatusEnum.FAIL);
}
return getJobInfo(job.getJobId());
}
public Result getJobInfo(Integer jobId) {
Job job = jobMapper.selectById(jobId);
if (job == null) {
return Result.getResult(CommonError.CONTENT_NOT_FOUND);
}
return Result.success(JobDto.parseJob(job));
}
public Result updateJobStatus(Job job, JobStatusEnum status) {
return updateJobStatus(job, status, null);
}
/**
* 更新任务状态
*
* @param job 任务
* @param status 状态
* @param result 任务运行结果
*/
public Result updateJobStatus(Job job, JobStatusEnum status, String result) {
switch (status) {
case WAITING:
case CREATING:
break;
case SUCCESS:
job.setCompleteTime(LocalDateTime.now());
job.setResult(result);
break;
case FAIL:
case TIMEOUT:
if (!job.getStatus().equals(JobStatusEnum.RUNNING.getKey())
&& !job.getStatus().equals(JobStatusEnum.REQED.getKey())) {
return Result.getResult(JobError.STATUS_UPDATE_FAIL);
}
job.setCompleteTime(LocalDateTime.now());
break;
case RUNNING:
if (!job.getStatus().equals(JobStatusEnum.REQED.getKey())) {
return Result.getResult(JobError.STATUS_UPDATE_FAIL);
}
job.setRequestTime(LocalDateTime.now());
break;
}
job.setStatus(status.getKey());
jobMapper.updateById(job);
sendEmail(job, status, job.getMail());
return getJobInfo(job.getJobId());
}
public Result getJobList(Integer appId, Integer type, Boolean filterCreating, Integer page, Integer size) {
List<Job> jobs = jobMapper.selectJobListByPage(appId, type, filterCreating, (page - 1) * size, size);
return Result.success(jobs.stream().map(JobLessDto::parseJob).collect(Collectors.toList()));
}
/**
* 运行job
*/
public synchronized void runJob() {
List<AppConfigDto> appConfigList = appConfigService.list().stream()
.map(AppConfigDto::parse)
.collect(Collectors.toList());
appConfigList.forEach(appConfig -> {
//并发数小于concurrentNum,运行job
int size = appConfig.getConcurrentNum() - jobMapper.selectRunningJobs(appConfig.getAppId()).size();
if (size <= 0) {
return;
}
jobMapper.selectWaitingJobs(appConfig.getAppId(), size)
.forEach(job -> runJob(appConfig, job));
});
}
private void runJob(AppConfigDto appConfig, Job job) {
try {
String logFilePath = String.format(appConfig.getLogPath() + "task-log-%s.txt", job.getJobId());
String cmd = String.format("%s -setting '%s' >> %s 2>&1", appConfig.getCmd(), job.getParam(), logFilePath);
String[] cmds = new String[]{"/bin/sh", "-c", cmd};
Runtime.getRuntime().exec(cmds);
log.info("执行命令: " + cmd);
// 更新job状态
updateJobStatus(job, JobStatusEnum.REQED);
// 异步检测超时
asyncScheduleTask(appConfig, job.getJobId());
} catch (Exception e) {
e.printStackTrace();
updateJobStatus(job, JobStatusEnum.FAIL);
}
}
/**
* 开启异步定时任务,校验超时任务
*/
private void asyncScheduleTask(AppConfigDto appConfig, Integer jobId) {
//等待120秒,检查是否已运行
taskExecutorUtil.schedule(() -> {
Job job = jobMapper.selectById(jobId);
if (job.getStatus().equals(JobStatusEnum.REQED.getKey())) {
updateJobStatus(job, JobStatusEnum.FAIL);
}
}, 120, TimeUnit.SECONDS);
//等待一段时间后,查看是否执行完成
taskExecutorUtil.schedule(() -> {
Job job = jobMapper.selectById(jobId);
if (job.getStatus().equals(JobStatusEnum.RUNNING.getKey())) {
updateJobStatus(job, JobStatusEnum.TIMEOUT);
}
}, appConfig.getTimeout(), TimeUnit.MINUTES);
}
private void sendEmail(Job job, JobStatusEnum status, String mail) {
AppConfigDto appConfig = AppConfigDto.parse(appConfigService.getById(job.getAppId()));
if (appConfig == null) {
return;
}
// 多个参数
String param1 = job.getRequestTime().format(DateTimeFormatter.ofPattern("yyyyMMdd")) + job.getJobId();
String param2 = param1;
String content = appConfig.getEmailTemplate().getString(status.getRemark());
String subject = appConfig.getEmailTemplate().getString("subject");
if (StrUtil.isBlank(content) || StrUtil.isBlank(subject)) {
return;
}
mailUtil.send(mail, String.format(subject, param1), String.format(content, param1, param2));
}
}