diff --git a/test/test-subscription-content-filter.js b/test/test-subscription-content-filter.js index dafa3e57..a44a2b31 100644 --- a/test/test-subscription-content-filter.js +++ b/test/test-subscription-content-filter.js @@ -4,10 +4,13 @@ const childProcess = require('child_process'); const assert = require('assert'); const rclnodejs = require('../index.js'); const DistroUtils = rclnodejs.DistroUtils; -const RMWUtils = rclnodejs.RMWUtils; -const TIME1 = 5000; // ms -const TIME2 = 10000; // ms +const RMWUtils = rclnodejs.RMWUtils; +const Node = rclnodejs.Node; +const DEFAULT_NODE_OPTIONS = Node.getDefaultOptions(); +const TOPIC = 'test'; +const PUBLISHER_INTERVAL = 200; +const SUBSCRIBER_WAIT_TIME = 1000; function isContentFilteringSupported() { return ( @@ -16,40 +19,62 @@ function isContentFilteringSupported() { ); } +function createAndRunPublisher(node, typeclass, topic, msgValue, interval=PUBLISHER_INTERVAL) { + const publisher = node.createPublisher(typeclass, topic); + const msg = rclnodejs.createMessage(typeclass); + msg.data = msgValue; + const timer = setInterval(() => { + publisher.publish(msg); + }, interval); + return timer; +} + describe('subscription content-filtering', function () { + let publisherNode; + let subscriberNode; + let intervals; + this.timeout(30 * 1000); + before(function() { + if (!isContentFilteringSupported()) { + this.skip(); + } + }); + beforeEach(async function () { - return await rclnodejs.init(); + await rclnodejs.init(); + this.publisherNode = new Node('ctf_test_publisher_node'); + this.subscriberNode = new Node('ctf_test_subscriber_node'); + this.intervals = []; }); afterEach(function () { + this.intervals.forEach(interval => clearInterval(interval)); + this.publisherNode.destroy(); + this.subscriberNode.destroy(); rclnodejs.shutdown(); }); it('isContentFilteringEnabled', function (done) { - let node = new rclnodejs.Node('string_subscription'); - let msgString = 'std_msgs/msg/Int16'; - let options = rclnodejs.Node.getDefaultOptions(); + const typeclass = 'std_msgs/msg/Int16'; + const options = Node.getDefaultOptions(); options.contentFilter = { expression: 'data = 16', }; - - let subscription = node.createSubscription( - msgString, - 'channel', + let subscription = this.subscriberNode.createSubscription( + typeclass, + TOPIC, options, (msg) => {} ); - assert.ok( - subscription.hasContentFilter() === isContentFilteringSupported() - ); + assert.ok(subscription.hasContentFilter()); - node.destroySubscription(subscription); - subscription = node.createSubscription( - msgString, - 'String_channel', + this.subscriberNode.destroySubscription(subscription); + subscription = this.subscriberNode.createSubscription( + typeclass, + TOPIC, (msg) => {} ); assert.ok(!subscription.hasContentFilter()); @@ -57,160 +82,182 @@ describe('subscription content-filtering', function () { done(); }); - it('no parameters', function (done) { - if (!isContentFilteringSupported()) { - this.skip(); - } + it('no parameters', async function() { + const typeclass = 'std_msgs/msg/String'; + const publisherTimer1 = + createAndRunPublisher( + this.publisherNode, + typeclass, + TOPIC, + 'FilteredData' + ); + this.intervals.push(publisherTimer1); + + const publisherTimer2 = + createAndRunPublisher( + this.publisherNode, + typeclass, + TOPIC, + 'Data' + ); + this.intervals.push(publisherTimer2); - let node = new rclnodejs.Node('string_subscription'); - let msgString = 'std_msgs/msg/String'; - let options = rclnodejs.Node.getDefaultOptions(); + let options = Node.getDefaultOptions(); options.contentFilter = { expression: "data = 'FilteredData'", }; let msgCnt = 0; let fail = false; - let subscription = node.createSubscription( - msgString, - 'String_channel', + let subscription = this.subscriberNode.createSubscription( + typeclass, + TOPIC, options, (msg) => { msgCnt++; - if (msg.data != 'FilteredData') { - fail = true; - } + if (msg.data != 'FilteredData') fail = true; } ); assert.ok(subscription.hasContentFilter()); - let publisher1 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'String', - "'FilteredData'", - ]); - - let publisher2 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'String', - "'Data'", - ]); + this.subscriberNode.spin(); - setTimeout(() => { - publisher1.kill('SIGINT'); - publisher2.kill('SIGINT'); - assert.ok(msgCnt && !fail); - done(); - }, 1000); + const p = new Promise((resolve) => + setTimeout(() => { + resolve(msgCnt && !fail); + }, SUBSCRIBER_WAIT_TIME) + ); + let result = await p; - node.spin(node); + assert.ok(result); }); - it('single parameter', function (done) { - if (!isContentFilteringSupported()) { - this.skip(); - } - - let node = new rclnodejs.Node('string_subscription'); - let msgString = 'std_msgs/msg/String'; - let options = rclnodejs.Node.getDefaultOptions(); + it('single parameter', async function() { + const typeclass = 'std_msgs/msg/String'; + const publisherTimer1 = + createAndRunPublisher( + this.publisherNode, + typeclass, + TOPIC, + 'FilteredData' + ); + this.intervals.push(publisherTimer1); + + const publisherTimer2 = + createAndRunPublisher( + this.publisherNode, + typeclass, + TOPIC, + 'Data', + 200); + this.intervals.push(publisherTimer2); + + let options = Node.getDefaultOptions(); options.contentFilter = { expression: 'data = %0', parameters: ["'FilteredData'"], }; - + let msgCnt = 0; let fail = false; - let subscription = node.createSubscription( - msgString, - 'String_channel', + let subscription = this.subscriberNode.createSubscription( + typeclass, + TOPIC, options, (msg) => { msgCnt++; - if (msg.data != 'FilteredData') { - fail = true; - } + if (msg.data != 'FilteredData') fail = true; } ); assert.ok(subscription.hasContentFilter()); - let publisher1 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'String', - "'FilteredData'", - ]); + this.subscriberNode.spin(); - let publisher2 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'String', - "'Data'", - ]); - - setTimeout(() => { - publisher1.kill('SIGINT'); - publisher2.kill('SIGINT'); - assert.ok(msgCnt && !fail); - done(); - }, 1000); + const p = new Promise((resolve) => + setTimeout(() => { + resolve(msgCnt && !fail); + }, 1000) + ); + let result = await p; - node.spin(node); + assert.ok(result); }); - it('multiple parameters', function (done) { - if (!isContentFilteringSupported()) { - this.skip(); - } - - let node = new rclnodejs.Node('int32_subscription'); - let msgString = 'std_msgs/msg/Int32'; - let options = rclnodejs.Node.getDefaultOptions(); + it('multiple parameters', async function() { + const typeclass = 'std_msgs/msg/Int32'; + const publisherTimer1 = + createAndRunPublisher( + this.publisherNode, + typeclass, + TOPIC, + 0 + ); + this.intervals.push(publisherTimer1); + + const publisherTimer2 = + createAndRunPublisher( + this.publisherNode, + typeclass, + TOPIC, + 7, + 200); + this.intervals.push(publisherTimer2); + + let options = Node.getDefaultOptions(); options.contentFilter = { expression: 'data >= %0 AND data <= %1', parameters: [5, 10], }; - + let msgCnt = 0; let fail = false; - let subscription = node.createSubscription( - msgString, - 'Int32_channel', + const subscription = this.subscriberNode.createSubscription( + typeclass, + TOPIC, options, (msg) => { msgCnt++; - if (msg.data === 0) { - fail = true; - } + if (msg.data === 0) fail = true; } ); assert.ok(subscription.hasContentFilter()); - let publisher1 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'Int32', - '0', - ]); - - let publisher2 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'Int32', - '7', - ]); + this.subscriberNode.spin(); - setTimeout(() => { - publisher1.kill('SIGINT'); - publisher2.kill('SIGINT'); - assert.ok(msgCnt && !fail); - done(); - }, 1000); + const p = new Promise((resolve) => + setTimeout(() => { + resolve(msgCnt && !fail); + }, 1000) + ); + let result = await p; - node.spin(node); + assert.ok(result); }); - it('setContentFilter', function (done) { - if (!isContentFilteringSupported()) { - this.skip(); - } - - let node = new rclnodejs.Node('int32_subscription'); - let msgString = 'std_msgs/msg/Int32'; - let options = rclnodejs.Node.getDefaultOptions(); + it('setContentFilter', async function() { + const typeclass = 'std_msgs/msg/Int32'; + const publisherTimer1 = + createAndRunPublisher( + this.publisherNode, + typeclass, + TOPIC, + 0 + ); + this.intervals.push(publisherTimer1); + + const publisherTimer2 = + createAndRunPublisher( + this.publisherNode, + typeclass, + TOPIC, + 5, + 200); + this.intervals.push(publisherTimer2); + + let options = Node.getDefaultOptions(); options.contentFilter = { expression: 'data = %0', parameters: [3], @@ -219,9 +266,9 @@ describe('subscription content-filtering', function () { let msgCnt0 = 0; let msgCnt5 = 0; let fail = false; - let subscription = node.createSubscription( - msgString, - 'Int32_channel', + const subscription = this.subscriberNode.createSubscription( + typeclass, + TOPIC, options, (msg) => { switch (msg.data) { @@ -239,57 +286,64 @@ describe('subscription content-filtering', function () { assert.ok(subscription.hasContentFilter()); - let publisher1 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'Int32', - '0', - ]); - - let publisher2 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'Int32', - '5', - ]); - - setTimeout(() => { - const contentFilter5 = { - expression: 'data = 5', - }; - subscription.setContentFilter(contentFilter5); - }, TIME1); - - setTimeout(() => { - publisher1.kill('SIGINT'); - publisher2.kill('SIGINT'); - assert.ok(!fail && msgCnt5 && !msgCnt0); - done(); - }, TIME2); - - node.spin(); - }); + this.subscriberNode.spin(); + + const p1 = new Promise((resolve) => + setTimeout(() => { + let msgCnt = msgCnt0 + msgCnt5; + const contentFilter5 = { + expression: 'data = 5', + }; + subscription.setContentFilter(contentFilter5); + resolve(msgCnt); + }, SUBSCRIBER_WAIT_TIME) + ); + let result = await p1; + assert.strictEqual(result, 0); - it('clearContentFilter', function (done) { - if (!isContentFilteringSupported()) { - this.skip(); - } + const p2 = new Promise((resolve) => + setTimeout(() => { + resolve(!fail && msgCnt5 && !msgCnt0); + }, SUBSCRIBER_WAIT_TIME) + ); + result = await p2; + assert.ok(result); + }); - let node = new rclnodejs.Node('int32_subscription'); - let msgString = 'std_msgs/msg/Int32'; - let options = rclnodejs.Node.getDefaultOptions(); + it('set undefined content filter', async function() { + const typeclass = 'std_msgs/msg/Int32'; + const publisherTimer1 = + createAndRunPublisher( + this.publisherNode, + typeclass, + TOPIC, + 0 + ); + this.intervals.push(publisherTimer1); + + const publisherTimer2 = + createAndRunPublisher( + this.publisherNode, + typeclass, + TOPIC, + 5, + 200); + this.intervals.push(publisherTimer2); + + let options = Node.getDefaultOptions(); options.contentFilter = { expression: 'data = %0', parameters: [5], }; - let msgCnt = 0; let msgCnt0 = 0; let msgCnt5 = 0; let fail = false; - let filterCleared = false; - let subscription = node.createSubscription( - msgString, - 'Int32_channel', + const subscription = this.subscriberNode.createSubscription( + typeclass, + TOPIC, options, (msg) => { - msgCnt++; switch (msg.data) { case 0: msgCnt0++; @@ -305,179 +359,95 @@ describe('subscription content-filtering', function () { assert.ok(subscription.hasContentFilter()); - let publisher1 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'Int32', - '0', - ]); - - let publisher2 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'Int32', - '5', - ]); - - setTimeout(() => { - assert.ok(msgCnt5 && !msgCnt0 && !fail); - subscription.clearContentFilter(); - }, TIME1); - - setTimeout(() => { - publisher1.kill('SIGINT'); - publisher2.kill('SIGINT'); - assert.ok(!subscription.hasContentFilter()); - assert.ok(!fail && msgCnt5 && msgCnt0); - done(); - }, TIME2); - - node.spin(); - }); + this.subscriberNode.spin(); - it('multiple clearContentFilter', function (done) { - if (!isContentFilteringSupported()) { - this.skip(); - } + const p1 = new Promise((resolve) => + setTimeout(() => { + const result = !msgCnt0 && msgCnt5 && !fail; + subscription.setContentFilter(); + resolve(result); + }, SUBSCRIBER_WAIT_TIME) + ); + let result = await p1; + assert.ok(result); + assert.ok(!subscription.hasContentFilter()); + + const p2 = new Promise((resolve) => + setTimeout(() => { + resolve(msgCnt0 && msgCnt5 && !fail); + }, SUBSCRIBER_WAIT_TIME) + ); + result = await p2; + assert.ok(result); + }); - let node = new rclnodejs.Node('int32_subscription'); - let msgString = 'std_msgs/msg/Int32'; - let options = rclnodejs.Node.getDefaultOptions(); + it('clearContentFilter', function(done) { + const typeclass = 'std_msgs/msg/Int32'; + let options = Node.getDefaultOptions(); options.contentFilter = { expression: 'data = %0', parameters: [5], }; - let subscription = node.createSubscription( - msgString, - 'Int32_channel', + const subscription = this.subscriberNode.createSubscription( + typeclass, + TOPIC, options, (msg) => {} ); assert.ok(subscription.hasContentFilter()); assert.ok(subscription.clearContentFilter()); - assert.ok(subscription.clearContentFilter()); + assert.ok(!subscription.hasContentFilter()); + done(); }); - it('no content-filter', function (done) { - if (!isContentFilteringSupported()) { - this.skip(); - } - - let node = new rclnodejs.Node('string_subscription'); - let msgString = 'std_msgs/msg/String'; + it('multiple clearContentFilter', function(done) { + const typeclass = 'std_msgs/msg/Int32'; + let options = Node.getDefaultOptions(); + options.contentFilter = { + expression: 'data = %0', + parameters: [5], + }; - let msgCnt = 0; - let subscription = node.createSubscription( - msgString, - 'String_channel', - (msg) => { - msgCnt++; - } + const subscription = this.subscriberNode.createSubscription( + typeclass, + TOPIC, + options, + (msg) => {} ); + assert.ok(subscription.hasContentFilter()); + assert.ok(subscription.clearContentFilter()); + assert.ok(!subscription.hasContentFilter()); + assert.ok(subscription.clearContentFilter()); + assert.ok(subscription.clearContentFilter()); assert.ok(!subscription.hasContentFilter()); - let publisher = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'String', - "'Data'", - ]); - - setTimeout(() => { - publisher.kill('SIGINT'); - assert.ok(msgCnt > 0); - done(); - }, TIME1); - - node.spin(node); + done(); }); it('bad expression', function (done) { - if (!isContentFilteringSupported()) { - this.skip(); - } - - let node = new rclnodejs.Node('string_subscription'); - let msgString = 'std_msgs/msg/String'; - let options = rclnodejs.Node.getDefaultOptions(); + const typeclass = 'std_msgs/msg/Int16'; + const options = Node.getDefaultOptions(); options.contentFilter = { expression: 'this will fail', }; let subscription; try { - subscription = subscription = node.createSubscription( - msgString, - 'String_channel', + this.subscriberNode.createSubscription( + typeclass, + TOPIC, options, (msg) => {} ); - } catch (e) {} + } catch (err) {} assert.ok(!subscription || !subscription.hasContentFilter()); + done(); }); - it('setContentFilter(undefined)', function (done) { - if (!isContentFilteringSupported()) { - this.skip(); - } - - let node = new rclnodejs.Node('int32_subscription'); - let msgString = 'std_msgs/msg/Int32'; - let options = rclnodejs.Node.getDefaultOptions(); - options.contentFilter = { - expression: 'data = %0', - parameters: [5], - }; - - let msgCnt = 0; - let msgCnt0 = 0; - let msgCnt5 = 0; - let fail = false; - let filterCleared = false; - let subscription = node.createSubscription( - msgString, - 'Int32_channel', - options, - (msg) => { - msgCnt++; - switch (msg.data) { - case 0: - msgCnt0++; - break; - case 5: - msgCnt5++; - break; - default: - fail = true; - } - } - ); - - assert.ok(subscription.hasContentFilter()); - - let publisher1 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'Int32', - '0', - ]); - - let publisher2 = childProcess.fork(`${__dirname}/publisher_msg.js`, [ - 'Int32', - '5', - ]); - - setTimeout(() => { - assert.ok(msgCnt5 && !msgCnt0 && !fail); - subscription.setContentFilter(); - }, TIME1); - - setTimeout(() => { - publisher1.kill('SIGINT'); - publisher2.kill('SIGINT'); - assert.ok(!subscription.hasContentFilter()); - assert.ok(!fail && msgCnt5 && msgCnt0); - done(); - }, TIME2); - - node.spin(); - }); });