如何优化Elasticsearch数据同步,以应对海量数据的写入与检索?

摘要:Elasticsearch数据同步优化 背景 为了满足项目需求,需要将大量数据写入到ES进行检索,预估数据量是40亿左右,目前需要同步进去的是2亿左右。 ES集群配置 三台128G的国产服务器 国产linux系统 CPU主频低,比较拉胯
## Elasticsearch数据同步优化

### 背景

为了满足项目需求,需要将大量数据写入到ES进行检索,预估数据量是40亿左右,目前需要同步进去的是2亿左右。

### ES集群配置

- 三台128G的国产服务器
- 国产linux系统
- CPU主频低,比较拉胯
- JDK8的版本
- 机械硬盘

### 遇到的问题

后端使用Java调用es的bulk api进行数据同步,数据同步特别慢,在测试环境做同步的时候速度是很快的,但是在正式环境下速度出奇的慢。

### 直接上优化方案

- 升级JDK版本:将JDK的版本升级到JDK17(中途先升级到JDK11),升级之后速度提升明显,JDK8的垃圾回收器到底是比不过JDK17。
- ES索引的副本数在数据同步阶段设置为0:多个副本就意味着要多写几份数据。
- ES索引的分片数量设置为3:和集群节点数量一致。
- 调整Java调用ES bulk api的代码:使用异步批量调用的方式,后面会详细介绍。

经过一阵鼓捣,数据同步速度极大提升。

### Java调用ES bulk api

首先es是有一个bulk的批量接口的,一般来说做批量数据同步的时候使用的是这个api。实际上还有一种更加灵活的api:在ES7里面是BulkProcessor这个类,在ES8里面是BulkIngester类,两者功能基本一致。

先说一下这两个api的工作原理:

- bulk api:接收到批量数据之后会立即将数据提交给es集群。es集群在使用默认写入配置的情况下,会很快将数据进行落盘,数据落盘的这个过程是比较耗时的。
- BulkProcessor / BulkIngester:这两个类中可以动态配置数据提交给es的机制。总体来说就是:数据会在内存中暂存起来,等数据的指标达到我们配置的值的时候,api就会异步地将数据提交给es集群,从而减少es集群数据落盘的次数。

### 代码(ES7版本)

```java
@Value("${es.address}")
private String esAddress;
@Value("${es.username}")
private String username;
@Value("${es.password}")
private String password;
// 注意:bulkActions、bulkSize、flushInterval、concurrentRequests、maxNumberOfRetries
// 等阈值同样通过@Value从配置注入,声明方式与下文ES8版本示例一致,此处从略

/**
 * 配置ES7的REST高级客户端:支持单节点/多节点地址(逗号分隔)、basic认证与超时设置。
 */
@Bean
public RestHighLevelClient restHighLevelClient() {
    BasicCredentialsProvider basicCredentialsProvider = new BasicCredentialsProvider();
    basicCredentialsProvider.setCredentials(AuthScope.ANY,
            new UsernamePasswordCredentials(username, password));
    String[] esAddresses = esAddress.split(",");
    RestClientBuilder builder;
    if (esAddresses.length > 1) {
        HttpHost[] httpHosts = new HttpHost[esAddresses.length];
        for (int i = 0; i < esAddresses.length; i++) {
            httpHosts[i] = HttpHost.create(esAddresses[i]);
        }
        builder = RestClient.builder(httpHosts);
    } else {
        HttpHost httpHost = HttpHost.create(esAddresses[0]);
        builder = RestClient.builder(httpHost);
    }
    builder.setRequestConfigCallback(f -> {
        f.setConnectTimeout(600000);
        f.setSocketTimeout(600000);
        f.setConnectionRequestTimeout(600000);
        return f;
    });
    builder.setHttpClientConfigCallback(f -> f.setDefaultCredentialsProvider(basicCredentialsProvider));
    return new RestHighLevelClient(builder);
}

/**
 * 异步批量提交器:数据在内存中攒批,达到条数/字节/时间阈值后异步提交给ES集群。
 */
@Bean
public BulkProcessor bulkProcessor(RestHighLevelClient restHighLevelClient) {
    BulkProcessor.Listener listener = new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            LOGGER.info("【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            if (!response.hasFailures()) {
                LOGGER.info("【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
            } else {
                BulkItemResponse[] items = response.getItems();
                for (BulkItemResponse item : items) {
                    if (item.isFailed()) {
                        // 修正:日志文案补全左括号,失败信息改用error级别输出
                        LOGGER.error("【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
                        break;
                    }
                }
            }
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable throwable) {
            // 修正:不要使用printStackTrace,统一走日志框架并保留异常堆栈(与ES8版本保持一致)
            LOGGER.error("Bulk request " + executionId + " failed", throwable);
        }
    };
    BulkProcessor.Builder builder = BulkProcessor.builder(
            (bulkRequest, bulkResponseActionListener) ->
                    restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener),
            listener);
    // 到达指定条数时刷新,-1则禁用该配置
    builder.setBulkActions(bulkActions);
    // 内存到达指定大小时刷新
    builder.setBulkSize(new ByteSizeValue(bulkSize, ByteSizeUnit.MB));
    // 设置的刷新间隔,单位是s,-1则禁用该配置
    builder.setFlushInterval(TimeValue.timeValueSeconds(flushInterval));
    // 设置允许执行的并发请求数
    builder.setConcurrentRequests(concurrentRequests);
    // 设置重试策略
    builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), maxNumberOfRetries));
    return builder.build();
}
```

