The Go Client for Elasticsearch

Karel Minařík

$ whoami

2

Go and Elastic.co

Application Performance Metrics

3

~

Source code for this talk:

4

Ruby → Golang ﹦ ⁈

“Writing good code has much in common with writing good English”

~ Kernighan/Pike, The Practice of Programming

5

Goals for an official Elasticsearch client

1. Be consistent with other official clients
2. Be idiomatic for language developers

Contradictory goals? Indeed… :)

6

7

Divide and Conquer

Goals:

8

Package structure and naming

1. elastic/go-elasticsearch
2. elastic/go-elasticsearch/esapi
3. elastic/go-elasticsearch/estransport
4. elastic/go-elasticsearch/esutil

// Interface is the single-method contract shown for the transport layer:
// Perform takes a prepared *http.Request and returns the *http.Response
// or an error.
type Interface interface {
  Perform(*http.Request) (*http.Response, error)
}
9

Usage: Initialization

import "github.com/elastic/go-elasticsearch"
    // Defaults
    //
    // Note: the ELASTICSEARCH_URL environment variable is used when exported
    //
    es, err := elasticsearch.NewDefaultClient()
    // Configuration
    //
    cfg := elasticsearch.Config{
        Addresses: []string{"https://example.com"},
        Username:  "foo",
        Password:  "bar",
    }

    es, err = elasticsearch.NewClient(cfg)
10

Usage: Calling APIs

es, _ := elasticsearch.NewDefaultClient()
// +build ignore

package main

import (
	"log"
	"os"
)

// START1 OMIT
import "github.com/elastic/go-elasticsearch"

// END1 OMIT

// main demonstrates the two ways of creating a client (defaults and explicit
// configuration) and then calls the Cluster Health API, printing the response.
//
// Every client-creation error is checked before the client is used. The checks
// are placed after the corresponding "END... OMIT" markers so that the regions
// shown on the slides keep their original content.
func main() {
	log.SetFlags(0)
	log.SetOutput(os.Stdout)

	// START2 OMIT
	// Defaults
	//
	// Note: the ELASTICSEARCH_URL environment variable is used when exported
	//
	es, err := elasticsearch.NewDefaultClient() // HL
	// END2 OMIT
	if err != nil {
		log.Fatalf("ERROR: %s", err)
	}

	// START3 OMIT
	// Configuration
	//
	cfg := elasticsearch.Config{
		Addresses: []string{"https://example.com"},
		Username:  "foo",
		Password:  "bar",
	}

	es, err = elasticsearch.NewClient(cfg) // HL
	// END3 OMIT
	if err != nil {
		log.Fatalf("ERROR: %s", err)
	}

	// Switch back to the default client for the actual API call.
	es, err = elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("ERROR: %s", err)
	}

	res, err := es.Cluster.Health(
		es.Cluster.Health.WithLevel("indices"),
		es.Cluster.Health.WithPretty(),
	)
	if err != nil {
		log.Fatalf("ERROR: %s", err)
	}
	defer res.Body.Close()
	log.Println(res)
}
11

Implementation: Methods

package esapi

// ClusterHealth is the function type used for the Cluster Health API call: it
// accepts any number of option functions, each receiving a
// *ClusterHealthRequest, and returns a *Response or an error.
type ClusterHealth func(o ...func(*ClusterHealthRequest)) (*Response, error)

// WithLevel returns an option function that stores v in the Level field of
// the request it is applied to.
func (f ClusterHealth) WithLevel(v string) func(*ClusterHealthRequest) {
    setLevel := func(req *ClusterHealthRequest) { req.Level = v }
    return setLevel
}

// WithContext(), WithMasterTimeout(), WithIndex(), WithWaitForActiveShards(), ...
12

Implementation: Request Structs

package esapi

