This action will force synchronization from 快鸟/dal-job, which will overwrite any changes that you have made since you forked the repository, and can not be recovered!!!
Synchronous operation will process in the background and will refresh the page when finishing processing. Please be patient.
dal-job是一个去中心化的轻量的分布式Job框架。它没有master结点,代码是在各个模块上运行的。
帮助开发人员在分布式环境下开发job时,只用关注业务,而不用去关心job被重复执行的问题。
它主要提供如下功能:
问题思考
在分布式环境下,本地去中心化的分布式job需要解决的问题:
使用数据库的行级锁来保证同一时刻只有一台机在执行任务。
具体:使用 【悲观锁 + JobStatusCheck + TimeLimit】 实现在多线程与多进程(主要是多进程)环境下,一个job在运行过程中,只会有一台机在执行job
使用Quartz + MySQL。同时与Spring友好融合。提供注解(@TimedTask)形式的job配置
dal-job支持分布式环境下单台启动 和 多台启动。其中多台启动会在所有的实例上运行,需要自己解决取数问题。
dal-job提供了注解形式的job配置,具体可以参考com.kvn.dal.core.single_node.SingleNodeJob.java
@Target({ TYPE })
@Retention(RUNTIME)
public @interface TimedTask {
String corn();
boolean isGlobalSingle() default true; // 分布式环境下,是否单台启动
String desc() default "";
}
定时调度例子:
@TimedTask(corn = "0 0/1 * * * ?", desc = "测试job222")
@Service
public class MyTestJob2 implements ExecutableTask {
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
System.out.println(DateTime.now() + "--" + Thread.currentThread().getName() + "---------------doBizJob2222--------");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
if(new Random().nextInt() % 2 == 0){
throw new RuntimeException("biz执行MyTestJob2异常,xxxxxxxxx");
}
}
}
dal-job提供了内置的重试调度实现,可以方便的对异常数据进行定时重试。
重试分为两种:一是,事前重试;二是,事后重试
事前重试,即不管业务是否执行成功,都去记录执行日志(表:job_beforehand_retry),如果出现指定的异常,则标记记录为需要重试。待重试job执行时,就分发至相应的重试方法去执行。
**原理:**使用aop的方式,对需要重试的方法(含有@BeforehandRetry的方法)进行拦截
@BeforehandRetry:
/**
* 事前补偿,确保每次执行业务时都有留底。会牺牲一性的性能。
* @author wzy
* @date 2017年7月14日 下午5:03:45
*/
@Target({ METHOD })
@Retention(RUNTIME)
@Inherited
public @interface BeforehandRetry {
/**
* 执行重试的异常,默认是对BizRetryNeedException才去执行重试逻辑。业务异常是不需要重试的!!!
*/
Class<? extends Throwable> retryFor() default BizRetryNeedException.class;
/**
* 最大重试次数
*/
int maxRetryCount() default 3;
}
**例子参考:**com.kvn.dal.core.beforehand_retry.BeforehandRetryBizService.java
@Service
public class BeforehandRetryBizService {
@BeforehandRetry
public String doBiz(Foo foo, String param){
System.out.println("--->isRetryThread:" + ThreadContext.getContext().isRetryThread());
System.out.println("参数:Foo=" + JSON.toJSONString(foo) + ", param=" + param);
System.out.println("执行业务失败>>>>>>>>");
throw new BizRetryNeedException("业务失败,需要重试!!!");
}
}
事后重试,即执行业务出现异常后,对于我们需要重试的异常,将重试参数持久化到DB(表:job_retry),然后通过事后重试调度定时发起重试。
对于需要重试的类,可以通过实现 IRetrySupport 接口,或者继承 AbstractRetrySupport 类。
IRetrySupport.java
public interface IRetrySupport {
/**
* 重试
* @param retryContext 重试上下文
* @return 返回重试结果:true | false
*/
Boolean retry(AfterwardRetryContext retryContext);
}
例子参考:实现接口方式com.kvn.dal.core.afterward_retry.AfterwardRetryBizService.java
@Service
public class AfterwardRetryBizService implements IRetrySupport {
@Resource
IJobRetryDao jobRetryDao;
public void executeBiz() {
System.out.println(DateTime.now() + "--" + Thread.currentThread().getName() + "---------------doBizJob2222--------");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
Foo foo = new Foo(1001, "xxx");
try {
throw new RuntimeException("executeBiz异常,xxxxxxxxx");
} catch (Exception e) {
ArrayList<RetryParam> retryLs = new RetryParamListWrapper().buildRetryParam(foo).buildRetryParam("xxx").buildRetryParam("hehehe").toArrayList();
JobRetry retry = JobRetry.createJobRetry(this.getClass(), "key001", retryLs);
jobRetryDao.add(retry);
throw e;
}
}
@Override
public Boolean retry(AfterwardRetryContext retryContext) {
/** 实现重试逻辑 */
String retryDataKey = retryContext.getRetryDataKey();
List<RetryParam> paramLs = retryContext.getRetryParamLs();
Foo foo = paramLs.get(0).retoreParam(Foo.class);
String originParam1 = paramLs.get(1).retoreParam(String.class);
String originParam2 = paramLs.get(2).retoreParam(String.class);
// 或者
Foo foo2 = retryContext.getRetryParamValueMap().get(Foo.class).get(0);
String originParam_1 = retryContext.getRetryParamValueMap().get(String.class).get(0);
String originParam_2 = retryContext.getRetryParamValueMap().get(String.class).get(1);
return true;
}
}
例子参考:继承类的方式com.kvn.dal.core.afterward_retry.AfterwardRetryBestPracticeService.java
@Service
public class AfterwardRetryBestPracticeService extends AbstractRetrySupport {
public void executeBiz() {
System.out.println(DateTime.now() + "--" + Thread.currentThread().getName() + "---------------doBizJob2222--------");
try {
Thread.sleep(3000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
Foo foo = new Foo(1001, "xxx");
try {
throw new RuntimeException("executeBiz异常,xxxxxxxxx");
} catch (Exception e) {
this.retryEnqueue("key001", foo, "hehe", "morning");
throw e; // 出异常后,终止业务
}
}
@Override
public Boolean retry(AfterwardRetryContext retryContext) {
// 实现重试逻辑
return true;
}
}
此处可能存在不合适展示的内容,页面不予展示。您可通过相关编辑功能自查并修改。
如您确认内容无涉及 不当用语 / 纯广告导流 / 暴力 / 低俗色情 / 侵权 / 盗版 / 虚假 / 无价值内容或违法国家有关法律法规的内容,可点击提交进行申诉,我们将尽快为您处理。