diff --git a/src/rdkafka.h b/src/rdkafka.h index 42a1733bda..29ab00e111 100644 --- a/src/rdkafka.h +++ b/src/rdkafka.h @@ -5610,6 +5610,8 @@ typedef int rd_kafka_event_type_t; #define RD_KAFKA_EVENT_LISTOFFSETS_RESULT 0x400000 /** ElectLeaders_result_t */ #define RD_KAFKA_EVENT_ELECTLEADERS_RESULT 0x800000 +/** DescribeLogDirs_result_t */ +#define RD_KAFKA_EVENT_DESCRIBELOGDIRS_RESULT 0x1000000 /** * @returns the event type for the given event. @@ -5894,6 +5896,8 @@ typedef rd_kafka_event_t rd_kafka_AlterUserScramCredentials_result_t; typedef rd_kafka_event_t rd_kafka_ListOffsets_result_t; /*! ElectLeaders result type */ typedef rd_kafka_event_t rd_kafka_ElectLeaders_result_t; +/*! DescribeLogDirs result type */ +typedef rd_kafka_event_t rd_kafka_DescribeLogDirs_result_t; /** * @brief Get CreateTopics result. @@ -7118,6 +7122,7 @@ typedef enum rd_kafka_admin_op_t { RD_KAFKA_ADMIN_OP_DESCRIBECLUSTER, /**< DescribeCluster */ RD_KAFKA_ADMIN_OP_LISTOFFSETS, /**< ListOffsets */ RD_KAFKA_ADMIN_OP_ELECTLEADERS, /**< ElectLeaders */ + RD_KAFKA_ADMIN_OP_DESCRIBELOGDIRS, /**< DescribeLogDirs */ RD_KAFKA_ADMIN_OP__CNT /**< Number of ops defined */ } rd_kafka_admin_op_t; @@ -10140,6 +10145,161 @@ rd_kafka_ElectLeaders_result_partitions( /**@}*/ +/** + * @name Admin API - Describe Log Dirs + * @{ + * + * + * + */ + +/** + * @brief Opaque type for a log directory description. + */ +typedef struct rd_kafka_LogDirDescription_s rd_kafka_LogDirDescription_t; + +/** + * @brief Opaque type for a log directory topic description. + */ +typedef struct rd_kafka_LogDirTopicDescription_s + rd_kafka_LogDirTopicDescription_t; + +/** + * @brief Opaque type for a log directory partition description. + */ +typedef struct rd_kafka_LogDirPartitionDescription_s + rd_kafka_LogDirPartitionDescription_t; + +/** + * @brief Describe log directories on a broker. + * + * @param rk Client instance. + * @param topics Topic-partition list to filter results, or NULL to describe + * all topics on the target broker. + * @param options Optional admin options, or NULL for defaults. + * Use rd_kafka_AdminOptions_set_broker() to target a specific broker. + * Defaults to controller. + * @param rkqu Queue to emit result on. + * + * Supported admin options: + * - rd_kafka_AdminOptions_set_request_timeout() - default socket.timeout.ms. + * Controls how long \c rdkafka will wait for the request to complete. + * - rd_kafka_AdminOptions_set_broker() - target a specific broker. + * + * @remark The result event type emitted on the supplied queue is of type + * \c RD_KAFKA_EVENT_DESCRIBELOGDIRS_RESULT + */ +RD_EXPORT void rd_kafka_DescribeLogDirs( + rd_kafka_t *rk, + const rd_kafka_topic_partition_list_t *topics, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu); + +/** + * @brief Get the array of log dir description objects from the + * DescribeLogDirs result event. + * + * @param result The DescribeLogDirs result. + * @param cntp Is updated to the number of elements in the array. + * + * @returns the array of log dir descriptions. + */ +RD_EXPORT const rd_kafka_LogDirDescription_t ** +rd_kafka_DescribeLogDirs_result_descriptions( + const rd_kafka_DescribeLogDirs_result_t *result, + size_t *cntp); + +/** + * @brief Get the DescribeLogDirs result from an event. + * + * @param rkev Event of type \c RD_KAFKA_EVENT_DESCRIBELOGDIRS_RESULT. + * + * @returns the result, or NULL if event is of different type. + */ +RD_EXPORT const rd_kafka_DescribeLogDirs_result_t * +rd_kafka_event_DescribeLogDirs_result(rd_kafka_event_t *rkev); + +/** + * @brief Get the error code from a log dir description. + */ +RD_EXPORT rd_kafka_resp_err_t +rd_kafka_LogDirDescription_error_code( + const rd_kafka_LogDirDescription_t *logdir); + +/** + * @brief Get the log directory path from a log dir description. + */ +RD_EXPORT const char * +rd_kafka_LogDirDescription_log_dir( + const rd_kafka_LogDirDescription_t *logdir); + +/** + * @brief Get the array of topic descriptions from a log dir description. + * + * @param logdir The log dir description. + * @param cntp Is updated to the number of topics. + * + * @returns the array of topic descriptions. + */ +RD_EXPORT const rd_kafka_LogDirTopicDescription_t ** +rd_kafka_LogDirDescription_topics( + const rd_kafka_LogDirDescription_t *logdir, + size_t *cntp); + +/** + * @brief Get the topic name from a log dir topic description. + */ +RD_EXPORT const char * +rd_kafka_LogDirTopicDescription_topic( + const rd_kafka_LogDirTopicDescription_t *topic_desc); + +/** + * @brief Get the array of partition descriptions from a log dir topic + * description. + * + * @param topic_desc The topic description. + * @param cntp Is updated to the number of partitions. + * + * @returns the array of partition descriptions. + */ +RD_EXPORT const rd_kafka_LogDirPartitionDescription_t ** +rd_kafka_LogDirTopicDescription_partitions( + const rd_kafka_LogDirTopicDescription_t *topic_desc, + size_t *cntp); + +/** + * @brief Get the partition index from a log dir partition description. + */ +RD_EXPORT int32_t +rd_kafka_LogDirPartitionDescription_partition( + const rd_kafka_LogDirPartitionDescription_t *partition_desc); + +/** + * @brief Get the partition size in bytes from a log dir partition description. + */ +RD_EXPORT int64_t +rd_kafka_LogDirPartitionDescription_size( + const rd_kafka_LogDirPartitionDescription_t *partition_desc); + +/** + * @brief Get the offset lag from a log dir partition description. + */ +RD_EXPORT int64_t +rd_kafka_LogDirPartitionDescription_offset_lag( + const rd_kafka_LogDirPartitionDescription_t *partition_desc); + +/** + * @brief Get the is_future flag from a log dir partition description. + * + * @returns 1 if this is a future log directory (i.e., the partition is being + * moved to this log dir), 0 otherwise. + */ +RD_EXPORT int +rd_kafka_LogDirPartitionDescription_is_future( + const rd_kafka_LogDirPartitionDescription_t *partition_desc); + +/**@}*/ + /** * @name Security APIs * @{ diff --git a/src/rdkafka_admin.c b/src/rdkafka_admin.c index 7986371afe..d27bc9420f 100644 --- a/src/rdkafka_admin.c +++ b/src/rdkafka_admin.c @@ -9809,3 +9809,290 @@ void rd_kafka_ElectLeaders(rd_kafka_t *rk, } /**@}*/ + +/** + * @name DescribeLogDirs + * @{ + */ + +static rd_kafka_LogDirPartitionDescription_t * +rd_kafka_LogDirPartitionDescription_new(int32_t partition, + int64_t size, + int64_t offset_lag, + rd_bool_t is_future) { + rd_kafka_LogDirPartitionDescription_t *part; + part = rd_calloc(1, sizeof(*part)); + part->partition = partition; + part->size = size; + part->offset_lag = offset_lag; + part->is_future = is_future; + return part; +} + +static void +rd_kafka_LogDirPartitionDescription_destroy( + rd_kafka_LogDirPartitionDescription_t *part) { + rd_free(part); +} + +static rd_kafka_LogDirTopicDescription_t * +rd_kafka_LogDirTopicDescription_new(const char *topic, + int partition_cnt) { + rd_kafka_LogDirTopicDescription_t *topic_desc; + topic_desc = rd_calloc(1, sizeof(*topic_desc)); + topic_desc->topic = rd_strdup(topic); + topic_desc->partition_cnt = partition_cnt; + if (partition_cnt > 0) + topic_desc->partitions = rd_calloc( + partition_cnt, sizeof(*topic_desc->partitions)); + return topic_desc; +} + +static void +rd_kafka_LogDirTopicDescription_destroy( + rd_kafka_LogDirTopicDescription_t *topic_desc) { + int i; + for (i = 0; i < topic_desc->partition_cnt; i++) + rd_kafka_LogDirPartitionDescription_destroy( + topic_desc->partitions[i]); + rd_free(topic_desc->partitions); + rd_free(topic_desc->topic); + rd_free(topic_desc); +} + +static rd_kafka_LogDirDescription_t * +rd_kafka_LogDirDescription_new(rd_kafka_resp_err_t error_code, + const char *log_dir, + int topic_cnt) { + rd_kafka_LogDirDescription_t *logdir; + logdir = rd_calloc(1, sizeof(*logdir)); + logdir->error_code = error_code; + logdir->log_dir = rd_strdup(log_dir); + logdir->topic_cnt = topic_cnt; + if (topic_cnt > 0) + logdir->topics = + rd_calloc(topic_cnt, sizeof(*logdir->topics)); + return logdir; +} + +static void +rd_kafka_LogDirDescription_destroy(rd_kafka_LogDirDescription_t *logdir) { + int i; + for (i = 0; i < logdir->topic_cnt; i++) + rd_kafka_LogDirTopicDescription_destroy(logdir->topics[i]); + rd_free(logdir->topics); + rd_free(logdir->log_dir); + rd_free(logdir); +} + +static void rd_kafka_LogDirDescription_free(void *ptr) { + rd_kafka_LogDirDescription_destroy(ptr); +} + +/* Public accessors */ + +rd_kafka_resp_err_t +rd_kafka_LogDirDescription_error_code( + const rd_kafka_LogDirDescription_t *logdir) { + return logdir->error_code; +} + +const char * +rd_kafka_LogDirDescription_log_dir( + const rd_kafka_LogDirDescription_t *logdir) { + return logdir->log_dir; +} + +const rd_kafka_LogDirTopicDescription_t ** +rd_kafka_LogDirDescription_topics( + const rd_kafka_LogDirDescription_t *logdir, + size_t *cntp) { + *cntp = logdir->topic_cnt; + return (const rd_kafka_LogDirTopicDescription_t **)logdir->topics; +} + +const char * +rd_kafka_LogDirTopicDescription_topic( + const rd_kafka_LogDirTopicDescription_t *topic_desc) { + return topic_desc->topic; +} + +const rd_kafka_LogDirPartitionDescription_t ** +rd_kafka_LogDirTopicDescription_partitions( + const rd_kafka_LogDirTopicDescription_t *topic_desc, + size_t *cntp) { + *cntp = topic_desc->partition_cnt; + return (const rd_kafka_LogDirPartitionDescription_t **) + topic_desc->partitions; +} + +int32_t +rd_kafka_LogDirPartitionDescription_partition( + const rd_kafka_LogDirPartitionDescription_t *partition_desc) { + return partition_desc->partition; +} + +int64_t +rd_kafka_LogDirPartitionDescription_size( + const rd_kafka_LogDirPartitionDescription_t *partition_desc) { + return partition_desc->size; +} + +int64_t +rd_kafka_LogDirPartitionDescription_offset_lag( + const rd_kafka_LogDirPartitionDescription_t *partition_desc) { + return partition_desc->offset_lag; +} + +int +rd_kafka_LogDirPartitionDescription_is_future( + const rd_kafka_LogDirPartitionDescription_t *partition_desc) { + return partition_desc->is_future ? 1 : 0; +} + +const rd_kafka_LogDirDescription_t ** +rd_kafka_DescribeLogDirs_result_descriptions( + const rd_kafka_DescribeLogDirs_result_t *result, + size_t *cntp) { + *cntp = rd_list_cnt(&result->rko_u.admin_result.results); + return (const rd_kafka_LogDirDescription_t **) + result->rko_u.admin_result.results.rl_elems; +} + +/** + * @brief Parse DescribeLogDirsResponse and create ADMIN_RESULT op. + */ +static rd_kafka_resp_err_t +rd_kafka_DescribeLogDirsResponse_parse(rd_kafka_op_t *rko_req, + rd_kafka_op_t **rko_resultp, + rd_kafka_buf_t *reply, + char *errstr, + size_t errstr_size) { + const int log_decode_errors = LOG_ERR; + rd_kafka_op_t *rko_result = NULL; + int32_t LogDirArrayCnt; + int i; + + rd_kafka_buf_read_throttle_time(reply); + + /* #LogDirs */ + rd_kafka_buf_read_arraycnt(reply, &LogDirArrayCnt, 10000); + + rko_result = rd_kafka_admin_result_new(rko_req); + rd_list_init(&rko_result->rko_u.admin_result.results, + LogDirArrayCnt, rd_kafka_LogDirDescription_free); + + for (i = 0; i < LogDirArrayCnt; i++) { + int16_t error_code; + rd_kafkap_str_t klogdir; + char *log_dir; + int32_t TopicArrayCnt; + rd_kafka_LogDirDescription_t *logdir_desc; + int j; + + rd_kafka_buf_read_i16(reply, &error_code); + rd_kafka_buf_read_str(reply, &klogdir); + RD_KAFKAP_STR_DUPA(&log_dir, &klogdir); + + rd_kafka_buf_read_arraycnt(reply, &TopicArrayCnt, + RD_KAFKAP_TOPICS_MAX); + + logdir_desc = rd_kafka_LogDirDescription_new( + error_code, log_dir, TopicArrayCnt); + + for (j = 0; j < TopicArrayCnt; j++) { + rd_kafkap_str_t ktopic; + char *topic; + int32_t PartArrayCnt; + rd_kafka_LogDirTopicDescription_t *topic_desc; + int k; + + rd_kafka_buf_read_str(reply, &ktopic); + RD_KAFKAP_STR_DUPA(&topic, &ktopic); + + rd_kafka_buf_read_arraycnt(reply, &PartArrayCnt, + RD_KAFKAP_PARTITIONS_MAX); + + topic_desc = rd_kafka_LogDirTopicDescription_new( + topic, PartArrayCnt); + + for (k = 0; k < PartArrayCnt; k++) { + int32_t partition; + int64_t size; + int64_t offset_lag; + rd_bool_t is_future; + + rd_kafka_buf_read_i32(reply, &partition); + rd_kafka_buf_read_i64(reply, &size); + rd_kafka_buf_read_i64(reply, &offset_lag); + rd_kafka_buf_read_bool(reply, &is_future); + + rd_kafka_buf_skip_tags(reply); + + topic_desc->partitions[k] = + rd_kafka_LogDirPartitionDescription_new( + partition, size, offset_lag, is_future); + } + + rd_kafka_buf_skip_tags(reply); + + logdir_desc->topics[j] = topic_desc; + } + + rd_kafka_buf_skip_tags(reply); + + rd_list_add(&rko_result->rko_u.admin_result.results, + logdir_desc); + } + + rd_kafka_buf_skip_tags(reply); + + *rko_resultp = rko_result; + + return RD_KAFKA_RESP_ERR_NO_ERROR; + +err_parse: + if (rko_result) + rd_kafka_op_destroy(rko_result); + + rd_snprintf(errstr, errstr_size, + "DescribeLogDirs response protocol parse failure: %s", + rd_kafka_err2str(reply->rkbuf_err)); + + return reply->rkbuf_err; +} + +static void rd_kafka_DescribeLogDirs_topics_free(void *ptr) { + rd_kafka_topic_partition_list_destroy(ptr); +} + +void rd_kafka_DescribeLogDirs(rd_kafka_t *rk, + const rd_kafka_topic_partition_list_t *topics, + const rd_kafka_AdminOptions_t *options, + rd_kafka_queue_t *rkqu) { + rd_kafka_op_t *rko; + + static const struct rd_kafka_admin_worker_cbs cbs = { + rd_kafka_DescribeLogDirsRequest, + rd_kafka_DescribeLogDirsResponse_parse, + }; + + rd_assert(rkqu); + + rko = rd_kafka_admin_request_op_new( + rk, RD_KAFKA_OP_DESCRIBELOGDIRS, + RD_KAFKA_EVENT_DESCRIBELOGDIRS_RESULT, &cbs, options, + rkqu->rkqu_q); + + rd_list_init(&rko->rko_u.admin_request.args, 1, + rd_kafka_DescribeLogDirs_topics_free); + + if (topics) { + rd_list_add(&rko->rko_u.admin_request.args, + rd_kafka_topic_partition_list_copy(topics)); + } + + rd_kafka_q_enq(rk->rk_ops, rko); +} + +/**@}*/ diff --git a/src/rdkafka_admin.h b/src/rdkafka_admin.h index 8129078bc0..744cc929ce 100644 --- a/src/rdkafka_admin.h +++ b/src/rdkafka_admin.h @@ -655,4 +655,40 @@ typedef struct rd_kafka_ElectLeadersResult_s { /**@}*/ +/** + * @name DescribeLogDirs + * @{ + */ + +/** + * @struct DescribeLogDirs partition result + */ +struct rd_kafka_LogDirPartitionDescription_s { + int32_t partition; + int64_t size; + int64_t offset_lag; + rd_bool_t is_future; +}; + +/** + * @struct DescribeLogDirs topic result + */ +struct rd_kafka_LogDirTopicDescription_s { + char *topic; + rd_kafka_LogDirPartitionDescription_t **partitions; + int partition_cnt; +}; + +/** + * @struct DescribeLogDirs log dir result + */ +struct rd_kafka_LogDirDescription_s { + rd_kafka_resp_err_t error_code; + char *log_dir; + rd_kafka_LogDirTopicDescription_t **topics; + int topic_cnt; +}; + +/**@}*/ + #endif /* _RDKAFKA_ADMIN_H_ */ diff --git a/src/rdkafka_event.c b/src/rdkafka_event.c index 7e8cd200ae..030554e7e4 100644 --- a/src/rdkafka_event.c +++ b/src/rdkafka_event.c @@ -99,6 +99,8 @@ const char *rd_kafka_event_name(const rd_kafka_event_t *rkev) { return "ListOffsetsResult"; case RD_KAFKA_EVENT_ELECTLEADERS_RESULT: return "ElectLeadersResult"; + case RD_KAFKA_EVENT_DESCRIBELOGDIRS_RESULT: + return "DescribeLogDirsResult"; default: return "?unknown?"; } @@ -500,3 +502,12 @@ rd_kafka_event_ElectLeaders_result(rd_kafka_event_t *rkev) { else return (const rd_kafka_ElectLeaders_result_t *)rkev; } + +const rd_kafka_DescribeLogDirs_result_t * +rd_kafka_event_DescribeLogDirs_result(rd_kafka_event_t *rkev) { + if (!rkev || + rkev->rko_evtype != RD_KAFKA_EVENT_DESCRIBELOGDIRS_RESULT) + return NULL; + else + return (const rd_kafka_DescribeLogDirs_result_t *)rkev; +} diff --git a/src/rdkafka_event.h b/src/rdkafka_event.h index cf63e414eb..92ca1c7659 100644 --- a/src/rdkafka_event.h +++ b/src/rdkafka_event.h @@ -118,6 +118,7 @@ static RD_UNUSED RD_INLINE int rd_kafka_event_setup(rd_kafka_t *rk, case RD_KAFKA_EVENT_ALTERUSERSCRAMCREDENTIALS_RESULT: case RD_KAFKA_EVENT_LISTOFFSETS_RESULT: case RD_KAFKA_EVENT_ELECTLEADERS_RESULT: + case RD_KAFKA_EVENT_DESCRIBELOGDIRS_RESULT: return 1; default: diff --git a/src/rdkafka_op.c b/src/rdkafka_op.c index 5dbbf9c9d4..0dd85c52b0 100644 --- a/src/rdkafka_op.c +++ b/src/rdkafka_op.c @@ -123,6 +123,7 @@ const char *rd_kafka_op2str(rd_kafka_op_type_t type) { [RD_KAFKA_OP_TERMINATE_TELEMETRY] = "REPLY:RD_KAFKA_OP_TERMINATE_TELEMETRY", [RD_KAFKA_OP_ELECTLEADERS] = "REPLY:ELECTLEADERS", + [RD_KAFKA_OP_DESCRIBELOGDIRS] = "REPLY:DESCRIBELOGDIRS", }; if (type & RD_KAFKA_OP_REPLY) @@ -287,6 +288,8 @@ rd_kafka_op_t *rd_kafka_op_new0(const char *source, rd_kafka_op_type_t type) { sizeof(rko->rko_u.telemetry_broker), [RD_KAFKA_OP_TERMINATE_TELEMETRY] = _RD_KAFKA_OP_EMPTY, [RD_KAFKA_OP_ELECTLEADERS] = sizeof(rko->rko_u.admin_request), + [RD_KAFKA_OP_DESCRIBELOGDIRS] = + sizeof(rko->rko_u.admin_request), }; size_t tsize = op2size[type & ~RD_KAFKA_OP_FLAGMASK]; @@ -441,6 +444,7 @@ void rd_kafka_op_destroy(rd_kafka_op_t *rko) { case RD_KAFKA_OP_DESCRIBEUSERSCRAMCREDENTIALS: case RD_KAFKA_OP_LISTOFFSETS: case RD_KAFKA_OP_ELECTLEADERS: + case RD_KAFKA_OP_DESCRIBELOGDIRS: rd_kafka_replyq_destroy(&rko->rko_u.admin_request.replyq); rd_list_destroy(&rko->rko_u.admin_request.args); if (rko->rko_u.admin_request.options.match_consumer_group_states diff --git a/src/rdkafka_op.h b/src/rdkafka_op.h index e79309aa02..6077f382de 100644 --- a/src/rdkafka_op.h +++ b/src/rdkafka_op.h @@ -189,6 +189,9 @@ typedef enum { RD_KAFKA_OP_ELECTLEADERS, /**< Admin: * ElectLeaders * u.admin_request */ + RD_KAFKA_OP_DESCRIBELOGDIRS, /**< Admin: + * DescribeLogDirs + * u.admin_request */ RD_KAFKA_OP__END } rd_kafka_op_type_t; diff --git a/src/rdkafka_request.c b/src/rdkafka_request.c index d38101629d..2fe78108b1 100644 --- a/src/rdkafka_request.c +++ b/src/rdkafka_request.c @@ -6119,6 +6119,70 @@ rd_kafka_resp_err_t rd_kafka_ElectLeadersRequest( return RD_KAFKA_RESP_ERR_NO_ERROR; } +/** + * @brief Construct and send DescribeLogDirsRequest to \p rkb. + * + * @param topics list containing a single + * rd_kafka_topic_partition_list_t* element (or empty for all topics). + */ +rd_kafka_resp_err_t rd_kafka_DescribeLogDirsRequest( + rd_kafka_broker_t *rkb, + const rd_list_t *topics, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque) { + rd_kafka_buf_t *rkbuf; + int16_t ApiVersion; + const rd_kafka_topic_partition_list_t *partitions = NULL; + int rd_buf_size_estimate; + + ApiVersion = rd_kafka_broker_ApiVersion_supported( + rkb, RD_KAFKAP_DescribeLogDirs, 0, 4, NULL); + if (ApiVersion == -1) { + rd_snprintf(errstr, errstr_size, + "DescribeLogDirs Admin API (KIP-113) not supported " + "by broker, requires broker version >= 1.1.0"); + rd_kafka_replyq_destroy(&replyq); + return RD_KAFKA_RESP_ERR__UNSUPPORTED_FEATURE; + } + + if (rd_list_cnt(topics) > 0) + partitions = rd_list_elem(topics, 0); + + rd_buf_size_estimate = 4 /* #Topics array */; + if (partitions) + rd_buf_size_estimate += + (50 + 4) * partitions->cnt; + + rkbuf = rd_kafka_buf_new_flexver_request( + rkb, RD_KAFKAP_DescribeLogDirs, 1, rd_buf_size_estimate, + ApiVersion >= 2); + + if (!partitions) { + /* Null array = describe all topics */ + rd_kafka_buf_write_arraycnt(rkbuf, -1); + } else { + const rd_kafka_topic_partition_field_t fields[] = { + RD_KAFKA_TOPIC_PARTITION_FIELD_PARTITION, + RD_KAFKA_TOPIC_PARTITION_FIELD_END}; + rd_kafka_buf_write_topic_partitions( + rkbuf, partitions, + rd_false /*don't skip invalid offsets*/, + rd_false /* any offset */, + rd_false /* don't use topic_id */, + rd_true /* use topic_names */, fields); + } + + rd_kafka_buf_ApiVersion_set(rkbuf, ApiVersion, 0); + + rd_kafka_broker_buf_enq_replyq(rkb, rkbuf, replyq, resp_cb, opaque); + + return RD_KAFKA_RESP_ERR_NO_ERROR; +} + /** * @brief Construct and send ConsumerGroupDescribe requests * to \p rkb with the groups (const char *) in \p groups. diff --git a/src/rdkafka_request.h b/src/rdkafka_request.h index c508ffdaaf..c932c8a230 100644 --- a/src/rdkafka_request.h +++ b/src/rdkafka_request.h @@ -645,6 +645,16 @@ rd_kafka_resp_err_t rd_kafka_ElectLeadersRequest( rd_kafka_resp_cb_t *resp_cb, void *opaque); +rd_kafka_resp_err_t rd_kafka_DescribeLogDirsRequest( + rd_kafka_broker_t *rkb, + const rd_list_t *topics /*(rd_kafka_topic_partition_list_t*)*/, + rd_kafka_AdminOptions_t *options, + char *errstr, + size_t errstr_size, + rd_kafka_replyq_t replyq, + rd_kafka_resp_cb_t *resp_cb, + void *opaque); + rd_kafka_error_t * rd_kafka_ConsumerGroupDescribeRequest(rd_kafka_broker_t *rkb, char **groups, diff --git a/tests/0080-admin_ut.c b/tests/0080-admin_ut.c index a9f0e1181f..13ee89e867 100644 --- a/tests/0080-admin_ut.c +++ b/tests/0080-admin_ut.c @@ -2377,6 +2377,83 @@ static void do_test_AlterUserScramCredentials(const char *what, SUB_TEST_PASS(); } +/** + * @brief Test DescribeLogDirs + */ +static void do_test_DescribeLogDirs(const char *what, + rd_kafka_t *rk, + rd_kafka_queue_t *useq, + int with_options) { + rd_kafka_queue_t *q; + rd_kafka_AdminOptions_t *options = NULL; + rd_kafka_event_t *rkev; + rd_kafka_resp_err_t err; + const rd_kafka_DescribeLogDirs_result_t *res; + int exp_timeout = MY_SOCKET_TIMEOUT_MS; + test_timing_t timing; + char errstr[512]; + void *my_opaque = NULL, *opaque; + + SUB_TEST_QUICK("%s DescribeLogDirs with %s, timeout %dms", + rd_kafka_name(rk), what, exp_timeout); + + q = useq ? useq : rd_kafka_queue_new(rk); + + if (with_options) { + options = rd_kafka_AdminOptions_new( + rk, RD_KAFKA_ADMIN_OP_DESCRIBELOGDIRS); + + exp_timeout = MY_SOCKET_TIMEOUT_MS * 2; + + err = rd_kafka_AdminOptions_set_request_timeout( + options, exp_timeout, errstr, sizeof(errstr)); + TEST_ASSERT(!err, "%s", rd_kafka_err2str(err)); + + if (useq) { + my_opaque = (void *)99982; + rd_kafka_AdminOptions_set_opaque(options, my_opaque); + } + } + + TIMING_START(&timing, "DescribeLogDirs"); + TEST_SAY("Call DescribeLogDirs, timeout is %dms\n", exp_timeout); + rd_kafka_DescribeLogDirs(rk, NULL /* all topics */, options, q); + TIMING_ASSERT_LATER(&timing, 0, 10); + + /* Poll result queue */ + TIMING_START(&timing, "DescribeLogDirs.queue_poll"); + rkev = rd_kafka_queue_poll(q, exp_timeout + 1000); + TIMING_ASSERT(&timing, exp_timeout - 100, exp_timeout + 100); + TEST_ASSERT(rkev != NULL, "expected result in %dms", exp_timeout); + TEST_SAY("DescribeLogDirs: got %s in %.3fs\n", + rd_kafka_event_name(rkev), + TIMING_DURATION(&timing) / 1000.0f); + + /* Convert event to proper result */ + res = rd_kafka_event_DescribeLogDirs_result(rkev); + TEST_ASSERT(res, "expected DescribeLogDirs_result, not %s", + rd_kafka_event_name(rkev)); + opaque = rd_kafka_event_opaque(rkev); + TEST_ASSERT(opaque == my_opaque, "expected opaque to be %p, not %p", + my_opaque, opaque); + /* Expecting error (no real broker) */ + err = rd_kafka_event_error(rkev); + const char *event_err = rd_kafka_event_error_string(rkev); + TEST_ASSERT(err, "expected DescribeLogDirs to fail"); + TEST_ASSERT(err == RD_KAFKA_RESP_ERR__TIMED_OUT, + "expected RD_KAFKA_RESP_ERR__TIMED_OUT, not %s", + rd_kafka_err2name(err)); + TEST_SAY("DescribeLogDirs error: %s\n", event_err); + rd_kafka_event_destroy(rkev); + + if (options) + rd_kafka_AdminOptions_destroy(options); + if (!useq) + rd_kafka_queue_destroy(q); + + SUB_TEST_PASS(); +} + static void do_test_ElectLeaders(const char *what, rd_kafka_t *rk, rd_kafka_queue_t *useq, @@ -3038,6 +3115,11 @@ static void do_test_apis(rd_kafka_type_t cltype) { do_test_AlterUserScramCredentials("main queue", rk, mainq); do_test_AlterUserScramCredentials("temp queue", rk, NULL); + do_test_DescribeLogDirs("main queue, options", rk, mainq, 1); + do_test_DescribeLogDirs("main queue, no options", rk, mainq, 0); + do_test_DescribeLogDirs("temp queue, options", rk, NULL, 1); + do_test_DescribeLogDirs("temp queue, no options", rk, NULL, 0); + do_test_ElectLeaders("main queue, options, Preffered Elections", rk, mainq, 1, RD_KAFKA_ELECTION_TYPE_PREFERRED); do_test_ElectLeaders("main queue, options, Unclean Elections", rk,