Skip to content

Commit c148012

Browse files
committed
Add tests and resolve ClassCastException running SparkGraphComputer on HBase
Signed-off-by: sjudeng <sjudeng@users.noreply.github.com>
1 parent 8372f60 commit c148012

File tree

9 files changed

+243
-100
lines changed

9 files changed

+243
-100
lines changed

janusgraph-hadoop-parent/janusgraph-hadoop-2/pom.xml

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@
4040
<scope>test</scope>
4141
<optional>true</optional>
4242
</dependency>
43+
<!-- Include janusgraph-hbase-core to resolve Guava StopWatch error in HBase tests.
44+
Can be removed when Guava version is updated in HBase -->
45+
<dependency>
46+
<groupId>${project.groupId}</groupId>
47+
<artifactId>janusgraph-hbase-core</artifactId>
48+
<version>${project.version}</version>
49+
<scope>test</scope>
50+
</dependency>
4351
<dependency>
4452
<groupId>org.apache.mrunit</groupId>
4553
<artifactId>mrunit</artifactId>
@@ -50,20 +58,24 @@
5058
<dependency>
5159
<groupId>org.apache.hbase</groupId>
5260
<artifactId>hbase-server</artifactId>
53-
<version>${hbase098.version}</version>
61+
<version>${hbase100.version}</version>
5462
<optional>true</optional>
5563
<scope>test</scope>
5664
<exclusions>
5765
<exclusion>
5866
<groupId>org.mortbay.jetty</groupId>
5967
<artifactId>servlet-api-2.5</artifactId>
6068
</exclusion>
69+
<exclusion>
70+
<groupId>com.lmax</groupId>
71+
<artifactId>disruptor</artifactId>
72+
</exclusion>
6173
</exclusions>
6274
</dependency>
6375
<dependency>
6476
<groupId>org.apache.hbase</groupId>
6577
<artifactId>hbase-client</artifactId>
66-
<version>${hbase098.version}</version>
78+
<version>${hbase100.version}</version>
6779
<optional>true</optional>
6880
<scope>test</scope>
6981
</dependency>

janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/hbase/HBaseBinaryInputFormat.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,10 @@
2424
import org.janusgraph.hadoop.formats.util.AbstractBinaryInputFormat;
2525
import org.apache.hadoop.conf.Configuration;
2626
import org.apache.hadoop.hbase.HConstants;
27+
import org.apache.hadoop.hbase.client.Result;
2728
import org.apache.hadoop.hbase.client.Scan;
2829
import org.apache.hadoop.hbase.filter.Filter;
30+
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
2931
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
3032
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
3133
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
@@ -46,7 +48,7 @@ public class HBaseBinaryInputFormat extends AbstractBinaryInputFormat {
4648
private static final Logger log = LoggerFactory.getLogger(HBaseBinaryInputFormat.class);
4749

4850
private final TableInputFormat tableInputFormat = new TableInputFormat();
49-
private TableRecordReader tableReader;
51+
private RecordReader<ImmutableBytesWritable, Result> tableReader;
5052
private byte[] inputCFBytes;
5153
private RecordReader<StaticBuffer, Iterable<Entry>> janusgraphRecordReader;
5254

@@ -57,8 +59,7 @@ public List<InputSplit> getSplits(final JobContext jobContext) throws IOExceptio
5759

5860
@Override
5961
public RecordReader<StaticBuffer, Iterable<Entry>> createRecordReader(final InputSplit inputSplit, final TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
60-
tableReader =
61-
(TableRecordReader) tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
62+
tableReader = tableInputFormat.createRecordReader(inputSplit, taskAttemptContext);
6263
janusgraphRecordReader =
6364
new HBaseBinaryRecordReader(tableReader, inputCFBytes);
6465
return janusgraphRecordReader;
@@ -104,7 +105,7 @@ public void setConf(final Configuration config) {
104105
this.tableInputFormat.setConf(config);
105106
}
106107

107-
public TableRecordReader getTableReader() {
108+
public RecordReader<ImmutableBytesWritable, Result> getTableReader() {
108109
return tableReader;
109110
}
110111

janusgraph-hadoop-parent/janusgraph-hadoop-core/src/main/java/org/janusgraph/hadoop/formats/hbase/HBaseBinaryRecordReader.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
package org.janusgraph.hadoop.formats.hbase;
1616

1717
import com.google.common.base.Preconditions;
18+
import org.apache.hadoop.hbase.client.Result;
19+
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
1820
import org.janusgraph.diskstorage.Entry;
1921
import org.janusgraph.diskstorage.StaticBuffer;
2022
import org.janusgraph.diskstorage.util.StaticArrayBuffer;
2123
import org.janusgraph.diskstorage.util.StaticArrayEntry;
22-
import org.apache.hadoop.hbase.mapreduce.TableRecordReader;
2324
import org.apache.hadoop.mapreduce.InputSplit;
2425
import org.apache.hadoop.mapreduce.RecordReader;
2526
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -31,11 +32,11 @@
3132

3233
public class HBaseBinaryRecordReader extends RecordReader<StaticBuffer, Iterable<Entry>> {
3334

34-
private TableRecordReader reader;
35+
private RecordReader<ImmutableBytesWritable, Result> reader;
3536

3637
private final byte[] edgestoreFamilyBytes;
3738

38-
public HBaseBinaryRecordReader(final TableRecordReader reader, final byte[] edgestoreFamilyBytes) {
39+
public HBaseBinaryRecordReader(final RecordReader<ImmutableBytesWritable, Result> reader, final byte[] edgestoreFamilyBytes) {
3940
this.reader = reader;
4041
this.edgestoreFamilyBytes = edgestoreFamilyBytes;
4142
}
@@ -66,7 +67,7 @@ public void close() throws IOException {
6667
}
6768

6869
@Override
69-
public float getProgress() {
70+
public float getProgress() throws IOException, InterruptedException {
7071
return this.reader.getProgress();
7172
}
7273

janusgraph-hadoop-parent/janusgraph-hadoop-core/src/test/java/org/janusgraph/hadoop/AbstractInputFormatIT.java

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
// Copyright 2017 JanusGraph Authors
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package org.janusgraph.hadoop;
16+
17+
import org.janusgraph.core.Cardinality;
18+
import org.janusgraph.core.JanusGraphVertex;
19+
import org.janusgraph.diskstorage.configuration.WriteConfiguration;
20+
import org.janusgraph.example.GraphOfTheGodsFactory;
21+
import org.janusgraph.graphdb.JanusGraphBaseTest;
22+
import org.apache.commons.configuration.ConfigurationException;
23+
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
24+
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
25+
import org.apache.tinkerpop.gremlin.structure.Direction;
26+
import org.apache.tinkerpop.gremlin.structure.Edge;
27+
import org.apache.tinkerpop.gremlin.structure.Graph;
28+
import org.apache.tinkerpop.gremlin.structure.Vertex;
29+
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
30+
import org.junit.Test;
31+
32+
import java.io.IOException;
33+
import java.util.Iterator;
34+
import java.util.List;
35+
import java.util.Map;
36+
import java.util.Set;
37+
38+
import com.google.common.collect.Iterators;
39+
import com.google.common.collect.Lists;
40+
import com.google.common.collect.Sets;
41+
42+
import static org.junit.Assert.assertEquals;
43+
import static org.junit.Assert.assertNotNull;
44+
import static org.junit.Assert.assertTrue;
45+
46+
public abstract class AbstractInputFormatIT extends JanusGraphBaseTest {
47+
48+
@Test
49+
public void testReadGraphOfTheGods() throws Exception {
50+
GraphOfTheGodsFactory.load(graph, null, true);
51+
assertEquals(12L, (long) graph.traversal().V().count().next());
52+
Graph g = getGraph();
53+
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
54+
assertEquals(12L, (long) t.V().count().next());
55+
}
56+
57+
@Test
58+
public void testReadWideVertexWithManyProperties() throws Exception {
59+
int numProps = 1 << 16;
60+
61+
long numV = 1;
62+
mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make();
63+
mgmt.commit();
64+
finishSchema();
65+
66+
for (int j = 0; j < numV; j++) {
67+
Vertex v = graph.addVertex();
68+
for (int i = 0; i < numProps; i++) {
69+
v.property("p", i);
70+
}
71+
}
72+
graph.tx().commit();
73+
74+
assertEquals(numV, (long) graph.traversal().V().count().next());
75+
Map<String, Object> propertiesOnVertex = graph.traversal().V().valueMap().next();
76+
List<?> valuesOnP = (List)propertiesOnVertex.values().iterator().next();
77+
assertEquals(numProps, valuesOnP.size());
78+
Graph g = getGraph();
79+
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
80+
assertEquals(numV, (long) t.V().count().next());
81+
propertiesOnVertex = t.V().valueMap().next();
82+
valuesOnP = (List)propertiesOnVertex.values().iterator().next();
83+
assertEquals(numProps, valuesOnP.size());
84+
}
85+
86+
@Test
87+
public void testReadSelfEdge() throws Exception {
88+
GraphOfTheGodsFactory.load(graph, null, true);
89+
assertEquals(12L, (long) graph.traversal().V().count().next());
90+
91+
// Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no schema changes
92+
JanusGraphVertex sky = (JanusGraphVertex)graph.query().has("name", "sky").vertices().iterator().next();
93+
assertNotNull(sky);
94+
assertEquals("sky", sky.value("name"));
95+
assertEquals(1L, sky.query().direction(Direction.IN).edgeCount());
96+
assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount());
97+
assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount());
98+
sky.addEdge("lives", sky, "reason", "testReadSelfEdge");
99+
assertEquals(2L, sky.query().direction(Direction.IN).edgeCount());
100+
assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount());
101+
assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount());
102+
graph.tx().commit();
103+
104+
// Read the new edge using the inputformat
105+
Graph g = getGraph();
106+
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
107+
Iterator<Object> edgeIdIter = t.V().has("name", "sky").bothE().id();
108+
assertNotNull(edgeIdIter);
109+
assertTrue(edgeIdIter.hasNext());
110+
Set<Object> edges = Sets.newHashSet(edgeIdIter);
111+
assertEquals(2, edges.size());
112+
}
113+
114+
abstract protected Graph getGraph() throws IOException, ConfigurationException;
115+
}

janusgraph-hadoop-parent/janusgraph-hadoop-core/src/test/java/org/janusgraph/hadoop/CassandraInputFormatIT.java

Lines changed: 14 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -14,102 +14,28 @@
1414

1515
package org.janusgraph.hadoop;
1616

17+
import org.apache.commons.configuration.ConfigurationException;
18+
import org.apache.commons.configuration.PropertiesConfiguration;
1719
import org.janusgraph.CassandraStorageSetup;
18-
import org.janusgraph.core.Cardinality;
19-
import org.janusgraph.core.JanusGraphVertex;
2020
import org.janusgraph.diskstorage.configuration.ModifiableConfiguration;
2121
import org.janusgraph.diskstorage.configuration.WriteConfiguration;
22-
import org.janusgraph.example.GraphOfTheGodsFactory;
23-
import org.janusgraph.graphdb.JanusGraphBaseTest;
24-
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
25-
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer;
26-
import org.apache.tinkerpop.gremlin.structure.Direction;
27-
import org.apache.tinkerpop.gremlin.structure.Edge;
2822
import org.apache.tinkerpop.gremlin.structure.Graph;
29-
import org.apache.tinkerpop.gremlin.structure.Vertex;
3023
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
31-
import org.junit.Test;
3224

33-
import java.util.Iterator;
34-
import java.util.List;
35-
import java.util.Map;
36-
import java.util.Set;
25+
import java.io.IOException;
26+
import java.nio.file.Files;
27+
import java.nio.file.Path;
28+
import java.nio.file.Paths;
3729

38-
import com.google.common.collect.Iterators;
39-
import com.google.common.collect.Lists;
40-
import com.google.common.collect.Sets;
30+
public class CassandraInputFormatIT extends AbstractInputFormatIT {
4131

42-
import static org.junit.Assert.assertEquals;
43-
import static org.junit.Assert.assertNotNull;
44-
import static org.junit.Assert.assertTrue;
45-
46-
public class CassandraInputFormatIT extends JanusGraphBaseTest {
47-
48-
49-
@Test
50-
public void testReadGraphOfTheGods() {
51-
GraphOfTheGodsFactory.load(graph, null, true);
52-
assertEquals(12L, (long) graph.traversal().V().count().next());
53-
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
54-
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
55-
assertEquals(12L, (long) t.V().count().next());
56-
}
57-
58-
@Test
59-
public void testReadWideVertexWithManyProperties() {
60-
int numProps = 1 << 16;
61-
62-
long numV = 1;
63-
mgmt.makePropertyKey("p").cardinality(Cardinality.LIST).dataType(Integer.class).make();
64-
mgmt.commit();
65-
finishSchema();
66-
67-
for (int j = 0; j < numV; j++) {
68-
Vertex v = graph.addVertex();
69-
for (int i = 0; i < numProps; i++) {
70-
v.property("p", i);
71-
}
72-
}
73-
graph.tx().commit();
74-
75-
assertEquals(numV, (long) graph.traversal().V().count().next());
76-
Map<String, Object> propertiesOnVertex = graph.traversal().V().valueMap().next();
77-
List<?> valuesOnP = (List)propertiesOnVertex.values().iterator().next();
78-
assertEquals(numProps, valuesOnP.size());
79-
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
80-
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
81-
assertEquals(numV, (long) t.V().count().next());
82-
propertiesOnVertex = t.V().valueMap().next();
83-
valuesOnP = (List)propertiesOnVertex.values().iterator().next();
84-
assertEquals(numProps, valuesOnP.size());
85-
}
86-
87-
@Test
88-
public void testReadSelfEdge() {
89-
GraphOfTheGodsFactory.load(graph, null, true);
90-
assertEquals(12L, (long) graph.traversal().V().count().next());
91-
92-
// Add a self-loop on sky with edge label "lives"; it's nonsense, but at least it needs no schema changes
93-
JanusGraphVertex sky = (JanusGraphVertex)graph.query().has("name", "sky").vertices().iterator().next();
94-
assertNotNull(sky);
95-
assertEquals("sky", sky.value("name"));
96-
assertEquals(1L, sky.query().direction(Direction.IN).edgeCount());
97-
assertEquals(0L, sky.query().direction(Direction.OUT).edgeCount());
98-
assertEquals(1L, sky.query().direction(Direction.BOTH).edgeCount());
99-
sky.addEdge("lives", sky, "reason", "testReadSelfEdge");
100-
assertEquals(2L, sky.query().direction(Direction.IN).edgeCount());
101-
assertEquals(1L, sky.query().direction(Direction.OUT).edgeCount());
102-
assertEquals(3L, sky.query().direction(Direction.BOTH).edgeCount());
103-
graph.tx().commit();
104-
105-
// Read the new edge using the inputformat
106-
Graph g = GraphFactory.open("target/test-classes/cassandra-read.properties");
107-
GraphTraversalSource t = g.traversal(GraphTraversalSource.computer(SparkGraphComputer.class));
108-
Iterator<Object> edgeIdIter = t.V().has("name", "sky").bothE().id();
109-
assertNotNull(edgeIdIter);
110-
assertTrue(edgeIdIter.hasNext());
111-
Set<Object> edges = Sets.newHashSet(edgeIdIter);
112-
assertEquals(2, edges.size());
32+
protected Graph getGraph() throws ConfigurationException, IOException {
33+
final PropertiesConfiguration config = new PropertiesConfiguration("target/test-classes/cassandra-read.properties");
34+
Path baseOutDir = Paths.get((String) config.getProperty("gremlin.hadoop.outputLocation"));
35+
baseOutDir.toFile().mkdirs();
36+
String outDir = Files.createTempDirectory(baseOutDir, null).toAbsolutePath().toString();
37+
config.setProperty("gremlin.hadoop.outputLocation", outDir);
38+
return GraphFactory.open(config);
11339
}
11440

11541
@Override

0 commit comments

Comments (0)