diff --git a/perf/reader.go b/perf/reader.go
index 22548c0d8..272e56f9e 100644
--- a/perf/reader.go
+++ b/perf/reader.go
@@ -13,6 +13,7 @@ import (
 	"github.com/cilium/ebpf"
 	"github.com/cilium/ebpf/internal"
 	"github.com/cilium/ebpf/internal/epoll"
+	"github.com/cilium/ebpf/internal/sys"
 	"github.com/cilium/ebpf/internal/unix"
 )
 
@@ -154,11 +155,10 @@ type Reader struct {
 	epollRings  []*perfEventRing
 	eventHeader []byte
 
-	// pauseFds are a copy of the fds in 'rings', protected by 'pauseMu'.
-	// These allow Pause/Resume to be executed independently of any ongoing
-	// Read calls, which would otherwise need to be interrupted.
+	// pauseMu protects eventFds so that Pause / Resume can be invoked while
+	// Read is blocked.
 	pauseMu  sync.Mutex
-	pauseFds []int
+	eventFds []*sys.FD
 
 	paused       bool
 	overwritable bool
@@ -193,6 +193,12 @@ func NewReader(array *ebpf.Map, perCPUBuffer int) (*Reader, error) {
 
 // NewReaderWithOptions creates a new reader with the given options.
 func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions) (pr *Reader, err error) {
+	closeOnError := func(c io.Closer) {
+		if err != nil {
+			c.Close()
+		}
+	}
+
 	if perCPUBuffer < 1 {
 		return nil, errors.New("perCPUBuffer must be larger than 0")
 	}
@@ -201,53 +207,41 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
 	}
 
 	var (
-		fds      []int
 		nCPU     = int(array.MaxEntries())
 		rings    = make([]*perfEventRing, 0, nCPU)
-		pauseFds = make([]int, 0, nCPU)
+		eventFds = make([]*sys.FD, 0, nCPU)
 	)
 
 	poller, err := epoll.New()
 	if err != nil {
 		return nil, err
 	}
-
-	defer func() {
-		if err != nil {
-			poller.Close()
-			for _, fd := range fds {
-				unix.Close(fd)
-			}
-			for _, ring := range rings {
-				if ring != nil {
-					ring.Close()
-				}
-			}
-		}
-	}()
+	defer closeOnError(poller)
 
 	// bpf_perf_event_output checks which CPU an event is enabled on,
 	// but doesn't allow using a wildcard like -1 to specify "all CPUs".
 	// Hence we have to create a ring for each CPU.
 	bufferSize := 0
 	for i := 0; i < nCPU; i++ {
-		ring, err := newPerfEventRing(i, perCPUBuffer, opts)
+		event, ring, err := newPerfEventRing(i, perCPUBuffer, opts)
 		if errors.Is(err, unix.ENODEV) {
 			// The requested CPU is currently offline, skip it.
 			rings = append(rings, nil)
-			pauseFds = append(pauseFds, -1)
+			eventFds = append(eventFds, nil)
 			continue
 		}
 
 		if err != nil {
 			return nil, fmt.Errorf("failed to create perf ring for CPU %d: %v", i, err)
 		}
+		defer closeOnError(event)
+		defer closeOnError(ring)
 
 		bufferSize = ring.size()
 		rings = append(rings, ring)
-		pauseFds = append(pauseFds, ring.fd)
+		eventFds = append(eventFds, event)
 
-		if err := poller.Add(ring.fd, i); err != nil {
+		if err := poller.Add(event.Int(), i); err != nil {
 			return nil, err
 		}
 	}
@@ -265,7 +259,7 @@ func NewReaderWithOptions(array *ebpf.Map, perCPUBuffer int, opts ReaderOptions)
 		epollEvents:  make([]unix.EpollEvent, len(rings)),
 		epollRings:   make([]*perfEventRing, 0, len(rings)),
 		eventHeader:  make([]byte, perfEventHeaderSize),
-		pauseFds:     pauseFds,
+		eventFds:     eventFds,
 		overwritable: opts.Overwritable,
 		bufferSize:   bufferSize,
 	}
@@ -291,17 +285,25 @@ func (pr *Reader) Close() error {
 	}
 
 	// Trying to poll will now fail, so Read() can't block anymore. Acquire the
-	// lock so that we can clean up.
+	// locks so that we can clean up.
 	pr.mu.Lock()
 	defer pr.mu.Unlock()
 
+	pr.pauseMu.Lock()
+	defer pr.pauseMu.Unlock()
+
 	for _, ring := range pr.rings {
 		if ring != nil {
 			ring.Close()
 		}
 	}
+	for _, event := range pr.eventFds {
+		if event != nil {
+			event.Close()
+		}
+	}
 	pr.rings = nil
-	pr.pauseFds = nil
+	pr.eventFds = nil
 	pr.array.Close()
 
 	return nil
@@ -406,11 +408,11 @@ func (pr *Reader) Pause() error {
 	pr.pauseMu.Lock()
 	defer pr.pauseMu.Unlock()
 
-	if pr.pauseFds == nil {
+	if pr.eventFds == nil {
 		return fmt.Errorf("%w", ErrClosed)
 	}
 
-	for i := range pr.pauseFds {
+	for i := range pr.eventFds {
 		if err := pr.array.Delete(uint32(i)); err != nil && !errors.Is(err, ebpf.ErrKeyNotExist) {
 			return fmt.Errorf("could't delete event fd for CPU %d: %w", i, err)
 		}
@@ -428,16 +430,16 @@ func (pr *Reader) Resume() error {
 	pr.pauseMu.Lock()
 	defer pr.pauseMu.Unlock()
 
-	if pr.pauseFds == nil {
+	if pr.eventFds == nil {
 		return fmt.Errorf("%w", ErrClosed)
 	}
 
-	for i, fd := range pr.pauseFds {
-		if fd == -1 {
+	for i, fd := range pr.eventFds {
+		if fd == nil {
 			continue
 		}
 
-		if err := pr.array.Put(uint32(i), uint32(fd)); err != nil {
+		if err := pr.array.Put(uint32(i), fd.Uint()); err != nil {
 			return fmt.Errorf("couldn't put event fd %d for CPU %d: %w", fd, i, err)
 		}
 	}
diff --git a/perf/reader_test.go b/perf/reader_test.go
index c348c4eb9..0baeef042 100644
--- a/perf/reader_test.go
+++ b/perf/reader_test.go
@@ -16,7 +16,6 @@ import (
 	"github.com/cilium/ebpf/internal"
 	"github.com/cilium/ebpf/internal/testutils"
 	"github.com/cilium/ebpf/internal/testutils/fdtrace"
-	"github.com/cilium/ebpf/internal/unix"
 
 	"github.com/go-quicktest/qt"
 )
@@ -386,7 +385,7 @@ func TestCreatePerfEvent(t *testing.T) {
 	if err != nil {
 		t.Fatal("Can't create perf event:", err)
 	}
-	unix.Close(fd)
+	fd.Close()
 }
 
 func TestReadRecord(t *testing.T) {
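
Both NewReaderWithOptions above and newPerfEventRing below lean on the same closeOnError helper: because err is a named return value, the deferred closures observe the error the function ultimately returns, so resources acquired before a failure get closed while the success path hands them to the caller intact. A minimal standalone sketch of that pattern, assuming nothing from this repository (openBoth and the /proc paths are placeholders for illustration only):

package main

import (
	"fmt"
	"io"
	"os"
)

// openBoth is a hypothetical helper: it opens two files and returns both,
// closing whatever was already opened if a later step fails.
func openBoth(pathA, pathB string) (_ io.Closer, _ io.Closer, err error) {
	// err is a named return value, so the deferred closures below see the
	// final error at return time, not the value err held when defer ran.
	closeOnError := func(c io.Closer) {
		if err != nil {
			c.Close()
		}
	}

	a, err := os.Open(pathA)
	if err != nil {
		return nil, nil, err
	}
	defer closeOnError(a) // closed only if a later step fails

	b, err := os.Open(pathB)
	if err != nil {
		return nil, nil, err
	}
	defer closeOnError(b)

	return a, b, nil
}

func main() {
	a, b, err := openBoth("/proc/self/status", "/proc/self/maps")
	if err != nil {
		fmt.Println("open:", err)
		return
	}
	defer a.Close()
	defer b.Close()
	fmt.Println("both opened, caller owns the handles")
}

Deferring closeOnError immediately after each acquisition keeps cleanup next to the resource it guards, which is what lets the patch drop the hand-rolled defer func() { ... }() block removed above.
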
diff --git a/perf/ring.go b/perf/ring.go
index 9bd959263..63555f323 100644
--- a/perf/ring.go
+++ b/perf/ring.go
@@ -10,31 +10,37 @@ import (
 	"sync/atomic"
 	"unsafe"
 
+	"github.com/cilium/ebpf/internal/sys"
 	"github.com/cilium/ebpf/internal/unix"
 )
 
 // perfEventRing is a page of metadata followed by
 // a variable number of pages which form a ring buffer.
 type perfEventRing struct {
-	fd   int
 	cpu  int
 	mmap []byte
 	ringReader
 }
 
-func newPerfEventRing(cpu, perCPUBuffer int, opts ReaderOptions) (*perfEventRing, error) {
+func newPerfEventRing(cpu, perCPUBuffer int, opts ReaderOptions) (_ *sys.FD, _ *perfEventRing, err error) {
+	closeOnError := func(c io.Closer) {
+		if err != nil {
+			c.Close()
+		}
+	}
+
 	if opts.Watermark >= perCPUBuffer {
-		return nil, errors.New("watermark must be smaller than perCPUBuffer")
+		return nil, nil, errors.New("watermark must be smaller than perCPUBuffer")
 	}
 
 	fd, err := createPerfEvent(cpu, opts)
 	if err != nil {
-		return nil, err
+		return nil, nil, err
 	}
+	defer closeOnError(fd)
 
-	if err := unix.SetNonblock(fd, true); err != nil {
-		unix.Close(fd)
-		return nil, err
+	if err := unix.SetNonblock(fd.Int(), true); err != nil {
+		return nil, nil, err
 	}
 
 	protections := unix.PROT_READ
@@ -42,10 +48,9 @@ func newPerfEventRing(cpu, perCPUBuffer int, opts ReaderOptions) (*perfEventRing
 		protections |= unix.PROT_WRITE
 	}
 
-	mmap, err := unix.Mmap(fd, 0, perfBufferSize(perCPUBuffer), protections, unix.MAP_SHARED)
+	mmap, err := unix.Mmap(fd.Int(), 0, perfBufferSize(perCPUBuffer), protections, unix.MAP_SHARED)
 	if err != nil {
-		unix.Close(fd)
-		return nil, fmt.Errorf("can't mmap: %v", err)
+		return nil, nil, fmt.Errorf("can't mmap: %v", err)
 	}
 
 	// This relies on the fact that we allocate an extra metadata page,
@@ -62,14 +67,13 @@ func newPerfEventRing(cpu, perCPUBuffer int, opts ReaderOptions) (*perfEventRing
 	}
 
 	ring := &perfEventRing{
-		fd:         fd,
 		cpu:        cpu,
 		mmap:       mmap,
 		ringReader: reader,
 	}
 	runtime.SetFinalizer(ring, (*perfEventRing).Close)
 
-	return ring, nil
+	return fd, ring, nil
 }
 
 // perfBufferSize returns a valid mmap buffer size for use with perf_event_open (1+2^n pages)
@@ -88,17 +92,14 @@ func perfBufferSize(perCPUBuffer int) int {
 	return nPages * pageSize
 }
 
-func (ring *perfEventRing) Close() {
+func (ring *perfEventRing) Close() error {
 	runtime.SetFinalizer(ring, nil)
-
-	_ = unix.Close(ring.fd)
-	_ = unix.Munmap(ring.mmap)
-
-	ring.fd = -1
+	mmap := ring.mmap
 	ring.mmap = nil
+	return unix.Munmap(mmap)
 }
 
-func createPerfEvent(cpu int, opts ReaderOptions) (int, error) {
+func createPerfEvent(cpu int, opts ReaderOptions) (*sys.FD, error) {
 	wakeup := 0
 	bits := 0
 	if opts.WakeupEvents > 0 {
@@ -126,9 +127,9 @@ func createPerfEvent(cpu int, opts ReaderOptions) (int, error) {
 	attr.Size = uint32(unsafe.Sizeof(attr))
 	fd, err := unix.PerfEventOpen(&attr, -1, cpu, -1, unix.PERF_FLAG_FD_CLOEXEC)
 	if err != nil {
-		return -1, fmt.Errorf("can't create perf event: %w", err)
+		return nil, fmt.Errorf("can't create perf event: %w", err)
 	}
-	return fd, nil
+	return sys.NewFD(fd)
 }
 
 type ringReader interface {
diff --git a/perf/ring_test.go b/perf/ring_test.go
index 9bca2f98c..fa9e70c29 100644
--- a/perf/ring_test.go
+++ b/perf/ring_test.go
@@ -131,10 +131,12 @@ func makeForwardRing(size, offset int) *forwardReader {
 
 func TestPerfEventRing(t *testing.T) {
 	check := func(buffer, watermark int, overwritable bool) {
-		ring, err := newPerfEventRing(0, buffer, ReaderOptions{Watermark: watermark, Overwritable: overwritable})
+		event, ring, err := newPerfEventRing(0, buffer, ReaderOptions{Watermark: watermark, Overwritable: overwritable})
 		if err != nil {
 			t.Fatal(err)
 		}
+		defer event.Close()
+		defer ring.Close()
 
 		size := ring.size()
 
@@ -154,21 +156,21 @@ func TestPerfEventRing(t *testing.T) {
 	}
 
 	// watermark > buffer
-	_, err := newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: false})
+	_, _, err := newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: false})
 	if err == nil {
 		t.Fatal("watermark > buffer allowed")
 	}
-	_, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: true})
+	_, _, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: true})
 	if err == nil {
 		t.Fatal("watermark > buffer allowed")
 	}
 
 	// watermark == buffer
-	_, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8192, Overwritable: false})
+	_, _, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8192, Overwritable: false})
 	if err == nil {
 		t.Fatal("watermark == buffer allowed")
 	}
-	_, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: true})
+	_, _, err = newPerfEventRing(0, 8192, ReaderOptions{Watermark: 8193, Overwritable: true})
 	if err == nil {
 		t.Fatal("watermark == buffer allowed")
 	}
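
The public perf.Reader API is unchanged by the patch; what moves is ownership. newPerfEventRing now hands the perf event fd back to the caller as a *sys.FD, perfEventRing.Close only unmaps the ring, and Reader.Close additionally takes pauseMu so it can release both the rings and the event fds even while a Read is blocked. A small self-contained sketch of that surface, assuming a PERF_EVENT_ARRAY map created directly for illustration (running it needs the usual BPF/perf privileges):

package main

import (
	"errors"
	"fmt"
	"os"

	"github.com/cilium/ebpf"
	"github.com/cilium/ebpf/perf"
)

func main() {
	// A perf event array normally comes from a loaded collection; creating
	// one directly keeps the example self-contained.
	events, err := ebpf.NewMap(&ebpf.MapSpec{Type: ebpf.PerfEventArray})
	if err != nil {
		fmt.Fprintln(os.Stderr, "create map:", err)
		return
	}
	defer events.Close()

	rd, err := perf.NewReader(events, os.Getpagesize())
	if err != nil {
		fmt.Fprintln(os.Stderr, "create reader:", err)
		return
	}

	// Pause removes the per-CPU event fds from the map and Resume puts them
	// back; both only take pauseMu, so they can run while another goroutine
	// is blocked in Read.
	if err := rd.Pause(); err != nil {
		fmt.Fprintln(os.Stderr, "pause:", err)
		return
	}
	if err := rd.Resume(); err != nil {
		fmt.Fprintln(os.Stderr, "resume:", err)
		return
	}

	// Close releases the rings and, after this change, the event fds as well.
	rd.Close()
	if err := rd.Pause(); errors.Is(err, perf.ErrClosed) {
		fmt.Println("pause after close:", err)
	}
}

Because Close now also acquires pauseMu before clearing eventFds, a Pause or Resume issued after Close reliably reports perf.ErrClosed instead of racing with the cleanup.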