feat: add asyncTool

This commit is contained in:
landaiqing
2024-06-25 21:08:44 +08:00
parent a64b1f9533
commit ae608c5cf2
42 changed files with 3299 additions and 0 deletions

View File

@@ -0,0 +1,24 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.jd.platform</groupId>
<artifactId>schisandra-cloud-storage-asyncTool</artifactId>
<version>1.4.1-SNAPSHOT</version>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.3</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@@ -0,0 +1,21 @@
package com.jd.platform.async.callback;
import com.jd.platform.async.worker.WorkResult;
/**
* 默认回调类,如果不设置的话,会默认给这个回调
* @author wuweifeng wrote on 2019-11-19.
*/
public class DefaultCallback<T, V> implements ICallback<T, V> {
@Override
public void begin() {
}
@Override
public void result(boolean success, T param, WorkResult<V> workResult) {
}
}

View File

@@ -0,0 +1,21 @@
package com.jd.platform.async.callback;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.List;
/**
* @author wuweifeng wrote on 2019-12-27
* @version 1.0
*/
public class DefaultGroupCallback implements IGroupCallback {
@Override
public void success(List<WorkerWrapper> workerWrappers) {
}
@Override
public void failure(List<WorkerWrapper> workerWrappers, Exception e) {
}
}

View File

@@ -0,0 +1,26 @@
package com.jd.platform.async.callback;
import com.jd.platform.async.worker.WorkResult;
/**
* 每个执行单元执行完毕后,会回调该接口</p>
* 需要监听执行结果的,实现该接口即可
*
* @author wuweifeng wrote on 2019-11-19.
*/
@FunctionalInterface
public interface ICallback<T, V> {
/**
* 任务开始的监听
*/
default void begin() {
}
/**
* 耗时操作执行完毕后就给value注入值
*/
void result(boolean success, T param, WorkResult<V> workResult);
}

View File

@@ -0,0 +1,20 @@
package com.jd.platform.async.callback;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.List;
/**
* 如果是异步执行整组的话,可以用这个组回调。不推荐使用
* @author wuweifeng wrote on 2019-11-19.
*/
public interface IGroupCallback {
/**
* 成功后可以从wrapper里去getWorkResult
*/
void success(List<WorkerWrapper> workerWrappers);
/**
* 失败了也可以从wrapper里去getWorkResult
*/
void failure(List<WorkerWrapper> workerWrappers, Exception e);
}

View File

@@ -0,0 +1,20 @@
package com.jd.platform.async.callback;
/**
* @author wuweifeng wrote on 2019-12-20
* @version 1.0
*/
public interface ITimeoutWorker<T, V> extends IWorker<T, V> {
/**
* 每个worker都可以设置超时时间
* @return 毫秒超时时间
*/
long timeOut();
/**
* 是否开启单个执行单元的超时功能有时是一个group设置个超时而不具备关心单个worker的超时
* <p>注意,如果开启了单个执行单元的超时检测,将使线程池数量多出一倍</p>
* @return 是否开启
*/
boolean enableTimeOut();
}

View File

@@ -0,0 +1,30 @@
package com.jd.platform.async.callback;
import java.util.Map;
import com.jd.platform.async.wrapper.WorkerWrapper;
/**
* 每个最小执行单元需要实现该接口
*
* @author wuweifeng wrote on 2019-11-19.
*/
@FunctionalInterface
public interface IWorker<T, V> {
/**
* 在这里做耗时操作如rpc请求、IO等
*
* @param object object
* @param allWrappers 任务包装
*/
V action(T object, Map<String, WorkerWrapper> allWrappers);
/**
* 超时、异常时,返回的默认值
*
* @return 默认值
*/
default V defaultValue() {
return null;
}
}

View File

@@ -0,0 +1,16 @@
package com.jd.platform.async.exception;
/**
* 如果任务在执行之前自己后面的任务已经执行完或正在被执行则抛该exception
* @author wuweifeng wrote on 2020-02-18
* @version 1.0
*/
public class SkippedException extends RuntimeException {
public SkippedException() {
super();
}
public SkippedException(String message) {
super(message);
}
}

View File

