Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

<groupId>kr.co.vcnc.haeinsa</groupId>
<artifactId>haeinsa</artifactId>
<version>1.0.6-aol</version>
<version>1.0.7-aol-snapshot</version>
<packaging>jar</packaging>

<name>haeinsa</name>
Expand Down
5 changes: 5 additions & 0 deletions src/main/java/kr/co/vcnc/haeinsa/ForwardingHaeinsaTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,11 @@ public void commitSingleRowPutOnly(HaeinsaRowTransaction rowState, byte[] row) t
delegate().commitSingleRowPutOnly(rowState, row);
}

@Override
public void commitSingleRowDeleteAllOnly(HaeinsaRowTransaction rowState, byte[] row) throws IOException {
delegate().commitSingleRowDeleteAllOnly(rowState, row);
}

@Override
public void checkSingleRowLock(HaeinsaRowTransaction rowState, byte[] row) throws IOException {
delegate().checkSingleRowLock(rowState, row);
Expand Down
33 changes: 31 additions & 2 deletions src/main/java/kr/co/vcnc/haeinsa/HaeinsaTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,13 @@ public void put(HaeinsaTransaction tx, HaeinsaPut put) throws IOException {
// TODO(improvement) : Should consider to get lock when commit() called.
rowState = checkOrRecoverLock(tx, row, tableState, rowState);
}

Preconditions.checkArgument(rowState.getMutations().size() == 0 ||
rowState.getMutations().size() > 1 ||
!(rowState.getMutations().get(0) instanceof HaeinsaDelete) ||
rowState.getMutations().get(0).getFamilyMap().size() != 0,
"Delete of full row only allow if it is the only mutation on a single row");

rowState.addMutation(put);
}

Expand Down Expand Up @@ -583,14 +590,20 @@ public void delete(HaeinsaTransaction tx, HaeinsaDelete delete) throws IOExcepti
Preconditions.checkNotNull(delete);

byte[] row = delete.getRow();
// Can't delete entire row in Haeinsa because of lock column. Please specify column families when needed.
Preconditions.checkArgument(delete.getFamilyMap().size() > 0, "can't delete an entire row.");
HaeinsaTableTransaction tableState = tx.createOrGetTableState(this.table.getName().getName());
HaeinsaRowTransaction rowState = tableState.getRowStates().get(row);
if (rowState == null) {
// TODO(improvement) : Should consider to get lock when commit() called.
rowState = checkOrRecoverLock(tx, row, tableState, rowState);
}

Preconditions.checkArgument((delete.getFamilyMap().size() > 0 &&
(rowState.getMutations().size() == 0 || rowState.getMutations().size() > 1 ||
!(rowState.getMutations().get(0) instanceof HaeinsaDelete) ||
rowState.getMutations().get(0).getFamilyMap().size() != 0)) ||
(delete.getFamilyMap().size() == 0 && rowState.getMutations().size() == 0),
"Delete of full row only allow if it is the only mutation on a single row");

rowState.addMutation(delete);
}

Expand Down Expand Up @@ -628,6 +641,22 @@ public void commitSingleRowPutOnly(HaeinsaRowTransaction rowState, byte[] row) t
}
}

@Override
public void commitSingleRowDeleteAllOnly(HaeinsaRowTransaction rowState, byte[] row) throws IOException {
HaeinsaTransaction tx = rowState.getTableTransaction().getTransaction();
Delete delete = new Delete(row);
HaeinsaDelete haeinsaDelete = (HaeinsaDelete) rowState.getMutations().remove(0);
Preconditions.checkArgument(haeinsaDelete.getFamilyMap().size() == 0,
"commitSingleRowDeleteAllOnly can only be called if there is a single mutation on a single row deleting the entire row");

byte[] currentRowLockBytes = TRowLocks.serialize(rowState.getCurrent());
if (!table.checkAndDelete(row, LOCK_FAMILY, LOCK_QUALIFIER, currentRowLockBytes, delete)) {
throw new ConflictException("can't acquire row's lock, commitSingleRowDeleteAllOnly failed");
} else {
rowState.setCurrent(null);
}
}

