我是靠谱客的博主 勤奋芹菜,最近开发中收集的这篇文章主要介绍1、HttpClient源码解析之MinimalHttpClient1、MinimalHttpClient2、InternalHttpClientHttpClientConnectionManagerStrictConnPoolHttpRequestExecutorDefaultManagedHttpClientConnectionHttpContextHttpResponseHttpEntity,觉得挺不错的,现在分享给大家,希望可以做个参考。

概述

官方说明:Commons HttpClient 3.x codeline is at the end of life. All users of Commons HttpClient 3.x are strongly encouraged to upgrade to HttpClient 4.1。

<dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
</dependency>

备注:HttpClient 3.x 的版本都是 Commons HttpClient 的实现版本。HttpClient 4.x以及以上都是 httpclientcommons的实现版本。无论是何种方式都是基于 HttpCore「HttpComponents Core」HttpClient「HttpComponents Client」实现的。其中HttpClient是基于 HttpCore实现的「意味着:导入对应的库HttpClient库时,也要导入相关的HttpCore的库」。

httpcore实现了一套HTTP 协议的基础组件,为构建客户端/代理/服务器端 HTTP 服务一致的 API。支持NIO & BIO。
httpclient 兼容HTTP 1.1、客户端认证功能、HTTP状态管理、HTTP连接管理等功能。

HttpComponents Client 是官方推荐使用的以下两种HTTP组件:

  • httpclient5-fluent:内部已经依赖了httpclient5、httpcore5。
<dependency>
    <groupId>org.apache.httpcomponents.client5</groupId>
    <artifactId>httpclient5-fluent</artifactId>
    <version>5.0.1</version>
</dependency>
  • httpclient:内部依赖了httpcore。
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.6</version>
</dependency>

httpclient5-fluent流式组件:去掉了https证书验证、禁用了请求重试「disableAutomaticRetries()」、简单的post、get、上传文件 等封装。有需要其他的可以自行调用call 方法、可自行调整连接池大小和超时时间。

HttpClient的连接池是在4.3之后的版本才有,我们可以通过PoolingHttpClientConnectionManager来管理我们的连接池以及查看连接池的使用情况。
在这里插入图片描述
CloseableHttpClient 是抽象类,真正执行请求「doExecute」的是其实现类。根据实现方式选择为MinimalHttpClient、InternalHttpClient。

1、MinimalHttpClient

一个极简可以用的版本,是核心设计的原初模型。只是封装了最基本的HTTP过程,提供最直接的客户端服务器交互,不支持代理,不支持在各种情况下的重试(重定向,权限校验,IO异常等)。

/**
 * Minimal implementation of {@link CloseableHttpClient}. This client is
 * optimized for HTTP/1.1 message transport and does not support advanced
 * HTTP protocol functionality such as request execution via a proxy, state
 * management, authentication and request redirects.
 * <p>
 * Concurrent message exchanges executed by this client will get assigned to
 * separate connections leased from the connection pool.
 * </p>
 */

简洁翻译:适用于HTTP/1.1协议的消息传输。不涉及HTTP协议中代理、状态管理、认证以及重定向功能。每个消息都是通过链接池中独立的连接处理的。

protected CloseableHttpResponse doExecute(
        final HttpHost target,
        final ClassicHttpRequest request,
        final HttpContext context) throws IOException {
    ...
    final ExecRuntime execRuntime = new InternalExecRuntime(log, connManager, requestExecutor,
            request instanceof CancellableDependency ? (CancellableDependency) request : null);
    try {
        if (!execRuntime.isEndpointAcquired()) {
        	// 创建Endpoint
            execRuntime.acquireEndpoint(exchangeId, route, null, clientContext);
        }
        if (!execRuntime.isEndpointConnected()) {
        	// 通过 Endpoint 与服务端建立连接,并且将 连接与Socket绑定,创建 SocketHolder
            execRuntime.connectEndpoint(clientContext);
        }
		...
        httpProcessor.process(request, request.getEntity(), context);
        // 发送请求头、请求实体 & 返回响应头以及响应实体等信息
        final ClassicHttpResponse response = execRuntime.execute(exchangeId, request, clientContext);
        httpProcessor.process(response, response.getEntity(), context);
		...
        // check for entity, release connection if possible
        final HttpEntity entity = response.getEntity();
        if (entity == null || !entity.isStreaming()) {
            // connection not needed and (assumed to be) in re-usable state
            execRuntime.releaseEndpoint();
            return new CloseableHttpResponse(response, null);
        }
        ResponseEntityProxy.enhance(response, execRuntime);
        return new CloseableHttpResponse(response, execRuntime);
    }
}