@@ -0,0 +1,157 @@
package com.jd.platform.async.executor;
import com.jd.platform.async.callback.DefaultGroupCallback;
import com.jd.platform.async.callback.IGroupCallback;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* 类入口可以根据自己情况调整core线程的数量
* @author wuweifeng wrote on 2019-12-18
* @version 1.0
*/
public class Async {
/**
* 默认不定长线程池
*/
private static final ThreadPoolExecutor COMMON_POOL = (ThreadPoolExecutor) Executors.newCachedThreadPool();
/**
* 注意这里是个static也就是只能有一个线程池。用户自定义线程池时也只能定义一个
*/
private static ExecutorService executorService;
/**
* 出发点
*/
public static boolean beginWork(long timeout, ExecutorService executorService, List<WorkerWrapper> workerWrappers) throws ExecutionException, InterruptedException {
if(workerWrappers == null || workerWrappers.size() == 0) {
return false;
}
//保存线程池变量
Async.executorService = executorService;
//定义一个map存放所有的wrapperkey为wrapper的唯一idvalue是该wrapper可以从value中获取wrapper的result
Map<String, WorkerWrapper> forParamUseWrappers = new ConcurrentHashMap<>();
CompletableFuture[] futures = new CompletableFuture[workerWrappers.size()];
for (int i = 0; i < workerWrappers.size(); i++) {
WorkerWrapper wrapper = workerWrappers.get(i);
futures[i] = CompletableFuture.runAsync(() -> wrapper.work(executorService, timeout, forParamUseWrappers), executorService);
}
try {
CompletableFuture.allOf(futures).get(timeout, TimeUnit.MILLISECONDS);
return true;
} catch (TimeoutException e) {
Set<WorkerWrapper> set = new HashSet<>();
totalWorkers(workerWrappers, set);
for (WorkerWrapper wrapper : set) {
wrapper.stopNow();
}
return false;
}
}
/**
* 如果想自定义线程池请传pool。不自定义的话就走默认的COMMON_POOL
*/
public static boolean beginWork(long timeout, ExecutorService executorService, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
if(workerWrapper == null || workerWrapper.length == 0) {
return false;
}
List<WorkerWrapper> workerWrappers = Arrays.stream(workerWrapper).collect(Collectors.toList());
return beginWork(timeout, executorService, workerWrappers);
}
/**
* 同步阻塞,直到所有都完成,或失败
*/
public static boolean beginWork(long timeout, WorkerWrapper... workerWrapper) throws ExecutionException, InterruptedException {
return beginWork(timeout, COMMON_POOL, workerWrapper);
}
public static void beginWorkAsync(long timeout, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
beginWorkAsync(timeout, COMMON_POOL, groupCallback, workerWrapper);
}
/**
* 异步执行,直到所有都完成,或失败后,发起回调
*/
public static void beginWorkAsync(long timeout, ExecutorService executorService, IGroupCallback groupCallback, WorkerWrapper... workerWrapper) {
if (groupCallback == null) {
groupCallback = new DefaultGroupCallback();
}
IGroupCallback finalGroupCallback = groupCallback;
if (executorService != null) {
executorService.submit(() -> {
try {
boolean success = beginWork(timeout, executorService, workerWrapper);
if (success) {
finalGroupCallback.success(Arrays.asList(workerWrapper));
} else {
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
}
});
} else {
COMMON_POOL.submit(() -> {
try {
boolean success = beginWork(timeout, COMMON_POOL, workerWrapper);
if (success) {
finalGroupCallback.success(Arrays.asList(workerWrapper));
} else {
finalGroupCallback.failure(Arrays.asList(workerWrapper), new TimeoutException());
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
finalGroupCallback.failure(Arrays.asList(workerWrapper), e);
}
});
}
}
/**
* 总共多少个执行单元
*/
@SuppressWarnings("unchecked")
private static void totalWorkers(List<WorkerWrapper> workerWrappers, Set<WorkerWrapper> set) {
set.addAll(workerWrappers);
for (WorkerWrapper wrapper : workerWrappers) {
if (wrapper.getNextWrappers() == null) {
continue;
}
List<WorkerWrapper> wrappers = wrapper.getNextWrappers();
totalWorkers(wrappers, set);
}
}
/**
* 关闭线程池
*/
public static void shutDown() {
shutDown(executorService);
}
/**
* 关闭线程池
*/
public static void shutDown(ExecutorService executorService) {
if (executorService != null) {
executorService.shutdown();
} else {
COMMON_POOL.shutdown();
}
}
public static String getThreadCount() {
return "activeCount=" + COMMON_POOL.getActiveCount() +
" completedCount " + COMMON_POOL.getCompletedTaskCount() +
" largestCount " + COMMON_POOL.getLargestPoolSize();
}
}

View File

@@ -0,0 +1,52 @@
package com.jd.platform.async.executor.timer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
/**
* 用于解决高并发下System.currentTimeMillis卡顿
* @author lry
*/
public class SystemClock {
private final int period;
private final AtomicLong now;
private static class InstanceHolder {
private static final SystemClock INSTANCE = new SystemClock(1);
}
private SystemClock(int period) {
this.period = period;
this.now = new AtomicLong(System.currentTimeMillis());
scheduleClockUpdating();
}
private static SystemClock instance() {
return InstanceHolder.INSTANCE;
}
private void scheduleClockUpdating() {
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(runnable -> {
Thread thread = new Thread(runnable, "System Clock");
thread.setDaemon(true);
return thread;
});
scheduler.scheduleAtFixedRate(() -> now.set(System.currentTimeMillis()), period, period, TimeUnit.MILLISECONDS);
}
private long currentTimeMillis() {
return now.get();
}
/**
* 用来替换原来的System.currentTimeMillis()
*/
public static long now() {
return instance().currentTimeMillis();
}
}

View File

@@ -0,0 +1,59 @@
package com.jd.platform.async.worker;
import com.jd.platform.async.wrapper.WorkerWrapper;
/**
* 对依赖的wrapper的封装
* @author wuweifeng wrote on 2019-12-20
* @version 1.0
*/
public class DependWrapper {
private WorkerWrapper<?, ?> dependWrapper;
/**
* 是否该依赖必须完成后才能执行自己.<p>
* 因为存在一个任务,依赖于多个任务,是让这多个任务全部完成后才执行自己,还是某几个执行完毕就可以执行自己
* 如
* 1
* ---3
* 2
* 或
* 1---3
* 2---3
* 这两种就不一样上面的就是必须12都完毕才能3
* 下面的就是1完毕就可以3
*/
private boolean must = true;
public DependWrapper(WorkerWrapper<?, ?> dependWrapper, boolean must) {
this.dependWrapper = dependWrapper;
this.must = must;
}
public DependWrapper() {
}
public WorkerWrapper<?, ?> getDependWrapper() {
return dependWrapper;
}
public void setDependWrapper(WorkerWrapper<?, ?> dependWrapper) {
this.dependWrapper = dependWrapper;
}
public boolean isMust() {
return must;
}
public void setMust(boolean must) {
this.must = must;
}
@Override
public String toString() {
return "DependWrapper{" +
"dependWrapper=" + dependWrapper +
", must=" + must +
'}';
}
}

View File

@@ -0,0 +1,12 @@
package com.jd.platform.async.worker;
/**
* 结果状态
* @author wuweifeng wrote on 2019-11-19.
*/
public enum ResultState {
SUCCESS,
TIMEOUT,
EXCEPTION,
DEFAULT //默认状态
}

View File

@@ -0,0 +1,63 @@
package com.jd.platform.async.worker;
/**
* 执行结果
*/
public class WorkResult<V> {
/**
* 执行的结果
*/
private V result;
/**
* 结果状态
*/
private ResultState resultState;
private Exception ex;
public WorkResult(V result, ResultState resultState) {
this(result, resultState, null);
}
public WorkResult(V result, ResultState resultState, Exception ex) {
this.result = result;
this.resultState = resultState;
this.ex = ex;
}
public static <V> WorkResult<V> defaultResult() {
return new WorkResult<>(null, ResultState.DEFAULT);
}
@Override
public String toString() {
return "WorkResult{" +
"result=" + result +
", resultState=" + resultState +
", ex=" + ex +
'}';
}
public Exception getEx() {
return ex;
}
public void setEx(Exception ex) {
this.ex = ex;
}
public V getResult() {
return result;
}
public void setResult(V result) {
this.result = result;
}
public ResultState getResultState() {
return resultState;
}
public void setResultState(ResultState resultState) {
this.resultState = resultState;
}
}

View File

@@ -0,0 +1,613 @@
package com.jd.platform.async.wrapper;
import com.jd.platform.async.callback.DefaultCallback;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.exception.SkippedException;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.DependWrapper;
import com.jd.platform.async.worker.ResultState;
import com.jd.platform.async.worker.WorkResult;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 对每个worker及callback进行包装一对一
*
* @author wuweifeng wrote on 2019-11-19.
*/
public class WorkerWrapper<T, V> {
/**
* 该wrapper的唯一标识
*/
private String id;
/**
* worker将来要处理的param
*/
private T param;
private IWorker<T, V> worker;
private ICallback<T, V> callback;
/**
* 在自己后面的wrapper如果没有自己就是末尾如果有一个就是串行如果有多个有几个就需要开几个线程</p>
* -------2
* 1
* -------3
* 如1后面有2、3
*/
private List<WorkerWrapper<?, ?>> nextWrappers;
/**
* 依赖的wrappers有2种情况1:必须依赖的全部完成后,才能执行自己 2:依赖的任何一个、多个完成了,就可以执行自己
* 通过must字段来控制是否依赖项必须完成
* 1
* -------3
* 2
* 1、2执行完毕后才能执行3
*/
private List<DependWrapper> dependWrappers;
/**
* 标记该事件是否已经被处理过了譬如已经超时返回false了后续rpc又收到返回值了则不再二次回调
* 经试验,volatile并不能保证"同一毫秒"内,多线程对该值的修改和拉取
* <p>
* 1-finish, 2-error, 3-working
*/
private AtomicInteger state = new AtomicInteger(0);
/**
* 该map存放所有wrapper的id和wrapper映射
*/
private Map<String, WorkerWrapper> forParamUseWrappers;
/**
* 也是个钩子变量,用来存临时的结果
*/
private volatile WorkResult<V> workResult = WorkResult.defaultResult();
/**
* 是否在执行自己前去校验nextWrapper的执行结果<p>
* 1 4
* -------3
* 2
* 如这种在4执行前可能3已经执行完毕了被2执行完后触发的那么4就没必要执行了。
* 注意该属性仅在nextWrapper数量<=1时有效>1时的情况是不存在的
*/
private volatile boolean needCheckNextWrapperResult = true;
private static final int FINISH = 1;
private static final int ERROR = 2;
private static final int WORKING = 3;
private static final int INIT = 0;
private WorkerWrapper(String id, IWorker<T, V> worker, T param, ICallback<T, V> callback) {
if (worker == null) {
throw new NullPointerException("async.worker is null");
}
this.worker = worker;
this.param = param;
this.id = id;
//允许不设置回调
if (callback == null) {
callback = new DefaultCallback<>();
}
this.callback = callback;
}
/**
* 开始工作
* fromWrapper代表这次work是由哪个上游wrapper发起的
*/
private void work(ExecutorService executorService, WorkerWrapper fromWrapper, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
this.forParamUseWrappers = forParamUseWrappers;
//将自己放到所有wrapper的集合里去
forParamUseWrappers.put(id, this);
long now = SystemClock.now();
//总的已经超时了,就快速失败,进行下一个
if (remainTime <= 0) {
fastFail(INIT, null);
beginNext(executorService, now, remainTime);
return;
}
//如果自己已经执行过了。
//可能有多个依赖,其中的一个依赖已经执行完了,并且自己也已开始执行或执行完毕。当另一个依赖执行完毕,又进来该方法时,就不重复处理了
if (getState() == FINISH || getState() == ERROR) {
beginNext(executorService, now, remainTime);
return;
}
//如果在执行前需要校验nextWrapper的状态
if (needCheckNextWrapperResult) {
//如果自己的next链上有已经出结果或已经开始执行的任务了自己就不用继续了
if (!checkNextWrapperResult()) {
fastFail(INIT, new SkippedException());
beginNext(executorService, now, remainTime);
return;
}
}
//如果没有任何依赖,说明自己就是第一批要执行的
if (dependWrappers == null || dependWrappers.size() == 0) {
fire();
beginNext(executorService, now, remainTime);
return;
}
/*如果有前方依赖,存在两种情况
一种是前面只有一个wrapper。即 A -> B
一种是前面有多个wrapper。A C D -> B。需要A、C、D都完成了才能轮到B。但是无论是A执行完还是C执行完都会去唤醒B。
所以需要B来做判断必须A、C、D都完成自己才能执行 */
//只有一个依赖
if (dependWrappers.size() == 1) {
doDependsOneJob(fromWrapper);
beginNext(executorService, now, remainTime);
} else {
//有多个依赖时
doDependsJobs(executorService, dependWrappers, fromWrapper, now, remainTime);
}
}
public void work(ExecutorService executorService, long remainTime, Map<String, WorkerWrapper> forParamUseWrappers) {
work(executorService, null, remainTime, forParamUseWrappers);
}
/**
* 总控制台超时,停止所有任务
*/
public void stopNow() {
if (getState() == INIT || getState() == WORKING) {
fastFail(getState(), null);
}
}
/**
* 判断自己下游链路上,是否存在已经出结果的或已经开始执行的
* 如果没有返回true如果有返回false
*/
private boolean checkNextWrapperResult() {
//如果自己就是最后一个或者后面有并行的多个就返回true
if (nextWrappers == null || nextWrappers.size() != 1) {
return getState() == INIT;
}
WorkerWrapper nextWrapper = nextWrappers.get(0);
boolean state = nextWrapper.getState() == INIT;
//继续校验自己的next的状态
return state && nextWrapper.checkNextWrapperResult();
}
/**
* 进行下一个任务
*/
private void beginNext(ExecutorService executorService, long now, long remainTime) {
//花费的时间
long costTime = SystemClock.now() - now;
if (nextWrappers == null) {
return;
}
if (nextWrappers.size() == 1) {
nextWrappers.get(0).work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers);
return;
}
CompletableFuture[] futures = new CompletableFuture[nextWrappers.size()];
for (int i = 0; i < nextWrappers.size(); i++) {
int finalI = i;
futures[i] = CompletableFuture.runAsync(() -> nextWrappers.get(finalI)
.work(executorService, WorkerWrapper.this, remainTime - costTime, forParamUseWrappers), executorService);
}
try {
CompletableFuture.allOf(futures).get(remainTime - costTime, TimeUnit.MILLISECONDS);
} catch (Exception e) {
e.printStackTrace();
}
}
private void doDependsOneJob(WorkerWrapper dependWrapper) {
if (ResultState.TIMEOUT == dependWrapper.getWorkResult().getResultState()) {
workResult = defaultResult();
fastFail(INIT, null);
} else if (ResultState.EXCEPTION == dependWrapper.getWorkResult().getResultState()) {
workResult = defaultExResult(dependWrapper.getWorkResult().getEx());
fastFail(INIT, null);
} else {
//前面任务正常完毕了,该自己了
fire();
}
}
private synchronized void doDependsJobs(ExecutorService executorService, List<DependWrapper> dependWrappers, WorkerWrapper fromWrapper, long now, long remainTime) {
//如果当前任务已经完成了,依赖的其他任务拿到锁再进来时,不需要执行下面的逻辑了。
if (getState() != INIT) {
return;
}
boolean nowDependIsMust = false;
//创建必须完成的上游wrapper集合
Set<DependWrapper> mustWrapper = new HashSet<>();
for (DependWrapper dependWrapper : dependWrappers) {
if (dependWrapper.isMust()) {
mustWrapper.add(dependWrapper);
}
if (dependWrapper.getDependWrapper().equals(fromWrapper)) {
nowDependIsMust = dependWrapper.isMust();
}
}
//如果全部是不必须的条件,那么只要到了这里,就执行自己。
if (mustWrapper.size() == 0) {
if (ResultState.TIMEOUT == fromWrapper.getWorkResult().getResultState()) {
fastFail(INIT, null);
} else {
fire();
}
beginNext(executorService, now, remainTime);
return;
}
//如果存在需要必须完成的且fromWrapper不是必须的就什么也不干
if (!nowDependIsMust) {
return;
}
//如果fromWrapper是必须的
boolean existNoFinish = false;
boolean hasError = false;
//先判断前面必须要执行的依赖任务的执行结果如果有任何一个失败那就不用走action了直接给自己设置为失败进行下一步就是了
for (DependWrapper dependWrapper : mustWrapper) {
WorkerWrapper workerWrapper = dependWrapper.getDependWrapper();
WorkResult tempWorkResult = workerWrapper.getWorkResult();
//为null或者isWorking说明它依赖的某个任务还没执行到或没执行完
if (workerWrapper.getState() == INIT || workerWrapper.getState() == WORKING) {
existNoFinish = true;
break;
}
if (ResultState.TIMEOUT == tempWorkResult.getResultState()) {
workResult = defaultResult();
hasError = true;
break;
}
if (ResultState.EXCEPTION == tempWorkResult.getResultState()) {
workResult = defaultExResult(workerWrapper.getWorkResult().getEx());
hasError = true;
break;
}
}
//只要有失败的
if (hasError) {
fastFail(INIT, null);
beginNext(executorService, now, remainTime);
return;
}
//如果上游都没有失败分为两种情况一种是都finish了一种是有的在working
//都finish的话
if (!existNoFinish) {
//上游都finish了进行自己
fire();
beginNext(executorService, now, remainTime);
return;
}
}
/**
* 执行自己的job.具体的执行是在另一个线程里,但判断阻塞超时是在work线程
*/
private void fire() {
//阻塞取结果
workResult = workerDoJob();
}
/**
* 快速失败
*/
private boolean fastFail(int expect, Exception e) {
//试图将它从expect状态,改成Error
if (!compareAndSetState(expect, ERROR)) {
return false;
}
//尚未处理过结果
if (checkIsNullResult()) {
if (e == null) {
workResult = defaultResult();
} else {
workResult = defaultExResult(e);
}
}
callback.result(false, param, workResult);
return true;
}
/**
* 具体的单个worker执行任务
*/
private WorkResult<V> workerDoJob() {
//避免重复执行
if (!checkIsNullResult()) {
return workResult;
}
try {
//如果已经不是init状态了说明正在被执行或已执行完毕。这一步很重要可以保证任务不被重复执行
if (!compareAndSetState(INIT, WORKING)) {
return workResult;
}
callback.begin();
//执行耗时操作
V resultValue = worker.action(param, forParamUseWrappers);
//如果状态不是在working,说明别的地方已经修改了
if (!compareAndSetState(WORKING, FINISH)) {
return workResult;
}
workResult.setResultState(ResultState.SUCCESS);
workResult.setResult(resultValue);
//回调成功
callback.result(true, param, workResult);
return workResult;
} catch (Exception e) {
//避免重复回调
if (!checkIsNullResult()) {
return workResult;
}
fastFail(WORKING, e);
return workResult;
}
}
public WorkResult<V> getWorkResult() {
return workResult;
}
public List<WorkerWrapper<?, ?>> getNextWrappers() {
return nextWrappers;
}
public void setParam(T param) {
this.param = param;
}
private boolean checkIsNullResult() {
return ResultState.DEFAULT == workResult.getResultState();
}
private void addDepend(WorkerWrapper<?, ?> workerWrapper, boolean must) {
addDepend(new DependWrapper(workerWrapper, must));
}
private void addDepend(DependWrapper dependWrapper) {
if (dependWrappers == null) {
dependWrappers = new ArrayList<>();
}
//如果依赖的是重复的同一个,就不重复添加了
for (DependWrapper wrapper : dependWrappers) {
if (wrapper.equals(dependWrapper)) {
return;
}
}
dependWrappers.add(dependWrapper);
}
private void addNext(WorkerWrapper<?, ?> workerWrapper) {
if (nextWrappers == null) {
nextWrappers = new ArrayList<>();
}
//避免添加重复
for (WorkerWrapper wrapper : nextWrappers) {
if (workerWrapper.equals(wrapper)) {
return;
}
}
nextWrappers.add(workerWrapper);
}
private void addNextWrappers(List<WorkerWrapper<?, ?>> wrappers) {
if (wrappers == null) {
return;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
addNext(wrapper);
}
}
private void addDependWrappers(List<DependWrapper> dependWrappers) {
if (dependWrappers == null) {
return;
}
for (DependWrapper wrapper : dependWrappers) {
addDepend(wrapper);
}
}
private WorkResult<V> defaultResult() {
workResult.setResultState(ResultState.TIMEOUT);
workResult.setResult(worker.defaultValue());
return workResult;
}
private WorkResult<V> defaultExResult(Exception ex) {
workResult.setResultState(ResultState.EXCEPTION);
workResult.setResult(worker.defaultValue());
workResult.setEx(ex);
return workResult;
}
private int getState() {
return state.get();
}
public String getId() {
return id;
}
private boolean compareAndSetState(int expect, int update) {
return this.state.compareAndSet(expect, update);
}
private void setNeedCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
this.needCheckNextWrapperResult = needCheckNextWrapperResult;
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WorkerWrapper<?, ?> that = (WorkerWrapper<?, ?>) o;
return needCheckNextWrapperResult == that.needCheckNextWrapperResult &&
Objects.equals(param, that.param) &&
Objects.equals(worker, that.worker) &&
Objects.equals(callback, that.callback) &&
Objects.equals(nextWrappers, that.nextWrappers) &&
Objects.equals(dependWrappers, that.dependWrappers) &&
Objects.equals(state, that.state) &&
Objects.equals(workResult, that.workResult);
}
@Override
public int hashCode() {
return Objects.hash(param, worker, callback, nextWrappers, dependWrappers, state, workResult, needCheckNextWrapperResult);
}
public static class Builder<W, C> {
/**
* 该wrapper的唯一标识
*/
private String id = UUID.randomUUID().toString();
/**
* worker将来要处理的param
*/
private W param;
private IWorker<W, C> worker;
private ICallback<W, C> callback;
/**
* 自己后面的所有
*/
private List<WorkerWrapper<?, ?>> nextWrappers;
/**
* 自己依赖的所有
*/
private List<DependWrapper> dependWrappers;
/**
* 存储强依赖于自己的wrapper集合
*/
private Set<WorkerWrapper<?, ?>> selfIsMustSet;
private boolean needCheckNextWrapperResult = true;
public Builder<W, C> worker(IWorker<W, C> worker) {
this.worker = worker;
return this;
}
public Builder<W, C> param(W w) {
this.param = w;
return this;
}
public Builder<W, C> id(String id) {
if (id != null) {
this.id = id;
}
return this;
}
public Builder<W, C> needCheckNextWrapperResult(boolean needCheckNextWrapperResult) {
this.needCheckNextWrapperResult = needCheckNextWrapperResult;
return this;
}
public Builder<W, C> callback(ICallback<W, C> callback) {
this.callback = callback;
return this;
}
public Builder<W, C> depend(WorkerWrapper<?, ?>... wrappers) {
if (wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
depend(wrapper);
}
return this;
}
public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper) {
return depend(wrapper, true);
}
public Builder<W, C> depend(WorkerWrapper<?, ?> wrapper, boolean isMust) {
if (wrapper == null) {
return this;
}
DependWrapper dependWrapper = new DependWrapper(wrapper, isMust);
if (dependWrappers == null) {
dependWrappers = new ArrayList<>();
}
dependWrappers.add(dependWrapper);
return this;
}
public Builder<W, C> next(WorkerWrapper<?, ?> wrapper) {
return next(wrapper, true);
}
public Builder<W, C> next(WorkerWrapper<?, ?> wrapper, boolean selfIsMust) {
if (nextWrappers == null) {
nextWrappers = new ArrayList<>();
}
nextWrappers.add(wrapper);
//强依赖自己
if (selfIsMust) {
if (selfIsMustSet == null) {
selfIsMustSet = new HashSet<>();
}
selfIsMustSet.add(wrapper);
}
return this;
}
public Builder<W, C> next(WorkerWrapper<?, ?>... wrappers) {
if (wrappers == null) {
return this;
}
for (WorkerWrapper<?, ?> wrapper : wrappers) {
next(wrapper);
}
return this;
}
public WorkerWrapper<W, C> build() {
WorkerWrapper<W, C> wrapper = new WorkerWrapper<>(id, worker, param, callback);
wrapper.setNeedCheckNextWrapperResult(needCheckNextWrapperResult);
if (dependWrappers != null) {
for (DependWrapper workerWrapper : dependWrappers) {
workerWrapper.getDependWrapper().addNext(wrapper);
wrapper.addDepend(workerWrapper);
}
}
if (nextWrappers != null) {
for (WorkerWrapper<?, ?> workerWrapper : nextWrappers) {
boolean must = false;
if (selfIsMustSet != null && selfIsMustSet.contains(workerWrapper)) {
must = true;
}
workerWrapper.addDepend(wrapper, must);
wrapper.addNext(workerWrapper);
}
}
return wrapper;
}
}
}

