Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 219 additions & 0 deletions bridge/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync"

dockerapi "github.com/fsouza/go-dockerclient"
"github.com/docker/docker/api/types/swarm"
)

var serviceIDPattern = regexp.MustCompile(`^(.+?):([a-zA-Z0-9][a-zA-Z0-9_.-]+):[0-9]+(?::udp)?$`)
Expand Down Expand Up @@ -116,6 +117,11 @@ func (b *Bridge) Sync(quiet bool) {
}
}


// Sync Swarm services
b.SyncSwarmServices()


// Clean up services that were registered previously, but aren't
// acknowledged within registrator
if b.config.Cleanup {
Expand Down Expand Up @@ -216,6 +222,29 @@ func (b *Bridge) add(containerId string, quiet bool) {
return
}

for _, port := range ports {
if b.config.Internal != true && port.HostPort == "" {
if !quiet {
log.Println("ignored:", container.ID[:12], "port", port.ExposedPort, "not published on host")
}
continue
}
service := b.newService(port, len(ports) > 1)
if service == nil {
if !quiet {
log.Println("ignored:", container.ID[:12], "service on port", port.ExposedPort)
}
continue
}
err := b.registry.Register(service)
if err != nil {
log.Println("register failed:", service, err)
continue
}
b.services[container.ID] = append(b.services[container.ID], service)
log.Println("added:", container.ID[:12], service.ID)
}

servicePorts := make(map[string]ServicePort)
for key, port := range ports {
if b.config.Internal != true && port.HostPort == "" {
Expand Down Expand Up @@ -244,6 +273,78 @@ func (b *Bridge) add(containerId string, quiet bool) {
b.services[container.ID] = append(b.services[container.ID], service)
log.Println("added:", container.ID[:12], service.ID)
}

// if swarm container belongs to swarm mode service, publish VIP services
if swarmServiceName, ok := container.Config.Labels["com.docker.swarm.service.name"]; ok {
filters := map[string][]string{"name": {swarmServiceName}}
services, err := b.docker.ListServices(dockerapi.ListServicesOptions{Filters: filters})
if err != nil {
log.Println("error listing swarm services, wont register VIP service", err)
} else if len(services) == 1 { // container cannot belong to no or more than one service
if services[0].Spec.EndpointSpec != nil {
mode := services[0].Spec.EndpointSpec.Mode
if mode == swarm.ResolutionModeVIP { // endpoint should be VIP
if (len(services[0].Endpoint.VirtualIPs) > 0) {
b.registerSwarmVipServices(services[0])
}
}
}

}
}
}

func (b *Bridge) SyncSwarmServices() {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whats the purpose of this function? Why remove already registered services?

// get existing swarm services
servicefilters := map[string][]string{}
swarmServices, err := b.docker.ListServices(dockerapi.ListServicesOptions{Filters: servicefilters})
if err != nil {
log.Println("error listing swarm services, wont register VIP service", err)
}

// get register services
myservices, err := b.registry.Services()
if err != nil {
log.Println("error listing registry services", err)
}

// remove register services doesn't exist in swarm services
for _, myservice := range myservices {
for _, tag := range myservice.Tags {
if tag == "vip-outside" {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to documentation only consul backend support tags currently, so this will not work with other backends.

founded := false
for _, swarmService := range swarmServices {
if swarmService.Spec.Name == myservice.Name {
founded = true
} else if swarmService.Spec.Name == strings.Split(myservice.Name, "-")[0] {
founded = true
} else {
for _, env := range swarmService.Spec.TaskTemplate.ContainerSpec.Env {
split_string := strings.Split(env, "_")
if split_string[0] == "SERVICE" && len(split_string) >= 3 {
if strings.Split(split_string[2], "=")[0] == "NAME" {
if strings.Split(split_string[2], "=")[1] == myservice.Name {
founded = true
}
}
} else if split_string[0] == "SERVICE" && len(split_string) == 2 {
if strings.Split(split_string[1], "=")[0] == "NAME" {
if strings.Split(split_string[1], "=")[1] == myservice.Name {
founded = true
}
}
}
}
}
}

if founded == false {
b.registry.Deregister(myservice)
log.Println("remove:", myservice.Name)
}
}
}
}
}

func (b *Bridge) newService(port ServicePort, isgroup bool) *Service {
Expand Down Expand Up @@ -275,6 +376,16 @@ func (b *Bridge) newService(port ServicePort, isgroup bool) *Service {

service := new(Service)
service.Origin = port

// consider swarm mode
if swarmServiceName, ok := port.container.Config.Labels["com.docker.swarm.service.name"]; ok {
// swarm mode has concept of services
service.Name = mapDefault(metadata, "name", swarmServiceName)
} else {
// use node id, which is more reliable
service.Name = mapDefault(metadata, "name", defaultName)
}

service.ID = hostname + ":" + container.Name[1:] + ":" + port.ExposedPort
service.Name = mapDefault(metadata, "name", defaultName)
if isgroup && !metadataFromPort["name"] {
Expand Down Expand Up @@ -347,6 +458,111 @@ func (b *Bridge) newService(port ServicePort, isgroup bool) *Service {
return service
}

// there are two types of endpoints VIP and DNS rr based
// DNS rr happens implicitly by registering multiple services with the same name
// so that no extra effort is required
// in case of VIP based services, user specifies the published ports
// which are equivalent of docker port binding, but works differently
// swarm mode provides ingress network, where services are load-balanced
// behind VIP address. From inside network (if there any) perspective
// only one service is need, with swarm mode assigned VIP address.
// From outside perspective, every docker host IP address becomes an entry point
// for load-balancer, so published ports shall be registered for each docker host
func (b *Bridge) registerSwarmVipServices(service swarm.Service) {
// if internal, register the internal VIP services
if b.config.Internal {
for _, vip := range service.Endpoint.VirtualIPs {
if network, err := b.docker.NetworkInfo(vip.NetworkID); err != nil {
log.Println("unable to inspect network while evaluating VIPs for service:", service.Spec.Name, err)
} else {
// no point to publish docker swarm internal ingress network VIP
if network.Name != "ingress" && len(vip.Addr) > 0 && strings.Contains(vip.Addr, "/") {
vipAddr := strings.Split(vip.Addr, "/")[0]
if len(service.Endpoint.Ports) > 0 {
b.registerSwarmVipServicePorts(service.Spec.Name, true, vipAddr, service.Endpoint.Ports, service.Spec.TaskTemplate.ContainerSpec)
}
}
}
}
} else {
// if there is no published ports, no point to register it out side
if len(service.Endpoint.Ports) > 0 {
b.registerSwarmVipServicePorts(service.Spec.Name, false, b.config.HostIp, service.Endpoint.Ports, service.Spec.TaskTemplate.ContainerSpec)
}
}
}

// current implementation attempts to register VIP service every container add event
// better way could be to listen for service create events, however according to
// docker configuration there is no such events
// registrations created here are unique, and not based on containers
// so we will just create them and forget, i don't see proper way to cleanup them at the moment
func (b *Bridge) registerSwarmVipServicePorts(serviceName string, inside bool, vip string, ports []swarm.PortConfig, config *swarm.ContainerSpec) {
for _, port := range ports {
b.registerSwarmVipService(serviceName, inside, vip, true, int(port.PublishedPort), port.Protocol, int(port.TargetPort), config)
}
}

func (b *Bridge) registerSwarmVipService(serviceName string, inside bool, vip string, isGroup bool, port int, protocol swarm.PortConfigProtocol, targetPort int, config *swarm.ContainerSpec) {

var tag string
if tag = "vip-outside"; inside {
tag = "vip-inside"
}

service := new(Service)
defaultName := serviceName + "-" + strconv.Itoa(port)

metadata, _ := swarmServiceMetaData(config, strconv.Itoa(targetPort))
service.Name = mapDefault(metadata, "name", defaultName)
// for _, env := range envs {
// envSplited := strings.Split(env, "_")
// if len(envSplited) == 3 {
// if envSplited[0] == "SERVICE" {
// envPort, err := strconv.Atoi(envSplited[1])
// if err != nil {
// log.Println("Impossile to converse str to int", err)
// }
// if envPort == targetPort {
// if strings.Split(envSplited[2], "=")[0] == "NAME" {
// service.Name = strings.Split(envSplited[2], "=")[1]
// } else if strings.Split(envSplited[2], "=")[0] == "TAGS" {
// tag = "vip-outside," + strings.Split(envSplited[2], "=")[1]
// }
// }
// }
// }
// }

if inside {
// VIP is global and singleton, so we can use service name as service id
service.ID = service.Name
Copy link

@marcuslinke marcuslinke Sep 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

service.ID has to be in the format HOST_NAME:CONTAINER_NAME:EXPOSED_PORT. Otherwise it will not be cleaned up properly. See https://github.com/alterway/registrator/blob/175e46ca15cc9b9a66ee1e61daa0f49e9b62b090/bridge/bridge.go#L160

} else {
// VIP is actually host ip address or whatever provided by user
service.ID = b.config.NodeId + "-" + service.Name
}
// tag it for convenience
if protocol != swarm.PortConfigProtocolTCP {
service.Tags = combineTags(
mapDefault(metadata, "tags", ""), b.config.ForceTags, tag, string(protocol))
} else {
service.Tags = combineTags(
mapDefault(metadata, "tags", ""), b.config.ForceTags, tag)
}

delete(metadata, "name")
delete(metadata, "tags")
service.IP = vip
service.Port = port
service.Attrs = metadata

err := b.registry.Register(service)
if err != nil {
log.Println("register failed:", service.Name, err)
}
log.Println("added:", service.Name)
}

func (b *Bridge) remove(containerId string, deregister bool) {
b.Lock()
defer b.Unlock()
Expand All @@ -372,6 +588,9 @@ func (b *Bridge) remove(containerId string, deregister bool) {
b.deadContainers[containerId] = &DeadContainer{b.config.RefreshTtl, b.services[containerId]}
}
delete(b.services, containerId)

// Consider swarm service
b.SyncSwarmServices()
}

// bit set on ExitCode if it represents an exit via a signal
Expand Down
1 change: 1 addition & 0 deletions bridge/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type RegistryAdapter interface {
}

type Config struct {
NodeId string
HostIp string
Internal bool
UseIpFromLabel string
Expand Down
31 changes: 31 additions & 0 deletions bridge/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/cenkalti/backoff"
dockerapi "github.com/fsouza/go-dockerclient"
"github.com/docker/docker/api/types/swarm"
)

func retry(fn func() error) error {
Expand Down Expand Up @@ -84,6 +85,36 @@ func serviceMetaData(config *dockerapi.Config, port string) (map[string]string,
return metadata, metadataFromPort
}

func swarmServiceMetaData(config *swarm.ContainerSpec, port string) (map[string]string, map[string]bool) {
meta := config.Env
for k, v := range config.Labels {
meta = append(meta, k+"="+v)
}
metadata := make(map[string]string)
metadataFromPort := make(map[string]bool)
for _, kv := range meta {
kvp := strings.SplitN(kv, "=", 2)
if strings.HasPrefix(kvp[0], "SERVICE_") && len(kvp) > 1 {
key := strings.ToLower(strings.TrimPrefix(kvp[0], "SERVICE_"))
if metadataFromPort[key] {
continue
}
portkey := strings.SplitN(key, "_", 2)
_, err := strconv.Atoi(portkey[0])
if err == nil && len(portkey) > 1 {
if portkey[0] != port {
continue
}
metadata[portkey[1]] = kvp[1]
metadataFromPort[portkey[1]] = true
} else {
metadata[key] = kvp[1]
}
}
}
return metadata, metadataFromPort
}

func servicePort(container *dockerapi.Container, port dockerapi.Port, published []dockerapi.PortBinding) ServicePort {
var hp, hip, ep, ept, eip, nm string
if len(published) > 0 {
Expand Down