ElasticsearchJavaClient工具类分析

news/2025/1/9 20:16:21 标签: elasticsearch, java, 搜索引擎

最近升级了Elasticsearch版本,从7.X升级到8.X的变化还是比较大的,原来7版本用的是RestHighLevelClient,8.X弃用RestHighLevelClient转而支持ElasticsearchClient,并且api调用方式经过建造者模式的改造,变成了链式调用。

因此为了更好地使用ElasticsearchClient的api操作Elasticsearch,封装了一个工具类,包含了常用的一些数据操作的方法。废话不多说直接上代码。。。

1、pom依赖

        <dependency>
            <groupId>co.elastic.clients</groupId>
            <artifactId>elasticsearch-java</artifactId>
            <version>8.15.2</version>
        </dependency>
        <dependency>
            <artifactId>elasticsearch-rest-client</artifactId>
            <groupId>org.elasticsearch.client</groupId>
            <version>8.15.2</version>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.30</version>
        </dependency>

2、工具类代码

java">import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.Result;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.Time;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch.core.*;
import co.elastic.clients.elasticsearch.core.bulk.BulkOperation;
import co.elastic.clients.elasticsearch.indices.AnalyzeRequest;
import co.elastic.clients.elasticsearch.indices.AnalyzeResponse;
import co.elastic.clients.elasticsearch.indices.CreateIndexResponse;
import co.elastic.clients.elasticsearch.indices.analyze.AnalyzeToken;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.alibaba.fastjson.JSONObject;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import java.io.IOException;
import java.io.StringReader;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/**
 * Elasticsearch工具类
 * Elasticsearch版本:8.15.3
 */
public class ElasticsearchJavaClient {
    private ElasticsearchClient client;

    /**
     * 构造方法,获取客户端(未开启认证)
     * @param httpUrls
     */
    public ElasticsearchJavaClient(String[] httpUrls){
        HttpHost[] httpHosts = Arrays.stream(httpUrls).map(HttpHost::create).toArray(HttpHost[]::new);
        this.client = new ElasticsearchClient(new RestClientTransport(RestClient.builder(httpHosts).build(),
                new JacksonJsonpMapper()));
    }

