E-Job_career和job

E-Job_career和job一,Quartz-Misfire什么情况下错过触发?错过触发怎么办?线程池只有5个线程,当有5个任务都在执行的时候,第六个任务即将触发,这个时候任务就不能得到执行。在quartz.properties有一个属性misfireThreshold,用来定义触发器超时的"临界值",也就是超过了这个时间,就算错过触发了。例如,如果misfireThreshold是60000(60秒),9点整应该执行的任务,9点零1分还没有可用线程执行它,就会超时(misfires)。下面这些原因

一,Quartz-Misfire

什么情况下错过触发?错过触发怎么办?
线程池只有 5 个线程,当有 5 个任务都在执行的时候,第六个任务即将触发,这个时候任务就不能得到执行。在 quartz.properties 有一个属性 misfireThreshold,用来定义触发器超时的"临界值",也就是超过了这个时间,就算错过触发了。
例如,如果 misfireThreshold 是 60000(60 秒),9 点整应该执行的任务,9 点零1 分还没有可用线程执行它,就会超时(misfires)。

下面这些原因可能造成 misfired job:
1、 没有可用线程
2、 Trigger 被暂停
3、 系统重启
4、 禁止并发执行的任务在到达触发时间时,上次执行还没有结束。

错过触发怎么办?Misfire 策略设置
每一种 Trigger 都定义了自己的 Misfire 策略,不同的策略通过不同的方法来设置。
1、 忽略
2、 立即跑一次
3、 下次跑

怎么避免任务错过触发?
合理地设置线程池数量,以及任务触发间隔。

二,E-Job概述

1.Quartz的不足

  • 作业只能通过 DB 抢占随机负载,无法协调
  • 任务不能分片——单个任务数据太多了跑不完,消耗线程,负载不均
  • 作业日志可视化监控、统计

2,发展历史

Elastic-Job 是 ddframe 中的 dd-job 作业模块分离出来的作业框架,基于 Quartz和 Curator 开发,在 2015 年开源。

轻量级,无中心化解决方案。

为什么说是去中心化呢?因为没有统一的调度中心。集群的每个节点都是对等的,节点之间通过注册中心进行分布式协调。E-Job 存在主节点的概念,但是主节点没有调度的功能,而是用于处理一些集中式任务,如分片,清理运行时信息等。

如果 ZK 挂了怎么办?

每个任务有独立的线程池。

官网地址 github地址

Elastic-Job 最开始只有一个 elastic-job-core 的项目,在 2.X 版本以后主要分为Elastic-Job-Lite 和 Elastic-Job-Cloud 两个子项目。其中,Elastic-Job-Lite 定位为轻量级 无 中 心 化 解 决 方 案 , 使 用 jar 包 的 形 式 提 供 分 布 式 任 务 的 协 调 服 务 。 而Elastic-Job-Cloud 使用 Mesos + Docker 的解决方案,额外提供资源治理、应用分发以及进程隔离等服务(跟 Lite 的区别只是部署方式不同,他们使用相同的 API,只要开发一次)。

3,功能特性

  • 分布式调度协调:用 ZK 实现注册中心
  • 错过执行作业重触发(Misfire)
  • 支持并行调度(任务分片)
  • 作业分片一致性,保证同一分片在分布式环境中仅一个执行实例
  • 弹性扩容缩容:将任务拆分为 n 个任务项后,各个服务器分别执行各自分配到的任务项。一旦有新的服务器加入集群,或现有服务器下线,elastic-job 将在保留本次任务执行不变的情况下,下次任务开始前触发任务重分片。
  • 失效转移 failover:弹性扩容缩容在下次作业运行前重分片,但本次作业执行的过程中,下线的服务器所分配的作业将不会重新被分配。失效转移功能可以在本次作业运行中用空闲服务器抓取孤儿作业分片执行。同样失效转移功能也会牺牲部分性能。
  • 支持作业生命周期操作(Listener)
  • 丰富的作业类型(Simple、DataFlow、Script)
  • Spring 整合以及命名空间提供
  • 运维平台

4,项目架构

应用在各自的节点执行任务,通过 ZK 注册中心协调。节点注册、节点选举、任务分片、监听都在 E-Job 的代码中完成。

在这里插入图片描述

三,基本操作

1,pom依赖

<dependency>
    <groupId>com.dangdang</groupId>
    <artifactId>elastic-job-lite-core</artifactId>
    <version>2.1.5</version>
</dependency>
只听到从架构师办公室传来架构君的声音:
眼波才动被人猜。有谁来对上联或下联?

2,任务类型

1)SimpleJob

SimpleJob: 简单实现,未经任何封装的类型。需实现 SimpleJob 接口。

2)DataFlowJob

DataFlowJob:Dataflow 类型用于处理数据流,必须实现 fetchData()和processData()的方法,一个用来获取数据,一个用来处理获取到的数据。

3)ScriptJob

Script:Script 类型作业意为脚本类型作业,支持 shell,python,perl 等所有类型脚本。(只要指定脚本的内容或者位置)

3,E-Job配置

1)配置步骤

配置手册

配置级别 配置类 配置内容
Core JobCoreConfiguration 用于提供作业核心配置信息,如:作业名称、CRON 表达式、分片总数等。
Type JobTypeConfiguration 有 3 个子类分别对应 SIMPLE, DATAFLOW 和 SCRIPT 类型作业,提供 3 种作业需要的不同配置,如:DATAFLOW 类型是否流式处理或 SCRIPT 类型的命令行等。Simple 和 DataFlow 需要指定任务类的路径。
Root JobRootConfiguration 有 2 个子类分别对应 Lite 和 Cloud 部署类型,提供不同部署类型所需的配置,如:Lite 类型的是否需要覆盖本地配置或 Cloud 占用 CPU 或 Memory数量等。可以定义分片策略
此代码由Java架构师必看网-架构君整理
//配置注册中心 ZookeeperRegistryCenter registryCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration("121.199.31.160:2181", "e-job")); registryCenter.init(); //定义作业核心配置 JobCoreConfiguration jobCoreConfiguration = JobCoreConfiguration.newBuilder("job-01", "0/2 * * * * ?", 1).build(); //定义SIMPLE类型配置 SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(jobCoreConfiguration, MyEJob.class.getCanonicalName()); //定义Lite作业根配置 LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).build(); //构建Job new JobScheduler(registryCenter, liteJobConfiguration).init();

作业配置分为 3 级,分别是 JobCoreConfigurationJobTypeConfigurationLiteJobConfigurationLiteJobConfiguration 使 用 JobTypeConfigurationJobTypeConfiguration 使用 JobCoreConfiguration,层层嵌套。

JobTypeConfiguration 根 据 不 同 实 现 类 型 分 为 SimpleJobConfigurationDataflowJobConfigurationScriptJobConfiguration

E-Job 使用 ZK 来做分布式协调,所有的配置都会写入到 ZK 节点。

