diff --git a/config.go b/config.go index fd44221e..6ca83c47 100644 --- a/config.go +++ b/config.go @@ -107,6 +107,9 @@ func fillupDefaults(config *Config) { if config.Influx.BatchSize == 0 { config.Influx.BatchSize = DefaultIDBBatchSize } + if config.Influx.BufferSize == 0 { + config.Influx.BufferSize = DefaultIDBBufferSize + } if config.Influx.HTTPTimeout == 0 { config.Influx.HTTPTimeout = DefaultIDBTimeout } diff --git a/defaults.go b/defaults.go index 461c7c3e..cc1172c0 100644 --- a/defaults.go +++ b/defaults.go @@ -6,6 +6,8 @@ const ( // DefaultIDBBatchSize to use if user has not provided in the config DefaultIDBBatchSize = 1024 * 100 + // DefaultIDBBufferSize to use if user has not provided in the config + DefaultIDBBufferSize = 1024 //DefaultIDBBatchFreq is 2 seconds DefaultIDBBatchFreq = 2000 //DefaultIDBAccumulatorFreq is 2 seconds diff --git a/influx.go b/influx.go index 1806e7c9..fe33ca58 100644 --- a/influx.go +++ b/influx.go @@ -38,6 +38,7 @@ type InfluxConfig struct { Recreate bool `json:"recreate"` Measurement string `json:"measurement"` BatchSize int `json:"batchsize"` + BufferSize int `json:"buffersize"` BatchFrequency int `json:"batchfrequency"` HTTPTimeout int `json:"http-timeout"` RetentionPolicy string `json:"retention-policy"` @@ -206,12 +207,13 @@ func dbBatchWriteM(jctx *JCtx) { } batchSize := jctx.config.Influx.BatchSize - batchMCh := make(chan *batchWMData, batchSize/4) + bufferSize := jctx.config.Influx.BufferSize + batchMCh := make(chan *batchWMData, bufferSize) jctx.influxCtx.batchWMCh = batchMCh // wake up periodically and perform batch write into InfluxDB bFreq := jctx.config.Influx.BatchFrequency - jLog(jctx, fmt.Sprintln("batch size:", batchSize, "batch frequency:", bFreq)) + jLog(jctx, fmt.Sprintln("batch size:", batchSize, "batch frequency:", bFreq, "buffer size:", bufferSize)) ticker := time.NewTicker(time.Duration(bFreq) * time.Millisecond) go func() {