博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊Elasticsearch RestClient的RequestLogger
阅读量:6611 次
发布时间:2019-06-24

本文共 11734 字,大约阅读时间需要 39 分钟。

hot3.png

本文主要研究一下Elasticsearch RestClient的RequestLogger

RequestLogger

elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RequestLogger.java

final class RequestLogger {    private static final Log tracer = LogFactory.getLog("tracer");    private RequestLogger() {    }    /**     * Logs a request that yielded a response     */    static void logResponse(Log logger, HttpUriRequest request, HttpHost host, HttpResponse httpResponse) {        if (logger.isDebugEnabled()) {            logger.debug("request [" + request.getMethod() + " " + host + getUri(request.getRequestLine()) +                    "] returned [" + httpResponse.getStatusLine() + "]");        }        if (logger.isWarnEnabled()) {            Header[] warnings = httpResponse.getHeaders("Warning");            if (warnings != null && warnings.length > 0) {                logger.warn(buildWarningMessage(request, host, warnings));            }        }        if (tracer.isTraceEnabled()) {            String requestLine;            try {                requestLine = buildTraceRequest(request, host);            } catch(IOException e) {                requestLine = "";                tracer.trace("error while reading request for trace purposes", e);            }            String responseLine;            try {                responseLine = buildTraceResponse(httpResponse);            } catch(IOException e) {                responseLine = "";                tracer.trace("error while reading response for trace purposes", e);            }            tracer.trace(requestLine + '\n' + responseLine);        }    }    /**     * Logs a request that failed     */    static void logFailedRequest(Log logger, HttpUriRequest request, Node node, Exception e) {        if (logger.isDebugEnabled()) {            logger.debug("request [" + request.getMethod() + " " + node.getHost() + getUri(request.getRequestLine()) + "] failed", e);        }        if (tracer.isTraceEnabled()) {            String traceRequest;            try {                traceRequest = buildTraceRequest(request, node.getHost());            } catch (IOException e1) {                tracer.trace("error while reading request for trace purposes", e);                traceRequest = "";            }            tracer.trace(traceRequest);        }    }    static String buildWarningMessage(HttpUriRequest request, HttpHost host, Header[] warnings) {        StringBuilder message = new StringBuilder("request [").append(request.getMethod()).append(" ").append(host)                .append(getUri(request.getRequestLine())).append("] returned ").append(warnings.length).append(" warnings: ");        for (int i = 0; i < warnings.length; i++) {            if (i > 0) {                message.append(",");            }            message.append("[").append(warnings[i].getValue()).append("]");        }        return message.toString();    }    /**     * Creates curl output for given request     */    static String buildTraceRequest(HttpUriRequest request, HttpHost host) throws IOException {        String requestLine = "curl -iX " + request.getMethod() + " '" + host + getUri(request.getRequestLine()) + "'";        if (request instanceof  HttpEntityEnclosingRequest) {            HttpEntityEnclosingRequest enclosingRequest = (HttpEntityEnclosingRequest) request;            if (enclosingRequest.getEntity() != null) {                requestLine += " -d '";                HttpEntity entity = enclosingRequest.getEntity();                if (entity.isRepeatable() == false) {                    entity = new BufferedHttpEntity(enclosingRequest.getEntity());                    enclosingRequest.setEntity(entity);                }                requestLine += EntityUtils.toString(entity, StandardCharsets.UTF_8) + "'";            }        }        return requestLine;    }    /**     * Creates curl output for given response     */    static String buildTraceResponse(HttpResponse httpResponse) throws IOException {        StringBuilder responseLine = new StringBuilder();        responseLine.append("# ").append(httpResponse.getStatusLine());        for (Header header : httpResponse.getAllHeaders()) {            responseLine.append("\n# ").append(header.getName()).append(": ").append(header.getValue());        }        responseLine.append("\n#");        HttpEntity entity = httpResponse.getEntity();        if (entity != null) {            if (entity.isRepeatable() == false) {                entity = new BufferedHttpEntity(entity);            }            httpResponse.setEntity(entity);            ContentType contentType = ContentType.get(entity);            Charset charset = StandardCharsets.UTF_8;            if (contentType != null && contentType.getCharset() != null) {                charset = contentType.getCharset();            }            try (BufferedReader reader = new BufferedReader(new InputStreamReader(entity.getContent(), charset))) {                String line;                while( (line = reader.readLine()) != null) {                    responseLine.append("\n# ").append(line);                }            }        }        return responseLine.toString();    }    private static String getUri(RequestLine requestLine) {        if (requestLine.getUri().charAt(0) != '/') {            return "/" + requestLine.getUri();        }        return requestLine.getUri();    }}
  • RequestLogger提供了logResponse、logFailedRequest等方法

RestClient

elasticsearch-7.0.1/client/rest/src/main/java/org/elasticsearch/client/RestClient.java

