Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 37 additions & 34 deletions src/Common/CaresPTRResolver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,25 @@ namespace DB
}
}

std::mutex CaresPTRResolver::mutex;
struct AresChannelRAII
{
AresChannelRAII()
{
if (ares_init(&channel) != ARES_SUCCESS)
{
throw DB::Exception(DB::ErrorCodes::DNS_ERROR, "Failed to initialize c-ares channel");
}
}

~AresChannelRAII()
{
ares_destroy(channel);
}

ares_channel channel;
};

CaresPTRResolver::CaresPTRResolver(CaresPTRResolver::provider_token) : channel(nullptr)
CaresPTRResolver::CaresPTRResolver(CaresPTRResolver::provider_token)
{
/*
* ares_library_init is not thread safe. Currently, the only other usage of c-ares seems to be in grpc.
Expand All @@ -57,34 +73,22 @@ namespace DB
* */
static const auto library_init_result = ares_library_init(ARES_LIB_INIT_ALL);

if (library_init_result != ARES_SUCCESS || ares_init(&channel) != ARES_SUCCESS)
if (library_init_result != ARES_SUCCESS)
{
throw DB::Exception(DB::ErrorCodes::DNS_ERROR, "Failed to initialize c-ares");
}
}

CaresPTRResolver::~CaresPTRResolver()
{
ares_destroy(channel);
/*
* Library initialization is currently done only once in the constructor. Multiple instances of CaresPTRResolver
* will be used in the lifetime of ClickHouse, thus it's problematic to have de-init here.
* In a practical view, it makes little to no sense to de-init a DNS library since DNS requests will happen
* until the end of the program. Hence, ares_library_cleanup() will not be called.
* */
}

