diff --git a/examples/examples_test.sh b/examples/examples_test.sh index bead4d0dcbe1..250840205abb 100755 --- a/examples/examples_test.sh +++ b/examples/examples_test.sh @@ -59,6 +59,7 @@ EXAMPLES=( "features/encryption/TLS" "features/error_details" "features/error_handling" + "features/flow_control" "features/interceptor" "features/load_balancing" "features/metadata" @@ -112,6 +113,7 @@ declare -A EXPECTED_SERVER_OUTPUT=( ["features/encryption/TLS"]="" ["features/error_details"]="" ["features/error_handling"]="" + ["features/flow_control"]="Stream ended successfully." ["features/interceptor"]="unary echoing message \"hello world\"" ["features/load_balancing"]="serving on :50051" ["features/metadata"]="message:\"this is examples/metadata\", sending echo" @@ -134,6 +136,7 @@ declare -A EXPECTED_CLIENT_OUTPUT=( ["features/encryption/TLS"]="UnaryEcho: hello world" ["features/error_details"]="Greeting: Hello world" ["features/error_handling"]="Received error" + ["features/flow_control"]="Stream ended successfully." ["features/interceptor"]="UnaryEcho: hello world" ["features/load_balancing"]="calling helloworld.Greeter/SayHello with pick_first" ["features/metadata"]="this is examples/metadata" diff --git a/examples/features/flow_control/README.md b/examples/features/flow_control/README.md new file mode 100644 index 000000000000..0ffb7c1266e0 --- /dev/null +++ b/examples/features/flow_control/README.md @@ -0,0 +1,55 @@ +# Flow Control + +Flow control is a feature in gRPC that prevents senders from writing more data +on a stream than a receiver is capable of handling. This feature behaves the +same for both clients and servers. Because gRPC-Go uses a blocking-style API +for stream operations, flow control pushback is implemented by simply blocking +send operations on a stream when that stream's flow control limits have been +reached. When the receiver has read enough data from the stream, the send +operation will unblock automatically. Flow control is configured automatically +based on a connection's Bandwidth Delay Product (BDP) to ensure the buffer is +the minimum size necessary to allow for maximum throughput on the stream if the +receiver is reading at its maximum speed. + +## Try it + +``` +go run ./server +``` + +``` +go run ./client +``` + +## Example explanation + +The example client and server are written to demonstrate the blocking by +intentionally sending messages while the other side is not receiving. The +bidirectional echo stream in the example begins by having the client send +messages until it detects it has blocked (utilizing another goroutine). The +server sleeps for 2 seconds to allow this to occur. Then the server will read +all of these messages, and the roles of the client and server are swapped so the +server attempts to send continuously while the client sleeps. After the client +sleeps for 2 seconds, it will read again to unblock the server. The server will +detect that it has blocked, and end the stream once it has unblocked. + +### Expected Output + +The client output should look like: +``` +2023/09/19 15:49:49 New stream began. +2023/09/19 15:49:50 Sending is blocked. +2023/09/19 15:49:51 Sent 25 messages. +2023/09/19 15:49:53 Read 25 messages. +2023/09/19 15:49:53 Stream ended successfully. +``` + +while the server should output the following logs: + +``` +2023/09/19 15:49:49 New stream began. +2023/09/19 15:49:51 Read 25 messages. +2023/09/19 15:49:52 Sending is blocked. +2023/09/19 15:49:53 Sent 25 messages. +2023/09/19 15:49:53 Stream ended successfully. +``` diff --git a/examples/features/flow_control/client/main.go b/examples/features/flow_control/client/main.go new file mode 100644 index 000000000000..2af381759952 --- /dev/null +++ b/examples/features/flow_control/client/main.go @@ -0,0 +1,102 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Binary client is an example client. +package main + +import ( + "context" + "flag" + "io" + "log" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + pb "google.golang.org/grpc/examples/features/proto/echo" + "google.golang.org/grpc/internal/grpcsync" +) + +var addr = flag.String("addr", "localhost:50052", "the address to connect to") + +var payload string = string(make([]byte, 8*1024)) // 8KB + +func main() { + flag.Parse() + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conn, err := grpc.Dial(*addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + log.Fatalf("did not connect: %v", err) + } + defer conn.Close() + + c := pb.NewEchoClient(conn) + + stream, err := c.BidirectionalStreamingEcho(ctx) + if err != nil { + log.Fatalf("Error creating stream: %v", err) + } + log.Printf("New stream began.") + + // First we will send data on the stream until we cannot send any more. We + // detect this by not seeing a message sent 1s after the last sent message. + stopSending := grpcsync.NewEvent() + sentOne := make(chan struct{}) + go func() { + i := 0 + for !stopSending.HasFired() { + i++ + if err := stream.Send(&pb.EchoRequest{Message: payload}); err != nil { + log.Fatalf("Error sending data: %v", err) + } + sentOne <- struct{}{} + } + log.Printf("Sent %v messages.", i) + stream.CloseSend() + }() + + for !stopSending.HasFired() { + after := time.NewTimer(time.Second) + select { + case <-sentOne: + after.Stop() + case <-after.C: + log.Printf("Sending is blocked.") + stopSending.Fire() + <-sentOne + } + } + + // Next, we wait 2 seconds before reading from the stream, to give the + // server an opportunity to block while sending its responses. + time.Sleep(2 * time.Second) + + // Finally, read all the data sent by the server to allow it to unblock. + for i := 0; true; i++ { + if _, err := stream.Recv(); err != nil { + log.Printf("Read %v messages.", i) + if err == io.EOF { + log.Printf("Stream ended successfully.") + return + } + log.Fatalf("Error receiving data: %v", err) + } + } +} diff --git a/examples/features/flow_control/server/main.go b/examples/features/flow_control/server/main.go new file mode 100644 index 000000000000..1b674cab416b --- /dev/null +++ b/examples/features/flow_control/server/main.go @@ -0,0 +1,110 @@ +/* + * + * Copyright 2023 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Binary server is an example server. +package main + +import ( + "flag" + "fmt" + "io" + "log" + "net" + "time" + + "google.golang.org/grpc" + + pb "google.golang.org/grpc/examples/features/proto/echo" + "google.golang.org/grpc/internal/grpcsync" +) + +var port = flag.Int("port", 50052, "port number") + +var payload string = string(make([]byte, 8*1024)) // 8KB + +// server is used to implement EchoServer. +type server struct { + pb.UnimplementedEchoServer +} + +func (s *server) BidirectionalStreamingEcho(stream pb.Echo_BidirectionalStreamingEchoServer) error { + log.Printf("New stream began.") + // First, we wait 2 seconds before reading from the stream, to give the + // client an opportunity to block while sending its requests. + time.Sleep(2 * time.Second) + + // Next, read all the data sent by the client to allow it to unblock. + for i := 0; true; i++ { + if _, err := stream.Recv(); err != nil { + log.Printf("Read %v messages.", i) + if err == io.EOF { + break + } + log.Printf("Error receiving data: %v", err) + return err + } + } + + // Finally, send data until we block, then end the stream after we unblock. + stopSending := grpcsync.NewEvent() + sentOne := make(chan struct{}) + go func() { + for !stopSending.HasFired() { + after := time.NewTimer(time.Second) + select { + case <-sentOne: + after.Stop() + case <-after.C: + log.Printf("Sending is blocked.") + stopSending.Fire() + <-sentOne + } + } + }() + + i := 0 + for !stopSending.HasFired() { + i++ + if err := stream.Send(&pb.EchoResponse{Message: payload}); err != nil { + log.Printf("Error sending data: %v", err) + return err + } + sentOne <- struct{}{} + } + log.Printf("Sent %v messages.", i) + + log.Printf("Stream ended successfully.") + return nil +} + +func main() { + flag.Parse() + + address := fmt.Sprintf(":%v", *port) + lis, err := net.Listen("tcp", address) + if err != nil { + log.Fatalf("failed to listen: %v", err) + } + + grpcServer := grpc.NewServer() + pb.RegisterEchoServer(grpcServer, &server{}) + + if err := grpcServer.Serve(lis); err != nil { + log.Fatalf("failed to serve: %v", err) + } +}