Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import org.apache.pekko.cluster.sharding.ClusterSharding;
import org.apache.pekko.cluster.sharding.ClusterShardingSettings;
import org.apache.pekko.event.DiagnosticLoggingAdapter;
import org.apache.pekko.cluster.singleton.ClusterSingletonProxy;
import org.apache.pekko.cluster.singleton.ClusterSingletonProxySettings;
import org.apache.pekko.japi.pf.DeciderBuilder;
import org.eclipse.ditto.base.service.RootChildActorStarter;
import org.eclipse.ditto.base.service.actors.DittoRootActor;
Expand All @@ -33,6 +35,7 @@
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceOperationsActor;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceStreamingActorCreator;
import org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionSupervisorActor;
import org.eclipse.ditto.connectivity.service.messaging.persistence.EncryptionMigrationActor;
import org.eclipse.ditto.edge.service.dispatching.EdgeCommandForwarderActor;
import org.eclipse.ditto.edge.service.dispatching.ShardRegions;
import org.eclipse.ditto.internal.utils.cluster.ClusterUtil;
Expand All @@ -46,6 +49,7 @@
import org.eclipse.ditto.internal.utils.health.config.PersistenceConfig;
import org.eclipse.ditto.internal.utils.namespaces.BlockedNamespaces;
import org.eclipse.ditto.internal.utils.pekko.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoClientWrapper;
import org.eclipse.ditto.internal.utils.persistence.mongo.MongoHealthChecker;
import org.eclipse.ditto.internal.utils.persistence.mongo.streaming.MongoReadJournal;
import org.eclipse.ditto.internal.utils.persistentactors.PersistencePingActor;
Expand Down Expand Up @@ -116,6 +120,8 @@ private ConnectivityRootActor(final ConnectivityConfig connectivityConfig,
ConnectionPersistenceOperationsActor.props(pubSubMediator, connectivityConfig.getMongoDbConfig(),
config, connectivityConfig.getPersistenceOperationsConfig()));

startEncryptionMigrationSingleton(actorSystem, connectivityConfig);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startEncryptionMigrationSingleton() is called unconditionally, which means every deployment — including those that never use encryption — pays the cost of:

  • A ClusterSingletonManager + ClusterSingletonProxy actor pair
  • A dedicated MongoClientWrapper with its own connection pool


RootChildActorStarter.get(actorSystem, ScopedConfig.dittoExtension(config)).execute(getContext());


Expand Down Expand Up @@ -150,6 +156,21 @@ protected PartialFunction<Throwable, SupervisorStrategy.Directive> getSupervisio
}).build().orElse(super.getSupervisionDecider());
}

private void startEncryptionMigrationSingleton(final ActorSystem actorSystem,
final ConnectivityConfig connectivityConfig) {
final MongoClientWrapper mongoClientWrapper =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will create also its own connection pool, based on the min/max config of the service.

This is a little much, couldn't we reuse the existing MongoClientWrapper, via MongoClientExtension?

MongoClientWrapper.newInstance(connectivityConfig.getMongoDbConfig());
final String managerName = EncryptionMigrationActor.ACTOR_NAME + "Singleton";
final ActorRef singletonManager = startClusterSingletonActor(
EncryptionMigrationActor.props(connectivityConfig, mongoClientWrapper), managerName);

final ClusterSingletonProxySettings proxySettings =
ClusterSingletonProxySettings.create(actorSystem).withRole(CLUSTER_ROLE);
final Props proxyProps = ClusterSingletonProxy.props(
singletonManager.path().toStringWithoutAddress(), proxySettings);
getContext().actorOf(proxyProps, EncryptionMigrationActor.ACTOR_NAME);
}

