Elasticsearch Java API Client最新客户端操作详解

Lou.Chen2023年1月11日
大约 8 分钟

前言

ES废弃和推荐使用的API

ElasticSearch 7.17开始,ES作废Java REST Client 也就是High Level REST ClientJava Transport Client

ElasticSearch官方目前推荐使用Java API Client for Elasticsearch,它能为所有 Elasticsearch APIs 提供强类型的请求和响应

什么是强类型的请求和响应

所有的 Elasticsearch APIs 本质上都是一个 RESTful 风格的 HTTP 请求,所以当我们调用这些Elasticsearch APIs的时候,可以就当成普通的 HTTP 接口来对待

例如使用 HttpUrlConnection 或者 RestTemplate 等工具来直接调用,如果使用这些工具直接调用,就需要我们自己组装 JSON 参数,然后自己解析服务端返回的 JSON。

强类型的请求和响应则是系统把请求参数封装成一个对象了,我们调用对象中的方法去设置就可以了,不需要自己手动拼接 JSON 参数了,请求的结果系统也会封装成一个对象,不需要自己手动去解析 JSON参数了

ElasticSearch Java API Client 简介

⚠️ ES ElasticSearch Java API Client 版本必须对应。

**官方文档地址:**https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.5/introduction.htmlopen in new window 这里引用的版本为8.5

**ES下载地址:**https://www.elastic.co/cn/elasticsearch/open in new window 这里下载的ElasticSearch版本为8.5.2

  • 下载后解压即可

  • elasticsearch.yml配置文件:

    # Enable security features
    xpack.security.enabled: false
    
    xpack.security.enrollment.enabled: false
    
    # Enable encryption for HTTP API client connections, such as Kibana, Logstash, and Agents
    xpack.security.http.ssl:
      enabled: false
      keystore.path: certs/http.p12
    
    # Enable encryption and mutual authentication between cluster nodes
    xpack.security.transport.ssl:
      enabled: false
      verification_mode: certificate
      keystore.path: certs/transport.p12
      truststore.path: certs/transport.p12
    # Create a new cluster with the current node only
    # Additional nodes can still join the cluster later
    # cluster.initial_master_nodes: ["master"]
    
    #----------------------- END SECURITY AUTO CONFIGURATION -------------------------
    
    
    #节点名称
    node.name: master
    #集群名称
    cluster.name: lou-es
    #支持跨域
    http.cors.enabled: true
    #允许跨域地址所有
    http.cors.allow-origin: "*"
    #允许其他访问的地址 0.0.0.0 允许所有地址访问
    network.host: 127.0.0.1
    #设置在集群中的所有节点名称,这个节点名称就是之前所修改的,当然你也可以采用默认的也行,目前是单机,放入一个节点即可
    cluster.initial_master_nodes: ["master"]
    

特点:

  • 为所有 Elasticsearch APIs 提供强类型的请求和响应。
  • 所有 API 都有阻塞和异步版本。
  • 使用构建器模式,在创建复杂的嵌套结构时,可以编写简洁而可读的代码。
  • 通过使用对象映射器(如 Jackson 或任何实现了 JSON-B 的解析器),实现应用程序类的无缝集成。
  • 将协议处理委托给一个 http 客户端,如 Java Low Level REST Client,它负责所有传输级的问题。HTTP 连接池、重试、节点发现等等由它去完成。

兼容性:

  • Elasticsearch Java 客户端是向前兼容的,即该客户端支持与 Elasticsearch 的更大或相等的次要版本进行通信。
  • Elasticsearch Java 客户端只向后兼容默认的发行版本,并且没有做出保证。

使用:

<dependency>
  <groupId>co.elastic.clients</groupId>
  <artifactId>elasticsearch-java</artifactId>
  <version>8.5.2</version>
</dependency>

<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
  <version>2.12.3</version>
</dependency>

如果是 Spring Boot 项目,就不用添加第二个依赖了,因为 Spring Boot 的 Web 中默认已经加了这个依赖了,但是 Spring Boot 一般需要额外添加下面这个依赖,出现这个原因是由于从 JavaEE 过渡到 JakartaEE 时衍生出来的一些问题

<dependency>
  <groupId>jakarta.json</groupId>
  <artifactId>jakarta.json-api</artifactId>
  <version>2.0.1</version>
</dependency>

ElasticSearch Java API Client 使用

建立连接

同步客户端

  • 首先创建一个low-level client也就是一个低级客户端
  • 接下来创建一个通信 Transport,并利用 JacksonJsonpMapper 做数据的解析
  • 最后创建一个阻塞的 Java 客户端
        //创建 low-level client
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200)).build();

        //创建一个Transport通信 和 一个JacksonJsonpMapper序列化实例
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());

        //得到一个es同步客户端
        ElasticsearchClient client = new ElasticsearchClient(transport);

异步客户端

        //创建 low-level client
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200)).build();

        //创建一个Transport通信 和 一个JacksonJsonpMapper序列化实例
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());

        //得到一个es异步的客户端
        ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);

