
A Non-blocking HTTP Client in NIO Mode

Background

I've been working on an API gateway for a while now, because the old gateway ran into a serious problem.

As the name suggests, an API gateway takes upstream requests in and sends downstream requests out:

1) In the old gateway, incoming upstream requests are accepted in NIO mode, so a small number of IO threads is enough to handle the upstream network connections; the upstream side can therefore sustain 10k connections;

2) The old gateway's upstream threading model follows the Reactor pattern: [as many boss IO threads as there are CPUs] + [128 worker threads for request processing], with an ArrayQueue in between to buffer the request entities;

3) To send a request downstream, a worker thread takes an HTTP Client 3.x client from a connection pool, picks an address, assembles the data, sends it through the client object, and waits for the response.
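To make point 3 concrete, here is a rough sketch of the old worker-side code under the blocking model. GatewayRequest and its methods are made-up placeholders, not the actual gateway source; each of the 128 workers pulls a request off the queue, calls the backend with Commons HttpClient 3.x, and blocks until the response arrives.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.httpclient.methods.PostMethod;
import org.apache.commons.httpclient.methods.StringRequestEntity;

public class BlockingWorkerSketch {
    // Placeholder for whatever the gateway hands over to a worker thread.
    interface GatewayRequest {
        String targetUrl();
        String body();
        void reply(int status, String responseBody);
    }

    public static void main(String[] args) {
        ExecutorService workers = Executors.newFixedThreadPool(128);
        BlockingQueue<GatewayRequest> queue = new ArrayBlockingQueue<>(1024);
        HttpClient httpClient = new HttpClient(new MultiThreadedHttpConnectionManager());

        for (int i = 0; i < 128; i++) {
            workers.submit(() -> {
                while (!Thread.currentThread().isInterrupted()) {
                    GatewayRequest req = queue.take();                // handed off by the boss/IO threads
                    PostMethod post = new PostMethod(req.targetUrl());
                    post.setRequestEntity(new StringRequestEntity(req.body(), "application/json", "UTF-8"));
                    try {
                        int status = httpClient.executeMethod(post);  // <-- the worker thread waits here
                        req.reply(status, post.getResponseBodyAsString());
                    } finally {
                        post.releaseConnection();
                    }
                }
                return null;
            });
        }
    }
}

With this shape, the number of in-flight downstream calls can never exceed the number of worker threads, no matter how many upstream connections are open.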

The problem should jump out at once, as the word "waits" above already hints:

When 10k upstream connections come in, the gateway can have at most 128 worker threads waiting for downstream IO to return at any one time.

That is a hard-coded "concurrency limit" on the backend. Worse, when some class of downstream requests responds slowly, those waits block the worker threads for every type of request. Can this still deserve the name "gateway"?!
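To put a rough, purely illustrative number on it: if the slow downstream takes 2 seconds per call and all 128 workers end up stuck on it, the gateway tops out at 128 / 2 = 64 requests per second across every route, while the 10k upstream connections simply queue up behind those workers.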

Analysis and Practice

Diagrams first; no picture, no truth:

BIO mode

/BIO.png

NIO mode

/NIO.png

Images taken from https://github.com/Netflix/zuul/wiki

The BIO diagram is exactly our old gateway model: while the backend has not yet returned any data, the thread stays blocked waiting on the network. The second diagram, however, points to a better idea:

Threads must not block on network events; instead, network events should trigger the thread's next step. That way we no longer pay for the pointless waiting implied by "as many threads as concurrent requests".

This is exactly the core idea behind NIO, event notification, and even Linux's epoll multiplexing.
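As a bare-bones illustration of that idea (the address, port and buffer size below are arbitrary), a single thread can register many channels with a java.nio Selector and only act when the kernel reports an event, instead of parking on any one socket:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class SelectorSketch {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();

        // Connect without blocking and let the selector tell us when it is done.
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        channel.connect(new InetSocketAddress("localhost", 8080));
        channel.register(selector, SelectionKey.OP_CONNECT);

        ByteBuffer buffer = ByteBuffer.allocate(4096);
        while (true) {
            selector.select();                                   // block on "any event", not on one socket
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                it.remove();
                SocketChannel ch = (SocketChannel) key.channel();
                if (key.isConnectable() && ch.finishConnect()) {
                    key.interestOps(SelectionKey.OP_READ);       // connected: now wait for readable data
                } else if (key.isReadable()) {
                    buffer.clear();
                    int n = ch.read(buffer);                     // the data is already there, no waiting
                    if (n < 0) {
                        key.cancel();
                        ch.close();
                    }
                    // hand the bytes to the next processing step here
                }
            }
        }
    }
}

Netty's NioEventLoopGroup wraps exactly this kind of selector loop, which is what the code below builds on.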

Without further ado, here comes the code; before that, a few assumptions about the overall framework:

Our client lives inside an excellent Netty-based NIO networking framework, where the upstream side has already created a worker NioEventLoopGroup to handle network events.

The overall refactoring idea is:

The downstream client's IO should also reuse this event-loop group, so that, just like the NIO diagram above, the whole group is purely event-driven.

The upstream server-side IO is already taken care of; once the data is ready we hand over to the downstream client. On to the code:

/**
 * A per-host, non-blocking HTTP client pool built on reactor-netty.
 * All downstream IO runs on the shared upstream EventLoopGroup; callers get a
 * CompletableFuture instead of a thread that blocks waiting for the response.
 */
public class PerHostHttpClientPool implements IHttpClientPool{
    private static final Logger logger = LoggerFactory.getLogger(PerHostHttpClientPool.class);
    private static final int DEFAULT_SO_TIMEOUT_MILLS = 5000;

    private final String clientPoolName;
    private final ConnectionProvider pooledConnectionProvider;
    private final PooledHttpClientConfiguration configuration;
    public boolean ownResource = false;

    private final EventLoopGroup eventLoopGroup;

    private final AtomicInteger total;
    private final AtomicInteger inUse;

    public PerHostHttpClientPool(String clientPoolName, PooledHttpClientConfiguration pooledHttpClientConfiguration, EventLoopGroup eventLoopGroup) {
        this.clientPoolName = clientPoolName;
        this.configuration = pooledHttpClientConfiguration;
        // Fixed-size pool: at most maxConnectionsPerHost concurrent connections to one downstream host.
        pooledConnectionProvider = ConnectionProvider.fixed(this.clientPoolName, pooledHttpClientConfiguration.getMaxConnectionsPerHost());
        this.eventLoopGroup = eventLoopGroup;
        ownResource = true;
        total = new AtomicInteger(0);
        inUse = new AtomicInteger(0);
    }


    @Override
    public CompletableFuture<Tuple3<HttpResponseStatus, HttpHeaders, String>> applyRequest(URL endpoint, HttpMethod method, String body){
        // Convenience overload: default headers and default socket timeout.
        return this.applyRequest(endpoint, method, new DefaultHttpHeaders(), body, DEFAULT_SO_TIMEOUT_MILLS);
    }

    @Override
    public CompletableFuture<Tuple3<HttpResponseStatus, HttpHeaders, String>> applyRequest(URL endpoint, HttpMethod method, HttpHeaders userHeaders, String body, int soTimeoutMills) {
        // Nothing blocks in this method: the returned CompletableFuture completes on the event loop
        // when the downstream response (or an error/timeout) arrives.
        return HttpClient.create(pooledConnectionProvider)
                .tcpConfiguration(tcpClient -> tcpClient
                        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, configuration.getConnectTimeout())
                        .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(configuration.getWriteBufferLowWaterMark(), configuration.getWriteBufferHighWaterMark()))
                        .option(ChannelOption.SO_KEEPALIVE, configuration.isSocketKeepAlive())
                        // Per-connection read/write timeouts, plus a simple counter of open connections.
                        .doOnConnected(connection -> {
                                    connection.addHandlerLast(new ReadTimeoutHandler(soTimeoutMills, TimeUnit.MILLISECONDS))
                                            .addHandlerLast(new WriteTimeoutHandler(soTimeoutMills, TimeUnit.MILLISECONDS));
                                    total.incrementAndGet();
                                }
                        )
                        .doOnDisconnected(connection -> total.decrementAndGet())
                        // Reuse the upstream NioEventLoopGroup: downstream IO is driven by the same event loops.
                        .runOn(eventLoopGroup)
                        .host(endpoint.getHost())
                        .port(endpoint.getPort())
                )
                // Track how many pooled connections are currently serving a request.
                .observe((connection, state) -> {
                            logger.debug("ClientPool[{}] Connection[{}] state has been changed to {}", clientPoolName, connection, state);
                            if(state == CONFIGURED){
                                inUse.incrementAndGet();
                            }else if(state == RELEASED || state == DISCONNECTING){
                                inUse.decrementAndGet();
                            }
                        })
                .wiretap(true)
                .headers(h -> h.set(userHeaders).set(HttpHeaderNames.USER_AGENT,"AiAccess(Asiainfo PRD TA)/1.0"))
                .request(method)
                .uri(endpoint.getPath())
                .send(ByteBufFlux.fromString(Mono.just(body),configuration.getCharset(), ByteBufAllocator.DEFAULT))
                // Zip status, headers and the aggregated body into a single Tuple3.
                .responseSingle((httpClientResponse, byteBufMono) -> Mono.zip(Mono.just(httpClientResponse.status()), Mono.just(httpClientResponse.responseHeaders()), byteBufMono.asString(configuration.getCharset())))
                .toFuture();
    }

    @Override
    public void shutdown() {
        if(ownResource){
            if(pooledConnectionProvider != null && !pooledConnectionProvider.isDisposed()){
                pooledConnectionProvider.dispose();
                ownResource = false;
            }
        }
    }

    @Override
    public PooledHttpClientConfiguration getConfiguration() {
        return configuration;
    }

    @Override
    public int total() {
        return total.get();
    }

    @Override
    public int inUse() {
        return inUse.get();
    }


    public static class UnitTest{
        PooledHttpClientConfiguration configuration = new PooledHttpClientConfiguration();
        int count;
        CountDownLatch latch;
        IHttpClientPool httpClientPool;
        EventLoopGroup eventLoopGroup;


        @Before
        public void before(){
            configuration.setSocketKeepAlive(false);
            count = 10;
            eventLoopGroup = new NioEventLoopGroup(2);

            httpClientPool = new PerHostHttpClientPool("TestHttpClient",configuration, eventLoopGroup/*,onResponse,onException*/);
            latch  = new CountDownLatch(count);
        }


        @Test
        public void testApplyRequest() throws Exception {
            for (int i = 0; i < count; i++) {
                CompletableFuture<HttpHeaders> cf = httpClientPool.applyRequest(new URL("http://localhost:80/"), HttpMethod.POST, "hello aiaccess!").thenApply(resultTuple3 -> {
                    // do some business logic here
                    logger.error("I'm {}, response code is {}, response headers is {}, response body is {}", Thread.currentThread().getName(), resultTuple3.getT1(), resultTuple3.getT2(), resultTuple3.getT3());
                    logger.error("total={}, inUse={}", httpClientPool.total(), httpClientPool.inUse());
                    // do some business logic here
                    return resultTuple3.getT2();
                }).whenComplete((result, throwable) -> {
                    // do some business logic here
                    if (throwable != null) {
                        logger.error("I'm {}, http request error!", Thread.currentThread().getName(), throwable);
                    } else {
                        logger.error("I'm {}, http response Headers {} !", Thread.currentThread().getName(), result);
                    }
                    // do some business logic here
                    logger.error("total={}, inUse={}", httpClientPool.total(), httpClientPool.inUse());
                    // count down exactly once per request, whether it succeeded or failed
                    latch.countDown();
                });
            }
            latch.await();
        }

        @After
        public void after(){
            httpClientPool.shutdown();
            logger.error("shutdown pool.");
            logger.error("total={}, inUse={}",httpClientPool.total(),httpClientPool.inUse());
        }
    }
}

Afterword

What the caller gets back is no longer the result itself but a CompletableFuture, on which follow-up callbacks can be registered. The next step really is triggered only when the network IO completes, and no thread sits around waiting.
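For example, a caller inside an upstream Netty handler might chain callbacks roughly like this (forwardDownstream, the backend URL and the error handling are hypothetical, not part of the pool's API):

import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.http.HttpMethod;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class CallerSketch {
    // Nothing blocks here: every step runs only when the downstream response (or an error) arrives.
    static void forwardDownstream(IHttpClientPool httpClientPool, ChannelHandlerContext upstreamCtx, String payload) throws Exception {
        httpClientPool
                .applyRequest(new URL("http://backend.example.com/orders"), HttpMethod.POST, payload)
                .thenApply(tuple3 -> tuple3.getT3())                           // T3 is the response body
                .thenAccept(body -> upstreamCtx.writeAndFlush(
                        Unpooled.copiedBuffer(body, StandardCharsets.UTF_8)))  // push the answer back upstream
                .exceptionally(throwable -> {
                        upstreamCtx.close();                                   // or map the error to a proper response
                        return null;
                });
    }
}

Compare this with the old model: instead of a worker thread parked on executeMethod(), the event loop picks the chain back up only when there is actually something to do.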

So yes, reactive programming does upend the old way of writing code: business operations are driven by callbacks registered one after another, which carves the old straight-line business logic into pieces. In my view the coroutine style of OpenResty is friendlier to developers: the coroutine scheduler only has to resume a coroutine when the network responds, so the follow-up code is written naturally right after the network call. In essence, a coroutine saves the whole call stack of a request and restores it when the network response arrives, while the CPU simply keeps looping over events to schedule coroutines.

I suspect goroutines work in a similar way; something to research and write up later.