1.1、InternalExecRuntime

LeaseRequest

class InternalExecRuntime implements ExecRuntime, Cancellable {

	private final HttpClientConnectionManager manager;
	private final AtomicReference<ConnectionEndpoint> endpointRef;
	
	public void acquireEndpoint(
            final String id, final HttpRoute route, final Object object, final HttpClientContext context) throws 
            	IOException {
	    ...
       // 1、从客户端连接管理器 PoolingHttpClientConnectionManager#StrictConnPool 租借 LeaseRequest
       final LeaseRequest connRequest = manager.lease(id, route, connectionRequestTimeout, object);
       ...
       // 创建的Endpoint为 PoolingHttpClientConnectionManager的内部类InternalConnectionEndpoint
       final ConnectionEndpoint connectionEndpoint = connRequest.get(connectionRequestTimeout);
       endpointRef.set(connectionEndpoint);
       ...
	}
	
	private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException 	
	{	
		//HttpClientConnectionManager
        manager.connect(endpoint, connectTimeout, context);
    }

	public ClassicHttpResponse execute(
            final String id,
            final ClassicHttpRequest request,
            final HttpClientContext context) throws IOException, HttpException {
        // 通过 endpointRef 获取 ConnectionEndpoint
        final ConnectionEndpoint endpoint = ensureValid();
        if (!endpoint.isConnected()) {
            connectEndpoint(endpoint, context);
        }
        final RequestConfig requestConfig = context.getRequestConfig();
        final Timeout responseTimeout = requestConfig.getResponseTimeout();
        if (responseTimeout != null) {
            endpoint.setSocketTimeout(responseTimeout);
        }
        if (log.isDebugEnabled()) {
            log.debug(ConnPoolSupport.getId(endpoint) + ": start execution " + id);
        }
        return endpoint.execute(id, request, requestExecutor, context);
    }
}
  1. 获取InternalConnectionEndpoint。
  2. 连接Endpoint,将InternalConnectionEndpoint与Socket连接关系,与服务端建立连接通信。

2、InternalHttpClient

/**
 * Internal implementation of {@link CloseableHttpClient}.
 * <p>
 * Concurrent message exchanges executed by this client will get assigned to
 * separate connections leased from the connection pool.
 * </p>
 */

HttpClientConnectionManager

BasicHttpClientConnectionManager

BasicHttpClientConnectionManager是个简单的单连接管理器。尽管这个类是线程安全的,它在同一时间也只能被一个线程使用。对于相同的路由,BasicHttpClientConnectionManager会尽量重用旧的连接来发送后续的请求。如果后续请求的路由和旧连接中的路由不匹配,BasicHttpClientConnectionManager就会关闭当前连接,使用请求中的路由重新建立连接。如果当前的连接正在被占用,会抛出java.lang.IllegalStateException异常。

/**
 * A connection manager for a single connection. This connection manager maintains only one active
 * connection. Even though this class is fully thread-safe it ought to be used by one execution
 * thread only, as only one thread a time can lease the connection at a time.
 * <p>
 * This connection manager will make an effort to reuse the connection for subsequent requests
 * with the same {@link HttpRoute route}. It will, however, close the existing connection and
 * open it for the given route, if the route of the persistent connection does not match that
 * of the connection request. If the connection has been already been allocated
 * {@link IllegalStateException} is thrown.
 */
public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {

}

PoolingHttpClientConnectionManager

创建MinimalHttpClient时初始化连接池。

