diff --git a/api/client.go b/api/client.go index 4c53ee4..87ad4c3 100644 --- a/api/client.go +++ b/api/client.go @@ -5,30 +5,40 @@ import ( "net/http" "github.com/JackalLabs/sequoia/api/types" + "github.com/JackalLabs/sequoia/rpc" sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/desmos-labs/cosmos-go-wallet/client" storageTypes "github.com/jackalLabs/canine-chain/v5/x/storage/types" "github.com/rs/zerolog/log" ) -func SpaceHandler(c *client.Client, address string) func(http.ResponseWriter, *http.Request) { +func SpaceHandler(fc *rpc.FailoverClient) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { - queryClient := storageTypes.NewQueryClient(c.GRPCConn) + queryClient := storageTypes.NewQueryClient(fc.GRPCConn()) + address := fc.AccAddress() params := &storageTypes.QueryProvider{ Address: address, } res, err := queryClient.Provider(context.Background(), params) if err != nil { - v := types.ErrorResponse{ - Error: err.Error(), + if rpc.IsConnectionError(err) { + log.Warn().Err(err).Msg("Connection error querying provider, attempting failover") + if fc.Failover() { + // Retry with new connection + queryClient = storageTypes.NewQueryClient(fc.GRPCConn()) + res, err = queryClient.Provider(context.Background(), params) + } } - w.WriteHeader(http.StatusInternalServerError) - err = json.NewEncoder(w).Encode(v) if err != nil { - log.Error().Err(err) + v := types.ErrorResponse{ + Error: err.Error(), + } + w.WriteHeader(http.StatusInternalServerError) + if encErr := json.NewEncoder(w).Encode(v); encErr != nil { + log.Error().Err(encErr).Msg("Failed to encode error response") + } + return } - return } totalSpace := res.Provider.Totalspace @@ -38,15 +48,24 @@ func SpaceHandler(c *client.Client, address string) func(http.ResponseWriter, *h } fsres, err := queryClient.FreeSpace(context.Background(), fsparams) if err != nil { - v := types.ErrorResponse{ - Error: err.Error(), + if rpc.IsConnectionError(err) { + 
log.Warn().Err(err).Msg("Connection error querying free space, attempting failover") + if fc.Failover() { + // Retry with new connection + queryClient = storageTypes.NewQueryClient(fc.GRPCConn()) + fsres, err = queryClient.FreeSpace(context.Background(), fsparams) + } } - w.WriteHeader(http.StatusInternalServerError) - err = json.NewEncoder(w).Encode(v) if err != nil { - log.Error().Err(err) + v := types.ErrorResponse{ + Error: err.Error(), + } + w.WriteHeader(http.StatusInternalServerError) + if encErr := json.NewEncoder(w).Encode(v); encErr != nil { + log.Error().Err(encErr).Msg("Failed to encode error response") + } + return } - return } freeSpace := fsres.Space diff --git a/api/file_handler.go b/api/file_handler.go index a323a52..05eec43 100644 --- a/api/file_handler.go +++ b/api/file_handler.go @@ -15,6 +15,7 @@ import ( "time" "github.com/JackalLabs/sequoia/api/gateway" + "github.com/JackalLabs/sequoia/rpc" sequoiaTypes "github.com/JackalLabs/sequoia/types" "github.com/JackalLabs/sequoia/utils" @@ -23,8 +24,6 @@ import ( "github.com/JackalLabs/sequoia/proofs" - "github.com/desmos-labs/cosmos-go-wallet/wallet" - "github.com/JackalLabs/sequoia/api/types" "github.com/JackalLabs/sequoia/file_system" "github.com/gorilla/mux" @@ -45,7 +44,7 @@ func handleErr(err error, w http.ResponseWriter, code int) { } } -func PostFileHandler(fio *file_system.FileSystem, prover *proofs.Prover, wl *wallet.Wallet, chunkSize int64) func(http.ResponseWriter, *http.Request) { +func PostFileHandler(fio *file_system.FileSystem, prover *proofs.Prover, fc *rpc.FailoverClient, chunkSize int64) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { // Use streaming multipart parsing instead of loading entire form into memory sender, merkleString, startBlockString, proofTypeString, file, _, err := parseMultipartFormStreaming(req) @@ -80,7 +79,7 @@ func PostFileHandler(fio *file_system.FileSystem, prover *proofs.Prover, wl *wal // Size validation 
is now enforced during multipart streaming // Files larger than MaxFileSize (32GB) will be rejected immediately - cl := storageTypes.NewQueryClient(wl.Client.GRPCConn) + cl := storageTypes.NewQueryClient(fc.GRPCConn()) queryParams := storageTypes.QueryFile{ Merkle: merkle, Owner: sender, @@ -100,7 +99,7 @@ func PostFileHandler(fio *file_system.FileSystem, prover *proofs.Prover, wl *wal } if len(f.Proofs) == int(f.MaxProofs) { - if !f.ContainsProver(wl.AccAddress()) { + if !f.ContainsProver(fc.AccAddress()) { handleErr(fmt.Errorf("cannot accept file that I cannot claim"), w, http.StatusInternalServerError) return } @@ -133,7 +132,7 @@ func PostFileHandler(fio *file_system.FileSystem, prover *proofs.Prover, wl *wal } } -func PostFileHandlerV2(fio *file_system.FileSystem, prover *proofs.Prover, wl *wallet.Wallet, chunkSize int64) func(http.ResponseWriter, *http.Request) { +func PostFileHandlerV2(fio *file_system.FileSystem, prover *proofs.Prover, fc *rpc.FailoverClient, chunkSize int64) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { // Use streaming multipart parsing instead of loading entire form into memory sender, merkleString, startBlockString, _, file, _, err := parseMultipartFormStreaming(req) @@ -187,7 +186,7 @@ func PostFileHandlerV2(fio *file_system.FileSystem, prover *proofs.Prover, wl *w log.Error().Err(fmt.Errorf("can't encode json : %w", err)) } - cl := storageTypes.NewQueryClient(wl.Client.GRPCConn) + cl := storageTypes.NewQueryClient(fc.GRPCConn()) queryParams := storageTypes.QueryFile{ Merkle: merkle, Owner: sender, @@ -211,7 +210,7 @@ func PostFileHandlerV2(fio *file_system.FileSystem, prover *proofs.Prover, wl *w } if len(f.Proofs) == int(f.MaxProofs) { - if !f.ContainsProver(wl.AccAddress()) { + if !f.ContainsProver(fc.AccAddress()) { log.Error().Err(fmt.Errorf("cannot accept file that I cannot claim")) up.Status = "Error: Can't claim" return @@ -413,7 +412,7 @@ func getFolderData(data io.Reader) 
(*sequoiaTypes.FolderData, bool) { // getMerkleData retrieves file data by merkle hash, first attempting local storage and then querying network providers if not found locally. // It returns the file data if successful, or an error if the file cannot be retrieved from any source. -func getMerkleData(merkle []byte, fileName string, f *file_system.FileSystem, wallet *wallet.Wallet, myIp string) (io.ReadSeekCloser, error) { +func getMerkleData(merkle []byte, fileName string, f *file_system.FileSystem, fc *rpc.FailoverClient, myIp string) (io.ReadSeekCloser, error) { file, err := f.GetFileData(merkle) if err == nil { return file, nil @@ -425,7 +424,7 @@ func getMerkleData(merkle []byte, fileName string, f *file_system.FileSystem, wa Merkle: merkle, } - cl := storageTypes.NewQueryClient(wallet.Client.GRPCConn) + cl := storageTypes.NewQueryClient(fc.GRPCConn()) res, err := cl.FindFile(context.Background(), queryParams) if err != nil { @@ -470,10 +469,10 @@ func getMerkleData(merkle []byte, fileName string, f *file_system.FileSystem, wa // GetMerklePathData recursively resolves a file or folder by traversing a path from a root merkle hash. // If the path leads to a file, returns its data; if it leads to a folder and raw is false, returns an HTML representation of the folder. // Returns the file or folder data, the resolved filename, and an error if the path is invalid or data retrieval fails. 
-func GetMerklePathData(root []byte, path []string, fileName string, f *file_system.FileSystem, wallet *wallet.Wallet, myIp string, currentPath string, raw bool) (io.ReadSeekCloser, string, error) { +func GetMerklePathData(root []byte, path []string, fileName string, f *file_system.FileSystem, fc *rpc.FailoverClient, myIp string, currentPath string, raw bool) (io.ReadSeekCloser, string, error) { currentRoot := root - fileData, err := getMerkleData(currentRoot, fileName, f, wallet, myIp) + fileData, err := getMerkleData(currentRoot, fileName, f, fc, myIp) if err != nil { return nil, fileName, err } @@ -491,7 +490,7 @@ func GetMerklePathData(root []byte, path []string, fileName string, f *file_syst for _, child := range children { if child.Name == p { - return GetMerklePathData(child.Merkle, path[1:], child.Name, f, wallet, myIp, currentPath, raw) // check the next item in the list + return GetMerklePathData(child.Merkle, path[1:], child.Name, f, fc, myIp, currentPath, raw) // check the next item in the list } } // did not find child @@ -522,7 +521,7 @@ func GetMerklePathData(root []byte, path []string, fileName string, f *file_syst // FindFileHandler returns an HTTP handler that serves files or folders by merkle hash and optional path, supporting raw or HTML folder views. // // The handler extracts the merkle hash and optional path from the request, resolves the requested file or folder (recursively if a path is provided), and serves the content. If the target is a folder and the `raw` query parameter is not set, an HTML representation is generated. If a filename is not specified, the merkle string is used as the default name. Errors are returned as JSON responses. 
-func FindFileHandler(f *file_system.FileSystem, wallet *wallet.Wallet, myIp string) func(http.ResponseWriter, *http.Request) { +func FindFileHandler(f *file_system.FileSystem, fc *rpc.FailoverClient, myIp string) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { vars := mux.Vars(req) fileName := req.URL.Query().Get("filename") @@ -555,7 +554,7 @@ func FindFileHandler(f *file_system.FileSystem, wallet *wallet.Wallet, myIp stri } if len(filteredPaths) > 0 { - data, name, err := GetMerklePathData(merkle, filteredPaths, fileName, f, wallet, myIp, req.URL.Path, raw) + data, name, err := GetMerklePathData(merkle, filteredPaths, fileName, f, fc, myIp, req.URL.Path, raw) if err != nil { v := types.ErrorResponse{ Error: err.Error(), @@ -572,7 +571,7 @@ func FindFileHandler(f *file_system.FileSystem, wallet *wallet.Wallet, myIp stri // This code will only run if there's no path or the path is empty - fileData, err := getMerkleData(merkle, fileName, f, wallet, myIp) + fileData, err := getMerkleData(merkle, fileName, f, fc, myIp) if err != nil { v := types.ErrorResponse{ Error: err.Error(), diff --git a/api/index.go b/api/index.go index ce8cae9..5c1ae3d 100644 --- a/api/index.go +++ b/api/index.go @@ -5,9 +5,9 @@ import ( "net/http" "github.com/JackalLabs/sequoia/config" + "github.com/JackalLabs/sequoia/rpc" "github.com/JackalLabs/sequoia/api/types" - "github.com/desmos-labs/cosmos-go-wallet/wallet" "github.com/rs/zerolog/log" ) @@ -26,9 +26,9 @@ func IndexHandler(address string) func(http.ResponseWriter, *http.Request) { } } -func VersionHandler(wallet *wallet.Wallet) func(http.ResponseWriter, *http.Request) { +func VersionHandler(fc *rpc.FailoverClient) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { - chainId, err := wallet.Client.GetChainID() + chainId, err := fc.Wallet().Client.GetChainID() if err != nil { w.WriteHeader(500) return @@ -48,15 +48,15 @@ func 
VersionHandler(wallet *wallet.Wallet) func(http.ResponseWriter, *http.Reque } } -func NetworkHandler(wallet *wallet.Wallet) func(http.ResponseWriter, *http.Request) { +func NetworkHandler(fc *rpc.FailoverClient) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, req *http.Request) { - status, err := wallet.Client.RPCClient.Status(context.Background()) + status, err := fc.RPCClient().Status(context.Background()) if err != nil { w.WriteHeader(500) return } - grpcStatus := wallet.Client.GRPCConn.GetState() + grpcStatus := fc.GRPCConn().GetState() v := types.NetworkResponse{ GRPCStatus: grpcStatus.String(), diff --git a/api/server.go b/api/server.go index c86a23f..7410a27 100644 --- a/api/server.go +++ b/api/server.go @@ -13,12 +13,12 @@ import ( "github.com/rs/cors" "github.com/JackalLabs/sequoia/file_system" + "github.com/JackalLabs/sequoia/rpc" "github.com/prometheus/client_golang/prometheus/promhttp" "github.com/JackalLabs/sequoia/proofs" "github.com/rs/zerolog/log" - "github.com/desmos-labs/cosmos-go-wallet/wallet" "github.com/gorilla/mux" jsoniter "github.com/json-iterator/go" @@ -47,29 +47,29 @@ func (a *API) Close() error { return a.srv.Close() } -func (a *API) Serve(f *file_system.FileSystem, p *proofs.Prover, wallet *wallet.Wallet, chunkSize int64, myIp string) { +func (a *API) Serve(f *file_system.FileSystem, p *proofs.Prover, fc *rpc.FailoverClient, chunkSize int64, myIp string) { defer log.Info().Msg("API module stopped") r := mux.NewRouter() outline := types.NewOutline() - outline.RegisterGetRoute(r, "/", IndexHandler(wallet.AccAddress())) + outline.RegisterGetRoute(r, "/", IndexHandler(fc.AccAddress())) - outline.RegisterPostRoute(r, "/upload", PostFileHandler(f, p, wallet, chunkSize)) - outline.RegisterPostRoute(r, "/v2/upload", PostFileHandlerV2(f, p, wallet, chunkSize)) + outline.RegisterPostRoute(r, "/upload", PostFileHandler(f, p, fc, chunkSize)) + outline.RegisterPostRoute(r, "/v2/upload", PostFileHandlerV2(f, p, fc, 
chunkSize)) outline.RegisterPostRoute(r, "/v2/status/{id}", CheckUploadStatus()) outline.RegisterPostRoute(r, "/api/jobs", ListJobsHandler()) outline.RegisterGetRoute(r, "/download/{merkle}", DownloadFileHandler(f)) if a.cfg.OpenGateway { - outline.RegisterGetRoute(r, "/get/{merkle}/{path:.*}", FindFileHandler(f, wallet, myIp)) - outline.RegisterGetRoute(r, "/get/{merkle}", FindFileHandler(f, wallet, myIp)) + outline.RegisterGetRoute(r, "/get/{merkle}/{path:.*}", FindFileHandler(f, fc, myIp)) + outline.RegisterGetRoute(r, "/get/{merkle}", FindFileHandler(f, fc, myIp)) } outline.RegisterGetRoute(r, "/list", ListFilesHandler(f)) outline.RegisterGetRoute(r, "/api/client/list", ListFilesHandler(f)) outline.RegisterGetRoute(r, "/api/data/fids", LegacyListFilesHandler(f)) - outline.RegisterGetRoute(r, "/api/client/space", SpaceHandler(wallet.Client, wallet.AccAddress())) + outline.RegisterGetRoute(r, "/api/client/space", SpaceHandler(fc)) outline.RegisterGetRoute(r, "/ipfs/peers", IPFSListPeers(f)) outline.RegisterGetRoute(r, "/ipfs/hosts", IPFSListHosts(f)) @@ -79,8 +79,8 @@ func (a *API) Serve(f *file_system.FileSystem, p *proofs.Prover, wallet *wallet. 
// outline.RegisterGetRoute(r, "/dump", DumpDBHandler(f)) - outline.RegisterGetRoute(r, "/version", VersionHandler(wallet)) - outline.RegisterGetRoute(r, "/network", NetworkHandler(wallet)) + outline.RegisterGetRoute(r, "/version", VersionHandler(fc)) + outline.RegisterGetRoute(r, "/network", NetworkHandler(fc)) outline.RegisterGetRoute(r, "/api", outline.OutlineHandler()) diff --git a/cmd/wallet/wallet.go b/cmd/wallet/wallet.go index a334651..bed4fde 100644 --- a/cmd/wallet/wallet.go +++ b/cmd/wallet/wallet.go @@ -65,16 +65,16 @@ func balanceCMD() *cobra.Command { return err } - wallet, err := config.InitWallet(home) + fc, err := config.InitWallet(home) if err != nil { return err } - queryClient := bankTypes.NewQueryClient(wallet.Client.GRPCConn) + queryClient := bankTypes.NewQueryClient(fc.GRPCConn()) params := &bankTypes.QueryBalanceRequest{ Denom: "ujkl", - Address: wallet.AccAddress(), + Address: fc.AccAddress(), } res, err := queryClient.Balance(context.Background(), params) @@ -125,7 +125,7 @@ func withdrawCMD() *cobra.Command { &m, ).WithGasAuto().WithFeeAuto() - res, err := wallet.BroadcastTxCommit(data) + res, err := wallet.Wallet().BroadcastTxCommit(data) if err != nil { return err } diff --git a/config/types.go b/config/types.go index 562f804..3cbd4c4 100644 --- a/config/types.go +++ b/config/types.go @@ -10,13 +10,44 @@ type Seed struct { DerivationPath string `json:"derivation_path"` } -// required for the mapstructure tag +// ChainConfig holds the configuration for connecting to a blockchain node. +// It supports both single-node (RPCAddr/GRPCAddr) and multi-node failover +// (RPCAddrs/GRPCAddrs) configurations. If the array fields are set, they +// take precedence over the single address fields. 
type ChainConfig struct { - Bech32Prefix string `yaml:"bech32_prefix" mapstructure:"bech32_prefix"` - RPCAddr string `yaml:"rpc_addr" mapstructure:"rpc_addr"` - GRPCAddr string `yaml:"grpc_addr" mapstructure:"grpc_addr"` - GasPrice string `yaml:"gas_price" mapstructure:"gas_price"` - GasAdjustment float64 `yaml:"gas_adjustment" mapstructure:"gas_adjustment"` + Bech32Prefix string `yaml:"bech32_prefix" mapstructure:"bech32_prefix"` + RPCAddr string `yaml:"rpc_addr" mapstructure:"rpc_addr"` + GRPCAddr string `yaml:"grpc_addr" mapstructure:"grpc_addr"` + RPCAddrs []string `yaml:"rpc_addrs" mapstructure:"rpc_addrs"` + GRPCAddrs []string `yaml:"grpc_addrs" mapstructure:"grpc_addrs"` + GasPrice string `yaml:"gas_price" mapstructure:"gas_price"` + GasAdjustment float64 `yaml:"gas_adjustment" mapstructure:"gas_adjustment"` +} + +// GetRPCAddrs returns the list of RPC addresses to use. +// If RPCAddrs is set, it returns that. Otherwise, it returns a single-element +// slice containing RPCAddr for backward compatibility. +func (c ChainConfig) GetRPCAddrs() []string { + if len(c.RPCAddrs) > 0 { + return c.RPCAddrs + } + if c.RPCAddr != "" { + return []string{c.RPCAddr} + } + return []string{"http://localhost:26657"} +} + +// GetGRPCAddrs returns the list of GRPC addresses to use. +// If GRPCAddrs is set, it returns that. Otherwise, it returns a single-element +// slice containing GRPCAddr for backward compatibility. +func (c ChainConfig) GetGRPCAddrs() []string { + if len(c.GRPCAddrs) > 0 { + return c.GRPCAddrs + } + if c.GRPCAddr != "" { + return []string{c.GRPCAddr} + } + return []string{"localhost:9090"} } type Config struct { @@ -171,8 +202,8 @@ func (c Config) MarshalZerologObject(e *zerolog.Event) { Int64("StrayCheckInterval", c.StrayManagerCfg.CheckInterval). Int64("StrayRefreshInterval", c.StrayManagerCfg.RefreshInterval). Int("StrayHandCount", c.StrayManagerCfg.HandCount). - Str("ChainRPCAddr", c.ChainCfg.RPCAddr). - Str("ChainGRPCAddr", c.ChainCfg.GRPCAddr). 
+ Strs("ChainRPCAddrs", c.ChainCfg.GetRPCAddrs()). + Strs("ChainGRPCAddrs", c.ChainCfg.GetGRPCAddrs()). Str("ChainGasPrice", c.ChainCfg.GasPrice). Float64("ChainGasAdjustment", c.ChainCfg.GasAdjustment). Str("IP", c.Ip). diff --git a/config/wallet.go b/config/wallet.go index f46998d..952d41e 100644 --- a/config/wallet.go +++ b/config/wallet.go @@ -5,10 +5,8 @@ import ( "os" "path" - sequoiaWallet "github.com/JackalLabs/sequoia/wallet" + "github.com/JackalLabs/sequoia/rpc" bip39 "github.com/cosmos/go-bip39" - "github.com/desmos-labs/cosmos-go-wallet/types" - "github.com/desmos-labs/cosmos-go-wallet/wallet" jsoniter "github.com/json-iterator/go" @@ -79,7 +77,7 @@ func createWallet(directory string) error { return nil } -func InitWallet(home string) (*wallet.Wallet, error) { +func InitWallet(home string) (*rpc.FailoverClient, error) { directory := os.ExpandEnv(home) err := os.MkdirAll(directory, os.ModePerm) @@ -92,10 +90,18 @@ func InitWallet(home string) (*wallet.Wallet, error) { return nil, err } + nodeCfg := rpc.NodeConfig{ + Bech32Prefix: config.ChainCfg.Bech32Prefix, + RPCAddrs: config.ChainCfg.GetRPCAddrs(), + GRPCAddrs: config.ChainCfg.GetGRPCAddrs(), + GasPrice: config.ChainCfg.GasPrice, + GasAdjustment: config.ChainCfg.GasAdjustment, + } + legacyWallet, err := detectLegacyWallet(home) if err == nil { log.Info().Msg("legacy wallet detected") - return sequoiaWallet.CreateWalletPrivKey(legacyWallet.Key, types.ChainConfig(config.ChainCfg)) + return rpc.NewFailoverClientWithPrivKey(nodeCfg, legacyWallet.Key) } err = createWallet(directory) @@ -113,7 +119,7 @@ func InitWallet(home string) (*wallet.Wallet, error) { return nil, err } - return sequoiaWallet.CreateWallet(seed.SeedPhrase, seed.DerivationPath, types.ChainConfig(config.ChainCfg)) + return rpc.NewFailoverClient(nodeCfg, seed.SeedPhrase, seed.DerivationPath) } // returns LegacyWallet if "priv_storkey.json" is found at sequoia home directory, diff --git a/core/app.go b/core/app.go index fc31b9b..684181d 
100644 --- a/core/app.go +++ b/core/app.go @@ -32,9 +32,9 @@ import ( "github.com/JackalLabs/sequoia/config" "github.com/JackalLabs/sequoia/proofs" "github.com/JackalLabs/sequoia/queue" + "github.com/JackalLabs/sequoia/rpc" "github.com/JackalLabs/sequoia/strays" walletTypes "github.com/desmos-labs/cosmos-go-wallet/types" - "github.com/desmos-labs/cosmos-go-wallet/wallet" storageTypes "github.com/jackalLabs/canine-chain/v5/x/storage/types" "github.com/rs/zerolog/log" ) @@ -48,7 +48,7 @@ type App struct { home string monitor *monitoring.Monitor fileSystem *file_system.FileSystem - wallet *wallet.Wallet + wallet *rpc.FailoverClient } // NewApp initializes and returns a new App instance using the provided home directory. @@ -118,20 +118,26 @@ func NewApp(home string) (*App, error) { }, nil } -func initProviderOnChain(wallet *wallet.Wallet, ip string, totalSpace int64) error { - init := storageTypes.NewMsgInitProvider(wallet.AccAddress(), ip, totalSpace, "") +func initProviderOnChain(fc *rpc.FailoverClient, ip string, totalSpace int64) error { + w := fc.Wallet() + init := storageTypes.NewMsgInitProvider(w.AccAddress(), ip, totalSpace, "") data := walletTypes.NewTransactionData( init, ).WithGasAuto().WithFeeAuto() - builder, err := wallet.BuildTx(data) + builder, err := w.BuildTx(data) if err != nil { return err } - res, err := wallet.Client.BroadcastTxCommit(builder.GetTx()) + res, err := w.Client.BroadcastTxCommit(builder.GetTx()) if err != nil { + if rpc.IsConnectionError(err) { + if fc.Failover() { + return initProviderOnChain(fc, ip, totalSpace) + } + } return err } @@ -140,20 +146,26 @@ func initProviderOnChain(wallet *wallet.Wallet, ip string, totalSpace int64) err return nil } -func updateSpace(wallet *wallet.Wallet, totalSpace int64) error { - init := storageTypes.NewMsgSetProviderTotalSpace(wallet.AccAddress(), totalSpace) +func updateSpace(fc *rpc.FailoverClient, totalSpace int64) error { + w := fc.Wallet() + init := 
storageTypes.NewMsgSetProviderTotalSpace(w.AccAddress(), totalSpace) data := walletTypes.NewTransactionData( init, ).WithGasAuto().WithFeeAuto() - builder, err := wallet.BuildTx(data) + builder, err := w.BuildTx(data) if err != nil { return err } - res, err := wallet.Client.BroadcastTxCommit(builder.GetTx()) + res, err := w.Client.BroadcastTxCommit(builder.GetTx()) if err != nil { + if rpc.IsConnectionError(err) { + if fc.Failover() { + return updateSpace(fc, totalSpace) + } + } return err } @@ -162,20 +174,26 @@ func updateSpace(wallet *wallet.Wallet, totalSpace int64) error { return nil } -func updateIp(wallet *wallet.Wallet, ip string) error { - init := storageTypes.NewMsgSetProviderIP(wallet.AccAddress(), ip) +func updateIp(fc *rpc.FailoverClient, ip string) error { + w := fc.Wallet() + init := storageTypes.NewMsgSetProviderIP(w.AccAddress(), ip) data := walletTypes.NewTransactionData( init, ).WithGasAuto().WithFeeAuto() - builder, err := wallet.BuildTx(data) + builder, err := w.BuildTx(data) if err != nil { return err } - res, err := wallet.Client.BroadcastTxCommit(builder.GetTx()) + res, err := w.Client.BroadcastTxCommit(builder.GetTx()) if err != nil { + if rpc.IsConnectionError(err) { + if fc.Failover() { + return updateIp(fc, ip) + } + } return err } @@ -212,12 +230,17 @@ func (a *App) Start() error { Address: myAddress, } - cl := storageTypes.NewQueryClient(a.wallet.Client.GRPCConn) + cl := storageTypes.NewQueryClient(a.wallet.GRPCConn()) claimers := make([]string, 0) res, err := cl.Provider(context.Background(), queryParams) if err != nil { + if rpc.IsConnectionError(err) { + if a.wallet.Failover() { + return a.Start() // Retry with new connection + } + } log.Info().Err(err).Msg("Provider does not exist on network or is not connected...") err := initProviderOnChain(a.wallet, cfg.Ip, cfg.TotalSpace) if err != nil { @@ -251,7 +274,7 @@ func (a *App) Start() error { } } - params, err := a.GetStorageParams(a.wallet.Client.GRPCConn) + params, err := 
a.GetStorageParams(a.wallet.GRPCConn()) if err != nil { return err } @@ -304,7 +327,7 @@ func (a *App) Start() error { func (a *App) ConnectPeers() { log.Info().Msg("Starting IPFS Peering cycle...") ctx := context.Background() - queryClient := storageTypes.NewQueryClient(a.wallet.Client.GRPCConn) + queryClient := storageTypes.NewQueryClient(a.wallet.GRPCConn()) activeProviders, err := queryClient.ActiveProviders(ctx, &storageTypes.QueryActiveProviders{}) if err != nil { diff --git a/monitoring/monitor.go b/monitoring/monitor.go index 6b468f1..6923772 100644 --- a/monitoring/monitor.go +++ b/monitoring/monitor.go @@ -11,7 +11,7 @@ import ( ) func (m *Monitor) updateBurns() { - cl := types.NewQueryClient(m.wallet.Client.GRPCConn) + cl := types.NewQueryClient(m.wallet.GRPCConn()) provRes, err := cl.Provider(context.Background(), &types.QueryProvider{Address: m.wallet.AccAddress()}) if err != nil { return @@ -27,7 +27,7 @@ func (m *Monitor) updateBurns() { } func (m *Monitor) updateHeight() { - abciInfo, err := m.wallet.Client.RPCClient.ABCIInfo(context.Background()) + abciInfo, err := m.wallet.RPCClient().ABCIInfo(context.Background()) if err != nil { return } @@ -37,7 +37,7 @@ func (m *Monitor) updateHeight() { } func (m *Monitor) updateBalance() { - cl := bankTypes.NewQueryClient(m.wallet.Client.GRPCConn) + cl := bankTypes.NewQueryClient(m.wallet.GRPCConn()) provRes, err := cl.Balance(context.Background(), &bankTypes.QueryBalanceRequest{Address: m.wallet.AccAddress(), Denom: "ujkl"}) if err != nil { return diff --git a/monitoring/types.go b/monitoring/types.go index ad68f9a..867f19c 100644 --- a/monitoring/types.go +++ b/monitoring/types.go @@ -1,7 +1,7 @@ package monitoring import ( - "github.com/desmos-labs/cosmos-go-wallet/wallet" + "github.com/JackalLabs/sequoia/rpc" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" ) @@ -30,10 +30,10 @@ var _ = catchingUp type Monitor struct { running bool - wallet 
*wallet.Wallet + wallet *rpc.FailoverClient } -func NewMonitor(wallet *wallet.Wallet) *Monitor { +func NewMonitor(wallet *rpc.FailoverClient) *Monitor { return &Monitor{ running: false, wallet: wallet, diff --git a/proofs/proofs.go b/proofs/proofs.go index 6b7dde7..5c0821d 100644 --- a/proofs/proofs.go +++ b/proofs/proofs.go @@ -21,7 +21,7 @@ import ( canine "github.com/jackalLabs/canine-chain/v5/app" "github.com/JackalLabs/sequoia/queue" - "github.com/desmos-labs/cosmos-go-wallet/wallet" + "github.com/JackalLabs/sequoia/rpc" "github.com/jackalLabs/canine-chain/v5/x/storage/types" "github.com/rs/zerolog/log" merkletree "github.com/wealdtech/go-merkletree/v2" @@ -108,7 +108,7 @@ func (p *Prover) GenerateProof(merkle []byte, owner string, start int64, blockHe Start: start, } - cl := types.NewQueryClient(p.wallet.Client.GRPCConn) + cl := types.NewQueryClient(p.wallet.GRPCConn()) res, err := cl.File(context.Background(), queryParams) if err != nil { @@ -325,18 +325,36 @@ func (p *Prover) Start() { log.Debug().Msg("Starting proof cycle...") c := context.Background() - abciInfo, err := p.wallet.Client.RPCClient.ABCIInfo(c) + abciInfo, err := p.wallet.RPCClient().ABCIInfo(c) if err != nil { - log.Error().Err(err) - continue + if rpc.IsConnectionError(err) { + log.Warn().Err(err).Msg("Connection error getting ABCI info, attempting failover") + if p.wallet.Failover() { + // Retry after successful failover + abciInfo, err = p.wallet.RPCClient().ABCIInfo(c) + } + } + if err != nil { + log.Error().Err(err).Msg("Failed to get ABCI info") + continue + } } height := abciInfo.Response.LastBlockHeight limit := 5000 - unconfirmedTxs, err := p.wallet.Client.RPCClient.UnconfirmedTxs(c, &limit) + unconfirmedTxs, err := p.wallet.RPCClient().UnconfirmedTxs(c, &limit) if err != nil { - log.Error().Err(err).Msg("could not get mempool status") - continue + if rpc.IsConnectionError(err) { + log.Warn().Err(err).Msg("Connection error getting mempool status, attempting failover") + if 
p.wallet.Failover() { + // Retry after successful failover + unconfirmedTxs, err = p.wallet.RPCClient().UnconfirmedTxs(c, &limit) + } + } + if err != nil { + log.Error().Err(err).Msg("could not get mempool status") + continue + } } if unconfirmedTxs.Total > 2000 { log.Error().Msg("Cannot make proofs when mempool is too large.") @@ -430,7 +448,7 @@ func (p *Prover) Stop() { p.running = false } -func NewProver(wallet *wallet.Wallet, q *queue.Queue, io FileSystem, interval uint64, threads int16, chunkSize int) *Prover { +func NewProver(wallet *rpc.FailoverClient, q *queue.Queue, io FileSystem, interval uint64, threads int16, chunkSize int) *Prover { p := Prover{ running: false, wallet: wallet, diff --git a/proofs/types.go b/proofs/types.go index 4099ef3..b555bc2 100644 --- a/proofs/types.go +++ b/proofs/types.go @@ -6,12 +6,12 @@ import ( merkletree "github.com/wealdtech/go-merkletree/v2" "github.com/JackalLabs/sequoia/queue" - "github.com/desmos-labs/cosmos-go-wallet/wallet" + "github.com/JackalLabs/sequoia/rpc" ) type Prover struct { running bool - wallet *wallet.Wallet + wallet *rpc.FailoverClient q *queue.Queue processed time.Time interval uint64 diff --git a/queue/queue.go b/queue/queue.go index e51973c..9dc9e7f 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -12,11 +12,11 @@ import ( "time" "github.com/JackalLabs/sequoia/config" + "github.com/JackalLabs/sequoia/rpc" storageTypes "github.com/jackalLabs/canine-chain/v5/x/storage/types" "github.com/cosmos/cosmos-sdk/types" walletTypes "github.com/desmos-labs/cosmos-go-wallet/types" - "github.com/desmos-labs/cosmos-go-wallet/wallet" "github.com/rs/zerolog/log" "golang.org/x/time/rate" ) @@ -71,7 +71,7 @@ func (m *Message) Done() { m.wg.Done() } -func NewQueue(w *wallet.Wallet, interval uint64, maxSizeBytes int64, domain string, rlCfg config.RateLimitConfig) *Queue { +func NewQueue(w *rpc.FailoverClient, interval uint64, maxSizeBytes int64, domain string, rlCfg config.RateLimitConfig) *Queue { if maxSizeBytes == 
0 { maxSizeBytes = config.DefaultMaxSizeBytes() } @@ -174,10 +174,19 @@ func (q *Queue) BroadcastPending() (int, error) { log.Info().Msg(fmt.Sprintf("Queue: %d messages waiting to be put on-chain...", total)) limit := 5000 - unconfirmedTxs, err := q.wallet.Client.RPCClient.UnconfirmedTxs(context.Background(), &limit) + unconfirmedTxs, err := q.wallet.RPCClient().UnconfirmedTxs(context.Background(), &limit) if err != nil { - log.Error().Err(err).Msg("could not get mempool status") - return 0, err + if rpc.IsConnectionError(err) { + log.Warn().Err(err).Msg("Connection error getting mempool status, attempting failover") + if q.wallet.Failover() { + // Retry after successful failover + unconfirmedTxs, err = q.wallet.RPCClient().UnconfirmedTxs(context.Background(), &limit) + } + } + if err != nil { + log.Error().Err(err).Msg("could not get mempool status") + return 0, err + } } if unconfirmedTxs.Total > 2000 { log.Error().Msg("Cannot post messages when mempool is too large, waiting 30 minutes") @@ -231,8 +240,15 @@ func (q *Queue) BroadcastPending() (int, error) { var i int for !complete && i < 10 { i++ - res, err = q.wallet.BroadcastTxSync(data) + w := q.wallet.Wallet() + res, err = w.BroadcastTxSync(data) if err != nil { + if rpc.IsConnectionError(err) { + log.Warn().Err(err).Msg("Connection error during broadcast, attempting failover") + if q.wallet.Failover() { + continue // Retry with new connection + } + } if strings.Contains(err.Error(), "tx already exists in cache") { log.Info().Msg("TX already exists in mempool, we're going to skip it.") continue diff --git a/queue/types.go b/queue/types.go index 5e46d9e..18b58f5 100644 --- a/queue/types.go +++ b/queue/types.go @@ -4,13 +4,13 @@ import ( "sync" "time" + "github.com/JackalLabs/sequoia/rpc" "github.com/cosmos/cosmos-sdk/types" - "github.com/desmos-labs/cosmos-go-wallet/wallet" "golang.org/x/time/rate" ) type Queue struct { - wallet *wallet.Wallet + wallet *rpc.FailoverClient messages []*Message processed 
time.Time running bool diff --git a/rpc/failover.go b/rpc/failover.go new file mode 100644 index 0000000..915613e --- /dev/null +++ b/rpc/failover.go @@ -0,0 +1,363 @@ +package rpc + +import ( + "context" + "errors" + "strings" + "sync" + + sequoiaWallet "github.com/JackalLabs/sequoia/wallet" + "github.com/cosmos/gogoproto/grpc" + walletTypes "github.com/desmos-labs/cosmos-go-wallet/types" + "github.com/desmos-labs/cosmos-go-wallet/wallet" + "github.com/rs/zerolog/log" + "github.com/tendermint/tendermint/rpc/client" +) + +// ErrNoNodes is returned when no RPC/GRPC nodes are configured. +var ErrNoNodes = errors.New("no RPC/GRPC nodes configured") + +// ErrMismatchedNodeCounts is returned when RPCAddrs and GRPCAddrs have different lengths. +var ErrMismatchedNodeCounts = errors.New("RPCAddrs and GRPCAddrs must have the same length") + +// ErrInvalidNodeIndex is returned when an invalid node index is provided. +var ErrInvalidNodeIndex = errors.New("invalid node index") + +// ErrNilWallet is returned when the wallet is nil. +var ErrNilWallet = errors.New("wallet is nil") + +// NodeConfig contains the configuration needed to connect to blockchain nodes. +// This is separate from config.ChainConfig to avoid import cycles. +type NodeConfig struct { + Bech32Prefix string + RPCAddrs []string + GRPCAddrs []string + GasPrice string + GasAdjustment float64 +} + +// FailoverClient wraps a wallet and provides automatic failover between +// multiple RPC/GRPC nodes. When a connection error is detected, it +// automatically switches to the next available node. +type FailoverClient struct { + mu sync.RWMutex + + wallet *wallet.Wallet + + // Configuration + nodeCfg NodeConfig + seed string + derivation string + useLegacyKey bool + legacyKey string + + // Node tracking + currentIndex int + failoverCount int +} + +// NewFailoverClient creates a new FailoverClient with the given configuration. +// It initializes the first connection using the provided seed phrase. 
+func NewFailoverClient(nodeCfg NodeConfig, seed, derivation string) (*FailoverClient, error) {
+	if err := validateNodeConfig(nodeCfg); err != nil {
+		return nil, err
+	}
+
+	fc := &FailoverClient{
+		nodeCfg:      nodeCfg,
+		seed:         seed,
+		derivation:   derivation,
+		useLegacyKey: false,
+		currentIndex: 0,
+	}
+
+	if err := fc.connectToFirstAvailable(); err != nil {
+		return nil, err
+	}
+
+	return fc, nil
+}
+
+// NewFailoverClientWithPrivKey creates a new FailoverClient using a legacy private key.
+// It returns ErrNoNodes or ErrMismatchedNodeCounts for an unusable nodeCfg, or
+// the connection error of the last node tried when no node can be reached.
+func NewFailoverClientWithPrivKey(nodeCfg NodeConfig, privKey string) (*FailoverClient, error) {
+	if err := validateNodeConfig(nodeCfg); err != nil {
+		return nil, err
+	}
+
+	fc := &FailoverClient{
+		nodeCfg:      nodeCfg,
+		useLegacyKey: true,
+		legacyKey:    privKey,
+		currentIndex: 0,
+	}
+
+	if err := fc.connectToFirstAvailable(); err != nil {
+		return nil, err
+	}
+
+	return fc, nil
+}
+
+// validateNodeConfig reports whether nodeCfg lists at least one node and has
+// one GRPC address for every RPC address.
+func validateNodeConfig(nodeCfg NodeConfig) error {
+	if len(nodeCfg.RPCAddrs) == 0 || len(nodeCfg.GRPCAddrs) == 0 {
+		return ErrNoNodes
+	}
+	if len(nodeCfg.RPCAddrs) != len(nodeCfg.GRPCAddrs) {
+		return ErrMismatchedNodeCounts
+	}
+	return nil
+}
+
+// connectToFirstAvailable tries each configured node in order, starting at
+// index 0, and keeps the first wallet that can be created. On success it sets
+// fc.wallet and fc.currentIndex and returns nil. It returns ErrNoNodes when no
+// nodes are configured, or the error of the last node tried when all fail.
+// It does not take fc.mu; the constructors call it before fc is returned.
+func (fc *FailoverClient) connectToFirstAvailable() error {
+	total := len(fc.nodeCfg.RPCAddrs)
+	if total == 0 {
+		// Without this guard the loop below would not run and nil would be
+		// returned while fc.wallet is still nil.
+		return ErrNoNodes
+	}
+
+	var lastErr error
+	for i := 0; i < total; i++ {
+		w, err := fc.createWalletAtIndex(i)
+		if err != nil {
+			lastErr = err
+			// Log every failed attempt, including node 0, so an unreachable
+			// first node is visible even when a later node succeeds.
+			evt := log.Warn().Err(err).Int("index", i)
+			if i < total-1 {
+				evt.Msg("Failed to connect to node, trying next")
+			} else {
+				evt.Msg("Failed to connect to node, no more nodes to try")
+			}
+			continue
+		}
+
+		fc.wallet = w
+		fc.currentIndex = i
+		log.Info().
+			Int("node_index", fc.currentIndex).
+			Str("rpc", fc.nodeCfg.RPCAddrs[fc.currentIndex]).
+			Str("grpc", fc.nodeCfg.GRPCAddrs[fc.currentIndex]).
+			Msg("Connected to blockchain node")
+		return nil
+	}
+	return lastErr
+}
+
+// createWalletAtIndex creates a new wallet connection using the node at the given index.
+func (fc *FailoverClient) createWalletAtIndex(index int) (*wallet.Wallet, error) {
+	rpcAddrs, grpcAddrs := fc.nodeCfg.RPCAddrs, fc.nodeCfg.GRPCAddrs
+	inRange := index >= 0 && index < len(rpcAddrs) && index < len(grpcAddrs)
+	if !inRange {
+		return nil, ErrInvalidNodeIndex
+	}
+
+	// Shared chain settings combined with this node's own address pair.
+	nodeChainCfg := walletTypes.ChainConfig{
+		Bech32Prefix:  fc.nodeCfg.Bech32Prefix,
+		RPCAddr:       rpcAddrs[index],
+		GRPCAddr:      grpcAddrs[index],
+		GasPrice:      fc.nodeCfg.GasPrice,
+		GasAdjustment: fc.nodeCfg.GasAdjustment,
+	}
+
+	if !fc.useLegacyKey {
+		return sequoiaWallet.CreateWallet(fc.seed, fc.derivation, nodeChainCfg)
+	}
+	return sequoiaWallet.CreateWalletPrivKey(fc.legacyKey, nodeChainCfg)
+}
+
+// Failover replaces the wallet with one connected to another node and reports
+// whether that succeeded. Candidates are visited in ring order starting just
+// after the current index; the current index itself is the last candidate.
+// The write lock is held for the whole attempt.
+func (fc *FailoverClient) Failover() bool {
+	fc.mu.Lock()
+	defer fc.mu.Unlock()
+
+	nodeCount := len(fc.nodeCfg.RPCAddrs)
+	origin := fc.currentIndex
+
+	for step := 1; step <= nodeCount; step++ {
+		candidate := (origin + step) % nodeCount
+		candidateRPC := fc.nodeCfg.RPCAddrs[candidate]
+
+		attempt := log.Info().
+			Int("from_index", fc.currentIndex).
+			Int("to_index", candidate).
+			Str("rpc", candidateRPC).
+			Str("grpc", fc.nodeCfg.GRPCAddrs[candidate])
+		attempt.Msg("Attempting failover to next node")
+
+		newWallet, err := fc.createWalletAtIndex(candidate)
+		if err == nil {
+			// Commit the switch only once the new wallet exists.
+			fc.wallet = newWallet
+			fc.currentIndex = candidate
+			fc.failoverCount++
+
+			log.Info().
+				Int("node_index", fc.currentIndex).
+				Str("rpc", fc.nodeCfg.RPCAddrs[fc.currentIndex]).
+				Str("grpc", fc.nodeCfg.GRPCAddrs[fc.currentIndex]).
+				Int("total_failovers", fc.failoverCount).
+				Msg("Successfully failed over to new node")
+
+			return true
+		}
+
+		log.Warn().Err(err).
+			Int("index", candidate).
+			Str("rpc", candidateRPC).
+			Msg("Failed to connect during failover, trying next")
+	}
+
+	log.Error().Msg("Failed to connect to any node during failover")
+	return false
+}
+
+// Wallet hands back the wallet bound to the current node. A later Failover
+// stores a different wallet in the client, so a pointer obtained here keeps
+// referring to the earlier wallet; prefer the FailoverClient accessors.
+func (fc *FailoverClient) Wallet() *wallet.Wallet {
+	fc.mu.RLock()
+	defer fc.mu.RUnlock()
+	current := fc.wallet
+	return current
+}
+
+// GRPCConn hands back the GRPC connection of the current wallet.
+// It panics with ErrNilWallet when the wallet or its client is nil.
+func (fc *FailoverClient) GRPCConn() grpc.ClientConn {
+	fc.mu.RLock()
+	defer fc.mu.RUnlock()
+	if w := fc.wallet; w != nil && w.Client != nil {
+		return w.Client.GRPCConn
+	}
+	panic(ErrNilWallet)
+}
+
+// RPCClient hands back the RPC client of the current wallet.
+// It panics with ErrNilWallet when the wallet or its client is nil.
+func (fc *FailoverClient) RPCClient() client.Client {
+	fc.mu.RLock()
+	defer fc.mu.RUnlock()
+	if w := fc.wallet; w != nil && w.Client != nil {
+		return w.Client.RPCClient
+	}
+	panic(ErrNilWallet)
+}
+
+// AccAddress hands back the account address reported by the current wallet.
+// It panics with ErrNilWallet when the wallet is nil.
+func (fc *FailoverClient) AccAddress() string {
+	fc.mu.RLock()
+	defer fc.mu.RUnlock()
+	if w := fc.wallet; w != nil {
+		return w.AccAddress()
+	}
+	panic(ErrNilWallet)
+}
+
+// CurrentNodeIndex reports the index of the node currently in use.
+func (fc *FailoverClient) CurrentNodeIndex() int {
+	fc.mu.RLock()
+	defer fc.mu.RUnlock()
+	idx := fc.currentIndex
+	return idx
+}
+
+// CurrentRPCAddr reports the RPC address of the node currently in use.
+func (fc *FailoverClient) CurrentRPCAddr() string {
+	fc.mu.RLock()
+	defer fc.mu.RUnlock()
+	idx := fc.currentIndex
+	return fc.nodeCfg.RPCAddrs[idx]
+}
+
+// CurrentGRPCAddr returns the GRPC address of the currently connected node.
+func (fc *FailoverClient) CurrentGRPCAddr() string {
+	fc.mu.RLock()
+	defer fc.mu.RUnlock()
+	idx := fc.currentIndex
+	return fc.nodeCfg.GRPCAddrs[idx]
+}
+
+// FailoverCount reports how many failovers have succeeded so far.
+func (fc *FailoverClient) FailoverCount() int {
+	fc.mu.RLock()
+	defer fc.mu.RUnlock()
+	count := fc.failoverCount
+	return count
+}
+
+// NodeCount reports how many nodes are configured, measured by the length of
+// the RPC address list.
+func (fc *FailoverClient) NodeCount() int {
+	fc.mu.RLock()
+	defer fc.mu.RUnlock()
+	rpcAddrs := fc.nodeCfg.RPCAddrs
+	return len(rpcAddrs)
+}
+
+// IsConnectionError reports whether err looks like a connectivity failure that
+// should trigger a failover. The decision is a case-insensitive substring
+// match of the error text against a fixed list of patterns; a nil error is
+// never a connection error.
+func IsConnectionError(err error) bool {
+	if err == nil {
+		return false
+	}
+
+	// Lower-case once so every pattern below can be written in lower case.
+	msg := strings.ToLower(err.Error())
+
+	for _, needle := range [...]string{
+		"connection refused",
+		"connection reset",
+		"no such host",
+		"network is unreachable",
+		"i/o timeout",
+		"context deadline exceeded",
+		"eof",
+		"connection closed",
+		"transport is closing",
+		"server misbehaving",
+		"unavailable",
+		"failed to connect",
+	} {
+		if strings.Contains(msg, needle) {
+			return true
+		}
+	}
+
+	return false
+}
+
+// ExecuteWithFailover runs fn once. If fn fails with a connection error and a
+// failover succeeds, fn is run a second time and that result is returned;
+// otherwise the first result is returned unchanged.
+func (fc *FailoverClient) ExecuteWithFailover(fn func() error) error {
+	firstErr := fn()
+	if firstErr == nil || !IsConnectionError(firstErr) {
+		return firstErr
+	}
+
+	log.Warn().Err(firstErr).Msg("Connection error detected, attempting failover")
+	if !fc.Failover() {
+		return firstErr
+	}
+	// Retry exactly once on the new node.
+	return fn()
+}
+
+// QueryWithFailover executes a query function and automatically fails over
+// to the next node if a connection error is detected. It returns the result
+// of the query function.
+func QueryWithFailover[T any](fc *FailoverClient, fn func() (T, error)) (T, error) {
+	result, err := fn()
+	if err != nil && IsConnectionError(err) {
+		log.Warn().Err(err).Msg("Connection error detected during query, attempting failover")
+		if fc.Failover() {
+			// fn is invoked again unchanged. Failover stores a new wallet in
+			// fc, so fn only reaches the new node if it obtains its
+			// connection from fc on each call.
+			return fn()
+		}
+	}
+	return result, err
+}
+
+// HealthCheck performs a health check on the current node by querying ABCI info.
+// It returns ErrNilWallet when the wallet or its client is nil, otherwise the
+// error (if any) of the ABCIInfo call made with ctx.
+func (fc *FailoverClient) HealthCheck(ctx context.Context) error {
+	fc.mu.RLock()
+	if fc.wallet == nil || fc.wallet.Client == nil {
+		// Report the missing wallet as an error instead of dereferencing a
+		// nil pointer; the accessor methods guard the same condition.
+		fc.mu.RUnlock()
+		return ErrNilWallet
+	}
+	rpcClient := fc.wallet.Client.RPCClient
+	fc.mu.RUnlock()
+
+	// The read lock is released before the network call so a concurrent
+	// Failover is not blocked by it.
+	_, err := rpcClient.ABCIInfo(ctx)
+	return err
+}
+
+// EnsureHealthy checks if the current node is healthy, and if not,
+// attempts to failover to a healthy node. After failover, it verifies
+// that the new node is also healthy.
+//
+// It returns nil when the current node, or the node reached by a single
+// failover, passes HealthCheck. It returns the original health-check error
+// when failover fails, and the new node's health-check error when the node
+// reached by failover is unhealthy as well.
+func (fc *FailoverClient) EnsureHealthy(ctx context.Context) error {
+	err := fc.HealthCheck(ctx)
+	if err == nil {
+		return nil
+	}
+
+	log.Warn().Err(err).Msg("Current node unhealthy, attempting failover")
+	if !fc.Failover() {
+		return err
+	}
+
+	// Verify the new node is healthy
+	if newErr := fc.HealthCheck(ctx); newErr != nil {
+		log.Warn().Err(newErr).Msg("New node also unhealthy after failover")
+		return newErr
+	}
+	return nil
+}
diff --git a/strays/hands.go b/strays/hands.go index a825d71..56e7ef1 100644 --- a/strays/hands.go +++ b/strays/hands.go @@ -115,7 +115,7 @@ func (h *Hand) Take(stray *types.UnifiedFile) { func (s *StrayManager) NewHand(q *queue.Queue) (*Hand, error) { offset := byte(len(s.hands)) + 1 - w, err := s.wallet.CloneWalletOffset(offset) + w, err := s.wallet.Wallet().CloneWalletOffset(offset) if err != nil { return nil, err } diff --git a/strays/manager.go b/strays/manager.go index f42d986..a0dcb7f 100644 --- a/strays/manager.go +++ b/strays/manager.go @@ -7,12 +7,12 @@ import ( "time" "github.com/JackalLabs/sequoia/file_system" + "github.com/JackalLabs/sequoia/rpc" "github.com/JackalLabs/sequoia/queue" sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/query" "github.com/cosmos/cosmos-sdk/x/feegrant" - "github.com/desmos-labs/cosmos-go-wallet/wallet" "github.com/jackalLabs/canine-chain/v5/x/storage/types" "github.com/rs/zerolog/log" ) @@ -26,7 +26,7 @@ const ( ) // NewStrayManager creates and initializes a new StrayManager with the specified number of hands, authorizing each hand to transact on behalf of the provided wallet if not already authorized. -func NewStrayManager(w *wallet.Wallet, q *queue.Queue, interval int64, refreshInterval int64, handCount int, authList []string) *StrayManager { +func NewStrayManager(w *rpc.FailoverClient, q *queue.Queue, interval int64, refreshInterval int64, handCount int, authList []string) *StrayManager { s := &StrayManager{ rand: rand.New(rand.NewSource(time.Now().Unix())), wallet: w, @@ -102,7 +102,7 @@ func (s *StrayManager) Start(f *file_system.FileSystem, q *queue.Queue, myUrl st defer log.Info().Msg("StrayManager stopped") for _, hand := range s.hands { - go hand.Start(f, s.wallet, q, myUrl, chunkSize) + go hand.Start(f, s.wallet.Wallet(), q, myUrl, chunkSize) } for s.running { @@ -179,10 +179,14 @@ func (s *StrayManager) RefreshList() error { Pagination: page, } - cl := types.NewQueryClient(s.wallet.Client.GRPCConn) + cl := types.NewQueryClient(s.wallet.GRPCConn()) res, err := cl.OpenFiles(context.Background(), queryParams) if err != nil { + if rpc.IsConnectionError(err) { + log.Warn().Err(err).Msg("Connection error during stray refresh, attempting failover") + s.wallet.Failover() + } return err } diff --git a/strays/types.go b/strays/types.go index 4f88a38..0405dc9 100644 --- a/strays/types.go +++ b/strays/types.go @@ -4,6 +4,7 @@ import ( "math/rand" "time" + "github.com/JackalLabs/sequoia/rpc" "github.com/desmos-labs/cosmos-go-wallet/wallet" "github.com/jackalLabs/canine-chain/v5/x/storage/types" ) @@ -17,7 +18,7 @@ type Hand struct { type StrayManager struct { strays []*types.UnifiedFile - wallet *wallet.Wallet + wallet 
*rpc.FailoverClient lastSize uint64 rand *rand.Rand interval time.Duration