diff --git a/README.md b/README.md index 53bd996e9df8136091821dff68e6ac6c6fb3e623..f3903c578e7f8dc640ffeb6c67a283f03b9595f5 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,6 @@ ## 2.特性 * 基于SSH的脚本执行机制,部署简单快捷,仅需单个服务 * 基于Yarn Rest Api的任务状态同步机制,对Spark、Flink无版本限制 -* 支持分布式 * 支持失败重试 * 支持任务依赖 * 支持复杂任务编排(DAG) @@ -85,7 +84,6 @@ * 点击左侧操作栏“调度实例”可查看调度实例列表、运行状态和节点启动日志 ![image](https://gitee.com/meetyoucrop/big-whale/raw/master/doc/images/step11-schedule_instance.png) * 点击左侧操作栏“手动执行”可触发调度执行 -* 点击左侧操作栏“历史补数”可运行补数实例 ## 2.实时任务 ### 2.1 新增 * 目前支持“Spark Stream”和“Flink Stream”两种类型的流处理任务 diff --git a/doc/images/architecture.png b/doc/images/architecture.png index cf95004dc06196b1aa0da7defc16fc594758be1e..92fabdc8f25cbbbae818e4dd76646ca07cf79933 100644 Binary files a/doc/images/architecture.png and b/doc/images/architecture.png differ diff --git a/doc/images/step10-schedule_lsit.png b/doc/images/step10-schedule_lsit.png index f78d3d8fca55fdaf2d4dd7cd0deeed01dce13479..4a867c641aaa6826aa780aedadd6cf78c0549c55 100644 Binary files a/doc/images/step10-schedule_lsit.png and b/doc/images/step10-schedule_lsit.png differ diff --git a/script/big-whale.sql b/script/big-whale.sql index 24e0bebb1a32d488ac8f08943fe337800385324e..25d4cf224145dc898a0ce433292019e2d2d36e0f 100644 --- a/script/big-whale.sql +++ b/script/big-whale.sql @@ -464,6 +464,24 @@ ALTER TABLE `script_history` DROP TABLE `schedule_snapshot`; + +ALTER TABLE `script_history` + DROP COLUMN `schedule_supplement`, + DROP COLUMN `schedule_operate_time`, + ADD COLUMN `schedule_runnable` bit NULL AFTER `schedule_failure_handle`, + ADD COLUMN `schedule_retry` bit NULL AFTER `schedule_runnable`, + ADD COLUMN `schedule_empty` bit NULL AFTER `schedule_retry`, + ADD COLUMN `schedule_rerun` bit NULL AFTER `schedule_empty`, + ADD COLUMN `business_time` datetime NOT NULL AFTER `create_by`, + ADD COLUMN `delay_time` datetime NULL AFTER `business_time`, + ADD COLUMN `submit_time` datetime NULL AFTER `delay_time`; + + +ALTER TABLE `script_history` + ADD INDEX `schedule_runnable_index` (`schedule_runnable`) USING BTREE, + ADD INDEX `delay_time_index` (`delay_time`) USING BTREE; + + /*!40101 SET SQL_MODE=IFNULL(@OLD_SQL_MODE, '') */; /*!40014 SET FOREIGN_KEY_CHECKS=IF(@OLD_FOREIGN_KEY_CHECKS IS NULL, 1, @OLD_FOREIGN_KEY_CHECKS) */; /*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */; diff --git a/src/main/java/com/meiyou/bigwhale/common/Constant.java b/src/main/java/com/meiyou/bigwhale/common/Constant.java index c7046eae3263781e74c3346b833317d620e15dcc..36e627e5db51a8e34623bbc1e892f4b37e420943 100644 --- a/src/main/java/com/meiyou/bigwhale/common/Constant.java +++ b/src/main/java/com/meiyou/bigwhale/common/Constant.java @@ -24,14 +24,21 @@ public interface Constant { String COMMON = "common"; String MONITOR = "monitor"; String SCHEDULE = "schedule"; - String SCRIPT_HISTORY = "scriptHistory"; + String SCRIPT_JOB = "scriptJob"; } /** * 执行状态 */ interface JobState { - String INITED = "INITED"; + + /** + * 调度扩展执行状态 + */ + String UN_CONFIRMED_ = "UN_CONFIRMED"; + String TIME_WAIT_ = "TIME_WAIT"; + + String SUBMIT_WAIT = "SUBMIT_WAIT"; String SUBMITTING = "SUBMITTING"; String SUBMITTED = "SUBMITTED"; String ACCEPTED = "ACCEPTED"; @@ -40,12 +47,7 @@ public interface Constant { String KILLED = "KILLED"; String FAILED = "FAILED"; String TIMEOUT = "TIMEOUT"; - /** - * 调度扩展执行状态 - */ - String UN_CONFIRMED_ = "UN_CONFIRMED"; - String WAITING_PARENT_ = "WAITING_PARENT"; - String PARENT_FAILED_ = "PARENT_FAILED"; + } /** @@ -77,6 +79,7 @@ public interface Constant { String FLINK_STREAM_UNUSUAL_RESTART = "flink实时任务异常(%s),已重启"; String FLINK_STREAM_UNUSUAL_RESTART_FAILED = "flink实时任务异常(%s),重启失败"; + String SERVER_UNEXPECTED_EXIT = "服务异常退出"; String APP_DUPLICATE = "应用重复"; String APP_NO_RUNNING = "应用未运行"; String APP_MEMORY_OVERLIMIT = "内存超限"; diff --git a/src/main/java/com/meiyou/bigwhale/controller/ScheduleController.java b/src/main/java/com/meiyou/bigwhale/controller/ScheduleController.java index 1e8c11efa711ae23975b24e104859e3d01d52614..d0150bd354c35bf5a0d3dffe1e53ed6608f7266a 100644 --- a/src/main/java/com/meiyou/bigwhale/controller/ScheduleController.java +++ b/src/main/java/com/meiyou/bigwhale/controller/ScheduleController.java @@ -1,6 +1,5 @@ package com.meiyou.bigwhale.controller; -import com.alibaba.fastjson.JSONObject; import com.meiyou.bigwhale.common.Constant; import com.meiyou.bigwhale.common.pojo.Msg; import com.meiyou.bigwhale.data.domain.PageRequest; @@ -9,8 +8,7 @@ import com.meiyou.bigwhale.dto.DtoSchedule; import com.meiyou.bigwhale.entity.Script; import com.meiyou.bigwhale.entity.Schedule; import com.meiyou.bigwhale.entity.ScriptHistory; -import com.meiyou.bigwhale.job.ScheduleJob; -import com.meiyou.bigwhale.job.ScriptHistoryShellRunnerJob; +import com.meiyou.bigwhale.scheduler.workflow.ScheduleJobBuilder; import com.meiyou.bigwhale.service.ScheduleService; import com.meiyou.bigwhale.security.LoginUser; import com.meiyou.bigwhale.service.ScriptHistoryService; @@ -24,7 +22,6 @@ import org.springframework.data.domain.Sort; import org.springframework.web.bind.annotation.*; import java.text.DateFormat; -import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; import java.util.stream.Collectors; @@ -189,7 +186,32 @@ public class ScheduleController extends BaseController { keyword.append("_").append(script.getName()).append("_").append(script.getApp()); } schedule.setKeyword(keyword.toString().replaceAll("_null", "_-")); - scheduleService.update(schedule, scripts); + if (schedule.getEnabled()) { + Date effectTime = schedule.getStartTime().compareTo(now) <= 0 ? now : schedule.getStartTime(); + Date needFireTime = SchedulerUtils.getNeedFireTime(schedule.generateCron(), effectTime); + schedule.setNeedFireTime(needFireTime); + Date nextFireTime = SchedulerUtils.getNextFireTime(schedule.generateCron(), effectTime); + if (nextFireTime.compareTo(schedule.getEndTime()) <= 0) { + schedule.setNextFireTime(nextFireTime); + } + } else { + schedule.setNeedFireTime(null); + schedule.setNextFireTime(null); + } + if (schedule.getId() != null) { + SchedulerUtils.interrupt(schedule.getId(), Constant.JobGroup.SCHEDULE); + SchedulerUtils.deleteJob(schedule.getId(), Constant.JobGroup.SCHEDULE); + } + schedule = scheduleService.update(schedule, scripts); + scriptHistoryService.deleteFuture(schedule.getId(), new Date()); + if (schedule.getEnabled()) { + if (schedule.getNextFireTime() != null && schedule.getNextFireTime().compareTo(schedule.getEndTime()) <= 0) { + DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); + String scheduleInstanceId = dateFormat.format(schedule.getNextFireTime()); + scriptService.reGenerateHistory(schedule, scheduleInstanceId, null); + ScheduleJobBuilder.build(schedule); + } + } return success(); } @@ -199,8 +221,10 @@ public class ScheduleController extends BaseController { if (schedule == null) { return failed(); } - SchedulerUtils.deleteJob(req.getId(), Constant.JobGroup.SCHEDULE); + SchedulerUtils.interrupt(schedule.getId(), Constant.JobGroup.SCHEDULE); + SchedulerUtils.deleteJob(schedule.getId(), Constant.JobGroup.SCHEDULE); scheduleService.delete(schedule); + scriptHistoryService.deleteFuture(schedule.getId(), new Date()); return success(); } @@ -224,7 +248,7 @@ public class ScheduleController extends BaseController { } Date now = new Date(); String scheduleInstanceId = new SimpleDateFormat("yyyyMMddHHmmss").format(now); - generateScriptHistories(schedule, scheduleInstanceId, null, now, now, false); + generateHistory(schedule, scheduleInstanceId, null); schedule.setRealFireTime(now); scheduleService.save(schedule); DtoSchedule dtoSchedule = new DtoSchedule(); @@ -235,33 +259,6 @@ public class ScheduleController extends BaseController { return success(result); } - @RequestMapping(value = "/supplement.api", method = RequestMethod.POST) - public Msg supplement(@RequestBody JSONObject params) throws ParseException { - Integer id = params.getInteger("id"); - Schedule schedule = scheduleService.findById(id); - if (schedule == null) { - return failed(); - } - DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); - DateFormat dateFormat1 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Date startTime = dateFormat1.parse(params.getString("start")); - Date endTime = dateFormat1.parse(params.getString("end")); - Date now = new Date(); - Date needFireTime = ScheduleJob.getNeedFireTime(schedule.generateCron(), startTime); - // 获取第一个触发时间 - while (needFireTime.compareTo(startTime) < 0) { - needFireTime = ScheduleJob.getNextFireTime(schedule.generateCron(), needFireTime); - } - if (needFireTime.compareTo(endTime) > 0) { - return failed("时间范围有误"); - } - do { - generateScriptHistories(schedule, dateFormat.format(needFireTime), null, needFireTime, now, true); - needFireTime = ScheduleJob.getNextFireTime(schedule.generateCron(), needFireTime); - } while (needFireTime.compareTo(endTime) < 0); - return success(); - } - @RequestMapping(value = "/treeview.api", method = RequestMethod.GET) public Msg treeView(@RequestParam Integer id, @RequestParam String instance) { @@ -270,9 +267,7 @@ public class ScheduleController extends BaseController { return failed(); } Map nodeTree = new HashMap<>(); - List scriptHistories = scriptHistoryService.findByQuery( - "scheduleId=" + schedule.getId() + - ";scheduleInstanceId=" + instance, + List scriptHistories = scriptHistoryService.findByQuery("scheduleId=" + schedule.getId() + ";scheduleInstanceId=" + instance, Sort.by(Sort.Direction.ASC, "id")); generateNodeTree(null, scriptHistories, nodeTree); return success(Collections.singletonList(nodeTree)); @@ -289,24 +284,12 @@ public class ScheduleController extends BaseController { } } - private void generateScriptHistories(Schedule schedule, String scheduleInstanceId, String previousScheduleTopNodeId, Date instanceTime, Date now, boolean supplement) { + private void generateHistory(Schedule schedule, String scheduleInstanceId, String previousScheduleTopNodeId) { Map nodeIdToData = schedule.analyzeNextNode(previousScheduleTopNodeId); for (String nodeId : nodeIdToData.keySet()) { Script script = scriptService.findOneByQuery("scheduleId=" + schedule.getId() + ";scheduleTopNodeId=" + nodeId); - ScriptHistory scriptHistory = scriptService.generateHistory(script, schedule, scheduleInstanceId, previousScheduleTopNodeId, supplement ? 2 : 0); - scriptHistory.updateState(Constant.JobState.WAITING_PARENT_); - if (supplement) { - scriptHistory.updateState(Constant.JobState.INITED); - } - scriptHistory = scriptHistoryService.save(scriptHistory); - if (supplement) { - if (instanceTime.compareTo(now) <= 0) { - ScriptHistoryShellRunnerJob.build(scriptHistory); - } else { - ScriptHistoryShellRunnerJob.build(scriptHistory, instanceTime); - } - } - generateScriptHistories(schedule, scheduleInstanceId, nodeId, instanceTime, now, supplement); + scriptService.generateHistory(script, schedule, scheduleInstanceId, previousScheduleTopNodeId); + generateHistory(schedule, scheduleInstanceId, nodeId); } } @@ -320,30 +303,38 @@ public class ScheduleController extends BaseController { } for (List histories : topScriptHistories.values()) { String stateTag = StringUtils.join(getStateTag(histories), ""); - ScriptHistory history = histories.get(0); + ScriptHistory firstScriptHistory = histories.get(0); + ScriptHistory latestScriptHistory = histories.get(histories.size() - 1); + boolean rerunEnabled = !(Constant.JobState.UN_CONFIRMED_.equals(latestScriptHistory.getState()) || + latestScriptHistory.isRunning()); + boolean emptyEnabled = !(!Constant.JobState.KILLED.equals(latestScriptHistory.getState()) && + !Constant.JobState.FAILED.equals(latestScriptHistory.getState()) && + !Constant.JobState.TIMEOUT.equals(latestScriptHistory.getState())); if (previousScheduleTopNodeId == null) { if (stateTag != null) { - nodeTree.put("text", history.getScriptName() + stateTag); - nodeTree.put("rerunEnabled_", !stateTag.contains("label-default")); + nodeTree.put("text", firstScriptHistory.getScriptName() + stateTag); + nodeTree.put("rerunEnabled_", rerunEnabled); + nodeTree.put("emptyEnabled_", emptyEnabled); } else { - nodeTree.put("text", history.getScriptName()); + nodeTree.put("text", firstScriptHistory.getScriptName()); } - nodeTree.put("nodeId_", history.getScheduleTopNodeId()); - nodeTree.put("scheduleId_", history.getScheduleId()); - nodeTree.put("icon", "iconfont " + scriptIconClass.get(history.getScriptType())); + nodeTree.put("nodeId_", firstScriptHistory.getScheduleTopNodeId()); + nodeTree.put("scheduleId_", firstScriptHistory.getScheduleId()); + nodeTree.put("icon", "iconfont " + scriptIconClass.get(firstScriptHistory.getScriptType())); nodeTree.put("state", Collections.singletonMap("expanded", true)); - generateNodeTree(history.getScheduleTopNodeId(), scriptHistories, nodeTree); + generateNodeTree(firstScriptHistory.getScheduleTopNodeId(), scriptHistories, nodeTree); } else { Map childNode = new HashMap<>(); if (stateTag != null) { - childNode.put("text", history.getScriptName() + stateTag); - childNode.put("rerunEnabled_", !stateTag.contains("label-default")); + childNode.put("text", firstScriptHistory.getScriptName() + stateTag); + childNode.put("rerunEnabled_", rerunEnabled); + childNode.put("emptyEnabled_", emptyEnabled); } else { - childNode.put("text", history.getScriptName()); + childNode.put("text", firstScriptHistory.getScriptName()); } - childNode.put("nodeId_", history.getScheduleTopNodeId()); - childNode.put("scheduleId_", history.getScheduleId()); - childNode.put("icon", "iconfont " + scriptIconClass.get(history.getScriptType())); + childNode.put("nodeId_", firstScriptHistory.getScheduleTopNodeId()); + childNode.put("scheduleId_", firstScriptHistory.getScheduleId()); + childNode.put("icon", "iconfont " + scriptIconClass.get(firstScriptHistory.getScriptType())); childNode.put("state", Collections.singletonMap("expanded", true)); List> childNodes = (List>)nodeTree.get("nodes"); if (childNodes == null) { @@ -351,7 +342,7 @@ public class ScheduleController extends BaseController { nodeTree.put("nodes", childNodes); } childNodes.add(childNode); - generateNodeTree(history.getScheduleTopNodeId(), scriptHistories, childNode); + generateNodeTree(firstScriptHistory.getScheduleTopNodeId(), scriptHistories, childNode); } } } @@ -361,7 +352,7 @@ public class ScheduleController extends BaseController { if (Constant.JobState.UN_CONFIRMED_.equals(scriptHistory.getState())) { return ""; } - if (Constant.JobState.WAITING_PARENT_.equals(scriptHistory.getState())) { + if (Constant.JobState.TIME_WAIT_.equals(scriptHistory.getState())) { return ""; } if (scriptHistory.isRunning()) { diff --git a/src/main/java/com/meiyou/bigwhale/controller/ScriptController.java b/src/main/java/com/meiyou/bigwhale/controller/ScriptController.java index a98e9b314cac2d8a647c84aa0bc3935fca6aba27..f45331a586867da077729843308d48764b1c3f7d 100644 --- a/src/main/java/com/meiyou/bigwhale/controller/ScriptController.java +++ b/src/main/java/com/meiyou/bigwhale/controller/ScriptController.java @@ -1,11 +1,13 @@ package com.meiyou.bigwhale.controller; +import com.meiyou.bigwhale.common.Constant; import com.meiyou.bigwhale.common.pojo.Msg; import com.meiyou.bigwhale.dto.DtoScript; import com.meiyou.bigwhale.entity.Script; import com.meiyou.bigwhale.entity.ScriptHistory; -import com.meiyou.bigwhale.job.ScriptHistoryShellRunnerJob; +import com.meiyou.bigwhale.scheduler.job.ScriptJob; import com.meiyou.bigwhale.security.LoginUser; +import com.meiyou.bigwhale.service.ScriptHistoryService; import com.meiyou.bigwhale.service.ScriptService; import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; @@ -20,6 +22,8 @@ public class ScriptController extends BaseController { @Autowired private ScriptService scriptService; + @Autowired + private ScriptHistoryService scriptHistoryService; @RequestMapping(value = "/execute.api", method = RequestMethod.POST) public Msg execute(@RequestBody DtoScript req) { @@ -43,8 +47,10 @@ public class ScriptController extends BaseController { Script script = new Script(); BeanUtils.copyProperties(req, script); ScriptHistory scriptHistory = scriptService.generateHistory(script); - if (scriptHistory != null) { - ScriptHistoryShellRunnerJob.build(scriptHistory); + if (Constant.JobState.SUBMIT_WAIT.equals(scriptHistory.getState())) { + scriptHistory.updateState(Constant.JobState.SUBMITTING); + scriptHistory = scriptHistoryService.save(scriptHistory); + ScriptJob.build(scriptHistory); } return success(scriptHistory); } diff --git a/src/main/java/com/meiyou/bigwhale/controller/ScriptHistoryController.java b/src/main/java/com/meiyou/bigwhale/controller/ScriptHistoryController.java index 1b2b18ceff221f1b342cae8205bb1617b1001797..d8bca93e9de7eddbd32ead24988414a3984ecc03 100644 --- a/src/main/java/com/meiyou/bigwhale/controller/ScriptHistoryController.java +++ b/src/main/java/com/meiyou/bigwhale/controller/ScriptHistoryController.java @@ -6,7 +6,6 @@ import com.meiyou.bigwhale.data.domain.PageRequest; import com.meiyou.bigwhale.dto.DtoScriptHistory; import com.meiyou.bigwhale.entity.Schedule; import com.meiyou.bigwhale.entity.ScriptHistory; -import com.meiyou.bigwhale.job.ScriptHistoryShellRunnerJob; import com.meiyou.bigwhale.security.LoginUser; import com.meiyou.bigwhale.service.ScheduleService; import com.meiyou.bigwhale.service.ScriptHistoryService; @@ -15,6 +14,7 @@ import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.domain.Page; import org.springframework.data.domain.Sort; +import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; @@ -23,10 +23,8 @@ import org.springframework.web.bind.annotation.RestController; import java.text.DateFormat; import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Comparator; import java.util.Date; import java.util.List; -import java.util.stream.Collectors; @RestController @RequestMapping("/script_history") @@ -87,12 +85,17 @@ public class ScriptHistoryController extends BaseController{ displayName = "?" + " - " + scriptHistory.getScriptName(); } - if (scriptHistory.getScheduleSupplement() != null && scriptHistory.getScheduleSupplement() ) { - displayName += "(补数)"; + if (scriptHistory.getScheduleRetry()) { + displayName += "(重试 - " + scriptHistory.getScheduleFailureHandle().split(";")[2] + ")"; + } + if (scriptHistory.getScheduleEmpty()) { + displayName += "(空跑)"; + } + if (scriptHistory.getScheduleRerun()) { + displayName += "(重跑)"; } } else { - displayName = "实时任务" + " - " + - scriptHistory.getScriptName(); + displayName = scriptHistory.getScriptName(); } } else { displayName = "Edit Test"; @@ -108,25 +111,65 @@ public class ScriptHistoryController extends BaseController{ List scriptHistories = scriptHistoryService.findByQuery( "scheduleId=" + req.getScheduleId() + ";scheduleTopNodeId=" + req.getScheduleTopNodeId() + - ";scheduleInstanceId=" + req.getScheduleInstanceId()); - // 升序 - scriptHistories.sort(Comparator.comparing(ScriptHistory::getId)); - scriptHistories = scriptHistories.stream().filter(scriptHistory -> !scriptHistory.getScheduleSupplement()).collect(Collectors.toList()); - if (scriptHistories.isEmpty()) { - return failed("补数节点不能重跑"); - } - ScriptHistory scriptHistory = scriptHistories.get(0); - if (scriptHistory.getCreateTime().after(new Date())) { + ";scheduleInstanceId=" + req.getScheduleInstanceId(), new Sort(Sort.Direction.ASC, "id")); + ScriptHistory firstScriptHistory = scriptHistories.get(0); + ScriptHistory latestScriptHistory = scriptHistories.get(scriptHistories.size() - 1); + if (Constant.JobState.UN_CONFIRMED_.equals(latestScriptHistory.getState()) || + latestScriptHistory.isRunning()) { return failed(); } - scriptHistory.resetState(); - scriptHistory.setScheduleOperateTime(new Date()); - scriptHistory.updateState(Constant.JobState.UN_CONFIRMED_); - scriptHistory.updateState(Constant.JobState.WAITING_PARENT_); - scriptHistory.updateState(Constant.JobState.INITED); - scriptHistory = scriptHistoryService.save(scriptHistory); - ScriptHistoryShellRunnerJob.build(scriptHistory); + ScriptHistory rerunScriptHistory = new ScriptHistory(); + BeanUtils.copyProperties(firstScriptHistory, rerunScriptHistory); + rerunScriptHistory.reset(); + rerunScriptHistory.setScheduleRerun(true); + rerunScriptHistory.setCreateTime(new Date()); + rerunScriptHistory.updateState(Constant.JobState.UN_CONFIRMED_); + rerunScriptHistory.updateState(Constant.JobState.TIME_WAIT_); + rerunScriptHistory.setDelayTime(new Date()); + scriptHistoryService.save(rerunScriptHistory); + updateChildren(latestScriptHistory); + return success(); + } + + @RequestMapping(value = "/empty.api", method = RequestMethod.POST) + public Msg empty(@RequestBody DtoScriptHistory req) { + List scriptHistories = scriptHistoryService.findByQuery( + "scheduleId=" + req.getScheduleId() + + ";scheduleTopNodeId=" + req.getScheduleTopNodeId() + + ";scheduleInstanceId=" + req.getScheduleInstanceId(), new Sort(Sort.Direction.ASC, "id")); + ScriptHistory firstScriptHistory = scriptHistories.get(0); + ScriptHistory latestScriptHistory = scriptHistories.get(scriptHistories.size() - 1); + if (!Constant.JobState.KILLED.equals(latestScriptHistory.getState()) && + !Constant.JobState.FAILED.equals(latestScriptHistory.getState()) && + !Constant.JobState.TIMEOUT.equals(latestScriptHistory.getState())) { + return failed("状态错误"); + } + // 无须提交 + ScriptHistory emptyScriptHistory = new ScriptHistory(); + BeanUtils.copyProperties(firstScriptHistory, emptyScriptHistory); + emptyScriptHistory.reset(); + emptyScriptHistory.setScheduleEmpty(true); + emptyScriptHistory.setCreateTime(new Date()); + emptyScriptHistory.updateState(Constant.JobState.UN_CONFIRMED_); + emptyScriptHistory.updateState(Constant.JobState.TIME_WAIT_); + emptyScriptHistory.setDelayTime(new Date()); + emptyScriptHistory.updateState(Constant.JobState.SUCCEEDED); + emptyScriptHistory.setFinishTime(new Date()); + scriptHistoryService.save(emptyScriptHistory); + updateChildren(latestScriptHistory); return success(); } + private void updateChildren(ScriptHistory scriptHistory) { + List childrenScriptHistories = scriptHistoryService.findByQuery("scheduleId=" + scriptHistory.getScheduleId() + + ";scheduleInstanceId=" + scriptHistory.getScheduleInstanceId() + + ";previousScheduleTopNodeId=" + scriptHistory.getScheduleTopNodeId()); + for (ScriptHistory childrenScriptHistory : childrenScriptHistories) { + if (!childrenScriptHistory.getScheduleRunnable()) { + scriptHistoryService.switchScheduleRunnable(childrenScriptHistory.getId(), true); + updateChildren(childrenScriptHistory); + } + } + } + } diff --git a/src/main/java/com/meiyou/bigwhale/controller/StreamController.java b/src/main/java/com/meiyou/bigwhale/controller/StreamController.java index a330fe6e7ae2ae98e8a3603e62deda0a5346916f..e96187b3950f056bb9f66747848892620b83f114 100644 --- a/src/main/java/com/meiyou/bigwhale/controller/StreamController.java +++ b/src/main/java/com/meiyou/bigwhale/controller/StreamController.java @@ -5,7 +5,8 @@ import com.meiyou.bigwhale.common.pojo.Msg; import com.meiyou.bigwhale.dto.DtoMonitor; import com.meiyou.bigwhale.dto.DtoScript; import com.meiyou.bigwhale.entity.*; -import com.meiyou.bigwhale.job.ScriptHistoryShellRunnerJob; +import com.meiyou.bigwhale.scheduler.monitor.StreamJobMonitor; +import com.meiyou.bigwhale.scheduler.job.ScriptJob; import com.meiyou.bigwhale.service.MonitorService; import com.meiyou.bigwhale.security.LoginUser; import com.meiyou.bigwhale.service.ScriptHistoryService; @@ -145,7 +146,15 @@ public class StreamController extends BaseController { if (req.getMonitor().getDingdingHooks() != null && !req.getMonitor().getDingdingHooks().isEmpty()) { monitor.setDingdingHooks(StringUtils.join(req.getMonitor().getDingdingHooks(), ",")); } - scriptService.update(script, monitor); + if (monitor.getId() != null) { + SchedulerUtils.interrupt(monitor.getId(), Constant.JobGroup.MONITOR); + SchedulerUtils.deleteJob(monitor.getId(), Constant.JobGroup.MONITOR); + } + script = scriptService.update(script, monitor); + monitor = monitorService.findById(script.getMonitorId()); + if (monitor.getEnabled()) { + StreamJobMonitor.build(monitor); + } return success(); } @@ -156,6 +165,7 @@ public class StreamController extends BaseController { return failed(); } Monitor monitor = monitorService.findById(script.getMonitorId()); + SchedulerUtils.interrupt(monitor.getId(), Constant.JobGroup.MONITOR); SchedulerUtils.deleteJob(monitor.getId(), Constant.JobGroup.MONITOR); scriptService.delete(script); return success(); @@ -167,13 +177,15 @@ public class StreamController extends BaseController { if (script == null) { return failed(); } - ScriptHistory scriptHistory = scriptHistoryService.findNoScheduleLatestByScriptId(script.getId()); + ScriptHistory scriptHistory = scriptHistoryService.findScriptLatest(script.getId()); if (scriptHistory != null && scriptHistory.isRunning()) { return failed("任务运行中,请勿重复执行"); } scriptHistory = scriptService.generateHistory(script); - if (scriptHistory != null) { - ScriptHistoryShellRunnerJob.build(scriptHistory); + if (Constant.JobState.SUBMIT_WAIT.equals(scriptHistory.getState())) { + scriptHistory.updateState(Constant.JobState.SUBMITTING); + scriptHistory = scriptHistoryService.save(scriptHistory); + ScriptJob.build(scriptHistory); } return success(scriptHistory); } diff --git a/src/main/java/com/meiyou/bigwhale/dto/DtoScriptHistory.java b/src/main/java/com/meiyou/bigwhale/dto/DtoScriptHistory.java index d40745860a8f1fd58ba210ddd861b81973ccd454..ff4ce6191f29ebc4779ba179d4dde051e6970475 100644 --- a/src/main/java/com/meiyou/bigwhale/dto/DtoScriptHistory.java +++ b/src/main/java/com/meiyou/bigwhale/dto/DtoScriptHistory.java @@ -4,19 +4,18 @@ import com.meiyou.bigwhale.common.Constant; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import java.util.Date; -import java.util.regex.Pattern; @Data +@EqualsAndHashCode(callSuper = true) @Builder @NoArgsConstructor @AllArgsConstructor public class DtoScriptHistory extends AbstractPageDto { - private static final Pattern PROXY_USER_PATTERN = Pattern.compile("proxy user: ([\\w-.,]+)"); - private Integer id; /** @@ -34,8 +33,10 @@ public class DtoScriptHistory extends AbstractPageDto { private String scheduleTopNodeId; private String scheduleInstanceId; private String scheduleFailureHandle; - private Boolean scheduleSupplement; - private Date scheduleOperateTime; + private Boolean scheduleRunnable; + private Boolean scheduleRetry; + private Boolean scheduleEmpty; + private Boolean scheduleRerun; private String previousScheduleTopNodeId; private Integer scriptId; @@ -51,10 +52,12 @@ public class DtoScriptHistory extends AbstractPageDto { private String errors; private Date createTime; private Integer createBy; + private Date businessTime; + private Date delayTime; + private Date submitTime; private Date startTime; private Date finishTime; - /* ----- yarn 相关字段 ----- */ /** * user;queue;name @@ -82,8 +85,8 @@ public class DtoScriptHistory extends AbstractPageDto { } public boolean isRunning() { - return Constant.JobState.WAITING_PARENT_.equals(state) || - Constant.JobState.INITED.equals(state) || + return Constant.JobState.TIME_WAIT_.equals(state) || + Constant.JobState.SUBMIT_WAIT.equals(state) || Constant.JobState.SUBMITTING.equals(state) || Constant.JobState.SUBMITTED.equals(state) || Constant.JobState.ACCEPTED.equals(state) || diff --git a/src/main/java/com/meiyou/bigwhale/entity/ScriptHistory.java b/src/main/java/com/meiyou/bigwhale/entity/ScriptHistory.java index 2b2a8adbfe2c8e137aa4fb76e85e60d5bb6a4139..f641bee20ddb706ab71cfba5251c6785f8ccffe0 100644 --- a/src/main/java/com/meiyou/bigwhale/entity/ScriptHistory.java +++ b/src/main/java/com/meiyou/bigwhale/entity/ScriptHistory.java @@ -6,7 +6,11 @@ import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; -import javax.persistence.*; +import javax.persistence.Entity; +import javax.persistence.GeneratedValue; +import javax.persistence.GenerationType; +import javax.persistence.Id; +import javax.persistence.Table; import java.util.Date; @Data @@ -36,8 +40,10 @@ public class ScriptHistory { private String scheduleTopNodeId; private String scheduleInstanceId; private String scheduleFailureHandle; - private Boolean scheduleSupplement; - private Date scheduleOperateTime; + private Boolean scheduleRunnable; + private Boolean scheduleRetry; + private Boolean scheduleEmpty; + private Boolean scheduleRerun; private String previousScheduleTopNodeId; private Integer scriptId; @@ -53,6 +59,9 @@ public class ScriptHistory { private String errors; private Date createTime; private Integer createBy; + private Date businessTime; + private Date delayTime; + private Date submitTime; private Date startTime; private Date finishTime; @@ -74,13 +83,21 @@ public class ScriptHistory { this.steps = this.steps.split("]")[0] + ",\"" + state + "\"]"; } } + if (state.equals(Constant.JobState.SUBMITTING)) { + this.submitTime = new Date(); + } } - public void resetState() { + public void reset() { + this.id = null; + this.scheduleFailureHandle = null; this.state = null; this.steps = null; this.outputs = null; this.errors = null; + this.createTime = null; + this.delayTime = null; + this.submitTime = null; this.startTime = null; this.finishTime = null; this.jobId = null; @@ -93,8 +110,8 @@ public class ScriptHistory { } public boolean isRunning() { - return Constant.JobState.WAITING_PARENT_.equals(state) || - Constant.JobState.INITED.equals(state) || + return Constant.JobState.TIME_WAIT_.equals(state) || + Constant.JobState.SUBMIT_WAIT.equals(state) || Constant.JobState.SUBMITTING.equals(state) || Constant.JobState.SUBMITTED.equals(state) || Constant.JobState.ACCEPTED.equals(state) || diff --git a/src/main/java/com/meiyou/bigwhale/job/ScheduleJob.java b/src/main/java/com/meiyou/bigwhale/job/ScheduleJob.java deleted file mode 100644 index 7e1ac82b66e3ad8a12ddae98226a891cd70c94ff..0000000000000000000000000000000000000000 --- a/src/main/java/com/meiyou/bigwhale/job/ScheduleJob.java +++ /dev/null @@ -1,123 +0,0 @@ -package com.meiyou.bigwhale.job; - -import com.meiyou.bigwhale.common.Constant; -import com.meiyou.bigwhale.entity.Script; -import com.meiyou.bigwhale.entity.ScriptHistory; -import com.meiyou.bigwhale.entity.Schedule; -import com.meiyou.bigwhale.service.ScriptHistoryService; -import com.meiyou.bigwhale.service.ScriptService; -import com.meiyou.bigwhale.service.ScheduleService; -import com.meiyou.bigwhale.util.SchedulerUtils; -import org.apache.commons.lang.time.DateUtils; -import org.quartz.*; -import org.springframework.beans.factory.annotation.Autowired; - -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.Date; -import java.util.List; -import java.util.Map; - -/** - * @author Suxy - * @date 2019/9/5 - * @description file description - */ -public class ScheduleJob implements Job { - - private DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); - - @Autowired - private ScriptHistoryService scriptHistoryService; - @Autowired - private ScriptService scriptService; - @Autowired - private ScheduleService scheduleService; - - @Override - public void execute(JobExecutionContext jobExecutionContext) { - Integer scheduleId = Integer.parseInt(jobExecutionContext.getJobDetail().getKey().getName()); - Schedule schedule = scheduleService.findById(scheduleId); - schedule.setRealFireTime(jobExecutionContext.getFireTime()); - schedule.setNeedFireTime(jobExecutionContext.getScheduledFireTime()); - schedule.setNextFireTime(jobExecutionContext.getNextFireTime()); - scheduleService.save(schedule); - confirmedNeed(jobExecutionContext, schedule); - } - - private void confirmedNeed(JobExecutionContext jobExecutionContext, Schedule schedule) { - String scheduleInstanceId = dateFormat.format(jobExecutionContext.getScheduledFireTime()); - List scriptHistories = scriptHistoryService.findByQuery("scheduleId=" + schedule.getId() + - ";scheduleInstanceId=" + scheduleInstanceId + ";state=" + Constant.JobState.UN_CONFIRMED_); - if (scriptHistories.isEmpty()) { - scheduleInstanceId = dateFormat.format(jobExecutionContext.getNextFireTime()); - scriptHistories = scriptHistoryService.findByQuery("scheduleId=" + schedule.getId() + - ";scheduleInstanceId=" + scheduleInstanceId); - if (scriptHistories.isEmpty()) { - scriptHistories = scriptHistoryService.findByQuery("scheduleId=" + schedule.getId() + - ";state=" + Constant.JobState.UN_CONFIRMED_); - scriptHistories.forEach(scriptHistoryService::missingScheduling); - prepareNext(jobExecutionContext, schedule); - } - return; - } - generateHistory(schedule, scheduleInstanceId, null, 1); - prepareNext(jobExecutionContext, schedule); - } - - private void prepareNext(JobExecutionContext jobExecutionContext, Schedule schedule) { - String scheduleInstanceId = dateFormat.format(jobExecutionContext.getNextFireTime()); - generateHistory(schedule, scheduleInstanceId, null, 0); - } - - private void generateHistory(Schedule schedule, String scheduleInstanceId, String previousScheduleTopNodeId, int generateStatus) { - Map nextNodeIdToObj = schedule.analyzeNextNode(previousScheduleTopNodeId); - for (String nodeId : nextNodeIdToObj.keySet()) { - Script script = scriptService.findOneByQuery("scheduleId=" + schedule.getId() + ";scheduleTopNodeId=" + nodeId); - scriptService.generateHistory(script, schedule, scheduleInstanceId, previousScheduleTopNodeId, generateStatus); - generateHistory(schedule, scheduleInstanceId, nodeId, generateStatus); - } - } - - public static Date getNeedFireTime(String cron, Date startDate) { - Date nextFireTime1 = getNextFireTime(cron, startDate); - Date nextFireTime2 = getNextFireTime(cron, nextFireTime1); - int intervals = (int) (nextFireTime2.getTime() - nextFireTime1.getTime()); - Date cal1 = DateUtils.addMilliseconds(nextFireTime1, - intervals); - Date cal2 = getNextFireTime(cron, cal1); - Date cal3 = getNextFireTime(cron, cal2); - while (!cal3.equals(nextFireTime1)) { - cal1 = DateUtils.addMilliseconds(cal1, - intervals); - cal2 = getNextFireTime(cron, cal1); - cal3 = getNextFireTime(cron, cal2); - if (cal3.before(nextFireTime1)) { - intervals = -1000; - } - } - return cal2; - } - - public static Date getNextFireTime(String cron, Date startDate) { - return getCronExpression(cron).getTimeAfter(startDate); - } - - private static CronExpression getCronExpression(String cron) { - try { - return new CronExpression(cron); - } catch (ParseException e) { - throw new IllegalArgumentException(e); - } - } - - public static void build(Schedule schedule) { - SchedulerUtils.scheduleCronJob(ScheduleJob.class, - schedule.getId(), - Constant.JobGroup.SCHEDULE, - schedule.generateCron(), - null, - schedule.getStartTime(), - schedule.getEndTime()); - } - -} diff --git a/src/main/java/com/meiyou/bigwhale/job/ScheduleSubmitJob.java b/src/main/java/com/meiyou/bigwhale/job/ScheduleSubmitJob.java deleted file mode 100644 index 2dc57ffb604579e7700c7f889c152652e0fdf45b..0000000000000000000000000000000000000000 --- a/src/main/java/com/meiyou/bigwhale/job/ScheduleSubmitJob.java +++ /dev/null @@ -1,107 +0,0 @@ -package com.meiyou.bigwhale.job; - -import com.meiyou.bigwhale.common.Constant; -import com.meiyou.bigwhale.entity.ScriptHistory; -import com.meiyou.bigwhale.service.ScriptHistoryService; -import org.quartz.DisallowConcurrentExecution; -import org.quartz.Job; -import org.quartz.JobExecutionContext; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.data.domain.Sort; - -import java.util.Date; -import java.util.List; -import java.util.stream.Collectors; - -/** - * @author Suxy - * @date 2020/4/23 - * @description file description - */ -@DisallowConcurrentExecution -public class ScheduleSubmitJob implements Job { - - @Autowired - private ScriptHistoryService scriptHistoryService; - - @Override - public void execute(JobExecutionContext jobExecutionContext) { - List scriptHistories = scriptHistoryService.findByQuery("state=" + Constant.JobState.WAITING_PARENT_, - new Sort(Sort.Direction.ASC, "createTime")); - for (ScriptHistory scriptHistory : scriptHistories) { - String state; - if (scriptHistory.getPreviousScheduleTopNodeId() == null) { - state = Constant.JobState.INITED; - } else { - String previousNodeState = previousNodeState(scriptHistory.getScheduleId(), scriptHistory.getScheduleInstanceId(), scriptHistory.getPreviousScheduleTopNodeId()); - switch (previousNodeState) { - case Constant.JobState.SUCCEEDED: - state = Constant.JobState.INITED; - break; - case Constant.JobState.FAILED: - state = Constant.JobState.PARENT_FAILED_; - break; - case Constant.JobState.RUNNING: - default: - continue; - } - } - scriptHistory.updateState(state); - if (Constant.JobState.INITED.equals(state)) { - scriptHistory = scriptHistoryService.save(scriptHistory); - ScriptHistoryShellRunnerJob.build(scriptHistory); - } else { - scriptHistory.setFinishTime(new Date()); - scriptHistoryService.save(scriptHistory); - } - } - } - - /** - * @param scheduleId - * @param scheduleInstanceId - * @param previousScheduleTopNodeId - * @return - */ - private String previousNodeState(Integer scheduleId, String scheduleInstanceId, String previousScheduleTopNodeId) { - List scriptHistories = scriptHistoryService.findByQuery("scheduleId=" + scheduleId + - ";scheduleTopNodeId=" + previousScheduleTopNodeId + - ";scheduleInstanceId=" + scheduleInstanceId); - sort(scriptHistories); - for (ScriptHistory scriptHistory : scriptHistories) { - if (scriptHistory.isRunning()) { - return Constant.JobState.RUNNING; - } - if (Constant.JobState.SUCCEEDED.equals(scriptHistory.getState())) { - return Constant.JobState.SUCCEEDED; - } - } - scriptHistories = scriptHistories.stream().filter(scriptHistory -> !scriptHistory.getScheduleSupplement()).collect(Collectors.toList()); - if (!scriptHistories.isEmpty()) { - ScriptHistory scriptHistory = scriptHistories.get(0); - String [] scheduleFailureHandleArr = scriptHistory.getScheduleFailureHandle().split(";"); - int failureRetries = Integer.parseInt(scheduleFailureHandleArr[0]); - int currFailureRetries = Integer.parseInt(scheduleFailureHandleArr[2]); - if (currFailureRetries < failureRetries) { - // 一般失败后便会进入重试流程,如果最后一个任务结束30S之后还没有进入重试流程,则直接走失败判断流程 - if ((System.currentTimeMillis() - scriptHistory.getFinishTime().getTime()) < 30000) { - return Constant.JobState.RUNNING; - } - } - } - return Constant.JobState.FAILED; - } - - private void sort(List scriptHistories) { - // 降序 - scriptHistories.sort((i1, i2) -> { - if (i1.getId() > i2.getId()) { - return -1; - } else if (i1.getId() < i2.getId()) { - return 1; - } - return 0; - }); - } - -} diff --git a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryTimeoutJob.java b/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryTimeoutJob.java deleted file mode 100644 index fae9ad00b33dae286d240c4a643c4dcb8c88f935..0000000000000000000000000000000000000000 --- a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryTimeoutJob.java +++ /dev/null @@ -1,78 +0,0 @@ -package com.meiyou.bigwhale.job; - -import com.meiyou.bigwhale.common.Constant; -import com.meiyou.bigwhale.common.pojo.HttpYarnApp; -import com.meiyou.bigwhale.entity.Cluster; -import com.meiyou.bigwhale.entity.ScriptHistory; -import com.meiyou.bigwhale.service.ClusterService; -import com.meiyou.bigwhale.service.ScriptHistoryService; -import com.meiyou.bigwhale.util.SchedulerUtils; -import com.meiyou.bigwhale.util.YarnApiUtils; -import org.apache.commons.lang.StringUtils; -import org.quartz.*; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.util.CollectionUtils; - -import java.util.*; - -/** - * @author Suxy - * @date 2019/9/6 - * @description file description - */ -@DisallowConcurrentExecution -public class ScriptHistoryTimeoutJob extends AbstractRetryableJob implements Job { - - private static final String [] RUNNING_STATES = new String[] { - Constant.JobState.INITED, - Constant.JobState.SUBMITTING, - Constant.JobState.SUBMITTED, - Constant.JobState.ACCEPTED, - Constant.JobState.RUNNING - }; - - @Autowired - private ScriptHistoryService scriptHistoryService; - @Autowired - private ClusterService clusterService; - - @Override - public void execute(JobExecutionContext jobExecutionContext) { - //未开始执行和正在执行的 - List scriptHistories = scriptHistoryService.findByQuery("state=" + StringUtils.join(RUNNING_STATES, ",") + ";jobFinalStatus-"); - if (CollectionUtils.isEmpty(scriptHistories)) { - return; - } - for (ScriptHistory scriptHistory : scriptHistories) { - if (scriptHistoryService.execTimeout(scriptHistory)) { - boolean retry = true; - // Yarn资源不够时,客户端会长时间处于提交请求状态,平台无法中断此请求,故在此处再判断一次状态 - if (scriptHistory.getClusterId() != null && scriptHistory.getState().equals(Constant.JobState.SUBMITTING)) { - Cluster cluster = clusterService.findById(scriptHistory.getClusterId()); - String [] jobParams = scriptHistory.getJobParams().split(";"); - HttpYarnApp httpYarnApp = YarnApiUtils.getActiveApp(cluster.getYarnUrl(), jobParams[0], jobParams[1], jobParams[2], 3); - if (httpYarnApp != null) { - retry = false; - scriptHistory.updateState(Constant.JobState.SUBMITTED); - scriptHistory.setJobFinalStatus("UNDEFINED"); - } else { - scriptHistory.updateState(Constant.JobState.TIMEOUT); - scriptHistory.setFinishTime(new Date()); - } - } else { - scriptHistory.updateState(Constant.JobState.TIMEOUT); - scriptHistory.setFinishTime(new Date()); - } - scriptHistoryService.save(scriptHistory); - // 处理调度 - SchedulerUtils.interrupt(scriptHistory.getId(), Constant.JobGroup.SCRIPT_HISTORY); - SchedulerUtils.deleteJob(scriptHistory.getId(), Constant.JobGroup.SCRIPT_HISTORY); - // 重试 - if (retry) { - retryCurrentNode(scriptHistory, Constant.ErrorType.TIMEOUT); - } - } - } - } - -} diff --git a/src/main/java/com/meiyou/bigwhale/listener/ApplicationReadyListener.java b/src/main/java/com/meiyou/bigwhale/listener/ApplicationReadyListener.java index ed1bcc53bba47c117331b0ecd2c590fa238336de..2539ed2df4fa32030201e24e6d765374c90507b5 100644 --- a/src/main/java/com/meiyou/bigwhale/listener/ApplicationReadyListener.java +++ b/src/main/java/com/meiyou/bigwhale/listener/ApplicationReadyListener.java @@ -2,11 +2,16 @@ package com.meiyou.bigwhale.listener; import com.meiyou.bigwhale.common.Constant; import com.meiyou.bigwhale.entity.*; -import com.meiyou.bigwhale.job.*; -import com.meiyou.bigwhale.job.system.PlatformTimeoutJob; +import com.meiyou.bigwhale.scheduler.*; +import com.meiyou.bigwhale.scheduler.monitor.StreamJobMonitor; +import com.meiyou.bigwhale.scheduler.system.PlatformTimeoutChecker; +import com.meiyou.bigwhale.scheduler.system.ScriptHistoryCleaner; +import com.meiyou.bigwhale.scheduler.workflow.ScheduleJobBuilder; +import com.meiyou.bigwhale.scheduler.workflow.ScheduleJobSubmitter; import com.meiyou.bigwhale.service.*; -import com.meiyou.bigwhale.job.system.ActiveYarnAppRefreshJob; +import com.meiyou.bigwhale.scheduler.system.ActiveYarnAppRefresher; import com.meiyou.bigwhale.util.SchedulerUtils; +import org.apache.commons.lang.time.DateUtils; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,7 +51,7 @@ public class ApplicationReadyListener implements ApplicationListener monitors = monitorService.findByQuery("enabled=" + true); - monitors.forEach(MonitorJob::build); + monitors.forEach(StreamJobMonitor::build); } private void startSchedule() { List schedules = scheduleService.findByQuery("enabled=" + true); Date now = new Date(); schedules.forEach(schedule -> { - Date nextFireTime = ScheduleJob.getNextFireTime(schedule.generateCron(), schedule.getStartTime().compareTo(now) <= 0 ? now : schedule.getStartTime()); - String scheduleInstanceId = dateFormat.format(nextFireTime); - List scriptHistories = scriptHistoryService.findByQuery("scheduleId=" + schedule.getId() + ";state=" + Constant.JobState.UN_CONFIRMED_); - scriptService.reGenerateHistory(schedule, scheduleInstanceId, null, 0, scriptHistories); - scriptHistories.forEach(scriptHistoryService::missingScheduling); - ScheduleJob.build(schedule); + Date nextFireTime = SchedulerUtils.getNextFireTime(schedule.generateCron(), schedule.getStartTime().compareTo(now) <= 0 ? now : schedule.getStartTime()); + // 生成下个实例 + if (nextFireTime.compareTo(schedule.getEndTime()) <= 0) { + String scheduleInstanceId = dateFormat.format(nextFireTime); + ScriptHistory scriptHistory = scriptHistoryService.findOneByQuery("scheduleId=" + schedule.getId() + + ";scheduleInstanceId=" + scheduleInstanceId); + if (scriptHistory == null) { + scriptService.reGenerateHistory(schedule, scheduleInstanceId, null); + } + ScheduleJobBuilder.build(schedule); + } }); - // 启动调度记录提交任务 - SchedulerUtils.scheduleCronJob(ScheduleSubmitJob.class, "*/1 * * * * ?"); + // 调度作业提交 + SchedulerUtils.scheduleCronJob(ScheduleJobSubmitter.class, ScheduleJobSubmitter.class.getSimpleName(), Constant.JobGroup.SCHEDULE, "*/1 * * * * ?"); + // 脚本执行超时处理 + SchedulerUtils.scheduleCronJob(ScriptJobTimeoutChecker.class, ScriptJobTimeoutChecker.class.getSimpleName(), Constant.JobGroup.SCHEDULE, "*/10 * * * * ?"); + // 作业状态更新 + SchedulerUtils.scheduleCronJob(ScriptJobYarnStateRefresher.class, ScriptJobYarnStateRefresher.class.getSimpleName(), Constant.JobGroup.SCHEDULE, "*/5 * * * * ?"); + // 服务异常退出处理 + SchedulerUtils.scheduleSimpleJob(ScriptJobExceptionFeedbacker.class, ScriptJobExceptionFeedbacker.class.getSimpleName(), Constant.JobGroup.SCHEDULE, 0, 0, null, DateUtils.addSeconds(new Date(), 60), null); } } diff --git a/src/main/java/com/meiyou/bigwhale/job/AbstractNoticeableJob.java b/src/main/java/com/meiyou/bigwhale/scheduler/AbstractNoticeable.java similarity index 97% rename from src/main/java/com/meiyou/bigwhale/job/AbstractNoticeableJob.java rename to src/main/java/com/meiyou/bigwhale/scheduler/AbstractNoticeable.java index 955ee1c06136bc38d5d07b82c8ced0f30c7420e8..3d870f0e3aa6deade2a17fac87f0fa31d1fce786 100644 --- a/src/main/java/com/meiyou/bigwhale/job/AbstractNoticeableJob.java +++ b/src/main/java/com/meiyou/bigwhale/scheduler/AbstractNoticeable.java @@ -1,4 +1,4 @@ -package com.meiyou.bigwhale.job; +package com.meiyou.bigwhale.scheduler; import com.meiyou.bigwhale.entity.*; import com.meiyou.bigwhale.service.*; @@ -13,7 +13,7 @@ import org.springframework.beans.factory.annotation.Autowired; * @date 2020/4/24 * @description file description */ -public abstract class AbstractNoticeableJob { +public abstract class AbstractNoticeable { @Autowired private UserService userService; @@ -57,7 +57,7 @@ public abstract class AbstractNoticeableJob { if (monitor == null) { return; } - taskName = "实时任务" + " - " + scriptHistory.getScriptName(); + taskName = scriptHistory.getScriptName(); email = monitor.getSendEmail() ? user.getEmail() : null; dingDingHooks = monitor.getDingdingHooks(); } else if (scriptHistory.getScriptId() != null){ diff --git a/src/main/java/com/meiyou/bigwhale/job/AbstractRetryableJob.java b/src/main/java/com/meiyou/bigwhale/scheduler/AbstractRetryable.java similarity index 51% rename from src/main/java/com/meiyou/bigwhale/job/AbstractRetryableJob.java rename to src/main/java/com/meiyou/bigwhale/scheduler/AbstractRetryable.java index 1bb6225317ddad2671fb7a3457ea4e3d4f5f0e5a..d74832aaf03295b63b4d6e66cc7d5a4fa60f8c37 100644 --- a/src/main/java/com/meiyou/bigwhale/job/AbstractRetryableJob.java +++ b/src/main/java/com/meiyou/bigwhale/scheduler/AbstractRetryable.java @@ -1,4 +1,4 @@ -package com.meiyou.bigwhale.job; +package com.meiyou.bigwhale.scheduler; import com.meiyou.bigwhale.common.Constant; import com.meiyou.bigwhale.entity.Schedule; @@ -6,17 +6,17 @@ import com.meiyou.bigwhale.entity.ScriptHistory; import com.meiyou.bigwhale.service.ScheduleService; import com.meiyou.bigwhale.service.ScriptHistoryService; import org.apache.commons.lang.time.DateUtils; +import org.springframework.beans.BeanUtils; import org.springframework.beans.factory.annotation.Autowired; import java.util.Date; - /** * @author Suxy * @date 2019/9/11 * @description file description */ -public abstract class AbstractRetryableJob extends AbstractNoticeableJob { +public abstract class AbstractRetryable extends AbstractNoticeable { @Autowired private ScriptHistoryService scriptHistoryService; @@ -31,11 +31,11 @@ public abstract class AbstractRetryableJob extends AbstractNoticeableJob { protected void retryCurrentNode(ScriptHistory scriptHistory, String errorType) { notice(scriptHistory, errorType); boolean retryable = scriptHistory.getScheduleId() != null && - !scriptHistory.getScheduleSupplement() && + scriptHistory.getScheduleFailureHandle() != null && !"UNKNOWN".equals(scriptHistory.getJobFinalStatus()); if (retryable) { Schedule schedule = scheduleService.findById(scriptHistory.getScheduleId()); - if (schedule == null || schedule.getUpdateTime().after(scriptHistory.getCreateTime())) { + if (schedule == null || schedule.getUpdateTime().after(scriptHistory.getBusinessTime())) { // 过期任务不重试 return; } @@ -47,29 +47,16 @@ public abstract class AbstractRetryableJob extends AbstractNoticeableJob { return; } Date startAt = DateUtils.addMinutes(new Date(), intervals); - ScriptHistory retryScriptHistory = ScriptHistory.builder() - .scheduleId(scriptHistory.getScheduleId()) - .scheduleTopNodeId(scriptHistory.getScheduleTopNodeId()) - .scheduleInstanceId(scriptHistory.getScheduleInstanceId()) - .scheduleFailureHandle(retries + ";" + intervals + (currRetries + 1)) - .scheduleSupplement(false) - .scheduleOperateTime(startAt) - .previousScheduleTopNodeId(scriptHistory.getPreviousScheduleTopNodeId()) - .scriptId(scriptHistory.getScriptId()) - .scriptName(scriptHistory.getScriptName()) - .scriptType(scriptHistory.getScriptType()) - .clusterId(scriptHistory.getClusterId()) - .agentId(scriptHistory.getAgentId()) - .timeout(scriptHistory.getTimeout()) - .content(scriptHistory.getContent()) - .createTime(scriptHistory.getCreateTime()) - .createBy(scriptHistory.getCreateBy()) - .build(); + ScriptHistory retryScriptHistory = new ScriptHistory(); + BeanUtils.copyProperties(scriptHistory, retryScriptHistory); + retryScriptHistory.reset(); + retryScriptHistory.setScheduleFailureHandle(retries + ";" + intervals + (currRetries + 1)); + retryScriptHistory.setScheduleRetry(true); + retryScriptHistory.setCreateTime(new Date()); retryScriptHistory.updateState(Constant.JobState.UN_CONFIRMED_); - retryScriptHistory.updateState(Constant.JobState.WAITING_PARENT_); - retryScriptHistory.updateState(Constant.JobState.INITED); - retryScriptHistory = scriptHistoryService.save(retryScriptHistory); - ScriptHistoryShellRunnerJob.build(retryScriptHistory, startAt); + retryScriptHistory.updateState(Constant.JobState.TIME_WAIT_); + retryScriptHistory.setDelayTime(startAt); + scriptHistoryService.save(retryScriptHistory); } } diff --git a/src/main/java/com/meiyou/bigwhale/scheduler/ScriptJobExceptionFeedbacker.java b/src/main/java/com/meiyou/bigwhale/scheduler/ScriptJobExceptionFeedbacker.java new file mode 100644 index 0000000000000000000000000000000000000000..75b1aae4944ee990a3582e6299479488bfec8bda --- /dev/null +++ b/src/main/java/com/meiyou/bigwhale/scheduler/ScriptJobExceptionFeedbacker.java @@ -0,0 +1,85 @@ +package com.meiyou.bigwhale.scheduler; + +import com.meiyou.bigwhale.common.Constant; +import com.meiyou.bigwhale.common.pojo.HttpYarnApp; +import com.meiyou.bigwhale.entity.Cluster; +import com.meiyou.bigwhale.entity.ScriptHistory; +import com.meiyou.bigwhale.scheduler.job.ScriptJob; +import com.meiyou.bigwhale.service.ClusterService; +import com.meiyou.bigwhale.service.ScriptHistoryService; +import com.meiyou.bigwhale.util.SchedulerUtils; +import com.meiyou.bigwhale.util.YarnApiUtils; +import org.apache.commons.lang.StringUtils; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.Date; +import java.util.List; + +/** + * @author Suxy + * @date 2019/9/6 + * @description file description + */ +@DisallowConcurrentExecution +public class ScriptJobExceptionFeedbacker extends AbstractRetryable implements Job { + + private String[] feedbackStates = new String[] { + Constant.JobState.SUBMITTING, + Constant.JobState.SUBMITTED, + Constant.JobState.ACCEPTED, + Constant.JobState.RUNNING + }; + + @Autowired + private ScriptHistoryService scriptHistoryService; + @Autowired + private ClusterService clusterService; + + @Override + public void execute(JobExecutionContext jobExecutionContext) { + List scriptHistories = scriptHistoryService.findByQuery("state=" + Constant.JobState.SUBMIT_WAIT); + if (!scriptHistories.isEmpty()) { + for (ScriptHistory scriptHistory : scriptHistories) { + scriptHistory.updateState(Constant.JobState.SUBMITTING); + scriptHistory = scriptHistoryService.save(scriptHistory); + ScriptJob.build(scriptHistory); + } + } + scriptHistories = scriptHistoryService.findByQuery("state=" + StringUtils.join(feedbackStates, ",") + ";jobFinalStatus-"); + if (scriptHistories.isEmpty()) { + return; + } + for (ScriptHistory scriptHistory : scriptHistories) { + if (!SchedulerUtils.checkExists(scriptHistory.getId(), Constant.JobGroup.SCRIPT_JOB)) { + // 服务意外退出的情况下,期间的任务状态更新可能丢失, + // 针对Yarn类型的任务,符合条件的情况下扔给ScriptJobYarnStateRefresher检查 + if (Constant.ScriptType.SPARK_BATCH.equals(scriptHistory.getScriptType()) || Constant.ScriptType.SPARK_STREAM.equals(scriptHistory.getScriptType()) || + Constant.ScriptType.FLINK_BATCH.equals(scriptHistory.getScriptType()) || Constant.ScriptType.FLINK_STREAM.equals(scriptHistory.getScriptType())) { + Cluster cluster = clusterService.findById(scriptHistory.getClusterId()); + String[] params = scriptHistory.getJobParams().split(";"); + HttpYarnApp httpYarnApp = YarnApiUtils.getActiveApp(cluster.getYarnUrl(), params[0], params[1], params[2], 3); + if (httpYarnApp == null) { + httpYarnApp = YarnApiUtils.getLastNoActiveApp(cluster.getYarnUrl(), params[0], params[1], params[2], 3); + } + if (httpYarnApp != null) { + scriptHistory.updateState(Constant.JobState.SUBMITTED); + scriptHistory.updateState(Constant.JobState.ACCEPTED); + scriptHistory.updateState(Constant.JobState.RUNNING); + scriptHistory.setJobFinalStatus("UNDEFINED"); + scriptHistoryService.save(scriptHistory); + continue; + } + } + scriptHistory.updateState(Constant.JobState.FAILED); + scriptHistory.setErrors(scriptHistory.getErrors() != null ? scriptHistory.getErrors() + "\nServer unexpected exit" : "Server unexpected exit"); + scriptHistory.setFinishTime(new Date()); + scriptHistory = scriptHistoryService.save(scriptHistory); + retryCurrentNode(scriptHistory, Constant.ErrorType.SERVER_UNEXPECTED_EXIT); + } + } + } + +} diff --git a/src/main/java/com/meiyou/bigwhale/scheduler/ScriptJobTimeoutChecker.java b/src/main/java/com/meiyou/bigwhale/scheduler/ScriptJobTimeoutChecker.java new file mode 100644 index 0000000000000000000000000000000000000000..0e8be6418e566ff471e8c7ae8b1d43909e6f9418 --- /dev/null +++ b/src/main/java/com/meiyou/bigwhale/scheduler/ScriptJobTimeoutChecker.java @@ -0,0 +1,41 @@ +package com.meiyou.bigwhale.scheduler; + +import com.meiyou.bigwhale.common.Constant; +import com.meiyou.bigwhale.scheduler.job.ScriptJob; +import com.meiyou.bigwhale.util.SchedulerUtils; +import org.quartz.*; + +import java.util.*; + +/** + * @author Suxy + * @date 2019/9/6 + * @description file description + */ +@DisallowConcurrentExecution +public class ScriptJobTimeoutChecker extends AbstractRetryable implements Job { + + @Override + public void execute(JobExecutionContext jobExecutionContext) { + List executionContexts; + try { + executionContexts = SchedulerUtils.getScheduler().getCurrentlyExecutingJobs(); + } catch (SchedulerException e) { + e.printStackTrace(); + return; + } + for (JobExecutionContext executionContext : executionContexts) { + JobKey jobKey = executionContext.getJobDetail().getKey(); + if (Constant.JobGroup.SCRIPT_JOB.equals(jobKey.getGroup())) { + JobDataMap jobDataMap = executionContext.getMergedJobDataMap(); + Integer timeout = (Integer) jobDataMap.get("timeout"); + Date submitTime = (Date) jobDataMap.get("submitTime"); + if ((System.currentTimeMillis() - submitTime.getTime()) > (timeout * 60 * 1000)) { + jobDataMap.put("timeout", true); + ScriptJob.destroy(jobKey.getName()); + } + } + } + } + +} diff --git a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryYarnStateRefreshJob.java b/src/main/java/com/meiyou/bigwhale/scheduler/ScriptJobYarnStateRefresher.java similarity index 91% rename from src/main/java/com/meiyou/bigwhale/job/ScriptHistoryYarnStateRefreshJob.java rename to src/main/java/com/meiyou/bigwhale/scheduler/ScriptJobYarnStateRefresher.java index 4cadfa0a908c94d266bbbc2c0049e0c65c89be09..fdacf14b57b5986edc56fbcd002d2ea9e58a54a1 100644 --- a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryYarnStateRefreshJob.java +++ b/src/main/java/com/meiyou/bigwhale/scheduler/ScriptJobYarnStateRefresher.java @@ -1,4 +1,4 @@ -package com.meiyou.bigwhale.job; +package com.meiyou.bigwhale.scheduler; import com.meiyou.bigwhale.common.Constant; import com.meiyou.bigwhale.common.pojo.HttpYarnApp; @@ -15,7 +15,7 @@ import org.springframework.beans.factory.annotation.Autowired; import java.util.*; @DisallowConcurrentExecution -public class ScriptHistoryYarnStateRefreshJob extends AbstractRetryableJob implements InterruptableJob { +public class ScriptJobYarnStateRefresher extends AbstractRetryable implements InterruptableJob { private Thread thread; private volatile boolean interrupted = false; @@ -89,18 +89,11 @@ public class ScriptHistoryYarnStateRefreshJob extends AbstractRetryableJob imple } private void updateMatchScriptHistory(HttpYarnApp httpYarnApp, ScriptHistory scriptHistory) { - if ("RUNNING".equals(httpYarnApp.getState()) || - "FINAL_SAVING".equals(httpYarnApp.getState()) || - "FINISHING".equals(httpYarnApp.getState()) || - "KILLING".equals(httpYarnApp.getState())) { - scriptHistory.updateState(Constant.JobState.ACCEPTED); - scriptHistory.updateState(Constant.JobState.RUNNING); - } else { - scriptHistory.updateState(httpYarnApp.getState()); - } + scriptHistory.updateState(Constant.JobState.ACCEPTED); + scriptHistory.updateState(Constant.JobState.RUNNING); + scriptHistory.setStartTime(new Date(httpYarnApp.getStartedTime())); scriptHistory.setJobId(httpYarnApp.getId()); scriptHistory.setJobUrl(httpYarnApp.getTrackingUrl()); - scriptHistory.setStartTime(new Date(httpYarnApp.getStartedTime())); scriptHistoryService.save(scriptHistory); } @@ -113,9 +106,6 @@ public class ScriptHistoryYarnStateRefreshJob extends AbstractRetryableJob imple } else { scriptHistory.updateState(httpYarnApp.getState()); } - scriptHistory.setJobId(httpYarnApp.getId()); - scriptHistory.setJobUrl(httpYarnApp.getTrackingUrl()); - scriptHistory.setJobFinalStatus(httpYarnApp.getFinalStatus()); if ("FAILED".equals(httpYarnApp.getFinalStatus())) { if (httpYarnApp.getDiagnostics() != null) { if (httpYarnApp.getDiagnostics().length() > 61440) { @@ -127,10 +117,13 @@ public class ScriptHistoryYarnStateRefreshJob extends AbstractRetryableJob imple } scriptHistory.setStartTime(new Date(httpYarnApp.getStartedTime())); scriptHistory.setFinishTime(new Date(httpYarnApp.getFinishedTime())); + scriptHistory.setJobId(httpYarnApp.getId()); + scriptHistory.setJobUrl(httpYarnApp.getTrackingUrl()); + scriptHistory.setJobFinalStatus(httpYarnApp.getFinalStatus()); } else { scriptHistory.updateState(Constant.JobState.FAILED); - scriptHistory.setJobFinalStatus("UNKNOWN"); scriptHistory.setFinishTime(new Date()); + scriptHistory.setJobFinalStatus("UNKNOWN"); } scriptHistoryService.save(scriptHistory); if (!"SUCCEEDED".equals(scriptHistory.getJobFinalStatus())) { diff --git a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryShellRunnerJob.java b/src/main/java/com/meiyou/bigwhale/scheduler/job/ScriptJob.java similarity index 84% rename from src/main/java/com/meiyou/bigwhale/job/ScriptHistoryShellRunnerJob.java rename to src/main/java/com/meiyou/bigwhale/scheduler/job/ScriptJob.java index a60446b528b4359663af3c6bfd476b76ef4b23fe..a1e52bcef2985b0245a148225d68431887cc601b 100644 --- a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryShellRunnerJob.java +++ b/src/main/java/com/meiyou/bigwhale/scheduler/job/ScriptJob.java @@ -1,18 +1,23 @@ -package com.meiyou.bigwhale.job; +package com.meiyou.bigwhale.scheduler.job; import ch.ethz.ssh2.ChannelCondition; import ch.ethz.ssh2.Connection; import ch.ethz.ssh2.Session; import ch.ethz.ssh2.StreamGobbler; import com.meiyou.bigwhale.common.Constant; +import com.meiyou.bigwhale.common.pojo.HttpYarnApp; import com.meiyou.bigwhale.config.SshConfig; +import com.meiyou.bigwhale.entity.Cluster; import com.meiyou.bigwhale.entity.ScriptHistory; +import com.meiyou.bigwhale.scheduler.AbstractRetryable; import com.meiyou.bigwhale.service.AgentService; import com.meiyou.bigwhale.service.ClusterService; import com.meiyou.bigwhale.service.ScriptHistoryService; import com.meiyou.bigwhale.util.SchedulerUtils; +import com.meiyou.bigwhale.util.YarnApiUtils; import org.quartz.DisallowConcurrentExecution; import org.quartz.InterruptableJob; +import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,17 +43,18 @@ import java.util.regex.Pattern; * @description file description */ @DisallowConcurrentExecution -public class ScriptHistoryShellRunnerJob extends AbstractRetryableJob implements InterruptableJob { +public class ScriptJob extends AbstractRetryable implements InterruptableJob { - private static final Logger LOGGER = LoggerFactory.getLogger(ScriptHistoryShellRunnerJob.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ScriptJob.class); private static final Pattern YARN_PATTERN = Pattern.compile("application_\\d+_\\d+"); private static final Pattern KILL_PATTERN = Pattern.compile("time mark: (\\d+)"); + private JobExecutionContext context; private Thread thread; + private ScriptHistory scriptHistory; private volatile boolean commandFinish = false; private volatile boolean interrupted = false; - private ScriptHistory scriptHistory; private Connection conn; @Autowired @@ -71,15 +77,17 @@ public class ScriptHistoryShellRunnerJob extends AbstractRetryableJob implements @Override public void execute(JobExecutionContext jobExecutionContext) { + context = jobExecutionContext; thread = Thread.currentThread(); Integer scriptHistoryId = Integer.parseInt(jobExecutionContext.getJobDetail().getKey().getName()); scriptHistory = scriptHistoryService.findById(scriptHistoryId); - if (scriptHistoryService.execTimeout(scriptHistory)) { + if ((System.currentTimeMillis() - scriptHistory.getSubmitTime().getTime()) > (scriptHistory.getTimeout() * 60 * 1000)) { + scriptHistory.updateState(Constant.JobState.TIMEOUT); + scriptHistory.setFinishTime(new Date()); + scriptHistoryService.save(scriptHistory); + interrupted = true; return; } - scriptHistory.updateState(Constant.JobState.SUBMITTING); - scriptHistory.setStartTime(new Date()); - scriptHistoryService.save(scriptHistory); String command = scriptHistory.getContent(); try { if (Constant.ScriptType.SHELL.equals(scriptHistory.getScriptType())) { @@ -96,8 +104,8 @@ public class ScriptHistoryShellRunnerJob extends AbstractRetryableJob implements } LOGGER.error(e.getMessage(), e); scriptHistory.updateState(Constant.JobState.FAILED); - scriptHistory.setFinishTime(new Date()); scriptHistory.setErrors(e.getMessage()); + scriptHistory.setFinishTime(new Date()); scriptHistoryService.save(scriptHistory); //重试 retryCurrentNode(scriptHistory, Constant.ErrorType.FAILED); @@ -179,6 +187,7 @@ public class ScriptHistoryShellRunnerJob extends AbstractRetryableJob implements scriptHistory.updateState(Constant.JobState.SUBMITTED); scriptHistory.updateState(Constant.JobState.ACCEPTED); scriptHistory.updateState(Constant.JobState.RUNNING); + scriptHistory.setStartTime(new Date()); scriptHistoryService.save(scriptHistory); //并发执行读取 readOutput(session); @@ -283,13 +292,30 @@ public class ScriptHistoryShellRunnerJob extends AbstractRetryableJob implements } private void dealInterrupted() { - // 维护已读取的执行日志 - ScriptHistory dbScriptHistory = scriptHistoryService.findById(scriptHistory.getId()); - scriptHistory.setState(dbScriptHistory.getState()); - scriptHistory.setSteps(dbScriptHistory.getSteps()); - scriptHistory.setFinishTime(dbScriptHistory.getFinishTime()); - scriptHistory.setJobFinalStatus(dbScriptHistory.getJobFinalStatus()); + Object timeout = context.getMergedJobDataMap().get("timeout"); + if (scriptHistory.getClusterId() != null && scriptHistory.getState().equals(Constant.JobState.SUBMITTING)) { + Cluster cluster = clusterService.findById(scriptHistory.getClusterId()); + String [] jobParams = scriptHistory.getJobParams().split(";"); + HttpYarnApp httpYarnApp = YarnApiUtils.getActiveApp(cluster.getYarnUrl(), jobParams[0], jobParams[1], jobParams[2], 3); + if (httpYarnApp != null) { + scriptHistory.updateState(Constant.JobState.SUBMITTED); + scriptHistory.setJobFinalStatus("UNDEFINED"); + scriptHistoryService.save(scriptHistory); + return; + } + } + if (timeout != null) { + scriptHistory.updateState(Constant.JobState.TIMEOUT); + } else { + scriptHistory.updateState(Constant.JobState.KILLED); + } + scriptHistory.setFinishTime(new Date()); scriptHistoryService.save(scriptHistory); + // 手动kill不重试 + if (Constant.JobState.TIMEOUT.equals(scriptHistory.getState())) { + // 重试 + retryCurrentNode(scriptHistory, Constant.ErrorType.TIMEOUT); + } } private void kill() { @@ -356,7 +382,15 @@ public class ScriptHistoryShellRunnerJob extends AbstractRetryableJob implements } public static void build(ScriptHistory scriptHistory, Date startDate) { - SchedulerUtils.scheduleSimpleJob(ScriptHistoryShellRunnerJob.class, scriptHistory.getId(), Constant.JobGroup.SCRIPT_HISTORY, 0, 0, null, startDate, null); + JobDataMap jobDataMap = new JobDataMap(); + jobDataMap.put("timeout", scriptHistory.getTimeout()); + jobDataMap.put("submitTime", scriptHistory.getSubmitTime()); + SchedulerUtils.scheduleSimpleJob(ScriptJob.class, scriptHistory.getId(), Constant.JobGroup.SCRIPT_JOB, 0, 0, jobDataMap, startDate, null); + } + + public static void destroy(Object name) { + SchedulerUtils.interrupt(name, Constant.JobGroup.SCRIPT_JOB); + SchedulerUtils.deleteJob(name, Constant.JobGroup.SCRIPT_JOB); } } diff --git a/src/main/java/com/meiyou/bigwhale/job/MonitorJob.java b/src/main/java/com/meiyou/bigwhale/scheduler/monitor/StreamJobMonitor.java similarity index 91% rename from src/main/java/com/meiyou/bigwhale/job/MonitorJob.java rename to src/main/java/com/meiyou/bigwhale/scheduler/monitor/StreamJobMonitor.java index fedb36a9836ca1a9be337aaf64565914a1f5b942..d9502efe78fcdcc67595e943057836dd9d1c76bf 100644 --- a/src/main/java/com/meiyou/bigwhale/job/MonitorJob.java +++ b/src/main/java/com/meiyou/bigwhale/scheduler/monitor/StreamJobMonitor.java @@ -1,4 +1,4 @@ -package com.meiyou.bigwhale.job; +package com.meiyou.bigwhale.scheduler.monitor; import com.meiyou.bigwhale.common.Constant; import com.meiyou.bigwhale.common.pojo.BackpressureInfo; @@ -6,6 +6,8 @@ import com.meiyou.bigwhale.entity.Cluster; import com.meiyou.bigwhale.entity.Monitor; import com.meiyou.bigwhale.entity.Script; import com.meiyou.bigwhale.entity.ScriptHistory; +import com.meiyou.bigwhale.scheduler.AbstractNoticeable; +import com.meiyou.bigwhale.scheduler.job.ScriptJob; import com.meiyou.bigwhale.service.*; import com.meiyou.bigwhale.util.SchedulerUtils; import com.meiyou.bigwhale.util.YarnApiUtils; @@ -20,7 +22,7 @@ import org.springframework.beans.factory.annotation.Autowired; * @description file description */ @DisallowConcurrentExecution -public class MonitorJob extends AbstractNoticeableJob implements InterruptableJob { +public class StreamJobMonitor extends AbstractNoticeable implements InterruptableJob { private Thread thread; private volatile boolean interrupted = false; @@ -55,7 +57,7 @@ public class MonitorJob extends AbstractNoticeableJob implements InterruptableJo monitor.setRealFireTime(jobExecutionContext.getFireTime()); monitorService.save(monitor); script = scriptService.findOneByQuery("monitorId=" + monitorId); - scriptHistory = scriptHistoryService.findNoScheduleLatestByScriptId(script.getId()); + scriptHistory = scriptHistoryService.findScriptLatest(script.getId()); if (scriptHistory == null) { restart(); return; @@ -64,7 +66,7 @@ public class MonitorJob extends AbstractNoticeableJob implements InterruptableJo if (scriptHistory.getMonitorId() == null) { scriptHistory.setMonitorId(monitorId); } - if (Constant.JobState.INITED.equals(scriptHistory.getState()) || + if (Constant.JobState.SUBMIT_WAIT.equals(scriptHistory.getState()) || Constant.JobState.SUBMITTING.equals(scriptHistory.getState())) { return; } @@ -187,20 +189,22 @@ public class MonitorJob extends AbstractNoticeableJob implements InterruptableJo } private boolean restart() { - ScriptHistory scriptHistory = scriptHistoryService.findNoScheduleLatestByScriptId(script.getId()); + ScriptHistory scriptHistory = scriptHistoryService.findScriptLatest(script.getId()); if (scriptHistory != null && scriptHistory.isRunning()) { return true; } scriptHistory = scriptService.generateHistory(script, monitor); - if (scriptHistory != null) { - ScriptHistoryShellRunnerJob.build(scriptHistory); + if (Constant.JobState.SUBMIT_WAIT.equals(scriptHistory.getState())) { + scriptHistory.updateState(Constant.JobState.SUBMITTING); + scriptHistory = scriptHistoryService.save(scriptHistory); + ScriptJob.build(scriptHistory); return true; } return false; } public static void build(Monitor monitor) { - SchedulerUtils.scheduleCronJob(MonitorJob.class, + SchedulerUtils.scheduleCronJob(StreamJobMonitor.class, monitor.getId(), Constant.JobGroup.MONITOR, monitor.generateCron()); diff --git a/src/main/java/com/meiyou/bigwhale/job/system/ActiveYarnAppRefreshJob.java b/src/main/java/com/meiyou/bigwhale/scheduler/system/ActiveYarnAppRefresher.java similarity index 97% rename from src/main/java/com/meiyou/bigwhale/job/system/ActiveYarnAppRefreshJob.java rename to src/main/java/com/meiyou/bigwhale/scheduler/system/ActiveYarnAppRefresher.java index 62890b1ac0073bc00ccb665f2e5e8172284b352c..f114dfa252306575a9cc1e9746e6af0c28cf5571 100644 --- a/src/main/java/com/meiyou/bigwhale/job/system/ActiveYarnAppRefreshJob.java +++ b/src/main/java/com/meiyou/bigwhale/scheduler/system/ActiveYarnAppRefresher.java @@ -1,4 +1,4 @@ -package com.meiyou.bigwhale.job.system; +package com.meiyou.bigwhale.scheduler.system; import com.meiyou.bigwhale.common.Constant; import com.meiyou.bigwhale.common.pojo.HttpYarnApp; @@ -7,7 +7,7 @@ import com.meiyou.bigwhale.entity.YarnApp; import com.meiyou.bigwhale.service.ClusterService; import com.meiyou.bigwhale.service.ClusterUserService; import com.meiyou.bigwhale.service.YarnAppService; -import com.meiyou.bigwhale.job.AbstractNoticeableJob; +import com.meiyou.bigwhale.scheduler.AbstractNoticeable; import com.meiyou.bigwhale.util.YarnApiUtils; import com.meiyou.bigwhale.entity.Cluster; import com.meiyou.bigwhale.entity.ClusterUser; @@ -26,7 +26,7 @@ import java.util.*; * @description file description */ @DisallowConcurrentExecution -public class ActiveYarnAppRefreshJob extends AbstractNoticeableJob implements InterruptableJob { +public class ActiveYarnAppRefresher extends AbstractNoticeable implements InterruptableJob { private static int checkAppDuplicateAndNoRunningSkipCount = 0; private static int checkAppMemorySkipCount = 0; diff --git a/src/main/java/com/meiyou/bigwhale/job/system/PlatformTimeoutJob.java b/src/main/java/com/meiyou/bigwhale/scheduler/system/PlatformTimeoutChecker.java similarity index 81% rename from src/main/java/com/meiyou/bigwhale/job/system/PlatformTimeoutJob.java rename to src/main/java/com/meiyou/bigwhale/scheduler/system/PlatformTimeoutChecker.java index 5d805d9ea285b15aaf16b7192f74cccee8bb7212..cc15ef5f97fbe92096c7a2a8cce4519355eab6e9 100644 --- a/src/main/java/com/meiyou/bigwhale/job/system/PlatformTimeoutJob.java +++ b/src/main/java/com/meiyou/bigwhale/scheduler/system/PlatformTimeoutChecker.java @@ -1,16 +1,14 @@ -package com.meiyou.bigwhale.job.system; +package com.meiyou.bigwhale.scheduler.system; import com.meiyou.bigwhale.common.Constant; import com.meiyou.bigwhale.entity.*; -import com.meiyou.bigwhale.job.AbstractNoticeableJob; -import com.meiyou.bigwhale.job.ScriptHistoryYarnStateRefreshJob; +import com.meiyou.bigwhale.scheduler.AbstractNoticeable; +import com.meiyou.bigwhale.scheduler.ScriptJobYarnStateRefresher; import com.meiyou.bigwhale.service.*; import com.meiyou.bigwhale.util.YarnApiUtils; import com.meiyou.bigwhale.util.SchedulerUtils; import org.apache.commons.lang.time.DateUtils; import org.quartz.*; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import java.util.Date; @@ -22,9 +20,7 @@ import java.util.List; * @description file description */ @DisallowConcurrentExecution -public class PlatformTimeoutJob extends AbstractNoticeableJob implements Job { - - private static final Logger LOGGER = LoggerFactory.getLogger(PlatformTimeoutJob.class); +public class PlatformTimeoutChecker extends AbstractNoticeable implements Job { @Autowired private MonitorService monitorService; @@ -43,7 +39,7 @@ public class PlatformTimeoutJob extends AbstractNoticeableJob implements Job { try { executionContexts = SchedulerUtils.getScheduler().getCurrentlyExecutingJobs(); } catch (SchedulerException e) { - LOGGER.error(e.getMessage(), e); + e.printStackTrace(); return; } Date current = new Date(); @@ -53,11 +49,11 @@ public class PlatformTimeoutJob extends AbstractNoticeableJob implements Job { JobKey jobKey = executionContext.getJobDetail().getKey(); //yarn应用列表更新 if (Constant.JobGroup.COMMON.equals(jobKey.getGroup())) { - if (ActiveYarnAppRefreshJob.class.getSimpleName().equals(jobKey.getName())) { + if (ActiveYarnAppRefresher.class.getSimpleName().equals(jobKey.getName())) { notice("调度平台-Yarn应用列表更新任务", "系统任务运行超时"); SchedulerUtils.interrupt(jobKey.getName(), jobKey.getGroup()); } - if (ScriptHistoryYarnStateRefreshJob.class.getSimpleName().equals(jobKey.getName())) { + if (ScriptJobYarnStateRefresher.class.getSimpleName().equals(jobKey.getName())) { notice("调度平台-Yarn应用状态更新任务", "系统任务运行超时"); SchedulerUtils.interrupt(jobKey.getName(), jobKey.getGroup()); } @@ -67,7 +63,7 @@ public class PlatformTimeoutJob extends AbstractNoticeableJob implements Job { //杀掉应用 Monitor monitor = monitorService.findById(Integer.parseInt(jobKey.getName())); Script script = scriptService.findOneByQuery("monitorId=" + monitor.getId()); - ScriptHistory scriptHistory = scriptHistoryService.findNoScheduleLatestByScriptId(script.getId()); + ScriptHistory scriptHistory = scriptHistoryService.findScriptLatest(script.getId()); if (scriptHistory.getJobId() != null) { Cluster cluster = clusterService.findById(scriptHistory.getClusterId()); boolean success = YarnApiUtils.killApp(cluster.getYarnUrl(), scriptHistory.getJobId()); diff --git a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryClearJob.java b/src/main/java/com/meiyou/bigwhale/scheduler/system/ScriptHistoryCleaner.java similarity index 91% rename from src/main/java/com/meiyou/bigwhale/job/ScriptHistoryClearJob.java rename to src/main/java/com/meiyou/bigwhale/scheduler/system/ScriptHistoryCleaner.java index 48e9c712fa9bcc36f7d0fbe9edb909e00b5632a1..c35ff6dd7a6743b088884ac022259e7d7faeb792 100644 --- a/src/main/java/com/meiyou/bigwhale/job/ScriptHistoryClearJob.java +++ b/src/main/java/com/meiyou/bigwhale/scheduler/system/ScriptHistoryCleaner.java @@ -1,4 +1,4 @@ -package com.meiyou.bigwhale.job; +package com.meiyou.bigwhale.scheduler.system; import com.meiyou.bigwhale.entity.ScriptHistory; import com.meiyou.bigwhale.service.ScriptHistoryService; @@ -17,7 +17,7 @@ import java.util.List; * @description file description */ @DisallowConcurrentExecution -public class ScriptHistoryClearJob implements Job { +public class ScriptHistoryCleaner implements Job { private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); diff --git a/src/main/java/com/meiyou/bigwhale/scheduler/workflow/ScheduleJobBuilder.java b/src/main/java/com/meiyou/bigwhale/scheduler/workflow/ScheduleJobBuilder.java new file mode 100644 index 0000000000000000000000000000000000000000..b848024b3e596cf041c61ba479ffa70f209277a0 --- /dev/null +++ b/src/main/java/com/meiyou/bigwhale/scheduler/workflow/ScheduleJobBuilder.java @@ -0,0 +1,74 @@ +package com.meiyou.bigwhale.scheduler.workflow; + +import com.meiyou.bigwhale.common.Constant; +import com.meiyou.bigwhale.entity.Script; +import com.meiyou.bigwhale.entity.ScriptHistory; +import com.meiyou.bigwhale.entity.Schedule; +import com.meiyou.bigwhale.service.ScriptHistoryService; +import com.meiyou.bigwhale.service.ScriptService; +import com.meiyou.bigwhale.service.ScheduleService; +import com.meiyou.bigwhale.util.SchedulerUtils; +import org.quartz.*; +import org.springframework.beans.factory.annotation.Autowired; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Map; + +/** + * @author Suxy + * @date 2019/9/5 + * @description file description + */ +public class ScheduleJobBuilder implements Job { + + private DateFormat dateFormat = new SimpleDateFormat("yyyyMMddHHmmss"); + + @Autowired + private ScriptHistoryService scriptHistoryService; + @Autowired + private ScriptService scriptService; + @Autowired + private ScheduleService scheduleService; + + @Override + public void execute(JobExecutionContext jobExecutionContext) { + Integer scheduleId = Integer.parseInt(jobExecutionContext.getJobDetail().getKey().getName()); + Schedule schedule = scheduleService.findById(scheduleId); + schedule.setRealFireTime(jobExecutionContext.getFireTime()); + schedule.setNeedFireTime(jobExecutionContext.getScheduledFireTime()); + schedule.setNextFireTime(jobExecutionContext.getNextFireTime()); + scheduleService.save(schedule); + prepareNext(jobExecutionContext, schedule); + } + + private void prepareNext(JobExecutionContext jobExecutionContext, Schedule schedule) { + String scheduleInstanceId = dateFormat.format(jobExecutionContext.getNextFireTime()); + ScriptHistory scriptHistory = scriptHistoryService.findOneByQuery("scheduleId=" + schedule.getId() + + ";scheduleInstanceId=" + scheduleInstanceId); + if (scriptHistory != null) { + return; + } + generateHistory(schedule, scheduleInstanceId, null); + } + + private void generateHistory(Schedule schedule, String scheduleInstanceId, String previousScheduleTopNodeId) { + Map nextNodeIdToObj = schedule.analyzeNextNode(previousScheduleTopNodeId); + for (String nodeId : nextNodeIdToObj.keySet()) { + Script script = scriptService.findOneByQuery("scheduleId=" + schedule.getId() + ";scheduleTopNodeId=" + nodeId); + scriptService.generateHistory(script, schedule, scheduleInstanceId, previousScheduleTopNodeId); + generateHistory(schedule, scheduleInstanceId, nodeId); + } + } + + public static void build(Schedule schedule) { + SchedulerUtils.scheduleCronJob(ScheduleJobBuilder.class, + schedule.getId(), + Constant.JobGroup.SCHEDULE, + schedule.generateCron(), + null, + schedule.getStartTime(), + schedule.getEndTime()); + } + +} diff --git a/src/main/java/com/meiyou/bigwhale/scheduler/workflow/ScheduleJobSubmitter.java b/src/main/java/com/meiyou/bigwhale/scheduler/workflow/ScheduleJobSubmitter.java new file mode 100644 index 0000000000000000000000000000000000000000..72640e5ccd77b685e7399d959ab11615bc7d7e0c --- /dev/null +++ b/src/main/java/com/meiyou/bigwhale/scheduler/workflow/ScheduleJobSubmitter.java @@ -0,0 +1,135 @@ +package com.meiyou.bigwhale.scheduler.workflow; + +import com.meiyou.bigwhale.common.Constant; +import com.meiyou.bigwhale.entity.ScriptHistory; +import com.meiyou.bigwhale.scheduler.job.ScriptJob; +import com.meiyou.bigwhale.service.ScriptHistoryService; +import org.quartz.DisallowConcurrentExecution; +import org.quartz.Job; +import org.quartz.JobExecutionContext; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.domain.Sort; +import org.springframework.jdbc.core.JdbcTemplate; + +import java.text.DateFormat; +import java.text.SimpleDateFormat; +import java.util.Date; +import java.util.List; +import java.util.stream.Collectors; + +/** + * @author Suxy + * @date 2020/4/23 + * @description file description + */ +@DisallowConcurrentExecution +public class ScheduleJobSubmitter implements Job { + + private static final DateFormat DATE_FORMAT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); + + @Autowired + private JdbcTemplate jdbcTemplate; + @Autowired + private ScriptHistoryService scriptHistoryService; + + @Override + public void execute(JobExecutionContext jobExecutionContext) { + updateUnconfirmed(); + submitTimeWait(); + } + + private void updateUnconfirmed() { + List scriptHistories = scriptHistoryService.findByQuery("scheduleId+;scheduleRunnable=true;state=" + Constant.JobState.UN_CONFIRMED_, + new Sort(Sort.Direction.ASC, "id")); + for (ScriptHistory scriptHistory : scriptHistories) { + boolean switchTimeWait; + if (scriptHistory.getPreviousScheduleTopNodeId() == null) { + switchTimeWait = true; + } else { + String previousNodeState = previousNodeState(scriptHistory.getScheduleId(), scriptHistory.getScheduleInstanceId(), scriptHistory.getPreviousScheduleTopNodeId()); + switch (previousNodeState) { + case Constant.JobState.SUCCEEDED: + switchTimeWait = true; + break; + case Constant.JobState.FAILED: + switchTimeWait = false; + break; + case Constant.JobState.RUNNING: + default: + continue; + } + } + if (switchTimeWait) { + scriptHistory.updateState(Constant.JobState.TIME_WAIT_); + scriptHistory.setDelayTime(scriptHistory.getBusinessTime()); + jdbcTemplate.update("UPDATE " + + " script_history " + + "SET " + + " state = ?, " + + " steps = ?, " + + " delay_time = ? " + + "WHERE " + + " id = ?;", + scriptHistory.getState(), scriptHistory.getSteps(), scriptHistory.getDelayTime(), + scriptHistory.getId() + ); + } else { + scriptHistory.setScheduleRunnable(false); + scriptHistoryService.switchScheduleRunnable(scriptHistory.getId(), scriptHistory.getScheduleRunnable()); + } + } + } + + private void submitTimeWait() { + List scriptHistories = scriptHistoryService.findByQuery("scheduleId+;" + + "delayTime<=" + DATE_FORMAT.format(new Date()) + ";" + + "state=" + Constant.JobState.TIME_WAIT_, + new Sort(Sort.Direction.ASC, "id")); + for (ScriptHistory scriptHistory : scriptHistories) { + scriptHistory.updateState(Constant.JobState.SUBMIT_WAIT); + scriptHistory.updateState(Constant.JobState.SUBMITTING); + scriptHistory = scriptHistoryService.save(scriptHistory); + ScriptJob.build(scriptHistory); + } + } + + /** + * @param scheduleId + * @param scheduleInstanceId + * @param previousScheduleTopNodeId + * @return + */ + private String previousNodeState(Integer scheduleId, String scheduleInstanceId, String previousScheduleTopNodeId) { + List scriptHistories = scriptHistoryService.findByQuery("scheduleId=" + scheduleId + + ";scheduleTopNodeId=" + previousScheduleTopNodeId + + ";scheduleInstanceId=" + scheduleInstanceId, new Sort(Sort.Direction.DESC, "id")); + // 判断是否重试完毕 + for (ScriptHistory scriptHistory : scriptHistories) { + if (scriptHistory.getScheduleFailureHandle() != null) { + String [] scheduleFailureHandleArr = scriptHistory.getScheduleFailureHandle().split(";"); + int failureRetries = Integer.parseInt(scheduleFailureHandleArr[0]); + int currFailureRetries = Integer.parseInt(scheduleFailureHandleArr[2]); + if (currFailureRetries < failureRetries) { + // 一般失败后便会进入重试流程,如果最后一个任务结束30S之后还没有进入重试流程,则直接走状态判断流程 + if ((System.currentTimeMillis() - scriptHistory.getFinishTime().getTime()) < 30000) { + return Constant.JobState.RUNNING; + } + } + break; + } + } + // 判断最后一次记录 + ScriptHistory lastScriptHistory = scriptHistories.get(0); + if (!lastScriptHistory.getScheduleRunnable()) { + return Constant.JobState.FAILED; + } + if (Constant.JobState.UN_CONFIRMED_.equals(lastScriptHistory.getState()) || lastScriptHistory.isRunning()) { + return Constant.JobState.RUNNING; + } + if (Constant.JobState.SUCCEEDED.equals(lastScriptHistory.getState())) { + return Constant.JobState.SUCCEEDED; + } + return Constant.JobState.FAILED; + } + +} diff --git a/src/main/java/com/meiyou/bigwhale/service/ScheduleService.java b/src/main/java/com/meiyou/bigwhale/service/ScheduleService.java index 987fe05b899df6caea48e34f11414b2defb81b22..0c7e3bb1b03bc1097ab132b7e8f27dc85f95cb8b 100644 --- a/src/main/java/com/meiyou/bigwhale/service/ScheduleService.java +++ b/src/main/java/com/meiyou/bigwhale/service/ScheduleService.java @@ -10,7 +10,7 @@ import java.util.List; public interface ScheduleService extends PagingAndSortingQueryService { - void update(Schedule entity, List -