Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions api/kubernetes/customresourcedefinitions.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,20 @@ spec:
type: string
type:
type: string
vport:
properties:
default:
type: integer
services:
items:
properties:
name:
type: string
value:
type: integer
type: object
type: array
type: object
zkServicesPath:
items:
type: string
Expand Down
317 changes: 239 additions & 78 deletions api/networking/v1/mcp_bridge.pb.go

Large diffs are not rendered by default.

9 changes: 9 additions & 0 deletions api/networking/v1/mcp_bridge.proto
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ message RegistryConfig {
repeated string allowMcpServers = 24;
map<string, InnerMap> metadata = 25;
string proxyName = 26;
message VPort {
uint32 default = 1;
message Services {
string name = 1;
uint32 value = 2;
}
repeated Services services = 2;
}
VPort vport = 27;
}

message ProxyConfig {
Expand Down
42 changes: 42 additions & 0 deletions api/networking/v1/mcp_bridge_deepcopy.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

22 changes: 22 additions & 0 deletions api/networking/v1/mcp_bridge_json.gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions helm/core/crds/customresourcedefinitions.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,20 @@ spec:
type: string
type:
type: string
vport:
properties:
default:
type: integer
services:
items:
properties:
name:
type: string
value:
type: integer
type: object
type: array
type: object
zkServicesPath:
items:
type: string
Expand Down
23 changes: 19 additions & 4 deletions registry/eureka/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
return w, nil
}

func WithVport(vport *apiv1.RegistryConfig_VPort) WatcherOption {
return func(w *watcher) {
w.Vport = vport
}
}