2)ZK注册中心数据结构

一个任务一个二级节点。

这里面有些节点是临时节点,只有任务运行的时候才能看到。

[zk: localhost:2181(CONNECTED) 9] ls /elastic-job/com.yhd.ejob.tasktype.MyEJob
[config, instances, leader, servers, sharding]

注意:修改了任务重新运行任务不生效,是因为 ZK 的信息不会更新, 除非把overwrite 修改成 true。

①config节点

JSON格式存储。

存储任务的配置信息,包含执行类,cron 表达式,分片算法类,分片数量,分片参数等等。

此代码由Java架构师必看网-架构君整理
{ "jobName":"MySimpleJob", "jobClass":"job.MySimpleJob", "jobType":"SIMPLE", "cron":"0/2 * * * * ?", "shardingTotalCount":1, "shardingItemParameters":"", "jobParameter":"", "failover":false, "misfire":true, "description":"", "jobProperties":{ "job_exception_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultJobExceptionHandler", "executor_service_handler":"com.dangdang.ddframe.job.executor.handler.impl.DefaultExecutorServiceHandler" }, "monitorExecution":true, "maxTimeDiffSeconds":-1, "monitorPort":-1, "jobShardingStrategyClass":"", "reconcileIntervalMinutes":10, "disabled":false, "overwrite":false }

config节点的数据是通过ConfigService持久化到zookeeper中去的。默认状态下,如果你修改了 Job 的配置比如 cron 表达式、分片数量等是不会更新到 zookeeper 上去的,除非你在 Lite 级别的配置把参数 overwrite 修改成 true。

②instances节点

同一个 Job 下的 elastic-job 的部署实例。一台机器上可以启动多个 Job 实例,也就是 Jar 包。instances 的命名是 IP+@-@+PID。只有在运行的时候能看到。

③leader节点

任务实例的主节点信息,通过 zookeeper 的主节点选举,选出来的主节点信息。在 elastic job 中,任务的执行可以分布在不同的实例(节点)中,但任务分片等核心控制,需要由主节点完成。因此,任务执行前,需要选举出主节点。

下面有三个子节点:

  • election:主节点选举
  • sharding:分片
  • failover:失效转移

election 下面的 instance 节点显示了当前主节点的实例 ID:jobInstanceId。

election 下面的 latch 节点也是一个永久节点用于选举时候的实现分布式锁。

sharding 节点下面有一个临时节点,necessary,是否需要重新分片的标记。如果分片总数变化,或任务实例节点上下线或启用/禁用,以及主节点选举,都会触发设置重分片标记,主节点会进行分片计算。

④servers节点

任务实例的信息,主要是 IP 地址,任务实例的 IP 地址。跟 instances 不同,如果多个任务实例在同一台机器上运行则只会出现一个 IP 子节点。可在 IP 地址节点写入 DISABLED 表示该任务实例禁用。

⑤sharding节点

任务的分片信息,子节点是分片项序号,从 0 开始。分片个数是在任务配置中设置的。分片项序号的子节点存储详细信息。每个分片项下的子节点用于控制和记录分片运行状态。最主要的子节点就是 instance。


子节点名 是否临时节点 描述
instance 执行该分片项的作业运行实例主键
running 分片项正在运行的状态,仅配置 monitorExecution 时有效
failover 如果该分片项被失效转移分配给其他作业服务器,则此节点值记录执行此分片的作业服务器 IP
misfire 是否开启错过任务重新执行
disabled 是否禁用此分片项

四,运维平台

1,下载解压运行

github地址elastic-job-lite-console 打包得到安装包

解压缩 elastic-job-lite-console-${version}.tar.gz 并执行 bin\start.sh(Windows运行.bat)。打开浏览器访问 http://localhost:8899/即可访问控制台。

8899 为默认端口号,可通过启动脚本输入-p 自定义端口号。

默认管理员用户名和密码是 root/root。右上角可以切换语言。

2,添加 ZK 注册中心

第一步,舔加注册中心,输入 ZK 地址和命名空间,并连接。

在这里插入图片描述

运维平台和 elastic-job-lite 并无直接关系,是通过读取作业注册中心数据展现作业状态,或更新注册中心数据修改全局配置。

控制台只能控制作业本身是否运行,但不能控制作业进程的启动,因为控制台和作业本身服务器是完全分离的,控制台并不能控制作业服务器。

可以对作业进行操作。

在这里插入图片描述

在这里插入图片描述

3,事件追踪

官网说明

Elastic-Job 提供了事件追踪功能,可通过事件订阅的方式处理调度过程的重要事件,用于查询、统计和监控。

Elastic-Job-Lite 在配置中提供了 JobEventConfiguration,目前支持数据库方式配置。

BasicDataSource dataSource = new BasicDataSource();
    dataSource.setDriverClassName("com.mysql.jdbc.Driver");
    dataSource.setUrl("jdbc:mysql://localhost:3306/elastic_job_log");
    dataSource.setUsername("root");
    dataSource.setPassword("123456");
JobEventConfiguration jobEventConfig = new JobEventRdbConfiguration(dataSource);
…………
new JobScheduler(regCenter, simpleJobRootConfig, jobEventConfig).init();

事件追踪的 event_trace_rdb_url 属性对应库自动创建 JOB_EXECUTION_LOGJOB_STATUS_TRACE_LOG 两张表以及若干索引。

需要在运维平台中添加数据源信息,并且连接:

在这里插入图片描述

在作业历史中查询:

在这里插入图片描述

五,集成Spring与分片

1,依赖

		<dependency>
			<groupId>com.dangdang</groupId>
			<artifactId>elastic-job-lite-core</artifactId>
			<version>2.1.5</version>
		</dependency>
		<dependency>
			<groupId>com.dangdang</groupId>
			<artifactId>elastic-job-lite-spring</artifactId>
			<version>2.1.5</version>
		</dependency>

2,配置文件

regCenter.serverList=121.xxx.31.160:2181
regCenter.namespace=elastic-job

3,创建任务

/** * @author yhd * @email yinhuidong1@xiaomi.com * @description SimpleJob 简单实现,未经过任何封装。需要实现SimpleJob接口 * @since 2021/3/29 10:20 */
@Component
public class MyEJob implements SimpleJob { 
   

    @Override
    public void execute(ShardingContext shardingContext) { 
   
        System.out.println(String.format("Item: %s | Time: %s | Thread: %s ",shardingContext.getShardingItem(), new SimpleDateFormat("HH:mm:ss").format(new Date()),Thread.currentThread().getId()));
    }
}
/** * @author yhd * @email yinhuidong1@xiaomi.com * @description 处理数据流,必须实现fetchData 和 processData 方法。 * 一个用来获取数据流,一个用来处理数据流。 * @since 2021/3/29 10:23 */
@Component
public class MyDataFlowJob implements DataflowJob<String> { 
   