public class RestClient implements Closeable {	//......    public Response performRequest(Request request) throws IOException {        InternalRequest internalRequest = new InternalRequest(request);        return performRequest(nextNodes(), internalRequest, null);    }    private Response performRequest(final NodeTuple
> nodeTuple, final InternalRequest request, Exception previousException) throws IOException { RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); HttpResponse httpResponse; try { httpResponse = client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, null).get(); } catch(Exception e) { RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, e); onFailure(context.node); Exception cause = extractAndWrapCause(e); addSuppressedException(previousException, cause); if (nodeTuple.nodes.hasNext()) { return performRequest(nodeTuple, request, cause); } if (cause instanceof IOException) { throw (IOException) cause; } if (cause instanceof RuntimeException) { throw (RuntimeException) cause; } throw new IllegalStateException("unexpected exception type: must be either RuntimeException or IOException", cause); } ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); if (responseOrResponseException.responseException == null) { return responseOrResponseException.response; } addSuppressedException(previousException, responseOrResponseException.responseException); if (nodeTuple.nodes.hasNext()) { return performRequest(nodeTuple, request, responseOrResponseException.responseException); } throw responseOrResponseException.responseException; } private ResponseOrResponseException convertResponse(InternalRequest request, Node node, HttpResponse httpResponse) throws IOException { RequestLogger.logResponse(logger, request.httpRequest, node.getHost(), httpResponse); int statusCode = httpResponse.getStatusLine().getStatusCode(); Response response = new Response(request.httpRequest.getRequestLine(), node.getHost(), httpResponse); if (isSuccessfulResponse(statusCode) || request.ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) { onResponse(node); if (request.warningsHandler.warningsShouldFailRequest(response.getWarnings())) { throw new WarningFailureException(response); } return new ResponseOrResponseException(response); } ResponseException responseException = new ResponseException(response); if (isRetryStatus(statusCode)) { //mark host dead and retry against next one onFailure(node); return new ResponseOrResponseException(responseException); } //mark host alive and don't retry, as the error should be a request problem onResponse(node); throw responseException; } private void performRequestAsync(final NodeTuple
> nodeTuple, final InternalRequest request, final FailureTrackingResponseListener listener) { final RequestContext context = request.createContextForNextAttempt(nodeTuple.nodes.next(), nodeTuple.authCache); client.execute(context.requestProducer, context.asyncResponseConsumer, context.context, new FutureCallback
() { @Override public void completed(HttpResponse httpResponse) { try { ResponseOrResponseException responseOrResponseException = convertResponse(request, context.node, httpResponse); if (responseOrResponseException.responseException == null) { listener.onSuccess(responseOrResponseException.response); } else { if (nodeTuple.nodes.hasNext()) { listener.trackFailure(responseOrResponseException.responseException); performRequestAsync(nodeTuple, request, listener); } else { listener.onDefinitiveFailure(responseOrResponseException.responseException); } } } catch(Exception e) { listener.onDefinitiveFailure(e); } } @Override public void failed(Exception failure) { try { RequestLogger.logFailedRequest(logger, request.httpRequest, context.node, failure); onFailure(context.node); if (nodeTuple.nodes.hasNext()) { listener.trackFailure(failure); performRequestAsync(nodeTuple, request, listener); } else { listener.onDefinitiveFailure(failure); } } catch(Exception e) { listener.onDefinitiveFailure(e); } } @Override public void cancelled() { listener.onDefinitiveFailure(new ExecutionException("request was cancelled", null)); } }); } //......}
  • RestClient的performRequest方法在异常的时候会调用RequestLogger.logFailedRequest,成功时则调用convertResponse方法,该方法首先通过RequestLogger.logResponse来记录response;performRequestAsync方法则是在completed方法里头调用convertResponse方法,在failed方法里头调用RequestLogger.logFailedRequest

小结

  • RequestLogger提供了logResponse、logFailedRequest等方法
  • RestClient的performRequest方法在异常的时候会调用RequestLogger.logFailedRequest,成功时则调用convertResponse方法,该方法首先通过RequestLogger.logResponse来记录response
  • RestClient的performRequestAsync方法则是在completed方法里头调用convertResponse方法,在failed方法里头调用RequestLogger.logFailedRequest

doc

转载于:https://my.oschina.net/go4it/blog/3050573

你可能感兴趣的文章
使用JdbcTemplate和JdbcDaoSupport
查看>>
C博客作业--指针
查看>>
版本12.2.0.1.0数据库,复制种子数据库快速创建租户数据库PDB
查看>>
Glibc 和 uClibc
查看>>
vs2012中自带IIS如何让其他电脑访问
查看>>
Redux:异步操作
查看>>
Mysql学习第三课-分析二进制日志进行增量备份和还原
查看>>
如何在 Android 手机上安装 Ubuntu 13.04
查看>>
HDU 6073 - Matching In Multiplication | 2017 Multi-University Training Contest 4
查看>>
C语言 scanf()和gets()函数的区别
查看>>
如何检测域名是否被微信屏蔽 微信域名检测接口API是如何实现
查看>>
POJ1611-The Suspects
查看>>
Linux下安装Python-3.3.2【转】
查看>>
LeetCode OJ:Merge Two Sorted Lists(合并两个链表)
查看>>
功能测试
查看>>
【BZOJ 1901】Dynamic Rankings
查看>>
阿里架构师都在学的知识体系
查看>>
PAT (Advanced Level) 1028. List Sorting (25)
查看>>
【转】聚集索引和非聚集索引的区别
查看>>
【转】mac os 安装php
查看>>