diff --git a/internal/hammer/hammer.go b/internal/hammer/hammer.go index e64d66703..bca516930 100644 --- a/internal/hammer/hammer.go +++ b/internal/hammer/hammer.go @@ -16,6 +16,7 @@ package main import ( + "bytes" "context" "crypto/ecdh" "crypto/ecdsa" @@ -24,20 +25,27 @@ import ( "crypto/tls" "crypto/x509" "encoding/base64" + "encoding/json" "encoding/pem" "errors" "flag" "fmt" + "io" "math/rand/v2" "net/http" + "net/url" "os" + "strconv" "strings" "sync" "time" tdnote "github.com/transparency-dev/formats/note" "github.com/transparency-dev/tesseract/internal/client" + "github.com/transparency-dev/tesseract/internal/client/gcp" "github.com/transparency-dev/tesseract/internal/hammer/loadtest" + "github.com/transparency-dev/tesseract/internal/types/rfc6962" + "github.com/transparency-dev/tesseract/internal/types/staticct" "golang.org/x/mod/sumdb/note" "golang.org/x/net/http2" @@ -113,18 +121,15 @@ func main() { klog.Exitf("Failed to create verifier: %v", err) } - f, w, err := loadtest.NewLogClients(ctx, logURL, writeLogURL, loadtest.ClientOpts{ - Client: hc, - BearerToken: *bearerToken, - BearerTokenWrite: *bearerTokenWrite, - }) - if err != nil { - klog.Exit(err) + r := mustCreateReaders(ctx, logURL) + if len(writeLogURL) == 0 { + writeLogURL = logURL } + w := mustCreateWriters(writeLogURL) var cpRaw []byte - cons := client.UnilateralConsensus(f.ReadCheckpoint) - tracker, err := client.NewLogStateTracker(ctx, f.ReadCheckpoint, f.ReadTile, cpRaw, logSigV, logSigV.Name(), cons) + cons := client.UnilateralConsensus(r.ReadCheckpoint) + tracker, err := client.NewLogStateTracker(ctx, r.ReadCheckpoint, r.ReadTile, cpRaw, logSigV, logSigV.Name(), cons) if err != nil { klog.Exitf("Failed to create LogStateTracker: %v", err) } @@ -166,7 +171,7 @@ func main() { NumMMDVerifiers: *numMMDVerifiers, MMDDuration: *mmdDuration, } - hammer := loadtest.NewHammer(&tracker, 
f.ReadEntryBundle, w, gen, ha.SeqLeafChan, ha.ErrChan, opts) + hammer := loadtest.NewHammer(&tracker, r.ReadEntryBundle, w, gen, ha.SeqLeafChan, ha.ErrChan, opts) exitCode := 0 if *leafWriteGoal > 0 { @@ -393,3 +398,129 @@ func logSigVerifier(origin, b64PubKey string) (note.Verifier, error) { return logSigV, nil } + +func mustCreateReaders(ctx context.Context, us []string) loadtest.LogReader { + if len(us) == 0 { + klog.Exit("URL(s) for reading log must be provided") + } + r := []loadtest.LogReader{} + for _, u := range us { + if !strings.HasSuffix(u, "/") { + u += "/" + } + rURL, err := url.Parse(u) + if err != nil { + klog.Exitf("Invalid log reader URL %q: %v", u, err) + } + + switch rURL.Scheme { + case "http", "https": + c, err := client.NewHTTPFetcher(rURL, http.DefaultClient) + if err != nil { + klog.Exitf("Failed to create HTTP fetcher for %q: %v", u, err) + } + if *bearerToken != "" { + c.SetAuthorizationHeader(fmt.Sprintf("Bearer %s", *bearerToken)) + } + r = append(r, c) + case "file": + r = append(r, client.FileFetcher{Root: rURL.Path}) + case "gs": + c, err := gcp.NewGSFetcher(ctx, rURL.Host, nil) + if err != nil { + klog.Exitf("NewGSFetcher: %v", err) + } + r = append(r, c) + default: + klog.Exitf("Unsupported scheme %s on log URL", rURL.Scheme) + } + } + return loadtest.NewRoundRobinReader(r) +} + +func mustCreateWriters(us []string) loadtest.LeafWriter { + w := []loadtest.LeafWriter{} + for _, u := range us { + if !strings.HasSuffix(u, "/") { + u += "/" + } + u += "ct/v1/add-chain" + wURL, err := url.Parse(u) + if err != nil { + klog.Exitf("Invalid log writer URL %q: %v", u, err) + } + w = append(w, httpWriter(wURL, http.DefaultClient, *bearerTokenWrite)) + } + return loadtest.NewRoundRobinWriter(w) +} + +func httpWriter(u *url.URL, hc *http.Client, bearerToken string) loadtest.LeafWriter { + return func(ctx context.Context, newLeaf []byte) (uint64, uint64, error) { + req, err := http.NewRequest(http.MethodPost, u.String(), bytes.NewReader(newLeaf)) + if err != nil { + return 0, 0, fmt.Errorf("failed to create request: %v", 
err) + } + if bearerToken != "" { + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", bearerToken)) + } + resp, err := hc.Do(req.WithContext(ctx)) + if err != nil { + return 0, 0, fmt.Errorf("failed to write leaf: %v", err) + } + body, err := io.ReadAll(resp.Body) + _ = resp.Body.Close() + if err != nil { + return 0, 0, fmt.Errorf("failed to read body: %v", err) + } + switch resp.StatusCode { + case http.StatusOK: + if resp.Request.Method != http.MethodPost { + return 0, 0, fmt.Errorf("write leaf was redirected to %s", resp.Request.URL) + } + // Continue below + case http.StatusServiceUnavailable, http.StatusBadGateway, http.StatusGatewayTimeout, http.StatusTooManyRequests: + // These status codes may indicate a delay before retrying, so handle that here: + time.Sleep(retryDelay(resp.Header.Get("Retry-After"), time.Second)) + + return 0, 0, fmt.Errorf("log not available. Status code: %d. Body: %q %w", resp.StatusCode, body, loadtest.ErrRetry) + default: + return 0, 0, fmt.Errorf("write leaf was not OK. Status code: %d. Body: %q", resp.StatusCode, body) + } + index, timestamp, err := parseAddChainResponse(body) + if err != nil { + return 0, 0, fmt.Errorf("write leaf failed to parse response %q: %v", body, err) + } + return index, timestamp, nil + } +} + +func retryDelay(retryAfter string, defaultDur time.Duration) time.Duration { + if retryAfter == "" { + return defaultDur + } + d, err := time.Parse(http.TimeFormat, retryAfter) + if err == nil { + return time.Until(d) + } + s, err := strconv.Atoi(retryAfter) + if err == nil { + return time.Duration(s) * time.Second + } + return defaultDur +} + +// parseAddChainResponse parses the add-chain response and returns the leaf +// index from the extensions and timestamp from the response. +// Code is inspired by https://github.com/FiloSottile/sunlight/blob/main/tile.go. 
+func parseAddChainResponse(body []byte) (uint64, uint64, error) { + var resp rfc6962.AddChainResponse + if err := json.Unmarshal(body, &resp); err != nil { + return 0, 0, fmt.Errorf("can't parse add-chain response: %v", err) + } + + leafIdx, err := staticct.ParseCTExtensions(resp.Extensions) + if err != nil { + return 0, 0, fmt.Errorf("can't parse extensions: %v", err) + } + return uint64(leafIdx), resp.Timestamp, nil +} diff --git a/internal/hammer/loadtest/client.go b/internal/hammer/loadtest/client.go index 5f32822db..d6d8564f4 100644 --- a/internal/hammer/loadtest/client.go +++ b/internal/hammer/loadtest/client.go @@ -15,24 +15,10 @@ package loadtest import ( - "bytes" "context" - "encoding/json" "errors" - "fmt" - "io" "net/http" - "net/url" - "strconv" - "strings" "sync" - "time" - - "github.com/transparency-dev/tesseract/internal/client" - "github.com/transparency-dev/tesseract/internal/client/gcp" - "github.com/transparency-dev/tesseract/internal/types/rfc6962" - "github.com/transparency-dev/tesseract/internal/types/staticct" - "k8s.io/klog/v2" ) var ErrRetry = errors.New("retry") @@ -50,168 +36,47 @@ type ClientOpts struct { Client *http.Client } -// NewLogClients returns a fetcher and a writer that will read -// and write leaves to all logs in the `log_url` flag set. 
-func NewLogClients(ctx context.Context, readLogURLs, writeLogURLs []string, opts ClientOpts) (LogReader, LeafWriter, error) { - if len(readLogURLs) == 0 { - return nil, nil, fmt.Errorf("URL(s) for reading log must be provided") - } - - if len(writeLogURLs) == 0 { - // If no write_log_url is provided, then default it to log_url - writeLogURLs = readLogURLs - } - - rootUrlOrDie := func(s string) *url.URL { - // url must reference a directory, by definition - if !strings.HasSuffix(s, "/") { - s += "/" - } - rootURL, err := url.Parse(s) - if err != nil { - klog.Exitf("Invalid log URL: %v", err) - } - return rootURL - } - - fetchers := []fetcher{} - for _, s := range readLogURLs { - fetchers = append(fetchers, newFetcher(ctx, rootUrlOrDie(s), opts.BearerToken)) - } - writers := []httpLeafWriter{} - for _, s := range writeLogURLs { - addURL, err := rootUrlOrDie(s).Parse("ct/v1/add-chain") - if err != nil { - return nil, nil, fmt.Errorf("failed to create add URL: %v", err) - } - writers = append(writers, newHTTPLeafWriter(opts.Client, addURL, opts.BearerTokenWrite)) - } - return &roundRobinFetcher{f: fetchers}, (&roundRobinLeafWriter{ws: writers}).Write, nil +// NewRoundRobinReader creates a new LogReader which will spread read requests over the passed-in LogReaders. +func NewRoundRobinReader(r []LogReader) LogReader { + return &roundRobinReader{r: r} } -// newFetcher creates a Fetcher for the log at the given root location. 
-func newFetcher(ctx context.Context, root *url.URL, bearerToken string) fetcher { - switch root.Scheme { - case "http", "https": - c, err := client.NewHTTPFetcher(root, nil) - if err != nil { - klog.Exitf("NewHTTPFetcher: %v", err) - } - if bearerToken != "" { - c.SetAuthorizationHeader(fmt.Sprintf("Bearer %s", bearerToken)) - } - return c - case "file": - return client.FileFetcher{Root: root.Path} - case "gs": - c, err := gcp.NewGSFetcher(ctx, root.Host, nil) - if err != nil { - klog.Exitf("NewGSFetcher: %v", err) - } - return c - } - klog.Exitf("Unknown scheme on log URL: %q", root.Scheme) - return nil +// NewRoundRobinWriter creates a new LeafWriter which will spread write requests over the passed-in LeafWriters. +func NewRoundRobinWriter(w []LeafWriter) LeafWriter { + return (&roundRobinLeafWriter{ws: w}).Write } -// roundRobinFetcher ensures that read requests are sent to all configured fetchers +// roundRobinReader ensures that read requests are sent to all configured readers // using a round-robin strategy. 
-type roundRobinFetcher struct { +type roundRobinReader struct { sync.Mutex idx int - f []fetcher + r []LogReader } -func (rr *roundRobinFetcher) ReadCheckpoint(ctx context.Context) ([]byte, error) { +func (rr *roundRobinReader) ReadCheckpoint(ctx context.Context) ([]byte, error) { f := rr.next() return f.ReadCheckpoint(ctx) } -func (rr *roundRobinFetcher) ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { +func (rr *roundRobinReader) ReadTile(ctx context.Context, l, i uint64, p uint8) ([]byte, error) { f := rr.next() return f.ReadTile(ctx, l, i, p) } -func (rr *roundRobinFetcher) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) { +func (rr *roundRobinReader) ReadEntryBundle(ctx context.Context, i uint64, p uint8) ([]byte, error) { f := rr.next() return f.ReadEntryBundle(ctx, i, p) } -func (rr *roundRobinFetcher) next() fetcher { +func (rr *roundRobinReader) next() fetcher { rr.Lock() defer rr.Unlock() - f := rr.f[rr.idx] - rr.idx = (rr.idx + 1) % len(rr.f) - - return f -} - -func newHTTPLeafWriter(hc *http.Client, u *url.URL, bearerToken string) httpLeafWriter { - return httpLeafWriter{ - hc: hc, - u: u, - bearerToken: bearerToken, - } -} + r := rr.r[rr.idx] + rr.idx = (rr.idx + 1) % len(rr.r) -type httpLeafWriter struct { - hc *http.Client - u *url.URL - bearerToken string -} - -func (w httpLeafWriter) Write(ctx context.Context, newLeaf []byte) (uint64, uint64, error) { - req, err := http.NewRequest(http.MethodPost, w.u.String(), bytes.NewReader(newLeaf)) - if err != nil { - return 0, 0, fmt.Errorf("failed to create request: %v", err) - } - if w.bearerToken != "" { - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", w.bearerToken)) - } - resp, err := w.hc.Do(req.WithContext(ctx)) - if err != nil { - return 0, 0, fmt.Errorf("failed to write leaf: %v", err) - } - body, err := io.ReadAll(resp.Body) - _ = resp.Body.Close() - if err != nil { - return 0, 0, fmt.Errorf("failed to read body: %v", err) - } - switch 
resp.StatusCode { - case http.StatusOK: - if resp.Request.Method != http.MethodPost { - return 0, 0, fmt.Errorf("write leaf was redirected to %s", resp.Request.URL) - } - // Continue below - case http.StatusServiceUnavailable, http.StatusBadGateway, http.StatusGatewayTimeout, http.StatusTooManyRequests: - // These status codes may indicate a delay before retrying, so handle that here: - time.Sleep(retryDelay(resp.Header.Get("Retry-After"), time.Second)) - - return 0, 0, fmt.Errorf("log not available. Status code: %d. Body: %q %w", resp.StatusCode, body, ErrRetry) - default: - return 0, 0, fmt.Errorf("write leaf was not OK. Status code: %d. Body: %q", resp.StatusCode, body) - } - index, timestamp, err := parseAddChainResponse(body) - if err != nil { - return 0, 0, fmt.Errorf("write leaf failed to parse response: %v", body) - } - return index, timestamp, nil -} - -func retryDelay(retryAfter string, defaultDur time.Duration) time.Duration { - if retryAfter == "" { - return defaultDur - } - d, err := time.Parse(http.TimeFormat, retryAfter) - if err == nil { - return time.Until(d) - } - s, err := strconv.Atoi(retryAfter) - if err == nil { - return time.Duration(s) * time.Second - } - return defaultDur + return r } // roundRobinLeafWriter ensures that write requests are sent to all configured @@ -219,7 +84,7 @@ func retryDelay(retryAfter string, defaultDur time.Duration) time.Duration { type roundRobinLeafWriter struct { sync.Mutex idx int - ws []httpLeafWriter + ws []LeafWriter } func (rr *roundRobinLeafWriter) Write(ctx context.Context, newLeaf []byte) (uint64, uint64, error) { @@ -231,24 +96,8 @@ func (rr *roundRobinLeafWriter) next() LeafWriter { rr.Lock() defer rr.Unlock() - f := rr.ws[rr.idx] + w := rr.ws[rr.idx] rr.idx = (rr.idx + 1) % len(rr.ws) - return f.Write -} - -// parseAddChainResponse parses the add-chain response and returns the leaf -// index from the extensions and timestamp from the response. 
-// Code is inspired by https://github.com/FiloSottile/sunlight/blob/main/tile.go. -func parseAddChainResponse(body []byte) (uint64, uint64, error) { - var resp rfc6962.AddChainResponse - if err := json.Unmarshal(body, &resp); err != nil { - return 0, 0, fmt.Errorf("can't parse add-chain response: %v", err) - } - - leafIdx, err := staticct.ParseCTExtensions(resp.Extensions) - if err != nil { - return 0, 0, fmt.Errorf("can't parse extensions: %v", err) - } - return uint64(leafIdx), resp.Timestamp, nil + return w }