    /**
     * 构造方法,获取客户端(开启认证,通过用户名密码进行认证并获取客户端)
     * @param httpUrls
     * @param username
     * @param password
     */
    public ElasticsearchJavaClient(String[] httpUrls, String username, String password){
        HttpHost[] httpHosts = Arrays.stream(httpUrls).map(HttpHost::create).toArray(HttpHost[]::new);
        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
        RestClientBuilder builder = RestClient.builder(httpHosts);
        builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
            @Override
            public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                return httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            }
        });
        this.client = new ElasticsearchClient(new RestClientTransport(builder.build(), new JacksonJsonpMapper()));
    }


    /**
     * 创建索引
     * @param indexName  索引名
     * @param numberOfShards  分片数
     * @param numberOfReplicas  副本数
     * @param mapping  mapping设计json字符串
     * @return
     */
    public boolean createIndex(String indexName, Integer numberOfShards,
                               Integer numberOfReplicas, String mapping) {
        CreateIndexResponse response = null;
        try {
            response = client.indices()
                    .create(builder -> builder.index(indexName)
                            .settings(b -> b.numberOfReplicas(numberOfReplicas.toString())
                                    .numberOfShards(numberOfShards.toString()))
                            .mappings(a -> a.withJson(new StringReader(mapping))));
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            client.shutdown();
        }
        return response.acknowledged();
    }

    /**
     * 删除索引
     * @param indexName  索引名
     * @return
     */
    public boolean deleteIndex(String indexName) {
        try {
            return client.indices().delete(a -> a.index(indexName)).acknowledged();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            client.shutdown();
        }
        return false;
    }

    /**
     * 判断索引是否已存在
     * @param indexName 索引名
     * @return
     */
    public boolean indexExisit(String indexName) {
        try {
            return client.indices().exists(req -> req.index(indexName)).value();
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            client.shutdown();
        }
        return false;
    }

    /**
     * 由于数据落盘有默认的1秒延迟,刷新后使数据能被检索到
     * @param indexString
     */
    public void refresh(String indexString){
        try {
            client.indices().refresh(req -> req.index(indexString));
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
    }

    /**
     * 插入数据
     * @param indexName
     * @param data
     * @return
     */
    public String insertData(String indexName, JSONObject data){
        try {
            IndexResponse response = client.index(a -> a.index(indexName).document(data));
            return response.id();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return null;
    }

    /**
     * 根据索引和_id查询数据
     * @param indexName
     * @param id
     * @return
     */
    public Map<String, Object> getDocById(String indexName, String id) {
        GetRequest request = GetRequest.of(g -> g.index(indexName).id(id));
        try {
            GetResponse<Map> response = client.get(request, Map.class);
            if(response.found()){
                return response.source();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return null;
    }

    /**
     * 根据索引和_id查询数据
     * @param indexName
     * @param id
     * @return
     */
    public JSONObject getDocInfoById(String indexName, String id) {
        GetRequest request = GetRequest.of(g -> g.index(indexName).id(id));
        try {
            GetResponse<JSONObject> response = client.get(request, JSONObject.class);
            if(response.found()){
                return response.source();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return null;
    }

    /**
     * 根据索引和_id查询数据,并过滤掉无需返回的字段
     * @param indexName
     * @param id
     * @param excludes
     * @return
     */
    public JSONObject getDocInfoById(String indexName, String id, String [] excludes) {
        GetRequest request = GetRequest.of(g -> g.index(indexName).id(id).sourceExcludes(Arrays.asList(excludes)));
        try {
            GetResponse<JSONObject> response = client.get(request, JSONObject.class);
            if(response.found()){
                return response.source();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return null;
    }

    /**
     * 根据索引和_id查询数据,并过指定要返回的字段
     * @param indexName
     * @param id
     * @param includes
     * @return
     */
    public JSONObject getDocInfoByIdWithIncludes(String indexName, String id, String [] includes) {
        GetRequest request = GetRequest.of(g -> g.index(indexName).id(id).sourceIncludes(Arrays.asList(includes)));
        try {
            GetResponse<JSONObject> response = client.get(request, JSONObject.class);
            if(response.found()){
                return response.source();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return null;
    }

    /**
     * 判断数据是否存在
     * @param indexName
     * @param id
     * @return
     */
    public boolean exists(String indexName, String id) {
        GetRequest request = GetRequest.of(g -> g.index(indexName).id(id));
        try {
            GetResponse<JSONObject> response = client.get(request, JSONObject.class);
            return response.found();
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return false;
    }

    /**
     * 根据索引和_id删除数据
     * @param indexName
     * @param id
     * @return
     */
    public boolean deleteDocById(String indexName, String id) {
        DeleteRequest request = DeleteRequest.of(a -> a.index(indexName).id(id));
        try {
            DeleteResponse response = client.delete(request);
            if(response != null && response.result() != null){
                return Result.Deleted.jsonValue().equals(response.result().jsonValue());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return false;
    }

    /**
     * 更新数据
     * @param indexName
     * @param id
     * @param newDoc
     * @return
     */
    public boolean updateDocById(String indexName, String id, JSONObject newDoc) {
        UpdateRequest request = UpdateRequest.of(r -> r.id(id).index(indexName).doc(newDoc));
        request.refresh();
        try {
            UpdateResponse response = client.update(request, JSONObject.class);
            if(response != null && response.result() != null){
                return Result.Updated.jsonValue().equals(response.result().jsonValue());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return false;
    }

    /**
     * 对输入的text使用analyzerName进行分词,返回分词后的词项
     * @param analyzerName
     * @param text
     * @return
     */
    public List<AnalyzeToken> analyze(String analyzerName, String text){
        AnalyzeRequest analyzeRequest = new AnalyzeRequest.Builder().analyzer(analyzerName).text(text).build();
        AnalyzeResponse response = null;
        try {
            response = client.indices().analyze(analyzeRequest);
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return response.tokens();
    }

    /**
     * 批量删除
     * @param requestList
     * @return
     */
    public boolean bulkDelete(List<DeleteRequest> requestList){
        List<BulkOperation> ops = requestList
                .stream()
                .map(req -> BulkOperation.of(op -> op
                        .delete(d -> d.id(req.id()).index(req.index()))))
                .collect(Collectors.toList());
        try {
            BulkResponse response = client.bulk(r -> r.operations(ops));
            if(response != null ){
                return true;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return false;
    }

    /**
     * 批量更新
     * @param requestList
     * @return
     */
    public boolean bulkUpdate(List<UpdateRequest> requestList){
        List<BulkOperation> ops = requestList
                .stream()
                .map(req -> BulkOperation.of(op -> op
                        .update(d -> d.id(req.id())
                                .index(req.index())
                                .action(a -> a.doc(req.doc())))))
                .collect(Collectors.toList());
        try {
            BulkResponse response = client.bulk(r -> r.operations(ops));
            if(response != null ){
                return true;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return false;
    }

    /**
     * 批量插入数据
     * @param requestList
     * @return
     */
    public boolean bulkInsert(List<IndexRequest> requestList){
        List<BulkOperation> ops = requestList
                .stream()
                .map(req -> BulkOperation.of(op -> op
                        .index(i -> i.document(req.document()).index(req.index()))))
                .collect(Collectors.toList());
        try {
            BulkResponse response = client.bulk(r -> r.operations(ops));
            if(response != null ){
                return true;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return false;
    }

    /**
     * 通过脚本批量更新
     * @param index
     * @param query
     * @param script
     * @return
     */
    public boolean updateByquery(String index, BoolQuery query, String script){
        try {
            UpdateByQueryResponse response = client.updateByQuery(q -> q.index(index)
                    .query(query._toQuery())
                    .script(s -> s.source(script)));
            if(response != null ){
                return true;
            }
        } catch (IOException e) {
            e.printStackTrace();
        }finally {
            client.shutdown();
        }
        return false;
    }

    /**
     * 检索
     * @param indexName
     * @param pageNo
     * @param pageSize
     * @param sortField
     * @param sortOrder
     * @param boolQuery
     * @return
     */
    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize,
                                 String sortField, SortOrder sortOrder, BoolQuery boolQuery) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .from((pageNo - 1) * pageSize)
                .size(pageSize)
                .sort(s -> s.field(f -> f.field(sortField)
                        .order(sortOrder))).build();
        SearchResponse response = null;
        try {
            response = client.search(request, JSONObject.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;
    }

    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize,
                                 String sortField, SortOrder sortOrder, BoolQuery boolQuery, String[] excludes) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .source(a -> a.filter(f -> f.excludes(Arrays.asList(excludes))))
                .from((pageNo - 1) * pageSize)
                .size(pageSize)
                .sort(s -> s.field(f -> f.field(sortField)
                        .order(sortOrder))).build();
        SearchResponse response = null;
        try {
            response = client.search(request, JSONObject.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;
    }

    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize, BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String sortField2, SortOrder sortOrder2) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .from((pageNo - 1) * pageSize)
                .size(pageSize)
                .sort(s -> s.field(f -> f.field(sortField)
                        .order(sortOrder).field(sortField2).order(sortOrder2))).build();
        SearchResponse response = null;
        try {
            response = client.search(request, JSONObject.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;
    }

    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize, BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String sortField2, SortOrder sortOrder2,
                                 String[] excludes) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .source(a -> a.filter(f -> f.excludes(Arrays.asList(excludes))))
                .from((pageNo - 1) * pageSize)
                .size(pageSize)
                .sort(s -> s.field(f -> f.field(sortField)
                        .order(sortOrder).field(sortField2).order(sortOrder2))).build();
        SearchResponse response = null;
        try {
            response = client.search(request, JSONObject.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;
    }

    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize, BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String[] includes, String[] excludes) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .source(a -> a.filter(f -> f.excludes(Arrays.asList(excludes)).includes(Arrays.asList(includes))))
                .from((pageNo - 1) * pageSize)
                .size(pageSize)
                .sort(s -> s.field(f -> f.field(sortField)
                        .order(sortOrder))).build();
        SearchResponse response = null;
        try {
            response = client.search(request, JSONObject.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;
    }

    public SearchResponse search(String indexName, Integer pageNo, Integer pageSize,  BoolQuery boolQuery,
                                 String sortField, SortOrder sortOrder, String time) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .from((pageNo - 1) * pageSize)
                .size(pageSize)
                .scroll(new Time.Builder().time(time).build())
                .sort(s -> s.field(f -> f.field(sortField)
                        .order(sortOrder))).build();
        SearchResponse response = null;
        try {
            response = client.search(request, JSONObject.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;
    }

    /**
     * 查询符合条件的数据条数
     * @param indexName
     * @param boolQuery
     * @return
     */
    public CountResponse count(String indexName, BoolQuery boolQuery) {
        try {
            return client.count(c -> c.index(indexName).query(q -> q.bool(boolQuery)));
        } catch (IOException e) {
            e.printStackTrace();
        }
        return null;
    }

    public SearchResponse search(String indexName, BoolQuery boolQuery) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .build();
        SearchResponse response = null;
        try {
            response = client.search(request, JSONObject.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;
    }

    public SearchResponse search(String indexName, BoolQuery boolQuery, int size) {
        SearchRequest request = new SearchRequest.Builder().index(indexName)
                .query(q -> q.bool(boolQuery))
                .size(size)
                .build();
        SearchResponse response = null;
        try {
            response = client.search(request, JSONObject.class);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return response;
    }
}

如果本文对你有帮助,请点赞、收藏 + 关注,谢谢!!(本文将持续更新)


http://www.niftyadmin.cn/n/5817950.html

相关文章

Node.js JXcore 打包教程

Node.js JXcore 打包教程 介绍 Node.js 是一个基于 Chrome V8 引擎的 JavaScript 运行时环境,它允许开发者使用 JavaScript 编写服务器端和网络应用程序。JXcore 是一个流行的 Node.js 发行版,它支持将 Node.js 应用程序打包成单一的可执行文件,使得部署和分发变得更加容易…

uni app 写了一个连连看

使用uni app 写了个连连看&#xff0c;还没有路径展示&#xff0c;由于最近工作比较忙&#xff0c;没有时间取完善。大佬看到了可以改改加上路径然后写到评论或者开源一下。&#xff08;绘制的有坐标。注掉坐标代码即可&#xff09;不喜勿喷&#xff0c;欢迎大佬完善 <temp…

穷举vs暴搜vs深搜vs回溯vs剪枝系列一>组合总和

题目&#xff1a; 方法一&#xff1a; 解析&#xff1a; 代码&#xff1a; private List<List<Integer>> ret;private List<Integer> path;private int aim; public List<List<Integer>> combinationSum(int[] candidates, int target) {aim …

HTML - <link>

1.简介 <link>标签主要用于将当前网页与相关的外部资源联系起来&#xff0c;通常放在<head>元素里面。最常见的用途就是加载 CSS 样式表。 <link rel"stylesheet" type"text/css" href"theme.css">上面代码为网页加载样式表…

创建基本的 Electron 应用项目的详细步骤

创建一个基本的 Electron 应用项目的详细步骤。我们将从安装 Node.js 开始&#xff0c;然后创建项目文件夹并初始化 Electron 项目。 1. 安装 Node.js 首先&#xff0c;确保你已经安装了 Node.js 和 npm。你可以在终端中运行以下命令来检查是否已经安装&#xff1a; node -v…

微软发布AIOpsLab:一个开源的全面AI框架,用于AIOps代理

在当今这个云计算技术迅猛发展的时代&#xff0c;企业面临着前所未有的挑战与机遇。随着云基础设施的日益复杂化&#xff0c;它们成为了企业运营不可或缺的支柱。网站可靠性工程师&#xff08;Site Reliability Engineers&#xff0c;简称SRE&#xff09;和DevOps团队肩负着关键…

Go语言中的接收器(Receiver)详解

在 Go 语言中&#xff0c;接收器&#xff08;Receiver&#xff09; 是指在方法声明中与方法绑定的对象。它是 Go 语言实现面向对象编程&#xff08;OOP&#xff09;特性的核心之一。接收器的作用是将方法绑定到某个类型的实例&#xff08;值或者指针&#xff09;&#xff0c;让…

STM32-笔记39-SPI-W25Q128

一、什么是SPI&#xff1f; SPI是串行外设接口&#xff08;Serial Peripheral Interface&#xff09;的缩写&#xff0c;是一种高速的&#xff0c;全双工&#xff0c;同步的通信总线&#xff0c;并且 在芯片的管脚上只占用四根线&#xff0c;节约了芯片的管脚&#xff0c;同时为…