diff --git a/BUILD.bazel b/BUILD.bazel
index 298e8b322..791392e4e 100644
--- a/BUILD.bazel
+++ b/BUILD.bazel
@@ -19,6 +19,7 @@ go_library(
        "//cache/gcsproxy:go_default_library",
        "//cache/httpproxy:go_default_library",
        "//cache/s3proxy:go_default_library",
+        "//cache/metricsdecorator:go_default_library",
        "//config:go_default_library",
        "//server:go_default_library",
        "//utils/idle:go_default_library",
diff --git a/README.md b/README.md
index 70cd71e52..b89855136 100644
--- a/README.md
+++ b/README.md
@@ -223,6 +223,23 @@ host: localhost
 
 # If true, enable experimental remote asset API support:
 #experimental_remote_asset_api: true
+
+# Allows mapping HTTP and gRPC headers to prometheus
+# labels. Headers can be set by the bazel client, e.g.:
+# --remote_header=os=ubuntu18-04. Not all counters are
+# affected.
+#metrics:
+#  categories:
+#    os:
+#      - rhel7
+#      - rhel8
+#      - ubuntu16-04
+#      - ubuntu18-04
+#    branch:
+#      - master
+#    user:
+#      - ci
+
 ```
 
 ## Docker
diff --git a/cache/BUILD.bazel b/cache/BUILD.bazel
index 14417d924..afb38deb3 100644
--- a/cache/BUILD.bazel
+++ b/cache/BUILD.bazel
@@ -5,4 +5,7 @@ go_library(
     srcs = ["cache.go"],
     importpath = "github.com/buchgr/bazel-remote/cache",
     visibility = ["//visibility:public"],
+    deps = [
+        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library",
+    ],
 )
diff --git a/cache/cache.go b/cache/cache.go
index 49c7cc93c..e99c8037d 100644
--- a/cache/cache.go
+++ b/cache/cache.go
@@ -5,6 +5,8 @@ import (
 	"encoding/hex"
 	"io"
 	"path/filepath"
+
+	pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
 )
 
 // EntryKind describes the kind of cache entry
@@ -50,6 +52,52 @@ func (e *Error) Error() string {
 	return e.Text
 }
 
+// RequestContext represents the context of an incoming request. For
+// now it acts as an adapter providing a common interface to headers
+// from HTTP and gRPC requests. In the future it could be extended if
+// additional information needs to be associated with a request, or be
+// propagated from the HTTP/gRPC servers towards the disk cache, or
+// perhaps further to the proxies.
+type RequestContext interface {
+	// GetHeader returns the values of the named HTTP/gRPC header
+	// in the associated request. It returns a slice since there
+	// can be several headers with the same name, and an empty
+	// slice if no headers exist with the requested name.
+	// The headerName is expected in lowercase.
+	GetHeader(headerName string) (headerValues []string)
+}
+
+// BlobStore is the interface for storing, retrieving and probing
+// blobs of a given kind.
+type BlobStore interface {
+	// TODO change to io.ReadCloser?
+	Put(kind EntryKind, hash string, size int64, rdr io.Reader, reqCtx RequestContext) error
+
+	Get(kind EntryKind, hash string, size int64, reqCtx RequestContext) (io.ReadCloser, int64, error)
+
+	Contains(kind EntryKind, hash string, size int64, reqCtx RequestContext) (bool, int64)
+}
+
+// AcStore is the interface for retrieving validated action results.
+type AcStore interface {
+	GetValidatedActionResult(hash string, reqCtx RequestContext) (*pb.ActionResult, []byte, error)
+}
+
+// BlobAcStore combines the BlobStore and AcStore interfaces.
+type BlobAcStore interface {
+	BlobStore
+	AcStore
+}
+
+// Stats is the interface for querying cache size statistics.
+type Stats interface {
+	Stats() (totalSize int64, reservedSize int64, numItems int)
+	MaxSize() int64
+}
+
+// TODO Could the proxies implement the BlobStore interface instead? And remove the Proxy interface?
+// Having access to the original headers would allow new use cases such as forwarding of
+// custom headers from the client via a proxy, or support for HTTP headers like Max-Forwards.
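+
+// A RequestContext implementation is usually a thin adapter around the
+// underlying request. A minimal sketch (hypothetical, for illustration
+// or tests only, not part of this change):
+//
+//	type mapReqCtx map[string][]string
+//
+//	func (m mapReqCtx) GetHeader(headerName string) []string {
+//		return m[headerName]
+//	}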
+
 // Proxy is the interface that (optional) proxy backends must implement.
 // Implementations are expected to be safe for concurrent use.
 type Proxy interface {
diff --git a/cache/disk/disk.go b/cache/disk/disk.go
index 40c4a6a7c..53b1b8978 100644
--- a/cache/disk/disk.go
+++ b/cache/disk/disk.go
@@ -27,6 +27,7 @@ import (
 	"github.com/golang/protobuf/proto"
 )
 
+// TODO remove these counters?
 var (
 	cacheHits = promauto.NewCounter(prometheus.CounterOpts{
 		Name: "bazel_remote_disk_cache_hits",
@@ -230,7 +231,7 @@ func (c *Cache) loadExistingFiles() error {
 // Put stores a stream of `size` bytes from `r` into the cache.
 // If `hash` is not the empty string, and the contents don't match it,
 // a non-nil error is returned.
-func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader) (rErr error) {
+func (c *Cache) Put(kind cache.EntryKind, hash string, size int64, r io.Reader, reqCtx cache.RequestContext) (rErr error) {
 	if size < 0 {
 		return fmt.Errorf("Invalid (negative) size: %d", size)
 	}
@@ -471,7 +472,7 @@ func (c *Cache) availableOrTryProxy(key string, size int64, blobPath string) (rc
 // item is not found, the io.ReadCloser will be nil. If some error occurred
 // when processing the request, then it is returned. Callers should provide
 // the `size` of the item to be retrieved, or -1 if unknown.
-func (c *Cache) Get(kind cache.EntryKind, hash string, size int64) (rc io.ReadCloser, s int64, rErr error) {
+func (c *Cache) Get(kind cache.EntryKind, hash string, size int64, reqCtx cache.RequestContext) (rc io.ReadCloser, s int64, rErr error) {
 
 	// The hash format is checked properly in the http/grpc code.
 	// Just perform a simple/fast check here, to catch bad tests.
@@ -575,7 +576,7 @@
 // one) will be checked.
 //
 // Callers should provide the `size` of the item, or -1 if unknown.
-func (c *Cache) Contains(kind cache.EntryKind, hash string, size int64) (bool, int64) {
+func (c *Cache) Contains(kind cache.EntryKind, hash string, size int64, reqCtx cache.RequestContext) (bool, int64) {
 
 	// The hash format is checked properly in the http/grpc code.
 	// Just perform a simple/fast check here, to catch bad tests.
@@ -648,9 +649,12 @@ func cacheFilePath(kind cache.EntryKind, cacheDir string, hash string) string {
 // value from the CAS if it and all its dependencies are also available. If
 // not, nil values are returned. If something unexpected went wrong, return
 // an error.
-func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, error) {
+// TODO Consider separating out the implementation of the cache.AcStore interface, to open
+// up the possibility of combining that functionality with proxies, and also to allow
+// bazel-remote configurations with a proxy but no local disk storage?
+func (c *Cache) GetValidatedActionResult(hash string, reqCtx cache.RequestContext) (*pb.ActionResult, []byte, error) { - rc, sizeBytes, err := c.Get(cache.AC, hash, -1) + rc, sizeBytes, err := c.Get(cache.AC, hash, -1, reqCtx) if rc != nil { defer rc.Close() } @@ -675,7 +679,7 @@ func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, for _, f := range result.OutputFiles { if len(f.Contents) == 0 { - found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes) + found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes, reqCtx) if !found { return nil, nil, nil // aka "not found" } @@ -683,7 +687,7 @@ func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, } for _, d := range result.OutputDirectories { - r, size, err := c.Get(cache.CAS, d.TreeDigest.Hash, d.TreeDigest.SizeBytes) + r, size, err := c.Get(cache.CAS, d.TreeDigest.Hash, d.TreeDigest.SizeBytes, reqCtx) if r == nil { return nil, nil, err // aka "not found", or an err if non-nil } @@ -714,7 +718,7 @@ func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, if f.Digest == nil { continue } - found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes) + found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes, reqCtx) if !found { return nil, nil, nil // aka "not found" } @@ -725,7 +729,7 @@ func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, if f.Digest == nil { continue } - found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes) + found, _ := c.Contains(cache.CAS, f.Digest.Hash, f.Digest.SizeBytes, reqCtx) if !found { return nil, nil, nil // aka "not found" } @@ -734,14 +738,14 @@ func (c *Cache) GetValidatedActionResult(hash string) (*pb.ActionResult, []byte, } if result.StdoutDigest != nil { - found, _ := c.Contains(cache.CAS, result.StdoutDigest.Hash, result.StdoutDigest.SizeBytes) + found, _ := c.Contains(cache.CAS, result.StdoutDigest.Hash, result.StdoutDigest.SizeBytes, reqCtx) if !found { return nil, nil, nil // aka "not found" } } if result.StderrDigest != nil { - found, _ := c.Contains(cache.CAS, result.StderrDigest.Hash, result.StderrDigest.SizeBytes) + found, _ := c.Contains(cache.CAS, result.StderrDigest.Hash, result.StderrDigest.SizeBytes, reqCtx) if !found { return nil, nil, nil // aka "not found" } diff --git a/cache/disk/disk_test.go b/cache/disk/disk_test.go index d4b341529..3d527925d 100644 --- a/cache/disk/disk_test.go +++ b/cache/disk/disk_test.go @@ -78,7 +78,7 @@ func TestCacheBasics(t *testing.T) { } // Non-existing item - rdr, _, err := testCache.Get(cache.CAS, contentsHash, contentsLength) + rdr, _, err := testCache.Get(cache.CAS, contentsHash, contentsLength, nil) if err != nil { t.Fatal(err) } @@ -88,7 +88,7 @@ func TestCacheBasics(t *testing.T) { // Add an item err = testCache.Put(cache.CAS, contentsHash, int64(len(contents)), - ioutil.NopCloser(strings.NewReader(contents))) + ioutil.NopCloser(strings.NewReader(contents)), nil) if err != nil { t.Fatal(err) } @@ -101,7 +101,7 @@ func TestCacheBasics(t *testing.T) { } // Get the item back - rdr, sizeBytes, err := testCache.Get(cache.CAS, contentsHash, contentsLength) + rdr, sizeBytes, err := testCache.Get(cache.CAS, contentsHash, contentsLength, nil) if err != nil { t.Fatal(err) } @@ -142,7 +142,7 @@ func TestCacheEviction(t *testing.T) { } err := testCache.Put(cache.AC, key, int64(i), - ioutil.NopCloser(strReader)) + ioutil.NopCloser(strReader), nil) if err != nil { t.Fatal(err) } @@ -164,16 +164,16 @@ 
func TestCachePutWrongSize(t *testing.T) { var err error - err = testCache.Put(cache.AC, hash, int64(len(content)), strings.NewReader(content)) + err = testCache.Put(cache.AC, hash, int64(len(content)), strings.NewReader(content), nil) if err != nil { t.Fatal("Expected success", err) } - err = testCache.Put(cache.AC, hash, int64(len(content))+1, strings.NewReader(content)) + err = testCache.Put(cache.AC, hash, int64(len(content))+1, strings.NewReader(content), nil) if err == nil { t.Error("Expected error due to size being different") } - err = testCache.Put(cache.AC, hash, int64(len(content))-1, strings.NewReader(content)) + err = testCache.Put(cache.AC, hash, int64(len(content))-1, strings.NewReader(content), nil) if err == nil { t.Error("Expected error due to size being different") } @@ -188,27 +188,27 @@ func TestCacheGetContainsWrongSize(t *testing.T) { var found bool var rdr io.ReadCloser - err := testCache.Put(cache.CAS, contentsHash, contentsLength, strings.NewReader(contents)) + err := testCache.Put(cache.CAS, contentsHash, contentsLength, strings.NewReader(contents), nil) if err != nil { t.Fatal("Expected success", err) } - found, _ = testCache.Contains(cache.CAS, contentsHash, contentsLength+1) + found, _ = testCache.Contains(cache.CAS, contentsHash, contentsLength+1, nil) if found { t.Error("Expected not found, due to size being different") } - rdr, _, _ = testCache.Get(cache.CAS, contentsHash, contentsLength+1) + rdr, _, _ = testCache.Get(cache.CAS, contentsHash, contentsLength+1, nil) if rdr != nil { t.Error("Expected not found, due to size being different") } - found, _ = testCache.Contains(cache.CAS, contentsHash, -1) + found, _ = testCache.Contains(cache.CAS, contentsHash, -1, nil) if !found { t.Error("Expected found, when unknown size") } - rdr, _, _ = testCache.Get(cache.CAS, contentsHash, -1) + rdr, _, _ = testCache.Get(cache.CAS, contentsHash, -1, nil) if rdr == nil { t.Error("Expected found, when unknown size") } @@ -225,12 +225,12 @@ func TestCacheGetContainsWrongSizeWithProxy(t *testing.T) { // The proxyStub contains the digest {contentsHash, contentsLength}. 
- found, _ = testCache.Contains(cache.CAS, contentsHash, contentsLength+1) + found, _ = testCache.Contains(cache.CAS, contentsHash, contentsLength+1, nil) if found { t.Error("Expected not found, due to size being different") } - rdr, _, _ = testCache.Get(cache.CAS, contentsHash, contentsLength+1) + rdr, _, _ = testCache.Get(cache.CAS, contentsHash, contentsLength+1, nil) if rdr != nil { t.Error("Expected not found, due to size being different") } @@ -238,12 +238,12 @@ func TestCacheGetContainsWrongSizeWithProxy(t *testing.T) { t.Fatal(err) } - found, _ = testCache.Contains(cache.CAS, contentsHash, -1) + found, _ = testCache.Contains(cache.CAS, contentsHash, -1, nil) if !found { t.Error("Expected found, when unknown size") } - rdr, _, _ = testCache.Get(cache.CAS, contentsHash, -1) + rdr, _, _ = testCache.Get(cache.CAS, contentsHash, -1, nil) if rdr == nil { t.Error("Expected found, when unknown size") } @@ -302,12 +302,12 @@ func putGetCompareBytes(kind cache.EntryKind, hash string, data []byte, testCach r := bytes.NewReader(data) - err := testCache.Put(kind, hash, int64(len(data)), r) + err := testCache.Put(kind, hash, int64(len(data)), r, nil) if err != nil { return err } - rdr, sizeBytes, err := testCache.Get(kind, hash, int64(len(data))) + rdr, sizeBytes, err := testCache.Get(kind, hash, int64(len(data)), nil) if err != nil { return err } @@ -389,7 +389,7 @@ func TestCacheExistingFiles(t *testing.T) { } // Adding a new file should evict items[0] (the oldest) - err = testCache.Put(cache.CAS, contentsHash, int64(len(contents)), strings.NewReader(contents)) + err = testCache.Put(cache.CAS, contentsHash, int64(len(contents)), strings.NewReader(contents), nil) if err != nil { t.Fatal(err) } @@ -398,7 +398,7 @@ func TestCacheExistingFiles(t *testing.T) { if err != nil { t.Fatal(err) } - found, _ := testCache.Contains(cache.CAS, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd", contentsLength) + found, _ := testCache.Contains(cache.CAS, "f53b46209596d170f7659a414c9ff9f6b545cf77ffd6e1cbe9bcc57e1afacfbd", contentsLength, nil) if found { t.Fatalf("%s should have been evicted", items[0]) } @@ -413,7 +413,7 @@ func TestCacheBlobTooLarge(t *testing.T) { for k := range []cache.EntryKind{cache.AC, cache.RAW} { kind := cache.EntryKind(k) - err := testCache.Put(kind, hashStr("foo"), 10000, strings.NewReader(contents)) + err := testCache.Put(kind, hashStr("foo"), 10000, strings.NewReader(contents), nil) if err == nil { t.Fatal("Expected an error") } @@ -435,14 +435,14 @@ func TestCacheCorruptedCASBlob(t *testing.T) { testCache := New(cacheDir, 1000, nil) err := testCache.Put(cache.CAS, hashStr("foo"), int64(len(contents)), - strings.NewReader(contents)) + strings.NewReader(contents), nil) if err == nil { t.Fatal("expected hash mismatch error") } // We expect the upload to succeed without validation: err = testCache.Put(cache.RAW, hashStr("foo"), int64(len(contents)), - strings.NewReader(contents)) + strings.NewReader(contents), nil) if err != nil { t.Fatal(err) } @@ -481,17 +481,17 @@ func TestMigrateFromOldDirectoryStructure(t *testing.T) { } var found bool - found, _ = testCache.Contains(cache.AC, acHash, 512) + found, _ = testCache.Contains(cache.AC, acHash, 512, nil) if !found { t.Fatalf("Expected cache to contain AC entry '%s'", acHash) } - found, _ = testCache.Contains(cache.CAS, casHash1, 1024) + found, _ = testCache.Contains(cache.CAS, casHash1, 1024, nil) if !found { t.Fatalf("Expected cache to contain CAS entry '%s'", casHash1) } - found, _ = testCache.Contains(cache.CAS, 
casHash2, 1024) + found, _ = testCache.Contains(cache.CAS, casHash2, 1024, nil) if !found { t.Fatalf("Expected cache to contain CAS entry '%s'", casHash2) } @@ -527,17 +527,17 @@ func TestLoadExistingEntries(t *testing.T) { var found bool - found, _ = testCache.Contains(cache.AC, acHash, blobSize) + found, _ = testCache.Contains(cache.AC, acHash, blobSize, nil) if !found { t.Fatalf("Expected cache to contain AC entry '%s'", acHash) } - found, _ = testCache.Contains(cache.CAS, casHash, blobSize) + found, _ = testCache.Contains(cache.CAS, casHash, blobSize, nil) if !found { t.Fatalf("Expected cache to contain CAS entry '%s'", casHash) } - found, _ = testCache.Contains(cache.RAW, rawHash, blobSize) + found, _ = testCache.Contains(cache.RAW, rawHash, blobSize, nil) if !found { t.Fatalf("Expected cache to contain RAW entry '%s'", rawHash) } @@ -671,7 +671,7 @@ func TestHttpProxyBackend(t *testing.T) { blob, casHash := testutils.RandomDataAndHash(blobSize) // Non-existing item - r, _, err := testCache.Get(cache.CAS, casHash, blobSize) + r, _, err := testCache.Get(cache.CAS, casHash, blobSize, nil) if err != nil { t.Fatal(err) } @@ -684,7 +684,7 @@ func TestHttpProxyBackend(t *testing.T) { } err = testCache.Put(cache.CAS, casHash, int64(len(blob)), - bytes.NewReader(blob)) + bytes.NewReader(blob), nil) if err != nil { t.Fatal(err) } @@ -705,12 +705,12 @@ func TestHttpProxyBackend(t *testing.T) { // Confirm that it does not contain the item we added to the // first testCache and the proxy backend. - found, _ := testCache.Contains(cache.CAS, casHash, blobSize) + found, _ := testCache.Contains(cache.CAS, casHash, blobSize, nil) if found { t.Fatalf("Expected the cache not to contain %s", casHash) } - r, _, err = testCache.Get(cache.CAS, casHash, blobSize) + r, _, err = testCache.Get(cache.CAS, casHash, blobSize, nil) if err != nil { t.Fatal(err) } @@ -721,13 +721,13 @@ func TestHttpProxyBackend(t *testing.T) { // Add the proxy backend and check that we can Get the item. 
testCache.proxy = proxy - found, _ = testCache.Contains(cache.CAS, casHash, blobSize) + found, _ = testCache.Contains(cache.CAS, casHash, blobSize, nil) if !found { t.Fatalf("Expected the cache to contain %s (via the proxy)", casHash) } - r, fetchedSize, err := testCache.Get(cache.CAS, casHash, blobSize) + r, fetchedSize, err := testCache.Get(cache.CAS, casHash, blobSize, nil) if err != nil { t.Fatal(err) } @@ -772,7 +772,7 @@ func TestGetValidatedActionResult(t *testing.T) { grokHashStr := hex.EncodeToString(grokHash[:]) err = testCache.Put(cache.CAS, grokHashStr, int64(len(grokData)), - bytes.NewReader(grokData)) + bytes.NewReader(grokData), nil) if err != nil { t.Fatal(err) } @@ -782,7 +782,7 @@ func TestGetValidatedActionResult(t *testing.T) { fooHashStr := hex.EncodeToString(fooHash[:]) err = testCache.Put(cache.CAS, fooHashStr, int64(len(fooData)), - bytes.NewReader(fooData)) + bytes.NewReader(fooData), nil) if err != nil { t.Fatal(err) } @@ -814,7 +814,7 @@ func TestGetValidatedActionResult(t *testing.T) { barDataHashStr := hex.EncodeToString(barDataHash[:]) err = testCache.Put(cache.CAS, barDataHashStr, int64(len(barData)), - bytes.NewReader(barData)) + bytes.NewReader(barData), nil) if err != nil { t.Fatal(err) } @@ -839,7 +839,7 @@ func TestGetValidatedActionResult(t *testing.T) { rootDataHashStr := hex.EncodeToString(rootDataHash[:]) err = testCache.Put(cache.CAS, rootDataHashStr, int64(len(rootData)), - bytes.NewReader(rootData)) + bytes.NewReader(rootData), nil) if err != nil { t.Fatal(err) } @@ -856,7 +856,7 @@ func TestGetValidatedActionResult(t *testing.T) { treeDataHashStr := hex.EncodeToString(treeDataHash[:]) err = testCache.Put(cache.CAS, treeDataHashStr, int64(len(treeData)), - bytes.NewReader(treeData)) + bytes.NewReader(treeData), nil) if err != nil { t.Fatal(err) } @@ -898,7 +898,7 @@ func TestGetValidatedActionResult(t *testing.T) { arDataHashStr := hex.EncodeToString(arDataHash[:]) err = testCache.Put(cache.AC, arDataHashStr, int64(len(arData)), - bytes.NewReader(arData)) + bytes.NewReader(arData), nil) if err != nil { t.Fatal(err) } @@ -910,7 +910,7 @@ func TestGetValidatedActionResult(t *testing.T) { // to assume that the value should be returned unchanged by the cache // layer. - rAR, rData, err := testCache.GetValidatedActionResult(arDataHashStr) + rAR, rData, err := testCache.GetValidatedActionResult(arDataHashStr, nil) if err != nil { t.Fatal(err) } diff --git a/cache/httpproxy/httpproxy_test.go b/cache/httpproxy/httpproxy_test.go index 23da27936..302c9c3dc 100644 --- a/cache/httpproxy/httpproxy_test.go +++ b/cache/httpproxy/httpproxy_test.go @@ -107,7 +107,7 @@ func TestEverything(t *testing.T) { // PUT two different values with the same key in ac and cas. 
- err = diskCache.Put(cache.AC, hash, int64(len(acData)), bytes.NewReader(acData)) + err = diskCache.Put(cache.AC, hash, int64(len(acData)), bytes.NewReader(acData), nil) if err != nil { t.Error(err) } @@ -124,7 +124,7 @@ func TestEverything(t *testing.T) { } s.mu.Unlock() - err = diskCache.Put(cache.CAS, hash, int64(len(casData)), bytes.NewReader(casData)) + err = diskCache.Put(cache.CAS, hash, int64(len(casData)), bytes.NewReader(casData), nil) if err != nil { t.Error(err) } @@ -157,7 +157,7 @@ func TestEverything(t *testing.T) { var found bool var size int64 - found, size = diskCache.Contains(cache.AC, hash, int64(len(acData))) + found, size = diskCache.Contains(cache.AC, hash, int64(len(acData)), nil) if !found { t.Fatalf("Expected to find AC item %s", hash) } @@ -166,7 +166,7 @@ func TestEverything(t *testing.T) { len(acData), size) } - found, size = diskCache.Contains(cache.CAS, hash, int64(len(casData))) + found, size = diskCache.Contains(cache.CAS, hash, int64(len(casData)), nil) if !found { t.Fatalf("Expected to find CAS item %s", hash) } @@ -180,7 +180,7 @@ func TestEverything(t *testing.T) { var data []byte var rc io.ReadCloser - rc, size, err = diskCache.Get(cache.AC, hash, int64(len(acData))) + rc, size, err = diskCache.Get(cache.AC, hash, int64(len(acData)), nil) if err != nil { t.Error(err) } @@ -200,7 +200,7 @@ func TestEverything(t *testing.T) { } rc.Close() - rc, size, err = diskCache.Get(cache.CAS, hash, int64(len(casData))) + rc, size, err = diskCache.Get(cache.CAS, hash, int64(len(casData)), nil) if err != nil { t.Error(err) } @@ -235,7 +235,7 @@ func TestEverything(t *testing.T) { // Confirm that we can HEAD both values successfully. - found, size = diskCache.Contains(cache.AC, hash, int64(len(acData))) + found, size = diskCache.Contains(cache.AC, hash, int64(len(acData)), nil) if !found { t.Fatalf("Expected to find AC item %s", hash) } @@ -244,7 +244,7 @@ func TestEverything(t *testing.T) { len(acData), size) } - found, size = diskCache.Contains(cache.CAS, hash, int64(len(casData))) + found, size = diskCache.Contains(cache.CAS, hash, int64(len(casData)), nil) if !found { t.Fatalf("Expected to find CAS item %s", hash) } @@ -255,7 +255,7 @@ func TestEverything(t *testing.T) { // Confirm that we can GET both values successfully. 
-	rc, size, err = diskCache.Get(cache.AC, hash, int64(len(acData)))
+	rc, size, err = diskCache.Get(cache.AC, hash, int64(len(acData)), nil)
 	if err != nil {
 		t.Error(err)
 	}
@@ -275,7 +275,7 @@ func TestEverything(t *testing.T) {
 	}
 	rc.Close()
 
-	rc, size, err = diskCache.Get(cache.CAS, hash, int64(len(casData)))
+	rc, size, err = diskCache.Get(cache.CAS, hash, int64(len(casData)), nil)
 	if err != nil {
 		t.Error(err)
 	}
diff --git a/cache/metricsdecorator/BUILD.bazel b/cache/metricsdecorator/BUILD.bazel
new file mode 100644
index 000000000..9f88c253e
--- /dev/null
+++ b/cache/metricsdecorator/BUILD.bazel
@@ -0,0 +1,18 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "go_default_library",
+    srcs = [
+        "metricsdecorator.go",
+    ],
+    importpath = "github.com/buchgr/bazel-remote/cache/metricsdecorator",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//cache:go_default_library",
+        "//config:go_default_library",
+        "@com_github_bazelbuild_remote_apis//build/bazel/remote/execution/v2:go_default_library",
+        "@com_github_prometheus_client_golang//prometheus:go_default_library",
+        "@com_github_prometheus_client_golang//prometheus/promauto:go_default_library",
+    ],
+)
+
diff --git a/cache/metricsdecorator/metricsdecorator.go b/cache/metricsdecorator/metricsdecorator.go
new file mode 100644
index 000000000..74f1bfc1e
--- /dev/null
+++ b/cache/metricsdecorator/metricsdecorator.go
@@ -0,0 +1,204 @@
+package metricsdecorator
+
+// This is a decorator for any implementation of the cache.BlobAcStore
+// interface. It adds prometheus metrics for cache requests.
+//
+// The decorator can report a cache miss if an AC entry is found but
+// referenced CAS entries are missing. That is possible since the
+// metricsdecorator supports GetValidatedActionResult in the
+// cache.BlobAcStore interface.
+//
+// TODO Consider allowing a metricsdecorator also for pure cache.BlobStore
+// interfaces, in order to replace the current prometheus counters in the
+// proxies? That would probably require better support for non-AC requests
+// in the metricsdecorator and a configurable counter name.
+import (
+	"io"
+	"strings"
+
+	"github.com/buchgr/bazel-remote/cache"
+	"github.com/buchgr/bazel-remote/config"
+
+	pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
+	"github.com/prometheus/client_golang/prometheus"
+	"github.com/prometheus/client_golang/prometheus/promauto"
+)
+
+type metrics struct {
+	categoryValues      map[string]map[string]struct{}
+	counterIncomingReqs *prometheus.CounterVec
+	parent              cache.BlobAcStore
+}
+
+const statusOK = "ok"
+const statusNotFound = "notFound"
+const statusError = "error"
+
+const methodGet = "get"
+const methodPut = "put"
+const methodContains = "contains"
+
+// TODO add test cases for this file
+
+// NewMetricsDecorator returns a cache.BlobAcStore that delegates to parent
+// and counts incoming requests, labeled according to the given configuration.
+func NewMetricsDecorator(config *config.Metrics, parent cache.BlobAcStore) cache.BlobAcStore {
+
+	labels := []string{"method", "status", "kind"}
+	categoryValues := make(map[string]map[string]struct{})
+
+	if config != nil && config.Categories != nil {
+		for categoryName, allowedValues := range config.Categories {
+			// Normalize to lower case, since that is canonical for
+			// gRPC headers and conventional for prometheus labels.
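+			// For example, a configured category "OS" is exposed
+			// as the prometheus label "os".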
+			categoryName := strings.ToLower(categoryName)
+
+			// Store allowed category values as a set for efficient lookup
+			allowedValuesSet := make(map[string]struct{})
+			for _, categoryValue := range allowedValues {
+				allowedValuesSet[categoryValue] = struct{}{}
+			}
+			categoryValues[categoryName] = allowedValuesSet
+
+			// Construct a prometheus label for each category.
+			// Prometheus does not allow changing the set of
+			// labels of a counter until the next time
+			// bazel-remote is restarted.
+			labels = append(labels, categoryName)
+		}
+	}
+
+	// For now we only count AC requests, and only the most common status codes,
+	// because:
+	//
+	// - No identified use case for others.
+	// - Limit number of prometheus time series (if many configured categories).
+	// - Reduce performance overhead of counters (if many configured categories).
+	//
+	// But the naming and the labels of the counter are generic, to allow
+	// counting additional request types or status codes in the future without
+	// having to rename the counter and get a non-continuous history of
+	// metrics.
+
+	counterIncomingReqs := promauto.NewCounterVec(
+		prometheus.CounterOpts{
+			Name: "bazel_remote_incoming_requests_total",
+			Help: "The number of incoming HTTP and gRPC requests. Currently only AC requests are counted.",
+		},
+		labels)
+
+	m := &metrics{
+		categoryValues:      categoryValues,
+		counterIncomingReqs: counterIncomingReqs,
+		parent:              parent,
+	}
+	return m
+}
+
+func (m *metrics) Put(kind cache.EntryKind, hash string, size int64, r io.Reader, context cache.RequestContext) error {
+	err := m.parent.Put(kind, hash, size, r, context)
+
+	if kind == cache.AC {
+		var status string
+		if err != nil {
+			status = statusError
+		} else {
+			status = statusOK
+		}
+		m.incrementRequests(kind, methodPut, status, context)
+	}
+
+	return err
+}
+
+func (m *metrics) Get(kind cache.EntryKind, hash string, size int64, context cache.RequestContext) (io.ReadCloser, int64, error) {
+	rc, sizeBytes, err := m.parent.Get(kind, hash, size, context)
+
+	if kind == cache.AC {
+		var status string
+		if err != nil {
+			status = statusError
+		} else if rc == nil {
+			status = statusNotFound
+		} else {
+			status = statusOK
+		}
+		m.incrementRequests(kind, methodGet, status, context)
+	}
+
+	return rc, sizeBytes, err
+}
+
+func (m *metrics) Contains(kind cache.EntryKind, hash string, size int64, context cache.RequestContext) (bool, int64) {
+	ok, sizeBytes := m.parent.Contains(kind, hash, size, context)
+
+	if kind == cache.AC {
+		var status string
+		if ok {
+			status = statusOK
+		} else {
+			status = statusNotFound
+		}
+		m.incrementRequests(kind, methodContains, status, context)
+	}
+
+	return ok, sizeBytes
+}
+
+func (m *metrics) GetValidatedActionResult(hash string, context cache.RequestContext) (*pb.ActionResult, []byte, error) {
+	ac, data, err := m.parent.GetValidatedActionResult(hash, context)
+
+	var status string
+	if err != nil {
+		status = statusError
+	} else if ac == nil {
+		status = statusNotFound
+	} else {
+		status = statusOK
+	}
+	m.incrementRequests(cache.AC, methodGet, status, context)
+
+	return ac, data, err
+}
+
+func getLabelValueFromHeaderValues(headerValues []string, allowedValues map[string]struct{}) string {
+	if len(headerValues) == 0 {
+		return "" // No header for this label
+	}
+
+	for _, headerValue := range headerValues {
+		// Prometheus only allows one value per label.
+		// Pick the first allowed header value we find.
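+		// For example, with the allowed values {rhel7, rhel8}
+		// configured and incoming headers "os: centos" and
+		// "os: rhel8", the label value used is "rhel8".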
+		if _, ok := allowedValues[headerValue]; ok {
+			return headerValue
+		}
+	}
+
+	// The values found in the header have not been listed in
+	// the configuration file. Represent them as "other".
+	//
+	// Listing allowed values is an attempt to avoid polluting
+	// prometheus with too many different time series.
+	//
+	// https://prometheus.io/docs/practices/naming/ warns about:
+	//
+	// "CAUTION: Remember that every unique combination of key-value
+	// label pairs represents a new time series, which can dramatically
+	// increase the amount of data stored. Do not use labels to store
+	// dimensions with high cardinality (many different label values),
+	// such as user IDs, email addresses, or other unbounded sets of
+	// values."
+	//
+	// It would have been nice if bazel-remote could reload the set
+	// of allowed values from an updated configuration file on a
+	// SIGHUP signal, instead of having to be restarted.
+	return "other"
+}
+
+func (m *metrics) incrementRequests(kind cache.EntryKind, method string, status string, reqCtx cache.RequestContext) {
+	labels := make(prometheus.Labels)
+	labels["method"] = method
+	labels["status"] = status
+	labels["kind"] = kind.String()
+
+	for labelName := range m.categoryValues {
+		labels[labelName] = getLabelValueFromHeaderValues(reqCtx.GetHeader(labelName), m.categoryValues[labelName])
+	}
+
+	m.counterIncomingReqs.With(labels).Inc()
+}
diff --git a/config/config.go b/config/config.go
index c9efc2da3..7df906921 100644
--- a/config/config.go
+++ b/config/config.go
@@ -35,6 +35,11 @@ type HTTPBackendConfig struct {
 	BaseURL string `yaml:"url"`
 }
 
+// Metrics stores configuration for prometheus metrics.
+type Metrics struct {
+	Categories map[string][]string `yaml:"categories"`
+}
+
 // Config holds the top-level configuration for bazel-remote.
 type Config struct {
 	Host                        string        `yaml:"host"`
@@ -55,6 +60,7 @@ type Config struct {
 	DisableGRPCACDepsCheck      bool          `yaml:"disable_grpc_ac_deps_check"`
 	EnableACKeyInstanceMangling bool          `yaml:"enable_ac_key_instance_mangling"`
 	EnableEndpointMetrics       bool          `yaml:"enable_endpoint_metrics"`
+	Metrics                     *Metrics      `yaml:"metrics"`
 	ExperimentalRemoteAssetAPI  bool          `yaml:"experimental_remote_asset_api"`
 	HTTPReadTimeout             time.Duration `yaml:"http_read_timeout"`
 	HTTPWriteTimeout            time.Duration `yaml:"http_write_timeout"`
@@ -73,6 +79,7 @@ func New(dir string, maxSize int, host string, port int, grpcPort int,
 	disableGRPCACDepsCheck bool,
 	enableACKeyInstanceMangling bool,
 	enableEndpointMetrics bool,
+	metrics *Metrics,
 	experimentalRemoteAssetAPI bool,
 	httpReadTimeout time.Duration,
 	httpWriteTimeout time.Duration) (*Config, error) {
@@ -95,6 +102,7 @@ func New(dir string, maxSize int, host string, port int, grpcPort int,
 		DisableGRPCACDepsCheck:      disableGRPCACDepsCheck,
 		EnableACKeyInstanceMangling: enableACKeyInstanceMangling,
 		EnableEndpointMetrics:       enableEndpointMetrics,
+		Metrics:                     metrics,
 		ExperimentalRemoteAssetAPI:  experimentalRemoteAssetAPI,
 		HTTPReadTimeout:             httpReadTimeout,
 		HTTPWriteTimeout:            httpWriteTimeout,
diff --git a/main.go b/main.go
index 98876863d..96cc292a4 100644
--- a/main.go
+++ b/main.go
@@ -18,6 +18,7 @@ import (
 	"github.com/buchgr/bazel-remote/cache/s3proxy"
 
 	"github.com/buchgr/bazel-remote/cache/httpproxy"
+	"github.com/buchgr/bazel-remote/cache/metricsdecorator"
 	"github.com/buchgr/bazel-remote/config"
 	"github.com/buchgr/bazel-remote/server"
 
@@ -283,6 +284,7 @@ func main() {
 			ctx.Bool("disable_grpc_ac_deps_check"),
 			ctx.Bool("enable_ac_key_instance_mangling"),
 			ctx.Bool("enable_endpoint_metrics"),
+			nil,
 			ctx.Bool("experimental_remote_asset_api"),
 			ctx.Duration("http_read_timeout"),
 			ctx.Duration("http_write_timeout"),
@@ -335,6 +337,8 @@ func main() {
 	diskCache := disk.New(c.Dir, int64(c.MaxSize)*1024*1024*1024, proxyCache)
 
+	casAcCache := metricsdecorator.NewMetricsDecorator(c.Metrics, diskCache)
+
 	mux := http.NewServeMux()
 	httpServer := &http.Server{
 		Addr:         c.Host + ":" + strconv.Itoa(c.Port),
@@ -344,8 +348,7 @@ func main() {
 	}
 	validateAC := !c.DisableHTTPACValidation
 
-	h := server.NewHTTPCache(diskCache, accessLogger, errorLogger, validateAC, c.EnableACKeyInstanceMangling, gitCommit)
-
+	h := server.NewHTTPCache(casAcCache, diskCache, accessLogger, errorLogger, validateAC, c.EnableACKeyInstanceMangling, gitCommit)
 	var htpasswdSecrets auth.SecretProvider
 	cacheHandler := h.CacheHandler
 	if c.HtpasswdFile != "" {
@@ -444,7 +447,7 @@ func main() {
 		validateAC, c.EnableACKeyInstanceMangling,
 		enableRemoteAssetAPI,
-		diskCache, accessLogger, errorLogger)
+		casAcCache, accessLogger, errorLogger)
 	if err3 != nil {
 		log.Fatal(err3)
 	}
diff --git a/server/grpc.go b/server/grpc.go
index a89ec78d6..6cdd22285 100644
--- a/server/grpc.go
+++ b/server/grpc.go
@@ -8,8 +8,10 @@ import (
 	"google.golang.org/genproto/googleapis/bytestream"
 	"google.golang.org/grpc"
+	"google.golang.org/grpc/codes"
 	_ "google.golang.org/grpc/encoding/gzip" // Register gzip support.
+ "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" asset "github.com/bazelbuild/remote-apis/build/bazel/remote/asset/v1" @@ -17,8 +19,6 @@ import ( "github.com/bazelbuild/remote-apis/build/bazel/semver" "github.com/buchgr/bazel-remote/cache" - "github.com/buchgr/bazel-remote/cache/disk" - _ "github.com/mostynb/go-grpc-compression/snappy" // Register snappy _ "github.com/mostynb/go-grpc-compression/zstd" // and zstd support. ) @@ -34,7 +34,7 @@ var ( ) type grpcServer struct { - cache *disk.Cache + cache cache.BlobAcStore accessLogger cache.Logger errorLogger cache.Logger depsCheck bool @@ -48,7 +48,7 @@ func ListenAndServeGRPC(addr string, opts []grpc.ServerOption, validateACDeps bool, mangleACKeys bool, enableRemoteAssetAPI bool, - c *disk.Cache, a cache.Logger, e cache.Logger) error { + c cache.BlobAcStore, a cache.Logger, e cache.Logger) error { listener, err := net.Listen("tcp", addr) if err != nil { @@ -62,7 +62,7 @@ func serveGRPC(l net.Listener, opts []grpc.ServerOption, validateACDepsCheck bool, mangleACKeys bool, enableRemoteAssetAPI bool, - c *disk.Cache, a cache.Logger, e cache.Logger) error { + c cache.BlobAcStore, a cache.Logger, e cache.Logger) error { srv := grpc.NewServer(opts...) s := &grpcServer{ @@ -139,3 +139,23 @@ func (s *grpcServer) validateHash(hash string, size int64, logPrefix string) err return nil } + +type reqCtxGrpc struct { + ctx context.Context +} + +func newReqCtxGrpc(ctx context.Context) *reqCtxGrpc { + rc := &reqCtxGrpc{ + ctx: ctx, + } + return rc +} + +func (h *reqCtxGrpc) GetHeader(headerName string) (headerValues []string) { + headers, _ := metadata.FromIncomingContext(h.ctx) // TODO avoid doing for each header? 
+ if headerValues, ok := headers[headerName]; ok { + return headerValues + } else { + return []string{} + } +} diff --git a/server/grpc_ac.go b/server/grpc_ac.go index a92d91bb0..906a92db0 100644 --- a/server/grpc_ac.go +++ b/server/grpc_ac.go @@ -40,6 +40,8 @@ func (s *grpcServer) GetActionResult(ctx context.Context, logPrefix := "GRPC AC GET" + reqCtx := newReqCtxGrpc(ctx) + if s.mangleACKeys { req.ActionDigest.Hash = cache.TransformActionCacheKey(req.ActionDigest.Hash, req.InstanceName, s.accessLogger) } @@ -56,7 +58,7 @@ func (s *grpcServer) GetActionResult(ctx context.Context, if !s.depsCheck { logPrefix = "GRPC AC GET NODEPSCHECK" - rdr, sizeBytes, err := s.cache.Get(cache.AC, req.ActionDigest.Hash, unknownActionResultSize) + rdr, sizeBytes, err := s.cache.Get(cache.AC, req.ActionDigest.Hash, unknownActionResultSize, reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) return nil, status.Error(codes.Unknown, err.Error()) @@ -85,7 +87,7 @@ func (s *grpcServer) GetActionResult(ctx context.Context, return result, nil } - result, _, err := s.cache.GetValidatedActionResult(req.ActionDigest.Hash) + result, _, err := s.cache.GetValidatedActionResult(req.ActionDigest.Hash, reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) return nil, status.Error(codes.Unknown, err.Error()) @@ -102,14 +104,14 @@ func (s *grpcServer) GetActionResult(ctx context.Context, var inlinedSoFar int64 err = s.maybeInline(req.InlineStdout, - &result.StdoutRaw, &result.StdoutDigest, &inlinedSoFar) + &result.StdoutRaw, &result.StdoutDigest, &inlinedSoFar, reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) return nil, status.Error(codes.Unknown, err.Error()) } err = s.maybeInline(req.InlineStderr, - &result.StderrRaw, &result.StderrDigest, &inlinedSoFar) + &result.StderrRaw, &result.StderrDigest, &inlinedSoFar, reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) return nil, status.Error(codes.Unknown, err.Error()) @@ -121,7 +123,7 @@ func (s *grpcServer) GetActionResult(ctx context.Context, } for _, of := range result.GetOutputFiles() { _, ok := inlinableFiles[of.Path] - err = s.maybeInline(ok, &of.Contents, &of.Digest, &inlinedSoFar) + err = s.maybeInline(ok, &of.Contents, &of.Digest, &inlinedSoFar, reqCtx) if err != nil { s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err) return nil, status.Error(codes.Unknown, err.Error()) @@ -133,7 +135,7 @@ func (s *grpcServer) GetActionResult(ctx context.Context, return result, nil } -func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest, inlinedSoFar *int64) error { +func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest, inlinedSoFar *int64, reqCtx *reqCtxGrpc) error { if (*inlinedSoFar + int64(len(*slice))) > maxInlineSize { inline = false @@ -155,10 +157,10 @@ func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest, } } - found, _ := s.cache.Contains(cache.CAS, (*digest).Hash, (*digest).SizeBytes) + found, _ := s.cache.Contains(cache.CAS, (*digest).Hash, (*digest).SizeBytes, reqCtx) if !found { err := s.cache.Put(cache.CAS, (*digest).Hash, (*digest).SizeBytes, - bytes.NewReader(*slice)) + bytes.NewReader(*slice), reqCtx) if err != nil { return err } @@ -179,7 +181,7 @@ func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest, // Otherwise, attempt to inline. 
 	if (*digest).SizeBytes > 0 {
-		data, err := s.getBlobData((*digest).Hash, (*digest).SizeBytes)
+		data, err := s.getBlobData((*digest).Hash, (*digest).SizeBytes, reqCtx)
 		if err != nil {
 			return err
 		}
@@ -193,6 +195,8 @@ func (s *grpcServer) maybeInline(inline bool, slice *[]byte, digest **pb.Digest,
 func (s *grpcServer) UpdateActionResult(ctx context.Context,
 	req *pb.UpdateActionResultRequest) (*pb.ActionResult, error) {
 
+	reqCtx := newReqCtxGrpc(ctx)
+
 	logPrefix := "GRPC AC PUT"
 	err := s.validateHash(req.ActionDigest.Hash, req.ActionDigest.SizeBytes, logPrefix)
 	if err != nil {
@@ -215,7 +219,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,
 	}
 
 	err = s.cache.Put(cache.AC, req.ActionDigest.Hash,
-		int64(len(data)), bytes.NewReader(data))
+		int64(len(data)), bytes.NewReader(data), reqCtx)
 	if err != nil {
 		s.accessLogger.Printf("%s %s %s", logPrefix, req.ActionDigest.Hash, err)
 		return nil, status.Error(codes.Internal, err.Error())
@@ -238,7 +242,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,
 		}
 
 		err = s.cache.Put(cache.CAS, f.Digest.Hash,
-			f.Digest.SizeBytes, bytes.NewReader(f.Contents))
+			f.Digest.SizeBytes, bytes.NewReader(f.Contents), reqCtx)
 		if err != nil {
 			s.accessLogger.Printf("%s %s %s", logPrefix,
 				req.ActionDigest.Hash, err)
@@ -260,7 +264,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,
 		}
 
 		err = s.cache.Put(cache.CAS, hash, sizeBytes,
-			bytes.NewReader(req.ActionResult.StdoutRaw))
+			bytes.NewReader(req.ActionResult.StdoutRaw), reqCtx)
 		if err != nil {
 			s.accessLogger.Printf("%s %s %s", logPrefix,
 				req.ActionDigest.Hash, err)
@@ -281,7 +285,7 @@ func (s *grpcServer) UpdateActionResult(ctx context.Context,
 		}
 
 		err = s.cache.Put(cache.CAS, hash, sizeBytes,
-			bytes.NewReader(req.ActionResult.StderrRaw))
+			bytes.NewReader(req.ActionResult.StderrRaw), reqCtx)
 		if err != nil {
 			s.accessLogger.Printf("%s %s %s", logPrefix,
 				req.ActionDigest.Hash, err)
diff --git a/server/grpc_asset.go b/server/grpc_asset.go
index 09351f764..d1153ea18 100644
--- a/server/grpc_asset.go
+++ b/server/grpc_asset.go
@@ -48,6 +48,8 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
 	// key -> CAS sha256 + timestamp
 	// Should we place a limit on the size of the index?
 
+	reqCtx := newReqCtxGrpc(ctx)
+
 	for _, q := range req.GetQualifiers() {
 		if q.Name == "checksum.sri" && strings.HasPrefix(q.Value, "sha256-") {
 			// Ref: https://developer.mozilla.org/en-US/docs/Web/Security/Subresource_Integrity
@@ -63,14 +65,14 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
 
 			sha256Str = hex.EncodeToString(decoded)
 
-			found, size := s.cache.Contains(cache.CAS, sha256Str, -1)
+			found, size := s.cache.Contains(cache.CAS, sha256Str, -1, reqCtx)
 			if !found {
 				continue
 			}
 
 			if size < 0 {
 				// We don't know the size yet (bad http backend?).
-				r, size, err := s.cache.Get(cache.CAS, sha256Str, -1)
+				r, size, err := s.cache.Get(cache.CAS, sha256Str, -1, reqCtx)
 				if r != nil {
 					defer r.Close()
 				}
@@ -94,7 +96,7 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
 
 	// Cache miss. See if we can download one of the URIs.
for _, uri := range req.GetUris() { - ok, actualHash, size := s.fetchItem(uri, sha256Str) + ok, actualHash, size := s.fetchItem(uri, sha256Str, reqCtx) if ok { return &asset.FetchBlobResponse{ Status: &status.Status{Code: int32(codes.OK)}, @@ -114,7 +116,7 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest) }, nil } -func (s *grpcServer) fetchItem(uri string, expectedHash string) (bool, string, int64) { +func (s *grpcServer) fetchItem(uri string, expectedHash string, reqCtx *reqCtxGrpc) (bool, string, int64) { u, err := url.Parse(uri) if err != nil { s.errorLogger.Printf("unable to parse URI: %s err: %v", uri, err) @@ -163,7 +165,7 @@ func (s *grpcServer) fetchItem(uri string, expectedHash string) (bool, string, i rc = ioutil.NopCloser(bytes.NewReader(data)) } - err = s.cache.Put(cache.CAS, expectedHash, expectedSize, rc) + err = s.cache.Put(cache.CAS, expectedHash, expectedSize, rc, reqCtx) if err != nil { s.errorLogger.Printf("failed to Put %s: %v", expectedHash, err) return false, "", int64(-1) diff --git a/server/grpc_bytestream.go b/server/grpc_bytestream.go index b28fe9b13..a2683f8f4 100644 --- a/server/grpc_bytestream.go +++ b/server/grpc_bytestream.go @@ -94,7 +94,7 @@ func (s *grpcServer) Read(req *bytestream.ReadRequest, return status.Error(codes.OutOfRange, msg) } - rdr, sizeBytes, err := s.cache.Get(cache.CAS, hash, size) + rdr, sizeBytes, err := s.cache.Get(cache.CAS, hash, size, newReqCtxGrpc(resp.Context())) if err != nil { msg := fmt.Sprintf("GRPC BYTESTREAM READ FAILED: %v", err) s.accessLogger.Printf(msg) @@ -237,6 +237,8 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error { recvResult := make(chan error) resourceNameChan := make(chan string, 1) + reqCtx := newReqCtxGrpc(srv.Context()) + go func() { firstIteration := true var resourceName string @@ -279,7 +281,7 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error { return } - exists, _ := s.cache.Contains(cache.CAS, hash, size) + exists, _ := s.cache.Contains(cache.CAS, hash, size, reqCtx) if exists { // Blob already exists, return without writing anything. resp.CommittedSize = size @@ -295,7 +297,7 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error { } go func() { - putResult <- s.cache.Put(cache.CAS, hash, size, pr) + putResult <- s.cache.Put(cache.CAS, hash, size, pr, reqCtx) }() firstIteration = false diff --git a/server/grpc_cas.go b/server/grpc_cas.go index 7e9dcc0c6..b8e5f7a2d 100644 --- a/server/grpc_cas.go +++ b/server/grpc_cas.go @@ -37,7 +37,7 @@ func (s *grpcServer) FindMissingBlobs(ctx context.Context, return nil, err } - found, _ := s.cache.Contains(cache.CAS, hash, digest.GetSizeBytes()) + found, _ := s.cache.Contains(cache.CAS, hash, digest.GetSizeBytes(), newReqCtxGrpc(ctx)) if !found { s.accessLogger.Printf("GRPC CAS HEAD %s NOT FOUND", hash) resp.MissingBlobDigests = append(resp.MissingBlobDigests, digest) @@ -75,7 +75,7 @@ func (s *grpcServer) BatchUpdateBlobs(ctx context.Context, resp.Responses = append(resp.Responses, &rr) err = s.cache.Put(cache.CAS, req.Digest.Hash, - int64(len(req.Data)), bytes.NewReader(req.Data)) + int64(len(req.Data)), bytes.NewReader(req.Data), newReqCtxGrpc(ctx)) if err != nil { s.errorLogger.Printf("%s %s %s", errorPrefix, req.Digest.Hash, err) rr.Status.Code = int32(code.Code_UNKNOWN) @@ -91,7 +91,7 @@ func (s *grpcServer) BatchUpdateBlobs(ctx context.Context, // Return the data for a blob, or an error. If the blob was not // found, the returned error is errBlobNotFound. 
Only use this // function when it's OK to buffer the entire blob in memory. -func (s *grpcServer) getBlobData(hash string, size int64) ([]byte, error) { +func (s *grpcServer) getBlobData(hash string, size int64, reqCtx *reqCtxGrpc) ([]byte, error) { if size < 0 { return []byte{}, errBadSize } @@ -100,7 +100,7 @@ func (s *grpcServer) getBlobData(hash string, size int64) ([]byte, error) { return []byte{}, nil } - rdr, sizeBytes, err := s.cache.Get(cache.CAS, hash, size) + rdr, sizeBytes, err := s.cache.Get(cache.CAS, hash, size, reqCtx) if err != nil { rdr.Close() return []byte{}, err @@ -124,10 +124,10 @@ func (s *grpcServer) getBlobData(hash string, size int64) ([]byte, error) { return data, rdr.Close() } -func (s *grpcServer) getBlobResponse(digest *pb.Digest) *pb.BatchReadBlobsResponse_Response { +func (s *grpcServer) getBlobResponse(digest *pb.Digest, reqCtx *reqCtxGrpc) *pb.BatchReadBlobsResponse_Response { r := pb.BatchReadBlobsResponse_Response{Digest: digest} - data, err := s.getBlobData(digest.Hash, digest.SizeBytes) + data, err := s.getBlobData(digest.Hash, digest.SizeBytes, reqCtx) if err == errBlobNotFound { s.accessLogger.Printf("GRPC CAS GET %s NOT FOUND", digest.Hash) r.Status = &status.Status{Code: int32(code.Code_NOT_FOUND)} @@ -163,7 +163,7 @@ func (s *grpcServer) BatchReadBlobs(ctx context.Context, if err != nil { return nil, err } - resp.Responses = append(resp.Responses, s.getBlobResponse(digest)) + resp.Responses = append(resp.Responses, s.getBlobResponse(digest, newReqCtxGrpc(ctx))) } return &resp, nil @@ -172,6 +172,8 @@ func (s *grpcServer) BatchReadBlobs(ctx context.Context, func (s *grpcServer) GetTree(in *pb.GetTreeRequest, stream pb.ContentAddressableStorage_GetTreeServer) error { + reqCtx := newReqCtxGrpc(stream.Context()) + resp := pb.GetTreeResponse{ Directories: make([]*pb.Directory, 0), } @@ -181,7 +183,7 @@ func (s *grpcServer) GetTree(in *pb.GetTreeRequest, return err } - data, err := s.getBlobData(in.RootDigest.Hash, in.RootDigest.SizeBytes) + data, err := s.getBlobData(in.RootDigest.Hash, in.RootDigest.SizeBytes, reqCtx) if err == errBlobNotFound { s.accessLogger.Printf("GRPC CAS GETTREEREQUEST %s NOT FOUND", in.RootDigest.Hash) @@ -199,7 +201,7 @@ func (s *grpcServer) GetTree(in *pb.GetTreeRequest, return grpc_status.Error(codes.DataLoss, err.Error()) } - err = s.fillDirectories(&resp, &dir, errorPrefix) + err = s.fillDirectories(&resp, &dir, errorPrefix, reqCtx) if err != nil { return err } @@ -214,7 +216,7 @@ func (s *grpcServer) GetTree(in *pb.GetTreeRequest, // Attempt to populate `resp`. Return errors for invalid requests, but // otherwise attempt to return as many blobs as possible. -func (s *grpcServer) fillDirectories(resp *pb.GetTreeResponse, dir *pb.Directory, errorPrefix string) error { +func (s *grpcServer) fillDirectories(resp *pb.GetTreeResponse, dir *pb.Directory, errorPrefix string, reqCtx *reqCtxGrpc) error { // Add this dir. 
 	resp.Directories = append(resp.Directories, dir)
@@ -227,7 +229,7 @@ func (s *grpcServer) fillDirectories(resp *pb.GetTreeResponse, dir *pb.Directory
 			return err
 		}
 
-		data, err := s.getBlobData(dirNode.Digest.Hash, dirNode.Digest.SizeBytes)
+		data, err := s.getBlobData(dirNode.Digest.Hash, dirNode.Digest.SizeBytes, reqCtx)
 		if err == errBlobNotFound {
 			s.accessLogger.Printf("GRPC GETTREEREQUEST BLOB %s NOT FOUND",
 				dirNode.Digest.Hash)
@@ -248,7 +250,7 @@ func (s *grpcServer) fillDirectories(resp *pb.GetTreeResponse, dir *pb.Directory
 		s.accessLogger.Printf("GRPC GETTREEREQUEST BLOB %s ADDED OK",
 			dirNode.Digest.Hash)
 
-		err = s.fillDirectories(resp, &dirMsg, errorPrefix)
+		err = s.fillDirectories(resp, &dirMsg, errorPrefix, reqCtx)
 		if err != nil {
 			return err
 		}
diff --git a/server/grpc_test.go b/server/grpc_test.go
index 89eff7b9c..da52fa304 100644
--- a/server/grpc_test.go
+++ b/server/grpc_test.go
@@ -575,7 +575,7 @@ func TestGrpcByteStreamDeadline(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	_, sz, err := diskCache.Get(cache.CAS, testBlobHash, testBlobSize)
+	_, sz, err := diskCache.Get(cache.CAS, testBlobHash, testBlobSize, nil)
 	if err != nil {
 		t.Fatalf("get error: %v\n", err)
 	}
diff --git a/server/http.go b/server/http.go
index 8e336a742..8f8575d84 100644
--- a/server/http.go
+++ b/server/http.go
@@ -16,7 +16,6 @@ import (
 	pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
 
 	"github.com/buchgr/bazel-remote/cache"
-	"github.com/buchgr/bazel-remote/cache/disk"
 	"github.com/golang/protobuf/jsonpb"
 	"github.com/golang/protobuf/proto"
 )
@@ -30,7 +29,8 @@ type HTTPCache interface {
 }
 
 type httpCache struct {
-	cache        *disk.Cache
+	cache        cache.BlobAcStore
+	stats        cache.Stats
 	accessLogger cache.Logger
 	errorLogger  cache.Logger
 	validateAC   bool
@@ -51,14 +51,14 @@ type statusPageData struct {
 // accessLogger will print one line for each HTTP request to stdout.
 // errorLogger will print unexpected server errors. Inexistent files and malformed URLs will not
 // be reported.
-func NewHTTPCache(cache *disk.Cache, accessLogger cache.Logger, errorLogger cache.Logger, validateAC bool, mangleACKeys bool, commit string) HTTPCache { - - _, _, numItems := cache.Stats() +func NewHTTPCache(cache cache.BlobAcStore, stats cache.Stats, accessLogger cache.Logger, errorLogger cache.Logger, validateAC bool, mangleACKeys bool, commit string) HTTPCache { + _, _, numItems := stats.Stats() errorLogger.Printf("Loaded %d existing disk cache items.", numItems) hc := &httpCache{ cache: cache, + stats: stats, accessLogger: accessLogger, errorLogger: errorLogger, validateAC: validateAC, @@ -102,8 +102,8 @@ func parseRequestURL(url string, validateAC bool) (kind cache.EntryKind, hash st return cache.RAW, hash, instance, nil } -func (h *httpCache) handleContainsValidAC(w http.ResponseWriter, r *http.Request, hash string) { - _, data, err := h.cache.GetValidatedActionResult(hash) +func (h *httpCache) handleContainsValidAC(w http.ResponseWriter, r *http.Request, hash string, reqContext cache.RequestContext) { + _, data, err := h.cache.GetValidatedActionResult(hash, reqContext) if err != nil { http.Error(w, "Not found", http.StatusNotFound) h.logResponse(http.StatusNotFound, r) @@ -121,8 +121,8 @@ func (h *httpCache) handleContainsValidAC(w http.ResponseWriter, r *http.Request h.logResponse(http.StatusOK, r) } -func (h *httpCache) handleGetValidAC(w http.ResponseWriter, r *http.Request, hash string) { - _, data, err := h.cache.GetValidatedActionResult(hash) +func (h *httpCache) handleGetValidAC(w http.ResponseWriter, r *http.Request, hash string, reqContext cache.RequestContext) { + _, data, err := h.cache.GetValidatedActionResult(hash, reqContext) if err != nil { http.Error(w, "Not found", http.StatusNotFound) h.logResponse(http.StatusNotFound, r) @@ -151,6 +151,7 @@ func (h *httpCache) handleGetValidAC(w http.ResponseWriter, r *http.Request, has return } + h.logResponse(http.StatusOK, r) return } @@ -167,6 +168,8 @@ func (h *httpCache) handleGetValidAC(w http.ResponseWriter, r *http.Request, has h.logResponse(http.StatusInternalServerError, r) return } + + h.logResponse(http.StatusOK, r) } // Helper function for logging responses @@ -184,6 +187,8 @@ func (h *httpCache) logResponse(code int, r *http.Request) { func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() + reqContext := newReqContextHttp(r) + kind, hash, instance, err := parseRequestURL(r.URL.Path, h.validateAC) if err != nil { http.Error(w, err.Error(), http.StatusBadRequest) @@ -199,11 +204,11 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) { case http.MethodGet: if h.validateAC && kind == cache.AC { - h.handleGetValidAC(w, r, hash) + h.handleGetValidAC(w, r, hash, reqContext) return } - rdr, sizeBytes, err := h.cache.Get(kind, hash, -1) + rdr, sizeBytes, err := h.cache.Get(kind, hash, -1, reqContext) if err != nil { if e, ok := err.(*cache.Error); ok { http.Error(w, e.Error(), e.Code) @@ -280,7 +285,7 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) { rc = ioutil.NopCloser(bytes.NewReader(data)) } - err := h.cache.Put(kind, hash, contentLength, rc) + err := h.cache.Put(kind, hash, contentLength, rc, reqContext) if err != nil { if cerr, ok := err.(*cache.Error); ok { http.Error(w, err.Error(), cerr.Code) @@ -295,19 +300,18 @@ func (h *httpCache) CacheHandler(w http.ResponseWriter, r *http.Request) { case http.MethodHead: if h.validateAC && kind == cache.AC { - h.handleContainsValidAC(w, r, hash) + h.handleContainsValidAC(w, r, hash, 
reqContext)
 		return
 	}
 
 	// Unvalidated path:
-	ok, size := h.cache.Contains(kind, hash, -1)
+	ok, size := h.cache.Contains(kind, hash, -1, reqContext)
 	if !ok {
 		http.Error(w, "Not found", http.StatusNotFound)
 		h.logResponse(http.StatusNotFound, r)
 		return
 	}
-
 	w.Header().Set("Content-Length", strconv.FormatInt(size, 10))
 	w.WriteHeader(http.StatusOK)
 	h.logResponse(http.StatusOK, r)
@@ -358,13 +362,13 @@ func addWorkerMetadataHTTP(addr string, ct string, orig []byte) (data []byte, co
 func (h *httpCache) StatusPageHandler(w http.ResponseWriter, r *http.Request) {
 	defer r.Body.Close()
 
-	totalSize, reservedSize, numItems := h.cache.Stats()
+	totalSize, reservedSize, numItems := h.stats.Stats()
 
 	w.Header().Set("Content-Type", "application/json")
 	enc := json.NewEncoder(w)
 	enc.SetIndent("", " ")
 	enc.Encode(statusPageData{
-		MaxSize:      h.cache.MaxSize(),
+		MaxSize:      h.stats.MaxSize(),
 		CurrSize:     totalSize,
 		ReservedSize: reservedSize,
 		NumFiles:     numItems,
@@ -376,3 +380,23 @@ func (h *httpCache) StatusPageHandler(w http.ResponseWriter, r *http.Request) {
 func path(kind cache.EntryKind, hash string) string {
 	return fmt.Sprintf("/%s/%s", kind, hash)
}
+
+// reqCtxHttp adapts an http.Request to the cache.RequestContext interface.
+type reqCtxHttp struct {
+	request *http.Request
+}
+
+func newReqContextHttp(request *http.Request) *reqCtxHttp {
+	rc := &reqCtxHttp{
+		request: request,
+	}
+	return rc
+}
+
+func (h *reqCtxHttp) GetHeader(headerName string) (headerValues []string) {
+	// http.Header stores keys in canonical MIME form, and Header.Values
+	// canonicalizes its argument before the lookup, so a lowercase
+	// headerName matches.
+	if headerValues := h.request.Header.Values(headerName); headerValues != nil {
+		return headerValues
+	}
+	return []string{}
+}
diff --git a/server/http_test.go b/server/http_test.go
index 104b79093..524585af7 100644
--- a/server/http_test.go
+++ b/server/http_test.go
@@ -36,8 +36,7 @@ func TestDownloadFile(t *testing.T) {
 	}
 
 	c := disk.New(cacheDir, blobSize, nil)
-	h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "")
-
+	h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "")
 	req, err := http.NewRequest("GET", "/cas/"+hash, bytes.NewReader([]byte{}))
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -99,7 +98,7 @@ func TestUploadFilesConcurrently(t *testing.T) {
 	}
 
 	c := disk.New(cacheDir, 1000*1024, nil)
-	h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "")
+	h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "")
 	handler := http.HandlerFunc(h.CacheHandler)
 
 	var wg sync.WaitGroup
@@ -157,7 +156,7 @@ func TestUploadSameFileConcurrently(t *testing.T) {
 	numWorkers := 100
 
 	c := disk.New(cacheDir, int64(len(data)*numWorkers), nil)
-	h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "")
+	h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "")
 	handler := http.HandlerFunc(h.CacheHandler)
 
 	var wg sync.WaitGroup
@@ -203,7 +202,7 @@ func TestUploadCorruptedFile(t *testing.T) {
 	}
 
 	c := disk.New(cacheDir, 2048, nil)
-	h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "")
+	h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "")
 	rr := httptest.NewRecorder()
 	handler := http.HandlerFunc(h.CacheHandler)
 	handler.ServeHTTP(rr, r)
@@ -245,7 +244,7 @@ func TestUploadEmptyActionResult(t *testing.T) {
 	c := disk.New(cacheDir, 2048, nil)
 	validate := true
 	mangle := false
-	h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), validate, mangle, "")
+	h :=
NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), validate, mangle, "") rr := httptest.NewRecorder() handler := http.HandlerFunc(h.CacheHandler) handler.ServeHTTP(rr, r) @@ -302,7 +301,7 @@ func testEmptyBlobAvailable(t *testing.T, method string) { c := disk.New(cacheDir, 2048, nil) validate := true mangle := false - h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), validate, mangle, "") + h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), validate, mangle, "") rr := httptest.NewRecorder() handler := http.HandlerFunc(h.CacheHandler) handler.ServeHTTP(rr, r) @@ -325,7 +324,7 @@ func TestStatusPage(t *testing.T) { } c := disk.New(cacheDir, 2048, nil) - h := NewHTTPCache(c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") + h := NewHTTPCache(c, c, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") rr := httptest.NewRecorder() handler := http.HandlerFunc(h.StatusPageHandler) handler.ServeHTTP(rr, r) @@ -466,7 +465,7 @@ func TestRemoteReturnsNotFound(t *testing.T) { defer os.RemoveAll(cacheDir) emptyCache := disk.New(cacheDir, 1024, nil) - h := NewHTTPCache(emptyCache, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") + h := NewHTTPCache(emptyCache, emptyCache, testutils.NewSilentLogger(), testutils.NewSilentLogger(), true, false, "") // create a fake http.Request _, hash := testutils.RandomDataAndHash(1024) url, _ := url.Parse(fmt.Sprintf("http://localhost:8080/ac/%s", hash))