@@ -18,6 +18,7 @@

import static com.google.cloud.firestore.LocalFirestoreHelper.map;
import static com.google.common.collect.Sets.newHashSet;
import static com.google.common.truth.Truth.assertWithMessage;
import static java.util.Arrays.asList;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -30,7 +31,7 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.Timestamp;
import com.google.cloud.firestore.CollectionReference;
import com.google.cloud.firestore.DocumentChange.Type;
import com.google.cloud.firestore.DocumentChange;
import com.google.cloud.firestore.DocumentReference;
import com.google.cloud.firestore.DocumentSnapshot;
import com.google.cloud.firestore.EventListener;
@@ -53,20 +54,23 @@
import com.google.cloud.firestore.Transaction.Function;
import com.google.cloud.firestore.WriteBatch;
import com.google.cloud.firestore.WriteResult;
import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Predicate;
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -889,95 +893,195 @@ public void onEvent(

@Test
public void queryWatch() throws Exception {
final Semaphore semaphore = new Semaphore(0);
// This test has quite a bit of machinery to it, as there are several things going on.
// The main scenario we are testing is: 'Given a query, listen for snapshot updates made to that
// query'
//
// To verify this behavior we have the following things happening:
// 1. A defined query with an attached snapshot listener.
// 2. Two different documents that we are making changes to throughout the duration of the
// test. These changes are expected to trigger a number of events that should be delivered
// to the snapshot listener from 1.
// 3. Once all the document updates have been performed, and the expected number of events
// has been delivered to the snapshot listener, the events will be queried for the expected
// series of events and will then be asserted on.
//
// The mechanics of how the test is executed are as follows:
// A. All events delivered to the listener in 1 are added to a list of events we receive.
// B. A separate ExecutorService is created and has tasks submitted to it to perform the
// series of document updates mentioned in 2. (Each document gets its own task.)
// * Each of these update tasks will wait for the snapshot listener to be active before
// performing its updates.
// C. CountDownLatches are used to keep track of completion of work while the test is running.
// * Every time an event is received by the listener in 1, a CDL is decremented.
// * Each document update task has a CDL that is decremented when all of its actions have been
// performed.
// * Each CDL has a timeout associated with its await (this is important so that if the test
// hangs, the whole suite isn't taken down with it).
// D. After all updates have been performed and the expected number of events have been
// received, assertions on the events will be performed.
ListenerRegistration registration = null;
final ExecutorService updatesExecutor = Executors.newCachedThreadPool();

final Set<String> expectedEvents =
Sets.newTreeSet(
newHashSet("event 0", "event 1", "event 2", "event 3", "event 4", "event 5"));
final Set<String> actualEvents = Collections.synchronizedSet(new TreeSet<String>());
final List<ListenerEvent> receivedEvents =
Collections.synchronizedList(new ArrayList<ListenerEvent>());

try {
// create our CDLs that are used to track work completion
final CountDownLatch doc1CDL = new CountDownLatch(1);
final CountDownLatch doc2CDL = new CountDownLatch(1);
final CountDownLatch eventsCDL = new CountDownLatch(6);

// create a CDL that is used to signal that the snapshot listener is active.
// if we don't do this, there is a possible race with our document updates happening before
// the listener has received the "empty" event that is expected in the case that the
// result set of the query would be empty (i.e. in the case of an empty collection with new
// documents being created)
final CountDownLatch snapshotListenerActive = new CountDownLatch(1);

final Query query = randomColl.whereEqualTo("foo", "bar");
// register the snapshot listener for the query
registration =
randomColl
.whereEqualTo("foo", "bar")
.addSnapshotListener(
new EventListener<QuerySnapshot>() {
DocumentReference ref1;
DocumentReference ref2;

@Override
public void onEvent(
@Nullable QuerySnapshot value, @Nullable FirestoreException error) {
System.out.printf("onEvent(value : %s, error : %s)%n", value, error);
try {
switch (semaphore.availablePermits()) {
case 0:
actualEvents.add("event 0");
assertTrue(value.isEmpty());
ref1 = randomColl.add(map("foo", "foo")).get();
ref2 = randomColl.add(map("foo", "bar")).get();
Contributor:

I think the problem with this test is that these two adds are not guaranteed to generate two different events. One way of fixing this would be either to split these up into two tasks or to use a WriteBatch for both writes, which is guaranteed to only raise one event of size 2.
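
For illustration, a minimal sketch of the WriteBatch approach suggested here (not part of this change; it reuses the surrounding test's randomColl and map helpers, and the reference names are only placeholders):

// Illustrative sketch: commit both creates in a single WriteBatch so they are
// applied atomically and the query listener observes one snapshot update for
// the batch instead of racing two separate events.
WriteBatch batch = randomColl.getFirestore().batch();
DocumentReference ref1 = randomColl.document();
DocumentReference ref2 = randomColl.document();
batch.set(ref1, map("foo", "foo"));
batch.set(ref2, map("foo", "bar"));
batch.commit().get();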

Contributor Author:

This scenario feels a bit like a different test case to me. I can look at adding another test if that sounds good to you?

Contributor:

It's different for sure - but this test is just meant to show that basic Watch functionality works. The actual steps that we verify are of minor importance.

break;
case 1:
actualEvents.add("event 1");
assertEquals(1, value.size());
assertEquals(1, value.getDocumentChanges().size());
assertEquals(Type.ADDED, value.getDocumentChanges().get(0).getType());
ref1.set(map("foo", "bar"));
break;
case 2:
actualEvents.add("event 2");
assertEquals(2, value.size());
assertEquals(1, value.getDocumentChanges().size());
assertEquals(Type.ADDED, value.getDocumentChanges().get(0).getType());
ref1.set(map("foo", "bar", "bar", " foo"));
break;
case 3:
actualEvents.add("event 3");
assertEquals(2, value.size());
assertEquals(1, value.getDocumentChanges().size());
assertEquals(
Type.MODIFIED, value.getDocumentChanges().get(0).getType());
ref2.set(map("foo", "foo"));
break;
case 4:
actualEvents.add("event 4");
assertEquals(1, value.size());
assertEquals(1, value.getDocumentChanges().size());
assertEquals(Type.REMOVED, value.getDocumentChanges().get(0).getType());
ref1.delete();
break;
case 5:
actualEvents.add("event 5");
assertTrue(value.isEmpty());
assertEquals(1, value.getDocumentChanges().size());
assertEquals(Type.REMOVED, value.getDocumentChanges().get(0).getType());
break;
}
} catch (Exception e) {
fail(e.getMessage());
}
semaphore.release();
}
});

final boolean tryAcquire = semaphore.tryAcquire(6, 60, TimeUnit.SECONDS);

final Joiner j = Joiner.on(", ");
final String expectedString = j.join(expectedEvents);
final String actualString = j.join(actualEvents);
assertTrue(
String.format(
"did not receive all expected events within the deadline.%n"
+ "expectedEvents = [%s]%n"
+ " actualEvents = [%s]%n",
expectedString, actualString),
tryAcquire);
query.addSnapshotListener(
new EventListener<QuerySnapshot>() {
@Override
public void onEvent(
@Nullable QuerySnapshot value, @Nullable FirestoreException error) {
snapshotListenerActive.countDown();
receivedEvents.add(new ListenerEvent(value, error));
Contributor:

We should never see an error here. To simplify this test, I would add an assert here that the error is null (and maybe that value is not) and replace ListenerEvent with QuerySnapshot.
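
For illustration, a minimal sketch of that suggestion (assuming org.junit.Assert.assertNull/assertNotNull are statically imported; suggestedRegistration and receivedSnapshots are hypothetical names, not part of this change):

// Illustrative sketch: assert directly inside the listener and collect plain
// QuerySnapshots. As the reply below notes, an AssertionError thrown on this
// callback thread does not propagate to the JUnit thread, so a failure would
// only surface indirectly (for example via the event-count latch timing out).
final List<QuerySnapshot> receivedSnapshots =
    Collections.synchronizedList(new ArrayList<QuerySnapshot>());
ListenerRegistration suggestedRegistration =
    query.addSnapshotListener(
        new EventListener<QuerySnapshot>() {
          @Override
          public void onEvent(
              @Nullable QuerySnapshot value, @Nullable FirestoreException error) {
            assertNull(error);
            assertNotNull(value);
            snapshotListenerActive.countDown();
            receivedSnapshots.add(value);
            eventsCDL.countDown();
          }
        });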

Contributor Author:

Asserts inside the snapshot listener unfortunately do not automatically cascade outside the listener and as such can't fail the test; that is why I pulled it up to the top level and added the assertion after all the operations.

Contributor:

What happens if we don't see the right number of snapshots (we get an error but we ignore it and don't add a receivedEvent)? My only reservation with this PR is that the generic nature of it adds a lot of code that makes the event flow somewhat hard to parse. I'm totally biased here, but the original test seems significantly easier to follow.

Contributor Author:

When I started investigating why this test would fail ~65% of the time in CI, I narrowed it down to the fact that the semaphore wasn't ever being satisfied. In #5746 I added a timeout as well as some diagnostic information to the test and found that the failure always occurred in the case 0 branch, resulting in the other branches never being executed. To ensure that performing new document updates would never block the processing of received events, I separated the updates and the listener as presented in this PR. I have run this new implementation continuously for 2,500 iterations and all 2,500 runs passed.

I agree that it is more code -- a fair amount of it would be reduced if we were able to use Java 1.8 constructs (which will hopefully happen sooner rather than later) -- and not super straightforward from a cursory glance, but I don't think the original test was either. One advantage of this new implementation is that it's more explicit about the interrelationships between the listener and the document updates. In addition, there are explicit timeouts defined for each type of operation so we don't ever hang the entire build, and having the assertions outside the listeners means that if there is a failure it will actually fail the test.

If it isn't important for doc1 and doc2 to be updated independently of one another, I can merge those workers. If we want to avoid the anonymous inner classes from FluentIterable, I can rewrite that with for loops.

eventsCDL.countDown();
}
});

// Perform a series of operations on some documents in a separate thread
// while we listen for the changes.
updatesExecutor.submit(
new Runnable() {
DocumentReference doc1 = randomColl.document("doc1");

@Override
public void run() {
try {
snapshotListenerActive.await(5, TimeUnit.SECONDS);
// create the first document
doc1.set(map("baz", "foo")).get();
// update the document
doc1.set(map("foo", "bar")).get();
// add a field to the document
doc1.set(map("foo", "bar", "bar", "foo")).get();
// delete the document
doc1.delete().get();
} catch (InterruptedException | ExecutionException e) {
fail(String.format("Error while processing doc1: %s", e.getMessage()));
} finally {
doc1CDL.countDown();
}
}
});

updatesExecutor.submit(
new Runnable() {
DocumentReference doc2 = randomColl.document("doc2");

@Override
public void run() {
try {
snapshotListenerActive.await(5, TimeUnit.SECONDS);
// create a second document
doc2.set(map("foo", "bar")).get();
// update the document
doc2.set(map("foo", "foo")).get();
} catch (InterruptedException | ExecutionException e) {
fail(String.format("Error while processing doc2: %s", e.getMessage()));
} finally {
doc2CDL.countDown();
}
}
});

// Wait for the document update operations to be performed
final boolean doc1CDLAwait = doc1CDL.await(10, TimeUnit.SECONDS);
Contributor:

If I am not mistaken, doc1CDLAwait and doc2CDLAwait could be a single count down latch initialized with new CountDownLatch(2).
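
For illustration, a minimal sketch of that single-latch alternative (updatesCDL is a hypothetical name; as the reply below notes, it loses the detail of which document's updates timed out):

// Illustrative sketch: one latch shared by both update tasks; each Runnable
// would call updatesCDL.countDown() in its finally block instead of the
// per-document doc1CDL/doc2CDL.
final CountDownLatch updatesCDL = new CountDownLatch(2);
// ... submit the two update Runnables as above ...
assertTrue(
    "all document update operations were not completed in time",
    updatesCDL.await(20, TimeUnit.SECONDS));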

Contributor Author:

It is true we could do this, but then we would lose the info about which of the document operations timed out for the assertion. I'd like to keep the extra detail in case the test ever fails due to this.

assertTrue("all operations for doc1 were not completed in time", doc1CDLAwait);
final boolean doc2CDLAwait = doc2CDL.await(10, TimeUnit.SECONDS);
assertTrue("all operations for doc2 were not completed in time", doc2CDLAwait);

// Wait for the expected number of update events to be delivered to our listener
eventsCDL.await(30, TimeUnit.SECONDS);
} finally {
// clean up our listener
if (registration != null) {
registration.remove();
}
// Shut down the thread pool used to perform the document updates
updatesExecutor.shutdown();
}

// Extract certain events from the list of events we received in our listener

final FluentIterable<ListenerEvent> events = FluentIterable.from(receivedEvents);

final Optional<ListenerEvent> anyError =
events.firstMatch(
new Predicate<ListenerEvent>() {
@Override
public boolean apply(ListenerEvent input) {
return input.error != null;
}
});
assertWithMessage("snapshotListener received an error").that(anyError).isAbsent();

final FluentIterable<QuerySnapshot> querySnapshots =
events
.filter(
new Predicate<ListenerEvent>() {
@Override
public boolean apply(ListenerEvent input) {
return input.value != null;
}
})
.transform(
new com.google.common.base.Function<ListenerEvent, QuerySnapshot>() {
@Override
public QuerySnapshot apply(ListenerEvent input) {
return input.value;
}
});

final Optional<QuerySnapshot> initialEmpty =
querySnapshots.firstMatch(
new Predicate<QuerySnapshot>() {
@Override
public boolean apply(QuerySnapshot input) {
return input.isEmpty() && input.getDocumentChanges().size() == 0;
}
});
final Set<String> addedDocumentIds = getIds(querySnapshots, DocumentChange.Type.ADDED);
final Set<String> modifiedDocumentIds = getIds(querySnapshots, DocumentChange.Type.MODIFIED);
final Set<String> removedDocumentIds = getIds(querySnapshots, DocumentChange.Type.REMOVED);
final Optional<QuerySnapshot> finalRemove =
querySnapshots.firstMatch(
new Predicate<QuerySnapshot>() {
@Override
public boolean apply(QuerySnapshot input) {
return input.isEmpty() && input.getDocumentChanges().size() == 1;
}
});

assertWithMessage("snapshotListener did not receive expected initial empty event")
.that(initialEmpty)
.isPresent();
assertWithMessage("snapshotListener did not receive expected added events")
.that(addedDocumentIds)
.isEqualTo(newHashSet("doc1", "doc2"));
assertWithMessage("snapshotListener did not receive expected modified events")
.that(modifiedDocumentIds)
.isEqualTo(newHashSet("doc1"));
assertWithMessage("snapshotListener did not receive expected removed events")
.that(removedDocumentIds)
.isEqualTo(newHashSet("doc1", "doc2"));
assertWithMessage("snapshotListener did not receive expected final empty event")
.that(finalRemove)
.isPresent();
}

private int paginateResults(Query query, List<DocumentSnapshot> results)
@@ -1190,4 +1294,33 @@ public void floatIncrement() throws ExecutionException, InterruptedException {
DocumentSnapshot docSnap = docRef.get().get();
assertEquals(3.3, (Double) docSnap.get("sum"), DOUBLE_EPSILON);
}

private static Set<String> getIds(
FluentIterable<QuerySnapshot> querySnapshots, DocumentChange.Type type) {
final Set<String> documentIds = new HashSet<>();
for (QuerySnapshot querySnapshot : querySnapshots) {
final List<DocumentChange> changes = querySnapshot.getDocumentChanges();
for (DocumentChange change : changes) {
if (change.getType() == type) {
documentIds.add(change.getDocument().getId());
}
}
}
return documentIds;
}

/**
* A tuple class used by {@code #queryWatch}. This class represents an event delivered to the
* registered query listener.
*/
private static final class ListenerEvent {

@Nullable private final QuerySnapshot value;
@Nullable private final FirestoreException error;

ListenerEvent(@Nullable QuerySnapshot value, @Nullable FirestoreException error) {
this.value = value;
this.error = error;
}
}
}