Skip to content
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
81623bc
CaptureResponseWriter
jranson Dec 16, 2025
1fecafe
move Target to own file, add HealthyTargets()
jranson Dec 16, 2025
917bf95
refactor tsm
jranson Dec 16, 2025
bc04b10
bugfix: regression - expand '*' path wildcard
jranson Dec 16, 2025
f710769
restore "stack" field to all log entries
jranson Dec 16, 2025
786f747
refactor cache name/id consts
jranson Dec 16, 2025
2783088
cleanup healthcheck
jranson Dec 16, 2025
9ac1329
refactor resources
jranson Dec 16, 2025
c084889
refactor first response / first good response
jranson Dec 16, 2025
6c5eede
add Pool() to rr mech
jranson Dec 16, 2025
4276722
refactor newest last modified
jranson Dec 16, 2025
c5d6ec2
add Pool() to ur mech
jranson Dec 16, 2025
681d87d
remove unused struct
jranson Dec 16, 2025
fe77a15
linter
jranson Dec 16, 2025
fc3351e
fix tests
jranson Dec 16, 2025
33fcb43
merge vectors
jranson Dec 16, 2025
1957dc3
add nil check
jranson Dec 16, 2025
d0c9791
Merge branch 'main' into jr/resource-consumption
jranson Dec 16, 2025
3557160
wording
jranson Dec 16, 2025
88c3bdb
cleanup comments
jranson Dec 17, 2025
c50c26b
respect timeout configs
jranson Dec 17, 2025
a8f1c75
update timeout defaults
jranson Dec 17, 2025
303867b
fix typo
jranson Dec 17, 2025
bcd26af
remove redundant responseWriter
jranson Dec 17, 2025
aa90f1c
simplify status aggregation
jranson Dec 17, 2025
c959edd
avoid data race
jranson Dec 17, 2025
14994d7
remove unnecessary cancels
jranson Dec 17, 2025
5e8db07
remove unnecessary cancels
jranson Dec 17, 2025
1234ef3
fix stray comment
jranson Dec 17, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions examples/conf/example.full.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -300,12 +300,12 @@ backends:
# compressable_types:
# - text/javascript, text/css, text/plain, text/xml, text/json, application/json, application/javascript, application/xml ]

# # timeout defines how long Trickster will wait before aborting and upstream http request. Default: 180s
# timeout: 180s
# # timeout defines how long Trickster will wait before aborting an upstream http request. Default: 60s
# timeout: 60s

# # keep_alive_timeout defines how long Trickster will wait before closing a keep-alive connection due to inactivity
# # if the origins keep-alive timeout is shorter than Tricksters, the connect will be closed sooner. Default: 300
# keep_alive_timeout: 5m
# # if the origins keep-alive timeout is shorter than Tricksters, the connect will be closed sooner. Default: 120s
# keep_alive_timeout: 120s

