1717 */
1818package org .apache .hadoop .hbase .regionserver ;
1919
20+ import static org .apache .hadoop .hbase .regionserver .StoreFileWriter .shouldEnableHistoricalCompactionFiles ;
21+
22+ import edu .umd .cs .findbugs .annotations .Nullable ;
2023import java .io .IOException ;
24+ import java .util .ArrayList ;
2125import java .util .Collection ;
2226import java .util .Comparator ;
2327import java .util .Iterator ;
@@ -48,36 +52,71 @@ class DefaultStoreFileManager implements StoreFileManager {
4852 private final CompactionConfiguration comConf ;
4953 private final int blockingFileCount ;
5054 private final Comparator <HStoreFile > storeFileComparator ;
51- /**
52- * List of store files inside this store. This is an immutable list that is atomically replaced
53- * when its contents change.
54- */
55- private volatile ImmutableList <HStoreFile > storefiles = ImmutableList .of ();
55+
56+ static class StoreFileList {
57+ /**
58+ * List of store files inside this store. This is an immutable list that is atomically replaced
59+ * when its contents change.
60+ */
61+ final ImmutableList <HStoreFile > all ;
62+ /**
63+ * List of store files that include the latest cells inside this store. This is an immutable
64+ * list that is atomically replaced when its contents change.
65+ */
66+ @ Nullable
67+ final ImmutableList <HStoreFile > live ;
68+
69+ StoreFileList (ImmutableList <HStoreFile > storeFiles , ImmutableList <HStoreFile > liveStoreFiles ) {
70+ this .all = storeFiles ;
71+ this .live = liveStoreFiles ;
72+ }
73+ }
74+
75+ private volatile StoreFileList storeFiles ;
76+
5677 /**
5778 * List of compacted files inside this store that needs to be excluded in reads because further
5879 * new reads will be using only the newly created files out of compaction. These compacted files
5980 * will be deleted/cleared once all the existing readers on these compacted files are done.
6081 */
6182 private volatile ImmutableList <HStoreFile > compactedfiles = ImmutableList .of ();
83+ private final boolean enableLiveFileTracking ;
6284
6385 public DefaultStoreFileManager (CellComparator cellComparator ,
6486 Comparator <HStoreFile > storeFileComparator , Configuration conf ,
6587 CompactionConfiguration comConf ) {
6688 this .cellComparator = cellComparator ;
6789 this .storeFileComparator = storeFileComparator ;
6890 this .comConf = comConf ;
69- this . blockingFileCount =
91+ blockingFileCount =
7092 conf .getInt (HStore .BLOCKING_STOREFILES_KEY , HStore .DEFAULT_BLOCKING_STOREFILE_COUNT );
93+ enableLiveFileTracking = shouldEnableHistoricalCompactionFiles (conf );
94+ storeFiles =
95+ new StoreFileList (ImmutableList .of (), enableLiveFileTracking ? ImmutableList .of () : null );
96+ }
97+
98+ private List <HStoreFile > getLiveFiles (Collection <HStoreFile > storeFiles ) throws IOException {
99+ List <HStoreFile > liveFiles = new ArrayList <>(storeFiles .size ());
100+ for (HStoreFile file : storeFiles ) {
101+ file .initReader ();
102+ if (!file .isHistorical ()) {
103+ liveFiles .add (file );
104+ }
105+ }
106+ return liveFiles ;
71107 }
72108
73109 @ Override
74- public void loadFiles (List <HStoreFile > storeFiles ) {
75- this .storefiles = ImmutableList .sortedCopyOf (storeFileComparator , storeFiles );
110+ public void loadFiles (List <HStoreFile > storeFiles ) throws IOException {
111+ this .storeFiles = new StoreFileList (ImmutableList .sortedCopyOf (storeFileComparator , storeFiles ),
112+ enableLiveFileTracking
113+ ? ImmutableList .sortedCopyOf (storeFileComparator , getLiveFiles (storeFiles ))
114+ : null );
76115 }
77116
78117 @ Override
79- public final Collection <HStoreFile > getStorefiles () {
80- return storefiles ;
118+ public final Collection <HStoreFile > getStoreFiles () {
119+ return storeFiles . all ;
81120 }
82121
83122 @ Override
@@ -86,15 +125,20 @@ public Collection<HStoreFile> getCompactedfiles() {
86125 }
87126
88127 @ Override
89- public void insertNewFiles (Collection <HStoreFile > sfs ) {
90- this .storefiles =
91- ImmutableList .sortedCopyOf (storeFileComparator , Iterables .concat (this .storefiles , sfs ));
128+ public void insertNewFiles (Collection <HStoreFile > sfs ) throws IOException {
129+ storeFiles = new StoreFileList (
130+ ImmutableList .sortedCopyOf (storeFileComparator , Iterables .concat (storeFiles .all , sfs )),
131+ enableLiveFileTracking
132+ ? ImmutableList .sortedCopyOf (storeFileComparator ,
133+ Iterables .concat (storeFiles .live , getLiveFiles (sfs )))
134+ : null );
92135 }
93136
94137 @ Override
95138 public ImmutableCollection <HStoreFile > clearFiles () {
96- ImmutableList <HStoreFile > result = storefiles ;
97- storefiles = ImmutableList .of ();
139+ ImmutableList <HStoreFile > result = storeFiles .all ;
140+ storeFiles =
141+ new StoreFileList (ImmutableList .of (), enableLiveFileTracking ? ImmutableList .of () : null );
98142 return result ;
99143 }
100144
@@ -107,7 +151,7 @@ public Collection<HStoreFile> clearCompactedFiles() {
107151
108152 @ Override
109153 public final int getStorefileCount () {
110- return storefiles .size ();
154+ return storeFiles . all .size ();
111155 }
112156
113157 @ Override
@@ -117,28 +161,38 @@ public final int getCompactedFilesCount() {
117161
118162 @ Override
119163 public void addCompactionResults (Collection <HStoreFile > newCompactedfiles ,
120- Collection <HStoreFile > results ) {
121- this .storefiles = ImmutableList .sortedCopyOf (storeFileComparator , Iterables
122- .concat (Iterables .filter (storefiles , sf -> !newCompactedfiles .contains (sf )), results ));
164+ Collection <HStoreFile > results ) throws IOException {
165+ ImmutableList <HStoreFile > liveStoreFiles = null ;
166+ if (enableLiveFileTracking ) {
167+ liveStoreFiles = ImmutableList .sortedCopyOf (storeFileComparator ,
168+ Iterables .concat (Iterables .filter (storeFiles .live , sf -> !newCompactedfiles .contains (sf )),
169+ getLiveFiles (results )));
170+ }
171+ storeFiles =
172+ new StoreFileList (
173+ ImmutableList
174+ .sortedCopyOf (storeFileComparator ,
175+ Iterables .concat (
176+ Iterables .filter (storeFiles .all , sf -> !newCompactedfiles .contains (sf )), results )),
177+ liveStoreFiles );
123178 // Mark the files as compactedAway once the storefiles and compactedfiles list is finalized
124179 // Let a background thread close the actual reader on these compacted files and also
125180 // ensure to evict the blocks from block cache so that they are no longer in
126181 // cache
127182 newCompactedfiles .forEach (HStoreFile ::markCompactedAway );
128- this . compactedfiles = ImmutableList .sortedCopyOf (storeFileComparator ,
129- Iterables .concat (this . compactedfiles , newCompactedfiles ));
183+ compactedfiles = ImmutableList .sortedCopyOf (storeFileComparator ,
184+ Iterables .concat (compactedfiles , newCompactedfiles ));
130185 }
131186
132187 @ Override
133188 public void removeCompactedFiles (Collection <HStoreFile > removedCompactedfiles ) {
134- this .compactedfiles =
135- this .compactedfiles .stream ().filter (sf -> !removedCompactedfiles .contains (sf ))
136- .sorted (storeFileComparator ).collect (ImmutableList .toImmutableList ());
189+ compactedfiles = compactedfiles .stream ().filter (sf -> !removedCompactedfiles .contains (sf ))
190+ .sorted (storeFileComparator ).collect (ImmutableList .toImmutableList ());
137191 }
138192
139193 @ Override
140194 public final Iterator <HStoreFile > getCandidateFilesForRowKeyBefore (KeyValue targetKey ) {
141- return this . storefiles .reverse ().iterator ();
195+ return storeFiles . all .reverse ().iterator ();
142196 }
143197
144198 @ Override
@@ -153,25 +207,28 @@ public Iterator<HStoreFile> updateCandidateFilesForRowKeyBefore(
153207
154208 @ Override
155209 public final Optional <byte []> getSplitPoint () throws IOException {
156- return StoreUtils .getSplitPoint (storefiles , cellComparator );
210+ return StoreUtils .getSplitPoint (storeFiles . all , cellComparator );
157211 }
158212
159213 @ Override
160- public final Collection <HStoreFile > getFilesForScan (byte [] startRow , boolean includeStartRow ,
161- byte [] stopRow , boolean includeStopRow ) {
214+ public Collection <HStoreFile > getFilesForScan (byte [] startRow , boolean includeStartRow ,
215+ byte [] stopRow , boolean includeStopRow , boolean onlyLatestVersion ) {
216+ if (onlyLatestVersion && enableLiveFileTracking ) {
217+ return storeFiles .live ;
218+ }
162219 // We cannot provide any useful input and already have the files sorted by seqNum.
163- return getStorefiles ();
220+ return getStoreFiles ();
164221 }
165222
166223 @ Override
167224 public int getStoreCompactionPriority () {
168- int priority = blockingFileCount - storefiles .size ();
225+ int priority = blockingFileCount - storeFiles . all .size ();
169226 return (priority == HStore .PRIORITY_USER ) ? priority + 1 : priority ;
170227 }
171228
172229 @ Override
173230 public Collection <HStoreFile > getUnneededFiles (long maxTs , List <HStoreFile > filesCompacting ) {
174- ImmutableList <HStoreFile > files = storefiles ;
231+ ImmutableList <HStoreFile > files = storeFiles . all ;
175232 // 1) We can never get rid of the last file which has the maximum seqid.
176233 // 2) Files that are not the latest can't become one due to (1), so the rest are fair game.
177234 return files .stream ().limit (Math .max (0 , files .size () - 1 )).filter (sf -> {
0 commit comments