最近升级了Elasticsearch版本,从7.X升级到8.X的变化还是比较大的:原来7.X版本使用的是RestHighLevelClient,8.X弃用了RestHighLevelClient,转而支持ElasticsearchClient,并且API的调用方式经过建造者模式的改造,变成了链式调用。
因此,为了更好地使用ElasticsearchClient的API操作Elasticsearch,我封装了一个工具类,包含了一些常用的数据操作方法。废话不多说,直接上代码。
1、pom依赖
<dependency>
<groupId>co.elastic.clients</groupId>
<artifactId>elasticsearch-java</artifactId>
<version>8.15.2</version>
</dependency>
<dependency>
<artifactId>elasticsearch-rest-client</artifactId>
<groupId>org.elasticsearch.client</groupId>
<version>8.15.2</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>2.0.30</version>
</dependency>
2、工具类代码
import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.indices.AnalyzeRequest;
import co.elastic.clients.elasticsearch.indices.AnalyzeResponse;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.analyze.AnalyzeToken;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
 * Convenience wrapper around the Elasticsearch Java API Client ({@code ElasticsearchClient}).
 * Written against elasticsearch-java 8.15.2 (the version pinned in the accompanying pom).
 *
 * <p>One instance owns one low-level REST client. The instance stays usable across calls;
 * call {@link #close()} (or use try-with-resources) once it is no longer needed.
 *
 * <p>Error handling convention: an {@link IOException} raised while talking to the cluster is
 * logged and turned into a "failure" return value ({@code false}, {@code null} or an empty
 * list, depending on the method). Runtime exceptions thrown by the client are not caught.
 */
public class ElasticsearchJavaClient implements AutoCloseable {

    // Fully-qualified to avoid touching the file's import block.
    private static final java.util.logging.Logger LOGGER =
            java.util.logging.Logger.getLogger(ElasticsearchJavaClient.class.getName());

    private final ElasticsearchClient client;

    /**
     * Creates a client for a cluster without authentication.
     *
     * @param httpUrls node addresses, each parseable by {@code HttpHost.create}
     */
    public ElasticsearchJavaClient(String[] httpUrls) {
        RestClient restClient = RestClient.builder(toHosts(httpUrls)).build();
        this.client = new ElasticsearchClient(new RestClientTransport(restClient, new JacksonJsonpMapper()));
    }

    /**
     * Creates a client that authenticates with a username and password.
     *
     * @param httpUrls node addresses, each parseable by {@code HttpHost.create}
     * @param username user name
     * @param password password
     */
    public ElasticsearchJavaClient(String[] httpUrls, String username, String password) {
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        RestClientBuilder builder = RestClient.builder(toHosts(httpUrls));
        builder.setHttpClientConfigCallback(
                httpClientBuilder -> httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
        this.client = new ElasticsearchClient(new RestClientTransport(builder.build(), new JacksonJsonpMapper()));
    }

    /**
     * Closes the transport held by this client.
     *
     * @throws IOException if closing the transport fails
     */
    @Override
    public void close() throws IOException {
        client._transport().close();
    }

    /**
     * Creates an index.
     *
     * @param indexName        index name
     * @param numberOfShards   number of primary shards (must not be {@code null})
     * @param numberOfReplicas number of replicas (must not be {@code null})
     * @param mapping          mapping definition as a JSON string
     * @return the {@code acknowledged} flag of the response; {@code false} if the request
     *         failed with an {@link IOException}
     */
    public boolean createIndex(String indexName, Integer numberOfShards,
                               Integer numberOfReplicas, String mapping) {
        try {
            CreateIndexResponse response = client.indices()
                    .create(builder -> builder.index(indexName)
                            .settings(b -> b.numberOfReplicas(numberOfReplicas.toString())
                                    .numberOfShards(numberOfShards.toString()))
                            .mappings(a -> a.withJson(new StringReader(mapping))));
            return response.acknowledged();
        } catch (IOException e) {
            // Previously the null response was dereferenced here, producing an NPE.
            logFailure("create index " + indexName, e);
            return false;
        }
    }

    /**
     * Deletes an index.
     *
     * @param indexName index name
     * @return the {@code acknowledged} flag of the response; {@code false} on {@link IOException}
     */
    public boolean deleteIndex(String indexName) {
        try {
            return client.indices().delete(a -> a.index(indexName)).acknowledged();
        } catch (IOException e) {
            logFailure("delete index " + indexName, e);
            return false;
        }
    }

    /**
     * Checks whether an index exists. (The misspelled method name is kept for compatibility.)
     *
     * @param indexName index name
     * @return {@code true} if the index exists; {@code false} otherwise or on {@link IOException}
     */
    public boolean indexExisit(String indexName) {
        try {
            return client.indices().exists(req -> req.index(indexName)).value();
        } catch (IOException e) {
            logFailure("check existence of index " + indexName, e);
            return false;
        }
    }

    /**
     * Refreshes an index. Newly written documents are not searchable until a refresh happens
     * (by default roughly once per second), so call this to make them visible right away.
     *
     * @param indexString index name
     */
    public void refresh(String indexString) {
        try {
            client.indices().refresh(req -> req.index(indexString));
        } catch (IOException e) {
            logFailure("refresh index " + indexString, e);
        }
    }

    /**
     * Indexes a single document and lets Elasticsearch generate its id.
     *
     * @param indexName index name
     * @param data      document body
     * @return the id of the indexed document, or {@code null} on {@link IOException}
     */
    public String insertData(String indexName, JSONObject data) {
        try {
            IndexResponse response = client.index(a -> a.index(indexName).document(data));
            return response.id();
        } catch (IOException e) {
            logFailure("index a document into " + indexName, e);
            return null;
        }
    }

    /**
     * Fetches a document by id as a plain map.
     *
     * @param indexName index name
     * @param id        document id
     * @return the document source, or {@code null} if it was not found or the request failed
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // Map.class can only describe the raw type
    public Map<String, Object> getDocById(String indexName, String id) {
        GetRequest request = GetRequest.of(g -> g.index(indexName).id(id));
        try {
            GetResponse<Map> response = client.get(request, Map.class);
            if (response.found()) {
                return response.source();
            }
        } catch (IOException e) {
            logFailure("get document " + id + " from " + indexName, e);
        }
        return null;
    }

    /**
     * Fetches a document by id.
     *
     * @param indexName index name
     * @param id        document id
     * @return the document source, or {@code null} if it was not found or the request failed
     */
    public JSONObject getDocInfoById(String indexName, String id) {
        return fetchSource(GetRequest.of(g -> g.index(indexName).id(id)));
    }

    /**
     * Fetches a document by id, leaving out the given source fields.
     *
     * @param indexName index name
     * @param id        document id
     * @param excludes  source fields to omit from the result
     * @return the document source, or {@code null} if it was not found or the request failed
     */
    public JSONObject getDocInfoById(String indexName, String id, String[] excludes) {
        return fetchSource(
                GetRequest.of(g -> g.index(indexName).id(id).sourceExcludes(Arrays.asList(excludes))));
    }

    /**
     * Fetches a document by id, returning only the given source fields.
     *
     * @param indexName index name
     * @param id        document id
     * @param includes  source fields to return
     * @return the document source, or {@code null} if it was not found or the request failed
     */
    public JSONObject getDocInfoByIdWithIncludes(String indexName, String id, String[] includes) {
        return fetchSource(
                GetRequest.of(g -> g.index(indexName).id(id).sourceIncludes(Arrays.asList(includes))));
    }

    /**
     * Checks whether a document exists.
     *
     * @param indexName index name
     * @param id        document id
     * @return {@code true} if the document was found; {@code false} otherwise or on {@link IOException}
     */
    public boolean exists(String indexName, String id) {
        GetRequest request = GetRequest.of(g -> g.index(indexName).id(id));
        try {
            return client.get(request, JSONObject.class).found();
        } catch (IOException e) {
            logFailure("check existence of document " + id + " in " + indexName, e);
            return false;
        }
    }

    /**
     * Deletes a document by id.
     *
     * @param indexName index name
     * @param id        document id
     * @return {@code true} only if the response result is {@code deleted}
     */
    public boolean deleteDocById(String indexName, String id) {
        DeleteRequest request = DeleteRequest.of(a -> a.index(indexName).id(id));
        try {
            DeleteResponse response = client.delete(request);
            return response != null && response.result() == Result.Deleted;
        } catch (IOException e) {
            logFailure("delete document " + id + " from " + indexName, e);
            return false;
        }
    }

    /**
     * Partially updates a document by id.
     *
     * @param indexName index name
     * @param id        document id
     * @param newDoc    partial document merged into the existing one
     * @return {@code true} only if the response result is {@code updated}
     *         (a no-op update therefore yields {@code false})
     */
    public boolean updateDocById(String indexName, String id, JSONObject newDoc) {
        // The former bare request.refresh() call only read a property and had no effect; it was dropped.
        UpdateRequest<JSONObject, JSONObject> request =
                UpdateRequest.of(r -> r.id(id).index(indexName).doc(newDoc));
        try {
            UpdateResponse<JSONObject> response = client.update(request, JSONObject.class);
            return response != null && response.result() == Result.Updated;
        } catch (IOException e) {
            logFailure("update document " + id + " in " + indexName, e);
            return false;
        }
    }

    /**
     * Runs the named analyzer over a text and returns the produced tokens.
     *
     * @param analyzerName analyzer name
     * @param text         text to analyze
     * @return the tokens from the response; an empty list if the request failed with an
     *         {@link IOException}
     */
    public List<AnalyzeToken> analyze(String analyzerName, String text) {
        AnalyzeRequest analyzeRequest = new AnalyzeRequest.Builder().analyzer(analyzerName).text(text).build();
        try {
            AnalyzeResponse response = client.indices().analyze(analyzeRequest);
            return response.tokens();
        } catch (IOException e) {
            // Previously the null response was dereferenced here, producing an NPE.
            logFailure("analyze text with analyzer " + analyzerName, e);
            return java.util.Collections.emptyList();
        }
    }

    /**
     * Deletes several documents in one bulk request.
     *
     * @param requestList delete requests; only their index and id are used
     * @return {@code true} if every item succeeded (or the list was empty)
     */
    public boolean bulkDelete(List<DeleteRequest> requestList) {
        List<BulkOperation> ops = requestList
                .stream()
                .map(req -> BulkOperation.of(op -> op
                        .delete(d -> d.id(req.id()).index(req.index()))))
                .collect(Collectors.toList());
        return executeBulk("bulk delete", ops);
    }

    /**
     * Partially updates several documents in one bulk request.
     *
     * @param requestList update requests; only their index, id and partial document are used
     * @return {@code true} if every item succeeded (or the list was empty)
     */
    public boolean bulkUpdate(List<UpdateRequest> requestList) {
        List<BulkOperation> ops = requestList
                .stream()
                .map(req -> BulkOperation.of(op -> op
                        .update(d -> d.id(req.id())
                                .index(req.index())
                                .action(a -> a.doc(req.doc())))))
                .collect(Collectors.toList());
        return executeBulk("bulk update", ops);
    }

    /**
     * Indexes several documents in one bulk request.
     *
     * @param requestList index requests; their index, id (may be {@code null} for an
     *                    auto-generated id) and document are used
     * @return {@code true} if every item succeeded (or the list was empty)
     */
    public boolean bulkInsert(List<IndexRequest> requestList) {
        List<BulkOperation> ops = requestList
                .stream()
                .map(req -> BulkOperation.of(op -> op
                        // Forward the caller-supplied id; it used to be dropped silently.
                        .index(i -> i.document(req.document()).index(req.index()).id(req.id()))))
                .collect(Collectors.toList());
        return executeBulk("bulk insert", ops);
    }

    /**
     * Updates all documents matching a query by running a script on them.
     *
     * @param index  index name
     * @param query  query selecting the documents to update
     * @param script script source
     * @return {@code true} if the request completed without reported failures
     */
    public boolean updateByquery(String index, BoolQuery query, String script) {
        try {
            UpdateByQueryResponse response = client.updateByQuery(q -> q.index(index)
                    .query(query._toQuery())
                    .script(s -> s.source(script)));
            return response != null && response.failures().isEmpty();
        } catch (IOException e) {
            logFailure("update by query on " + index, e);
            return false;
        }
    }

    /**
     * Paged search sorted by one field.
     *
     * @param indexName index name
     * @param pageNo    1-based page number
     * @param pageSize  page size
     * @param sortField field to sort by
     * @param sortOrder sort direction
     * @param boolQuery query
     * @return the search response (hits typed as {@code JSONObject}), or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize,
                                 String sortField, SortOrder sortOrder, BoolQuery boolQuery) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .from(offset(pageNo, pageSize))
                .size(pageSize)
                .sort(s -> s.field(f -> f.field(sortField).order(sortOrder)))
                .build();
        return executeSearch(request);
    }

    /**
     * Paged search sorted by one field, omitting the given source fields.
     *
     * @param indexName index name
     * @param pageNo    1-based page number
     * @param pageSize  page size
     * @param sortField field to sort by
     * @param sortOrder sort direction
     * @param boolQuery query
     * @param excludes  source fields to omit
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize,
                                 String sortField, SortOrder sortOrder, BoolQuery boolQuery, String[] excludes) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .source(a -> a.filter(f -> f.excludes(Arrays.asList(excludes))))
                .from(offset(pageNo, pageSize))
                .size(pageSize)
                .sort(s -> s.field(f -> f.field(sortField).order(sortOrder)))
                .build();
        return executeSearch(request);
    }

    /**
     * Paged search sorted by a primary and a secondary field.
     *
     * @param indexName  index name
     * @param pageNo     1-based page number
     * @param pageSize   page size
     * @param boolQuery  query
     * @param sortField  primary sort field
     * @param sortOrder  primary sort direction
     * @param sortField2 secondary sort field
     * @param sortOrder2 secondary sort direction
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize, BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String sortField2, SortOrder sortOrder2) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .from(offset(pageNo, pageSize))
                .size(pageSize)
                // Each sort key needs its own sort entry; setting field/order twice on one
                // builder overwrote the first key with the second.
                .sort(s -> s.field(f -> f.field(sortField).order(sortOrder)))
                .sort(s -> s.field(f -> f.field(sortField2).order(sortOrder2)))
                .build();
        return executeSearch(request);
    }

    /**
     * Paged search sorted by a primary and a secondary field, omitting the given source fields.
     *
     * @param indexName  index name
     * @param pageNo     1-based page number
     * @param pageSize   page size
     * @param boolQuery  query
     * @param sortField  primary sort field
     * @param sortOrder  primary sort direction
     * @param sortField2 secondary sort field
     * @param sortOrder2 secondary sort direction
     * @param excludes   source fields to omit
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize, BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String sortField2, SortOrder sortOrder2,
                                 String[] excludes) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .source(a -> a.filter(f -> f.excludes(Arrays.asList(excludes))))
                .from(offset(pageNo, pageSize))
                .size(pageSize)
                // Two separate sort entries so both keys are applied, in this order.
                .sort(s -> s.field(f -> f.field(sortField).order(sortOrder)))
                .sort(s -> s.field(f -> f.field(sortField2).order(sortOrder2)))
                .build();
        return executeSearch(request);
    }

    /**
     * Paged search sorted by one field with explicit source includes and excludes.
     *
     * @param indexName index name
     * @param pageNo    1-based page number
     * @param pageSize  page size
     * @param boolQuery query
     * @param sortField field to sort by
     * @param sortOrder sort direction
     * @param includes  source fields to return
     * @param excludes  source fields to omit
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize, BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String[] includes, String[] excludes) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .source(a -> a.filter(f -> f.excludes(Arrays.asList(excludes)).includes(Arrays.asList(includes))))
                .from(offset(pageNo, pageSize))
                .size(pageSize)
                .sort(s -> s.field(f -> f.field(sortField).order(sortOrder)))
                .build();
        return executeSearch(request);
    }

    /**
     * Search that opens a scroll context, sorted by one field.
     *
     * <p>NOTE(review): Elasticsearch is expected to reject a non-zero {@code from} together
     * with {@code scroll}, so this overload presumably only works with {@code pageNo == 1};
     * confirm against the target cluster version.
     *
     * @param indexName index name
     * @param pageNo    1-based page number
     * @param pageSize  page size (batch size per scroll page)
     * @param boolQuery query
     * @param sortField field to sort by
     * @param sortOrder sort direction
     * @param time      scroll keep-alive, e.g. {@code "1m"}
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize, BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String time) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .from(offset(pageNo, pageSize))
                .size(pageSize)
                .scroll(new Time.Builder().time(time).build())
                .sort(s -> s.field(f -> f.field(sortField).order(sortOrder)))
                .build();
        return executeSearch(request);
    }

    /**
     * Counts the documents matching a query.
     *
     * @param indexName index name
     * @param boolQuery query
     * @return the count response, or {@code null} on {@link IOException}
     */
    public CountResponse count(String indexName, BoolQuery boolQuery) {
        try {
            return client.count(c -> c.index(indexName).query(q -> q.bool(boolQuery)));
        } catch (IOException e) {
            logFailure("count documents in " + indexName, e);
            return null;
        }
    }

    /**
     * Search with the server's default paging and sorting.
     *
     * @param indexName index name
     * @param boolQuery query
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, BoolQuery boolQuery) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .build();
        return executeSearch(request);
    }

    /**
     * Search returning at most {@code size} hits, with the server's default sorting.
     *
     * @param indexName index name
     * @param boolQuery query
     * @param size      maximum number of hits
     * @return the search response, or {@code null} on {@link IOException}
     */
    public SearchResponse search(String indexName, BoolQuery boolQuery, int size) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .size(size)
                .build();
        return executeSearch(request);
    }

    /** Parses the configured node addresses into {@code HttpHost}s. */
    private static HttpHost[] toHosts(String[] httpUrls) {
        return Arrays.stream(httpUrls).map(HttpHost::create).toArray(HttpHost[]::new);
    }

    /** Converts a 1-based page number and a page size into the {@code from} offset. */
    private static int offset(Integer pageNo, Integer pageSize) {
        return (pageNo - 1) * pageSize;
    }

    /** Logs a failed cluster call together with its cause. */
    private static void logFailure(String action, IOException e) {
        LOGGER.log(java.util.logging.Level.SEVERE, "Elasticsearch request failed: " + action, e);
    }

    /** Runs a GET request and returns the source, or {@code null} if missing or on I/O failure. */
    private JSONObject fetchSource(GetRequest request) {
        try {
            GetResponse<JSONObject> response = client.get(request, JSONObject.class);
            return response.found() ? response.source() : null;
        } catch (IOException e) {
            logFailure("get document " + request.id() + " from " + request.index(), e);
            return null;
        }
    }

    /** Runs a search with {@code JSONObject} hits; returns {@code null} on I/O failure. */
    @SuppressWarnings("rawtypes") // public API exposes the raw SearchResponse type
    private SearchResponse executeSearch(SearchRequest request) {
        try {
            return client.search(request, JSONObject.class);
        } catch (IOException e) {
            logFailure("search " + request.index(), e);
            return null;
        }
    }

    /** Sends the operations as one bulk request and reports whether every item succeeded. */
    private boolean executeBulk(String action, List<BulkOperation> ops) {
        if (ops.isEmpty()) {
            // Nothing to send; Elasticsearch rejects a bulk request without operations.
            return true;
        }
        try {
            BulkResponse response = client.bulk(r -> r.operations(ops));
            if (response.errors()) {
                // A bulk call succeeds at the HTTP level even when individual items fail.
                response.items().stream()
                        .filter(item -> item.error() != null)
                        .findFirst()
                        .ifPresent(item -> LOGGER.warning(
                                action + " had failed items; first error: " + item.error().reason()));
                return false;
            }
            return true;
        } catch (IOException e) {
            logFailure(action, e);
            return false;
        }
    }
}
如果本文对你有帮助,请点赞、收藏 + 关注,谢谢!!(本文将持续更新)