Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 19 additions & 24 deletions src/app/statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,9 @@
from django.apps import apps
from django.db import models
from django.db.models import (
Count,
Prefetch,
Q,
)
from django.db.models.functions import TruncDate
from django.utils import timezone

from app import media_type_config
Expand Down Expand Up @@ -514,36 +512,33 @@ def get_level(count):


def get_filtered_historical_data(start_date, end_date, user):
"""Get historical data filtered by date range."""
"""Return [{"date": datetime.date, "count": int}]."""
historical_models = BasicMedia.objects.get_historical_models()
combined_data = []
local_timezone = timezone.get_current_timezone()
local_tz = timezone.get_current_timezone()

day_buckets = defaultdict(int)

for model_name in historical_models:
historical_model = apps.get_model("app", model_name)
model = apps.get_model("app", model_name)

# Start with base query
query = historical_model.objects.filter(
history_user_id=user,
)
qs = model.objects.filter(history_user_id=user)

# Add date filters conditionally
if start_date is not None:
query = query.filter(history_date__date__gte=start_date)
if end_date is not None:
query = query.filter(history_date__date__lte=end_date)
if start_date:
qs = qs.filter(history_date__gte=start_date)
if end_date:
qs = qs.filter(history_date__lte=end_date)

# Annotate and aggregate
data = (
query.annotate(
date=TruncDate("history_date", tzinfo=local_timezone),
)
.values("date")
.annotate(count=Count("id"))
)
# We only need the timestamp, stream results to keep memory usage flat
for ts in qs.values_list("history_date", flat=True).iterator(chunk_size=2_000):
aware_ts = timezone.localtime(ts, local_tz)

day_buckets[aware_ts.date()] += 1

combined_data.extend(data)
combined_data = [
{"date": day, "count": count} for day, count in day_buckets.items()
]

logger.info("%s - built historical data (%s rows)", user, len(combined_data))
return combined_data


Expand Down
129 changes: 35 additions & 94 deletions src/app/tests/test_statistics.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from unittest.mock import MagicMock, patch

from django.contrib.auth import get_user_model
from django.db.models import Count
from django.test import TestCase

from app import statistics
Expand Down Expand Up @@ -819,109 +818,51 @@ def test_get_activity_data(self, mock_get_filtered_data):

@patch("app.statistics.BasicMedia.objects.get_historical_models")
@patch("app.statistics.apps.get_model")
def test_get_filtered_historical_data(
self,
mock_get_model,
mock_get_historical_models,
):
def test_get_filtered_historical_data(self, mock_get_model, mock_get_hist_models):
"""Test the get_filtered_historical_data function."""
# Setup test dates
start_date = datetime.datetime(2025, 1, 1, 0, 0, tzinfo=datetime.UTC)
end_date = datetime.datetime(2025, 3, 31, 0, 0, tzinfo=datetime.UTC)