View File

@@ -0,0 +1,42 @@
package depend;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker implements IWorker<String, User>, ICallback<String, User> {
@Override
public User action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new User("user0");
}
@Override
public User defaultValue() {
return new User("default User");
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<User> workResult) {
System.out.println("worker0 的结果是:" + workResult.getResult());
}
}

View File

@@ -0,0 +1,43 @@
package depend;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker1 implements IWorker<WorkResult<User>, User>, ICallback<WorkResult<User>, User> {
@Override
public User action(WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) {
System.out.println("par1的入参来自于par0 " + result.getResult());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new User("user1");
}
@Override
public User defaultValue() {
return new User("default User");
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, WorkResult<User> param, WorkResult<User> workResult) {
System.out.println("worker1 的结果是:" + workResult.getResult());
}
}

View File

@@ -0,0 +1,43 @@
package depend;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker2 implements IWorker<WorkResult<User>, String>, ICallback<WorkResult<User>, String> {
@Override
public String action(WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) {
System.out.println("par2的入参来自于par1 " + result.getResult());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result.getResult().getName();
}
@Override
public String defaultValue() {
return "default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, WorkResult<User> param, WorkResult<String> workResult) {
System.out.println("worker2 的结果是:" + workResult.getResult());
}
}

View File

@@ -0,0 +1,74 @@
package depend;
import java.util.Map;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
/**
* @author sjsdfg
* @since 2020/6/14
*/
public class LambdaTest {
public static void main(String[] args) throws Exception {
WorkerWrapper<WorkResult<User>, String> workerWrapper2 = new WorkerWrapper.Builder<WorkResult<User>, String>()
.worker((WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) -> {
System.out.println("par2的入参来自于par1 " + result.getResult());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return result.getResult().getName();
})
.callback((boolean success, WorkResult<User> param, WorkResult<String> workResult) ->
System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult)))
.id("third")
.build();
WorkerWrapper<WorkResult<User>, User> workerWrapper1 = new WorkerWrapper.Builder<WorkResult<User>, User>()
.worker((WorkResult<User> result, Map<String, WorkerWrapper> allWrappers) -> {
System.out.println("par1的入参来自于par0 " + result.getResult());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new User("user1");
})
.callback((boolean success, WorkResult<User> param, WorkResult<User> workResult) ->
System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult)))
.id("second")
.next(workerWrapper2)
.build();
WorkerWrapper<String, User> workerWrapper = new WorkerWrapper.Builder<String, User>()
.worker((String object, Map<String, WorkerWrapper> allWrappers) -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new User("user0");
})
.param("0")
.id("first")
.next(workerWrapper1, true)
.callback((boolean success, String param, WorkResult<User> workResult) ->
System.out.println(String.format("thread is %s, param is %s, result is %s", Thread.currentThread().getName(), param, workResult)))
.build();
//虽然尚未执行但是也可以先取得结果的引用作为下一个任务的入参。V1.2前写法,需要手工给
//V1.3后不用给wrapper setParam了直接在worker的action里自行根据id获取即可.参考dependnew包下代码
WorkResult<User> result = workerWrapper.getWorkResult();
WorkResult<User> result1 = workerWrapper1.getWorkResult();
workerWrapper1.setParam(result);
workerWrapper2.setParam(result1);
Async.beginWork(3500, workerWrapper);
System.out.println(workerWrapper2.getWorkResult());
Async.shutDown();
}
}