std::unordered_set<std::string> CaresPTRResolver::resolve(const std::string & ip)
{
std::lock_guard guard(mutex);
AresChannelRAII channel_raii;

std::unordered_set<std::string> ptr_records;

resolve(ip, ptr_records);
resolve(ip, ptr_records, channel_raii.channel);

if (!wait_and_process())
if (!wait_and_process(channel_raii.channel))
{
cancel_requests();
throw DB::Exception(DB::ErrorCodes::DNS_ERROR, "Failed to complete reverse DNS query for IP {}", ip);
}

Expand All @@ -93,22 +97,21 @@ namespace DB

std::unordered_set<std::string> CaresPTRResolver::resolve_v6(const std::string & ip)
{
std::lock_guard guard(mutex);
AresChannelRAII channel_raii;

std::unordered_set<std::string> ptr_records;

resolve_v6(ip, ptr_records);
resolve_v6(ip, ptr_records, channel_raii.channel);

if (!wait_and_process())
if (!wait_and_process(channel_raii.channel))
{
cancel_requests();
throw DB::Exception(DB::ErrorCodes::DNS_ERROR, "Failed to complete reverse DNS query for IP {}", ip);
}

return ptr_records;
}

void CaresPTRResolver::resolve(const std::string & ip, std::unordered_set<std::string> & response)
void CaresPTRResolver::resolve(const std::string & ip, std::unordered_set<std::string> & response, ares_channel channel)
{
in_addr addr;

Expand All @@ -117,23 +120,23 @@ namespace DB
ares_gethostbyaddr(channel, reinterpret_cast<const void*>(&addr), sizeof(addr), AF_INET, callback, &response);
}

void CaresPTRResolver::resolve_v6(const std::string & ip, std::unordered_set<std::string> & response)
void CaresPTRResolver::resolve_v6(const std::string & ip, std::unordered_set<std::string> & response, ares_channel channel)
{
in6_addr addr;
inet_pton(AF_INET6, ip.c_str(), &addr);

ares_gethostbyaddr(channel, reinterpret_cast<const void*>(&addr), sizeof(addr), AF_INET6, callback, &response);
}

bool CaresPTRResolver::wait_and_process()
bool CaresPTRResolver::wait_and_process(ares_channel channel)
{
int sockets[ARES_GETSOCK_MAXNUM];
pollfd pollfd[ARES_GETSOCK_MAXNUM];

while (true)
{
auto readable_sockets = get_readable_sockets(sockets, pollfd);
auto timeout = calculate_timeout();
auto readable_sockets = get_readable_sockets(sockets, pollfd, channel);
auto timeout = calculate_timeout(channel);

int number_of_fds_ready = 0;
if (!readable_sockets.empty())
Expand All @@ -158,24 +161,24 @@ namespace DB

if (number_of_fds_ready > 0)
{
process_readable_sockets(readable_sockets);
process_readable_sockets(readable_sockets, channel);
}
else
{
process_possible_timeout();
process_possible_timeout(channel);
break;
}
}

return true;
}

void CaresPTRResolver::cancel_requests()
void CaresPTRResolver::cancel_requests(ares_channel channel)
{
ares_cancel(channel);
}

std::span<pollfd> CaresPTRResolver::get_readable_sockets(int * sockets, pollfd * pollfd)
std::span<pollfd> CaresPTRResolver::get_readable_sockets(int * sockets, pollfd * pollfd, ares_channel channel)
{
int sockets_bitmask = ares_getsock(channel, sockets, ARES_GETSOCK_MAXNUM);

Expand Down Expand Up @@ -205,7 +208,7 @@ namespace DB
return std::span<struct pollfd>(pollfd, number_of_sockets_to_poll);
}

int64_t CaresPTRResolver::calculate_timeout()
int64_t CaresPTRResolver::calculate_timeout(ares_channel channel)
{
timeval tv;
if (auto * tvp = ares_timeout(channel, nullptr, &tv))
Expand All @@ -218,14 +221,14 @@ namespace DB
return 0;
}

void CaresPTRResolver::process_possible_timeout()
void CaresPTRResolver::process_possible_timeout(ares_channel channel)
{
/* Call ares_process() unconditonally here, even if we simply timed out
above, as otherwise the ares name resolve won't timeout! */
ares_process_fd(channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD);
}

void CaresPTRResolver::process_readable_sockets(std::span<pollfd> readable_sockets)
void CaresPTRResolver::process_readable_sockets(std::span<pollfd> readable_sockets, ares_channel channel)
{
for (auto readable_socket : readable_sockets)
{
Expand Down
29 changes: 16 additions & 13 deletions src/Common/CaresPTRResolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,32 +28,35 @@ namespace DB

public:
explicit CaresPTRResolver(provider_token);
~CaresPTRResolver() override;

/*
* Library initialization is currently done only once in the constructor. Multiple instances of CaresPTRResolver
* will be used in the lifetime of ClickHouse, thus it's problematic to have de-init here.
* In a practical view, it makes little to no sense to de-init a DNS library since DNS requests will happen
* until the end of the program. Hence, ares_library_cleanup() will not be called.
* */
~CaresPTRResolver() override = default;

std::unordered_set<std::string> resolve(const std::string & ip) override;

std::unordered_set<std::string> resolve_v6(const std::string & ip) override;

private:
bool wait_and_process();

void cancel_requests();

void resolve(const std::string & ip, std::unordered_set<std::string> & response);
bool wait_and_process(ares_channel channel);

void resolve_v6(const std::string & ip, std::unordered_set<std::string> & response);
void cancel_requests(ares_channel channel);

std::span<pollfd> get_readable_sockets(int * sockets, pollfd * pollfd);
void resolve(const std::string & ip, std::unordered_set<std::string> & response, ares_channel channel);

int64_t calculate_timeout();
void resolve_v6(const std::string & ip, std::unordered_set<std::string> & response, ares_channel channel);

void process_possible_timeout();
std::span<pollfd> get_readable_sockets(int * sockets, pollfd * pollfd, ares_channel channel);

void process_readable_sockets(std::span<pollfd> readable_sockets);
int64_t calculate_timeout(ares_channel channel);

ares_channel channel;
void process_possible_timeout(ares_channel channel);

static std::mutex mutex;
void process_readable_sockets(std::span<pollfd> readable_sockets, ares_channel channel);
};
}

41 changes: 21 additions & 20 deletions src/Common/tests/gtest_dns_reverse_resolve.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,34 +9,35 @@ namespace DB
{
TEST(Common, ReverseDNS)
{
auto addresses = std::vector<std::string>({
"8.8.8.8", "2001:4860:4860::8888", // dns.google
"142.250.219.35", // google.com
"157.240.12.35", // facebook
"208.84.244.116", "2600:1419:c400::214:c410", //www.terra.com.br,
"127.0.0.1", "::1"
});

auto func = [&]()
{
// Good random seed, good engine
auto rnd1 = std::mt19937(std::random_device{}());

for (int i = 0; i < 50; ++i)
for (int i = 0; i < 10; ++i)
{
auto & dns_resolver_instance = DNSResolver::instance();
// unfortunately, DNS cache can't be disabled because we might end up causing a DDoS attack
// dns_resolver_instance.setDisableCacheFlag();

auto addr_index = rnd1() % addresses.size();

[[maybe_unused]] auto result = dns_resolver_instance.reverseResolve(Poco::Net::IPAddress{ addresses[addr_index] });

// will not assert either because some of the IP addresses might change in the future and
// this test will become flaky
// ASSERT_TRUE(!result.empty());
dns_resolver_instance.setDisableCacheFlag();

auto val1 = rnd1() % static_cast<uint32_t>((pow(2, 31) - 1));
auto val2 = rnd1() % static_cast<uint32_t>((pow(2, 31) - 1));
auto val3 = rnd1() % static_cast<uint32_t>((pow(2, 31) - 1));
auto val4 = rnd1() % static_cast<uint32_t>((pow(2, 31) - 1));

uint32_t ipv4_buffer[1] = {
static_cast<uint32_t>(val1)
};

uint32_t ipv6_buffer[4] = {
static_cast<uint32_t>(val1),
static_cast<uint32_t>(val2),
static_cast<uint32_t>(val3),
static_cast<uint32_t>(val4)
};

dns_resolver_instance.reverseResolve(Poco::Net::IPAddress{ ipv4_buffer, sizeof(ipv4_buffer)});
dns_resolver_instance.reverseResolve(Poco::Net::IPAddress{ ipv6_buffer, sizeof(ipv6_buffer)});
}

};

auto number_of_threads = 200u;
Expand Down