Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
version: v1
kind: readable_storage
name: eap_item_co_occurring_attrs_v2

storage:
  key: eap_item_co_occurring_attrs_v2
  set_key: events_analytics_platform

readiness_state: complete

schema:
  columns:
    [
      { name: organization_id, type: UInt, args: { size: 64 } },
      { name: project_id, type: UInt, args: { size: 64 } },
      { name: item_type, type: UInt, args: { size: 8 } },
      # date bucket the co-occurrence rows are aggregated into
      # (the backing MV uses toMonday(timestamp) — see the migration)
      { name: date, type: Date },
      { name: retention_days, type: UInt, args: { size: 16 } },
      # per-value hashes of the distinct attribute names; backs the
      # bloom-filter index (bf_attribute_keys_hash) on the local table
      { name: attribute_keys_hash, type: Array, args: { inner_type: { type: UInt, args: { size: 64 } } } },
      # the three arrays below hold attribute *names* grouped by value type,
      # which is why the float/bool variants are still arrays of String
      { name: attributes_string, type: Array, args: { inner_type: { type: String } } },
      { name: attributes_float, type: Array, args: { inner_type: { type: String } } },
      { name: attributes_bool, type: Array, args: { inner_type: { type: String } } },
      # summed on merge by the SummingMergeTree engine
      { name: count, type: UInt, args: { size: 64 } },
    ]
  local_table_name: eap_item_co_occurring_attrs_2_local
  dist_table_name: eap_item_co_occurring_attrs_2_dist
  partition_format: [retention_days, date]

allocation_policies: []
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
from typing import List, Sequence

from snuba.clusters.storage_sets import StorageSetKey
from snuba.datasets.storages.tags_hash_map import get_array_vals_hash
from snuba.migrations import migration, operations, table_engines
from snuba.migrations.columns import MigrationModifiers as Modifiers
from snuba.migrations.operations import OperationTarget, SqlOperation
from snuba.utils.schemas import Array, Column, Date, String, UInt

# Number of attributes_string_N / attributes_float_N map buckets on the
# source table (eap_items) that the materialized view flattens.
num_attr_buckets = 40

# Schema of the co-occurring-attributes table.  Each row records the full set
# of attribute names present on one item, keyed so that identical sets within
# a (org, project, date, item_type) group collapse into a single summed row.
columns: List[Column[Modifiers]] = [
    Column("organization_id", UInt(64)),
    Column("project_id", UInt(64)),
    Column("item_type", UInt(8)),
    # Date bucket; DoubleDelta + ZSTD compress the mostly-monotonic values.
    Column("date", Date(Modifiers(codecs=["DoubleDelta", "ZSTD(1)"]))),
    Column(
        "retention_days",
        UInt(16),
    ),
    # Per-value hashes of the distinct attribute names, materialized so a
    # bloom-filter index can prune granules for "has attribute X" lookups.
    Column(
        "attribute_keys_hash",
        Array(
            UInt(64),
            Modifiers(
                materialized=get_array_vals_hash(
                    "arrayDistinct(arrayConcat(attributes_string, attributes_float, attributes_bool))"
                )
            ),
        ),
    ),
    # NOTE: these arrays hold attribute *names* split by value type, hence
    # Array(String) even for the float/bool variants.
    Column("attributes_string", Array(String())),
    Column("attributes_float", Array(String())),
    Column("attributes_bool", Array(String())),
    # a hash of all the attribute keys of the item in sorted order
    # this lets us deduplicate rows with merges
    Column(
        "key_hash",
        UInt(
            64,
            Modifiers(
                materialized="cityHash64(arraySort(arrayDistinct(arrayConcat(attributes_string, attributes_float, attributes_bool))))"
            ),
        ),
    ),
    # Always inserted as 1 by the MV; summed on merge by SummingMergeTree.
    Column("count", UInt(64)),
]

# Comma-separated mapKeys(...) expressions, one per attribute bucket on the
# source table, used to flatten the bucketed maps into name arrays.
_attr_str_names = ", ".join(
    f"mapKeys(attributes_string_{bucket})" for bucket in range(num_attr_buckets)
)
_attr_num_names = ", ".join(
    f"mapKeys(attributes_float_{bucket})" for bucket in range(num_attr_buckets)
)

