如何为高邮市创建一个自己的国外网站?
摘要:高邮市建设网站,怎样做自己的国外网站,前端刚毕业开多少工资,互联网营销师培训基地本文要点: 为什么需要异步调用CompletableFuture 基本使用RPC 异步调用HTTP 异步调用编排 Completa
高邮市建设网站,怎样做自己的国外网站,前端刚毕业开多少工资,互联网营销师培训基地本文要点#xff1a;
为什么需要异步调用CompletableFuture 基本使用RPC 异步调用HTTP 异步调用编排 CompletableFuture 提高吞吐量BIO 模型
当用户进程调用了recvfrom 这个系统调用#xff0c;kernel 就开始了 IO 的第一个阶段#xff1a;准备数据。对于 network io 来说…本文要点
为什么需要异步调用CompletableFuture 基本使用RPC 异步调用HTTP 异步调用编排 CompletableFuture 提高吞吐量BIO 模型
当用户进程调用了recvfrom 这个系统调用kernel 就开始了 IO 的第一个阶段准备数据。对于 network io 来说很多时候数据在一开始还没有到达比如还没有收到一个完整的UDP包这个时候 kernel 就要等待足够的数据到来。而在用户进程这边整个进程会被阻塞。当 kernel 一直等到数据准备好了它就会将数据从 kernel 中拷贝到用户内存然后 kernel 返回结果用户进程才解除 block 的状态重新运行起来。所以Blocking IO 的特点就是在 IO 执行的两个阶段都被 block 了。
同步调用 在同步调用的场景下依次请求多个接口耗时长、性能差接口响应时长 T T1T2T3……Tn。
减少同步等待
一般这个时候为了减少同步等待时间会使用线程池来同时处理多个任务接口的响应时间就是 MAX(T1,T2,T3) 线程池异步
大概代码如下
FutureString future executorService.submit(() - {Thread.sleep(2000);return hello world;
});
while (true) {if (future.isDone()) {System.out.println(future.get());break;}
}
同步模型中使用线程池确实能实现异步调用的效果也能压缩同步等待的时间但是也有一些缺陷
CPU 资源大量浪费在阻塞等待上导致 CPU 资源利用率低。为了增加并发度会引入更多额外的线程池随着 CPU 调度线程数的增加会导致更严重的资源争用上下文切换占用 CPU 资源。线程池中的线程都是阻塞的硬件资源无法充分利用系统吞吐量容易达到瓶颈。
NIO 模型
为了解决 BIO 中的缺陷引入 NIO 模型 NIO 模型
当用户进程发出 read 操作时如果 kernel 中的数据还没有准备好那么它并不会 block 用户进程而是立刻返回一个 error。从用户进程角度讲 它发起一个 read 操作后并不需要等待而是马上就得到了一个结果。用户进程判断结果是一个 error 时它就知道数据还没有准备好于是它可以再次发送 read 操作。一旦 kernel 中的数据准备好了并且又再次收到了用户进程的 system call那么它马上就将数据拷贝到了用户内存然后返回。所以用户进程其实是需要不断的主动询问 kernel 数据好了没有。
异步优化思路
我们知道了 NIO 的调用方式比 BIO 好那我们怎么能在业务编码中使用到 NIO 呢自己动手将 BIO 替换成 NIO 肯定不现实已有组件支持 NIO 的可以直接使用不支持的继续使用自定义线程池。
通过 RPC NIO 异步调用、 HTTP 异步调用的方式降低线程数从而降低调度上下文切换开销。没有原生支持 NIO 异步调用的继续使用线程池。引入 CompletableFuture 对业务流程进行编排降低依赖之间的阻塞。
简述CompletableFuture
CompletableFuture 是 java.util.concurrent 库在 java 8 中新增的主要工具同传统的 Future 相比其支持流式计算、函数式编程、完成通知、自定义异常处理等很多新的特性 常用 API 举例
supplyAsync
CompletableFutureString future CompletableFuture.supplyAsync(()-{ try{Thread.sleep(1000L);return hello world;} catch (Exception e){return failed;}
});
System.out.println(future.join());
// output
hello world
开启异步任务到另一个线程执行。
complete
CompletableFutureString future1 new CompletableFuture();
future.complete(hello world); //异步线程执行
future.whenComplete((res, throwable) - {System.out.println(res);
});
System.out.println(future1.join());
CompletableFutureString future2 new CompletableFuture();
future.completeExceptionally(new Throwable(failed)); //异步线程执行
System.out.println(future2.join());
// output
hello world
hello worldException in thread main
java.util.concurrent.CompletionException:
java.lang.Throwable: failed
complete 正常完成该 CompletableFuture。
completeExceptionally 异常完成该 CompletableFuture。
thenApply
String original Message;
CompletableFutureString cf CompletableFuture.completedFuture(original).thenApply(String::toUpperCase);
System.out.println(cf.join());
// output
MESSAGE
任务后置处理。
图示 thenApply 图示
thenCombine
CompletableFutureString cf CompletableFuture.completedFuture(Message).thenApply(String::toUpperCase);
CompletableFutureString cf1 CompletableFuture.completedFuture(Message).thenApply(String::toLowerCase);
CompletableFutureString allCf cf.thenCombine(cf1, (s1, s2) - s1 s2);
System.out.println(allCf.join());
// output
MSGmsg
合并任务两个任务同时执行结果由合并函数 BiFunction 返回。
图示 thenCombine 图示
allOf
CompletableFutureString future1 CompletableFuture.supplyAsync(() - Message1);
CompletableFutureString future2 CompletableFuture.supplyAsync(() - Message2);
CompletableFutureString future3 CompletableFuture.supplyAsync(() - Message3);
CompletableFutureString future CompletableFuture.allOf(future1, future2, future3).thenApply(v - {String join1 future1.join();String join2 future2.join();String join3 future3.join();return join1 join2 join3;});
System.out.println(future.join());
// output
Msg1Msg2Msg3
allOf 会阻塞等待所有异步线程任务结束。
allOf 里的 join 并不会阻塞传给 thenApply 的函数是在 future1, future2, future3 全部完成时才会执行 。
图示
allOf 图示CF 执行线程
下面有两个小demo可以先试着想想输出的结果
String original Message;
CompletableFuture cf CompletableFuture.supplyAsync(() - {System.out.println(supplyAsync thread: Thread.currentThread().getName());return original;
}).thenApply(r - {System.out.println(thenApply thread: Thread.currentThread().getName());return r;
});
System.out.println(cf.join());
// output
supplyAsync thread: ForkJoinPool.commonPool-worker-1
thenApply thread: main
Message
String original Message;
CompletableFuture cf CompletableFuture.supplyAsync(() - {System.out.println(supplyAsync thread: Thread.currentThread().getName());try {Thread.sleep(100);} catch (InterruptedException e) {throw new RuntimeException(e);}return original;
}).thenApply(r - {System.out.println(thenApply thread: Thread.currentThread().getName());return r;
});
System.out.println(cf.join());
// output
supplyAsync thread: ForkJoinPool.commonPool-worker-1
thenApply thread: ForkJoinPool.commonPool-worker-1
Message
先看结论
执行 complete 的线程会执行当前调用链上的所有CF。如果 CF 提前 complete后续 CF 由初始线程执行。
异步任务里没有 sleep 的时候异步任务很快就会完成意味着 JVM 执行到 thenApply 的时候前置 CF 已经提前完成所以后续的 CF 会被初始线程 main 线程执行。
异步任务里有 sleep 的时候 JVM 执行到 thenApply 时前置 CF 还没有完成前置 CF complete 的线程会执行所有后续的 CF。
CF 嵌套join
ExecutorService executorService Executors.newFixedThreadPool(2);
CompletableFutureInteger cf1 CompletableFuture.supplyAsync(() - {Thread.sleep(3000);return 1;
}, executorService);
CompletableFutureInteger cf2 CompletableFuture.supplyAsync(() - {Thread.sleep(3000);return 2;
}, executorService);
Integer join1 cf1.thenApply((cf1Val) - {System.out.println(cf1 start value: cf1Val);Integer cf2Val cf2.join();System.out.println(cf2 end value: cf2Val);return 3;
}).join();
//output
cf1 start value:1
cf2 end value:2
代码很简单有一个线程数为 2 的线程池cf1、cf2 都使用这个线程执行异步任务特别的是在 cf1.thenApply 中会调用 cf2.join()当线程数是2的时候可以顺利输出
ExecutorService executorService Executors.newFixedThreadPool(1);
CompletableFutureInteger cf1 CompletableFuture.supplyAsync(() - {Thread.sleep(3000);return 1;
}, executorService);
CompletableFutureInteger cf2 CompletableFuture.supplyAsync(() - {Thread.sleep(3000);return 2;
}, executorService);Integer join1 cf1.thenApply((cf1Val) - {System.out.println(cf1 start value: cf1Val);Integer cf2Val cf2.join();System.out.println(cf2 end value: cf2Val);return 3;
}).join();
//output
cf1 start value:1
这时候我们将线程池的线程数调整为 1这时只会输出 cf1 start value:1然后就一直阻塞。
使用 jstack -l pid 查看线程状态发现是 WAITING等待的地方正是我们在代码里调用的cf2.join()
pool-1-thread-1 #11 prio5 os_prio31 tid0x00000001429f5000 nid0xa903 waiting on conditionjava.lang.Thread.State: WAITING (parking)at sun.misc.Unsafe.park(Native Method)- parking to wait for 0x000000076ba5f7d0 (a java.util.concurrent.CompletableFuture$Signaller)at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)at java.util.concurrent.ForkJoinPool.managedBlock(ForkJoinPool.java:3323)at java.util.concurrent.CompletableFuture.waitingGet(CompletableFuture.java:1742)at java.util.concurrent.CompletableFuture.join(CompletableFuture.java:1947)at com.ppphuang.demo.threadPool.ExecutorsTest.lambda$main$2(ThreadPoolExecutorsTest.java:34)
原因是我们在唯一一个线程中调用 cf2.join()阻塞等待 cf2 完成但是 cf2 需要等待 cf1 完成之后才有空闲线程去执行。这就类似于你右手正拿着一个水杯然后等待右手拿水壶倒满水这是不可能完成的。所以尽量不要嵌套join不注意隔离线程池的话很容易造成’死锁‘线程阻塞。
CF 常用 API
API描述supplyAsync开启异步任务到另一个线程执行异步任务有返回值。complete完成任务。completeExceptionally异常结束任务。thenCombine合并任务两个任务同时执行结果由合并函数 BiFunction 返回。thenApply任务后置处理。applyToEither会取两个任务最先完成的任务上个任务和这个任务同时进行哪个先结束先用哪个结果。handle后续处理。whenComplete完成后的处理。allOf等待所有异步线程任务结束。join获取返回值没有complete的 CF 对象调用join时会等待complete再返回已经 complete的 CF 对象调用join时会立刻返回结果。
优化过程
异步 RPC 客户端
我们手写的这个 RPC 框架支持异步调用如果你想看具体的实现可以在文末找到源码链接。异步调用之前会设置一个 CallBack 方法异步调用时会直接返回 null不会等待服务端返回接果服务端返回结果之后会通过 RPC 客户端自带的线程池执行设置的 CallBack 方法。
RPC 异步调用图示 RPC 异步调用
包装异步RPC Client
通过 AsyncExecutor 包装 RPC的客户端AsyncExecutor 类中的 client 属性值为创建的某个 RPC 服务的异步客户端代理类这个代理类在构造方法中创建并赋值给 client 属性。
类中的 async 方法接受 Function 类型的参数 function可以通过 function.apply(client) 来通过 client 执行真正的 RPC 调用。
在 async 方法中实例化一个 CompletableFuture 并将 CompletableFuture 作为异步回调的上下文设置到 RPC 的异步回调中之后将该 CompletableFuture 返回给调用者。
public class AsyncExecutorC {private C client;public AsyncExecutor(ClientProxyFactory clientProxyFactory, ClassC clazz, String group, String version) {this.client clientProxyFactory.getProxy(clazz, group, version, true);}public R CompletableFutureR async(FunctionC, R function) {CompletableFutureR future new CompletableFuture();ClientProxyFactory.setLocalAsyncContextAndAsyncReceiveHandler(future, CompletableFutureAsyncCallBack.instance());try {function.apply(client);} catch (Exception e) {future.completeExceptionally(e);}return future;}
} 异步回调类
public class CompletableFutureAsyncCallBack extends AsyncReceiveHandler {private static volatile CompletableFutureAsyncCallBack INSTANCE;private CompletableFutureAsyncCallBack() {}Overridepublic void callBack(Object context, Object result) {if (!(context instanceof CompletableFuture)) {throw new IllegalStateException(the context must be CompletableFuture);}CompletableFuture future (CompletableFuture) context;if (result instanceof Throwable) {future.completeExceptionally((Throwable) result);return;}log.info(result:{}, result);future.complete(result);}
}
AsyncReceiveHandler 是 RPC 的异步回调抽象类类中的 callBack、onException 抽象方法需要子类实现。
CompletableFutureAsyncCallBack 实现了这个 callBack 抽象方法第一个参数是我们在包装异步 RPC Client 时设置的 CompletableFuture 上下文第二个参数是 RPC 返回的结果。方法中判断 RPC 返回的结果是否异常若异常通过 completeExceptionally 异常结束这个 CompletableFuture若正常通过 complete 正常结束这个 CompletableFuture。
注册异步客户端Bean
Component
public class AsyncExecutorConfig {AutowiredClientProxyFactory clientProxyFactory;Beanpublic AsyncExecutorDemoService demoServiceAsyncExecutor() {return new AsyncExecutor(clientProxyFactory, DemoService.class, , );}
} 异步 RPC 调用
Autowired
AsyncExecutorDemoService demoServiceAsyncExecutor;CompletableFutureString pppName demoServiceAsyncExecutor.async(service - service.hello(ppp));String name pppName.join(); 异步HTTP WebClient
WebClient 是从 Spring WebFlux 5.0 版本开始提供的一个非阻塞的基于响应式编程的进行 HTTP 请求的客户端工具。它的响应式编程的基于 Reactor 的。
WebClient VS RestTemplate
WebClient的优势在于
非阻塞响应式 IO单位时间内有限资源下支持更高的并发量。支持使用 Java8 Lambda 表达式函数。支持同步、异步、Stream 流式传输。
WebClient 使用
public CompletableFutureString asyncHttp(String url) {WebClient localhostWebClient WebClient.builder().baseUrl(http://localhost:8080).build();MonoHttpResultString userMono localhostWebClient.method(HttpMethod.GET).uri(url).retrieve().bodyToMono(new ParameterizedTypeReferenceHttpResultString() {})//异常处理 有onErrorReturn时doOnError不会触发所以不需要后续在CompletableFuture中handle处理异常//如果不使用onErrorReturn建议在后续CompletableFuture中handle处理异常.onErrorReturn(new HttpResult(201, default, default hello))//超时处理.timeout(Duration.ofSeconds(3))//返回值过滤.filter(httpResult - httpResult.code 200)//默认值.defaultIfEmpty(new HttpResult(201, defaultIfEmpty, defaultIfEmpty hello))//失败重试.retryWhen(Retry.backoff(1, Duration.ofSeconds(1)));CompletableFutureHttpResultString stringCompletableFuture WebClientFutureFactory.getCompletableFuture(userMono);return stringCompletableFuture.thenApply(HttpResult::getData);}
WebClient 整合 CF
WebClientFutureFactory.getCompletableFuture 方法会把 WebClient 返回的结果组装成 CompletableFuture 使用的是 Mono 类的 doOnError 和 subscribe 方法当正常返回时通过 subscribe 来调用 completableFuture.complete当异常时通过 doOnError 来调用 completableFuture.completeExceptionally
public class WebClientFutureFactory {public static T CompletableFutureT getCompletableFuture(MonoT mono) {CompletableFutureT completableFuture new CompletableFuture();mono.doOnError(throwable - {completableFuture.completeExceptionally(throwable);log.error(mono.doOnError throwable:{}, throwable.getMessage());}).subscribe(result - {completableFuture.complete(result);log.debug(mono.subscribe execute thread: {}, Thread.currentThread().getName());});return completableFuture;}
}
WebClient 对同一服务的多次调用
public FluxUser fetchUsers(ListInteger userIds) {return Flux.fromIterable(userIds).parallel().flatMap(this::getUser).ordered((u1, u2) - u2.id() - u1.id());
}
对返回相同类型的不同服务进行多次调用
public FluxUser fetchUserAndOtherUser(int id) {return Flux.merge(getUser(id), getOtherUser(id)).parallel().runOn(Schedulers.elastic()).ordered((u1, u2) - u2.id() - u1.id());
}
对不同类型的不同服务的多次调用
public Mono fetchUserAndItem(int userId, int itemId) {MonoUser user getUser(userId).subscribeOn(Schedulers.elastic());MonoItem item getItem(itemId).subscribeOn(Schedulers.elastic());return Mono.zip(user, item, UserWithItem::new);
}
异步数据库调用
使用 CompletableFuture.supplyAsync 执行异步任务时必须指定成自己的线程池否则 CompletableFuture 会使用默认的线程池 ForkJoinPool默认线程池数量为 cpus - 1
WebClientBoolean dbFuture CompletableFuture.supplyAsync(() - getDb(id), ThreadPoolConfig.ASYNC_TASK_EXECUTOR);
编排 CF
构造了所有需要异步执行的 CompletableFuture 之后使用 allOf 方法阻塞等待所有的 CompletableFuture 结果allOf 响应之后可以通过 join 获取各个 CompletableFuture 的响应接口这里的 join 是会立刻返回的不会阻塞
//RPC 的 CompletableFuture
CompletableFutureString pppName demoServiceAsyncExecutor.async(service - service.hello(ppp));//RPC 的 CompletableFuture
CompletableFutureString huangName demoServiceAsyncExecutor.async(service - service.hello(huang));//DB 操作的 CompletableFuture
WebClientBoolean dbFuture CompletableFuture.supplyAsync(() - getDb(id), ThreadPoolConfig.ASYNC_TASK_EXECUTOR);//allOf 方法阻塞等待所有的 CompletableFuture 结果
return CompletableFuture.allOf(pppName, huangName, dbFuture)//组装结果返回.thenApply(r - pppName.join() huangName.join() dbFuture.join()).join();
超时处理
java9 中 CompletableFuture 才有超时处理使用方法如下
CompletableFuture.supplyAsync(() - 6 / 3).orTimeout(1, TimeUnit.SECONDS);
java8 中需要配合 ScheduledExecutorService applyToEither
public class TimeoutUtils {private static final ScheduledExecutorService scheduledExecutor Executors.newScheduledThreadPool(1);static {Runtime.getRuntime().addShutdownHook(new Thread(scheduledExecutor::shutdownNow));}public static T CompletableFutureT timeout(CompletableFutureT cf, long timeout, TimeUnit unit) {CompletableFutureT result new CompletableFuture();scheduledExecutor.schedule(() - result.completeExceptionally(new TimeoutException()), timeout, unit);return cf.applyToEither(result, Function.identity());}
}
TimeoutUtils 类与有一个静态属性值为初始化的一个 ScheduledExecutorService 还有一个静态方法 timeout 这个方法将传入的 cf 用 applyToEither 接口与一个调度计时的 CompletableFuture 组合哪个 CompletableFuture 先执行完成就返回哪个的结果。
具体使用如下
CompletableFutureInteger future demoServiceAsyncExecutor.async(service - service.getAge(18));
CompletableFutureInteger futureWithTimeout TimeoutUtils.timeout(future, 3, TimeUnit.SECONDS);
futureWithTimeout.join();
异常与默认值处理
CompletableFuture 中可以处理异常有下面三个 API
public U CompletableFutureU handle(java.util.function.BiFunction? super T, Throwable, ? extends U fn)
handle 接口不论 CompletableFuture 执行成功还是异常都会被处罚handle 接受一个 BiFunction 参数BiFunction 中的第一个参数为 CompletableFuture 的结果另一个参数为 CompletableFuture 执行过程中的异常Handle可以返回任意类型的值。可以给 handle 传入自定义函数根据结果跟执行异常返回最终数据。
public CompletableFutureT whenComplete(java.util.function.BiConsumer? super T, ? super Throwable action)
whenComplete 接口与 handle 类似whenComplete 接受一个 BiConsumer 参数BiConsumer 中的第一个参数为 CompletableFuture 的结果另一个参数为 CompletableFuture 执行过程中的异常但是没有返回值。
public CompletableFutureT exceptionally(java.util.function.FunctionThrowable, ? extends T fn)
exceptionally 接口只有在执行异常的时候才会被触发接受一个 Function 参会 Function 只有一个参数为 CompletableFuture 执行过程中的异常可以有一个任意返回值。
下表是三个接口的对比
handle()whenComplete()exceptionly()访问成功YesYesNo访问失败YesYesYes能从失败中恢复YesNoYes能转换结果从T 到 UYesNoNo成功时触发YesYesNo失败时触发YesYesYes有异步版本YesYesYes
我们使用 handle 接口来处理异常与默认值下面是封装的一个 handle 接口入参
public class DefaultValueHandleR extends AbstractLogActionR implements BiFunctionR, Throwable, R {public DefaultValueHandle(boolean isNullToDefault, R defaultValue, String methodName, Object... args) {super(methodName, args);this.defaultValue defaultValue;this.isNullToDefault isNullToDefault;}Overridepublic R apply(R result, Throwable throwable) {logResult(result, throwable);if (throwable ! null) {return defaultValue;}if (result null isNullToDefault) {return defaultValue;}return result;}
}
这个类实现了 handle 接口需要的 BiFunction 类型在构造方法中有四个参数 boolean isNullToDefault, R defaultValue, String methodName, Object... args 第一个参数是决定执行结果为空值时是否将我们传进来的第二个参数作为默认值返回。当异常时也会将第二个参数作为默认返回值。最后两个参数一个是方法名称一个是调用参数可以给父类用作日志记录。
与 CompletableFuture 配合使用如下
CompletableFutureString pppName demoServiceAsyncExecutor.async(service - service.hello(ppp)).handle(new DefaultValueHandle(true, name, service.hello, ppp));
日志
封装了一个实现 BiConsumer 的 LogErrorAction 类父类有个抽象类 AbstractLogAction 这个类就是简单使用 logReslut 方法记录日志可以自己随意实现
public class LogErrorActionR extends AbstractLogActionR implements BiConsumerR, Throwable{Overridepublic void accept(R result, Throwable throwable) {logResult(result, throwable);}
}
与 CompletableFuture 配合使用如下
CompletableFutureString pppName demoServiceAsyncExecutor.async(service - service.hello(ppp)).whenComplete(new LogErrorAction(hello, ppp));
优化效果
优化前接口平均响应耗时 350ms优化后平均响应耗时 180ms下降 49% 左右。
最佳实践
禁止嵌套 join避免“死锁”线程阻塞。多个 CompletableFuture 聚合时建议使用 allOf。HTTP 使用无阻塞的 Spring webclient避免自定义线程池线程阻塞。使用 RPC 或者 HTTP 异步调用生成的 CompletableFuture 后续的 thenAppplyhandle 等禁止耗时操作避免阻塞异步框架线程池。禁止使用 CompletableFuture 的默认线程池不同任务自定义线程池不同级别业务线程池隔离根据测试情况设置线程数队列长度拒绝策略。异步执行的操作都加上超时CF 超时后不会终止线程中的超时任务不设置超时可能导致线程长时间阻塞。建议使用异常、默认值、空值替换、错误日志等工具记录信息方便排查问题。