只有第三步和前面的不一样,其他都一样。

利用阻塞的 Java 客户端操作 Es 的时候会发生阻塞,也就是必须等到 Es 给出响应之后,代码才会继续执行;非阻塞的 Java 客户端则不会阻塞后面的代码执行,非阻塞的 Java 客户端一般通过回调函数处理请求的响应值。

建立HTTPS连接

https://www.elastic.co/guide/en/elasticsearch/client/java-api-client/8.5/connecting.html#_using_a_secure_connectionopen in new window

创建索引

Elasticsearch Java API Client 中最大的特色就是建造者模式+Lambda 表达式

  • 创建blog索引
  • 创建settings信息:分片数2,副本数为3
  • 创建字段映射信息:
    • title字段为text类型,分词器为ik_max_word
    • publish字段为keyword类型
    • date字段为date类型,并指定格式化
  • 创建别名my_blog
public class JavaApiClient {
    public static void main(String[] args) throws IOException {
        //创建 low-level client
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200)).build();

        //创建一个Transport通信 和 一个JacksonJsonpMapper序列化实例
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());

        //得到一个es客户端
        ElasticsearchClient client = new ElasticsearchClient(transport);

        CreateIndexResponse createIndexResponse = client.indices().create(i -> i
                .index("blog")
                .settings(s -> s
                        .numberOfShards("2")
                        .numberOfReplicas("3")
                )
                .mappings(m -> m
                        .properties("title", t -> t.text(tt -> tt.analyzer("ik_max_word")))
                        .properties("publish", p -> p.keyword(pp -> pp.index(true)))
                        .properties("date",d->d.date(dd->dd.format("yyyy-MM-dd")))
                )
                .aliases("my_blog", aa -> aa.isWriteIndex(false))
        );
        System.out.println("createIndexResponse.acknowledged() = " + createIndexResponse.acknowledged());
        System.out.println("createIndexResponse.shardsAcknowledged() = " + createIndexResponse.shardsAcknowledged());
        System.out.println("createIndexResponse.index() = " + createIndexResponse.index());
      
        restClient.close();
    }
}

等同于:

PUT blog
{
  "settings": {
    "number_of_shards": 2,
    "number_of_replicas": 3
  },
  "mappings": {
    "properties": {
      "title":{
        "type": "text",
        "analyzer": "ik_max_word"
      },
      "publish":{
        "type": "keyword"
      },
      "date":{
        "type": "date",
        "format": "yyyy-MM-dd"
      }
    }
  },
  "aliases": {
    "my_blog": {
      "is_write_index": false
    }
  }
}

直接通过JSON创建

public class JavaApiClient {
    public static void main(String[] args) throws IOException {
        //创建 low-level client
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200)).build();

        //创建一个Transport通信 和 一个JacksonJsonpMapper序列化实例
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());

        //得到一个es客户端
        ElasticsearchClient client = new ElasticsearchClient(transport);

        //直接使用json
        StringReader input = new StringReader("{\n" +
                "  \"settings\": {\n" +
                "    \"number_of_shards\": 2,\n" +
                "    \"number_of_replicas\": 3\n" +
                "  },\n" +
                "  \"mappings\": {\n" +
                "    \"properties\": {\n" +
                "      \"title\":{\n" +
                "        \"type\": \"text\",\n" +
                "        \"analyzer\": \"ik_max_word\"\n" +
                "      },\n" +
                "      \"publish\":{\n" +
                "        \"type\": \"keyword\"\n" +
                "      },\n" +
                "      \"date\":{\n" +
                "        \"type\": \"date\",\n" +
                "        \"format\": \"yyyy-MM-dd\"\n" +
                "      }\n" +
                "    }\n" +
                "  },\n" +
                "  \"aliases\": {\n" +
                "    \"my_blog\": {\n" +
                "      \"is_write_index\": false\n" +
                "    }\n" +
                "  }\n" +
                "}");

        CreateIndexResponse createIndexResponse = client.indices().create(i-> i.index("blog").withJson(input));
        System.out.println("createIndexResponse.acknowledged() = " + createIndexResponse.acknowledged());
        System.out.println("createIndexResponse.shardsAcknowledged() = " + createIndexResponse.shardsAcknowledged());
        System.out.println("createIndexResponse.index() = " + createIndexResponse.index());

        restClient.close();
    }
}

删除索引

public class JavaApiClient {
    public static void main(String[] args) throws IOException {
        //创建 low-level client
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200)).build();

        //创建一个Transport通信 和 一个JacksonJsonpMapper序列化实例
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());

        //得到一个es客户端
        ElasticsearchClient client = new ElasticsearchClient(transport);

        DeleteIndexResponse deleteIndexResponse = client.indices().delete(i -> i.index("blog"));
        System.out.println("createIndexResponse.acknowledged() = " + deleteIndexResponse.acknowledged());

        restClient.close();
    }
}

添加/修改文档

