- 
                Notifications
    
You must be signed in to change notification settings  - Fork 637
 
Open
Description
When using esClient.Search to retrieve data, I employed 50 goroutines for concurrent processing. However, when the data volume is large, the program does not terminate as expected. After testing, I found that the issue lies in the Perform method: in poor network conditions, the program may remain in a waiting state indefinitely.
I used esClient.Search.WithTimeout(10*time.Second), but it didn't have any effect.
Testing needs to be conducted under poor network conditions.
package main
import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"runtime"
	"strings"
	"sync"
	"time"

	elasticsearch8 "github.com/elastic/go-elasticsearch/v8"
)
// processLock tracks the worker goroutines: main adds to it and waits on it,
// and each Process call marks itself done when it returns.
var processLock sync.WaitGroup
// monitorGoroutines reports the current goroutine count on stdout once per
// second. It never returns, so it is meant to be started with `go` and left
// running for the lifetime of the process.
func monitorGoroutines() {
	const interval = 1 * time.Second
	for {
		fmt.Printf("Number of goroutines: %d\n", runtime.NumGoroutine())
		time.Sleep(interval)
	}
}
// main starts the goroutine monitor, launches a fixed pool of Process workers,
// feeds them jobs through a buffered channel, and waits for every worker to
// finish once the channel has been closed.
func main() {
	const (
		workerCount = 50   // number of concurrent Process goroutines
		jobCount    = 2000 // number of search jobs to enqueue
	)

	go monitorGoroutines()

	processChan := make(chan int, 1000)

	// Register all workers BEFORE starting them. Calling Add after the `go`
	// statement races with the worker's deferred Done: a worker that returns
	// early could drive the counter negative and panic.
	processLock.Add(workerCount)
	for range workerCount {
		go Process(processChan)
	}

	for range jobCount {
		processChan <- 1
	}
	// Closing the channel ends each worker's range loop.
	close(processChan)

	processLock.Wait()
}
func Process(processChan chan int) {
	defer processLock.Done()
	esClient, err := elasticsearch8.NewClient(elasticsearch8.Config{
		Addresses: []string{"http://example.com"},
	})
	if err != nil {
		log.Println(err)
		return
	}
	for v := range processChan {
		query := map[string]interface{}{
			"query": map[string]interface{}{
				"bool": map[string]interface{}{
					"must": []map[string]interface{}{
						{
							"match": map[string]interface{}{
								"text": v,
							},
						},
					},
				},
			},
		}
		var buf strings.Builder
		if err := json.NewEncoder(&buf).Encode(query); err != nil {
			log.Fatalf("Error encoding query: %s", err)
		}
		res, err := esClient.Search(esClient.Search.WithIndex("example"), esClient.Search.WithBody(strings.NewReader(buf.String())), esClient.Search.WithTimeout(10*time.Second))
		if err != nil {
			log.Println(err)
			continue
		}
		fmt.Println(res)
		res.Body.Close()
	}
}Metadata
Metadata
Assignees
Labels
No labels
