From 17e327d5fe47d3928e189f3ccffc9bb5b27b72c8 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Fri, 26 Jul 2019 17:20:41 -0400 Subject: [PATCH 1/2] Refactor com.google.cloud.firestore.it.ITSystemTest.queryWatch to make it more reliable. 1. Separate document update logic from event listener code 2. Move assertions out of listener code 3. Add timeouts to all await conditions with associated assertions to clearly communicate when the test fails due to an await, and which one. 4. Switch from using a semaphore to using multiple CountDownLatches The queryWatch test has been flaky for some time, resulting in failure most of the time. Due to these persistent failures, trust in the firestore integration test suite has been low, and its results are often ignored. After being able to run the test in a profileable environment, I ran the old implementation repeatedly for 25 minutes resulting in a total of 136 runs, 100 of which failed -- each with the same error. The new implementation was run for 25 minutes resulting in a total of 2500 runs, none of which failed. 
--- .../cloud/firestore/it/ITSystemTest.java | 299 +++++++++++++----- 1 file changed, 216 insertions(+), 83 deletions(-) diff --git a/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index 0e24bce25b34..3396ad392903 100644 --- a/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -18,6 +18,7 @@ import static com.google.cloud.firestore.LocalFirestoreHelper.map; import static com.google.common.collect.Sets.newHashSet; +import static com.google.common.truth.Truth.assertWithMessage; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -30,7 +31,7 @@ import com.google.api.core.ApiFutures; import com.google.cloud.Timestamp; import com.google.cloud.firestore.CollectionReference; -import com.google.cloud.firestore.DocumentChange.Type; +import com.google.cloud.firestore.DocumentChange; import com.google.cloud.firestore.DocumentReference; import com.google.cloud.firestore.DocumentSnapshot; import com.google.cloud.firestore.EventListener; @@ -53,20 +54,23 @@ import com.google.cloud.firestore.Transaction.Function; import com.google.cloud.firestore.WriteBatch; import com.google.cloud.firestore.WriteResult; -import com.google.common.base.Joiner; +import com.google.common.base.Optional; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; -import com.google.common.collect.Sets; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import 
java.util.Set; -import java.util.TreeSet; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -889,95 +893,195 @@ public void onEvent( @Test public void queryWatch() throws Exception { - final Semaphore semaphore = new Semaphore(0); + // This test has quite a bit of machinery to it, as there are several things going on. + // The main scenario we are test is: 'Given a query, listen for snapshot updates made to that + // query' + // + // To verify this behavior we have the following things happening: + // 1. A defined query with an attached snapshot listener. + // 2. Two different documents that we are making changes to throughout the duration of the + // test. These changes are expected to trigger a number of events that should be delivered + // to the snapshot listener from 1. + // 3. Once all the document updates have been performed, and the expected number of events + // has been delivered to the snapshot listener, the events will be queried for the expected + // series of events and will then be asserted on. + // + // The mechanics of how the test is executed are as follows + // A. All events delivered to the listener in 1 are added to a list of event we receive. + // B. A separate ExecutorService is created and has tasks submitted to it to perform the + // series of document updates mentioned in 2. (Each document gets its own task). + // * Each of these update tasks will wait for the snapshot listener to be active before + // performing the updates. + // C. CountDownLatches are used to keep track of completion of work while the test is running. + // * Every time an event is received by the listener in 1 a CDL is decremented. 
+ // * Each document update task has a CDL that is decremented when all actions have been + // performed + // * Each CDL has a timeout associated with its await (this is important so that in the + // case there is a hang during the test the whole suite isn't taken with it) + // D. After all updates have been performed and the expected number of events have been + // received, assertions on the events will be performed. ListenerRegistration registration = null; + final ExecutorService updatesExecutor = Executors.newCachedThreadPool(); - final Set expectedEvents = - Sets.newTreeSet( - newHashSet("event 0", "event 1", "event 2", "event 3", "event 4", "event 5")); - final Set actualEvents = Collections.synchronizedSet(new TreeSet()); + final List receivedEvents = + Collections.synchronizedList(new ArrayList()); try { + // create our CDLs that are used to track work completion + final CountDownLatch doc1CDL = new CountDownLatch(1); + final CountDownLatch doc2CDL = new CountDownLatch(1); + final CountDownLatch eventsCDL = new CountDownLatch(6); + + // create a CDL that is used to signal the snapshot lister is active. + // if we don't do this, there is a possible race with out document updates happening before + // the listener has received the "empty" event that is expected in the case that the + // result set of the query would be empty (i.e. 
in the case of an empty collection with new + // documents being created) + final CountDownLatch snapshotListerActive = new CountDownLatch(1); + + final Query query = randomColl.whereEqualTo("foo", "bar"); + // register the snapshot listener for the query registration = - randomColl - .whereEqualTo("foo", "bar") - .addSnapshotListener( - new EventListener() { - DocumentReference ref1; - DocumentReference ref2; - - @Override - public void onEvent( - @Nullable QuerySnapshot value, @Nullable FirestoreException error) { - System.out.printf("onEvent(value : %s, error : %s)%n", value, error); - try { - switch (semaphore.availablePermits()) { - case 0: - actualEvents.add("event 0"); - assertTrue(value.isEmpty()); - ref1 = randomColl.add(map("foo", "foo")).get(); - ref2 = randomColl.add(map("foo", "bar")).get(); - break; - case 1: - actualEvents.add("event 1"); - assertEquals(1, value.size()); - assertEquals(1, value.getDocumentChanges().size()); - assertEquals(Type.ADDED, value.getDocumentChanges().get(0).getType()); - ref1.set(map("foo", "bar")); - break; - case 2: - actualEvents.add("event 2"); - assertEquals(2, value.size()); - assertEquals(1, value.getDocumentChanges().size()); - assertEquals(Type.ADDED, value.getDocumentChanges().get(0).getType()); - ref1.set(map("foo", "bar", "bar", " foo")); - break; - case 3: - actualEvents.add("event 3"); - assertEquals(2, value.size()); - assertEquals(1, value.getDocumentChanges().size()); - assertEquals( - Type.MODIFIED, value.getDocumentChanges().get(0).getType()); - ref2.set(map("foo", "foo")); - break; - case 4: - actualEvents.add("event 4"); - assertEquals(1, value.size()); - assertEquals(1, value.getDocumentChanges().size()); - assertEquals(Type.REMOVED, value.getDocumentChanges().get(0).getType()); - ref1.delete(); - break; - case 5: - actualEvents.add("event 5"); - assertTrue(value.isEmpty()); - assertEquals(1, value.getDocumentChanges().size()); - assertEquals(Type.REMOVED, value.getDocumentChanges().get(0).getType()); 
- break; - } - } catch (Exception e) { - fail(e.getMessage()); - } - semaphore.release(); - } - }); - - final boolean tryAcquire = semaphore.tryAcquire(6, 60, TimeUnit.SECONDS); - - final Joiner j = Joiner.on(", "); - final String expectedString = j.join(expectedEvents); - final String actualString = j.join(actualEvents); - assertTrue( - String.format( - "did not receive all expected events within the deadline.%n" - + "expectedEvents = [%s]%n" - + " actualEvents = [%s]%n", - expectedString, actualString), - tryAcquire); + query.addSnapshotListener( + new EventListener() { + @Override + public void onEvent( + @Nullable QuerySnapshot value, @Nullable FirestoreException error) { + snapshotListerActive.countDown(); + receivedEvents.add(new ListenerEvent(value, error)); + eventsCDL.countDown(); + } + }); + + // Perform a series of operations on some documents in a separate thread + // While we listen for the changes + updatesExecutor.submit( + new Runnable() { + DocumentReference doc1 = randomColl.document("doc1"); + + @Override + public void run() { + try { + snapshotListerActive.await(5, TimeUnit.SECONDS); + // create the first document + doc1.set(map("baz", "foo")).get(); + // update a field in the document + doc1.set(map("foo", "bar")).get(); + // add a field to the document + doc1.set(map("foo", "bar", "bar", "foo")).get(); + // delete the document + doc1.delete().get(); + } catch (InterruptedException | ExecutionException e) { + fail(String.format("Error while processing doc1: %s", e.getMessage())); + } finally { + doc1CDL.countDown(); + } + } + }); + + updatesExecutor.submit( + new Runnable() { + DocumentReference doc2 = randomColl.document("doc2"); + + @Override + public void run() { + try { + snapshotListerActive.await(5, TimeUnit.SECONDS); + // create a second document + doc2.set(map("foo", "bar")).get(); + // update a field in the document + doc2.set(map("foo", "foo")).get(); + } catch (InterruptedException | ExecutionException e) { + 
fail(String.format("Error while processing doc2: %s", e.getMessage())); + } finally { + doc2CDL.countDown(); + } + } + }); + + // Wait for the document update operations to be performed + final boolean doc1CDLAwait = doc1CDL.await(10, TimeUnit.SECONDS); + assertTrue("all operations for doc1 were not completed in time", doc1CDLAwait); + final boolean doc2CDLAwait = doc2CDL.await(10, TimeUnit.SECONDS); + assertTrue("all operations for doc2 were not completed in time", doc2CDLAwait); + + // Wait for the expected number of update events to be delivered to our listener + eventsCDL.await(30, TimeUnit.SECONDS); } finally { + // cleanup out listener if (registration != null) { registration.remove(); } + // Shutdown the thread pool used to perform the document updates + updatesExecutor.shutdown(); } + + // Extract certain events from the list of events we received in out listener + + final FluentIterable events = FluentIterable.from(receivedEvents); + + final Optional anyError = + events.firstMatch( + new Predicate() { + @Override + public boolean apply(ListenerEvent input) { + return input.error != null; + } + }); + assertWithMessage("snapshotListener received an error").that(anyError).isAbsent(); + + final FluentIterable querySnapshots = + events + .filter( + new Predicate() { + @Override + public boolean apply(ListenerEvent input) { + return input.value != null; + } + }) + .transform( + new com.google.common.base.Function() { + @Override + public QuerySnapshot apply(ListenerEvent input) { + return input.value; + } + }); + + final Optional initialEmpty = + querySnapshots.firstMatch( + new Predicate() { + @Override + public boolean apply(QuerySnapshot input) { + return input.isEmpty() && input.getDocumentChanges().size() == 0; + } + }); + final Set addedDocumentIds = getIds(querySnapshots, DocumentChange.Type.ADDED); + final Set modifiedDocumentIds = getIds(querySnapshots, DocumentChange.Type.MODIFIED); + final Set removedDocumentIds = getIds(querySnapshots, 
DocumentChange.Type.REMOVED); + final Optional finalRemove = + querySnapshots.firstMatch( + new Predicate() { + @Override + public boolean apply(QuerySnapshot input) { + return input.isEmpty() && input.getDocumentChanges().size() == 1; + } + }); + + assertWithMessage("snapshotListener did not receive expected initial empty event") + .that(initialEmpty) + .isPresent(); + assertWithMessage("snapshotListener did not receive expected added events") + .that(addedDocumentIds) + .isEqualTo(newHashSet("doc1", "doc2")); + assertWithMessage("snapshotListener did not receive expected modified events") + .that(modifiedDocumentIds) + .isEqualTo(newHashSet("doc1")); + assertWithMessage("snapshotListener did not receive expected removed events") + .that(removedDocumentIds) + .isEqualTo(newHashSet("doc1", "doc2")); + assertWithMessage("snapshotListener did not receive expected final empty event") + .that(finalRemove) + .isPresent(); } private int paginateResults(Query query, List results) @@ -1190,4 +1294,33 @@ public void floatIncrement() throws ExecutionException, InterruptedException { DocumentSnapshot docSnap = docRef.get().get(); assertEquals(3.3, (Double) docSnap.get("sum"), DOUBLE_EPSILON); } + + private static Set getIds( + FluentIterable querySnapshots, DocumentChange.Type type) { + final Set documentIds = new HashSet<>(); + for (QuerySnapshot querySnapshot : querySnapshots) { + final List changes = querySnapshot.getDocumentChanges(); + for (DocumentChange change : changes) { + if (change.getType() == type) { + documentIds.add(change.getDocument().getId()); + } + } + } + return documentIds; + } + + /** + * A tuple class used by {@code #queryWatch}. This class represents an event delivered to the + * registered query listener. 
+ */ + private static final class ListenerEvent { + + @Nullable private final QuerySnapshot value; + @Nullable private final FirestoreException error; + + ListenerEvent(@Nullable QuerySnapshot value, @Nullable FirestoreException error) { + this.value = value; + this.error = error; + } + } } From 43b8af60d1010bfb616e4357a8292eb0a2379309 Mon Sep 17 00:00:00 2001 From: BenWhitehead Date: Mon, 29 Jul 2019 14:30:35 -0400 Subject: [PATCH 2/2] Code review typo fixes --- .../java/com/google/cloud/firestore/it/ITSystemTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java b/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java index 3396ad392903..199219098475 100644 --- a/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java +++ b/google-cloud-clients/google-cloud-firestore/src/test/java/com/google/cloud/firestore/it/ITSystemTest.java @@ -907,7 +907,7 @@ public void queryWatch() throws Exception { // series of events and will then be asserted on. // // The mechanics of how the test is executed are as follows - // A. All events delivered to the listener in 1 are added to a list of event we receive. + // A. All events delivered to the listener in 1 are added to a list of events we receive. // B. A separate ExecutorService is created and has tasks submitted to it to perform the // series of document updates mentioned in 2. (Each document gets its own task). 
// * Each of these update tasks will wait for the snapshot listener to be active before @@ -965,7 +965,7 @@ public void run() { snapshotListerActive.await(5, TimeUnit.SECONDS); // create the first document doc1.set(map("baz", "foo")).get(); - // update a field in the document + // update the document doc1.set(map("foo", "bar")).get(); // add a field to the document doc1.set(map("foo", "bar", "bar", "foo")).get(); @@ -989,7 +989,7 @@ public void run() { snapshotListerActive.await(5, TimeUnit.SECONDS); // create a second document doc2.set(map("foo", "bar")).get(); - // update a field in the document + // update the document doc2.set(map("foo", "foo")).get(); } catch (InterruptedException | ExecutionException e) { fail(String.format("Error while processing doc2: %s", e.getMessage()));