    /** * @author yhd * @since 2021/3/29 10:25 * @email yinhuidong1@xiaomi.com * @description 获取数据流 * @params * @return */
    @Override
    public List<String> fetchData(ShardingContext shardingContext) { 
   
        return Arrays.asList("zs","ls","ww");
    }

    /** * @author yhd * @since 2021/3/29 10:25 * @email yinhuidong1@xiaomi.com * @description 处理数据流 * @params * @return */
    @Override
    public void processData(ShardingContext shardingContext, List<String> list) { 
   
        list.forEach(System.out::println);
    }
}

4,配置注册中心

Bean 的 initMethod 属性用来指定 Bean 初始化完成之后要执行的方法,用来替代继承 InitializingBean 接口,以便在容器启动的时候创建注册中心。

/** * @author yhd * @email yinhuidong1@xiaomi.com * @description 定时任务注册中心配置 * @since 2021/3/29 11:03 */
@SpringBootConfiguration
//不配置这个的话可能会出现空指针异常
@ConditionalOnExpression("'${regCenter.serverList}'.length() > 0")
public class JobRegistryCenterConfig { 
   

    /** * @author yhd * @since 2021/3/29 11:06 * @email yinhuidong1@xiaomi.com * @description 配置定时任务注册中心 * @params * @return */
    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter regCenter(@Value("${regCenter.serverList}") final String serverList, @Value("${regCenter.namespace}") final String namespace){ 
   
        return new ZookeeperRegistryCenter(new ZookeeperConfiguration(serverList,namespace));
    }
}

5,作业三级配置

Core——Type——Lite

/** * @author yhd * @email yinhuidong1@xiaomi.com * @description 作业三级配置 * @since 2021/3/29 11:20 */
@SpringBootConfiguration
public class MyJobConfig { 
   

    private String cron = "0/5 * * * * ?";
	//分片数
    private Integer shardingTotalCount = 3;

    private String shardingItemParameters = "0=A,1=B,2=C";

    private String jobParameters = "parameter";

    @Resource
    private ZookeeperRegistryCenter regCenter;

    @Resource
    private MyEJob myEJob;

    @Resource
    private MyDataFlowJob myDataFlowJob;

    /** * @return * @author yhd * @email yinhuidong1@xiaomi.com * @description SimpleJob * @params * @since 2021/3/29 16:49 */
    @Bean(initMethod = "init")
    public JobScheduler simpleJobScheduler(DataSource dataSource) { 
   
        return new SpringJobScheduler(
                myEJob,
                regCenter,
                getLiteJobConfiguration(
                        myEJob.getClass(),
                        cron,
                        shardingTotalCount,
                        shardingItemParameters,
                        jobParameters
                ),new JobEventRdbConfiguration(dataSource));
    }



    /** * @return * @author yhd * @email yinhuidong1@xiaomi.com * @description 启动SimpleJob作业 * @params * @since 2021/3/29 16:50 */
    private LiteJobConfiguration getLiteJobConfiguration(final Class<? extends SimpleJob> jobClass,
                                                         final String cron,
                                                         final int shardingTotalCount,
                                                         final String shardingItemParameters,
                                                         final String jobParameters) { 
   
        // 定义作业核心配置
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration
                .newBuilder(jobClass.getName(), cron, shardingTotalCount)
                .shardingItemParameters(shardingItemParameters)
                .jobParameter(jobParameters)
                .build();
        // 定义SIMPLE类型配置
        SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(simpleCoreConfig, jobClass.getCanonicalName());
        // 定义Lite作业根配置
        return LiteJobConfiguration
                .newBuilder(simpleJobConfig)
                .overwrite(true)
                .build();

    }

    /** * @return * @author yhd * @email yinhuidong1@xiaomi.com * @description DataFlowJob * @params * @since 2021/3/29 17:10 */
    @Bean(initMethod = "init")
    public JobScheduler dataFlowScheduler() { 
   
        return new SpringJobScheduler(
                myDataFlowJob,
                regCenter,
                getDataFlowJobConfiguration(
                        myDataFlowJob.getClass(),
                        cron,
                        shardingTotalCount,
                        shardingItemParameters,
                        jobParameters
                ));
    }

    /** * @return * @author yhd * @email yinhuidong1@xiaomi.com * @description DataFlowJobConfiguration * @params * @since 2021/3/29 17:10 */
    private LiteJobConfiguration getDataFlowJobConfiguration(Class<? extends MyDataFlowJob> aClass, String cron, Integer shardingTotalCount, String shardingItemParameters, String jobParameters) { 
   


        return LiteJobConfiguration
                .newBuilder(
                        new DataflowJobConfiguration(
                                JobCoreConfiguration
                                        .newBuilder(aClass.getName(), cron, shardingTotalCount)
                                        .shardingItemParameters(shardingItemParameters)
                                        .jobParameter(jobParameters)
                                        .build(),
                                aClass.getCanonicalName(),
                                false))
                .overwrite(true)
                .build();
    }

}

6,分片策略

1)分片项与分片参数

任务分片,是为了实现把一个任务拆分成多个子任务,在不同的 ejob 实例上执行。

例如 100W 条数据,在配置文件中指定分成 10 个子任务(分片项),这 10 个子任务再按照一定的规则分配到 5 个实际运行的服务器上执行。除了直接用分片项 ShardingItem获取分片任务之外,还可以用 item 对应的 parameter 获取任务。

定义几个分片项,一个任务就会有几个线程去运行它。

注意:分片个数和分片参数要一一对应。通常把分片项设置得比 E-Job 服务器个数大一些,比如 3 台服务器,分成 9 片,这样如果有服务器宕机,分片还可以相对均匀。

2)分片验证

多实例运行(单机):

  • 多运行一个点,任务不会重跑(两个节点各获得一个分片项)
  • 关闭一个节点,任务不会漏跑

3)分片策略

官网说明

分片项如何分配到服务器?这个跟分片策略有关。

策略类 描述 具体规则
AverageAllocationJobShardingStrategy 基于平均分配算法的分片策略,也是默认的分片策略。 如果分片不能整除,则不能整除的多余分片将依
次追加到序号小的服务器。如:
 如果有 3 台服务器,分成 9 片,则每台服务
器 分 到 的 分 片 是 : 1=[0,1,2], 2=[3,4,5],
3=[6,7,8]
 如果有 3 台服务器,分成 8 片,则每台服务
器分到的分片是:1=[0,1,6], 2=[2,3,7], 3=[4,5]
 如果有 3 台服务器,分成 10 片,则每台服务
器 分 到 的 分 片 是 : 1=[0,1,2,9], 2=[3,4,5],
3=[6,7,8]
OdevitySortByNameJobShardingStrategy 根据作业名的哈希值奇偶数决定 IP 升降序算法的分片策略。 根据作业名的哈希值奇偶数决定 IP 升降序算法的
分片策略。
 作业名的哈希值为奇数则 IP 升序。
 作业名的哈希值为偶数则 IP 降序。
