Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions ringbuf/ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ func newRingBufEventRing(mapFD, size int) (*ringbufEventRing, error) {
return nil, fmt.Errorf("can't mmap data pages: %w", err)
}

cons_pos := (*uint64)(unsafe.Pointer(&cons[0]))
prod_pos := (*uint64)(unsafe.Pointer(&prod[0]))
cons_pos := (*uintptr)(unsafe.Pointer(&cons[0]))
prod_pos := (*uintptr)(unsafe.Pointer(&prod[0]))

ring := &ringbufEventRing{
prod: prod,
Expand All @@ -58,17 +58,17 @@ func (ring *ringbufEventRing) Close() {

type ringReader struct {
// These point into mmap'ed memory and must be accessed atomically.
prod_pos, cons_pos *uint64
mask uint64
prod_pos, cons_pos *uintptr
mask uintptr
ring []byte
}

func newRingReader(cons_ptr, prod_ptr *uint64, ring []byte) *ringReader {
func newRingReader(cons_ptr, prod_ptr *uintptr, ring []byte) *ringReader {
return &ringReader{
prod_pos: prod_ptr,
cons_pos: cons_ptr,
// cap is always a power of two
mask: uint64(cap(ring)/2 - 1),
mask: uintptr(cap(ring)/2 - 1),
ring: ring,
}
}
Expand All @@ -82,15 +82,15 @@ func (rr *ringReader) size() int {

// The amount of data available to read in the ring buffer.
func (rr *ringReader) AvailableBytes() uint64 {
prod := atomic.LoadUint64(rr.prod_pos)
cons := atomic.LoadUint64(rr.cons_pos)
return prod - cons
prod := atomic.LoadUintptr(rr.prod_pos)
cons := atomic.LoadUintptr(rr.cons_pos)
return uint64(prod - cons)
}

// Read a record from an event ring.
func (rr *ringReader) readRecord(rec *Record) error {
prod := atomic.LoadUint64(rr.prod_pos)
cons := atomic.LoadUint64(rr.cons_pos)
prod := atomic.LoadUintptr(rr.prod_pos)
cons := atomic.LoadUintptr(rr.cons_pos)

for {
if remaining := prod - cons; remaining == 0 {
Expand All @@ -117,7 +117,7 @@ func (rr *ringReader) readRecord(rec *Record) error {
cons += sys.BPF_RINGBUF_HDR_SZ

// Data is always padded to 8 byte alignment.
dataLenAligned := uint64(internal.Align(header.dataLen(), 8))
dataLenAligned := uintptr(internal.Align(header.dataLen(), 8))
if remaining := prod - cons; remaining < dataLenAligned {
return fmt.Errorf("read sample data: %w", io.ErrUnexpectedEOF)
}
Expand All @@ -129,7 +129,7 @@ func (rr *ringReader) readRecord(rec *Record) error {
// when the record header indicates that the data should be
// discarded, we skip it by just updating the consumer position
// to the next record.
atomic.StoreUint64(rr.cons_pos, cons)
atomic.StoreUintptr(rr.cons_pos, cons)
continue
}

Expand All @@ -141,7 +141,7 @@ func (rr *ringReader) readRecord(rec *Record) error {

copy(rec.RawSample, rr.ring[start:])
rec.Remaining = int(prod - cons)
atomic.StoreUint64(rr.cons_pos, cons)
atomic.StoreUintptr(rr.cons_pos, cons)
return nil
}
}
Loading