Skip to content
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion ros2bag/ros2bag/verb/play.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ def add_arguments(self, parser, cli_name): # noqa: D102
help='size of message queue rosbag tries to hold in memory to help deterministic '
'playback. Larger size will result in larger memory needs but might prevent '
'delay of message playback.')
parser.add_argument(
'-e', '--exclude-topics', nargs='*', default=[], help='topics which are not played from the bag file')


def main(self, *, args): # noqa: D102
bag_file = args.bag_file
Expand All @@ -43,4 +46,5 @@ def main(self, *, args): # noqa: D102
uri=bag_file,
storage_id=args.storage,
node_prefix=NODE_NAME_PREFIX,
read_ahead_queue_size=args.read_ahead_queue_size)
read_ahead_queue_size=args.read_ahead_queue_size,
exclude_topics=args.exclude_topics)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check for unique topic names can be done in python. I guess that's a bit easier and avoids the need of a std::set.
Something like exclude_topics = list(set(args.exclude_topics))

Copy link
Author

@danieldube danieldube May 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me a set would be the natural choice of container. I guess at this point it doesn't make a difference, but the idea is to have a O(log n) complexity for excluding topic names.

Using a set on python side, wouldn't guarantee, that the API isn't misused from Python side in the future. Using a sorted vector and binary search for the topic lookup bloats the code. @Karsten1987, are you sure you want to have a std::vector instead of a std::set?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not completely certain that I follow correctly. The idea to filter out duplicates in Python is really just a convenience thing because it's a one-liner in Python. That allows us further to pass in a vector with unique topic names in C++.
I am not so much concerned about a misuse from Python side as the current c++ file relies on receiving PyObjects.

Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ namespace rosbag2_transport
struct PlayOptions
{
public:
using TopicFilter = std::function<bool(const std::string& topic)>;

size_t read_ahead_queue_size;
std::string node_prefix = "";
TopicFilter topic_filter = [](const std::string&){return false;};
};

} // namespace rosbag2_transport
Expand Down
21 changes: 14 additions & 7 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,14 @@ bool Player::is_storage_completely_loaded() const

void Player::play(const PlayOptions & options)
{
prepare_publishers();
prepare_publishers(options);

storage_loading_future_ = std::async(std::launch::async,
[this, options]() {load_storage_content(options);});

wait_for_filled_queue(options);

play_messages_from_queue();
play_messages_from_queue(options);
}

void Player::wait_for_filled_queue(const PlayOptions & options) const
Expand Down Expand Up @@ -114,36 +114,43 @@ void Player::enqueue_up_to_boundary(const TimePoint & time_first_message, uint64
}
}

void Player::play_messages_from_queue()
void Player::play_messages_from_queue(const PlayOptions & options)
{
start_time_ = std::chrono::system_clock::now();
do {
play_messages_until_queue_empty();
play_messages_until_queue_empty(options);
if (!is_storage_completely_loaded() && rclcpp::ok()) {
ROSBAG2_TRANSPORT_LOG_WARN("Message queue starved. Messages will be delayed. Consider "
"increasing the --read-ahead-queue-size option.");
}
} while (!is_storage_completely_loaded() && rclcpp::ok());
}

void Player::play_messages_until_queue_empty()
void Player::play_messages_until_queue_empty(const PlayOptions & options)
{
ReplayableMessage message;
while (message_queue_.try_dequeue(message) && rclcpp::ok()) {
if (options.topic_filter(message.message->topic_name))
continue;
std::this_thread::sleep_until(start_time_ + message.time_since_start);
if (rclcpp::ok()) {
publishers_[message.message->topic_name]->publish(message.message->serialized_data);
}
}
}

void Player::prepare_publishers()
void Player::prepare_publishers(const PlayOptions & options)
{
auto topics = reader_->get_all_topics_and_types();

for (const auto & topic : topics) {
if (options.topic_filter(topic.name))
{
ROSBAG2_TRANSPORT_LOG_INFO("Excluding topic %s", topic.name.c_str());
continue;
}
publishers_.insert(std::make_pair(
topic.name, rosbag2_transport_->create_generic_publisher(topic.name, topic.type)));
}
}

} // namespace rosbag2_transport
6 changes: 3 additions & 3 deletions rosbag2_transport/src/rosbag2_transport/player.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ class Player
bool is_storage_completely_loaded() const;
void enqueue_up_to_boundary(const TimePoint & time_first_message, uint64_t boundary);
void wait_for_filled_queue(const PlayOptions & options) const;
void play_messages_from_queue();
void play_messages_until_queue_empty();
void prepare_publishers();
void play_messages_from_queue(const PlayOptions & options);
void play_messages_until_queue_empty(const PlayOptions & options);
void prepare_publishers(const PlayOptions & options);

static constexpr double read_ahead_lower_bound_percentage_ = 0.9;
static const std::chrono::milliseconds queue_read_wait_period_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <chrono>
#include <string>
#include <vector>
#include <set>

#include "rosbag2_transport/rosbag2_transport.hpp"
#include "rosbag2_transport/record_options.hpp"
Expand Down Expand Up @@ -102,18 +103,21 @@ rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * k
"storage_id",
"node_prefix",
"read_ahead_queue_size",
"exclude_topics",
nullptr
};

char * uri;
char * storage_id;
char * node_prefix;
PyObject * exclude_topics = nullptr;
size_t read_ahead_queue_size;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sss|k", const_cast<char **>(kwlist),
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "sss|kO", const_cast<char **>(kwlist),
&uri,
&storage_id,
&node_prefix,
&read_ahead_queue_size))
&read_ahead_queue_size,
&exclude_topics))
{
return nullptr;
}
Expand All @@ -124,6 +128,32 @@ rosbag2_transport_play(PyObject * Py_UNUSED(self), PyObject * args, PyObject * k
play_options.node_prefix = std::string(node_prefix);
play_options.read_ahead_queue_size = read_ahead_queue_size;

std::set<std::string> exclude_topics_list;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given the command that the topic names could be filtered for uniqueness on the python side, I believe it makes sense to parse the topics as a vector and reuse the code for converting the PyObject to a vector from the record function. I believe this conversion can be excluded in a separate function.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactored the PyObject parser code into a generic parse_python_list() function.

if (exclude_topics) {
PyObject * topic_exclude_iterator = PyObject_GetIter(exclude_topics);
if (topic_exclude_iterator != nullptr) {
PyObject * topic;
while ((topic = PyIter_Next(topic_exclude_iterator))) {
exclude_topics_list.insert(PyUnicode_AsUTF8(topic));

Py_DECREF(topic);
}
Py_DECREF(topic_exclude_iterator);
}

if (exclude_topics_list.empty() == false) {
auto topic_filter_function = [exclude_topics_list](const std::string& topic)
{
auto entry = exclude_topics_list.find(topic);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this block just be replaced by
return (exclude_topics_list.find(topic) != exclude_topics_list.end());

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would argue, that the current implementation is more readable and therefore more expressive than putting everything into one line. However, if you like to change this, I'll do it.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought to believe that it's easier for the compiler to inline and optimize this as there is no if and thus no branch prediction and stuff might happen. But it's solely premature optimization thinking :)

if (entry == exclude_topics_list.end())
return false;
return true;
};

play_options.topic_filter = topic_filter_function;
}
}

rosbag2_transport::Rosbag2Transport transport;
transport.init();
transport.play(storage_options, play_options);
Expand Down