diff --git a/test/test_rosbag/CMakeLists.txt b/test/test_rosbag/CMakeLists.txt index 534403f7ca..f0d779a5d9 100644 --- a/test/test_rosbag/CMakeLists.txt +++ b/test/test_rosbag/CMakeLists.txt @@ -50,6 +50,7 @@ if(CATKIN_ENABLE_TESTING) add_rostest(${PROJECT_BINARY_DIR}/test/latched_sub.test) add_rostest(test/record_two_publishers.test) add_rostest(test/record_one_publisher_two_topics.test) + add_rostest(test/snapshot.test) include_directories(${GTEST_INCLUDE_DIRS}) add_executable(double_pub EXCLUDE_FROM_ALL test/double_pub.cpp) diff --git a/test/test_rosbag/test/snapshot.test b/test/test_rosbag/test/snapshot.test new file mode 100644 index 0000000000..df4d640f12 --- /dev/null +++ b/test/test_rosbag/test/snapshot.test @@ -0,0 +1,20 @@ + + + + default_duration_limit: 1 # Maximum time difference between newest and oldest message, seconds, overrides -d flag + default_memory_limit: 0.1 # Maximum memory used by messages in each topic's buffer, MB, override -s flag + topics: + - test1 # Inherit defaults + - test2: # Override duration limit, inherit memory limit + duration: 2 + - test3: # Override both limits + duration: -1 # Negative means no duration limit + memory: 0.00 + + + + + + + + diff --git a/test/test_rosbag/test/test_snapshot.py b/test/test_rosbag/test/test_snapshot.py new file mode 100755 index 0000000000..8bd6aad4fd --- /dev/null +++ b/test/test_rosbag/test/test_snapshot.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python +# Software License Agreement (BSD License) +# +# Copyright (c) 2018, Open Source Robotics Foundation, Inc. +# All rights reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions +# are met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following +# disclaimer in the documentation and/or other materials provided +# with the distribution. +# * Neither the name of Open Source Robotics Foundation, Inc. nor the +# names of its contributors may be used to endorse or promote products +# derived from this software without specific prior written permission. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import tempfile +import unittest +import rospy +from rosbag import Bag +from std_msgs.msg import String +from std_srvs.srv import SetBool +from rosbag_msgs.msg import SnapshotStatus +from rosbag_msgs.srv import TriggerSnapshot, TriggerSnapshotRequest, TriggerSnapshotResponse + + +class TestRosbagSnapshot(unittest.TestCase): + ''' + Tests the "rosbag snapshot" command. + Relies on the nodes launched in snapshot.test + ''' + def __init__(self, *args): + self.params = rospy.get_param("snapshot") + self._parse_params(self.params) + self.last_status = None + self.status_sub = rospy.Subscriber("snapshot_status", SnapshotStatus, self._status_cb, queue_size=5) + self.trigger = rospy.ServiceProxy("trigger_snapshot", TriggerSnapshot) + self.enable = rospy.ServiceProxy("enable_snapshot", SetBool) + super(TestRosbagSnapshot, self).__init__(*args) + + def _parse_params(self, params): + ''' + Parse launch parameters of snapshot to cache the topic limits in a map + so it is easier to check compliance in later tests. + ''' + self.topic_limits = {} + self.default_duration_limit = params['default_duration_limit'] + self.default_memory_limit = params['default_memory_limit'] + for topic_obj in self.params['topics']: + duration = self.default_duration_limit + memory = self.default_memory_limit + if type(topic_obj) == dict: + topic = topic_obj.keys()[0] + duration = topic_obj[topic].get('duration', duration) + memory = topic_obj[topic].get('memory', memory) + else: + topic = topic_obj + topic = rospy.resolve_name(topic) + duration = rospy.Duration(duration) + memory = 1E6 * memory + self.topic_limits[topic] = (duration, memory) + + def _status_cb(self, msg): + self.last_status = msg + + def _assert_no_data(self, topics=[]): + ''' + Asserts that calling TriggerWrite service with + specifed parameters responds non-success and did not create + a bag file. + ''' + filename = tempfile.mktemp() + res = self.trigger(filename=filename, topics=topics) + self.assertFalse(res.success) + self.assertEqual(res.message, TriggerSnapshotResponse.NO_DATA) + self.assertFalse(os.path.isfile(filename)) + + def _assert_record_success(self, data): + ''' + Assert that the recording SetBool service responds with success + ''' + res = self.enable(data) + self.assertTrue(res.success) + self.assertEqual(res.message, "") + + def _pause(self): + self._assert_record_success(False) + + def _resume(self): + self._assert_record_success(True) + + def _assert_write_success(self, topics=[], prefix_mode=False, **kwargs): + ''' + Asserts that the TriggerWrite services succeeds for the specified request arguments + and that the specified bag file is actually created + @param prefix_mode: If True, don't put .bag at the end of reqest to check prefix filename mode + ''' + if prefix_mode: + d = tempfile.mkdtemp() + filename = tempfile.mktemp(dir=d) + else: + filename = tempfile.mktemp(suffix='.bag') + req = TriggerSnapshotRequest(filename=filename, topics=topics, **kwargs) + res = self.trigger(req) + self.assertTrue(res.success, msg="snapshot should have succeeded. message: {}".format(res.message)) + self.assertTrue(res.message == "") + if prefix_mode: + dircontents = os.listdir(d) + self.assertEqual(len(dircontents), 1) + filename = os.path.join(d, dircontents[0]) + self.assertEqual(filename[-4:], '.bag') + self.assertTrue(os.path.isfile(filename)) + return filename + + def _assert_limits_enforced(self, test_topic, duration, memory): + ''' + Asserts that the measured duration and memory for a topic comply with the launch parameters + @param topic: string + @param duration: rospy.Duration, age of buffer + @param memory: integer, bytes of memory used + ''' + test_topic = rospy.resolve_name(test_topic) + self.assertIn(test_topic, self.topic_limits) + limits = self.topic_limits[test_topic] + if limits[0] > rospy.Duration(): + self.assertLessEqual(duration, limits[0]) + if limits[1] > 0: + self.assertLessEqual(memory, limits[1]) + + def _assert_status_valid(self): + ''' + Check that a status message contains info on all subscribed topics + and reports that their buffer complies with the configured limits. + ''' + self.assertIsNotNone(self.last_status) # A message was recieved + topics = [msg.topic for msg in self.last_status.topics] + # Oneliners :) + status_topics = [rospy.resolve_name(topic.keys()[0] if type(topic) == dict else topic) + for topic in self.params['topics']] + self.assertEquals(set(topics), set(status_topics)) # Topics from params are same as topics in status message + for topic in self.last_status.topics: + duration = topic.window_stop - topic.window_start + memory = topic.traffic + self._assert_limits_enforced(topic.topic, duration, memory) + + def _assert_bag_valid(self, filename, topics=None, start_time=None, stop_time=None): + ''' + Open the bagfile at the specified filename and read it to ensure topic limits were + enforced and the optional topic list and start/stop times are also enforced. + ''' + bag = Bag(filename) + topics_dict = bag.get_type_and_topic_info()[1] + bag_topics = set(topics_dict.keys()) + param_topics = set(self.topic_limits.keys()) + if topics: + self.assertEqual(bag_topics, set(topics)) + self.assertTrue(bag_topics.issubset(param_topics)) + for topic in topics_dict: + size = topics_dict[topic].message_count * 8 # Calculate stored message size as each message is 8 bytes + gen = bag.read_messages(topics=topic) + _, _, first_time = gen.next() + if start_time: + self.assertGreaterEqual(first_time, start_time) + for _, _, last_time in gen: # Read through all messages so last_time is valid + pass + if stop_time: + self.assertLessEqual(last_time, stop_time) + duration = last_time - first_time + self._assert_limits_enforced(topic, duration, size) + + def test_1_service_connects(self): + ''' + Check that both services provided by snapshot exist. + ''' + self.trigger.wait_for_service() + self.enable.wait_for_service() + + def test_write_all(self): + ''' + Wait long enough for memory & duration limits to need to be used + ''' + rospy.sleep(3.0) # Give some time to fill buffers to maximums + self._assert_status_valid() + filename = self._assert_write_success(prefix_mode=True) + self._assert_bag_valid(filename) + + def test_write_advanced(self): + ''' + Test the more advanced features: pausing and resuming, specific write times, and specific topic list. + ''' + # Pause, resume, and pause again so buffer should only contain data from a known time internal + self._pause() + rospy.sleep(1.5) + start = rospy.Time.now() + self._resume() + rospy.sleep(3.0) + self._pause() + stop = rospy.Time.now() + rospy.sleep(1.0) + + # Write all buffer data, check that only data from resumed interval is present + filename = self._assert_write_success() + self._assert_bag_valid(filename, start_time=start, stop_time=stop) + + # With recording still paused (same buffer as last write), write only a shorter time range + cropped_start = start + rospy.Duration(0.5) + cropped_stop = stop - rospy.Duration(1.0) + filename = self._assert_write_success(start_time=cropped_start, stop_time=cropped_stop) + self._assert_bag_valid(filename, start_time=cropped_start, stop_time=cropped_stop) + + # Write only specific topics and ensure that only those are present in bag file + specific_topics = [rospy.resolve_name(topic) for topic in ['/test2', 'test1']] + filename = self._assert_write_success(topics=specific_topics) + self._assert_bag_valid(filename, topics=specific_topics) + + # Resume recording for other tests + self._resume() + + def test_invalid_topics(self): + ''' + Test that an invalid topic or one not subscribed to fails + ''' + self._assert_no_data(['_invalid_graph_name']) + self._assert_no_data(['/test4']) + + +if __name__ == '__main__': + import rostest + PKG = 'rosbag' + rospy.init_node('test_rosbag_snapshot', anonymous=True) + rostest.run(PKG, 'TestRosbagSnapshot', TestRosbagSnapshot, sys.argv) diff --git a/tools/rosbag/CMakeLists.txt b/tools/rosbag/CMakeLists.txt index 4fa8ff90d4..34d4f4fb84 100644 --- a/tools/rosbag/CMakeLists.txt +++ b/tools/rosbag/CMakeLists.txt @@ -5,7 +5,7 @@ if(NOT WIN32) set_directory_properties(PROPERTIES COMPILE_OPTIONS "-Wall;-Wextra") endif() -find_package(catkin REQUIRED COMPONENTS rosbag_storage rosconsole roscpp std_srvs topic_tools xmlrpcpp) +find_package(catkin REQUIRED COMPONENTS rosbag_storage rosconsole roscpp std_srvs topic_tools xmlrpcpp rosbag_msgs) find_package(Boost REQUIRED COMPONENTS date_time regex program_options filesystem) find_package(BZip2 REQUIRED) @@ -21,13 +21,16 @@ include_directories(include ${catkin_INCLUDE_DIRS} ${Boost_INCLUDE_DIRS} catkin_package( LIBRARIES rosbag INCLUDE_DIRS include - CATKIN_DEPENDS rosbag_storage rosconsole roscpp std_srvs topic_tools xmlrpcpp) + CATKIN_DEPENDS rosbag_storage rosconsole roscpp std_srvs topic_tools xmlrpcpp rosbag_msgs) add_library(rosbag src/player.cpp src/recorder.cpp + src/snapshotter.cpp src/time_translator.cpp) +add_dependencies(rosbag ${catkin_EXPORTED_TARGETS}) + target_link_libraries(rosbag ${catkin_LIBRARIES} ${Boost_LIBRARIES} ${BZIP2_LIBRARIES} ) @@ -35,6 +38,9 @@ target_link_libraries(rosbag ${catkin_LIBRARIES} ${Boost_LIBRARIES} add_executable(record src/record.cpp) target_link_libraries(record rosbag) +add_executable(snapshot src/snapshot.cpp) +target_link_libraries(snapshot rosbag) + add_executable(play src/play.cpp) target_link_libraries(play rosbag) @@ -50,7 +56,7 @@ install(TARGETS rosbag ARCHIVE DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION} LIBRARY DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION} RUNTIME DESTINATION ${CATKIN_GLOBAL_BIN_DESTINATION}) -install(TARGETS record play +install(TARGETS record play snapshot ARCHIVE DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION} LIBRARY DESTINATION ${CATKIN_PACKAGE_LIB_DESTINATION} RUNTIME DESTINATION ${CATKIN_PACKAGE_BIN_DESTINATION}) diff --git a/tools/rosbag/include/rosbag/snapshotter.h b/tools/rosbag/include/rosbag/snapshotter.h new file mode 100644 index 0000000000..e93e74b89d --- /dev/null +++ b/tools/rosbag/include/rosbag/snapshotter.h @@ -0,0 +1,258 @@ +/********************************************************************* +* Software License Agreement (BSD License) +* +* Copyright (c) 2018, Open Source Robotics Foundation, Inc. +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above +* copyright notice, this list of conditions and the following +* disclaimer in the documentation and/or other materials provided +* with the distribution. +* * Neither the name of Open Source Robotics Foundation, Inc. nor the +* names of its contributors may be used to endorse or promote products +* derived from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +********************************************************************/ +#ifndef ROSBAG_SNAPSHOTER_H +#define ROSBAG_SNAPSHOTER_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "rosbag/bag.h" +#include "rosbag/macros.h" + +namespace rosbag +{ +class ROSBAG_DECL Snapshotter; + +/* Configuration for a single topic in the Snapshotter node. Holds + * the buffer limits for a topic by duration (time difference between newest and oldest message) + * and memory usage, in bytes. + */ +struct ROSBAG_DECL SnapshotterTopicOptions +{ + // When the value of duration_limit_, do not truncate the buffer no matter how large the duration is + static const ros::Duration NO_DURATION_LIMIT; + // When the value of memory_limit_, do not trunctate the buffer no matter how much memory it consumes (DANGROUS) + static const int32_t NO_MEMORY_LIMIT; + // When the value of duration_limit_, inherit the limit from the node's configured default + static const ros::Duration INHERIT_DURATION_LIMIT; + // When the value of memory_limit_, inherit the limit from the node's configured default + static const int32_t INHERIT_MEMORY_LIMIT; + + // Maximum difference in time from newest and oldest message in buffer before older messages are removed + ros::Duration duration_limit_; + // Maximum memory usage of the buffer before older messages ar eremoved + int32_t memory_limit_; + + SnapshotterTopicOptions(ros::Duration duration_limit = INHERIT_DURATION_LIMIT, + int32_t memory_limit = INHERIT_MEMORY_LIMIT); +}; + +/* Configuration for the Snapshotter node. Contains default limits for memory and duration + * and a map of topics to their limits which may override the defaults. + */ +struct ROSBAG_DECL SnapshotterOptions +{ + // Duration limit to use for a topic's buffer if one is not specified + ros::Duration default_duration_limit_; + // Memory limit to use for a topic's buffer if one is not specified + int32_t default_memory_limit_; + // Period between publishing topic status messages. If <= ros::Duration(0), don't publish status + ros::Duration status_period_; + typedef std::map topics_t; + // Provides list of topics to snapshot and their limit configurations + topics_t topics_; + + SnapshotterOptions(ros::Duration default_duration_limit = ros::Duration(30), int32_t default_memory_limit = -1, + ros::Duration status_period = ros::Duration(1)); + + // Add a new topic to the configuration + void addTopic(std::string const& topic, ros::Duration duration_limit = SnapshotterTopicOptions::INHERIT_DURATION_LIMIT, + int32_t memory_limit = SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT); +}; + +/* Stores a buffered message of an ambiguous type and it's associated metadata (time of arrival, connection data), + * for later writing to disk + */ +struct ROSBAG_DECL SnapshotMessage +{ + SnapshotMessage(topic_tools::ShapeShifter::ConstPtr _msg, boost::shared_ptr _connection_header, + ros::Time _time); + topic_tools::ShapeShifter::ConstPtr msg; + boost::shared_ptr connection_header; + // ROS time when messaged arrived (does not use header stamp) + ros::Time time; +}; + +/* Stores a queue of buffered messages for a single topic ensuring + * that the duration and memory limits are respected by truncating + * as needed on push() operations. + */ +class ROSBAG_DECL MessageQueue +{ + friend Snapshotter; + +private: + // Locks access to size_ and queue_ + boost::mutex lock; + // Stores limits on buffer size and duration + SnapshotterTopicOptions options_; + // Current total size of the queue, in bytes + int64_t size_; + typedef std::deque queue_t; + queue_t queue_; + // Subscriber to the callback which uses this queue + boost::shared_ptr sub_; + +public: + MessageQueue(SnapshotterTopicOptions const& options); + // Add a new message to the internal queue if possible, truncating the front of the queue as needed to enforce limits + void push(SnapshotMessage const& msg); + // Removes the message at the front of the queue (oldest) and returns it + SnapshotMessage pop(); + // Returns the time difference between back and front of queue, or 0 if size <= 1 + ros::Duration duration() const; + // Clear internal buffer + void clear(); + // Store the subscriber for this topic's queue internaly so it is not deleted + void setSubscriber(boost::shared_ptr sub); + // Put data about oldest/newest message time, message count, and buffersize into status message + void fillStatus(rosgraph_msgs::TopicStatistics& status); + typedef std::pair range_t; + // Get a begin and end iterator into the buffer respecting the start and end timestamp constraints + range_t rangeFromTimes(ros::Time const& start, ros::Time const& end); + +private: + // Internal push whitch does not obtain lock + void _push(SnapshotMessage const& msg); + // Internal pop which does not obtain lock + SnapshotMessage _pop(); + // Internal clear which does not obtain lock + void _clear(); + // Truncate front of queue as needed to fit a new message of specified size and time. Returns False if this is + // impossible. + bool preparePush(int32_t size, ros::Time const& time); +}; + +/* Snapshotter node. Maintains a circular buffer of the most recent messages from configured topics + * while enforcing limits on memory and duration. The node can be triggered to write some or all + * of these buffers to a bag file via a service call. Useful in live testing scenerios where interesting + * data may be produced before a user has the oppurtunity to "rosbag record" the data. + */ +class ROSBAG_DECL Snapshotter +{ +public: + Snapshotter(SnapshotterOptions const& options); + // Sets up callbacks and spins until node is killed + int run(); + +private: + // Subscribe queue size for each topic + static const int QUEUE_SIZE; + SnapshotterOptions options_; + typedef std::map > buffers_t; + buffers_t buffers_; + // Locks recording_ and writing_ states. + boost::upgrade_mutex state_lock_; + // True if new messages are being written to the internal buffer + bool recording_; + // True if currently writing buffers to a bag file + bool writing_; + ros::NodeHandle nh_; + ros::ServiceServer trigger_snapshot_server_; + ros::ServiceServer enable_server_; + ros::Publisher status_pub_; + ros::Timer status_timer_; + + // Replace individual topic limits with node defaults if they are flagged for it (see SnapshotterTopicOptions) + void fixTopicOptions(SnapshotterTopicOptions& options); + // If file is "prefix" mode (doesn't end in .bag), append current datetime and .bag to end + bool postfixFilename(std::string& file); + /// Return current local datetime as a string such as 2018-05-22-14-28-51. Used to generate bag filenames + std::string timeAsStr(); + // Clear the internal buffers of all topics. Used when resuming after a pause to avoid time gaps + void clear(); + // Subscribe to one of the topics, setting up the callback to add to the respective queue + void subscribe(std::string const& topic, boost::shared_ptr queue); + // Called on new message from any configured topic. Adds to queue for that topic + void topicCB(const ros::MessageEvent& msg_event, + boost::shared_ptr queue); + // Service callback, write all of part of the internal buffers to a bag file according to request parameters + bool triggerSnapshotCb(rosbag_msgs::TriggerSnapshot::Request& req, rosbag_msgs::TriggerSnapshot::Response& res); + // Service callback, enable or disable recording (storing new messages into queue). Used to pause before writing + bool enableCB(std_srvs::SetBool::Request& req, std_srvs::SetBool::Response& res); + // Set recording_ to false and do nessesary cleaning, CALLER MUST OBTAIN LOCK + void pause(); + // Set recording_ to true and do nesessary cleaning, CALLER MUST OBTAIN LOCK + void resume(); + // Publish status containing statistics of currently buffered topics and other state + void publishStatus(ros::TimerEvent const& e); + // Write the parts of message_queue within the time constraints of req to the queue + // If returns false, there was an error opening/writing the bag and an error message was written to res.message + bool writeTopic(rosbag::Bag& bag, MessageQueue& message_queue, std::string const& topic, + rosbag_msgs::TriggerSnapshot::Request& req, rosbag_msgs::TriggerSnapshot::Response& res); +}; + +// Configuration for SnapshotterClient +struct ROSBAG_DECL SnapshotterClientOptions +{ + SnapshotterClientOptions(); + enum Action + { + TRIGGER_WRITE, + PAUSE, + RESUME + }; + // What to do when SnapshotterClient.run is called + Action action_; + // List of topics to write when action_ == TRIGGER_WRITE. If empty, write all buffered topics. + std::vector topics_; + // Name of file to write to when action_ == TRIGGER_WRITE, relative to snapshot node. If empty, use prefix + std::string filename_; + // Prefix of the name of file written to when action_ == TRIGGER_WRITE. + std::string prefix_; +}; + +// Node used to call services which interface with the snapshotter node to trigger write, pause, and resume +class ROSBAG_DECL SnapshotterClient +{ +public: + SnapshotterClient(); + int run(SnapshotterClientOptions const& opts); + +private: + ros::NodeHandle nh_; +}; + +} // namespace rosbag + +#endif diff --git a/tools/rosbag/package.xml b/tools/rosbag/package.xml index 836eba113c..f259327006 100644 --- a/tools/rosbag/package.xml +++ b/tools/rosbag/package.xml @@ -34,6 +34,7 @@ python3-pil roscpp_serialization topic_tools + rosbag_msgs genmsg genpy @@ -46,6 +47,7 @@ roslib rospy topic_tools + rosbag_msgs diff --git a/tools/rosbag/src/rosbag/rosbag_main.py b/tools/rosbag/src/rosbag/rosbag_main.py index 5dcfbfe68e..bffe1d1626 100644 --- a/tools/rosbag/src/rosbag/rosbag_main.py +++ b/tools/rosbag/src/rosbag/rosbag_main.py @@ -146,6 +146,21 @@ def record_cmd(argv): process = subprocess.Popen(cmd) process.wait() +def snapshot_cmd(argv): + exepath = roslib.packages.find_node('rosbag', 'snapshot') + if not exepath: + parser.error("Cannot find rosbag/snapshot executable") + cmd = [exepath[0]] + cmd.extend(argv) + + old_handler = signal.signal( + signal.SIGTERM, + lambda signum, frame: _stop_process(signum, frame, old_handler, process) + ) + # Better way of handling it than os.execv + # This makes sure stdin handles are passed to the process. + process = subprocess.Popen(cmd) + process.wait() def info_cmd(argv): parser = optparse.OptionParser(usage='rosbag info [options] BAGFILE1 [BAGFILE2 BAGFILE3 ...]', @@ -983,6 +998,7 @@ def terminal_width(): def rosbagmain(argv=None): cmds = RosbagCmds() cmds.add_cmd('record', record_cmd, "Record a bag file with the contents of specified topics.") + cmds.add_cmd('snapshot', snapshot_cmd, "Keep a buffer of recent message then write to bag file when externally triggered.") cmds.add_cmd('info', info_cmd, 'Summarize the contents of one or more bag files.') cmds.add_cmd('play', play_cmd, "Play back the contents of one or more bag files in a time-synchronized fashion.") cmds.add_cmd('check', check_cmd, 'Determine whether a bag is playable in the current system, or if it can be migrated.') diff --git a/tools/rosbag/src/snapshot.cpp b/tools/rosbag/src/snapshot.cpp new file mode 100644 index 0000000000..2733c09f0c --- /dev/null +++ b/tools/rosbag/src/snapshot.cpp @@ -0,0 +1,252 @@ +/********************************************************************* +* Software License Agreement (BSD License) +* +* Copyright (c) 2018, Open Source Robotics Foundation, Inc. +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above +* copyright notice, this list of conditions and the following +* disclaimer in the documentation and/or other materials provided +* with the distribution. +* * Neither the name of Open Source Robotics Foundation, Inc. nor the +* names of its contributors may be used to endorse or promote products +* derived from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +********************************************************************/ + +#include "rosbag/snapshotter.h" +#include "rosbag/exceptions.h" + +#include "boost/program_options.hpp" +#include +#include + +namespace po = boost::program_options; + +using rosbag::Snapshotter; +using rosbag::SnapshotterClient; +using rosbag::SnapshotterOptions; +using rosbag::SnapshotterTopicOptions; +using rosbag::SnapshotterClientOptions; + +const int MB_TO_BYTES = 1E6; + +bool parseOptions(po::variables_map& vm, int argc, char** argv) +{ + // Strip ros arguments and reassemble + ros::V_string cleaned_args; + ros::removeROSArgs(argc, argv, cleaned_args); + int cleaned_argc = cleaned_args.size(); + char const* cleaned_argv[cleaned_argc]; + for (int i = 0; i < cleaned_argc; ++i) + cleaned_argv[i] = cleaned_args[i].c_str(); + + po::options_description desc("Options"); + // clang-format off + desc.add_options() + ("help,h", "produce help message") + ("trigger-write,t", "Write buffer of selected topcis to a bag file") + ("pause,p", "Stop buffering new messages until resumed or write is triggered") + ("resume,r", "Resume buffering new messages, writing over older messages as needed") + ("size,s", po::value()->default_value(-1), "Maximum memory per topic to use in buffering in MB. Default: no limit") + ("duration,d", po::value()->default_value(30.0), "Maximum difference between newest and oldest buffered message per topic in seconds. Default: 30") + ("output-prefix,o", po::value()->default_value(""), "When in trigger write mode, prepend PREFIX to name of writting bag file") + ("output-filename,O", po::value(), "When in trigger write mode, exact name of written bag file") + ("topic", po::value >(), "Topic to buffer. If triggering write, write only these topics instead of all buffered topics."); + // clang-format on + po::positional_options_description p; + p.add("topic", -1); + + try + { + po::store(po::command_line_parser(cleaned_argc, cleaned_argv).options(desc).positional(p).run(), vm); + po::notify(vm); + } + catch (boost::program_options::error const& e) + { + std::cout << "rosbag snapshot: " << e.what() << std::endl; + return false; + } + + if (vm.count("help")) + { + std::cout << "Usage: rosbag snapshot [options] [topic1 topic2 ...]" << std::endl + << std::endl + << "Buffer recent messages until triggered to write or trigger an already running instance." << std::endl + << std::endl; + std::cout << desc << std::endl; + return false; + } + return true; +} + +bool parseVariablesMap(SnapshotterOptions& opts, po::variables_map const& vm) +{ + if (vm.count("topic")) + { + std::vector topics = vm["topic"].as >(); + BOOST_FOREACH (std::string& str, topics) + opts.addTopic(str); + } + opts.default_memory_limit_ = int(MB_TO_BYTES * vm["size"].as()); + opts.default_duration_limit_ = ros::Duration(vm["duration"].as()); + return true; +} + +bool parseVariablesMapClient(SnapshotterClientOptions& opts, po::variables_map const& vm) +{ + if (vm.count("pause")) + opts.action_ = SnapshotterClientOptions::PAUSE; + else if (vm.count("resume")) + opts.action_ = SnapshotterClientOptions::RESUME; + else if (vm.count("trigger-write")) + { + opts.action_ = SnapshotterClientOptions::TRIGGER_WRITE; + if (vm.count("topic")) + opts.topics_ = vm["topic"].as >(); + if (vm.count("output-prefix")) + opts.prefix_ = vm["output-prefix"].as(); + if (vm.count("output-filename")) + opts.filename_ = vm["output-filename"].as(); + } + return true; +} + +/* Read configured topics and limits from ROS params + * TODO: use exceptions instead of asserts to follow style conventions + * See snapshot.test in test_rosbag for an example + */ +void appendParamOptions(ros::NodeHandle& nh, SnapshotterOptions& opts) +{ + using XmlRpc::XmlRpcValue; + XmlRpcValue topics; + + // Override program options for default limits if the parameters are set. + double tmp; + if (nh.getParam("default_memory_limit", tmp)) + opts.default_memory_limit_ = int(MB_TO_BYTES * tmp); + if (nh.getParam("default_duration_limit", tmp)) + opts.default_duration_limit_ = ros::Duration(tmp); + + if (!nh.getParam("topics", topics)) + { + return; + } + ROS_ASSERT_MSG(topics.getType() == XmlRpcValue::TypeArray, "topics param must be an array"); + // Iterator caused exception, hmmm... + size_t size = topics.size(); + for (size_t i = 0; i < size; ++i) + { + XmlRpcValue topic_value = topics[i]; + // If it is just a string, add this topic + if (topic_value.getType() == XmlRpcValue::TypeString) + { + opts.addTopic(topic_value); + } + else if (topic_value.getType() == XmlRpcValue::TypeStruct) + { + ROS_ASSERT_MSG(topic_value.size() == 1, "Paramater invalid for topic %lu", i); + std::string const& topic = (*topic_value.begin()).first; + XmlRpcValue& topic_config = (*topic_value.begin()).second; + ROS_ASSERT_MSG(topic_config.getType() == XmlRpcValue::TypeStruct, "Topic limits invalid for: '%s'", + topic.c_str()); + + ros::Duration dur = SnapshotterTopicOptions::INHERIT_DURATION_LIMIT; + int64_t mem = SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT; + std::string duration = "duration"; + std::string memory = "memory"; + if (topic_config.hasMember(duration)) + { + XmlRpcValue& dur_limit = topic_config[duration]; + if (dur_limit.getType() == XmlRpcValue::TypeDouble) + { + double seconds = dur_limit; + dur = ros::Duration(seconds); + } + else if (dur_limit.getType() == XmlRpcValue::TypeInt) + { + int seconds = dur_limit; + dur = ros::Duration(seconds, 0); + } + else + ROS_FATAL("err"); + } + if (topic_config.hasMember("memory")) + { + XmlRpcValue& mem_limit = topic_config[memory]; + if (mem_limit.getType() == XmlRpcValue::TypeDouble) + { + double mb = mem_limit; + mem = int(MB_TO_BYTES * mb); + } + else if (mem_limit.getType() == XmlRpcValue::TypeInt) + { + int mb = mem_limit; + mem = MB_TO_BYTES * mb; + } + else + ROS_FATAL("err"); + } + opts.addTopic(topic, dur, mem); + } + else + ROS_ASSERT_MSG(false, "Parameter invalid for topic %lu", i); + } +} + +int main(int argc, char** argv) +{ + po::variables_map vm; + if (!parseOptions(vm, argc, argv)) + return 1; + + // If any of the client flags are on, use the client + if (vm.count("trigger-write") || vm.count("pause") || vm.count("resume")) + { + SnapshotterClientOptions opts; + if (!parseVariablesMapClient(opts, vm)) + return 1; + ros::init(argc, argv, "snapshot_client", ros::init_options::AnonymousName); + SnapshotterClient client; + return client.run(opts); + } + + // Parse the command-line options + SnapshotterOptions opts; + if (!parseVariablesMap(opts, vm)) + return 1; + + ros::init(argc, argv, "snapshot", ros::init_options::AnonymousName); + // Get additional topic configurations if they're in ROS params + ros::NodeHandle private_nh("~"); + appendParamOptions(private_nh, opts); + + // Exit if not topics selected + if (!opts.topics_.size()) + { + ROS_FATAL("No topics selected. Exiting."); + return 1; + } + + // Run the snapshotter + rosbag::Snapshotter snapshotter(opts); + return snapshotter.run(); +} diff --git a/tools/rosbag/src/snapshotter.cpp b/tools/rosbag/src/snapshotter.cpp new file mode 100644 index 0000000000..aefcb9c451 --- /dev/null +++ b/tools/rosbag/src/snapshotter.cpp @@ -0,0 +1,607 @@ +/********************************************************************* +* Software License Agreement (BSD License) +* +* Copyright (c) 2018, Open Source Robotics Foundation, Inc. +* All rights reserved. +* +* Redistribution and use in source and binary forms, with or without +* modification, are permitted provided that the following conditions +* are met: +* +* * Redistributions of source code must retain the above copyright +* notice, this list of conditions and the following disclaimer. +* * Redistributions in binary form must reproduce the above +* copyright notice, this list of conditions and the following +* disclaimer in the documentation and/or other materials provided +* with the distribution. +* * Neither the name of Open Source Robotics Foundation, Inc. nor the +* names of its contributors may be used to endorse or promote products +* derived from this software without specific prior written permission. +* +* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS +* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE +* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, +* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +* LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN +* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +* POSSIBILITY OF SUCH DAMAGE. +********************************************************************/ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "rosbag/snapshotter.h" + +using std::string; +using boost::shared_ptr; +using ros::Time; + +namespace rosbag +{ +const ros::Duration SnapshotterTopicOptions::NO_DURATION_LIMIT = ros::Duration(-1); +const int32_t SnapshotterTopicOptions::NO_MEMORY_LIMIT = -1; +const ros::Duration SnapshotterTopicOptions::INHERIT_DURATION_LIMIT = ros::Duration(0); +const int32_t SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT = 0; + +SnapshotterTopicOptions::SnapshotterTopicOptions(ros::Duration duration_limit, int32_t memory_limit) + : duration_limit_(duration_limit), memory_limit_(memory_limit) +{ +} + +SnapshotterOptions::SnapshotterOptions(ros::Duration default_duration_limit, int32_t default_memory_limit, + ros::Duration status_period) + : default_duration_limit_(default_duration_limit) + , default_memory_limit_(default_memory_limit) + , status_period_(status_period) + , topics_() +{ +} + +void SnapshotterOptions::addTopic(std::string const& topic, ros::Duration duration, int32_t memory) +{ + SnapshotterTopicOptions ops(duration, memory); + topics_.insert(topics_t::value_type(topic, ops)); +} + +SnapshotterClientOptions::SnapshotterClientOptions() : action_(SnapshotterClientOptions::TRIGGER_WRITE) +{ +} + +SnapshotMessage::SnapshotMessage(topic_tools::ShapeShifter::ConstPtr _msg, + boost::shared_ptr _connection_header, Time _time) + : msg(_msg), connection_header(_connection_header), time(_time) +{ +} + +MessageQueue::MessageQueue(SnapshotterTopicOptions const& options) : options_(options), size_(0) +{ +} + +void MessageQueue::setSubscriber(shared_ptr sub) +{ + sub_ = sub; +} + +void MessageQueue::fillStatus(rosgraph_msgs::TopicStatistics& status) +{ + boost::mutex::scoped_lock l(lock); + if (!queue_.size()) + return; + status.traffic = size_; + status.delivered_msgs = queue_.size(); + status.window_start = queue_.front().time; + status.window_stop = queue_.back().time; +} + +void MessageQueue::clear() +{ + boost::mutex::scoped_lock l(lock); + _clear(); +} + +void MessageQueue::_clear() +{ + queue_.clear(); + size_ = 0; +} + +ros::Duration MessageQueue::duration() const +{ + // No duration if 0 or 1 messages + if (queue_.size() <= 1) + return ros::Duration(); + return queue_.back().time - queue_.front().time; +} + +bool MessageQueue::preparePush(int32_t size, ros::Time const& time) +{ + // If new message is older than back of queue, time has gone backwards and buffer must be cleared + if (!queue_.empty() and time < queue_.back().time) + { + ROS_WARN("Time has gone backwards. Clearing buffer for this topic."); + _clear(); + } + + // The only case where message cannot be addded is if size is greater than limit + if (options_.memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT && size > options_.memory_limit_) + return false; + + // If memory limit is enforced, remove elements from front of queue until limit would be met once message is added + if (options_.memory_limit_ > SnapshotterTopicOptions::NO_MEMORY_LIMIT) + while (queue_.size() != 0 && size_ + size > options_.memory_limit_) + _pop(); + + // If duration limit is encforced, remove elements from front of queue until duration limit would be met once message + // is added + if (options_.duration_limit_ > SnapshotterTopicOptions::NO_DURATION_LIMIT && queue_.size() != 0) + { + ros::Duration dt = time - queue_.front().time; + while (dt > options_.duration_limit_) + { + _pop(); + if (queue_.empty()) + break; + dt = time - queue_.front().time; + } + } + return true; +} +void MessageQueue::push(SnapshotMessage const& _out) +{ + boost::mutex::scoped_try_lock l(lock); + if (!l.owns_lock()) + { + ROS_ERROR("Failed to lock. Time %f", _out.time.toSec()); + return; + } + _push(_out); +} + +SnapshotMessage MessageQueue::pop() +{ + boost::mutex::scoped_lock l(lock); + return _pop(); +} +void MessageQueue::_push(SnapshotMessage const& _out) +{ + int32_t size = _out.msg->size(); + // If message cannot be added without violating limits, it must be dropped + if (not preparePush(size, _out.time)) + return; + queue_.push_back(_out); + // Add size of new message to running count to maintain correctness + size_ += _out.msg->size(); +} + +SnapshotMessage MessageQueue::_pop() +{ + SnapshotMessage tmp = queue_.front(); + queue_.pop_front(); + // Remove size of popped message to maintain correctness of size_ + size_ -= tmp.msg->size(); + return tmp; +} + +MessageQueue::range_t MessageQueue::rangeFromTimes(Time const& start, Time const& stop) +{ + range_t::first_type begin = queue_.begin(); + range_t::second_type end = queue_.end(); + + // Increment / Decrement iterators until time contraints are met + if (not start.isZero()) + { + while (begin != end and (*begin).time < start) + ++begin; + } + if (not stop.isZero()) + { + while (end != begin and (*(end - 1)).time > stop) + --end; + } + return range_t(begin, end); +} + +const int Snapshotter::QUEUE_SIZE = 10; + +Snapshotter::Snapshotter(SnapshotterOptions const& options) : options_(options), recording_(true), writing_(false) +{ + status_pub_ = nh_.advertise("snapshot_status", 10); +} + +void Snapshotter::fixTopicOptions(SnapshotterTopicOptions& options) +{ + if (options.duration_limit_ == SnapshotterTopicOptions::INHERIT_DURATION_LIMIT) + options.duration_limit_ = options_.default_duration_limit_; + if (options.memory_limit_ == SnapshotterTopicOptions::INHERIT_MEMORY_LIMIT) + options.memory_limit_ = options_.default_memory_limit_; +} + +bool Snapshotter::postfixFilename(string& file) +{ + size_t ind = file.rfind(".bag"); + // If requested ends in .bag, this is literal name do not append date + if (ind != string::npos && ind == file.size() - 4) + { + return true; + } + // Otherwise treat as prefix and append datetime and extension + file += timeAsStr() + ".bag"; + return true; +} + +string Snapshotter::timeAsStr() +{ + std::stringstream msg; + const boost::posix_time::ptime now = boost::posix_time::second_clock::local_time(); + boost::posix_time::time_facet* const f = new boost::posix_time::time_facet("%Y-%m-%d-%H-%M-%S"); + msg.imbue(std::locale(msg.getloc(), f)); + msg << now; + return msg.str(); +} + +void Snapshotter::topicCB(const ros::MessageEvent& msg_event, + boost::shared_ptr queue) +{ + // If recording is paused (or writing), exit + { + boost::shared_lock lock(state_lock_); + if (!recording_) + { + return; + } + } + + // Pack message and metadata into SnapshotMessage holder + SnapshotMessage out(msg_event.getMessage(), msg_event.getConnectionHeaderPtr(), msg_event.getReceiptTime()); + queue->push(out); +} + +void Snapshotter::subscribe(string const& topic, boost::shared_ptr queue) +{ + ROS_INFO("Subscribing to %s", topic.c_str()); + + shared_ptr sub(boost::make_shared()); + ros::SubscribeOptions ops; + ops.topic = topic; + ops.queue_size = QUEUE_SIZE; + ops.md5sum = ros::message_traits::md5sum(); + ops.datatype = ros::message_traits::datatype(); + ops.helper = + boost::make_shared&> >( + boost::bind(&Snapshotter::topicCB, this, _1, queue)); + *sub = nh_.subscribe(ops); + queue->setSubscriber(sub); +} + +bool Snapshotter::writeTopic(rosbag::Bag& bag, MessageQueue& message_queue, string const& topic, + rosbag_msgs::TriggerSnapshot::Request& req, rosbag_msgs::TriggerSnapshot::Response& res) +{ + // acquire lock for this queue + boost::mutex::scoped_lock l(message_queue.lock); + + MessageQueue::range_t range = message_queue.rangeFromTimes(req.start_time, req.stop_time); + + // open bag if this the first valid topic and there is data + if (not bag.isOpen() and range.second > range.first) + { + try + { + bag.open(req.filename, bagmode::Write); + } + catch (rosbag::BagException const& err) + { + res.success = false; + res.message = string("failed to open bag: ") + err.what(); + return false; + } + ROS_INFO("Writing snapshot to %s", req.filename.c_str()); + } + + // write queue + try + { + for (MessageQueue::range_t::first_type msg_it = range.first; msg_it != range.second; ++msg_it) + { + SnapshotMessage const& msg = *msg_it; + bag.write(topic, msg.time, msg.msg, msg.connection_header); + } + } + catch (rosbag::BagException const& err) + { + res.success = false; + res.message = string("failed to write bag: ") + err.what(); + } + return true; +} + +bool Snapshotter::triggerSnapshotCb(rosbag_msgs::TriggerSnapshot::Request& req, + rosbag_msgs::TriggerSnapshot::Response& res) +{ + if (not postfixFilename(req.filename)) + { + res.success = false; + res.message = "invalid"; + return true; + } + bool recording_prior; // Store if we were recording prior to write to restore this state after write + { + boost::upgrade_lock read_lock(state_lock_); + recording_prior = recording_; + if (writing_) + { + res.success = false; + res.message = "Already writing"; + return true; + } + boost::upgrade_to_unique_lock write_lock(read_lock); + if (recording_prior) + pause(); + writing_ = true; + } + + // Ensure that state is updated when function exits, regardlesss of branch path / exception events + BOOST_SCOPE_EXIT(&state_lock_, &writing_, recording_prior, this_) + { + // Clear buffers beacuase time gaps (skipped messages) may have occured while paused + boost::unique_lock write_lock(state_lock_); + // Turn off writing flag and return recording to its state before writing + writing_ = false; + if (recording_prior) + this_->resume(); + } + BOOST_SCOPE_EXIT_END + + // Create bag + Bag bag; + + // Write each selected topic's queue to bag file + if (req.topics.size()) + { + BOOST_FOREACH (std::string& topic, req.topics) + { + // Resolve and clean topic + try + { + topic = ros::names::resolve(nh_.getNamespace(), topic); + } + catch (ros::InvalidNameException const& err) + { + ROS_WARN("Requested topic %s is invalid, skipping.", topic.c_str()); + continue; + } + + // Find the message queue for this topic if it exsists + buffers_t::iterator found = buffers_.find(topic); + // If topic not found, error and exit + if (found == buffers_.end()) + { + ROS_WARN("Requested topic %s is not subscribed, skipping.", topic.c_str()); + continue; + } + MessageQueue& message_queue = *(*found).second; + if (not writeTopic(bag, message_queue, topic, req, res)) + return true; + } + } + // If topic list empty, record all buffered topics + else + { + BOOST_FOREACH (buffers_t::value_type& pair, buffers_) + { + MessageQueue& message_queue = *(pair.second); + std::string const& topic = pair.first; + if (not writeTopic(bag, message_queue, topic, req, res)) + return true; + } + } + + // If no topics were subscribed/valid/contained data, this is considered a non-success + if (not bag.isOpen()) + { + res.success = false; + res.message = res.NO_DATA; + return true; + } + + res.success = true; + return true; +} + +void Snapshotter::clear() +{ + BOOST_FOREACH (buffers_t::value_type& pair, buffers_) + { + pair.second->clear(); + } +} + +void Snapshotter::pause() +{ + ROS_INFO("Buffering paused"); + recording_ = false; +} + +void Snapshotter::resume() +{ + clear(); + recording_ = true; + ROS_INFO("Buffering resumed and old data cleared."); +} + +bool Snapshotter::enableCB(std_srvs::SetBool::Request& req, std_srvs::SetBool::Response& res) +{ + boost::upgrade_lock read_lock(state_lock_); + if (req.data && writing_) // Cannot enable while writing + { + res.success = false; + res.message = "cannot enable recording while writing."; + return true; + } + // Obtain write lock and update state if requested state is different from current + if (req.data and not recording_) + { + boost::upgrade_to_unique_lock write_lock(read_lock); + resume(); + } + else if (not req.data and recording_) + { + boost::upgrade_to_unique_lock write_lock(read_lock); + pause(); + } + res.success = true; + return true; +} + +void Snapshotter::publishStatus(ros::TimerEvent const& e) +{ + (void)e; // Make your "unused variable" warnings a thing of the past with cast to void! + // Don't bother doing computing if no one is subscribed + if (!status_pub_.getNumSubscribers()) + return; + + // TODO: consider options to make this faster (caching and updating last status, having queues track their own status) + rosbag_msgs::SnapshotStatus msg; + { + boost::shared_lock lock(state_lock_); + msg.enabled = recording_; + } + std::string node_id = ros::this_node::getName(); + BOOST_FOREACH (buffers_t::value_type& pair, buffers_) + { + rosgraph_msgs::TopicStatistics status; + status.node_sub = node_id; + status.topic = pair.first; + pair.second->fillStatus(status); + msg.topics.push_back(status); + } + + status_pub_.publish(msg); +} + +int Snapshotter::run() +{ + if (!nh_.ok()) + return 0; + + // Create the queue for each topic and set up the subscriber to add to it on new messages + BOOST_FOREACH (SnapshotterOptions::topics_t::value_type& pair, options_.topics_) + { + string topic = ros::names::resolve(nh_.getNamespace(), pair.first); + fixTopicOptions(pair.second); + shared_ptr queue; + queue.reset(new MessageQueue(pair.second)); + std::pair res = buffers_.insert(buffers_t::value_type(topic, queue)); + ROS_ASSERT_MSG(res.second, "failed to add %s to topics. Perhaps it is a duplicate?", topic.c_str()); + subscribe(topic, queue); + } + + // Now that subscriptions are setup, setup service servers for writing and pausing + trigger_snapshot_server_ = nh_.advertiseService("trigger_snapshot", &Snapshotter::triggerSnapshotCb, this); + enable_server_ = nh_.advertiseService("enable_snapshot", &Snapshotter::enableCB, this); + + // Start timer to publish status regularly + if (options_.status_period_ > ros::Duration(0)) + status_timer_ = nh_.createTimer(options_.status_period_, &Snapshotter::publishStatus, this); + + // Use multiple callback threads + ros::MultiThreadedSpinner spinner(4); // Use 4 threads + spinner.spin(); // spin() will not return until the node has been shutdown + return 0; +} + +SnapshotterClient::SnapshotterClient() +{ +} + +int SnapshotterClient::run(SnapshotterClientOptions const& opts) +{ + if (opts.action_ == SnapshotterClientOptions::TRIGGER_WRITE) + { + ros::ServiceClient client = nh_.serviceClient("trigger_snapshot"); + if (not client.exists()) + { + ROS_ERROR("Service %s does not exist. Is snapshot running in this namespace?", "trigger_snapshot"); + return 1; + } + rosbag_msgs::TriggerSnapshotRequest req; + req.topics = opts.topics_; + // Prefix mode + if (opts.filename_.empty()) + { + req.filename = opts.prefix_; + size_t ind = req.filename.rfind(".bag"); + if (ind != string::npos && ind == req.filename.size() - 4) + req.filename.erase(ind); + } + else + { + req.filename = opts.filename_; + size_t ind = req.filename.rfind(".bag"); + if (ind == string::npos || ind != req.filename.size() - 4) + req.filename += ".bag"; + } + + // Resolve filename relative to clients working directory to avoid confusion + if (req.filename.empty()) // Special case of no specified file, ensure still in working directory of client + req.filename = "./"; + boost::filesystem::path p(boost::filesystem::system_complete(req.filename)); + req.filename = p.string(); + + rosbag_msgs::TriggerSnapshotResponse res; + if (not client.call(req, res)) + { + ROS_ERROR("Failed to call service"); + return 1; + } + if (not res.success) + { + ROS_ERROR("%s", res.message.c_str()); + return 1; + } + return 0; + } + else if (opts.action_ == SnapshotterClientOptions::PAUSE || opts.action_ == SnapshotterClientOptions::RESUME) + { + ros::ServiceClient client = nh_.serviceClient("enable_snapshot"); + if (not client.exists()) + { + ROS_ERROR("Service %s does not exist. Is snapshot running in this namespace?", "enable_snapshot"); + return 1; + } + std_srvs::SetBoolRequest req; + req.data = (opts.action_ == SnapshotterClientOptions::RESUME); + std_srvs::SetBoolResponse res; + if (not client.call(req, res)) + { + ROS_ERROR("Failed to call service."); + return 1; + } + if (not res.success) + { + ROS_ERROR("%s", res.message.c_str()); + return 1; + } + return 0; + } + else + { + ROS_ASSERT_MSG(false, "Invalid value of enum."); + return 1; + } +} + +} // namespace rosbag diff --git a/tools/rosbag_msgs/CMakeLists.txt b/tools/rosbag_msgs/CMakeLists.txt new file mode 100644 index 0000000000..0724ce38f1 --- /dev/null +++ b/tools/rosbag_msgs/CMakeLists.txt @@ -0,0 +1,11 @@ +cmake_minimum_required(VERSION 2.8.3) +project(rosbag_msgs) + +find_package(catkin REQUIRED COMPONENTS message_generation rosgraph_msgs) + +add_message_files(DIRECTORY msg FILES SnapshotStatus.msg) +add_service_files(DIRECTORY srv FILES TriggerSnapshot.srv) + +generate_messages(DEPENDENCIES rosgraph_msgs) + +catkin_package(CATKIN_DEPENDS message_runtime rosgraph_msgs) diff --git a/tools/rosbag_msgs/msg/SnapshotStatus.msg b/tools/rosbag_msgs/msg/SnapshotStatus.msg new file mode 100644 index 0000000000..e78bb60b1c --- /dev/null +++ b/tools/rosbag_msgs/msg/SnapshotStatus.msg @@ -0,0 +1,8 @@ +# Describes the current status of a running 'rosbag snapshot' node + +# Information about the buffer of subscribed topics +# The period, stamp, node_pub, and dropped_msgs fields are left empty +rosgraph_msgs/TopicStatistics[] topics +# If true, new messages are being added to the snapshot buffers +# If false, snapshoter is currently paused or writing to a bag file +bool enabled diff --git a/tools/rosbag_msgs/package.xml b/tools/rosbag_msgs/package.xml new file mode 100644 index 0000000000..49cad73ad1 --- /dev/null +++ b/tools/rosbag_msgs/package.xml @@ -0,0 +1,14 @@ + + rosbag_msgs + 1.0.0 + Service and message definitions for rosbag + Dirk Thomas + BSD + Kevin Allen + catkin + + message_generation + rosgraph_msgs + message_runtime + rosgraph_msgs + diff --git a/tools/rosbag_msgs/srv/TriggerSnapshot.srv b/tools/rosbag_msgs/srv/TriggerSnapshot.srv new file mode 100644 index 0000000000..e26ac6029c --- /dev/null +++ b/tools/rosbag_msgs/srv/TriggerSnapshot.srv @@ -0,0 +1,18 @@ +string filename # Name of bag file to save snapshot to. + # if it ends in .bag, this exact filename will be saved. + # otherwise, the current datatime and .bag will be appended to the name + # example: "test" -> test2018-05-23-10-04-20.bag + # example: "test.bag" -> test.bag +string[] topics # List of topics to include in snapshot. + # If empty, all buffered topics will be written +time start_time # Earliest timestamp for a message to be included in written bag + # If time(0), start at the ealiest message in each topic's buffer +time stop_time # Latest timestamp for a message to be includued in written bag + # If time(0), stop at the most recent message in each topic's buffer + +--- + +bool success # True if snapshot succesfully written to disk + +string NO_DATA=no messages buffered on selected topics +string message # Error description if failed