// ClusterHealthRequest holds the settings for one Cluster Health API call.
// Fields hidden behind "// ..." are elided in this excerpt.
type ClusterHealthRequest struct {
    // NOTE(review): Do also receives a ctx argument; how this field relates to
    // it is not visible in this excerpt — confirm.
    Context context.Context

    // Level is the level of detail for the returned information; the API spec
    // lists the options "cluster", "indices" and "shards".
    Level         string
    // Local asks for local information instead of retrieving the state from
    // the master node.
    Local         bool
    // MasterTimeout is the explicit operation timeout for the connection to
    // the master node.
    MasterTimeout time.Duration
    // ...

    // Pretty is presumably the flag set by the WithPretty option — confirm.
    Pretty bool
}

// Do executes the request: it creates an HTTP request with newRequest, sends
// it through transport.Perform and returns a pointer to a Response built from
// the result. Steps marked "// ..." are elided in this excerpt, so details
// such as error handling and the use of ctx are not shown here.
func (r ClusterHealthRequest) Do(ctx context.Context, transport Transport) (*Response, error) {
    // ...
    // The error returned by newRequest is discarded in this excerpt.
    req, _ := newRequest(method, path.String(), nil)
    // ...
    res, err := transport.Perform(req)
    // ...
    response := Response{ ... }
    return &response, nil
}
13

Implementation: Constructors

package esapi

// newClusterHealthFunc returns a ClusterHealth function bound to transport t.
//
// Each invocation of the returned function starts from a zero-value
// ClusterHealthRequest, applies the supplied option functions in order and
// then executes the request, returning whatever Do returns.
func newClusterHealthFunc(t Transport) ClusterHealth {
    return func(o ...func(*ClusterHealthRequest)) (*Response, error) {
        var r = ClusterHealthRequest{}
        for _, f := range o {
            f(&r)
        }
        // Do is declared as Do(ctx, transport): pass the context carried by
        // the request along with the transport.
        // NOTE(review): r.Context is nil when no option set it — confirm that
        // Do tolerates a nil context.
        return r.Do(r.Context, t)
    }
}

// New returns a pointer to an API whose Cluster.Health function is bound to
// transport t. Further fields are elided in this excerpt ("// ...").
func New(t Transport) *API {
    api := API{
        Cluster: &Cluster{Health: newClusterHealthFunc(t)},

        // ...
    }
    return &api
}
https://github.com/elastic/go-elasticsearch/blob/master/esapi/api._.go
14

Code Generation: API

{
  "cluster.health":{
    "documentation":{
      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/cluster-health.html",
      "description":"Returns basic information about the health of the cluster."
    },
    "stability":"stable",
    "url":{
      "paths":[
        {
          "path":"/_cluster/health",
          "methods":[
            "GET"
          ]
        },
        {
          "path":"/_cluster/health/{index}",
          "methods":[
            "GET"
          ],
          "parts":{
            "index":{
              "type":"list",
              "description":"Limit the information returned to a specific index"
            }
          }
        }
      ]
    },
    "params":{
      "level":{
        "type":"enum",
        "options":[
          "cluster",
          "indices",
          "shards"
        ],
        "default":"cluster",
        "description":"Specify the level of detail for returned information"
      },
      "local":{
        "type":"boolean",
        "description":"Return local information, do not retrieve the state from master node (default: false)"
      },
      "master_timeout":{
        "type":"time",
        "description":"Explicit operation timeout for connection to master node"
      },
      // ...
    }
  }
}
https://github.com/elastic/elasticsearch/blob/master/rest-api-spec
15

Code Generation: Tests

---
"cluster health basic test":
  - do:
      cluster.health: {}

  - is_true:   cluster_name
  - is_false:  timed_out
  - gte:       { number_of_nodes:         1 }
  - gte:       { number_of_data_nodes:    1 }
  - match:     { active_primary_shards:   0 }
  - match:     { active_shards:           0 }
  - match:     { relocating_shards:       0 }
  - match:     { initializing_shards:     0 }
  - match:     { unassigned_shards:       0 }
  - gte:       { number_of_pending_tasks: 0 }