用于不同的作业平均分配负载至不同的服务器。
RotateServerByNameJobShardingStrategy 根据作业名的哈希值对服务器列表进行轮转的分片策略。
自定义分片策略 实现 JobShardingStrategy 接口并实现 sharding 方
法,接口方法参数为作业服务器 IP 列表和分片策
略选项,分片策略选项包括作业名称,分片总数
以及分片序列号和个性化参数对照表,可以根据
需求定制化自己的分片策略。

AverageAllocationJobShardingStrategy 的缺点是,一旦分片数小于作业服务器数,作业将永远分配至 IP 地址靠前的服务器,导致 IP 地址靠后的服务器空闲。而 OdevitySortByNameJobShardingStrategy 则可以根据作业名称重新分配服务器负载。如:

如果有 3 台服务器,分成 2 片,作业名称的哈希值为奇数,则每台服务器分到的分片是:1=[0], 2=[1], 3=[]

如果有 3 台服务器,分成 2 片,作业名称的哈希值为偶数,则每台服务器分到的分片是:3=[0], 2=[1], 1=[]

在 Lite 配置中指定分片策略

String jobShardingStrategyClass = AverageAllocationJobShardingStrategy.class.getCanonicalName();
LiteJobConfiguration simpleJobRootConfig =
LiteJobConfiguration.newBuilder(simpleJobConfig).jobShardingStrategyClass(jobShardingStrategyClass).build();

4)分片方案

获取到分片项 shardingItem 之后,怎么对数据进行分片?

1、对业务主键进行取模,获取余数等于分片项的数据

举例:获取到的 sharding item 是 0,1

在 SQL 中加入过滤条件:where mod(id, 4) in (1, 2)。

这种方式的缺点:会导致索引失效,查询数据时会全表扫描。

解决方案:在查询条件中在增加一个索引条件进行过滤。

2、在表中增加一个字段,根据分片数生成一个 mod 值。取模的基数要大于机器数。否则在增加机器后,会导致机器空闲。例如取模基数是 2,而服务器有 5 台,那么有三台服务器永远空闲。而取模基数是 10,生成 10 个 shardingItem,可以分配到 5 台服务器。当然,取模基数也可以调整。

3、如果从业务层面,可以用 ShardingParamter 进行分片。

例如 0=RDP, 1=CORE, 2=SIMS, 3=ECIF

List<users> = SELECT * FROM user WHERE status = 0 AND SYSTEM_ID ='RDP' limit 0, 100

在 Spring Boot 中要 Elastic-Job 要配置的内容太多了,有没有更简单的添加任务的方法呢?比如在类上添加一个注解?这个时候我们就要用到 starter 了。

7,自定义ejob-spring-boot-starter

需求 实现 作用
可以在启动类上使用@Enable 功能开启 E-Job 任务调度 注解@EnableElasticJob 在自动配置类上用@ConditionalOnBean决定是否自动配置
可以在 properties 或 yml 中识别配置内容 配置类 RegCenterProperties.java 支 持 在 properties 文 件 中 使 用elasticjob.regCenter 前缀,配置注册中心参数
在类上加上注解,直接创建任务 注解 @JobScheduled 配置任务参数,包括定分片项、分片参数等等
不用创建 ZK 注册中心 自动配置类RegCentreAutoConfiguration.java 注入从 RegCenterProperties.java 读取到的参数,自动创 ZookeeperConfiguration
不用创建三级(Core、Type、Lite)配置 自动配置类JobAutoConfiguration.java 读 取 注 解 的 参 数 , 创 建JobCoreConfiguration 、JobTypeConfiguration 、LiteJobConfiguration在注册中心创建之后再创建
Spring Boot 启动时自动配置 创建Resource/META-INF/spring.factories 指定两个自动配置类

1)pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.4.4</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
	<groupId>com.yhd</groupId>
	<artifactId>ejob-spring-boot-starter</artifactId>
	<version>1.0</version>
	<name>ejob-spring-boot-starter</name>
	<description>for the project elastic-job  autoconfiguration</description>
	<properties>
		<java.version>1.8</java.version>
	</properties>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>com.dangdang</groupId>
			<artifactId>elastic-job-lite-core</artifactId>
			<version>2.1.5</version>
		</dependency>
		<dependency>
			<groupId>com.dangdang</groupId>
			<artifactId>elastic-job-lite-spring</artifactId>
			<version>2.1.5</version>
		</dependency>

		<!-- Spring Boot -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-autoconfigure</artifactId>
		</dependency>
		<!-- @ConfigurationProperties annotation processing (metadata for IDEs) -->
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-configuration-processor</artifactId>
			<optional>true</optional>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
		</dependency>
		<dependency>
			<groupId>cn.hutool</groupId>
			<artifactId>hutool-all</artifactId>
			<version>5.5.0</version>
		</dependency>
	</dependencies>


</project>

2)resources/META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
com.yhd.ejobspringbootstarter.autoconfigure.JobAutoConfiguration,\
com.yhd.ejobspringbootstarter.autoconfigure.RegCentreAutoConfiguration

3)annotation

