分布式任务调度系统chaconne介绍

分布式任务调度系统chaconne介绍本文介绍了如何在 Chaconne 框架中使用接口创建任务 包括不同类型的依赖关系 如串行依赖和并行依赖 以及 DAG 有向无环图 的工作流程

大家好,欢迎来到IT知识分享网。

@Run

public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {

log.info(“DemoCronJob is running at: {}”, DateUtils.format(System.currentTimeMillis()));

return RandomUtils.randomLong(L, L);

}

@OnSuccess

public void onSuccess(JobKey jobKey, Object result, Logger log) {

log.info(“DemoCronJob’s return value is: {}”, result);

}

@OnFailure

public void onFailure(JobKey jobKey, Throwable e, Logger log) {

log.error(“DemoCronJob is failed by cause: {}”, e.getMessage(), e);

}

}

接口方式创建任务

@Component

public class HealthCheckJob extends ManagedJob {

@Override

public Object execute(JobKey jobKey, Object arg, Logger log) {

log.info(info());

return UUID.randomUUID().toString();

}

@Override

public Trigger getTrigger() {

return GenericTrigger.Builder.newTrigger(“*/5 * * * * ?”).setStartDate(DateUtils.addSeconds(new Date(), 30)).build();

}

private String info() {

long totalMemory = Runtime.getRuntime().totalMemory();

long usedMemory = totalMemory – Runtime.getRuntime().freeMemory();

return FileUtils.formatSize(usedMemory) + “/” + FileUtils.formatSize(totalMemory);

}

@Override

public long getTimeout() {

return 60L * 1000;

}

}

动态任务:

public class EtlJob implements NotManagedJob {

@Override

public Object execute(JobKey jobKey, Object attachment, Logger log) {

log.info(“JobKey:{}, Parameter: {}”, jobKey, attachment);

return null;

}

}

任务依赖

任务依赖是chaconne框架的重要特性之一,任务依赖分为串行依赖并行依赖

所谓串行依赖是指任务A做完接着执行任务B, 即任务B依赖任务A。

并行依赖是指,比如有3个任务,分别为任务A, 任务B, 任务C, 任务A和任务B都做完才能执行任务C, 类似会签的业务场景。

串行依赖和并行依赖都可以共享任务上下文参数和运行结果,并且支持自定义判断策略来决定要不要触发下游任务。

DAG(有向无环图)

而在结合串行依赖和并行依赖的基础上,chaconne框架又提供了DAG功能并提供了友好的API,来模拟类似工作流的业务场景,更加丰富了任务依赖的使用场景。

(这里为了方便举例,都通过注解的方式配置任务)

串行依赖示例:

@ChacJob

@ChacTrigger(triggerType = TriggerType.DEPENDENT)

@ChacDependency({ @ChacJobKey(className = “com.chinapex.test.chaconne.job.DemoSchedJob”, name = “demoSchedJob”) })

public class DemoDependentJob {

@Run

public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {

log.info(“DemoDependentJob is running at: {}”, DateUtils.format(System.currentTimeMillis()));

return RandomUtils.randomLong(L, L);

}

@OnSuccess

public void onSuccess(JobKey jobKey, Object result, Logger log) {

log.info(“DemoDependentJob’s return value is: {}”, result);

}

@OnFailure

public void onFailure(JobKey jobKey, Throwable e, Logger log) {

log.error(“DemoDependentJob is failed by cause: {}”, e.getMessage(), e);

}

}

并行依赖示例:

有3个任务,DemoTask, DemoTaskOne, DemoTaskTwo

让DemoTaskOne, DemoTaskTwo都做完再执行DemoTask,且DemoTask可以获得DemoTaskOne, DemoTaskTwo执行后的值

DemoTaskOne:

@ChacJob

@ChacTrigger(triggerType = TriggerType.SIMPLE)

public class DemoTaskOne {

@Run

public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {

log.info(“DemoTaskOne is running at: {}”, DateUtils.format(System.currentTimeMillis()));

return RandomUtils.randomLong(L, L);

}

@OnSuccess

public void onSuccess(JobKey jobKey, Object result, Logger log) {

log.info(“DemoTaskOne return value is: {}”, result);

}

@OnFailure

public void onFailure(JobKey jobKey, Throwable e, Logger log) {

log.error(“DemoTaskOne is failed by cause: {}”, e.getMessage(), e);

}

}

DemoTaskTwo:

@ChacJob

@ChacTrigger(triggerType = TriggerType.SIMPLE)

public class DemoTaskTwo {

@Run

public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {

log.info(“DemoTaskTwo is running at: {}”, DateUtils.format(System.currentTimeMillis()));

return RandomUtils.randomLong(L, L);

}

@OnSuccess

public void onSuccess(JobKey jobKey, Object result, Logger log) {

log.info(“DemoTaskTwo return value is: {}”, result);

}

@OnFailure

public void onFailure(JobKey jobKey, Throwable e, Logger log) {

log.error(“DemoTaskTwo is failed by cause: {}”, e.getMessage(), e);

}

}

DemoTask:

@ChacJob

@ChacTrigger(cron = “0 0/1 * * * ?”, triggerType = TriggerType.CRON)

@ChacFork({ @ChacJobKey(className = “com.chinapex.test.chaconne.job.DemoTaskOne”, name = “demoTaskOne”),

@ChacJobKey(className = “com.chinapex.test.chaconne.job.DemoTaskTwo”, name = “demoTaskTwo”) })

