Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 34 additions & 32 deletions lib/events/athena/querier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -998,42 +998,44 @@ func Test_querier_streamEventsFromChunk(t *testing.T) {
},
}
for _, tt := range tests {
// add event parquets (in one .parquet file) to mock S3 getter
payloads, err := auditEventsToParquet(tt.events)
require.NoError(t, err)
t.Run(tt.name, func(t *testing.T) {
// add event parquets (in one .parquet file) to mock S3 getter
payloads, err := auditEventsToParquet(tt.events)
require.NoError(t, err)

buf := new(bytes.Buffer)
writer := parquet.NewGenericWriter[eventParquet](buf)
_, err = writer.Write(payloads)
require.NoError(t, err)
require.NoError(t, writer.Close())
buf := new(bytes.Buffer)
writer := parquet.NewGenericWriter[eventParquet](buf)
_, err = writer.Write(payloads)
require.NoError(t, err)
require.NoError(t, writer.Close())

key := fmt.Sprintf("%s/%s/%s.parquet", prefix, date, chunkID)
file := filepath.Join(bucketName, key)
mockS3 := &mockS3Getter{
files: map[string][]byte{
file: buf.Bytes(),
},
}
key := fmt.Sprintf("%s/%s/%s.parquet", prefix, date, chunkID)
file := filepath.Join(bucketName, key)
mockS3 := &mockS3Getter{
files: map[string][]byte{
file: buf.Bytes(),
},
}

q := &querier{
querierConfig: querierConfig{
tablename: tableName,
locationS3Bucket: bucketName,
locationS3Prefix: prefix,
logger: slog.Default(),
tracer: tracing.NoopTracer(teleport.ComponentAthena),
},
s3Getter: mockS3,
}
q := &querier{
querierConfig: querierConfig{
tablename: tableName,
locationS3Bucket: bucketName,
locationS3Prefix: prefix,
logger: slog.Default(),
tracer: tracing.NoopTracer(teleport.ComponentAthena),
},
s3Getter: mockS3,
}

eventStream := q.streamEventsFromChunk(t.Context(), date, chunkID)
eventParquets, err := stream.Collect(eventStream)
require.NoError(t, err)
require.Len(t, eventParquets, len(payloads))
for i, e := range eventParquets {
require.Equal(t, payloads[i].UID, e.UID)
}
eventStream := q.streamEventsFromChunk(t.Context(), date, chunkID)
eventParquets, err := stream.Collect(eventStream)
require.NoError(t, err)
require.Len(t, eventParquets, len(payloads))
for i, e := range eventParquets {
require.Equal(t, payloads[i].UID, e.UID)
}
})
}
}

Expand Down
105 changes: 50 additions & 55 deletions lib/events/dynamoevents/dynamoevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,15 @@ type event struct {
EventNamespace string
}

// toIterator JSON-encodes the event's EventKey and returns the encoding as a
// string, in the form stored as the iterator of a checkpointKey.
// If marshaling fails, it returns an empty string and the wrapped error.
func (e *event) toIterator() (string, error) {
	encodedKey, err := json.Marshal(e.EventKey)
	if err != nil {
		return "", trace.Wrap(err)
	}

	return string(encodedKey), nil
}

const (
// keyExpires is a key used for TTL specification
keyExpires = "Expires"
Expand Down Expand Up @@ -742,7 +751,8 @@ type checkpointKey struct {

// EventKey is a derived identifier for an event used for resuming
// sub-page breaks due to size constraints.
// TODO(hugoShaka): Deprecate and remove this field.
// Deprecated: only here to support backwards compatibility for old checkpoints.
// New checkpoints set sub-page breaks via Iterator instead.
EventKey string `json:"event_key,omitempty"`
}

Expand Down Expand Up @@ -926,8 +936,6 @@ func (l *Log) searchEventsRaw(ctx context.Context, fromUTC, toUTC time.Time, nam
}
}

