
[Proposal] Add published segment cache in broker #6834

@surekhasaharan

Description


Problem

Some sys.segments queries are slow, taking as long as ~10-20 seconds, which is not desirable. The cause of this slowness is a call from the broker to a coordinator API that happens every time a query is issued against the sys.segments table: the getMetaDataSegments method (which invokes the coordinator API /druid/coordinator/v1/metadata/segments) is called from SegmentsTable#scan() in SystemSchema.java. The coordinator can potentially return millions of segments, and most of the time is spent parsing the JSON response and creating DataSegment objects.

Motivation

It would be useful to make these queries faster, as they are used interactively by end users today. In the future, a unified Druid console can be built on top of the sys tables (#6832), and the new segment locking can also benefit from having all used segments present in the broker.

Proposed Changes

To fix this performance bottleneck, the plan is to add:

  1. a published segment cache in the broker (phase 1)
  2. a new API in the coordinator (phase 2)

Phase 1

To speed up sys.segments queries, phase 1 adds a published segments cache in the broker. The broker already maintains a cache of all available segments via the BrokerServerView, since brokers cache segments announced by historicals and realtime tasks, but not segments from the metadata store (published segments are cached in the coordinator only). This phase uses a pull model: the broker polls the coordinator for the latest published segments and updates its cache periodically in a background thread.
A potential issue is memory pressure on the broker if the number of published segments is large. To minimize this, a DataSegment instance should be shared between the “available” and “published” segment collections in the broker. DataSegment already uses Interners, so we can keep interned DataSegment objects in both collections, which prevents memory bloat by using the same reference in both places. Roughly, extra memory would then only be required for DataSegment objects that are “published but unavailable”, which ideally should be close to 0 segments.
Another point to consider is whether a broker is configured to watch particular tiers or datasources. If these broker configs are enabled, extra filtering would be done on watchedDataSources in the broker's published segment cache. There is no need to filter on watchedTiers, because we can't tell the difference between published segments that are unavailable and published segments that are served by tiers not in watchedTiers. This would be documented in the sys table docs.

Implementation details:

A new class MetadataSegmentView would be added which maintains the published segments cache in memory. The broker will keep a single DataSegment object on heap when a segment appears in both the published and available segment collections, via "interned" objects. BrokerServerView maintains the available segments via the DataSegment in ServerSelector, and the AtomicReference for that DataSegment will store an interned object. On deserialization of the JSON stream of published segments from the coordinator, each DataSegment object is interned with the same interner (used in ServerSelector) and cached in the published segments collection.
For the filter on watchedDataSources: if BrokerSegmentWatcherConfig#watchedDataSources is not null, then segments whose dataSource is not in watchedDataSources would be filtered out and not stored in the broker's published segment cache. A rough sketch of how this could fit together is shown below.
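As an illustration only (the fetchPublishedSegments() helper, the poll wiring, and the import paths below are assumptions made for the sketch, not the final implementation), the cache class could look roughly like this:

```java
// Illustrative sketch of a broker-side published segments cache with interning and
// datasource filtering; names and wiring are placeholders, not the final design.
import com.google.common.collect.Interner;
import com.google.common.collect.Interners;
import org.apache.druid.timeline.DataSegment;

import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class MetadataSegmentView
{
  // Ideally shared with ServerSelector so published and available segments reuse the same instances.
  private static final Interner<DataSegment> SEGMENT_INTERNER = Interners.newWeakInterner();

  private final Set<DataSegment> publishedSegments = ConcurrentHashMap.newKeySet();
  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  private final Set<String> watchedDataSources; // null means "watch everything"

  public MetadataSegmentView(Set<String> watchedDataSources)
  {
    this.watchedDataSources = watchedDataSources;
  }

  public void start(long pollPeriodSeconds)
  {
    exec.scheduleWithFixedDelay(this::poll, 0, pollPeriodSeconds, TimeUnit.SECONDS);
  }

  private void poll()
  {
    // Would call /druid/coordinator/v1/metadata/segments and deserialize the JSON stream;
    // the HTTP client code is omitted from this sketch.
    List<DataSegment> fetched = fetchPublishedSegments();

    publishedSegments.clear();
    for (DataSegment segment : fetched) {
      // Drop segments for datasources the broker is not configured to watch.
      if (watchedDataSources != null && !watchedDataSources.contains(segment.getDataSource())) {
        continue;
      }
      // Interning means a segment that is also "available" shares one object on the heap.
      publishedSegments.add(SEGMENT_INTERNER.intern(segment));
    }
  }

  private List<DataSegment> fetchPublishedSegments()
  {
    throw new UnsupportedOperationException("coordinator call omitted from this sketch");
  }
}
```

Since the interner is weak, a DataSegment instance can be garbage collected once neither the published nor the available collection references it anymore.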

Phase 2

In phase 2 of this improvement, a more efficient coordinator API should be added. There are several ways to add this new coordinator API; see the rejected alternatives for other options considered.

This API returns a delta of added/removed segments and takes a timestamp as an argument. When a broker comes up, it gets all the published segments from the coordinator, orders the received segments by timestamp (created_date), saves the published segments in its cache, and keeps track of the last received segment's timestamp. Subsequent calls to the coordinator API only return the segments that have been added or removed since that timestamp. The broker polls the coordinator API at a regular interval in a background thread to keep the published segment cache synced.

The "added_segments" delta can be computed based on created_date; additional work is required to compute the "deleted_segments" delta. The coordinator will need to maintain an in-memory list of deleted segments and will need to be notified when a segment gets killed externally to the coordinator (unless this behavior is changed as suggested in #6816). Since the count of deleted segments can grow, to avoid memory pressure the coordinator can remember an hour (or some other configurable value) of deleted segments. If the requested timestamp is older than an hour, all the published segments can be resynced; similarly, on coordinator restart or leader change, the coordinator can send all the published segments. A sketch of how the broker side could apply such a delta is shown below.
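A minimal sketch of the broker-side bookkeeping, assuming a delta payload with added/removed lists and a full-resync flag (none of these names or shapes are settled; they are illustrative only):

```java
// Sketch only; the SegmentsDelta payload shape and field names are assumptions.
import org.apache.druid.timeline.DataSegment;
import org.joda.time.DateTime;

import java.util.List;
import java.util.Set;

class SegmentsDelta
{
  List<DataSegment> added;    // segments whose created_date is after the requested timestamp
  List<DataSegment> removed;  // segments the coordinator remembers deleting since that timestamp
  boolean fullResync;         // set when the timestamp is too old or the coordinator restarted / changed leader
}

class PublishedSegmentSync
{
  private final Set<DataSegment> publishedSegments;
  private DateTime lastSyncTime; // created_date of the newest segment seen so far

  PublishedSegmentSync(Set<DataSegment> publishedSegments, DateTime initialSyncTime)
  {
    this.publishedSegments = publishedSegments;
    this.lastSyncTime = initialSyncTime;
  }

  // Called from the background poll after requesting the delta for lastSyncTime.
  void applyDelta(SegmentsDelta delta, DateTime newSyncTime)
  {
    if (delta.fullResync) {
      // Coordinator sent the full snapshot; drop the old view and rebuild it.
      publishedSegments.clear();
    } else {
      publishedSegments.removeAll(delta.removed);
    }
    publishedSegments.addAll(delta.added);
    lastSyncTime = newSyncTime;
  }
}
```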

New or Changed Public Interface

A new REST endpoint will be added to the coordinator:
GET /druid/coordinator/v1/metadata/segments/{timestamp}

Add a timestamp field to the DataSegment object, representing the created_date from the druid_segments table in the metadata store.
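For illustration, the coordinator side could expose this endpoint roughly as below; the resource class name and the placeholder response are assumptions, not the actual implementation:

```java
// Hypothetical JAX-RS sketch of the new coordinator endpoint; class name and payload are assumptions.
import java.util.Collections;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;

@Path("/druid/coordinator/v1/metadata")
public class MetadataSegmentsDeltaResource
{
  @GET
  @Path("/segments/{timestamp}")
  @Produces(MediaType.APPLICATION_JSON)
  public Response getSegmentsSince(@PathParam("timestamp") String timestamp)
  {
    // Would return segments with created_date > timestamp plus the in-memory list of
    // segments deleted since then; returns an empty payload in this placeholder sketch.
    return Response.ok(Collections.emptyList()).build();
  }
}
```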

Rejected Alternatives:

These options were also considered for the coordinator API:

  1. The coordinator sends just the IDs of the published segments instead of complete serialized DataSegment objects; the broker then does a diff to find the segments which are not available, and makes another call to get details for those segments. This approach was rejected because the segment ID list can sometimes be quite large, which can cause a lot of network traffic between the coordinator and broker processes, and we may not achieve the performance improvement we are looking for.
  2. Add a new table “druid_transactionlogs” to the metadata store, which keeps track of segment additions and removals. The coordinator API can then query this table when it receives a GET request from the broker for any timestamp, and it can also query it to maintain its own cache. For example:

     operation   segment_id   timestamp
     add         s1           ts_0
     disable     s2           ts_1
     delete      s3           ts_1

It can use write-ahead logging to handle failures/restarts in any process. While this approach is good for maintaining consistency between the coordinator and broker caches, as well as for fault tolerance, it may not give the speed improvement if we invoke a database call on each API invocation. Another challenge would be keeping the druid_segments and druid_transactionlogs tables in sync. Unless we need this for broader use cases, it may not be worth the extra design and effort.