private ActorRef startClusterSingletonActor(final Props props, final String name) {
return ClusterUtil.startSingleton(getContext(), CLUSTER_ROLE, name, props);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,48 @@ public final class DefaultFieldsEncryptionConfig implements FieldsEncryptionConf
private static final String CONFIG_PATH = "encryption";
private final boolean isEncryptionEnabled;
private final String symmetricalKey;
private final String oldSymmetricalKey;
private final List<String> jsonPointers;
private final int migrationBatchSize;
private final int migrationMaxDocumentsPerMinute;


private DefaultFieldsEncryptionConfig(final ConfigWithFallback config) {
this.isEncryptionEnabled = config.getBoolean(ConfigValue.ENCRYPTION_ENABLED.getConfigPath());
this.symmetricalKey = config.getString(ConfigValue.SYMMETRICAL_KEY.getConfigPath());
this.oldSymmetricalKey = config.getString(ConfigValue.OLD_SYMMETRICAL_KEY.getConfigPath());
this.jsonPointers = Collections.unmodifiableList(
new ArrayList<>(config.getStringList(ConfigValue.JSON_POINTERS.getConfigPath())));
if (isEncryptionEnabled && symmetricalKey.trim().isEmpty()) {
throw new DittoConfigError("Missing Symmetric key. It is mandatory when encryption is enabled for connections!");
this.migrationBatchSize = config.getInt(ConfigValue.MIGRATION_BATCH_SIZE.getConfigPath());
this.migrationMaxDocumentsPerMinute = config.getInt(ConfigValue.MIGRATION_MAX_DOCUMENTS_PER_MINUTE.getConfigPath());

validateConfiguration();
}

private void validateConfiguration() {
final boolean hasSymmetricalKey = !symmetricalKey.trim().isEmpty();
final boolean hasOldKey = !oldSymmetricalKey.trim().isEmpty();

// When encryption is enabled, we must have a current encryption key
if (isEncryptionEnabled && !hasSymmetricalKey) {
throw new DittoConfigError(
"Missing 'symmetrical-key'. It is mandatory when encryption is enabled for connections!");
}

if (migrationBatchSize <= 0) {
throw new DittoConfigError(
"'migration.batch-size' must be greater than 0, was: " + migrationBatchSize);
}
if (migrationMaxDocumentsPerMinute < 0) {
throw new DittoConfigError(
"'migration.max-documents-per-minute' must be >= 0, was: " + migrationMaxDocumentsPerMinute);
}

// If both keys are set, they must be different
if (hasSymmetricalKey && hasOldKey && symmetricalKey.equals(oldSymmetricalKey)) {
throw new DittoConfigError(
"Configuration error: 'symmetrical-key' and 'old-symmetrical-key' must be different! " +
"If you're not rotating keys, remove 'old-symmetrical-key'.");
}
}

Expand All @@ -61,11 +93,26 @@ public String getSymmetricalKey() {
return this.symmetricalKey;
}

@Override
public Optional<String> getOldSymmetricalKey() {
return oldSymmetricalKey.trim().isEmpty() ? Optional.empty() : Optional.of(oldSymmetricalKey);
}

@Override
public List<String> getJsonPointers() {
return this.jsonPointers;
}

@Override
public int getMigrationBatchSize() {
return migrationBatchSize;
}

@Override
public int getMigrationMaxDocumentsPerMinute() {
return migrationMaxDocumentsPerMinute;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand All @@ -77,20 +124,27 @@ public boolean equals(final Object o) {
final DefaultFieldsEncryptionConfig that = (DefaultFieldsEncryptionConfig) o;
return isEncryptionEnabled == that.isEncryptionEnabled &&
Objects.equals(symmetricalKey, that.symmetricalKey) &&
Objects.equals(jsonPointers, that.jsonPointers);
Objects.equals(oldSymmetricalKey, that.oldSymmetricalKey) &&
Objects.equals(jsonPointers, that.jsonPointers) &&
migrationBatchSize == that.migrationBatchSize &&
migrationMaxDocumentsPerMinute == that.migrationMaxDocumentsPerMinute;
}

@Override
public int hashCode() {
return Objects.hash(isEncryptionEnabled, symmetricalKey, jsonPointers);
return Objects.hash(isEncryptionEnabled, symmetricalKey, oldSymmetricalKey, jsonPointers,
migrationBatchSize, migrationMaxDocumentsPerMinute);
}

@Override
public String toString() {
return getClass().getSimpleName() + "[" +
"enabled=" + isEncryptionEnabled +
", symmetricalKey='***'" +
", oldSymmetricalKey='" + (oldSymmetricalKey.trim().isEmpty() ? "not set" : "***") + "'" +
", jsonPointers=" + jsonPointers +
", migrationBatchSize=" + migrationBatchSize +
", migrationMaxDocumentsPerMinute=" + migrationMaxDocumentsPerMinute +
']';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.eclipse.ditto.connectivity.service.config;

import java.util.List;
import java.util.Optional;

import org.eclipse.ditto.internal.utils.config.KnownConfigValue;

Expand All @@ -32,11 +33,29 @@ public interface FieldsEncryptionConfig {


/**
* Returns the symmetricalKey used for encryption.
* @return the symmetricalKey
* Returns the current symmetrical key used for encryption.
* This is THE key used for encrypting new data.
*
* @return the current symmetrical key
*/
String getSymmetricalKey();

/**
* Returns the old symmetrical key used for decryption fallback during key rotation.
* When set, the system will try to decrypt with the current key first, and fallback to this key if decryption fails.
* <p>
* Typical usage during key rotation:
* <ol>
* <li>Move current key to old-symmetrical-key</li>
* <li>Set new key as symmetrical-key</li>
* <li>Trigger migration via DevOps command</li>
* <li>Remove old-symmetrical-key after migration completes</li>
* </ol>
*
* @return the old symmetrical key, empty if not configured
*/
Optional<String> getOldSymmetricalKey();


/**
* Returns string json pointers to the values of json fields to be encrypted.
Expand All @@ -46,7 +65,20 @@ public interface FieldsEncryptionConfig {
*/
List<String> getJsonPointers();

/**
* Returns the batch size for the encryption migration process.
*
* @return the batch size
*/
int getMigrationBatchSize();

/**
* Returns the maximum number of documents to migrate per minute.
* This throttles the migration stream to avoid overwhelming the database.
*
* @return the maximum documents per minute, 0 means no throttling
*/
int getMigrationMaxDocumentsPerMinute();

/**
* An enumeration of the known config path expressions and their associated default values for {@code FieldsEncryptionConfig}.
Expand All @@ -57,11 +89,20 @@ enum ConfigValue implements KnownConfigValue {
* Determines whether json value encryption is enabled.
*/
ENCRYPTION_ENABLED("encryption-enabled", false),

/**
* The symmetrical key used for encryption.
* The current symmetrical key used for encryption.
* This is THE key used for encrypting all new data.
*/
SYMMETRICAL_KEY("symmetrical-key", ""),

/**
* The old symmetrical key used for decryption fallback during key rotation.
* When set, the system will attempt to decrypt with symmetrical-key first,
* and fallback to this key if decryption fails.
*/
OLD_SYMMETRICAL_KEY("old-symmetrical-key", ""),

/**
* The pointer to the json values to be encrypted.
*/
Expand All @@ -75,7 +116,19 @@ enum ConfigValue implements KnownConfigValue {
"/credentials/parameters/sharedKey",
"/credentials/clientSecret",
"/credentials/password"
));
)),

/**
* The batch size for the encryption migration process.
*/
MIGRATION_BATCH_SIZE("migration.batch-size", 100),

/**
* The maximum number of documents to migrate per minute.
* This throttles the migration stream to avoid overwhelming the database.
* 0 means no throttling.
*/
MIGRATION_MAX_DOCUMENTS_PER_MINUTE("migration.max-documents-per-minute", 200);

private final String configPath;
private final Object defaultValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ protected Connection createJsonifiableFrom(final JsonObject jsonObject) {
return ConnectionMigrationUtil.connectionFromJsonWithMigration(jsonObject);
}
final JsonObject decrypted = JsonFieldsEncryptor.decrypt(jsonObject, "", encryptionConfig.getJsonPointers(),
encryptionConfig.getSymmetricalKey());
encryptionConfig.getSymmetricalKey(), encryptionConfig.getOldSymmetricalKey());
return ConnectionMigrationUtil.connectionFromJsonWithMigration(decrypted);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ protected JsonObjectBuilder performToJournalMigration(final Event<?> event, fina
@Override
protected JsonObject performFromJournalMigration(final JsonObject jsonObject) {
return JsonFieldsEncryptor.decrypt(jsonObject, ConnectivityConstants.ENTITY_TYPE.toString(),
encryptionConfig.getJsonPointers(), encryptionConfig.getSymmetricalKey());
encryptionConfig.getJsonPointers(), encryptionConfig.getSymmetricalKey(),
encryptionConfig.getOldSymmetricalKey());
}

private static EventRegistry<ConnectivityEvent<?>> createEventRegistry() {
Expand Down
Loading
Loading