ElasticSearch 开发随笔

ElasticSearch 客户端选择

第一种客户端

基于 TCP 协议(ES 的 9300 端口,用于集群通信),依赖 spring-data-elasticsearch:transport-api.jar,此方式的缺点如下:

  • SpringBoot 版本不同, 依赖的 transport-api.jar 版本也就不同,不能适配不同版本的 ES
  • 从 ES 7.x 版本开始,官方已经不建议使用 9300 端口来操作,而且 ES 8.x 以后就要移除该操作方式

第二种客户端

基于 HTTP 协议(ES 的 9200 端口,用于 RESTful API),可选的客户端如下:

  • RestTemplate、HttpClient、OkHttp:直接发送 HTTP 请求,ES 的很多操作需要自己封装,使用起来比较麻烦
  • Elasticsearch-Rest-Client:官方的 Rest 客户端,分为 Java Low Level REST ClientJava High Level REST Client,API 层次分明,上手简单

不同客户端对比

客户端优点缺点说明
Java Low Level Rest Client 与 ES 版本之间没有关系,适用于作为所有版本 ES 的客户端可以看做是低级的 HTTP 客户端,没有封装过多的 ES 操作
Java High Level Rest Client 使用最多使用时必须与 ES 版本保持一致基于 Low Level Rest Client,但在 ES 7.15.0 版本之后被弃用
TransportClient 使用 Transport 端口 (9300) 进行通信,能够使用 ES 集群中的一些特性,性能最好 JAR 包版本必须与 ES 集群版本一致,ES 集群升级,客户端也要跟着升级到相同版本已过时,官方从 ES 7 版本开始不建议使用,ES 8 版本之后被移除
Elasticsearch Java API Client 最新的 ES 客户端文档较少

提示

关于更多的 Elasticsearch 客户端说明,建议阅读 官方文档

ElasticSearch 客户端使用

下面将简单介绍 SpringBoot 项目如何引入 Java High Level Rest Client,由于 SpringBoot Starter 默认依赖了某版本的 Elasticsearch,因此需要在 pom.xml 配置文件中使用 <elasticsearch.version> 来指定(覆盖) Elasticsearch 的实际版本号,否则会出现兼容性问题。

引入 Maven 坐标

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
<properties>
<elasticsearch.version>7.4.2</elasticsearch.version>
</properties>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.3</version>
<relativePath/>
</parent>

<dependencies>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
</dependencies>

Java 配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.apache.http.HttpHost;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ElasticSearchConfig {

public static final RequestOptions COMMON_OPTIONS;

static {
// 基础配置信息
String token = "";
RequestOptions.Builder builder = RequestOptions.DEFAULT.toBuilder();
// builder.addHeader("Authorization", "Bearer " + token);
// builder.setHttpAsyncResponseConsumerFactory(
// new HttpAsyncResponseConsumerFactory.HeapBufferedResponseConsumerFactory(30 * 1024 * 1024 * 1024));
COMMON_OPTIONS = builder.build();
}

/**
* 定义 ES 客户端
*
* @return ES 客户端
*/
@Bean
public RestHighLevelClient restHighLevelClient() {
// 指定ES的连接地址
RestClientBuilder builder = RestClient.builder(new HttpHost("127.0.0.1", 9200, "http"));
return new RestHighLevelClient(builder);
}

}

Java 测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import com.alibaba.fastjson2.JSON;
import com.clay.gulimall.search.config.ElasticSearchConfig;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.Avg;
import org.elasticsearch.search.aggregations.metrics.AvgAggregationBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;

import java.util.Date;

