赞
踩
本文介绍 Spring Boot 整合 ElasticJob 的方法。本文环境:JDK 1.8;ZooKeeper 3.7。
<!-- https://mvnrepository.com/artifact/org.apache.shardingsphere.elasticjob/elasticjob-lite-core --> <dependency> <groupId>org.apache.shardingsphere.elasticjob</groupId> <artifactId>elasticjob-lite-core</artifactId> <!-- <version>3.0.4</version>--> <version>3.0.1</version> </dependency> <dependency> <groupId>org.yaml</groupId> <artifactId>snakeyaml</artifactId> <version>1.27</version> </dependency> <dependency> <groupId>ch.qos.logback</groupId> <artifactId>logback-classic</artifactId> <version>1.2.12</version> </dependency>
ElasticJobZookeeper:
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that publishes the ZooKeeper-backed registry center
 * as a {@link CoordinatorRegistryCenter} bean.
 */
@Configuration
public class ElasticJobZookeeper {

    /** Value applied to the connection timeout, session timeout and max sleep time settings. */
    private static final int ZK_TIMING_MILLIS = 10000;

    /**
     * Creates the registry center bean.
     *
     * <p>The bean declares {@code initMethod = "init"}, so the container invokes
     * {@code init()} on the returned object after it has been created.
     *
     * @return a {@link ZookeeperRegistryCenter} built from the configuration below
     */
    @Bean(initMethod = "init")
    public CoordinatorRegistryCenter createRegistryCenter() {
        // NOTE(review): the two constructor arguments are presumably the ZooKeeper
        // server list and the namespace - confirm against ZookeeperConfiguration.
        ZookeeperConfiguration zkConfig = new ZookeeperConfiguration("localhost:2181", "elastic-job");
        zkConfig.setConnectionTimeoutMilliseconds(ZK_TIMING_MILLIS);
        zkConfig.setSessionTimeoutMilliseconds(ZK_TIMING_MILLIS);
        zkConfig.setMaxSleepTimeMilliseconds(ZK_TIMING_MILLIS);
        return new ZookeeperRegistryCenter(zkConfig);
    }
}
MyJob:
import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import org.springframework.stereotype.Component; @Slf4j @Component public class MyJob implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { // 分片参数 0=text,1=image,2=radio,3=vedio String shardingParameter= shardingContext.getShardingParameter(); String jobParameter= shardingContext.getJobParameter(); log.debug("job 执行 error,job名称:{},分片数量:{},分片:{},分片参数:{},jobParamer:{}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), shardingParameter,jobParameter); if ("text".equals(jobParameter)) { // do something by sharding } switch (shardingContext.getShardingItem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... } } }
MyJob1:
import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import org.springframework.stereotype.Component; @Slf4j @Component public class MyJob1 implements SimpleJob { @Override public void execute(ShardingContext shardingContext) { log.debug("job 执行 error,job名称:{},分片数量:{}",shardingContext.getJobName(),shardingContext.getShardingTotalCount()); switch (shardingContext.getShardingItem()) { case 0: // do something by sharding item 0 break; case 1: // do something by sharding item 1 break; case 2: // do something by sharding item 2 break; // case n: ... } } }
ElasticJobConfigure
import com.example.springelasticjob.quickstart.MyJob; import com.example.springelasticjob.quickstart.MyJob1; import org.apache.shardingsphere.elasticjob.api.JobConfiguration; import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap; import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter; import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; @Configuration public class ElasticJobConfigure implements InitializingBean { @Autowired private MyJob myJob; @Autowired private MyJob1 myJob1; @Autowired private CoordinatorRegistryCenter coordinatorRegistryCenter; public JobConfiguration createJobConfiguration(Class<? extends SimpleJob> JobClass, int shardingTotalCount, String cron, String shardingItemParameters) { // 创建作业配置 JobConfiguration jobConfiguration = JobConfiguration.newBuilder(JobClass.getName(), shardingTotalCount).cron(cron).overwrite(true) .shardingItemParameters(shardingItemParameters).jobListenerTypes().build(); return jobConfiguration; } @Override public void afterPropertiesSet() throws Exception { JobConfiguration jobConfiguration = createJobConfiguration(myJob.getClass(), 1, "0/10 * * * * ?", null); new ScheduleJobBootstrap(coordinatorRegistryCenter, myJob, jobConfiguration).schedule(); JobConfiguration jobConfiguration1 = createJobConfiguration(myJob1.getClass(), 1, "0/1 * * * * ?", null); new ScheduleJobBootstrap(coordinatorRegistryCenter, myJob1, jobConfiguration1).schedule(); } }
import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.time.DateFormatUtils; import org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener; import org.apache.shardingsphere.elasticjob.infra.listener.ShardingContexts; import java.util.Date; @Slf4j public class MyElasticJobListener implements ElasticJobListener { private long beginTime = 0; @Override public void beforeJobExecuted(ShardingContexts shardingContexts) { beginTime = System.currentTimeMillis(); log.info("===>{} MyElasticJobListener BEGIN TIME: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")); } @Override public void afterJobExecuted(ShardingContexts shardingContexts) { long endTime = System.currentTimeMillis(); log.info("===>{} MyElasticJobListener END TIME: {},TOTAL CAST: {} <===",shardingContexts.getJobName(), DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"), endTime - beginTime); } @Override public String getType() { return "myElasticJobListener"; } }
2) 在项目 resources 目录下新建文件夹:META-INF/services
3)新建文件,名称为:org.apache.shardingsphere.elasticjob.infra.listener.ElasticJobListener
文件内容:
# 监听器实现类的 类全路径
com.example.springelasticjob.config.MyElasticJobListener
4)job 配置增加监听器:
// Create the job configuration and attach the listener by its type name.
JobConfiguration jobConfiguration = JobConfiguration.newBuilder("myjob-param", 1)
        .cron("0/5 * * * * ?")
        .overwrite(true)
        .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
        .jobParameter("0=a,1=b,2=c")
        .jobListenerTypes("myElasticJobListener")
        .build();
jobListenerTypes("myElasticJobListener") 中的 "myElasticJobListener" 要和 MyElasticJobListener 的 getType() 返回值保持一致,否则启动时无法找到监听器:
import lombok.extern.slf4j.Slf4j; import org.apache.shardingsphere.elasticjob.api.ShardingContext; import org.apache.shardingsphere.elasticjob.dataflow.job.DataflowJob; import java.util.ArrayList; import java.util.List; /** * 流任务 */ @Slf4j public class MyDataFlowJob implements DataflowJob { @Override public List fetchData(ShardingContext shardingContext) { // 抓取数据 // 分片参数 0=text,1=image,2=radio,3=vedio String jobParameter = shardingContext.getJobParameter(); log.debug("job 执行 error,job名称:{},分片数量:{},分片:{},分片参数:{}", shardingContext.getJobName(), shardingContext.getShardingTotalCount(), shardingContext.getShardingItem(), jobParameter); List list = new ArrayList(1); list.add("lgx"); return list; } @Override public void processData(ShardingContext shardingContext, List list) { // 数据处理 System.out.println("list.toString() = " + list.toString()); } }
/**
 * Builds the configuration for the dataflow example job
 * {@code myjob-dataflow-param}: one shard, cron {@code 0/30 * * * * ?},
 * overwrite enabled, and the {@code streaming.process} property set to
 * {@code "true"}.
 *
 * @return the built job configuration
 */
private static JobConfiguration createJobConfiguration() {
    return JobConfiguration.newBuilder("myjob-dataflow-param", 1)
            .cron("0/30 * * * * ?")
            .overwrite(true)
            .shardingItemParameters("0=Beijing,1=Shanghai,2=Guangzhou")
            .jobParameter("0=a,1=b,2=c")
            // Turn on streaming processing for the dataflow job.
            .setProperty("streaming.process", "true")
            .build();
}
虽然任务是每隔 30s 执行一次,但是因为 fetchData 可以一直获取到数据,使得 processData 方法可以一直被调用:
本文介绍了 Spring Boot 整合 ElasticJob、监听器的使用以及任务的流式处理。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。