Multi-Threaded rd_jitter Broken on Windows #3929

@brimonk

Read the FAQ first: https://github.com/edenhill/librdkafka/wiki/FAQ

Description

This has been discussed in the past (#2795): the output of rand() is predictable unless the generator is seeded. This predictability is usually addressed by calling srand() first, or by calling rand_r() with a caller-supplied seed.
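For reference, here is a minimal sketch of the rand_r() approach on a POSIX system. The helper name jitter_r is hypothetical and not part of librdkafka; the point is only that the caller owns the seed, so no hidden CRT state is involved. rand_r() is not available in the Microsoft CRT, so this does not build with cl.

```c
#define _POSIX_C_SOURCE 200809L /* expose rand_r() on strict compilers */
#include <stdlib.h>

/* jitter_r: hypothetical reentrant jitter helper (not a librdkafka API).
 * The caller passes its own seed, which rand_r() updates in place. */
static int jitter_r(unsigned int *seed, int low, int high)
{
	return low + (rand_r(seed) % ((high - low) + 1));
}
```

Two callers that start from the same seed value get the same sequence, so each thread needs its own, differently initialized seed.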

While POSIX defined rand_r to provide a reentrant PRNG, Microsoft's solution to this problem was to make rand and similar C functions thread-safe by keeping the CRT's PRNG state in thread-local storage, so every thread has its own seed. Given two threads, A and B, if thread A calls srand() and thread B then calls rand(), thread B still has the default seed and produces the same fixed sequence on every run.

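Long story short, on Windows librdkafka cannot uphold its promise that the calling application may call any of the API functions from any of its own threads at any time: rd_jitter called on a thread that never ran srand() always starts from the same default-seeded sequence.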
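One possible direction, sketched here under stated assumptions rather than proposed as the actual fix: keep the generator state in a thread-local variable and seed it lazily on first use, so no thread depends on another thread's srand(). The names TLS_SKETCH, tls_rand and jitter_tls are hypothetical, xorshift32 is swapped in for rand(), and the seed mix (C11 timespec_get plus the address of the thread-local variable) is an illustrative choice only.

```c
#include <stdint.h>
#include <time.h>

#if defined(_MSC_VER)
#define TLS_SKETCH __declspec(thread)
#else
#define TLS_SKETCH _Thread_local
#endif

/* Per-thread generator state; 0 means "not seeded yet on this thread". */
static TLS_SKETCH uint32_t tls_state;

/* tls_rand: hypothetical xorshift32 generator that seeds itself lazily
 * on each thread instead of relying on a prior srand() call. */
static uint32_t tls_rand(void)
{
	uint32_t x = tls_state;

	if (x == 0) {
		struct timespec ts = {0};

		/* Assumption: sub-second time mixed with the address of the
		 * thread-local variable is "different enough" per thread. */
		timespec_get(&ts, TIME_UTC);
		x = (uint32_t)ts.tv_sec ^ (uint32_t)ts.tv_nsec ^
		    (uint32_t)(uintptr_t)&tls_state;
		if (x == 0)
			x = 0x9E3779B9u; /* xorshift state must be non-zero */
	}

	x ^= x << 13;
	x ^= x >> 17;
	x ^= x << 5;
	tls_state = x;
	return x;
}

/* jitter_tls: same range arithmetic as rd_jitter, backed by tls_rand(). */
static int jitter_tls(int low, int high)
{
	return low + (int)(tls_rand() % (uint32_t)((high - low) + 1));
}
```

Short-lived threads created back to back may reuse the same thread-local address, so the sub-second clock carries most of the entropy here; a real implementation would probably also mix in a thread ID.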

How to reproduce

Reproduction steps below assume Windows:

  1. Enable debug output
  2. Call rd_kafka_new and obtain an rd_kafka_t handle on thread A
  3. In a loop (more than once to validate), spawn a new thread (thread B), passing this handle to it
  4. Produce a single message to the configured topic on thread B
  5. Analyze the logging output to see messages like the following:
Debug : [thrd:main]: TOPIC [5] is the new sticky partition

In a production scenario where this was spotted, the particular topic had 12 partitions, and partition 5 was the one that was selected every single time. To demonstrate this without using librdkafka, you can look at the following example:

// Compile with:
//     cl randtest.c
//
// Given PARTITION_CNT == 12, this will output:
//
//   0 0
//   1 0
//   2 0
//   3 0
//   4 0
//   5 1000
//   6 0
//   7 0
//   8 0
//   9 0
//   10 0
//   11 0

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>

#define WIN32_LEAN_AND_MEAN
#include <windows.h>

#define PARTITION_CNT 12

long RANDVALS[PARTITION_CNT];

// rd_jitter: the rd_jitter definition from v1.2.0 (effectively the same from v1.8.0 without rand_r)
static inline int rd_jitter(int low, int high)
{
	return (low + (rand() % ((high - low) + 1)));
}

// randtest: does the rand test
DWORD __stdcall randtest(void *p)
{
	RANDVALS[rd_jitter(0, PARTITION_CNT - 1) % PARTITION_CNT]++;
	return 0;
}

int main(int argc, char **argv)
{
	memset(RANDVALS, 0, sizeof RANDVALS);

	srand((unsigned int)time(NULL)); // seeds only the main thread's CRT state

	for (int i = 0; i < 1000; i++) {
		HANDLE thrd = CreateThread(NULL, 0, randtest, NULL, 0, NULL);
		WaitForSingleObject(thrd, INFINITE);
		CloseHandle(thrd); // release the handle so the loop does not leak 1000 of them
	}

	for (int i = 0; i < PARTITION_CNT; i++) {
		printf("%d %ld\n", i, RANDVALS[i]);
	}

	return 0;
}
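As for why it is always partition 5: the sketch below reimplements the generator using the linear congruential constants commonly documented for the MSVC CRT rand() (214013 and 2531011, returning bits 16..30 of the state). Those constants are an assumption about the CRT's internals, not something taken from librdkafka, and the function names are hypothetical. Under that assumption the first draw on a thread with the default seed of 1 is 41, and 41 % 12 is 5.

```c
/* msvc_rand_sketch: advances *state by one step and returns the next
 * value, assuming the LCG constants commonly cited for the MSVC CRT. */
static int msvc_rand_sketch(unsigned long *state)
{
	*state = (*state * 214013UL + 2531011UL) & 0xFFFFFFFFUL;
	return (int)((*state >> 16) & 0x7FFF);
}

/* first_partition: partition picked by the first draw on a thread whose
 * state is still the default seed (as if srand() was never called). */
static int first_partition(int partition_cnt)
{
	unsigned long state = 1; /* default seed per the C standard */
	return msvc_rand_sketch(&state) % partition_cnt;
}
```

Because every new thread starts from the same state, each one makes this identical first draw, which matches the 1000 hits on index 5 in the output above.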

Checklist

IMPORTANT: We will close issues where the checklist has not been completed.

Please provide the following information:

  • librdkafka version (release number or git tag): 4faeb8132521da70b6bcde14423a14eb7ed5c55e
  • Apache Kafka version: 2.6.2
  • librdkafka client configuration: enable.random.seed=true (only relevant config here)
  • Operating system: Any Windows Version
  • Provide logs (with debug=.. as necessary) from librdkafka
  • Provide broker log excerpts
  • Critical issue
