Encryption Key Rotation Support #2350
alstanchev wants to merge 5 commits into eclipse-ditto:master from
Conversation
Implement dual-key configuration and fallback decryption to enable safe encryption key rotation without downtime or data loss.
Signed-off-by: Aleksandar Stanchev <[email protected]>
Force-pushed from 5abcf4d to 110624e
thjaeckle left a comment
PR Review: Encryption Key Rotation Support
Note: This review was supported with the help of an LLM (Claude Code).
+5522 / -48 | 33 files changed
Nice feature — the architecture is solid: ClusterSingleton migration actor, stream-based processing with throttling, progress persistence for resume, and abort support. The separation into DocumentProcessor, MigrationStreamFactory, MigrationProgressTracker, and MigrationContext is clean.
Below are my findings, ordered by severity.
Critical Issues
1. Actor state mutation from non-actor thread
In EncryptionMigrationActor.handleMigration(), the resume path mutates actor state directly inside a thenCompose callback, which runs on a CompletionStage thread, not the actor thread:
```java
migrationResult = progressTracker.loadProgress().thenCompose(optProgress -> {
    if (optProgress.isEmpty() || PHASE_COMPLETED.equals(optProgress.get().phase)) {
        // ...
        migrationInProgress = false; // ⚠️ UNSAFE: not on actor thread
        currentProgress = completed; // ⚠️ UNSAFE: not on actor thread
        sender.tell(...);
        return CompletableFuture.completedFuture(completed);
    }
    // ...
});
```
This violates the core Pekko actor concurrency rule: actor state may only be read and written from the actor's own message-processing thread. These state changes must be piped back to the actor via self.tell(), similar to how MigrationCompleted is already used for the normal completion path.
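To make the suggestion concrete, here is a minimal, library-free sketch of the "pipe the result back as a message" pattern. The message name NothingToResume is hypothetical, and a single-threaded executor stands in for the actor's mailbox; real code would use getSelf().tell(...) on a Pekko actor instead:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PipeToSelfSketch {
    // Dedicated message, analogous to the existing MigrationCompleted.
    record NothingToResume(String reason) {}

    // Single-threaded executor simulating the actor's mailbox thread.
    private final ExecutorService actorThread = Executors.newSingleThreadExecutor();
    private boolean migrationInProgress = true; // actor state: touched only on actorThread

    void onNothingToResume(NothingToResume msg) {
        migrationInProgress = false; // safe: runs on the "actor" thread
        System.out.println("Resume requested but " + msg.reason() + ", nothing to do");
    }

    void handleResume(CompletableFuture<Boolean> loadProgress) {
        // Instead of mutating actor fields inside the CompletionStage callback,
        // post a message back to "self" and mutate state there.
        loadProgress.thenAccept(alreadyCompleted -> {
            if (alreadyCompleted) {
                actorThread.execute(() -> onNothingToResume(
                        new NothingToResume("previous migration already completed")));
            }
        });
    }

    public static void main(String[] args) throws InterruptedException {
        PipeToSelfSketch sketch = new PipeToSelfSketch();
        sketch.handleResume(CompletableFuture.completedFuture(true));
        sketch.actorThread.shutdown();
        sketch.actorThread.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("migrationInProgress=" + sketch.migrationInProgress);
    }
}
```

With Pekko the same effect is usually achieved with the pipe pattern, so the future's result arrives as an ordinary message and all field writes stay on the actor thread.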
Design Concerns
2. Missing Helm chart updates
New HOCON configuration requires corresponding Helm chart updates. The following new config values have no Helm equivalents:
- old-symmetrical-key / CONNECTIVITY_CONNECTION_OLD_ENCRYPTION_KEY
- migration.batch-size / CONNECTIVITY_ENCRYPTION_MIGRATION_BATCH_SIZE
- migration.max-documents-per-minute / CONNECTIVITY_ENCRYPTION_MIGRATION_MAX_DOCS_PER_MINUTE
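For illustration, the missing Helm values and their env-var mappings might look like the following. The values.yaml key names are hypothetical, not taken from the PR; only the env-var names come from the config above:

```yaml
# Hypothetical values.yaml additions (key names are illustrative):
connectivity:
  config:
    connections:
      encryption:
        oldSymmetricalKey: ""        # -> CONNECTIVITY_CONNECTION_OLD_ENCRYPTION_KEY
        migration:
          batchSize: 100             # -> CONNECTIVITY_ENCRYPTION_MIGRATION_BATCH_SIZE
          maxDocumentsPerMinute: 600 # -> CONNECTIVITY_ENCRYPTION_MIGRATION_MAX_DOCS_PER_MINUTE
```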
3. Own MongoClient instance
EncryptionMigrationActor creates its own MongoClientWrapper rather than reusing the existing one from the service. This means an additional MongoDB connection pool per cluster node. Consider accepting a MongoClientWrapper as a constructor parameter instead.
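A minimal sketch of the suggested constructor injection, with a stand-in interface for Ditto's real MongoClientWrapper (class and method names are illustrative):

```java
// Stand-in for org.eclipse.ditto's MongoClientWrapper.
interface MongoClientWrapper { }

// Hypothetical sketch: the migration actor accepts the service's existing
// client instead of constructing its own, so no extra connection pool
// is created per cluster node.
final class EncryptionMigrationActorSketch {
    private final MongoClientWrapper mongoClient;

    EncryptionMigrationActorSketch(MongoClientWrapper sharedClient) {
        this.mongoClient = sharedClient;
    }

    MongoClientWrapper client() {
        return mongoClient;
    }
}

public class InjectionDemo {
    public static void main(String[] args) {
        MongoClientWrapper shared = new MongoClientWrapper() {};
        EncryptionMigrationActorSketch actor = new EncryptionMigrationActorSketch(shared);
        // The actor reuses the exact same client instance:
        System.out.println(actor.client() == shared);
    }
}
```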
4. Batch size default mismatch
FieldsEncryptionConfig.ConfigValue.MIGRATION_BATCH_SIZE defaults to 100 in code, but connectivity.conf overrides it to 1. The effective default is 1 document per batch, which is extremely conservative. The grouped(batchSize) in the stream means each MongoDB bulk write contains only 1 document — is this intentional?
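To illustrate the effect, here is a stdlib stand-in for the stream's grouped(batchSize) stage (a sketch, not Pekko Streams itself): with batch-size 1 every document becomes its own bulk write, while the code default of 100 collapses the same documents into one:

```java
import java.util.ArrayList;
import java.util.List;

public class BatchSizeDemo {
    // Simplified equivalent of Pekko Streams' grouped(batchSize).
    static <T> List<List<T>> grouped(List<T> source, int batchSize) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < source.size(); i += batchSize) {
            batches.add(source.subList(i, Math.min(i + batchSize, source.size())));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Integer> documents = List.of(1, 2, 3, 4, 5);
        // batch-size 1 (the connectivity.conf override): one bulk write per document
        System.out.println(grouped(documents, 1).size());
        // batch-size 100 (the code default): a single bulk write
        System.out.println(grouped(documents, 100).size());
    }
}
```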
5. MigrationProgress public fields
MigrationProgress exposes all fields as public final, which is atypical for the Ditto codebase. Using private fields with getters would be more consistent, or making it a record would be more idiomatic.
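A sketch of the record alternative; the field `processed` is hypothetical, while `phase` and `withPhase` appear in the PR's code. A record gives immutability and accessor methods for free, and with-style methods return copies instead of mutating shared state:

```java
public class MigrationProgressRecordDemo {
    // Hypothetical record form of MigrationProgress.
    record MigrationProgress(String phase, long processed) {
        MigrationProgress withPhase(String newPhase) {
            // returns a copy; the original instance is never mutated
            return new MigrationProgress(newPhase, processed);
        }
    }

    public static void main(String[] args) {
        MigrationProgress running = new MigrationProgress("RUNNING", 42);
        MigrationProgress done = running.withPhase("COMPLETED");
        System.out.println(done.phase() + " " + done.processed());
        System.out.println(running.phase()); // original unchanged
    }
}
```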
Behavioral Changes to Call Out
6. /credentials/cert added to encryption pointers
The diff adds "/credentials/cert" to the default json-pointers list in connectivity.conf. This is an independent behavioral change not mentioned in the PR description — existing connections with credentials/cert will now have that field encrypted on next snapshot write. Should this be in a separate commit or at least mentioned in the description?
Force-pushed from 110624e to 3dcf373
thjaeckle left a comment
@alstanchev I left some comments. Regarding the actor field changes outside of the actor thread, we must IMO be really careful - that is something which will eventually fail, with very strange behavior ;)
```diff
  } else {
-     migrationResult = deleteProgress().thenCompose(v ->
+     migrationResult = progressTracker.deleteProgress().thenCompose(v ->
          runMigration(new MigrationProgress(), oldKey, newKey, pointers, dryRun));
```
oldKey and newKey might be null and must therefore be annotated @Nullable in the runMigration method.
```diff
      ? "no previous migration found" : "previous migration already completed";
- LOG.info("Resume requested but {}, nothing to do", reason);
+ log.info("Resume requested but {}, nothing to do", reason);
  migrationInProgress = false;
```
Still this modifies the actor state on a thread other than the actor thread, which might lead to unexpected behavior.
If you are sure this is no problem in this case, it could be acceptable - but it should be documented then.
```java
migrationInProgress = false;
final MigrationProgress completed = optProgress.orElseGet(MigrationProgress::new)
        .withPhase(PHASE_COMPLETED);
currentProgress = completed;
```
Still this modifies the actor state on a thread other than the actor thread, which might lead to unexpected behavior.
If you are sure this is no problem in this case, it could be acceptable - but it should be documented then.
Writing migrationInProgress and currentProgress here races with handleStatus(), handleAbort(), and handleMigration() on the actor thread.
Suggestion: Pipe the "nothing to resume" result back to the actor via self.tell() with a dedicated message (similar to MigrationCompleted), and handle the state transition on the actor thread.
```java
private boolean migrationInProgress = false;
private boolean currentDryRun = false;
private volatile boolean abortRequested = false;
```
What should the volatile do in the context of an actor?
Likely the same "smell": the field is not always accessed via the actor thread, but across several threads.
This pattern screams a little of concurrency issues coming up and is concerning.
```diff
- startChildActor(EncryptionMigrationActor.ACTOR_NAME,
-         EncryptionMigrationActor.props(connectivityConfig));
+ startEncryptionMigrationSingleton(actorSystem, connectivityConfig);
```
startEncryptionMigrationSingleton() is called unconditionally, which means every deployment — including those that never use encryption — pays the cost of:
- A ClusterSingletonManager + ClusterSingletonProxy actor pair
- A dedicated MongoClientWrapper with its own connection pool
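One possible guard, sketched with plain Java: only start the singleton when encryption is enabled or an old key is still configured (the latter covers the disable-encryption workflow). The config accessor names here are illustrative, not Ditto's actual API:

```java
public class ConditionalStartDemo {
    // Hypothetical view of the encryption config relevant to the singleton.
    record EncryptionConfig(boolean enabled, String oldKey) {
        boolean migrationMayBeNeeded() {
            // needed while encryption is on, or while an old key is still set
            // (the disable-encryption workflow decrypts with the old key)
            return enabled || (oldKey != null && !oldKey.isEmpty());
        }
    }

    public static void main(String[] args) {
        // never used encryption: skip the singleton and its connection pool
        System.out.println(new EncryptionConfig(false, null).migrationMayBeNeeded());
        // encryption active: singleton needed
        System.out.println(new EncryptionConfig(true, null).migrationMayBeNeeded());
        // disable-encryption workflow: enabled=false but old key set
        System.out.println(new EncryptionConfig(false, "oldKey").migrationMayBeNeeded());
    }
}
```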
```java
private void startEncryptionMigrationSingleton(final ActorSystem actorSystem,
        final ConnectivityConfig connectivityConfig) {
    final MongoClientWrapper mongoClientWrapper =
```
This will also create its own connection pool, based on the min/max config of the service.
This is a little much; couldn't we reuse the existing MongoClientWrapper, via MongoClientExtension?
```yaml
  value: "{{ .Values.connectivity.config.connections.kafka.producer.parallelism }}"
- name: PEKKO_HTTP_HOSTPOOL_MAX_CONNECTION_LIFETIME
  value: "{{ .Values.connectivity.config.connections.httpPush.maxConnectionLifetime }}"
{{- if .Values.connectivity.config.connections.encryption.enabled }}
```
The disable-encryption workflow won't get the old key.
The encryption env vars are only set inside {{- if .Values.connectivity.config.connections.encryption.enabled }}, but the "disable encryption" workflow requires encryption.enabled = false with an old key set. A Helm user following the documented disable workflow would need to manually add the old-key env var outside the Helm chart, which is error-prone.
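One way the template could be adjusted, sketched below. The values.yaml key name is hypothetical; the point is that the old-key env var is guarded on the key being set, not on encryption.enabled:

```yaml
# Hypothetical template fix: keep the old-key env var outside the enabled guard
# so the disable-encryption workflow (enabled=false, old key set) still works.
{{- if .Values.connectivity.config.connections.encryption.oldSymmetricalKey }}
- name: CONNECTIVITY_CONNECTION_OLD_ENCRYPTION_KEY
  value: "{{ .Values.connectivity.config.connections.encryption.oldSymmetricalKey }}"
{{- end }}
```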
This PR implements encryption key rotation for Ditto's connectivity service using a dual-key approach. The feature enables secure rotation of AES-256-GCM encryption keys for sensitive connection data (credentials, URIs) without service interruption.
Key Features:
- Dual-key fallback decryption
- Throttled, batched migration of existing documents
Implements #2340
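The dual-key fallback idea can be sketched with the JDK's built-in AES-GCM support. This is a simplified illustration, not the PR's actual implementation: key and IV handling are stripped down (real code stores a fresh IV per ciphertext), and the fallback relies on GCM authentication failing with AEADBadTagException when the wrong key is tried:

```java
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;
import java.nio.charset.StandardCharsets;
import java.security.SecureRandom;

public class DualKeyFallbackDemo {
    static byte[] encrypt(SecretKey key, byte[] iv, byte[] plain) throws Exception {
        Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
        c.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
        return c.doFinal(plain);
    }

    // Try the new key first; on authentication failure fall back to the old key.
    static byte[] decryptWithFallback(SecretKey newKey, SecretKey oldKey,
                                      byte[] iv, byte[] cipherText) throws Exception {
        try {
            Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
            c.init(Cipher.DECRYPT_MODE, newKey, new GCMParameterSpec(128, iv));
            return c.doFinal(cipherText);
        } catch (javax.crypto.AEADBadTagException e) {
            // not (re-)encrypted with the new key yet: decrypt with the old key
            Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
            c.init(Cipher.DECRYPT_MODE, oldKey, new GCMParameterSpec(128, iv));
            return c.doFinal(cipherText);
        }
    }

    public static void main(String[] args) throws Exception {
        KeyGenerator gen = KeyGenerator.getInstance("AES");
        gen.init(256);
        SecretKey oldKey = gen.generateKey();
        SecretKey newKey = gen.generateKey();
        byte[] iv = new byte[12];
        new SecureRandom().nextBytes(iv);

        // a document still encrypted under the old key
        byte[] legacy = encrypt(oldKey, iv,
                "mongodb://user:secret@host".getBytes(StandardCharsets.UTF_8));
        byte[] plain = decryptWithFallback(newKey, oldKey, iv, legacy);
        System.out.println(new String(plain, StandardCharsets.UTF_8));
    }
}
```

After the migration rewrites every document under the new key, the fallback branch stops being exercised and the old key can be removed from the configuration.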