赞
踩
在基于SpringCloud开发的微服务中,我们一般会选择在网关层记录请求和响应日志,并将其收集到ELK中用作查询和分析。
今天我们就来看看如何实现此功能。
首先我们在网关中定义一个日志实体,用于组装日志对象
- @Data
- public class AccessLog {
-
- /**用户编号**/
- private Long userId;
-
- /**路由**/
- private String targetServer;
-
- /**协议**/
- private String schema;
-
- /**请求方法名**/
- private String requestMethod;
-
- /**访问地址**/
- private String requestUrl;
-
- /**请求IP**/
- private String clientIp;
-
- /**查询参数**/
- private MultiValueMap<String, String> queryParams;
-
- /**请求体**/
- private String requestBody;
-
- /**请求头**/
- private MultiValueMap<String, String> requestHeaders;
-
- /**响应体**/
- private String responseBody;
-
- /**响应头**/
- private MultiValueMap<String, String> responseHeaders;
-
- /**响应结果**/
- private HttpStatusCode httpStatusCode;
-
- /**开始请求时间**/
- private LocalDateTime startTime;
-
- /**结束请求时间**/
- private LocalDateTime endTime;
-
- /**执行时长,单位:毫秒**/
- private Integer duration;
-
- }

接下来我们在网关中定义一个Filter,用于收集日志信息。
- @Component
- public class AccessLogFilter implements GlobalFilter, Ordered {
-
- private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
- /**
- * 打印日志
- * @param accessLog 网关日志
- */
- private void writeAccessLog(AccessLog accessLog) {
- log.info("----access---- : {}", JsonUtils.obj2StringPretty(accessLog));
- }
-
- /**
- * 顺序必须是<-1,否则标准的NettyWriteResponseFilter将在您的过滤器得到一个被调用的机会之前发送响应
- * 也就是说如果不小于 -1 ,将不会执行获取后端响应的逻辑
- * @return
- */
- @Override
- public int getOrder() {
- return -100;
- }
-
- @Override
- public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
- // 将 Request 中可以直接获取到的参数,设置到网关日志
- ServerHttpRequest request = exchange.getRequest();
-
- AccessLog gatewayLog = new AccessLog();
- gatewayLog.setTargetServer(WebUtils.getGatewayRoute(exchange).getId());
- gatewayLog.setSchema(request.getURI().getScheme());
- gatewayLog.setRequestMethod(request.getMethod().name());
- gatewayLog.setRequestUrl(request.getURI().getRawPath());
- gatewayLog.setQueryParams(request.getQueryParams());
- gatewayLog.setRequestHeaders(request.getHeaders());
- gatewayLog.setStartTime(LocalDateTime.now());
- gatewayLog.setClientIp(WebUtils.getClientIP(exchange));
-
- // 继续 filter 过滤
- MediaType mediaType = request.getHeaders().getContentType();
- if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType)
- || MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) { // 适合 JSON 和 Form 提交的请求
- return filterWithRequestBody(exchange, chain, gatewayLog);
- }
- return filterWithoutRequestBody(exchange, chain, gatewayLog);
- }
-
-
- /**
- * 没有请求体的请求只需要记录日志
- */
- private Mono<Void> filterWithoutRequestBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessLog accessLog) {
- // 包装 Response,用于记录 Response Body
- ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, accessLog);
-
- return chain.filter(exchange.mutate().response(decoratedResponse).build())
- .then(Mono.fromRunnable(() -> writeAccessLog(accessLog)));
- }
-
- /**
- * 需要读取请求体
- * 参考 {@link ModifyRequestBodyGatewayFilterFactory} 实现
- */
- private Mono<Void> filterWithRequestBody(ServerWebExchange exchange, GatewayFilterChain chain, AccessLog gatewayLog) {
- // 设置 Request Body 读取时,设置到网关日志
- ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
-
- Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> {
- gatewayLog.setRequestBody(body);
- return Mono.just(body);
- });
-
- // 通过 BodyInserter 插入 body(支持修改body), 避免 request body 只能获取一次
- BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
- HttpHeaders headers = new HttpHeaders();
- headers.putAll(exchange.getRequest().getHeaders());
- // the new content type will be computed by bodyInserter
- // and then set in the request decorator
- headers.remove(HttpHeaders.CONTENT_LENGTH);
-
- CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
-
- // 通过 BodyInserter 将 Request Body 写入到 CachedBodyOutputMessage 中
- return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
- // 重新封装请求
- ServerHttpRequest decoratedRequest = requestDecorate(exchange, headers, outputMessage);
- // 记录响应日志
- ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog);
- // 记录普通的
- return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build())
- .then(Mono.fromRunnable(() -> writeAccessLog(gatewayLog))); // 打印日志
-
- }));
- }
-
- /**
- * 记录响应日志
- * 通过 DataBufferFactory 解决响应体分段传输问题。
- */
- private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, AccessLog accessLog) {
- ServerHttpResponse response = exchange.getResponse();
-
- return new ServerHttpResponseDecorator(response) {
-
- @Override
- public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
- if (body instanceof Flux) {
- DataBufferFactory bufferFactory = response.bufferFactory();
- // 计算执行时间
- accessLog.setEndTime(LocalDateTime.now());
- accessLog.setDuration((int) (LocalDateTimeUtil.between(accessLog.getStartTime(),
- accessLog.getEndTime()).toMillis()));
- accessLog.setResponseHeaders(response.getHeaders());
- accessLog.setHttpStatusCode(response.getStatusCode());
-
- // 获取响应类型,如果是 json 就打印
- String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
-
- if (StrUtil.isNotBlank(originalResponseContentType)
- && originalResponseContentType.contains("application/json")) {
- Flux<? extends DataBuffer> fluxBody = Flux.from(body);
-
- return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
- // 设置 response body 到网关日志
- byte[] content = readContent(dataBuffers);
- String responseResult = new String(content, StandardCharsets.UTF_8);
- accessLog.setResponseBody(responseResult);
-
- // 响应
- return bufferFactory.wrap(content);
- }));
- }
- }
- // if body is not a flux. never got there.
- return super.writeWith(body);
- }
- };
- }
-
-
- /**
- * 请求装饰器,支持重新计算 headers、body 缓存
- *
- * @param exchange 请求
- * @param headers 请求头
- * @param outputMessage body 缓存
- * @return 请求装饰器
- */
- private ServerHttpRequestDecorator requestDecorate(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) {
- return new ServerHttpRequestDecorator(exchange.getRequest()) {
-
- @Override
- public HttpHeaders getHeaders() {
- long contentLength = headers.getContentLength();
- HttpHeaders httpHeaders = new HttpHeaders();
- httpHeaders.putAll(super.getHeaders());
- if (contentLength > 0) {
- httpHeaders.setContentLength(contentLength);
- } else {
- // TODO: this causes a 'HTTP/1.1 411 Length Required' // on
- // httpbin.org
- httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
- }
- return httpHeaders;
- }
-
- @Override
- public Flux<DataBuffer> getBody() {
- return outputMessage.getBody();
- }
- };
- }
-
- /**
- * 从dataBuffers中读取数据
- * @author jam
- * @date 2024/5/26 22:31
- */
- private byte[] readContent(List<? extends DataBuffer> dataBuffers) {
- // 合并多个流集合,解决返回体分段传输
- DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
- DataBuffer join = dataBufferFactory.join(dataBuffers);
- byte[] content = new byte[join.readableByteCount()];
- join.read(content);
- // 释放掉内存
- DataBufferUtils.release(join);
- return content;
- }
-
- }

代码较长建议直接拷贝到编辑器,只要注意下面一个关键点:
getOrder()
方法返回的值必须要<-1,否则标准的NettyWriteResponseFilter将在您的过滤器被调用的机会之前发送响应,即不会执行获取后端响应参数的方法
通过上面的两步我们已经可以获取到请求的输入输出参数了,在 writeAccessLog()
中将其打印到日志文件,方便通过ELK进行收集。
在实际项目中,网关日志量一般会非常大,不建议使用数据库进行存储。
服务正常响应
服务异常响应
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。