/**
* Read {@link TRowLock} from HBase and compare that lock with prevRowLock.
* If TRowLock is changed, it means transaction is failed, so throw
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/kr/co/vcnc/haeinsa/HaeinsaTableIfaceInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,19 @@ interface HaeinsaTableIfaceInternal extends HaeinsaTableIface {
*/
void commitSingleRowPutOnly(HaeinsaRowTransaction rowState, byte[] row) throws IOException;

/**
* Commit single row delete all (empty family map) only Transaction. Delete
* all the row including the lock column (rest of Haeinsa treat missing lock column as
* STABLE with LONG_MIN commit timestamp. Separate this because Single Row delete all only
* transaction can save one checkAndPut operation to complete.
* <p>
* If TRowLock is changed and checkAndDelete failed, it means transaction is
* failed so throw {@link ConflictException}.
*
* @throws IOException ConflictException, HBase IOException.
*/
void commitSingleRowDeleteAllOnly(HaeinsaRowTransaction rowState, byte[] row) throws IOException;

/**
* Read {@link TRowLock} from HBase and compare that lock with prevRowLock.
* If TRowLock is changed, it means transaction is failed, so throw
Expand Down
115 changes: 96 additions & 19 deletions src/main/java/kr/co/vcnc/haeinsa/HaeinsaTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ private static enum CommitMethod {
* If there is only one rowTx and type of its mutation is HaeinsaPut.
*/
SINGLE_ROW_PUT_ONLY,
/**
* If there is only one rowTx and type of its mutation is HaeinsaDelete and
* there is no column family (delete all)
*/
SINGLE_ROW_DELETE_ALL_ONLY,
/**
* When there is multiple rowTx and at least one of that include mutation,
* or there is only one rowTx and its mutation contains HaeinsaDelete.
Expand Down Expand Up @@ -250,26 +255,34 @@ public void commit() throws IOException {
// Than determineCommitMethod whill return NOTHING.
setPrimary(primaryRowKey);

// Can't delete entire row in Haeinsa because of lock column, unless it is the only mutation
// on a single row. Please specify column families when needed.
Preconditions.checkArgument(txStates.checkDeleteAllIfAnyForValidity(),
"Delete all is only allowed if it is the only mutation on a single row");
CommitMethod method = txStates.determineCommitMethod();
switch (method) {
case READ_ONLY: {
commitReadOnly();
break;
}
case SINGLE_ROW_PUT_ONLY: {
commitSingleRowPutOnly();
break;
}
case MULTI_ROW_MUTATIONS: {
commitMultiRowsMutation();
break;
}
case NOTHING: {
break;
}
default: {
break;
}
case READ_ONLY: {
commitReadOnly();
break;
}
case SINGLE_ROW_PUT_ONLY: {
commitSingleRowPutOnly();
break;
}
case SINGLE_ROW_DELETE_ALL_ONLY: {
commitSingleRowDeleteAllOnly();
break;
}
case MULTI_ROW_MUTATIONS: {
commitMultiRowsMutation();
break;
}
case NOTHING: {
break;
}
default: {
break;
}
}
}

Expand Down Expand Up @@ -321,6 +334,20 @@ private void commitSingleRowPutOnly() throws IOException {
}
}

/**
* Commit single row & DELETE all only (possibly include get/scan, but not any other mutation)
* Transaction.
*/
private void commitSingleRowDeleteAllOnly() throws IOException {
HaeinsaTableTransaction primaryTableState = createOrGetTableState(primary.getTableName());
HaeinsaRowTransaction primaryRowState = primaryTableState.createOrGetRowState(primary.getRow());

// commit primary row
try (HaeinsaTableIfaceInternal table = getManager().getTable(primary.getTableName())) {
table.commitSingleRowDeleteAllOnly(primaryRowState, primary.getRow());
}
}

