diff --git a/go.mod b/go.mod index a0c9a34..dc1d779 100644 --- a/go.mod +++ b/go.mod @@ -8,18 +8,21 @@ require ( github.com/0xSplits/workit v0.7.1 github.com/google/gofuzz v1.2.0 github.com/gorilla/mux v1.8.1 + github.com/jackc/pgx/v5 v5.7.6 github.com/joho/godotenv v1.5.1 github.com/kelseyhightower/envconfig v1.4.0 github.com/prometheus/client_golang v1.23.2 github.com/rs/cors v1.11.1 github.com/spf13/cobra v1.10.1 github.com/twitchtv/twirp v8.1.3+incompatible + github.com/xh3b4sd/choreo v0.6.0 github.com/xh3b4sd/logger v0.11.1 github.com/xh3b4sd/tracer v1.0.0 go.opentelemetry.io/otel/metric v1.38.0 ) require ( + cirello.io/pglock v1.16.1 github.com/0xSplits/indexingo v0.1.0 github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -28,7 +31,11 @@ require ( github.com/go-logr/stdr v1.2.2 // indirect github.com/google/uuid v1.6.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect github.com/klauspost/compress v1.18.1 // indirect + github.com/lib/pq v1.10.9 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/prometheus/client_model v0.6.2 // indirect github.com/prometheus/common v0.67.2 // indirect @@ -42,6 +49,9 @@ require ( go.opentelemetry.io/otel/sdk/metric v1.38.0 // indirect go.opentelemetry.io/otel/trace v1.38.0 // indirect go.yaml.in/yaml/v2 v2.4.3 // indirect + golang.org/x/crypto v0.38.0 // indirect + golang.org/x/sync v0.17.0 // indirect golang.org/x/sys v0.37.0 // indirect + golang.org/x/text v0.30.0 // indirect google.golang.org/protobuf v1.36.10 // indirect ) diff --git a/go.sum b/go.sum index 98af6c3..e473032 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +cirello.io/pglock v1.16.1 h1:NKVe133vhtJMnzdL0UlCtyEb9rm3vSnT2/R7MNu6mfA= +cirello.io/pglock v1.16.1/go.mod h1:aCm2qCmp8jc2cNSFUW+n94Ie6ZGXmauBBSXVkWSKD/M= github.com/0xSplits/indexingo v0.1.0 h1:t4Bl+LpUTG3tpNAJG8JIMTAr4UOgo+JhanBwBBaRJhs= github.com/0xSplits/indexingo v0.1.0/go.mod h1:nS0gqM7Y3n++v/5OU2dTtjo5u1pAOKLNnk3KuX2Ic9g= github.com/0xSplits/otelgo v0.1.2 h1:QjbUMNNQcUsnkOmZ35bc3Fbhz7u0PA611LYrh4aOpPk= @@ -6,6 +8,8 @@ github.com/0xSplits/pulsargocode v0.1.0 h1:wWHCxp+IfxZIJ6mbxnG2hG+R8Hrpu15lPbI6n github.com/0xSplits/pulsargocode v0.1.0/go.mod h1:lt3sg7Pvgam0AEOINEdoyHO2z80n4W7tm6XuDmm1kaA= github.com/0xSplits/workit v0.7.1 h1:ER2ITiqmahwoN94oz9XCUvdUJLnYB5yZyA9y8FAOwug= github.com/0xSplits/workit v0.7.1/go.mod h1:Ylbvjp0puOXWNVzYtI0cxFO37hqPNy/prwHIWau2gpQ= +github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7OputlJIzU= +github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -13,6 +17,7 @@ github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XL github.com/coder/websocket v1.8.14 h1:9L0p0iKiNOibykf283eHkKUHHrpG7f65OE3BhhO7v9g= github.com/coder/websocket v1.8.14/go.mod h1:NX3SzP+inril6yawo5CQXx8+fk145lPDC6pumgx0mVg= github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= @@ -30,6 +35,14 @@ github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.6 h1:rWQc5FwZSPX58r1OQmkuaNicxdmExaEz5A2DO2hUuTk= +github.com/jackc/pgx/v5 v5.7.6/go.mod h1:aruU7o91Tc2q2cFp5h4uP3f6ztExVpyVv88Xl/8Vl8M= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/kelseyhightower/envconfig v1.4.0 h1:Im6hONhd3pLkfDFsbRgu68RDNkGF1r3dvMUtDTo2cv8= @@ -42,6 +55,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -68,10 +83,15 @@ github.com/spf13/cobra v1.10.1/go.mod h1:7SmJGaTHFVBY0jW4NXGluQoLvhqFQM+6XSKD+P4 github.com/spf13/pflag v1.0.9/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU= github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A= +github.com/xh3b4sd/choreo v0.6.0 h1:x6B/zPRwhwqaSt6eQ9bMom+9ROVGf728946hlcLwMjA= +github.com/xh3b4sd/choreo v0.6.0/go.mod h1:FSmCDJYDGChWwn06JVitOrbFoSie8G5G34Zt4k16NBQ= github.com/xh3b4sd/logger v0.11.1 h1:aTK4ygh7aPv1jq54J8bx+zjH6A8RYdkKAgOZYw867C0= github.com/xh3b4sd/logger v0.11.1/go.mod h1:MC7Dp7RC3tZ182KlvSulGcRQVX/D2l+WlCSGLF1mvO8= github.com/xh3b4sd/tracer v1.0.0 h1:mr9uYCx/Ry2w1wdJz0V0Kq71/KeF+hUQjbZQJCxm3Zw= @@ -94,12 +114,19 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= +golang.org/x/crypto v0.38.0 h1:jt+WWG8IZlBnVbomuhg2Mdq0+BBQaHbtqHEFEigjUV8= +golang.org/x/crypto v0.38.0/go.mod h1:MvrbAqul58NNYPKnOra203SB9vpuZW0e+RRZV+Ggqjw= +golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= +golang.org/x/sync v0.17.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= +golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/daemon/daemon.go b/pkg/daemon/daemon.go index d8b2908..b672e35 100644 --- a/pkg/daemon/daemon.go +++ b/pkg/daemon/daemon.go @@ -1,6 +1,10 @@ package daemon import ( + "github.com/0xSplits/indexingo/client" + "github.com/0xSplits/indexingo/filters" + "github.com/0xSplits/indexingo/pipelines" + "github.com/0xSplits/indexingo/transformations" "github.com/0xSplits/otelgo/recorder" "github.com/0xSplits/pulsar/pkg/envvar" "github.com/0xSplits/pulsar/pkg/runtime" @@ -14,8 +18,11 @@ type Config struct { type Daemon struct { env envvar.Env + fil *filters.Filters log logger.Interface met metric.Meter + pip *pipelines.Pipelines + tra *transformations.Transformations } func New(c Config) *Daemon { @@ -35,6 +42,34 @@ func New(c Config) *Daemon { }) } + var cli client.Interface + { + cli = client.New(client.Config{ + Key: c.Env.IndexingcoApiKey, + }) + } + + var fil *filters.Filters + { + fil = filters.New(filters.Config{ + Cli: cli, + }) + } + + var tra *transformations.Transformations + { + tra = transformations.New(transformations.Config{ + Cli: cli, + }) + } + + var pip *pipelines.Pipelines + { + pip = pipelines.New(pipelines.Config{ + Cli: cli, + }) + } + log.Log( "level", "info", "message", "daemon is launching procs", @@ -43,7 +78,10 @@ func New(c Config) *Daemon { return &Daemon{ env: c.Env, + fil: fil, log: log, met: met, + pip: pip, + tra: tra, } } diff --git a/pkg/daemon/ensure.go b/pkg/daemon/ensure.go index b63963e..2e13e21 100644 --- a/pkg/daemon/ensure.go +++ b/pkg/daemon/ensure.go @@ -3,61 +3,11 @@ package daemon import ( "fmt" - "github.com/0xSplits/indexingo/client" - "github.com/0xSplits/indexingo/filters" "github.com/0xSplits/indexingo/pipelines" - "github.com/0xSplits/indexingo/transformations" "github.com/xh3b4sd/tracer" ) func (d *Daemon) Ensure() error { - var cli client.Interface - { - cli = client.New(client.Config{ - Key: d.env.IndexingcoApiKey, - }) - } - - //--------------------------------------------------------------------------// - - var fil *filters.Filters - { - fil = filters.New(filters.Config{ - Cli: cli, - }) - } - - var tra *transformations.Transformations - { - tra = transformations.New(transformations.Config{ - Cli: cli, - }) - } - - var pip *pipelines.Pipelines - { - pip = pipelines.New(pipelines.Config{ - Cli: cli, - }) - } - - //--------------------------------------------------------------------------// - - { - res, err := fil.AddValues("test-filter", []string{"0xb7f5bf799fb265657c628ef4a13f90f83a3a616a"}) - if err != nil { - tracer.Panic(tracer.Mask(err)) - } - - d.log.Log( - "level", "info", - "message", "filter creation", - "status", res.Message, - ) - } - - //--------------------------------------------------------------------------// - // // curl -s --location --globoff 'https://app.indexing.co/dw/transformations/test?network=base&beat=37740907&filter=xh3b4sd-test-filter&filterKeys[0]=to&filterKeys[1]=from' --header "x-api-key: $INDEXINGCO_API_KEY" --form 'code="function traByBlock(blo) { const tra = templates.tokenTransfers(blo); return tra.map(x => ({ network: blo._network, chainId: utils.evmChainToId(blo._network), blockHash: blo.hash, blockNumber: blo.number, timestamp: utils.blockToTimestamp(blo), ...x, })); }"' | jq . // @@ -81,7 +31,7 @@ func (d *Daemon) Ensure() error { } { - res, err := tra.CreateTransformation("test-transformation", cod) + res, err := d.tra.CreateTransformation(fmt.Sprintf("%s-transformation", d.env.Environment), cod) if err != nil { tracer.Panic(tracer.Mask(err)) } @@ -98,9 +48,9 @@ func (d *Daemon) Ensure() error { var cpr pipelines.CreatePipelineRequest { cpr = pipelines.CreatePipelineRequest{ - Name: "test-pipeline", - Transformation: "test-transformation", - Filter: "test-filter", + Name: fmt.Sprintf("%s-pipeline", d.env.Environment), + Transformation: fmt.Sprintf("%s-transformation", d.env.Environment), + Filter: fmt.Sprintf("%s-filter", d.env.Environment), FilterKeys: []string{"from", "to"}, Networks: []string{"ethereum", "base"}, Enabled: true, @@ -117,7 +67,7 @@ func (d *Daemon) Ensure() error { } { - res, err := pip.CreatePipeline(cpr) + res, err := d.pip.CreatePipeline(cpr) if err != nil { tracer.Panic(tracer.Mask(err)) } @@ -141,7 +91,7 @@ func (d *Daemon) Ensure() error { // } // { - // res, err := pip.BackfillPipeline("test-pipeline", bpr) + // res, err := d.pip.BackfillPipeline(fmt.Sprintf("%s-pipeline", d.env.Environment), bpr) // if err != nil { // tracer.Panic(tracer.Mask(err)) // } diff --git a/pkg/daemon/worker.go b/pkg/daemon/worker.go index aa91851..90d65c0 100644 --- a/pkg/daemon/worker.go +++ b/pkg/daemon/worker.go @@ -1,13 +1,31 @@ package daemon import ( - "github.com/0xSplits/pulsar/pkg/worker/handler/image" + "fmt" + + "github.com/0xSplits/pulsar/pkg/worker/handler/pipeline" + "github.com/0xSplits/pulsar/pkg/worker/handler/pipeline/accounts" + "github.com/0xSplits/pulsar/pkg/worker/handler/pipeline/accounts/client" "github.com/0xSplits/workit/handler" "github.com/0xSplits/workit/registry" "github.com/0xSplits/workit/worker/parallel" ) func (d *Daemon) Worker() *parallel.Worker { + var cli *client.Client + { + cli = client.New(client.Config{ + Url: fmt.Sprintf("https://server.%s.splits.org", d.env.Environment), + }) + } + + var acc *accounts.Accounts + { + acc = accounts.New(accounts.Config{ + Cli: cli, + }) + } + var reg *registry.Registry { reg = registry.New(registry.Config{ @@ -21,7 +39,7 @@ func (d *Daemon) Worker() *parallel.Worker { { par = parallel.New(parallel.Config{ Han: []handler.Cooler{ - image.New(image.Config{Log: d.log}), + pipeline.New(pipeline.Config{Acc: acc, Env: d.env, Fil: d.fil, Log: d.log}), }, Log: d.log, Reg: reg, diff --git a/pkg/envvar/envvar.go b/pkg/envvar/envvar.go index 14ac596..fe9f949 100644 --- a/pkg/envvar/envvar.go +++ b/pkg/envvar/envvar.go @@ -12,6 +12,7 @@ import ( type Env struct { Environment string `split_words:"true" required:"true"` IndexingcoApiKey string `split_words:"true" required:"true"` + PostgresUrl string `split_words:"true" required:"true"` WebsocketSecret string `split_words:"true" required:"true"` HttpHost string `split_words:"true" required:"true"` diff --git a/pkg/worker/handler/image/cooler.go b/pkg/worker/handler/image/cooler.go deleted file mode 100644 index be9e373..0000000 --- a/pkg/worker/handler/image/cooler.go +++ /dev/null @@ -1,9 +0,0 @@ -package image - -import ( - "time" -) - -func (h *Handler) Cooler() time.Duration { - return 10 * time.Second -} diff --git a/pkg/worker/handler/image/ensure.go b/pkg/worker/handler/image/ensure.go deleted file mode 100644 index 79c9208..0000000 --- a/pkg/worker/handler/image/ensure.go +++ /dev/null @@ -1,6 +0,0 @@ -package image - -func (h *Handler) Ensure() error { - // TODO - return nil -} diff --git a/pkg/worker/handler/image/handler.go b/pkg/worker/handler/image/handler.go deleted file mode 100644 index bda9c46..0000000 --- a/pkg/worker/handler/image/handler.go +++ /dev/null @@ -1,26 +0,0 @@ -package image - -import ( - "fmt" - - "github.com/xh3b4sd/logger" - "github.com/xh3b4sd/tracer" -) - -type Config struct { - Log logger.Interface -} - -type Handler struct { - log logger.Interface -} - -func New(c Config) *Handler { - if c.Log == nil { - tracer.Panic(tracer.Mask(fmt.Errorf("%T.Log must not be empty", c))) - } - - return &Handler{ - log: c.Log, - } -} diff --git a/pkg/worker/handler/pipeline/accounts/accounts.go b/pkg/worker/handler/pipeline/accounts/accounts.go new file mode 100644 index 0000000..8bd2709 --- /dev/null +++ b/pkg/worker/handler/pipeline/accounts/accounts.go @@ -0,0 +1,26 @@ +package accounts + +import ( + "fmt" + + "github.com/0xSplits/pulsar/pkg/worker/handler/pipeline/accounts/client" + "github.com/xh3b4sd/tracer" +) + +type Config struct { + Cli client.Interface +} + +type Accounts struct { + cli client.Interface +} + +func New(c Config) *Accounts { + if c.Cli == nil { + tracer.Panic(tracer.Mask(fmt.Errorf("%T.Cli must not be empty", c))) + } + + return &Accounts{ + cli: c.Cli, + } +} diff --git a/pkg/worker/handler/pipeline/accounts/client/client.go b/pkg/worker/handler/pipeline/accounts/client/client.go new file mode 100644 index 0000000..6b0e99c --- /dev/null +++ b/pkg/worker/handler/pipeline/accounts/client/client.go @@ -0,0 +1,37 @@ +package client + +import ( + "fmt" + "net/http" + "strings" + + "github.com/xh3b4sd/tracer" +) + +type Config struct { + // Cli is the optional HTTP client being used to handle requests and + // responses. Defaults to http.DefaultClient. + Cli *http.Client + + // Url is the required API endpoint, e.g. "https://server.testing.splits.org". + Url string +} + +type Client struct { + cli *http.Client + url string +} + +func New(c Config) *Client { + if c.Cli == nil { + c.Cli = http.DefaultClient + } + if c.Url == "" { + tracer.Panic(tracer.Mask(fmt.Errorf("%T.Url must not be empty", c))) + } + + return &Client{ + cli: c.Cli, + url: strings.TrimSuffix(strings.TrimSpace(c.Url), "/"), + } +} diff --git a/pkg/worker/handler/pipeline/accounts/client/endpoint.go b/pkg/worker/handler/pipeline/accounts/client/endpoint.go new file mode 100644 index 0000000..9899e68 --- /dev/null +++ b/pkg/worker/handler/pipeline/accounts/client/endpoint.go @@ -0,0 +1,5 @@ +package client + +func (c *Client) Endpoint() string { + return c.url +} diff --git a/pkg/worker/handler/pipeline/accounts/client/error.go b/pkg/worker/handler/pipeline/accounts/client/error.go new file mode 100644 index 0000000..bd92a58 --- /dev/null +++ b/pkg/worker/handler/pipeline/accounts/client/error.go @@ -0,0 +1,15 @@ +package client + +import ( + "errors" + + "github.com/xh3b4sd/tracer" +) + +func IsStatusCode(err error) bool { + return errors.Is(err, statusCodeError) +} + +var statusCodeError = &tracer.Error{ + Description: "The request expects a response with a status code of the 200 family. The response status code was >= 300. Therefore the request failed.", +} diff --git a/pkg/worker/handler/pipeline/accounts/client/interface.go b/pkg/worker/handler/pipeline/accounts/client/interface.go new file mode 100644 index 0000000..da6e47d --- /dev/null +++ b/pkg/worker/handler/pipeline/accounts/client/interface.go @@ -0,0 +1,11 @@ +package client + +import "net/http" + +type Interface interface { + // Endpoint returns the configured base URL for the Neighbourhood API. + Endpoint() string + + // Request performs an HTTP request to the Indexing Co Neighbourhood API. + Request(*http.Request, any) error +} diff --git a/pkg/worker/handler/pipeline/accounts/client/request.go b/pkg/worker/handler/pipeline/accounts/client/request.go new file mode 100644 index 0000000..901bb23 --- /dev/null +++ b/pkg/worker/handler/pipeline/accounts/client/request.go @@ -0,0 +1,79 @@ +package client + +import ( + "context" + "encoding/json" + "io" + "net/http" + "time" + + "github.com/xh3b4sd/tracer" +) + +func (c *Client) Request(req *http.Request, val any) error { + var err error + + // Ensure the request does not block forever, and then make sure to call the + // cancel function at the end of request(). + + var ctx context.Context + var can context.CancelFunc + { + ctx, can = context.WithTimeout(context.Background(), 15*time.Second) + } + + { + defer can() + } + + { + req = req.WithContext(ctx) + } + + // // Ensure the API key header so our requests are authenticated. + + // { + // req.Header.Set("x-api-key", c.key) + // } + + // Do the actual HTTP request to the Indexing Co API. + + var res *http.Response + { + res, err = c.cli.Do(req) + if err != nil { + return tracer.Mask(err) + } + } + + { + defer res.Body.Close() // nolint:errcheck + } + + // Handle unwanted status codes using a typed error and wrap any failures with + // additional context. + + if res.StatusCode >= 300 { + byt, err := io.ReadAll(res.Body) + if err != nil { + return tracer.Mask(err) + } + + return tracer.Mask(statusCodeError, + tracer.Context{Key: "code", Value: res.Status}, + tracer.Context{Key: "body", Value: string(byt)}, + ) + } + + // At this point we got a valid response back and can simply decode the + // response body into the provided pointer. + + { + err = json.NewDecoder(res.Body).Decode(val) + if err != nil { + return tracer.Mask(err) + } + } + + return nil +} diff --git a/pkg/worker/handler/pipeline/accounts/search.go b/pkg/worker/handler/pipeline/accounts/search.go new file mode 100644 index 0000000..c449959 --- /dev/null +++ b/pkg/worker/handler/pipeline/accounts/search.go @@ -0,0 +1,53 @@ +package accounts + +import ( + "net/http" + "net/url" + "strconv" + + "github.com/xh3b4sd/tracer" +) + +type SearchResponse struct { + Data []SearchResponseObject `json:"data"` + Next *int64 `json:"next"` +} + +type SearchResponseObject struct { + ID string `json:"id"` + Address string `json:"address"` + Created int64 `json:"created"` +} + +func (a *Accounts) Search(cur int64, lim int64) (SearchResponse, error) { + var err error + + var qry url.Values + { + qry.Set("cursor", strconv.FormatInt(cur, 10)) + qry.Set("limit", strconv.FormatInt(lim, 10)) + } + + var pat string + { + pat = a.cli.Endpoint() + "/public/v1/accounts/search?" + qry.Encode() + } + + var req *http.Request + { + req, err = http.NewRequest(http.MethodGet, pat, nil) + if err != nil { + return SearchResponse{}, tracer.Mask(err) + } + } + + var res SearchResponse + { + err = a.cli.Request(req, &res) + if err != nil { + return SearchResponse{}, tracer.Mask(err) + } + } + + return res, nil +} diff --git a/pkg/worker/handler/image/active.go b/pkg/worker/handler/pipeline/active.go similarity index 86% rename from pkg/worker/handler/image/active.go rename to pkg/worker/handler/pipeline/active.go index 1d4fbb4..e9a7a21 100644 --- a/pkg/worker/handler/image/active.go +++ b/pkg/worker/handler/pipeline/active.go @@ -1,4 +1,4 @@ -package image +package pipeline // Active defines this worker handler to always be executed. func (h *Handler) Active() bool { diff --git a/pkg/worker/handler/pipeline/cooler.go b/pkg/worker/handler/pipeline/cooler.go new file mode 100644 index 0000000..1d585ad --- /dev/null +++ b/pkg/worker/handler/pipeline/cooler.go @@ -0,0 +1,15 @@ +package pipeline + +import ( + "time" +) + +// Cooler is configured to return a dynamically adjusted wait duration for this +// worker handler to sleep before running again. The introduced jitter has the +// purpose of spreading out the same type of work across time, so that we ease +// the load on our dependency APIs, here Postgres, and effectively try to +// prevent unneccessary contention. E.g. a jitter of 20% applied to 10s results +// in execution variation of +-2s. +func (h *Handler) Cooler() time.Duration { + return h.jit.Percent(10 * time.Second) +} diff --git a/pkg/worker/handler/pipeline/cursor/create.go b/pkg/worker/handler/pipeline/cursor/create.go new file mode 100644 index 0000000..7abfb0d --- /dev/null +++ b/pkg/worker/handler/pipeline/cursor/create.go @@ -0,0 +1,52 @@ +package cursor + +import ( + "context" + "time" + + "github.com/xh3b4sd/tracer" +) + +const ( + create = ` + -- + -- create the database table if it does not exists + -- + CREATE TABLE IF NOT EXISTS accounts_pagination_cursor ( + id boolean PRIMARY KEY DEFAULT true, + cursor timestamptz NOT NULL, + updated timestamptz NOT NULL DEFAULT now(), + CHECK (id) + ); + + -- + -- create the default cursor if it does not exists + -- + INSERT INTO accounts_pagination_cursor (id, cursor) + VALUES (true, '2020-01-01T00:00:00.000Z') + ON CONFLICT (id) DO NOTHING; + ` +) + +func (c *Cursor) Create() error { + var err error + + var ctx context.Context + var can context.CancelFunc + { + ctx, can = context.WithTimeout(context.Background(), 3*time.Second) + } + + { + defer can() + } + + { + _, err = c.poo.Exec(ctx, create) + if err != nil { + return tracer.Mask(err) + } + } + + return nil +} diff --git a/pkg/worker/handler/pipeline/cursor/cursor.go b/pkg/worker/handler/pipeline/cursor/cursor.go new file mode 100644 index 0000000..d19e309 --- /dev/null +++ b/pkg/worker/handler/pipeline/cursor/cursor.go @@ -0,0 +1,53 @@ +package cursor + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5/pgxpool" + "github.com/xh3b4sd/logger" + "github.com/xh3b4sd/tracer" +) + +type Config struct { + // Dsn is the data source name for the underlying Postgres instance in order + // to establish a new database connection. + Dsn string + // Log is the standard logger interface to emit useful log messages at + // runtime. + Log logger.Interface +} + +type Cursor struct { + log logger.Interface + poo *pgxpool.Pool +} + +func New(c Config) *Cursor { + if c.Dsn == "" { + tracer.Panic(tracer.Mask(fmt.Errorf("%T.Dsn must not be empty", c))) + } + if c.Log == nil { + tracer.Panic(tracer.Mask(fmt.Errorf("%T.Log must not be empty", c))) + } + + var err error + + // The connection pool set up here will be used for executing Postgres + // commands related to our actual business logic. In other words, this + // connection pool is unrelated to the database connection managed for the + // distributed lock. + + var poo *pgxpool.Pool + { + poo, err = pgxpool.New(context.Background(), c.Dsn) + if err != nil { + tracer.Panic(tracer.Mask(err)) + } + } + + return &Cursor{ + log: c.Log, + poo: poo, + } +} diff --git a/pkg/worker/handler/pipeline/cursor/search.go b/pkg/worker/handler/pipeline/cursor/search.go new file mode 100644 index 0000000..d017726 --- /dev/null +++ b/pkg/worker/handler/pipeline/cursor/search.go @@ -0,0 +1,43 @@ +package cursor + +import ( + "context" + "time" + + "github.com/xh3b4sd/tracer" +) + +const ( + search = ` + -- + -- search the current pagination cursor + -- + SELECT cursor + FROM accounts_pagination_cursor + WHERE id = true + ` +) + +func (c *Cursor) Search() (time.Time, error) { + var err error + + var ctx context.Context + var can context.CancelFunc + { + ctx, can = context.WithTimeout(context.Background(), 3*time.Second) + } + + { + defer can() + } + + var cur time.Time + { + err = c.poo.QueryRow(ctx, search).Scan(&cur) + if err != nil { + return time.Time{}, tracer.Mask(err) + } + } + + return cur, nil +} diff --git a/pkg/worker/handler/pipeline/cursor/update.go b/pkg/worker/handler/pipeline/cursor/update.go new file mode 100644 index 0000000..c762c88 --- /dev/null +++ b/pkg/worker/handler/pipeline/cursor/update.go @@ -0,0 +1,44 @@ +package cursor + +import ( + "context" + "time" + + "github.com/xh3b4sd/tracer" +) + +const ( + update = ` + -- + -- update the pagination cursor to set its new version + -- + UPDATE accounts_pagination_cursor + SET + cursor = $1, + updated = now() + WHERE id = true + ` +) + +func (c *Cursor) Update(cur time.Time) error { + var err error + + var ctx context.Context + var can context.CancelFunc + { + ctx, can = context.WithTimeout(context.Background(), 3*time.Second) + } + + { + defer can() + } + + { + _, err = c.poo.Exec(ctx, update, cur) + if err != nil { + return tracer.Mask(err) + } + } + + return nil +} diff --git a/pkg/worker/handler/pipeline/ensure.go b/pkg/worker/handler/pipeline/ensure.go new file mode 100644 index 0000000..e4f32b8 --- /dev/null +++ b/pkg/worker/handler/pipeline/ensure.go @@ -0,0 +1,134 @@ +package pipeline + +import ( + "fmt" + "strconv" + "time" + + "cirello.io/pglock" + "github.com/0xSplits/pulsar/pkg/worker/handler/pipeline/accounts" + "github.com/xh3b4sd/tracer" +) + +const ( + // limit is the maximum amount of wallet addresses to search for at once. + limit = 10 +) + +func (h *Handler) Ensure() error { + var err error + + var loc *pglock.Lock + { + loc, err = h.loc.Acquire() + if err != nil { + return tracer.Mask(err) + } + } + + if loc == nil { + return nil // lock not acquired + } + + h.log.Log( + "level", "info", + "message", "acquired distributed lock", + "owner", loc.Owner(), + "version", strconv.FormatInt(loc.RecordVersionNumber(), 10), + ) + + { + defer h.loc.Release(loc) // nolint:errcheck + } + + var cur time.Time + { + cur, err = h.cur.Search() + if err != nil { + return tracer.Mask(err) + } + } + + h.log.Log( + "level", "info", + "message", "searched pagination cursor", + "cursor", strconv.FormatInt(cur.Unix(), 10), + ) + + var res accounts.SearchResponse + { + res, err = h.acc.Search(cur.Unix(), limit) + if err != nil { + return tracer.Mask(err) + } + } + + h.log.Log( + "level", "info", + "message", "searched account addresses", + "amount", strconv.Itoa(len(res.Data)), + ) + + // Note that we do not update the cursor in case of an empty search result. + // That means we are searching for accounts with the very same cursor until we + // find an actual non-empty result again. + + if len(res.Data) == 0 { + return nil + } + + var add []string + for _, x := range res.Data { + add = append(add, x.Address) + } + + // Adding values to the filter is idempotent as per the underlying indexing + // provider Indexing Co. It is therefore not necessary to de-duplicate the + // results received from the accounts/search endpoint. + + { + _, err = h.fil.AddValues(fmt.Sprintf("%s-filter", h.env.Environment), add) + if err != nil { + return tracer.Mask(err) + } + } + + h.log.Log( + "level", "info", + "message", "updated pipeline filter", + "amount", strconv.Itoa(len(res.Data)), + ) + + // Given the next page, set the next cursor to this next page. If there is no + // next page, then we are at the end of the line. Given that we have accounts + // on that last page, set the next cursor to the created timestamp of this + // very last result object. If there is no next page, and if there is no data, + // then return early as to not update the next cursor, and keep the current + // cursor in place for the next iteration. This last case should never happen + // if the accounts/search API works properly. + + var nxt time.Time + if res.Next != nil { + nxt = time.Unix(*res.Next, 0) + } else if len(res.Data) != 0 { + nxt = time.Unix(res.Data[len(res.Data)-1].Created, 0) + } else { + return nil + } + + { + err = h.cur.Update(nxt) + if err != nil { + return tracer.Mask(err) + } + } + + h.log.Log( + "level", "info", + "message", "updated pagination cursor", + "next", strconv.FormatInt(nxt.Unix(), 10), + "delta", strconv.FormatInt(nxt.Unix()-cur.Unix(), 10), + ) + + return nil +} diff --git a/pkg/worker/handler/pipeline/handler.go b/pkg/worker/handler/pipeline/handler.go new file mode 100644 index 0000000..72e4127 --- /dev/null +++ b/pkg/worker/handler/pipeline/handler.go @@ -0,0 +1,109 @@ +package pipeline + +import ( + "fmt" + "time" + + "github.com/0xSplits/indexingo/filters" + "github.com/0xSplits/pulsar/pkg/envvar" + "github.com/0xSplits/pulsar/pkg/worker/handler/pipeline/accounts" + "github.com/0xSplits/pulsar/pkg/worker/handler/pipeline/cursor" + "github.com/0xSplits/pulsar/pkg/worker/handler/pipeline/locker" + "github.com/xh3b4sd/choreo/jitter" + "github.com/xh3b4sd/logger" + "github.com/xh3b4sd/tracer" +) + +type Config struct { + // Acc is the accounts client to search iteratively for all relevant wallet + // addresses known to the system. + Acc *accounts.Accounts + // Env is the configuration injected into the process environment. + Env envvar.Env + // Fil is the pipeline filter kept up to date by this worker handler. + Fil *filters.Filters + // Log is the standard logger interface to emit useful log messages at + // runtime. + Log logger.Interface +} + +type Handler struct { + acc *accounts.Accounts + cur *cursor.Cursor + env envvar.Env + fil *filters.Filters + jit *jitter.Jitter[time.Duration] + loc *locker.Locker + log logger.Interface +} + +func New(c Config) *Handler { + if c.Acc == nil { + tracer.Panic(tracer.Mask(fmt.Errorf("%T.Acc must not be empty", c))) + } + if c.Env.Environment == "" { + tracer.Panic(tracer.Mask(fmt.Errorf("%T.Env must not be empty", c))) + } + if c.Fil == nil { + tracer.Panic(tracer.Mask(fmt.Errorf("%T.Fil must not be empty", c))) + } + if c.Log == nil { + tracer.Panic(tracer.Mask(fmt.Errorf("%T.Log must not be empty", c))) + } + + // Note that the setup below does only relate to the cursor specific database + // connection and table. This cursor client should ideally not live in this + // constructor, since it is an antipattern to allow third party dependencies + // to cause worker handler failure on creation seemingly randomly. For the + // time being, keeping this code here is just easy and convenient to get the + // first version going. + + var err error + + var cur *cursor.Cursor + { + cur = cursor.New(cursor.Config{ + Dsn: c.Env.PostgresUrl, + Log: c.Log, + }) + } + + { + err = cur.Create() + if err != nil { + tracer.Panic(tracer.Mask(err)) + } + } + + // Note that the setup below does only relate to the locker specific database + // connection and table. This locker client should ideally not live in this + // constructor, since it is an antipattern to allow third party dependencies + // to cause worker handler failure on creation seemingly randomly. For the + // time being, keeping this code here is just easy and convenient to get the + // first version going. + + var loc *locker.Locker + { + loc = locker.New(locker.Config{ + Dsn: c.Env.PostgresUrl, + Log: c.Log, + }) + } + + { + err = loc.Create() + if err != nil { + tracer.Panic(tracer.Mask(err)) + } + } + + return &Handler{ + acc: c.Acc, + cur: cur, + env: c.Env, + fil: c.Fil, + jit: jitter.New[time.Duration](jitter.Config{Per: 0.20}), + loc: loc, + log: c.Log, + } +} diff --git a/pkg/worker/handler/pipeline/locker/acquire.go b/pkg/worker/handler/pipeline/locker/acquire.go new file mode 100644 index 0000000..8ab588f --- /dev/null +++ b/pkg/worker/handler/pipeline/locker/acquire.go @@ -0,0 +1,36 @@ +package locker + +import ( + "context" + "errors" + "time" + + "cirello.io/pglock" + "github.com/xh3b4sd/tracer" +) + +func (l *Locker) Acquire() (*pglock.Lock, error) { + var err error + + var ctx context.Context + var can context.CancelFunc + { + ctx, can = context.WithTimeout(context.Background(), 3*time.Second) + } + + { + defer can() + } + + var loc *pglock.Lock + { + loc, err = l.cli.AcquireContext(ctx, "accounts") + if errors.Is(err, pglock.ErrNotAcquired) { + return nil, nil + } else if err != nil { + return nil, tracer.Mask(err) + } + } + + return loc, nil +} diff --git a/pkg/worker/handler/pipeline/locker/create.go b/pkg/worker/handler/pipeline/locker/create.go new file mode 100644 index 0000000..19074b3 --- /dev/null +++ b/pkg/worker/handler/pipeline/locker/create.go @@ -0,0 +1,21 @@ +package locker + +import ( + "github.com/xh3b4sd/tracer" +) + +// Create the database table required for the lock client to work properly, but +// only create the table if it does not already exist. So in case the database +// table is already setup properly, this call should not fail. +func (l *Locker) Create() error { + var err error + + { + err = l.cli.TryCreateTable() + if err != nil { + return tracer.Mask(err) + } + } + + return nil +} diff --git a/pkg/worker/handler/pipeline/locker/locker.go b/pkg/worker/handler/pipeline/locker/locker.go new file mode 100644 index 0000000..9bd5954 --- /dev/null +++ b/pkg/worker/handler/pipeline/locker/locker.go @@ -0,0 +1,65 @@ +package locker + +import ( + "database/sql" + "fmt" + "time" + + "cirello.io/pglock" + "github.com/xh3b4sd/logger" + "github.com/xh3b4sd/tracer" +) + +type Config struct { + // Dsn is the data source name for the underlying Postgres instance in order + // to establish a new database connection. + Dsn string + // Log is the standard logger interface to emit useful log messages at + // runtime. + Log logger.Interface +} + +type Locker struct { + cli *pglock.Client + log logger.Interface +} + +func New(c Config) *Locker { + if c.Dsn == "" { + tracer.Panic(tracer.Mask(fmt.Errorf("%T.Dsn must not be empty", c))) + } + if c.Log == nil { + tracer.Panic(tracer.Mask(fmt.Errorf("%T.Log must not be empty", c))) + } + + var err error + + var dat *sql.DB + { + dat, err = sql.Open("postgres", c.Dsn) + if err != nil { + tracer.Panic(tracer.Mask(err)) + } + } + + var opt []pglock.ClientOption + { + opt = []pglock.ClientOption{ + pglock.WithLeaseDuration(5 * time.Second), + pglock.WithHeartbeatFrequency(1 * time.Second), + } + } + + var cli *pglock.Client + { + cli, err = pglock.New(dat, opt...) + if err != nil { + tracer.Panic(tracer.Mask(err)) + } + } + + return &Locker{ + cli: cli, + log: c.Log, + } +} diff --git a/pkg/worker/handler/pipeline/locker/release.go b/pkg/worker/handler/pipeline/locker/release.go new file mode 100644 index 0000000..260a2cc --- /dev/null +++ b/pkg/worker/handler/pipeline/locker/release.go @@ -0,0 +1,32 @@ +package locker + +import ( + "context" + "time" + + "cirello.io/pglock" + "github.com/xh3b4sd/tracer" +) + +func (l *Locker) Release(loc *pglock.Lock) error { + var err error + + var ctx context.Context + var can context.CancelFunc + { + ctx, can = context.WithTimeout(context.Background(), 3*time.Second) + } + + { + defer can() + } + + { + err = l.cli.ReleaseContext(ctx, loc) + if err != nil { + return tracer.Mask(err) + } + } + + return nil +}