@Slf4j
@SpringBootTest
public class ElasticSearchApiTest {

@Autowired
private RestHighLevelClient esClient;

/**
* 创建索引数据
*/
@Test
public void indexData() throws Exception {
IndexRequest request = new IndexRequest("posts").id("1")
.source("user", "Jim", "postDate", new Date(), "message", "trying out ElasticSearch");

IndexResponse indexResponse = esClient.index(request, ElasticSearchConfig.COMMON_OPTIONS);
log.info(JSON.toJSONString(indexResponse));
}

/**
* 聚合查询
* <p> 查询 address 中包含 mill 的所有人的年龄分布以及平均薪资
*/
@Test
public void searchData() throws Exception {
SearchRequest searchRequest = new SearchRequest();
// 指定索引
searchRequest.indices("bank");
// 检索条件
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchQuery("address", "mill"));
// 按照年龄的分布进行聚合
TermsAggregationBuilder ageAgg = AggregationBuilders.terms("group_by_age").field("age").size(100);
searchSourceBuilder.aggregation(ageAgg);
// 计算所有人的平均薪资
AvgAggregationBuilder avgBalance = AggregationBuilders.avg("avgBalance").field("balance");
searchSourceBuilder.aggregation(avgBalance);
// 执行检索
searchRequest.source(searchSourceBuilder);
SearchResponse searchResponse = esClient.search(searchRequest, ElasticSearchConfig.COMMON_OPTIONS);

// 获取搜索结果
SearchHits searchHits = searchResponse.getHits();
SearchHit[] hitArray = searchHits.getHits();
for (SearchHit hit : hitArray) {
String recored = hit.getSourceAsString();
log.info("id: {}, data: {}", hit.getId(), recored);
}

// 获取聚合结果 - 年龄的分布
Aggregations aggregations = searchResponse.getAggregations();
Terms terms = aggregations.get("group_by_age");
for (Terms.Bucket bucket : terms.getBuckets()) {
log.info("age: {}, total: {}", bucket.getKeyAsString(), bucket.getDocCount());
}

// 获取聚合结果 - 平均薪资
Avg avg = aggregations.get("avgBalance");
log.info("avg balance: {}", avg.getValue());

log.info("search params: {}\n", searchSourceBuilder.toString());
log.info("search result: {}\n", JSON.toJSONString(searchResponse));
}

}

上述的聚合查询代码,最终发出 HTTP 请求体内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
GET /bank/_search
{
"query": {
"match": {
"address": "mill"
}
},
"aggs": {
"group_by_age": {
"terms": {
"field": "age",
"size": 100
}
},
"avgBalance": {
"avg": {
"field": "balance"
}
}
}
}

ElasticSearch 性能优化

几十亿数据量的场景下优化查询性能

这是一道高频的 BAT 面试题:”ES 在几十亿数据量的场景下如何优化查询性能”,详细的答题思路如下。

面试官心理分析

问这个问题,其实是个判断题 —— 说白了,就是看你有没有真正用过 ES 在线上项目中 “实际干过”。为什么这么说?因为 ES 的性能并没有你想象中那么强大。在数据量较小时,ES 确实很快,做个 Demo 也没啥问题。但一旦数据量上了规模,尤其是达到几亿条记录的时候,你可能就会懵了:怎么一个搜索请求突然耗时 5 到 10 秒,简直坑爹。第一次查询特别慢,甚至感觉系统卡住了;但同样的查询第二次、第三次却非常快,只需要几百毫秒。这就让人很困惑:难道每个用户第一次访问的时候都会特别慢、特别卡顿吗?如果你没在真实项目中用过 ES,或者只是自己本地跑跑 Demo,那么面对这个问题可能就会懵,甚至答不上来。这其实很容易暴露出你对 ES 的理解还停留在 “会用” 而不是 “精通” 的阶段。

面试题剖析

说实话,ES 在性能优化上是没有 “银弹” 的。什么意思呢?就是不要指望随手调一个参数,就能万能地解决所有性能瓶颈。有些场景下,确实可能通过调整参数或优化查询语法获得明显提升,但这绝对不是普遍适用的解决方案。所以,我们要分场景、分模块,一步步系统性地分析优化手段。以我们在生产环境中的实际经验来看,在海量数据(例如上亿级别)场景下,提升 ES 搜索性能最关键的一招,就是 —— 充分利用操作系统的文件系统缓存(FileSystem Cache)

  • 什么是 Filesystem Cache?
    • ES 写入数据时,底层实际上是将索引数据写入磁盘上的 Segment File。操作系统会自动将这些磁盘文件的数据缓存在内存中,这部分缓存就叫 Page Cache(操作系统页缓存),也称为 FileSystem Cache。
    • ES 没有自己的独立缓存机制(除了 FieldData、Query Cache 等小部分),它非常依赖 Page Cache 来实现索引数据的快速访问。
  • 为什么 FileSystem Cache 对搜索性能至关重要?
    • ES 搜索引擎在查询时并不会直接访问磁盘,而是尽可能从操作系统的 FileSystem Cache 中读取索引数据。如果你的服务器有足够的内存,能够容纳所有的 Segment File(也就是整个索引的元数据和倒排索引结构),那么搜索请求大多数都会命中内存,这时候性能就非常高了。
  • ES 线上测试结果显示:
    • ES 命中 FileSystem Cache 的查询,通常只需要耗时几毫秒到几百毫秒;
    • 而 ES 未命中 FileSystem Cache,走磁盘 I/O 的查询,则可能耗时 1 秒、5 秒,甚至 10 秒以上。