public class DemoTask {

@Run

public Object execute(JobKey jobKey, Object attachment, Logger log) throws Exception {

log.info(“DemoTask is running at: {}”, DateUtils.format(System.currentTimeMillis()));

TaskJoinResult joinResult = (TaskJoinResult) attachment;

TaskForkResult[] forkResults = joinResult.getTaskForkResults();

long max = 0;

for (TaskForkResult forkResult : forkResults) {

max = Long.max(max, (Long) forkResult.getResult());

}

return max;

}

@OnSuccess

public void onSuccess(JobKey jobKey, Object result, Logger log) {

log.info(“DemoTask return max value is: {}”, result);

}

@OnFailure

public void onFailure(JobKey jobKey, Throwable e, Logger log) {

log.error(“DemoTask is failed by cause: {}”, e.getMessage(), e);

}

}

DAG任务示例

DAG任务目前只支持API创建, 后续会持续改进,增加界面方式创建DAG任务

@RequestMapping(“/dag”)

@RestController

public class DagJobController {

@Value(“${spring.application.cluster.name}”)

private String clusterName;

@Value(“${spring.application.name}”)

private String applicationName;

@Autowired

private JobManager jobManager;

@GetMapping(“/create”)

public Map<String, Object> createDagTask() throws Exception {

Dag dag = new Dag(clusterName, applicationName, “testDag”);// 创建一个Dag任务,并指定集群名,应用名,和任务名称

dag.setTrigger(new CronTrigger(“0 0/1 * * * ?”));// 设置Cron表达式

dag.setDescription(“This is only a demo of dag job”);// 任务描述

DagFlow first = dag.startWith(clusterName, applicationName, “demoDagStart”, DemoDagStart.class.getName());// 定义第一个节点

DagFlow second = first.flow(clusterName, applicationName, “demoDag”, DemoDag.class.getName());// 定义第二个节点

// 第二个节点fork两个子进程处理

second.fork(clusterName, applicationName, “demoDagOne”, DemoDagOne.class.getName());

second.fork(clusterName, applicationName, “demoDagTwo”, DemoDagTwo.class.getName());

jobManager.persistJob(dag, “123”);

return Collections.singletonMap(“ok”, 1);

}

}

上面的DAG示例说明一下,chaconne框架提供的DAG模型支持串行流入,即flow模式,也提供了fork模式进行并行处理,上例中,任务demoDag fork了两个子进程(“demoDagOne”和“demoDagTwo”),即demoDagOne和demoDagTwo同时处理完了再触发demoDag任务。

Chaconne部署说明

chaconne除了依托SpringBoot框架外,默认用MySQL存储任务信息(目前仅支持MySQL,后续会支持更多类型的数据库), 用Redis保存集群元数据和进行消息广播

所以无论使用哪种部署方式,你都需要在你的应用中设置DataSource和RedisConnectionFactory

示例代码:

@Slf4j

@Configuration

public class ResourceConfig {

@Setter

@Configuration(proxyBeanMethods = false)

@ConfigurationProperties(prefix = “spring.datasource”)

public static class DataSourceConfig {

private String jdbcUrl;

private String username;

private String password;

private String driverClassName;

private HikariConfig getDbConfig() {

if (log.isTraceEnabled()) {

log.trace(“DataSourceConfig JdbcUrl: ” + jdbcUrl);

log.trace(“DataSourceConfig Username: ” + username);

log.trace(“DataSourceConfig Password: ” + password);

log.trace(“DataSourceConfig DriverClassName: ” + driverClassName);

}

final HikariConfig config = new HikariConfig();

config.setDriverClassName(driverClassName);

自我介绍一下,小编13年上海交大毕业,曾经在小公司待过,也去过华为、OPPO等大厂,18年进入阿里一直到现在。

深知大多数Java工程师,想要提升技能,往往是自己摸索成长或者是报班学习,但对于培训机构动则几千的学费,着实压力不小。自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前!

因此收集整理了一份《2024年Java开发全套学习资料》,初衷也很简单,就是希望能够帮助到想自学提升又不知道该从何学起的朋友,同时减轻大家的负担。img

分布式任务调度系统chaconne介绍

分布式任务调度系统chaconne介绍

既有适合小白学习的零基础资料,也有适合3年以上经验的小伙伴深入学习提升的进阶课程,基本涵盖了95%以上Java开发知识点,真正体系化!

由于文件比较大,这里只是将部分目录截图出来,每个节点里面都包含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)

img

1200页Java架构面试专题及答案

小编整理不易,对这份1200页Java架构面试专题及答案感兴趣劳烦帮忙转发/点赞

分布式任务调度系统chaconne介绍

分布式任务调度系统chaconne介绍

百度、字节、美团等大厂常见面试题

分布式任务调度系统chaconne介绍

《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!
含大厂面经、学习笔记、源码讲义、实战项目、讲解视频,并且会持续更新!

如果你觉得这些内容对你有帮助,可以扫码获取!!(备注Java获取)

img

1200页Java架构面试专题及答案

小编整理不易,对这份1200页Java架构面试专题及答案感兴趣劳烦帮忙转发/点赞

[外链图片转存中…(img-BNOjSLrH-66)]

[外链图片转存中…(img-IBaE3gRq-66)]

百度、字节、美团等大厂常见面试题

[外链图片转存中…(img-rgdjFBUM-67)]

《互联网大厂面试真题解析、进阶开发核心学习笔记、全套讲解视频、实战项目源码讲义》点击传送门即可获取!

免责声明:本站所有文章内容,图片,视频等均是来源于用户投稿和互联网及文摘转载整编而成,不代表本站观点,不承担相关法律责任。其著作权各归其原作者或其出版社所有。如发现本站有涉嫌抄袭侵权/违法违规的内容,侵犯到您的权益,请在线联系站长,一经查实,本站将立刻删除。 本文来自网络,若有侵权,请联系删除,如若转载,请注明出处:https://haidsoft.com/151038.html

(0)
上一篇 2025-03-15 19:45
下一篇 2025-03-15 20:05

相关推荐

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

关注微信