diff --git a/.trunk/configs/custom-words.txt b/.trunk/configs/custom-words.txt index f770a6888..10fa10809 100644 --- a/.trunk/configs/custom-words.txt +++ b/.trunk/configs/custom-words.txt @@ -172,3 +172,10 @@ webmvc xvzf yourpassword shaofstring +kairpc +rerr +werr +clientwrapper +extjava +winio +pipepath \ No newline at end of file diff --git a/Makefile b/Makefile index d0a0840c2..7a7798c08 100644 --- a/Makefile +++ b/Makefile @@ -36,11 +36,10 @@ build-kai-rpc-server: pyinstaller --clean build/build.spec # This will build both binaries for kai, the kai-analyzer-rpc and the kai-rpc-server -build-binaries: build-kai-analyzer build-kai-rpc-server +build-binaries: build-kai-analyzer # This will build the binaries in build-binaries and then move them to the correct location for run_demo.py set-binaries-demo: build-binaries - mv dist/kai-rpc-server example/analysis/kai-rpc-server mv kai_analyzer_rpc/kai-analyzer example/analysis/kai-analyzer-rpc # This will set up the demo run, with all the things that you need for run_demo.py diff --git a/kai/analyzer.py b/kai/analyzer.py index eceefe1e7..d97d4bfbb 100644 --- a/kai/analyzer.py +++ b/kai/analyzer.py @@ -115,7 +115,7 @@ async def start(self) -> None: cast(BufferedReader, self.rpc_server.stdout), cast(BufferedWriter, self.rpc_server.stdin), ), - request_timeout=4 * 60, + request_timeout=None, log=get_logger("kai.analyzer-rpc-client"), ) diff --git a/kai_analyzer_rpc/go.mod b/kai_analyzer_rpc/go.mod index 5e7fc0132..834d9f0a3 100644 --- a/kai_analyzer_rpc/go.mod +++ b/kai_analyzer_rpc/go.mod @@ -3,9 +3,11 @@ module github.com/konveyor/kai-analyzer go 1.23.1 require ( + github.com/Microsoft/go-winio v0.6.2 + github.com/cenkalti/rpc2 v1.0.4 github.com/go-logr/logr v1.4.2 - github.com/konveyor/analyzer-lsp v0.6.0-alpha.2.0.20250306150354-b062f3757592 - github.com/konveyor/analyzer-lsp/external-providers/java-external-provider v0.0.0-20250228162759-837d328174c3 + github.com/konveyor/analyzer-lsp v0.7.0-alpha.2.0.20250414173818-69f16ed24d65 + github.com/konveyor/analyzer-lsp/external-providers/java-external-provider v0.0.0-20250414173818-69f16ed24d65 go.opentelemetry.io/otel v1.35.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 go.opentelemetry.io/otel/sdk v1.35.0 @@ -23,9 +25,10 @@ require ( github.com/bufbuild/protocompile v0.10.0 // indirect github.com/cbroglie/mustache v1.4.0 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect - github.com/fsnotify/fsnotify v1.8.0 // indirect + github.com/cenkalti/hub v1.0.2 // indirect + github.com/fsnotify/fsnotify v1.9.0 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/golang-jwt/jwt/v5 v5.2.1 // indirect + github.com/golang-jwt/jwt/v5 v5.2.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/google/uuid v1.6.0 // indirect @@ -35,7 +38,6 @@ require ( github.com/nxadm/tail v1.4.11 // indirect github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 // indirect github.com/shopspring/decimal v1.4.0 // indirect - github.com/sirupsen/logrus v1.9.3 // indirect github.com/swaggest/jsonschema-go v0.3.73 // indirect github.com/swaggest/openapi-go v0.2.57 // indirect github.com/swaggest/refl v1.3.1 // indirect @@ -45,16 +47,15 @@ require ( go.opentelemetry.io/otel/exporters/jaeger v1.17.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.35.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.opentelemetry.io/proto/otlp v1.5.0 // indirect - golang.org/x/net v0.37.0 // indirect - golang.org/x/oauth2 v0.25.0 // indirect - golang.org/x/sys v0.31.0 // indirect - golang.org/x/text v0.23.0 // indirect + golang.org/x/net v0.39.0 // indirect + golang.org/x/oauth2 v0.26.0 // indirect + golang.org/x/sys v0.32.0 // indirect + golang.org/x/text v0.24.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb // indirect - google.golang.org/grpc v1.71.0 // indirect - google.golang.org/protobuf v1.36.5 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e // indirect + google.golang.org/grpc v1.71.1 // indirect + google.golang.org/protobuf v1.36.6 // indirect gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect diff --git a/kai_analyzer_rpc/go.sum b/kai_analyzer_rpc/go.sum index 08a7f7ecc..54da5da36 100644 --- a/kai_analyzer_rpc/go.sum +++ b/kai_analyzer_rpc/go.sum @@ -1,7 +1,7 @@ -cloud.google.com/go/compute/metadata v0.5.2 h1:UxK4uu/Tn+I3p2dYWTfiX4wva7aYlKixAHn3fyqngqo= -cloud.google.com/go/compute/metadata v0.5.2/go.mod h1:C66sj2AluDcIqakBq/M8lw8/ybHgOZqin2obFxa/E5k= cloud.google.com/go/compute/metadata v0.6.0 h1:A6hENjEsCDtC1k8byVsgwvVcioamEHvZ4j01OwKxG9I= cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg= +github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY= +github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU= github.com/PaesslerAG/gval v1.2.4 h1:rhX7MpjJlcxYwL2eTTYIOBUyEKZ+A96T9vQySWkVUiU= github.com/PaesslerAG/gval v1.2.4/go.mod h1:XRFLwvmkTEdYziLdaCeCa5ImcGVrfQbeNUbVR+C6xac= github.com/PaesslerAG/jsonpath v0.1.0 h1:gADYeifvlqK3R3i2cR5B4DGgxLXIPb3TRTH1mGi0jPI= @@ -25,28 +25,31 @@ github.com/cbroglie/mustache v1.4.0 h1:Azg0dVhxTml5me+7PsZ7WPrQq1Gkf3WApcHMjMprY github.com/cbroglie/mustache v1.4.0/go.mod h1:SS1FTIghy0sjse4DUVGV1k/40B1qE1XkD9DtDsHo9iM= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= +github.com/cenkalti/hub v1.0.2 h1:Nqv9TNaA9boeO2wQFW8o87BY3zKthtnzXmWGmJqhAV8= +github.com/cenkalti/hub v1.0.2/go.mod h1:8LAFAZcCasb83vfxatMUnZHRoQcffho2ELpHb+kaTJU= +github.com/cenkalti/rpc2 v1.0.4 h1:MJWmm7mbt8r/ZkQS+qr/e2KMMrhMLPr/62CYZIHybdI= +github.com/cenkalti/rpc2 v1.0.4/go.mod h1:2yfU5b86vOr16+iY1jN3MvT6Kxc9Nf8j5iZWwUf7iaw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= -github.com/fsnotify/fsnotify v1.8.0 h1:dAwr6QBTBZIkG8roQaJjGof0pp0EeF+tNV7YBP3F/8M= -github.com/fsnotify/fsnotify v1.8.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= +github.com/fsnotify/fsnotify v1.9.0 h1:2Ml+OJNzbYCTzsxtv8vKSFD9PbJjmhYF14k/jKC7S9k= +github.com/fsnotify/fsnotify v1.9.0/go.mod h1:8jBTzvmWwFyi3Pb8djgCCO5IBqzKJ/Jwo8TRcHyHii0= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= -github.com/golang-jwt/jwt/v5 v5.2.1 h1:OuVbFODueb089Lh128TAcimifWaLhJwVflnrgM17wHk= -github.com/golang-jwt/jwt/v5 v5.2.1/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= +github.com/golang-jwt/jwt/v5 v5.2.2 h1:Rl4B7itRWVtYIHFrSNd7vhTiz9UpLdi6gZhZ3wEeDy8= +github.com/golang-jwt/jwt/v5 v5.2.2/go.mod h1:pqrtFR0X4osieyHYxtmOUWsAWrfe1Q5UVIyoH402zdk= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE= github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= +github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/grpc-ecosystem/grpc-gateway/v2 v2.25.1 h1:VNqngBF40hVlDloBruUehVYC3ArSgIyScOAyMRqBxRg= @@ -57,12 +60,10 @@ github.com/iancoleman/orderedmap v0.3.0 h1:5cbR2grmZR/DiVt+VJopEhtVs9YGInGIxAoMJ github.com/iancoleman/orderedmap v0.3.0/go.mod h1:XuLcCUkdL5owUCQeF2Ue9uuw1EptkJDkXXS7VoV7XGE= github.com/jhump/protoreflect v1.16.0 h1:54fZg+49widqXYQ0b+usAFHbMkBGR4PpXrsHc8+TBDg= github.com/jhump/protoreflect v1.16.0/go.mod h1:oYPd7nPvcBw/5wlDfm/AVmU9zH9BgqGCI469pGxfj/8= -github.com/konveyor/analyzer-lsp v0.6.0-alpha.2.0.20250228162759-837d328174c3 h1:/U1DF5WHznNlki2C17RyQHWnfYFKrVraMI95eOCBGTM= -github.com/konveyor/analyzer-lsp v0.6.0-alpha.2.0.20250228162759-837d328174c3/go.mod h1:l9XC3uazLba8yXoAFJWN7uBDju1s/g1Hc8TKBpE3B2U= -github.com/konveyor/analyzer-lsp v0.6.0-alpha.2.0.20250306150354-b062f3757592 h1:vvMR/OLw3C9LkVhcV2KQnYGyaOmCFRGNsLSp1Odxyr4= -github.com/konveyor/analyzer-lsp v0.6.0-alpha.2.0.20250306150354-b062f3757592/go.mod h1:l9XC3uazLba8yXoAFJWN7uBDju1s/g1Hc8TKBpE3B2U= -github.com/konveyor/analyzer-lsp/external-providers/java-external-provider v0.0.0-20250228162759-837d328174c3 h1:eGSaGwickMBOoSnPpI8fa41Yf+8BJ+pAiTZkHrtPm1E= -github.com/konveyor/analyzer-lsp/external-providers/java-external-provider v0.0.0-20250228162759-837d328174c3/go.mod h1:9hR5THTSExZSAPz8uDEVgm2gm47F8gqFj8cU9K/d32M= +github.com/konveyor/analyzer-lsp v0.7.0-alpha.2.0.20250414173818-69f16ed24d65 h1:YbN65QMFBX88onRsZm7fsrPTCN/Ty0/17rbm9KOCQKo= +github.com/konveyor/analyzer-lsp v0.7.0-alpha.2.0.20250414173818-69f16ed24d65/go.mod h1:nlSmohI+wwPTzvWXAtHI3PXe7B9RRU5NKnqnu222Ia0= +github.com/konveyor/analyzer-lsp/external-providers/java-external-provider v0.0.0-20250414173818-69f16ed24d65 h1:KVveBLtmT54T7Jrlh1zcjn4XgXsblPR3zU5IVt8t9oU= +github.com/konveyor/analyzer-lsp/external-providers/java-external-provider v0.0.0-20250414173818-69f16ed24d65/go.mod h1:H0OJVLr/1wheQn/fidth5Q9X00tr80ZjVMEyLjkJtYo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -86,7 +87,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/swaggest/assertjson v1.9.0 h1:dKu0BfJkIxv/xe//mkCrK5yZbs79jL7OVf9Ija7o2xQ= @@ -107,8 +107,6 @@ go.lsp.dev/uri v0.3.0 h1:KcZJmh6nFIBeJzTugn5JTU6OOyG0lDOo3R9KwTxTYbo= go.lsp.dev/uri v0.3.0/go.mod h1:P5sbO1IQR+qySTWOCnhnK7phBx+W3zbLqSMDJNTw88I= go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= -go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= -go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel v1.35.0 h1:xKWKPxrxB6OtMCbmMY021CqC45J+3Onta9MqjhnusiQ= go.opentelemetry.io/otel v1.35.0/go.mod h1:UEqy8Zp11hpkUrL73gSlELM0DupHoiq72dR+Zqel/+Y= go.opentelemetry.io/otel/exporters/jaeger v1.17.0 h1:D7UpUy2Xc2wsi1Ras6V40q806WM07rqoCWzXu7Sqy+4= @@ -117,62 +115,44 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0 h1:OeNbIYk/2C15ckl7glB go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.34.0/go.mod h1:7Bept48yIeqxP2OZ9/AqIpYS94h2or0aB4FypJTc8ZM= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0 h1:BEj3SPM81McUZHYjRS5pEgNgnmzGJ5tRpU5krWnV8Bs= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.34.0/go.mod h1:9cKLGBDzI/F3NoHLQGm4ZrYdIHsvGt6ej6hUowxY0J4= -go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= -go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= go.opentelemetry.io/otel/metric v1.35.0 h1:0znxYu2SNyuMSQT4Y9WDWej0VpcsxkuklLa4/siN90M= go.opentelemetry.io/otel/metric v1.35.0/go.mod h1:nKVFgxBZ2fReX6IlyW28MgZojkoAkJGaE8CpgeAU3oE= -go.opentelemetry.io/otel/sdk v1.34.0 h1:95zS4k/2GOy069d321O8jWgYsW3MzVV+KuSPKp7Wr1A= -go.opentelemetry.io/otel/sdk v1.34.0/go.mod h1:0e/pNiaMAqaykJGKbi+tSjWfNNHMTxoC9qANsCzbyxU= go.opentelemetry.io/otel/sdk v1.35.0 h1:iPctf8iprVySXSKJffSS79eOjl9pvxV9ZqOWT0QejKY= go.opentelemetry.io/otel/sdk v1.35.0/go.mod h1:+ga1bZliga3DxJ3CQGg3updiaAJoNECOgJREo9KHGQg= go.opentelemetry.io/otel/sdk/metric v1.34.0 h1:5CeK9ujjbFVL5c1PhLuStg1wxA7vQv7ce1EK0Gyvahk= go.opentelemetry.io/otel/sdk/metric v1.34.0/go.mod h1:jQ/r8Ze28zRKoNRdkjCZxfs6YvBTG1+YIqyFVFYec5w= -go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= -go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/otel/trace v1.35.0 h1:dPpEfJu1sDIqruz7BHFG3c7528f6ddfSWfFDVt/xgMs= go.opentelemetry.io/otel/trace v1.35.0/go.mod h1:WUk7DtFp1Aw2MkvqGdwiXYDZZNvA/1J8o6xRXLrIkyc= go.opentelemetry.io/proto/otlp v1.5.0 h1:xJvq7gMzB31/d406fB8U5CBdyQGw4P399D1aQWU/3i4= go.opentelemetry.io/proto/otlp v1.5.0/go.mod h1:keN8WnHxOy8PG0rQZjJJ5A2ebUoafqWp0eVQ4yIXvJ4= +go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= -golang.org/x/net v0.35.0 h1:T5GQRQb2y08kTAByq9L4/bz8cipCdA8FbRTXewonqY8= -golang.org/x/net v0.35.0/go.mod h1:EglIi67kWsHKlRzzVMUD93VMSWGFOMSZgxFjparz1Qk= -golang.org/x/net v0.37.0 h1:1zLorHbz+LYj7MQlSf1+2tPIIgibq2eL5xkrGk6f+2c= -golang.org/x/net v0.37.0/go.mod h1:ivrbrMbzFq5J41QOQh0siUuly180yBYtLp+CKbEaFx8= -golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE= -golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= -golang.org/x/oauth2 v0.25.0 h1:CY4y7XT9v0cRI9oupztF8AgiIu99L/ksR/Xp/6jrZ70= -golang.org/x/oauth2 v0.25.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= -golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= -golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sync v0.12.0 h1:MHc5BpPuC30uJk597Ri8TV3CNZcTLu6B6z4lJy+g6Jw= +golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= +golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= +golang.org/x/oauth2 v0.26.0 h1:afQXWNNaeC4nvZ0Ed9XvCCzXM6UHJG7iCg0W4fPqSBE= +golang.org/x/oauth2 v0.26.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= +golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= +golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= -golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/sys v0.31.0 h1:ioabZlmFYtWhL+TRYpcnNlLwhyxaM9kWTDEmfnprqik= -golang.org/x/sys v0.31.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= +golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.22.0 h1:bofq7m3/HAFvbF51jz3Q9wLg3jkvSPuiZu/pD1XwgtM= -golang.org/x/text v0.22.0/go.mod h1:YRoo4H8PVmsu+E3Ou7cqLVH8oXWIHVoX0jqUWALQhfY= -golang.org/x/text v0.23.0 h1:D71I7dUrlY+VX0gQShAThNGHFxZ13dGLBHQLVl1mJlY= -golang.org/x/text v0.23.0/go.mod h1:/BLNzu4aZCJ1+kcD0DNRotWKage4q2rGVAg4o22unh4= +golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= +golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f h1:gap6+3Gk41EItBuyi4XX/bp4oqJ3UwuIMl25yGinuAA= google.golang.org/genproto/googleapis/api v0.0.0-20250115164207-1a7da9e5054f/go.mod h1:Ic02D47M+zbarjYYUlK57y316f2MoN0gjAwI3f2S95o= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250219182151-9fdb1cabc7b2 h1:DMTIbak9GhdaSxEjvVzAeNZvyc03I61duqNbnm3SU0M= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250219182151-9fdb1cabc7b2/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb h1:TLPQVbx1GJ8VKZxz52VAxl1EBgKXXbTiU9Fc5fZeLn4= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250303144028-a0af3efb3deb/go.mod h1:LuRYeWDFV6WOn90g357N17oMCaxpgCnbi/44qJvDn2I= -google.golang.org/grpc v1.70.0 h1:pWFv03aZoHzlRKHWicjsZytKAiYCtNS0dHbXnIdq7jQ= -google.golang.org/grpc v1.70.0/go.mod h1:ofIJqVKDXx/JiXrwr2IG4/zwdH9txy3IlF40RmcJSQw= -google.golang.org/grpc v1.71.0 h1:kF77BGdPTQ4/JZWMlb9VpJ5pa25aqvVqogsxNHHdeBg= -google.golang.org/grpc v1.71.0/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= -google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= -google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e h1:ztQaXfzEXTmCBvbtWYRhJxW+0iJcz2qXfd38/e9l7bA= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250414145226-207652e42e2e/go.mod h1:qQ0YXyHHx3XkvlzUtpXDkS29lDSafHMZBAZDc03LQ3A= +google.golang.org/grpc v1.71.1 h1:ffsFWr7ygTUscGPI0KKK6TLrGz0476KUvvsbqWK0rPI= +google.golang.org/grpc v1.71.1/go.mod h1:H0GRtasmQOh9LkFoCPDu3ZrwUtD1YGE+b2vYBYd/8Ec= +google.golang.org/protobuf v1.36.6 h1:z1NpPI8ku2WgiWnf+t9wTPsn6eP1L7ksHUlkfLvd9xY= +google.golang.org/protobuf v1.36.6/go.mod h1:jduwjTPXsFjZGTmRluh+L6NjiWu7pchiJ2/5YcXBHnY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/kai_analyzer_rpc/main.go b/kai_analyzer_rpc/main.go index e5f7e8eac..2a89d9a25 100644 --- a/kai_analyzer_rpc/main.go +++ b/kai_analyzer_rpc/main.go @@ -6,14 +6,16 @@ import ( "flag" "fmt" "log/slog" - "net/rpc" "os" "os/exec" "os/signal" "syscall" + rpc "github.com/cenkalti/rpc2" + "github.com/go-logr/logr" "github.com/konveyor/kai-analyzer/pkg/codec" + kairpc "github.com/konveyor/kai-analyzer/pkg/rpc" "github.com/konveyor/kai-analyzer/pkg/service" "github.com/konveyor/kai-analyzer/pkg/tracing" ) @@ -25,30 +27,16 @@ func main() { lspServerPath := flag.String("lspServerPath", "", "this will be the path to the lsp") bundles := flag.String("bundles", "", "Comma separated list of path to java analyzer bundles") depOpenSourceLabelsFile := flag.String("depOpenSourceLabelsFile", "", "Path to the dep open source labels file") + pipePath := flag.String("pipePath", "", "Path to the pipe to use for bi-directional communication") + logVerbosity := flag.Int("verbosity", -4, "how verbose would you like the logs to be, error logs are 8, warning logs are 4 info logs are 0 and debug logs are -4, going more negative gives more logs.") // TODO(djzager): We should do verbosity type argument(s) - logLevel := slog.LevelDebug + logLevel := slog.Level(*logVerbosity) flag.Parse() // In the future add cobra for flags maybe // create log file in working directory for now. - if sourceDirectory == nil || *sourceDirectory == "" { - panic(fmt.Errorf("source directory must be valid")) - } - - if rules == nil || *rules == "" { - panic(fmt.Errorf("rules must be set")) - } - - if lspServerPath == nil || *lspServerPath == "" { - panic(fmt.Errorf("lspServerPath must be set")) - } - - if bundles == nil || *bundles == "" { - panic(fmt.Errorf("bundles must be set")) - } - // TODO(djzager): Handle log level/location more like reputable LSP servers // ChatGPT told me gopls and jdtls default to stderr but I couldn't confirm that // for now I think it's enough to make it configurable to be one or the other. @@ -69,18 +57,10 @@ func main() { } l := logr.FromSlogHandler(logger) - - // Check if Java exists on the PATH - if err := exec.Command("java", "-version").Run(); err != nil { - panic("Java is not installed or not on the PATH") - } - l.Info("Java is installed") - - // Check if Maven exists on the PATH - if err := exec.Command("mvn", "-version").Run(); err != nil { - panic("Maven is not installed or not on the PATH") + usingPipe, err := validateFlags(sourceDirectory, rules, lspServerPath, bundles, depOpenSourceLabelsFile, pipePath, l) + if err != nil { + panic(err) } - l.Info("Maven is installed") // Check if ENABLE_TRACING is set in the environment. if _, enable_tracing := os.LookupEnv("ENABLE_TRACING"); enable_tracing { @@ -95,40 +75,82 @@ func main() { err = errors.Join(err, otelShutdown(context.Background())) }() } - - l.Info("Starting Analyzer", "source-dir", *sourceDirectory, "rules-dir", *rules, "lspServerPath", *lspServerPath, "bundles", *bundles, "depOpenSourceLabelsFile", *depOpenSourceLabelsFile) - // We need to start up the JSON RPC server and start listening for messages - analyzerService, err := service.NewAnalyzer( - 10000, 10, 10, - *sourceDirectory, - "", - *lspServerPath, - *bundles, - *depOpenSourceLabelsFile, - *rules, - l, - ) - if err != nil { - panic(err) - } server := rpc.NewServer() - err = server.RegisterName("analysis_engine", analyzerService) - if err != nil { - panic(err) - } + notificationService := &service.NotificationService{Logger: l} + server.Handle("notification.Notify", notificationService.Notify) + ctx, cancelFunc := context.WithCancel(context.Background()) cancelChan := make(chan os.Signal, 1) // catch SIGETRM or SIGINTERRUPT signal.Notify(cancelChan, syscall.SIGTERM, syscall.SIGINT) - codec := codec.NewCodec(codec.Connection{Input: os.Stdin, Output: os.Stdout}, l) go func() { l.Info("Starting Server") - server.ServeCodec(codec) + var s *kairpc.Server + if !usingPipe { + l.Info("Starting Analyzer", "source-dir", *sourceDirectory, "rules-dir", *rules, "lspServerPath", *lspServerPath, "bundles", *bundles, "depOpenSourceLabelsFile", *depOpenSourceLabelsFile) + // We need to start up the JSON RPC server and start listening for messages + analyzerService, err := service.NewAnalyzer( + 10000, 10, 10, + *sourceDirectory, + "", + *lspServerPath, + *bundles, + *depOpenSourceLabelsFile, + *rules, + l, + ) + if err != nil { + panic(err) + } + server.Handle("analysis_engine.Analyze", analyzerService.Analyze) + codec := codec.NewConnectionCodec(codec.Connection{Input: os.Stdin, Output: os.Stdout}, l) + server.ServeCodec(codec) + } else { + s = kairpc.NewServer(ctx, server, l, "notification.Notify", *rules, *sourceDirectory) + s.Accept(*pipePath) + } + l.Info("Stopping Server") }() sig := <-cancelChan + cancelFunc() // When we get here, call stop on the analyzer server l.Info("stopping server", "signal", sig) - analyzerService.Stop() + +} + +func validateFlags(sourceDirectory, rules, lspServerPath, bundles, depOpenSourceLabelsFile, pipePath *string, l logr.Logger) (bool, error) { + // If we are using a named pipe, jdtls is initialized by the caller. + if pipePath != nil && *pipePath != "" && rules != nil && len(*rules) > 0 && sourceDirectory != nil && *sourceDirectory != "" { + return true, nil + } + + if sourceDirectory == nil || *sourceDirectory == "" { + return false, fmt.Errorf("source directory must be valid") + } + + if rules == nil || *rules == "" { + return false, fmt.Errorf("rules must be set") + } + + if lspServerPath == nil || *lspServerPath == "" { + return false, fmt.Errorf("lspServerPath must be set") + } + + if bundles == nil || *bundles == "" { + return false, fmt.Errorf("bundles must be set") + } + // Check if Java exists on the PATH + if err := exec.Command("java", "-version").Run(); err != nil { + return false, fmt.Errorf("java is not installed or not on the PATH") + } + l.Info("Java is installed") + + // Check if Maven exists on the PATH + if err := exec.Command("mvn", "-version").Run(); err != nil { + return false, fmt.Errorf("maven is not installed or not on the PATH") + } + l.Info("Maven is installed") + return false, nil } diff --git a/kai_analyzer_rpc/pkg/codec/json_codec.go b/kai_analyzer_rpc/pkg/codec/json_codec.go index bd3f4fe01..93b7c00b4 100644 --- a/kai_analyzer_rpc/pkg/codec/json_codec.go +++ b/kai_analyzer_rpc/pkg/codec/json_codec.go @@ -1,220 +1,378 @@ package codec import ( + "bufio" + "bytes" "context" "encoding/json" - "errors" "fmt" "io" - "net/rpc" - "net/rpc/jsonrpc" - "sync" + "reflect" + "strconv" + "strings" + "sync/atomic" + "time" + + rpc "github.com/cenkalti/rpc2" "github.com/go-logr/logr" ) -// Connection wraps a pair of unidirectional streams as an io.ReadWriteCloser. -type Connection struct { - Input io.ReadCloser - Output io.WriteCloser -} +const headerString = "Content-Length: %d\r\n\r\n" -// Read implements io.ReadWriteCloser. -func (c *Connection) Read(p []byte) (n int, err error) { - return c.Input.Read(p) +type Codec interface { + rpc.Codec } -// Write implements io.ReadWriteCloser. -func (c *Connection) Write(p []byte) (n int, err error) { - return c.Output.Write(p) -} +type codec struct { + reader *bufio.Reader + writer *bufio.Writer + logger logr.Logger + scanner *bufio.Scanner -// Close closes c's underlying ReadCloser and WriteCloser. -func (c *Connection) Close() error { - rerr := c.Input.Close() - werr := c.Output.Close() - if rerr != nil { - return rerr - } - return werr -} + //temp workspace a new codec is created on every connection + // This is the workspace to deal with a single connection + msg message + serverRequest serverRequest + clientResponse clientResponse + isLanguageServer bool -type Codec interface { - rpc.ClientCodec - rpc.ServerCodec + state *rpc.State + ctx context.Context + + notificationServiceName string } -type codec struct { - rpc.ClientCodec - rpc.ServerCodec - logger logr.Logger +func NewCodec(ctx context.Context, reader *bufio.Reader, writer *bufio.Writer, logger logr.Logger, notificationServiceName string, state *rpc.State) Codec { + // Set new seq for this codec/connection + scanner := bufio.NewScanner(reader) + c := codec{ + logger: logger.WithName("json codec"), + reader: reader, + scanner: scanner, + writer: writer, + state: state, + ctx: ctx, + notificationServiceName: notificationServiceName, + isLanguageServer: true, + } + scanner.Split(c.splitLanguageServer) + return &c } -func (c *codec) WriteRequest(r *rpc.Request, v any) error { - c.logger.V(7).Info("write request", "request", r, "value", v) - err := c.ClientCodec.WriteRequest(r, v) - c.logger.V(7).Info("finished request header", "err", err, "request", r) - return err +//TODO: we need to rewrite the rpc.Client to use the context from here, when calling a handler. +// func (c *codec) setRequestContext(seq uint64, ctx context.Context, cancelFunc func()) { +// c.state.Set(fmt.Sprintf("context-%v", seq), ctx) +// c.state.Set(fmt.Sprintf("context-cancel-%v", seq), cancelFunc) +// } + +func (c *codec) getPending(seq uint64) (*json.RawMessage, bool) { + if v, ok := c.state.Get(fmt.Sprintf("pending-%v", seq)); ok { + if m, isCorrectType := v.(*json.RawMessage); isCorrectType { + return m, true + } + } + return nil, false } -func (c *codec) ReadRequestHeader(r *rpc.Request) error { - c.logger.V(7).Info("read request header", "request", r) - err := c.ServerCodec.ReadRequestHeader(r) - c.logger.V(7).Info("finished request header", "err", err, "request", r) - return err +func (c *codec) getSeq() *atomic.Uint64 { + if v, ok := c.state.Get("seq"); ok { + if m, isCorrectType := v.(*atomic.Uint64); isCorrectType { + return m + } + } + return nil } -func (c *codec) ReadRequestBody(r any) error { - c.logger.V(7).Info("read request body", "request", r) - err := c.ServerCodec.ReadRequestBody(r) - c.logger.V(7).Info("finished request body", "err", err, "request", r) - return err +func (c *codec) setPending(a uint64, m *json.RawMessage) { + c.state.Set(fmt.Sprintf("pending-%v", a), m) } -func (c *codec) WriteResponse(r *rpc.Response, v any) error { - c.logger.V(7).Info("writing response", "response", r, "object", v) - err := c.ServerCodec.WriteResponse(r, v) - c.logger.V(7).Info("finished write response", "err", err) - return err +func (c *codec) removePending(a uint64) { + c.state.Set(fmt.Sprintf("pending-%v", a), nil) } -func (c *codec) Close() error { - err := c.ClientCodec.Close() - if err != nil { - return err +func (c *codec) ReadHeader(r *rpc.Request, o *rpc.Response) error { + for !c.scanner.Scan() { + select { + case <-c.ctx.Done(): + return io.EOF + case <-time.After(100 * time.Millisecond): + continue + } } - err = c.ServerCodec.Close() + c.logger.V(7).Info("reading request header", "request", r, "body", c.scanner.Text()) + // Need to figure out how to stop this + // Strip null characters. + c.msg = message{} + b := c.scanner.Bytes() + if len(b) == 0 { + return io.EOF + } + err := json.Unmarshal(b, &c.msg) if err != nil { + c.logger.Error(err, "error unmarshaling body into request/response message", "b", string(b)) return err } + err = c.handleMessage(r, o) + c.logger.V(7).Info("finished request header", "err", err, "request", r) return nil } -func NewCodec(connection Connection, logger logr.Logger) Codec { - return &codec{ - ClientCodec: jsonrpc.NewClientCodec(&connection), ServerCodec: NewServerCodec(&connection, logger), - logger: logger.WithName("json codec"), - } -} +func (c *codec) handleMessage(r *rpc.Request, o *rpc.Response) error { + if c.msg.Method != "" { + // request comes to server + c.serverRequest.Id = c.msg.Id + c.serverRequest.Method = c.msg.Method + c.serverRequest.Params = c.msg.Params -type serverCodec struct { - dec *json.Decoder // for reading JSON values - enc *json.Encoder // for writing JSON values - c io.Closer - - // temporary work space - req serverRequest - - // JSON-RPC clients can use arbitrary json values as request IDs. - // Package rpc expects uint64 request IDs. - // We assign uint64 sequence numbers to incoming requests - // but save the original request ID in the pending map. - // When rpc responds, we use the sequence number in - // the response to find the original request ID. - mutex sync.Mutex // protects seq, pending - seq uint64 - pending map[uint64]*json.RawMessage - - logger logr.Logger -} + r.Method = c.serverRequest.Method -func NewServerCodec(conn io.ReadWriteCloser, log logr.Logger) rpc.ServerCodec { - return &serverCodec{ - dec: json.NewDecoder(conn), - enc: json.NewEncoder(conn), - c: conn, - pending: make(map[uint64]*json.RawMessage), - logger: log, - } -} + if c.serverRequest.Id != nil { + b, err := c.serverRequest.Id.MarshalJSON() + if err != nil { + c.logger.Error(err, "unable to marshal id") + return err + } + c.logger.Info("b", "b", b) + i, err := strconv.ParseUint(string(b), 10, 64) + if err != nil { + c.logger.Error(err, "unable to parse id") + return err + } + c.serverRequest.unMarshalledId = &i + } -type serverRequest struct { - Method string `json:"method"` - Params *json.RawMessage `json:"params"` - Id *json.RawMessage `json:"id"` -} + if c.serverRequest.unMarshalledId == nil && c.serverRequest.Params == nil { + params := fmt.Sprintf(`[{"type": "%s"}]`, c.msg.Method) + m := json.RawMessage(params) + c.serverRequest.Params = &m + } -func (r *serverRequest) reset() { - r.Method = "" - r.Params = nil - r.Id = nil -} + // JSON request id can be any JSON value; + // RPC package expects uint64. Translate to + // internal uint64 and save JSON on the side. + c.logger.Info("server request", "server request", c.serverRequest, "notificationService", c.notificationServiceName) + if c.serverRequest.unMarshalledId == nil && c.notificationServiceName != "" { + c.serverRequest.Method = c.notificationServiceName + r.Seq = 0 + r.Method = c.notificationServiceName + } else { + s := c.getSeq() + v := s.Add(1) + c.setPending(v, c.serverRequest.Id) + // TODO: enable when the client can use the state, to make pass the context to the handler + // requestContext, cancelFunc := context.WithCancel(c.ctx) + // c.setRequestContext(v, requestContext, cancelFunc) + c.serverRequest.Id = nil + r.Seq = v + } + } else { + // response comes to client + err := json.Unmarshal(*c.msg.Id, &c.clientResponse.Id) + if err != nil { + return err + } + c.clientResponse.Result = c.msg.Result + c.clientResponse.Error = c.msg.Error -type serverResponse struct { - Id *json.RawMessage `json:"id"` - Result any `json:"result"` - Error any `json:"error"` + o.Error = "" + o.Seq = c.clientResponse.Id + if c.clientResponse.Error != nil || c.clientResponse.Result == nil { + x, ok := c.clientResponse.Error.(string) + if !ok { + c.logger.Info("unable to get client response as error string", "err", err) + return nil + } + if x == "" { + x = "unspecified error" + } + o.Error = x + } + } + return nil } -func (c *serverCodec) ReadRequestHeader(r *rpc.Request) error { - c.req.reset() - if err := c.dec.Decode(&c.req); err != nil { - return err - } - r.ServiceMethod = c.req.Method +func (c *codec) WriteRequest(r *rpc.Request, v any) error { + c.logger.V(7).Info("write request", "request", r, "value", v) + req := &clientRequest{Method: r.Method, JsonRPCVersion: "2.0"} - // JSON request id can be any JSON value; - // RPC package expects uint64. Translate to - // internal uint64 and save JSON on the side. - c.mutex.Lock() - c.seq++ - c.pending[c.seq] = c.req.Id - c.req.Id = nil - r.Seq = c.seq - c.mutex.Unlock() + // Check if param is a slice of any kind + if v != nil && reflect.TypeOf(v).Kind() == reflect.Slice { + // If it's a slice, leave as is + req.Params = v + } else { + // Put anything else into a slice + req.Params = []interface{}{v} + } + if r.Seq == 0 { + // Notification + req.Id = nil + } else { + seq := r.Seq + req.Id = &seq + } + b, err := json.Marshal(req) + if err != nil { + c.logger.V(7).Info("unable to marshal request", "err", err) + return err + } + if c.isLanguageServer { + n, err := c.writer.Write(([]byte(fmt.Sprintf(headerString, len(b))))) + if err != nil { + c.logger.Error(err, "unable to write header string") + return err + } + c.logger.V(7).Info("finished writing language server header", "wrote", n) + } + n, err := c.writer.Write(b) + if err != nil { + c.logger.Error(err, "unable to write request body") + return err + } + err = c.writer.Flush() + if err != nil { + c.logger.Error(err, "unable to flush writer for request") + return err + } + c.logger.V(7).Info("finished writing request", "err", err, "request", r, "wrote", n) return nil } -func (c *serverCodec) ReadRequestBody(x any) error { - if x == nil { +//var errMissingParams = errors.New("jsonrpc: request body missing params") + +func (c *codec) ReadRequestBody(r any) error { + if r == nil { + return nil + } + c.logger.V(7).Info("read request body", "request", r, "type of r", reflect.TypeOf(r).Kind()) + + if c.serverRequest.Params == nil { + c.logger.Info("here missing params") return nil } - if c.req.Params == nil { - return fmt.Errorf("invalid params") + + var err error + + rt := reflect.TypeOf(r) + if rt.Kind() == reflect.Ptr && rt.Elem().Kind() == reflect.Slice { + // If it's a slice, unmarshal as is + err = json.Unmarshal(*c.serverRequest.Params, r) + } else { + // Anything else unmarshal into a slice containing x + params := &[]interface{}{r} + err = json.Unmarshal(*c.serverRequest.Params, params) + if err != nil { + err = json.Unmarshal(*c.serverRequest.Params, r) + } } - // JSON params is array value. - // RPC params is struct. - // Unmarshal into array containing struct for now. - // Should think about making RPC more general. - var params [1]any - params[0] = x - return json.Unmarshal(*c.req.Params, ¶ms) + c.logger.V(7).Info("finished request body", "err", err, "request", r) + return nil } -var null = json.RawMessage([]byte("null")) +//var null = json.RawMessage([]byte("null")) -func (c *serverCodec) WriteResponse(r *rpc.Response, x any) error { - c.logger.V(7).Info("writing response", "id", r.Seq, "pending", c.pending) - c.mutex.Lock() - b, ok := c.pending[r.Seq] +func (c *codec) WriteResponse(r *rpc.Response, v any) error { + c.logger.V(7).Info("writing response", "response", r, "object", v) + if c.serverRequest.unMarshalledId == nil { + c.logger.V(7).Info("dont respond on notifications") + return nil + } + b, ok := c.getPending(r.Seq) if !ok { - c.mutex.Unlock() - return errors.New("invalid sequence number in response") + c.logger.Info("invalid sequence number in response", "number", r.Seq) + return fmt.Errorf("unable to find pending matching request") } - delete(c.pending, r.Seq) - c.mutex.Unlock() + c.removePending((r.Seq)) - if b == nil { - // Invalid request so no id. Use JSON null. - b = &null - } - resp := serverResponse{Id: b} + resp := serverResponse{Id: b, JsonRPCVersion: "2.0"} if r.Error == "" { - resp.Result = x + resp.Result = v } else { resp.Error = r.Error } - return c.enc.Encode(resp) + outBytes, err := json.Marshal(resp) + if err != nil { + c.logger.V(7).Info("unable to marshall response bytes", "err", err) + return nil + } + if c.isLanguageServer { + n, err := c.writer.Write(([]byte(fmt.Sprintf(headerString, len(outBytes))))) + if err != nil { + c.logger.Info("unable to write header", "err", err) + return nil + } + c.logger.V(7).Info("finished writing language server header", "wrote", n) + } + n, err := c.writer.Write(outBytes) + if err != nil { + c.logger.Error(err, "unable to write response body") + return err + } + err = c.writer.Flush() + if err != nil { + c.logger.Error(err, "unable to flush to writer for response") + return err + } + c.logger.V(7).Info("finished write response", "err", err, "bytes", n, "outBytes", string(outBytes)) + return nil } -func (c *serverCodec) Close() error { - return c.c.Close() +func (c *codec) ReadResponseBody(v any) error { + c.logger.V(7).Info("read response body", "response", v) + if v == nil { + return nil + } + if c.clientResponse.Result == nil { + return nil + } + b, err := c.clientResponse.Result.MarshalJSON() + if err != nil { + return err + } + if string(b) == string([]byte("null")) { + return nil + } + err = json.Unmarshal(*c.clientResponse.Result, v) + c.logger.V(7).Info("finished read response body", "err", err, "body", v) + return err +} + +func (c *codec) Close() error { + c.logger.Info("Calling Close!!!") + return nil } -// ServeConn runs the JSON-RPC server on a single connection. -// ServeConn blocks, serving the connection until the client hangs up. -// The caller typically invokes ServeConn in a go statement. -func ServeConn(conn io.ReadWriteCloser) { - rpc.ServeCodec(NewServerCodec(conn, logr.FromContextOrDiscard(context.Background()))) +func (c *codec) splitLanguageServer(data []byte, atEOF bool) (advance int, token []byte, err error) { + if atEOF && len(data) == 0 { + return 0, nil, nil + } + // Find any headers + if i := bytes.Index(data, []byte("\r\n\r\n")); i >= 0 { + readContentHeader := data[0:i] + c.logger.V(7).Info("found header", "header", fmt.Sprintf("%q", readContentHeader)) + if !strings.Contains(string(readContentHeader), "Content-Length") { + return 0, nil, fmt.Errorf("found header separator but not content-length header") + } + pieces := strings.Split(string(readContentHeader), ":") + if len(pieces) != 2 { + return 0, nil, fmt.Errorf("invalid pieces") + } + addedLength, err := strconv.Atoi(strings.TrimSpace(pieces[1])) + if err != nil { + return 0, nil, err + } + if i+4+addedLength > len(data) { + // wait for the buffer to fill up + c.logger.Info("here - waiting for more data from buffer in scanner") + return 0, nil, nil + } + return i + addedLength + 4, data[i+4 : i+4+addedLength], nil + } + if atEOF { + c.logger.V(7).Info("scanner at EOF") + return len(data), data, nil + } + return 0, nil, nil } diff --git a/kai_analyzer_rpc/pkg/codec/stdio.go b/kai_analyzer_rpc/pkg/codec/stdio.go new file mode 100644 index 000000000..6cfe8a249 --- /dev/null +++ b/kai_analyzer_rpc/pkg/codec/stdio.go @@ -0,0 +1,52 @@ +package codec + +import ( + "io" + + "github.com/cenkalti/rpc2" + "github.com/cenkalti/rpc2/jsonrpc" + "github.com/go-logr/logr" +) + +// Connection wraps a pair of unidirectional streams as an io.ReadWriteCloser. +type Connection struct { + Input io.ReadCloser + Output io.WriteCloser + Logger logr.Logger +} + +// Read implements io.ReadWriteCloser. +func (c Connection) Read(p []byte) (n int, err error) { + r, err := c.Input.Read(p) + c.Logger.Info("read from connection", "bytes read", r, "error", err, "bytes", string(p)) + return r, err + +} + +// Write implements io.ReadWriteCloser. +func (c Connection) Write(p []byte) (n int, err error) { + return c.Output.Write(p) +} + +// Close closes c's underlying ReadCloser and WriteCloser. +func (c Connection) Close() error { + rerr := c.Input.Close() + werr := c.Output.Close() + if rerr != nil { + return rerr + } + return werr +} + +type connectionCodec struct { + rpc2.Codec + log logr.Logger +} + +func NewConnectionCodec(connection Connection, log logr.Logger) rpc2.Codec { + c := jsonrpc.NewJSONCodec(connection) + return connectionCodec{ + Codec: c, + log: log.WithName("connection-codec"), + } +} diff --git a/kai_analyzer_rpc/pkg/codec/types.go b/kai_analyzer_rpc/pkg/codec/types.go new file mode 100644 index 000000000..d415a3874 --- /dev/null +++ b/kai_analyzer_rpc/pkg/codec/types.go @@ -0,0 +1,41 @@ +package codec + +import "encoding/json" + +// serverRequest and clientResponse combined +type message struct { + Method string `json:"method"` + Params *json.RawMessage `json:"params"` + Id *json.RawMessage `json:"id,omitempty"` + Result *json.RawMessage `json:"result"` + Error interface{} `json:"error"` + JsonRPCVersion string `json:"jsonrpc"` +} + +// Unmarshal to +type serverRequest struct { + Method string `json:"method"` + Params *json.RawMessage `json:"params"` + Id *json.RawMessage `json:"id"` + unMarshalledId *uint64 `json:"-"` +} +type clientResponse struct { + Id uint64 `json:"id"` + Result *json.RawMessage `json:"result"` + Error interface{} `json:"error"` + JsonRPCVersion string `json:"jsonrpc"` +} + +// to Marshal +type serverResponse struct { + Id *json.RawMessage `json:"id"` + Result interface{} `json:"result"` + Error interface{} `json:"error"` + JsonRPCVersion string `json:"jsonrpc"` +} +type clientRequest struct { + Method string `json:"method"` + Params interface{} `json:"params"` + Id *uint64 `json:"id,omitempty"` + JsonRPCVersion string `json:"jsonrpc"` +} diff --git a/kai_analyzer_rpc/pkg/rpc/client/client.go b/kai_analyzer_rpc/pkg/rpc/client/client.go new file mode 100644 index 000000000..2579f732a --- /dev/null +++ b/kai_analyzer_rpc/pkg/rpc/client/client.go @@ -0,0 +1,19 @@ +package client + +import ( + "context" + + "github.com/cenkalti/rpc2" +) + +type Client struct { + *rpc2.Client +} + +func (c *Client) Call(ctx context.Context, method string, args interface{}, reply interface{}) error { + return c.Client.CallWithContext(ctx, method, args, reply) +} + +func (c *Client) Notify(ctx context.Context, method string, args interface{}) error { + return c.Client.Notify(method, args) +} diff --git a/kai_analyzer_rpc/pkg/rpc/server.go b/kai_analyzer_rpc/pkg/rpc/server.go new file mode 100644 index 000000000..a0fbd0698 --- /dev/null +++ b/kai_analyzer_rpc/pkg/rpc/server.go @@ -0,0 +1,88 @@ +//go:build !windows + +package rpc + +import ( + "bufio" + "context" + "errors" + "fmt" + "net" + "path/filepath" + "sync/atomic" + + rpc "github.com/cenkalti/rpc2" + "github.com/go-logr/logr" + "github.com/konveyor/kai-analyzer/pkg/codec" + "github.com/konveyor/kai-analyzer/pkg/service" +) + +type Server struct { + *rpc.Server + ctx context.Context + log logr.Logger + state *rpc.State + notificationServiceName string + connections []net.Conn + rules string + sourceDirectory string +} + +func NewServer(ctx context.Context, s *rpc.Server, log logr.Logger, notificationServiceName string, rules string, sourceDirectory string) *Server { + state := rpc.NewState() + state.Set("seq", &atomic.Uint64{}) + return &Server{ctx: ctx, + Server: s, + log: log, + state: state, + notificationServiceName: notificationServiceName, + rules: rules, + sourceDirectory: sourceDirectory} +} + +func (s *Server) Accept(pipePath string) { + pipePath, err := filepath.Abs(pipePath) + if err != nil { + panic(err) + } + pipePath = filepath.Clean(pipePath) + s.log.Info("dialing connection connections") + lc := net.ListenConfig{} + l, err := lc.Listen(s.ctx, "unix", pipePath) + if err != nil { + s.log.Error(err, "can not listen") + panic(err) + } + // Register pipe analysis handler + analyzerService, err := service.NewPipeAnalyzer(s.ctx, 10000, 10, 10, pipePath, s.rules, s.sourceDirectory, s.log.WithName("analyzer-service")) + if err != nil { + s.log.Error(err, "unable to create analyzer service") + return + } + s.Server.Handle("analysis_engine.Analyze", analyzerService.Analyze) + for { + conn, err := l.Accept() + if err != nil { + if !errors.Is(err, net.ErrClosed) { + s.log.Info("rpc.Serve: accept:", err.Error()) + } + return + } + s.connections = append(s.connections, conn) + s.log.Info("got connection", "conn", fmt.Sprintf("%v", conn)) + go s.run(conn) + } +} +func (s *Server) run(conn net.Conn) { + s.log.Info("connection", "localAddr", conn.LocalAddr(), "remoteAddr", conn.RemoteAddr()) + reader := bufio.NewReader(conn) + writer := bufio.NewWriter(conn) + c := codec.NewCodec(s.ctx, reader, writer, s.log.WithName("conn"), s.notificationServiceName, s.state) + s.log.Info("server codec", "codec", fmt.Sprintf("%+v", c)) + s.Server.ServeCodecWithState(c, s.state) + s.log.Info("here not in run") +} + +func (s *Server) ServeCodecWithState(codec codec.Codec, state *rpc.State) { + +} diff --git a/kai_analyzer_rpc/pkg/rpc/server_windows.go b/kai_analyzer_rpc/pkg/rpc/server_windows.go new file mode 100644 index 000000000..80aba5213 --- /dev/null +++ b/kai_analyzer_rpc/pkg/rpc/server_windows.go @@ -0,0 +1,88 @@ +//go:build windows + +package rpc + +import ( + "bufio" + "context" + "errors" + "fmt" + "net" + "sync" + "sync/atomic" + + "github.com/Microsoft/go-winio" + rpc "github.com/cenkalti/rpc2" + "github.com/go-logr/logr" + "github.com/konveyor/kai-analyzer/pkg/codec" + "github.com/konveyor/kai-analyzer/pkg/service" +) + +type Server struct { + *rpc.Server + ctx context.Context + log logr.Logger + state *rpc.State + notificationServiceName string + connections []net.Conn + rules string + sourceDirectory string + initService *sync.Once +} + +func NewServer(ctx context.Context, s *rpc.Server, log logr.Logger, notificationServiceName string, rules string, sourceDirectory string) *Server { + state := rpc.NewState() + state.Set("seq", &atomic.Uint64{}) + return &Server{ctx: ctx, + Server: s, + log: log, + state: state, + notificationServiceName: notificationServiceName, + rules: rules, + sourceDirectory: sourceDirectory, + initService: &sync.Once{}, + } +} + +func (s *Server) Accept(pipePath string) { + s.log.Info("pipepath", "p", pipePath) + s.log.Info("dialing connection connections") + l, err := winio.ListenPipe(pipePath, &winio.PipeConfig{}) + if err != nil { + panic(err) + } + analyzerService, err := service.NewPipeAnalyzer(s.ctx, 10000, 10, 10, pipePath, s.rules, s.sourceDirectory, s.log.WithName("analyzer-service")) + if err != nil { + s.log.Error(err, "unable to create analyzer service") + return + } + s.log.Info("handle analysis") + s.Server.Handle("analysis_engine.Analyze", analyzerService.Analyze) + // Register pipe analysis handler + for { + conn, err := l.Accept() + + if err != nil { + if !errors.Is(err, net.ErrClosed) { + s.log.Info("rpc.Serve: accept:", err.Error()) + } + return + } + s.connections = append(s.connections, conn) + s.log.Info("got connection", "conn", fmt.Sprintf("%v", conn)) + go s.run(conn) + } +} +func (s *Server) run(conn net.Conn) { + s.log.Info("connection", "localAddr", conn.LocalAddr(), "remoteAddr", conn.RemoteAddr()) + reader := bufio.NewReader(conn) + writer := bufio.NewWriter(conn) + c := codec.NewCodec(s.ctx, reader, writer, s.log.WithName("conn"), s.notificationServiceName, s.state) + s.log.Info("server codec", "codec", fmt.Sprintf("%+v", c)) + s.Server.ServeCodecWithState(c, s.state) + s.log.Info("here not in run") +} + +func (s *Server) ServeCodecWithState(codec codec.Codec, state *rpc.State) { + +} diff --git a/kai_analyzer_rpc/pkg/service/analyzer.go b/kai_analyzer_rpc/pkg/service/analyzer.go index dbf2945eb..77908c2f9 100644 --- a/kai_analyzer_rpc/pkg/service/analyzer.go +++ b/kai_analyzer_rpc/pkg/service/analyzer.go @@ -7,6 +7,8 @@ import ( "strings" "sync" + rpc "github.com/cenkalti/rpc2" + "github.com/go-logr/logr" "github.com/konveyor/analyzer-lsp/engine" "github.com/konveyor/analyzer-lsp/engine/labels" @@ -37,7 +39,12 @@ type cacheValue struct { ruleset konveyor.RuleSet } -type Analyzer struct { +type Analyzer interface { + Analyze(client *rpc.Client, args Args, response *Response) error + Stop() +} + +type analyzer struct { Logger logr.Logger engine engine.RuleEngine @@ -52,9 +59,15 @@ type Analyzer struct { discoveryCache []konveyor.RuleSet cache map[string][]cacheValue cacheMutex sync.RWMutex + + contextLines int + location string + rules string + + updateConditionProvider func(*rpc.Client, []engine.RuleSet, []engine.RuleSet, logr.Logger, int, string, string) ([]engine.RuleSet, []engine.RuleSet, error) } -func NewAnalyzer(limitIncidents, limitCodeSnips, contextLines int, location, incidentSelector, lspServerPath, bundles, depOpenSourceLabelsFile, rules string, log logr.Logger) (*Analyzer, error) { +func NewAnalyzer(limitIncidents, limitCodeSnips, contextLines int, location, incidentSelector, lspServerPath, bundles, depOpenSourceLabelsFile, rules string, log logr.Logger) (Analyzer, error) { prefix, err := filepath.Abs(location) if err != nil { return nil, err @@ -149,7 +162,7 @@ func NewAnalyzer(limitIncidents, limitCodeSnips, contextLines int, location, inc log.Info("using rulesets", "discoverRulesets", len(discoveryRulesets), "violationRulesets", len(violationRulesets)) // Generate discoveryRulesetCache here?? - return &Analyzer{ + return &analyzer{ Logger: log, engine: eng, engineCtx: ctx, @@ -195,7 +208,7 @@ type Response struct { Rulesets []konveyor.RuleSet } -func (a *Analyzer) Stop() { +func (a *analyzer) Stop() { a.Logger.Info("stopping engine") a.engine.Stop() a.Logger.Info("engine stopped") @@ -206,12 +219,25 @@ func (a *Analyzer) Stop() { } } -func (a *Analyzer) Analyze(args Args, response *Response) error { +func (a *analyzer) Analyze(client *rpc.Client, args Args, response *Response) error { prop := otel.GetTextMapPropagator() ctx := prop.Extract(context.Background(), args.Carrier) ctx, span := tracer.Start(ctx, "analyze") defer span.End() + dRulesets, vRulesets := a.discoveryRulesets, a.violationRulesets + if a.updateConditionProvider != nil { + var err error + dRulesets, vRulesets, err = a.updateConditionProvider(client, a.discoveryRulesets, a.violationRulesets, a.Logger.WithName("provider update"), a.contextLines, a.location, a.rules) + if err != nil { + a.Logger.Error(err, "unable to update Conditions with new client") + return err + } + a.Logger.Info("updated rulesets", "discovery", len(a.discoveryRulesets), "violations", len(a.violationRulesets)) + } + + //a.Logger.Info("compare before after", "before", fmt.Sprintf("%+v", a.violationRulesets), "after", fmt.Sprintf("%+v", vRulesets)) + selectors := []engine.RuleSelector{} if args.LabelSelector != "" { selector, err := labels.NewLabelSelector[*engine.RuleMeta](args.LabelSelector, nil) @@ -248,10 +274,10 @@ func (a *Analyzer) Analyze(args Args, response *Response) error { // Adding spans to the discovery rules run and for the violation rules run // to determine if discovery rule segmentation saves us enough - if len(a.discoveryRulesets) != 0 && args.ResetCache { + if len(dRulesets) != 0 && args.ResetCache { // Here we want to refresh the discovery ruleset cache ctx, span := tracer.Start(ctx, "discovery-rules") - rulesets := a.engine.RunRulesScoped(ctx, a.discoveryRulesets, engine.NewScope(scopes...), selectors...) + rulesets := a.engine.RunRulesScoped(ctx, dRulesets, engine.NewScope(scopes...), selectors...) a.discoveryCacheMutex.Lock() a.discoveryCache = rulesets a.discoveryCacheMutex.Unlock() @@ -262,7 +288,7 @@ func (a *Analyzer) Analyze(args Args, response *Response) error { // This will already wait // violationCTX, violationSpan := tracer.Start(ctx, "violation-rules") - rulesets := a.engine.RunRulesScoped(violationCTX, a.violationRulesets, engine.NewScope(scopes...), selectors...) + rulesets := a.engine.RunRulesScoped(violationCTX, vRulesets, engine.NewScope(scopes...), selectors...) violationSpan.End() sort.SliceStable(rulesets, func(i, j int) bool { @@ -283,7 +309,7 @@ func (a *Analyzer) Analyze(args Args, response *Response) error { return nil } -func (a *Analyzer) setCache(rulesets []konveyor.RuleSet) { +func (a *analyzer) setCache(rulesets []konveyor.RuleSet) { a.cacheMutex.Lock() defer a.cacheMutex.Unlock() a.cache = map[string][]cacheValue{} @@ -291,7 +317,7 @@ func (a *Analyzer) setCache(rulesets []konveyor.RuleSet) { a.addRulesetsToCache(rulesets) } -func (a *Analyzer) updateCache(rulesets []konveyor.RuleSet, includedPaths []string) { +func (a *analyzer) updateCache(rulesets []konveyor.RuleSet, includedPaths []string) { a.cacheMutex.Lock() defer a.cacheMutex.Unlock() if includedPaths != nil { @@ -300,7 +326,7 @@ func (a *Analyzer) updateCache(rulesets []konveyor.RuleSet, includedPaths []stri a.addRulesetsToCache(rulesets) } -func (a *Analyzer) addRulesetsToCache(rulesets []konveyor.RuleSet) { +func (a *analyzer) addRulesetsToCache(rulesets []konveyor.RuleSet) { for _, r := range rulesets { for violationName, v := range r.Violations { @@ -349,14 +375,14 @@ func (a *Analyzer) addRulesetsToCache(rulesets []konveyor.RuleSet) { } } -func (a *Analyzer) invalidateCachePerFile(paths []string) { +func (a *analyzer) invalidateCachePerFile(paths []string) { for _, p := range paths { a.Logger.Info("deleting cache entry for path", "path", p) delete(a.cache, p) } } -func (a *Analyzer) createRulesetsFromCache() []konveyor.RuleSet { +func (a *analyzer) createRulesetsFromCache() []konveyor.RuleSet { a.cacheMutex.RLock() defer a.cacheMutex.RUnlock() diff --git a/kai_analyzer_rpc/pkg/service/notification_service.go b/kai_analyzer_rpc/pkg/service/notification_service.go new file mode 100644 index 000000000..c10033292 --- /dev/null +++ b/kai_analyzer_rpc/pkg/service/notification_service.go @@ -0,0 +1,24 @@ +package service + +import ( + rpc "github.com/cenkalti/rpc2" + "github.com/go-logr/logr" +) + +type NotificationService struct { + Logger logr.Logger +} + +func (n *NotificationService) Notify(client *rpc.Client, args map[string]interface{}, response *Response) error { + n.Logger.Info("here in notification service !! Notify", "args", args) + if v, ok := args["type"]; ok { + if v == "start" { + err := client.Notify("started", nil) + if err != nil { + n.Logger.Error(err, "could not notify") + } + } + + } + return nil +} diff --git a/kai_analyzer_rpc/pkg/service/pipe_analyzer.go b/kai_analyzer_rpc/pkg/service/pipe_analyzer.go new file mode 100644 index 000000000..3367e1959 --- /dev/null +++ b/kai_analyzer_rpc/pkg/service/pipe_analyzer.go @@ -0,0 +1,285 @@ +// go: build linux || darwin || freebsd || openbsd || netbsd || solaris || dragonfly || plan9 +package service + +import ( + "context" + "path/filepath" + "strings" + "sync" + + "github.com/cenkalti/rpc2" + "github.com/go-logr/logr" + "github.com/konveyor/analyzer-lsp/engine" + "github.com/konveyor/analyzer-lsp/output/v1/konveyor" + "github.com/konveyor/analyzer-lsp/parser" + "github.com/konveyor/analyzer-lsp/provider" + "github.com/konveyor/analyzer-lsp/provider/lib" + "github.com/konveyor/kai-analyzer/provider/java" +) + +func NewPipeAnalyzer(ctx context.Context, limitIncidents, limitCodeSnips, contextLines int, pipePath, rules, location string, l logr.Logger) (Analyzer, error) { + prefix, err := filepath.Abs(location) + if err != nil { + return nil, err + } + ctx, cancelFunc := context.WithCancel(ctx) + eng := engine.CreateRuleEngine(ctx, + 10, + l, + engine.WithIncidentLimit(limitIncidents), + engine.WithCodeSnipLimit(limitCodeSnips), + engine.WithContextLines(contextLines), + //engine.WithIncidentSelector(incidentSelector), + engine.WithLocationPrefixes([]string{prefix}), + ) + + // this function already init's the java provider + jProvider, err := java.NewInternalProviderClientForPipe(ctx, l, contextLines, location, pipePath) + if err != nil { + cancelFunc() + return nil, err + } + + bProvider, err := lib.GetProviderClient(provider.Config{Name: "builtin"}, l) + if err != nil { + cancelFunc() + return nil, err + } + _, err = bProvider.ProviderInit(context.Background(), []provider.InitConfig{{Location: location}}) + if err != nil { + cancelFunc() + return nil, err + } + + providers := map[string]provider.InternalProviderClient{ + "java": jProvider, + "builtin": bProvider, + } + + parser := parser.RuleParser{ + ProviderNameToClient: providers, + Log: l.WithName("parser"), + } + + discoveryRulesets, violationRulesets, err := parseRules(parser, rules, l, cancelFunc) + if err != nil { + return nil, err + } + + l.Info("using rulesets", "discoverRulesets", len(discoveryRulesets), "violationRulesets", len(violationRulesets)) + // Generate discoveryRulesetCache here?? + + return &analyzer{ + Logger: l, + engine: eng, + engineCtx: ctx, + cancelFunc: cancelFunc, + initedProviders: providers, + discoveryRulesets: discoveryRulesets, + violationRulesets: violationRulesets, + discoveryCache: []konveyor.RuleSet{}, + discoveryCacheMutex: sync.Mutex{}, + cache: map[string][]cacheValue{}, + cacheMutex: sync.RWMutex{}, + location: location, + contextLines: contextLines, + rules: rules, + updateConditionProvider: updateProviderConditionToUseNewRPClientParseRules, + }, nil + +} + +func parseRules(parser parser.RuleParser, rules string, l logr.Logger, cancelFunc func()) ([]engine.RuleSet, []engine.RuleSet, error) { + discoveryRulesets := []engine.RuleSet{} + violationRulesets := []engine.RuleSet{} + for _, f := range strings.Split(rules, ",") { + internRuleSets, _, err := parser.LoadRules(strings.TrimSpace(f)) + if err != nil { + l.Error(err, "unable to parse all the rules for ruleset", "file", f) + cancelFunc() + return nil, nil, err + } + + for _, interimRuleSet := range internRuleSets { + runCacheResetRuleset := engine.RuleSet{ + Name: interimRuleSet.Name, + Description: interimRuleSet.Description, + Labels: interimRuleSet.Labels, + Tags: interimRuleSet.Tags, + Rules: []engine.Rule{}, + } + allOtherRuleSet := engine.RuleSet{ + Name: interimRuleSet.Name, + Description: interimRuleSet.Description, + Labels: interimRuleSet.Labels, + Tags: interimRuleSet.Tags, + Rules: []engine.Rule{}, + } + for _, interimRule := range interimRuleSet.Rules { + hasDiscovery, hasAlways := labelsContainDiscoveryOrAlways(append(interimRule.Labels, interimRuleSet.Labels...)) + if len(interimRule.Labels) == 2 && hasDiscovery && hasAlways { + runCacheResetRuleset.Rules = append(runCacheResetRuleset.Rules, interimRule) + } else if interimRule.Perform.Tag != nil && !(interimRule.Perform.Message.Text != nil && interimRule.Effort != nil && *interimRule.Effort != 0) { + // We want to pull out tagging rules and insight only rules + // These don't generate violations, and we should treat them + // like discovery rules + runCacheResetRuleset.Rules = append(runCacheResetRuleset.Rules, interimRule) + } else { + allOtherRuleSet.Rules = append(allOtherRuleSet.Rules, interimRule) + } + } + + if len(allOtherRuleSet.Rules) > 0 { + violationRulesets = append(violationRulesets, allOtherRuleSet) + } + if len(runCacheResetRuleset.Rules) > 0 { + discoveryRulesets = append(discoveryRulesets, runCacheResetRuleset) + } + } + } + return discoveryRulesets, violationRulesets, nil +} + +func updateProviderConditionToUseNewRPClientParseRules(client *rpc2.Client, + discoveryRulesets, violationRulesets []engine.RuleSet, + log logr.Logger, + contextLines int, + location, rules string) ([]engine.RuleSet, []engine.RuleSet, error) { + + jProvider, err := java.NewInternalProviderClientForRPCClient(context.TODO(), log, contextLines, location, client) + if err != nil { + return nil, nil, err + } + + bProvider, err := lib.GetProviderClient(provider.Config{Name: "builtin"}, log) + if err != nil { + return nil, nil, err + } + _, err = bProvider.ProviderInit(context.Background(), []provider.InitConfig{{Location: location}}) + if err != nil { + return nil, nil, err + } + + providers := map[string]provider.InternalProviderClient{ + "java": jProvider, + "builtin": bProvider, + } + + parser := parser.RuleParser{ + ProviderNameToClient: providers, + Log: log.WithName("parser"), + } + return parseRules(parser, rules, log, func() {}) +} + +// TODO: This code was not working but should work and should be the more correct way to do this rather then re-parsing rules. +// func updateProviderConditionToUseNewRPClient(client *rpc2.Client, +// discoveryRulesets, violationRulesets []engine.RuleSet, +// log logr.Logger, +// contextLines int, +// location, _ string) ([]engine.RuleSet, []engine.RuleSet, error) { +// //create new java provider + +// c, err := java.NewInternalProviderClientForRPCClient(context.TODO(), log, contextLines, location, client) +// if err != nil { +// return nil, nil, err +// } +// client.Notify("started", nil) +// reply := map[string]interface{}{} +// client.Call("workspace/executeCommand", []map[string]interface{}{{"command": "io.konveyor.tackle.ruleEntry", +// "arguments": map[string]interface{}{ +// "analysisMode": "source-only", +// "location": "11", +// "project": "java", +// "query": "java.rmi*"}, +// "id": 1, +// "jsonrpc": "2.0"}}, reply) + +// for _, rs := range discoveryRulesets { +// for _, r := range rs.Rules { +// switch r.When.(type) { +// case engine.ConditionEntry: +// log.Info("dealing with simplest case", "before", fmt.Sprintf("%+v", r.When)) +// v := r.When.(engine.ConditionEntry) +// if x, ok := v.ProviderSpecificConfig.(provider.ProviderCondition); ok { +// if strings.Contains(reflect.TypeOf(x.Client).String(), "java") { +// x.Client = c +// v.ProviderSpecificConfig = x +// } +// r.When = v +// log.Info("dealing with simplest case", "after", fmt.Sprintf("%+v", r.When)) +// } +// case engine.AndCondition: +// v := r.When.(engine.AndCondition) +// conditions := handleConditionEntries(v.Conditions, c) +// v.Conditions = conditions +// r.When = v +// case engine.OrCondition: +// v := r.When.(engine.OrCondition) +// conditions := handleConditionEntries(v.Conditions, c) +// v.Conditions = conditions +// r.When = v +// default: +// panic(fmt.Errorf("invalid top level condition when type: %T -- %+v", r.When, r.When)) +// } +// } +// } +// for _, rs := range violationRulesets { +// for _, r := range rs.Rules { +// switch r.When.(type) { +// case engine.ConditionEntry: +// v := r.When.(engine.ConditionEntry) +// if x, ok := v.ProviderSpecificConfig.(provider.ProviderCondition); ok { +// if strings.Contains(reflect.TypeOf(x.Client).String(), "java") { +// x.Client = c +// v.ProviderSpecificConfig = x +// } +// } +// r.When = v +// case engine.AndCondition: +// v := r.When.(engine.AndCondition) +// conditions := handleConditionEntries(v.Conditions, c) +// v.Conditions = conditions +// r.When = v +// case engine.OrCondition: +// v := r.When.(engine.OrCondition) +// conditions := handleConditionEntries(v.Conditions, c) +// v.Conditions = conditions +// r.When = v +// default: +// panic(fmt.Errorf("invalid top level condition when type: %T -- %+v", r.When, r.When)) +// } + +// } +// } +// return discoveryRulesets, violationRulesets, nil +// } + +// func handleConditionEntries(entries []engine.ConditionEntry, c provider.InternalProviderClient) []engine.ConditionEntry { +// ret := []engine.ConditionEntry{} +// for _, ce := range entries { +// switch ce.ProviderSpecificConfig.(type) { +// case engine.ConditionEntry: +// v := ce.ProviderSpecificConfig.(engine.ConditionEntry) +// if x, ok := v.ProviderSpecificConfig.(provider.ProviderCondition); ok { +// x.Client = c +// v.ProviderSpecificConfig = x +// } +// ret = append(ret, ce) +// case engine.AndCondition: +// v := ce.ProviderSpecificConfig.(engine.AndCondition) +// conditions := handleConditionEntries(v.Conditions, c) +// v.Conditions = conditions +// ce.ProviderSpecificConfig = v +// ret = append(ret, ce) + +// case engine.OrCondition: +// v := ce.ProviderSpecificConfig.(engine.OrCondition) +// conditions := handleConditionEntries(v.Conditions, c) +// v.Conditions = conditions +// ce.ProviderSpecificConfig = v +// ret = append(ret, ce) +// } +// } +// return ret +// } diff --git a/kai_analyzer_rpc/provider/java/internal_java_provider.go b/kai_analyzer_rpc/provider/java/internal_java_provider.go index f616300fc..e8753ec90 100644 --- a/kai_analyzer_rpc/provider/java/internal_java_provider.go +++ b/kai_analyzer_rpc/provider/java/internal_java_provider.go @@ -1,11 +1,18 @@ +//go:build !windows + package java import ( + "bufio" "context" + "net" + "github.com/cenkalti/rpc2" "github.com/go-logr/logr" extjava "github.com/konveyor/analyzer-lsp/external-providers/java-external-provider/pkg/java_external_provider" "github.com/konveyor/analyzer-lsp/provider" + "github.com/konveyor/kai-analyzer/pkg/codec" + clientwrapper "github.com/konveyor/kai-analyzer/pkg/rpc/client" ) type InternalProviderClient struct { @@ -45,6 +52,72 @@ func NewInternalProviderClient(ctx context.Context, log logr.Logger, contextLine } +func NewInternalProviderClientForPipe(ctx context.Context, log logr.Logger, contextLines int, location, pipeFile string) (provider.InternalProviderClient, error) { + p := extjava.NewJavaProvider(log, "java", contextLines, provider.Config{ + Name: "java", + }) + log.Info("logger", "v", p) + providerConfig := map[string]interface{}{ + "lspServerName": "java", + } + + conn, err := net.Dial("unix", pipeFile) + if err != nil { + return p, err + } + reader := bufio.NewReader(conn) + writer := bufio.NewWriter(conn) + c := codec.NewCodec(ctx, reader, writer, log.WithName("provider"), "", rpc2.NewState()) + client := rpc2.NewClientWithCodec(c) + + svcClient, _, err := p.Init(ctx, log, provider.InitConfig{ + Location: location, + Proxy: &provider.Proxy{}, + ProviderSpecificConfig: providerConfig, + AnalysisMode: "source-only", + RPC: &clientwrapper.Client{ + Client: client, + }, + }) + if err != nil { + return &InternalProviderClient{}, err + } + + return &InternalProviderClient{ + BaseClient: p, + ServiceClient: svcClient, + }, nil + +} + +func NewInternalProviderClientForRPCClient(ctx context.Context, log logr.Logger, contextLines int, location string, client *rpc2.Client) (provider.InternalProviderClient, error) { + p := extjava.NewJavaProvider(log, "java", contextLines, provider.Config{ + Name: "java", + }) + log.Info("logger", "v", p) + providerConfig := map[string]interface{}{ + "lspServerName": "java", + } + svcClient, _, err := p.Init(ctx, log, provider.InitConfig{ + Location: location, + Proxy: &provider.Proxy{}, + ProviderSpecificConfig: providerConfig, + AnalysisMode: "source-only", + RPC: &clientwrapper.Client{ + Client: client, + }, + }) + if err != nil { + return &InternalProviderClient{}, err + } + + return &InternalProviderClient{ + BaseClient: p, + ServiceClient: svcClient, + }, nil + +} + func (i *InternalProviderClient) ProviderInit(ctx context.Context, configs []provider.InitConfig) ([]provider.InitConfig, error) { return configs, nil } diff --git a/kai_analyzer_rpc/provider/java/internal_java_provider_windows.go b/kai_analyzer_rpc/provider/java/internal_java_provider_windows.go new file mode 100644 index 000000000..462ab9799 --- /dev/null +++ b/kai_analyzer_rpc/provider/java/internal_java_provider_windows.go @@ -0,0 +1,119 @@ +//go:build windows + +package java + +import ( + "bufio" + "bytes" + "context" + + "github.com/cenkalti/rpc2" + "github.com/go-logr/logr" + extjava "github.com/konveyor/analyzer-lsp/external-providers/java-external-provider/pkg/java_external_provider" + "github.com/konveyor/analyzer-lsp/provider" + "github.com/konveyor/kai-analyzer/pkg/codec" + clientwrapper "github.com/konveyor/kai-analyzer/pkg/rpc/client" +) + +type InternalProviderClient struct { + provider.BaseClient + provider.ServiceClient +} + +func NewInternalProviderClient(ctx context.Context, log logr.Logger, contextLines int, location, lspServerPath, bundles, depOpenSourceLabelsFile string) (provider.InternalProviderClient, error) { + // Create JavaProvider From external provider + p := extjava.NewJavaProvider(log, "java", contextLines, provider.Config{ + Name: "java", + }) + log.Info("logger", "v", p) + providerConfig := map[string]interface{}{ + "lspServerName": "java", + "bundles": bundles, + "lspServerPath": lspServerPath, + } + if depOpenSourceLabelsFile != "" { + providerConfig["depOpenSourceLabelsFile"] = depOpenSourceLabelsFile + } + + svcClient, _, err := p.Init(ctx, log, provider.InitConfig{ + Location: location, + Proxy: &provider.Proxy{}, + ProviderSpecificConfig: providerConfig, + AnalysisMode: "source-only", + }) + if err != nil { + return &InternalProviderClient{}, err + } + + return &InternalProviderClient{ + BaseClient: p, + ServiceClient: svcClient, + }, nil + +} + +func NewInternalProviderClientForPipe(ctx context.Context, log logr.Logger, contextLines int, location, pipeFile string) (provider.InternalProviderClient, error) { + p := extjava.NewJavaProvider(log, "java", contextLines, provider.Config{ + Name: "java", + }) + log.Info("logger", "v", p) + providerConfig := map[string]interface{}{ + "lspServerName": "java", + } + + reader := bufio.NewReader(bytes.NewBuffer([]byte{})) + writer := bufio.NewWriter(bytes.NewBuffer([]byte{})) + c := codec.NewCodec(ctx, reader, writer, log.WithName("provider"), "", rpc2.NewState()) + client := rpc2.NewClientWithCodec(c) + + svcClient, _, err := p.Init(ctx, log, provider.InitConfig{ + Location: location, + Proxy: &provider.Proxy{}, + ProviderSpecificConfig: providerConfig, + AnalysisMode: "source-only", + RPC: &clientwrapper.Client{ + Client: client, + }, + }) + if err != nil { + return &InternalProviderClient{}, err + } + + return &InternalProviderClient{ + BaseClient: p, + ServiceClient: svcClient, + }, nil + +} + +func NewInternalProviderClientForRPCClient(ctx context.Context, log logr.Logger, contextLines int, location string, client *rpc2.Client) (provider.InternalProviderClient, error) { + p := extjava.NewJavaProvider(log, "java", contextLines, provider.Config{ + Name: "java", + }) + log.Info("logger", "v", p) + providerConfig := map[string]interface{}{ + "lspServerName": "java", + } + svcClient, _, err := p.Init(ctx, log, provider.InitConfig{ + Location: location, + Proxy: &provider.Proxy{}, + ProviderSpecificConfig: providerConfig, + AnalysisMode: "source-only", + RPC: &clientwrapper.Client{ + Client: client, + }, + }) + if err != nil { + return &InternalProviderClient{}, err + } + + return &InternalProviderClient{ + BaseClient: p, + ServiceClient: svcClient, + }, nil + +} + +func (i *InternalProviderClient) ProviderInit(ctx context.Context, configs []provider.InitConfig) ([]provider.InitConfig, error) { + return configs, nil +}