# Query backing the materialized view: one row per source item with the
# item's attribute names flattened out of the bucketed maps.  count is a
# constant 1; the destination SummingMergeTree aggregates it on merge.
MV_QUERY = f"""
SELECT
organization_id AS organization_id,
project_id AS project_id,
item_type as item_type,
toMonday(timestamp) AS date,
retention_days as retention_days,
arrayConcat({_attr_str_names}) AS attributes_string,
mapKeys(attributes_bool) AS attributes_bool,
arrayConcat({_attr_num_names}) AS attributes_float,
1 AS count
FROM eap_items_1_local
"""


class Migration(migration.ClickhouseNodeMigration):
    """Creates the v2 co-occurring-attributes storage for EAP items.

    Forward ops create, in order: a SummingMergeTree local table whose rows
    with equal (org, project, date, item_type, key_hash) are summed on merge,
    its Distributed counterpart, a bloom-filter index over
    ``attribute_keys_hash``, and a materialized view feeding the local table
    from ``eap_items_1_local`` via ``MV_QUERY``.
    """

    blocking = False
    storage_set_key = StorageSetKey.EVENTS_ANALYTICS_PLATFORM

    local_table_name = "eap_item_co_occurring_attrs_2_local"
    dist_table_name = "eap_item_co_occurring_attrs_2_dist"
    # NOTE(review): the view is versioned "_3_" while both tables are "_2_" —
    # presumably a deliberate re-creation of just the view; confirm against
    # earlier migrations in this storage set.
    mv_name = "eap_item_co_occurring_attrs_3_mv"

    def forwards_ops(self) -> Sequence[SqlOperation]:
        """Create the tables first, then the index, then the MV (so the view
        never targets a table that does not exist yet)."""
        create_table_ops = [
            operations.CreateTable(
                storage_set=self.storage_set_key,
                table_name=self.local_table_name,
                engine=table_engines.SummingMergeTree(
                    storage_set=self.storage_set_key,
                    # key_hash is in the key, so identical attribute-name sets
                    # collapse into one row whose `count` is summed on merge
                    primary_key="(organization_id, project_id, date, item_type, key_hash)",
                    order_by="(organization_id, project_id, date, item_type, key_hash, retention_days)",
                    partition_by="(retention_days, toMonday(date))",
                    # rows expire `retention_days` after their date bucket
                    ttl="date + toIntervalDay(retention_days)",
                ),
                columns=columns,
                target=OperationTarget.LOCAL,
            ),
            operations.CreateTable(
                storage_set=self.storage_set_key,
                table_name=self.dist_table_name,
                engine=table_engines.Distributed(
                    local_table_name=self.local_table_name,
                    sharding_key=None,
                ),
                columns=columns,
                target=OperationTarget.DISTRIBUTED,
            ),
        ]

        index_ops = [
            # Bloom-filter skip index on the per-value name hashes so
            # "contains attribute X" queries can prune granules.
            operations.AddIndex(
                storage_set=self.storage_set_key,
                table_name=self.local_table_name,
                index_name="bf_attribute_keys_hash",
                index_expression="attribute_keys_hash",
                index_type="bloom_filter",
                granularity=1,
                target=operations.OperationTarget.LOCAL,
            ),
        ]

        materialized_view_ops: list[SqlOperation] = [
            operations.CreateMaterializedView(
                storage_set=self.storage_set_key,
                view_name=self.mv_name,
                columns=columns,
                destination_table_name=self.local_table_name,
                target=OperationTarget.LOCAL,
                query=MV_QUERY,
            ),
        ]

        return create_table_ops + index_ops + materialized_view_ops

    def backwards_ops(self) -> Sequence[SqlOperation]:
        """Drop everything forwards_ops created, view first (so nothing
        writes to the tables while they are being dropped)."""
        return [
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.mv_name,
                target=OperationTarget.LOCAL,
            ),
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.dist_table_name,
                target=OperationTarget.DISTRIBUTED,
            ),
            operations.DropTable(
                storage_set=self.storage_set_key,
                table_name=self.local_table_name,
                target=OperationTarget.LOCAL,
            ),
        ]
16 changes: 10 additions & 6 deletions snuba/web/rpc/v1/endpoint_trace_item_attribute_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
TraceItemFilter,
)

from snuba import state
from snuba.attribution.appid import AppID
from snuba.attribution.attribution_info import AttributionInfo
from snuba.datasets.pluggable_dataset import PluggableDataset
Expand Down Expand Up @@ -54,6 +55,10 @@
NON_STORED_ATTRIBUTE_KEYS = ["sentry.service"]


def _use_v2_co_occurring_attrs() -> bool:
    """Runtime flag: route co-occurring-attribute queries to the v2 storage."""
    flag_value = state.get_config("use_co_occurring_attrs_v2", 0)
    return flag_value == 1


