@@ -116,6 +116,7 @@ public static StreamReadConstraintsUtil getConfiguredStreamReadConstraints() {

public static final ObjectMapper CBOR_MAPPER = new ObjectMapper(
new CBORFactory().configure(CBORGenerator.Feature.WRITE_MINIMAL_INTS, false)
.configure(CBORGenerator.Feature.STRINGREF, true)
).registerModules(RUBY_SERIALIZERS, CBOR_DESERIALIZERS)
.activateDefaultTyping(TYPE_VALIDATOR, ObjectMapper.DefaultTyping.NON_FINAL);
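For context, a minimal sketch of what the stringref extension changes on the wire (assumes jackson-dataformat-cbor 2.15+, where CBORGenerator.Feature.STRINGREF is available; the class name and payload below are illustrative, not taken from this change): with the feature enabled, repeated strings are emitted once and then referenced, so both encodings round-trip to the same value but the stringref one is smaller.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import com.fasterxml.jackson.dataformat.cbor.CBORGenerator;

import java.util.List;
import java.util.Map;

public class StringrefSizeSketch {
    public static void main(String[] args) throws Exception {
        ObjectMapper plain = new ObjectMapper(new CBORFactory());
        ObjectMapper stringref = new ObjectMapper(
                new CBORFactory().configure(CBORGenerator.Feature.STRINGREF, true));

        // Repeated keys and values are where the stringref extension pays off.
        List<Map<String, String>> payload = List.of(
                Map.of("hostname", "perhaps", "status", "ok"),
                Map.of("hostname", "perhaps", "status", "ok"),
                Map.of("hostname", "perhaps", "status", "ok"));

        byte[] plainBytes = plain.writeValueAsBytes(payload);
        byte[] refBytes = stringref.writeValueAsBytes(payload);

        // Expect refBytes to be shorter; each mapper reads its own output back
        // to the same logical value.
        System.out.println(plainBytes.length + " bytes plain, "
                + refBytes.length + " bytes with stringref");
        System.out.println(plain.readValue(plainBytes, Object.class)
                .equals(stringref.readValue(refBytes, Object.class)));
    }
}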

37 changes: 37 additions & 0 deletions logstash-core/src/test/java/org/logstash/EventTest.java
@@ -27,13 +27,16 @@
import org.junit.Test;

import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HexFormat;
import java.util.List;
import java.util.Map;

Expand All @@ -42,6 +45,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

@@ -143,6 +147,24 @@ public void bigNumsBinaryRoundtrip() throws Exception {
assertEquals(bd, deserialized.getField("bd"));
}

@Test
public void deserializeStringrefExtensionEnabled() throws Exception {
byte[] stringrefCBOR = loadAnnotatedCBORFixture("stringref-enabled.annotated-cbor.txt");
Event event = Event.deserialize(stringrefCBOR);
event.getField("[event][original]");
assertEquals("stringref", event.getField("test"));
assertEquals(true, event.getField("[extension][enabled]"));
}

@Test
public void deserializeStringrefExtensionDisabled() throws Exception {
byte[] stringrefCBOR = loadAnnotatedCBORFixture("stringref-disabled.annotated-cbor.txt");
Event event = Event.deserialize(stringrefCBOR);
event.getField("[event][original]");
assertEquals("stringref", event.getField("test"));
assertEquals(true, event.getField("[extension][enabled]"));
}

@Test
public void testBareToJson() throws Exception {
Event e = new Event();
@@ -565,4 +587,19 @@ public void allowTopLevelTagsListOfStrings() {
assertNull(event.getField(Event.TAGS_FAILURE));
assertEquals(event.getField("[tags]"), List.of("foo", "bar"));
}

static byte[] loadAnnotatedCBORFixture(String name) throws IOException {
try (InputStream resourceAsStream = EventTest.class.getResourceAsStream(name)) {
assertNotNull(resourceAsStream);

String annotated = new String(resourceAsStream.readAllBytes(), StandardCharsets.UTF_8);
// annotated CBOR: strip #-initiated line comments, then strip whitespace to get hex
String hexBytes = annotated.replaceAll("#.*(\\n|$)", "").replaceAll("\\s", "");

// result should be even number of hex digits
assert hexBytes.matches("(?i:[0-9a-f]{2})*");

return HexFormat.of().parseHex(hexBytes);
}
}
}
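As an aside, a sketch of how the hex body of such an annotated fixture could be produced (the standalone class, its payload, and the plain ObjectMapper below are assumptions for illustration; the checked-in fixtures were presumably generated through Logstash's own event serialization, so the exact bytes differ): serialize a value with a STRINGREF-enabled CBOR mapper, hex-dump it, then add the '#' annotations by hand or with a CBOR diagnostic tool.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import com.fasterxml.jackson.dataformat.cbor.CBORGenerator;

import java.util.HexFormat;
import java.util.Map;

public class FixtureHexDump {
    public static void main(String[] args) throws Exception {
        // Mirrors the CBOR_MAPPER feature flags above, minus Logstash's
        // custom serializers and default typing.
        ObjectMapper mapper = new ObjectMapper(
                new CBORFactory()
                        .configure(CBORGenerator.Feature.WRITE_MINIMAL_INTS, false)
                        .configure(CBORGenerator.Feature.STRINGREF, true));

        // Toy payload standing in for a serialized Event.
        byte[] cbor = mapper.writeValueAsBytes(
                Map.of("test", "stringref", "extension", Map.of("enabled", true)));

        // Hex body of a fixture; loadAnnotatedCBORFixture() strips '#' comments
        // and whitespace, so annotations are free-form.
        System.out.println(HexFormat.of().formatHex(cbor));
    }
}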
@@ -72,7 +72,7 @@ public class DeadLetterQueueReaderTest {
private Path dir;
private int defaultDlqSize = 100_000_000; // 100mb

private static final int PAD_FOR_BLOCK_SIZE_EVENT = 32490;
private static final int PAD_FOR_BLOCK_SIZE_EVENT = 32511;
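// Why these paddings moved: the fixed-size constants in these DLQ tests all shift
// by the same 21 bytes (32490 -> 32511, 32479 -> 32500, 286 -> 265). With the
// stringref extension enabled, repeated strings in a serialized event (such as the
// "org.logstash.ConvertedMap" type markers) become back-references, so the event's
// constant serialization overhead shrinks and the message padding must grow by the
// same amount for an entry to still end exactly on a 32 KiB block boundary.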

@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -240,7 +240,7 @@ private void validateEntries(Path firstLog, int startEntry, int endEntry, int st
// This test tests for a single event that ends on a block boundary
@Test
public void testBlockBoundary() throws Exception {
final int PAD_FOR_BLOCK_SIZE_EVENT = 32490;
final int PAD_FOR_BLOCK_SIZE_EVENT = 32511;
Event event = createEventWithConstantSerializationOverhead();
char[] field = new char[PAD_FOR_BLOCK_SIZE_EVENT];
Arrays.fill(field, 'e');
@@ -267,7 +267,7 @@ public void testBlockBoundary() throws Exception {
@Test
public void testBlockBoundaryMultiple() throws Exception {
Event event = createEventWithConstantSerializationOverhead();
char[] field = new char[7929];
char[] field = new char[7950];
Arrays.fill(field, 'x');
event.setField("message", new String(field));
long startTime = System.currentTimeMillis();
@@ -935,7 +935,7 @@ private int prepareFilledSegmentFiles(int segments) throws IOException {

private int prepareFilledSegmentFiles(int segments, long start) throws IOException {
final Event event = createEventWithConstantSerializationOverhead(Collections.emptyMap());
event.setField("message", generateMessageContent(32479));
event.setField("message", generateMessageContent(32500));

DLQEntry entry = new DLQEntry(event, "", "", String.format("%05d", 1), constantSerializationLengthTimestamp(start));
assertEquals("Serialized dlq entry + header MUST be 32Kb (size of a block)", BLOCK_SIZE, entry.serialize().length + 13);
@@ -105,7 +105,7 @@ public void setUp() throws Exception {
@Test
public void testRemovesOlderSegmentsWhenWriteOnReopenedDLQContainingExpiredSegments() throws IOException {
final Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479));
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500));

final Clock pointInTimeFixedClock = Clock.fixed(Instant.parse("2022-02-22T10:20:30.00Z"), ZoneId.of("Europe/Rome"));
final ForwardableClock fakeClock = new ForwardableClock(pointInTimeFixedClock);
@@ -160,7 +160,7 @@ private void prepareDLQWithFirstSegmentOlderThanRetainPeriod(Event event, Forwar
@Test
public void testRemovesOlderSegmentsWhenWritesIntoDLQContainingExpiredSegments() throws IOException {
final Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479));
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500));

long startTime = fakeClock.instant().toEpochMilli();
int messageSize = 0;
@@ -203,7 +203,7 @@ public void testRemovesOlderSegmentsWhenWritesIntoDLQContainingExpiredSegments()
@Test
public void testRemoveMultipleOldestSegmentsWhenRetainedAgeIsExceeded() throws IOException {
final Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479));
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500));

long startTime = fakeClock.instant().toEpochMilli();
int messageSize = 0;
@@ -251,7 +251,7 @@ static String generateASCIIMessageContent(int size, byte fillChar) {
@Test
public void testRemoveOldestSegmentWhenRetainedSizeIsExceededAndDropOlderModeIsEnabled() throws IOException {
Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479));
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500));
long startTime = System.currentTimeMillis();

int messageSize = 0;
@@ -488,7 +488,7 @@ public void testReadTimestampOfLastEventInSegmentWithDeletedSegment() throws IOE
@Test
public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, InterruptedException {
Event blockAlmostFullEvent = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
int serializationHeader = 286;
int serializationHeader = 265;
int notEnoughHeaderSpace = 5;
blockAlmostFullEvent.setField("message", DeadLetterQueueReaderTest.generateMessageContent(BLOCK_SIZE - serializationHeader - RECORD_HEADER_SIZE + notEnoughHeaderSpace));

@@ -501,7 +501,7 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I
// enqueue a record with size smaller than BLOCK_SIZE
DLQEntry entry = new DLQEntry(blockAlmostFullEvent, "", "", "00001", DeadLetterQueueReaderTest.constantSerializationLengthTimestamp(System.currentTimeMillis()));
assertEquals("Serialized plus header must not leave enough space for another record header ",
entry.serialize().length, BLOCK_SIZE - RECORD_HEADER_SIZE - notEnoughHeaderSpace);
BLOCK_SIZE - RECORD_HEADER_SIZE - notEnoughHeaderSpace, entry.serialize().length);
writeManager.writeEntry(entry);

// enqueue a record bigger than BLOCK_SIZE
@@ -512,7 +512,7 @@ public void testDropEventCountCorrectlyNotEnqueuedEvents() throws IOException, I

// fill the queue to push out the segment with the 2 previous events
Event event = DeadLetterQueueReaderTest.createEventWithConstantSerializationOverhead(Collections.emptyMap());
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32479));
event.setField("message", DeadLetterQueueReaderTest.generateMessageContent(32500));
try (DeadLetterQueueWriter writeManager = DeadLetterQueueWriter
.newBuilder(dir, 10 * MB, 20 * MB, Duration.ofSeconds(1))
.storageType(QueueStorageType.DROP_NEWER)
@@ -0,0 +1,94 @@
9f # array(*)
71 # text(17)
6a6176612e7574696c2e486173684d6170 # "java.util.HashMap"
bf # map(*)
64 # text(4)
44415441 # "DATA"
9f # array(*)
78 19 # text(25)
6f72672e6c6f6773746173682e436f6e # "org.logstash.Con"
7665727465644d6170 # "vertedMap"
bf # map(*)
64 # text(4)
686f7374 # "host"
9f # array(*)
78 19 # text(25)
6f72672e6c6f6773746173682e436f6e # "org.logstash.Con"
7665727465644d6170 # "vertedMap"
bf # map(*)
68 # text(8)
686f73746e616d65 # "hostname"
9f # array(*)
74 # text(20)
6f72672e6a727562792e52756279537472696e67 # "org.jruby.RubyString"
67 # text(7)
70657268617073 # "perhaps"
ff # break
ff # break
ff # break
65 # text(5)
6576656e74 # "event"
9f # array(*)
78 19 # text(25)
6f72672e6c6f6773746173682e436f6e # "org.logstash.Con"
7665727465644d6170 # "vertedMap"
bf # map(*)
68 # text(8)
6f726967696e616c # "original"
9f # array(*)
74 # text(20)
6f72672e6a727562792e52756279537472696e67 # "org.jruby.RubyString"
78 32 # text(50)
7b2274657374223a22737472696e6772 # "{\"test\":\"stringr"
6566222c22657874656e73696f6e223a # "ef\",\"extension\":"
7b22656e61626c6564223a747275657d # "{\"enabled\":true}"
7d0a # "}\n"
ff # break
ff # break
ff # break
68 # text(8)
4076657273696f6e # "@version"
61 # text(1)
31 # "1"
69 # text(9)
657874656e73696f6e # "extension"
9f # array(*)
78 19 # text(25)
6f72672e6c6f6773746173682e436f6e # "org.logstash.Con"
7665727465644d6170 # "vertedMap"
bf # map(*)
67 # text(7)
656e61626c6564 # "enabled"
f5 # true, simple(21)
ff # break
ff # break
6a # text(10)
4074696d657374616d70 # "@timestamp"
9f # array(*)
76 # text(22)
6f72672e6c6f6773746173682e54696d657374616d70 # "org.logstash.Timestamp"
78 1b # text(27)
323032352d30372d32385431363a3432 # "2025-07-28T16:42"
3a32342e3432313434365a # ":24.421446Z"
ff # break
64 # text(4)
74657374 # "test"
9f # array(*)
74 # text(20)
6f72672e6a727562792e52756279537472696e67 # "org.jruby.RubyString"
69 # text(9)
737472696e67726566 # "stringref"
ff # break
ff # break
ff # break
64 # text(4)
4d455441 # "META"
9f # array(*)
78 19 # text(25)
6f72672e6c6f6773746173682e436f6e # "org.logstash.Con"
7665727465644d6170 # "vertedMap"
bf # map(*)
ff # break
ff # break
ff # break
ff # break