@@ -53,6 +53,7 @@ import (
5353 "reflect"
5454 "runtime"
5555 "runtime/pprof"
56+ "strconv"
5657 "strings"
5758 "sync"
5859 "sync/atomic"
8182 traceMode = flags .StringWithAllowedValues ("trace" , toggleModeOff ,
8283 fmt .Sprintf ("Trace mode - One of: %v" , strings .Join (allToggleModes , ", " )), allToggleModes )
8384 preloaderMode = flags .StringWithAllowedValues ("preloader" , toggleModeOff ,
84- fmt .Sprintf ("Preloader mode - One of: %v" , strings .Join (allToggleModes , ", " )), allToggleModes )
85+ fmt .Sprintf ("Preloader mode - One of: %v, preloader works only in streaming and unconstrained modes and will be ignored in unary mode" ,
86+ strings .Join (allToggleModes , ", " )), allToggleModes )
8587 channelzOn = flags .StringWithAllowedValues ("channelz" , toggleModeOff ,
8688 fmt .Sprintf ("Channelz mode - One of: %v" , strings .Join (allToggleModes , ", " )), allToggleModes )
8789 compressorMode = flags .StringWithAllowedValues ("compression" , compModeOff ,
@@ -401,20 +403,11 @@ func makeFuncUnary(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
401403}
402404
403405func makeFuncStream (bf stats.Features ) (rpcCallFunc , rpcCleanupFunc ) {
404- clients , cleanup := makeClients (bf )
406+ streams , req , cleanup := setupStream (bf , false )
405407
406- streams := make ([][]testgrpc.BenchmarkService_StreamingCallClient , bf .Connections )
407- for cn := 0 ; cn < bf .Connections ; cn ++ {
408- tc := clients [cn ]
409- streams [cn ] = make ([]testgrpc.BenchmarkService_StreamingCallClient , bf .MaxConcurrentCalls )
410- for pos := 0 ; pos < bf .MaxConcurrentCalls ; pos ++ {
411-
412- stream , err := tc .StreamingCall (context .Background ())
413- if err != nil {
414- logger .Fatalf ("%v.StreamingCall(_) = _, %v" , tc , err )
415- }
416- streams [cn ][pos ] = stream
417- }
408+ var preparedMsg [][]* grpc.PreparedMsg
409+ if bf .EnablePreloader {
410+ preparedMsg = prepareMessages (streams , req )
418411 }
419412
420413 return func (cn , pos int ) {
@@ -426,24 +419,25 @@ func makeFuncStream(bf stats.Features) (rpcCallFunc, rpcCleanupFunc) {
426419 if bf .RespPayloadCurve != nil {
427420 respSizeBytes = bf .RespPayloadCurve .ChooseRandom ()
428421 }
429- streamCaller (streams [cn ][pos ], reqSizeBytes , respSizeBytes )
422+ var req interface {}
423+ if bf .EnablePreloader {
424+ req = preparedMsg [cn ][pos ]
425+ } else {
426+ pl := bm .NewPayload (testpb .PayloadType_COMPRESSABLE , reqSizeBytes )
427+ req = & testpb.SimpleRequest {
428+ ResponseType : pl .Type ,
429+ ResponseSize : int32 (respSizeBytes ),
430+ Payload : pl ,
431+ }
432+ }
433+ streamCaller (streams [cn ][pos ], req )
430434 }, cleanup
431435}
432436
433437func makeFuncUnconstrainedStreamPreloaded (bf stats.Features ) (rpcSendFunc , rpcRecvFunc , rpcCleanupFunc ) {
434- streams , req , cleanup := setupUnconstrainedStream (bf )
438+ streams , req , cleanup := setupStream (bf , true )
435439
436- preparedMsg := make ([][]* grpc.PreparedMsg , len (streams ))
437- for cn , connStreams := range streams {
438- preparedMsg [cn ] = make ([]* grpc.PreparedMsg , len (connStreams ))
439- for pos , stream := range connStreams {
440- preparedMsg [cn ][pos ] = & grpc.PreparedMsg {}
441- err := preparedMsg [cn ][pos ].Encode (stream , req )
442- if err != nil {
443- logger .Fatalf ("%v.Encode(%v, %v) = %v" , preparedMsg [cn ][pos ], req , stream , err )
444- }
445- }
446- }
440+ preparedMsg := prepareMessages (streams , req )
447441
448442 return func (cn , pos int ) {
449443 streams [cn ][pos ].SendMsg (preparedMsg [cn ][pos ])
@@ -453,7 +447,7 @@ func makeFuncUnconstrainedStreamPreloaded(bf stats.Features) (rpcSendFunc, rpcRe
453447}
454448
455449func makeFuncUnconstrainedStream (bf stats.Features ) (rpcSendFunc , rpcRecvFunc , rpcCleanupFunc ) {
456- streams , req , cleanup := setupUnconstrainedStream (bf )
450+ streams , req , cleanup := setupStream (bf , true )
457451
458452 return func (cn , pos int ) {
459453 streams [cn ][pos ].Send (req )
@@ -462,13 +456,19 @@ func makeFuncUnconstrainedStream(bf stats.Features) (rpcSendFunc, rpcRecvFunc, r
462456 }, cleanup
463457}
464458
465- func setupUnconstrainedStream (bf stats.Features ) ([][]testgrpc.BenchmarkService_StreamingCallClient , * testpb.SimpleRequest , rpcCleanupFunc ) {
459+ func setupStream (bf stats.Features , unconstrained bool ) ([][]testgrpc.BenchmarkService_StreamingCallClient , * testpb.SimpleRequest , rpcCleanupFunc ) {
466460 clients , cleanup := makeClients (bf )
467461
468462 streams := make ([][]testgrpc.BenchmarkService_StreamingCallClient , bf .Connections )
469- md := metadata .Pairs (benchmark .UnconstrainedStreamingHeader , "1" ,
470- benchmark .UnconstrainedStreamingDelayHeader , bf .SleepBetweenRPCs .String ())
471- ctx := metadata .NewOutgoingContext (context .Background (), md )
463+ ctx := context .Background ()
464+ if unconstrained {
465+ md := metadata .Pairs (benchmark .UnconstrainedStreamingHeader , "1" , benchmark .UnconstrainedStreamingDelayHeader , bf .SleepBetweenRPCs .String ())
466+ ctx = metadata .NewOutgoingContext (ctx , md )
467+ }
468+ if bf .EnablePreloader {
469+ md := metadata .Pairs (benchmark .PreloadMsgSizeHeader , strconv .Itoa (bf .RespSizeBytes ), benchmark .UnconstrainedStreamingDelayHeader , bf .SleepBetweenRPCs .String ())
470+ ctx = metadata .NewOutgoingContext (ctx , md )
471+ }
472472 for cn := 0 ; cn < bf .Connections ; cn ++ {
473473 tc := clients [cn ]
474474 streams [cn ] = make ([]testgrpc.BenchmarkService_StreamingCallClient , bf .MaxConcurrentCalls )
@@ -491,6 +491,20 @@ func setupUnconstrainedStream(bf stats.Features) ([][]testgrpc.BenchmarkService_
491491 return streams , req , cleanup
492492}
493493
494+ func prepareMessages (streams [][]testgrpc.BenchmarkService_StreamingCallClient , req * testpb.SimpleRequest ) [][]* grpc.PreparedMsg {
495+ preparedMsg := make ([][]* grpc.PreparedMsg , len (streams ))
496+ for cn , connStreams := range streams {
497+ preparedMsg [cn ] = make ([]* grpc.PreparedMsg , len (connStreams ))
498+ for pos , stream := range connStreams {
499+ preparedMsg [cn ][pos ] = & grpc.PreparedMsg {}
500+ if err := preparedMsg [cn ][pos ].Encode (stream , req ); err != nil {
501+ logger .Fatalf ("%v.Encode(%v, %v) = %v" , preparedMsg [cn ][pos ], req , stream , err )
502+ }
503+ }
504+ }
505+ return preparedMsg
506+ }
507+
494508// Makes a UnaryCall gRPC request using the given BenchmarkServiceClient and
495509// request and response sizes.
496510func unaryCaller (client testgrpc.BenchmarkServiceClient , reqSize , respSize int ) {
@@ -499,8 +513,8 @@ func unaryCaller(client testgrpc.BenchmarkServiceClient, reqSize, respSize int)
499513 }
500514}
501515
502- func streamCaller (stream testgrpc.BenchmarkService_StreamingCallClient , reqSize , respSize int ) {
503- if err := bm .DoStreamingRoundTrip (stream , reqSize , respSize ); err != nil {
516+ func streamCaller (stream testgrpc.BenchmarkService_StreamingCallClient , req interface {} ) {
517+ if err := bm .DoStreamingRoundTripPreloaded (stream , req ); err != nil {
504518 logger .Fatalf ("DoStreamingRoundTrip failed: %v" , err )
505519 }
506520}
@@ -790,6 +804,9 @@ func processFlags() *benchOpts {
790804 if len (opts .features .reqSizeBytes ) != 0 {
791805 log .Fatalf ("you may not specify -reqPayloadCurveFiles and -reqSizeBytes at the same time" )
792806 }
807+ if len (opts .features .enablePreloader ) != 0 {
808+ log .Fatalf ("you may not specify -reqPayloadCurveFiles and -preloader at the same time" )
809+ }
793810 for _ , file := range * reqPayloadCurveFiles {
794811 pc , err := stats .NewPayloadCurve (file )
795812 if err != nil {
@@ -807,6 +824,9 @@ func processFlags() *benchOpts {
807824 if len (opts .features .respSizeBytes ) != 0 {
808825 log .Fatalf ("you may not specify -respPayloadCurveFiles and -respSizeBytes at the same time" )
809826 }
827+ if len (opts .features .enablePreloader ) != 0 {
828+ log .Fatalf ("you may not specify -respPayloadCurveFiles and -preloader at the same time" )
829+ }
810830 for _ , file := range * respPayloadCurveFiles {
811831 pc , err := stats .NewPayloadCurve (file )
812832 if err != nil {
0 commit comments