1. 新增分页接口聚合查询注解支持

2. 优化 databus api 日志记录的字段缺失问题
3. 新增 eplat sso 页面登录校验
4. 用户、部门编辑新增 seata 事务支持
5. 新增 iwork 流程发起接口
6. 新增 eban 同步用户时的岗位处理逻辑
7. 新增无 skywalking 时的 traceId 支持
This commit is contained in:
chenbowen
2025-11-18 10:03:34 +08:00
parent af7f103a38
commit 266eb45e00
74 changed files with 5001 additions and 102 deletions

View File

@@ -7,17 +7,23 @@ import com.zt.plat.module.databus.controller.admin.gateway.vo.accesslog.ApiAcces
import com.zt.plat.module.databus.controller.admin.gateway.vo.accesslog.ApiAccessLogRespVO;
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiAccessLogDO;
import com.zt.plat.module.databus.service.gateway.ApiAccessLogService;
import com.zt.plat.module.databus.service.gateway.ApiDefinitionService;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import jakarta.validation.Valid;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static com.zt.plat.framework.common.pojo.CommonResult.success;
@@ -33,13 +39,18 @@ public class ApiAccessLogController {
@Resource
private ApiAccessLogService apiAccessLogService;
@Resource
private ApiDefinitionService apiDefinitionService;
@GetMapping("/get")
@Operation(summary = "获取访问日志详情")
@Parameter(name = "id", description = "日志编号", required = true, example = "1024")
@PreAuthorize("@ss.hasPermission('databus:gateway:access-log:query')")
public CommonResult<ApiAccessLogRespVO> get(@RequestParam("id") Long id) {
ApiAccessLogDO logDO = apiAccessLogService.get(id);
return success(ApiAccessLogConvert.INSTANCE.convert(logDO));
ApiAccessLogRespVO respVO = ApiAccessLogConvert.INSTANCE.convert(logDO);
enrichDefinitionInfo(respVO);
return success(respVO);
}
@GetMapping("/page")
@@ -47,6 +58,51 @@ public class ApiAccessLogController {
@PreAuthorize("@ss.hasPermission('databus:gateway:access-log:query')")
public CommonResult<PageResult<ApiAccessLogRespVO>> page(@Valid ApiAccessLogPageReqVO pageReqVO) {
PageResult<ApiAccessLogDO> pageResult = apiAccessLogService.getPage(pageReqVO);
return success(ApiAccessLogConvert.INSTANCE.convertPage(pageResult));
PageResult<ApiAccessLogRespVO> result = ApiAccessLogConvert.INSTANCE.convertPage(pageResult);
enrichDefinitionInfo(result.getList());
return success(result);
}
private void enrichDefinitionInfo(List<ApiAccessLogRespVO> list) {
// 对分页结果批量补充 API 描述,使用本地缓存减少重复查询
if (CollectionUtils.isEmpty(list)) {
return;
}
Map<String, String> cache = new HashMap<>(list.size());
list.forEach(item -> {
if (item == null) {
return;
}
String cacheKey = buildCacheKey(item.getApiCode(), item.getApiVersion());
if (!cache.containsKey(cacheKey)) {
cache.put(cacheKey, resolveApiDescription(item.getApiCode(), item.getApiVersion()));
}
item.setApiDescription(cache.get(cacheKey));
});
}
private void enrichDefinitionInfo(ApiAccessLogRespVO item) {
// 单条数据同样需要补全描述信息
if (item == null) {
return;
}
item.setApiDescription(resolveApiDescription(item.getApiCode(), item.getApiVersion()));
}
private String resolveApiDescription(String apiCode, String apiVersion) {
if (!StringUtils.hasText(apiCode)) {
return null;
}
String normalizedVersion = StringUtils.hasText(apiVersion) ? apiVersion.trim() : apiVersion;
// 通过网关定义服务补全 API 描述,提升页面可读性
return apiDefinitionService.findByCodeAndVersionIncludingInactive(apiCode, normalizedVersion)
.map(aggregate -> aggregate.getDefinition() != null ? aggregate.getDefinition().getDescription() : null)
.filter(StringUtils::hasText)
.orElse(null);
}
private String buildCacheKey(String apiCode, String apiVersion) {
// 组合唯一键,避免重复查询相同的 API 描述
return (apiCode == null ? "" : apiCode) + "#" + (apiVersion == null ? "" : apiVersion);
}
}

View File

@@ -4,7 +4,9 @@ import com.zt.plat.framework.common.pojo.PageResult;
import com.zt.plat.module.databus.controller.admin.gateway.vo.accesslog.ApiAccessLogRespVO;
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiAccessLogDO;
import org.mapstruct.Mapper;
import org.mapstruct.Mapping;
import org.mapstruct.factory.Mappers;
import org.springframework.http.HttpStatus;
import java.util.List;
@@ -13,6 +15,8 @@ public interface ApiAccessLogConvert {
ApiAccessLogConvert INSTANCE = Mappers.getMapper(ApiAccessLogConvert.class);
@Mapping(target = "statusDesc", expression = "java(statusDesc(bean.getStatus()))")
@Mapping(target = "responseStatusText", expression = "java(resolveHttpStatusText(bean.getResponseStatus()))")
ApiAccessLogRespVO convert(ApiAccessLogDO bean);
List<ApiAccessLogRespVO> convertList(List<ApiAccessLogDO> list);
@@ -26,4 +30,26 @@ public interface ApiAccessLogConvert {
result.setTotal(page.getTotal());
return result;
}
default String statusDesc(Integer status) {
// 将数字状态码转换为中文描述,方便前端直接展示
if (status == null) {
return "未知";
}
return switch (status) {
case 0 -> "成功";
case 1 -> "客户端错误";
case 2 -> "服务端错误";
default -> "未知";
};
}
default String resolveHttpStatusText(Integer status) {
// 统一使用 Spring 的 HttpStatus 解析出标准文案
if (status == null) {
return null;
}
HttpStatus resolved = HttpStatus.resolve(status);
return resolved != null ? resolved.getReasonPhrase() : null;
}
}

View File

@@ -21,6 +21,9 @@ public class ApiAccessLogRespVO {
@Schema(description = "API 编码", example = "user.query")
private String apiCode;
@Schema(description = "API 描述", example = "用户查询服务")
private String apiDescription;
@Schema(description = "API 版本", example = "v1")
private String apiVersion;
@@ -42,6 +45,9 @@ public class ApiAccessLogRespVO {
@Schema(description = "响应 HTTP 状态", example = "200")
private Integer responseStatus;
@Schema(description = "响应 HTTP 状态说明", example = "OK")
private String responseStatusText;
@Schema(description = "响应提示", example = "OK")
private String responseMessage;
@@ -51,6 +57,9 @@ public class ApiAccessLogRespVO {
@Schema(description = "访问状态", example = "0")
private Integer status;
@Schema(description = "访问状态展示文案", example = "成功")
private String statusDesc;
@Schema(description = "错误码", example = "DAT-001")
private String errorCode;

View File

@@ -28,7 +28,7 @@ public class ApiAccessLogDO extends TenantBaseDO {
private Long id;
/**
* 请求追踪标识,对应 {@link com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext#getRequestId()}
* 请求追踪标识,对应 {@link com.zt.plat.framework.common.util.monitor.TracerUtils#getTraceId()}
*/
private String traceId;

View File

@@ -2,12 +2,14 @@ package com.zt.plat.module.databus.framework.integration.gateway.core;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.common.util.monitor.TracerUtils;
import com.zt.plat.module.databus.dal.dataobject.gateway.ApiAccessLogDO;
import com.zt.plat.module.databus.framework.integration.gateway.model.ApiInvocationContext;
import com.zt.plat.module.databus.service.gateway.ApiAccessLogService;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
@@ -42,8 +44,9 @@ public class ApiGatewayAccessLogger {
*/
public void onRequest(ApiInvocationContext context) {
try {
String traceId = TracerUtils.getTraceId();
ApiAccessLogDO logDO = new ApiAccessLogDO();
logDO.setTraceId(context.getRequestId());
logDO.setTraceId(traceId);
logDO.setApiCode(context.getApiCode());
logDO.setApiVersion(context.getApiVersion());
logDO.setRequestMethod(context.getHttpMethod());
@@ -60,7 +63,7 @@ public class ApiGatewayAccessLogger {
Long logId = apiAccessLogService.create(logDO);
context.getAttributes().put(ATTR_LOG_ID, logId);
} catch (Exception ex) {
log.warn("记录 API 访问日志开始阶段失败, traceId={}", context.getRequestId(), ex);
log.warn("记录 API 访问日志开始阶段失败, traceId={}", TracerUtils.getTraceId(), ex);
}
}
@@ -85,12 +88,18 @@ public class ApiGatewayAccessLogger {
try {
ApiAccessLogDO update = new ApiAccessLogDO();
update.setId(logId);
update.setResponseStatus(context.getResponseStatus());
update.setResponseMessage(context.getResponseMessage());
int responseStatus = resolveHttpStatus(context);
context.setResponseStatus(responseStatus);
update.setResponseStatus(responseStatus);
String responseMessage = resolveResponseMessage(context, responseStatus);
update.setResponseMessage(responseMessage);
if (!StringUtils.hasText(context.getResponseMessage()) && StringUtils.hasText(responseMessage)) {
context.setResponseMessage(responseMessage);
}
update.setResponseBody(toJson(context.getResponseBody()));
update.setStatus(resolveStatus(context.getResponseStatus()));
update.setErrorCode(extractErrorCode(context.getResponseBody()));
update.setErrorMessage(resolveErrorMessage(context));
update.setStatus(resolveStatus(responseStatus));
update.setErrorCode(extractErrorCode(context.getResponseBody(), responseStatus));
update.setErrorMessage(resolveErrorMessage(context, responseStatus));
update.setExceptionStack((String) context.getAttributes().get(ATTR_EXCEPTION_STACK));
update.setStepResults(toJson(context.getStepResults()));
update.setExtra(toJson(buildExtra(context)));
@@ -98,7 +107,7 @@ public class ApiGatewayAccessLogger {
update.setDuration(calculateDuration(context));
apiAccessLogService.update(update);
} catch (Exception ex) {
log.warn("记录 API 访问日志结束阶段失败, traceId={}, logId={}", context.getRequestId(), logId, ex);
log.warn("记录 API 访问日志结束阶段失败, traceId={}, logId={}", TracerUtils.getTraceId(), logId, ex);
}
}
@@ -137,7 +146,10 @@ public class ApiGatewayAccessLogger {
return 3;
}
private String resolveErrorMessage(ApiInvocationContext context) {
private String resolveErrorMessage(ApiInvocationContext context, int responseStatus) {
if (!isErrorStatus(responseStatus)) {
return null;
}
if (StringUtils.hasText(context.getResponseMessage())) {
return truncate(context.getResponseMessage());
}
@@ -151,7 +163,10 @@ public class ApiGatewayAccessLogger {
return null;
}
private String extractErrorCode(Object responseBody) {
private String extractErrorCode(Object responseBody, int responseStatus) {
if (!isErrorStatus(responseStatus)) {
return null;
}
if (responseBody instanceof Map<?, ?> map) {
Object errorCode = firstNonNull(map.get("errorCode"), map.get("code"));
return errorCode == null ? null : truncate(String.valueOf(errorCode));
@@ -159,6 +174,27 @@ public class ApiGatewayAccessLogger {
return null;
}
private int resolveHttpStatus(ApiInvocationContext context) {
Integer status = context.getResponseStatus();
if (status != null) {
return status;
}
// 默认兜底为 200避免日志中出现空的 HTTP 状态码
return HttpStatus.OK.value();
}
private String resolveResponseMessage(ApiInvocationContext context, int responseStatus) {
if (StringUtils.hasText(context.getResponseMessage())) {
return truncate(context.getResponseMessage());
}
HttpStatus resolved = HttpStatus.resolve(responseStatus);
return resolved != null ? resolved.getReasonPhrase() : null;
}
private boolean isErrorStatus(int responseStatus) {
return responseStatus >= 400;
}
private Map<String, Object> buildExtra(ApiInvocationContext context) {
Map<String, Object> extra = new HashMap<>();
if (!CollectionUtils.isEmpty(context.getVariables())) {

View File

@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.common.exception.ServiceException;
import com.zt.plat.framework.common.exception.util.ServiceExceptionUtil;
import com.zt.plat.framework.common.util.monitor.TracerUtils;
import com.zt.plat.module.databus.controller.admin.gateway.vo.ApiGatewayInvokeReqVO;
import com.zt.plat.module.databus.framework.integration.config.ApiGatewayProperties;
import com.zt.plat.module.databus.framework.integration.gateway.domain.ApiDefinitionAggregate;
@@ -236,12 +237,12 @@ public class ApiGatewayExecutionService {
String message = StringUtils.hasText(context.getResponseMessage())
? context.getResponseMessage()
: HttpStatus.valueOf(status).getReasonPhrase();
return ApiGatewayResponse.builder()
return ApiGatewayResponse.builder()
.code(status)
.message(message)
.response(context.getResponseBody())
.traceId(context.getRequestId())
.build();
.message(message)
.response(context.getResponseBody())
.traceId(TracerUtils.getTraceId())
.build();
}
private String normalizeBasePath(String basePath) {

View File

@@ -4,6 +4,7 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.zt.plat.framework.common.util.json.JsonUtils;
import com.zt.plat.framework.common.util.monitor.TracerUtils;
import com.zt.plat.framework.common.util.security.CryptoSignatureUtils;
import com.zt.plat.framework.common.util.servlet.ServletUtils;
import com.zt.plat.framework.security.core.LoginUser;
@@ -464,11 +465,12 @@ public class GatewaySecurityFilter extends OncePerRequestFilter {
response.resetBuffer();
response.setStatus(status.value());
String resolvedMessage = StringUtils.hasText(message) ? message : status.getReasonPhrase();
ApiGatewayResponse envelope = ApiGatewayResponse.builder()
String traceId = TracerUtils.getTraceId();
ApiGatewayResponse envelope = ApiGatewayResponse.builder()
.code(status.value())
.message(resolvedMessage)
.response(null)
.traceId(null)
.traceId(traceId)
.build();
if (shouldEncryptErrorResponse(security, credential)) {
String encryptionKey = credential.getEncryptionKey();

View File

@@ -0,0 +1,360 @@
package com.zt.plat.module.databus.framework.integration.gateway.step.impl;
import com.zt.plat.framework.common.exception.ServiceException;
import org.junit.jupiter.api.Test;
import org.springframework.http.MediaType;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.web.reactive.function.client.WebClient;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.PrematureCloseException;
import reactor.netty.resources.ConnectionProvider;
import reactor.util.retry.Retry;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Demonstrates the stale-connection scenario using the legacy vs the deferred retry pipeline.
*/
class HttpStepHandlerConnectionResetScenarioTest {
private static final Duration RETRY_DELAY = Duration.ofMillis(200);
private static final int RETRY_ATTEMPTS = 3;
private static final Duration BLOCK_TIMEOUT = Duration.ofSeconds(5);
private static final Duration RESET_WAIT = Duration.ofMillis(300);
@Test
void legacyPipelineLosesSuccessfulRetry() throws Exception {
try (ResetOnceHttpServer server = new ResetOnceHttpServer()) {
WebClient webClient = createWebClient();
URI uri = server.uri("/demo");
warmUp(server, webClient, uri);
server.awaitWarmupConnectionReset(RESET_WAIT);
legacyInvoke(webClient, uri, Map.of("mode", "legacy"));
server.awaitFreshResponses(1, Duration.ofSeconds(2));
assertThat(server.getFreshResponseCount()).isEqualTo(1);
assertThat(server.getServedBodies()).contains("reset", "fresh");
}
}
@Test
void deferredPipelinePropagatesSuccessfulRetry() throws Exception {
try (ResetOnceHttpServer server = new ResetOnceHttpServer()) {
WebClient webClient = createWebClient();
URI uri = server.uri("/demo");
warmUp(server, webClient, uri);
server.awaitWarmupConnectionReset(RESET_WAIT);
Object result = deferredInvoke(webClient, uri, Map.of("mode", "defer"));
assertThat(result).isInstanceOf(Map.class);
Map<?, ?> resultMap = (Map<?, ?>) result;
assertThat(resultMap.get("stage")).isEqualTo("fresh");
server.awaitFreshResponses(1, Duration.ofSeconds(2));
assertThat(server.getFreshResponseCount()).isEqualTo(1);
assertThat(server.getServedBodies()).contains("reset", "fresh");
}
}
private WebClient createWebClient() {
ConnectionProvider provider = ConnectionProvider.builder("http-step-handler-demo")
.maxConnections(1)
.pendingAcquireMaxCount(-1)
.maxIdleTime(Duration.ofSeconds(5))
.build();
HttpClient httpClient = HttpClient.create(provider).compress(true);
return WebClient.builder()
.clientConnector(new ReactorClientHttpConnector(httpClient))
.build();
}
private void warmUp(ResetOnceHttpServer server, WebClient webClient, URI uri) {
webClient.post()
.uri(uri)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.bodyValue(Map.of("warm", true))
.retrieve()
.bodyToMono(Object.class)
.block(BLOCK_TIMEOUT);
server.awaitWarmupResponse(Duration.ofSeconds(2));
}
private Object legacyInvoke(WebClient webClient, URI uri, Object body) {
WebClient.RequestHeadersSpec<?> spec = webClient.post()
.uri(uri)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.bodyValue(body);
Mono<Object> responseMono = spec.retrieve()
.bodyToMono(Object.class)
// 模拟业务中首次订阅后缓存失败结果的场景
.cache();
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
.filter(this::isRetryableException)
.onRetryExhaustedThrow((specification, signal) -> signal.failure()))
.block(BLOCK_TIMEOUT);
}
private Object deferredInvoke(WebClient webClient, URI uri, Object body) {
Mono<Object> responseMono = Mono.defer(() -> webClient.post()
.uri(uri)
.contentType(MediaType.APPLICATION_JSON)
.accept(MediaType.APPLICATION_JSON)
.bodyValue(body)
.retrieve()
.bodyToMono(Object.class)
// 通过 defer每次重试都会重新创建带缓存的响应 Mono
.cache());
return responseMono.retryWhen(Retry.fixedDelay(RETRY_ATTEMPTS, RETRY_DELAY)
.filter(this::isRetryableException))
.block(BLOCK_TIMEOUT);
}
private boolean isRetryableException(Throwable throwable) {
if (throwable == null) {
return false;
}
Throwable cursor = throwable;
while (cursor != null) {
if (cursor instanceof ServiceException) {
return false;
}
if (cursor instanceof PrematureCloseException) {
return true;
}
if (cursor instanceof IOException) {
return true;
}
cursor = cursor.getCause();
}
return false;
}
private static final class ResetOnceHttpServer implements AutoCloseable {
private static final Duration RESET_DELAY = Duration.ofMillis(250);
private final ServerSocket serverSocket;
private final ExecutorService acceptExecutor;
private final ScheduledExecutorService scheduler;
private final AtomicInteger connectionCount = new AtomicInteger();
private final AtomicInteger freshResponses = new AtomicInteger();
private final CountDownLatch warmupResponseSent = new CountDownLatch(1);
private final CountDownLatch warmupReset = new CountDownLatch(1);
private final List<String> servedBodies = new CopyOnWriteArrayList<>();
private volatile boolean running = true;
private volatile Socket warmupSocket;
ResetOnceHttpServer() throws IOException {
this.serverSocket = new ServerSocket(0, 50, InetAddress.getByName("127.0.0.1"));
this.serverSocket.setReuseAddress(true);
this.acceptExecutor = Executors.newSingleThreadExecutor(r -> {
Thread t = new Thread(r, "reset-once-http-accept");
t.setDaemon(true);
return t;
});
this.scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
Thread t = new Thread(r, "reset-once-http-scheduler");
t.setDaemon(true);
return t;
});
acceptExecutor.submit(this::acceptLoop);
}
URI uri(String path) {
Objects.requireNonNull(path, "path");
if (!path.startsWith("/")) {
path = "/" + path;
}
return URI.create("http://127.0.0.1:" + serverSocket.getLocalPort() + path);
}
List<String> getServedBodies() {
return new ArrayList<>(servedBodies);
}
int getFreshResponseCount() {
return freshResponses.get();
}
void awaitWarmupResponse(Duration timeout) {
awaitLatch(warmupResponseSent, timeout);
}
void awaitWarmupConnectionReset(Duration timeout) {
awaitLatch(warmupReset, timeout);
}
void awaitFreshResponses(int expected, Duration timeout) {
long deadline = System.nanoTime() + timeout.toNanos();
while (freshResponses.get() < expected && System.nanoTime() < deadline) {
try {
Thread.sleep(10);
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
return;
}
}
}
private void awaitLatch(CountDownLatch latch, Duration timeout) {
try {
if (!latch.await(timeout.toMillis(), TimeUnit.MILLISECONDS)) {
throw new IllegalStateException("Timed out waiting for latch");
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
throw new IllegalStateException(ex);
}
}
private void acceptLoop() {
try {
while (running) {
Socket socket = serverSocket.accept();
int index = connectionCount.incrementAndGet();
handle(socket, index);
}
} catch (SocketException ex) {
if (running) {
throw new IllegalStateException("Unexpected server socket error", ex);
}
} catch (IOException ex) {
if (running) {
throw new IllegalStateException("I/O error in server", ex);
}
}
}
private void handle(Socket socket, int index) {
try {
socket.setTcpNoDelay(true);
RequestMetadata metadata = readRequest(socket);
if (index == 1) {
warmupSocket = socket;
String body = "{\"stage\":\"warmup\",\"path\":\"" + metadata.path + "\"}";
writeResponse(socket, body, true);
servedBodies.add("warmup");
warmupResponseSent.countDown();
scheduler.schedule(() -> forceReset(socket), RESET_DELAY.toMillis(), TimeUnit.MILLISECONDS);
} else if (index == 2) {
// 模拟客户端复用到仍在连接池中的旧连接,但服务端已在请求到达后立即复位。
servedBodies.add("reset");
scheduler.schedule(() -> closeWithReset(socket), 10, TimeUnit.MILLISECONDS);
} else {
String body = "{\"stage\":\"fresh\",\"attempt\":" + index + "}";
writeResponse(socket, body, false);
servedBodies.add("fresh");
freshResponses.incrementAndGet();
socket.close();
}
} catch (IOException ex) {
// ignore for the purpose of the test
}
}
private void forceReset(Socket socket) {
try {
if (!socket.isClosed()) {
servedBodies.add("reset");
closeWithReset(socket);
}
} finally {
warmupReset.countDown();
}
}
private void closeWithReset(Socket socket) {
try {
if (!socket.isClosed()) {
socket.setSoLinger(true, 0);
socket.close();
}
} catch (IOException ignored) {
// ignore
}
}
private RequestMetadata readRequest(Socket socket) throws IOException {
BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.US_ASCII));
String requestLine = reader.readLine();
if (requestLine == null) {
return new RequestMetadata("unknown", 0);
}
String path = requestLine.split(" ", 3)[1];
int contentLength = 0;
String line;
while ((line = reader.readLine()) != null && !line.isEmpty()) {
if (line.toLowerCase(Locale.ROOT).startsWith("content-length:")) {
contentLength = Integer.parseInt(line.substring(line.indexOf(':') + 1).trim());
}
}
if (contentLength > 0) {
char[] buffer = new char[contentLength];
int read = 0;
while (read < contentLength) {
int r = reader.read(buffer, read, contentLength - read);
if (r < 0) {
break;
}
read += r;
}
}
return new RequestMetadata(path, contentLength);
}
private void writeResponse(Socket socket, String body, boolean keepAlive) throws IOException {
byte[] bodyBytes = body.getBytes(StandardCharsets.UTF_8);
StringBuilder builder = new StringBuilder()
.append("HTTP/1.1 200 OK\r\n")
.append("Content-Type: application/json\r\n")
.append("Content-Length: ").append(bodyBytes.length).append("\r\n");
if (keepAlive) {
builder.append("Connection: keep-alive\r\n");
} else {
builder.append("Connection: close\r\n");
}
builder.append("\r\n");
OutputStream outputStream = socket.getOutputStream();
outputStream.write(builder.toString().getBytes(StandardCharsets.US_ASCII));
outputStream.write(bodyBytes);
outputStream.flush();
}
@Override
public void close() throws Exception {
running = false;
try {
serverSocket.close();
} catch (IOException ignored) {
}
if (warmupSocket != null && !warmupSocket.isClosed()) {
try {
warmupSocket.close();
} catch (IOException ignored) {
}
}
scheduler.shutdownNow();
acceptExecutor.shutdownNow();
}
private record RequestMetadata(String path, int contentLength) {
}
}
}