The Go Client for Elasticsearch
Karel Minařík
Karel Minařík
Application Performance Metrics
github.com/elastic/apm-agent-go
3Source code for this talk:
github.com/karmi/gotalks/tree/master/2019/go-elasticsearch
4“Writing good code has much in common with writing good English”
~ Kernighan/Pike, The Practice of Programming
5
1. Be consistent with other official clients
2. Be idiomatic for language developers
Contradicting goals? Indeed… :)
6“All About Elasticsearch Language Clients”, Elastic{ON} 2015
@olivere
invited as collaboratorGoals:
1. elastic/go-elasticsearch
2. elastic/go-elasticsearch/esapi
3. elastic/go-elasticsearch/estransport
4. elastic/go-elasticsearch/esutil
godoc
)es
) and don't use generic names (api
)type Interface interface { Perform(*http.Request) (*http.Response, error) }
import "github.com/elastic/go-elasticsearch"
// Defaults // // Note: the ELASTICSEARCH_URL environment variable is used when exported // es, err := elasticsearch.NewDefaultClient()
// Configuration // cfg := elasticsearch.Config{ Addresses: []string{"https://example.com"}, Username: "foo", Password: "bar", } es, err = elasticsearch.NewClient(cfg)
es, _ := elasticsearch.NewDefaultClient()
// +build ignore
package main
import (
"log"
"os"
)
// START1 OMIT
import "github.com/elastic/go-elasticsearch"
// END1 OMIT
func main() {
log.SetFlags(0)
log.SetOutput(os.Stdout)
// START2 OMIT
// Defaults
//
// Note: the ELASTICSEARCH_URL environment variable is used when exported
//
es, err := elasticsearch.NewDefaultClient() // HL
// END2 OMIT
// START3 OMIT
// Configuration
//
cfg := elasticsearch.Config{
Addresses: []string{"https://example.com"},
Username: "foo",
Password: "bar",
}
es, err = elasticsearch.NewClient(cfg) // HL
// END3 OMIT
es, err = elasticsearch.NewDefaultClient()
res, err := es.Cluster.Health( es.Cluster.Health.WithLevel("indices"), es.Cluster.Health.WithPretty(), ) if err != nil { log.Fatalf("ERROR: %s", err) } defer res.Body.Close() log.Println(res)
}
esapi.Response
is a fmt.Stringer
res.Body
is an io.ReadCloser
— no decoding of response bodypackage esapi type ClusterHealth func(o ...func(*ClusterHealthRequest)) (*Response, error) func (f ClusterHealth) WithLevel(v string) func(*ClusterHealthRequest) { return func(r *ClusterHealthRequest) { r.Level = v } } // WithContext(), WithMasterTimeout(), WithIndex(), WithWaitForActiveShards(), ...
package esapi type ClusterHealthRequest struct { Context context.Context Level string Local bool MasterTimeout time.Duration // ... Pretty bool } func (r ClusterHealthRequest) Do(ctx context.Context, transport Transport) (*Response, error) { // ... req, _ := newRequest(method, path.String(), nil) // ... res, err := transport.Perform(req) // ... response := Response{ ... } return &response, nil }
package esapi func newClusterHealthFunc(t Transport) ClusterHealth { return func(o ...func(*ClusterHealthRequest)) (*Response, error) { var r = ClusterHealthRequest{} for _, f := range o { f(&r) } return r.Do(t) } } func New(t Transport) *API { return &API{ Cluster: &Cluster{ Health: newClusterHealthFunc(t), }, // ... } }
{ "cluster.health":{ "documentation":{ "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/cluster-health.html", "description":"Returns basic information about the health of the cluster." }, "stability":"stable", "url":{ "paths":[ { "path":"/_cluster/health", "methods":[ "GET" ] }, { "path":"/_cluster/health/{index}", "methods":[ "GET" ], "parts":{ "index":{ "type":"list", "description":"Limit the information returned to a specific index" } } } ] }, "params":{ "level":{ "type":"enum", "options":[ "cluster", "indices", "shards" ], "default":"cluster", "description":"Specify the level of detail for returned information" }, "local":{ "type":"boolean", "description":"Return local information, do not retrieve the state from master node (default: false)" }, "master_timeout":{ "type":"time", "description":"Explicit operation timeout for connection to master node" }, // ... } }
--- "cluster health basic test": - do: cluster.health: {} - is_true: cluster_name - is_false: timed_out - gte: { number_of_nodes: 1 } - gte: { number_of_data_nodes: 1 } - match: { active_primary_shards: 0 } - match: { active_shards: 0 } - match: { relocating_shards: 0 } - match: { initializing_shards: 0 } - match: { unassigned_shards: 0 } - gte: { number_of_pending_tasks: 0 } --- "cluster health basic test, one index": - do: indices.create: index: test_index body: settings: index: number_of_replicas: 0 - do: cluster.health: wait_for_status: green wait_for_no_relocating_shards: true - is_true: cluster_name - is_false: timed_out - gte: { number_of_nodes: 1 } - gte: { number_of_data_nodes: 1 } - gt: { active_primary_shards: 0 } - gt: { active_shards: 0 } - gte: { relocating_shards: 0 } - match: { initializing_shards: 0 } - match: { unassigned_shards: 0 } - gte: { number_of_pending_tasks: 0 } --- "cluster health basic test, one index with wait for active shards": - do: indices.create: index: test_index body: settings: index: number_of_replicas: 0 - do: cluster.health: wait_for_active_shards: 1 wait_for_no_relocating_shards: true - is_true: cluster_name - is_false: timed_out - gte: { number_of_nodes: 1 } - gte: { number_of_data_nodes: 1 } - gt: { active_primary_shards: 0 } - gt: { active_shards: 0 } - gte: { relocating_shards: 0 } - match: { initializing_shards: 0 } - match: { unassigned_shards: 0 } - gte: { number_of_pending_tasks: 0 } --- "cluster health basic test, one index with wait for all active shards": - do: indices.create: index: test_index body: settings: index: number_of_replicas: 0 - do: cluster.health: wait_for_active_shards: all wait_for_no_relocating_shards: true - is_true: cluster_name - is_false: timed_out - gte: { number_of_nodes: 1 } - gte: { number_of_data_nodes: 1 } - gt: { active_primary_shards: 0 } - gt: { active_shards: 0 } - gte: { relocating_shards: 0 } - match: { initializing_shards: 0 } - match: { unassigned_shards: 0 } - gte: { number_of_pending_tasks: 0 } --- "cluster health basic test, one index with wait for no initializing shards": - do: indices.create: index: test_index wait_for_active_shards: 0 body: settings: index: number_of_replicas: 0 - do: cluster.health: wait_for_no_initializing_shards: true - match: { initializing_shards: 0 } --- "cluster health levels": - do: indices.create: index: test_index - do: cluster.health: level: indices - is_true: indices - is_false: indices.test_index.shards - do: cluster.health: level: shards - is_true: indices - is_true: indices.test_index.shards --- "cluster health with closed index (pre 7.2.0)": - skip: version: "7.2.0 - " reason: "closed indices are replicated starting version 7.2.0" - do: indices.create: index: index-1 body: settings: index: number_of_replicas: 0 - do: cluster.health: wait_for_status: green - match: { status: green } - do: indices.create: index: index-2 body: settings: index: number_of_replicas: 50 - do: cluster.health: wait_for_status: yellow wait_for_no_relocating_shards: true - match: { status: yellow } - do: cluster.health: index: index-* - match: { status: yellow } - do: cluster.health: index: index-1 - match: { status: green } - do: cluster.health: index: index-2 - match: { status: yellow } - do: indices.close: index: index-2 - is_true: acknowledged # closing the index-2 turns the cluster health back to green - do: cluster.health: wait_for_status: green - match: { status: green } - do: cluster.health: index: index-* - match: { status: green } - do: cluster.health: index: index-1 - match: { status: green } - do: cluster.health: index: index-2 - match: { status: green } --- "cluster health with closed index": - skip: version: " - 7.1.99" reason: "closed indices are replicated starting version 7.2.0" - do: indices.create: index: index-1 body: settings: index: number_of_replicas: 0 - do: cluster.health: wait_for_status: green - match: { status: green } - do: indices.create: index: index-2 body: settings: index: number_of_replicas: 50 - do: cluster.health: wait_for_status: yellow wait_for_no_relocating_shards: true - match: { status: yellow } - do: cluster.health: index: index-* - match: { status: yellow } - do: cluster.health: index: index-1 - match: { status: green } - do: cluster.health: index: index-2 - match: { status: yellow } # closing the index-2 does not change the cluster health with replicated closed indices - do: indices.close: index: index-2 - is_true: acknowledged - do: cluster.health: wait_for_status: yellow - match: { status: yellow } - do: cluster.health: index: index-* - match: { status: yellow } - do: cluster.health: index: index-1 - match: { status: green } - do: cluster.health: index: index-2 - match: { status: yellow }
$ go doc go-elasticsearch/esapi ClusterHealth
type Index func(index string, body io.Reader, o ...func(*IndexRequest)) (*Response, error)
// +build ignore
package main
import (
"log"
"os"
"strings"
"github.com/elastic/go-elasticsearch"
)
func main() {
log.SetFlags(0)
log.SetOutput(os.Stdout)
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("ERROR: %s", err)
}
res, err := es.Index( "test", strings.NewReader(`{"foo":"bar"}`), es.Index.WithDocumentID("1"), es.Index.WithPretty(), ) if err != nil { log.Fatalf("ERROR: %s", err) } defer res.Body.Close() log.Println(res)
}
import ( "github.com/elastic/go-elasticsearch" "github.com/elastic/go-elasticsearch/esapi" )
// +build ignore
package main
import (
"context"
"log"
"os"
"strings"
)
// START1 OMIT
import (
"github.com/elastic/go-elasticsearch"
"github.com/elastic/go-elasticsearch/esapi" // HL
)
// END1 OMIT
func main() {
log.SetFlags(0)
log.SetOutput(os.Stdout)
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("ERROR: %s", err)
}
req := esapi.IndexRequest{ Index: "test", DocumentID: "1", Body: strings.NewReader(`{"foo":"bar"}`), Pretty: true, } res, err := req.Do(context.Background(), es) if err != nil { log.Fatalf("Error getting response: %s", err) } defer res.Body.Close() log.Println(res)
}
import "encoding/json" import "github.com/tidwall/gjson"
var d map[string]interface{} err = json.NewDecoder(res.Body).Decode(&d) if err != nil { log.Fatalf("Error parsing the response: %s", err) } log.Println("encoding/json:", d["tagline"])
// +build ignore
package main
import (
"bytes"
"log"
"os"
"github.com/elastic/go-elasticsearch"
)
// START1 OMIT
import "encoding/json"
import "github.com/tidwall/gjson"
// END1 OMIT
func main() {
log.SetFlags(0)
log.SetOutput(os.Stdout)
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("ERROR: %s", err)
}
res, _ := es.Info()
defer res.Body.Close()
// START2 OMIT
var d map[string]interface{} // HL
err = json.NewDecoder(res.Body).Decode(&d) // HL
if err != nil {
log.Fatalf("Error parsing the response: %s", err)
}
log.Println("encoding/json:", d["tagline"]) // HL
// END2 OMIT
res, _ = es.Info()
defer res.Body.Close()
var b bytes.Buffer b.ReadFrom(res.Body) log.Println("tidwall/gjson:", gjson.GetBytes(b.Bytes(), "tagline"))
}
import "github.com/elastic/go-elasticsearch/esutil"
res, _ = es.Index( "test", esutil.NewJSONReader(map[string]interface{}{"foo": "bar"}), )
// +build ignore
package main
import (
"log"
"os"
"github.com/elastic/go-elasticsearch"
"github.com/elastic/go-elasticsearch/esapi"
)
// START1 OMIT
import "github.com/elastic/go-elasticsearch/esutil"
// END1 OMIT
func main() {
log.SetFlags(0)
log.SetOutput(os.Stdout)
var (
res *esapi.Response
err error
)
es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("ERROR: %s", err)
}
// START2 OMIT
res, _ = es.Index(
"test",
esutil.NewJSONReader(map[string]interface{}{"foo": "bar"}), // HL
)
// END2 OMIT
defer res.Body.Close()
log.Println(res)
type Doc struct { Title string `json:"title"` } doc := Doc{Title: "Test"} res, _ = es.Index( "test", esutil.NewJSONReader(&doc), )
defer res.Body.Close()
log.Println(res)
}
github.com/elastic/go-elasticsearch/tree/master/_examples
23github.com/elastic/go-elasticsearch/tree/master/_examples
valyala/fasthttp
)encoding/json
, mailru/easyjson
, tidwall/gjson
)Karel Minařík