Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions perf/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/cilium/ebpf"
"github.com/cilium/ebpf/internal"
"github.com/cilium/ebpf/internal/epoll"
"github.com/cilium/ebpf/internal/sys"
"github.com/cilium/ebpf/internal/unix"
)

Expand Down Expand Up @@ -154,11 +155,10 @@ type Reader struct {
epollRings []*perfEventRing
eventHeader []byte

// pauseFds are a copy of the fds in 'rings', protected by 'pauseMu'.
// These allow Pause/Resume to be executed independently of any ongoing
// Read calls, which would otherwise need to be interrupted.
// pauseMu protects eventFds so that Pause / Resume can be invoked while
// Read is blocked.
pauseMu sync.Mutex
pauseFds []int
eventFds []*sys.FD

paused bool
overwritable bool
Expand Down Expand Up @@ -193,6 +193,12 @@ func NewReader(array *ebpf.Map, perCPUBuffer int) (*Reader, error) {

// NewReaderWithOptions creates a new reader with the given options.
func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions) (pr *Reader, err error) {
closeOnError := func(c io.Closer) {
if err != nil {
c.Close()
}
}

if perCPUBuffer < 1 {
return nil, errors.New("perCPUBuffer must be larger than 0")
}
Expand All @@ -201,53 +207,41 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
}

var (
fds []int
nCPU = int(array.MaxEntries())
rings = make([]*perfEventRing, 0, nCPU)
pauseFds = make([]int, 0, nCPU)
eventFds = make([]*sys.FD, 0, nCPU)
)

poller, err := epoll.New()
if err != nil {
return nil, err
}

defer func() {
if err != nil {
poller.Close()
for _, fd := range fds {
unix.Close(fd)
}
for _, ring := range rings {
if ring != nil {
ring.Close()
}
}
}
}()
defer closeOnError(poller)

// bpf_perf_event_output checks which CPU an event is enabled on,
// but doesn't allow using a wildcard like -1 to specify "all CPUs".
// Hence we have to create a ring for each CPU.
bufferSize := 0
for i := 0; i < nCPU; i++ {
ring, err := newPerfEventRing(i, perCPUBuffer, opts)
event, ring, err := newPerfEventRing(i, perCPUBuffer, opts)
if errors.Is(err, unix.ENODEV) {
// The requested CPU is currently offline, skip it.
rings = append(rings, nil)
pauseFds = append(pauseFds, -1)
eventFds = append(eventFds, nil)
continue
}

if err != nil {
return nil, fmt.Errorf("failed to create perf ring for CPU %d: %v", i, err)
}
defer closeOnError(event)
defer closeOnError(ring)

bufferSize = ring.size()
rings = append(rings, ring)
pauseFds = append(pauseFds, ring.fd)
eventFds = append(eventFds, event)

if err := poller.Add(ring.fd, i); err != nil {
if err := poller.Add(event.Int(), i); err != nil {
return nil, err
}
}
Expand All @@ -265,7 +259,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
epollEvents: make([]unix.EpollEvent, len(rings)),
epollRings: make([]*perfEventRing, 0, len(rings)),
eventHeader: make([]byte, perfEventHeaderSize),
pauseFds: pauseFds,
eventFds: eventFds,
overwritable: opts.Overwritable,
bufferSize: bufferSize,
}
Expand All @@ -291,17 +285,25 @@ func (pr *Reader) Close() error {
}

// Trying to poll will now fail, so Read() can't block anymore. Acquire the
// lock so that we can clean up.
// locks so that we can clean up.
pr.mu.Lock()
defer pr.mu.Unlock()

pr.pauseMu.Lock()
defer pr.pauseMu.Unlock()

for _, ring := range pr.rings {
if ring != nil {
ring.Close()
}
}
for _, event := range pr.eventFds {
if event != nil {
event.Close()
}
}
pr.rings = nil
pr.pauseFds = nil
pr.eventFds = nil
pr.array.Close()

return nil
Expand Down Expand Up @@ -406,11 +408,11 @@ func (pr *Reader) Pause() error {
pr.pauseMu.Lock()
defer pr.pauseMu.Unlock()

if pr.pauseFds == nil {
if pr.eventFds == nil {
return fmt.Errorf("%w", ErrClosed)
}

for i := range pr.pauseFds {
for i := range pr.eventFds {
if err := pr.array.Delete(uint32(i)); err != nil && !errors.Is(err, ebpf.ErrKeyNotExist) {
return fmt.Errorf("could't delete event fd for CPU %d: %w", i, err)
}
Expand All @@ -428,16 +430,16 @@ func (pr *Reader) Resume() error {
pr.pauseMu.Lock()
defer pr.pauseMu.Unlock()

if pr.pauseFds == nil {
if pr.eventFds == nil {
return fmt.Errorf("%w", ErrClosed)
}

for i, fd := range pr.pauseFds {
if fd == -1 {
for i, fd := range pr.eventFds {
if fd == nil {
continue
}

if err := pr.array.Put(uint32(i), uint32(fd)); err != nil {
if err := pr.array.Put(uint32(i), fd.Uint()); err != nil {
return fmt.Errorf("couldn't put event fd %d for CPU %d: %w", fd, i, err)
}
}
Expand Down
3 changes: 1 addition & 2 deletions perf/reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/cilium/ebpf/internal"
"github.com/cilium/ebpf/internal/testutils"
"github.com/cilium/ebpf/internal/testutils/fdtrace"
"github.com/cilium/ebpf/internal/unix"

"github.com/go-quicktest/qt"
)
Expand Down Expand Up @@ -386,7 +385,7 @@ func TestCreatePerfEvent(t *testing.T) {
if err != nil {
t.Fatal("Can't create perf event:", err)
}
unix.Close(fd)
fd.Close()
}

func TestReadRecord(t *testing.T) {
Expand Down
43 changes: 22 additions & 21 deletions perf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,42 +10,47 @@ import (
"sync/atomic"
"unsafe"

"github.com/cilium/ebpf/internal/sys"
"github.com/cilium/ebpf/internal/unix"
)

// perfEventRing is a page of metadata followed by
// a variable number of pages which form a ring buffer.
type perfEventRing struct {
// NOTE(review): this struct is shown as a diff without +/- markers. The fd
// field looks like the line being removed here (the reader now keeps
// []*sys.FD in eventFds instead) — confirm against the actual file.
fd int
// cpu is the CPU index passed to newPerfEventRing when the ring was created.
cpu int
// mmap is the mapping obtained from unix.Mmap in newPerfEventRing; Close
// unmaps it.
mmap []byte
// ringReader is embedded and is populated from the reader that
// newPerfEventRing builds over the mapping (construction not visible in this
// view).
ringReader
}

func newPerfEventRing(cpu, perCPUBuffer int, opts ReaderOptions) (*perfEventRing, error) {
func newPerfEventRing(cpu, perCPUBuffer int, opts ReaderOptions) (_ *sys.FD, _ *perfEventRing, err error) {
closeOnError := func(c io.Closer) {
if err != nil {
c.Close()
}
}

if opts.Watermark >= perCPUBuffer {
return nil, errors.New("watermark must be smaller than perCPUBuffer")
return nil, nil, errors.New("watermark must be smaller than perCPUBuffer")
}

fd, err := createPerfEvent(cpu, opts)
if err != nil {
return nil, err
return nil, nil, err
}
defer closeOnError(fd)

if err := unix.SetNonblock(fd, true); err != nil {
unix.Close(fd)
return nil, err
if err := unix.SetNonblock(fd.Int(), true); err != nil {
return nil, nil, err
}

protections := unix.PROT_READ
if !opts.Overwritable {
protections |= unix.PROT_WRITE
}

mmap, err := unix.Mmap(fd, 0, perfBufferSize(perCPUBuffer), protections, unix.MAP_SHARED)
mmap, err := unix.Mmap(fd.Int(), 0, perfBufferSize(perCPUBuffer), protections, unix.MAP_SHARED)
if err != nil {
unix.Close(fd)
return nil, fmt.Errorf("can't mmap: %v", err)
return nil, nil, fmt.Errorf("can't mmap: %v", err)
}

// This relies on the fact that we allocate an extra metadata page,
Expand All @@ -62,14 +67,13 @@ func newPerfEventRing(cpu, perCPUBuffer int, opts ReaderOptions) (*perfEventRing
}

ring := &perfEventRing{
fd: fd,
cpu: cpu,
mmap: mmap,
ringReader: reader,
}
runtime.SetFinalizer(ring, (*perfEventRing).Close)

return ring, nil
return fd, ring, nil
}

// perfBufferSize returns a valid mmap buffer size for use with perf_event_open (1+2^n pages)
Expand All @@ -88,17 +92,14 @@ func perfBufferSize(perCPUBuffer int) int {
return nPages * pageSize
}

// Close clears the finalizer that newPerfEventRing installs on the ring and
// unmaps the ring's memory.
//
// NOTE(review): this span is a diff rendered without +/- markers, so two
// versions of the method are interleaved below. The variant that returns an
// error appears to be the new one: returning error is what allows a
// *perfEventRing to be handed to closeOnError(io.Closer) in
// NewReaderWithOptions — confirm against the actual file.
func (ring *perfEventRing) Close() {
func (ring *perfEventRing) Close() error {
// Drop the finalizer so Close is not invoked a second time by the GC.
runtime.SetFinalizer(ring, nil)

// Variant without a return value: close the perf event fd and unmap the
// buffer, deliberately discarding both errors.
_ = unix.Close(ring.fd)
_ = unix.Munmap(ring.mmap)

// Variant without a return value: mark the fd as no longer valid.
ring.fd = -1
// Variant returning error: clear the field before unmapping and report the
// Munmap result. The event fd is not touched here; Reader.Close closes the
// entries of eventFds itself.
mmap := ring.mmap
ring.mmap = nil
return unix.Munmap(mmap)
}

func createPerfEvent(cpu int, opts ReaderOptions) (int, error) {
func createPerfEvent(cpu int, opts ReaderOptions) (*sys.FD, error) {
wakeup := 0
bits := 0
if opts.WakeupEvents > 0 {
Expand Down Expand Up @@ -126,9 +127,9 @@ func createPerfEvent(cpu int, opts ReaderOptions) (int, error) {
attr.Size = uint32(unsafe.Sizeof(attr))
fd, err := unix.PerfEventOpen(&attr, -1, cpu, -1, unix.PERF_FLAG_FD_CLOEXEC)
if err != nil {
return -1, fmt.Errorf("can't create perf event: %w", err)
return nil, fmt.Errorf("can't create perf event: %w", err)
}
return fd, nil
return sys.NewFD(fd)
}

type ringReader interface {
Expand Down
12 changes: 7 additions & 5 deletions perf/ring_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,10 +131,12 @@ func makeForwardRing(size, offset int) *forwardReader {

func TestPerfEventRing(t *testing.T) {
check := func(buffer, watermark int, overwritable bool) {
ring, err := newPerfEventRing(0, buffer, ReaderOptions{Watermark: watermark, Overwritable: overwritable})
event, ring, err := newPerfEventRing(0, buffer, ReaderOptions{Watermark: watermark, Overwritable: overwritable})
if err != nil {
t.Fatal(err)
}
defer event.Close()
defer ring.Close()

size := ring.size()

Expand All @@ -154,21 +156,21 @@ func TestPerfEventRing(t *testing.T) {
}

// watermark > buffer
_, err := newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: false})
_, _, err := newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: false})
if err == nil {
t.Fatal("watermark > buffer allowed")
}
_, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: true})
_, _, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: true})
if err == nil {
t.Fatal("watermark > buffer allowed")
}

// watermark == buffer
_, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8192, Overwritable: false})
_, _, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8192, Overwritable: false})
if err == nil {
t.Fatal("watermark == buffer allowed")
}
_, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: true})
_, _, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: true})
if err == nil {
t.Fatal("watermark == buffer allowed")
}
Expand Down