Skip to content

Commit aaa0d0b

Browse files
authored
Major refactoring, improvements and arm64 support (#9)
Major refactoring and improvements Restructured monitors to broadcast events for resource changes Refactored heavy reflectors into event handlers for resource changes Added health checks for monitors Refactored watchers to work on single consumer queues Split project into modules Code cleanup Added arm64 support
1 parent 01247c4 commit aaa0d0b

File tree

55 files changed

+1336
-766
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

55 files changed

+1336
-766
lines changed
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using Autofac;
2+
3+
namespace ES.Kubernetes.Reflector.CertManager
4+
{
5+
public class CertManagerModule : Module
6+
{
7+
protected override void Load(ContainerBuilder builder)
8+
{
9+
builder.RegisterType<CertificateSecretAnnotator>().AsImplementedInterfaces();
10+
builder.RegisterType<CertificatesMonitor>().AsImplementedInterfaces().SingleInstance();
11+
}
12+
}
13+
}
Lines changed: 147 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
using System.Collections.Generic;
2+
using System.Linq;
3+
using System.Net;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using ES.Kubernetes.Reflector.CertManager.Constants;
7+
using ES.Kubernetes.Reflector.CertManager.Events;
8+
using ES.Kubernetes.Reflector.CertManager.Resources;
9+
using ES.Kubernetes.Reflector.Core.Constants;
10+
using k8s;
11+
using k8s.Models;
12+
using MediatR;
13+
using Microsoft.AspNetCore.JsonPatch;
14+
using Microsoft.Extensions.Logging;
15+
using Microsoft.Rest;
16+
using Newtonsoft.Json.Linq;
17+
18+
namespace ES.Kubernetes.Reflector.CertManager
19+
{
20+
public class CertificateSecretAnnotator :
21+
INotificationHandler<InternalCertificateWatcherEvent>,
22+
INotificationHandler<InternalSecretWatcherEvent>
23+
{
24+
private readonly IKubernetes _client;
25+
private readonly ILogger<CertificateSecretAnnotator> _logger;
26+
27+
public CertificateSecretAnnotator(ILogger<CertificateSecretAnnotator> logger, IKubernetes client)
28+
{
29+
_logger = logger;
30+
_client = client;
31+
}
32+
33+
34+
public async Task Handle(InternalCertificateWatcherEvent notification, CancellationToken cancellationToken)
35+
{
36+
if (notification.Type != WatchEventType.Added && notification.Type != WatchEventType.Modified) return;
37+
if (notification.Item.Metadata == null) return;
38+
if (notification.Item.Spec == null) return;
39+
40+
var certificate = notification.Item;
41+
42+
_logger.LogDebug("Certificate {certNs}/{certName} has secret {secretNs}/{secretName}",
43+
certificate.Metadata.NamespaceProperty, certificate.Metadata.Name,
44+
certificate.Metadata.NamespaceProperty, certificate.Spec.SecretName);
45+
V1Secret secret = null;
46+
try
47+
{
48+
secret = await _client.ReadNamespacedSecretAsync(certificate.Spec.SecretName,
49+
certificate.Metadata.NamespaceProperty, cancellationToken: cancellationToken);
50+
}
51+
catch (HttpOperationException ex) when (ex.Response.StatusCode == HttpStatusCode.NotFound)
52+
{
53+
_logger.LogDebug("Could not find matching secret {secretNs}/{secretName}",
54+
certificate.Metadata.NamespaceProperty, certificate.Spec.SecretName);
55+
}
56+
57+
if (secret != null) await Annotate(secret, certificate);
58+
}
59+
60+
public async Task Handle(InternalSecretWatcherEvent notification, CancellationToken cancellationToken)
61+
{
62+
if (notification.Type != WatchEventType.Added && notification.Type != WatchEventType.Modified) return;
63+
64+
var secret = notification.Item;
65+
var metadata = secret.Metadata;
66+
if (metadata.Labels == null) return;
67+
68+
if (metadata.Labels.TryGetValue(CertManagerConstants.CertificateNameLabel, out var certificateName))
69+
{
70+
_logger.LogDebug("Secret {secretNs}/{secretName} belongs to certificate {certNs}/{certName}",
71+
metadata.NamespaceProperty, metadata.Name,
72+
metadata.NamespaceProperty, certificateName);
73+
74+
Certificate certificate = null;
75+
try
76+
{
77+
var certificateJObject = await _client.GetNamespacedCustomObjectAsync(CertManagerConstants.CrdGroup,
78+
notification.CertificateResourceDefinitionVersion, metadata.NamespaceProperty,
79+
CertManagerConstants.CertificatePlural,
80+
certificateName, cancellationToken);
81+
certificate = ((JObject) certificateJObject).ToObject<Certificate>();
82+
}
83+
catch (HttpOperationException exception) when (exception.Response.StatusCode == HttpStatusCode.NotFound)
84+
{
85+
_logger.LogDebug("Could not find certificate {certNs}/{certName}",
86+
metadata.NamespaceProperty, certificateName);
87+
}
88+
89+
if (certificate != null) await Annotate(secret, certificate);
90+
}
91+
}
92+
93+
94+
private async Task Annotate(V1Secret secret, Certificate certificate)
95+
{
96+
var secretAnnotations =
97+
new Dictionary<string, string>(secret.Metadata.Annotations ?? new Dictionary<string, string>());
98+
var original = secretAnnotations.ToDictionary(s => s.Key, s => s.Value);
99+
100+
var certAnnotations =
101+
new Dictionary<string, string>(certificate.Metadata.Annotations ?? new Dictionary<string, string>());
102+
103+
void MatchAnnotations(Dictionary<string, string> pairs)
104+
{
105+
foreach (var pair in pairs)
106+
if (certAnnotations.TryGetValue(pair.Key, out var value))
107+
secretAnnotations[pair.Value] = value;
108+
else
109+
secretAnnotations.Remove(pair.Value);
110+
}
111+
112+
MatchAnnotations(new Dictionary<string, string>
113+
{
114+
{
115+
Annotations.CertManagerCertificate.SecretReflectionAllowed,
116+
Annotations.Reflection.Allowed
117+
},
118+
{
119+
Annotations.CertManagerCertificate.SecretReflectionAllowedNamespaces,
120+
Annotations.Reflection.AllowedNamespaces
121+
}
122+
});
123+
124+
125+
if (secretAnnotations.Count == original.Count &&
126+
secretAnnotations.Keys.All(s => original.ContainsKey(s)) &&
127+
secretAnnotations.All(s => original[s.Key] == s.Value))
128+
{
129+
_logger.LogDebug(
130+
"Secret {secretNs}/{secretName} matches certificate {certNs}/{certName} reflection annotations",
131+
secret.Metadata.NamespaceProperty, secret.Metadata.Name,
132+
certificate.Metadata.NamespaceProperty, certificate.Metadata.Name);
133+
return;
134+
}
135+
136+
_logger.LogInformation(
137+
"Annotating secret {secretNamespace}/{secretName} to match certificate {certificateNamespace}/{certificateName} reflection annotations",
138+
secret.Metadata.NamespaceProperty, secret.Metadata.Name,
139+
certificate.Metadata.NamespaceProperty, certificate.Metadata.Name);
140+
141+
var patch = new JsonPatchDocument<V1Secret>();
142+
patch.Replace(e => e.Metadata.Annotations, secretAnnotations);
143+
await _client.PatchNamespacedSecretWithHttpMessagesAsync(new V1Patch(patch),
144+
secret.Metadata.Name, secret.Metadata.NamespaceProperty);
145+
}
146+
}
147+
}
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
using System.Threading;
2+
using System.Threading.Tasks;
3+
using ES.Kubernetes.Reflector.CertManager.Constants;
4+
using ES.Kubernetes.Reflector.CertManager.Events;
5+
using ES.Kubernetes.Reflector.CertManager.Resources;
6+
using ES.Kubernetes.Reflector.Core.Events;
7+
using ES.Kubernetes.Reflector.Core.Monitoring;
8+
using k8s;
9+
using k8s.Models;
10+
using MediatR;
11+
using Microsoft.Extensions.Hosting;
12+
using Microsoft.Extensions.Logging;
13+
14+
namespace ES.Kubernetes.Reflector.CertManager
15+
{
16+
public class CertificatesMonitor : IHostedService,
17+
INotificationHandler<WatcherEvent<V1beta1CustomResourceDefinition>>,
18+
IRequestHandler<HealthCheckRequest<CertificatesMonitor>, bool>
19+
{
20+
private readonly BroadcastWatcher<Certificate, InternalCertificateWatcherEvent> _certificatesWatcher;
21+
private readonly IKubernetes _client;
22+
private readonly ILogger<CertificatesMonitor> _logger;
23+
private readonly BroadcastWatcher<V1Secret, InternalSecretWatcherEvent> _secretsWatcher;
24+
private string _certificateResourceDefinitionVersion;
25+
26+
public CertificatesMonitor(ILogger<CertificatesMonitor> logger,
27+
BroadcastWatcher<Certificate, InternalCertificateWatcherEvent> certificatesWatcher,
28+
BroadcastWatcher<V1Secret, InternalSecretWatcherEvent> secretsWatcher,
29+
IKubernetes client)
30+
{
31+
_logger = logger;
32+
_certificatesWatcher = certificatesWatcher;
33+
_secretsWatcher = secretsWatcher;
34+
_client = client;
35+
36+
37+
_secretsWatcher.OnBeforePublish = e =>
38+
e.CertificateResourceDefinitionVersion = _certificateResourceDefinitionVersion;
39+
_secretsWatcher.OnStateChanged = async (sender, update) =>
40+
{
41+
switch (update.State)
42+
{
43+
case BroadcastWatcherState.Closed:
44+
_logger.LogDebug("Secrets watcher {state}", update.State);
45+
await sender.Start();
46+
break;
47+
case BroadcastWatcherState.Faulted:
48+
_logger.LogError(update.Exception, "Secrets watcher {state}", update.State);
49+
break;
50+
default:
51+
_logger.LogDebug("Secrets watcher {state}", update.State);
52+
break;
53+
}
54+
};
55+
56+
_certificatesWatcher.OnBeforePublish = e =>
57+
e.CertificateResourceDefinitionVersion = _certificateResourceDefinitionVersion;
58+
_certificatesWatcher.OnStateChanged = async (sender, update) =>
59+
{
60+
switch (update.State)
61+
{
62+
case BroadcastWatcherState.Closed:
63+
_logger.LogDebug("Certificates watcher {state}", update.State);
64+
await _secretsWatcher.Stop();
65+
await sender.Start();
66+
await _secretsWatcher.Start();
67+
break;
68+
case BroadcastWatcherState.Faulted:
69+
_logger.LogError(update.Exception, "Certificates watcher {state}", update.State);
70+
break;
71+
default:
72+
_logger.LogDebug("Certificates watcher {state}", update.State);
73+
break;
74+
}
75+
};
76+
}
77+
78+
public Task StartAsync(CancellationToken cancellationToken)
79+
{
80+
_logger.LogDebug("Starting");
81+
82+
_secretsWatcher.RequestFactory = async client =>
83+
await client.ListSecretForAllNamespacesWithHttpMessagesAsync(watch: true);
84+
85+
86+
_logger.LogInformation("Started");
87+
return Task.CompletedTask;
88+
}
89+
90+
public async Task StopAsync(CancellationToken cancellationToken)
91+
{
92+
_logger.LogDebug("Stopping");
93+
await _certificatesWatcher.Stop();
94+
await _secretsWatcher.Stop();
95+
_logger.LogInformation("Stopped");
96+
}
97+
98+
99+
public async Task Handle(WatcherEvent<V1beta1CustomResourceDefinition> request,
100+
CancellationToken cancellationToken)
101+
{
102+
if (request.Type != WatchEventType.Added && request.Type != WatchEventType.Modified) return;
103+
if (request.Item.Spec?.Names == null) return;
104+
if (request.Item.Spec.Group != CertManagerConstants.CrdGroup ||
105+
request.Item.Spec.Names.Kind != CertManagerConstants.CertificateKind) return;
106+
107+
var resourceDefinition = request.Item;
108+
109+
_certificateResourceDefinitionVersion = request.Item.Spec.Version;
110+
_logger.LogInformation("Updating watchers for {kind} version {version}",
111+
CertManagerConstants.CertificateKind, request.Item.Spec.Version);
112+
113+
await _certificatesWatcher.Stop();
114+
await _secretsWatcher.Stop();
115+
116+
_certificatesWatcher.RequestFactory = async client =>
117+
await client.ListClusterCustomObjectWithHttpMessagesAsync(request.Item.Spec.Group,
118+
request.Item.Spec.Version, request.Item.Spec.Names.Plural, watch: true);
119+
120+
await _certificatesWatcher.Start();
121+
await _secretsWatcher.Start();
122+
123+
_logger.LogInformation("Watchers updated for {kind} version {version}",
124+
CertManagerConstants.CertificateKind, request.Item.Spec.Version);
125+
}
126+
127+
public Task<bool> Handle(HealthCheckRequest<CertificatesMonitor> request,
128+
CancellationToken cancellationToken)
129+
{
130+
return Task.FromResult(!_secretsWatcher.IsFaulted && !_certificatesWatcher.IsFaulted);
131+
}
132+
}
133+
}

ES.Kubernetes.Reflector/Constants/CertManagerConstants.cs renamed to ES.Kubernetes.Reflector.CertManager/Constants/CertManagerConstants.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
namespace ES.Kubernetes.Reflector.Constants
1+
namespace ES.Kubernetes.Reflector.CertManager.Constants
22
{
33
public static class CertManagerConstants
44
{
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
using System.Collections.Generic;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using ES.Kubernetes.Reflector.Core.Events;
5+
using MediatR;
6+
using Microsoft.Extensions.Diagnostics.HealthChecks;
7+
8+
namespace ES.Kubernetes.Reflector.CertManager
9+
{
10+
public class CertManagerHealthCheck : IHealthCheck
11+
{
12+
private readonly IMediator _mediator;
13+
14+
public CertManagerHealthCheck(IMediator mediator)
15+
{
16+
_mediator = mediator;
17+
}
18+
19+
public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context,
20+
CancellationToken cancellationToken = default)
21+
{
22+
var checkMessages = new List<IRequest<bool>>
23+
{
24+
new HealthCheckRequest<CertificatesMonitor>()
25+
};
26+
27+
var healthy = true;
28+
foreach (var checkMessage in checkMessages)
29+
{
30+
var result = await _mediator.Send(checkMessage, cancellationToken);
31+
if (!result)
32+
{
33+
healthy = false;
34+
break;
35+
}
36+
}
37+
38+
return healthy
39+
? HealthCheckResult.Healthy("CertManager extension is healthy")
40+
: HealthCheckResult.Unhealthy("CertManager extension is unhealthy");
41+
}
42+
}
43+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
3+
<PropertyGroup>
4+
<TargetFramework>netstandard2.0</TargetFramework>
5+
</PropertyGroup>
6+
7+
<ItemGroup>
8+
<ProjectReference Include="..\ES.Kubernetes.Reflector.Core\ES.Kubernetes.Reflector.Core.csproj" />
9+
</ItemGroup>
10+
11+
</Project>
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using ES.Kubernetes.Reflector.CertManager.Resources;
2+
using ES.Kubernetes.Reflector.Core.Events;
3+
4+
namespace ES.Kubernetes.Reflector.CertManager.Events
5+
{
6+
public class InternalCertificateWatcherEvent : WatcherEvent<Certificate>
7+
{
8+
public string CertificateResourceDefinitionVersion { get; set; }
9+
}
10+
}
Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
using ES.Kubernetes.Reflector.Core.Events;
2+
using k8s.Models;
3+
4+
namespace ES.Kubernetes.Reflector.CertManager.Events
5+
{
6+
public class InternalSecretWatcherEvent : WatcherEvent<V1Secret>
7+
{
8+
public string CertificateResourceDefinitionVersion { get; set; }
9+
}
10+
}

0 commit comments

Comments
 (0)