Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.apache.ignite.internal.configuration.SystemLocalConfiguration;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.network.NodeFinder;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.raft.RaftGroupConfiguration;
Expand Down Expand Up @@ -74,7 +73,7 @@ protected List<MockNode> createNodes(int numNodes) {

protected List<MockNode> createNodes(int numNodes, BiConsumer<Integer, RaftGroupConfiguration> onConfigurationCommittedListener) {
List<NetworkAddress> seedAddresses = createSeedAddresses(numNodes);
NodeFinder nodeFinder = new StaticNodeFinder(seedAddresses, new NoOpFailureManager());
NodeFinder nodeFinder = new StaticNodeFinder(seedAddresses);

return IntStream.range(0, numNodes)
.mapToObj(i -> new MockNode(
Expand Down Expand Up @@ -107,7 +106,7 @@ protected MockNode createNode(int idx, int clusterSize, Consumer<RaftGroupConfig
return new MockNode(
testInfo,
new NetworkAddress("localhost", PORT_BASE + idx),
new StaticNodeFinder(createSeedAddresses(clusterSize), new NoOpFailureManager()),
new StaticNodeFinder(createSeedAddresses(clusterSize)),
workDir,
raftConfiguration,
systemLocalConfiguration,
Expand All @@ -126,7 +125,7 @@ protected MockNode createNode(
return new MockNode(
testInfo,
new NetworkAddress("localhost", PORT_BASE + idx),
new StaticNodeFinder(createSeedAddresses(clusterSize), new NoOpFailureManager()),
new StaticNodeFinder(createSeedAddresses(clusterSize)),
workDir,
raftConfiguration,
systemLocalConfiguration,
Expand Down
2 changes: 1 addition & 1 deletion modules/network/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation testFixtures(project(':ignite-configuration'))
integrationTestImplementation testFixtures(project(':ignite-runner'))
integrationTestImplementation testFixtures(project(':ignite-failure-handler'))
integrationTestImplementation testFixtures(project(':ignite-failure-handler:'))
integrationTestImplementation libs.compileTesting
integrationTestImplementation libs.netty.handler
integrationTestImplementation libs.scalecube.cluster
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,18 @@

package org.apache.ignite.internal.network;

import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapRootCause;
import static org.apache.ignite.lang.ErrorGroups.Network.ADDRESS_UNRESOLVED_ERR;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;

import org.apache.ignite.internal.ClusterPerClassIntegrationTest;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.testframework.log4j2.LogInspector;
import org.apache.logging.log4j.Level;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;

/**
* Tests that node finder failure triggers failure manager.
* Tests that node finder failure causes node shutdown.
*/
class ItStaticNodeFinderTest extends ClusterPerClassIntegrationTest {
@Override
Expand All @@ -55,27 +52,12 @@ protected boolean needInitializeCluster() {

@Test
void testNodeShutdownOnNodeFinderFailure(TestInfo testInfo) {
LogInspector logInspector = new LogInspector(FailureManager.class.getName());
Throwable throwable = assertThrowsWithCause(
() -> CLUSTER.startAndInit(testInfo, initialNodes(), cmgMetastoreNodes(), this::configureInitParameters),
IgniteInternalException.class);

logInspector.addHandler(
evt -> {
if (evt.getLevel() == Level.ERROR) {
IgniteInternalException actual = (IgniteInternalException) unwrapRootCause(evt.getThrown());
assertEquals(ADDRESS_UNRESOLVED_ERR, actual.code());
assertEquals("No network addresses found", actual.getMessage());

return true;
}

return false;
}, () -> assertThrows(IllegalStateException.class, CLUSTER::aliveNode));

logInspector.start();

try {
CLUSTER.startAndInit(testInfo, initialNodes(), cmgMetastoreNodes(), this::configureInitParameters);
} finally {
logInspector.stop();
}
IgniteInternalException actual = (IgniteInternalException) unwrapRootCause(throwable);
assertEquals(ADDRESS_UNRESOLVED_ERR, actual.code());
assertEquals("No network addresses resolved through any provided names", actual.getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterService;
Expand All @@ -56,7 +55,7 @@ public class ItClusterServiceTest extends BaseIgniteAbstractTest {
void testShutdown(TestInfo testInfo) {
var addr = new NetworkAddress("localhost", 10000);

ClusterService service = clusterService(testInfo, addr.port(), new StaticNodeFinder(List.of(addr), new NoOpFailureManager()));
ClusterService service = clusterService(testInfo, addr.port(), new StaticNodeFinder(List.of(addr)));

assertThat(service.startAsync(new ComponentContext()), willCompleteSuccessfully());

Expand All @@ -78,10 +77,8 @@ void testShutdown(TestInfo testInfo) {
void testUpdateMetadata(TestInfo testInfo) throws Exception {
var addr1 = new NetworkAddress("localhost", 10000);
var addr2 = new NetworkAddress("localhost", 10001);
ClusterService service1 = clusterService(testInfo, addr1.port(),
new StaticNodeFinder(List.of(addr1, addr2), new NoOpFailureManager()));
ClusterService service2 = clusterService(testInfo, addr2.port(),
new StaticNodeFinder(List.of(addr1, addr2), new NoOpFailureManager()));
ClusterService service1 = clusterService(testInfo, addr1.port(), new StaticNodeFinder(List.of(addr1, addr2)));
ClusterService service2 = clusterService(testInfo, addr2.port(), new StaticNodeFinder(List.of(addr1, addr2)));
assertThat(service1.startAsync(new ComponentContext()), willCompleteSuccessfully());
assertThat(service2.startAsync(new ComponentContext()), willCompleteSuccessfully());
assertTrue(waitForCondition(() -> service1.topologyService().allMembers().size() == 2, 1000));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.ComponentContext;
Expand Down Expand Up @@ -86,7 +85,7 @@ public void testRestarts(TestInfo testInfo) {

List<NetworkAddress> addresses = findLocalAddresses(initPort, initPort + 5);

var nodeFinder = new StaticNodeFinder(addresses, new NoOpFailureManager());
var nodeFinder = new StaticNodeFinder(addresses);

services = addresses.stream()
.map(addr -> startNetwork(testInfo, addr, nodeFinder))
Expand Down Expand Up @@ -180,7 +179,7 @@ public void testRestartDuringSends(TestInfo testInfo) {

List<NetworkAddress> addresses = findLocalAddresses(initPort, initPort + 2);

var nodeFinder = new StaticNodeFinder(addresses, new NoOpFailureManager());
var nodeFinder = new StaticNodeFinder(addresses);

services = addresses.stream()
.map(addr -> startNetwork(testInfo, addr, nodeFinder))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
Expand Down Expand Up @@ -1410,7 +1409,7 @@ private static final class Cluster {

List<NetworkAddress> addresses = findLocalAddresses(initialPort, initialPort + numOfNodes);

this.nodeFinder = new StaticNodeFinder(addresses, new NoOpFailureManager());
this.nodeFinder = new StaticNodeFinder(addresses);

members = addresses.stream()
.map(addr -> startNode(testInfo, addr, clusterIdSupplierFactory, productVersionSourceFactory))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.network.configuration.MulticastNodeFinderConfigurationSchema;
import org.apache.ignite.internal.network.configuration.MulticastNodeFinderView;
Expand All @@ -50,16 +49,15 @@ public class NodeFinderFactory {
public static NodeFinder createNodeFinder(
NodeFinderView nodeFinderConfiguration,
String nodeName,
InetSocketAddress localBindAddress,
FailureProcessor failureProcessor
InetSocketAddress localBindAddress
) {
switch (nodeFinderConfiguration.type()) {
case StaticNodeFinderConfigurationSchema.TYPE:
StaticNodeFinderView staticConfig = (StaticNodeFinderView) nodeFinderConfiguration;

return Arrays.stream(staticConfig.netClusterNodes())
.map(NetworkAddress::from)
.collect(collectingAndThen(toUnmodifiableList(), addresses -> new StaticNodeFinder(addresses, failureProcessor)));
.collect(collectingAndThen(toUnmodifiableList(), StaticNodeFinder::new));
case MulticastNodeFinderConfigurationSchema.TYPE:
MulticastNodeFinderView multicastConfig = (MulticastNodeFinderView) nodeFinderConfiguration;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,8 @@
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import org.apache.ignite.internal.failure.FailureContext;
import org.apache.ignite.internal.failure.FailureProcessor;
import java.util.Set;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
Expand All @@ -52,22 +50,19 @@ public class StaticNodeFinder implements NodeFinder {
/** List of seed cluster members. */
private final List<NetworkAddress> addresses;

private final FailureProcessor failureProcessor;

/**
* Constructor.
*
* @param addresses Addresses of initial cluster members.
*/
public StaticNodeFinder(List<NetworkAddress> addresses, FailureProcessor failureProcessor) {
public StaticNodeFinder(List<NetworkAddress> addresses) {
this.addresses = addresses;
this.failureProcessor = failureProcessor;
}

@Override
public Collection<NetworkAddress> findNodes() {
if (addresses.isEmpty()) {
return Collections.emptyList();
return Set.of();
}

Collection<NetworkAddress> networkAddresses = addresses.parallelStream()
Expand All @@ -78,8 +73,7 @@ public Collection<NetworkAddress> findNodes() {
.collect(toSet());

if (networkAddresses.isEmpty()) {
var err = new IgniteInternalException(ADDRESS_UNRESOLVED_ERR, "No network addresses found");
failureProcessor.process(new FailureContext(err, "Failed to resolve node addresses"));
throw new IgniteInternalException(ADDRESS_UNRESOLVED_ERR, "No network addresses resolved through any provided names");
}

return networkAddresses;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ public void onDisappeared(InternalClusterNode member) {
NodeFinder finder = NodeFinderFactory.createNodeFinder(
configView.nodeFinder(),
nodeName(),
connectionMgr.localBindAddress(),
failureProcessor
connectionMgr.localBindAddress()
);
finder.start();

Expand Down
1 change: 0 additions & 1 deletion modules/replicator/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,6 @@ dependencies {
testFixturesImplementation testFixtures(project(':ignite-network'))
testFixturesImplementation testFixtures(project(':ignite-configuration:'))
testFixturesImplementation testFixtures(project(':ignite-raft'))
testFixturesImplementation testFixtures(project(':ignite-failure-handler'))
}

description = 'ignite-replicator'
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@
import org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
import org.apache.ignite.internal.configuration.testframework.ConfigurationExtension;
import org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.failure.NoOpFailureManager;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.manager.ComponentContext;
import org.apache.ignite.internal.network.ClusterService;
Expand Down Expand Up @@ -234,8 +233,7 @@ private IgniteBiTuple<TopologyAwareRaftGroupService, TopologyAwareRaftGroupServi
// Start client service for the second client.
int clientPort = PORT_BASE + nodes + 1;
ClusterService clientClusterService =
clusterService(testInfo, clientPort,
new StaticNodeFinder(findLocalAddresses(PORT_BASE, PORT_BASE + nodes), new NoOpFailureManager()));
clusterService(testInfo, clientPort, new StaticNodeFinder(findLocalAddresses(PORT_BASE, PORT_BASE + nodes)));
assertThat(clientClusterService.startAsync(new ComponentContext()), willCompleteSuccessfully());

// Start the second topology aware client, that should not get the initial leader notification.
Expand Down Expand Up @@ -432,7 +430,7 @@ private void stopCluster() throws Exception {
) {
List<NetworkAddress> addresses = findLocalAddresses(PORT_BASE, PORT_BASE + nodes);

var nodeFinder = new StaticNodeFinder(addresses, new NoOpFailureManager());
var nodeFinder = new StaticNodeFinder(addresses);

TopologyAwareRaftGroupService raftClient = null;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ public ItTxTestCluster(
this.replicationConfiguration = replicationConfiguration;

localAddresses = findLocalAddresses(NODE_PORT_BASE, NODE_PORT_BASE + nodes);
nodeFinder = new StaticNodeFinder(localAddresses, new NoOpFailureManager());
nodeFinder = new StaticNodeFinder(localAddresses);
}

/**
Expand Down