class AttributeKeyCollector(ProtoVisitor):
def __init__(self) -> None:
self.keys: set[str] = set()
Expand Down Expand Up @@ -161,7 +166,10 @@ def get_co_occurring_attributes(
collector = AttributeKeyCollector()
TraceItemFilterWrapper(request.intersecting_attributes_filter).accept(collector)
attribute_keys_to_search = collector.keys
storage_key = StorageKey("eap_item_co_occurring_attrs")
if _use_v2_co_occurring_attrs():
storage_key = StorageKey("eap_item_co_occurring_attrs_v2")
else:
storage_key = StorageKey("eap_item_co_occurring_attrs")

storage = Storage(
key=storage_key,
Expand Down Expand Up @@ -305,11 +313,7 @@ def convert_co_occurring_results_to_attributes(
query_res: QueryResult,
) -> list[TraceItemAttributeNamesResponse.Attribute]:
def t(row: Row) -> TraceItemAttributeNamesResponse.Attribute:
# our query to snuba only selected 1 column, attr_key
# so the result should only have 1 item per row
vals = row.values()
assert len(vals) == 1
attr_type, attr_name = list(vals)[0]
attr_type, attr_name = row["attr_key"]
assert isinstance(attr_type, str)
return TraceItemAttributeNamesResponse.Attribute(
name=attr_name, type=getattr(AttributeKey.Type, attr_type)
Expand Down
71 changes: 71 additions & 0 deletions tests/web/rpc/v1/test_endpoint_trace_item_attribute_names.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from sentry_protos.snuba.v1.trace_item_filter_pb2 import ExistsFilter, TraceItemFilter
from sentry_protos.snuba.v1.trace_item_pb2 import AnyValue

from snuba import state
from snuba.datasets.storages.factory import get_storage
from snuba.datasets.storages.storage_key import StorageKey
from snuba.web.rpc.v1.endpoint_trace_item_attribute_names import (
Expand Down Expand Up @@ -236,6 +237,76 @@ def test_basic_co_occurring_attrs(self) -> None:
]
assert res.attributes == expected

def test_basic_with_v2_storage(self) -> None:
state.set_config("use_co_occurring_attrs_v2", 1)
try:
req = TraceItemAttributeNamesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(
seconds=int((BASE_TIME - timedelta(days=1)).timestamp())
),
end_timestamp=Timestamp(
seconds=int((BASE_TIME + timedelta(days=1)).timestamp())
),
),
limit=TOTAL_GENERATED_ATTR_PER_TYPE,
type=AttributeKey.Type.TYPE_STRING,
value_substring_match="a_tag",
)
res = EndpointTraceItemAttributeNames().execute(req)
expected = []
for i in range(TOTAL_GENERATED_ATTR_PER_TYPE):
expected.append(
TraceItemAttributeNamesResponse.Attribute(
name=f"a_tag_{str(i).zfill(3)}", type=AttributeKey.Type.TYPE_STRING
)
)
assert res.attributes == expected
finally:
state.set_config("use_co_occurring_attrs_v2", 0)

def test_v2_storage_with_co_occurring_filter(self) -> None:
state.set_config("use_co_occurring_attrs_v2", 1)
try:
req = TraceItemAttributeNamesRequest(
meta=RequestMeta(
project_ids=[1, 2, 3],
organization_id=1,
cogs_category="something",
referrer="something",
start_timestamp=Timestamp(
seconds=int((BASE_TIME - timedelta(days=1)).timestamp())
),
end_timestamp=Timestamp(
seconds=int((BASE_TIME + timedelta(days=1)).timestamp())
),
),
limit=TOTAL_GENERATED_ATTR_PER_TYPE,
intersecting_attributes_filter=TraceItemFilter(
exists_filter=ExistsFilter(
key=AttributeKey(type=AttributeKey.TYPE_STRING, name="a_tag_000")
)
),
value_substring_match="000",
type=AttributeKey.Type.TYPE_STRING,
)
res = EndpointTraceItemAttributeNames().execute(req)
expected = [
TraceItemAttributeNamesResponse.Attribute(
name="a_tag_000", type=AttributeKey.Type.TYPE_STRING
),
TraceItemAttributeNamesResponse.Attribute(
name="c_tag_000", type=AttributeKey.Type.TYPE_STRING
),
]
assert res.attributes == expected
finally:
state.set_config("use_co_occurring_attrs_v2", 0)

def test_simple_boolean(self) -> None:
req = TraceItemAttributeNamesRequest(
meta=RequestMeta(
Expand Down
Loading