Skip to content

Commit 10bc7ba

Browse files
committed
HBASE-27775 Use a separate WAL provider for hbase:replication table
1 parent 49589ef commit 10bc7ba

5 files changed

Lines changed: 223 additions & 66 deletions

File tree

hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,4 +102,10 @@ public static ReplicationQueueStorage getReplicationQueueStorage(Connection conn
102102
return ReflectionUtils.newInstance(clazz, conf, tableName);
103103
}
104104
}
105+
106+
public static boolean isReplicationQueueTable(Configuration conf, TableName tableName) {
107+
TableName replicationQueueTableName = TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME,
108+
REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
109+
return replicationQueueTableName.equals(tableName);
110+
}
105111
}

hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -982,12 +982,12 @@ synchronized public void run() {
982982

983983
lastRan = currentTime;
984984

985-
final WALProvider provider = regionServer.getWalFactory().getWALProvider();
986-
final WALProvider metaProvider = regionServer.getWalFactory().getMetaWALProvider();
987-
numWALFiles = (provider == null ? 0 : provider.getNumLogFiles())
988-
+ (metaProvider == null ? 0 : metaProvider.getNumLogFiles());
989-
walFileSize = (provider == null ? 0 : provider.getLogFileSize())
990-
+ (metaProvider == null ? 0 : metaProvider.getLogFileSize());
985+
List<WALProvider> providers = regionServer.getWalFactory().getAllWALProviders();
986+
for (WALProvider provider : providers) {
987+
numWALFiles += provider.getNumLogFiles();
988+
walFileSize += provider.getLogFileSize();
989+
}
990+
991991
// Copy over computed values so that no thread sees half computed values.
992992
numStores = tempNumStores;
993993
numStoreFiles = tempNumStoreFiles;
Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hbase.wal;
19+
20+
import java.io.Closeable;
21+
import java.io.IOException;
22+
import java.util.concurrent.atomic.AtomicReference;
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.hbase.Abortable;
25+
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
26+
import org.apache.hadoop.hbase.wal.WALFactory.Providers;
27+
import org.apache.yetus.audience.InterfaceAudience;
28+
29+
/**
30+
* A lazy initialized WAL provider for holding the WALProvider for some special tables, such as
31+
* hbase:meta, hbase:replication, etc.
32+
*/
33+
@InterfaceAudience.Private
34+
class LazyInitializedWALProvider implements Closeable {
35+
36+
private final WALFactory factory;
37+
38+
private final String providerId;
39+
40+
private final String providerConfigName;
41+
42+
private final Abortable abortable;
43+
44+
private final AtomicReference<WALProvider> holder = new AtomicReference<>();
45+
46+
LazyInitializedWALProvider(WALFactory factory, String providerId, String providerConfigName,
47+
Abortable abortable) {
48+
this.factory = factory;
49+
this.providerId = providerId;
50+
this.providerConfigName = providerConfigName;
51+
this.abortable = abortable;
52+
}
53+
54+
WALProvider getProvider() throws IOException {
55+
Configuration conf = factory.getConf();
56+
for (;;) {
57+
WALProvider provider = this.holder.get();
58+
if (provider != null) {
59+
return provider;
60+
}
61+
Class<? extends WALProvider> clz = null;
62+
if (conf.get(providerConfigName) == null) {
63+
try {
64+
clz = conf.getClass(WALFactory.WAL_PROVIDER, Providers.defaultProvider.clazz,
65+
WALProvider.class);
66+
} catch (Throwable t) {
67+
// the WAL provider should be an enum. Proceed
68+
}
69+
}
70+
if (clz == null) {
71+
clz = factory.getProviderClass(providerConfigName,
72+
conf.get(WALFactory.WAL_PROVIDER, WALFactory.DEFAULT_WAL_PROVIDER));
73+
}
74+
provider = WALFactory.createProvider(clz);
75+
provider.init(factory, conf, providerId, this.abortable);
76+
provider.addWALActionsListener(new MetricsWAL());
77+
if (this.holder.compareAndSet(null, provider)) {
78+
return provider;
79+
} else {
80+
// someone is ahead of us, close and try again.
81+
provider.close();
82+
}
83+
}
84+
}
85+
86+
/**
87+
* Get the provider if it already initialized, otherwise just return {@code null} instead of
88+
* creating it.
89+
*/
90+
WALProvider getProviderNoCreate() {
91+
return holder.get();
92+
}
93+
94+
@Override
95+
public void close() throws IOException {
96+
WALProvider provider = this.holder.get();
97+
if (provider != null) {
98+
provider.close();
99+
}
100+
}
101+
102+
void shutdown() throws IOException {
103+
WALProvider provider = this.holder.get();
104+
if (provider != null) {
105+
provider.shutdown();
106+
}
107+
}
108+
}

hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java

Lines changed: 102 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.errorprone.annotations.RestrictedApi;
2121
import java.io.IOException;
2222
import java.io.InterruptedIOException;
23+
import java.util.ArrayList;
2324
import java.util.List;
2425
import java.util.concurrent.atomic.AtomicReference;
2526
import org.apache.hadoop.conf.Configuration;
@@ -28,10 +29,12 @@
2829
import org.apache.hadoop.hbase.Abortable;
2930
import org.apache.hadoop.hbase.ServerName;
3031
import org.apache.hadoop.hbase.client.RegionInfo;
32+
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
3133
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
3234
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
3335
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
3436
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
37+
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
3538
import org.apache.hadoop.hbase.util.CancelableProgressable;
3639
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
3740
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -99,15 +102,22 @@ enum Providers {
99102

100103
public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
101104

105+
public static final String REPLICATION_WAL_PROVIDER = "hbase.wal.replication_provider";
106+
102107
public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
103108

109+
static final String REPLICATION_WAL_PROVIDER_ID = "rep";
110+
104111
final String factoryId;
105112
final Abortable abortable;
106113
private final WALProvider provider;
107114
// The meta updates are written to a different wal. If this
108115
// regionserver holds meta regions, then this ref will be non-null.
109116
// lazily intialized; most RegionServers don't deal with META
110-
private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();
117+
private final LazyInitializedWALProvider metaProvider;
118+
// This is for avoid hbase:replication itself keeps trigger unnecessary updates to WAL file and
119+
// generate a lot useless data, see HBASE-27775 for more details.
120+
private final LazyInitializedWALProvider replicationProvider;
111121

112122
/**
113123
* Configuration-specified WAL Reader used when a custom reader is requested
@@ -144,13 +154,15 @@ private WALFactory(Configuration conf) {
144154
factoryId = SINGLETON_ID;
145155
this.abortable = null;
146156
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
157+
this.metaProvider = null;
158+
this.replicationProvider = null;
147159
}
148160

149161
Providers getDefaultProvider() {
150162
return Providers.defaultProvider;
151163
}
152164

153-
public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
165+
Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
154166
try {
155167
Providers provider = Providers.valueOf(conf.get(key, defaultValue));
156168

@@ -246,6 +258,10 @@ private WALFactory(Configuration conf, String factoryId, Abortable abortable,
246258
this.factoryId = factoryId;
247259
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
248260
this.abortable = abortable;
261+
this.metaProvider = new LazyInitializedWALProvider(this,
262+
AbstractFSWALProvider.META_WAL_PROVIDER_ID, META_WAL_PROVIDER, this.abortable);
263+
this.replicationProvider = new LazyInitializedWALProvider(this, REPLICATION_WAL_PROVIDER_ID,
264+
REPLICATION_WAL_PROVIDER, this.abortable);
249265
// end required early initialization
250266
if (conf.getBoolean(WAL_ENABLED, true)) {
251267
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
@@ -263,19 +279,45 @@ private WALFactory(Configuration conf, String factoryId, Abortable abortable,
263279
}
264280
}
265281

282+
public Configuration getConf() {
283+
return conf;
284+
}
285+
266286
/**
267287
* Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
268288
* replay and edits that have gone to any wals from this factory.
269289
*/
270290
public void close() throws IOException {
271-
final WALProvider metaProvider = this.metaProvider.get();
272-
if (null != metaProvider) {
273-
metaProvider.close();
291+
List<IOException> ioes = new ArrayList<>();
292+
// these fields could be null if the WALFactory is created only for being used in the
293+
// getInstance method.
294+
if (metaProvider != null) {
295+
try {
296+
metaProvider.close();
297+
} catch (IOException e) {
298+
ioes.add(e);
299+
}
300+
}
301+
if (replicationProvider != null) {
302+
try {
303+
replicationProvider.close();
304+
} catch (IOException e) {
305+
ioes.add(e);
306+
}
274307
}
275-
// close is called on a WALFactory with null provider in the case of contention handling
276-
// within the getInstance method.
277-
if (null != provider) {
278-
provider.close();
308+
if (provider != null) {
309+
try {
310+
provider.close();
311+
} catch (IOException e) {
312+
ioes.add(e);
313+
}
314+
}
315+
if (!ioes.isEmpty()) {
316+
IOException ioe = new IOException("Failed to close WALFactory");
317+
for (IOException e : ioes) {
318+
ioe.addSuppressed(e);
319+
}
320+
throw ioe;
279321
}
280322
}
281323

@@ -285,18 +327,36 @@ public void close() throws IOException {
285327
* if you can as it will try to leave things as tidy as possible.
286328
*/
287329
public void shutdown() throws IOException {
288-
IOException exception = null;
289-
final WALProvider metaProvider = this.metaProvider.get();
290-
if (null != metaProvider) {
330+
List<IOException> ioes = new ArrayList<>();
331+
// these fields could be null if the WALFactory is created only for being used in the
332+
// getInstance method.
333+
if (metaProvider != null) {
291334
try {
292335
metaProvider.shutdown();
293-
} catch (IOException ioe) {
294-
exception = ioe;
336+
} catch (IOException e) {
337+
ioes.add(e);
338+
}
339+
}
340+
if (replicationProvider != null) {
341+
try {
342+
replicationProvider.shutdown();
343+
} catch (IOException e) {
344+
ioes.add(e);
345+
}
346+
}
347+
if (provider != null) {
348+
try {
349+
provider.shutdown();
350+
} catch (IOException e) {
351+
ioes.add(e);
295352
}
296353
}
297-
provider.shutdown();
298-
if (null != exception) {
299-
throw exception;
354+
if (!ioes.isEmpty()) {
355+
IOException ioe = new IOException("Failed to shutdown WALFactory");
356+
for (IOException e : ioes) {
357+
ioe.addSuppressed(e);
358+
}
359+
throw ioe;
300360
}
301361
}
302362

@@ -309,48 +369,23 @@ public List<WAL> getWALs() {
309369
* creating the first hbase:meta WAL so we can register a listener.
310370
* @see #getMetaWALProvider()
311371
*/
312-
public WALProvider getMetaProvider() throws IOException {
313-
for (;;) {
314-
WALProvider provider = this.metaProvider.get();
315-
if (provider != null) {
316-
return provider;
317-
}
318-
Class<? extends WALProvider> clz = null;
319-
if (conf.get(META_WAL_PROVIDER) == null) {
320-
try {
321-
clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
322-
} catch (Throwable t) {
323-
// the WAL provider should be an enum. Proceed
324-
}
325-
}
326-
if (clz == null) {
327-
clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
328-
}
329-
provider = createProvider(clz);
330-
provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable);
331-
provider.addWALActionsListener(new MetricsWAL());
332-
if (metaProvider.compareAndSet(null, provider)) {
333-
return provider;
334-
} else {
335-
// someone is ahead of us, close and try again.
336-
provider.close();
337-
}
338-
}
372+
WALProvider getMetaProvider() throws IOException {
373+
return metaProvider.getProvider();
339374
}
340375

341376
/**
342377
* @param region the region which we want to get a WAL for. Could be null.
343378
*/
344379
public WAL getWAL(RegionInfo region) throws IOException {
345380
// Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
346-
if (
347-
region != null && region.isMetaRegion()
348-
&& region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID
349-
) {
350-
return getMetaProvider().getWAL(region);
351-
} else {
352-
return provider.getWAL(region);
381+
if (region != null && RegionReplicaUtil.isDefaultReplica(region)) {
382+
if (region.isMetaRegion()) {
383+
return metaProvider.getProvider().getWAL(region);
384+
} else if (ReplicationStorageFactory.isReplicationQueueTable(conf, region.getTable())) {
385+
return replicationProvider.getProvider().getWAL(region);
386+
}
353387
}
388+
return provider.getWAL(region);
354389
}
355390

356391
public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
@@ -532,11 +567,23 @@ public final WALProvider getWALProvider() {
532567
}
533568

534569
/**
535-
* @return Current metaProvider... may be null if not yet initialized.
536-
* @see #getMetaProvider()
570+
* Returns all the wal providers, for example, the default one, the one for hbase:meta and the one
571+
* for hbase:replication.
537572
*/
538-
public final WALProvider getMetaWALProvider() {
539-
return this.metaProvider.get();
573+
public final List<WALProvider> getAllWALProviders() {
574+
List<WALProvider> providers = new ArrayList<>();
575+
if (provider != null) {
576+
providers.add(provider);
577+
}
578+
WALProvider meta = metaProvider.getProviderNoCreate();
579+
if (meta != null) {
580+
providers.add(meta);
581+
}
582+
WALProvider replication = replicationProvider.getProviderNoCreate();
583+
if (replication != null) {
584+
providers.add(replication);
585+
}
586+
return providers;
540587
}
541588

542589
public ExcludeDatanodeManager getExcludeDatanodeManager() {

0 commit comments

Comments
 (0)