diff --git a/pom.xml b/pom.xml
index c2db1a8b6f..a0e5eb3fc8 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,6 +20,7 @@
titan-test
titan-berkeleyje
titan-cassandra
+ titan-cassandra-cql
titan-persistit
titan-hbase
titan-es
diff --git a/titan-cassandra-cql/pom.xml b/titan-cassandra-cql/pom.xml
new file mode 100644
index 0000000000..f1b5b9e696
--- /dev/null
+++ b/titan-cassandra-cql/pom.xml
@@ -0,0 +1,49 @@
+
+
+ 4.0.0
+
+ com.thinkaurelius.titan
+ titan
+ 0.4.0-SNAPSHOT
+
+ titan-cassandra-cql
+ titan-cassandra-cql
+ http://maven.apache.org
+
+ UTF-8
+
+
+
+ com.thinkaurelius.titan
+ titan-core
+ ${titan.version}
+
+
+ com.thinkaurelius.titan
+ titan-test
+ ${titan.version}
+ test
+
+
+ com.codahale.metrics
+ metrics-core
+ ${metrics.version}
+
+
+
+ net.java.dev.jna
+ jna
+ 3.2.7
+ true
+
+
+ com.datastax.cassandra
+ cassandra-driver-core
+ 1.0.1
+
+
+
diff --git a/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/Consistency.java b/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/Consistency.java
new file mode 100644
index 0000000000..519d2ad101
--- /dev/null
+++ b/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/Consistency.java
@@ -0,0 +1,58 @@
+package org.titan.cassandra.cql;
+
+import com.datastax.driver.core.ConsistencyLevel;
+import com.google.common.base.Preconditions;
+
+/**
+ * Map CQL consistency levels to Titan.
+ *
+ * @author gciuloaica
+ *
+ */
+public enum Consistency {
+ ONE, TWO, THREE, ANY, ALL, QUORUM, LOCAL_QUORUM, EACH_QUORUM;
+
+ public static Consistency parse(String value) {
+ Preconditions.checkArgument(value != null && !value.isEmpty());
+ value = value.trim();
+ if (value.equals("1"))
+ return ONE;
+ else if (value.equals("2"))
+ return TWO;
+ else if (value.equals("3"))
+ return THREE;
+ else {
+ for (Consistency c : values()) {
+ if (c.toString().equalsIgnoreCase(value))
+ return c;
+ }
+ }
+ throw new IllegalArgumentException(
+ "Unrecognized cassandra consistency level: " + value);
+ }
+
+ public ConsistencyLevel getCqlConsistency() {
+ switch (this) {
+ case ONE:
+ return ConsistencyLevel.ONE;
+ case TWO:
+ return ConsistencyLevel.TWO;
+ case THREE:
+ return ConsistencyLevel.THREE;
+ case ALL:
+ return ConsistencyLevel.ALL;
+ case ANY:
+ return ConsistencyLevel.ANY;
+ case QUORUM:
+ return ConsistencyLevel.QUORUM;
+ case LOCAL_QUORUM:
+ return ConsistencyLevel.LOCAL_QUORUM;
+ case EACH_QUORUM:
+ return ConsistencyLevel.EACH_QUORUM;
+ default:
+ throw new IllegalArgumentException(
+ "Unrecognized consistency level: " + this);
+ }
+ }
+
+}
diff --git a/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/CqlKeyColumnValueStore.java b/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/CqlKeyColumnValueStore.java
new file mode 100644
index 0000000000..3134a575d2
--- /dev/null
+++ b/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/CqlKeyColumnValueStore.java
@@ -0,0 +1,287 @@
+package org.titan.cassandra.cql;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.ByteBufferEntry;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.Entry;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStore;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyIterator;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyRangeQuery;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeySliceQuery;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.SliceQuery;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+import com.thinkaurelius.titan.diskstorage.util.RecordIterator;
+import com.thinkaurelius.titan.diskstorage.util.StaticByteBuffer;
+
+public class CqlKeyColumnValueStore implements KeyColumnValueStore {
+ private static final Logger log = LoggerFactory
+ .getLogger(CqlKeyColumnValueStore.class);
+
+ private final Session session;
+ private String name;
+
+ private final PreparedStatement readKeyStatement;
+ private final PreparedStatement readKeyValueStatement;
+ private final PreparedStatement writeKeyValueStatement;
+ private final PreparedStatement removeKeyValueStatement;
+
+ CqlKeyColumnValueStore(String name, final Session session) {
+ this.session = session;
+ this.name = name;
+ this.readKeyStatement = buildReadKeyStatement(name);
+ this.readKeyValueStatement = buildReadKeyValueStatement(name);
+ this.writeKeyValueStatement = buildWriteKeyValueStatement(name);
+ this.removeKeyValueStatement = buildRemoveKeyValueStatement(name);
+ }
+
+ @Override
+ public boolean containsKey(StaticBuffer key, StoreTransaction txh)
+ throws StorageException {
+
+ BoundStatement boundStmt = new BoundStatement(readKeyStatement);
+ boundStmt.setConsistencyLevel(CqlTransaction.getTx(txh)
+ .getReadConsistency().getCqlConsistency());
+
+ boundStmt.setBytes("rowKey", key.asByteBuffer());
+
+ ResultSet rs = session.execute(boundStmt);
+ if (rs.iterator().hasNext()) {
+ return true;
+ } else {
+ return false;
+ }
+ }
+
+ @Override
+ public List getSlice(KeySliceQuery query, StoreTransaction txh)
+ throws StorageException {
+ int limit = Integer.MAX_VALUE - 1;
+ if (query.hasLimit())
+ limit = query.getLimit();
+
+ log.info("Table: {} Key: {}", getName(), query.getKey());
+
+ BoundStatement boundStmt = new BoundStatement(readKeyValueStatement);
+ boundStmt.setConsistencyLevel(CqlTransaction.getTx(txh)
+ .getReadConsistency().getCqlConsistency());
+ boundStmt.setBytes("rowKey", query.getKey().asByteBuffer());
+ boundStmt.setBytes(1, query.getSliceStart().asByteBuffer());
+ boundStmt.setBytes(2, query.getSliceEnd().asByteBuffer());
+
+ List entries = new ArrayList(1);
+ // order of the slices matter.
+ if (query.getSliceStart().compareTo(query.getSliceEnd()) == -1) {
+ ResultSet rs = session.execute(boundStmt);
+ int counter = 0;
+ while (rs.iterator().hasNext()) {
+ Row row = rs.iterator().next();
+ ByteBuffer column = row.getBytes(0);
+ ByteBuffer value = row.getBytes(1);
+ ByteBufferEntry entry = new ByteBufferEntry(column, value);
+ entries.add(entry);
+ if (++counter > (limit - 1))
+ break;
+ }
+ log.info("Counter: {}", counter);
+ }
+ log.info("Entries: {}", entries.size());
+
+ return entries;
+ }
+
+ @Override
+ public void mutate(StaticBuffer key, List additions,
+ List deletions, StoreTransaction txh)
+ throws StorageException {
+ log.info("Table: {} Key: {} Addtions: {}, Deletions: {}", new Object[] {
+ getName(), key.toString(), additions.size(), deletions.size() });
+ if (!deletions.isEmpty()) {
+ removeEntries(key, deletions, txh);
+ }
+
+ if (!additions.isEmpty()) {
+ addEntries(key, additions, txh);
+ }
+
+ }
+
+ @Override
+ public void acquireLock(StaticBuffer key, StaticBuffer column,
+ StaticBuffer expectedValue, StoreTransaction txh)
+ throws StorageException {
+ throw new UnsupportedOperationException();
+
+ }
+
+ @Override
+ public RecordIterator getKeys(StoreTransaction txh)
+ throws StorageException {
+ StringBuilder sb = new StringBuilder("SELECT rowKey FROM ");
+ sb.append(name);
+ ResultSet rs = session.execute(sb.toString());
+ final Iterator it = rs.iterator();
+
+ // because of our cql model, we have to select distinct the keys...
+ final Set seenKeys = new TreeSet();
+ while (it.hasNext()) {
+ StaticByteBuffer key = new StaticByteBuffer(it.next().getBytes(
+ "rowKey"));
+ seenKeys.add(key);
+ }
+
+ final Iterator keyIterator = seenKeys.iterator();
+
+ return new RecordIterator() {
+
+ @Override
+ public boolean hasNext() throws StorageException {
+ return keyIterator.hasNext();
+ }
+
+ @Override
+ public StaticBuffer next() throws StorageException {
+ return keyIterator.next();
+ }
+
+ @Override
+ public void close() throws StorageException {
+ seenKeys.clear();
+ }
+ };
+ }
+
+ @Override
+ public StaticBuffer[] getLocalKeyPartition() throws StorageException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public String getName() {
+ return name;
+ }
+
+ @Override
+ public void close() throws StorageException {
+ session.shutdown();
+ }
+
+ private void addEntries(StaticBuffer key, List additions,
+ StoreTransaction txh) {
+ for (Entry entry : additions) {
+ BoundStatement boundStmt = new BoundStatement(
+ writeKeyValueStatement);
+ boundStmt.setConsistencyLevel(CqlTransaction.getTx(txh)
+ .getWriteConsistency().getCqlConsistency());
+ boundStmt.setBytes("rowKey", key.asByteBuffer());
+ boundStmt.setBytes("columnName", entry.getColumn().asByteBuffer());
+ boundStmt.setBytes("value", entry.getValue().asByteBuffer());
+
+ session.execute(boundStmt);
+
+ log.info(
+ "Add entry: name: {}; rowKey: {}; columnName: {}; value: {}",
+ new Object[] { getName(), key.toString(),
+ entry.getByteBufferColumn().toString(),
+ entry.getByteBufferValue().toString() });
+
+ }
+
+ }
+
+ private void removeEntries(StaticBuffer key, List deletions,
+ StoreTransaction txh) {
+ // FIXME: can we do it with a batch ?
+ for (StaticBuffer entry : deletions) {
+ BoundStatement boundStmt = new BoundStatement(
+ removeKeyValueStatement);
+ boundStmt.setConsistencyLevel(CqlTransaction.getTx(txh)
+ .getWriteConsistency().getCqlConsistency());
+ boundStmt.setBytes("rowKey", key.asByteBuffer());
+ boundStmt.setBytes("columnName", entry.asByteBuffer());
+ session.execute(boundStmt);
+ log.info(
+ "Remove entry: name: {}; rowKey: {}; columnName: {};\n",
+ new Object[] { getName(), key.toString(), entry.toString() });
+
+ }
+
+ }
+
+ private PreparedStatement buildReadKeyStatement(String name) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("select rowKey from ").append(name).append(" where rowKey=?");
+ return session.prepare(sb.toString());
+ }
+
+ private PreparedStatement buildReadKeyRangeStatement(String name) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("select rowKey from ").append(name)
+ .append(" where rowKey >= ? and rowKey < ?");
+ return session.prepare(sb.toString());
+ }
+
+ private PreparedStatement buildReadKeyValueStatement(String name) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("select columnName, value from ")
+ .append(name)
+ .append(" where rowKey=? and columnName >= ? and columnName < ?");
+
+ return session.prepare(sb.toString());
+
+ }
+
+ private PreparedStatement buildWriteKeyValueStatement(String name) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("INSERT INTO ").append(name)
+ .append(" (rowKey, columnName, value) values (?,?,?)");
+
+ return session.prepare(sb.toString());
+
+ }
+
+ private PreparedStatement buildRemoveKeyValueStatement(String name) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("DELETE FROM ").append(name)
+ .append(" WHERE rowKey=? AND columnName=?");
+
+ return session.prepare(sb.toString());
+
+ }
+
+ @Override
+ public List> getSlice(List keys,
+ SliceQuery query, StoreTransaction txh) throws StorageException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public KeyIterator getKeys(KeyRangeQuery query, StoreTransaction txh)
+ throws StorageException {
+
+ return null;
+ }
+
+ @Override
+ public KeyIterator getKeys(SliceQuery query, StoreTransaction txh)
+ throws StorageException {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+}
diff --git a/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/CqlStoreManager.java b/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/CqlStoreManager.java
new file mode 100644
index 0000000000..32afd917fb
--- /dev/null
+++ b/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/CqlStoreManager.java
@@ -0,0 +1,308 @@
+package org.titan.cassandra.cql;
+
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.lang.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datastax.driver.core.BoundStatement;
+import com.datastax.driver.core.Cluster;
+import com.datastax.driver.core.Host;
+import com.datastax.driver.core.HostDistance;
+import com.datastax.driver.core.KeyspaceMetadata;
+import com.datastax.driver.core.Metadata;
+import com.datastax.driver.core.PreparedStatement;
+import com.datastax.driver.core.ResultSet;
+import com.datastax.driver.core.Row;
+import com.datastax.driver.core.Session;
+import com.datastax.driver.core.TableMetadata;
+import com.thinkaurelius.titan.diskstorage.StaticBuffer;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.common.DistributedStoreManager;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.ConsistencyLevel;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KCVMutation;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreFeatures;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+
+/**
+ * CqlStorageManager
+ *
+ * @author gciuloaica
+ *
+ */
+public class CqlStoreManager extends DistributedStoreManager implements
+ KeyColumnValueStoreManager {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(CqlStoreManager.class);
+
+ /**
+ * Default port at which to attempt Cassandra Native connection.
+ *
+ * Value = {@value}
+ */
+ public static final int PORT_DEFAULT = 9042;
+
+ public static final int REPLICATION_FACTOR_DEFAULT = 1;
+
+ public static final String READ_CONSISTENCY_LEVEL_KEY = "read-consistency-level";
+ public static final String READ_CONSISTENCY_LEVEL_DEFAULT = "QUORUM";
+
+ public static final String WRITE_CONSISTENCY_LEVEL_KEY = "write-consistency-level";
+ public static final String REPLICATION_FACTOR_KEY = "replication-factor";
+
+ public static final String WRITE_CONSISTENCY_LEVEL_DEFAULT = "QUORUM";
+
+ protected static final String SYSTEM_PROPERTIES_CF = "system_properties";
+ protected static final String SYSTEM_PROPERTIES_KEY = "general";
+
+ private final String SYSTEM_INSERT_STMT = "INSERT INTO "
+ + SYSTEM_PROPERTIES_CF
+ + " (rowKey, columnName, value) VALUES (?,?,?)";
+
+ /**
+ * Default name for the Cassandra keyspace
+ *
+ * Value = {@value}
+ */
+ public static final String KEYSPACE_DEFAULT = "titan";
+ public static final String KEYSPACE_KEY = "keyspace";
+
+ /**
+ * Default name for the Cassandra cluster
+ *
+ * Value = {@value}
+ */
+ public static final String CLUSTER_DEFAULT = "Titan Cluster";
+ public static final String CLUSTER_KEY = "cluster-name";
+
+ /**
+ * Maximum pooled connections per host.
+ *
+ * Value = {@value}
+ */
+ public static final int MAX_CONNECTIONS_PER_HOST_DEFAULT = 6;
+ public static final String MAX_CONNECTIONS_PER_HOST_KEY = "max-connections-per-host";
+
+ protected final String keySpaceName;
+ protected final int replicationFactor;
+
+ private final int maxConnectionsPerHost;
+
+ private final Consistency readConsistencyLevel;
+ private final Consistency writeConsistencyLevel;
+
+ private StoreFeatures features = null;
+
+ private Cluster cluster;
+
+ private final Map openStores = new ConcurrentHashMap(
+ 8);
+
+ public CqlStoreManager(Configuration storageConfig) {
+ super(storageConfig, PORT_DEFAULT);
+
+ this.keySpaceName = storageConfig.getString(KEYSPACE_KEY,
+ KEYSPACE_DEFAULT);
+
+ this.replicationFactor = storageConfig.getInt(REPLICATION_FACTOR_KEY,
+ REPLICATION_FACTOR_DEFAULT);
+
+ this.readConsistencyLevel = Consistency.parse(storageConfig.getString(
+ READ_CONSISTENCY_LEVEL_KEY, READ_CONSISTENCY_LEVEL_DEFAULT));
+
+ this.writeConsistencyLevel = Consistency.parse(storageConfig.getString(
+ WRITE_CONSISTENCY_LEVEL_KEY, WRITE_CONSISTENCY_LEVEL_DEFAULT));
+
+ this.maxConnectionsPerHost = storageConfig.getInt(
+ MAX_CONNECTIONS_PER_HOST_KEY, MAX_CONNECTIONS_PER_HOST_DEFAULT);
+ connect();
+ if (cluster.getMetadata().getKeyspace(keySpaceName) == null) {
+ createKeyspace(keySpaceName);
+ createSystemTable();
+ }
+
+ }
+
+ @Override
+ public StoreTransaction beginTransaction(ConsistencyLevel level)
+ throws StorageException {
+ return new CqlTransaction(level, readConsistencyLevel,
+ writeConsistencyLevel);
+ }
+
+ @Override
+ public void close() throws StorageException {
+ log.info("Close Storage.");
+ openStores.clear();
+ cluster.shutdown();
+
+ }
+
+ @Override
+ public void clearStorage() throws StorageException {
+ log.info("Clear storage.");
+ Session session = cluster.connect();
+ session.execute("DROP KEYSPACE " + keySpaceName + ";");
+ session.shutdown();
+ }
+
+ @Override
+ public StoreFeatures getFeatures() {
+ if (features == null) {
+ features = new StoreFeatures();
+ features.supportsScan = true;
+ features.supportsBatchMutation = false;
+ features.supportsTransactions = false;
+ features.supportsConsistentKeyOperations = true;
+ features.supportsLocking = false;
+ features.isDistributed = true;
+ features.isKeyOrdered = false;
+ features.hasLocalKeyPartition = false;
+ }
+ return features;
+ }
+
+ @Override
+ public String getConfigurationProperty(String key) throws StorageException {
+ Session session = cluster.connect(keySpaceName);
+ StringBuilder sb = new StringBuilder("SELECT value FROM ")
+ .append(SYSTEM_PROPERTIES_CF).append(" WHERE rowKey=? ")
+ .append(" AND columnName=?");
+
+ PreparedStatement stmt = session.prepare(sb.toString());
+ BoundStatement boundStmt = new BoundStatement(stmt);
+ boundStmt.setConsistencyLevel(Consistency.QUORUM.getCqlConsistency());
+ boundStmt.setBytes("rowKey",
+ ByteBuffer.wrap(SYSTEM_PROPERTIES_KEY.getBytes()));
+ boundStmt.setString("columnName", key);
+ ResultSet rs = session.execute(boundStmt);
+ String value = null;
+ if (rs.iterator().hasNext()) {
+ Row row = rs.iterator().next();
+ ByteBuffer bb = row.getBytes("value");
+ int length = bb.remaining();
+ byte[] buffer = new byte[length];
+ bb.get(buffer, 0, length);
+ value = new String(buffer);
+ }
+ session.shutdown();
+ return value;
+
+ }
+
+ @Override
+ public void setConfigurationProperty(String key, String value)
+ throws StorageException {
+ Session session = cluster.connect(keySpaceName);
+ PreparedStatement stmt = session.prepare(SYSTEM_INSERT_STMT);
+ BoundStatement boundStmt = new BoundStatement(stmt);
+ boundStmt.setConsistencyLevel(Consistency.QUORUM.getCqlConsistency());
+ boundStmt.setBytes("rowKey",
+ ByteBuffer.wrap(SYSTEM_PROPERTIES_KEY.getBytes()));
+ boundStmt.setString("columnName", key);
+ boundStmt.setBytes("value", ByteBuffer.wrap(value.getBytes()));
+ session.execute(boundStmt);
+ session.shutdown();
+
+ }
+
+ @Override
+ public CqlKeyColumnValueStore openDatabase(String name)
+ throws StorageException {
+ if (openStores.containsKey(name)) {
+ return openStores.get(name);
+ } else {
+ ensureColumnFamilyExists(name);
+ Session session = cluster.connect(keySpaceName);
+ CqlKeyColumnValueStore store = new CqlKeyColumnValueStore(name,
+ session);
+ openStores.put(name, store);
+ return store;
+ }
+
+ }
+
+ @Override
+ public void mutateMany(
+ Map> mutations,
+ StoreTransaction txh) throws StorageException {
+ // TODO Auto-generated method stub
+ log.info("Mutate many");
+
+ }
+
+ private void connect() {
+ Cluster.Builder builder = Cluster.builder().addContactPoints(
+ StringUtils.join(hostnames, ","));
+ builder.poolingOptions().setMaxConnectionsPerHost(HostDistance.LOCAL,
+ maxConnectionsPerHost);
+ cluster = builder.build();
+ Metadata metadata = cluster.getMetadata();
+ log.info("Connected to cluster: %s\n", metadata.getClusterName());
+ for (Host host : metadata.getAllHosts()) {
+ log.info("Datatacenter: {}; Host: {}; Rack: {}\n",
+ new Object[] { host.getDatacenter(),
+ host.getAddress().toString(), host.getRack() });
+ }
+ }
+
+ private void createKeyspace(String name) {
+ Session session = cluster.connect();
+ StringBuilder sb = new StringBuilder("CREATE KEYSPACE ")
+ .append(name)
+ .append(" WITH replication = {'class':'SimpleStrategy', 'replication_factor':")
+ .append(String.valueOf(replicationFactor)).append("};");
+ session.execute(sb.toString());
+ session.shutdown();
+ }
+
+ private void ensureColumnFamilyExists(String name) {
+ Metadata metadata = cluster.getMetadata();
+ KeyspaceMetadata keyspaceMetadata = metadata.getKeyspace(keySpaceName);
+ Collection tablesMetadata = keyspaceMetadata.getTables();
+ boolean isTablePresent = false;
+ for (TableMetadata tableMetdata : tablesMetadata) {
+ if (tableMetdata.getName().equalsIgnoreCase(name)) {
+ isTablePresent = true;
+ break;
+ }
+ }
+ if (!isTablePresent) {
+ createTable(name);
+ }
+
+ }
+
+ private void createTable(String name) {
+ Session session = cluster.connect(keySpaceName);
+ StringBuilder sb = new StringBuilder("CREATE TABLE ")
+ .append(name)
+ .append(" (rowKey blob, columnName blob, value blob, PRIMARY KEY(rowKey, columnName));");
+ session.execute(sb.toString());
+ session.shutdown();
+
+ }
+
+ private void createSystemTable() {
+ Session session = cluster.connect(keySpaceName);
+ StringBuilder sb = new StringBuilder("CREATE TABLE ")
+ .append(SYSTEM_PROPERTIES_CF)
+ .append(" (rowKey blob, columnName text, value blob, PRIMARY KEY(rowKey, columnName));");
+ session.execute(sb.toString());
+ session.shutdown();
+
+ }
+
+ @Override
+ public String getName() {
+ return KEYSPACE_DEFAULT;
+ }
+
+}
diff --git a/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/CqlTransaction.java b/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/CqlTransaction.java
new file mode 100644
index 0000000000..f5ab14ec6b
--- /dev/null
+++ b/titan-cassandra-cql/src/main/java/org/titan/cassandra/cql/CqlTransaction.java
@@ -0,0 +1,41 @@
+package org.titan.cassandra.cql;
+
+import com.google.common.base.Preconditions;
+import com.thinkaurelius.titan.diskstorage.common.AbstractStoreTransaction;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.ConsistencyLevel;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.StoreTransaction;
+
+public class CqlTransaction extends AbstractStoreTransaction {
+
+ private final Consistency readConsistency;
+ private final Consistency writeConsistency;
+
+ public CqlTransaction(ConsistencyLevel level, Consistency readConsistency,
+ Consistency writeConsistency) {
+ super(level);
+ if (level == ConsistencyLevel.KEY_CONSISTENT) {
+ this.readConsistency = Consistency.QUORUM;
+ this.writeConsistency = Consistency.QUORUM;
+ } else {
+ Preconditions.checkNotNull(readConsistency);
+ Preconditions.checkNotNull(writeConsistency);
+ this.readConsistency = readConsistency;
+ this.writeConsistency = writeConsistency;
+ }
+ }
+
+ public Consistency getWriteConsistency() {
+ return writeConsistency;
+ }
+
+ public Consistency getReadConsistency() {
+ return readConsistency;
+ }
+
+ public static CqlTransaction getTx(StoreTransaction txh) {
+ Preconditions.checkArgument(txh != null
+ && (txh instanceof CqlTransaction));
+ return (CqlTransaction) txh;
+ }
+
+}
diff --git a/titan-cassandra-cql/src/main/resources/log4j.properties b/titan-cassandra-cql/src/main/resources/log4j.properties
new file mode 100644
index 0000000000..09f42a86f2
--- /dev/null
+++ b/titan-cassandra-cql/src/main/resources/log4j.properties
@@ -0,0 +1,14 @@
+# A1 is set to be a ConsoleAppender.
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+
+# A1 uses PatternLayout.
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
+
+# Set root logger level to the designated level and its only appender to A1.
+log4j.rootLogger=INFO, A1
+
+log4j.logger.org.apache.cassandra=INFO
+log4j.logger.org.titan.cassandra.cql=INFO
+log4j.logger.com.datastax.driver=ERROR
+log4j.logger.com.yammer.metrics.reporting=ERROR
diff --git a/titan-cassandra-cql/src/main/resources/titan.properties b/titan-cassandra-cql/src/main/resources/titan.properties
new file mode 100644
index 0000000000..29354aa8c4
--- /dev/null
+++ b/titan-cassandra-cql/src/main/resources/titan.properties
@@ -0,0 +1,3 @@
+titan.version=${project.version}
+titan.compatible-versions=${titan.compatible.versions}
+storage.cassandra-cql=org.titan.cassandra.cql.CqlStoreManager
diff --git a/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/TestStorage.java b/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/TestStorage.java
new file mode 100644
index 0000000000..c95c6e8bf3
--- /dev/null
+++ b/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/TestStorage.java
@@ -0,0 +1,32 @@
+package org.titan.cassandra.cql;
+
+import static org.junit.Assert.*;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.junit.Test;
+
+import com.thinkaurelius.titan.diskstorage.StorageException;
+
+public class TestStorage {
+
+ @Test
+ public void testCreateStorage() throws StorageException {
+
+ CqlStoreManager manager = new CqlStoreManager(
+ getCassandraStorageConfiguration());
+ CqlKeyColumnValueStore keystore = manager.openDatabase("titan");
+
+ assertNotNull(keystore);
+ manager.clearStorage();
+ manager.close();
+
+ }
+
+ private Configuration getCassandraStorageConfiguration() {
+ BaseConfiguration config = new BaseConfiguration();
+ return config;
+
+ }
+
+}
diff --git a/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlBlueprintsTest.java b/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlBlueprintsTest.java
new file mode 100644
index 0000000000..e2b5918c4b
--- /dev/null
+++ b/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlBlueprintsTest.java
@@ -0,0 +1,56 @@
+package org.titan.cassandra.cql.blueprints;
+
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_BACKEND_KEY;
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_NAMESPACE;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.titan.cassandra.cql.CqlStoreManager;
+
+import com.thinkaurelius.titan.blueprints.TitanBlueprintsTest;
+import com.thinkaurelius.titan.core.TitanFactory;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.tinkerpop.blueprints.Graph;
+
+public class CqlBlueprintsTest extends TitanBlueprintsTest {
+
+ @Override
+ public Graph generateGraph() {
+ Configuration config = new BaseConfiguration();
+ config.subset(STORAGE_NAMESPACE).addProperty(STORAGE_BACKEND_KEY,
+ "cassandra-cql");
+ Graph g = TitanFactory.open(config);
+ return g;
+ }
+
+ @Override
+ public boolean supportsMultipleGraphs() {
+ return false;
+ }
+
+ @Override
+ public void startUp() {
+
+ }
+
+ @Override
+ public void shutDown() {
+
+ }
+
+ @Override
+ public void cleanUp() throws StorageException {
+ Configuration config = new BaseConfiguration();
+ config.subset(STORAGE_NAMESPACE).addProperty(STORAGE_BACKEND_KEY,
+ "cassandra-cql");
+ CqlStoreManager manager = new CqlStoreManager(config);
+ manager.clearStorage();
+ }
+
+ @Override
+ public Graph generateGraph(String graph) {
+ throw new UnsupportedOperationException();
+ }
+
+
+}
diff --git a/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlGraphTest.java b/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlGraphTest.java
new file mode 100644
index 0000000000..7afdadbaf5
--- /dev/null
+++ b/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlGraphTest.java
@@ -0,0 +1,28 @@
+package org.titan.cassandra.cql.blueprints;
+
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_BACKEND_KEY;
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_NAMESPACE;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+
+import com.thinkaurelius.titan.graphdb.TitanGraphTest;
+
+public class CqlGraphTest extends TitanGraphTest {
+
+ public CqlGraphTest() {
+
+ super(getConfig());
+ }
+
+ private static Configuration getConfig() {
+ Configuration config = new BaseConfiguration();
+ config.subset(STORAGE_NAMESPACE).addProperty(STORAGE_BACKEND_KEY,
+ "cassandra-cql");
+ return config;
+
+ }
+
+
+
+}
diff --git a/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlKeyColumnValueStoreTest.java b/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlKeyColumnValueStoreTest.java
new file mode 100644
index 0000000000..b571499d43
--- /dev/null
+++ b/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlKeyColumnValueStoreTest.java
@@ -0,0 +1,27 @@
+package org.titan.cassandra.cql.blueprints;
+
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_BACKEND_KEY;
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_NAMESPACE;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.titan.cassandra.cql.CqlStoreManager;
+
+import com.thinkaurelius.titan.diskstorage.KeyColumnValueStoreTest;
+import com.thinkaurelius.titan.diskstorage.StorageException;
+import com.thinkaurelius.titan.diskstorage.keycolumnvalue.KeyColumnValueStoreManager;
+
+public class CqlKeyColumnValueStoreTest extends KeyColumnValueStoreTest{
+
+
+ @Override
+ public KeyColumnValueStoreManager openStorageManager()
+ throws StorageException {
+ Configuration config = new BaseConfiguration();
+ config.subset(STORAGE_NAMESPACE).addProperty(STORAGE_BACKEND_KEY,
+ "cassandra-cql");
+
+ return new CqlStoreManager(config);
+ }
+
+}
diff --git a/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlPerformanceGraphTest.java b/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlPerformanceGraphTest.java
new file mode 100644
index 0000000000..081b3bb76c
--- /dev/null
+++ b/titan-cassandra-cql/src/test/java/org/titan/cassandra/cql/blueprints/CqlPerformanceGraphTest.java
@@ -0,0 +1,26 @@
+package org.titan.cassandra.cql.blueprints;
+
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_BACKEND_KEY;
+import static com.thinkaurelius.titan.graphdb.configuration.GraphDatabaseConfiguration.STORAGE_NAMESPACE;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+
+import com.thinkaurelius.titan.graphdb.TitanGraphPerformanceTest;
+
+public class CqlPerformanceGraphTest extends TitanGraphPerformanceTest{
+
+ public CqlPerformanceGraphTest() {
+ super(getConfig());
+ }
+
+ private static Configuration getConfig() {
+ Configuration config = new BaseConfiguration();
+ config.subset(STORAGE_NAMESPACE).addProperty(STORAGE_BACKEND_KEY,
+ "cassandra-cql");
+ return config;
+
+ }
+
+
+}