---
"cluster health basic test, one index":
  - do:
      indices.create:
        index: test_index
        body:
          settings:
            index:
              number_of_replicas: 0

  - do:
      cluster.health:
        wait_for_status: green
        wait_for_no_relocating_shards: true

  - is_true:   cluster_name
  - is_false:  timed_out
  - gte:       { number_of_nodes:         1 }
  - gte:       { number_of_data_nodes:    1 }
  - gt:        { active_primary_shards:   0 }
  - gt:        { active_shards:           0 }
  - gte:       { relocating_shards:       0 }
  - match:     { initializing_shards:     0 }
  - match:     { unassigned_shards:       0 }
  - gte:       { number_of_pending_tasks: 0 }

---
"cluster health basic test, one index with wait for active shards":
  - do:
      indices.create:
        index: test_index
        body:
          settings:
            index:
              number_of_replicas: 0

  - do:
      cluster.health:
        wait_for_active_shards: 1
        wait_for_no_relocating_shards: true

  - is_true:   cluster_name
  - is_false:  timed_out
  - gte:       { number_of_nodes:         1 }
  - gte:       { number_of_data_nodes:    1 }
  - gt:        { active_primary_shards:   0 }
  - gt:        { active_shards:           0 }
  - gte:       { relocating_shards:       0 }
  - match:     { initializing_shards:     0 }
  - match:     { unassigned_shards:       0 }
  - gte:       { number_of_pending_tasks: 0 }

---
"cluster health basic test, one index with wait for all active shards":
  - do:
      indices.create:
        index: test_index
        body:
          settings:
            index:
              number_of_replicas: 0

  - do:
      cluster.health:
        wait_for_active_shards: all
        wait_for_no_relocating_shards: true

  - is_true:   cluster_name
  - is_false:  timed_out
  - gte:       { number_of_nodes:         1 }
  - gte:       { number_of_data_nodes:    1 }
  - gt:        { active_primary_shards:   0 }
  - gt:        { active_shards:           0 }
  - gte:       { relocating_shards:       0 }
  - match:     { initializing_shards:     0 }
  - match:     { unassigned_shards:       0 }
  - gte:       { number_of_pending_tasks: 0 }

---
"cluster health basic test, one index with wait for no initializing shards":

  - do:
      indices.create:
        index: test_index
        wait_for_active_shards: 0
        body:
          settings:
            index:
              number_of_replicas: 0

  - do:
      cluster.health:
        wait_for_no_initializing_shards: true

  - match: { initializing_shards: 0 }

---
"cluster health levels":
  - do:
      indices.create:
        index: test_index
  - do:
      cluster.health:
        level: indices

  - is_true: indices
  - is_false: indices.test_index.shards

  - do:
        cluster.health:
          level: shards

  - is_true: indices
  - is_true: indices.test_index.shards

---
"cluster health with closed index (pre 7.2.0)":
  - skip:
      version: "7.2.0 - "
      reason:  "closed indices are replicated starting version 7.2.0"

  - do:
      indices.create:
        index: index-1
        body:
          settings:
            index:
              number_of_replicas: 0

  - do:
      cluster.health:
        wait_for_status: green
  - match:     { status:     green }

  - do:
      indices.create:
        index: index-2
        body:
          settings:
            index:
              number_of_replicas: 50

  - do:
      cluster.health:
        wait_for_status: yellow
        wait_for_no_relocating_shards: true
  - match:     { status:     yellow }

  - do:
      cluster.health:
        index: index-*
  - match:     { status:     yellow }

  - do:
      cluster.health:
        index: index-1
  - match:     { status:     green }

  - do:
      cluster.health:
        index: index-2
  - match:     { status:     yellow }

  - do:
      indices.close:
        index: index-2
  - is_true: acknowledged

  # closing the index-2 turns the cluster health back to green
  - do:
      cluster.health:
        wait_for_status: green
  - match:     { status:     green }

  - do:
      cluster.health:
        index: index-*
  - match:     { status:     green }

  - do:
      cluster.health:
        index: index-1
  - match:     { status:     green }

  - do:
      cluster.health:
        index: index-2
  - match:     { status:     green }