/** * @author yhd * @since 2021/3/30 16:47 * @email yinhuidong1@xiaomi.com * @description 开启 e-job 自动配置功能 * @params * @return */
@Target({ 
   ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface EnableElasticJob { 
   

}
/** * @author yhd * @since 2021/3/30 16:48 * @email yinhuidong1@xiaomi.com * @description 表示这是一个任务类,可以通过注解的方式对任务类进行配置 * @params * @return */
@Component
@Target({ 
   ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
public @interface JobScheduled { 
   

    /** * 作业名称 * @return */
    String jobName();

    /** * cron表达式,用于控制作业触发时间 * @return */
    String cron() default "";

    /** * 作业分片总数 * @return */
    int shardingTotalCount() default 1;

    /** * 分片序列号和参数用等号分隔,多个键值对用逗号分隔 * <p>分片序列号从0开始,不可大于或等于作业分片总数<p> * <p>如:<p> * <p>0=a,1=b,2=c<p> * @return */
    String shardingItemParameters() default "";

    /** * 作业自定义参数 * <p>作业自定义参数,可通过传递该参数为作业调度的业务方法传参,用于实现带参数的作业<p> * <p>例:每次获取的数据量、作业实例从数据库读取的主键等<p> * @return */
    String jobParameter() default "";

    /** * 是否开启任务执行失效转移,开启表示如果作业在一次任务执行中途宕机,允许将该次未完成的任务在另一作业节点上补偿执行 * @return */
    boolean failover() default false;

    /** * 是否开启错过任务重新执行 * @return */
    boolean misfire() default false;

    /** * 作业是否禁止启动,可用于部署作业时,先禁止启动,部署结束后统一启动 * @return */
    boolean disabled() default false;

    boolean overwrite() default false;

    /** * 作业描述信息 * @return */
    String description() default "";

    boolean streamingProcess() default false;

    String scriptCommandLine() default "";

    boolean monitorExecution() default true;

    int monitorPort() default -1;

    int maxTimeDiffSeconds() default -1;

    String jobShardingStrategyClass() default "";


}

4)数据模型

/** * @author yhd * @email yinhuidong1@xiaomi.com * @description TODO * @since 2021/3/30 20:22 */
@Data
@NoArgsConstructor
@AllArgsConstructor
@Accessors(chain = true)
public class Prop implements Serializable { 
   

    String jobClass;
    String jobName;
    String cron;
    int shardingTotalCount;
    String shardingItemParameters;
    String jobParameter;
    String description;
    String jobShardingStrategyClass;
    String scriptCommandLine;
    boolean failover;
    boolean misfire;
    boolean overwrite;
    boolean disabled;
    boolean monitorExecution;
    boolean streamingProcess;
    int monitorPort;
    int maxTimeDiffSeconds;

}

5)配置文件映射类

/** * @author yhd * @email yinhuidong1@xiaomi.com * @description 配置文件可配置参数 * @since 2021/3/30 16:51 */
@Data
@ConfigurationProperties(prefix = "ejob.regcenter")
public class RegCenterProperties { 
   
    /** * @author yhd * @email yinhuidong1@xiaomi.com * @description 连接Zookeeper服务器的列表. 包括IP地址和端口号. 多个地址用逗号分隔. 如: host1:2181,host2:2181 * @params * @return * @since 2021/3/30 20:10 */
    private String serverLists;

    /** * @author yhd * @email yinhuidong1@xiaomi.com * @description 命名空间. * @params * @return * @since 2021/3/30 20:10 */
    private String namespace;

    /** * @author yhd * @email yinhuidong1@xiaomi.com * @description 等待重试的间隔时间的初始值. 单位毫秒. * @params * @return * @since 2021/3/30 20:11 */
    private int baseSleepTimeMilliseconds = 1000;

    /** * @author yhd * @email yinhuidong1@xiaomi.com * @description 等待重试的间隔时间的最大值. 单位毫秒. * @params * @return * @since 2021/3/30 20:11 */
    private int maxSleepTimeMilliseconds = 3000;

    /** * @author yhd * @email yinhuidong1@xiaomi.com * @description 最大重试次数 * @params * @return * @since 2021/3/30 20:11 */
    private int maxRetries = 3;

    /** * @author yhd * @email yinhuidong1@xiaomi.com * @description 会话超时时间. 单位毫秒 * @params * @return * @since 2021/3/30 20:11 */
    private int sessionTimeoutMilliseconds;

    /** * @author yhd * @email yinhuidong1@xiaomi.com * @description 连接超时时间. 单位毫秒. * @params * @return * @since 2021/3/30 20:11 */
    private int connectionTimeoutMilliseconds;

    private String digest;

}

6)注册中心自动配置类

/** * @author yhd * @email yinhuidong1@xiaomi.com * @description Zookeeper自动配置类 * @since 2021/3/30 17:00 */
@Configuration
@ConditionalOnClass(ZookeeperRegistryCenter.class)
@EnableConfigurationProperties(RegCenterProperties.class)
public class RegCentreAutoConfiguration { 
   

    @Resource
    private RegCenterProperties regCenterProperties;

    @Bean(initMethod = "init")
    public ZookeeperRegistryCenter registryCenter() { 
   
        ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(regCenterProperties.getServerLists(),
                regCenterProperties.getNamespace());
        zkConfig.setBaseSleepTimeMilliseconds(regCenterProperties.getBaseSleepTimeMilliseconds());
        zkConfig.setConnectionTimeoutMilliseconds(regCenterProperties.getConnectionTimeoutMilliseconds());
        zkConfig.setDigest(regCenterProperties.getDigest());
        zkConfig.setMaxRetries(regCenterProperties.getMaxRetries());
        zkConfig.setMaxSleepTimeMilliseconds(regCenterProperties.getMaxSleepTimeMilliseconds());
        zkConfig.setSessionTimeoutMilliseconds(regCenterProperties.getSessionTimeoutMilliseconds());
        return new ZookeeperRegistryCenter(zkConfig);
    }


}

7)定时任务自动配置类

/** * @author yhd * @email yinhuidong1@xiaomi.com * @description 定时任务自动配置类 * @since 2021/3/30 16:56 */
@Slf4j
@SpringBootConfiguration
@ConditionalOnBean(annotation = EnableElasticJob.class)
@ConditionalOnClass({ 
   SimpleJob.class, DataflowJob.class})
@AutoConfigureAfter(RegCentreAutoConfiguration.class)
public class JobAutoConfiguration { 
   

    private final String prefix = "ejob";

    @Resource
    private ZookeeperRegistryCenter registryCenter;

    @Resource
    private ApplicationContext ioc;

    private Environment environment;

    private final AtomicInteger counter = new AtomicInteger();

    @PostConstruct
    public void init() { 
   
        this.environment = ioc.getEnvironment();
        log.info("scan JobScheduled Annotation start");
        Map<String, Object> beanMap = ioc.getBeansWithAnnotation(JobScheduled.class);
        if (!CollectionUtils.isEmpty(beanMap)) { 
   
            beanMap.forEach(this::initElasticJobBean);
        }
    }

    /** * @return * @author yhd * @email yinhuidong1@xiaomi.com * @description 处理每一个定时任务类 * @params * @since 2021/3/30 20:16 */
    private void initElasticJobBean(String beanName, Object bean) throws BeansException { 
   
        Class<?> clazz = bean.getClass();
        JobScheduled jobScheduled = clazz.getAnnotation(JobScheduled.class);

        Prop prop = getProp(clazz, jobScheduled);

        JobCoreConfiguration coreConfig = getCoreConfig(prop);

        LiteJobConfiguration jobConfig = getJobConfig(prop, getTypeConfig(clazz, prop, coreConfig));


        BeanDefinitionBuilder builder = this.builder(clazz, bean, jobConfig);

        postProcessor(prop, builder);
    }