这里将该类配置为spring bean,在使用的时候直接注入使用即可,剩下的交给BulkProcessor即可:

```java
@Resource
private BulkProcessor bulkProcessor;

IndexRequest request = new IndexRequest();
request.id(id);
request.index(tableToEs.getIndexName());
request.source(JSON.toJSONString(esTopicCollectModel, serializeConfig), XContentType.JSON);
bulkProcessor.add(request);
```

### 代码(ES8版本)

```java
@Configuration
public class BeanConfig {

    @Value("${es.address}")
    private String esAddress;
    @Value("${es.username}")
    private String username;
    @Value("${es.password}")
    private String password;
    @Value("${es.concurrentRequests}")
    private Integer concurrentRequests;
    @Value("${es.bulkSize}")
    private Long bulkSize;
    @Value("${es.flushInterval}")
    private Integer flushInterval;

    private static final Logger LOGGER = LoggerFactory.getLogger(BeanConfig.class);

    /**
     * 配置es客户端
     */
    @Bean
    public ElasticsearchClient elasticsearchClient() throws Exception {
        String[] split = esAddress.split(",");
        HttpHost[] httpHosts = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            httpHosts[i] = HttpHost.create(split[i]);
        }
        // 账号密码的配置
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        // 创建SSLContext以跳过SSL证书验证(注意:生产环境建议配置正式证书,而不是信任所有证书)
        SSLContext sslContext = SSLContextBuilder.create()
                .loadTrustMaterial((chain, authType) -> true)
                .build();
        // 配置HTTP客户端以使用SSLContext和跳过SSL主机名验证
        RestClientBuilder builder = RestClient.builder(httpHosts)
                .setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder
                        .setSSLContext(sslContext)
                        .setDefaultCredentialsProvider(credentialsProvider)
                        .setSSLHostnameVerifier(NoopHostnameVerifier.INSTANCE)
                        .setDefaultRequestConfig(RequestConfig.custom()
                                .setConnectTimeout(60000)
                                .setSocketTimeout(60000)
                                .build())
                        .setDefaultIOReactorConfig(IOReactorConfig.custom()
                                .setConnectTimeout(60000)
                                .setSoTimeout(60000)
                                .setIoThreadCount(1)
                                .build()));
        RestClientTransport transport = new RestClientTransport(builder.build(), new JacksonJsonpMapper());
        return new ElasticsearchClient(transport);
    }

    /**
     * 异步处理批量请求的对象
     * 修正:客户端通过方法参数由Spring注入,而不是在方法内直接调用elasticsearchClient(),
     * 避免绕过容器重复创建客户端实例
     */
    @Bean
    public BulkIngester<String> bulkIngester(ElasticsearchClient elasticsearchClient) {
        BulkListener<String> listener = new BulkListener<String>() {
            /**
             * 批量请求之前调用
             * @param executionId 此请求的id
             * @param request 将发送的批量请求
             * @param contexts 数据集
             */
            @Override
            public void beforeBulk(long executionId, BulkRequest request, List<String> contexts) {
                LOGGER.info("【beforeBulk】批次【{}】 携带 【{}】 请求数量", executionId, contexts.size());
            }

            /**
             * 批量请求之后调用
             * @param executionId 此请求的id
             * @param request 将发送的批量请求
             * @param contexts 数据集
             * @param response 返回值
             */
            @Override
            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, BulkResponse response) {
                LOGGER.info("【afterBulk】批次【{}】 提交数据量【{}】 提交结果【{}】",
                        executionId, contexts.size(), response.errors() ? "失败" : "成功");
                if (response.errors()) {
                    LOGGER.error(response.toString());
                }
            }

            /**
             * 当批量请求无法发送到Elasticsearch时调用
             * @param executionId 此请求的id
             * @param request 将发送的批量请求
             * @param contexts 数据集
             * @param failure 异常信息
             */
            @Override
            public void afterBulk(long executionId, BulkRequest request, List<String> contexts, Throwable failure) {
                LOGGER.error("Bulk request " + executionId + " failed", failure);
            }
        };
        return BulkIngester.of(b -> b
                .client(elasticsearchClient)
                // -1表示不按操作条数刷新
                .maxOperations(-1)
                // 按累计字节数刷新
                .maxSize(bulkSize)
                // 允许的并发请求数
                .maxConcurrentRequests(concurrentRequests)
                // 定时刷新间隔,单位秒
                .flushInterval(flushInterval, TimeUnit.SECONDS)
                .listener(listener));
    }
}
```

使用方式:

```java
@Resource
private BulkIngester<String> bulkIngester;

IndexOperation<EsTopicCollectModel> indexOperation = new IndexOperation.Builder<EsTopicCollectModel>()
        // 索引
        .index(tableToEs.getIndexName())
        // 文档id
        .id(tableToEs.getTableName() + "_" + data.getOrDefault(StrUtil.toCamelCase(tableToEs.getPkColumn()), ""))
        // 文档内容
        .document(esTopicCollectModel)
        .build();
BulkOperation bulkOperation = new BulkOperation.Builder()
        .index(indexOperation)
        .build();
bulkIngester.add(bulkOperation);
```