---
"cluster health with closed index":
  - skip:
      version: " - 7.1.99"
      reason:  "closed indices are replicated starting version 7.2.0"

  - do:
      indices.create:
        index: index-1
        body:
          settings:
            index:
              number_of_replicas: 0

  - do:
      cluster.health:
        wait_for_status: green
  - match:     { status:     green }

  - do:
      indices.create:
        index: index-2
        body:
          settings:
            index:
              number_of_replicas: 50

  - do:
      cluster.health:
        wait_for_status: yellow
        wait_for_no_relocating_shards: true
  - match:     { status:     yellow }

  - do:
      cluster.health:
        index: index-*
  - match:     { status:     yellow }

  - do:
      cluster.health:
        index: index-1
  - match:     { status:     green }

  - do:
      cluster.health:
        index: index-2
  - match:     { status:     yellow }

  # closing the index-2 does not change the cluster health with replicated closed indices
  - do:
      indices.close:
        index: index-2
  - is_true: acknowledged

  - do:
      cluster.health:
        wait_for_status: yellow
  - match:     { status:     yellow }

  - do:
      cluster.health:
        index: index-*
  - match:     { status:     yellow }

  - do:
      cluster.health:
        index: index-1
  - match:     { status:     green }

  - do:
      cluster.health:
        index: index-2
  - match:     { status:     yellow }
https://github.com/elastic/elasticsearch/blob/master/rest-api-spec
16

Continuous Integration

17

Documentation

$ go doc go-elasticsearch/esapi ClusterHealth
18

Usage: Method arguments and request bodies

// Index is the function type used for the Index API call: it takes the index
// name, the request body as an io.Reader and any number of option functions
// receiving an *IndexRequest, and returns a *Response or an error.
type Index func(index string, body io.Reader, o ...func(*IndexRequest)) (*Response, error)
// +build ignore

package main

import (
	"log"
	"os"
	"strings"

	"github.com/elastic/go-elasticsearch"
)

// main creates a default client, indexes a small JSON document with ID "1"
// into the "test" index and prints the response. Any error is fatal.
func main() {
	log.SetOutput(os.Stdout)
	log.SetFlags(0)

	client, clientErr := elasticsearch.NewDefaultClient()
	if clientErr != nil {
		log.Fatalf("ERROR: %s", clientErr)
	}

	// The required arguments come first; optional settings are passed as
	// functional options on the API function itself.
	document := strings.NewReader(`{"foo":"bar"}`)
	response, indexErr := client.Index(
		"test",
		document,
		client.Index.WithDocumentID("1"),
		client.Index.WithPretty(),
	)
	if indexErr != nil {
		log.Fatalf("ERROR: %s", indexErr)
	}
	defer response.Body.Close()

	log.Println(response)
}
19

Usage: Request Structs

import (
    "github.com/elastic/go-elasticsearch"
    "github.com/elastic/go-elasticsearch/esapi"
)
// +build ignore

package main

import (
	"context"
	"log"
	"os"
	"strings"
)

// START1 OMIT
import (
	"github.com/elastic/go-elasticsearch"
	"github.com/elastic/go-elasticsearch/esapi" // HL
)

// END1 OMIT

// main creates a default client, builds an esapi.IndexRequest by hand and
// executes it with the client as the transport, printing the response.
func main() {
	log.SetOutput(os.Stdout)
	log.SetFlags(0)

	client, clientErr := elasticsearch.NewDefaultClient()
	if clientErr != nil {
		log.Fatalf("ERROR: %s", clientErr)
	}

	// Populate the request struct field by field instead of going through the
	// functional options of the client API.
	var indexReq esapi.IndexRequest
	indexReq.Index = "test"
	indexReq.DocumentID = "1"
	indexReq.Body = strings.NewReader(`{"foo":"bar"}`)
	indexReq.Pretty = true

	response, doErr := indexReq.Do(context.Background(), client)
	if doErr != nil {
		log.Fatalf("Error getting response: %s", doErr)
	}
	defer response.Body.Close()

	log.Println(response)
}
20