func WithEurekaFullRefreshInterval(refreshInterval int64) WatcherOption {
return func(w *watcher) {
if refreshInterval < int64(DefaultFullRefreshIntervalLimit) {
Expand Down Expand Up @@ -151,6 +157,9 @@ func (w *watcher) Stop() {
w.cache.DeleteServiceWrapper(makeHost(serviceName))
}
w.UpdateService()
w.isStop = true
close(w.stop)
w.Ready(false)
}

func (w *watcher) IsHealthy() bool {
Expand Down Expand Up @@ -200,7 +209,7 @@ func (w *watcher) subscribe(service *fargo.Application) error {
defer w.UpdateService()

if len(service.Instances) != 0 {
se, err := generateServiceEntry(service)
se, err := w.generateServiceEntry(service)
if err != nil {
return err
}
Expand Down Expand Up @@ -252,10 +261,10 @@ func convertMap(m map[string]interface{}) map[string]string {
return result
}

func generateServiceEntry(app *fargo.Application) (*v1alpha3.ServiceEntry, error) {
func (w *watcher) generateServiceEntry(app *fargo.Application) (*v1alpha3.ServiceEntry, error) {
portList := make([]*v1alpha3.ServicePort, 0)
endpoints := make([]*v1alpha3.WorkloadEntry, 0)

sePort := provider.GetServiceVport(makeHost(app.Name), w.Vport)
for _, instance := range app.Instances {
protocol := common.HTTP
if val, _ := instance.Metadata.GetString("protocol"); val != "" {
Expand All @@ -269,7 +278,13 @@ func generateServiceEntry(app *fargo.Application) (*v1alpha3.ServiceEntry, error
Protocol: protocol.String(),
}
if len(portList) == 0 {
portList = append(portList, port)
if sePort != nil {
sePort.Name = port.Name
sePort.Protocol = port.Protocol
portList = append(portList, sePort)
} else {
portList = append(portList, port)
}
}
endpoint := v1alpha3.WorkloadEntry{
Address: instance.IPAddr,
Expand Down
16 changes: 14 additions & 2 deletions registry/nacos/v2/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,12 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
}
}

func WithVport(vport *apiv1.RegistryConfig_VPort) WatcherOption {
return func(w *watcher) {
w.Vport = vport
}
}

func WithNacosAddressServer(nacosAddressServer string) WatcherOption {
return func(w *watcher) {
w.NacosAddressServer = nacosAddressServer
Expand Down Expand Up @@ -529,7 +535,7 @@ func (w *watcher) generateServiceEntry(host string, services []model.Instance) *
portList := make([]*v1alpha3.ServicePort, 0)
endpoints := make([]*v1alpha3.WorkloadEntry, 0)
isDnsService := false

sePort := provider.GetServiceVport(host, w.Vport)
for _, service := range services {
protocol := common.HTTP
if service.Metadata != nil && service.Metadata["protocol"] != "" {
Expand All @@ -541,7 +547,13 @@ func (w *watcher) generateServiceEntry(host string, services []model.Instance) *
Protocol: protocol.String(),
}
if len(portList) == 0 {
portList = append(portList, port)
if sePort != nil {
sePort.Name = port.Name
sePort.Protocol = port.Protocol
portList = append(portList, sePort)
} else {
portList = append(portList, port)
}
}
if !isValidIP(service.Ip) {
isDnsService = true
Expand Down
16 changes: 14 additions & 2 deletions registry/nacos/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ func NewWatcher(cache memory.Cache, opts ...WatcherOption) (provider.Watcher, er
return w, nil
}

func WithVport(vport *apiv1.RegistryConfig_VPort) WatcherOption {
return func(w *watcher) {
w.Vport = vport
}
}

func WithNacosNamespaceId(nacosNamespaceId string) WatcherOption {
return func(w *watcher) {
if nacosNamespaceId == "" {
Expand Down Expand Up @@ -326,7 +332,7 @@ func (w *watcher) getSubscribeCallback(groupName string, serviceName string) fun
func (w *watcher) generateServiceEntry(host string, services []model.SubscribeService) *v1alpha3.ServiceEntry {
portList := make([]*v1alpha3.ServicePort, 0)
endpoints := make([]*v1alpha3.WorkloadEntry, 0)

sePort := provider.GetServiceVport(host, w.Vport)
for _, service := range services {
protocol := common.HTTP
if service.Metadata != nil && service.Metadata["protocol"] != "" {
Expand All @@ -340,7 +346,13 @@ func (w *watcher) generateServiceEntry(host string, services []model.SubscribeSe
Protocol: protocol.String(),
}
if len(portList) == 0 {
portList = append(portList, port)
if sePort != nil {
sePort.Name = port.Name
sePort.Protocol = port.Protocol
portList = append(portList, sePort)
} else {
portList = append(portList, port)
}
}
endpoint := v1alpha3.WorkloadEntry{
Address: service.Ip,
Expand Down
3 changes: 3 additions & 0 deletions registry/reconcile/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
nacos.WithNacosGroups(registry.NacosGroups),
nacos.WithNacosRefreshInterval(registry.NacosRefreshInterval),
nacos.WithAuthOption(authOption),
nacos.WithVport(registry.Vport),
)
case string(Nacos2), string(Nacos3):
watcher, err = nacosv2.NewWatcher(
Expand All @@ -226,6 +227,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
nacosv2.WithClusterId(r.clusterId),
nacosv2.WithNamespace(r.namespace),
nacosv2.WithAuthOption(authOption),
nacosv2.WithVport(registry.Vport),
)
case string(Zookeeper):
watcher, err = zookeeper.NewWatcher(
Expand Down Expand Up @@ -266,6 +268,7 @@ func (r *Reconciler) generateWatcherFromRegistryConfig(registry *apiv1.RegistryC
eureka.WithDomain(registry.Domain),
eureka.WithType(registry.Type),
eureka.WithPort(registry.Port),
eureka.WithVport(registry.Vport),
)
default:
return nil, errors.New("unsupported registry type:" + registry.Type)
Expand Down
30 changes: 30 additions & 0 deletions registry/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@
package registry

import (
apiv1 "github.com/alibaba/higress/api/networking/v1"
"istio.io/api/networking/v1alpha3"
"istio.io/pkg/log"
"net"
"strings"
"time"
)

Expand Down Expand Up @@ -89,3 +93,29 @@ func ProbeWatcherStatus(host string, port string) WatcherStatus {
_ = conn.Close()
return Healthy
}

func GetServiceVport(host string, vport *apiv1.RegistryConfig_VPort) *v1alpha3.ServicePort {
if vport == nil {
log.Warnf("there is no vport exist for: %s, skip", host)
return nil
}
for _, service := range vport.Services {
if strings.EqualFold(service.Name, host) && isValidPort(service.Value) {
log.Infof("service %s vport exist, use service vport %d", host, service.Value)
return &v1alpha3.ServicePort{
Number: service.Value,
}
}
}
if isValidPort(vport.Default) {
log.Infof("there is only default vport exist, use default vport %d", vport.Default)
return &v1alpha3.ServicePort{
Number: vport.Default,
}
}
return nil
}

func isValidPort(port uint32) bool {
return port > 0 && port <= 65535
}
Loading