foundStart := checkpoint.EventKey == ""

var forward bool
switch order {
case types.EventOrderAscending:
Expand All @@ -952,7 +960,6 @@ func (l *Log) searchEventsRaw(ctx context.Context, fromUTC, toUTC time.Time, nam
log: logger,
totalSize: totalSize,
checkpoint: &checkpoint,
foundStart: foundStart,
dates: dates,
left: left,
fromUTC: fromUTC,
Expand Down Expand Up @@ -980,7 +987,7 @@ func (l *Log) searchEventsRaw(ctx context.Context, fromUTC, toUTC time.Time, nam
}

var lastKey []byte
if ef.hasLeft {
if checkpoint.Iterator != "" {
lastKey, err = json.Marshal(&checkpoint)
if err != nil {
return nil, "", trace.Wrap(err)
Expand Down Expand Up @@ -1086,6 +1093,7 @@ func getExprFilter(filter searchEventsFilter) *string {
return filterExpr
}

// Deprecated: only here to support backwards compatibility for old checkpoints using checkpointKey.EventKey.
func getSubPageCheckpoint(e *event) (string, error) {
data, err := utils.FastMarshal(e)
if err != nil {
Expand Down Expand Up @@ -1584,9 +1592,7 @@ type eventsFetcher struct {
api query

totalSize int
hasLeft bool
checkpoint *checkpointKey
foundStart bool
dates []string
left int32

Expand All @@ -1598,8 +1604,11 @@ type eventsFetcher struct {
filter searchEventsFilter
}

func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeftFun func() bool) ([]event, bool, error) {
oldIterator := l.checkpoint.Iterator
// processQueryOutput returns events from the DynamoDB query output.
// It stops if events.MaxEventBytesInResponse is reached, the query limit is reached, or there are no more events.
// If events.MaxEventBytesInResponse is reached or the query limit is reached,
// we create a new nonempty checkpoint iterator, indicating there are more events to fetch.
func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput) ([]event, bool, error) {
l.checkpoint.Iterator = ""

if output.LastEvaluatedKey != nil {
Expand Down Expand Up @@ -1630,9 +1639,11 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
return nil, false, trace.Wrap(err)
}

// TODO(hugoShaka): Fix this. This code path has terrible performance
// and should be replaced by proper pagination.
if !l.foundStart {
// Support reading old checkpoints that are using checkpointKey.EventKey.
// This field was used to resume processing events after a sub-page break.
// After we begin processing from the correct event,
// new checkpoints will not set EventKey and use Iterator instead.
if l.checkpoint.EventKey != "" {
Comment on lines -1635 to +1646
Copy link
Contributor Author

@kshi36 kshi36 Jan 15, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since new checkpoints can resume directly from checkpoint.Iterator, we can change foundStart functionality to simply check if checkpoint.EventKey is populated. As such, we can remove foundStart

key, err := getSubPageCheckpoint(&e)
if err != nil {
return nil, false, trace.Wrap(err)
Expand All @@ -1641,50 +1652,49 @@ func (l *eventsFetcher) processQueryOutput(output *dynamodb.QueryOutput, hasLeft
if key != l.checkpoint.EventKey {
continue
}
l.foundStart = true

// Found the correct event, reset the EventKey so new checkpoints don't use it.
l.checkpoint.EventKey = ""
}
// Because this may break on non page boundaries an additional
// checkpoint is needed for sub-page breaks.
// Stop early when the fetcher's total size exceeds the response size limit.
if l.totalSize+len(data) >= events.MaxEventBytesInResponse {
key, err := getSubPageCheckpoint(&e)
if err != nil {
if err := l.saveCheckpointAtEvent(out[len(out)-1]); err != nil {
return nil, false, trace.Wrap(err)
}
l.log.DebugContext(context.Background(), "breaking up sub-page due to event size", "key", key)
l.checkpoint.EventKey = key

// We need to reset the iterator so we get the previous page again.
l.checkpoint.Iterator = oldIterator

// If we stopped because of the size limit, we know that at least one event has to be fetched from the
// current date and old iterator, so we must set it to true independently of the hasLeftFun or
// the new iterator being empty.
l.hasLeft = true

return out, true, nil
}
l.totalSize += len(data)
out = append(out, e)
l.left--
// Stop early if the query limit is reached.
if l.left == 0 {
hf := false
if hasLeftFun != nil {
hf = hasLeftFun()
if err := l.saveCheckpointAtEvent(out[len(out)-1]); err != nil {
return nil, false, trace.Wrap(err)
}
l.hasLeft = hf || l.checkpoint.Iterator != ""
l.checkpoint.EventKey = ""
l.log.DebugContext(context.Background(), "resetting checkpoint event-key due to full page", "has_left", l.hasLeft, "checkpoint", l.checkpoint)
return out, true, nil
}
}
return out, false, nil
}

// saveCheckpointAtEvent points the fetcher's checkpoint iterator at the given
// event so that future processing resumes from it. The new value overwrites
// the iterator previously stored in the checkpoint (per the original note,
// the one taken from the query's LastEvaluatedKey).
// It returns a wrapped error if the event cannot be converted to an iterator;
// in that case the checkpoint is left untouched.
func (l *eventsFetcher) saveCheckpointAtEvent(e event) error {
	resumeFrom, err := e.toIterator()
	if err != nil {
		return trace.Wrap(err)
	}

	l.checkpoint.Iterator = resumeFrom
	l.log.DebugContext(
		context.Background(),
		"sub-page break, saving new checkpoint",
		"iterator", resumeFrom,
	)
	return nil
}

func (l *eventsFetcher) QueryByDateIndex(ctx context.Context, filterExpr *string) (values []event, err error) {
query := "CreatedAtDate = :date AND CreatedAt BETWEEN :start and :end"

dateLoop:
for i, date := range l.dates {
for _, date := range l.dates {
l.checkpoint.Date = date

attributes := map[string]interface{}{
Expand Down Expand Up @@ -1738,29 +1748,17 @@ dateLoop:
"iterator", l.checkpoint.Iterator,
)

hasLeft := func() bool {
return i+1 != len(l.dates)
}
result, limitReached, err := l.processQueryOutput(out, hasLeft)
result, limitReached, err := l.processQueryOutput(out)
if err != nil {
return nil, trace.Wrap(err)
}
values = append(values, result...)

// Return all currently processed events.
if limitReached {
// If we've reached the limit, we need to determine whether there are more events to fetch from the current date
// or if we need to move the cursor to the next date.
// To do this, we check if the iterator is empty and if the EventKey is empty.
// DynamoDB returns an empty iterator if all events from the current date have been consumed.
// We need to check if the EventKey is empty because it indicates that we left the page midway
// due to reaching the maximum response size. In this case, we need to resume the query
// from the same date and the request's iterator to fetch the remainder of the page.
// If the input iterator is empty but the EventKey is not, we need to resume the query from the same date
// and we shouldn't move to the next date.
if i < len(l.dates)-1 && l.checkpoint.Iterator == "" && l.checkpoint.EventKey == "" {
l.checkpoint.Date = l.dates[i+1]
}
return values, nil
}
// An empty iterator indicates that there are no more events to fetch from the current date.
if l.checkpoint.Iterator == "" {
continue dateLoop
}
Expand Down Expand Up @@ -1819,13 +1817,10 @@ func (l *eventsFetcher) QueryBySessionIDIndex(ctx context.Context, sessionID str
"iterator", l.checkpoint.Iterator,
)

result, limitReached, err := l.processQueryOutput(out, nil)
result, _, err := l.processQueryOutput(out)
if err != nil {
return nil, trace.Wrap(err)
}
values = append(values, result...)
if limitReached {
return values, nil
}
return values, nil
}
Loading
Loading