这个性能差距是一个数量级的!因此,如果你发现 ES 查询慢,第一步要考虑的不是乱调参数,而是先排查 —— 你的服务器内存是否足够大?是否足以承载索引的全部 Segment File?是否频繁发生 Cache Miss(缓存未命中)?

真实案例分析

这里举一个线上真实案例:张三公司有一个 ES 集群,共有 3 台机器,每台机器配有 64GB 内存,那么总内存是 64 * 3 = 192GB。你为每台节点配置了 32GB 的 JVM Heap(这是官方推荐的上限),这样每台机器剩下的可用于 FileSystem Cache(也就是操作系统页缓存)的内存就是 64GB - 32GB = 32GB,整个集群加起来留给 FileSystem Cache 的总内存只有 32GB * 3 = 96GB。接着我问他:” 你们现在 ES 中到底存了多少数据?” 他们说 3 台机器总共存了大约 1TB 的索引数据,也就是说每台机器大概承载了 300 多 GB 的 Segment File,而每台机器只有 32GB 内存用于缓存这些索引数据,仅仅能容纳不到 10% 的索引数据,其余 90% 只能从磁盘读取。你觉得在这种情况下,搜索性能能好吗?显然,大多数搜索请求都会落到磁盘,性能自然就非常差。这就是他们在测试环境中发现搜索非常慢的根本原因:以为用了 64GB 内存的高配机器就能轻松撑住 1TB 的数据量,实际上远远不够。

要想让 ES 在大数据量场景下依然保持高性能,必须尽可能让内存承载尽量多的索引文件,也就是 Segment File 的数据。最佳实践是:集群中留给 FileSystem Cache 的总内存,至少应该达到实际索引数据量的一半甚至更多。比如你要在 ES 中存储 1TB 的索引数据,那你至少应该确保集群中用于 FileSystem Cache 的可用内存达到 512GB 或更多,这样大部分热数据都能命中内存,查询性能才能稳定在 2 秒、3 秒以内,甚至低于 1 秒。否则,如果大多数查询都落到磁盘上,哪怕你机器再多、JVM 配置再高,也优化不了搜索性能。

面试题答案

