Conversation
robshakir
commented
Jun 6, 2021
* (M) client/client.go
- TODOs for implementation details that are pending.
* (M) client/client.go
- add handling for receiving messages from a client into
restructured pending and result queues that include more
info including the timestamp and result codes.
* (M) client/client_test.go
- add test coverage for all non-integration parts.
* (M) fluent/fluent.go
- Since the client connection parameters might be across
different RPCs, restructure the fluent client (per the design
doc) to encapsulate the connection parameters.
* (M) fluent/fluent_test.go
- Update existing tests to absorb the changes described above.
* (M) client/client.go
* (M) client/client_test.go
- Restructure pending queues to be able to store the type of
transaction that is pending, not just operations - such that
it is possible to track latency of other operations and ensure
that the client is aware that it has pending non-operation
requests.
- Add a converged method to check whether there are any pending
requests from the server in the client.
* (M) fluent/fluent.go
* (M) fluent/fluent_test.go
- Add initial Await() implementation.
* (A) chk/chk.go
* (A) chk/chk_test.go
- Add a check library that can be used to determine
characteristics of the results that are returned by the client.
This library is a fluent-style helper library for gRIBI client
results to maintain readability of our test cases.
* (M) client/client.go
- Change the modifyCh to be unbuffered such that we have blocking
writes, this is required to ensure that we do not lose messages
and ensure that Await operates as expected.
- Add String debugging output to OpResult.
* (M) fluent/fluent.go
* (M) fluent/fluent_test.go
- Add Results method, and extend testing to results.
* (M) go.mod
* (M) go.sum
- Housekeeping
* (M) server/server.go
* (M) server/server_test.go
- Add support and testing for master election.
* (M) client/client.go
- Avoid appending an empty session parameters request with no
provided parameters.
* (M) fluent/fluent_test.go
- Handle test flake because of timing issues.
* (M) client/client_test.go
- Remove sending empty parameters message when no parameters are
set.
* (M) server/server.go
* (M) server/server_test.go
- Add handling of session parameters validation and checks against
the supported capabilities of the fake.
* (M) client/client.go
* (M) client/client_test.go
- Add new option to allow for persistence to be sent to the server.
- Ensure that a client with errors is marked as converged.
* (M) fluent/fluent.go
* (M) fluent/fluent_test.go
- Plumb persistence through fluent client.
- Add test case to check that a client with errors does not hang
indefinitely.
* (M) server/server.go
* (M) server/server_test.go
- Fix error handling.
* (M) chk/chk.go
- Add support for checking whether the received errors from the
server contain a particular error.
* (A) compliance/compliance.go
* (A) compliance/compliance_test.go
- Create a compliance testing library for tests that are relevant
to other implementations.
* (M) fluent/fluent.go
* (M) fluent/fluent_test.go
- Remove compliance tests from fluent_test.go
- Implement builder for Modify errors.
* (A) testcommon/testcommon.go
- Move testServer into a common package.
compliance/compliance.go
c.Connection().WithTarget(addr)
c.Start(context.Background(), t)
c.StartSending(context.Background(), t)
time.Sleep(100 * time.Millisecond)
May be good to explain the sleep. Do we need a loop or is 100ms practically good enough for de-flaking the test.
Thanks - I'll add some explanation in here, but thoughts are welcome.
Without this sleep, the test is flaky because:
- We start the server, which is listening. The client connects at `c.Start()`.
- We start the client sending at `c.StartSending`; at this point it empties the queues that we created in the `c.Connection()` call.
- Whilst we're doing that, we call `Await`, and because the `sendq` is empty and the messages haven't made it onto the wire such that they're in the `pendq`, we consider the client converged.

100msec (and likely less) gives the client time to get the messages onto the wire and add them to the pending queues.
Writing this down helped me think about this a bit more, and this helped me figure out where the synchronisation issue was -- I introduced a couple of new booleans that indicate that the client is sending and/or receiving - i.e., we are in that window between sending or receiving and post-processing the message to add to the right queue. We can't do this the other way around as otherwise we won't know whether the send/recv was actually successful.
We also don't want there to be something blocking here, since this could result in a deadlock if isConverged is called at a time when a goroutine wants to grab the lock to actually handle a message, so I've adopted an atomic boolean that is used to ensure that isConverged cannot return true during periods that we know that we have pending post-processing.
This means that tests are de-flaked without the sleep :-D
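The atomic-flag pattern described above can be sketched roughly as follows. The PR uses Uber's atomic package; the stdlib `sync/atomic.Bool` (Go 1.19+) is equivalent for this sketch, and all names here are illustrative rather than gribigo's actual API:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// client is a toy stand-in: an atomic flag marks the window between
// taking a message off a queue and recording it in the pending queue,
// so a convergence check cannot falsely report "converged" while a
// message is mid-processing.
type client struct {
	mu       sync.Mutex
	pendq    []string
	inFlight atomic.Bool // set whilst send/recv post-processing is underway
}

// process marks the message as in flight before it leaves any queue,
// then records it as pending.
func (c *client) process(msg string) {
	c.inFlight.Store(true)
	defer c.inFlight.Store(false)
	c.mu.Lock()
	defer c.mu.Unlock()
	c.pendq = append(c.pendq, msg)
}

// isConverged never blocks on long-running work: it simply refuses to
// report convergence whilst any message is in flight.
func (c *client) isConverged() bool {
	if c.inFlight.Load() {
		return false
	}
	c.mu.Lock()
	defer c.mu.Unlock()
	return len(c.pendq) == 0
}

func main() {
	c := &client{}
	fmt.Println(c.isConverged()) // true: nothing pending or in flight
	c.process("op1")
	fmt.Println(c.isConverged()) // false: op1 is now pending
}
```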
Gosh, this was quite an adventure.
There were a number of really subtle races here of the type described above. The one that was only triggering on GitHub Actions, and not on GCP, AWS, a private VM, any of my Macs, or my embedded ARM64 machine - like, anywhere - was one whereby:
- The client took a message out of the `sendq` whilst holding the mutex, and wrote it to `modifyCh`.
- Whilst the goroutine that handled `modifyCh` took the message off the channel and was in the process of adding it to the `pendq` (through `handleModifyRequest`), `Await()` was called. At this time, there was nothing pending (we didn't add anything there), nothing in the channel (we took the message off there), and nothing in the send queue (we took the message out of there!) - i.e., the message was either directly in the process of being sent to the socket, /or/ we were part way through processing.
- `Await` looked at all of these things, and thought - boy, we're converged - so returned.
- `Results` was called, and didn't have the result for the 2nd message. It was always the 2nd message because we held the pending queue lock during receiving the first message, so we slowed down adding the 2nd message to the pending queue.
So, what did I do? Other than really struggle to debug this?
- Added a `sync.RWMutex` called `awaiting`. This mutex is read-locked every time a goroutine enters a state where it is doing something with a message that is sent or received. `awaiting` is write-locked by `Await` when it tries to read all the state. This means that if anyone is doing something where there's a message in flight, `Await` blocks and waits for things to come back; it also means that no-one can do any new work whilst we're reading the status, so the queues will remain consistent. `awaiting` having this form means that if we're in the state where we haven't yet sent the message to the socket, then `Await` will block. If we're processing the message then we'll also block.
- Pre-process messages before they are submitted with `Q` to add them to the pending queue. We expect the messages that we get to be valid /enough/ to store as a pending transaction (in the future, for completely invalid messages, we'll give access to the stream itself); if they're not, we throw an error and the client returns. This removes the race whereby we're mid-way through sending a message to the channel, or getting it from there, when someone calls `Await`.
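The `awaiting` read/write-lock inversion can be sketched as below. Names are illustrative, not gribigo's: handlers hold the read lock whilst a message is in flight, and the await path takes the write lock so it can only observe the queues when nothing is mid-flight:

```go
package main

import (
	"fmt"
	"sync"
)

// state is a toy model of the pattern: the awaiting RWMutex is
// read-held for the duration of each in-flight message, and
// write-held by the await path.
type state struct {
	awaiting sync.RWMutex // read-held whilst a message is in flight
	pqMu     sync.Mutex   // guards pendq itself
	pendq    []int
}

// handle processes one message end to end under the read lock; many
// handlers can run concurrently with each other, but not with await.
func (s *state) handle(id int) {
	s.awaiting.RLock()
	defer s.awaiting.RUnlock()
	s.pqMu.Lock()
	s.pendq = append(s.pendq, id)
	s.pqMu.Unlock()
}

// await write-locks awaiting, so it blocks until no message is in
// flight, then reads a consistent view of the queues.
func (s *state) await() int {
	s.awaiting.Lock()
	defer s.awaiting.Unlock()
	return len(s.pendq)
}

func main() {
	s := &state{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.handle(i)
		}(i)
	}
	wg.Wait()
	fmt.Println(s.await()) // 4: all four messages recorded
}
```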
I switched bool types that were not being protected by mutexes to being atomically updated. I'd like to discuss this a bit -- it seems like we might still want a mutex rather than to use sync/atomic, if we're not checking the value when we're writing (e.g., https://github.com/openconfig/gribigo/pull/13/files#diff-bd3a55a72186f59e2e63efb4951573b2f9e4a7cc98086e922b0859f8ccc1dd09R234) it seems like it is safe for us to not have a mutex and just update this value in place.
I also started to think about how to cover this in tests going forward - it's clear that this logic is complex enough that the integration tests that were being used are not sufficient.
Wow, that is quite a thorough explanation. As a general rule, I think a mutex is better than atomic variables unless we have a single atomic variable; ultimately, atomic variables are also a sort of mutex built on a CAS instruction. I will have to admit that my review has been mostly superficial, looking for anything that smells :) I think I am following what you are saying about the various queues. I wonder if there is a need to use counters instead of simple booleans - i.e., push and pop from queues that are backed by a counter indicating length. Or, if you have a mutex, then you can simply use the size(). The key point is protecting a complex operation with an appropriate guard that makes it atomic.
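The counter-backed alternative being suggested could be sketched roughly as follows (the names and types are illustrative; the PR itself went with atomic booleans at this point):

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// counterQueue tracks the number of outstanding items rather than a
// boolean per state, so "converged" is simply "counter is zero".
type counterQueue struct {
	outstanding atomic.Int64
}

func (q *counterQueue) push() { q.outstanding.Add(1) }
func (q *counterQueue) pop()  { q.outstanding.Add(-1) }

// converged reports whether there is no outstanding work at all.
func (q *counterQueue) converged() bool { return q.outstanding.Load() == 0 }

func main() {
	q := &counterQueue{}
	q.push()
	q.push()
	fmt.Println(q.converged()) // false: two items outstanding
	q.pop()
	q.pop()
	fmt.Println(q.converged()) // true: drained
}
```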
* (M) chk/chk.go
- Add removed documentation string.
* (M) client/client.go
* (M) client/client_test.go
- Add atomic.Bool to indicate whether we are in the process of
sending or receiving.
* (M) compliance/compliance.go
* (M) fluent/fluent_test.go
- Remove time.Sleep!
* (M) go.mod
* (M) go.sum
- New dependency on Uber's atomic package.
* (M) compliance/compliance.go
- Actually remove time.Sleep.
- Add docstring comments.
* (M) fluent/fluent_test.go
- Actually remove time.Sleep.
* (M) client/client_test.go
- Avoid a deadlock by ensuring that we're actually emptying the
modifyCh now that it is blocking.
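The deadlock class fixed here comes from unbuffered channels: every send blocks until a receiver is ready, so a test that feeds the channel must also drain it. A minimal illustration (the name `modifyCh` mirrors the PR's, but the surrounding code is hypothetical):

```go
package main

import "fmt"

// sendAll feeds msgs into ch, blocking on each send until the
// consumer receives it, then closes the channel.
func sendAll(ch chan<- string, msgs []string) {
	for _, m := range msgs {
		ch <- m // blocks: ch is unbuffered
	}
	close(ch)
}

// drain empties ch so that producers never stall, returning
// everything received in order.
func drain(ch <-chan string) []string {
	var got []string
	for m := range ch {
		got = append(got, m)
	}
	return got
}

func main() {
	modifyCh := make(chan string) // unbuffered: sends block without a reader
	go sendAll(modifyCh, []string{"op1", "op2"})
	fmt.Println(drain(modifyCh)) // [op1 op2]
}
```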
@sthesayi PTAL. It's clear I vastly underestimated the number of story points this was going to take, based on this debugging alone!
sthesayi
left a comment
The changes look good. I will probably need to go through the logic in more detail when I have some time - probably later today. I want to follow the use of the various atomic variables and the mutexes. At first glance it seems quite complicated, and probably for good reason. I wonder if there is a simpler way using channels, but I can't quite answer that without a deeper understanding of the queues.
// queue (enqued by Q) to the connection established by Connect.
func (c *Client) StartSending() {
-	c.qs.sending = true
+	c.qs.sending.Store(true)
When will this bool be set to false?
It is set to false in two cases:
- By default, before the client has called `StartSending`, so to start with the client doesn't actually put messages on the wire. This allows us to handle the case where we want to queue up a bunch of things to send when we connect.
- By some future `StopSending` function, which will allow us to test some timing-related areas.
Suresh, thanks for the detailed comments here - and potential future review. I do think that there might be some opportunity to simplify this using channels, I just couldn't find the way to do it. The issue breaks down to:
The other thing I spent a lot of time experimenting with was whether you could use … I'm very open to there being a better way to do this :-) Maybe it was post-midnight thinking, but I couldn't find one that worked.
Merging this PR; we can spend some time on this going forward - I don't think there's an immediate alternative here.