Skip to content

Commit bf3104a

Browse files
authored
Merge pull request #454 from WikipediaLibrary/revert-453-revert-431-T370980-render-from-swift
Render aggregates stored in object storage
2 parents 30a5995 + e0b0cf4 commit bf3104a

File tree

6 files changed

+948
-78
lines changed

6 files changed

+948
-78
lines changed

extlinks/aggregates/storage.py

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
import datetime
2+
import gzip
3+
import itertools
4+
import json
5+
import logging
6+
import os
7+
import re
8+
9+
from typing import Callable, Dict, Hashable, Iterable, List, Optional, Set
10+
11+
from django.core.cache import cache
12+
from django.db.models import Q
13+
14+
from extlinks.common.helpers import extract_queryset_filter
15+
from extlinks.common.swift import (
16+
batch_download_files,
17+
get_object_list,
18+
swift_connection,
19+
)
20+
21+
logger = logging.getLogger("django")
22+
23+
DEFAULT_EXPIRATION_SECS = 60 * 60
24+
25+
26+
def get_archive_list(prefix: str, expiration=DEFAULT_EXPIRATION_SECS) -> List[Dict]:
    """
    Gets a list of all available archives in object storage.

    The list is cached under a prefix-specific key for ``expiration``
    seconds so repeated page loads don't hit Swift every time. When Swift
    is not configured (``swift_connection`` raises ``RuntimeError``) an
    empty list is returned instead.
    """
    cache_key = f"{prefix}_archive_list"

    # Serve from cache when a previous call already stored the list.
    cached = cache.get(cache_key)
    if cached:
        return json.loads(cached)

    # Cache miss: fetch the object list from Swift and cache it as JSON.
    try:
        container = os.environ.get("SWIFT_CONTAINER_AGGREGATES", "archive-aggregates")
        archive_list = get_object_list(swift_connection(), container, f"{prefix}_")
        cache.set(cache_key, json.dumps(archive_list), expiration)
    except RuntimeError:
        # Swift is optional so return an empty list if it's not set up.
        return []

    return archive_list
49+
50+
51+
def get_archives(
    archives: Iterable[str], expiration=DEFAULT_EXPIRATION_SECS
) -> Dict[str, bytes]:
    """
    Retrieves the requested archives from object storage or cache.

    Archives already present in the cache are returned directly; only the
    missing ones are downloaded from Swift, then cached for ``expiration``
    seconds.

    :param archives: iterable of archive object names to retrieve.
    :param expiration: cache TTL in seconds for newly downloaded archives.
    :return: mapping of archive name to its raw (gzipped) contents.
    """

    # Retrieve as many of the archives from cache as possible.
    archives = list(archives)
    result = cache.get_many(archives)

    # Identify missing archives that were not available in cache.
    missing = [archive for archive in archives if archive not in result]

    # Download and cache only the missing archives. (Previously the full
    # `archives` list was passed here, re-downloading already-cached
    # objects and resetting their expiration.)
    if missing:
        downloaded_archives = batch_download_files(
            swift_connection(),
            os.environ.get("SWIFT_CONTAINER_AGGREGATES", "archive-aggregates"),
            missing,
        )
        cache.set_many(downloaded_archives, expiration)
        result |= downloaded_archives

    return result
77+
78+
79+
def decode_archive(archive: bytes) -> List[Dict]:
    """
    Decodes a gzipped archive into a list of dictionaries (row records).
    """
    raw_json = gzip.decompress(archive)
    return json.loads(raw_json)
