当前位置:   article > 正文

go整合es_go es

go es
  1. 引入依赖
  • 看你的es版本,自己引入
go get "github.com/elastic/go-elasticsearch/v7"
  1. 创建一个客户端
  1. func InitEsClient() error {
  2. client, err := elasticsearch.NewClient(elasticsearch.Config{
  3. Addresses: []string{"http://" +
  4. myconfig.GConfig.ElasticConfig.Host +
  5. ":" +
  6. strconv.Itoa(myconfig.GConfig.ElasticConfig.Port)},
  7. })
  8. if err != nil {
  9. return err
  10. }
  11. util.EsClient = client
  12. return nil
  13. }
  1. 在es_util中定义方法全局变量和客户端
var EsClient *elasticsearch.Client
  1. 定义创建索引的方法
  • 第二个参数是创建;了一个上下文
  1. 控制请求的超时时间:通过设定上下文的超时时间,可以控制请求的执行时间。如果在设定的时间内请求未能完成,那么可以通过取消信号来中断请求。
  2. 取消请求:如果请求的执行过程中出现了错误,或者我们不再需要这个请求,可以通过取消信号来中断请求。
  1. func CreateIndex(indexName string) error {
  2. // 执行构建操作,十秒之后会调用执行函数
  3. withTimeout, cancelFunc := context.WithTimeout(context.Background(), 10*time.Second)
  4. defer cancelFunc()
  5. select {
  6. case <-withTimeout.Done():
  7. // 如果超时,取消操作并执行超时后的操作
  8. return fmt.Errorf("create index timeout")
  9. default:
  10. // 执行正常操作
  11. withContext := EsClient.Indices.Create.WithContext(withTimeout)
  12. response, err := EsClient.Indices.Create(
  13. indexName,
  14. withContext,
  15. )
  16. if err != nil {
  17. return fmt.Errorf("create index error: %s", err)
  18. }
  19. if response.Status() != "200" {
  20. return fmt.Errorf("create index error: %s", response.Status())
  21. }
  22. return err
  23. }
  24. }
  1. 新增一条文档
  1. func InsertDocument(indexName string, doc interface{}) error {
  2. // 将结构体转换为 JSON 的字节切片
  3. marshal, err := json.Marshal(doc)
  4. if err != nil {
  5. return err
  6. }
  7. // 反射获取id
  8. idFiled := reflect.ValueOf(doc).FieldByName("Id")
  9. if !idFiled.IsValid() {
  10. return fmt.Errorf("invalid id field")
  11. }
  12. kind := idFiled.Kind()
  13. if kind != reflect.String {
  14. return fmt.Errorf("invalid id is not string type")
  15. }
  16. response, err := EsClient.Index(
  17. indexName,
  18. // 输入流
  19. bytes.NewReader(marshal),
  20. // 上下文
  21. EsClient.Index.WithContext(context.Background()),
  22. // 自定义Id
  23. EsClient.Index.WithDocumentID(idFiled.String()),
  24. )
  25. if err != nil {
  26. return fmt.Errorf("insert document error: %s", err)
  27. }
  28. if response.Status() != "200 OK" {
  29. return fmt.Errorf("insert document error: %s", response.Status())
  30. }
  31. return nil
  32. }
  1. 查询文档
  1. func SearchDocuments(indexName string, query string) ([]interface{}, error) {
  2. //query是查询的条件"{\n \"query\": {\n \"match_all\": {}\n }\n}\n"
  3. res, err := EsClient.Search(
  4. // 上下文
  5. EsClient.Search.WithContext(context.Background()),
  6. // 索引名字
  7. EsClient.Search.WithIndex(indexName),
  8. // 额外的查询条件
  9. EsClient.Search.WithBody(bytes.NewReader([]byte(query))),
  10. )
  11. if err != nil {
  12. return nil, fmt.Errorf("search document error: %s", err)
  13. }
  14. // 关闭输入流
  15. defer func(Body io.ReadCloser) {
  16. err := Body.Close()
  17. if err != nil {
  18. return
  19. }
  20. }(res.Body)
  21. if res.IsError() {
  22. return nil, fmt.Errorf("search document error: %s", res.Status())
  23. }
  24. // 结果集
  25. var result map[string]interface{}
  26. // 封装结果集
  27. if err := json.NewDecoder(res.Body).Decode(&result); err != nil {
  28. return nil, fmt.Errorf("error in decoding result: %s", err)
  29. }
  30. // 结果集中获取命中,强转成数组
  31. r := result["hits"].(map[string]interface{})["hits"].([]interface{})
  32. var data []interface{}
  33. // 只留下_source里面的数据
  34. for j := 0; j < len(r); j++ {
  35. i := r[j].(map[string]interface{})["_source"]
  36. data = append(data, i)
  37. }
  38. return data, nil
  39. }
  1. 更新文档
  1. func UpdateDocument(indexName string, docID string, doc interface{}) error {
  2. // 编码文档数据为 JSON 格式
  3. // 这个是部分更新
  4. marshal, err := json.Marshal(map[string]interface{}{"doc": doc})
  5. if err != nil {
  6. return fmt.Errorf("marshal document error: %s", err)
  7. }
  8. // 执行更新操作,不再指定文档类型
  9. update, err := EsClient.Update(
  10. indexName,
  11. docID,
  12. bytes.NewReader(marshal),
  13. EsClient.Update.WithContext(context.Background()))
  14. if err != nil {
  15. return fmt.Errorf("update document error: %s", err)
  16. }
  17. if update.StatusCode != 200 {
  18. return fmt.Errorf("update document error: %s", update.Status())
  19. }
  20. return nil
  21. }
  1. 删除文档
  1. func DeleteDocument(indexName string, docID string) error {
  2. response, err := EsClient.Delete(
  3. indexName,
  4. docID,
  5. EsClient.Delete.WithContext(context.Background()))
  6. if err != nil {
  7. return err
  8. }
  9. if response.Status() != "200" {
  10. return fmt.Errorf("delete document error: %s", response.Status())
  11. }
  12. return nil
  13. }
  1. 批量添加文档
  1. func BulkAddDocument(indexName string, data []interface{}) error {
  2. datas, err := convertToBulkData(indexName, data)
  3. if err != nil {
  4. return fmt.Errorf("convert to bulk data error: %s", err)
  5. }
  6. res, err := EsClient.Bulk(
  7. strings.NewReader(datas),
  8. EsClient.Bulk.WithContext(context.Background()))
  9. if err != nil {
  10. return fmt.Errorf("bulk add document error: %s", err)
  11. }
  12. if res.StatusCode != 200 {
  13. return fmt.Errorf("bulk add document error: %s", res.Status())
  14. }
  15. return nil
  16. }
  17. func convertToBulkData(indexName string, data []interface{}) (string, error) {
  18. var buf bytes.Buffer
  19. for _, item := range data {
  20. id, err := getIdByReflect(item)
  21. if err != nil {
  22. return "", err
  23. }
  24. meta := map[string]interface{}{
  25. "index": map[string]string{
  26. "_index": indexName,
  27. "_id": id,
  28. },
  29. }
  30. metaLine, err := json.Marshal(meta)
  31. if err != nil {
  32. return "", fmt.Errorf("marshal meta error: %s", err)
  33. }
  34. dataLine, err := json.Marshal(item)
  35. if err != nil {
  36. return "", fmt.Errorf("marshal meta error: %s", err)
  37. }
  38. // Append to buffer with newline
  39. buf.WriteString(string(metaLine) + "\n")
  40. buf.WriteString(string(dataLine) + "\n")
  41. }
  42. return buf.String(), nil
  43. }
  44. func getIdByReflect(doc interface{}) (string, error) {
  45. // 反射获取id
  46. idFiled := reflect.ValueOf(doc).FieldByName("Id")
  47. if !idFiled.IsValid() {
  48. return "", fmt.Errorf("invalid id field")
  49. }
  50. kind := idFiled.Kind()
  51. if kind != reflect.String {
  52. return "", fmt.Errorf("invalid id is not string type")
  53. }
  54. return idFiled.String(), nil
  55. }
  • 测试添加数据
  1. func TestBulkAddDocuments() {
  2. var users []User
  3. i := append(users, User{
  4. Id: "1111111111111111111111114541",
  5. Name: "ledger",
  6. Age: 18,
  7. Email: "123@qq.com",
  8. })
  9. i2 := append(i, User{
  10. Id: "111111111111111111111111",
  11. Name: "ledger",
  12. Age: 18,
  13. Email: "123@qq.com",
  14. })
  15. var interfaceSlice []interface{}
  16. for _, u := range i2 {
  17. interfaceSlice = append(interfaceSlice, u)
  18. }
  19. err := util.BulkAddDocument("test", interfaceSlice)
  20. if err != nil {
  21. fmt.Printf("service error: %s\n\n", err)
  22. } else {
  23. fmt.Println("success")
  24. }
  25. }
声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/很楠不爱3/article/detail/734089
推荐阅读
相关标签
  

闽ICP备14008679号