    private void postProcessor(Prop prop, BeanDefinitionBuilder builder) { 
   

        DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory) ioc.getAutowireCapableBeanFactory();
        String jobSchedulerBeanName = String.format("%s_Scheduler_%s", prop.getJobClass(), counter.incrementAndGet());
        while (beanFactory.containsBeanDefinition(jobSchedulerBeanName)) { 
   
            jobSchedulerBeanName = String.format("%s_Scheduler_%s", prop.getJobClass(), counter.incrementAndGet());
        }
        if (log.isDebugEnabled()) { 
   
            log.debug("Add JobScheduler bean:{} for job:{}", jobSchedulerBeanName, prop.getJobClass());
        }
        beanFactory.registerBeanDefinition(jobSchedulerBeanName, builder.getBeanDefinition());
        beanFactory.getBean(jobSchedulerBeanName);
    }

    /** * @return * @author yhd * @email yinhuidong1@xiaomi.com * @description 构建SpringJobScheduler对象来初始化任务 * @params * @since 2021/3/30 20:55 */
    private BeanDefinitionBuilder builder(Class<?> clazz, Object bean, LiteJobConfiguration jobConfig) { 
   
        BeanDefinitionBuilder builder = BeanDefinitionBuilder.rootBeanDefinition(SpringJobScheduler.class);
        builder.setInitMethodName("init");
        builder.setScope(BeanDefinition.SCOPE_PROTOTYPE);
        if (ScriptJob.class.isAssignableFrom(clazz)) { 
   
            builder.addConstructorArgValue(null);
        } else { 
   
            builder.addConstructorArgValue(bean);
        }
        builder.addConstructorArgValue(registryCenter);
        builder.addConstructorArgValue(jobConfig);
        builder.addConstructorArgValue(Collections.emptyList());
        return builder;
    }

    /** * @return * @author yhd * @email yinhuidong1@xiaomi.com * @description 属性封装 * @params * @since 2021/3/30 20:44 */
    private Prop getProp(Class<?> clazz, JobScheduled jobScheduled) { 
   
        Prop prop = new Prop();

        prop.setJobClass(clazz.getName()).setJobName(jobScheduled.jobName()).setCron(getEnvironmentStringValue(prop.getJobName(), "cron", jobScheduled.cron())).setShardingTotalCount(getEnvironmentIntValue(prop.getJobName(), "shardingTotalCount", jobScheduled.shardingTotalCount())).setShardingItemParameters(getEnvironmentStringValue(prop.getJobName(), "shardingItemParameters", jobScheduled.shardingItemParameters())).setJobParameter(getEnvironmentStringValue(prop.getJobName(), "jobParameter", jobScheduled.jobParameter())).setDescription(getEnvironmentStringValue(prop.getJobName(), "description", jobScheduled.description())).setJobShardingStrategyClass(getEnvironmentStringValue(prop.getJobName(), "jobShardingStrategyClass", jobScheduled.jobShardingStrategyClass())).setScriptCommandLine(getEnvironmentStringValue(prop.getJobName(), "scriptCommandLine", jobScheduled.scriptCommandLine())).setFailover(getEnvironmentBooleanValue(prop.getJobName(), "failover", jobScheduled.failover())).setMisfire(getEnvironmentBooleanValue(prop.getJobName(), "misfire", jobScheduled.misfire())).setOverwrite(getEnvironmentBooleanValue(prop.getJobName(), "overwrite", jobScheduled.overwrite())).setDisabled(getEnvironmentBooleanValue(prop.getJobName(), "disabled", jobScheduled.disabled())).setMonitorExecution(getEnvironmentBooleanValue(prop.getJobName(), "monitorExecution", jobScheduled.monitorExecution())).setStreamingProcess(getEnvironmentBooleanValue(prop.getJobName(), "streamingProcess", jobScheduled.streamingProcess())).setMonitorPort(getEnvironmentIntValue(prop.getJobName(), "monitorPort", jobScheduled.monitorPort())).setMaxTimeDiffSeconds(getEnvironmentIntValue(prop.getJobName(), "maxTimeDiffSeconds", jobScheduled.maxTimeDiffSeconds()));
        return prop;
    }

    /** * @return * @author yhd * @email yinhuidong1@xiaomi.com * @description 核心配置 * @params * @since 2021/3/30 20:44 */
    private JobCoreConfiguration getCoreConfig(Prop prop) { 
   
        return JobCoreConfiguration.newBuilder(prop.getJobName(), prop.getCron(), prop.getShardingTotalCount()).shardingItemParameters(prop.getShardingItemParameters()).description(prop.getDescription()).failover(prop.isFailover()).jobParameter(prop.getJobParameter()).misfire(prop.isMisfire()).build();
    }

    /** * @return * @author yhd * @email yinhuidong1@xiaomi.com * @description 不同类型的任务配置处理 * @params * @since 2021/3/30 20:47 */
    private JobTypeConfiguration getTypeConfig(Class<?> clazz, Prop prop, JobCoreConfiguration coreConfig) { 
   
        JobTypeConfiguration typeConfig = null;
        if (SimpleJob.class.isAssignableFrom(clazz)) { 
   
            typeConfig = new SimpleJobConfiguration(coreConfig, prop.getJobClass());
        } else if (DataflowJob.class.isAssignableFrom(clazz)) { 
   
            typeConfig = new DataflowJobConfiguration(coreConfig, prop.getJobClass(), prop.isStreamingProcess());
        } else if (ScriptJob.class.isAssignableFrom(clazz)) { 
   
            typeConfig = new ScriptJobConfiguration(coreConfig, prop.getScriptCommandLine());
        }
        return typeConfig;
    }

    /** * @return * @author yhd * @email yinhuidong1@xiaomi.com * @description TODO * @params * @since 2021/3/30 20:50 */
    private LiteJobConfiguration getJobConfig(Prop prop, JobTypeConfiguration typeConfig) { 
   
        return LiteJobConfiguration.newBuilder(typeConfig).overwrite(prop.isOverwrite()).disabled(prop.isDisabled()).monitorPort(prop.getMonitorPort()).monitorExecution(prop.isMonitorExecution()).maxTimeDiffSeconds(prop.getMaxTimeDiffSeconds()).jobShardingStrategyClass(prop.getJobShardingStrategyClass()).build();
    }

    /** * @param fieldName 属性名称 * @param defaultValue 默认值 * @return * @author yhd * @email yinhuidong1@xiaomi.com * @description 获取配置中的任务属性值,environment没有就用注解中的值 * @params jobName 任务名称 * @since 2021/3/30 20:17 */
    private String getEnvironmentStringValue(String jobName, String fieldName, String defaultValue) { 
   
        String key = prefix + jobName + "." + fieldName;
        String value = environment.getProperty(key);
        if (StringUtils.hasText(value)) { 
   
            return value;
        }
        return defaultValue;
    }

    private int getEnvironmentIntValue(String jobName, String fieldName, int defaultValue) { 
   
        String key = prefix + jobName + "." + fieldName;
        String value = environment.getProperty(key);
        if (StringUtils.hasText(value)) { 
   
            return Integer.parseInt(value);
        }
        return defaultValue;
    }

    private long getEnvironmentLongValue(String jobName, String fieldName, long defaultValue) { 
   
        String key = prefix + jobName + "." + fieldName;
        String value = environment.getProperty(key);
        if (StringUtils.hasText(value)) { 
   
            return Long.parseLong(value);
        }
        return defaultValue;
    }

    private boolean getEnvironmentBooleanValue(String jobName, String fieldName, boolean defaultValue) { 
   
        String key = prefix + jobName + "." + fieldName;
        String value = environment.getProperty(key);
        if (StringUtils.hasText(value)) { 
   
            return Boolean.parseBoolean(value);
        }
        return defaultValue;
    }
}

