Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 23 additions & 21 deletions src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,6 @@ const kConnection = Symbol('connection');
/** @internal */
const kCancellationToken = Symbol('cancellationToken');
/** @internal */
const kRTTPinger = Symbol('rttPinger');
/** @internal */
const kRoundTripTime = Symbol('roundTripTime');

const STATE_IDLE = 'idle';
Expand Down Expand Up @@ -81,7 +79,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
[kCancellationToken]: CancellationToken;
/** @internal */
[kMonitorId]?: MonitorInterval;
[kRTTPinger]?: RTTPinger;
rttPinger?: RTTPinger;

get connection(): Connection | undefined {
return this[kConnection];
Expand Down Expand Up @@ -197,8 +195,8 @@ function resetMonitorState(monitor: Monitor) {
monitor[kMonitorId]?.stop();
monitor[kMonitorId] = undefined;

monitor[kRTTPinger]?.close();
monitor[kRTTPinger] = undefined;
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

monitor[kCancellationToken].emit('cancel');

Expand Down Expand Up @@ -251,8 +249,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
}
: { socketTimeoutMS: connectTimeoutMS };

if (isAwaitable && monitor[kRTTPinger] == null) {
monitor[kRTTPinger] = new RTTPinger(
if (isAwaitable && monitor.rttPinger == null) {
monitor.rttPinger = new RTTPinger(
monitor[kCancellationToken],
Object.assign(
{ heartbeatFrequencyMS: monitor.options.heartbeatFrequencyMS },
Expand All @@ -271,9 +269,10 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
hello.isWritablePrimary = hello[LEGACY_HELLO_COMMAND];
}

const rttPinger = monitor[kRTTPinger];
const duration =
isAwaitable && rttPinger ? rttPinger.roundTripTime : calculateDurationInMs(start);
isAwaitable && monitor.rttPinger
? monitor.rttPinger.roundTripTime
: calculateDurationInMs(start);

monitor.emit(
Server.SERVER_HEARTBEAT_SUCCEEDED,
Expand All @@ -289,8 +288,8 @@ function checkServer(monitor: Monitor, callback: Callback<Document | null>) {
);
start = now();
} else {
monitor[kRTTPinger]?.close();
monitor[kRTTPinger] = undefined;
monitor.rttPinger?.close();
monitor.rttPinger = undefined;

callback(undefined, hello);
}
Expand Down Expand Up @@ -383,7 +382,7 @@ export interface RTTPingerOptions extends ConnectionOptions {
/** @internal */
export class RTTPinger {
/** @internal */
[kConnection]?: Connection;
connection?: Connection;
/** @internal */
[kCancellationToken]: CancellationToken;
/** @internal */
Expand All @@ -393,7 +392,7 @@ export class RTTPinger {
closed: boolean;

constructor(cancellationToken: CancellationToken, options: RTTPingerOptions) {
this[kConnection] = undefined;
this.connection = undefined;
this[kCancellationToken] = cancellationToken;
this[kRoundTripTime] = 0;
this.closed = false;
Expand All @@ -410,8 +409,8 @@ export class RTTPinger {
this.closed = true;
clearTimeout(this[kMonitorId]);

this[kConnection]?.destroy({ force: true });
this[kConnection] = undefined;
this.connection?.destroy({ force: true });
this.connection = undefined;
}
}

Expand All @@ -430,8 +429,8 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
return;
}

if (rttPinger[kConnection] == null) {
rttPinger[kConnection] = conn;
if (rttPinger.connection == null) {
rttPinger.connection = conn;
}

rttPinger[kRoundTripTime] = calculateDurationInMs(start);
Expand All @@ -441,11 +440,11 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
);
}

const connection = rttPinger[kConnection];
const connection = rttPinger.connection;
if (connection == null) {
connect(options, (err, conn) => {
if (err) {
rttPinger[kConnection] = undefined;
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
Expand All @@ -456,9 +455,12 @@ function measureRoundTripTime(rttPinger: RTTPinger, options: RTTPingerOptions) {
return;
}

connection.command(ns('admin.$cmd'), { [LEGACY_HELLO_COMMAND]: 1 }, undefined, err => {
const commandName =
connection.serverApi?.version || connection.helloOk ? 'hello' : LEGACY_HELLO_COMMAND;
connection.command(ns('admin.$cmd'), { [commandName]: 1 }, undefined, err => {
if (err) {
rttPinger[kConnection] = undefined;
rttPinger.connection?.destroy({ force: true });
rttPinger.connection = undefined;
rttPinger[kRoundTripTime] = 0;
return;
}
Expand Down
24 changes: 10 additions & 14 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,6 @@ const stateTransition = makeStateMachine({
[STATE_CLOSING]: [STATE_CLOSING, STATE_CLOSED]
});

/** @internal */
const kMonitor = Symbol('monitor');

/** @public */
export type ServerOptions = Omit<ConnectionPoolOptions, 'id' | 'generation' | 'hostAddress'> &
MonitorOptions;
Expand Down Expand Up @@ -118,7 +115,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
s: ServerPrivate;
serverApi?: ServerApi;
hello?: Document;
[kMonitor]: Monitor | null;
monitor: Monitor | null;

/** @event */
static readonly SERVER_HEARTBEAT_STARTED = SERVER_HEARTBEAT_STARTED;
Expand Down Expand Up @@ -164,22 +161,21 @@ export class Server extends TypedEventEmitter<ServerEvents> {
});

if (this.loadBalanced) {
this[kMonitor] = null;
this.monitor = null;
// monitoring is disabled in load balancing mode
return;
}

// create the monitor
// TODO(NODE-4144): Remove new variable for type narrowing
const monitor = new Monitor(this, this.s.options);
this[kMonitor] = monitor;
this.monitor = new Monitor(this, this.s.options);

for (const event of HEARTBEAT_EVENTS) {
monitor.on(event, (e: any) => this.emit(event, e));
this.monitor.on(event, (e: any) => this.emit(event, e));
}

monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
this.monitor.on('resetServer', (error: MongoError) => markServerUnknown(this, error));
this.monitor.on(Server.SERVER_HEARTBEAT_SUCCEEDED, (event: ServerHeartbeatSucceededEvent) => {
this.emit(
Server.DESCRIPTION_RECEIVED,
new ServerDescription(this.description.hostAddress, event.reply, {
Expand Down Expand Up @@ -235,7 +231,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
// a load balancer. It never transitions out of this state and
// has no monitor.
if (!this.loadBalanced) {
this[kMonitor]?.connect();
this.monitor?.connect();
} else {
stateTransition(this, STATE_CONNECTED);
this.emit(Server.CONNECT, this);
Expand All @@ -258,7 +254,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
stateTransition(this, STATE_CLOSING);

if (!this.loadBalanced) {
this[kMonitor]?.close();
this.monitor?.close();
}

this.s.pool.close(options, err => {
Expand All @@ -276,7 +272,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
*/
requestCheck(): void {
if (!this.loadBalanced) {
this[kMonitor]?.requestCheck();
this.monitor?.requestCheck();
}
}

Expand Down Expand Up @@ -437,7 +433,7 @@ function markServerUnknown(server: Server, error?: MongoServerError) {
}

if (error instanceof MongoNetworkError && !(error instanceof MongoNetworkTimeoutError)) {
server[kMonitor]?.reset();
server.monitor?.reset();
}

server.emit(
Expand Down
175 changes: 175 additions & 0 deletions test/integration/connection-monitoring-and-pooling/rtt_pinger.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
import { expect } from 'chai';
import * as semver from 'semver';
import * as sinon from 'sinon';

import { type Connection, type MongoClient, type RTTPinger } from '../../../src';
import { LEGACY_HELLO_COMMAND } from '../../../src/constants';
import { sleep } from '../../tools/utils';

/**
* RTTPingers are only created after getting a hello from the server that defines topologyVersion
* Each monitor is reaching out to a different node and rttPinger's are created async as a result.
*
* This function checks for rttPingers and sleeps if none are found.
*/
async function getRTTPingers(client: MongoClient) {
type RTTPingerConnection = Omit<RTTPinger, 'connection'> & { connection: Connection };
const pingers = (rtt => rtt?.connection != null) as (r?: RTTPinger) => r is RTTPingerConnection;

if (!client.topology) expect.fail('Must provide a connected client');

// eslint-disable-next-line no-constant-condition
while (true) {
const servers = client.topology.s.servers.values();
const rttPingers = Array.from(servers, s => s.monitor?.rttPinger).filter(pingers);

if (rttPingers.length !== 0) {
return rttPingers;
}

await sleep(5);
}
}

describe('class RTTPinger', () => {
afterEach(() => sinon.restore());

beforeEach(async function () {
if (!this.currentTest) return;
if (this.configuration.isLoadBalanced) {
this.currentTest.skipReason = 'No monitoring in LB mode, test not relevant';
return this.skip();
}
if (semver.gte('4.4.0', this.configuration.version)) {
this.currentTest.skipReason =
'Test requires streaming monitoring, needs to be on MongoDB 4.4+';
return this.skip();
}
});

context('when serverApi is enabled', () => {
let serverApiClient: MongoClient;

beforeEach(async function () {
if (!this.currentTest) return;

if (semver.gte('5.0.0', this.configuration.version)) {
this.currentTest.skipReason = 'Test requires serverApi, needs to be on MongoDB 5.0+';
return this.skip();
}

serverApiClient = this.configuration.newClient(
{},
{ serverApi: { version: '1', strict: true }, heartbeatFrequencyMS: 10 }
);
});

afterEach(async () => {
await serverApiClient?.close();
});

it('measures rtt with a hello command', async function () {
await serverApiClient.connect();
const rttPingers = await getRTTPingers(serverApiClient);

const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'command'));

await sleep(11); // allow for another ping after spies have been made

expect(spies).to.have.lengthOf.at.least(1);
for (const spy of spies) {
expect(spy).to.have.been.calledWith(sinon.match.any, { hello: 1 }, sinon.match.any);
}
});
});

context('when serverApi is disabled', () => {
let client: MongoClient;

beforeEach(async function () {
if (!this.currentTest) return;
if (this.configuration.serverApi) {
this.currentTest.skipReason = 'Test requires serverApi to NOT be enabled';
return this.skip();
}

client = this.configuration.newClient({}, { heartbeatFrequencyMS: 10 });
});

afterEach(async () => {
await client?.close();
});

context('connected to a pre-hello server', () => {
it('measures rtt with a LEGACY_HELLO_COMMAND command', async function () {
await client.connect();
const rttPingers = await getRTTPingers(client);

// Fake pre-hello server.
// Hello was back-ported to feature versions of the server so we would need to pin
// versions prior to 4.4.2, 4.2.10, 4.0.21, and 3.6.21 to integration test
for (const rtt of rttPingers) rtt.connection.helloOk = false;

const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'command'));

await sleep(11); // allow for another ping after spies have been made

expect(spies).to.have.lengthOf.at.least(1);
for (const spy of spies) {
expect(spy).to.have.been.calledWith(
sinon.match.any,
{ [LEGACY_HELLO_COMMAND]: 1 },
sinon.match.any
);
}
});
});

context('connected to a helloOk server', () => {
it('measures rtt with a hello command', async function () {
await client.connect();
const rttPingers = await getRTTPingers(client);

const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'command'));

// We should always be connected to helloOk servers
for (const rtt of rttPingers) expect(rtt.connection).to.have.property('helloOk', true);

await sleep(11); // allow for another ping after spies have been made

expect(spies).to.have.lengthOf.at.least(1);
for (const spy of spies) {
expect(spy).to.have.been.calledWith(sinon.match.any, { hello: 1 }, sinon.match.any);
}
});
});
});

context(`when the RTTPinger's hello command receives any error`, () => {
let client: MongoClient;
beforeEach(async function () {
client = this.configuration.newClient({}, { heartbeatFrequencyMS: 10 });
});

afterEach(async () => {
await client?.close();
});

it('destroys the connection with force=true', async function () {
await client.connect();
const rttPingers = await getRTTPingers(client);

for (const rtt of rttPingers) {
sinon.stub(rtt.connection, 'command').yieldsRight(new Error('any error'));
}
const spies = rttPingers.map(rtt => sinon.spy(rtt.connection, 'destroy'));

await sleep(11); // allow for another ping after spies have been made

expect(spies).to.have.lengthOf.at.least(1);
for (const spy of spies) {
expect(spy).to.have.been.calledWithExactly({ force: true });
}
});
});
});
Loading