From ad0ee016be2e0dcd57891cdc7ae7469e0f2770a6 Mon Sep 17 00:00:00 2001 From: Manish Dait Date: Tue, 1 Jul 2025 19:30:25 +0530 Subject: [PATCH 1/8] added method to subscribe topic Signed-off-by: Manish Dait --- .../openelements/hiero/base/HieroContext.java | 5 ++++ .../openelements/hiero/base/TopicClient.java | 23 ++++++++++++++++ .../hiero/base/config/HieroConfig.java | 10 ++++++- .../hiero/base/config/NetworkSettings.java | 8 ++++++ .../config/hedera/HederaMainnetSettings.java | 3 +++ .../config/hedera/HederaTestnetSettings.java | 3 +++ .../ProtocolLayerClientImpl.java | 2 +- .../base/implementation/TopicClientImpl.java | 23 ++++++++++++++-- .../protocol/data/TopicMessageResult.java | 12 ++++++++- .../base/test/ProtocolLayerClientTests.java | 7 +++++ .../base/test/config/HieroTestContext.java | 10 +++++++ .../config/SoloActionNetworkSettings.java | 3 +++ .../implementation/HieroConfigImpl.java | 9 +++++++ .../HieroAutoConfiguration.java | 4 ++- .../implementation/HieroConfigImpl.java | 9 +++++++ .../hiero/spring/test/TopicClientTest.java | 27 +++++++++++++++++++ .../hiero/test/SoloActionNetworkSettings.java | 3 +++ 17 files changed, 155 insertions(+), 6 deletions(-) diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/HieroContext.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/HieroContext.java index 98e72228..4493b932 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/HieroContext.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/HieroContext.java @@ -4,6 +4,8 @@ import com.openelements.hiero.base.data.Account; import org.jspecify.annotations.NonNull; +import java.util.Set; + /** * Context for a specific Hiero connection to a network. */ @@ -25,4 +27,7 @@ public interface HieroContext { */ @NonNull Client getClient(); + + @NonNull + Set getMirrorNodeEndPoint(); } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java index bdd9c5d5..ab298aa2 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java @@ -1,10 +1,13 @@ package com.openelements.hiero.base; import com.hedera.hashgraph.sdk.PrivateKey; +import com.hedera.hashgraph.sdk.SubscriptionHandle; import com.hedera.hashgraph.sdk.TopicId; +import com.hedera.hashgraph.sdk.TopicMessage; import org.jspecify.annotations.NonNull; import java.util.Objects; +import java.util.function.Consumer; /** * Interface for interacting with a Hiero network. This interface provides methods for interacting with Hiero Topic, @@ -317,4 +320,24 @@ default void submitMessage(@NonNull String topicId, @NonNull String submitKey, @ Objects.requireNonNull(message, "message cannot be null"); submitMessage(TopicId.fromString(topicId), PrivateKey.fromString(submitKey), message); }; + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription) throws HieroException; + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer subscription) throws HieroException { + return subscribeTopic(TopicId.fromString(topicId), subscription); + } } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/HieroConfig.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/HieroConfig.java index 02dc41d5..d88b6f05 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/HieroConfig.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/HieroConfig.java @@ -41,6 +41,9 @@ public interface HieroConfig { @NonNull Set getMirrorNodeAddresses(); + @NonNull + Set getConsensusServiceAddress(); + /** * Returns the consensus nodes. * @@ -88,6 +91,11 @@ default HieroContext createHieroContext() { public @NonNull Client getClient() { return client; } + + @Override + public @NonNull Set getMirrorNodeEndPoint() { + return getMirrorNodeAddresses(); + } }; } @@ -102,7 +110,7 @@ default Client createClient() { final Map nodes = getConsensusNodes().stream() .collect(Collectors.toMap(n -> n.getAddress(), n -> n.getAccountId())); final Client client = Client.forNetwork(nodes); - final List mirrorNodeAddresses = getMirrorNodeAddresses().stream().collect(Collectors.toList()); + final List mirrorNodeAddresses = getConsensusServiceAddress().stream().collect(Collectors.toList()); client.setMirrorNetwork(mirrorNodeAddresses); client.setOperator(getOperatorAccount().accountId(), getOperatorAccount().privateKey()); getRequestTimeout().ifPresent(client::setRequestTimeout); diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/NetworkSettings.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/NetworkSettings.java index b8516097..3b7b7442 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/NetworkSettings.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/NetworkSettings.java @@ -40,6 +40,14 @@ public interface NetworkSettings { @NonNull Set getMirrorNodeAddresses(); + /** + * Returns the consensus service address. + * + * @return the consensus service addresses + */ + @NonNull + Set getConsensusServiceAddress(); + /** * Returns the consensus nodes. * diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaMainnetSettings.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaMainnetSettings.java index 4c0f8a4e..051c36e0 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaMainnetSettings.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaMainnetSettings.java @@ -31,6 +31,9 @@ public final class HederaMainnetSettings implements NetworkSettings { return Set.of("https://mainnet.mirrornode.hedera.com:443"); } + @Override + public @NonNull Set getConsensusServiceAddress() {return Set.of("mainnet.mirrornode.hedera.com:443");} + @Override public @NonNull Set getConsensusNodes() { return Set.of(new ConsensusNode("35.186.191.247", "50211", "0.0.4")); diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaTestnetSettings.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaTestnetSettings.java index 5b4bd7c7..9639ce7e 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaTestnetSettings.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/hedera/HederaTestnetSettings.java @@ -31,6 +31,9 @@ public final class HederaTestnetSettings implements NetworkSettings { return Set.of("https://testnet.mirrornode.hedera.com:443"); } + @Override + public @NonNull Set getConsensusServiceAddress() {return Set.of("testnet.mirrornode.hedera.com:443");} + @Override public @NonNull Set getConsensusNodes() { return Set.of(new ConsensusNode("0.testnet.hedera.com", "50211", "0.0.3")); diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/ProtocolLayerClientImpl.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/ProtocolLayerClientImpl.java index 8bc6fa8e..83d7ff47 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/ProtocolLayerClientImpl.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/ProtocolLayerClientImpl.java @@ -434,7 +434,7 @@ public TopicMessageResult executeTopicMessageQuery(TopicMessageRequest request) query.setLimit(request.limit()); } final SubscriptionHandle subscribe = query.subscribe(hieroContext.getClient(), request.subscription()); - return new TopicMessageResult(); + return new TopicMessageResult(subscribe); } catch (final Exception e) { throw new HieroException("Failed to execute query message transaction", e); } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java index bc060faa..5bb1dd84 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java @@ -1,15 +1,24 @@ package com.openelements.hiero.base.implementation; import com.hedera.hashgraph.sdk.PrivateKey; +import com.hedera.hashgraph.sdk.SubscriptionHandle; import com.hedera.hashgraph.sdk.TopicId; +import com.hedera.hashgraph.sdk.TopicMessage; import com.openelements.hiero.base.HieroException; import com.openelements.hiero.base.TopicClient; import com.openelements.hiero.base.data.Account; import com.openelements.hiero.base.protocol.ProtocolLayerClient; -import com.openelements.hiero.base.protocol.data.*; +import com.openelements.hiero.base.protocol.data.TopicCreateRequest; +import com.openelements.hiero.base.protocol.data.TopicCreateResult; +import com.openelements.hiero.base.protocol.data.TopicUpdateRequest; +import com.openelements.hiero.base.protocol.data.TopicDeleteRequest; +import com.openelements.hiero.base.protocol.data.TopicSubmitMessageRequest; +import com.openelements.hiero.base.protocol.data.TopicMessageRequest; +import com.openelements.hiero.base.protocol.data.TopicMessageResult; import org.jspecify.annotations.NonNull; import java.util.Objects; +import java.util.function.Consumer; public class TopicClientImpl implements TopicClient { private final ProtocolLayerClient client; @@ -107,7 +116,7 @@ public void updateTopic(@NonNull TopicId topicId, @NonNull PrivateKey updatedAdm Objects.requireNonNull(topicId, "topicId must not be null"); Objects.requireNonNull(submitKey, "submitKey must not be null"); Objects.requireNonNull(memo, "memo must not be null"); - updateTopic(topicId, operationalAccount.privateKey(), updatedAdminKey, submitKey, memo); + updateTopic(topicId, operationalAccount.privateKey(), updatedAdminKey, submitKey, memo); } @Override @@ -202,4 +211,14 @@ public void submitMessage(@NonNull TopicId topicId, @NonNull PrivateKey submitKe TopicSubmitMessageRequest request = TopicSubmitMessageRequest.of(topicId, submitKey, message); client.executeTopicMessageSubmitTransaction(request); } + + @Override + public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription) + throws HieroException { + Objects.requireNonNull(topicId, "topicId must not be null"); + Objects.requireNonNull(subscription, "subscription must not be null"); + TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription); + TopicMessageResult result = client.executeTopicMessageQuery(request); + return result.subscriptionHandle(); + } } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageResult.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageResult.java index 29781979..ecc31d24 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageResult.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageResult.java @@ -1,4 +1,14 @@ package com.openelements.hiero.base.protocol.data; -public record TopicMessageResult() { +import com.hedera.hashgraph.sdk.Status; +import com.hedera.hashgraph.sdk.SubscriptionHandle; +import com.hedera.hashgraph.sdk.TransactionId; +import org.jspecify.annotations.NonNull; + +import java.util.Objects; + +public record TopicMessageResult(@NonNull SubscriptionHandle subscriptionHandle) { + public TopicMessageResult { + Objects.requireNonNull(subscriptionHandle, "subscriptionHandle must not be null"); + } } diff --git a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/ProtocolLayerClientTests.java b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/ProtocolLayerClientTests.java index 9f42e887..30a0e724 100644 --- a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/ProtocolLayerClientTests.java +++ b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/ProtocolLayerClientTests.java @@ -11,6 +11,8 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.Set; + public class ProtocolLayerClientTests { @Test @@ -31,6 +33,11 @@ void testNullParams() { public @NonNull Client getClient() { return null; } + + @Override + public @NonNull Set getMirrorNodeEndPoint() { + return null; + } }; final ProtocolLayerClient client = new ProtocolLayerClientImpl(context); diff --git a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java index 05a844c4..c4761fb1 100644 --- a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java +++ b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java @@ -11,6 +11,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.Set; + import org.jspecify.annotations.NonNull; import org.slf4j.Logger; @@ -22,6 +24,8 @@ public class HieroTestContext implements HieroContext { private final Client client; + private final Set mirronNodeEnpoint; + public HieroTestContext() { final Dotenv dotenv = Dotenv.configure().ignoreIfMissing().load(); @@ -57,6 +61,7 @@ public HieroTestContext() { final Map nodes = new HashMap<>(); networkSettings.getConsensusNodes() .forEach(consensusNode -> nodes.put(consensusNode.getAddress(), consensusNode.getAccountId())); + mirronNodeEnpoint = networkSettings.getConsensusServiceAddress(); client = Client.forNetwork(nodes); if (!networkSettings.getMirrorNodeAddresses().isEmpty()) { try { @@ -76,4 +81,9 @@ public HieroTestContext() { public Client getClient() { return client; } + + @Override + public @NonNull Set getMirrorNodeEndPoint() { + return mirronNodeEnpoint; + } } diff --git a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/SoloActionNetworkSettings.java b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/SoloActionNetworkSettings.java index f63b7384..139ca643 100644 --- a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/SoloActionNetworkSettings.java +++ b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/SoloActionNetworkSettings.java @@ -24,6 +24,9 @@ public class SoloActionNetworkSettings implements NetworkSettings { return Set.of("http://localhost:8080"); } + @Override + public @NonNull Set getConsensusServiceAddress() {return Set.of("http://localhost:8080");} + @Override public @NonNull Set getConsensusNodes() { return Set.of(new ConsensusNode("127.0.0.1", "50211", "0.0.3")); diff --git a/hiero-enterprise-microprofile/src/main/java/com/openelements/hiero/microprofile/implementation/HieroConfigImpl.java b/hiero-enterprise-microprofile/src/main/java/com/openelements/hiero/microprofile/implementation/HieroConfigImpl.java index 0141ed19..6415079a 100644 --- a/hiero-enterprise-microprofile/src/main/java/com/openelements/hiero/microprofile/implementation/HieroConfigImpl.java +++ b/hiero-enterprise-microprofile/src/main/java/com/openelements/hiero/microprofile/implementation/HieroConfigImpl.java @@ -27,6 +27,8 @@ public class HieroConfigImpl implements HieroConfig { private final Set mirrorNodeAddresses; + private final Set consensusServiceAddress; + private final Set consensusNodes; private final Long chainId; @@ -51,12 +53,14 @@ public HieroConfigImpl(@NonNull final HieroOperatorConfiguration configuration, final NetworkSettings settings = networkSettings.get(); networkName = settings.getNetworkName().orElse(networkConfiguration.getName().orElse(null)); mirrorNodeAddresses = Collections.unmodifiableSet(settings.getMirrorNodeAddresses()); + consensusServiceAddress = Collections.unmodifiableSet(settings.getConsensusServiceAddress()); consensusNodes = Collections.unmodifiableSet(settings.getConsensusNodes()); chainId = settings.chainId().orElse(null); relayUrl = settings.relayUrl().orElse(null); } else { networkName = networkConfiguration.getName().orElse(null); mirrorNodeAddresses = networkConfiguration.getMirrornode().map(Set::of).orElse(Set.of()); + consensusServiceAddress = Set.of(); consensusNodes = Collections.unmodifiableSet(networkConfiguration.getNodes()); chainId = null; relayUrl = null; @@ -83,6 +87,11 @@ public Optional getRequestTimeout() { return mirrorNodeAddresses; } + @Override + public @NonNull Set getConsensusServiceAddress() { + return consensusServiceAddress; + } + @Override public @NonNull Set getConsensusNodes() { return consensusNodes; diff --git a/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroAutoConfiguration.java b/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroAutoConfiguration.java index 35879b71..c1745ca4 100644 --- a/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroAutoConfiguration.java +++ b/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroAutoConfiguration.java @@ -34,6 +34,8 @@ import java.net.URI; import java.net.URL; import java.util.List; +import java.util.stream.Collectors; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -110,7 +112,7 @@ TopicClient topicClient(final ProtocolLayerClient protocolLayerClient, HieroCont havingValue = "true", matchIfMissing = true) MirrorNodeClient mirrorNodeClient(final HieroContext hieroContext) { final String mirrorNodeEndpoint; - final List mirrorNetwork = hieroContext.getClient().getMirrorNetwork(); + final List mirrorNetwork = hieroContext.getMirrorNodeEndPoint().stream().toList(); if (mirrorNetwork.isEmpty()) { throw new IllegalArgumentException("Mirror node endpoint must be set"); } diff --git a/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroConfigImpl.java b/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroConfigImpl.java index c3954510..60f21e9c 100644 --- a/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroConfigImpl.java +++ b/hiero-enterprise-spring/src/main/java/com/openelements/hiero/spring/implementation/HieroConfigImpl.java @@ -23,6 +23,8 @@ public class HieroConfigImpl implements HieroConfig { private final Set mirrorNodeAddresses; + private final Set consensusServiceAddress; + private final Set consensusNodes; private final Long chainId; @@ -50,6 +52,7 @@ public HieroConfigImpl(@NonNull final HieroProperties properties) { final NetworkSettings settings = networkSettings.get(); networkName = settings.getNetworkName().orElse(properties.getNetwork().getName()); mirrorNodeAddresses = Collections.unmodifiableSet(settings.getMirrorNodeAddresses()); + consensusServiceAddress = Collections.unmodifiableSet(settings.getConsensusServiceAddress()); consensusNodes = Collections.unmodifiableSet(settings.getConsensusNodes()); chainId = settings.chainId().orElse(null); relayUrl = settings.relayUrl().orElse(null); @@ -61,6 +64,7 @@ public HieroConfigImpl(@NonNull final HieroProperties properties) { } else { mirrorNodeAddresses = Set.of(); } + consensusServiceAddress = Set.of(); final List nodes = properties.getNetwork().getNodes(); if (nodes == null || nodes.isEmpty()) { consensusNodes = Set.of(); @@ -106,6 +110,11 @@ public Set getMirrorNodeAddresses() { return mirrorNodeAddresses; } + @Override + public @NonNull Set getConsensusServiceAddress() { + return consensusServiceAddress; + } + @Override public Set getConsensusNodes() { return consensusNodes; diff --git a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java index 753a99c3..606f3bfa 100644 --- a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java +++ b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java @@ -1,19 +1,28 @@ package com.openelements.hiero.spring.test; import com.hedera.hashgraph.sdk.PrivateKey; +import com.hedera.hashgraph.sdk.SubscriptionHandle; import com.hedera.hashgraph.sdk.TopicId; import com.openelements.hiero.base.HieroException; import com.openelements.hiero.base.TopicClient; +import com.openelements.hiero.test.HieroTestUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + @SpringBootTest(classes = HieroTestConfig.class) public class TopicClientTest { @Autowired private TopicClient topicClient; + @Autowired + private HieroTestUtils hieroTestUtils; + @Test void testCreateTopic() throws HieroException { final TopicId topicId = topicClient.createTopic(); @@ -198,4 +207,22 @@ void testSubmitMessageThrowExceptionFroInvalidId() { final PrivateKey submitKey = PrivateKey.generateECDSA(); Assertions.assertThrows(HieroException.class, () -> topicClient.submitMessage(topicId, submitKey, message)); } + + @Test + void testSubscribeTopic() throws HieroException { + final List messages = new ArrayList<>(); + final TopicId topicId = topicClient.createTopic(); + hieroTestUtils.waitForMirrorNodeRecords(); + + final SubscriptionHandle handle = topicClient.subscribeTopic(topicId, (message) -> { + messages.add(message.toString()); + System.out.println(Arrays.toString(message.contents)); + }); + + topicClient.submitMessage(topicId, "Hello Hiero"); + hieroTestUtils.waitForMirrorNodeRecords(); + + Assertions.assertNotNull(handle); + Assertions.assertEquals(1, messages.size()); + } } diff --git a/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java b/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java index 262eea04..8df2ec29 100644 --- a/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java +++ b/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java @@ -24,6 +24,9 @@ public class SoloActionNetworkSettings implements NetworkSettings { return Set.of("http://localhost:8080"); } + @Override + public @NonNull Set getConsensusServiceAddress() {return Set.of("http://localhost:8080");} //TBD + @Override public @NonNull Set getConsensusNodes() { return Set.of(new ConsensusNode("127.0.0.1", "50211", "0.0.3")); From 8bbc6ae6618dd72dd08eec88ae8b1db6d0c9e6d3 Mon Sep 17 00:00:00 2001 From: Manish Dait Date: Mon, 7 Jul 2025 12:51:02 +0530 Subject: [PATCH 2/8] added method to subscribe topic for limited message Signed-off-by: Manish Dait --- .../openelements/hiero/base/TopicClient.java | 30 ++++++++++- .../base/implementation/TopicClientImpl.java | 14 ++++++ .../protocol/data/TopicMessageRequest.java | 6 +++ .../hiero/spring/test/TopicClientTest.java | 50 ++++++++++++++++--- 4 files changed, 92 insertions(+), 8 deletions(-) diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java index ab298aa2..62cf3786 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java @@ -328,7 +328,8 @@ default void submitMessage(@NonNull String topicId, @NonNull String submitKey, @ * @return SubscriptionHandle for the Topic * @throws HieroException if Topic could not be subscribed */ - SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription) throws HieroException; + SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription) + throws HieroException; /** * Subscribe to a Topic @@ -337,7 +338,32 @@ default void submitMessage(@NonNull String topicId, @NonNull String submitKey, @ * @return SubscriptionHandle for the Topic * @throws HieroException if Topic could not be subscribed */ - default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer subscription) throws HieroException { + default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer subscription) + throws HieroException { return subscribeTopic(TopicId.fromString(topicId), subscription); } + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param limit the number of message to return + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + long limit) throws HieroException; + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param limit the number of message to return + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer subscription, + long limit) throws HieroException { + return subscribeTopic(TopicId.fromString(topicId), subscription, limit); + } } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java index 5bb1dd84..22a84e47 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java @@ -221,4 +221,18 @@ public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Cons TopicMessageResult result = client.executeTopicMessageQuery(request); return result.subscriptionHandle(); } + + @Override + public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + long limit) throws HieroException { + Objects.requireNonNull(topicId, "topicId must not be null"); + Objects.requireNonNull(subscription, "subscription must not be null"); + if (limit == 0) { + throw new IllegalArgumentException("limit must be greater than 0"); + } + + TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription, limit); + TopicMessageResult result = client.executeTopicMessageQuery(request); + return result.subscriptionHandle(); + } } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java index dca2c425..da4afc92 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java @@ -26,4 +26,10 @@ public record TopicMessageRequest(@NonNull TopicId topicId, @NonNull Consumer subscription) { return new TopicMessageRequest(topicId, subscription, null, null, NO_LIMIT, null, null); } + + @NonNull + public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull long limit) { + return new TopicMessageRequest(topicId, subscription, null, null,limit, null, null); + } } diff --git a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java index 606f3bfa..dfbd5046 100644 --- a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java +++ b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java @@ -12,7 +12,6 @@ import org.springframework.boot.test.context.SpringBootTest; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; @SpringBootTest(classes = HieroTestConfig.class) @@ -210,19 +209,58 @@ void testSubmitMessageThrowExceptionFroInvalidId() { @Test void testSubscribeTopic() throws HieroException { + final String msg = "Hello Hiero"; final List messages = new ArrayList<>(); final TopicId topicId = topicClient.createTopic(); hieroTestUtils.waitForMirrorNodeRecords(); - final SubscriptionHandle handle = topicClient.subscribeTopic(topicId, (message) -> { - messages.add(message.toString()); - System.out.println(Arrays.toString(message.contents)); + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, (message) -> { + messages.add(new String(message.contents)); }); - topicClient.submitMessage(topicId, "Hello Hiero"); + topicClient.submitMessage(topicId, msg); hieroTestUtils.waitForMirrorNodeRecords(); - Assertions.assertNotNull(handle); + Assertions.assertNotNull(handler); Assertions.assertEquals(1, messages.size()); + Assertions.assertEquals(msg,messages.getFirst()); + } + + @Test + void testSubscribeTopicWithLimit() throws HieroException { + final String msg = "Hello Hiero"; + final long limit = 1; + + final List messages = new ArrayList<>(); + final TopicId topicId = topicClient.createTopic(); + hieroTestUtils.waitForMirrorNodeRecords(); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, (message) -> { + messages.add(new String(message.contents)); + }, limit); + + topicClient.submitMessage(topicId, msg); + hieroTestUtils.waitForMirrorNodeRecords(); + + topicClient.submitMessage(topicId, msg); + hieroTestUtils.waitForMirrorNodeRecords(); + + Assertions.assertNotNull(handler); + Assertions.assertEquals(limit, messages.size()); + } + + @Test + void testSubscribeTopicWithInvalidLimit() throws HieroException { + final String msg = "limit must be greater than 0"; + final long limit = 0; + + final TopicId topicId = topicClient.createTopic(); + hieroTestUtils.waitForMirrorNodeRecords(); + + final IllegalArgumentException e = Assertions.assertThrows(IllegalArgumentException.class, () -> topicClient.subscribeTopic( + topicId, (message) -> {/**/}, limit + )); + + Assertions.assertEquals(msg, e.getMessage()); } } From 8dfd9c22ba829e12d5ce739b3c9611c31d6e8c86 Mon Sep 17 00:00:00 2001 From: Manish Dait Date: Mon, 7 Jul 2025 13:50:13 +0530 Subject: [PATCH 3/8] added method to subscribe topic with start and end time Signed-off-by: Manish Dait --- .../openelements/hiero/base/TopicClient.java | 57 +++++++++++++++++++ .../base/implementation/TopicClientImpl.java | 45 +++++++++++++++ .../protocol/data/TopicMessageRequest.java | 12 ++++ .../hiero/spring/test/TopicClientTest.java | 25 ++++++++ 4 files changed, 139 insertions(+) diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java index 62cf3786..68284e0d 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/TopicClient.java @@ -6,6 +6,7 @@ import com.hedera.hashgraph.sdk.TopicMessage; import org.jspecify.annotations.NonNull; +import java.time.Instant; import java.util.Objects; import java.util.function.Consumer; @@ -366,4 +367,60 @@ default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Cons long limit) throws HieroException { return subscribeTopic(TopicId.fromString(topicId), subscription, limit); } + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param startTime time to start subscribing to a topic + * @param endTime time to stop subscribing to a topic + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime) throws HieroException; + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param startTime time to start subscribing to a topic + * @param endTime time to stop subscribing to a topic + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime) throws HieroException { + return subscribeTopic(TopicId.fromString(topicId), subscription, startTime, endTime); + } + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param startTime time to start subscribing to a topic + * @param endTime time to stop subscribing to a topic + * @param limit the number of message to return + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime, long limit) + throws HieroException; + + /** + * Subscribe to a Topic + * + * @param topicId the topicId of topic + * @param startTime time to start subscribing to a topic + * @param endTime time to stop subscribing to a topic + * @param limit the number of message to return + * @return SubscriptionHandle for the Topic + * @throws HieroException if Topic could not be subscribed + */ + default SubscriptionHandle subscribeTopic(@NonNull String topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime, long limit) + throws HieroException { + return subscribeTopic(TopicId.fromString(topicId), subscription, startTime, endTime, limit); + } } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java index 22a84e47..a98fdfaf 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java @@ -17,6 +17,7 @@ import com.openelements.hiero.base.protocol.data.TopicMessageResult; import org.jspecify.annotations.NonNull; +import java.time.Instant; import java.util.Objects; import java.util.function.Consumer; @@ -235,4 +236,48 @@ public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Cons TopicMessageResult result = client.executeTopicMessageQuery(request); return result.subscriptionHandle(); } + + @Override + public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + Instant startTime, Instant endTime) throws HieroException { + Objects.requireNonNull(topicId, "topicId must not be null"); + Objects.requireNonNull(subscription, "subscription must not be null"); + Objects.requireNonNull(startTime, "startTime must not be null"); + Objects.requireNonNull(endTime, "endTime must not be null"); + + if (startTime.isBefore(Instant.now())) { + throw new IllegalArgumentException("startTime must be greater than currentTime"); + } + if (endTime.isBefore(startTime)) { + throw new IllegalArgumentException("endTime must be greater than starTime"); + } + + TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription, startTime, endTime); + TopicMessageResult result = client.executeTopicMessageQuery(request); + return result.subscriptionHandle(); + } + + @Override + public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime, long limit) + throws HieroException { + Objects.requireNonNull(topicId, "topicId must not be null"); + Objects.requireNonNull(subscription, "subscription must not be null"); + Objects.requireNonNull(startTime, "startTime must not be null"); + Objects.requireNonNull(endTime, "endTime must not be null"); + + if (startTime.isBefore(Instant.now())) { + throw new IllegalArgumentException("startTime must be greater than currentTime"); + } + if (endTime.isBefore(startTime)) { + throw new IllegalArgumentException("endTime must be greater than starTime"); + } + if (limit == 0) { + throw new IllegalArgumentException("limit must be greater than 0"); + } + + TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription, startTime, endTime, limit); + TopicMessageResult result = client.executeTopicMessageQuery(request); + return result.subscriptionHandle(); + } } diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java index da4afc92..a87149d3 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java @@ -32,4 +32,16 @@ public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer @NonNull long limit) { return new TopicMessageRequest(topicId, subscription, null, null,limit, null, null); } + + @NonNull + public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime) { + return new TopicMessageRequest(topicId, subscription, startTime, endTime, NO_LIMIT, null, null); + } + + @NonNull + public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer subscription, + @NonNull Instant startTime, @NonNull Instant endTime, long limit) { + return new TopicMessageRequest(topicId, subscription, startTime, endTime, limit, null, null); + } } diff --git a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java index dfbd5046..7b6c6721 100644 --- a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java +++ b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java @@ -7,10 +7,12 @@ import com.openelements.hiero.base.TopicClient; import com.openelements.hiero.test.HieroTestUtils; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -263,4 +265,27 @@ void testSubscribeTopicWithInvalidLimit() throws HieroException { Assertions.assertEquals(msg, e.getMessage()); } + + @Test + @Disabled + // To fix + void testSubscribeTopicWithStartAndEndTime() throws HieroException { + final TopicId topicId = topicClient.createTopic(); + hieroTestUtils.waitForMirrorNodeRecords(); + + final Instant startTime = Instant.now().plusSeconds(60); + final Instant endTime = startTime.plusSeconds(120); + + final List messages = new ArrayList<>(); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, (message) -> { + messages.add(new String(message.contents)); + }, startTime, endTime); + + topicClient.submitMessage(topicId, "Hello Hiero"); + hieroTestUtils.waitForMirrorNodeRecords(); + + Assertions.assertNotNull(handler); + Assertions.assertEquals(1, messages.size()); + } } From 98b24c5ed7ade50fbdcc413902c759661a886b6a Mon Sep 17 00:00:00 2001 From: Manish Dait Date: Wed, 9 Jul 2025 08:59:05 +0530 Subject: [PATCH 4/8] added unit test for subscribeTopic() Signed-off-by: Manish Dait --- .../hiero/base/test/TopicClientImplTest.java | 184 ++++++++++++++++++ .../hiero/spring/test/TopicClientTest.java | 24 +-- 2 files changed, 186 insertions(+), 22 deletions(-) diff --git a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java index bc94c452..0a2c4a0e 100644 --- a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java +++ b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java @@ -1,7 +1,9 @@ package com.openelements.hiero.base.test; import com.hedera.hashgraph.sdk.PrivateKey; +import com.hedera.hashgraph.sdk.SubscriptionHandle; import com.hedera.hashgraph.sdk.TopicId; +import com.hedera.hashgraph.sdk.TopicMessage; import com.openelements.hiero.base.HieroException; import com.openelements.hiero.base.data.Account; import com.openelements.hiero.base.implementation.TopicClientImpl; @@ -14,12 +16,17 @@ import com.openelements.hiero.base.protocol.data.TopicDeleteResult; import com.openelements.hiero.base.protocol.data.TopicSubmitMessageRequest; import com.openelements.hiero.base.protocol.data.TopicSubmitMessageResult; +import com.openelements.hiero.base.protocol.data.TopicMessageRequest; +import com.openelements.hiero.base.protocol.data.TopicMessageResult; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; import org.mockito.Mockito; +import java.time.Instant; +import java.util.function.Consumer; + import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; @@ -33,6 +40,7 @@ public class TopicClientImplTest { ArgumentCaptor topicUpdateCaptor = ArgumentCaptor.forClass(TopicUpdateRequest.class); ArgumentCaptor topicDeleteCaptor = ArgumentCaptor.forClass(TopicDeleteRequest.class); ArgumentCaptor topicSubmitCaptor = ArgumentCaptor.forClass(TopicSubmitMessageRequest.class); + ArgumentCaptor topicSubscribeCaptor = ArgumentCaptor.forClass(TopicMessageRequest.class); @BeforeEach void setup() { @@ -587,4 +595,180 @@ void shouldThrowExceptionForNullParamOnSubmitMessage() { Assertions.assertThrows(NullPointerException.class, () -> topicClient.submitMessage((TopicId) null, (String)null)); Assertions.assertThrows(NullPointerException.class, () -> topicClient.submitMessage((TopicId) null, null, (String)null)); } + + @Test + void shouldSubscribeTopic() throws HieroException { + final TopicMessageResult topicMessageResult = Mockito.mock(TopicMessageResult.class); + final SubscriptionHandle subscriptionHandle = Mockito.mock(SubscriptionHandle.class); + + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final Consumer subscription = (message) -> {}; + + when(protocolLayerClient.executeTopicMessageQuery(any(TopicMessageRequest.class))) + .thenReturn(topicMessageResult); + when(topicMessageResult.subscriptionHandle()).thenReturn(subscriptionHandle); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, subscription); + + verify(protocolLayerClient, times(1)) + .executeTopicMessageQuery(topicSubscribeCaptor.capture()); + final TopicMessageRequest capture = topicSubscribeCaptor.getValue(); + Assertions.assertEquals(topicId, capture.topicId()); + Assertions.assertEquals(subscription, capture.subscription()); + Assertions.assertEquals(-1, capture.limit()); // default limit infinite(-1) + Assertions.assertNull(capture.startTime()); + Assertions.assertNull(capture.endTime()); + + verify(topicMessageResult, times(1)).subscriptionHandle(); + + Assertions.assertNotNull(handler); + Assertions.assertEquals(subscriptionHandle, handler); + } + + @Test + void shouldSubscribeTopicWithLimit() throws HieroException { + final TopicMessageResult topicMessageResult = Mockito.mock(TopicMessageResult.class); + final SubscriptionHandle subscriptionHandle = Mockito.mock(SubscriptionHandle.class); + + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final int limit = 2; + final Consumer subscription = (message) -> {}; + + when(protocolLayerClient.executeTopicMessageQuery(any(TopicMessageRequest.class))) + .thenReturn(topicMessageResult); + when(topicMessageResult.subscriptionHandle()).thenReturn(subscriptionHandle); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, subscription, limit); + + verify(protocolLayerClient, times(1)) + .executeTopicMessageQuery(topicSubscribeCaptor.capture()); + final TopicMessageRequest capture = topicSubscribeCaptor.getValue(); + Assertions.assertEquals(topicId, capture.topicId()); + Assertions.assertEquals(subscription, capture.subscription()); + Assertions.assertEquals(limit, capture.limit()); + Assertions.assertNull(capture.startTime()); + Assertions.assertNull(capture.endTime()); + + verify(topicMessageResult, times(1)).subscriptionHandle(); + + Assertions.assertNotNull(handler); + Assertions.assertEquals(subscriptionHandle, handler); + } + + @Test + void shouldSubscribeTopicWithStartAndEndTime() throws HieroException { + final TopicMessageResult topicMessageResult = Mockito.mock(TopicMessageResult.class); + final SubscriptionHandle subscriptionHandle = Mockito.mock(SubscriptionHandle.class); + + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final Consumer subscription = (message) -> {}; + final Instant startTime = Instant.now().plusSeconds(120); + final Instant endTime = Instant.now().plusSeconds(1800); + + when(protocolLayerClient.executeTopicMessageQuery(any(TopicMessageRequest.class))) + .thenReturn(topicMessageResult); + when(topicMessageResult.subscriptionHandle()).thenReturn(subscriptionHandle); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, subscription, startTime, endTime); + + verify(protocolLayerClient, times(1)) + .executeTopicMessageQuery(topicSubscribeCaptor.capture()); + final TopicMessageRequest capture = topicSubscribeCaptor.getValue(); + Assertions.assertEquals(topicId, capture.topicId()); + Assertions.assertEquals(subscription, capture.subscription()); + Assertions.assertEquals(-1, capture.limit()); // default limit + Assertions.assertEquals(startTime, capture.startTime()); + Assertions.assertEquals(endTime, capture.endTime()); + + verify(topicMessageResult, times(1)).subscriptionHandle(); + + Assertions.assertNotNull(handler); + Assertions.assertEquals(subscriptionHandle, handler); + } + + @Test + void shouldSubscribeTopicWithAllParams() throws HieroException { + final TopicMessageResult topicMessageResult = Mockito.mock(TopicMessageResult.class); + final SubscriptionHandle subscriptionHandle = Mockito.mock(SubscriptionHandle.class); + + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final Consumer subscription = (message) -> {}; + final Instant startTime = Instant.now().plusSeconds(120); + final Instant endTime = Instant.now().plusSeconds(1800); + final int limit = 1; + + when(protocolLayerClient.executeTopicMessageQuery(any(TopicMessageRequest.class))) + .thenReturn(topicMessageResult); + when(topicMessageResult.subscriptionHandle()).thenReturn(subscriptionHandle); + + final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, subscription, startTime, endTime, limit); + + verify(protocolLayerClient, times(1)) + .executeTopicMessageQuery(topicSubscribeCaptor.capture()); + final TopicMessageRequest capture = topicSubscribeCaptor.getValue(); + Assertions.assertEquals(topicId, capture.topicId()); + Assertions.assertEquals(subscription, capture.subscription()); + Assertions.assertEquals(limit, capture.limit()); + Assertions.assertEquals(startTime, capture.startTime()); + Assertions.assertEquals(endTime, capture.endTime()); + + verify(topicMessageResult, times(1)).subscriptionHandle(); + + Assertions.assertNotNull(handler); + Assertions.assertEquals(subscriptionHandle, handler); + } + + @Test + void shouldThrowExceptionOnSubscribeTopicWithInvalidStartAndEndTime() { + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final Consumer subscription = (message) -> {}; + + final Instant startTime1 = Instant.now().plusSeconds(120); + final Instant endTime1 = startTime1.minusSeconds(60); + final Instant startTime2 = Instant.now().minusSeconds(60); + final Instant endTime2 = Instant.now().plusSeconds(120); + final int limit = 1; + + //End time before start time + final IllegalArgumentException e1 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, startTime1, endTime1)); + final IllegalArgumentException e2 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, startTime1, endTime1, limit)); + + Assertions.assertEquals("endTime must be greater than starTime", e1.getMessage()); + Assertions.assertEquals("endTime must be greater than starTime", e2.getMessage()); + + //Start time before current time + final IllegalArgumentException e3 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, startTime2, endTime2)); + final IllegalArgumentException e4 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, startTime2, endTime2, limit)); + + Assertions.assertEquals("startTime must be greater than currentTime", e3.getMessage()); + Assertions.assertEquals("startTime must be greater than currentTime", e4.getMessage()); + } + + @Test + void shouldThrowExceptionOnSubscribeTopicWithLimitEqualsZero() { + final String msg = "limit must be greater than 0"; + // given + final TopicId topicId = TopicId.fromString("1.2.3"); + final Consumer subscription = (message) -> {}; + final Instant startTime = Instant.now().plusSeconds(120); + final Instant endTime = startTime.plusSeconds(120); + final int limit = 0; + + final IllegalArgumentException e1 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, limit)); + final IllegalArgumentException e2 = Assertions.assertThrows(IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, subscription, startTime, endTime, limit)); + + Assertions.assertEquals(msg, e1.getMessage()); + Assertions.assertEquals(msg, e2.getMessage()); + } } diff --git a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java index 7b6c6721..8750c087 100644 --- a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java +++ b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java @@ -7,12 +7,10 @@ import com.openelements.hiero.base.TopicClient; import com.openelements.hiero.test.HieroTestUtils; import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; -import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -267,25 +265,7 @@ void testSubscribeTopicWithInvalidLimit() throws HieroException { } @Test - @Disabled - // To fix - void testSubscribeTopicWithStartAndEndTime() throws HieroException { - final TopicId topicId = topicClient.createTopic(); - hieroTestUtils.waitForMirrorNodeRecords(); - - final Instant startTime = Instant.now().plusSeconds(60); - final Instant endTime = startTime.plusSeconds(120); - - final List messages = new ArrayList<>(); - - final SubscriptionHandle handler = topicClient.subscribeTopic(topicId, (message) -> { - messages.add(new String(message.contents)); - }, startTime, endTime); - - topicClient.submitMessage(topicId, "Hello Hiero"); - hieroTestUtils.waitForMirrorNodeRecords(); - - Assertions.assertNotNull(handler); - Assertions.assertEquals(1, messages.size()); + void testSubscribeTopicWithStartAndEndTime() { + //TODO } } From 0f07e5198aa56e4c5fda5ff3c82891afb8df13c6 Mon Sep 17 00:00:00 2001 From: Manish Dait Date: Tue, 22 Jul 2025 18:02:29 +0530 Subject: [PATCH 5/8] added test for subscribeTopic with start and end time Signed-off-by: Manish Dait --- .../hiero/spring/test/TopicClientTest.java | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java index 8750c087..c1c6e4a6 100644 --- a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java +++ b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java @@ -11,6 +11,8 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import java.time.Duration; +import java.time.Instant; import java.util.ArrayList; import java.util.List; @@ -257,15 +259,50 @@ void testSubscribeTopicWithInvalidLimit() throws HieroException { final TopicId topicId = topicClient.createTopic(); hieroTestUtils.waitForMirrorNodeRecords(); - final IllegalArgumentException e = Assertions.assertThrows(IllegalArgumentException.class, () -> topicClient.subscribeTopic( - topicId, (message) -> {/**/}, limit - )); + final IllegalArgumentException e = Assertions.assertThrows( + IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, (message) -> {/**/}, limit) + ); Assertions.assertEquals(msg, e.getMessage()); } @Test - void testSubscribeTopicWithStartAndEndTime() { - //TODO + void testSubscribeTopicWithStartAndEndTime() throws HieroException { + final TopicId topicId = topicClient.createTopic(); + hieroTestUtils.waitForMirrorNodeRecords(); + final Instant start = Instant.now().plus(Duration.ofMinutes(10)); + final Instant end = Instant.now().plus(Duration.ofDays(2)); + final SubscriptionHandle handler = Assertions.assertDoesNotThrow( + () -> topicClient.subscribeTopic(topicId, (message) -> {}, start, end) + ); + + Assertions.assertNotNull(handler); + } + + @Test + void testSubscribeTopicWithStartAndEndTimeWithInvalidParams() throws HieroException { + final TopicId topicId = topicClient.createTopic(); + // Start time before Current time + final Instant invalidStart = Instant.now().minus(Duration.ofMinutes(10)); + final Instant end = Instant.now().plus(Duration.ofDays(2)); + + final IllegalArgumentException e1 = Assertions.assertThrows( + IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, (message) -> {}, invalidStart, end) + ); + + Assertions.assertEquals("startTime must be greater than currentTime", e1.getMessage()); + + // End time before than Start time + final Instant start = Instant.now().plus(Duration.ofMinutes(10)); + final Instant invalidEnd = start.minus(Duration.ofMinutes(1)); + + final IllegalArgumentException e2 = Assertions.assertThrows( + IllegalArgumentException.class, + () -> topicClient.subscribeTopic(topicId, (message) -> {}, start, invalidEnd) + ); + + Assertions.assertEquals("endTime must be greater than starTime", e2.getMessage()); } } From 39ea11f571488e469d1e5f22b20e0f8e64ae2feb Mon Sep 17 00:00:00 2001 From: Manish Dait Date: Wed, 23 Jul 2025 13:48:34 +0530 Subject: [PATCH 6/8] fix minnor issues Signed-off-by: Manish Dait --- .../hiero/base/implementation/TopicClientImpl.java | 4 ++-- .../hiero/base/protocol/data/TopicMessageRequest.java | 2 +- .../openelements/hiero/base/test/TopicClientImplTest.java | 4 ++-- .../hiero/base/test/config/HieroTestContext.java | 6 +++--- .../com/openelements/hiero/spring/test/TopicClientTest.java | 2 +- .../openelements/hiero/test/SoloActionNetworkSettings.java | 2 +- 6 files changed, 10 insertions(+), 10 deletions(-) diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java index a98fdfaf..de1172a6 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/implementation/TopicClientImpl.java @@ -249,7 +249,7 @@ public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Cons throw new IllegalArgumentException("startTime must be greater than currentTime"); } if (endTime.isBefore(startTime)) { - throw new IllegalArgumentException("endTime must be greater than starTime"); + throw new IllegalArgumentException("endTime must be greater than startTime"); } TopicMessageRequest request = TopicMessageRequest.of(topicId, subscription, startTime, endTime); @@ -270,7 +270,7 @@ public SubscriptionHandle subscribeTopic(@NonNull TopicId topicId, @NonNull Cons throw new IllegalArgumentException("startTime must be greater than currentTime"); } if (endTime.isBefore(startTime)) { - throw new IllegalArgumentException("endTime must be greater than starTime"); + throw new IllegalArgumentException("endTime must be greater than startTime"); } if (limit == 0) { throw new IllegalArgumentException("limit must be greater than 0"); diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java index a87149d3..e75d41d1 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/protocol/data/TopicMessageRequest.java @@ -30,7 +30,7 @@ public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer @NonNull public static TopicMessageRequest of(@NonNull TopicId topicId, @NonNull Consumer subscription, @NonNull long limit) { - return new TopicMessageRequest(topicId, subscription, null, null,limit, null, null); + return new TopicMessageRequest(topicId, subscription, null, null, limit, null, null); } @NonNull diff --git a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java index 0a2c4a0e..784af499 100644 --- a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java +++ b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/TopicClientImplTest.java @@ -740,8 +740,8 @@ void shouldThrowExceptionOnSubscribeTopicWithInvalidStartAndEndTime() { final IllegalArgumentException e2 = Assertions.assertThrows(IllegalArgumentException.class, () -> topicClient.subscribeTopic(topicId, subscription, startTime1, endTime1, limit)); - Assertions.assertEquals("endTime must be greater than starTime", e1.getMessage()); - Assertions.assertEquals("endTime must be greater than starTime", e2.getMessage()); + Assertions.assertEquals("endTime must be greater than startTime", e1.getMessage()); + Assertions.assertEquals("endTime must be greater than startTime", e2.getMessage()); //Start time before current time final IllegalArgumentException e3 = Assertions.assertThrows(IllegalArgumentException.class, diff --git a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java index c4761fb1..b53e2481 100644 --- a/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java +++ b/hiero-enterprise-base/src/test/java/com/openelements/hiero/base/test/config/HieroTestContext.java @@ -24,7 +24,7 @@ public class HieroTestContext implements HieroContext { private final Client client; - private final Set mirronNodeEnpoint; + private final Set mirronNodeEndpoint; public HieroTestContext() { final Dotenv dotenv = Dotenv.configure().ignoreIfMissing().load(); @@ -61,7 +61,7 @@ public HieroTestContext() { final Map nodes = new HashMap<>(); networkSettings.getConsensusNodes() .forEach(consensusNode -> nodes.put(consensusNode.getAddress(), consensusNode.getAccountId())); - mirronNodeEnpoint = networkSettings.getConsensusServiceAddress(); + mirronNodeEndpoint = networkSettings.getConsensusServiceAddress(); client = Client.forNetwork(nodes); if (!networkSettings.getMirrorNodeAddresses().isEmpty()) { try { @@ -84,6 +84,6 @@ public Client getClient() { @Override public @NonNull Set getMirrorNodeEndPoint() { - return mirronNodeEnpoint; + return mirronNodeEndpoint; } } diff --git a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java index c1c6e4a6..edeb9919 100644 --- a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java +++ b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java @@ -303,6 +303,6 @@ void testSubscribeTopicWithStartAndEndTimeWithInvalidParams() throws HieroExcept () -> topicClient.subscribeTopic(topicId, (message) -> {}, start, invalidEnd) ); - Assertions.assertEquals("endTime must be greater than starTime", e2.getMessage()); + Assertions.assertEquals("endTime must be greater than startTime", e2.getMessage()); } } diff --git a/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java b/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java index 8df2ec29..de20f896 100644 --- a/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java +++ b/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java @@ -25,7 +25,7 @@ public class SoloActionNetworkSettings implements NetworkSettings { } @Override - public @NonNull Set getConsensusServiceAddress() {return Set.of("http://localhost:8080");} //TBD + public @NonNull Set getConsensusServiceAddress() {return Set.of("http://localhost:8080");} @Override public @NonNull Set getConsensusNodes() { From 07489dc1e6385ccda3baacea804c5e0355f19a1f Mon Sep 17 00:00:00 2001 From: Manish Dait Date: Fri, 25 Jul 2025 10:09:19 +0530 Subject: [PATCH 7/8] added missing consensus address value Signed-off-by: Manish Dait --- .../base/config/implementation/EnvBasedHieroConfig.java | 8 ++++++++ .../implementation/NetworkSettingsBasedHieroConfig.java | 5 +++++ .../hiero/test/SoloActionNetworkSettings.java | 2 +- 3 files changed, 14 insertions(+), 1 deletion(-) diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/EnvBasedHieroConfig.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/EnvBasedHieroConfig.java index 081f74fe..27f0a2b9 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/EnvBasedHieroConfig.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/EnvBasedHieroConfig.java @@ -17,6 +17,7 @@ public class EnvBasedHieroConfig implements HieroConfig { private final PrivateKey operatorPrivateKey; private final String mirrorNodeAddress; + private final String consensusServiceAddress; private final String consensusNodeIp; private final String consensusNodePort; private final String consensusNodeAccount; @@ -37,6 +38,8 @@ public EnvBasedHieroConfig() { .orElseThrow(() -> new IllegalStateException("HEDERA_OPERATOR_PRIVATE_KEY is not set")); mirrorNodeAddress = getEnv("HEDERA_MIRROR_NODE_ADDRESS") .orElse(null); + consensusServiceAddress = getEnv("HEDERA_CONSENSUS_SERVICE_ADDRESS") + .orElseThrow(() -> new IllegalStateException("HEDERA_CONSENSUS_SERVICE_ADDRESS is not set")); consensusNodeIp = getEnv("HEDERA_CONSENSUS_NODE_IP") .orElseThrow(() -> new IllegalStateException("HEDERA_CONSENSUS_NODE_IP is not set")); consensusNodePort = getEnv("HEDERA_CONSENSUS_NODE_PORT") @@ -71,6 +74,11 @@ private Optional getEnv(String key) { return Set.of(mirrorNodeAddress); } + @Override + public @NonNull Set getConsensusServiceAddress() { + return Set.of(consensusServiceAddress); + } + @Override public @NonNull Set getConsensusNodes() { ConsensusNode node = new ConsensusNode( diff --git a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/NetworkSettingsBasedHieroConfig.java b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/NetworkSettingsBasedHieroConfig.java index 943d5965..caa57050 100644 --- a/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/NetworkSettingsBasedHieroConfig.java +++ b/hiero-enterprise-base/src/main/java/com/openelements/hiero/base/config/implementation/NetworkSettingsBasedHieroConfig.java @@ -37,6 +37,11 @@ public NetworkSettingsBasedHieroConfig(@NonNull final Account operatorAccount, @ return networkSetting.getMirrorNodeAddresses(); } + @Override + public @NonNull Set getConsensusServiceAddress() { + return networkSetting.getConsensusServiceAddress(); + } + @Override public @NonNull Set getConsensusNodes() { return networkSetting.getConsensusNodes(); diff --git a/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java b/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java index 36fb7f88..6cdfa64d 100644 --- a/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java +++ b/hiero-enterprise-test/src/main/java/com/openelements/hiero/test/SoloActionNetworkSettings.java @@ -25,7 +25,7 @@ public class SoloActionNetworkSettings implements NetworkSettings { } @Override - public @NonNull Set getConsensusServiceAddress() {return Set.of("http://localhost:8080");} + public @NonNull Set getConsensusServiceAddress() {return Set.of("localhost:5600");} @Override public @NonNull Set getConsensusNodes() { From 168de805afb20ca232589ee6327f0fbeaf72f29c Mon Sep 17 00:00:00 2001 From: Manish Dait Date: Fri, 25 Jul 2025 10:36:55 +0530 Subject: [PATCH 8/8] update test for subscribeTopic Signed-off-by: Manish Dait --- .../hiero/spring/test/TopicClientTest.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java index edeb9919..83234e91 100644 --- a/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java +++ b/hiero-enterprise-spring/src/test/java/com/openelements/hiero/spring/test/TopicClientTest.java @@ -210,7 +210,7 @@ void testSubmitMessageThrowExceptionFroInvalidId() { } @Test - void testSubscribeTopic() throws HieroException { + void testSubscribeTopic() throws Exception { final String msg = "Hello Hiero"; final List messages = new ArrayList<>(); final TopicId topicId = topicClient.createTopic(); @@ -222,14 +222,16 @@ void testSubscribeTopic() throws HieroException { topicClient.submitMessage(topicId, msg); hieroTestUtils.waitForMirrorNodeRecords(); + Thread.sleep(5000); // Make sure to wait after message get recorded in mirrornode Assertions.assertNotNull(handler); Assertions.assertEquals(1, messages.size()); Assertions.assertEquals(msg,messages.getFirst()); + handler.unsubscribe(); } @Test - void testSubscribeTopicWithLimit() throws HieroException { + void testSubscribeTopicWithLimit() throws Exception { final String msg = "Hello Hiero"; final long limit = 1; @@ -243,12 +245,15 @@ void testSubscribeTopicWithLimit() throws HieroException { topicClient.submitMessage(topicId, msg); hieroTestUtils.waitForMirrorNodeRecords(); + Thread.sleep(5000); // Make sure to wait after message get recorded in mirrornode topicClient.submitMessage(topicId, msg); hieroTestUtils.waitForMirrorNodeRecords(); + Thread.sleep(5000); // Make sure to wait after message get recorded in mirrornode Assertions.assertNotNull(handler); Assertions.assertEquals(limit, messages.size()); + handler.unsubscribe(); } @Test @@ -278,6 +283,7 @@ void testSubscribeTopicWithStartAndEndTime() throws HieroException { ); Assertions.assertNotNull(handler); + handler.unsubscribe(); } @Test