public class JavaApiClient1 {
    public static void main(String[] args) throws IOException {
        //创建 low-level client
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200)).build();

        //创建一个Transport通信 和 一个JacksonJsonpMapper序列化实例
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());

        //得到一个es客户端
        ElasticsearchClient client = new ElasticsearchClient(transport);

        Book blog = new Book("三国演义", "罗贯中", "2022-01-11");
        IndexResponse response = client.index(i -> i.index("books").document(blog).id("1"));

        System.out.println("response.result() = " + response.result());
        System.out.println("response.id() = " + response.id());
        System.out.println("response.seqNo() = " + response.seqNo());
        System.out.println("response.index() = " + response.index());
        System.out.println("response.shards() = " + response.shards());

        restClient.close();
    }
}
public class Book {
    private String title;
    private String publish;
    private String date;
    //getter setter
}

删除文档

  • 异步删除books索引中id1的文档
  • whenComplete 方法处理回调就行了,里边有两个参数,一个是正常情况下返回的对象,另外一个则是出错时候的异常。
public class JavaApiClient1 {
    public static void main(String[] args) throws IOException {
        //创建 low-level client
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200)).build();

        //创建一个Transport通信 和 一个JacksonJsonpMapper序列化实例
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());

        //得到一个es客户端
        ElasticsearchAsyncClient client = new ElasticsearchAsyncClient(transport);

        client.delete(i -> i.index("books").id("1")).whenComplete((success,failure)->{
            System.out.println(success.index());
            System.out.println(success.version());
        });

        restClient.close();
    }
}

查询文档

  • 查询books索引
  • 指定查询条件为:query查询中match匹配title字段值为三国演义 并且 query查询中term匹配author字段值为
public class JavaApiClient1 {
    public static void main(String[] args) throws IOException {
        //创建 low-level client
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200)).build();

        //创建一个Transport通信 和 一个JacksonJsonpMapper序列化实例
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());

        //得到一个es客户端
        ElasticsearchClient client = new ElasticsearchClient(transport);

        //构造查询条件
        SearchRequest searchRequest = SearchRequest.of(s -> s.index("books")
                .query(q -> q.match(m -> m.field("title").query("三国演义")))
                .query(q -> q.term(t -> t.field("author").value("罗")))
        );

        //处理响应结果
        SearchResponse<Book> response = client.search(searchRequest, Book.class);
        System.out.println("response.toString() = " + response.toString());
        //最大分数
        System.out.println(response.maxScore());
        //分片数
        System.out.println(response.shards());
        //是否超时
        System.out.println(response.timedOut());


        //拿到匹配的数据
        HitsMetadata<Book> hitsMetadata = response.hits();
        //得到总数
        System.out.println(hitsMetadata.total());
        //拿到hits命中的数据
        List<Hit<Book>> hits = hitsMetadata.hits();
        for (Hit<Book> hit : hits) {
            //拿到_source中的数据
            System.out.println(hit.source());
            System.out.println(hit.index());
            System.out.println(hit.id());
            System.out.println(hit.score());
        }

        restClient.close();
    }
}

public class Book {
    private String title;
    private String author;
    //getter setter
}

直接通过JSON查询

public class JavaApiClient1 {
    public static void main(String[] args) throws IOException {
        //创建 low-level client
        RestClient restClient = RestClient.builder(
                new HttpHost("localhost", 9200)).build();

        //创建一个Transport通信 和 一个JacksonJsonpMapper序列化实例
        ElasticsearchTransport transport = new RestClientTransport(
                restClient, new JacksonJsonpMapper());

        //得到一个es客户端
        ElasticsearchClient client = new ElasticsearchClient(transport);

        StringReader stringReader = new StringReader("{\n" +
                "  \"query\": {\n" +
                "    \"bool\": {\n" +
                "      \"must\": [\n" +
                "        {\n" +
                "          \"match\": {\n" +
                "            \"title\": \"三国演义\"\n" +
                "          }\n" +
                "        },\n" +
                "        {\n" +
                "          \"term\": {\n" +
                "            \"title\": {\n" +
                "              \"value\": \"三\"\n" +
                "            }\n" +
                "          }\n" +
                "        }\n" +
                "      ]\n" +
                "    }\n" +
                "  }\n" +
                "}");

        //构造查询条件
        SearchRequest searchRequest = SearchRequest.of(t-> t.withJson(stringReader));

        //处理响应结果
        SearchResponse<Book> response = client.search(searchRequest, Book.class);
        System.out.println("response.toString() = " + response.toString());
        //最大分数
        System.out.println(response.maxScore());
        //分片数
        System.out.println(response.shards());
        //是否超时
        System.out.println(response.timedOut());


        //拿到匹配的数据
        HitsMetadata<Book> hitsMetadata = response.hits();
        //得到总数
        System.out.println(hitsMetadata.total());
        //拿到hits命中的数据
        List<Hit<Book>> hits = hitsMetadata.hits();
        for (Hit<Book> hit : hits) {
            //拿到_source中的数据
            System.out.println(hit.source());
            System.out.println(hit.index());
            System.out.println(hit.id());
            System.out.println(hit.score());
        }

        restClient.close();
    }
}