Usage: JSON Decoding

import "encoding/json"
import "github.com/tidwall/gjson"
    var d map[string]interface{}
    err = json.NewDecoder(res.Body).Decode(&d)
    if err != nil {
        log.Fatalf("Error parsing the response: %s", err)
    }

    log.Println("encoding/json:", d["tagline"])
// +build ignore

package main

import (
	"bytes"
	"log"
	"os"

	"github.com/elastic/go-elasticsearch"
)

// START1 OMIT
import "encoding/json"
import "github.com/tidwall/gjson"

// END1 OMIT

// main calls the Info API twice and prints the "tagline" field of the
// response, first decoded with encoding/json and then extracted with
// tidwall/gjson.
//
// The errors returned by es.Info and by reading the body are checked before
// the response is used; previously they were discarded, so a failed call led
// to a nil dereference on res.Body.
func main() {
	log.SetFlags(0)
	log.SetOutput(os.Stdout)

	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("ERROR: %s", err)
	}

	res, err := es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()

	// START2 OMIT
	var d map[string]interface{}               // HL
	err = json.NewDecoder(res.Body).Decode(&d) // HL
	if err != nil {
		log.Fatalf("Error parsing the response: %s", err)
	}

	log.Println("encoding/json:", d["tagline"]) // HL
	// END2 OMIT

	// The first body has been consumed by the decoder, so request it again.
	res, err = es.Info()
	if err != nil {
		log.Fatalf("Error getting response: %s", err)
	}
	defer res.Body.Close()

	var b bytes.Buffer
	if _, err := b.ReadFrom(res.Body); err != nil {
		log.Fatalf("Error reading the response: %s", err)
	}

	log.Println("tidwall/gjson:", gjson.GetBytes(b.Bytes(), "tagline"))
}
21

Usage: JSON Encoding

import "github.com/elastic/go-elasticsearch/esutil"
    res, _ = es.Index(
        "test",
        esutil.NewJSONReader(map[string]interface{}{"foo": "bar"}),
    )
// +build ignore

package main

import (
	"log"
	"os"

	"github.com/elastic/go-elasticsearch"
	"github.com/elastic/go-elasticsearch/esapi"
)

// START1 OMIT
import "github.com/elastic/go-elasticsearch/esutil"

// END1 OMIT

// main indexes two documents into the "test" index, encoding the bodies with
// esutil.NewJSONReader: first from a map, then from a pointer to a struct.
// Each response is printed.
//
// The errors returned by es.Index are checked before the response is used;
// previously they were discarded, so a failed call led to a nil dereference
// on res.Body. The first check sits after the "END2 OMIT" marker to keep it
// out of the region shown on the slide.
func main() {
	log.SetFlags(0)
	log.SetOutput(os.Stdout)

	var (
		res *esapi.Response
		err error
	)

	es, err := elasticsearch.NewDefaultClient()
	if err != nil {
		log.Fatalf("ERROR: %s", err)
	}

	// START2 OMIT
	res, err = es.Index(
		"test",
		esutil.NewJSONReader(map[string]interface{}{"foo": "bar"}), // HL
	)
	// END2 OMIT
	if err != nil {
		log.Fatalf("ERROR: %s", err)
	}

	defer res.Body.Close()
	log.Println(res)

	type Doc struct {
		Title string `json:"title"`
	}

	doc := Doc{Title: "Test"}

	res, err = es.Index(
		"test",
		esutil.NewJSONReader(&doc),
	)
	if err != nil {
		log.Fatalf("ERROR: %s", err)
	}

	defer res.Body.Close()
	log.Println(res)
}
22

Examples! Examples! Examples!

23

Examples! Examples! Examples!

24

Thank You

 Questions, please!

Karel Minařík