比較提交

..

沒有共同的提交。「master」和「zzy-dev」的歷史完全不同。

共有 19 個檔案被更改,包括 418 行新增573 行删除

查看文件

@ -1,27 +0,0 @@
package com.weilab.biology.config;
import com.weilab.biology.service.JobService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
@Configuration
@EnableScheduling
@Slf4j
public class StaticScheduleTask {
@Autowired
private JobService jobService;
/**
* 一分钟执行一次
*/
@Scheduled(fixedRate = 60 * 1000L)
public void crawlStudyRoomFrequently() {
jobService.runJob();
}
}

查看文件

@ -3,10 +3,9 @@ package com.weilab.biology.controller;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.json.JSONUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.weilab.biology.core.data.dto.AppConfigDto;
import com.baomidou.mybatisplus.core.toolkit.StringUtils;
import com.weilab.biology.core.data.dto.JobDto;
import com.weilab.biology.core.data.enums.JobStatusEnum;
import com.weilab.biology.core.data.po.Job;
@ -15,19 +14,18 @@ import com.weilab.biology.core.data.vo.result.Result;
import com.weilab.biology.core.data.vo.result.error.JobError;
import com.weilab.biology.core.validation.EnumValidation;
import com.weilab.biology.mapper.JobMapper;
import com.weilab.biology.service.AppConfigService;
import com.weilab.biology.service.JobService;
import com.weilab.biology.util.JobIdUtil;
import com.weilab.biology.util.FileUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.List;
@RestController
@ -37,31 +35,77 @@ public class JobController {
@Autowired
private JobService jobService;
@Autowired
private AppConfigService appConfigService;
@Autowired
private JobMapper jobMapper;
@Value("${biology.request-path}")
private String requestPath;
@PostMapping("/submit")
public Result submit(@RequestParam String appName,
@RequestParam(required = false) String jobId,
public Result submit(@RequestParam(required = false) Long jobId,
@RequestParam(required = false) String dataStr,
@RequestParam(required = false) MultipartFile dataFile,
@RequestParam(defaultValue = "{}") String param,
@RequestParam String param,
@RequestParam String mail,
@RequestParam(defaultValue = "0") Integer type) {
if (dataFile == null && StrUtil.isBlank(dataStr)) {
@RequestParam(defaultValue = "0") Integer type,
@RequestParam(required = false) MultipartFile file1,
@RequestParam(required = false) MultipartFile file2,
@RequestParam(required = false) MultipartFile file3,
@RequestParam(required = false) MultipartFile file4,
@RequestParam(required = false) MultipartFile file5,
@RequestParam(required = false) MultipartFile file6,
@RequestParam(required = false) MultipartFile file7,
@RequestParam(required = false) MultipartFile file8,
@RequestParam(required = false) MultipartFile file9,
@RequestParam(required = false) MultipartFile file10,
@RequestParam(required = false) MultipartFile file11,
@RequestParam(required = false) MultipartFile file12,
@RequestParam(required = false) MultipartFile file13,
@RequestParam(required = false) MultipartFile file14,
@RequestParam(required = false) MultipartFile file15,
@RequestParam(required = false) MultipartFile file16,
@RequestParam(required = false) MultipartFile file17,
@RequestParam(required = false) MultipartFile file18,
@RequestParam(required = false) MultipartFile file19,
@RequestParam(required = false) MultipartFile file20) {
if (dataFile == null && StringUtils.isBlank(dataStr)) {
return Result.getResult(JobError.PARAM_CAN_NOT_BE_EMPTY);
}
AppConfigDto appConfig = appConfigService.getAppConfig(appName);
if (appConfig == null) {
return Result.getResult(JobError.APP_NOT_FOUND);
// 解析param字符串为map对象
JSONObject reqParamMap = null;
try {
reqParamMap = JSON.parseObject(param);
} catch (Exception e) {
return Result.getResult(CommonError.PARAM_WRONG);
}
// 写入file文件并将文件路径放入paramMap
try {
List<MultipartFile> files = Arrays.asList(file1, file2, file3, file4, file5,
file6, file7, file8, file9, file10, file11, file12, file13, file14, file15,
file16, file17, file18, file19, file20);
for (MultipartFile file : files) {
if (file != null) {
String filePath = requestPath + FileUtil.FILE_SEPARATOR + "file" + FileUtil.FILE_SEPARATOR
+ IdUtil.fastUUID() + "." + FileUtil.extName(file.getOriginalFilename());
FileUtil.writeFromStream(file.getInputStream(), filePath);
reqParamMap.put(file.getName(), filePath);
}
}
} catch (IOException e) {
e.printStackTrace();
return Result.getResult(JobError.FILE_READ_FAIL);
}
// 将请求数据file转为InputStream
BufferedInputStream dataStream = null;
if (dataFile != null) {
dataStream = FileUtil.getInputStream(FileUtils.multipartToFile(dataFile));
}
Job job = null;
if (StrUtil.isNotBlank(jobId)) {
if (jobId != null) {
// jobId不为空从数据库获取已创建的任务
job = jobMapper.selectById(jobId);
if (job == null) {
@ -69,56 +113,36 @@ public class JobController {
}
} else {
job = new Job();
job.setJobId(JobIdUtil.newId());
}
// 解析param字符串为map对象
JSONObject reqParamMap = null;
try {
reqParamMap = JSON.parseObject(param);
reqParamMap.put("type", type);
} catch (Exception e) {
return Result.getResult(CommonError.PARAM_WRONG);
if (!StrUtil.isEmpty(job.getParam())) {
JSONObject params = JSON.parseObject(job.getParam());
params.putAll(reqParamMap);
reqParamMap = params;
job.setParam(JSON.toJSONString(reqParamMap));
}
// 将reqParam覆盖添加到原有job的param中
JSONObject params = JSON.parseObject(job.getParam());
if (params == null) {
params = new JSONObject();
}
params.putAll(reqParamMap);
job.setParam(JSON.toJSONString(params));
job.setAppId(appConfig.getAppId());
job.setData("");
job.setMail(mail);
job.setStatus(JobStatusEnum.WAITING.getKey());
job.setCreateTime(LocalDateTime.now());
job.setStatus(JobStatusEnum.WAIT.getKey());
job.setRequestTime(LocalDateTime.now());
job.setType(type);
return jobService.submit(appConfig, dataStr, dataFile, reqParamMap, job);
return jobService.submit(dataStr, dataStream, reqParamMap, job);
}
@PostMapping("/create")
public Result createJob(@RequestParam String appName) {
AppConfigDto appConfig = appConfigService.getAppConfig(appName);
if (appConfig == null) {
return Result.getResult(JobError.APP_NOT_FOUND);
}
public Result createJob() {
Job job = new Job();
job.setJobId(JobIdUtil.newId());
job.setAppId(appConfig.getAppId());
job.setParam("{}");
job.setStatus(JobStatusEnum.CREATING.getKey());
job.setCreateTime(LocalDateTime.now());
job.setRequestTime(LocalDateTime.now());
jobMapper.insert(job);
return Result.success(JobDto.parseJob(job));
}
/**
* 上传文件接口将文件地址保存至本地并将路径放入参数中
*/
@PostMapping("/file/upload")
public Result uploadFile(@RequestParam String jobId,
public Result uploadFile(@RequestParam Long jobId,
@RequestParam String fileKey,
@RequestParam MultipartFile file) {
Job job = jobMapper.selectById(jobId);
@ -126,65 +150,62 @@ public class JobController {
return Result.getResult(JobError.JOB_NOT_FOUND);
}
AppConfigDto appConfig = AppConfigDto.parse(appConfigService.getById(job.getAppId()));
JSONObject params = JSON.parseObject(job.getParam());
try {
String extName = FileUtil.extName(file.getOriginalFilename());
if (!StrUtil.isEmpty(extName)) {
extName = StrUtil.DOT + extName;
}
String filePath = appConfig.getRequestPath() + jobId + FileUtil.FILE_SEPARATOR
+ IdUtil.fastUUID() + extName;
FileUtil.writeFromStream(file.getInputStream(), filePath);
params.put(fileKey, filePath);
if (file != null) {
String extName = FileUtil.extName(file.getOriginalFilename());
if (!StrUtil.isEmpty(extName)) {
extName = StrUtil.DOT + extName;
}
String filePath = requestPath + FileUtil.FILE_SEPARATOR + jobId + FileUtil.FILE_SEPARATOR
+ IdUtil.fastUUID() + extName;
FileUtil.writeFromStream(file.getInputStream(), filePath);
params.put(fileKey, filePath);
job.setParam(JSON.toJSONString(params));
jobMapper.updateById(job);
return Result.success(JobDto.parseJob(job));
job.setParam(JSON.toJSONString(params));
jobMapper.updateById(job);
return Result.success(JobDto.parseJob(job));
}
} catch (Exception e) {
e.printStackTrace();
return Result.fail();
}
return Result.fail();
}
@PostMapping("/status/update")
public Result updateJobStatus(@RequestParam String jobId,
@EnumValidation(clazz = JobStatusEnum.class, message = "没有此状态") @RequestParam Integer status,
public Result updateJobStatus(@RequestParam Integer jobId,
@EnumValidation(clazz = JobStatusEnum.class, message = "没有此状态")
@RequestParam Integer status,
@RequestParam(required = false) String result) {
if (status.equals(JobStatusEnum.WAITING.getKey())) {
if (status.equals(JobStatusEnum.WAIT.getKey()))
return Result.getResult(CommonError.PARAM_WRONG);
}
if (status.equals(JobStatusEnum.SUCCESS.getKey())) {
if (!JSONUtil.isJson(result)) {
if (StringUtils.isBlank(result))
return Result.getResult(CommonError.PARAM_WRONG);
try {
JSON.parseObject(result);
} catch (Exception e) {
return Result.getResult(CommonError.PARAM_WRONG);
}
}
Job job = jobMapper.selectById(jobId);
if (job == null) {
return Result.getResult(CommonError.CONTENT_NOT_FOUND);
}
return jobService.updateJobStatus(job, JobStatusEnum.getEnumByKey(status), result);
return jobService.updateJobStatus(jobId, JobStatusEnum.getEnumByKey(status), result);
}
@GetMapping("/info/{jobId}")
public Result getJobInfo(@PathVariable String jobId) {
public Result getJobInfo(@PathVariable Integer jobId) {
return jobService.getJobInfo(jobId);
}
@GetMapping("/list")
public Result getJobList(@RequestParam String appName,
@RequestParam(required = false) Integer type,
public Result getJobList(@RequestParam(required = false) Integer type) {
return jobService.getJobList(type);
}
@GetMapping("/list/v2")
public Result getJobList(@RequestParam(required = false) Integer type,
@RequestParam(defaultValue = "true") Boolean filterCreating,
@RequestParam(defaultValue = "1") Integer page,
@RequestParam(defaultValue = "100") Integer size) {
AppConfigDto appConfig = appConfigService.getAppConfig(appName);
if (appConfig == null) {
return Result.getResult(JobError.APP_NOT_FOUND);
}
return jobService.getJobList(appConfig.getAppId(), type, filterCreating, page, size);
return jobService.getJobList(type, filterCreating, page, size);
}
}

查看文件

@ -1,94 +0,0 @@
package com.weilab.biology.core.data.dto;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.weilab.biology.core.data.po.AppConfig;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
import java.io.File;
import java.io.Serializable;
/**
* 应用配置信息Dto
*
* @author yurui
* @date 2023/5/5
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class AppConfigDto implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 应用ID
*/
private Integer appId;
/**
* 应用名
*/
private String appName;
/**
* 并发数
*/
private Integer concurrentNum;
/**
* 命令行
*/
private String cmd;
/**
* 任务超时时间(单位:分钟)
*/
private Long timeout;
/**
* 放置文件的路径
*/
private String localPath;
/**
* 放置请求参数文件的路径
*/
private String requestPath;
/**
* 放置结果文件的路径
*/
private String resultPath;
/**
* 放置日志文件的路径
*/
private String logPath;
/**
* 邮件格式(json格式:{"subject":"**","success":"**"})
*/
private JSONObject emailTemplate;
public static AppConfigDto parse(AppConfig o) {
if (o == null) {
return null;
}
return new AppConfigDto()
.setAppId(o.getAppId())
.setAppName(o.getAppName())
.setConcurrentNum(o.getConcurrentNum())
.setCmd(o.getCmd())
.setTimeout(o.getTimeout())
.setLocalPath(o.getLocalPath())
.setRequestPath(o.getLocalPath() + File.separator + "request" + File.separator)
.setResultPath(o.getLocalPath() + File.separator + "result" + File.separator)
.setLogPath(o.getLocalPath() + File.separator + "log" + File.separator)
.setEmailTemplate(JSON.parseObject(o.getEmailTemplate()));
}
}

查看文件

@ -2,6 +2,9 @@ package com.weilab.biology.core.data.dto;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.fasterxml.jackson.annotation.JsonFormat;
import com.weilab.biology.core.data.enums.JobStatusEnum;
import com.weilab.biology.core.data.po.Job;
@ -9,20 +12,17 @@ import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
@Data
@AllArgsConstructor
@NoArgsConstructor
public class JobDto implements Serializable {
private static final long serialVersionUID = 1L;
public class JobDto {
/**
* jobId
*/
private String jobId;
private Integer jobId;
/**
* 任务状态

查看文件

@ -21,7 +21,7 @@ public class JobLessDto {
/**
* jobId
*/
private String jobId;
private Integer jobId;
/**
* 任务状态

查看文件

@ -6,37 +6,16 @@ import java.util.HashMap;
import java.util.Map;
/**
* 任务状态的枚举类
* 校区的枚举类
*/
@Getter
public enum JobStatusEnum {
/**
* 等待运行
*/
WAITING(0, "waiting"),
/**
* 正在运行
*/
WAIT(0, "waiting"),
RUNNING(1, "running"),
/**
* 任务执行成功
*/
SUCCESS(2, "success"),
/**
* 已发送请求
*/
REQED(3, "requested"),
/**
* 任务执行失败
*/
FAIL(-1, "failed"),
/**
* 运行超时
*/
TIMEOUT(-2, "timeout"),
/**
* 正在创建
*/
CREATING(-3, "creating"),
;

查看文件

@ -1,71 +0,0 @@
package com.weilab.biology.core.data.po;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableName;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
import java.io.Serializable;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;
/**
* <p>
* 应用配置信息
* </p>
*
* @author skyyemperor
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
@TableName("app_config")
public class AppConfig implements Serializable {
private static final long serialVersionUID = 1L;
/**
* 应用ID
*/
@TableId(value = "app_id", type = IdType.AUTO)
private Integer appId;
/**
* 应用名
*/
@TableField("app_name")
private String appName;
/**
* 并发数
*/
@TableField("concurrent_num")
private Integer concurrentNum;
/**
* 命令行
*/
@TableField("cmd")
private String cmd;
/**
* 任务超时时间(单位:分钟)
*/
@TableField("timeout")
private Long timeout;
/**
* 放置文件的路径
*/
@TableField("local_path")
private String localPath;
/**
* 邮件格式(json格式:{"subject":"**","success":"**","waiting":"**".....})
* 参照com.weilab.biology.core.data.enums.JobStatusEnum
*/
@TableField("email_template")
private String emailTemplate;
}

查看文件

@ -4,13 +4,17 @@ import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableField;
import com.baomidou.mybatisplus.annotation.TableId;
import com.baomidou.mybatisplus.annotation.TableName;
import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.Date;
import jnr.ffi.annotations.In;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.io.Serializable;
import java.time.LocalDateTime;
/**
* Created by skyyemperor on 2021-09-19
* Description :
@ -23,14 +27,14 @@ public class Job implements Serializable {
/**
* jobId
*/
@TableId(value = "job_id", type = IdType.INPUT)
private String jobId;
@TableId(value = "job_id", type = IdType.AUTO)
private Integer jobId;
/**
* 应用ID
* 基因序列数据
*/
@TableField(value = "app_id")
private Integer appId;
@TableField(value = "`data`")
private String data;
/**
* 请求参数
@ -75,12 +79,13 @@ public class Job implements Serializable {
private LocalDateTime completeTime;
/**
* 任务类型
* 请求类型
*/
@TableField(value = "type")
private Integer type;
public Job(String data, String param, String mail, Integer status, LocalDateTime requestTime, Integer type) {
this.data = data;
this.param = param;
this.mail = mail;
this.status = status;

查看文件

@ -9,9 +9,8 @@ import com.weilab.biology.core.data.vo.result.ResultError;
public enum JobError implements ResultError {
PARAM_CAN_NOT_BE_EMPTY(40100, "文本框和文件不能同时为空"),
FILE_READ_FAIL(40101, "文件读取出错"),
STATUS_UPDATE_FAIL(40102, "当前状态下不允许更新为指定状态"),
JOB_NOT_FOUND(40103, "任务不存在"),
APP_NOT_FOUND(40104, "应用不存在"),
STATUS_UPDATE_FAIL(40102,"当前状态下不允许更新为指定状态"),
JOB_NOT_FOUND(40103,"当前状态下不允许更新为指定状态"),
;
private int code;

查看文件

@ -1,21 +0,0 @@
package com.weilab.biology.mapper;
import com.weilab.biology.core.data.po.AppConfig;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
/**
* <p>
* 应用配置信息 Mapper 接口
* </p>
*
* @author
* @since 2023-05-05
*/
@Mapper
public interface AppConfigMapper extends BaseMapper<AppConfig> {
AppConfig getAppConfigByAppName(@Param("appName") String appName);
}

查看文件

@ -14,13 +14,14 @@ import java.util.List;
@Mapper
public interface JobMapper extends BaseMapper<Job> {
List<Job> selectRunningJobs(@Param("appId") Integer appId);
List<Job> selectRunningJobs();
List<Job> selectWaitingJobs(@Param("appId") Integer appId, @Param("size") Integer size);
Job selectNextWaitingJob();
List<Job> selectJobListByPage(@Param("appId") Integer appId,
@Param("type") Integer type,
@Param("filterCreating") Boolean filterCreating,
List<Job> selectJobList(@Param("type") Integer type);
List<Job> selectJobListByPage(@Param("type") Integer type,
@Param("filterCreating") Boolean filterCreating,
@Param("offset") Integer offset,
@Param("count") Integer count);

查看文件

@ -1,35 +0,0 @@
package com.weilab.biology.service;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.weilab.biology.core.data.dto.AppConfigDto;
import com.weilab.biology.core.data.po.AppConfig;
import com.weilab.biology.mapper.AppConfigMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
/**
* <p>
* 应用配置信息 服务实现类
* </p>
*
* @author
* @since 2023-05-05
*/
@Service
public class AppConfigService extends ServiceImpl<AppConfigMapper, AppConfig> {
@Autowired
private AppConfigMapper appConfigMapper;
/**
* 通过appName获取应用配置
*
* @param appName 唯一应用名
* @return AppConfigDto
*/
public AppConfigDto getAppConfig(String appName) {
return AppConfigDto.parse(appConfigMapper.getAppConfigByAppName(appName));
}
}

查看文件

@ -1,11 +1,8 @@
package com.weilab.biology.service;
import cn.hutool.core.io.FileUtil;
import cn.hutool.core.util.StrUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.weilab.biology.core.data.dto.AppConfigDto;
import com.weilab.biology.core.data.dto.JobDto;
import com.weilab.biology.core.data.dto.JobLessDto;
import com.weilab.biology.core.data.enums.JobStatusEnum;
@ -14,15 +11,17 @@ import com.weilab.biology.core.data.vo.result.CommonError;
import com.weilab.biology.core.data.vo.result.Result;
import com.weilab.biology.core.data.vo.result.error.JobError;
import com.weilab.biology.mapper.JobMapper;
import com.weilab.biology.util.FileUtils;
import com.weilab.biology.util.MailUtil;
import com.weilab.biology.util.TaskExecutorUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.io.BufferedInputStream;
import java.io.File;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.List;
@ -35,7 +34,22 @@ import java.util.stream.Collectors;
*/
@Service
@Slf4j
public class JobService extends ServiceImpl<JobMapper, Job> {
public class JobService {
@Value("${biology.python-cmd}")
private String pythonCmd;
@Value("${biology.request-path}")
private String requestPath;
@Value("${biology.result-path}")
private String resultDataPath;
@Value("${biology.log-path}")
private String logPath;
@Value("${biology.concurrent-num}")
private Integer concurrentNum;
@Autowired
private JobMapper jobMapper;
@ -46,63 +60,79 @@ public class JobService extends ServiceImpl<JobMapper, Job> {
@Autowired
private TaskExecutorUtil<?> taskExecutorUtil;
@Autowired
private AppConfigService appConfigService;
@Value("${email.subject}")
private String SUBJECT;
@Value("${email.content.success}")
private String SUCCESS_EMAIL_CONTENT;
@Value("${email.content.fail}")
private String FAIL_EMAIL_CONTENT;
@Value("${email.content.timeout}")
private String TIMEOUT_EMAIL_CONTENT;
@Value("${email.content.received}")
private String RECEIVED_EMAIL_CONTENT;
@Value("${email.content.running}")
private String START_RUNNING_EMAIL_CONTENT;
/**
* 提交job
*/
@Transactional
public Result submit(AppConfigDto appConfig, String dataStr, MultipartFile dataFile, JSONObject param, Job job) {
public Result submit(String dataStr, BufferedInputStream dataStream, JSONObject param, Job job) {
jobMapper.insert(job);
sendEmail(job, JobStatusEnum.WAIT, job.getMail());
try {
// dataStr和dataFile均为蛋白质序列两者含义相同只保留其一dataFile的优先级大于dataStr
// 若dataFile为空则将dataStr写入本地文件之后向python传递文件路径参数
String dataPath = String.format(appConfig.getRequestPath() + "job-%s-dataStr.txt", job.getJobId());
if (dataFile != null) {
FileUtil.writeFromStream(dataFile.getInputStream(), dataPath);
} else if (!StrUtil.isBlank(dataStr)) {
FileUtil.writeBytes(dataStr.getBytes(), dataPath);
//将请求数据写入本地文件之后向python传递文件路径参数
String dataPath = String.format(requestPath + File.separator + "job-%d-dataStr.txt", job.getJobId());
if (dataStream != null) {
FileUtil.writeFromStream(dataStream, dataPath);
} else {
FileUtils.writeStringToFile(dataPath, dataStr);
}
//将jobId和dataPath参数添加至param中
param.put("jobId", job.getJobId());
param.put("requestDataPath", dataPath);
param.put("resultDataPath", appConfig.getResultPath());
} catch (IOException e) {
throw new RuntimeException(e);
}
param.put("resultDataPath", resultDataPath);
// 更新数据库param字段
job.setParam(JSON.toJSONString(param));
this.saveOrUpdate(job);
updateJobStatus(job, JobStatusEnum.WAITING);
//更新数据库param字段
job.setParam(JSON.toJSONString(param));
jobMapper.updateById(job);
runNextJob();
} catch (Exception e) {
e.printStackTrace();
updateJobStatus(job.getJobId(), JobStatusEnum.FAIL);
}
return getJobInfo(job.getJobId());
}
public Result getJobInfo(String jobId) {
public Result getJobInfo(Integer jobId) {
Job job = jobMapper.selectById(jobId);
if (job == null)
return Result.getResult(CommonError.CONTENT_NOT_FOUND);
return Result.success(JobDto.parseJob(job));
}
public Result updateJobStatus(Integer jobId, JobStatusEnum status) {
return updateJobStatus(jobId, status, null);
}
public Result updateJobStatus(Integer jobId, JobStatusEnum status, String result) {
System.out.println(status.getRemark());
Job job = jobMapper.selectById(jobId);
if (job == null) {
return Result.getResult(CommonError.CONTENT_NOT_FOUND);
}
return Result.success(JobDto.parseJob(job));
}
public Result updateJobStatus(Job job, JobStatusEnum status) {
return updateJobStatus(job, status, null);
}
/**
* 更新任务状态
*
* @param job 任务
* @param status 状态
* @param result 任务运行结果
*/
public Result updateJobStatus(Job job, JobStatusEnum status, String result) {
switch (status) {
case WAITING:
case CREATING:
case WAIT:
break;
case SUCCESS:
job.setCompleteTime(LocalDateTime.now());
@ -111,7 +141,7 @@ public class JobService extends ServiceImpl<JobMapper, Job> {
case FAIL:
case TIMEOUT:
if (!job.getStatus().equals(JobStatusEnum.RUNNING.getKey())
&& !job.getStatus().equals(JobStatusEnum.REQED.getKey())) {
&& !job.getStatus().equals(JobStatusEnum.REQED.getKey())){
return Result.getResult(JobError.STATUS_UPDATE_FAIL);
}
job.setCompleteTime(LocalDateTime.now());
@ -120,7 +150,7 @@ public class JobService extends ServiceImpl<JobMapper, Job> {
if (!job.getStatus().equals(JobStatusEnum.REQED.getKey())) {
return Result.getResult(JobError.STATUS_UPDATE_FAIL);
}
job.setRequestTime(LocalDateTime.now());
job.setCreateTime(LocalDateTime.now());
break;
}
@ -128,96 +158,104 @@ public class JobService extends ServiceImpl<JobMapper, Job> {
jobMapper.updateById(job);
sendEmail(job, status, job.getMail());
runNextJob();
return getJobInfo(job.getJobId());
return getJobInfo(jobId);
}
public Result getJobList(Integer appId, Integer type, Boolean filterCreating, Integer page, Integer size) {
List<Job> jobs = jobMapper.selectJobListByPage(appId, type, filterCreating, (page - 1) * size, size);
public Result getJobList(Integer type) {
List<Job> jobs = jobMapper.selectJobList(type);
return Result.success(jobs.stream().map(JobLessDto::parseJob).collect(Collectors.toList()));
}
public Result getJobList(Integer type, Boolean filterCreating, Integer page, Integer size) {
List<Job> jobs = jobMapper.selectJobListByPage(type, filterCreating, (page - 1) * size, size);
return Result.success(jobs.stream().map(JobLessDto::parseJob).collect(Collectors.toList()));
}
/**
* 运行job
* 运行下一个job
*/
public synchronized void runJob() {
List<AppConfigDto> appConfigList = appConfigService.list().stream()
.map(AppConfigDto::parse)
.collect(Collectors.toList());
appConfigList.forEach(appConfig -> {
//并发数小于concurrentNum运行job
int size = appConfig.getConcurrentNum() - jobMapper.selectRunningJobs(appConfig.getAppId()).size();
if (size <= 0) {
return;
}
jobMapper.selectWaitingJobs(appConfig.getAppId(), size)
.forEach(job -> runJob(appConfig, job));
});
}
private void runJob(AppConfigDto appConfig, Job job) {
private synchronized void runNextJob() {
Job nextJob = null;
try {
FileUtil.mkdir(appConfig.getLogPath());
String logFilePath = String.format(appConfig.getLogPath() + "task-log-%s.txt", job.getJobId());
String cmd = String.format("%s -setting '%s' >> %s 2>&1", appConfig.getCmd(), job.getParam(), logFilePath);
String[] cmds = new String[]{"/bin/sh", "-c", cmd};
Runtime.getRuntime().exec(cmds);
log.info("执行命令: " + cmd);
//并发数小于concurrentNum运行该job
if (jobMapper.selectRunningJobs().size() < concurrentNum) {
if ((nextJob = jobMapper.selectNextWaitingJob()) != null) {
//更新job状态
nextJob.setStatus(JobStatusEnum.REQED.getKey());
jobMapper.updateById(nextJob);
// 更新job状态
updateJobStatus(job, JobStatusEnum.REQED);
waitRunning(nextJob.getJobId());
// 异步检测超时
asyncScheduleTask(appConfig, job.getJobId());
String logFilePath = String.format(logPath + File.separator + "task-log-%s.txt", nextJob.getJobId());
String cmd = String.format("%s -setting '%s' >> %s 2>&1", pythonCmd, nextJob.getParam(), logFilePath);
log.info("执行命令: " + cmd);
String[] cmds = new String[]{"/bin/sh", "-c", cmd};
Runtime.getRuntime().exec(cmds);
}
}
} catch (Exception e) {
e.printStackTrace();
updateJobStatus(job, JobStatusEnum.FAIL);
if (nextJob != null) {
nextJob.setStatus(JobStatusEnum.FAIL.getKey());
jobMapper.updateById(nextJob);
}
}
}
/**
* 开启异步定时任务校验超时任务
* 等待运行
*/
private void asyncScheduleTask(AppConfigDto appConfig, String jobId) {
private void waitRunning(Integer jobId) {
//等待120秒检查是否已运行
taskExecutorUtil.schedule(() -> {
Job job = jobMapper.selectById(jobId);
if (job.getStatus().equals(JobStatusEnum.REQED.getKey())) {
updateJobStatus(job, JobStatusEnum.FAIL);
updateJobStatus(jobId, JobStatusEnum.FAIL);
}
}, 120, TimeUnit.SECONDS);
//等待一段时间后查看是否执行完成
//等待14小时查看是否执行完成
taskExecutorUtil.schedule(() -> {
Job job = jobMapper.selectById(jobId);
if (job.getStatus().equals(JobStatusEnum.RUNNING.getKey())) {
updateJobStatus(job, JobStatusEnum.TIMEOUT);
updateJobStatus(jobId, JobStatusEnum.TIMEOUT);
}
}, appConfig.getTimeout(), TimeUnit.MINUTES);
}, 14, TimeUnit.HOURS);
}
private void sendEmail(Job job, JobStatusEnum status, String mail) {
AppConfigDto appConfig = AppConfigDto.parse(appConfigService.getById(job.getAppId()));
if (appConfig == null) {
return;
}
// 多个参数
String jobId = job.getJobId();
String time = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
String param1 = job.getRequestTime().format(DateTimeFormatter.ofPattern("yyyyMMdd")) + job.getJobId();
String param2 = param1;
String content = appConfig.getEmailTemplate().getString(status.getRemark());
String subject = appConfig.getEmailTemplate().getString("subject");
if (StrUtil.isBlank(content) || StrUtil.isBlank(subject)) {
return;
String content = null;
switch (status) {
case WAIT:
content = RECEIVED_EMAIL_CONTENT;
break;
case SUCCESS:
content = SUCCESS_EMAIL_CONTENT;
break;
case FAIL:
content = FAIL_EMAIL_CONTENT;
break;
case TIMEOUT:
content = TIMEOUT_EMAIL_CONTENT;
break;
case RUNNING:
content = START_RUNNING_EMAIL_CONTENT;
break;
}
// 格式化填充参数
content = content.replaceAll("%jobId%", jobId).replaceAll("%time%", time);
subject = subject.replaceAll("%jobId%", jobId).replaceAll("%time%", time);
mailUtil.send(mail, subject, content);
if (content != null) {
mailUtil.send(mail, String.format(SUBJECT, param1), String.format(content, param1, param2));
}
}
}

查看文件

@ -0,0 +1,118 @@
package com.weilab.biology.util;
import org.springframework.web.multipart.MultipartFile;
import java.io.*;
/**
* 文件读取工具类
*/
public class FileUtils {
/**
* MultipartFile 转换成File
*
* @param multfile 原文件类型
* @return File
*/
public static File multipartToFile(MultipartFile multfile) {
File file = null;
try {
file = File.createTempFile("prefix", "_" + multfile.getOriginalFilename());
multfile.transferTo(file);
} catch (IOException e) {
e.printStackTrace();
}
return file;
}
/**
* 读取文件内容作为字符串返回
*/
public static String readFileAsString(String filePath) throws IOException {
File file = new File(filePath);
if (!file.exists()) {
throw new FileNotFoundException(filePath);
}
return readFileAsString(file);
}
/**
* 读取文件内容作为字符串返回
*/
public static String readFileAsString(File file) throws IOException {
StringBuilder sb = new StringBuilder((int) (file.length()));
// 创建字节输入流
FileInputStream fis = new FileInputStream(file);
// 创建一个长度为10240的Buffer
byte[] bbuf = new byte[10240];
// 用于保存实际读取的字节数
int hasRead = 0;
while ((hasRead = fis.read(bbuf)) > 0) {
sb.append(new String(bbuf, 0, hasRead));
}
fis.close();
return sb.toString();
}
/**
* 根据文件路径读取byte[] 数组
*/
public static byte[] readFileAsBytes(String filePath) throws IOException {
File file = new File(filePath);
if (!file.exists()) {
throw new FileNotFoundException(filePath);
} else {
return readFileAsBytes(file);
}
}
/**
* 根据文件读取byte[]数组
*/
public static byte[] readFileAsBytes(File file) throws IOException {
if (file == null) {
throw new FileNotFoundException();
} else {
ByteArrayOutputStream bos = new ByteArrayOutputStream((int) file.length());
BufferedInputStream in = null;
try {
in = new BufferedInputStream(new FileInputStream(file));
short bufSize = 1024;
byte[] buffer = new byte[bufSize];
int len1;
while (-1 != (len1 = in.read(buffer, 0, bufSize))) {
bos.write(buffer, 0, len1);
}
return bos.toByteArray();
} finally {
try {
if (in != null) {
in.close();
}
} catch (IOException var14) {
var14.printStackTrace();
}
bos.close();
}
}
}
public static void writeStringToFile(String filePath, String content) throws IOException {
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(filePath));
bufferedOutputStream.write(content.getBytes());
bufferedOutputStream.flush();
bufferedOutputStream.close();
}
public static void writeByteToFile(String filePath, byte[] content) throws IOException {
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(filePath));
bufferedOutputStream.write(content);
bufferedOutputStream.flush();
bufferedOutputStream.close();
}
}

查看文件

@ -1,20 +0,0 @@
package com.weilab.biology.util;
import cn.hutool.core.util.IdUtil;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* @author yurui
* @date 2023/6/13
*/
public class JobIdUtil {
public static String newId() {
return LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyyMMddHHmmss"))
+ IdUtil.fastUUID().substring(0, 8);
}
}

查看文件

@ -7,6 +7,9 @@ import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSenderImpl;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
@Component
@Slf4j
public class MailUtil {

查看文件

@ -1,30 +0,0 @@
create table app_config
(
app_id int auto_increment comment '应用ID'
primary key,
app_name varchar(100) not null comment '应用名',
concurrent_num int default 1 not null comment '并发数',
cmd varchar(500) not null comment '命令行',
timeout bigint default 840 not null comment '任务超时时间(单位:分钟)',
local_path varchar(100) default '/data' not null comment '放置文件的路径',
email_template json not null comment '邮件格式(json格式:{"subject":"**","success":"**"})',
constraint app_config_app_name_uniq
unique (app_name)
)
comment '应用配置信息';
create table job
(
job_id varchar(200) not null comment 'jobId'
primary key,
app_id int not null comment '应用ID',
param varchar(10000) null comment '请求参数',
mail varchar(500) null comment '联系邮箱',
result text null comment '运行结果',
status tinyint null comment '任务状态。0为待运行,1为正在运行,2为运行成功,-1为运行失败',
request_time datetime null comment '请求时间',
create_time datetime null comment '创建时间',
complete_time datetime null comment '完成时间',
type int null comment '请求类型'
);

查看文件

@ -1,28 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.weilab.biology.mapper.AppConfigMapper">
<!-- 通用查询映射结果 -->
<resultMap id="BaseResultMap" type="com.weilab.biology.core.data.po.AppConfig">
<id column="app_id" property="appId"/>
<result column="app_name" property="appName"/>
<result column="concurrent_num" property="concurrentNum"/>
<result column="cmd" property="cmd"/>
<result column="timeout" property="timeout"/>
<result column="local_path" property="localPath"/>
<result column="email_template" property="emailTemplate"/>
</resultMap>
<sql id="AppConfigSql">
SELECT app_id, app_name, concurrent_num, cmd, timeout, local_path, email_template
FROM app_config
</sql>
<select id="getAppConfigByAppName" resultType="com.weilab.biology.core.data.po.AppConfig"
parameterType="java.lang.String">
<include refid="AppConfigSql"/>
WHERE app_name = #{appName}
</select>
</mapper>

查看文件

@ -2,7 +2,8 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.weilab.biology.mapper.JobMapper">
<resultMap id="BaseResultMap" type="com.weilab.biology.core.data.po.Job">
<id column="job_id" jdbcType="VARCHAR" property="jobId"/>
<id column="job_id" jdbcType="INTEGER" property="jobId"/>
<result column="data" jdbcType="LONGVARCHAR" property="data"/>
<result column="param" jdbcType="VARCHAR" property="param"/>
<result column="mail" jdbcType="VARCHAR" property="mail"/>
<result column="result" jdbcType="LONGVARCHAR" property="result"/>
@ -14,14 +15,15 @@
</resultMap>
<sql id="JobSql">
SELECT job_id,
`data`,
param,
mail,
`result`,
result,
`status`,
request_time,
create_time,
complete_time,
`type`
type
FROM job
</sql>
<sql id="JobLessSql">
@ -30,29 +32,34 @@
request_time,
create_time,
complete_time,
`type`
type
FROM job
</sql>
<select id="selectRunningJobs" resultMap="BaseResultMap">
<include refid="JobSql"/>
WHERE app_id = #{appId}
AND (status = '${@com.weilab.biology.core.data.enums.JobStatusEnum@RUNNING.getKey()}'
OR status = '${@com.weilab.biology.core.data.enums.JobStatusEnum@REQED.getKey()}')
WHERE status = '${@com.weilab.biology.core.data.enums.JobStatusEnum@RUNNING.getKey()}'
OR status = '${@com.weilab.biology.core.data.enums.JobStatusEnum@REQED.getKey()}'
</select>
<select id="selectWaitingJobs" resultMap="BaseResultMap">
<select id="selectNextWaitingJob" resultMap="BaseResultMap">
<include refid="JobSql"/>
WHERE app_id = #{appId}
AND status = '${@com.weilab.biology.core.data.enums.JobStatusEnum@WAITING.getKey()}'
ORDER BY create_time LIMIT #{size}
WHERE status = '${@com.weilab.biology.core.data.enums.JobStatusEnum@WAIT.getKey()}'
ORDER BY request_time LIMIT 1
</select>
<select id="selectJobList" resultMap="BaseResultMap">
<include refid="JobLessSql"/>
<where>
<if test="type != null">
AND type = #{type}
</if>
AND request_time > DATE_SUB(CURDATE(), INTERVAL 3 MONTH)
</where>
ORDER BY job_id DESC
<!-- LIMIT #{count}-->
</select>
<select id="selectJobListByPage" resultMap="BaseResultMap">
<include refid="JobLessSql"/>
<where>
<if test="appId != null">
AND app_id = #{appId}
</if>
<if test="type != null">
AND type = #{type}
</if>
@ -61,6 +68,6 @@
</if>
</where>
ORDER BY job_id DESC
LIMIT #{offset}, #{count}
LIMIT #{offset}, #{count}
</select>
</mapper>