ES 在几十亿数据量的场景下,优化查询性能的方案有以下几种:

  • ES + HBase/MySQL

    • 根据在生产环境中的实践经验,想要让 ES 保持高性能,一个核心策略就是尽量减少写入 ES 的数据量,只保留用于检索的必要字段。比如说,如果每台 ES 机器为 FileSystem Cache 分配了 100GB 内存,那就应控制写入 ES 索引文件的总数据量在 100GB 以内,确保索引数据基本都能被缓存在内存中,这样搜索几乎都走内存,性能非常高,通常能做到 1 秒以内。
    • 举个例子,如果现在有一行数据包含 30 个字段,比如 idnameage 等,而你的搜索场景只涉及 idnameage 这三个字段,那就没有必要把所有 30 个字段都写入 ES。否则,多余的字段不仅没用,反而白白占用了宝贵的 FileSystem Cache 空间,导致同样的内存能缓存的数据量变少,进而拉低整体查询性能。
    • 正确做法是:只将用于检索的字段写入 ES,例如 idnameage,其余字段(如明细信息、描述、历史记录等)可以存入其他数据库,推荐的架构是 ES + HBase 或 ES + MySQL。比如,如果有一份数据包含 100 个字段,但真正参与搜索的只有 10 个字段,那就只把这 10 个字段写入 ES,其余 90 个字段可以存 HBase 或 MySQL。这样 ES 中的数据结构更轻量,缓存命中率更高,搜索速度更快,同时也降低了集群资源消耗和维护成本。
    • 具体流程如下:你在 ES 中根据 nameage 进行搜索,返回一批匹配的文档 ID(比如 20 条);接着你再拿这些 ID 去 HBase 或 MySQL 中查询对应的完整数据,最终把这些完整信息返回给前端。这样的架构可以显著提高性能。比如搜索阶段仅耗时 20ms,HBase 查询阶段耗时 30ms,总体响应时间在 50ms 左右;而如果你把所有 1TB 的数据全放进 ES,每次查询可能都得耗费 5 到 10 秒。
    • 因此,一贯的建议是:ES 中只存储关键的、用于搜索的字段,其数据量应尽量控制在 FileSystem Cache 能容纳的范围内;剩下不用于搜索的大量字段,可以存放在 HBase、MySQL 等系统中,按需查询。
  • ES 数据预热

    • 即使已经按照 “只将用于搜索的字段写入 ES” 的最佳实践去做了,但在一些场景下,每台 ES 机器上写入的数据量仍可能超过 FileSystem Cache 的可承载范围。比如,一台机器上写入了 60GB 的索引数据,而用于 FileSystem Cache 的内存只有 30GB,意味着仍有一半数据只能依赖磁盘访问,性能依旧无法保证。
    • 这时候就可以考虑引入一个非常有效的优化手段 —— 热数据的缓存预热机制。简单来说,就是让系统提前把那些被频繁访问的数据 “主动读一遍”,让操作系统把它们预加载到 FileSystem Cache 中。这样,用户后续再访问这些数据时,ES 查询就可以直接命中内存,速度会非常快。
    • 比如在微博场景下,你可以识别出一些大 V 用户的内容,这些内容访问频率高、用户关注度强。你可以在后台做一个定时任务系统,每隔一段时间(比如每分钟),由后台自动搜索一遍这些热数据,将它们 “刷” 进文件系统缓存。这样,当真实用户来访问这些热门微博内容时,查询就能直接从内存中命中,响应速度自然更快。
    • 再比如电商系统,像 iPhone、热门折扣商品等,经常会被用户反复浏览,可以提前通过后台定时访问这些热商品的索引,让它们保持在 FileSystem Cache 中,提高搜索命中率和系统整体响应速度。
    • 因此,建议针对热数据建立一个专门的 “缓存预热子系统”,定期主动触发对热点内容的查询或预加载操作,将热点索引数据尽可能保留在内存中。这种方式在大数据量场景下尤其有效,可以大幅度降低磁盘 I/O,提高系统的稳定性和搜索性能。
  • ES 数据冷热分离

    • 在 ES 的性能优化中,数据拆分是一项非常关键的策略。前面提到过,可以将大量不用于搜索的字段剥离到其他存储系统中,比如 HBase 或 MySQL,这实际上对应的是类似于 MySQL 中 “分库分表” 里的垂直拆分:即将非核心字段从主索引中拆出去,减少 ES 中的存储压力,提高缓存命中率。
    • 而在 ES 中,同样可以借鉴 MySQL 的水平拆分思路,也就是按照数据的访问频率来拆分索引。具体来说,就是将访问频率非常低的 “冷数据” 单独写入一个索引,将访问频率非常高的” 热数据” 单独写入另一个索引。这样做的目的是避免冷数据冲刷掉热数据在操作系统 FileSystem Cache 中的缓存空间,从而确保热数据的访问性能始终保持在较高水平。
    • 举个例子,假设有一个 6 台机器组成的 ES 集群,数据被分为两个索引:一个用于冷数据,一个用于热数据,每个索引配置 3 个 Primary Shard。你可以将热数据的索引分配到其中 3 台机器,冷数据的索引分配到另外 3 台机器。这样,访问热数据时只会命中那 3 台负责热索引的机器,而热数据本身只占总数据量的 10%,内存压力小,Segment File 很容易被 FileSystem Cache 缓存住,查询几乎全在内存完成,性能自然非常高。
    • 而冷数据则完全被隔离在另一个索引中,并分布在另外 3 台机器上。即使冷数据需要从磁盘读取、性能相对较差,也不会影响到主流用户的搜索体验。整体来看,90% 的请求命中热数据索引,享受高性能;只有 10% 的请求会访问冷数据,性能稍弱但可接受。通过这样的冷热分离和水平拆分策略,可以在确保整体系统性能的同时,有效应对大数据量场景下的资源冲突问题。
  • 优化 Document 模型设计

    • 在 ES 中,Document 模型设计是性能优化的关键环节之一
      • 假设在 MySQL 中有两张表 —— 订单表(order)和订单条目表(order_item),表结构如下:
        • order 表:id, order_code, total_price
        • order_item 表:id, order_id, goods_id, purchase_count, price
      • 在 MySQL 中,通常使用以下 JOIN 查询:
        • SQL 查询语句: SELECT * FROM order JOIN order_item ON order.id = order_item.order_id WHERE order.id = 1;
        • 返回结果会是两行,每条订单记录与其子条目连接在一起。
      • 但在 ES 中,这种复杂的关联查询、Join 操作、甚至 parent-childnested 查询,不仅语法复杂,而且性能非常差。因此建议:尽量不要在 ES 查询阶段做这些复杂操作,而是将复杂操作转移到 ES 数据写入阶段执行。
    • 推荐的 ES 数据模型设计方式如下:
      • 在写入 ES 数据时,将数据预处理为两个索引:
        • 一个是 order 索引,仅包含订单主信息:id, order_code, total_price
        • 一个是 order_item 索引,在写入数据之前,就完成订单和订单条目的 Join,把订单字段和订单条目字段合并写进 ES,例如: order_id, order_code, total_price, goods_id, purchase_count, price
      • 这个 Join 操作不是在 ES 查询时做,而是在 Java 系统中通过程序处理好,再将处理好的数据写入 ES,这样查询时就不需要依赖 ES 本身的 Join 功能,查询效率更高、逻辑更简单。
    • ES 模型设计的核心建议:
      • 很多 ES 新手常犯的错误,就是在查询时试图执行各种复杂的业务逻辑、数据组合、字段匹配、嵌套结构解析等。但实际上,ES 本质上只是一个强大的全文搜索引擎,它并不擅长复杂的业务逻辑或关系型处理,所以不要强行把它当数据库用。
      • 针对复杂查询场景,建议采用以下两种思路:
        • 在写入阶段设计好模型:预先处理好关联关系、聚合信息、冗余字段等,写入时就将结果数据写好,避免查询时再做复杂处理。
        • 在 Java 应用层封装业务逻辑:让 ES 负责能高效完成的部分(如关键词搜索、过滤),然后由 Java 程序对返回结果进行进一步处理,完成复杂的业务逻辑。
      • 总之,ES 能做的就让它做,不能做的就别强求。把复杂逻辑尽可能提前转移到数据写入或业务系统层,才是高性能、高可维护性 ES 架构的关键。
  • ES 分页性能优化

    • ES 的分页机制存在较大的性能隐患。举个例子,如果你每页展示 10 条数据,想查看第 100 页的内容,ES 实际上会从每个 Shard 上拉取前 1000 条数据到协调节点(因为 100 页 × 10 条 = 1000 条),假如有 5 个 Shard,总共就是 5000 条数据。当 ES 协调节点拿到这 5000 条数据后,还需要进行排序、筛选等操作,然后再从中取出最终的第 100 页的那 10 条结果。
    • 这是因为 ES 是分布式的,是不可能说从 5 个 Shard,每个 Shard 就返回 2 条数据,最后到协调节点合并成 10 条数据。ES 必须得从每个 Shard 都查 1000 条数据过来,然后根据你的需求进行排序、筛选等等操作,最后再次分页,拿到里面第 100 页的数据。翻页越深,每个 Shard 拉取的数据越多,而且协调节点处理的时间越长。所以,使用 ES 做分页的时候,会发现分页越翻到后面,就越是慢。比如,使用 ES 做分页,前几页可能几十毫秒就返回了,翻到第 10 页之后,性能明显下滑,几十页时甚至可能需要 5~10 秒才能返回一页结果。
    • 为了解决 ES 分页越翻到后面就越慢这个问题,一般建议不要使用深度分页(ES 默认的深度分页性能很差),产品设计上也要避免用户任意翻页。如果业务场景类似于微博、App 推荐商品那种 “下拉刷新、逐页加载” 的模式,建议使用 ES 提供的 Scroll API。因为 Scroll 的原理是生成一个数据快照,配合游标每次往后翻一页,性能非常好,即便翻很多页也能维持在毫秒级响应,但唯一的缺点就是它不支持跳页(如从第 1 页跳到第 100 页)。Scroll 更适合于只允许顺序翻页的场景,并且需要确保数据在一定时间窗口内保持一致。因此,如今很多 App 或网站的产品设计,都是不支持随意跳转页码的,而是采用 “无限下拉加载” 的形式,这就是为了避开 ES 分页性能的问题。