Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions rfq/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ type ManagerCfg struct {
// determine whether a quote is accepted or rejected.
PriceOracle PriceOracle

// PortfolioPilot is the portfolio pilot that will make financial
// decisions based on RFQ requests.
PortfolioPilot PortfolioPilot

// ChannelLister is the channel lister that the RFQ manager will use to
// determine the available channels for routing.
ChannelLister ChannelLister
Expand Down Expand Up @@ -267,6 +271,7 @@ func (m *Manager) startSubsystems(ctx context.Context) error {
// Initialise and start the quote negotiator.
m.negotiator, err = NewNegotiator(NegotiatorCfg{
PriceOracle: m.cfg.PriceOracle,
PortfolioPilot: m.cfg.PortfolioPilot,
OutgoingMessages: m.outgoingMessages,
AcceptPriceDeviationPpm: m.cfg.AcceptPriceDeviationPpm,
SkipAcceptQuotePriceCheck: m.cfg.SkipAcceptQuotePriceCheck,
Expand Down
153 changes: 48 additions & 105 deletions rfq/negotiator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ type NegotiatorCfg struct {
// determine whether a quote is accepted or rejected.
PriceOracle PriceOracle

// PortfolioPilot makes financial decisions when evaluating quotes.
PortfolioPilot PortfolioPilot

// OutgoingMessages is a channel which is populated with outgoing peer
// messages. These are messages which are destined to be sent to peers.
OutgoingMessages chan<- rfqmsg.OutgoingMsg
Expand Down Expand Up @@ -93,6 +96,18 @@ type Negotiator struct {

// NewNegotiator creates a new quote negotiator.
func NewNegotiator(cfg NegotiatorCfg) (*Negotiator, error) {
// If the portfolio pilot is not specified, then we will use the
// internal portfolio pilot.
if cfg.PortfolioPilot == nil {
cfgPortfolioPilot := InternalPortfolioPilotConfig{
PriceOracle: cfg.PriceOracle,
ForwardPeerIDToOracle: cfg.SendPeerId,
}
cfg.PortfolioPilot = NewInternalPortfolioPilot(
cfgPortfolioPilot,
)
}

return &Negotiator{
cfg: cfg,

Expand Down Expand Up @@ -319,77 +334,56 @@ func (n *Negotiator) HandleIncomingBuyRequest(
}
}

// Reject the quote request if a price oracle is unavailable.
if n.cfg.PriceOracle == nil {
msg := rfqmsg.NewReject(
request.Peer, request.ID,
rfqmsg.ErrPriceOracleUnavailable,
)
go sendOutgoingMsg(msg)
return nil
}

// Ensure that we have a suitable sell offer for the asset that is being
// requested. Here we can handle the case where this node does not wish
// to sell a particular asset.
offerAvailable := n.HasAssetSellOffer(
request.AssetSpecifier, request.AssetMaxAmt,
)
if !offerAvailable {
log.Infof("Would reject buy request: no suitable buy offer, " +
"but ignoring for now")

// TODO(ffranr): Re-enable pre-price oracle rejection (i.e.
// reject on missing offer)

// If we do not have a suitable sell offer, then we will reject
// the quote request with an error.
// reject := rfqmsg.NewReject(
// request.Peer, request.ID,
// rfqmsg.ErrNoSuitableSellOffer,
// )
// go sendOutgoingMsg(reject)
//
// return nil
}

// Query the price oracle asynchronously using a separate goroutine.
// The price oracle might be an external service, responses could be
// delayed.
n.Wg.Add(1)
go func() {
defer n.Wg.Done()

var peerID fn.Option[route.Vertex]
if n.cfg.SendPeerId {
peerID = fn.Some(request.Peer)
}
n.Goroutine(func() error {
ctx, cancel := n.WithCtxQuitNoTimeout()
defer cancel()

// Query the price oracle for a sale price.
assetRate, err := n.querySellFromPriceOracle(
request.AssetSpecifier, fn.Some(request.AssetMaxAmt),
fn.None[lnwire.MilliSatoshi](), request.AssetRateHint,
peerID, request.PriceOracleMetadata, IntentRecvPayment,
resp, err := n.cfg.PortfolioPilot.ResolveBuyRequest(
ctx, request,
)
if err != nil {
// Send a reject message to the peer.
return fmt.Errorf("resolve buy request: %w", err)
}

if resp.IsReject() {
reason := rfqmsg.ErrUnknownReject
resp.WhenReject(func(err rfqmsg.RejectErr) {
reason = err
})

msg := rfqmsg.NewReject(
request.Peer, request.ID, reason,
)
sendOutgoingMsg(msg)
return nil
}

var assetRate *rfqmsg.AssetRate
resp.WhenAccept(func(rate rfqmsg.AssetRate) {
assetRate = &rate
})
if assetRate == nil {
msg := rfqmsg.NewReject(
request.Peer, request.ID,
rfqmsg.ErrUnknownReject,
)
sendOutgoingMsg(msg)

// Add an error to the error channel and return.
err = fmt.Errorf("failed to query sell price from "+
"oracle: %w", err)
n.cfg.ErrChan <- err
return
return fmt.Errorf("resolve buy request: missing " +
"asset rate on accept decision")
}

// Construct and send a buy accept message.
msg := rfqmsg.NewBuyAcceptFromRequest(request, *assetRate)
sendOutgoingMsg(msg)
}()

return nil
}, func(err error) {
n.cfg.ErrChan <- err
})

return nil
}
Expand Down Expand Up @@ -1003,57 +997,6 @@ func (n *Negotiator) RemoveAssetSellOffer(assetID *asset.ID,
return nil
}

// HasAssetSellOffer returns true if the negotiator has an asset sell offer
// which matches the given asset ID/group and asset amount.
//
// TODO(ffranr): This method should return errors which can be used to
// differentiate between a missing offer and an invalid offer.
func (n *Negotiator) HasAssetSellOffer(assetSpecifier asset.Specifier,
assetAmt uint64) bool {

// If the asset group key is not nil, then we will use it as the key for
// the offer. Otherwise, we will use the asset ID as the key.
var sellOffer *SellOffer

assetSpecifier.WhenGroupPubKey(func(assetGroupKey btcec.PublicKey) {
keyFixedBytes := asset.ToSerialized(&assetGroupKey)
offer, ok := n.assetGroupSellOffers.Load(keyFixedBytes)
if !ok {
// Corresponding offer not found.
return
}

sellOffer = &offer
})

assetSpecifier.WhenId(func(assetID asset.ID) {
offer, ok := n.assetSellOffers.Load(assetID)
if !ok {
// Corresponding offer not found.
return
}

sellOffer = &offer
})

// We should never have a nil sell offer at this point. Check added here
// for robustness.
if sellOffer == nil {
return false
}

// If the asset amount is greater than the maximum asset amount under
// offer, then we will return false (we do not have a suitable offer).
if assetAmt > sellOffer.MaxUnits {
log.Warnf("asset amount is greater than sell offer max units "+
"(asset_amt=%d, sell_offer_max_units=%d)", assetAmt,
sellOffer.MaxUnits)
return false
}

return true
}

// BuyOffer is a struct that represents an asset buy offer. This data structure
// describes the maximum amount of an asset that this node is willing to
// purchase.
Expand Down
142 changes: 142 additions & 0 deletions rfq/portfolio_pilot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
package rfq

import (
"context"
"fmt"

"github.com/lightninglabs/taproot-assets/fn"
"github.com/lightninglabs/taproot-assets/rfqmsg"
"github.com/lightningnetwork/lnd/lnwire"
"github.com/lightningnetwork/lnd/routing/route"
)

// ResolveResp captures the portfolio pilot's resolution decision for an RFQ. It
// carries either an accepted asset rate quote or a structured rejection reason.
type ResolveResp struct {
// outcome holds either the accepted asset rate (left) or the rejection
// error (right).
outcome fn.Either[rfqmsg.AssetRate, rfqmsg.RejectErr]
}

// NewAcceptResolveResp builds an acceptance response with the provided asset
// rate quote.
func NewAcceptResolveResp(assetRate rfqmsg.AssetRate) ResolveResp {
return ResolveResp{
outcome: fn.NewLeft[rfqmsg.AssetRate, rfqmsg.RejectErr](
assetRate,
),
}
}

// NewRejectResolveResp builds a rejection response that explains why a quote
// cannot be provided.
func NewRejectResolveResp(rejectErr rfqmsg.RejectErr) ResolveResp {
return ResolveResp{
outcome: fn.NewRight[rfqmsg.AssetRate, rfqmsg.RejectErr](
rejectErr,
),
}
}

// IsAccept reports whether the response contains an accepted asset rate.
func (r *ResolveResp) IsAccept() bool {
return r.outcome.IsLeft()
}

// IsReject reports whether the response contains a rejection error.
func (r *ResolveResp) IsReject() bool {
return r.outcome.IsRight()
}

// WhenAccept executes the callback with the asset rate when the response is an
// acceptance and does nothing otherwise.
func (r *ResolveResp) WhenAccept(pred func(rate rfqmsg.AssetRate)) {
r.outcome.WhenLeft(pred)
}

// WhenReject executes the callback with the rejection error when the response
// is a rejection and does nothing otherwise.
func (r *ResolveResp) WhenReject(pred func(err rfqmsg.RejectErr)) {
r.outcome.WhenRight(pred)
}

// PortfolioPilot evaluates RFQs and returns either an accepted asset rate quote
// or a rejection reason.
type PortfolioPilot interface {
// ResolveBuyRequest resolves a buy request by returning either an
// acceptable asset rate or a quote rejection error. Errors are reserved
// for unexpected failures while evaluating the request.
ResolveBuyRequest(context.Context, rfqmsg.BuyRequest) (ResolveResp,
error)
}

// InternalPortfolioPilotConfig holds settings for the built-in pilot that uses
// a price oracle for pricing decisions.
type InternalPortfolioPilotConfig struct {
// PriceOracle supplies pricing data. If nil, the pilot rejects requests
// as the oracle is considered unavailable.
PriceOracle PriceOracle

// ForwardPeerIDToOracle controls whether the requesting peer ID is sent
// to the oracle for peer-specific pricing. Disabling this avoids
// sharing caller identity with the oracle.
ForwardPeerIDToOracle bool
}

// InternalPortfolioPilot is the built-in RFQ decision logic that delegates
// pricing to an external price oracle.
type InternalPortfolioPilot struct {
// cfg holds settings and supporting components for the portfolio pilot.
cfg InternalPortfolioPilotConfig
}

// NewInternalPortfolioPilot constructs a new pilot using the provided config.
func NewInternalPortfolioPilot(
cfg InternalPortfolioPilotConfig) *InternalPortfolioPilot {

return &InternalPortfolioPilot{
cfg: cfg,
}
}

// ResolveBuyRequest resolves a buy request by querying the configured price
// oracle. It accepts with a quote when a valid rate is returned, rejects when
// no oracle is configured, and errors on unexpected oracle failures or missing
// rates.
func (p *InternalPortfolioPilot) ResolveBuyRequest(ctx context.Context,
request rfqmsg.BuyRequest) (ResolveResp, error) {

var zero ResolveResp

peerID := fn.None[route.Vertex]()
if p.cfg.ForwardPeerIDToOracle {
peerID = fn.Some(request.Peer)
}

if p.cfg.PriceOracle == nil {
return NewRejectResolveResp(
rfqmsg.ErrPriceOracleUnavailable,
), nil
}

resp, err := p.cfg.PriceOracle.QuerySellPrice(
ctx, request.AssetSpecifier, fn.Some(request.AssetMaxAmt),
fn.None[lnwire.MilliSatoshi](), request.AssetRateHint, peerID,
request.PriceOracleMetadata, IntentRecvPayment,
)
if err != nil {
return zero, fmt.Errorf("query sell price: %w", err)
}

if resp.Err != nil {
return zero, fmt.Errorf("price oracle returned error: %w",
resp.Err)
}

if resp.AssetRate.Rate.Coefficient.ToUint64() == 0 {
return zero, fmt.Errorf("price oracle did not specify an " +
"asset rate")
}

return NewAcceptResolveResp(resp.AssetRate), nil
}
Loading