// Package es contains thin helpers that send search, multi-search, scroll and
// bulk requests to an Elasticsearch cluster over HTTP. The node addresses are
// read from the "es.urls" configuration key as a comma-separated list.
package es

import (
	"errors"
	"math/rand"
	"net/http"

	"github.com/imroc/req"

	"go_sku_server/pkg/config"
)

// errNoESEndpoints is returned when the "es.urls" configuration yields no
// node addresses, so there is nothing to send the request to.
var errNoESEndpoints = errors.New("es: no endpoints configured in es.urls")

// Response mirrors the top-level envelope of a search response.
//
// NOTE(review): Elasticsearch normally names these fields "timed_out" and
// "_shards"; the tags below ("time_out", "_shard") would not match those keys.
// Left unchanged to keep the type backward-compatible — confirm against the
// callers that decode into this struct.
type Response struct {
	Took       int            `json:"took"`
	TimeOut    bool           `json:"time_out"`
	Shard      map[string]int `json:"_shard"`
	HitsResult HitsResult     `json:"hits"`
}

// HitsResult mirrors the "hits" object of a search response.
//
// NOTE(review): Hits is declared as a string although "hits.hits" is usually a
// JSON array — confirm how callers populate this field.
type HitsResult struct {
	Total    int     `json:"total"`
	MaxScore float64 `json:"max_score"`
	Hits     string  `json:"hits"`
}

// pickESEndpoint returns one randomly chosen node address from the "es.urls"
// configuration. It reports errNoESEndpoints instead of panicking when the
// list is empty (rand.Intn panics for n <= 0).
func pickESEndpoint() (string, error) {
	endpoints := config.Get("es.urls").Strings(",")
	if len(endpoints) == 0 {
		return "", errNoESEndpoints
	}
	return endpoints[rand.Intn(len(endpoints))], nil
}

// CurlES runs a search against index on a randomly chosen node.
//
// param is the JSON query body. The raw response body is returned as a
// string; an error is returned when no endpoint is configured, when the
// request fails, or when the response body cannot be read.
func CurlES(index string, param string) (result string, err error) {
	esURL, err := pickESEndpoint()
	if err != nil {
		return "", err
	}
	resp, err := req.Get(esURL+"/"+index+"/_search", req.BodyJSON(param))
	if err != nil {
		return "", err
	}
	// ToString surfaces body-read failures that String would silently drop.
	return resp.ToString()
}

// CurlEsPrimary runs a search against index like CurlES, but adds
// "preference=_primary_first" to the request (primary-shard query).
//
// param is the JSON query body. The raw response body is returned as a
// string; an error is returned when no endpoint is configured, when the
// request fails, or when the response body cannot be read.
func CurlEsPrimary(index string, param string) (result string, err error) {
	esURL, err := pickESEndpoint()
	if err != nil {
		return "", err
	}
	resp, err := req.Get(esURL+"/"+index+"/_search?preference=_primary_first", req.BodyJSON(param))
	if err != nil {
		return "", err
	}
	return resp.ToString()
}

// BulkES inserts or updates documents in bulk by posting param to the _bulk
// endpoint of a randomly chosen node.
//
// param must be newline-delimited JSON (action line followed by document
// line, each terminated by "\n"), for example:
//
//	POST /_bulk
//	{ "index":{"_index":"hcy1","_type":"goods","_id":"333333"} }
//	{ "name":"john doe","age":25 }
//	{ "index":{"_index":"hcy1","_type":"goods","_id":"6666"} }
//	{ "name":"mary smith","age":32 }
//
// Usage, option 1 (manual concatenation):
//
//	param := `{"index":{"_index":"hcy1","_type":"goods","_id":"s1"} }` + "\n" +
//		`{"name":"john doe","age":25 }` + "\n" +
//		`{"index":{"_index":"hcy1","_type":"goods","_id":"s2"} }` + "\n" +
//		`{"name":"john doe","age":25 }` + "\n"
//	result, err := es.BulkES(param)
//	println(result, err)
//
// Usage, option 2 (join a slice of lines):
//
//	lines := []string{
//		`{"index":{"_index":"hcy1","_type":"goods","_id":"s1"} }`,
//		`{"name":"john doe","age":25 }`,
//		`{"index":{"_index":"hcy1","_type":"goods","_id":"s2"} }`,
//		`{"name":"mary smith","age":32 }`,
//	}
//	param := strings.Join(lines, "\n") + "\n"
//
// The raw response body is returned as a string; an error is returned when no
// endpoint is configured, when the request fails, or when the response body
// cannot be read.
func BulkES(param string) (result string, err error) {
	esURL, err := pickESEndpoint()
	if err != nil {
		return "", err
	}

	// req.Debug = true // uncomment to dump the request while debugging

	// The payload is sent as-is (not via req.BodyJSON) with the NDJSON
	// content type set explicitly.
	header := make(http.Header)
	header.Set("Content-Type", "application/x-ndjson")

	// refresh=true makes the inserted/updated documents take effect
	// immediately instead of waiting for Elasticsearch to flush its buffer.
	resp, err := req.Post(esURL+"/_bulk?refresh=true", header, param)
	if err != nil {
		return "", err
	}
	return resp.ToString()
}

// CurlESMSearch runs a multi-condition, multi-index search by posting
// queryJson to the _msearch endpoint of a randomly chosen node. The caller is
// responsible for assembling the target indices and conditions into
// queryJson.
//
// The raw response body is returned as a string; an error is returned when no
// endpoint is configured, when the request fails, or when the response body
// cannot be read.
func CurlESMSearch(queryJson string) (result string, err error) {
	// NOTE(review): this writes the package-level req.Debug flag on every
	// call; kept for backward compatibility.
	req.Debug = false

	esURL, err := pickESEndpoint()
	if err != nil {
		return "", err
	}
	resp, err := req.Post(esURL+"/_msearch", req.BodyJSON(queryJson))
	if err != nil {
		return "", err
	}
	return resp.ToString()
}

// ScrollES posts param to the _search/scroll endpoint of a randomly chosen
// node. The original note describes it as used for scroll searches and for
// re-indexing documents between indices.
//
// The raw response body is returned as a string; an error is returned when no
// endpoint is configured, when the request fails, or when the response body
// cannot be read.
func ScrollES(param string) (result string, err error) {
	// NOTE(review): this writes the package-level req.Debug flag on every
	// call; kept for backward compatibility.
	req.Debug = false

	esURL, err := pickESEndpoint()
	if err != nil {
		return "", err
	}
	resp, err := req.Post(esURL+"/_search/scroll", req.BodyJSON(param))
	if err != nil {
		return "", err
	}
	return resp.ToString()
}