概述
官方说明:Commons HttpClient 3.x
codeline is at the end of life. All users of Commons HttpClient 3.x are strongly encouraged to upgrade to HttpClient 4.1。
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
<version>3.1</version>
</dependency>
备注:HttpClient 3.x 的版本都是 Commons HttpClient 的实现版本。HttpClient 4.x以及以上都是 httpclientcommons
的实现版本。无论是何种方式都是基于 HttpCore「HttpComponents Core」
和 HttpClient「HttpComponents Client」
实现的。其中HttpClient是基于 HttpCore实现的「意味着:导入对应的库HttpClient库时,也要导入相关的HttpCore的库」。
httpcore实现了一套HTTP 协议的基础组件,为构建客户端/代理/服务器端 HTTP 服务一致的 API。支持NIO & BIO。
httpclient 兼容HTTP 1.1、客户端认证功能、HTTP状态管理、HTTP连接管理等功能。
HttpComponents Client 是官方推荐使用的以下两种HTTP组件:
- httpclient5-fluent:内部已经依赖了httpclient5、httpcore5。
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5-fluent</artifactId>
<version>5.0.1</version>
</dependency>
- httpclient:内部依赖了httpcore。
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>4.5.6</version>
</dependency>
httpclient5-fluent
流式组件:去掉了https证书验证、禁用了请求重试「disableAutomaticRetries()」、简单的post、get、上传文件 等封装。有需要其他的可以自行调用call 方法、可自行调整连接池大小和超时时间。
HttpClient的连接池是在4.3之后的版本才有,我们可以通过PoolingHttpClientConnectionManager来管理我们的连接池以及查看连接池的使用情况。
CloseableHttpClient 是抽象类,真正执行请求「doExecute」的是其实现类。根据实现方式选择为MinimalHttpClient、InternalHttpClient。
1、MinimalHttpClient
一个极简可以用的版本,是核心设计的原初模型。只是封装了最基本的HTTP过程,提供最直接的客户端服务器交互,不支持代理,不支持在各种情况下的重试(重定向,权限校验,IO异常等)。
/**
* Minimal implementation of {@link CloseableHttpClient}. This client is
* optimized for HTTP/1.1 message transport and does not support advanced
* HTTP protocol functionality such as request execution via a proxy, state
* management, authentication and request redirects.
* <p>
* Concurrent message exchanges executed by this client will get assigned to
* separate connections leased from the connection pool.
* </p>
*/
简洁翻译:适用于HTTP/1.1协议的消息传输。不涉及HTTP协议中代理、状态管理、认证以及重定向功能。每个消息都是通过链接池中独立的连接处理的。
protected CloseableHttpResponse doExecute(
final HttpHost target,
final ClassicHttpRequest request,
final HttpContext context) throws IOException {
...
final ExecRuntime execRuntime = new InternalExecRuntime(log, connManager, requestExecutor,
request instanceof CancellableDependency ? (CancellableDependency) request : null);
try {
if (!execRuntime.isEndpointAcquired()) {
// 创建Endpoint
execRuntime.acquireEndpoint(exchangeId, route, null, clientContext);
}
if (!execRuntime.isEndpointConnected()) {
// 通过 Endpoint 与服务端建立连接,并且将 连接与Socket绑定,创建 SocketHolder
execRuntime.connectEndpoint(clientContext);
}
...
httpProcessor.process(request, request.getEntity(), context);
// 发送请求头、请求实体 & 返回响应头以及响应实体等信息
final ClassicHttpResponse response = execRuntime.execute(exchangeId, request, clientContext);
httpProcessor.process(response, response.getEntity(), context);
...
// check for entity, release connection if possible
final HttpEntity entity = response.getEntity();
if (entity == null || !entity.isStreaming()) {
// connection not needed and (assumed to be) in re-usable state
execRuntime.releaseEndpoint();
return new CloseableHttpResponse(response, null);
}
ResponseEntityProxy.enhance(response, execRuntime);
return new CloseableHttpResponse(response, execRuntime);
}
}
1.1、InternalExecRuntime
LeaseRequest
class InternalExecRuntime implements ExecRuntime, Cancellable {
private final HttpClientConnectionManager manager;
private final AtomicReference<ConnectionEndpoint> endpointRef;
public void acquireEndpoint(
final String id, final HttpRoute route, final Object object, final HttpClientContext context) throws
IOException {
...
// 1、从客户端连接管理器 PoolingHttpClientConnectionManager#StrictConnPool 租借 LeaseRequest
final LeaseRequest connRequest = manager.lease(id, route, connectionRequestTimeout, object);
...
// 创建的Endpoint为 PoolingHttpClientConnectionManager的内部类InternalConnectionEndpoint
final ConnectionEndpoint connectionEndpoint = connRequest.get(connectionRequestTimeout);
endpointRef.set(connectionEndpoint);
...
}
private void connectEndpoint(final ConnectionEndpoint endpoint, final HttpClientContext context) throws IOException
{
//HttpClientConnectionManager
manager.connect(endpoint, connectTimeout, context);
}
public ClassicHttpResponse execute(
final String id,
final ClassicHttpRequest request,
final HttpClientContext context) throws IOException, HttpException {
// 通过 endpointRef 获取 ConnectionEndpoint
final ConnectionEndpoint endpoint = ensureValid();
if (!endpoint.isConnected()) {
connectEndpoint(endpoint, context);
}
final RequestConfig requestConfig = context.getRequestConfig();
final Timeout responseTimeout = requestConfig.getResponseTimeout();
if (responseTimeout != null) {
endpoint.setSocketTimeout(responseTimeout);
}
if (log.isDebugEnabled()) {
log.debug(ConnPoolSupport.getId(endpoint) + ": start execution " + id);
}
return endpoint.execute(id, request, requestExecutor, context);
}
}
- 获取InternalConnectionEndpoint。
- 连接Endpoint,将InternalConnectionEndpoint与Socket连接关系,与服务端建立连接通信。
2、InternalHttpClient
/**
* Internal implementation of {@link CloseableHttpClient}.
* <p>
* Concurrent message exchanges executed by this client will get assigned to
* separate connections leased from the connection pool.
* </p>
*/
HttpClientConnectionManager
BasicHttpClientConnectionManager
BasicHttpClientConnectionManager
是个简单的单连接管理器。尽管这个类是线程安全的,它在同一时间也只能被一个线程使用。对于相同的路由,BasicHttpClientConnectionManager
会尽量重用旧的连接来发送后续的请求。如果后续请求的路由和旧连接中的路由不匹配,BasicHttpClientConnectionManager
就会关闭当前连接,使用请求中的路由重新建立连接。如果当前的连接正在被占用,会抛出java.lang.IllegalStateException异常。
/**
* A connection manager for a single connection. This connection manager maintains only one active
* connection. Even though this class is fully thread-safe it ought to be used by one execution
* thread only, as only one thread a time can lease the connection at a time.
* <p>
* This connection manager will make an effort to reuse the connection for subsequent requests
* with the same {@link HttpRoute route}. It will, however, close the existing connection and
* open it for the given route, if the route of the persistent connection does not match that
* of the connection request. If the connection has been already been allocated
* {@link IllegalStateException} is thrown.
*/
public class BasicHttpClientConnectionManager implements HttpClientConnectionManager {
}
PoolingHttpClientConnectionManager
创建MinimalHttpClient
时初始化连接池。
public class PoolingHttpClientConnectionManager
implements HttpClientConnectionManager, ConnPoolControl<HttpRoute> {
private final ManagedConnPool<HttpRoute, ManagedHttpClientConnection> pool;
private final HttpClientConnectionOperator connectionOperator;
protected PoolingHttpClientConnectionManager(
final HttpClientConnectionOperator httpClientConnectionOperator,
final PoolConcurrencyPolicy poolConcurrencyPolicy,
final PoolReusePolicy poolReusePolicy,
final TimeValue timeToLive,
final HttpConnectionFactory<ManagedHttpClientConnection> connFactory) {
super();
this.connectionOperator = Args.notNull(httpClientConnectionOperator, "Connection operator");
switch (poolConcurrencyPolicy != null ? poolConcurrencyPolicy : PoolConcurrencyPolicy.STRICT) {
case STRICT:
this.pool = new StrictConnPool<>(
DEFAULT_MAX_CONNECTIONS_PER_ROUTE,// 5
DEFAULT_MAX_TOTAL_CONNECTIONS,// 25
timeToLive,// -1
poolReusePolicy,//LIFO
null);
break;
case LAX:
this.pool = new LaxConnPool<>(
DEFAULT_MAX_CONNECTIONS_PER_ROUTE,
timeToLive,
poolReusePolicy,
null);
break;
default:
throw new IllegalArgumentException("Unexpected PoolConcurrencyPolicy value: " + poolConcurrencyPolicy);
}
this.connFactory = connFactory != null ? connFactory : ManagedHttpClientConnectionFactory.INSTANCE;
this.closed = new AtomicBoolean(false);
}
}
public LeaseRequest lease(final String id,final HttpRoute route,final Timeout requestTimeout,final Object state) {
//pool:StrictConnPool
final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, requestTimeout, null);
return new LeaseRequest() {
private volatile ConnectionEndpoint endpoint;
@Override
public synchronized ConnectionEndpoint get(
final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
if (this.endpoint != null) {
return this.endpoint;
}
final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry;
try {
poolEntry = leaseFuture.get(timeout.getDuration(), timeout.getTimeUnit());
}
try {
...
final ManagedHttpClientConnection conn = poolEntry.getConnection();
if (conn != null) {
conn.activate();
} else {
// 创建客户端连接 DefaultManagedHttpClientConnection,并且有 poolEntry#connRef持有
poolEntry.assignConnection(connFactory.createConnection(null));
}
if (leaseFuture.isCancelled()) {
pool.release(poolEntry, false);
} else {
// 创建默认的Endpoint 之 InternalConnectionEndpoint
this.endpoint = new InternalConnectionEndpoint(poolEntry);
}
return this.endpoint;
}
}
...
};
}
public void connect(final ConnectionEndpoint endpoint, final TimeValue connectTimeout, final HttpContext context) {
final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
final PoolEntry<HttpRoute, ManagedHttpClientConnection> poolEntry = internalEndpoint.getPoolEntry();
if (!poolEntry.hasConnection()) {
poolEntry.assignConnection(connFactory.createConnection(null));
}
final HttpRoute route = poolEntry.getRoute();
final HttpHost host;
if (route.getProxyHost() != null) {
host = route.getProxyHost();
} else {
host = route.getTargetHost();
}
// 通过poolEntry#connRef获取
final ManagedHttpClientConnection conn = poolEntry.getConnection();
// 连接执行者
this.connectionOperator.connect(
conn,
host,//目标地址
route.getLocalSocketAddress(),// 本地地址
connectTimeout,//超时时间
defaultSocketConfig != null ? this.defaultSocketConfig : SocketConfig.DEFAULT,//socket配置
context);// http配置上下文
}
InternalConnectionEndpoint
public class PoolingHttpClientConnectionManager{
class InternalConnectionEndpoint extends ConnectionEndpoint implements Identifiable {
public ClassicHttpResponse execute(final String exchangeId,final ClassicHttpRequest request,final HttpRequestExecutor requestExecutor,final HttpContext context) throws IOException, HttpException {
final ManagedHttpClientConnection connection = getValidatedPoolEntry().getConnection();
//HttpRequestExecutor
return requestExecutor.execute(request, connection, context);
}
}
}
HttpClientConnectionOperator
主要实现TCP连接的建立。将连接与Socket绑定。
public void connect(
final ManagedHttpClientConnection conn,final HttpHost host,final InetSocketAddress localAddress,
final TimeValue connectTimeout,final SocketConfig socketConfig, final HttpContext context) throws IOException {
final Lookup<ConnectionSocketFactory> registry = getSocketFactoryRegistry(context);
final ConnectionSocketFactory sf = registry.lookup(host.getSchemeName());
final InetAddress[] addresses = host.getAddress() != null ?
new InetAddress[] { host.getAddress() } : this.dnsResolver.resolve(host.getHostName());
final int port = this.schemePortResolver.resolve(host);
for (int i = 0; i < addresses.length; i++) {
final InetAddress address = addresses[i];
final boolean last = i == addresses.length - 1;
Socket sock = sf.createSocket(context);
sock.setSoTimeout(socketConfig.getSoTimeout().toMillisecondsIntBound());
sock.setReuseAddress(socketConfig.isSoReuseAddress());
sock.setTcpNoDelay(socketConfig.isTcpNoDelay());
sock.setKeepAlive(socketConfig.isSoKeepAlive());
...
// 创建 SocketHolder,最终由 BHttpConnectionBase#socketHolderRef持有
conn.bind(sock);
final InetSocketAddress remoteAddress = new InetSocketAddress(address, port);
try {
sock = sf.connectSocket(connectTimeout, sock, host, remoteAddress, localAddress, context);
conn.bind(sock);
return;
}
}
}
StrictConnPool
public Future<PoolEntry<T, C>> lease(final T route, final Object state,final Timeout requestTimeout,final
FutureCallback<PoolEntry<T, C>> callback) {
LeaseRequest<T, C> request = new LeaseRequest<>(route, state, requestTimeout, future);
final boolean completed = processPendingRequest(request);
if (!request.isDone() && !completed) {
this.leasingRequests.add(request);
}
if (request.isDone()) {
this.completedRequests.add(request);
}
}
PoolEntry
// 持有 DefaultManagedHttpClientConnection引用
private final AtomicReference<C> connRef;
private boolean processPendingRequest(final LeaseRequest<T, C> request) {
final T route = request.getRoute();
final Object state = request.getState();
final Deadline deadline = request.getDeadline();
final PerRoutePool<T, C> pool = getPool(route);
PoolEntry<T, C> entry;
...
if (pool.getAllocatedCount() < maxPerRoute) {
...
entry = pool.createEntry(this.timeToLive);
this.leased.add(entry);
request.completed(entry);
if (this.connPoolListener != null) {
this.connPoolListener.onLease(entry.getRoute(), this);
}
return true;
}
return false;
}
private PerRoutePool<T, C> getPool(final T route) {
PerRoutePool<T, C> pool = this.routeToPool.get(route);
if (pool == null) {
pool = new PerRoutePool<>(route, this.disposalCallback);
this.routeToPool.put(route, pool);
}
return pool;
}
public PoolEntry<T, C> createEntry(final TimeValue timeToLive) {
final PoolEntry<T, C> entry = new PoolEntry<>(this.route, timeToLive, disposalCallback);
this.leased.add(entry);
return entry;
}
HttpRequestExecutor
public ClassicHttpResponse execute(
final ClassicHttpRequest request,
final HttpClientConnection conn,
final HttpResponseInformationCallback informationCallback,
final HttpContext context) throws IOException, HttpException {
try {
...
// 发送请求头
conn.sendRequestHeader(request);
boolean expectContinue = false;
final HttpEntity entity = request.getEntity();
if (entity != null) {
final Header expect = request.getFirstHeader(HttpHeaders.EXPECT);
expectContinue = expect != null && HeaderElements.CONTINUE.equalsIgnoreCase(expect.getValue());
if (!expectContinue) {
// 如果请求实体不为空,则发送请求体
conn.sendRequestEntity(request);
}
}
conn.flush();
ClassicHttpResponse response = null;
while (response == null) {
if (expectContinue) {
if (conn.isDataAvailable(this.waitForContinue)) {
// 获取响应头
response = conn.receiveResponseHeader();
if (streamListener != null) {
streamListener.onResponseHead(conn, response);
}
final int status = response.getCode();
if (status == HttpStatus.SC_CONTINUE) {
// discard 100-continue
response = null;
conn.sendRequestEntity(request);
} else if (status < HttpStatus.SC_SUCCESS) {
...
continue;
} else if (status >= HttpStatus.SC_CLIENT_ERROR){
conn.terminateRequest(request);
} else {
conn.sendRequestEntity(request);
}
} else {
conn.sendRequestEntity(request);
}
conn.flush();
expectContinue = false;
} else {
// 调用抽象类 DefaultBHttpClientConnection#receiveResponseHeader
response = conn.receiveResponseHeader();
...
}
}
if (MessageSupport.canResponseHaveBody(request.getMethod(), response)) {
// 获取响应实体
conn.receiveResponseEntity(response);
}
return response;
}
}
DefaultManagedHttpClientConnection
真正的客户端链接。初始化DefaultManagedHttpClientConnection
首先会初始化DefaultBHttpClientConnection
、BHttpConnectionBase
、SessionInputBufferImpl。
final AtomicReference<SocketHolder> socketHolderRef;//持有SocketHolder
BHttpConnectionBase(
final Http1Config http1Config,
final CharsetDecoder charDecoder,
final CharsetEncoder charEncoder) {
this.http1Config = http1Config != null ? http1Config : Http1Config.DEFAULT;
final BasicHttpTransportMetrics inTransportMetrics = new BasicHttpTransportMetrics();
final BasicHttpTransportMetrics outTransportMetrics = new BasicHttpTransportMetrics();
this.inBuffer = new SessionInputBufferImpl(inTransportMetrics,
this.http1Config.getBufferSize(), -1,
this.http1Config.getMaxLineLength(), charDecoder);
this.outbuffer = new SessionOutputBufferImpl(outTransportMetrics,
this.http1Config.getBufferSize(),
this.http1Config.getChunkSizeHint(), charEncoder);
this.connMetrics = new BasicHttpConnectionMetrics(inTransportMetrics, outTransportMetrics);
this.socketHolderRef = new AtomicReference<>();
}
// Socket 相关配置
public SessionInputBufferImpl(
final BasicHttpTransportMetrics metrics,
final int bufferSize,// 8192 每次从socket读取的字节数
final int minChunkLimit,
final int maxLineLen,
final CharsetDecoder charDecoder) {
this.metrics = metrics;
this.buffer = new byte[bufferSize];
this.bufferPos = 0;
this.bufferLen = 0;
this.minChunkLimit = minChunkLimit >= 0 ? minChunkLimit : 512;
this.maxLineLen = maxLineLen > 0 ? maxLineLen : 0;
this.lineBuffer = new ByteArrayBuffer(bufferSize);
this.decoder = charDecoder;
}
DefaultBHttpClientConnection
public ClassicHttpResponse receiveResponseHeader() throws HttpException, IOException {
final SocketHolder socketHolder = ensureOpen();
//从Socket读取服务端响应流
// responseParser:LenientHttpResponseParser
final ClassicHttpResponse response = this.responseParser.parse(this.inBuffer, socketHolder.getInputStream());
...
return response;
}
<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5-fluent</artifactId>
<version>5.0.1</version>
</dependency>
HttpContext
HttpResponse
CloseableHttpResponse
public final class CloseableHttpResponse implements ClassicHttpResponse {
// BasicClassicHttpResponse
private final ClassicHttpResponse response;
private final ExecRuntime execRuntime;
}
BasicClassicHttpResponse
public class BasicClassicHttpResponse extends BasicHttpResponse implements ClassicHttpResponse {
private HttpEntity entity;
}
HttpEntity
最后
以上就是勤奋芹菜为你收集整理的1、HttpClient源码解析之MinimalHttpClient1、MinimalHttpClient2、InternalHttpClientHttpClientConnectionManagerStrictConnPoolHttpRequestExecutorDefaultManagedHttpClientConnectionHttpContextHttpResponseHttpEntity的全部内容,希望文章能够帮你解决1、HttpClient源码解析之MinimalHttpClient1、MinimalHttpClient2、InternalHttpClientHttpClientConnectionManagerStrictConnPoolHttpRequestExecutorDefaultManagedHttpClientConnectionHttpContextHttpResponseHttpEntity所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复