Skip to content

Commit e687342

Browse files
committed
wip
1 parent 7b09a4e commit e687342

2 files changed

Lines changed: 21 additions & 9 deletions

File tree

core/database_arango.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -590,7 +590,7 @@ def link_to_tag(
590590
edge = json.loads(tag_relationship.model_dump_json())
591591
edge["_id"] = tag_relationship.id
592592
graph.update_edge(edge)
593-
if self._collection_name != "auditlog":
593+
if self._collection_name not in ("auditlog", "timeline"):
594594
try:
595595
event = message.TagEvent(
596596
type=message.EventType.update,
@@ -627,7 +627,7 @@ def link_to_tag(
627627
time.sleep(ASYNC_JOB_WAIT_TIME)
628628
result = job.result()["new"]
629629
result["__id"] = result.pop("_key")
630-
if self._collection_name != "auditlog":
630+
if self._collection_name not in ("auditlog", "timeline"):
631631
try:
632632
event = message.TagEvent(
633633
type=message.EventType.new, tagged_object=self, tag_object=tag_obj
@@ -672,7 +672,7 @@ def clear_tags(self):
672672
time.sleep(ASYNC_JOB_WAIT_TIME)
673673
results = job.result()
674674
for edge in results["edges"]:
675-
if self._collection_name != "auditlog":
675+
if self._collection_name not in ("auditlog", "timeline"):
676676
try:
677677
job = self._db.collection("tagged").get(edge["_id"])
678678
while job.status() != "done":
@@ -739,7 +739,7 @@ def link_to(
739739
job = async_graph.update_edge(edge)
740740
while job.status() != "done":
741741
time.sleep(ASYNC_JOB_WAIT_TIME)
742-
if self._collection_name != "auditlog":
742+
if self._collection_name not in ("auditlog", "timeline"):
743743
try:
744744
event = message.LinkEvent(
745745
type=message.EventType.update,
@@ -773,7 +773,7 @@ def link_to(
773773
result = job.result()["new"]
774774
result["__id"] = result.pop("_key")
775775
relationship = Relationship.load(result)
776-
if self._collection_name != "auditlog":
776+
if self._collection_name not in ("auditlog", "timeline"):
777777
try:
778778
event = message.LinkEvent(
779779
type=message.EventType.new,
@@ -910,7 +910,7 @@ def neighbors(
910910
str,
911911
"observable.ObservableTypes | entity.EntityTypes | indicator.IndicatorTypes | tag.Tag",
912912
],
913-
List[List["Relationship | TagRelationship"]],
913+
List[List["RelationshipTypes"]],
914914
int,
915915
]:
916916
"""Fetches neighbors of the YetiObject.

core/events/consumers.py

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import logging
55
import multiprocessing
66
import os
7+
from collections import defaultdict
78

89
from kombu import Connection, Exchange, Queue
910
from kombu.mixins import ConsumerMixin
@@ -63,6 +64,9 @@ def get_consumers(self, consumer, channel):
6364
consumer(queues=self.queues, callbacks=[self.on_message], accept=["json"])
6465
]
6566

67+
MESSAGE_TYPES_COUNT = defaultdict(int)
68+
ctr = 0
69+
6670

6771
class EventConsumer(Consumer):
6872
def __init__(self, stop_event, connection, queues):
@@ -72,21 +76,29 @@ def debug(self, message, body):
7276
message_digest = hashlib.sha256(body.encode()).hexdigest()
7377
ts = int(message.timestamp.timestamp())
7478
if isinstance(message.event, ObjectEvent):
75-
self.logger.debug(
79+
self.logger.info(
7680
f"Message received at {ts} - digest: {message_digest} | {message.event.event_message}"
7781
)
82+
MESSAGE_TYPES_COUNT[message.event.event_message] += 1
7883
if isinstance(message.event, LinkEvent):
7984
source = message.event.link_source_event
8085
target = message.event.link_target_event
81-
self.logger.debug(
86+
self.logger.info(
8287
f"Message received at {ts} - digest: {message_digest} | {source} --> {target}"
8388
)
89+
MESSAGE_TYPES_COUNT[f"{source}->{target}"] += 1
8490
if isinstance(message.event, TagEvent):
85-
self.logger.debug(
91+
self.logger.info(
8692
f"Message received at {ts} - digest: {message_digest} | {message.event.tag_message}"
8793
)
94+
MESSAGE_TYPES_COUNT[message.event.tag_message] += 1
95+
96+
if ctr % 300 == 0:
97+
for k, v in MESSAGE_TYPES_COUNT.items():
98+
self.logger.info(f"{k}: {v}")
8899

89100
def on_message(self, body, received_message):
101+
90102
try:
91103
message = EventMessage(**json.loads(body))
92104
except Exception:

0 commit comments

Comments
 (0)