ElasticSearch7.x
准备
简介
Elaticsearch,简称为 ES,ES 是一个开源的高扩展的分布式全文搜索引擎,是整个 Elastic Stack 技术栈的核心。它可以近乎实时的存储、检索数据;本身扩展性很好,可以扩展到上 百台服务器,处理 PB 级别的数据。
安装
- 下载安装 ElasticSearch
- 进入bin目录
- 点击elasticsearch.bat
- 进入web端检查结果
9300 端口为 Elasticsearch 集群间组件的通信端口,9200 端口为浏览器访问的 http 协议 RESTful 端口
基本概念
-
ES支持RESTful规则
-
直接通过浏览器向 Elasticsearch 服务器发请求,仅支持 GET 和 POST 方法
-
可以使用postman发送任何类型的 HTTP 请求
-
ES通过JSON传输数据
-
Elasticsearch 是面向文档型数据库,一条数据在这里就是一个文档
- 这里 Types 的概念已经被逐渐弱化,Elasticsearch 6.X 中,一个 index 下已经只能包含一个 type,Elasticsearch 7.X 中, Type 的概念已经被删除了。
正排索引 key—>value
倒排索引 value—>key
HTTP操作
索引操作
创建索引
对比关系型数据库,创建索引就等同于创建数据库
-
向ES服务器发PUT请求
http://localhost:9200/shopping
-
服务器返回相应
{
"acknowledged"【响应结果】: true, # true 操作成功
"shards_acknowledged"【分片结果】: true, # 分片操作成功
"index"【索引名称】: "shopping"
}
# 注意:创建索引库的分片数默认 1 片,在 7.0.0 之前的 Elasticsearch 版本中,默认 5 片
- 如果重复添加索引,会返回错误信息
查看单个索引
发送GET请求 , http://127.0.0.1:9200/shopping
查看所有索引
发送get请求 http://127.0.0.1:9200/_cat/indices?v
_cat
表示查看的意思,indices 表示索引
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
yellow open shoppong HRJXm6zgSGK2M3GhYbWF4Q 1 1 0 0 208b 208b
yellow open shopping GhNTne8WTdKuSKhSgvlKng 1 1 0 0 208b 208b
删除索引
发生DELETE请求 http://localhost:9200/shopping
文档操作
索引已经创建好了,接下来我们来创建文档,并添加数据。这里的文档可以类比为关系型数 据库中的表数据,添加的数据格式为 JSON 格式
创建文档
POST http://localhost:9200/shopping/_doc
http://localhost:9200/shopping/_create
也可以
{
"title":"小米手机",
"category":"小米",
"images":"http://www.gulixueyuan.com/xm.jpg",
"price":3999.00
}
- 上面的数据创建后,由于没有指定数据唯一性标识(ID),默认情况下,ES 服务器会随机 生成一个。 如果想要自定义唯一性标识,需要在创建时指定:
http://127.0.0.1:9200/shopping/_doc/1
如果增加数据时明确数据主键,那么请求方式也可以为 PUT
查看文档
查看一条数据
GET http://127.0.0.1:9200/shopping/_doc/1
全部查询
http://127.0.0.1:9200/shopping/_search
修改数据
完全覆盖
幂等性,可以使用PUT
PUT http://localhost:9200/shopping/_doc/1
{
"title":"华为手机",
"category":"华为",
"images":"http://www.gulixueyuan.com/hw.jpg",
"price":4999.00
}
局部更新
POST http://127.0.0.1:9200/shopping/_update/1
http://127.0.0.1:9200/shopping/_doc/1
{
"doc": {
"price":3000.00
}
}
删除文档
删除一个文档不会立即从磁盘上移除,它只是被标记成已删除(逻辑删除)。
DELETE 请求 :http://127.0.0.1:9200/shopping/_doc/
条件删除文档
POST http://127.0.0.1:9200/shopping/_delete_by_query
{
"query":{
"match":{
"price":4000.00
}
}
}
条件查询
请求路径
GET http://127.0.0.1:9200/shopping/_search?q=category:小米
请求体
http://127.0.0.1:9200/shopping/_search
{
"query":{
"match":{
"category": "小米"
}
}
}
全量查询
{
"query":{
"match_all":{
}
}
}
分页查询
{
"query":{
"match_all":{
}
},
"from" :0, // (页码-1)*页长
"size" :1
}
指定字段
{
"query":{
"match_all":{
}
},
"from" :0,
"size" :6,
"_source":["title"]
}
结果排序
{
"query":{
"match_all":{
}
},
"from" :0,
"size" :7,
"_source":["title"],
"sort":{
"price":{
"order":"desc"
}
}
}
过滤字段
includes:来指定想要显示的字段
excludes:来指定不想要显示的字段
{
"_source": {
"includes": ["name","nickname"]
},
"query": {
"terms": {
"nickname": ["zhangsan"]
}
}
}
多条件查询
GET http://127.0.0.1:9200/shopping/_search
bool
把各种其它查询通过must
(必须 )、must_not
(必须不)、should
(应该)的方 式进行组合
同时成立
{
"query":{
"bool":{
"must":[
{
"match":{
"category":"小米"
}
},
{
"match":{
"price":"30"
}
}
]
}
}
}
任意成立
{
"query":{
"bool":{
"should":[
{
"match":{
"category":"小米"
}
},
{
"match":{
"category":"华为"
}
}
]
}
}
}
字段匹配查询
multi_match 与 match 类似,不同的是它可以在多个字段中查询。
GET 请求 :http://127.0.0.1:9200/student/_search
{
"query": {
"multi_match": {
"query": "zhangsan",
"fields": ["name","nickname"]
}
}
}
范围查询
{
"query":{
"bool":{
"should":[
{
"match":{
"category":"小米"
}
},
{
"match":{
"category":"华为"
}
}
],
"filter":{
"range":{
"price":{
"gt":20,
"lt":1000
}
}
}
}
}
}
完全匹配
问题提出 以下查询可以同时搜到小米与华为,(es分词查询默认单字)
{
"query":{
"match":{
"category":"小华"
}
}
}
解决
{
"query":{
"match_phrase":{
"category":"小华"
}
}
}
高亮查询
对category字段高亮显示
{
"query":{
"match_phrase":{
"category":"小华"
}
},
"highlight":{
"fields":{
"category":{}
}
}
}
模糊查询
返回包含与搜索字词相似的字词的文档。 编辑距离是将一个术语转换为另一个术语所需的一个字符更改的次数。
- 更改字符(box → fox)
- 删除字符(black → lack)
- 插入字符(sic → sick)
- 转置两个相邻字符(act → cat)
{
"query": {
"fuzzy": {
"title": {
"value": "zhangsan",
"fuzziness": 2
}
}
}
}
聚合查询
- term 查询,精确的关键词匹配查询,不对查询条件进行分词。
- terms 查询和 term 查询一样,但它允许你指定多值进行匹配。
分组计数
{
"aggs":{//聚合操作
"price_group":{//统计结果名称,随意起名
"terms":{//分组
"field":"price"//分组字段
}
}
},
"size":0//不显示原始数据
}
平均数
{
"aggs":{//聚合操作
"price_avg":{//统计结果名称,随意起名
"avg":{//分组
"field":"price"//分组字段
}
}
},
"size":0//不显示原始数据
}
求和
{
"aggs":{//聚合操作
"price_sum":{//统计结果名称,随意起名
"sum":{//分组
"field":"price"//分组字段
}
}
},
"size":0//不显示原始数据
}
映射
-
有了索引库,等于有了数据库中的 database。
-
索引库(index)中的映射了,类似于数据库(database)中的表结构(table)。 创建数据库表需要设置字段名称,类型,长度,约束等;索引库也一样,需要知道这个类型 下有哪些字段,每个字段有哪些约束信息,这就叫做映射(mapping)。
创建映射
- 创建索引
PUT http://localhost:9200/user
- 创建映射
PUT http://localhost:9200/user/_mapping
{
"properties": {
"name":{
"type": "text",
"index": true//字段可以被索引查询 ,有分词效果
},
"sex":{
"type": "keyword",//关键词,必须被完整匹配 ,无分词效果
"index": true
},
"tel":{
"type":"keyword",
"index":false//不能被索引
}
}
}
- 插入数据
{
"name":"小米",
"sex":"男的",
"tel":"1111"
}
- 查询数据验证
GET http://localhost:9200/user/_search
{
"query":{
"match":{
"tel":"1111"
}
}
}
映射数据说明
- 字段名:任意填写,下面指定许多属性,例如:title、subtitle、images、price
- type: 类型
- String 类型
- text:可分词
- keyword:不可分词,数据会作为完整字段进行匹配
- Numerical:数值类型,分两类
- 基本数据类型:long、integer、short、byte、double、float、half_float
- 浮点数的高精度类型:scaled_float
- Date:日期类型
- Array:数组类型
- Object:对象
- index:是否索引,默认为 true,也就是说你不进行任何配置,所有字段都会被索引。
- false:字段不会被索引,不能用来搜索
- store:是否将数据进行独立存储,默认为 false
- 原始的文本会存储在
_source
里面,默认情况下其他提取出来的字段都不是独立存储 的,是从_source
里面提取出来的。当然你也可以独立的存储某个字段,只要设置 “store”: true 即可,获取独立存储的字段要比从_source
中解析快得多,但是也会占用 更多的空间,所以要根据实际业务需求来设置。
- 原始的文本会存储在
- analyzer:分词器,这里的 ik_max_word 即使用 ik 分词器,后面会有专门的章节学习
- String 类型
JavaAPI
依赖
<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch的客户端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch依赖2.x的log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
<!-- junit单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
客户端对象
public static void main(String[] args) throws Exception {
// 创建ES客户端
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 关闭ES客户端
esClient.close();
}
索引
创建索引
public static void main(String[] args) throws Exception {
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 创建索引
CreateIndexRequest request = new CreateIndexRequest("user");
CreateIndexResponse createIndexResponse =
esClient.indices().create(request, RequestOptions.DEFAULT);
// 响应状态
boolean acknowledged = createIndexResponse.isAcknowledged();
System.out.println("索引操作 :" + acknowledged);
esClient.close();
}
查询索引
public static void main(String[] args) throws Exception {
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 查询索引
GetIndexRequest request = new GetIndexRequest("user");
GetIndexResponse getIndexResponse =
esClient.indices().get(request, RequestOptions.DEFAULT);
// 响应状态
System.out.println(getIndexResponse.getAliases());
System.out.println(getIndexResponse.getMappings());
System.out.println(getIndexResponse.getSettings());
esClient.close();
}
删除索引
public static void main(String[] args) throws Exception {
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 删除索引
DeleteIndexRequest request = new DeleteIndexRequest("user");
AcknowledgedResponse response = esClient.indices().delete(request, RequestOptions.DEFAULT);
// 响应状态
System.out.println(response.isAcknowledged());
esClient.close();
}
文档
插入数据
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 插入数据
IndexRequest request = new IndexRequest();
request.index("user").id("1001");
User user = new User();
user.setName("zhangsan");
user.setAge(30);
user.setSex("男");
// 向ES插入数据,必须将数据转换位JSON格式
ObjectMapper mapper = new ObjectMapper();
String userJson = mapper.writeValueAsString(user);
request.source(userJson, XContentType.JSON);
IndexResponse response = esClient.index(request, RequestOptions.DEFAULT);
System.out.println(response.getResult());
esClient.close();
修改数据
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 修改数据 局部更新
UpdateRequest request = new UpdateRequest();
request.index("user").id("1001");
request.doc(XContentType.JSON, "sex", "女");
UpdateResponse response = esClient.update(request, RequestOptions.DEFAULT);
System.out.println(response.getResult());
esClient.close();
查询数据
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 查询数据
GetRequest request = new GetRequest();
request.index("user").id("1001");
GetResponse response = esClient.get(request, RequestOptions.DEFAULT);
System.out.println(response.getSourceAsString());
esClient.close();
删除数据
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
DeleteRequest request = new DeleteRequest();
request.index("user").id("1001");
DeleteResponse response = esClient.delete(request, RequestOptions.DEFAULT);
System.out.println(response.toString());
esClient.close();
批量增加
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 批量插入数据
BulkRequest request = new BulkRequest();
request.add(new IndexRequest().index("user").id("1001").source(XContentType.JSON, "name", "zhangsan", "age",30,"sex","男"));
request.add(new IndexRequest().index("user").id("1002").source(XContentType.JSON, "name", "lisi", "age",30,"sex","女"));
request.add(new IndexRequest().index("user").id("1003").source(XContentType.JSON, "name", "wangwu", "age",40,"sex","男"));
request.add(new IndexRequest().index("user").id("1004").source(XContentType.JSON, "name", "wangwu1", "age",40,"sex","女"));
request.add(new IndexRequest().index("user").id("1005").source(XContentType.JSON, "name", "wangwu2", "age",50,"sex","男"));
request.add(new IndexRequest().index("user").id("1006").source(XContentType.JSON, "name", "wangwu3", "age",50,"sex","男"));
request.add(new IndexRequest().index("user").id("1007").source(XContentType.JSON, "name", "wangwu44", "age",60,"sex","男"));
request.add(new IndexRequest().index("user").id("1008").source(XContentType.JSON, "name", "wangwu555", "age",60,"sex","男"));
request.add(new IndexRequest().index("user").id("1009").source(XContentType.JSON, "name", "wangwu66666", "age",60,"sex","男"));
BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);
System.out.println(response.getTook());
System.out.println(response.getItems());
esClient.close();
批量删除
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
// 批量删除数据
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest().index("user").id("1001"));
request.add(new DeleteRequest().index("user").id("1002"));
request.add(new DeleteRequest().index("user").id("1003"));
BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);
System.out.println(response.getTook());
System.out.println(response.getItems());
esClient.close();
高级查询
迭代器
Iterator<SearchHit> iterator = hits.iterator();
while (iterator.hasNext()) {
SearchHit hit = iterator.next();
System.out.println(hit.getSourceAsString());
}
全量查询
// 1. 查询索引中全部的数据
SearchRequest request = new SearchRequest();
request.indices("user");
request.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery()));
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getTotalHits());
System.out.println(response.getTook());
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
条件查询
// 2. 条件查询 : termQuery
SearchRequest request = new SearchRequest();
request.indices("user");
request.source(new SearchSourceBuilder().query(QueryBuilders.termQuery("age", 30)));
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getTotalHits());
System.out.println(response.getTook());
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
分页查询
// 3. 分页查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
// (当前页码-1)*每页显示数据条数
builder.from(2);
builder.size(2);
request.source(builder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getTotalHits());
System.out.println(response.getTook());
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
查询排序
// 4. 查询排序
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
//
builder.sort("age", SortOrder.DESC);
request.source(builder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getTotalHits());
System.out.println(response.getTook());
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
过滤字段
// 5. 过滤字段
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder().query(QueryBuilders.matchAllQuery());
//
String[] excludes = {"age"};
String[] includes = {};
builder.fetchSource(includes, excludes);
request.source(builder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getTotalHits());
System.out.println(response.getTook());
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
组合查询
// 6. 组合查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
//boolQueryBuilder.must(QueryBuilders.matchQuery("age", 30));
//boolQueryBuilder.must(QueryBuilders.matchQuery("sex", "男"));
//boolQueryBuilder.mustNot(QueryBuilders.matchQuery("sex", "男"));
boolQueryBuilder.should(QueryBuilders.matchQuery("age", 30));
boolQueryBuilder.should(QueryBuilders.matchQuery("age", 40));
builder.query(boolQueryBuilder);
request.source(builder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getTotalHits());
System.out.println(response.getTook());
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
范围查询
// 7. 范围查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
RangeQueryBuilder rangeQuery = QueryBuilders.rangeQuery("age");
rangeQuery.gte(30);
rangeQuery.lt(50);
builder.query(rangeQuery);
request.source(builder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getTotalHits());
System.out.println(response.getTook());
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
模糊查询
// 8. 模糊查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.fuzzyQuery("name", "wangwu").fuzziness(Fuzziness.TWO));
request.source(builder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getTotalHits());
System.out.println(response.getTook());
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
高亮查询
// 9. 高亮查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery("name", "zhangsan");
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.preTags("<font color='red'>");
highlightBuilder.postTags("</font>");
highlightBuilder.field("name");
builder.highlighter(highlightBuilder);
builder.query(termsQueryBuilder);
request.source(builder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getTotalHits());
System.out.println(response.getTook());
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
聚合查询
// 10. 聚合查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
AggregationBuilder aggregationBuilder = AggregationBuilders.max("maxAge").field("age");
builder.aggregation(aggregationBuilder);
request.source(builder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getTotalHits());
System.out.println(response.getTook());
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
分组查询
// 11. 分组查询
SearchRequest request = new SearchRequest();
request.indices("user");
SearchSourceBuilder builder = new SearchSourceBuilder();
AggregationBuilder aggregationBuilder = AggregationBuilders.terms("ageGroup").field("age");
builder.aggregation(aggregationBuilder);
request.source(builder);
SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
SearchHits hits = response.getHits();
System.out.println(hits.getTotalHits());
System.out.println(response.getTook());
for ( SearchHit hit : hits ) {
System.out.println(hit.getSourceAsString());
}
esClient.close();
集群
在一个集群里,只要你想,可以拥有任意多个节点。而且,如果当前你的网络中没有运 行任何 Elasticsearch 节点,这时启动一个节点,会默认创建并加入一个叫做“elasticsearch”的 集群。
Windows集群
- 创建 elasticsearch-cluster 文件夹,在内部复制三个 elasticsearch 服务
- 修改集群文件目录中每个节点的 config/elasticsearch.yml 配置文件
#节点 1 的配置信息:
#集群名称,节点之间要保持一致
cluster.name: my-elasticsearch
#节点名称,集群内要唯一
node.name: node-1001
node.master: true
node.data: true
#ip 地址
network.host: localhost
#http 端口
http.port: 1001
#tcp 监听端口
transport.tcp.port: 9301
#discovery.seed_hosts: ["localhost:9301", "localhost:9302","localhost:9303"]
#discovery.zen.fd.ping_timeout: 1m
#discovery.zen.fd.ping_retries: 5
#集群内的可以被选为主节点的节点列表
#cluster.initial_master_nodes: ["node-1", "node-2","node-3"]
#跨域配置
#action.destructive_requires_name: true
http.cors.enabled: true
http.cors.allow-origin: "*"
#节点 2 的配置信息:
#集群名称,节点之间要保持一致
cluster.name: my-elasticsearch
#节点名称,集群内要唯一
node.name: node-1002
node.master: true
node.data: true
#ip 地址
network.host: localhost
#http 端口
http.port: 1002
#tcp 监听端口
transport.tcp.port: 9302
discovery.seed_hosts: ["localhost:9301"]
discovery.zen.fd.ping_timeout: 1m
discovery.zen.fd.ping_retries: 5
#集群内的可以被选为主节点的节点列表
#cluster.initial_master_nodes: ["node-1", "node-2","node-3"]
#跨域配置
#action.destructive_requires_name: true
http.cors.enabled: true
http.cors.allow-origin: "*"
#节点 3 的配置信息:
#集群名称,节点之间要保持一致
cluster.name: my-elasticsearch
#节点名称,集群内要唯一
node.name: node-1003
node.master: true
node.data: true
#ip 地址
network.host: localhost
#http 端口
http.port: 1003
#tcp 监听端口
transport.tcp.port: 9303
#候选主节点的地址,在开启服务后可以被选为主节点
discovery.seed_hosts: ["localhost:9301", "localhost:9302"]
discovery.zen.fd.ping_timeout: 1m
discovery.zen.fd.ping_retries: 5
#集群内的可以被选为主节点的节点列表
#cluster.initial_master_nodes: ["node-1", "node-2","node-3"]
#跨域配置
#action.destructive_requires_name: true
http.cors.enabled: true
http.cors.allow-origin: "*"
- 启动集群
- 测试集群
查看集群状态
GET http://127.0.0.1:1001/_cluster/health
GET http://127.0.0.1:1002/_cluster/health
GET http://127.0.0.1:1003/_cluster/health
- 向集群中的 node-1001 节点增加索引
- 向集群中的 node-1002 节点查询索引,查到即为成功
Linux集群
单机测试
软件下载地址:https://www.elastic.co/cn/downloads/past-releases/elasticsearch-7-8-0
- 软件安装
# 解压缩
tar -zxvf elasticsearch-7.8.0-linux-x86_64.tar.gz -C /opt/module
# 改名
mv elasticsearch-7.8.0 es
- 创建用户
因为安全问题,Elasticsearch 不允许 root 用户直接运行,所以要创建新用户,在 root 用 户中创建新用户
useradd es #新增 es 用户
passwd es #为 es 用户设置密码
userdel -r es #如果错了,可以删除再加
chown -R es:es /opt/module/es #文件夹所有者
- 修改配置文件
修改/opt/module/es/config/elasticsearch.yml 文件
# 加入如下配置
cluster.name: elasticsearch
node.name: node-1
network.host: 0.0.0.0
http.port: 9200
cluster.initial_master_nodes: ["node-1"]
修改/etc/security/limits.conf
# 在文件末尾中增加下面内容
# 每个进程可以打开的文件数的限制
es soft nofile 65536
es hard nofile 65536
修改/etc/security/limits.d/20-nproc.conf
# 在文件末尾中增加下面内容
# 每个进程可以打开的文件数的限制
es soft nofile 65536
es hard nofile 65536
# 操作系统级别对每个用户创建的进程数的限制
* hard nproc 4096
# 注:* 带表 Linux 所有用户名称
修改/etc/sysctl.conf
# 在文件中增加下面内容
# 一个进程可以拥有的 VMA(虚拟内存区域)的数量,默认值为 65536
vm.max_map_count=655360
重新加载
sysctl -p
- 启动软件
使用 ES 用户启动
cd /opt/module/es/
#启动
bin/elasticsearch
#后台启动
bin/elasticsearch -d
启动时,会动态生成文件,如果文件所属用户不匹配,会发生错误,需要重新进行修改用户 和用户组
集群
- 创建用户
- 修改配置文件
修改/opt/module/es/config/elasticsearch.yml 文件,分发文件
# 加入如下配置
#集群名称
cluster.name: cluster-es
#节点名称,每个节点的名称不能重复
node.name: node-1
#ip 地址,每个节点的地址不能重复
network.host: linux1
#是不是有资格主节点
node.master: true
node.data: true
http.port: 9200
# head 插件需要这打开这两个配置
http.cors.allow-origin: "*"
http.cors.enabled: true
http.max_content_length: 200mb
#es7.x 之后新增的配置,初始化一个新的集群时需要此配置来选举 master
cluster.initial_master_nodes: ["node-1"]
#es7.x 之后新增的配置,节点发现
discovery.seed_hosts: ["linux1:9300","linux2:9300","linux3:9300"]
gateway.recover_after_nodes: 2
network.tcp.keep_alive: true
network.tcp.no_delay: true
transport.tcp.compress: true
#集群内同时启动的数据任务个数,默认是 2 个
cluster.routing.allocation.cluster_concurrent_rebalance: 16
#添加或删除节点及负载均衡时并发恢复的线程个数,默认 4 个
cluster.routing.allocation.node_concurrent_recoveries: 16
#初始化数据恢复时,并发恢复线程的个数,默认 4 个
cluster.routing.allocation.node_initial_primaries_recoveries: 16
修改/etc/security/limits.conf ,分发文件
# 在文件末尾中增加下面内容
es soft nofile 65536
es hard nofile 65536
修改/etc/security/limits.d/20-nproc.conf,分发文件
# 在文件末尾中增加下面内容
es soft nofile 65536
es hard nofile 65536
* hard nproc 4096
# 注:* 带表 Linux 所有用户名称
修改/etc/sysctl.conf
# 在文件中增加下面内容
vm.max_map_count=655360
重新加载
sysctl -p
- 分别在不同节点上启动 ES 软件
cd /opt/module/es-cluster
#启动
bin/elasticsearch
#后台启动
bin/elasticsearch -d
- 测试集群
GET linux1:9200/_cat/nodes
进阶
核心概念
索引 Index
-
一个索引就是一个拥有几分相似特征的文档的集合。比如说,你可以有一个客户数据的 索引,另一个产品目录的索引,还有一个订单数据的索引。一个索引由一个名字来标识(必 须全部是小写字母),并且当我们要对这个索引中的文档进行索引、搜索、更新和删除的时 候,都要使用到这个名字。在一个集群中,可以定义任意多的索引。
-
能搜索的数据必须索引,这样的好处是可以提高查询速度,比如:新华字典前面的目录 就是索引的意思,目录可以提高查询速度。
类型 Type
-
在一个索引中,你可以定义一种或多种类型。
-
一个类型是你的索引的一个逻辑上的分类/分区,其语义完全由你来定。通常,会为具 有一组共同字段的文档定义一个类型。不同的版本,类型发生了不同的变化
文档 Document
-
一个文档是一个可被索引的基础信息单元,也就是一条数据
-
比如:你可以拥有某一个客户的文档,某一个产品的一个文档,当然,也可以拥有某个 订单的一个文档。文档以 JSON(Javascript Object Notation)格式来表示,而 JSON 是一个 到处存在的互联网数据交互格式
-
在一个 index/type 里面,你可以存储任意多的文档。
字段 Field
- 相当于是数据表的字段,对文档数据根据不同属性进行的分类标识。
映射 Mapping
- mapping 是处理数据的方式和规则方面做一些限制,如:某个字段的数据类型、默认值、 分析器、是否被索引等等。这些都是映射里面可以设置的,其它就是处理 ES 里面数据的一 些使用规则设置也叫做映射,按着最优规则处理数据对性能提高很大,因此才需要建立映射, 并且需要思考如何建立映射才能对性能更好。
分片 Shards
-
一个索引可以存储超出单个节点硬件限制的大量数据。比如,一个具有 10 亿文档数据 的索引占据 1TB 的磁盘空间,而任一节点都可能没有这样大的磁盘空间。或者单个节点处 理搜索请求,响应太慢。为了解决这个问题,Elasticsearch 提供了将索引划分成多份的能力, 每一份就称之为分片。当你创建一个索引的时候,你可以指定你想要的分片的数量。每个分 片本身也是一个功能完善并且独立的“索引”,这个“索引”可以被放置到集群中的任何节点 上。
-
分片很重要,主要有两方面的原因
- 允许你水平分割 / 扩展你的内容容量
- 允许你在分片之上进行分布式的、并行的操作,进而提高性能/吞吐量
副本 Replicas
-
在一个网络 / 云的环境里,失败随时都可能发生,在某个分片/节点不知怎么的就处于 离线状态,或者由于任何原因消失了,这种情况下,有一个故障转移机制是非常有用并且是 强烈推荐的。为此目的,Elasticsearch 允许你创建分片的一份或多份拷贝,这些拷贝叫做复 制分片(副本)。
-
复制分片之所以重要,有两个主要原因:
- 在分片/节点失败的情况下,提供了高可用性。因为这个原因,注意到复制分片从不与 原/主要(original/primary)分片置于同一节点上是非常重要的。
- 扩展你的搜索量/吞吐量,因为搜索可以在所有的副本上并行运行
-
每个索引可以被分成多个分片。一个索引也可以被复制 0 次(意思是没有复制) 或多次。一旦复制了,每个索引就有了主分片(作为复制源的原来的分片)和复制分片(主 分片的拷贝)之别。分片和复制的数量可以在索引创建的时候指定。在索引创建之后,你可以在任何时候动态地改变复制的数量,但是你事后不能改变分片的数量。默认情况下, Elasticsearch 中的每个索引被分片 1 个主分片和 1 个复制,这意味着,如果你的集群中至少 有两个节点,你的索引将会有 1 个主分片和另外 1 个复制分片(1 个完全拷贝),这样的话每个索引总共就有 2 个分片,我们需要根据索引需要确定分片个数。
分配 Allocation
- 将分片分配给某个节点的过程,包括分配主分片或者副本。如果是副本,还包含从主分片复制数据的过程。这个过程是由 master 节点完成的。
系统结构
- 一个运行中的 Elasticsearch 实例称为一个节点,而集群是由一个或者多个拥有相同 cluster.name 配置的节点组成, 它们共同承担数据和负载的压力。当有节点加入集群中或者 从集群中移除节点时,集群将会重新平均分布所有的数据。
- 当一个节点被选举成为主节点时, 它将负责管理集群范围内的所有变更,例如增加、 删除索引,或者增加、删除节点等。 而主节点并不需要涉及到文档级别的变更和搜索等操作,所以当集群只拥有一个主节点的情况下,即使流量的增加它也不会成为瓶颈。 任何节点都可以成为主节点。我们的示例集群就只有一个节点,所以它同时也成为了主节点。
- 我们可以将请求发送到集群中的任何节点 ,包括主节点。 每个节点都知道 任意文档所处的位置,并且能够将我们的请求直接转发到存储我们所需文档的节点。 无论 我们将请求发送到哪个节点,它都能负责从各个包含我们所需文档的节点收集回数据,并将 最终结果返回給客户端。 Elasticsearch 对这一切的管理都是透明的。
分布式集群
单节点集群
在包含一个空节点的集群内创建名为 users 的索引,为了演示目的,我们将分配 3 个主分片和一份副本(每个主分片拥有一个副本分片)
{
"settings" : {
"number_of_shards" : 3,
"number_of_replicas" : 1
}
}
故障转移
- 当集群中只有一个节点在运行时,意味着会有一个单点故障问题——没有冗余。 幸运 的是,我们只需再启动一个节点即可防止数据丢失。当你在同一台机器上启动了第二个节点 时,只要它和第一个节点有同样的 cluster.name 配置,它就会自动发现集群并加入到其中。 但是在不同机器上启动节点的时候,为了加入到同一集群,你需要配置一个可连接到的单播 主机列表。之所以配置为使用单播发现,以防止节点无意中加入集群。只有在同一台机器上 运行的节点才会自动组成集群。
- 如果启动了第二个节点,我们的集群将会拥有两个节点的集群 : 所有主分片和副本分 片都已被分配
水平扩容
当启动了第三个节点,我们的集群将 会拥有三个节点的集群 : 为了分散负载而对分片进行重新分配
- 主分片的数目在索引创建时就已经确定了下来。实际上,这个数目定义了这个索引能够 存储 的最大数据量。
- 当你拥有越多的副本分片 时,也将拥有越高的吞吐量。
- 在运行中的集群上是可以动态调整副本分片数目的,我们可以按需伸缩集群。让我们把 副本数从默认的 1 增加到 2
{
"number_of_replicas" : 2
}
应对故障
关闭第一个节点,这时集群的状态为:关闭了一个节点后的集群。
- 集群必须拥有一个主节点来保证正常工作,所以发生 的第一件事情就是选举一个新的主节点: Node 2 。在我们关闭 Node 1 的同时也失去了主 分片 1 和 2 ,并且在缺失主分片的时候索引也不能正常工作。 如果此时来检查集群的状况,我们看到的状态将会为 red :不是所有主分片都在正常工作。
- 幸运的是,在其它节点上存在着这两个主分片的完整副本, 所以新的主节点立即将这 些分片在 Node 2 和 Node 3 上对应的副本分片提升为主分片, 此时集群的状态将会为 yellow。
为什么我们集群状态是 yellow 而不是 green 呢?
- 虽然我们拥有所有的三个主分片,但是同时设置了每个主分片需要对应 2 份副本分片,而此 时只存在一份副本分片。 所以集群不能为 green 的状态,不过我们不必过于担心:如果我 们同样关闭了 Node 2 ,我们的程序 依然 可以保持在不丢任何数据的情况下运行,因为 Node 3 为每一个分片都保留着一份副本。
- 如果我们重新启动 Node 1 ,集群可以将缺失的副本分片再次进行分配,那么集群的状 态也将恢复成之前的状态。 如果 Node 1 依然拥有着之前的分片,它将尝试去重用它们, 同时仅从主分片复制发生了修改的数据文件。和之前的集群相比,只是 Master 节点切换了。
路由计算
- routing 是一个可变值,默认是文档的 _id ,也可以设置成一个自定义的值。 routing 通过 hash 函数生成一个数字,然后这个数字再除以 number_of_primary_shards (主分片的数量)后得到余数 。这个分布在 0 到 number_of_primary_shards-1 之间的余数,就是我们所寻求 的文档所在分片的位置
- 所有的文档 API( get 、 index 、 delete 、 bulk 、 update 以及 mget )都接受一 个叫做 routing 的路由参数 ,通过这个参数我们可以自定义文档到分片的映射。一个自定 义的路由参数可以用来确保所有相关的文档——例如所有属于同一个用户的文档——都被 存储到同一个分片中
分片控制
假设有一个集群由三个节点组成。 它包含一个叫 emps 的索引,有两个主分片, 每个主分片有两个副本分片。相同分片的副本不会放在同一节点。
我们可以发送请求到集群中的任一节点。 每个节点都有能力处理任意请求。 每个节点都知道集群中任一文档位置,所以可以直接将请求转发到需要的节点上。 在下面的例子中,将 所有的请求发送到 Node 1,我们将其称为 协调节点(coordinating node)
当发送请求的时候, 为了扩展负载,更好的做法是轮询集群中所有的节点。
写流程
读流程
更新流程
部分更新一个文档结合了先前说明的读取和写入流程:
多文档操作流程
分片原理
倒排索引
一个倒排索引由文档中所有不重复词的列表构成,对于其中每个词,有一个包含它的文 档列表。例如,假设我们有两个文档,每个文档的 content 域包含如下内容:
- The quick brown fox jumped over the lazy dog
- Quick brown foxes leap over lazy dogs in summer
倒排索引的问题:
分词和标准化的过程称为分析 , 你只能搜索在索引中出现的词条,所以索引文本和查询字符串必须标准化为相同的格式。
文档搜索
-
早期的全文检索会为整个文档集合建立一个很大的倒排索引并将其写入到磁盘。 一旦 新的索引就绪,旧的就会被其替换,这样最近的变化便可以被检索到。
-
倒排索引被写入磁盘后是 不可改变 的:它永远不会修改。
- 当然,一个不变的索引也有不好的地方。主要事实是它是不可变的! 你不能修改它。如 果你需要让一个新的文档 可被搜索,你需要重建整个索引。这要么对一个索引所能包含的 数据量造成了很大的限制,要么对索引可被更新的频率造成了很大的限制。
动态更新索引
- 当一个查询被触发,所有已知的段按顺序被查询。词项统计会对所有段的结果进行聚合,以 保证每个词和每个文档的关联都被准确计算。 这种方式可以用相对较低的成本将新文档添加到索引。
- 段是不可改变的,所以既不能从把文档从旧的段中移除,也不能修改旧的段来进行反映文档 的更新。 取而代之的是,每个提交点会包含一个 .del 文件,文件中会列出这些被删除文档 的段信息。
- 当一个文档被 “删除” 时,它实际上只是在 .del 文件中被 标记 删除。一个被标记删除的文档仍然可以被查询匹配到, 但它会在最终结果被返回前从结果集中移除。
- 文档更新也是类似的操作方式:当一个文档被更新时,旧版本文档被标记删除,文档的新版 本被索引到一个新的段中。 可能两个版本的文档都会被一个查询匹配到,但被删除的那个 旧版本文档在结果集返回前就已经被移除。
近实时搜索
并不是所有的情况都需要每秒刷新。可能你正在使用 Elasticsearch 索引大量的日志文件, 你可能想优化索引速度而不是近实时搜索, 可以通过设置 refresh_interval , 降低每个索 引的刷新频率
{
"settings": {
"refresh_interval": "30s"
}
}
refresh_interval 可以在既存索引上进行动态更新。 在生产环境中,当你正在建立一个大的 新索引时,可以先关闭自动刷新,待开始使用该索引时,再把它们调回来
# 关闭自动刷新
PUT /users/_settings
{ "refresh_interval": -1 }
# 每一秒刷新
PUT /users/_settings
{ "refresh_interval": "1s" }
持久化变更
段合并
文档分析
分析包含下面的过程:
- 将一块文本分成适合于倒排索引的独立的词条
- 将这些词条统一化为标准格式以提高它们的“可搜索性”,或者 recall
分析器执行上面的工作。分析器实际上是将三个功能封装到了一个包里:
- 字符过滤器
首先,字符串按顺序通过每个字符过滤器 。他们的任务是在分词前整理字符串。一个 字符过滤器可以用来去掉 HTML,或者将 & 转化成 and。
- 分词器
其次,字符串被分词器分为单个的词条。一个简单的分词器遇到空格和标点的时候, 可能会将文本拆分成词条。
- Token 过滤器
最后,词条按顺序通过每个 token 过滤器 。这个过程可能会改变词条(例如,小写化 Quick ),删除词条(例如, 像 a, and, the 等无用词),或者增加词条(例如,像 jump 和 leap 这种同义词)
内置分析器
注意看 transparent、 calling 和 set_trans 已经变为词根格式
分析器使用场景
- 当我们 索引 一个文档,它的全文域被分析成词条以用来创建倒排索引。 但是,当我 们在全文域 搜索 的时候,我们需要将查询字符串通过 相同的分析过程 ,以保证我们搜索 的词条格式与索引中的词条格式一致。
- 全文查询,理解每个域是如何定义的,因此它们可以做正确的事:
- 当你查询一个全文域时, 会对查询字符串应用相同的分析器,以产生正确的搜 索词条列表。
- 当你查询一个精确值域时,不会分析查询字符串,而是搜索你指定的精确值。
测试分析器
可以使用 analyze API 来看文本是如何被分析的。 在消息体里,指定分析器和要分析的文本
//GET http://localhost:9200/_analyze
{
"analyzer": "standard",
"text": "Text to analyze"
}
结果中每个元素代表一个单独的词条:
{
"tokens": [
{
"token": "text",
"start_offset": 0,
"end_offset": 4,
"type": "<ALPHANUM>",
"position": 0
},
{
"token": "to",
"start_offset": 5,
"end_offset": 7,
"type": "<ALPHANUM>",
"position": 1
},
{
"token": "analyze",
"start_offset": 8,
"end_offset": 15,
"type": "<ALPHANUM>",
"position": 2
}
]
}
token 是实际存储到索引中的词条。 position 指明词条在原始文本中出现的位置。 start_offset 和 end_offset 指明字符在原始字符串中的位置。
IK 分词器
首先我们通过 Postman 发送 GET 请求查询默认分词效果
//GET http://localhost:9200/_analyze
{
"text":"测试单词"
}
将解压后的后的文件夹放入 ES 根目录下的 plugins 目录下,重启 ES 即可使用。
我们这次加入新的查询参数"analyzer":“ik_max_word”
//GET http://localhost:9200/_analyze
{
"text":"测试单词",
"analyzer":"ik_max_word"
}
- ik_max_word:会将文本做最细粒度的拆分
- ik_smart:会将文本做最粗粒度的拆分
- 自定义词语
首先进入 ES 根目录中的 plugins 文件夹下的 ik 文件夹,进入 config 目录,创建 custom.dic 文件,写入弗雷尔卓德。同时打开 IKAnalyzer.cfg.xml 文件,将新建的 custom.dic 配置其中, 重启 ES 服务器。
自定义分析器
ngram 和 edge_ngram 词单元过滤器 可以产生 适合用于部分匹配或者自动补全的词单元。
// PUT http://localhost:9200/my_index
{
"settings": {
"analysis": {
"char_filter": {
"&_to_and": {
"type": "mapping",
"mappings": [ "&=> and "]
}},
"filter": {
"my_stopwords": {
"type": "stop",
"stopwords": [ "the", "a" ]
}},
"analyzer": {
"my_analyzer": {
"type": "custom",
"char_filter": [ "html_strip", "&_to_and" ],
"tokenizer": "standard",
"filter": [ "lowercase", "my_stopwords" ]
}}
}}}
// GET http://127.0.0.1:9200/my_index/_analyze
{
"text":"The quick & brown fox",
"analyzer": "my_analyzer"
}
文档处理
文档冲突
- 当我们使用 index API 更新文档 ,可以一次性读取原始文档,做我们的修改,然后重新索引 整个文档 。 最近的索引请求将获胜:无论最后哪一个文档被索引,都将被唯一存储在 Elasticsearch 中。如果其他人同时更改这个文档,他们的更改将丢失。
- 变更越频繁,读数据和更新数据的间隙越长,也就越可能丢失变更。
在数据库领域中,有两种方法通常被用来确保并发更新时变更不会丢失:
- 悲观并发控制 这种方法被关系型数据库广泛使用,它假定有变更冲突可能发生,因此阻塞访问资源以 防止冲突。 一个典型的例子是读取一行数据之前先将其锁住,确保只有放置锁的线程能够 对这行数据进行修改。
- 乐观并发控制 Elasticsearch 中使用的这种方法假定冲突是不可能发生的,并且不会阻塞正在尝试的操 作。 然而,如果源数据在读写当中被修改,更新将会失败。应用程序接下来将决定该如何 解决冲突。 例如,可以重试更新、使用新的数据、或者将相关情况报告给用户。
乐观并发控制
- Elasticsearch 是分布式的。当文档创建、更新或删除时, 新版本的文档必须复制到集群中的其他节点。Elasticsearch 也是异步和并发的,这意味着这些复制请求被并行发送,并且到达目的地时也许顺序是乱的 。 Elasticsearch 需要一种方法确保文档的旧版本不会覆 盖新的版本。
- 当我们之前讨论 index ,GET 和 delete 请求时,我们指出每个文档都有一个 _version (版本)号,当文档被修改时版本号递增。 Elasticsearch 使用这个 version 号来确保变更 以正确顺序得到执行。如果旧版本的文档在新版本之后到达,它可以被简单的忽略。
- 我们可以利用 version 号来确保应用中相互冲突的变更不会导致数据丢失。我们通过 指定想要修改文档的 version 号来达到这个目的。 如果该版本不是当前版本号,我们的请求将会失败。
外部系统版本控制
-
一个常见的设置是使用其它数据库作为主要的数据存储,使用 Elasticsearch 做数据检索, 这意味着主数据库的所有更改发生时都需要被复制到 Elasticsearch ,如果多个进程负 责这一数据同步,你可能遇到类似于之前描述的并发问题。
-
如果你的主数据库已经有了版本号 — 或一个能作为版本号的字段值比如 timestamp — 那么你就可以在 Elasticsearch 中通过增加 version_type=external 到查询字符串的方式重用 这些相同的版本号, 版本号必须是大于零的整数, 且小于 9.2E+18 — 一个 Java 中 long 类型的正值。
-
外部版本号的处理方式和我们之前讨论的内部版本号的处理方式有些不同, Elasticsearch 不是检查当前 _version 和请求中指定的版本号是否相同, 而是检查当前 _version 是否 小于 指定的版本号。 如果请求成功,外部的版本号作为文档的新 _version 进行存储。
-
外部版本号不仅在索引和删除请求是可以指定,而且在 创建 新文档时也可以指定。
Kibana
Kibana 是一个免费且开放的用户界面,能够让你对 Elasticsearch 数据进行可视化,并 让你在 Elastic Stack 中进行导航。
- 解压缩下载的 zip 文件
- 修改 config/kibana.yml 文
# 默认端口
server.port: 5601
# ES 服务器的地址
elasticsearch.hosts: ["http://localhost:9200"]
# 索引名
kibana.index: ".kibana"
# 支持中文
i18n.locale: "zh-CN"
访问localhost:5601
Spring Data 框架集成
Spring Data 是一个用于简化数据库、非关系型数据库、索引库访问,并支持云服务的 开源框架。其主要目标是使得对数据的访问变得方便快捷,并支持 map-reduce 框架和云计 算数据服务。 Spring Data 可以极大的简化 JPA(Elasticsearch„)的写法,可以在几乎不用 写实现的情况下,实现对数据的访问和操作。除了 CRUD 外,还包括如分页、排序等一些常用的功能。
Spring Data 的官网:https://spring.io/projects/spring-data
Spring Data Elasticsearch
Spring Data Elasticsearch 基于 spring data API 简化 Elasticsearch 操作,将原始操作 Elasticsearch 的客户端 API 进行封装 。Spring Data 为 Elasticsearch 项目提供集成搜索引擎。 Spring Data Elasticsearch POJO 的关键功能区域为中心的模型与 Elastichsearch 交互文档和轻 松地编写一个存储索引库数据访问层。
Spring boot2.3.x 一般可以兼容 Elasticsearch7.x
配置
- 设置依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.6.RELEASE</version>
<relativePath/>
</parent>
<groupId>com.atguigu</groupId>
<artifactId>es</artifactId>
<version>1.0</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-test</artifactId>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
</dependency>
</dependencies>
</project>
- 增加配置文件
在 resources 目录中增加 application.properties 文件
# es 服务地址
elasticsearch.host=127.0.0.1
# es 服务端口
elasticsearch.port=9200
# 配置日志级别,开启 debug 日志
logging.level.com.atguigu.es=debug
实例
SpringBoot 主程序
@SpringBootApplication
public class SpringDataElasticSearchMainApplication {
public static void main(String[] args) {
SpringApplication.run(SpringDataElasticSearchMainApplication.class,args);
}
}
数据实体类
@Data
@NoArgsConstructor
@AllArgsConstructor
@ToString
@Document(indexName = "product", shards = 3, replicas = 1)//索引名称,分片,副本
public class Product {
//必须有 id,这里的 id 是全局唯一的标识,等同于 es 中的"_id"
@Id
private Long id;//商品唯一标识
/**
* type : 字段数据类型
* analyzer : 分词器类型
* index : 是否索引(默认:true)
* Keyword : 短语,不进行分词
*/
@Id
private Long id;//商品唯一标识
@Field(type = FieldType.Text, analyzer = "ik_max_word")
private String title;//商品名称
@Field(type = FieldType.Keyword)
private String category;//分类名称
@Field(type = FieldType.Double)
private Double price;//商品价格
@Field(type = FieldType.Keyword, index = false)
private String images;//图片地址
}
配置类
-
ElasticsearchRestTemplate 是 spring-data-elasticsearch 项目中的一个类,和其他 spring 项目中的 template 类似。
-
ElasticsearchRestTemplate 基 于 RestHighLevelClient 客户端的。需要自定义配置类,继承 AbstractElasticsearchConfiguration,并实现 elasticsearchClient()抽象方法,创建 RestHighLevelClient 对 象。
@ConfigurationProperties(prefix = "elasticsearch")
@Configuration
@Data
public class ElasticsearchConfig extends AbstractElasticsearchConfiguration {
private String host ;
private Integer port ;
//重写父类方法
@Override
public RestHighLevelClient elasticsearchClient() {
RestClientBuilder builder = RestClient.builder(new HttpHost(host, port));
RestHighLevelClient restHighLevelClient = new RestHighLevelClient(builder);
return restHighLevelClient;
}
}
DAO 数据访问对象
@Repository
public interface ProductDao extends ElasticsearchRepository<Product,Long> {
}
索引操作
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringDataESIndexTest {
@Autowired
private ElasticsearchRestTemplate elasticsearchRestTemplate;
//创建索引,系统初始化会自动创建索引
@Test
public void createIndex(){
System.out.println("创建索引");
}
@Test
public void deleteIndex(){
boolean flg = elasticsearchRestTemplate.deleteIndex(Product.class);
System.out.println("删除索引 = " + flg);
}
}
文档CRUD
新增
@Autowired
private ProductDao productDao;
/**
* 新增
*/
@Test
public void save(){
Product product = new Product();
product.setId(2L);
product.setTitle("华为手机");
product.setCategory("手机");
product.setPrice(2999.0);
product.setImages("http://www.atguigu/hw.jpg");
productDao.save(product);
}
修改
//修改 ID相同修改 ID不同新增
@Test
public void update(){
Product product = new Product();
product.setId(2L);
product.setTitle("小米 2 手机");
product.setCategory("手机");
product.setPrice(9999.0);
product.setImages("http://www.atguigu/xm.jpg");
productDao.save(product);
}
查询
//根据 id 查询
@Test
public void findById(){
Product product = productDao.findById(1L).get();
System.out.println(product);
}
//查询所有
@Test
public void findAll(){
Iterable<Product> products = productDao.findAll();
for (Product product : products) {
System.out.println(product);
}
}
删除
//删除
@Test
public void delete(){
Product product = new Product();
product.setId(1L);
productDao.delete(product);
}
批量新增
//批量新增
@Test
public void saveAll(){
List<Product> productList = new ArrayList<>();
for (int i = 0; i < 10; i++) {
Product product = new Product();
product.setId(Long.valueOf(i));
product.setTitle("["+i+"]小米手机");
product.setCategory("手机");
product.setPrice(1999.0+i);
product.setImages("http://www.atguigu/xm.jpg");
productList.add(product);
}
productDao.saveAll(productList);
}
分页查询
//分页查询
@Test
public void findByPageable(){
//设置排序(排序方式,正序还是倒序,排序的 id)
Sort sort = Sort.by(Sort.Direction.DESC,"id");
int currentPage=0;//当前页,第一页从 0 开始,1 表示第二页
int pageSize = 5;//每页显示多少条
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize,sort);
//分页查询
Page<Product> productPage = productDao.findAll(pageRequest);
for (Product Product : productPage.getContent()) {
System.out.println(Product);
}
}
文档搜索
@Autowired
private ProductDao productDao;
/**
* term 查询
* search(termQueryBuilder) 调用搜索方法,参数查询构建器对象
*/
@Test
public void termQuery(){
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", " 小米");
Iterable<Product> products = productDao.search(termQueryBuilder);
for (Product product : products) {
System.out.println(product);
}
}
/**
* term 查询加分页
*/
@Test
public void termQueryByPage(){
int currentPage= 0 ;
int pageSize = 5;
//设置查询分页
PageRequest pageRequest = PageRequest.of(currentPage, pageSize);
TermQueryBuilder termQueryBuilder = QueryBuilders.termQuery("title", " 小米");
Iterable<Product> products =
productDao.search(termQueryBuilder,pageRequest);
for (Product product : products) {
System.out.println(product);
}
}
面试题
为什么要使用 ?
系统中的数据,随着业务的发展,时间的推移,将会非常多,而业务中往往采用模糊查询进行数据的 搜索,而模糊查询会导致查询引擎放弃索引,导致系统查询数据时都是全表扫描,在百万级别的数据库中, 查询效率是非常低下的,而我们使用 ES 做一个全文索引,将经常查询的系统功能的某些字段,比如说电 商系统的商品表中商品名,描述、价格还有 id 这些字段我们放入 ES 索引库里,可以提高查询速度。
master 选举流程?
- Elasticsearch 的选主是 ZenDiscovery 模块负责的,主要包含 Ping(节点之间通过这个 RPC 来发现彼此) 和 Unicast(单播模块包含一个主机列表以控制哪些节点需要 ping 通)这两部分
- 对所有可以成为 master 的节点(node.master: true)根据 nodeId 字典排序,每次选举每个节点都把自己所知道节点排一次序,然后选出第一个(第 0 位)节点,暂且认为它是 master 节点。
- 如果对某个节点的投票数达到一定的值(可以成为 master 节点数 n/2+1)并且该节点自己也选举自己, 那这个节点就是 master。否则重新选举一直到满足上述条件。
- master 节点的职责主要包括集群、节点和索引的管理,不负责文档级别的管理;data 节点可以关闭 http 功能。
集群脑裂问题?
“脑裂”问题可能的成因
- 网络问题:集群间的网络延迟导致一些节点访问不到 master,认为 master 挂掉了从而选举出新的 master,并对 master 上的分片和副本标红,分配新的主分片
- 节点负载:主节点的角色既为 master 又为 data,访问量较大时可能会导致 ES 停止响应造成大面积延 迟,此时其他节点得不到主节点的响应认为主节点挂掉了,会重新选取主节点。
- 内存回收:data 节点上的 ES 进程占用的内存较大,引发 JVM 的大规模内存回收,造成 ES 进程失去 响应。
脑裂问题解决方案:
- 减少误判:discovery.zen.ping_timeout 节点状态的响应时间,默认为 3s,可以适当调大,如果 master 在该响应时间的范围内没有做出响应应答,判断该节点已经挂掉了。调大参数(如 6s, discovery.zen.ping_timeout:6),可适当减少误判。
- 选举触发: discovery.zen.minimum_master_nodes:1 该参数是用于控制选举行为发生的最小集群主节点数量。当备选主节点的个数大于等于该参数的值, 且备选主节点中有该参数个节点认为主节点挂了,进行选举。官方建议为(n/2)+1,n 为主节点个数 (即有资格成为主节点的节点个数)
- 角色分离:即 master 节点与 data 节点分离,限制角色
- 主节点配置为:node.master: true node.data: false
- 从节点配置为:node.master: false node.data: true
索引文档的流程?
更新和删除文档的流程?
搜索的流程?
GC 方面,在使用 Elasticsearch 时要注意什么
对于大数据量(上亿量级)的聚合如何实现
在并发情况下,Elasticsearch 如果保证读写一致
如何监控 Elasticsearch 集群状态
- elasticsearch-head 插件
- 通过 Kibana 监控 Elasticsearch。你可以实时查看你的集群健康状态和性能,也可以分析过去的集群、 索引和节点指标
集群、节点、索引、文档、类型是什么?
倒排索引是什么?
倒排索引是搜索引擎的核心。搜索引擎的主要目标是在查找发生搜索条件的文档时提供快速搜索。ES 中的倒排索引其实就是 lucene 的倒排索引,区别于传统的正向索引,倒排索引会再存储数据时将关键词和倒排索引是搜索引擎的核心。搜索引擎的主要目标是在查找发生搜索条件的文档时提供快速搜索。ES 中的倒排索引其实就是 lucene 的倒排索引,区别于传统的正向索引,倒排索引会再存储数据时将关键词和
Q.E.D.