
@Slf4j
public class ConditionTest {
private static ReentrantLock reentrantLock = new ReentrantLock();
private static final Condition available = reentrantLock.newCondition();
public static void main(String[] args) {
for(int i = 0 ; i < 10; i++){
int finalI = i;
new Thread(()->{
reentrantLock.lock();
try {
System.out.println("正在执行" + finalI);
available.await(1, TimeUnit.MINUTES);
System.out.println("执行完成....");
}catch (Exception e){
e.printStackTrace();
}finally {
System.out.println("解锁");
reentrantLock.unlock();
}
}).start();
}
}
}
执行结果:
正在执行0 正在执行1 正在执行9 正在执行2 正在执行3 正在执行4 正在执行5 正在执行6 正在执行7 正在执行8 执行完成.... 解锁 执行完成.... 解锁 执行完成.... 解锁 执行完成.... 解锁 执行完成.... 解锁 执行完成.... 解锁 执行完成.... 解锁 执行完成.... 解锁 执行完成.... 解锁 执行完成.... 解锁 Process finished with exit code 0
得出的结论:await()方法挂起当前线程并释放锁
二、定时线程池类的类结构图它用来处理延时任务或定时任务。
它接收SchduledFutureTask类型的任务,是线程池调度任务的最小单位,有三种提交任务的方式:
它采用DelayQueue存储等待的任务
SchduledFutureTask接收的参数(成员变量):
private long time:任务开始的时间 private final long sequenceNumber;:任务的序号 private final long period:任务执行的时间间隔
工作线程的执行过程:
ScheduledThreadPoolExecutor会把待执行的任务放到工作队列DelayQueue中,DelayQueue封装了一个PriorityQueue,PriorityQueue会对队列中的ScheduledFutureTask进行排序,具体的排序算法实现如下:
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask> x = (ScheduledFutureTask>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
SchduledFutureTask之run方法实现
run方法是调度task的核心,task的执行实际上是run方法的执行。
public void run() {
boolean periodic = isPeriodic();
//如果当前线程池已经不支持执行任务,则取消
if (!canRunInCurrentRunState(periodic))
cancel(false);
//如果不需要周期性执行,则直接执行run方法然后结束
else if (!periodic)
ScheduledFutureTask.super.run();
//如果需要周期执行,则在执行完任务以后,设置下一次执行时间
else if (ScheduledFutureTask.super.runAndReset()) {
// 计算下次执行该任务的时间
setNextRunTime();
//重复执行任务
reExecutePeriodic(outerTask);
}
}
reExecutePeriodic方法
void reExecutePeriodic(RunnableScheduledFuture> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task);
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
该方法和delayedExecute方法类似,不同的是:
线程池任务的提交
首先是schedule方法,该方法是指任务在指定延迟时间到达后触发,只会执行一次。
public ScheduledFuture> schedule(Runnable command,
long delay,
TimeUnit unit) {
//参数校验
if (command == null || unit == null)
throw new NullPointerException();
//这里是一个嵌套结构,首先把用户提交的任务包装成ScheduledFutureTask
//然后在调用decorateTask进行包装,该方法是留给用户去扩展的,默认是个空方法
RunnableScheduledFuture> t = decorateTask(command,
new ScheduledFutureTask(command, null,
triggerTime(delay, unit)));
//包装好任务以后,就进行提交了
delayedExecute(t);
return t;
}
任务提交方法:
private void delayedExecute(RunnableScheduledFuture> task) {
//如果线程池已经关闭,则使用拒绝策略把提交任务拒绝掉
if (isShutdown())
reject(task);
else {
//与ThreadPoolExecutor不同,这里直接把任务加入延迟队列
super.getQueue().add(task);//使用用的DelayedWorkQueue
//如果当前状态无法执行任务,则取消
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
//这里是增加一个worker线程,避免提交的任务没有worker去执行
//原因就是该类没有像ThreadPoolExecutor一样,woker满了才放入队列
ensurePrestart();
}
}
DelayedWorkQueue
ScheduledThreadPoolExecutor之所以要自己实现阻塞的工作队列,是因为ScheduledThreadPoolExecutor要求的工作队列有些特殊。
DelayedWorkQueue是一个基于堆的数据结构,类似于DelayQueue和PriorityQueue。在执行定时任务的时候,每个任务的执行时间都不同,所以DelayedWorkQueue的工作就是按照执行时间的升序来排列,执行时间距离当前时间越近的任务在队列的前面(注意:这里的顺序并不是绝对的,堆中的排序只保证了子节点的下次执行时间要比父节点的下次执行时间要大,而叶子节点之间并不一定是顺序的 )。
四、扩展:xxljob:
集成xxljob的项目,首先会有这么一段配置:
@Bean
public XxlJobSpringExecutor xxlJobExecutor() {
logger.info(">>>>>>>>>>> xxl-job config init.");
XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();
xxlJobSpringExecutor.setAdminAddresses(adminAddresses);
xxlJobSpringExecutor.setAppname(appname);
xxlJobSpringExecutor.setAddress(address);
xxlJobSpringExecutor.setIp(ip);
xxlJobSpringExecutor.setPort(port);
xxlJobSpringExecutor.setAccessToken(accessToken);
xxlJobSpringExecutor.setLogPath(logPath);
xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);
return xxlJobSpringExecutor;
}
XxlJobSpringExecutor 实现了SmartInitializingSingleton的接口,当所有单例 bean 都初始化完成以后, Spring的IOC容器会回调该接口的 afterSingletonsInstantiated()方法:
1、会将带有XxlJob注解的方法注册到容器中,key为XxlJob的value,value为MethodJobHandler。
2、使用了Netty的Http通信,然后启动了服务器端
xxljob-admin会和服务端进行通信:
1、创建一个线程scheduleThread,执行调度任务,是daemon线程:
1) 使用悲观锁,进行分布式锁,这样即时有多个xxljob-admin也只会有一台执行:
select * from xxl_job_lock where lock_name = 'schedule_lock' for update
2)获取小于等于下次执行时间在5秒后的任务
SELECTFROM xxl_job_info AS t WHERe t.trigger_status = 1 and t.trigger_next_time #{maxNextTime} ORDER BY id ASC LIMIT #{pagesize}
3)然后判断 当前任务的时间是否在5秒内,如果不在则放弃执行,
当前是10:00:00, trigger_next_time = 9:59:54 则直接放弃,其他情况将需要执行的作业根据执行时间取模并写入到一个Map结构中,即ringData
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60); ListringItemData = ringData.get(ringSecond); if (ringItemData == null) { ringItemData = new ArrayList (); ringData.put(ringSecond, ringItemData); } ringItemData.add(jobId);
2、创建一个线程ringThread
不断循环获取ringItemData集合的数据,通过线程池执行任务:通过jobId获取任务,然后根据任务的jobGroup获取XxlJobGroup
SELECtFROM xxl_job_info AS t WHERe t.id = #{id}
SELECtFROM xxl_job_group AS t WHERe t.id = #{id}
向xxl-job-admin发执行任务的消息http://adress/run,请求参数:
TriggerParam triggerParam = new TriggerParam(); triggerParam.setJobId(jobInfo.getId()); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorParams(jobInfo.getExecutorParam()); triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout()); triggerParam.setLogId(jobLog.getId()); triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime()); triggerParam.setGlueType(jobInfo.getGlueType()); triggerParam.setGlueSource(jobInfo.getGlueSource()); triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); triggerParam.setBroadcastIndex(index); triggerParam.setBroadcastTotal(total);