Skip to content

Commit 0262682

Browse files
authored
Configurable max_msg_size (#79)
Upper layer can pass in params.max_message_size_ to specified the max message size the GRPC server will accept. UT with params.max_message_size_ set to 65MB and SEND_DATA up to 4MB. SEND_DATA with 64MB is not stable as raft hb timeout can be easily triggered. Signed-off-by: Xiaoxi Chen <[email protected]>
1 parent 009f47c commit 0262682

7 files changed

Lines changed: 51 additions & 10 deletions

File tree

conanfile.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
class NuRaftMesgConan(ConanFile):
1212
name = "nuraft_mesg"
13-
version = "3.3.1"
13+
version = "3.3.2"
1414

1515
homepage = "https://github.com/eBay/nuraft_mesg"
1616
description = "A gRPC service for NuRAFT"

include/nuraft_mesg/nuraft_mesg.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@ class Manager {
6464
std::string ssl_cert_;
6565
std::shared_ptr< sisl::GrpcTokenVerifier > token_verifier_{nullptr};
6666
std::shared_ptr< sisl::GrpcTokenClient > token_client_{nullptr};
67+
uint32_t max_message_size_{0};
6768
};
6869
using group_params = nuraft::raft_params;
6970
virtual ~Manager() = default;

src/lib/manager_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ void ManagerImpl::restart_server() {
9696
_grpc_server.reset();
9797
_grpc_server = std::unique_ptr< sisl::GrpcServer >(sisl::GrpcServer::make(
9898
listen_address, start_params_.token_verifier_, NURAFT_MESG_CONFIG(grpc_server_thread_cnt),
99-
start_params_.ssl_key_, start_params_.ssl_cert_));
99+
start_params_.ssl_key_, start_params_.ssl_cert_, start_params_.max_message_size_));
100100
_mesg_service->associate(_grpc_server.get());
101101

102102
_grpc_server->run();

src/tests/data_service_tests.cpp

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ class DataServiceFixture : public MessagingFixtureBase {
66
void SetUp() override {
77
MessagingFixtureBase::SetUp();
88
start(true);
9-
test_state_mgr::fill_data_vec(cli_buf);
9+
test_state_mgr::fill_data_vec(cli_buf, 8);
1010
}
1111

1212
void TearDown() override {
@@ -94,6 +94,7 @@ TEST_F(DataServiceFixture, BasicTest1) {
9494
return folly::Unit();
9595
}));
9696

97+
9798
auto repl_ctx1 = sm1->get_repl_context();
9899
for (auto svr : repl_ctx1->_server->get_config()->get_servers()) {
99100
if (svr->get_endpoint() == to_string(app_1_->id_)) continue;
@@ -107,6 +108,30 @@ TEST_F(DataServiceFixture, BasicTest1) {
107108

108109
folly::collectAll(results).via(folly::getGlobalCPUExecutor()).get();
109110

111+
// test big message
112+
LOGINFO("Starting large object write test")
113+
io_blob_list_t big_cli_buf;
114+
test_state_mgr::fill_data_vec_big(big_cli_buf, 4 * 1024 * 1024);
115+
sm1->data_service_request_unidirectional(nuraft_mesg::role_regex::ALL, SEND_DATA, big_cli_buf)
116+
.deferValue([](auto e) -> NullResult {
117+
EXPECT_TRUE(e.hasValue());
118+
return folly::Unit();
119+
}).get();
120+
LOGINFO("End large object write test")
121+
LOGINFO("Starting large object read test")
122+
123+
sm4_1->data_service_request_bidirectional(nuraft_mesg::role_regex::LEADER, REQUEST_DATA, big_cli_buf)
124+
.deferValue([](auto e) -> NullResult {
125+
EXPECT_TRUE(e.hasValue());
126+
test_state_mgr::verify_data(e.value().response_blob());
127+
return folly::Unit();
128+
}).get();
129+
LOGINFO("End large object read test")
130+
for (auto& buf : big_cli_buf) {
131+
buf.buf_free();
132+
}
133+
134+
110135
// add a new member to data_service_test_group and check if repl_ctx4 sends data to newly added member
111136
auto add_3 = app_4->instance_->add_member(data_group, app_3_->id_);
112137
std::this_thread::sleep_for(std::chrono::seconds(1));
@@ -119,9 +144,9 @@ TEST_F(DataServiceFixture, BasicTest1) {
119144
.get();
120145

121146
// TODO REVIEW THIS
122-
// test_group: 4 (1 SEND_DATA) + 5 (1 REQUEST_DATA) + 1 (SEND_DATA to a peer) = 10
147+
// test_group: 4 (2 * 1 SEND_DATA) + 6 (1 REQUEST_DATA) + 1 (SEND_DATA to a peer) = 15
123148
// data_service_test_group: 1 (1 REQUEST_DATA) + 4 (1 SEND_DATA) = 5
124-
EXPECT_EQ(test_state_mgr::get_server_counter(), 15);
149+
EXPECT_EQ(test_state_mgr::get_server_counter(), 20);
125150
app_5->instance_->leave_group(data_group);
126151
app_5->instance_->leave_group(group_id_);
127152
app_4->instance_->leave_group(data_group);

src/tests/test_fixture.ipp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ public:
8888
params.server_uuid_ = id_;
8989
params.mesg_port_ = port_;
9090
params.default_group_type_ = "test_type";
91+
params.max_message_size_ = 65 * 1024 * 1024;
9192
instance_ = init_messaging(params, weak_from_this(), data_svc_enabled);
9293
auto r_params = nuraft::raft_params()
9394
.with_election_timeout_lower(elect_to_low)

src/tests/test_state_manager.cpp

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -215,12 +215,25 @@ void test_state_mgr::verify_data(sisl::io_blob const& buf) {
215215
}
216216
}
217217

218-
void test_state_mgr::fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf) {
219-
static int const data_size{8};
220-
for (int i = 0; i < data_size; i++) {
218+
void test_state_mgr::fill_data_vec_big(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes) {
219+
auto cnt = size_bytes / sizeof(uint32_t);
220+
sisl::io_blob data(size_bytes);
221+
data_vec.clear();
222+
uint32_t* const write_buf{reinterpret_cast< uint32_t* >(data.bytes())};
223+
for (uint32_t i = 0; i < cnt; i++) {
224+
data_vec.emplace_back(i);
225+
write_buf[i] = data_vec.back();
226+
}
227+
cli_buf.emplace_back(data);
228+
}
229+
230+
void test_state_mgr::fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes) {
231+
auto cnt = size_bytes / sizeof(uint32_t);
232+
data_vec.clear();
233+
for (uint32_t i = 0; i < cnt; i++) {
221234
cli_buf.emplace_back(sizeof(uint32_t));
222235
uint32_t* const write_buf{reinterpret_cast< uint32_t* >(cli_buf[i].bytes())};
223-
data_vec.emplace_back(get_random_num());
236+
data_vec.emplace_back(i);
224237
*write_buf = data_vec.back();
225238
}
226239
}

src/tests/test_state_manager.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ class test_state_mgr : public nuraft_mesg::mesg_state_mgr {
5454
nuraft_mesg::io_blob_list_t const& cli_buf);
5555

5656
bool register_data_service_apis(nuraft_mesg::Manager* messaging);
57-
static void fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf);
57+
static void fill_data_vec(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes);
58+
static void fill_data_vec_big(nuraft_mesg::io_blob_list_t& cli_buf, uint32_t size_bytes);
5859
static uint16_t get_random_num();
5960
static uint32_t get_server_counter();
6061
static void verify_data(sisl::io_blob const& buf);

0 commit comments

Comments
 (0)