Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 0 additions & 21 deletions extension/healthcheckextension/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@ package healthcheckextension

import (
"context"
"errors"
"sync/atomic"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
Expand Down Expand Up @@ -50,24 +48,5 @@ func createDefaultConfig() configmodels.Extension {
func createExtension(_ context.Context, params component.ExtensionCreateParams, cfg configmodels.Extension) (component.Extension, error) {
config := cfg.(*Config)

// The runtime settings are global to the application, so while in principle it
// is possible to have more than one instance, running multiple does not bring
// any value to the service.
// In order to avoid this issue we will allow the creation of a single
// instance once per process while keeping the private function that allow
// the creation of multiple instances for unit tests. Summary: only a single
// instance can be created via the factory.
if !atomic.CompareAndSwapInt32(&instanceState, instanceNotCreated, instanceCreated) {
return nil, errors.New("only a single health check extension instance can be created per process")
}

return newServer(*config, params.Logger), nil
}

// See comment in createExtension how these are used.
var instanceState int32

const (
instanceNotCreated int32 = 0
instanceCreated int32 = 1
)
23 changes: 0 additions & 23 deletions extension/healthcheckextension/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package healthcheckextension

import (
"context"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -44,9 +43,6 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
ext, err := createExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
require.NotNil(t, ext)

// Restore instance tracking from factory, for other tests.
atomic.StoreInt32(&instanceState, instanceNotCreated)
}

func TestFactory_CreateExtension(t *testing.T) {
Expand All @@ -56,23 +52,4 @@ func TestFactory_CreateExtension(t *testing.T) {
ext, err := createExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
require.NotNil(t, ext)

// Restore instance tracking from factory, for other tests.
atomic.StoreInt32(&instanceState, instanceNotCreated)
}

func TestFactory_CreateExtensionOnlyOnce(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Port = testutil.GetAvailablePort(t)

ext, err := createExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
require.NotNil(t, ext)

ext1, err := createExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.Error(t, err)
require.Nil(t, ext1)

// Restore instance tracking from factory, for other tests.
atomic.StoreInt32(&instanceState, instanceNotCreated)
}
14 changes: 10 additions & 4 deletions extension/healthcheckextension/healthcheckextension.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type healthCheckExtension struct {
logger *zap.Logger
state *healthcheck.HealthCheck
server http.Server
stopCh chan struct{}
}

var _ component.PipelineWatcher = (*healthCheckExtension)(nil)
Expand All @@ -43,14 +44,15 @@ func (hc *healthCheckExtension) Start(_ context.Context, host component.Host) er
portStr := ":" + strconv.Itoa(int(hc.config.Port))
ln, err := net.Listen("tcp", portStr)
if err != nil {
host.ReportFatalError(err)
return nil
return err
}

// Mount HC handler
hc.server.Handler = hc.state.Handler()

hc.stopCh = make(chan struct{})
go func() {
defer close(hc.stopCh)

// The listener ownership goes to the server.
if err := hc.server.Serve(ln); err != http.ErrServerClosed && err != nil {
host.ReportFatalError(err)
Expand All @@ -61,7 +63,11 @@ func (hc *healthCheckExtension) Start(_ context.Context, host component.Host) er
}

func (hc *healthCheckExtension) Shutdown(context.Context) error {
return hc.server.Close()
err := hc.server.Close()
if hc.stopCh != nil {
<-hc.stopCh
}
return err
}

func (hc *healthCheckExtension) Ready() error {
Expand Down
43 changes: 26 additions & 17 deletions extension/healthcheckextension/healthcheckextension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ import (
"runtime"
"strconv"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/testutil"
)
Expand Down Expand Up @@ -86,14 +87,8 @@ func TestHealthCheckExtensionPortAlreadyInUse(t *testing.T) {
hcExt := newServer(config, zap.NewNop())
require.NotNil(t, hcExt)

// Health check will report port already in use in a goroutine, use the error waiting
// host to get it.
mh := componenttest.NewErrorWaitingHost()
require.NoError(t, hcExt.Start(context.Background(), mh))

receivedError, receivedErr := mh.WaitForFatalError(500 * time.Millisecond)
require.True(t, receivedError)
require.Error(t, receivedErr)
mh := newAssertNoErrorHost(t)
require.Error(t, hcExt.Start(context.Background(), mh))
}

func TestHealthCheckMultipleStarts(t *testing.T) {
Expand All @@ -104,17 +99,11 @@ func TestHealthCheckMultipleStarts(t *testing.T) {
hcExt := newServer(config, zap.NewNop())
require.NotNil(t, hcExt)

mh := componenttest.NewErrorWaitingHost()
mh := newAssertNoErrorHost(t)
require.NoError(t, hcExt.Start(context.Background(), mh))
defer hcExt.Shutdown(context.Background())

// Health check will report already in use in a goroutine, use the error waiting
// host to get it.
require.NoError(t, hcExt.Start(context.Background(), mh))

receivedError, receivedErr := mh.WaitForFatalError(500 * time.Millisecond)
require.True(t, receivedError)
require.Error(t, receivedErr)
require.Error(t, hcExt.Start(context.Background(), mh))
}

func TestHealthCheckMultipleShutdowns(t *testing.T) {
Expand All @@ -140,3 +129,23 @@ func TestHealthCheckShutdownWithoutStart(t *testing.T) {

require.NoError(t, hcExt.Shutdown(context.Background()))
}

// assertNoErrorHost implements a component.Host that asserts that there were no errors.
type assertNoErrorHost struct {
component.Host
*testing.T
}

var _ component.Host = (*assertNoErrorHost)(nil)

// newAssertNoErrorHost returns a new instance of assertNoErrorHost.
func newAssertNoErrorHost(t *testing.T) component.Host {
return &assertNoErrorHost{
componenttest.NewNopHost(),
t,
}
}

func (aneh *assertNoErrorHost) ReportFatalError(err error) {
assert.NoError(aneh, err)
}
21 changes: 0 additions & 21 deletions extension/pprofextension/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package pprofextension
import (
"context"
"errors"
"sync/atomic"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configmodels"
Expand Down Expand Up @@ -53,25 +52,5 @@ func createExtension(_ context.Context, params component.ExtensionCreateParams,
return nil, errors.New("\"endpoint\" is required when using the \"pprof\" extension")
}

// The runtime settings are global to the application, so while in principle it
// is possible to have more than one instance, running multiple will mean that
// the settings of the last started instance will prevail. In order to avoid
// this issue we will allow the creation of a single instance once per process
// while keeping the private function that allow the creation of multiple
// instances for unit tests. Summary: only a single instance can be created
// via the factory.
// TODO: Move this as an option to extensionhelper.
if !atomic.CompareAndSwapInt32(&instanceState, instanceNotCreated, instanceCreated) {
return nil, errors.New("only a single pprof extension instance can be created per process")
}

return newServer(*config, params.Logger), nil
}

// See comment in createExtension how these are used.
var instanceState int32

const (
instanceNotCreated int32 = 0
instanceCreated int32 = 1
)
23 changes: 0 additions & 23 deletions extension/pprofextension/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package pprofextension

import (
"context"
"sync/atomic"
"testing"

"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -44,9 +43,6 @@ func TestFactory_CreateDefaultConfig(t *testing.T) {
ext, err := createExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
require.NotNil(t, ext)

// Restore instance tracking from factory, for other tests.
atomic.StoreInt32(&instanceState, instanceNotCreated)
}

func TestFactory_CreateExtension(t *testing.T) {
Expand All @@ -56,23 +52,4 @@ func TestFactory_CreateExtension(t *testing.T) {
ext, err := createExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
require.NotNil(t, ext)

// Restore instance tracking from factory, for other tests.
atomic.StoreInt32(&instanceState, instanceNotCreated)
}

func TestFactory_CreateExtensionOnlyOnce(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.Endpoint = testutil.GetAvailableLocalAddress(t)

ext, err := createExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.NoError(t, err)
require.NotNil(t, ext)

ext1, err := createExtension(context.Background(), component.ExtensionCreateParams{Logger: zap.NewNop()}, cfg)
require.Error(t, err)
require.Nil(t, ext1)

// Restore instance tracking from factory, for other tests.
atomic.StoreInt32(&instanceState, instanceNotCreated)
}
60 changes: 49 additions & 11 deletions extension/pprofextension/pprofextension.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,62 +16,100 @@ package pprofextension

import (
"context"
"errors"
"net"
"net/http"
_ "net/http/pprof" // #nosec Needed to enable the performance profiler
"os"
"runtime"
"runtime/pprof"
"sync/atomic"
"unsafe"

"go.uber.org/zap"

"go.opentelemetry.io/collector/component"
)

// Tracks that only a single instance is active per process.
// See comment on Start method for the reasons for that.
var activeInstance *pprofExtension
var activeInstancePtr = (*unsafe.Pointer)(unsafe.Pointer(&activeInstance))

type pprofExtension struct {
config Config
logger *zap.Logger
file *os.File
server http.Server
stopCh chan struct{}
}

func (p *pprofExtension) Start(_ context.Context, host component.Host) error {
// The runtime settings are global to the application, so while in principle it
// is possible to have more than one instance, running multiple will mean that
// the settings of the last started instance will prevail. In order to avoid
// this issue we will allow the start of a single instance once per process
// Summary: only a single instance can be running in the same process.
if !atomic.CompareAndSwapPointer(activeInstancePtr, nil, unsafe.Pointer(p)) {
return errors.New("only a single pprof extension instance can be running per process")
}

// Take care that if any error happen when starting the active instance is cleaned.
var startErr error
defer func() {
if startErr != nil {
atomic.StorePointer(activeInstancePtr, nil)
}
}()

// Start the listener here so we can have earlier failure if port is
// already in use.
ln, err := net.Listen("tcp", p.config.Endpoint)
if err != nil {
return err
var ln net.Listener
ln, startErr = net.Listen("tcp", p.config.Endpoint)
if startErr != nil {
return startErr
}

runtime.SetBlockProfileRate(p.config.BlockProfileFraction)
runtime.SetMutexProfileFraction(p.config.MutexProfileFraction)

p.logger.Info("Starting net/http/pprof server", zap.Any("config", p.config))
p.stopCh = make(chan struct{})
go func() {
defer close(p.stopCh)

// The listener ownership goes to the server.
if err := p.server.Serve(ln); err != nil && err != http.ErrServerClosed {
err := p.server.Serve(ln)
atomic.StorePointer(activeInstancePtr, nil)
if err != nil && err != http.ErrServerClosed {
host.ReportFatalError(err)
}
}()

if p.config.SaveToFile != "" {
f, err := os.Create(p.config.SaveToFile)
if err != nil {
return err
var f *os.File
f, startErr = os.Create(p.config.SaveToFile)
if startErr != nil {
return startErr
}
p.file = f
return pprof.StartCPUProfile(f)
startErr = pprof.StartCPUProfile(f)
}

return nil
return startErr
}

func (p *pprofExtension) Shutdown(context.Context) error {
defer atomic.StorePointer(activeInstancePtr, nil)
if p.file != nil {
pprof.StopCPUProfile()
p.file.Close() // ignore the error
_ = p.file.Close() // ignore the error
}
err := p.server.Close()
if p.stopCh != nil {
<-p.stopCh
}
return p.server.Close()
return err
}

func newServer(config Config, logger *zap.Logger) *pprofExtension {
Expand Down
Loading