# Mock historical models list
mock_get_historical_models.return_value = [
"historicalmodel1",
"historicalmodel2",
start = datetime.datetime(2025, 1, 1, tzinfo=datetime.UTC)
end = datetime.datetime(2025, 3, 31, tzinfo=datetime.UTC)

mock_get_hist_models.return_value = ["historicalmodel1", "historicalmodel2"]

def build_fake_model(timestamps):
qs = MagicMock()
qs.filter.return_value = qs
qs.values_list.return_value.iterator.return_value = timestamps
model = MagicMock()
model.objects = qs
return model

model1_ts = [
datetime.datetime(2025, 1, 5, 12, tzinfo=datetime.UTC),
datetime.datetime(2025, 1, 5, 18, tzinfo=datetime.UTC),
datetime.datetime(2025, 1, 10, 9, tzinfo=datetime.UTC),
datetime.datetime(2025, 1, 10, 10, tzinfo=datetime.UTC),
datetime.datetime(2025, 1, 10, 11, tzinfo=datetime.UTC),
]

# Create mock historical data for first model
mock_historical_model1 = MagicMock()
user_chain1 = mock_historical_model1.objects.filter.return_value
start_date_chain1 = user_chain1.filter.return_value
end_date_chain1 = start_date_chain1.filter.return_value
annotate_chain1 = end_date_chain1.annotate.return_value
values_chain1 = annotate_chain1.values.return_value
values_chain1.annotate.return_value = [
{"date": datetime.date(2025, 1, 5), "count": 3},
{"date": datetime.date(2025, 1, 10), "count": 2},
]

# Create mock historical data for second model
mock_historical_model2 = MagicMock()
user_chain2 = mock_historical_model2.objects.filter.return_value
start_date_chain2 = user_chain2.filter.return_value
end_date_chain2 = start_date_chain2.filter.return_value
annotate_chain2 = end_date_chain2.annotate.return_value
values_chain2 = annotate_chain2.values.return_value
values_chain2.annotate.return_value = [
{"date": datetime.date(2025, 2, 15), "count": 1},
{"date": datetime.date(2025, 3, 20), "count": 4},
model2_ts = [
datetime.datetime(2025, 2, 15, 8, tzinfo=datetime.UTC),
datetime.datetime(2025, 3, 20, 17, tzinfo=datetime.UTC),
datetime.datetime(2025, 3, 20, 18, tzinfo=datetime.UTC),
datetime.datetime(2025, 3, 20, 19, tzinfo=datetime.UTC),
datetime.datetime(2025, 3, 20, 20, tzinfo=datetime.UTC),
]

# Setup the get_model mock to return different models based on input
def side_effect(_, model_name):
if model_name == "historicalmodel1":
return mock_historical_model1
if model_name == "historicalmodel2":
return mock_historical_model2
return MagicMock()

mock_get_model.side_effect = side_effect

# Call the function
result = statistics.get_filtered_historical_data(
start_date,
end_date,
self.user,
fake_model1 = build_fake_model(model1_ts)
fake_model2 = build_fake_model(model2_ts)
mock_get_model.side_effect = lambda _, name: (
fake_model1 if name == "historicalmodel1" else fake_model2
)

# Verify results
self.assertEqual(len(result), 4) # Should have 4 date entries
result = statistics.get_filtered_historical_data(start, end, self.user)

# Check that the data from both models is combined
expected_data = [
{"date": datetime.date(2025, 1, 5), "count": 3},
{"date": datetime.date(2025, 1, 10), "count": 2},
expected = [
{"date": datetime.date(2025, 1, 5), "count": 2},
{"date": datetime.date(2025, 1, 10), "count": 3},
{"date": datetime.date(2025, 2, 15), "count": 1},
{"date": datetime.date(2025, 3, 20), "count": 4},
]

# Check that all expected data is in the result
for item in expected_data:
self.assertIn(item, result)

# Verify the filter calls were made correctly
for model_mock in [mock_historical_model1, mock_historical_model2]:
# Check first filter call (history_user_id)
first_filter_kwargs = model_mock.objects.filter.call_args[1]
self.assertEqual(first_filter_kwargs["history_user_id"], self.user)

# Check second filter call (start_date)
user_chain = model_mock.objects.filter.return_value
start_date_filter_kwargs = user_chain.filter.call_args[1]
self.assertEqual(
start_date_filter_kwargs["history_date__date__gte"],
start_date,
)

# Check third filter call (end_date)
start_date_chain = user_chain.filter.return_value
end_date_filter_kwargs = start_date_chain.filter.call_args[1]
self.assertEqual(
end_date_filter_kwargs["history_date__date__lte"],
end_date,
)

# Verify the annotation and values calls
end_date_chain = start_date_chain.filter.return_value
end_date_chain.annotate.assert_called_once()

annotate_chain = end_date_chain.annotate.return_value
annotate_chain.values.assert_called_once_with("date")

values_chain = annotate_chain.values.return_value
values_chain.annotate.assert_called_once_with(count=Count("id"))
self.assertCountEqual(result, expected)

def test_calculate_day_of_week_stats(self):
"""Test the calculate_day_of_week_stats function."""
Expand Down