/**
* Commit multiple row Transaction or single row Transaction which includes
* Delete operation.
Expand Down Expand Up @@ -560,7 +587,8 @@ public boolean hasChanges() {
* {@link CommitMethod#MULTI_ROW_MUTATIONS}
* <p>
* Transaction of single row with at least one of {@link HaeinsaDelete}
* will be considered as {@link CommitMethod#MULTI_ROW_MUTATIONS}.
* will be considered as {@link CommitMethod#MULTI_ROW_MUTATIONS} unless it
* is a delete all on a single row and there is no other mutation.
*/
public CommitMethod determineCommitMethod() {
int count = 0;
Expand All @@ -580,6 +608,10 @@ public CommitMethod determineCommitMethod() {
} else if (rowState.getMutations().get(0) instanceof HaeinsaPut
&& rowState.getMutations().size() == 1) {
method = CommitMethod.SINGLE_ROW_PUT_ONLY;
} else if (rowState.getMutations().get(0) instanceof HaeinsaDelete
&& ((HaeinsaDelete) rowState.getMutations().get(0)).getFamilyMap().size() == 0
&& rowState.getMutations().size() == 1) {
method = CommitMethod.SINGLE_ROW_DELETE_ALL_ONLY;
} else if (haveMuations) {
// if rowTx contiains HaeinsaDelete
method = CommitMethod.MULTI_ROW_MUTATIONS;
Expand All @@ -597,6 +629,51 @@ public CommitMethod determineCommitMethod() {
return method;
}

/**
* Return true if
* a) there is no delete all transaction, or
* b) there is one but it is the only mutation
* on a single row and there is no other row
* involved in the transaction
*/
public boolean checkDeleteAllIfAnyForValidity() {
int count = 0;
int mutationCount = 0;
boolean haveDeleteAll = false;
for (HaeinsaTableTransaction tableState : tableStates.values()) {
count++;
for (HaeinsaRowTransaction rowState : tableState.getRowStates().values()) {
mutationCount += rowState.getMutations().size();
if ((mutationCount > 1 || count > 1) && haveDeleteAll) {
return false;
}
for (HaeinsaMutation mutation : rowState.getMutations()) {
if (mutation instanceof HaeinsaDelete &&
((HaeinsaDelete) mutation).getFamilyMap().size() == 0) {
haveDeleteAll = true;
}

if ((mutationCount > 1 || count > 1) && haveDeleteAll) {
return false;
}
}
if ((mutationCount > 1 || count > 1) && haveDeleteAll) {
return false;
}
}

if ((mutationCount > 1 || count > 1) && haveDeleteAll) {
return false;
}
}

if ((mutationCount > 1 || count > 1) && haveDeleteAll) {
return false;
}

return true;
}

/**
* Return mutation rows which is hash-sorted by TRowKey(table, row).
*/
Expand Down
78 changes: 78 additions & 0 deletions src/test/java/kr/co/vcnc/haeinsa/HaeinsaUnitTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.exceptions.IllegalArgumentIOException;
import org.apache.hadoop.hbase.util.Bytes;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand Down Expand Up @@ -351,6 +352,83 @@ public void testMultiPutAndMultiDelete() throws Exception {
testTable.close();
}

@Test
public void testSingleRowDeleteAll() throws Exception {
final HaeinsaTransactionManager tm = context().getTransactionManager();
final HaeinsaTableIface testTable = context().getHaeinsaTableIface("test");

HaeinsaTransaction tx = tm.begin();

HaeinsaPut put = new HaeinsaPut(Bytes.toBytes("ymkim"));
put.add(Bytes.toBytes("data"), Bytes.toBytes("phoneNumber"), Bytes.toBytes("010-1234-5678+1"));
put.add(Bytes.toBytes("data"), Bytes.toBytes("email"), Bytes.toBytes("[email protected]"));
put.add(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes("Youngmok Kim+1"));
testTable.put(tx, put);

tx.commit();

tx = tm.begin();

HaeinsaDelete delete = new HaeinsaDelete(Bytes.toBytes("ymkim"));
testTable.delete(tx, delete);

tx.commit();

tx = tm.begin();
HaeinsaGet get = new HaeinsaGet(Bytes.toBytes("ymkim"));
HaeinsaResult result = testTable.get(tx, get);
Assert.assertTrue(result.isEmpty());
tx.rollback();

testTable.close();
}

@Test
public void testSingleRowDeleteAllAndPutShouldFail() throws Exception {
final HaeinsaTransactionManager tm = context().getTransactionManager();
final HaeinsaTableIface testTable = context().getHaeinsaTableIface("test");

try {
HaeinsaTransaction tx = tm.begin();

HaeinsaPut put = new HaeinsaPut(Bytes.toBytes("ymkim"));
put.add(Bytes.toBytes("data"), Bytes.toBytes("phoneNumber"), Bytes.toBytes("010-1234-5678+1"));
put.add(Bytes.toBytes("data"), Bytes.toBytes("email"), Bytes.toBytes("[email protected]"));
put.add(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes("Youngmok Kim+1"));
testTable.put(tx, put);
HaeinsaDelete delete = new HaeinsaDelete(Bytes.toBytes("ymkim"));
testTable.delete(tx, delete);

tx.commit();
Assert.fail("Trying to have a delete all on a row is not allowed with any other mutation, should fail");
} catch (IllegalArgumentException ex) {
System.out.println("Success");
}
}

@Test
public void testMultipleRowDeleteAllAndPutShouldFail() throws Exception {
final HaeinsaTransactionManager tm = context().getTransactionManager();
final HaeinsaTableIface testTable = context().getHaeinsaTableIface("test");

try {
HaeinsaTransaction tx = tm.begin();

HaeinsaPut put = new HaeinsaPut(Bytes.toBytes("ymkim"));
put.add(Bytes.toBytes("data"), Bytes.toBytes("phoneNumber"), Bytes.toBytes("010-1234-5678+1"));
put.add(Bytes.toBytes("data"), Bytes.toBytes("email"), Bytes.toBytes("[email protected]"));
put.add(Bytes.toBytes("data"), Bytes.toBytes("name"), Bytes.toBytes("Youngmok Kim+1"));
testTable.put(tx, put);
HaeinsaDelete delete = new HaeinsaDelete(Bytes.toBytes("ymkim2"));
testTable.delete(tx, delete);

tx.commit();
Assert.fail("Trying to have a delete all on a row is not allowed with any other mutation, should fail");
} catch (IllegalArgumentException ex) {
System.out.println("Success");
}
}

@Test
public void testMultiRowReadOnly() throws Exception {
final HaeinsaTransactionManager tm = context().getTransactionManager();
Expand Down
10 changes: 10 additions & 0 deletions wiki/API-Usage.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ This transaction put some data on specific row and then get data from same row:

tx.commit(); // commit transaction to HBase

The library also supports a HaeinsaTable.get API that takes a list of HaiensaGet's
and the implementation is faster if your application can us that API, as opposed to
doing each HaeinsaGet sequentially.

### Put

Haeinsa provides transaction on Put operation.
Expand Down Expand Up @@ -75,6 +79,12 @@ This transaction get data from specific row and then, delete the data:

tx.commit(); // commit transaction to HBase

Because of the need to coordinate trasactions across rows, delete of an entire
row (including the lock column used by Haeinsa, indicated by a HaeinsaDelete instance with
no column family) is only allowed if it is the only mutation in the transaction and there
is no other row involved in the transaction. If your delete meets these conditions, there
is an optimization that saves extra operations in the implementation.

### Intra-row scan

Haeinsa provides transaction on intra-row scan operation.
Expand Down