六,E-Job原理

1,启动

    public static void main(String[] args) { 
   
        ZookeeperRegistryCenter center = new ZookeeperRegistryCenter(new ZookeeperConfiguration("121.199.31.160:2181", "e-job"));
        center.init();
        JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("myjob", "0/2 * * * * ?", 1).build();
        SimpleJobConfiguration simpleJobConfiguration = new SimpleJobConfiguration(simpleCoreConfig, MyJob.class.getCanonicalName());
        LiteJobConfiguration liteJobConfiguration = LiteJobConfiguration.newBuilder(simpleJobConfiguration).build();
        new JobScheduler(center,liteJobConfiguration).init();

    }

init()

    /** * 初始化作业. */
    public void init() { 
   
        //更新作业配置,并返回更新后的作业配置
        LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
        //设置当前分片总数
        JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
        //构建任务,创建调度器
        JobScheduleController jobScheduleController = new JobScheduleController(
                createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
        //在 ZK 上注册任务
        JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
        //添加任务信息并进行节点选取
        schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
        //启动调度器
        jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
    }

registerStartUpInfo()-添加任务信息并进行节点选取

    /** * 注册作业启动信息. * * @param enabled 作业是否启用 */
    public void registerStartUpInfo(final boolean enabled) { 
   
        //开启所有监听器
        listenerManager.startAllListeners();
        //选举主节点
        leaderService.electLeader();
        //服务信息持久化到zk
        serverService.persistOnline(enabled);
        //实例信息持久化到zk
        instanceService.persistOnline();
        //设置需要重新分片的标记
        shardingService.setReshardingFlag();
        //初始化作业监听服务
        monitorService.listen();
        //自诊断修复,使本地节点与 ZK 数据一致
        if (!reconcileService.isRunning()) { 
   
            reconcileService.startAsync();
        }
    }

监听器用于监听Zk节点的变化。

electLeader()-启动的时候进行节点选择

    /** * 选举主节点. */
    public void electLeader() { 
   
        log.debug("Elect a new leader now.");
        jobNodeStorage.executeInLeader(LeaderNode.LATCH, new LeaderElectionExecutionCallback());
        log.debug("Leader election completed.");
    }

Latch 是一个分布式锁,选举成功后在 instance 写入服务器信息。

查看LeaderElectionExecutionCallback

    @RequiredArgsConstructor
    class LeaderElectionExecutionCallback implements LeaderExecutionCallback { 
   
        
        @Override
        public void execute() { 
   
            if (!hasLeader()) { 
   
                //填充临时节点数据
                jobNodeStorage.fillEphemeralJobNode(LeaderNode.INSTANCE, JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
            }
        }
    }

executeInLeade()-选主逻辑

    /** * 在主节点执行操作. * * @param latchNode 分布式锁使用的作业节点名称 * @param callback 执行操作的回调 */
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) { 
   
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) { 
   
            //step into
            latch.start();
            latch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) { 
   
        //CHECKSTYLE:ON
            handleException(ex);
        }
    }

调用了 curator 的逻辑进行选主。

latch.start()

/** * Add this instance to the leadership election and attempt to acquire leadership. * * @throws Exception errors */
public void start() throws Exception
{ 
   
    Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");

    startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
            { 
   
                @Override
                public void run()
                { 
   
                    try
                    { 
   
                        internalStart();
                    }
                    finally
                    { 
   
                        startTask.set(null);
                    }
                }
            }));
}

persistOnline()-服务信息持久化

    /** * 持久化作业服务器上线信息. * * @param enabled 作业是否启用 */
    public void persistOnline(final boolean enabled) { 
   
        if (!JobRegistry.getInstance().isShutdown(jobName)) { 
   
            jobNodeStorage.fillJobNode(serverNode.getServerNode(JobRegistry.getInstance().getJobInstance(jobName).getIp()), enabled ? "" : ServerStatus.DISABLED.name());
        }
    }

填充的是servers数据

persistOnline()-实例信息持久化

    /** * 持久化作业运行实例上线相关信息. */
    public void persistOnline() { 
   
        jobNodeStorage.fillEphemeralJobNode(instanceNode.getLocalInstanceNode(), "");
    }

填充的是instance节点

2,任务执行与分片原理

  • LiteJob 是怎么被执行的?
  • 分片项是怎么分配给不同的服务实例的?

在创建 Job 的时候(createJobDetail),创建的是实现了 Quartz 的 Job 接口的LiteJob 类,LiteJob 类实现了 Quartz 的 Job 接口。

    private JobDetail createJobDetail(final String jobClass) { 
   
        JobDetail result = JobBuilder.newJob(LiteJob.class).withIdentity(liteJobConfig.getJobName()).build();
        result.getJobDataMap().put(JOB_FACADE_DATA_MAP_KEY, jobFacade);
        Optional<ElasticJob> elasticJobInstance = createElasticJobInstance();
        if (elasticJobInstance.isPresent()) { 
   
            result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, elasticJobInstance.get());
        } else if (!jobClass.equals(ScriptJob.class.getCanonicalName())) { 
   
            try { 
   
                result.getJobDataMap().put(ELASTIC_JOB_DATA_MAP_KEY, Class.forName(jobClass).newInstance());
            } catch (final ReflectiveOperationException ex) { 
   
                throw new JobConfigurationException("Elastic-Job: Job class '%s' can not initialize.", jobClass);
            }
        }
        return result;
    }

在 LiteJob 的 execute 方法中获取对应类型的执行器,调用 execute()方法。

/** * Lite调度作业. * * @author zhangliang */
public final class LiteJob implements Job { 
   
    
    @Setter
    private ElasticJob elasticJob;
    
    @Setter
    private JobFacade jobFacade;
    
    @Override
    public void execute(final JobExecutionContext context) throws JobExecutionException { 
   
        JobExecutorFactory.getJobExecutor(elasticJob, jobFacade).execute();
    }
}

EJOB 提供管理任务执行器的抽象类 AbstractElasticJobExecutor,核心动作在execute()方法中执行。

    /** * 获取作业执行器. * * @param elasticJob 分布式弹性作业 * @param jobFacade 作业内部服务门面服务 * @return 作业执行器 */
    @SuppressWarnings("unchecked")
    public static AbstractElasticJobExecutor getJobExecutor(final ElasticJob elasticJob, final JobFacade jobFacade) { 
   
        if (null == elasticJob) { 
   
            return new ScriptJobExecutor(jobFacade);
        }
        if (elasticJob instanceof SimpleJob) { 
   
            return new SimpleJobExecutor((SimpleJob) elasticJob, jobFacade);
        }
        if (elasticJob instanceof DataflowJob) { 
   
            return new DataflowJobExecutor((DataflowJob) elasticJob, jobFacade);
        }
        throw new JobConfigurationException("Cannot support job type '%s'", elasticJob.getClass().getCanonicalName());
    }

