Add PaginatedStore, a Merge benchmark and tests (reviewed revision).
Line structure and generic type arguments were restored from a flattened copy of
the patch; whitespace of context lines is a best effort, and the blob ids on the
"index" lines are those of the original patch, not of the revised contents.

diff --git a/src/jmh/java/com/datadoghq/sketch/ddsketch/DDSketchOption.java b/src/jmh/java/com/datadoghq/sketch/ddsketch/DDSketchOption.java
index 948e9ea..5c910bd 100644
--- a/src/jmh/java/com/datadoghq/sketch/ddsketch/DDSketchOption.java
+++ b/src/jmh/java/com/datadoghq/sketch/ddsketch/DDSketchOption.java
@@ -1,11 +1,16 @@
 package com.datadoghq.sketch.ddsketch;
 
+import com.datadoghq.sketch.ddsketch.mapping.BitwiseLinearlyInterpolatedMapping;
+import com.datadoghq.sketch.ddsketch.store.PaginatedStore;
+import com.datadoghq.sketch.ddsketch.store.UnboundedSizeDenseStore;
+
 import java.util.function.DoubleFunction;
 
 public enum DDSketchOption {
-    FAST(DDSketch::fast),
-    MEMORY_OPTIMAL(DDSketch::memoryOptimal),
-    BALANCED(DDSketch::balanced);
+    FAST(relativeAccuracy -> new DDSketch(new BitwiseLinearlyInterpolatedMapping(relativeAccuracy), UnboundedSizeDenseStore::new)),
+    MEMORY_OPTIMAL(DDSketches::logarithmicUnboundedDense),
+    BALANCED(DDSketches::unboundedDense),
+    PAGINATED(relativeAccuracy -> new DDSketch(new BitwiseLinearlyInterpolatedMapping(relativeAccuracy), PaginatedStore::new));
 
     private final DoubleFunction<DDSketch> creator;
 
diff --git a/src/jmh/java/com/datadoghq/sketch/ddsketch/benchmarks/Merge.java b/src/jmh/java/com/datadoghq/sketch/ddsketch/benchmarks/Merge.java
new file mode 100644
index 0000000..f63d442
--- /dev/null
+++ b/src/jmh/java/com/datadoghq/sketch/ddsketch/benchmarks/Merge.java
@@ -0,0 +1,50 @@
+package com.datadoghq.sketch.ddsketch.benchmarks;
+
+import com.datadoghq.sketch.ddsketch.DDSketch;
+import com.datadoghq.sketch.ddsketch.DDSketchOption;
+import com.datadoghq.sketch.ddsketch.DataGenerator;
+import org.openjdk.jmh.annotations.*;
+
+import java.util.concurrent.TimeUnit;
+
+@State(Scope.Benchmark)
+@OutputTimeUnit(TimeUnit.MICROSECONDS)
+@BenchmarkMode(Mode.AverageTime)
+public class Merge {
+
+    @Param
+    DataGenerator generator;
+
+    @Param({"NANOSECONDS", "MICROSECONDS", "MILLISECONDS"})
+    TimeUnit unit;
+
+    @Param
+    DDSketchOption sketchOption;
+
+    @Param("100000")
+    int count;
+
+    @Param({"0.01"})
+    double relativeAccuracy;
+
+    DDSketch left;
+    DDSketch right;
+
+    @Setup(Level.Trial)
+    public void init() {
+        this.left = sketchOption.create(relativeAccuracy);
+        this.right = sketchOption.create(relativeAccuracy);
+        for (int i = 0; i < count; ++i) {
+            left.accept(unit.toNanos(Math.abs(Math.round(generator.nextValue()))));
+            right.accept(unit.toNanos(Math.abs(Math.round(generator.nextValue()))));
+        }
+    }
+
+    @Benchmark
+    public Object merge() {
+        DDSketch target = sketchOption.create(relativeAccuracy);
+        target.mergeWith(left);
+        target.mergeWith(right);
+        return target;
+    }
+}
diff --git a/src/main/java/com/datadoghq/sketch/ddsketch/store/PaginatedStore.java b/src/main/java/com/datadoghq/sketch/ddsketch/store/PaginatedStore.java
new file mode 100644
index 0000000..d8a5846
--- /dev/null
+++ b/src/main/java/com/datadoghq/sketch/ddsketch/store/PaginatedStore.java
@@ -0,0 +1,354 @@
+package com.datadoghq.sketch.ddsketch.store;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * An unbounded store which allocates storage for counts in aligned
+ * pages of {@code PAGE_SIZE} bins, referenced from an array at offsets
+ * relative to the lowest page index seen so far. This means that if a
+ * distribution has several modes, the cost of the storage for the
+ * space in between the modes is that of a null pointer per page,
+ * requiring 4-8 bytes (depending on CompressedOops, whether ZGC is
+ * used, and heap size) for each 1KB page.
+ *
+ * <p>On the other hand, if the data is uniformly distributed, filling
+ * each page in a range [N, N + K * PAGE_SIZE), this store will require
+ * K * (20 + 4|8) bytes of extra space over
+ * {@code UnboundedSizeDenseStore}, because of the metadata overhead of
+ * the array headers and references to each page.
+ */
+public final class PaginatedStore implements Store {
+
+    // number of page slots the page array grows by (must be a power of two, see aligned)
+    private static final int GROWTH = 8;
+    // number of bins per page (must be a power of two)
+    private static final int PAGE_SIZE = 128;
+    private static final int PAGE_MASK = PAGE_SIZE - 1;
+    private static final int PAGE_SHIFT = Integer.bitCount(PAGE_MASK);
+
+    // pages[i] holds the counts of page (minPageIndex + i); null until a count is added to it
+    private double[][] pages = null;
+    private int minPageIndex;
+
+    public PaginatedStore() {
+        this(Integer.MAX_VALUE);
+    }
+
+    PaginatedStore(int minPageIndex) {
+        this.minPageIndex = minPageIndex;
+    }
+
+    PaginatedStore(PaginatedStore store) {
+        this(store.minPageIndex);
+        this.pages = deepCopy(store.pages);
+    }
+
+    @Override
+    public boolean isEmpty() {
+        // won't initialise any pages until a value is added,
+        // and values can't be removed.
+        return null == pages;
+    }
+
+    @Override
+    public int getMinIndex() {
+        if (null != pages) {
+            for (int i = 0; i < pages.length; ++i) {
+                if (null != pages[i]) {
+                    for (int j = 0; j < pages[i].length; ++j) {
+                        if (pages[i][j] != 0D) {
+                            return ((i + minPageIndex) << PAGE_SHIFT) + j;
+                        }
+                    }
+                }
+            }
+        }
+        throw new NoSuchElementException();
+    }
+
+    @Override
+    public int getMaxIndex() {
+        if (null != pages) {
+            for (int i = pages.length - 1; i >= 0; --i) {
+                if (null != pages[i]) {
+                    for (int j = pages[i].length - 1; j >= 0; --j) {
+                        if (pages[i][j] != 0D) {
+                            return ((i + minPageIndex) << PAGE_SHIFT) + j;
+                        }
+                    }
+                }
+            }
+        }
+        throw new NoSuchElementException();
+    }
+
+    @Override
+    public double getTotalCount() {
+        if (isEmpty()) {
+            return 0D;
+        }
+        double total = 0D;
+        for (double[] page : pages) {
+            if (null != page) {
+                for (double count : page) {
+                    total += count;
+                }
+            }
+        }
+        return total;
+    }
+
+    @Override
+    public void add(int index, double count) {
+        if (count > 0) {
+            int alignedIndex = alignedIndex(index);
+            double[] page = getPage(alignedIndex >>> PAGE_SHIFT);
+            page[alignedIndex & PAGE_MASK] += count;
+        }
+    }
+
+    /** Returns the page in the given slot of the page array, allocating it if necessary. */
+    private double[] getPage(int pageIndex) {
+        double[] page = pages[pageIndex];
+        if (null == page) {
+            page = pages[pageIndex] = new double[PAGE_SIZE];
+        }
+        return page;
+    }
+
+    /**
+     * Makes sure the page array has a slot for the page containing {@code index}
+     * and returns the position of {@code index} relative to the first bin of the
+     * first page.
+     */
+    private int alignedIndex(int index) {
+        // the arithmetic shift rounds towards negative infinity, so this is the
+        // page containing the index whatever its sign, with no risk of overflow
+        int pageIndex = index >> PAGE_SHIFT;
+        if (null == pages) {
+            // first insertion, whatever minPageIndex was constructed with
+            lazyInit(pageIndex);
+        } else if (pageIndex < minPageIndex) {
+            // then space needs to be made before the first page
+            shiftPagesRight(pageIndex);
+        } else if (pageIndex - minPageIndex >= pages.length) {
+            // then space needs to be made after the last page
+            extendTo(pageIndex);
+        }
+        // align the index relative to the start of the sketch
+        return index - (minPageIndex << PAGE_SHIFT);
+    }
+
+    private void lazyInit(int pageIndex) {
+        minPageIndex = pageIndex;
+        pages = new double[GROWTH][];
+    }
+
+    /**
+     * Makes {@code pageIndex} the first page of the store, moving the existing
+     * pages towards the end of the page array. Does nothing if the store already
+     * starts at or before {@code pageIndex}.
+     */
+    private void shiftPagesRight(int pageIndex) {
+        int requiredExtension = minPageIndex - pageIndex;
+        if (requiredExtension > 0) {
+            // the pages can be moved in place if there are enough null slots
+            // at the end of the array to shift into
+            boolean canShiftRight = requiredExtension <= pages.length;
+            for (int i = 0; i < requiredExtension && canShiftRight; ++i) {
+                canShiftRight = null == pages[pages.length - i - 1];
+            }
+            if (canShiftRight) {
+                System.arraycopy(pages, 0, pages, requiredExtension, pages.length - requiredExtension);
+            } else {
+                double[][] newPages = new double[pages.length + aligned(requiredExtension)][];
+                System.arraycopy(pages, 0, newPages, requiredExtension, pages.length);
+                this.pages = newPages;
+            }
+            // the vacated slots still reference the pages which were moved
+            Arrays.fill(pages, 0, requiredExtension, null);
+            this.minPageIndex = pageIndex;
+        }
+    }
+
+    /** Grows the page array so that it has a slot for {@code pageIndex}. */
+    private void extendTo(int pageIndex) {
+        ensureLength(pageIndex - minPageIndex + 1);
+    }
+
+    /** Grows the page array, if necessary, so that it has at least {@code length} slots. */
+    private void ensureLength(int length) {
+        if (length > pages.length) {
+            this.pages = Arrays.copyOf(pages, aligned(length));
+        }
+    }
+
+    @Override
+    public void mergeWith(Store store) {
+        if (store.isEmpty()) {
+            return;
+        }
+        if (store instanceof PaginatedStore) {
+            mergeWith((PaginatedStore) store);
+        } else {
+            store.getStream().forEach(this::add);
+        }
+    }
+
+    private void mergeWith(PaginatedStore store) {
+        if (isEmpty()) {
+            this.pages = deepCopy(store.pages);
+            this.minPageIndex = store.minPageIndex;
+            return;
+        }
+        // make this store start no later than the other one...
+        shiftPagesRight(store.minPageIndex);
+        // ...and end no earlier. The bounds must be computed after the shift:
+        // shifting in place moves the first page without growing the array.
+        int offset = store.minPageIndex - minPageIndex;
+        ensureLength(offset + store.pages.length);
+        for (int i = 0; i < store.pages.length; ++i) {
+            double[] page = store.pages[i];
+            if (null != page) {
+                double[] target = pages[offset + i];
+                if (null == target) {
+                    pages[offset + i] = Arrays.copyOf(page, page.length);
+                } else {
+                    for (int j = 0; j < page.length; ++j) {
+                        target[j] += page[j];
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public Store copy() {
+        return new PaginatedStore(this);
+    }
+
+    @Override
+    public Iterator<Bin> getAscendingIterator() {
+        return new AscendingIterator();
+    }
+
+    @Override
+    public Iterator<Bin> getDescendingIterator() {
+        return new DescendingIterator();
+    }
+
+    /** Rounds {@code required} up to the next multiple of {@code GROWTH}. */
+    private static int aligned(int required) {
+        return (required + GROWTH - 1) & -GROWTH;
+    }
+
+    private static double[][] deepCopy(double[][] pages) {
+        if (null != pages) {
+            double[][] copy = new double[pages.length][];
+            for (int i = 0; i < pages.length; ++i) {
+                double[] page = pages[i];
+                if (null != page) {
+                    copy[i] = Arrays.copyOf(page, page.length);
+                }
+            }
+            return copy;
+        }
+        return null;
+    }
+
+    /** Iterates over the non-empty bins from the lowest index to the highest. */
+    private final class AscendingIterator implements Iterator<Bin> {
+
+        private int pageIndex = 0;
+        private int valueIndex = 0;
+        // count of the bin returned by the next call to next(), NaN once exhausted
+        private double next = Double.NaN;
+
+        private AscendingIterator() {
+            advance();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !Double.isNaN(next);
+        }
+
+        @Override
+        public Bin next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            Bin bin = new Bin(((pageIndex + minPageIndex) << PAGE_SHIFT) + valueIndex, next);
+            ++valueIndex;
+            advance();
+            return bin;
+        }
+
+        /** Moves to the first non-empty bin at or after the current position. */
+        private void advance() {
+            next = Double.NaN;
+            if (null == pages) {
+                return;
+            }
+            for (; pageIndex < pages.length; ++pageIndex, valueIndex = 0) {
+                double[] page = pages[pageIndex];
+                if (null != page) {
+                    for (; valueIndex < page.length; ++valueIndex) {
+                        if (page[valueIndex] != 0D) {
+                            next = page[valueIndex];
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    /** Iterates over the non-empty bins from the highest index to the lowest. */
+    private final class DescendingIterator implements Iterator<Bin> {
+
+        private int pageIndex;
+        private int valueIndex = PAGE_SIZE - 1;
+        // count of the bin returned by the next call to next(), NaN once exhausted
+        private double previous = Double.NaN;
+
+        private DescendingIterator() {
+            this.pageIndex = null == pages ? -1 : pages.length - 1;
+            advance();
+        }
+
+        @Override
+        public boolean hasNext() {
+            return !Double.isNaN(previous);
+        }
+
+        @Override
+        public Bin next() {
+            if (!hasNext()) {
+                throw new NoSuchElementException();
+            }
+            Bin bin = new Bin(((pageIndex + minPageIndex) << PAGE_SHIFT) + valueIndex, previous);
+            --valueIndex;
+            advance();
+            return bin;
+        }
+
+        /** Moves to the first non-empty bin at or before the current position. */
+        private void advance() {
+            previous = Double.NaN;
+            for (; pageIndex >= 0; --pageIndex, valueIndex = PAGE_SIZE - 1) {
+                double[] page = pages[pageIndex];
+                if (null != page) {
+                    for (; valueIndex >= 0; --valueIndex) {
+                        if (page[valueIndex] != 0D) {
+                            previous = page[valueIndex];
+                            return;
+                        }
+                    }
+                }
+            }
+        }
+    }
+}
diff --git a/src/test/java/com/datadoghq/sketch/ddsketch/footprint/FootprintTest.java b/src/test/java/com/datadoghq/sketch/ddsketch/footprint/FootprintTest.java
index cb97bcf..3a330a6 100644
--- a/src/test/java/com/datadoghq/sketch/ddsketch/footprint/FootprintTest.java
+++ b/src/test/java/com/datadoghq/sketch/ddsketch/footprint/FootprintTest.java
@@ -1,6 +1,8 @@
 package com.datadoghq.sketch.ddsketch.footprint;
 
 import com.datadoghq.sketch.ddsketch.DDSketch;
+import com.datadoghq.sketch.ddsketch.mapping.BitwiseLinearlyInterpolatedMapping;
+import com.datadoghq.sketch.ddsketch.store.PaginatedStore;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -42,7 +44,9 @@ public static Stream<Arguments> parameters() {
                         .composeWith(POISSON.of(0.99)),
                 POISSON.of(0.001)
                         .composeWith(POISSON.of(0.999))
-        ).flatMap(dist -> Stream.of(DDSketch::balanced, DDSketch::memoryOptimal, (DoubleFunction<DDSketch>) DDSketch::fast)
+        ).flatMap(dist -> Stream.of(DDSketch::balanced, DDSketch::memoryOptimal,
+                re -> new DDSketch(new BitwiseLinearlyInterpolatedMapping(re), PaginatedStore::new),
+                (DoubleFunction<DDSketch>) DDSketch::fast)
                 .flatMap(ctor -> Stream.of(NANOSECONDS, MICROSECONDS, MILLISECONDS)
                         .map(timeUnit -> Arguments.of(timeUnit, dist, ctor))));
     }
diff --git a/src/test/java/com/datadoghq/sketch/ddsketch/store/PaginatedStoreTest.java b/src/test/java/com/datadoghq/sketch/ddsketch/store/PaginatedStoreTest.java
new file mode 100644
index 0000000..c879cb4
--- /dev/null
+++ b/src/test/java/com/datadoghq/sketch/ddsketch/store/PaginatedStoreTest.java
@@ -0,0 +1,67 @@
+package com.datadoghq.sketch.ddsketch.store;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.Iterator;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+
+public class PaginatedStoreTest extends ExhaustiveStoreTest {
+
+
+    @Override
+    Store newStore() {
+        return new PaginatedStore();
+    }
+
+    @Override
+    void testExtremeValues() {
+        // PaginatedStore is not meant to be used with values that are extremely far from one another as it
+        // would allocate an excessively large array.
+    }
+
+    @Override
+    void testMergingExtremeValues() {
+        // PaginatedStore is not meant to be used with values that are extremely far from one another as it
+        // would allocate an excessively large array.
+    }
+
+    public static Stream<Arguments> affineTransformations() {
+        return Stream.of(
+                Arguments.of(1, 0),
+                Arguments.of(127, 1),
+                Arguments.of(128, 0),
+                Arguments.of(128, 1),
+                Arguments.of(129, 0),
+                Arguments.of(129, 1),
+                Arguments.of(-127, 1),
+                Arguments.of(-128, 0),
+                Arguments.of(-128, 1),
+                Arguments.of(-129, 0),
+                Arguments.of(-129, 1)
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("affineTransformations")
+    public void shouldBeEquivalentToUnboundedSizeDenseStore(int m, int c) {
+        Store paginatedStore = newStore();
+        UnboundedSizeDenseStore denseStore = new UnboundedSizeDenseStore();
+        IntStream.range(0, 1000).map(x -> m * x + c).forEach(x -> {
+            paginatedStore.add(x);
+            denseStore.add(x);
+        });
+        Iterator<Bin> pit = paginatedStore.getAscendingIterator();
+        Iterator<Bin> dit = denseStore.getAscendingIterator();
+        while (pit.hasNext() && dit.hasNext()) {
+            assertEquals(dit.next().getIndex(), pit.next().getIndex());
+        }
+        assertFalse(pit.hasNext());
+        assertFalse(dit.hasNext());
+    }
+}
diff --git a/src/test/java/com/datadoghq/sketch/ddsketch/store/StoreTest.java b/src/test/java/com/datadoghq/sketch/ddsketch/store/StoreTest.java
index 91d473a..ffc6b42 100644
--- a/src/test/java/com/datadoghq/sketch/ddsketch/store/StoreTest.java
+++ b/src/test/java/com/datadoghq/sketch/ddsketch/store/StoreTest.java
@@ -219,6 +219,12 @@ void testMergingFarApart() {
         testMerging(new int[]{ -10000 }, new int[]{ 10000 });
         testMerging(new int[]{ 10000 }, new int[]{ -10000 });
         testMerging(new int[]{ 10000 }, new int[]{ -10000 }, new int[]{ 0 });
+        testMerging(new int[]{ -10000, 10000 }, new int[]{ -5000, 5000 });
+        testMerging(new int[]{ -5000, 5000 }, new int[]{ -10000, 10000 });
+        testMerging(new int[]{ -5000, 10000 }, new int[]{ -10000, 5000 });
+        // overlapping ranges where one store both starts before and reaches beyond the other's allocated range
+        testMerging(new int[]{ 1280, 2176 }, new int[]{ 256, 3072 });
+        testMerging(new int[]{ 256, 3072 }, new int[]{ 1280, 2176 });
         testMerging(new int[]{ 10000, 0 }, new int[]{ -10000 }, new int[]{ 0 });
     }
 