Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 21 additions & 7 deletions plugins/out_azure_kusto/azure_kusto.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx)
/* Retrieve access token */
token = flb_azure_msiauth_token_get(ctx->o);
if (!token) {
flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
flb_plg_error(ctx->ins, "error retrieving oauth2 access token (MSI access token is NULL)");
return -1;
}

Expand Down Expand Up @@ -99,11 +99,16 @@ static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx)
flb_plg_error(ctx->ins, "error appending oauth2 params");
return -1;
}
/* Enable OAuth2 for token retrieval */
ctx->o->cfg.enabled = FLB_TRUE;

/* Retrieve access token */
char *token = flb_oauth2_token_get(ctx->o);
if (!token) {
flb_plg_error(ctx->ins, "error retrieving oauth2 access token");
flb_plg_error(ctx->ins, "error retrieving oauth2 access token - "
"check Fluent Bit logs for '[oauth2]' errors "
"(common causes: connection failure to '%s', invalid credentials, "
"or malformed response)", ctx->oauth_url ? ctx->oauth_url : "unknown");
return -1;
}

Expand All @@ -117,7 +122,6 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx)
flb_sds_t output = NULL;

if (pthread_mutex_lock(&ctx->token_mutex)) {
flb_plg_error(ctx->ins, "error locking mutex");
return NULL;
}

Expand Down Expand Up @@ -933,15 +937,20 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
* Create upstream context for Kusto Ingestion endpoint
*/
ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls);
if (!ctx->u) {
flb_plg_error(ctx->ins, "upstream creation failed");
pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}

if (ctx->buffering_enabled == FLB_TRUE){
flb_stream_disable_flags(&ctx->u->base, FLB_IO_ASYNC);
ctx->u->base.net.io_timeout = ctx->io_timeout;
ctx->has_old_buffers = azure_kusto_store_has_data(ctx);
}
if (!ctx->u) {
flb_plg_error(ctx->ins, "upstream creation failed");
return -1;
}

flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base));

Expand All @@ -950,6 +959,11 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi
flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH);
if (!ctx->o) {
flb_plg_error(ctx->ins, "cannot create oauth2 context");
flb_upstream_destroy(ctx->u);
pthread_mutex_destroy(&ctx->resources_mutex);
pthread_mutex_destroy(&ctx->token_mutex);
pthread_mutex_destroy(&ctx->blob_mutex);
flb_azure_kusto_conf_destroy(ctx);
return -1;
}
flb_output_upstream_set(ctx->u, ins);
Expand Down
Loading