85+
86+
87+
def download_aggregates(
    prefix: str,
    queryset_filter: Q,
    from_date: Optional[datetime.date] = None,
    to_date: Optional[datetime.date] = None,
) -> List[Dict]:
    """
    Find and download archives needed to augment aggregate results from the DB.

    This function tries its best to apply the passed in Django queryset to the
    records it returns. This function supports filtering by collection, user
    list, and date ranges.

    :param prefix: archive filename prefix identifying the aggregate type.
    :param queryset_filter: a Q object; must contain a "collection" filter,
        and may contain "on_user_list", "full_date__gte", "full_date__lte".
    :param from_date: lower date bound; overrides "full_date__gte" if given.
    :param to_date: upper date bound; overrides "full_date__lte" if given.
    :return: flat list of record dicts merged from all matching archives.
    """

    # Flatten the Q object into a plain dict of lookup -> value.
    # NOTE(review): raises KeyError if "collection" is absent — presumably
    # every caller builds the filter with a collection; confirm.
    extracted_filters = extract_queryset_filter(queryset_filter)
    collection_id = extracted_filters["collection"].pk
    on_user_list = extracted_filters.get("on_user_list", False)

    # Explicit arguments win; otherwise fall back to the filter's date
    # lookups, parsing "YYYY-MM-DD" strings into dates when needed.
    if from_date is None:
        from_date = extracted_filters.get("full_date__gte")
    if isinstance(from_date, str):
        from_date = datetime.datetime.strptime(from_date, "%Y-%m-%d").date()

    if to_date is None:
        to_date = extracted_filters.get("full_date__lte")
    if isinstance(to_date, str):
        to_date = datetime.datetime.strptime(to_date, "%Y-%m-%d").date()

    # We're only returning objects that match the following pattern. The
    # archive filenames use the following naming convention:
    #
    # {prefix}_{organisation}_{collection}_{full_date}_{on_user_list}.json.gz
    pattern = (
        rf"^{prefix}_([0-9]+)_([0-9]+)_([0-9]+-[0-9]{{2}}-[0-9]{{2}})_([01])\.json\.gz$"
    )

    # Identify archives that need to be downloaded from object storage
    # because they are not available in the database.
    archives = []
    for archive in get_archive_list(prefix):
        details = re.search(pattern, archive["name"])
        if not details:
            continue

        # Group 2 is the collection id, group 3 the date, group 4 the
        # on-user-list flag ("0"/"1"); group 1 (organisation) is unused here.
        archive_collection_id = int(details.group(2))
        archive_date = datetime.datetime.strptime(details.group(3), "%Y-%m-%d").date()
        archive_on_user_list = bool(int(details.group(4)))

        # Filter out archives that don't match the queryset filter.
        if (
            (archive_collection_id != collection_id)
            or (on_user_list != archive_on_user_list)
            or (to_date and archive_date > to_date)
            or (from_date and archive_date < from_date)
        ):
            continue

        archives.append(archive)

    # Bail out if there's nothing to download.
    if len(archives) == 0:
        return []

    # Download and decompress the archives from object storage.
    # Each archive decodes to Django-serializer-style rows; only the
    # "fields" payload of each row is kept.
    unflattened_records = (
        (record["fields"] for record in decode_archive(contents))
        for contents in get_archives(archive["name"] for archive in archives).values()
    )

    # Each archive has its own records and are grouped together in a
    # two-dimensional array. Merge them all together.
    return list(itertools.chain(*unflattened_records))
159+
160+
161+
def calculate_totals(
    records: Iterable[Dict],
    group_by: Optional[Callable[[Dict], Hashable]] = None,
) -> List[Dict]:
    """
    Calculate the totals of the passed in records.

    Records sharing the same group key (or all records when ``group_by``
    is omitted) have their "total_links_added" and "total_links_removed"
    values summed; each returned record also carries a "links_diff" field
    (added minus removed). Other fields keep the first record's values.
    """
    grouped: Dict[Hashable, Dict] = {}

    for row in records:
        group_key = group_by(row) if group_by else "_default"
        diff = row["total_links_added"] - row["total_links_removed"]

        existing = grouped.get(group_key)
        if existing is None:
            # First record for this key seeds the accumulator.
            merged = row.copy()
            merged["links_diff"] = diff
            grouped[group_key] = merged
        else:
            existing["total_links_added"] += row["total_links_added"]
            existing["total_links_removed"] += row["total_links_removed"]
            existing["links_diff"] += diff

    return list(grouped.values())
187+
188+
189+
def find_unique(
    records: Iterable[Dict],
    group_by: Callable[[Dict], Hashable],
) -> Set[Hashable]:
    """
    Find all distinct values in the given records.
    """
    return {group_by(record) for record in records}

extlinks/common/helpers.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
import calendar
12
from datetime import date, timedelta
23
from itertools import islice
4+
from typing import Any, Dict
35

46
from django.db.models import Avg, Q
57
from django.db.models.functions import TruncMonth
@@ -159,6 +161,28 @@ def build_queryset_filters(form_data, collection_or_organisations):
159161
)
160162

161163

164+
def extract_queryset_filter(queryset_filter: Q, filters=None) -> Dict[str, Any]:
    """
    Extract fields from a queryset filter that works for simple querysets.

    This is used by functions that need to query from multiple different
    datasources with the same filters. In-particular when querying data from
    both the database and object storage.

    NOTE(review): nested Q objects are flattened recursively, so negation
    (~Q) and OR connectors are not distinguished — presumably callers only
    pass simple AND filters; confirm.
    """
    if filters is None:
        filters = {}

    for node in queryset_filter.children:
        if isinstance(node, Q):
            # Recurse into nested Q objects, accumulating into `filters`.
            extract_queryset_filter(node, filters=filters)
        elif isinstance(node, tuple):
            lookup, value = node
            filters[lookup] = value

    return filters
184+
185+
162186
def batch_iterator(iterable, size=1000):
163187
"""
164188
This yields successive batches from an iterable (memory-efficient).
@@ -185,3 +209,11 @@ def batch_iterator(iterable, size=1000):
185209
iterator = iter(iterable)
186210
while batch := list(islice(iterator, size)):
187211
yield batch
212+
213+
214+
def last_day(date: date) -> int:
    """
    Finds the last day of the month for the given date.
    """
    # monthrange returns (weekday_of_first_day, number_of_days).
    _, days_in_month = calendar.monthrange(date.year, date.month)
    return days_in_month

0 commit comments

Comments
 (0)