public class PoolingHttpClientConnectionManager
    implements HttpClientConnectionManager, ConnPoolControl<HttpRoute> {
    private final ManagedConnPool<HttpRoute, ManagedHttpClientConnection> pool;
    private final HttpClientConnectionOperator connectionOperator;
  
	protected PoolingHttpClientConnectionManager(
            final HttpClientConnectionOperator httpClientConnectionOperator,
            final PoolConcurrencyPolicy poolConcurrencyPolicy,
            final PoolReusePolicy poolReusePolicy,
            final TimeValue timeToLive,
            final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
        super();
        this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
        switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
            case STRICT:
                this.pool = new StrictConnPool<>(
                        DEFAULT_MAX_CONNECTIONS_PER_ROUTE,// 5
                        DEFAULT_MAX_TOTAL_CONNECTIONS,// 25
                        timeToLive,// -1
                        poolReusePolicy,//LIFO
                        null);
                break;
            case LAX:
                this.pool = new LaxConnPool<>(
                        DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
                        timeToLive,
                        poolReusePolicy,
                        null);
                break;
            default:
                throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
        }
        this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
        this.closed = new AtomicBoolean(false);
    }
} 

public LeaseRequest lease(final String id,final HttpRoute route,final Timeout requestTimeout,final Object state) {
 	//pool:StrictConnPool
  final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, requestTimeout, null);
  
  return new LeaseRequest() {
  
      private volatile ConnectionEndpoint endpoint;
      @Override
      public synchronized ConnectionEndpoint get(
              final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
          if (this.endpoint != null) {
              return this.endpoint;
          }
          final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry;
          try {
              poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
          }
          try {
              ...
              final ManagedHttpClientConnection conn = poolEntry.getConnection();
              if (conn != null) {
                  conn.activate();
              } else {
              	  // 创建客户端连接 DefaultManagedHttpClientConnection,并且有 poolEntry#connRef持有
                  poolEntry.assignConnection(connFactory.createConnection(null));
              }
              if (leaseFuture.isCancelled()) {
                  pool.release(poolEntry, false);
              } else {
              	  // 创建默认的Endpoint 之 InternalConnectionEndpoint
                  this.endpoint = new InternalConnectionEndpoint(poolEntry);
              }
              return this.endpoint;
          }
      }
      ...
  };

}
public void connect(final ConnectionEndpoint endpoint, final TimeValue connectTimeout, final HttpContext context) {
    final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
    final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = internalEndpoint.getPoolEntry();
    if (!poolEntry.hasConnection()) {
        poolEntry.assignConnection(connFactory.createConnection(null));
    }
    final HttpRoute route = poolEntry.getRoute();
    final HttpHost host;
    if (route.getProxyHost() != null) {
        host = route.getProxyHost();
    } else {
        host = route.getTargetHost();
    }
    // 通过poolEntry#connRef获取
    final ManagedHttpClientConnection conn = poolEntry.getConnection();
    // 连接执行者
    this.connectionOperator.connect(
            conn,
            host,//目标地址
            route.getLocalSocketAddress(),// 本地地址
            connectTimeout,//超时时间
            defaultSocketConfig != null ? this.defaultSocketConfig : SocketConfig.DEFAULT,//socket配置
            context);// http配置上下文
}

InternalConnectionEndpoint

public class PoolingHttpClientConnectionManager{
	 class InternalConnectionEndpoint extends ConnectionEndpoint implements Identifiable {
		public ClassicHttpResponse execute(final String exchangeId,final ClassicHttpRequest request,final HttpRequestExecutor requestExecutor,final HttpContext context) throws IOException, HttpException {
            final ManagedHttpClientConnection connection = getValidatedPoolEntry().getConnection();
            //HttpRequestExecutor
            return requestExecutor.execute(request, connection, context);
        }
	}
}

HttpClientConnectionOperator

主要实现TCP连接的建立。将连接与Socket绑定。

