Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.codahale.metrics.SlidingWindowReservoir;
import com.codahale.metrics.Timer;

import org.apache.hudi.common.metrics.Registry;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metrics.Metrics;
Expand Down Expand Up @@ -70,11 +71,13 @@ private String getMetricsName(String metric) {

private Timer createTimerForMetrics(MetricRegistry registry, String metric) {
String metricName = getMetricsName(metric);
synchronized (REGISTRY_LOCK) {
if (registry.getMetrics().get(metricName) == null) {
lockDuration = new Timer(new SlidingWindowReservoir(keepLastNtimes));
registry.register(metricName, lockDuration);
return lockDuration;
if (registry.getMetrics().get(metricName) == null) {
synchronized (Registry.class) {
if (registry.getMetrics().get(metricName) == null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the REGISTRY_LOCK can not work correctly here, can you elaborate the explanation then ?

Copy link
Author

@kaori-seasons kaori-seasons Dec 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

为什么REGISTRY_LOCK不能在这里正常工作,你能详细解释一下吗?

When using the reentrant lock, the Task threads are isolated from each other before, but there will be a situation where the second Task thread enters the synchronized scope and waits after the first Task thread registers the value. At this time, the metrics may not be obtained. So there is a duplicate registration

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean the REGISTRY_LOCK can still be used instead of the class obj lock Registry.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is not necessary to lock the whole class. I will make the correction in these two days

lockDuration = new Timer(new SlidingWindowReservoir(keepLastNtimes));
registry.register(metricName, lockDuration);
return lockDuration;
}
}
}
return (Timer) registry.getMetrics().get(metricName);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package org.apache.hudi.config.metrics;

import org.apache.hudi.client.transaction.lock.metrics.HoodieLockMetrics;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
import org.apache.hudi.common.table.view.FileSystemViewStorageType;
import org.apache.hudi.common.util.CustomizedThreadFactory;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodieStorageConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.Metrics;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA;

public class TestHoodieLockMetrics {

@TempDir
public java.nio.file.Path tempDir;
protected String basePath = null;

@BeforeEach
public void setUp() throws IOException {
java.nio.file.Path basePath = tempDir.resolve("dataset");
java.nio.file.Files.createDirectories(basePath);
this.basePath = basePath.toString();
}

@ParameterizedTest
@ValueSource(booleans = {true, false})
public void testTaskManagerConcurrentRegisterMetrics(){
ExecutorService threadPoolExecutor = Executors.newFixedThreadPool(10, new CustomizedThreadFactory("hudi-metrics-"));

threadPoolExecutor.submit( () -> {
HoodieWriteConfig writeConfig = getConfigBuilder(false).build();
return new HoodieLockMetrics(writeConfig);
});
}
protected HoodieWriteConfig.Builder getConfigBuilder(Boolean autoCommit) {

return HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withAutoCommit(autoCommit)
.withMetadataConfig(HoodieMetadataConfig.newBuilder().withAssumeDatePartitioning(true).build())
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024 * 1024 * 1024)
.withInlineCompaction(false).withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder()
.hfileMaxFileSize(1024 * 1024 * 1024).parquetMaxFileSize(1024 * 1024 * 1024).orcMaxFileSize(1024 * 1024 * 1024).build())
.forTable("test-trip-table")
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
.withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
.withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE).build());
}
}