
import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; import java.util.function.Function; public class Promise{ private final CompletableFuture completableFuture; public Promise(@NonNull Consumer > consumer) { this.completableFuture = new CompletableFuture<>(); consumer.accept(new PromisePublisher (this.completableFuture)); } private Promise(@NonNull CompletableFuture completableFuture) { this.completableFuture = completableFuture; } public static Promise empty() { return new Promise<>(CompletableFuture.completedFuture(null)); } public static Promise of(R r) { return new Promise<>(CompletableFuture.completedFuture(r)); } @SuppressWarnings("unchecked") public static Promise > all(@NonNull List
> promises) { CompletableFuture [] futures = CollectionTools.map(promises, p -> p == null ? CompletableFuture.completedFuture(null) : p.completableFuture) .toArray(new CompletableFuture[promises.size()]); CompletableFuture completableFuture = CompletableFuture.allOf(futures); return new Promise<>(completableFuture.thenApply(v -> { List list = new ArrayList<>(); for (CompletableFuture future : futures) { try { list.add(future.get()); } catch (Exception e) { throw new IllegalStateException(e); } } return list; })); } public static Promise fail(Throwable e) { return new Promise<>(p -> p.publishError(e)); } public Promise then(@NonNull Function super T, ? extends R> fn) { return new Promise<>(this.completableFuture.thenApply(fn)); } public Promise thenCompose(@NonNull Function super T, ? extends Promise > fn) { return new Promise<>(publisher -> { this.then(t -> { try { AssertTools.notNull(fn.apply(t), () -> new NullPointerException("函数:" + fn.getClass() + " 执行完成之后返回了null")).then(r -> { publisher.publish(r); return null; }).peekError(throwable -> { publisher.publishError(throwable); }); } catch (Exception e) { publisher.publishError(e); } return null; }).peekError(throwable -> { publisher.publishError(throwable); }); }); } public Promise catchError(@NonNull Function fn) { return new Promise<>(this.completableFuture.exceptionally(throwable -> { return fn.apply(throwable.getCause()); })); } public Promise catchErrorCompose(@NonNull Function > fn) { return new Promise (publisher -> { this.then(t -> { publisher.publish(t); return null; }).catchError(throwable -> { try { AssertTools.notNull(fn.apply(throwable), () -> new NullPointerException("函数:" + fn.getClass() + " 执行完成之后返回了null")).then(t -> { publisher.publish(t); return null; }).peekError(e -> { publisher.publishError(e); }); } catch (Exception e) { publisher.publishError(e); } return null; }); }); } public Promise peekError(@NonNull Consumer consumer) { return new Promise (publisher -> { this.then(t -> { publisher.publish(t); return null; }).catchError(e -> { try { consumer.accept(e); publisher.publishError(e); } catch (Exception ex) { publisher.publishError(ex); } return null; }); }); } public Promise acceptError(@NonNull Consumer consumer) { return this.catchError(e -> { consumer.accept(e); return null; }); } public T sync() throws RuntimeException { try { return this.completableFuture.get(); } catch (RuntimeException e) { throw e; } catch (InterruptedException e) { throw new RuntimeException(e); } catch (ExecutionException e) { Throwable cause = e.getCause(); if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } throw new RuntimeException(cause); } } }
Publisher类
import java.util.concurrent.CompletableFuture; public class PromisePublisher{ private final CompletableFuture completableFuture; protected PromisePublisher(CompletableFuture completableFuture) { this.completableFuture = completableFuture; } public void publish(T value) { this.completableFuture.complete(value); } public void publishError(Throwable e) { this.completableFuture.completeExceptionally(e); } }