View File

@@ -0,0 +1,55 @@
package depend;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.concurrent.ExecutionException;
/**
* 后面请求依赖于前面请求的执行结果
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeWorker w = new DeWorker();
DeWorker1 w1 = new DeWorker1();
DeWorker2 w2 = new DeWorker2();
WorkerWrapper<WorkResult<User>, String> workerWrapper2 = new WorkerWrapper.Builder<WorkResult<User>, String>()
.worker(w2)
.callback(w2)
.id("third")
.build();
WorkerWrapper<WorkResult<User>, User> workerWrapper1 = new WorkerWrapper.Builder<WorkResult<User>, User>()
.worker(w1)
.callback(w1)
.id("second")
.next(workerWrapper2)
.build();
WorkerWrapper<String, User> workerWrapper = new WorkerWrapper.Builder<String, User>()
.worker(w)
.param("0")
.id("first")
.next(workerWrapper1, true)
.callback(w)
.build();
//虽然尚未执行但是也可以先取得结果的引用作为下一个任务的入参。V1.2前写法,需要手工给
//V1.3后不用给wrapper setParam了直接在worker的action里自行根据id获取即可.参考dependnew包下代码
WorkResult<User> result = workerWrapper.getWorkResult();
WorkResult<User> result1 = workerWrapper1.getWorkResult();
workerWrapper1.setParam(result);
workerWrapper2.setParam(result1);
Async.beginWork(3500, workerWrapper);
System.out.println(workerWrapper2.getWorkResult());
Async.shutDown();
}
}

View File

@@ -0,0 +1,29 @@
package depend;
/**
* 一个包装类
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
public class User {
private String name;
public User(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
'}';
}
}

View File

@@ -0,0 +1,42 @@
package dependnew;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker implements IWorker<String, User>, ICallback<String, User> {
@Override
public User action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return new User("user0");
}
@Override
public User defaultValue() {
return new User("default User");
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<User> workResult) {
System.out.println("worker0 的结果是:" + workResult.getResult());
}
}

View File

@@ -0,0 +1,45 @@
package dependnew;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker1 implements IWorker<String, User>, ICallback<String, User> {
@Override
public User action(String object, Map<String, WorkerWrapper> allWrappers) {
System.out.println("-----------------");
System.out.println("获取par0的执行结果 " + allWrappers.get("first").getWorkResult());
System.out.println("取par0的结果作为自己的入参并将par0的结果加上一些东西");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
User user0 = (User) allWrappers.get("first").getWorkResult().getResult();
return new User(user0.getName() + " worker1 add");
}
@Override
public User defaultValue() {
return new User("default User");
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<User> workResult) {
System.out.println("worker1 的结果是:" + workResult.getResult());
}
}

View File

@@ -0,0 +1,45 @@
package dependnew;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class DeWorker2 implements IWorker<User, String>, ICallback<User, String> {
@Override
public String action(User object, Map<String, WorkerWrapper> allWrappers) {
System.out.println("-----------------");
System.out.println("par1的执行结果是 " + allWrappers.get("second").getWorkResult());
System.out.println("取par1的结果作为自己的入参并将par1的结果加上一些东西");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
User user1 = (User) allWrappers.get("second").getWorkResult().getResult();
return user1.getName() + " worker2 add";
}
@Override
public String defaultValue() {
return "default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, User param, WorkResult<String> workResult) {
System.out.println("worker2 的结果是:" + workResult.getResult());
}
}

View File

@@ -0,0 +1,49 @@
package dependnew;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.concurrent.ExecutionException;
/**
* 后面请求依赖于前面请求的执行结果
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
DeWorker w = new DeWorker();
DeWorker1 w1 = new DeWorker1();
DeWorker2 w2 = new DeWorker2();
WorkerWrapper<User, String> workerWrapper2 = new WorkerWrapper.Builder<User, String>()
.worker(w2)
.callback(w2)
.id("third")
.build();
WorkerWrapper<String, User> workerWrapper1 = new WorkerWrapper.Builder<String, User>()
.worker(w1)
.callback(w1)
.id("second")
.next(workerWrapper2)
.build();
WorkerWrapper<String, User> workerWrapper = new WorkerWrapper.Builder<String, User>()
.worker(w)
.param("0")
.id("first")
.next(workerWrapper1)
.callback(w)
.build();
//V1.3后不用给wrapper setParam了直接在worker的action里自行根据id获取即可
Async.beginWork(3500, workerWrapper);
System.out.println(workerWrapper2.getWorkResult());
Async.shutDown();
}
}

View File

@@ -0,0 +1,29 @@
package dependnew;
/**
* 一个包装类
* @author wuweifeng wrote on 2019-12-26
* @version 1.0
*/
public class User {
private String name;
public User(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
@Override
public String toString() {
return "User{" +
"name='" + name + '\'' +
'}';
}
}

View File

@@ -0,0 +1,49 @@
package parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker0--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,49 @@
package parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker0--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,53 @@
package parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker1 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 1";
}
@Override
public String defaultValue() {
return "worker1--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker1 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker1 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,54 @@
package parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker2 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 2";
}
@Override
public String defaultValue() {
return "worker2--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker2 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker2 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,53 @@
package parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker3 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 3";
}
@Override
public String defaultValue() {
return "worker3--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,49 @@
package parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker4 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 4";
}
@Override
public String defaultValue() {
return "worker4--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker4 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker4 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,53 @@
package parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker5 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 3";
}
@Override
public String defaultValue() {
return "worker3--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,53 @@
package parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker6 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 3";
}
@Override
public String defaultValue() {
return "worker3--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,53 @@
package parallel;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class ParWorker7 implements IWorker<String, String>, ICallback<String, String> {
private long sleepTime = 1000;
public void setSleepTime(long sleepTime) {
this.sleepTime = sleepTime;
}
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(sleepTime);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 3";
}
@Override
public String defaultValue() {
return "worker3--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker3 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker3 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,871 @@
package parallel;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
/**
* 并行测试
*
* @author wuweifeng wrote on 2019-11-20.
*/
@SuppressWarnings("ALL")
public class TestPar {
public static void main(String[] args) throws Exception {
// testNormal();
// testMulti();
// testMultiReverse();
testMultiError();
// testMultiError2();
// testMulti3();
// testMulti3Reverse();
// testMulti4();
// testMulti4Reverse();
// testMulti5();
// testMulti5Reverse();
// testMulti6();
// testMulti7();
// testMulti8();
// testMulti9();
// testMulti9Reverse();
}
/**
* 3个并行测试不同时间的超时
*/
private static void testNormal() throws InterruptedException, ExecutionException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(1500, workerWrapper, workerWrapper1, workerWrapper2);
// Async.beginWork(800, workerWrapper, workerWrapper1, workerWrapper2);
// Async.beginWork(1000, workerWrapper, workerWrapper1, workerWrapper2);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
System.out.println(workerWrapper.getWorkResult());
Async.shutDown();
}
/**
* 0,2同时开启,1在0后面
* 0---1
* 2
*/
private static void testMulti() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(2500, workerWrapper, workerWrapper2);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
/**
* 0,2同时开启,1在0后面
* 0---1
* 2
*/
private static void testMultiReverse() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.depend(workerWrapper)
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(2500, workerWrapper, workerWrapper2);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
/**
* 0,2同时开启,1在0后面. 组超时,则0和2成功,1失败
* 0---1
* 2
*/
private static void testMultiError() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(1500, workerWrapper, workerWrapper2);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2都完成后3
* 1
* 0 3
* 2
*/
private static void testMulti3() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
ParWorker3 w3 = new ParWorker3();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("3")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1, workerWrapper2)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(3100, workerWrapper);
// Async.beginWork(2100, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2都完成后3
* 1
* 0 3
* 2
*/
private static void testMulti3Reverse() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
ParWorker3 w3 = new ParWorker3();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.depend(workerWrapper)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.depend(workerWrapper)
.build();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("3")
.depend(workerWrapper1, workerWrapper2)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(3100, workerWrapper);
// Async.beginWork(2100, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2都完成后32耗时2秒1耗时1秒。3会等待2完成
* 1
* 0 3
* 2
*
* 执行结果0123
*/
private static void testMulti4() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(2000);
ParWorker3 w3 = new ParWorker3();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("3")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1, workerWrapper2)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper);
//3会超时
// Async.beginWork(3100, workerWrapper);
//2,3会超时
// Async.beginWork(2900, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2都完成后32耗时2秒1耗时1秒。3会等待2完成
* 1
* 0 3
* 2
*
* 执行结果0123
*/
private static void testMulti4Reverse() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(2000);
ParWorker3 w3 = new ParWorker3();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.build();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("3")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.depend(workerWrapper)
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.depend(workerWrapper)
.next(workerWrapper3)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper);
//3会超时
// Async.beginWork(3100, workerWrapper);
//2,3会超时
// Async.beginWork(2900, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2 任何一个执行完后都执行3
* 1
* 0 3
* 2
*
* 则结果是:
* 0231
* 23分别是500、400.3执行完毕后1才执行完
*/
private static void testMulti5() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(500);
ParWorker3 w3 = new ParWorker3();
w3.setSleepTime(400);
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("3")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.next(workerWrapper3, false)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper3, false)
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1, workerWrapper2)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 1\2 任何一个执行完后都执行3
* 1
* 0 3
* 2
*
* 则结果是:
* 0231
* 23分别是500、400.3执行完毕后1才执行完
*/
private static void testMulti5Reverse() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(500);
ParWorker3 w3 = new ParWorker3();
w3.setSleepTime(400);
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.build();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("3")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.depend(workerWrapper, true)
.next(workerWrapper3, false)
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.depend(workerWrapper, true)
.next(workerWrapper3, false)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
Async.shutDown();
}
/**
* 0执行完,同时1和2, 必须1执行完毕后才能执行3. 无论2是否领先1完毕都要等1
* 1
* 0 3
* 2
*
* 则结果是:
* 0213
*
* 23分别是500、400.2执行完了1没完那就等着1完毕才能3
*/
private static void testMulti6() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(500);
ParWorker3 w3 = new ParWorker3();
w3.setSleepTime(400);
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("3")
.build();
//设置2不是必须
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.next(workerWrapper3, false)
.build();
// 设置1是必须的
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper3, true)
.build();
WorkerWrapper<String, String> workerWrapper0 = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper2, workerWrapper1)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper0);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
Async.shutDown();
}
/**
* 两个0并行上面0执行完,同时1和2, 下面0执行完开始1上面的 必须1、2执行完毕后才能执行3. 最后必须2、3都完成才能4
* 1
* 0 3
* 2 4
* ---------
* 0 1 2
*
* 则结果是:
* callback worker0 success--1577242870969----result = 1577242870968---param = 00 from 0-threadName:Thread-1
* callback worker0 success--1577242870969----result = 1577242870968---param = 0 from 0-threadName:Thread-0
* callback worker1 success--1577242871972----result = 1577242871972---param = 11 from 1-threadName:Thread-1
* callback worker1 success--1577242871972----result = 1577242871972---param = 1 from 1-threadName:Thread-2
* callback worker2 success--1577242871973----result = 1577242871973---param = 2 from 2-threadName:Thread-3
* callback worker2 success--1577242872975----result = 1577242872975---param = 22 from 2-threadName:Thread-1
* callback worker3 success--1577242872977----result = 1577242872977---param = 3 from 3-threadName:Thread-2
* callback worker4 success--1577242873980----result = 1577242873980---param = 4 from 3-threadName:Thread-2
*/
private static void testMulti7() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
ParWorker2 w2 = new ParWorker2();
ParWorker3 w3 = new ParWorker3();
ParWorker4 w4 = new ParWorker4();
WorkerWrapper<String, String> workerWrapper4 = new WorkerWrapper.Builder<String, String>()
.worker(w4)
.callback(w4)
.param("4")
.build();
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("3")
.next(workerWrapper4)
.build();
//下面的2
WorkerWrapper<String, String> workerWrapper22 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("22")
.next(workerWrapper4)
.build();
//下面的1
WorkerWrapper<String, String> workerWrapper11 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("11")
.next(workerWrapper22)
.build();
//下面的0
WorkerWrapper<String, String> workerWrapper00 = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("00")
.next(workerWrapper11)
.build();
//上面的1
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper3)
.build();
//上面的2
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.next(workerWrapper3)
.build();
//上面的0
WorkerWrapper<String, String> workerWrapper0 = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1, workerWrapper2)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
//正常完毕
Async.beginWork(4100, workerWrapper00, workerWrapper0);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
System.out.println(Async.getThreadCount());
Async.shutDown();
}
/**
* a1 -> b -> c
* a2 -> b -> c
*
* b、c
*/
private static void testMulti8() throws ExecutionException, InterruptedException {
ParWorker w = new ParWorker();
ParWorker1 w1 = new ParWorker1();
w1.setSleepTime(1005);
ParWorker2 w2 = new ParWorker2();
w2.setSleepTime(3000);
ParWorker3 w3 = new ParWorker3();
w3.setSleepTime(1000);
WorkerWrapper<String, String> workerWrapper3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("c")
.build();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("b")
.next(workerWrapper3)
.build();
WorkerWrapper<String, String> workerWrappera1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("a1")
.next(workerWrapper2)
.build();
WorkerWrapper<String, String> workerWrappera2 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("a2")
.next(workerWrapper2)
.build();
Async.beginWork(6000, workerWrappera1, workerWrappera2);
Async.shutDown();
}
/**
* w1 -> w2 -> w3
* --- last
* w
* w1和w并行w执行完后就执行last此时b、c还没开始b、c就不需要执行了
*/
private static void testMulti9() throws ExecutionException, InterruptedException {
ParWorker1 w1 = new ParWorker1();
//注意这里如果w1的执行时间比w长那么w2和w3肯定不走。 如果w1和w执行时间一样长多运行几次会发现w2有时走有时不走
// w1.setSleepTime(1100);
ParWorker w = new ParWorker();
ParWorker2 w2 = new ParWorker2();
ParWorker3 w3 = new ParWorker3();
ParWorker4 w4 = new ParWorker4();
WorkerWrapper<String, String> last = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("last")
.build();
WorkerWrapper<String, String> wrapperW = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("w")
.next(last, false)
.build();
WorkerWrapper<String, String> wrapperW3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("w3")
.next(last, false)
.build();
WorkerWrapper<String, String> wrapperW2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("w2")
.next(wrapperW3)
.build();
WorkerWrapper<String, String> wrapperW1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("w1")
.next(wrapperW2)
.build();
Async.beginWork(6000, wrapperW, wrapperW1);
Async.shutDown();
}
/**
* w1 -> w2 -> w3
* --- last
* w
* w1和w并行w执行完后就执行last此时b、c还没开始b、c就不需要执行了
*/
private static void testMulti9Reverse() throws ExecutionException, InterruptedException {
ParWorker1 w1 = new ParWorker1();
//注意这里如果w1的执行时间比w长那么w2和w3肯定不走。 如果w1和w执行时间一样长多运行几次会发现w2有时走有时不走
// w1.setSleepTime(1100);
ParWorker w = new ParWorker();
ParWorker2 w2 = new ParWorker2();
ParWorker3 w3 = new ParWorker3();
ParWorker4 w4 = new ParWorker4();
WorkerWrapper<String, String> wrapperW1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("w1")
.build();
WorkerWrapper<String, String> wrapperW = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("w")
.build();
WorkerWrapper<String, String> last = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("last")
.depend(wrapperW)
.build();
WorkerWrapper<String, String> wrapperW2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("w2")
.depend(wrapperW1)
.build();
WorkerWrapper<String, String> wrapperW3 = new WorkerWrapper.Builder<String, String>()
.worker(w3)
.callback(w3)
.param("w3")
.depend(wrapperW2)
.next(last, false)
.build();
Async.beginWork(6000,Executors.newCachedThreadPool(), wrapperW, wrapperW1);
Async.shutDown();
}
}

