diff --git a/plugins/out_azure_kusto/azure_kusto.c b/plugins/out_azure_kusto/azure_kusto.c index 716768236c8..e93d08ffdf1 100644 --- a/plugins/out_azure_kusto/azure_kusto.c +++ b/plugins/out_azure_kusto/azure_kusto.c @@ -45,7 +45,7 @@ static int azure_kusto_get_msi_token(struct flb_azure_kusto *ctx) /* Retrieve access token */ token = flb_azure_msiauth_token_get(ctx->o); if (!token) { - flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); + flb_plg_error(ctx->ins, "error retrieving oauth2 access token (MSI access token is NULL)"); return -1; } @@ -99,11 +99,16 @@ static int azure_kusto_get_service_principal_token(struct flb_azure_kusto *ctx) flb_plg_error(ctx->ins, "error appending oauth2 params"); return -1; } + /* Enable OAuth2 for token retrieval */ + ctx->o->cfg.enabled = FLB_TRUE; /* Retrieve access token */ char *token = flb_oauth2_token_get(ctx->o); if (!token) { - flb_plg_error(ctx->ins, "error retrieving oauth2 access token"); + flb_plg_error(ctx->ins, "error retrieving oauth2 access token - " + "check Fluent Bit logs for '[oauth2]' errors " + "(common causes: connection failure to '%s', invalid credentials, " + "or malformed response)", ctx->oauth_url ? ctx->oauth_url : "unknown"); return -1; } @@ -117,7 +122,6 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) flb_sds_t output = NULL; if (pthread_mutex_lock(&ctx->token_mutex)) { - flb_plg_error(ctx->ins, "error locking mutex"); return NULL; } @@ -143,6 +147,7 @@ flb_sds_t get_azure_kusto_token(struct flb_azure_kusto *ctx) flb_sds_len(ctx->o->access_token) + 2); if (!output) { flb_plg_error(ctx->ins, "error creating token buffer"); + pthread_mutex_unlock(&ctx->token_mutex); return NULL; } flb_sds_snprintf(&output, flb_sds_alloc(output), "%s %s", ctx->o->token_type, @@ -892,6 +897,7 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi if (ret == -1) { flb_plg_error(ctx->ins, "Failed to initialize kusto storage: %s", ctx->store_dir); + flb_azure_kusto_conf_destroy(ctx); return -1; } ctx->has_old_buffers = azure_kusto_store_has_data(ctx); @@ -899,14 +905,17 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi /* validate 'total_file_size' */ if (ctx->file_size <= 0) { flb_plg_error(ctx->ins, "Failed to parse upload_file_size"); + flb_azure_kusto_conf_destroy(ctx); return -1; } if (ctx->file_size < 1000000) { flb_plg_error(ctx->ins, "upload_file_size must be at least 1MB"); + flb_azure_kusto_conf_destroy(ctx); return -1; } if (ctx->file_size > MAX_FILE_SIZE) { flb_plg_error(ctx->ins, "Max total_file_size must be lower than %ld bytes", MAX_FILE_SIZE); + flb_azure_kusto_conf_destroy(ctx); return -1; } @@ -933,15 +942,20 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi * Create upstream context for Kusto Ingestion endpoint */ ctx->u = flb_upstream_create_url(config, ctx->ingestion_endpoint, io_flags, ins->tls); + if (!ctx->u) { + flb_plg_error(ctx->ins, "upstream creation failed"); + pthread_mutex_destroy(&ctx->resources_mutex); + pthread_mutex_destroy(&ctx->token_mutex); + pthread_mutex_destroy(&ctx->blob_mutex); + flb_azure_kusto_conf_destroy(ctx); + return -1; + } + if (ctx->buffering_enabled == FLB_TRUE){ flb_stream_disable_flags(&ctx->u->base, FLB_IO_ASYNC); ctx->u->base.net.io_timeout = ctx->io_timeout; ctx->has_old_buffers = azure_kusto_store_has_data(ctx); } - if (!ctx->u) { - flb_plg_error(ctx->ins, "upstream creation failed"); - return -1; - } flb_plg_debug(ctx->ins, "async flag is %d", flb_stream_is_async(&ctx->u->base)); @@ -950,6 +964,11 @@ static int cb_azure_kusto_init(struct flb_output_instance *ins, struct flb_confi flb_oauth2_create(ctx->config, ctx->oauth_url, FLB_AZURE_KUSTO_TOKEN_REFRESH); if (!ctx->o) { flb_plg_error(ctx->ins, "cannot create oauth2 context"); + flb_upstream_destroy(ctx->u); + pthread_mutex_destroy(&ctx->resources_mutex); + pthread_mutex_destroy(&ctx->token_mutex); + pthread_mutex_destroy(&ctx->blob_mutex); + flb_azure_kusto_conf_destroy(ctx); return -1; } flb_output_upstream_set(ctx->u, ins);