Skip to content

Commit a91933f

Browse files
improve readability of PaginatedStore
1 parent cf1e5bb commit a91933f

1 file changed

Lines changed: 50 additions & 27 deletions

File tree

src/main/java/com/datadoghq/sketch/ddsketch/store/PaginatedStore.java

Lines changed: 50 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,22 @@
44
import java.util.Iterator;
55
import java.util.NoSuchElementException;
66

7+
/**
8+
* This is an unbounded store which allocates storage for counts
9+
* in aligned pages stored in an array at offsets modulo the page
10+
* size. This means that if a distribution has several modes, the
11+
* cost of the storage for the space in between the modes is that
12+
* of a null pointer per page, requiring 4-8 bytes (depending on
13+
* CompressedOops, whether ZGC is used, and heap size) for each
14+
* 2KB page.
15+
*
16+
* On the contrary, if the data is uniformly distributed filling
17+
* each page in a range [N, N + K * PAGE_SIZE) this store will
18+
* require K * (20 + 4|8) extra space over
19+
* {@code UnboundedSizeDenseStore}, because of the metadata
20+
* overhead of the array headers and references to each page.
21+
*
22+
*/
723
public final class PaginatedStore implements Store {
824

925
private static final int GROWTH = 8;
@@ -12,24 +28,24 @@ public final class PaginatedStore implements Store {
1228
private static final int PAGE_SHIFT = Integer.bitCount(PAGE_MASK);
1329

1430
private double[][] pages = null;
15-
private int pageOffset;
31+
private int minPageIndex;
1632

1733
public PaginatedStore() {
18-
this(0);
34+
this(Integer.MAX_VALUE);
1935
}
2036

21-
PaginatedStore(int pageOffset) {
22-
this.pageOffset = pageOffset;
37+
PaginatedStore(int minPageIndex) {
38+
this.minPageIndex = minPageIndex;
2339
}
2440

2541
PaginatedStore(PaginatedStore store) {
26-
this(store.pageOffset);
42+
this(store.minPageIndex);
2743
this.pages = deepCopy(store.pages);
2844
}
2945

3046
@Override
3147
public boolean isEmpty() {
32-
// won't initialise any pages unless until a value is added,
48+
// won't initialise any pages until a value is added,
3349
// and values can't be removed.
3450
return null == pages;
3551
}
@@ -41,7 +57,7 @@ public int getMinIndex() {
4157
if (null != pages[i]) {
4258
for (int j = 0; j < pages[i].length; ++j) {
4359
if (pages[i][j] != 0D) {
44-
return ((i - pageOffset) << PAGE_SHIFT) + j;
60+
return ((i + minPageIndex) << PAGE_SHIFT) + j;
4561
}
4662
}
4763
}
@@ -57,7 +73,7 @@ public int getMaxIndex() {
5773
if (null != pages[i]) {
5874
for (int j = pages[i].length - 1; j >= 0; --j) {
5975
if (pages[i][j] != 0D) {
60-
return ((i - pageOffset) << PAGE_SHIFT) + j;
76+
return ((i + minPageIndex) << PAGE_SHIFT) + j;
6177
}
6278
}
6379
}
@@ -84,26 +100,33 @@ private double[] getPage(int pageIndex) {
84100
}
85101

86102
private int alignedIndex(int index) {
103+
// get the index of the page this value should be stored in
87104
int pageIndex = index < 0
88-
? -(-index >>> PAGE_SHIFT) - 1
105+
? ~(-index >>> PAGE_SHIFT)
89106
: index >>> PAGE_SHIFT;
90-
if (null == pages) {
91-
lazyInit(pageIndex);
92-
} else if (pageIndex + pageOffset < 0) {
93-
growBelow(pageIndex);
94-
} else if (pageIndex + pageOffset >= pages.length - 1) {
95-
growAbove(pageIndex);
107+
if (pageIndex < minPageIndex) {
108+
// then space needs to be made before the first page,
109+
// unless this is the first insertion
110+
if (null == pages) {
111+
lazyInit(pageIndex);
112+
} else {
113+
shiftPagesRight(pageIndex);
114+
}
115+
} else if (pageIndex >= minPageIndex + pages.length - 1) {
116+
// then space needs to be made after the last page
117+
extendTo(pageIndex);
96118
}
97-
return index + (pageOffset << PAGE_SHIFT);
119+
// align the index relative to the start of the sketch
120+
return index + (-minPageIndex << PAGE_SHIFT);
98121
}
99122

100123
private void lazyInit(int pageIndex) {
101-
pageOffset = -pageIndex;
124+
minPageIndex = pageIndex;
102125
pages = new double[GROWTH][];
103126
}
104127

105-
private void growBelow(int pageIndex) {
106-
int requiredExtension = -pageOffset - pageIndex;
128+
private void shiftPagesRight(int pageIndex) {
129+
int requiredExtension = minPageIndex - pageIndex;
107130
// check if there is space to shift into
108131
boolean canShiftRight = true;
109132
for (int i = 0; i < requiredExtension && canShiftRight; ++i) {
@@ -112,16 +135,16 @@ private void growBelow(int pageIndex) {
112135
if (canShiftRight) {
113136
System.arraycopy(pages, 0, pages, requiredExtension, pages.length - requiredExtension);
114137
} else {
115-
double[][] newPages = new double[pages.length + aligned(GROWTH, requiredExtension)][];
138+
double[][] newPages = new double[pages.length + aligned(requiredExtension)][];
116139
System.arraycopy(pages, 0, newPages, requiredExtension, pages.length);
117140
this.pages = newPages;
118141
}
119142
Arrays.fill(pages, 0, requiredExtension, null);
120-
this.pageOffset = -pageIndex;
143+
this.minPageIndex = pageIndex;
121144
}
122145

123-
private void growAbove(int pageIndex) {
124-
this.pages = Arrays.copyOf(pages, pages.length + aligned(GROWTH,pageIndex + 1 + pageOffset));
146+
private void extendTo(int pageIndex) {
147+
this.pages = Arrays.copyOf(pages, pages.length + aligned(pageIndex - minPageIndex + 1));
125148
}
126149

127150
@Override
@@ -139,8 +162,8 @@ public Iterator<Bin> getDescendingIterator() {
139162
return new DescendingIterator();
140163
}
141164

142-
private static int aligned(int alignment, int required) {
143-
return (required + alignment - 1) & -alignment;
165+
private static int aligned(int required) {
166+
return (required + GROWTH - 1) & -GROWTH;
144167
}
145168

146169
private static double[][] deepCopy(double[][] pages) {
@@ -185,7 +208,7 @@ public boolean hasNext() {
185208
@Override
186209
public Bin next() {
187210
double value = next;
188-
int index = ((pageIndex - pageOffset) << PAGE_SHIFT) + valueIndex;
211+
int index = ((pageIndex + minPageIndex) << PAGE_SHIFT) + valueIndex;
189212
++valueIndex;
190213
next = nextInPage();
191214
if (Double.isNaN(next)) {
@@ -241,7 +264,7 @@ public boolean hasNext() {
241264
@Override
242265
public Bin next() {
243266
double value = previous;
244-
int index = ((pageIndex - pageOffset) << PAGE_SHIFT) + valueIndex;
267+
int index = ((pageIndex + minPageIndex) << PAGE_SHIFT) + valueIndex;
245268
--valueIndex;
246269
previous = previousInPage();
247270
if (Double.isNaN(previous)) {

0 commit comments

Comments
 (0)