View File

@@ -0,0 +1,48 @@
package seq;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqTimeoutWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker0--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,49 @@
package seq;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqWorker implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker0--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker0 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker0 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,48 @@
package seq;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqWorker1 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker1--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker1 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker1 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,48 @@
package seq;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.Map;
/**
* @author wuweifeng wrote on 2019-11-20.
*/
public class SeqWorker2 implements IWorker<String, String>, ICallback<String, String> {
@Override
public String action(String object, Map<String, WorkerWrapper> allWrappers) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "result = " + SystemClock.now() + "---param = " + object + " from 0";
}
@Override
public String defaultValue() {
return "worker2--default";
}
@Override
public void begin() {
//System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
}
@Override
public void result(boolean success, String param, WorkResult<String> workResult) {
if (success) {
System.out.println("callback worker2 success--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
} else {
System.err.println("callback worker2 failure--" + SystemClock.now() + "----" + workResult.getResult()
+ "-threadName:" +Thread.currentThread().getName());
}
}
}

View File

@@ -0,0 +1,71 @@
package seq;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.concurrent.ExecutionException;
/**
* 串行测试
* @author wuweifeng wrote on 2019-11-20.
*/
public class TestSequential {
public static void main(String[] args) throws InterruptedException, ExecutionException {
SeqWorker w = new SeqWorker();
SeqWorker1 w1 = new SeqWorker1();
SeqWorker2 w2 = new SeqWorker2();
//顺序0-1-2
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper2)
.build();
WorkerWrapper<String, String> workerWrapper = new WorkerWrapper.Builder<String, String>()
.worker(w)
.callback(w)
.param("0")
.next(workerWrapper1)
.build();
// testNormal(workerWrapper);
testGroupTimeout(workerWrapper);
}
private static void testNormal(WorkerWrapper<String, String> workerWrapper) throws ExecutionException, InterruptedException {
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(3500, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
private static void testGroupTimeout(WorkerWrapper<String, String> workerWrapper) throws ExecutionException, InterruptedException {
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(2500, workerWrapper);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
}

View File

@@ -0,0 +1,67 @@
package seq;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.wrapper.WorkerWrapper;
import java.util.concurrent.ExecutionException;
/**
* 串行测试
* @author wuweifeng wrote on 2019-11-20.
*/
@SuppressWarnings("Duplicates")
public class TestSequentialTimeout {
public static void main(String[] args) throws InterruptedException, ExecutionException {
testFirstTimeout();
}
/**
* begin-1576719450476
* callback worker0 failure--1576719451338----worker0--default-threadName:main
* callback worker1 failure--1576719451338----worker1--default-threadName:main
* callback worker2 failure--1576719451338----worker2--default-threadName:main
* end-1576719451338
* cost-862
*/
private static void testFirstTimeout() throws ExecutionException, InterruptedException {
SeqWorker1 w1 = new SeqWorker1();
SeqWorker2 w2 = new SeqWorker2();
SeqTimeoutWorker t = new SeqTimeoutWorker();
WorkerWrapper<String, String> workerWrapper2 = new WorkerWrapper.Builder<String, String>()
.worker(w2)
.callback(w2)
.param("2")
.build();
WorkerWrapper<String, String> workerWrapper1 = new WorkerWrapper.Builder<String, String>()
.worker(w1)
.callback(w1)
.param("1")
.next(workerWrapper2)
.build();
//2在1后面串行
//T会超时
WorkerWrapper<String, String> workerWrapperT = new WorkerWrapper.Builder<String, String>()
.worker(t)
.callback(t)
.param("t")
.next(workerWrapper1)
.build();
long now = SystemClock.now();
System.out.println("begin-" + now);
Async.beginWork(5000, workerWrapperT);
System.out.println("end-" + SystemClock.now());
System.err.println("cost-" + (SystemClock.now() - now));
Async.shutDown();
}
}