This commit is contained in:
2022-04-09 21:44:03 +08:00
commit 34e481f0a5
26 changed files with 1693 additions and 0 deletions

View File

@@ -0,0 +1,242 @@
package com.weilab.biology.service;
import cn.hutool.core.io.FileUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.weilab.biology.core.data.dto.JobDto;
import com.weilab.biology.core.data.dto.JobLessDto;
import com.weilab.biology.core.data.enums.JobStatusEnum;
import com.weilab.biology.core.data.po.Job;
import com.weilab.biology.core.data.vo.result.CommonError;
import com.weilab.biology.core.data.vo.result.Result;
import com.weilab.biology.core.data.vo.result.error.JobError;
import com.weilab.biology.mapper.JobMapper;
import com.weilab.biology.util.FileUtils;
import com.weilab.biology.util.MailUtil;
import com.weilab.biology.util.TaskExecutorUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import java.io.BufferedInputStream;
import java.io.File;
import java.time.LocalDateTime;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* Created by skyyemperor on 2021-09-19
* Description :
*/
@Service
@Slf4j
public class JobService {
@Value("${biology.python-cmd}")
private String pythonCmd;
@Value("${biology.request-path}")
private String requestPath;
@Value("${biology.result-path}")
private String resultDataPath;
@Value("${biology.log-path}")
private String logPath;
@Value("${biology.concurrent-num}")
private Integer concurrentNum;
@Autowired
private JobMapper jobMapper;
@Autowired
private MailUtil mailUtil;
@Autowired
private TaskExecutorUtil<?> taskExecutorUtil;
private static final String SUBJECT = "【DeepBIO Result Notice】";
private static final String SUCCESS_EMAIL_CONTENT = "Your request has been completed, click http://server.wei-group.net/front/biology/#/resultMail?jobId=%s to check the detail information";
private static final String FAIL_EMAIL_CONTENT = "We are very sorry, but some errors occurred in the task you submitted, click http://server.wei-group.net/front/biology/#/resultMail?jobId=%s to check the detail information";
private static final String TIMEOUT_EMAIL_CONTENT = "We are very sorry, but the task you submitted was overtime, click http://server.wei-group.net/front/biology/#/resultMail?jobId=%s to check the detail information";
private static final String RECEIVED_EMAIL_CONTENT = "Your request has been received, click http://server.wei-group.net/front/biology/#/resultMail?jobId=%s to check the detail information";
private static final String START_RUNNING_EMAIL_CONTENT = "Your task has started running, click http://server.wei-group.net/front/biology/#/resultMail?jobId=%s to check the detail information";
/**
* 提交job
*
* @param dataStr 文本内容
* @param param 其他参数(json格式)
* @param mail 邮箱
* @return
* @throws Exception
*/
@Transactional
public Result submit(String dataStr, BufferedInputStream dataStream, JSONObject param, String mail, Integer type) {
Job job = new Job("", JSON.toJSONString(param), mail, JobStatusEnum.WAIT.getKey(), LocalDateTime.now(), type);
jobMapper.insert(job);
sendEmail(job.getJobId(), JobStatusEnum.WAIT, mail);
try {
//将请求数据写入本地文件之后向python传递文件路径参数
String dataPath = String.format(requestPath + File.separator + "job-%d-dataStr.txt", job.getJobId());
if (dataStream != null) {
FileUtil.writeFromStream(dataStream, dataPath);
} else {
FileUtils.writeStringToFile(dataPath, dataStr);
}
//将jobId和dataPath参数添加至param中
param.put("jobId", job.getJobId());
param.put("requestDataPath", dataPath);
param.put("resultDataPath", resultDataPath);
//更新数据库param字段
job.setParam(JSON.toJSONString(param));
jobMapper.updateById(job);
runNextJob();
} catch (Exception e) {
e.printStackTrace();
updateJobStatus(job.getJobId(), JobStatusEnum.FAIL);
}
return getJobInfo(job.getJobId());
}
public Result getJobInfo(Integer jobId) {
Job job = jobMapper.selectById(jobId);
if (job == null)
return Result.getResult(CommonError.CONTENT_NOT_FOUND);
return Result.success(JobDto.parseJob(job));
}
public Result updateJobStatus(Integer jobId, JobStatusEnum status) {
return updateJobStatus(jobId, status, null);
}
public Result updateJobStatus(Integer jobId, JobStatusEnum status, String result) {
System.out.println(status.getRemark());
Job job = jobMapper.selectById(jobId);
if (job == null)
return Result.getResult(CommonError.CONTENT_NOT_FOUND);
switch (status) {
case WAIT:
break;
case SUCCESS:
job.setCompleteTime(LocalDateTime.now());
job.setResult(result);
break;
case FAIL:
case TIMEOUT:
if (!job.getStatus().equals(JobStatusEnum.RUNNING.getKey())
&& !job.getStatus().equals(JobStatusEnum.REQED.getKey()))
return Result.getResult(JobError.STATUS_UPDATE_FAIL);
job.setCompleteTime(LocalDateTime.now());
break;
case RUNNING:
if (!job.getStatus().equals(JobStatusEnum.REQED.getKey()))
return Result.getResult(JobError.STATUS_UPDATE_FAIL);
job.setCreateTime(LocalDateTime.now());
break;
}
job.setStatus(status.getKey());
jobMapper.updateById(job);
sendEmail(jobId, status, job.getMail());
runNextJob();
return getJobInfo(jobId);
}
public Result getJobList(Integer type) {
List<Job> jobs = jobMapper.selectJobList(type, 200);
return Result.success(jobs.stream().map(JobLessDto::parseJob).collect(Collectors.toList()));
}
/**
* 运行下一个job
*/
private synchronized void runNextJob() {
Job nextJob = null;
try {
//并发数小于concurrentNum运行该job
if (jobMapper.selectRunningJobs().size() < concurrentNum) {
if ((nextJob = jobMapper.selectNextWaitingJob()) != null) {
//更新job状态
nextJob.setStatus(JobStatusEnum.REQED.getKey());
jobMapper.updateById(nextJob);
waitRunning(nextJob.getJobId());
String logFilePath = String.format(logPath + File.separator + "task-log-%s.txt", nextJob.getJobId());
String cmd = String.format("%s -setting '%s' >> %s 2>&1", pythonCmd, nextJob.getParam(), logFilePath);
log.info("执行命令: " + cmd);
String[] cmds = new String[]{"/bin/sh", "-c", cmd};
Runtime.getRuntime().exec(cmds);
}
}
} catch (Exception e) {
e.printStackTrace();
if (nextJob != null) {
nextJob.setStatus(JobStatusEnum.FAIL.getKey());
jobMapper.updateById(nextJob);
}
}
}
/**
* 等待运行超时时间60秒
*/
private void waitRunning(Integer jobId) {
//等待60秒检查是否已运行
taskExecutorUtil.schedule(() -> {
Job job = jobMapper.selectById(jobId);
if (job.getStatus().equals(JobStatusEnum.REQED.getKey())) {
updateJobStatus(jobId, JobStatusEnum.FAIL);
}
}, 60, TimeUnit.SECONDS);
//等待4小时查看是否执行完成
taskExecutorUtil.schedule(() -> {
Job job = jobMapper.selectById(jobId);
if (job.getStatus().equals(JobStatusEnum.RUNNING.getKey())) {
updateJobStatus(jobId, JobStatusEnum.TIMEOUT);
}
}, 4, TimeUnit.HOURS);
}
private void sendEmail(Integer jobId, JobStatusEnum status, String mail) {
String content = null;
switch (status) {
case WAIT:
content = String.format(RECEIVED_EMAIL_CONTENT, jobId);
break;
case SUCCESS:
content = String.format(SUCCESS_EMAIL_CONTENT, jobId);
break;
case FAIL:
content = String.format(FAIL_EMAIL_CONTENT, jobId);
break;
case TIMEOUT:
content = String.format(TIMEOUT_EMAIL_CONTENT, jobId);
break;
case RUNNING:
content = String.format(START_RUNNING_EMAIL_CONTENT, jobId);
break;
}
if (content != null)
mailUtil.send(mail, SUBJECT, content);
}
}