# # max_idle_conns set the maximum concurrent keep-alive connections Trickster may have opened to this backend
# # additional requests will be queued. Default: 20
Expand Down
4 changes: 2 additions & 2 deletions pkg/backends/alb/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func (c *Client) ValidateAndStartPool(clients backends.Backends, hcs healthcheck
if o.MechanismName == names.MechanismUR && o.UserRouter != nil {
return c.validateAndStartUserRouter(clients)
}
targets := make([]*pool.Target, 0, len(o.Pool))
targets := make(pool.Targets, 0, len(o.Pool))
for _, n := range o.Pool {
tc, ok := clients[n]
if !ok {
Expand All @@ -166,7 +166,7 @@ func (c *Client) ValidateAndStartPool(clients backends.Backends, hcs healthcheck
if !ok {
continue // virtual backends (rule, alb) don't currently have health checks
}
targets = append(targets, pool.NewTarget(tc.Router(), hc))
targets = append(targets, pool.NewTarget(tc.Router(), hc, tc))
}
if c.handler != nil {
c.handler.SetPool(pool.New(targets, o.HealthyFloor))
Expand Down
125 changes: 72 additions & 53 deletions pkg/backends/alb/mech/fr/first_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@
package fr

import (
"context"
"net/http"
"sync"
"sync/atomic"

"github.com/trickstercache/trickster/v2/pkg/backends/alb/mech/types"
"github.com/trickstercache/trickster/v2/pkg/backends/alb/names"
Expand All @@ -28,6 +30,7 @@ import (
"github.com/trickstercache/trickster/v2/pkg/proxy/handlers/trickster/failures"
"github.com/trickstercache/trickster/v2/pkg/proxy/headers"
"github.com/trickstercache/trickster/v2/pkg/proxy/request"
"github.com/trickstercache/trickster/v2/pkg/proxy/response/capture"
"github.com/trickstercache/trickster/v2/pkg/util/sets"
)

Expand Down Expand Up @@ -68,6 +71,10 @@ func (h *handler) SetPool(p pool.Pool) {
h.pool = p
}

func (h *handler) Pool() pool.Pool {
return h.pool
}

func (h *handler) ID() types.ID {
if h.fgr {
return FGRID
Expand Down Expand Up @@ -101,67 +108,79 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
// otherwise iterate the fanout
wc := newResponderClaim(l)
var claimed int64 = -1
contexts := make([]context.Context, l)
cancels := make([]context.CancelFunc, l)
for i := range l {
contexts[i], cancels[i] = context.WithCancel(r.Context())
}
captures := make([]*capture.CaptureResponseWriter, l)
var wg sync.WaitGroup
responseWritten := make(chan struct{}, 1)

serveAndCancelOthers := func(i int, crw *capture.CaptureResponseWriter) {
go func() {
// cancels all other contexts
for j, cancel := range cancels {
if j != i {
cancel()
}
}
}()
headers.Merge(w.Header(), crw.Header())
w.WriteHeader(crw.StatusCode())
w.Write(crw.Body())
responseWritten <- struct{}{}
}

// fanout to all healthy targets
for i := range l {
// only the one of these i fanouts to respond will be mapped back to the
// end user based on the methodology and the rest will have their
// contexts canceled
if hl[i] == nil {
continue
}
wg.Go(func() {
if hl[i] == nil {
return
}
wm := newFirstResponseGate(w, wc, i, h.fgr)
r2, _ := request.Clone(r)
r2 = r2.WithContext(wc.contexts[i])
hl[i].ServeHTTP(wm, r2)
r2 = r2.WithContext(contexts[i])
r2 = request.SetResources(r2, &request.Resources{Cancelable: true})
crw := capture.NewCaptureResponseWriter()
captures[i] = crw
hl[i].ServeHTTP(crw, r2)
statusCode := crw.StatusCode()
custom := h.fgr && len(h.fgrCodes) > 0
isGood := custom && h.fgrCodes.Contains(statusCode)
// this checks if the response qualifies as a client response
if (!h.fgr || (!custom && statusCode < 400) || isGood) &&
// this checks that the qualifying response is the first response
atomic.CompareAndSwapInt64(&claimed, -1, int64(i)) {
// this serves only the first qualifying response
serveAndCancelOthers(i, crw)
// this signals the response is written
}
})
}
wg.Wait()
}

type firstResponseGate struct {
http.ResponseWriter
i int
fh http.Header
c *responderClaim
fgr bool
fgrCodes sets.Set[int]
}

func newFirstResponseGate(w http.ResponseWriter, c *responderClaim, i int,
fgr bool,
) *firstResponseGate {
return &firstResponseGate{ResponseWriter: w, c: c, fh: http.Header{}, i: i, fgr: fgr}
}

func (frg *firstResponseGate) Header() http.Header {
return frg.fh
}

func (frg *firstResponseGate) WriteHeader(i int) {
custom := frg.fgr && len(frg.fgrCodes) > 0
var isGood bool
if custom {
_, isGood = frg.fgrCodes[i]
}
if (!frg.fgr || !custom && i < 400 || custom && isGood) && frg.c.Claim(int64(frg.i)) {
if len(frg.fh) > 0 {
headers.Merge(frg.ResponseWriter.Header(), frg.fh)
frg.fh = nil
// this is a fallback case for when no qualifying upstream response arrives,
// the first response is used, regardless of qualification
go func() {
wg.Wait()
// if claimed is still -1, the fallback case must be used
if atomic.LoadInt64(&claimed) == -1 && r.Context().Err() == nil {
// this iterates the captures and serves the first non-nil response
for i, crw := range captures {
if crw != nil {
serveAndCancelOthers(i, crw)
break
}
}
}
frg.ResponseWriter.WriteHeader(i)
return
}
}
}()

func (frg *firstResponseGate) Write(b []byte) (int, error) {
if frg.c.Claim(int64(frg.i)) {
if len(frg.fh) > 0 {
headers.Merge(frg.ResponseWriter.Header(), frg.fh)
frg.fh = nil
}
return frg.ResponseWriter.Write(b)
// this prevents ServeHTTP from returning until the response is fully
// written or the request context is canceled
select {
case <-responseWritten:
return
case <-r.Context().Done():
return
}
return len(b), nil
}
53 changes: 0 additions & 53 deletions pkg/backends/alb/mech/fr/responder_claim.go

This file was deleted.

48 changes: 0 additions & 48 deletions pkg/backends/alb/mech/fr/responder_claim_test.go

This file was deleted.

Loading
Loading