Skip to content

Commit 6fdc54a

Browse files
authored
HDDS-8572. Support CodecBuffer for protobuf v3 codecs. (#4693)
1 parent 0036c31 commit 6fdc54a

14 files changed

Lines changed: 281 additions & 155 deletions

File tree

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerID.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
* the License.
1616
*
1717
*/
18-
1918
package org.apache.hadoop.hdds.scm.container;
2019

2120
import com.google.common.base.Preconditions;
@@ -24,14 +23,25 @@
2423
import org.apache.commons.lang3.builder.EqualsBuilder;
2524
import org.apache.commons.lang3.builder.HashCodeBuilder;
2625
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
26+
import org.apache.hadoop.hdds.utils.db.Codec;
27+
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
28+
import org.apache.hadoop.hdds.utils.db.LongCodec;
2729

2830
/**
2931
* Container ID is an integer that is a value between 1..MAX_CONTAINER ID.
3032
* <p>
3133
* We are creating a specific type for this to avoid mixing this with
3234
* normal integers in code.
35+
* <p>
36+
* This class is immutable.
3337
*/
3438
public final class ContainerID implements Comparable<ContainerID> {
39+
private static final Codec<ContainerID> CODEC = new DelegatedCodec<>(
40+
LongCodec.get(), ContainerID::valueOf, c -> c.id, true);
41+
42+
public static Codec<ContainerID> getCodec() {
43+
return CODEC;
44+
}
3545

3646
private final long id;
3747

hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/CodecBuffer.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.nio.ByteBuffer;
2929
import java.util.concurrent.CompletableFuture;
3030
import java.util.concurrent.atomic.AtomicInteger;
31+
import java.util.function.ToIntFunction;
3132

3233
/**
3334
* A buffer used by {@link Codec}
@@ -169,4 +170,19 @@ public CodecBuffer put(ByteBuffer buffer) {
169170
buf.writeBytes(buffer);
170171
return this;
171172
}
173+
174+
/**
175+
* Put bytes from the given source to this buffer.
176+
*
177+
* @param source put bytes to a {@link ByteBuffer} and return the size.
178+
* @return this object.
179+
*/
180+
public CodecBuffer put(ToIntFunction<ByteBuffer> source) {
181+
assertRefCnt(1);
182+
final int w = buf.writerIndex();
183+
final ByteBuffer buffer = buf.nioBuffer(w, buf.writableBytes());
184+
final int size = source.applyAsInt(buffer);
185+
buf.setIndex(buf.readerIndex(), w + size);
186+
return this;
187+
}
172188
}
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdds.utils.db;
19+
20+
import org.apache.ratis.util.function.CheckedFunction;
21+
22+
import javax.annotation.Nonnull;
23+
import java.io.IOException;
24+
import java.util.function.IntFunction;
25+
26+
/**
27+
* A {@link Codec} to serialize/deserialize objects by delegation.
28+
*
29+
* @param <T> The object type of this {@link Codec}.
30+
* @param <DELEGATE> The object type of the {@link #delegate}.
31+
*/
32+
public class DelegatedCodec<T, DELEGATE> implements Codec<T> {
33+
private final Codec<DELEGATE> delegate;
34+
private final CheckedFunction<DELEGATE, T, IOException> forward;
35+
private final CheckedFunction<T, DELEGATE, IOException> backward;
36+
private final boolean immutable;
37+
38+
/**
39+
* Construct a {@link Codec} using the given delegate.
40+
*
41+
* @param delegate the delegate {@link Codec}
42+
* @param forward a function to convert {@link DELEGATE} to {@link T}.
43+
* @param backward a function to convert {@link T} back to {@link DELEGATE}.
44+
* @param immutable are the objects in {@link T} immutable?
45+
*/
46+
public DelegatedCodec(Codec<DELEGATE> delegate,
47+
CheckedFunction<DELEGATE, T, IOException> forward,
48+
CheckedFunction<T, DELEGATE, IOException> backward,
49+
boolean immutable) {
50+
this.delegate = delegate;
51+
this.forward = forward;
52+
this.backward = backward;
53+
this.immutable = immutable;
54+
}
55+
56+
/** The same as new DelegatedCodec(delegate, forward, backward, false). */
57+
public DelegatedCodec(Codec<DELEGATE> delegate,
58+
CheckedFunction<DELEGATE, T, IOException> forward,
59+
CheckedFunction<T, DELEGATE, IOException> backward) {
60+
this(delegate, forward, backward, false);
61+
}
62+
63+
@Override
64+
public final boolean supportCodecBuffer() {
65+
return delegate.supportCodecBuffer();
66+
}
67+
68+
@Override
69+
public final CodecBuffer toCodecBuffer(@Nonnull T message,
70+
IntFunction<CodecBuffer> allocator) throws IOException {
71+
return delegate.toCodecBuffer(backward.apply(message), allocator);
72+
}
73+
74+
@Override
75+
public final T fromCodecBuffer(@Nonnull CodecBuffer buffer)
76+
throws IOException {
77+
return forward.apply(delegate.fromCodecBuffer(buffer));
78+
}
79+
80+
@Override
81+
public final byte[] toPersistedFormat(T message) throws IOException {
82+
return delegate.toPersistedFormat(backward.apply(message));
83+
}
84+
85+
@Override
86+
public final T fromPersistedFormat(byte[] bytes) throws IOException {
87+
return forward.apply(delegate.fromPersistedFormat(bytes));
88+
}
89+
90+
@Override
91+
public T copyObject(T message) {
92+
if (immutable) {
93+
return message;
94+
}
95+
try {
96+
return forward.apply(delegate.copyObject(backward.apply(message)));
97+
} catch (IOException e) {
98+
throw new IllegalStateException("Failed to copyObject", e);
99+
}
100+
}
101+
}

hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java renamed to hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/db/LongCodec.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@
2727
* Codec to convert Long to/from byte array.
2828
*/
2929
public final class LongCodec implements Codec<Long> {
30+
private static final LongCodec CODEC = new LongCodec();
31+
32+
public static LongCodec get() {
33+
return CODEC;
34+
}
35+
3036
@Override
3137
public boolean supportCodecBuffer() {
3238
return true;
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
package org.apache.hadoop.hdds.utils.db;
19+
20+
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
21+
import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
22+
import org.apache.ratis.thirdparty.com.google.protobuf.MessageLite;
23+
import org.apache.ratis.thirdparty.com.google.protobuf.Parser;
24+
25+
import javax.annotation.Nonnull;
26+
import java.io.IOException;
27+
import java.util.concurrent.ConcurrentHashMap;
28+
import java.util.concurrent.ConcurrentMap;
29+
import java.util.function.IntFunction;
30+
31+
/**
32+
* Codecs to serialize/deserialize Protobuf v3 messages.
33+
*/
34+
public final class Proto3Codec<M extends MessageLite>
35+
implements Codec<M> {
36+
private static final ConcurrentMap<Class<? extends MessageLite>,
37+
Codec<? extends MessageLite>> CODECS
38+
= new ConcurrentHashMap<>();
39+
40+
/**
41+
* @return the {@link Codec} for the given class.
42+
*/
43+
public static <T extends MessageLite> Codec<T> get(Class<T> clazz) {
44+
final Codec<?> codec = CODECS.computeIfAbsent(clazz, Proto3Codec::new);
45+
return (Codec<T>) codec;
46+
}
47+
48+
private static <M extends MessageLite> Parser<M> getParser(Class<M> clazz) {
49+
final String name = "PARSER";
50+
try {
51+
return (Parser<M>) clazz.getField(name).get(null);
52+
} catch (Exception e) {
53+
throw new IllegalStateException(
54+
"Failed to get " + name + " field from " + clazz, e);
55+
}
56+
}
57+
58+
private final Parser<M> parser;
59+
60+
private Proto3Codec(Class<M> clazz) {
61+
this.parser = getParser(clazz);
62+
}
63+
64+
@Override
65+
public boolean supportCodecBuffer() {
66+
return true;
67+
}
68+
69+
@Override
70+
public CodecBuffer toCodecBuffer(@Nonnull M message,
71+
IntFunction<CodecBuffer> allocator) {
72+
final int size = message.getSerializedSize();
73+
return allocator.apply(size).put(buffer -> {
74+
try {
75+
message.writeTo(CodedOutputStream.newInstance(buffer));
76+
} catch (IOException e) {
77+
throw new IllegalStateException(
78+
"Failed to writeTo: message=" + message, e);
79+
}
80+
return size;
81+
});
82+
}
83+
84+
@Override
85+
public M fromCodecBuffer(@Nonnull CodecBuffer buffer)
86+
throws InvalidProtocolBufferException {
87+
return parser.parseFrom(buffer.asReadOnlyByteBuffer());
88+
}
89+
90+
@Override
91+
public byte[] toPersistedFormat(M message) {
92+
return message.toByteArray();
93+
}
94+
95+
@Override
96+
public M fromPersistedFormat(byte[] bytes)
97+
throws InvalidProtocolBufferException {
98+
return parser.parseFrom(bytes);
99+
}
100+
101+
@Override
102+
public M copyObject(M message) {
103+
// proto messages are immutable
104+
return message;
105+
}
106+
}

hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/BlockData.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@
2020
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
2121
import org.apache.hadoop.hdds.client.BlockID;
2222
import com.google.common.base.Preconditions;
23+
import org.apache.hadoop.hdds.utils.db.Codec;
24+
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
25+
import org.apache.hadoop.hdds.utils.db.Proto3Codec;
2326

2427
import java.io.IOException;
2528
import java.util.Collections;
@@ -32,6 +35,15 @@
3235
* Helper class to convert Protobuf to Java classes.
3336
*/
3437
public class BlockData {
38+
private static final Codec<BlockData> CODEC = new DelegatedCodec<>(
39+
Proto3Codec.get(ContainerProtos.BlockData.class),
40+
BlockData::getFromProtoBuf,
41+
BlockData::getProtoBufMessage);
42+
43+
public static Codec<BlockData> getCodec() {
44+
return CODEC;
45+
}
46+
3547
private final BlockID blockID;
3648
private final Map<String, String> metadata;
3749

hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfoList.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,23 +18,38 @@
1818
package org.apache.hadoop.ozone.container.common.helpers;
1919

2020
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
21+
import org.apache.hadoop.hdds.utils.db.Codec;
22+
import org.apache.hadoop.hdds.utils.db.DelegatedCodec;
23+
import org.apache.hadoop.hdds.utils.db.Proto3Codec;
2124

2225
import java.util.Collections;
2326
import java.util.List;
2427

2528
/**
2629
* Helper class to convert between protobuf lists and Java lists of
2730
* {@link ContainerProtos.ChunkInfo} objects.
31+
* <p>
32+
* This class is immutable.
2833
*/
2934
public class ChunkInfoList {
30-
private List<ContainerProtos.ChunkInfo> chunks;
35+
private static final Codec<ChunkInfoList> CODEC = new DelegatedCodec<>(
36+
Proto3Codec.get(ContainerProtos.ChunkInfoList.class),
37+
ChunkInfoList::getFromProtoBuf,
38+
ChunkInfoList::getProtoBufMessage,
39+
true);
40+
41+
public static Codec<ChunkInfoList> getCodec() {
42+
return CODEC;
43+
}
44+
45+
private final List<ContainerProtos.ChunkInfo> chunks;
3146

3247
public ChunkInfoList(List<ContainerProtos.ChunkInfo> chunks) {
33-
this.chunks = chunks;
48+
this.chunks = Collections.unmodifiableList(chunks);
3449
}
3550

3651
public List<ContainerProtos.ChunkInfo> asList() {
37-
return Collections.unmodifiableList(chunks);
52+
return chunks;
3853
}
3954

4055
/**

hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/metadata/BlockDataCodec.java

Lines changed: 0 additions & 47 deletions
This file was deleted.

0 commit comments

Comments
 (0)