diff --git a/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index 0e24bce25b34..199219098475 100644 --- a/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.firestore.LocalFirestoreHelper.map; import static com.google.common.collect.Sets.newHashSet; +import static com.google.common.truth.Truth.assertWithMessage; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -30,7 +31,7 @@ import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; import com.google.cloud.firestore.CollectionReference; -import com.google.cloud.firestore.DocumentChange.Type; +import com.google.cloud.firestore.DocumentChange; import com.google.cloud.firestore.DocumentReference; import com.google.cloud.firestore.DocumentSnapshot; import com.google.cloud.firestore.EventListener; @@ -53,20 +54,23 @@ import com.google.cloud.firestore.Transaction.Function; import com.google.cloud.firestore.WriteBatch; import com.google.cloud.firestore.WriteResult; -import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -889,95 +893,195 @@ public void onEvent( @Test public void queryWatch() throws Exception { - final Semaphore semaphore = new Semaphore(0); + // This test has quite a bit of machinery to it, as there are several things going on. + // The main scenario we are test is: 'Given a query, listen for snapshot updates made to that + // query' + // + // To verify this behavior we have the following things happening: + // 1. A defined query with an attached snapshot listener. + // 2. Two different documents that we are making changes to throughout the duration of the + // test. These changes are expected to trigger a number of events that should be delivered + // to the snapshot listener from 1. + // 3. Once all the document updates have been performed, and the expected number of events + // has been delivered to the snapshot listener, the events will be queried for the expected + // series of events and will then be asserted on. + // + // The mechanics of how the test is executed are as follows + // A. All events delivered to the listener in 1 are added to a list of events we receive. + // B. A separate ExecutorService is created and has tasks submitted to it to perform the + // series of document updates mentioned in 2. (Each document gets its own task). + // * Each of these update tasks will wait for the snapshot listener to be active before + // performing the updates. + // C. CountDownLatches are used to keep track of completion of work while the test is running. + // * Every time an event is received by the listener in 1 a CDL is decremented. + // * Each document update task has a CDL that is decremented when all actions have been + // performed + // * Each CDL has a timeout associated with its await (this is important so that in the + // case there is a hang during the test the whole suite isn't taken with it) + // D. After all updates have been performed and the expected number of events have been + // received, assertions on the events will be performed. ListenerRegistration registration = null; + final ExecutorService updatesExecutor = Executors.newCachedThreadPool(); - final Set expectedEvents = - Sets.newTreeSet( - newHashSet("event 0", "event 1", "event 2", "event 3", "event 4", "event 5")); - final Set actualEvents = Collections.synchronizedSet(new TreeSet()); + final List receivedEvents = + Collections.synchronizedList(new ArrayList()); try { + // create our CDLs that are used to track work completion + final CountDownLatch doc1CDL = new CountDownLatch(1); + final CountDownLatch doc2CDL = new CountDownLatch(1); + final CountDownLatch eventsCDL = new CountDownLatch(6); + + // create a CDL that is used to signal the snapshot lister is active. + // if we don't do this, there is a possible race with out document updates happening before + // the listener has received the "empty" event that is expected in the case that the + // result set of the query would be empty (i.e. in the case of an empty collection with new + // documents being created) + final CountDownLatch snapshotListerActive = new CountDownLatch(1); + + final Query query = randomColl.whereEqualTo("foo", "bar"); + // register the snapshot listener for the query registration = - randomColl - .whereEqualTo("foo", "bar") - .addSnapshotListener( - new EventListener() { - DocumentReference ref1; - DocumentReference ref2; - - @Override - public void onEvent( - @Nullable QuerySnapshot value, @Nullable FirestoreException error) { - System.out.printf("onEvent(value : %s, error : %s)%n", value, error); - try { - switch (semaphore.availablePermits()) { - case 0: - actualEvents.add("event 0"); - assertTrue(value.isEmpty()); - ref1 = randomColl.add(map("foo", "foo")).get(); - ref2 = randomColl.add(map("foo", "bar")).get(); - break; - case 1: - actualEvents.add("event 1"); - assertEquals(1, value.size()); - assertEquals(1, value.getDocumentChanges().size()); - assertEquals(Type.ADDED, value.getDocumentChanges().get(0).getType()); - ref1.set(map("foo", "bar")); - break; - case 2: - actualEvents.add("event 2"); - assertEquals(2, value.size()); - assertEquals(1, value.getDocumentChanges().size()); - assertEquals(Type.ADDED, value.getDocumentChanges().get(0).getType()); - ref1.set(map("foo", "bar", "bar", " foo")); - break; - case 3: - actualEvents.add("event 3"); - assertEquals(2, value.size()); - assertEquals(1, value.getDocumentChanges().size()); - assertEquals( - Type.MODIFIED, value.getDocumentChanges().get(0).getType()); - ref2.set(map("foo", "foo")); - break; - case 4: - actualEvents.add("event 4"); - assertEquals(1, value.size()); - assertEquals(1, value.getDocumentChanges().size()); - assertEquals(Type.REMOVED, value.getDocumentChanges().get(0).getType()); - ref1.delete(); - break; - case 5: - actualEvents.add("event 5"); - assertTrue(value.isEmpty()); - assertEquals(1, value.getDocumentChanges().size()); - assertEquals(Type.REMOVED, value.getDocumentChanges().get(0).getType()); - break; - } - } catch (Exception e) { - fail(e.getMessage()); - } - semaphore.release(); - } - }); - - final boolean tryAcquire = semaphore.tryAcquire(6, 60, TimeUnit.SECONDS); - - final Joiner j = Joiner.on(", "); - final String expectedString = j.join(expectedEvents); - final String actualString = j.join(actualEvents); - assertTrue( - String.format( - "did not receive all expected events within the deadline.%n" - + "expectedEvents = [%s]%n" - + " actualEvents = [%s]%n", - expectedString, actualString), - tryAcquire); + query.addSnapshotListener( + new EventListener() { + @Override + public void onEvent( + @Nullable QuerySnapshot value, @Nullable FirestoreException error) { + snapshotListerActive.countDown(); + receivedEvents.add(new ListenerEvent(value, error)); + eventsCDL.countDown(); + } + }); + + // Perform a series of operations on some documents in a separate thread + // While we listen for the changes + updatesExecutor.submit( + new Runnable() { + DocumentReference doc1 = randomColl.document("doc1"); + + @Override + public void run() { + try { + snapshotListerActive.await(5, TimeUnit.SECONDS); + // create the first document + doc1.set(map("baz", "foo")).get(); + // update the document + doc1.set(map("foo", "bar")).get(); + // add a field to the document + doc1.set(map("foo", "bar", "bar", "foo")).get(); + // delete the document + doc1.delete().get(); + } catch (InterruptedException | ExecutionException e) { + fail(String.format("Error while processing doc1: %s", e.getMessage())); + } finally { + doc1CDL.countDown(); + } + } + }); + + updatesExecutor.submit( + new Runnable() { + DocumentReference doc2 = randomColl.document("doc2"); + + @Override + public void run() { + try { + snapshotListerActive.await(5, TimeUnit.SECONDS); + // create a second document + doc2.set(map("foo", "bar")).get(); + // update the document + doc2.set(map("foo", "foo")).get(); + } catch (InterruptedException | ExecutionException e) { + fail(String.format("Error while processing doc2: %s", e.getMessage())); + } finally { + doc2CDL.countDown(); + } + } + }); + + // Wait for the document update operations to be performed + final boolean doc1CDLAwait = doc1CDL.await(10, TimeUnit.SECONDS); + assertTrue("all operations for doc1 were not completed in time", doc1CDLAwait); + final boolean doc2CDLAwait = doc2CDL.await(10, TimeUnit.SECONDS); + assertTrue("all operations for doc2 were not completed in time", doc2CDLAwait); + + // Wait for the expected number of update events to be delivered to our listener + eventsCDL.await(30, TimeUnit.SECONDS); } finally { + // cleanup out listener if (registration != null) { registration.remove(); } + // Shutdown the thread pool used to perform the document updates + updatesExecutor.shutdown(); } + + // Extract certain events from the list of events we received in out listener + + final FluentIterable events = FluentIterable.from(receivedEvents); + + final Optional anyError = + events.firstMatch( + new Predicate() { + @Override + public boolean apply(ListenerEvent input) { + return input.error != null; + } + }); + assertWithMessage("snapshotListener received an error").that(anyError).isAbsent(); + + final FluentIterable querySnapshots = + events + .filter( + new Predicate() { + @Override + public boolean apply(ListenerEvent input) { + return input.value != null; + } + }) + .transform( + new com.google.common.base.Function() { + @Override + public QuerySnapshot apply(ListenerEvent input) { + return input.value; + } + }); + + final Optional initialEmpty = + querySnapshots.firstMatch( + new Predicate() { + @Override + public boolean apply(QuerySnapshot input) { + return input.isEmpty() && input.getDocumentChanges().size() == 0; + } + }); + final Set addedDocumentIds = getIds(querySnapshots, DocumentChange.Type.ADDED); + final Set modifiedDocumentIds = getIds(querySnapshots, DocumentChange.Type.MODIFIED); + final Set removedDocumentIds = getIds(querySnapshots, DocumentChange.Type.REMOVED); + final Optional finalRemove = + querySnapshots.firstMatch( + new Predicate() { + @Override + public boolean apply(QuerySnapshot input) { + return input.isEmpty() && input.getDocumentChanges().size() == 1; + } + }); + + assertWithMessage("snapshotListener did not receive expected initial empty event") + .that(initialEmpty) + .isPresent(); + assertWithMessage("snapshotListener did not receive expected added events") + .that(addedDocumentIds) + .isEqualTo(newHashSet("doc1", "doc2")); + assertWithMessage("snapshotListener did not receive expected modified events") + .that(modifiedDocumentIds) + .isEqualTo(newHashSet("doc1")); + assertWithMessage("snapshotListener did not receive expected removed events") + .that(removedDocumentIds) + .isEqualTo(newHashSet("doc1", "doc2")); + assertWithMessage("snapshotListener did not receive expected final empty event") + .that(finalRemove) + .isPresent(); } private int paginateResults(Query query, List results) @@ -1190,4 +1294,33 @@ public void floatIncrement() throws ExecutionException, InterruptedException { DocumentSnapshot docSnap = docRef.get().get(); assertEquals(3.3, (Double) docSnap.get("sum"), DOUBLE_EPSILON); } + + private static Set getIds( + FluentIterable querySnapshots, DocumentChange.Type type) { + final Set documentIds = new HashSet<>(); + for (QuerySnapshot querySnapshot : querySnapshots) { + final List changes = querySnapshot.getDocumentChanges(); + for (DocumentChange change : changes) { + if (change.getType() == type) { + documentIds.add(change.getDocument().getId()); + } + } + } + return documentIds; + } + + /** + * A tuple class used by {@code #queryWatch}. This class represents an event delivered to the + * registered query listener. + */ + private static final class ListenerEvent { + + @Nullable private final QuerySnapshot value; + @Nullable private final FirestoreException error; + + ListenerEvent(@Nullable QuerySnapshot value, @Nullable FirestoreException error) { + this.value = value; + this.error = error; + } + } }