Skip to content

Commit 5a2dab1

Browse files
committed
[SPARK-29111][CORE] Support snapshot/restore on KVStore
1 parent 88c8d5e commit 5a2dab1

13 files changed

Lines changed: 437 additions & 41 deletions

File tree

common/kvstore/src/main/java/org/apache/spark/util/kvstore/InMemoryStore.java

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,7 @@
1717

1818
package org.apache.spark.util.kvstore;
1919

20-
import java.util.ArrayList;
21-
import java.util.Collection;
22-
import java.util.Collections;
23-
import java.util.Iterator;
24-
import java.util.HashSet;
25-
import java.util.List;
26-
import java.util.NoSuchElementException;
20+
import java.util.*;
2721
import java.util.concurrent.ConcurrentHashMap;
2822
import java.util.concurrent.ConcurrentMap;
2923
import java.util.function.BiConsumer;
@@ -106,6 +100,19 @@ public <T> KVStoreView<T> view(Class<T> type){
106100
return list != null ? list.view() : emptyView();
107101
}
108102

103+
@Override
104+
public Class<?> metadataType() {
105+
if (metadata == null) {
106+
return null;
107+
}
108+
return metadata.getClass();
109+
}
110+
111+
@Override
112+
public Set<Class<?>> types() {
113+
return inMemoryLists.data.keySet();
114+
}
115+
109116
@Override
110117
public void close() {
111118
metadata = null;

common/kvstore/src/main/java/org/apache/spark/util/kvstore/KVStore.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.io.Closeable;
2121
import java.util.Collection;
22+
import java.util.Set;
2223

2324
import org.apache.spark.annotation.Private;
2425

@@ -117,6 +118,16 @@ public interface KVStore extends Closeable {
117118
*/
118119
<T> KVStoreView<T> view(Class<T> type) throws Exception;
119120

121+
/**
122+
* Returns the type of the app-specific metadata held by the store, or null if no metadata is currently set.
123+
*/
124+
Class<?> metadataType();
125+
126+
/**
127+
* Returns the set of all object types available in the store.
128+
*/
129+
Set<Class<?>> types();
130+
120131
/**
121132
* Returns the number of items of the given type currently in the store.
122133
*/
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.util.kvstore;
19+
20+
import com.google.common.io.ByteStreams;
21+
22+
import java.io.*;
23+
import java.nio.charset.StandardCharsets;
24+
import java.util.Set;
25+
26+
public class KVStoreSnapshotter {
27+
private static final int MARKER_END_OF_TYPE = -2;
28+
private static final int MARKER_END_OF_FILE = -1;
29+
30+
private final KVStoreSerializer serializer;
31+
32+
public KVStoreSnapshotter(KVStoreSerializer serializer) {
33+
this.serializer = serializer;
34+
}
35+
36+
public void dump(KVStore store, File snapshotFile) throws Exception {
37+
DataOutputStream output = new DataOutputStream(new FileOutputStream(snapshotFile));
38+
39+
// store metadata if it exists
40+
Class<?> metadataType = store.metadataType();
41+
if (metadataType != null) {
42+
writeClassName(metadataType, output);
43+
Object metadata = store.getMetadata(metadataType);
44+
writeObject(metadata, output);
45+
writeEndOfType(output);
46+
} else {
47+
writeEndOfType(output);
48+
}
49+
50+
Set<Class<?>> types = store.types();
51+
for (Class<?> clazz : types) {
52+
writeClassName(clazz, output);
53+
54+
KVStoreView<?> view = store.view(clazz);
55+
for (Object obj : view) {
56+
writeObject(obj, output);
57+
}
58+
59+
writeEndOfType(output);
60+
}
61+
62+
writeEndOfFile(output);
63+
output.close();
64+
}
65+
66+
public void restore(File snapshotFile, KVStore store) throws Exception {
67+
DataInputStream input = new DataInputStream(new FileInputStream(snapshotFile));
68+
69+
// first one would be metadata
70+
int metadataClazzLen = input.readInt();
71+
if (metadataClazzLen > 0) {
72+
Class<?> metadataClazz = readClassName(input, metadataClazzLen);
73+
// metadata presented
74+
int objLen = input.readInt();
75+
Object metadata = readObj(input, metadataClazz, objLen);
76+
store.setMetadata(metadata);
77+
78+
// additionally read -2 as end of type
79+
consumeEndOfType(input);
80+
}
81+
82+
boolean eof = false;
83+
while (!eof) {
84+
int typeClazzNameLen = input.readInt();
85+
if (typeClazzNameLen == MARKER_END_OF_FILE) {
86+
eof = true;
87+
} else {
88+
Class<?> typeClazz = readClassName(input, typeClazzNameLen);
89+
boolean eot = false;
90+
while (!eot) {
91+
int objLen = input.readInt();
92+
if (objLen == MARKER_END_OF_TYPE) {
93+
eot = true;
94+
} else {
95+
Object obj = readObj(input, typeClazz, objLen);
96+
store.write(obj);
97+
}
98+
}
99+
}
100+
}
101+
102+
input.close();
103+
}
104+
105+
private void writeClassName(Class<?> clazz, DataOutputStream output) throws IOException {
106+
byte[] clazzName = clazz.getCanonicalName().getBytes(StandardCharsets.UTF_8);
107+
output.writeInt(clazzName.length);
108+
output.write(clazzName);
109+
}
110+
111+
private void writeObject(Object obj, DataOutputStream output) throws Exception {
112+
byte[] ser = serializer.serialize(obj);
113+
output.writeInt(ser.length);
114+
output.write(ser);
115+
}
116+
117+
private void writeEndOfType(DataOutputStream output) throws IOException {
118+
output.writeInt(MARKER_END_OF_TYPE);
119+
}
120+
121+
private void writeEndOfFile(DataOutputStream output) throws IOException {
122+
output.writeInt(MARKER_END_OF_FILE);
123+
}
124+
125+
private Class<?> readClassName(DataInputStream input, int classNameLen) throws IOException, ClassNotFoundException {
126+
byte[] classNameBuffer = new byte[classNameLen];
127+
ByteStreams.readFully(input, classNameBuffer, 0, classNameLen);
128+
String className = new String(classNameBuffer, StandardCharsets.UTF_8);
129+
return Class.forName(className);
130+
}
131+
132+
private Object readObj(DataInputStream input, Class<?> clazz, int objLen) throws Exception {
133+
byte[] objBuffer = new byte[objLen];
134+
ByteStreams.readFully(input, objBuffer, 0, objLen);
135+
return serializer.deserialize(objBuffer, clazz);
136+
}
137+
138+
private void consumeEndOfType(DataInputStream input) throws IOException {
139+
int eotCode = input.readInt();
140+
if (eotCode != MARKER_END_OF_TYPE) {
141+
throw new IllegalStateException("The notion of 'end of type' is expected here, but got " +
142+
eotCode + " instead");
143+
}
144+
}
145+
}

common/kvstore/src/main/java/org/apache/spark/util/kvstore/LevelDB.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ public class LevelDB implements KVStore {
6363
*/
6464
private final ConcurrentMap<String, byte[]> typeAliases;
6565
private final ConcurrentMap<Class<?>, LevelDBTypeInfo> types;
66+
private Class<?> metadataType;
6667

6768
public LevelDB(File path) throws Exception {
6869
this(path, new KVStoreSerializer());
@@ -109,8 +110,10 @@ public <T> T getMetadata(Class<T> klass) throws Exception {
109110
public void setMetadata(Object value) throws Exception {
110111
if (value != null) {
111112
put(METADATA_KEY, value);
113+
metadataType = value.getClass();
112114
} else {
113115
db().delete(METADATA_KEY);
116+
metadataType = null;
114117
}
115118
}
116119

@@ -197,6 +200,16 @@ public Iterator<T> iterator() {
197200
};
198201
}
199202

203+
@Override
204+
public Class<?> metadataType() {
205+
return metadataType;
206+
}
207+
208+
@Override
209+
public Set<Class<?>> types() {
210+
return types.keySet();
211+
}
212+
200213
@Override
201214
public <T> boolean removeAllByIndexValues(
202215
Class<T> klass,

common/kvstore/src/test/java/org/apache/spark/util/kvstore/ArrayKeyIndexType.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public boolean equals(Object o) {
3838

3939
@Override
4040
public int hashCode() {
41-
return key.hashCode();
41+
return Arrays.hashCode(key);
4242
}
43-
4443
}

common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryIteratorSuite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.spark.util.kvstore;
1919

20-
public class InMemoryIteratorSuite extends DBIteratorSuite {
20+
public class InMemoryIteratorSuite extends KVStoreIteratorSuite {
2121

2222
@Override
2323
protected KVStore createStore() {

common/kvstore/src/test/java/org/apache/spark/util/kvstore/InMemoryStoreSuite.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,11 @@
1717

1818
package org.apache.spark.util.kvstore;
1919

20+
import java.util.Arrays;
2021
import java.util.NoSuchElementException;
2122

2223
import com.google.common.collect.ImmutableSet;
24+
import com.google.common.collect.Sets;
2325
import org.junit.Test;
2426
import static org.junit.Assert.*;
2527

@@ -86,6 +88,52 @@ public void testMultipleObjectWriteReadDelete() throws Exception {
8688
}
8789
}
8890

91+
@Test
92+
public void testMultipleTypesWriteReadDelete() throws Exception {
93+
KVStore store = new InMemoryStore();
94+
95+
CustomType1 t1 = new CustomType1();
96+
t1.key = "key1";
97+
t1.id = "id";
98+
t1.name = "name1";
99+
100+
IntKeyType t2 = new IntKeyType();
101+
t2.key = 2;
102+
t2.id = "2";
103+
t2.values = Arrays.asList("value1", "value2");
104+
105+
ArrayKeyIndexType t3 = new ArrayKeyIndexType();
106+
t3.key = new int[] { 42, 84 };
107+
t3.id = new String[] { "id1", "id2" };
108+
109+
store.write(t1);
110+
store.write(t2);
111+
store.write(t3);
112+
113+
assertEquals(Sets.newHashSet(IntKeyType.class, ArrayKeyIndexType.class, CustomType1.class), store.types());
114+
115+
assertEquals(t1, store.read(t1.getClass(), t1.key));
116+
assertEquals(t2, store.read(t2.getClass(), t2.key));
117+
assertEquals(t3, store.read(t3.getClass(), t3.key));
118+
119+
// There should be one "id" index with a single entry for each type.
120+
assertEquals(1, store.count(t1.getClass(), "id", t1.id));
121+
assertEquals(1, store.count(t2.getClass(), "id", t2.id));
122+
assertEquals(1, store.count(t3.getClass(), "id", t3.id));
123+
124+
// Delete the first entry; this should not affect the entries for the second type.
125+
store.delete(t1.getClass(), t1.key);
126+
assertEquals(1, store.count(t2.getClass(), "id", t2.id));
127+
assertEquals(1, store.count(t3.getClass(), "id", t3.id));
128+
129+
// Delete the remaining entries, make sure all data is gone.
130+
store.delete(t2.getClass(), t2.key);
131+
assertEquals(0, store.count(t2.getClass()));
132+
133+
store.delete(t3.getClass(), t3.key);
134+
assertEquals(0, store.count(t3.getClass()));
135+
}
136+
89137
@Test
90138
public void testMetadata() throws Exception {
91139
KVStore store = new InMemoryStore();
@@ -96,9 +144,11 @@ public void testMetadata() throws Exception {
96144
t.name = "name";
97145

98146
store.setMetadata(t);
147+
assertEquals(CustomType1.class, store.metadataType());
99148
assertEquals(t, store.getMetadata(CustomType1.class));
100149

101150
store.setMetadata(null);
151+
assertNull(store.metadataType());
102152
assertNull(store.getMetadata(CustomType1.class));
103153
}
104154

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.util.kvstore;
19+
20+
import java.util.List;
21+
22+
public class IntKeyType {
23+
@KVIndex
24+
public int key;
25+
26+
@KVIndex("id")
27+
public String id;
28+
29+
public List<String> values;
30+
31+
@Override
32+
public boolean equals(Object o) {
33+
if (o instanceof IntKeyType) {
34+
IntKeyType other = (IntKeyType) o;
35+
return key == other.key && id.equals(other.id) && values.equals(other.values);
36+
}
37+
return false;
38+
}
39+
40+
@Override
41+
public int hashCode() {
42+
return id.hashCode();
43+
}
44+
}

0 commit comments

Comments
 (0)