Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,11 @@ require (
github.com/tcnksm/ghr v0.13.0
github.com/uber/jaeger-lib v2.2.0+incompatible
go.opencensus.io v0.22.3
go.uber.org/atomic v1.5.1
go.uber.org/zap v1.13.0
golang.org/x/net v0.0.0-20200324143707-d3edc9973b7e
golang.org/x/sys v0.0.0-20200408040146-ea54a3c99b9b
golang.org/x/text v0.3.2
golang.org/x/tools v0.0.0-20200428211428-0c9eba77bc32 // indirect
google.golang.org/api v0.10.0 // indirect
google.golang.org/genproto v0.0.0-20200408120641-fbb3ad325eb7
Expand Down
24 changes: 12 additions & 12 deletions testbed/testbed/child_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,12 @@ import (
"path"
"path/filepath"
"sync"
"sync/atomic"
"syscall"
"time"

"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/process"
"go.uber.org/atomic"
)

// ResourceSpec is a resource consumption specification.
Expand Down Expand Up @@ -95,10 +95,10 @@ type ChildProcess struct {
lastProcessTimes *cpu.TimesStat

// Current RAM RSS in MiBs
ramMiBCur uint32
ramMiBCur atomic.Uint32

// Current CPU percentage times 1000 (we use scaling since we have to use int for atomic operations).
cpuPercentX1000Cur uint32
cpuPercentX1000Cur atomic.Uint32

// Maximum CPU seen
cpuPercentMax float64
Expand Down Expand Up @@ -287,8 +287,8 @@ func (cp *ChildProcess) Stop() (stopped bool, err error) {
close(finished)

// Set resource consumption stats to 0
atomic.StoreUint32(&cp.ramMiBCur, 0)
atomic.StoreUint32(&cp.cpuPercentX1000Cur, 0)
cp.ramMiBCur.Store(0)
cp.cpuPercentX1000Cur.Store(0)

log.Printf("%s process stopped, exit code=%d", cp.name, cp.cmd.ProcessState.ExitCode())

Expand Down Expand Up @@ -369,7 +369,7 @@ func (cp *ChildProcess) fetchRAMUsage() {
}

// Store current usage.
atomic.StoreUint32(&cp.ramMiBCur, ramMiBCur)
cp.ramMiBCur.Store(ramMiBCur)
}

func (cp *ChildProcess) fetchCPUUsage() {
Expand Down Expand Up @@ -398,19 +398,19 @@ func (cp *ChildProcess) fetchCPUUsage() {
curCPUPercentageX1000 := uint32(cpuPercent * 1000)

// Store current usage.
atomic.StoreUint32(&cp.cpuPercentX1000Cur, curCPUPercentageX1000)
cp.cpuPercentX1000Cur.Store(curCPUPercentageX1000)
}

func (cp *ChildProcess) checkAllowedResourceUsage() error {
// Check if current CPU usage exceeds expected.
var errMsg string
if cp.resourceSpec.ExpectedMaxCPU != 0 && cp.cpuPercentX1000Cur/1000 > cp.resourceSpec.ExpectedMaxCPU {
if cp.resourceSpec.ExpectedMaxCPU != 0 && cp.cpuPercentX1000Cur.Load()/1000 > cp.resourceSpec.ExpectedMaxCPU {
errMsg = fmt.Sprintf("CPU consumption is %.1f%%, max expected is %d%%",
float64(cp.cpuPercentX1000Cur)/1000.0, cp.resourceSpec.ExpectedMaxCPU)
float64(cp.cpuPercentX1000Cur.Load())/1000.0, cp.resourceSpec.ExpectedMaxCPU)
}

// Check if current RAM usage exceeds expected.
if cp.resourceSpec.ExpectedMaxRAM != 0 && cp.ramMiBCur > cp.resourceSpec.ExpectedMaxRAM {
if cp.resourceSpec.ExpectedMaxRAM != 0 && cp.ramMiBCur.Load() > cp.resourceSpec.ExpectedMaxRAM {
errMsg = fmt.Sprintf("RAM consumption is %d MiB, max expected is %d MiB",
cp.ramMiBCur, cp.resourceSpec.ExpectedMaxRAM)
}
Expand All @@ -431,8 +431,8 @@ func (cp *ChildProcess) GetResourceConsumption() string {
return ""
}

curRSSMib := atomic.LoadUint32(&cp.ramMiBCur)
curCPUPercentageX1000 := atomic.LoadUint32(&cp.cpuPercentX1000Cur)
curRSSMib := cp.ramMiBCur.Load()
curCPUPercentageX1000 := cp.cpuPercentX1000Cur.Load()

return fmt.Sprintf("%s RAM (RES):%4d MiB, CPU:%4.1f%%", cp.name,
curRSSMib, float64(curCPUPercentageX1000)/1000.0)
Expand Down
70 changes: 35 additions & 35 deletions testbed/testbed/data_providers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ import (
"log"
"math/rand"
"strconv"
"sync/atomic"
"time"

metricspb "github.com/census-instrumentation/opencensus-proto/gen-go/metrics/v1"
resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
tracepb "github.com/census-instrumentation/opencensus-proto/gen-go/trace/v1"
"github.com/golang/protobuf/ptypes/timestamp"
"go.uber.org/atomic"

"go.opentelemetry.io/collector/consumer/pdata"
"go.opentelemetry.io/collector/internal/data"
Expand All @@ -37,53 +37,53 @@ import (
"go.opentelemetry.io/collector/translator/internaldata"
)

//DataProvider defines the interface for generators of test data used to drive various end-to-end tests.
// DataProvider defines the interface for generators of test data used to drive various end-to-end tests.
type DataProvider interface {
//SetLoadGeneratorCounters supplies pointers to LoadGenerator counters.
//The data provider implementation should increment these as it generates data.
SetLoadGeneratorCounters(batchesGenerated *uint64, dataItemsGenerated *uint64)
//GenerateTraces returns an internal Traces instance with an OTLP ResourceSpans slice populated with test data.
// SetLoadGeneratorCounters supplies pointers to LoadGenerator counters.
// The data provider implementation should increment these as it generates data.
SetLoadGeneratorCounters(batchesGenerated *atomic.Uint64, dataItemsGenerated *atomic.Uint64)
// GenerateTraces returns an internal Traces instance with an OTLP ResourceSpans slice populated with test data.
GenerateTraces() (pdata.Traces, bool)
//GenerateTracesOld returns a slice of OpenCensus Span instances populated with test data.
// GenerateTracesOld returns a slice of OpenCensus Span instances populated with test data.
GenerateTracesOld() ([]*tracepb.Span, bool)
//GenerateMetrics returns an internal MetricData instance with an OTLP ResourceMetrics slice of test data.
// GenerateMetrics returns an internal MetricData instance with an OTLP ResourceMetrics slice of test data.
GenerateMetrics() (data.MetricData, bool)
//GenerateMetricsOld returns a slice of OpenCensus Metric instances populated with test data.
// GenerateMetricsOld returns a slice of OpenCensus Metric instances populated with test data.
GenerateMetricsOld() ([]*metricspb.Metric, bool)
//GetGeneratedSpan returns the generated Span matching the provided traceId and spanId or else nil if no match found.
// GetGeneratedSpan returns the generated Span matching the provided traceId and spanId or else nil if no match found.
GetGeneratedSpan(traceID []byte, spanID []byte) *otlptrace.Span
}

//PerfTestDataProvider in an implementation of the DataProvider for use in performance tests.
//Tracing IDs are based on the incremented batch and data items counters.
// PerfTestDataProvider in an implementation of the DataProvider for use in performance tests.
// Tracing IDs are based on the incremented batch and data items counters.
type PerfTestDataProvider struct {
options LoadOptions
batchesGenerated *uint64
dataItemsGenerated *uint64
batchesGenerated *atomic.Uint64
dataItemsGenerated *atomic.Uint64
}

//NewPerfTestDataProvider creates an instance of PerfTestDataProvider which generates test data based on the sizes
//specified in the supplied LoadOptions.
// NewPerfTestDataProvider creates an instance of PerfTestDataProvider which generates test data based on the sizes
// specified in the supplied LoadOptions.
func NewPerfTestDataProvider(options LoadOptions) *PerfTestDataProvider {
return &PerfTestDataProvider{
options: options,
}
}

func (dp *PerfTestDataProvider) SetLoadGeneratorCounters(batchesGenerated *uint64, dataItemsGenerated *uint64) {
func (dp *PerfTestDataProvider) SetLoadGeneratorCounters(batchesGenerated *atomic.Uint64, dataItemsGenerated *atomic.Uint64) {
dp.batchesGenerated = batchesGenerated
dp.dataItemsGenerated = dataItemsGenerated
}

func (dp *PerfTestDataProvider) GenerateTracesOld() ([]*tracepb.Span, bool) {

var spans []*tracepb.Span
traceID := atomic.AddUint64(dp.batchesGenerated, 1)
traceID := dp.batchesGenerated.Inc()
for i := 0; i < dp.options.ItemsPerBatch; i++ {

startTime := time.Now()

spanID := atomic.AddUint64(dp.dataItemsGenerated, 1)
spanID := dp.dataItemsGenerated.Inc()

// Create a span.
span := &tracepb.Span{
Expand Down Expand Up @@ -126,13 +126,13 @@ func (dp *PerfTestDataProvider) GenerateTraces() (pdata.Traces, bool) {
spans := ilss.At(0).Spans()
spans.Resize(dp.options.ItemsPerBatch)

traceID := atomic.AddUint64(dp.batchesGenerated, 1)
traceID := dp.batchesGenerated.Inc()
for i := 0; i < dp.options.ItemsPerBatch; i++ {

startTime := time.Now()
endTime := startTime.Add(time.Duration(time.Millisecond))

spanID := atomic.AddUint64(dp.dataItemsGenerated, 1)
spanID := dp.dataItemsGenerated.Inc()

span := spans.At(i)

Expand Down Expand Up @@ -192,7 +192,7 @@ func (dp *PerfTestDataProvider) GenerateMetricsOld() ([]*metricspb.Metric, bool)
Resource: resource,
}

batchIndex := atomic.AddUint64(dp.batchesGenerated, 1)
batchIndex := dp.batchesGenerated.Inc()

// Generate data points for the metric. We generate timeseries each containing
// a single data points. This is the most typical payload composition since
Expand All @@ -201,7 +201,7 @@ func (dp *PerfTestDataProvider) GenerateMetricsOld() ([]*metricspb.Metric, bool)
timeseries := &metricspb.TimeSeries{}

startTime := time.Now()
value := atomic.AddUint64(dp.dataItemsGenerated, 1)
value := dp.dataItemsGenerated.Inc()

// Create a data point.
point := &metricspb.Point{
Expand Down Expand Up @@ -248,14 +248,14 @@ func (dp *PerfTestDataProvider) GenerateMetrics() (data.MetricData, bool) {
metricDescriptor.SetDescription("Load Generator Counter #" + strconv.Itoa(i))
metricDescriptor.SetType(pdata.MetricTypeInt64)

batchIndex := atomic.AddUint64(dp.batchesGenerated, 1)
batchIndex := dp.batchesGenerated.Inc()

// Generate data points for the metric.
metric.Int64DataPoints().Resize(dataPointsPerMetric)
for j := 0; j < dataPointsPerMetric; j++ {
dataPoint := metric.Int64DataPoints().At(j)
dataPoint.SetStartTime(pdata.TimestampUnixNano(uint64(time.Now().UnixNano())))
value := atomic.AddUint64(dp.dataItemsGenerated, 1)
value := dp.dataItemsGenerated.Inc()
dataPoint.SetValue(int64(value))
dataPoint.LabelsMap().InitFromMap(map[string]string{
"item_index": "item_" + strconv.Itoa(j),
Expand Down Expand Up @@ -283,22 +283,22 @@ func timeToTimestamp(t time.Time) *timestamp.Timestamp {
}
}

//GoldenDataProvider is an implementation of DataProvider for use in correctness tests.
//Provided data from the "Golden" dataset generated using pairwise combinatorial testing techniques.
// GoldenDataProvider is an implementation of DataProvider for use in correctness tests.
// Provided data from the "Golden" dataset generated using pairwise combinatorial testing techniques.
type GoldenDataProvider struct {
tracePairsFile string
spanPairsFile string
random io.Reader
batchesGenerated *uint64
dataItemsGenerated *uint64
batchesGenerated *atomic.Uint64
dataItemsGenerated *atomic.Uint64
resourceSpans []*otlptrace.ResourceSpans
spansIndex int
spansMap map[string]*otlptrace.Span
}

//NewGoldenDataProvider creates a new instance of GoldenDataProvider which generates test data based
//on the pairwise combinations specified in the tracePairsFile and spanPairsFile input variables.
//The supplied randomSeed is used to initialize the random number generator used in generating tracing IDs.
// NewGoldenDataProvider creates a new instance of GoldenDataProvider which generates test data based
// on the pairwise combinations specified in the tracePairsFile and spanPairsFile input variables.
// The supplied randomSeed is used to initialize the random number generator used in generating tracing IDs.
func NewGoldenDataProvider(tracePairsFile string, spanPairsFile string, randomSeed int64) *GoldenDataProvider {
return &GoldenDataProvider{
tracePairsFile: tracePairsFile,
Expand All @@ -307,7 +307,7 @@ func NewGoldenDataProvider(tracePairsFile string, spanPairsFile string, randomSe
}
}

func (dp *GoldenDataProvider) SetLoadGeneratorCounters(batchesGenerated *uint64, dataItemsGenerated *uint64) {
func (dp *GoldenDataProvider) SetLoadGeneratorCounters(batchesGenerated *atomic.Uint64, dataItemsGenerated *atomic.Uint64) {
dp.batchesGenerated = batchesGenerated
dp.dataItemsGenerated = dataItemsGenerated
}
Expand All @@ -321,7 +321,7 @@ func (dp *GoldenDataProvider) GenerateTraces() (pdata.Traces, bool) {
dp.resourceSpans = make([]*otlptrace.ResourceSpans, 0)
}
}
atomic.AddUint64(dp.batchesGenerated, 1)
dp.batchesGenerated.Inc()
if dp.spansIndex >= len(dp.resourceSpans) {
return pdata.TracesFromOtlp(make([]*otlptrace.ResourceSpans, 0)), true
}
Expand All @@ -332,7 +332,7 @@ func (dp *GoldenDataProvider) GenerateTraces() (pdata.Traces, bool) {
for _, libSpans := range resourceSpans[0].InstrumentationLibrarySpans {
spanCount += uint64(len(libSpans.Spans))
}
atomic.AddUint64(dp.dataItemsGenerated, spanCount)
dp.dataItemsGenerated.Add(spanCount)
return pdata.TracesFromOtlp(resourceSpans), false
}

Expand Down
15 changes: 9 additions & 6 deletions testbed/testbed/load_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,25 +18,28 @@ import (
"fmt"
"log"
"sync"
"sync/atomic"
"time"

resourcepb "github.com/census-instrumentation/opencensus-proto/gen-go/resource/v1"
"go.uber.org/atomic"
"golang.org/x/text/message"

"go.opentelemetry.io/collector/consumer/consumerdata"
)

var printer = message.NewPrinter(message.MatchLanguage("en"))

// LoadGenerator is a simple load generator.
type LoadGenerator struct {
sender DataSender

dataProvider DataProvider

// Number of batches of data items sent.
batchesSent uint64
batchesSent atomic.Uint64

// Number of data items (spans or metric data points) sent.
dataItemsSent uint64
dataItemsSent atomic.Uint64

stopOnce sync.Once
stopWait sync.WaitGroup
Expand Down Expand Up @@ -111,11 +114,11 @@ func (lg *LoadGenerator) Stop() {

// GetStats returns the stats as a printable string.
func (lg *LoadGenerator) GetStats() string {
return fmt.Sprintf("Sent:%5d items", atomic.LoadUint64(&lg.dataItemsSent))
return printer.Sprintf("Sent:%10d items", lg.DataItemsSent())
}

func (lg *LoadGenerator) DataItemsSent() uint64 {
return atomic.LoadUint64(&lg.dataItemsSent)
return lg.dataItemsSent.Load()
}

// IncDataItemsSent is used when a test bypasses the LoadGenerator and sends data
Expand All @@ -125,7 +128,7 @@ func (lg *LoadGenerator) DataItemsSent() uint64 {
// reports to use their own counter and load generator and other sending sources
// to contribute to this counter. This could be done as a future improvement.
func (lg *LoadGenerator) IncDataItemsSent() {
atomic.AddUint64(&lg.dataItemsSent, 1)
lg.dataItemsSent.Inc()
}

func (lg *LoadGenerator) generate() {
Expand Down
Loading