An HTTP Client in NIO Non-Blocking Mode
Background
Recently I have been working on a new API gateway, because the old one ran into a serious problem.
As the name suggests, an API gateway takes requests in from upstream and sends requests out downstream:
1) In the old gateway, incoming upstream requests were accepted in NIO mode, so only a small number of IO threads were needed to handle the upstream network connections, which let the upstream side reach about 10k connections;
2) The old gateway's upstream threading model followed the Reactor pattern: [as many boss IO threads as CPUs] + [128 request-processing worker threads], with an ArrayQueue in between to buffer the request entities;
3) To send a request downstream, a worker thread took an HTTP Client 3.x client backed by a connection pool, picked the target address, assembled the data, sent it through the client object, and then waited for the response.
The problem should jump out at once, exactly as the emphasized "waited for the response" above hints:
When 10k upstream connections come in, the gateway can have at most 128 worker threads waiting for downstream IO to return.
This slaps a hard "concurrency limit" onto the backend. Worse, when downstream requests with certain characteristics respond slowly, the threads serving every type of request end up blocked. Can something like that still live up to the name "gateway"?!
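For reference, the old worker-thread call path looked roughly like the sketch below. This is a hypothetical reconstruction against the Commons HttpClient 3.x API (the class, method and endpoint names are illustrative, not the gateway's actual code); the point is that executeMethod pins the worker thread until the downstream server answers.

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.StringRequestEntity;

public class BlockingWorkerSketch {
    // One shared client over a thread-safe connection pool, as in the old gateway.
    private static final HttpClient CLIENT = new HttpClient(new MultiThreadedHttpConnectionManager());

    // Runs inside one of the 128 worker threads.
    static String forwardDownstream(String url, String body) throws Exception {
        PostMethod post = new PostMethod(url);
        post.setRequestEntity(new StringRequestEntity(body, "application/json", "UTF-8"));
        try {
            CLIENT.executeMethod(post);              // BLOCKS the worker thread here...
            return post.getResponseBodyAsString();   // ...and here, until downstream responds
        } finally {
            post.releaseConnection();
        }
    }
}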
Analysis and Practice
Pictures first, since no picture means no proof:
[Figure: BIO mode]

[Figure: NIO mode]

Images taken from https://github.com/Netflix/zuul/wiki
The BIO diagram shows exactly our previous gateway model: as long as the backend has not returned any network data, the thread stays blocked waiting for it. The second diagram, however, points to a better idea:
Threads must not block on network events; instead, network events should trigger the thread's next step. With that, we no longer have to worry about the pointless waiting implied by "N threads means at most N concurrent requests".
This is exactly the core idea behind NIO, event notification, and Linux's epoll-based IO multiplexing.
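To make that concrete, here is a minimal, illustrative java.nio sketch (not the gateway's code): one thread parks only on the Selector, and readiness events drive each next step, so no thread is ever pinned to a single connection waiting for its response.

import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SelectorSketch {
    public static void main(String[] args) throws Exception {
        Selector selector = Selector.open();
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);                          // non-blocking connect
        channel.connect(new InetSocketAddress("localhost", 80));   // hypothetical endpoint
        channel.register(selector, SelectionKey.OP_CONNECT);

        ByteBuffer buffer = ByteBuffer.allocate(4096);
        while (selector.select() > 0) {                            // the only place this thread parks
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                if (key.isConnectable() && channel.finishConnect()) {
                    key.interestOps(SelectionKey.OP_WRITE);        // connected: now we want to write
                } else if (key.isWritable()) {
                    channel.write(ByteBuffer.wrap("GET / HTTP/1.0\r\n\r\n".getBytes()));
                    key.interestOps(SelectionKey.OP_READ);         // request sent: now wait for data
                } else if (key.isReadable()) {
                    int n = channel.read(buffer);                  // data is ready, read does not block
                    if (n < 0) { channel.close(); return; }
                }
            }
        }
    }
}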
Without further ado, let's get to the code. Before that, one assumption about the overall code framework and environment:
Our client lives inside netty, an excellent NIO networking framework, where the upstream side has already set up a worker thread group, a NioEventLoopGroup, to handle network events.
The overall refactoring idea is:
The downstream client's IO should reuse this same event loop group, so that, just like the NIO diagram above, everything on this thread group is driven by events (a short wiring sketch follows).
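A minimal wiring sketch of that idea, assuming the PerHostHttpClientPool class shown further below (the upstream pipeline contents and names here are illustrative): the same worker NioEventLoopGroup serves both the upstream server channels and the downstream client pool.

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpServerCodec;

public class GatewayWiringSketch {
    public static void main(String[] args) {
        // Boss group accepts upstream connections; worker group handles their IO events.
        NioEventLoopGroup boss = new NioEventLoopGroup(Runtime.getRuntime().availableProcessors());
        NioEventLoopGroup worker = new NioEventLoopGroup();

        ServerBootstrap upstream = new ServerBootstrap()
                .group(boss, worker)
                .channel(NioServerSocketChannel.class)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) {
                        ch.pipeline().addLast(new HttpServerCodec());
                        // ... gateway upstream handlers go here ...
                    }
                });
        upstream.bind(8080);

        // The downstream client pool reuses the SAME worker group, so downstream IO
        // is driven by the same event loops rather than by extra blocking threads.
        IHttpClientPool clientPool = new PerHostHttpClientPool(
                "gateway-downstream", new PooledHttpClientConfiguration(), worker);
    }
}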
With the upstream server-side IO already taken care of, once the data is ready we move on to the downstream client. Here is the code:
public class PerHostHttpClientPool implements IHttpClientPool {

    private static final Logger logger = LoggerFactory.getLogger(PerHostHttpClientPool.class);
    private static final int DEFAULT_SO_TIMEOUT_MILLS = 5000;

    private final String clientPoolName;
    private final ConnectionProvider pooledConnectionProvider;
    private final PooledHttpClientConfiguration configuration;
    public boolean ownResource = false;
    // Shared with the upstream netty server so that downstream IO reuses the same event loops.
    private final EventLoopGroup eventLoopGroup;
    // Simple connection metrics: total live connections and connections currently serving a request.
    private final AtomicInteger total;
    private final AtomicInteger inUse;

    public PerHostHttpClientPool(String clientPoolName, PooledHttpClientConfiguration pooledHttpClientConfiguration, EventLoopGroup eventLoopGroup) {
        this.clientPoolName = clientPoolName;
        this.configuration = pooledHttpClientConfiguration;
        // Fixed-size reactor-netty connection pool, capped per host.
        pooledConnectionProvider = ConnectionProvider.fixed(this.clientPoolName, pooledHttpClientConfiguration.getMaxConnectionsPerHost());
        this.eventLoopGroup = eventLoopGroup;
        ownResource = true;
        total = new AtomicInteger(0);
        inUse = new AtomicInteger(0);
    }
    @Override
    public CompletableFuture<Tuple3<HttpResponseStatus, HttpHeaders, String>> applyRequest(URL endpoint, HttpMethod method, String body) {
        return this.applyRequest(endpoint, method, new DefaultHttpHeaders(), body, DEFAULT_SO_TIMEOUT_MILLS);
    }

    @Override
    public CompletableFuture<Tuple3<HttpResponseStatus, HttpHeaders, String>> applyRequest(URL endpoint, HttpMethod method, HttpHeaders userHeaders, String body, int soTimeoutMills) {
        return HttpClient.create(pooledConnectionProvider)
                .tcpConfiguration(tcpClient -> tcpClient
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.getConnectTimeout())
                        .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(configuration.getWriteBufferLowWaterMark(), configuration.getWriteBufferHighWaterMark()))
                        .option(ChannelOption.SO_KEEPALIVE, configuration.isSocketKeepAlive())
                        .doOnConnected(connection -> {
                            // Per-request read/write timeouts instead of a blocking SO_TIMEOUT.
                            connection.addHandlerLast(new ReadTimeoutHandler(soTimeoutMills, TimeUnit.MILLISECONDS))
                                    .addHandlerLast(new WriteTimeoutHandler(soTimeoutMills, TimeUnit.MILLISECONDS));
                            total.incrementAndGet();
                        })
                        .doOnDisconnected(connection -> total.decrementAndGet())
                        // Reuse the upstream worker event loop group: downstream IO is event-driven too.
                        .runOn(eventLoopGroup)
                        .host(endpoint.getHost())
                        .port(endpoint.getPort())
                )
                .observe((connection, state) -> {
                    logger.debug("ClientPool[{}] Connection[{}] state has been changed to {}", pooledConnectionProvider, connection, state);
                    if (state == CONFIGURED) {
                        inUse.incrementAndGet();
                    } else if (state == RELEASED || state == DISCONNECTING) {
                        inUse.decrementAndGet();
                    }
                })
                .wiretap(true)
                .headers(h -> h.set(userHeaders).set(HttpHeaderNames.USER_AGENT, "AiAccess(Asiainfo PRD TA)/1.0"))
                .request(method)
                .uri(endpoint.getPath())
                .send(ByteBufFlux.fromString(Mono.just(body), configuration.getCharset(), ByteBufAllocator.DEFAULT))
                // Zip status, headers and body into a Tuple3, then expose the Mono as a CompletableFuture.
                .responseSingle((httpClientResponse, byteBufMono) -> Mono.zip(Mono.just(httpClientResponse.status()), Mono.just(httpClientResponse.responseHeaders()), byteBufMono.asString(configuration.getCharset())))
                .toFuture();
    }
    @Override
    public void shutdown() {
        if (ownResource) {
            if (pooledConnectionProvider != null && !pooledConnectionProvider.isDisposed()) {
                pooledConnectionProvider.dispose();
                ownResource = false;
            }
        }
    }

    @Override
    public PooledHttpClientConfiguration getConfiguration() {
        return configuration;
    }

    @Override
    public int total() {
        return total.get();
    }

    @Override
    public int inUse() {
        return inUse.get();
    }
    public static class UnitTest {

        PooledHttpClientConfiguration configuration = new PooledHttpClientConfiguration();
        int count;
        CountDownLatch latch;
        IHttpClientPool httpClientPool;
        EventLoopGroup eventLoopGroup;

        @Before
        public void before() {
            configuration.setSocketKeepAlive(false);
            count = 10;
            eventLoopGroup = new NioEventLoopGroup(2);
            httpClientPool = new PerHostHttpClientPool("TestHttpClient", configuration, eventLoopGroup/*,onResponse,onException*/);
            latch = new CountDownLatch(count);
        }

        @Test
        public void testApplyRequest() throws Exception {
            for (int i = 0; i < count; i++) {
                CompletableFuture<HttpHeaders> cf = httpClientPool.applyRequest(new URL("http://localhost:80/"), HttpMethod.POST, "hello aiaccess!").thenApply(resultTuple3 -> {
                    // Do some busy work here
                    logger.error("I'm {}, response code is {}, response headers is {}, response body is {}", Thread.currentThread().getName(), resultTuple3.getT1(), resultTuple3.getT2(), resultTuple3.getT3());
                    logger.error("total={}, inUse={}", httpClientPool.total(), httpClientPool.inUse());
                    // Do some busy work here
                    return resultTuple3.getT2();
                }).whenComplete((result, throwable) -> {
                    // Do some busy work here
                    if (throwable != null) {
                        logger.error("I'm {}, http request error!", Thread.currentThread().getName(), throwable);
                    } else {
                        logger.error("I'm {}, http response Headers {} !", Thread.currentThread().getName(), result);
                    }
                    // Do some busy work here
                    logger.error("total={}, inUse={}", httpClientPool.total(), httpClientPool.inUse());
                    // Count down exactly once per request, after the whole callback chain has finished.
                    latch.countDown();
                });
            }
            latch.await();
        }

        @After
        public void after() {
            httpClientPool.shutdown();
            logger.error("shutdown pool.");
            logger.error("total={}, inUse={}", httpClientPool.total(), httpClientPool.inUse());
        }
    }
}
Postscript
What the caller gets back is no longer the result itself but a CompletableFuture, so follow-up callbacks can be registered on it; the next step is genuinely triggered only when the network IO completes, and no thread sits around waiting.
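For example, a caller can chain a second downstream call off the first one without ever blocking a thread. A minimal sketch against the pool above (the service endpoints and the reuse of the first response body are made-up placeholders):

import java.net.URL;
import java.util.concurrent.CompletableFuture;

import io.netty.handler.codec.http.HttpMethod;

public class ChainedCallSketch {

    // Chain two downstream calls without blocking any thread.
    static CompletableFuture<String> fetchUserThenCreateOrder(IHttpClientPool clientPool) throws Exception {
        URL userEndpoint = new URL("http://user-service:8080/users/42");     // illustrative endpoints
        URL orderEndpoint = new URL("http://order-service:8080/orders");

        return clientPool.applyRequest(userEndpoint, HttpMethod.GET, "")
                .thenCompose(userResult ->
                        // Second downstream call, still driven by the shared event loops.
                        clientPool.applyRequest(orderEndpoint, HttpMethod.POST, userResult.getT3()))
                .thenApply(orderResult -> orderResult.getT3());              // final body, delivered via callback
    }
}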
So yes, reactive programming really does upend the old way of writing code: you keep registering callbacks to drive the business forward, which chops what used to be one continuous business flow into pieces. Personally I find OpenResty's coroutine style friendlier to developers: the coroutine scheduler only has to resume a coroutine once the network response arrives, and the follow-up code sits naturally right after the network call. In essence, a coroutine saves the whole call stack of a request and restores it when the response comes back, while the CPU simply keeps looping over events to schedule coroutines.
I suspect goroutines work in a similar way; that still needs some research, and I will follow up later.