public void connect(
            final ManagedHttpClientConnection conn,final HttpHost host,final InetSocketAddress localAddress,
            final TimeValue connectTimeout,final SocketConfig socketConfig, final HttpContext context) throws IOException {
        final Lookup<ConnectionSocketFactory> registry = getSocketFactoryRegistry(context);
        final ConnectionSocketFactory sf = registry.lookup(host.getSchemeName());
        final InetAddress[] addresses = host.getAddress() != null ?
                new InetAddress[] { host.getAddress() } : this.dnsResolver.resolve(host.getHostName());
        final int port = this.schemePortResolver.resolve(host);
        for (int i = 0; i < addresses.length; i++) {
            final InetAddress address = addresses[i];
            final boolean last = i == addresses.length - 1;
            Socket sock = sf.createSocket(context);
            sock.setSoTimeout(socketConfig.getSoTimeout().toMillisecondsIntBound());
            sock.setReuseAddress(socketConfig.isSoReuseAddress());
            sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
            sock.setKeepAlive(socketConfig.isSoKeepAlive());
            ...
            // 创建 SocketHolder,最终由 BHttpConnectionBase#socketHolderRef持有
            conn.bind(sock);
            final InetSocketAddress remoteAddress = new InetSocketAddress(address, port);
            try {
                sock = sf.connectSocket(connectTimeout, sock, host, remoteAddress, localAddress, context);
                conn.bind(sock);
                return;
            }
        }
    }

StrictConnPool

public Future<PoolEntry<T, C>> lease(final T route, final Object state,final Timeout requestTimeout,final 
FutureCallback<PoolEntry<T, C>> callback) {
	LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
	final boolean completed = processPendingRequest(request);
    if (!request.isDone() && !completed) {
        this.leasingRequests.add(request);
    }
    if (request.isDone()) {
        this.completedRequests.add(request);
    }
}

PoolEntry

// 持有 DefaultManagedHttpClientConnection引用
private final AtomicReference<C> connRef;

private boolean processPendingRequest(final LeaseRequest<T, C> request) {
    final T route = request.getRoute();
    final Object state = request.getState();
    final Deadline deadline = request.getDeadline();
    final PerRoutePool<T, C> pool = getPool(route);
    PoolEntry<T, C> entry;
	...
    if (pool.getAllocatedCount() < maxPerRoute) {
        ...
        entry = pool.createEntry(this.timeToLive);
        this.leased.add(entry);
        request.completed(entry);
        if (this.connPoolListener != null) {
            this.connPoolListener.onLease(entry.getRoute(), this);
        }
        return true;
    }
    return false;
}

private PerRoutePool<T, C> getPool(final T route) {
    PerRoutePool<T, C> pool = this.routeToPool.get(route);
    if (pool == null) {
        pool = new PerRoutePool<>(route, this.disposalCallback);
        this.routeToPool.put(route, pool);
    }
    return pool;
}

public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
    final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
    this.leased.add(entry);
    return entry;
}

HttpRequestExecutor

public ClassicHttpResponse execute(
        final ClassicHttpRequest request,
        final HttpClientConnection conn,
        final HttpResponseInformationCallback informationCallback,
        final HttpContext context) throws IOException, HttpException {
    try {
        ...
        // 发送请求头
        conn.sendRequestHeader(request);
        boolean expectContinue = false;
        final HttpEntity entity = request.getEntity();
        if (entity != null) {
            final Header expect = request.getFirstHeader(HttpHeaders.EXPECT);
            expectContinue = expect != null && HeaderElements.CONTINUE.equalsIgnoreCase(expect.getValue());
            if (!expectContinue) {
            	// 如果请求实体不为空,则发送请求体
                conn.sendRequestEntity(request);
            }
        }
        conn.flush();
        ClassicHttpResponse response = null;
        while (response == null) {
            if (expectContinue) {
                if (conn.isDataAvailable(this.waitForContinue)) {
                	// 获取响应头
                    response = conn.receiveResponseHeader();
                    if (streamListener != null) {
                        streamListener.onResponseHead(conn, response);
                    }
                    final int status = response.getCode();
                    if (status == HttpStatus.SC_CONTINUE) {
                        // discard 100-continue
                        response = null;
                        conn.sendRequestEntity(request);
                    } else if (status < HttpStatus.SC_SUCCESS) {
                        ...
                        continue;
                    } else if (status >= HttpStatus.SC_CLIENT_ERROR){
                        conn.terminateRequest(request);
                    } else {
                        conn.sendRequestEntity(request);
                    }
                } else {
                    conn.sendRequestEntity(request);
                }
                conn.flush();
                expectContinue = false;
            } else {
            	// 调用抽象类 DefaultBHttpClientConnection#receiveResponseHeader
                response = conn.receiveResponseHeader();
                ...
            }
        }
        if (MessageSupport.canResponseHaveBody(request.getMethod(), response)) {
        	// 获取响应实体
            conn.receiveResponseEntity(response);
        }
        return response;
    } 
}