AbstractElasticJobExecutor的execute()中调用了execute()

execute(shardingContexts, JobExecutionEvent.ExecutionSource.NORMAL_TRIGGER);

点进去,又调用了process()

process(shardingContexts, executionSource);
    private void process(final ShardingContexts shardingContexts, final JobExecutionEvent.ExecutionSource executionSource) { 
   
        Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
        //只有一个分片时,直接执行
        if (1 == items.size()) { 
   
            int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
            JobExecutionEvent jobExecutionEvent =  new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, item);
            process(shardingContexts, item, jobExecutionEvent);
            return;
        }
        final CountDownLatch latch = new CountDownLatch(items.size());
        //当前节点遍历执行相应的分片信息
        for (final int each : items) { 
   
            final JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(shardingContexts.getTaskId(), jobName, executionSource, each);
            if (executorService.isShutdown()) { 
   
                return;
            }
            
            executorService.submit(new Runnable() { 
   
                
                @Override
                public void run() { 
   
                    try { 
   
                        //交给具体的实现类(SimpleJobExecutor、DataflowJobExecutor、ScriptJobExecutor)去处理。
                        process(shardingContexts, each, jobExecutionEvent);
                    } finally { 
   
                        latch.countDown();
                    }
                }
            });
        }
        try { 
   
            //等待所有分片项执行完毕
            latch.await();
        } catch (final InterruptedException ex) { 
   
            Thread.currentThread().interrupt();
        }
    }

又调用了另一个 process()方法,交给具体的实现类(SimpleJobExecutor、DataflowJobExecutor、ScriptJobExecutor)去处理。最终调用到任务类。

    @Override
    protected void process(final ShardingContext shardingContext) { 
   
        simpleJob.execute(shardingContext);
    }

3,失效转移

所谓失效转移,就是在执行任务的过程中发生异常时,这个分片任务可以在其他节点再次执行。

JobCoreConfiguration simpleCoreConfig = JobCoreConfiguration.newBuilder("myjob", "0/2 * * * * ?", 1).failover(true).build();

FailoverListenerManager 监听的是 zk 的 instance 节点删除事件。如果任务配置了 failover 等于 true,其中某个 instance 与 zk 失去联系或被删除,并且失效的节点又不是本身,就会触发失效转移逻辑。

Job 的失效转移监听来源于 FailoverListenerManager 中内部类 JobCrashedJobListener 的 dataChanged 方法。

当节点任务失效时会调用 JobCrashedJobListener 监听器,此监听器会根据实例 id获取所有的分片,然后调用 FailoverService 的 setCrashedFailoverFlag 方法,将每个分片 id 写到/jobName/leader/failover/items 下,例如原来的实例负责 1、2 分片项,那么 items 节点就会写入 1、2,代表这两个分片项需要失效转移。

protected void dataChanged(final String path, final Type eventType, final String data) { 
   
    if (isFailoverEnabled() && Type.NODE_REMOVED == eventType && instanceNode.isInstancePath(path)) { 
   
        String jobInstanceId = path.substring(instanceNode.getInstanceFullPath().length() + 1);
        if (jobInstanceId.equals(JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId())) { 
   
            return;
        }
        List<Integer> failoverItems = failoverService.getFailoverItems(jobInstanceId);
        if (!failoverItems.isEmpty()) { 
   
            for (int each : failoverItems) { 
   
                //设置失效的分片项标记
                failoverService.setCrashedFailoverFlag(each);
                failoverService.failoverIfNecessary();
            }
        } else { 
   
            for (int each : shardingService.getShardingItems(jobInstanceId)) { 
   
                failoverService.setCrashedFailoverFlag(each);
                failoverService.failoverIfNecessary();
            }
        }
    }
}

然后接下来调用 FailoverService 的 failoverIfNessary 方法,首先判断是否需要失败转移,如果可以需要则只需作业失败转移。

    /** * 如果需要失效转移, 则执行作业失效转移. */
    public void failoverIfNecessary() { 
   
        if (needFailover()) { 
   
            jobNodeStorage.executeInLeader(FailoverNode.LATCH, new FailoverLeaderExecutionCallback());
        }
    }

条件一:${JOB_NAME}/leader/failover/items/${ITEM_ID} 有失效转移的作业分片项。

条件二:当前作业不在运行中。

private boolean needFailover() { 
   
    return jobNodeStorage.isJobNodeExisted(FailoverNode.ITEMS_ROOT) && !jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).isEmpty()
        && !JobRegistry.getInstance().isJobRunning(jobName);
}

在主节点执行操作

    /** * 在主节点执行操作. * * @param latchNode 分布式锁使用的作业节点名称 * @param callback 执行操作的回调 */
    public void executeInLeader(final String latchNode, final LeaderExecutionCallback callback) { 
   
        try (LeaderLatch latch = new LeaderLatch(getClient(), jobNodePath.getFullPath(latchNode))) { 
   
            latch.start();
            latch.await();
            callback.execute();
        //CHECKSTYLE:OFF
        } catch (final Exception ex) { 
   
        //CHECKSTYLE:ON
            handleException(ex);
        }
    }
class FailoverLeaderExecutionCallback implements LeaderExecutionCallback { 
   

    @Override
    public void execute() { 
   
        //判断是否需要失效转移
        if (JobRegistry.getInstance().isShutdown(jobName) || !needFailover()) { 
   
            return;
        }
        //从${JOB_NAME}/leader/failover/items/${ITEM_ID}获取到一个分片项
        int crashedItem = Integer.parseInt(jobNodeStorage.getJobNodeChildrenKeys(FailoverNode.ITEMS_ROOT).get(0));
        // 在注册中心节点`${JOB_NAME}/sharding/${ITEM_ID}/failover`注册作业分片项为当前作业节点
        jobNodeStorage.fillEphemeralJobNode(FailoverNode.getExecutionFailoverNode(crashedItem), JobRegistry.getInstance().getJobInstance(jobName).getJobInstanceId());
        //移除任务转移分片项
        jobNodeStorage.removeJobNodeIfExisted(FailoverNode.getItemsNode(crashedItem));
        // TODO 不应使用triggerJob, 而是使用executor统一调度
        JobScheduleController jobScheduleController = JobRegistry.getInstance().getJobScheduleController(jobName);
        if (null != jobScheduleController) { 
   
            //提交任务
            jobScheduleController.triggerJob();
        }
    }
}
    /** * 立刻启动作业. */
    public synchronized void triggerJob() { 
   
        try { 
   
            if (!scheduler.isShutdown()) { 
   
                scheduler.triggerJob(jobDetail.getKey());
            }
        } catch (final SchedulerException ex) { 
   
            throw new JobSystemException(ex);
        }
    }

作业和消息中间件的区别是:作业是时间驱动的产品,消息中间件是事件驱动的产品。

架构君码字不易,如需转载,请注明出处:https://javajgs.com/archives/210365
0
   

发表评论