Merged

27 commits
4378de2
UD-635. Added datacenter info to datanodes, buckets and containers
apotheque Mar 11, 2025
7215507
Fix for no DC mapping provided
apotheque Mar 11, 2025
3ea5149
Fix null datacenters property in protobuf
apotheque Mar 11, 2025
2bc501c
Fix datacenters param mock
apotheque Mar 11, 2025
430de38
Fix reading null datacenters from protobuf
apotheque Mar 11, 2025
62d086d
Fix some tests
apotheque Mar 11, 2025
47177aa
Added missing xml configuration
apotheque Mar 11, 2025
2c99129
Check omBucketInfo is null
apotheque Mar 11, 2025
80dc31a
Fix reading null datacenters from protobuf #2
apotheque Mar 12, 2025
a5e71d7
Datacenter based policies for pipelines
apotheque Mar 13, 2025
2dc4267
Fix pmd
apotheque Mar 13, 2025
8f11ddd
Fix tests
apotheque Mar 14, 2025
a135bec
Make tests pass again
apotheque Mar 14, 2025
faec0bc
Set 1.4 branch as base
apotheque Mar 15, 2025
a52ba77
Fix tests
apotheque Mar 15, 2025
6afdba7
Choose random pipeline
apotheque Mar 17, 2025
7e78726
Disable BackgroundPipelineCreator
apotheque Mar 18, 2025
d7d8cf2
Return null if there is no suitable pipeline to choose from
apotheque Mar 18, 2025
6378878
Revert "Disable BackgroundPipelineCreator"
apotheque Mar 18, 2025
eb48107
Force BackgroundPipelineCreator to create pipelines bound to all spec…
apotheque Mar 18, 2025
578c7c7
Validate bucket datacenters option with regexp
apotheque Mar 19, 2025
fd13563
Use repeated type for datacenters in AllocateScmBlockRequestProto
apotheque Mar 19, 2025
960ecb6
Copy datacenters set in PipelineRequestInformation.Builder
apotheque Mar 19, 2025
6ea6503
Better description for ozone.scm.dc.datanode.mapping property
apotheque Mar 19, 2025
60d37de
Fix tests
apotheque Mar 19, 2025
2c2ada1
Fix checkstyle
apotheque Mar 19, 2025
f351fbe
Fix tests
apotheque Mar 20, 2025

@@ -18,17 +18,23 @@

package org.apache.hadoop.hdds.scm;

import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

/**
* The information of the request of pipeline.
*/
public final class PipelineRequestInformation {
private final long size;
private final Set<String> datacenters;

/**
* Builder for PipelineRequestInformation.
*/
public static class Builder {
private long size;
private Set<String> datacenters = Collections.emptySet();

public static Builder getBuilder() {
return new Builder();
@@ -44,16 +50,31 @@ public Builder setSize(long sz) {
return this;
}

/**
* sets the datacenters.
* @param datacenters request datacenters
* @return Builder for PipelineRequestInformation
*/
public Builder setDatacenters(Set<String> datacenters) {
this.datacenters = new HashSet<>(datacenters);
return this;
}

public PipelineRequestInformation build() {
-      return new PipelineRequestInformation(size);
+      return new PipelineRequestInformation(size, datacenters);
}
}

-  private PipelineRequestInformation(long size) {
+  private PipelineRequestInformation(long size, Set<String> datacenters) {
this.size = size;
this.datacenters = datacenters;
}

public long getSize() {
return size;
}

public Set<String> getDatacenters() {
return datacenters;
}
}
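
For reference, a minimal usage sketch of the extended builder. The wrapper class, the size, and the datacenter names below are made up for illustration:

import java.util.Arrays;
import java.util.HashSet;

import org.apache.hadoop.hdds.scm.PipelineRequestInformation;

public final class PipelineRequestSketch {
  public static void main(String[] args) {
    // Ask for pipeline capacity restricted to two hypothetical datacenters.
    PipelineRequestInformation request =
        PipelineRequestInformation.Builder.getBuilder()
            .setSize(128L * 1024 * 1024) // requested size in bytes
            .setDatacenters(new HashSet<>(Arrays.asList("dc1", "dc2")))
            .build();
    System.out.println(request.getDatacenters()); // e.g. [dc1, dc2]
  }
}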

@@ -617,6 +617,13 @@ public final class ScmConfigKeys {
"ozone.scm.ha.dbtransactionbuffer.flush.interval";
public static final long
OZONE_SCM_HA_DBTRANSACTIONBUFFER_FLUSH_INTERVAL_DEFAULT = 600 * 1000L;

public static final String OZONE_SCM_DC_DATANODE_MAPPING_KEY =
"ozone.scm.dc.datanode.mapping";

public static final String OZONE_SCM_DC_DATANODE_MAPPING_DEFAULT =
"";

/**
* Never constructed.
*/

@@ -17,24 +17,27 @@
*/
package org.apache.hadoop.hdds.scm.container;

- import java.time.Clock;
- import java.time.Instant;
- import java.util.Comparator;
-
- import com.fasterxml.jackson.annotation.JsonIgnore;
- import com.fasterxml.jackson.annotation.JsonProperty;
- import org.apache.commons.lang3.builder.EqualsBuilder;
- import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.utils.db.Codec;
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
import org.apache.hadoop.hdds.utils.db.Proto2Codec;
- import org.apache.ratis.util.Preconditions;

+ import java.time.Clock;
+ import java.time.Instant;
+ import java.util.Collections;
+ import java.util.Comparator;
+ import java.util.HashSet;
+ import java.util.Set;
+
+ import com.fasterxml.jackson.annotation.JsonIgnore;
import static java.lang.Math.max;
+ import org.apache.commons.lang3.builder.EqualsBuilder;
+ import org.apache.commons.lang3.builder.HashCodeBuilder;
+ import org.apache.ratis.util.Preconditions;

/**
* Class wraps ozone container info.
@@ -89,6 +92,7 @@ public static Codec<ContainerInfo> getCodec() {
// The sequenceId of a close container cannot change, and all the
// container replica should have the same sequenceId.
private long sequenceId;
private final Set<String> datacenters;

@SuppressWarnings("parameternumber")
private ContainerInfo(
@@ -102,7 +106,8 @@ private ContainerInfo(
long deleteTransactionId,
long sequenceId,
ReplicationConfig repConfig,
-      Clock clock) {
+      Clock clock,
+      Set<String> datacenters) {
this.containerID = ContainerID.valueOf(containerID);
this.pipelineID = pipelineID;
this.usedBytes = usedBytes;
@@ -115,6 +120,7 @@ private ContainerInfo(
this.sequenceId = sequenceId;
this.replicationConfig = repConfig;
this.clock = clock;
this.datacenters = datacenters;
}

public static ContainerInfo fromProtobuf(HddsProtos.ContainerInfoProto info) {
@@ -131,13 +137,14 @@ public static ContainerInfo fromProtobuf(HddsProtos.ContainerInfoProto info) {
.setDeleteTransactionId(info.getDeleteTransactionId())
.setReplicationConfig(config)
.setSequenceId(info.getSequenceId())
.setDatacenters(new HashSet<>(info.getDatacentersList()))
.build();

if (info.hasPipelineID()) {
builder.setPipelineID(PipelineID.getFromProtobuf(info.getPipelineID()));
}
-    return builder.build();
+
+    return builder.build();
}

/**
@@ -229,6 +236,10 @@ public long getSequenceId() {
return sequenceId;
}

public Set<String> getDatacenters() {
return datacenters;
}

public void updateDeleteTransactionId(long transactionId) {
deleteTransactionId = max(transactionId, deleteTransactionId);
}
@@ -267,7 +278,8 @@ public HddsProtos.ContainerInfoProto getProtobuf() {
.setDeleteTransactionId(getDeleteTransactionId())
.setOwner(getOwner())
.setSequenceId(getSequenceId())
-        .setReplicationType(getReplicationType());
+        .setReplicationType(getReplicationType())
+        .addAllDatacenters(getDatacenters());

if (replicationConfig instanceof ECReplicationConfig) {
builder.setEcReplicationConfig(((ECReplicationConfig) replicationConfig)
@@ -282,6 +294,7 @@ public HddsProtos.ContainerInfoProto getProtobuf() {
if (getPipelineID() != null) {
builder.setPipelineID(getPipelineID().getProtobuf());
}

return builder.build();
}

@@ -383,6 +396,7 @@ public static class Builder {
private long sequenceId;
private PipelineID pipelineID;
private ReplicationConfig replicationConfig;
private Set<String> datacenters = Collections.emptySet();

public Builder setPipelineID(PipelineID pipelineId) {
this.pipelineID = pipelineId;
@@ -444,10 +458,15 @@ public Builder setClock(Clock clock) {
return this;
}

public Builder setDatacenters(Set<String> datacenters) {
this.datacenters = datacenters;
return this;
}

public ContainerInfo build() {
return new ContainerInfo(containerID, state, pipelineID,
used, keys, stateEnterTime, owner, deleteTransactionId,
-          sequenceId, replicationConfig, clock);
+          sequenceId, replicationConfig, clock, datacenters);
}
}


@@ -139,6 +139,7 @@ public enum ResultCodes {
CA_ROTATION_IN_PROGRESS,
CA_ROTATION_IN_POST_PROGRESS,
CONTAINER_ALREADY_CLOSED,
-    CONTAINER_ALREADY_CLOSING
+    CONTAINER_ALREADY_CLOSING,
+    FAILED_TO_FIND_ENOUGH_NODES_WITHIN_DATACENTER,
}
}

@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedList;
@@ -93,6 +94,8 @@ public static Codec<Pipeline> getCodec() {

private final Instant stateEnterTime;

private final Set<String> datacenters;

/**
* The immutable properties of pipeline object is used in
* ContainerStateManager#getMatchingContainerByPipeline to take a lock on
@@ -118,6 +121,7 @@ private Pipeline(Builder b) {
replicaIndexes = b.replicaIndexes != null ? ImmutableMap.copyOf(b.replicaIndexes) : ImmutableMap.of();
creationTimestamp = b.creationTimestamp != null ? b.creationTimestamp : Instant.now();
stateEnterTime = Instant.now();
datacenters = b.datacenters;
}

/**
@@ -160,6 +164,10 @@ public Instant getStateEnterTime() {
return stateEnterTime;
}

public Set<String> getDatacenters() {
return datacenters;
}

/**
* Return the suggested leaderId which has a high priority among DNs of the
* pipeline.
@@ -379,7 +387,8 @@ public HddsProtos.Pipeline getProtobufMessage(int clientVersion)
.setLeaderID(leaderId != null ? leaderId.toString() : "")
.setCreationTimeStamp(creationTimestamp.toEpochMilli())
.addAllMembers(members)
-        .addAllMemberReplicaIndexes(memberReplicaIndexes);
+        .addAllMemberReplicaIndexes(memberReplicaIndexes)
+        .addAllDatacenters(datacenters);

if (replicationConfig instanceof ECReplicationConfig) {
builder.setEcReplicationConfig(((ECReplicationConfig) replicationConfig)
@@ -479,7 +488,8 @@ public static Builder toBuilder(HddsProtos.Pipeline pipeline)
.setLeaderId(leaderId)
.setSuggestedLeaderId(suggestedLeaderId)
.setNodeOrder(pipeline.getMemberOrdersList())
-        .setCreateTimestamp(pipeline.getCreationTimeStamp());
+        .setCreateTimestamp(pipeline.getCreationTimeStamp())
+        .setDatacenters(new HashSet<>(pipeline.getDatacentersList()));
}

public static Pipeline getFromProtobuf(HddsProtos.Pipeline pipeline)
@@ -555,6 +565,7 @@ public static class Builder {
private Instant creationTimestamp = null;
private UUID suggestedLeaderId = null;
private Map<DatanodeDetails, Integer> replicaIndexes;
private Set<String> datacenters = new HashSet<>();

public Builder() { }

@@ -567,6 +578,7 @@ public Builder(Pipeline pipeline) {
this.leaderId = pipeline.getLeaderId();
this.creationTimestamp = pipeline.getCreationTimestamp();
this.suggestedLeaderId = pipeline.getSuggestedLeaderId();
this.datacenters = pipeline.getDatacenters();
if (nodeStatus != null) {
replicaIndexes = new HashMap<>();
for (DatanodeDetails dn : nodeStatus.keySet()) {
@@ -635,6 +647,10 @@ public Builder setSuggestedLeaderId(UUID uuid) {
return this;
}

public Builder setDatacenters(Set<String> datacenters) {
this.datacenters = datacenters;
return this;
}

public Builder setReplicaIndexes(Map<DatanodeDetails, Integer> indexes) {
this.replicaIndexes = indexes;
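
Pipelines now carry the set of datacenters they are bound to, and the commits above ("Choose random pipeline", "Return null if there is no suitable pipeline to choose from") describe how a pipeline is picked among the matching ones. The helper below is a hypothetical sketch of that idea, not the PR's actual policy code; the matching rule (a pipeline's datacenters must cover the requested set) is an assumption:

import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.hadoop.hdds.scm.pipeline.Pipeline;

final class DatacenterPipelineChooser {
  private static final Random RANDOM = new Random();

  // Returns a random pipeline bound to every requested datacenter,
  // or null when no open pipeline qualifies (hypothetical helper).
  static Pipeline choose(List<Pipeline> openPipelines, Set<String> requested) {
    List<Pipeline> candidates = openPipelines.stream()
        .filter(p -> p.getDatacenters().containsAll(requested))
        .collect(Collectors.toList());
    return candidates.isEmpty()
        ? null
        : candidates.get(RANDOM.nextInt(candidates.size()));
  }
}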

@@ -407,6 +407,9 @@ private OzoneConsts() {
public static final String GDPR_SECRET = "secret";
public static final String GDPR_ALGORITHM = "algorithm";

// DATACENTERS
public static final String DATACENTERS = "datacenters";

/**
* Block key name as illegal characters
*
12 changes: 12 additions & 0 deletions hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -4349,4 +4349,16 @@
A key of the cache entry is a pair of bucket and the requested key path.
</description>
</property>

<property>
<name>ozone.scm.dc.datanode.mapping</name>
<value></value>
<tag>SCM</tag>
<description>
This property defines a mapping that associates each DataNode with its corresponding data center.
The mapping is specified as a comma-separated list of pairs in the format `hostname:ratis_port=dc_name`.
By establishing this association, Ozone can identify which data center a DataNode belongs to,
enabling more efficient storage management and data placement across data centers.
</description>
</property>
</configuration>
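
To make the documented `hostname:ratis_port=dc_name` format concrete, here is a minimal parsing sketch. It is not the parser SCM uses; the hostnames, the port (9858), and the datacenter names are invented:

import java.util.HashMap;
import java.util.Map;

public final class DcMappingSketch {
  public static void main(String[] args) {
    // A hypothetical value for ozone.scm.dc.datanode.mapping.
    String raw = "dn1.example.com:9858=dc1,dn2.example.com:9858=dc2";

    // Comma-separated pairs: "hostname:ratis_port" maps to "dc_name".
    Map<String, String> dnToDc = new HashMap<>();
    for (String pair : raw.split(",")) {
      String[] kv = pair.split("=", 2);
      dnToDc.put(kv[0].trim(), kv[1].trim());
    }
    System.out.println(dnToDc.get("dn1.example.com:9858")); // prints dc1
  }
}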

@@ -26,6 +26,7 @@

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;

import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
@@ -116,6 +117,7 @@ public static ContainerInfo.Builder newBuilderForTest() {
.setContainerID(1234)
.setPipelineID(PipelineID.randomId())
.setState(OPEN)
-        .setOwner("scm");
+        .setOwner("scm")
+        .setDatacenters(Collections.emptySet());
}
}

@@ -19,6 +19,7 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -72,6 +73,7 @@ public static Pipeline createPipeline(Iterable<DatanodeDetails> ids) {
.setReplicationConfig(
StandaloneReplicationConfig.getInstance(ReplicationFactor.ONE))
.setNodes(dns)
.setDatacenters(Collections.emptySet())
.build();
}


@@ -19,20 +19,21 @@

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.AddSCMRequest;
import org.apache.hadoop.hdds.scm.ScmConfig;
+ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+ import org.apache.hadoop.security.KerberosInfo;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
- import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
- import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
- import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
- import org.apache.hadoop.security.KerberosInfo;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
+ import java.util.Set;
import java.util.concurrent.TimeoutException;

/**
@@ -87,7 +88,7 @@ default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
ReplicationConfig replicationConfig, String owner,
ExcludeList excludeList) throws IOException {
return allocateBlock(size, numBlocks, replicationConfig, owner,
-        excludeList, null);
+        excludeList, null, null);
}

/**
@@ -109,7 +110,7 @@ default List<AllocatedBlock> allocateBlock(long size, int numBlocks,
*/
List<AllocatedBlock> allocateBlock(long size, int numBlocks,
ReplicationConfig replicationConfig, String owner,
-      ExcludeList excludeList, String clientMachine) throws IOException;
+      ExcludeList excludeList, String clientMachine, Set<String> datacenters) throws IOException;

/**
* Delete blocks for a set of object keys.
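
For callers, the reshaped allocateBlock takes the requested datacenters as a trailing Set<String>, with the old six-argument default now forwarding null for both clientMachine and datacenters. A hedged usage sketch follows; the wrapper class, owner string, and datacenter name are illustrative, and the client is assumed to be an already-connected ScmBlockLocationProtocol instance:

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;

final class DatacenterAllocationSketch {
  // Asks SCM for one RATIS/THREE block pinned to the hypothetical
  // datacenter "dc1".
  static List<AllocatedBlock> allocateInDc1(ScmBlockLocationProtocol scm)
      throws IOException {
    return scm.allocateBlock(
        256L * 1024 * 1024,                // size in bytes
        1,                                 // number of blocks
        ReplicationConfig.fromProtoTypeAndFactor(
            ReplicationType.RATIS, ReplicationFactor.THREE),
        "ozone",                           // owner (illustrative)
        new ExcludeList(),                 // nothing excluded
        null,                              // clientMachine not pinned
        Collections.singleton("dc1"));     // hypothetical datacenter
  }
}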