DefaultManagedHttpClientConnection

真正的客户端链接。初始化DefaultManagedHttpClientConnection首先会初始化DefaultBHttpClientConnectionBHttpConnectionBase、SessionInputBufferImpl。

final AtomicReference<SocketHolder> socketHolderRef;//持有SocketHolder
BHttpConnectionBase(
        final Http1Config http1Config,
        final CharsetDecoder charDecoder,
        final CharsetEncoder charEncoder) {
    this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
    final BasicHttpTransportMetrics inTransportMetrics = new BasicHttpTransportMetrics();
    final BasicHttpTransportMetrics outTransportMetrics = new BasicHttpTransportMetrics();
    this.inBuffer = new SessionInputBufferImpl(inTransportMetrics,
            this.http1Config.getBufferSize(), -1,
            this.http1Config.getMaxLineLength(), charDecoder);
    this.outbuffer = new SessionOutputBufferImpl(outTransportMetrics,
            this.http1Config.getBufferSize(),
            this.http1Config.getChunkSizeHint(), charEncoder);
    this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
    this.socketHolderRef = new AtomicReference<>();
}
// Socket 相关配置
public SessionInputBufferImpl(
        final BasicHttpTransportMetrics metrics,
        final int bufferSize,// 8192 每次从socket读取的字节数
        final int minChunkLimit,
        final int maxLineLen,
        final CharsetDecoder charDecoder) {
    this.metrics = metrics;
    this.buffer = new byte[bufferSize];
    this.bufferPos = 0;
    this.bufferLen = 0;
    this.minChunkLimit = minChunkLimit >= 0 ? minChunkLimit : 512;
    this.maxLineLen = maxLineLen > 0 ? maxLineLen : 0;
    this.lineBuffer = new ByteArrayBuffer(bufferSize);
    this.decoder = charDecoder;
}

DefaultBHttpClientConnection

public ClassicHttpResponse receiveResponseHeader() throws HttpException, IOException {
    final SocketHolder socketHolder = ensureOpen();
    //从Socket读取服务端响应流  
    // responseParser:LenientHttpResponseParser
    final ClassicHttpResponse response = this.responseParser.parse(this.inBuffer, socketHolder.getInputStream());
    ...
    return response;
}
<dependency>
    <groupId>org.apache.httpcomponents.client5</groupId>
    <artifactId>httpclient5-fluent</artifactId>
    <version>5.0.1</version>
</dependency>

HttpContext

在这里插入图片描述

HttpResponse

在这里插入图片描述

CloseableHttpResponse

public final class CloseableHttpResponse implements ClassicHttpResponse {
	// BasicClassicHttpResponse
	private final ClassicHttpResponse response;
    private final ExecRuntime execRuntime;
}

BasicClassicHttpResponse

public class BasicClassicHttpResponse extends BasicHttpResponse implements ClassicHttpResponse {
	private HttpEntity entity;
}

HttpEntity

在这里插入图片描述

最后

以上就是勤奋芹菜为你收集整理的1、HttpClient源码解析之MinimalHttpClient1、MinimalHttpClient2、InternalHttpClientHttpClientConnectionManagerStrictConnPoolHttpRequestExecutorDefaultManagedHttpClientConnectionHttpContextHttpResponseHttpEntity的全部内容,希望文章能够帮你解决1、HttpClient源码解析之MinimalHttpClient1、MinimalHttpClient2、InternalHttpClientHttpClientConnectionManagerStrictConnPoolHttpRequestExecutorDefaultManagedHttpClientConnectionHttpContextHttpResponseHttpEntity所遇到的程序开发问题。

如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。

本图文内容来源于网友提供,作为学习参考使用,或来自网络收集整理,版权属于原作者所有。
点赞(48)

评论列表共有 0 条评论

立即
投稿
返回
顶部