Skip to content
Merged
56 changes: 53 additions & 3 deletions gallery_dl/extractor/tiktok.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from .. import text, util, ytdl, exception
import functools
import itertools
import hashlib
import base64
import random
import time

Expand Down Expand Up @@ -119,6 +121,8 @@ def _sanitize_url(self, url):
def _extract_rehydration_data(self, url, additional_keys=[], *,
has_keys=[]):
tries = 0
html = None
challenge_attempt = False
while True:
try:
response = self.request(url)
Expand All @@ -139,15 +143,31 @@ def _extract_rehydration_data(self, url, additional_keys=[], *,
return data
except (ValueError, KeyError):
# We failed to retrieve rehydration data. This happens
# relatively frequently when making many requests, so
# retry.
# relatively frequently when making many requests, so retry.
if tries >= self._retries:
raise
tries += 1
self.log.warning("%s: Failed to retrieve rehydration data "
"(%s/%s)", url.rpartition("/")[2], tries,
self._retries)
self.sleep(self._timeout, "retry")
if challenge_attempt:
self.sleep(self._timeout, "retry")
challenge_attempt = False
else:
self.log.info("Solving JavaScript challenge")
try:
self._solve_challenge(html)
except Exception as exc:
self.log.traceback(exc)
self.log.warning(
"%s: Failed to solve JavaScript challenge. If you "
"keep encountering this issue, please try again "
"with the --write-pages option and include the "
"resulting page in your bug report",
url.rpartition("/")[2])
self.sleep(self._timeout, "retry")
html = None
challenge_attempt = True

def _extract_rehydration_data_user(self, profile_url, additional_keys=()):
if profile_url in self.rehydration_data_cache:
Expand Down Expand Up @@ -181,6 +201,36 @@ def _ensure_rehydration_data_app_context_cache_is_populated(self):
self._extract_rehydration_data_user(
"https://www.tiktok.com/", ["webapp.app-context"])

def _solve_challenge(self, html):
    """Solve TikTok's JavaScript proof-of-work challenge.

    The challenge page embeds base64-encoded JSON parameters in the
    'class' attribute of an element with id="cs". The challenge is a
    brute-force search for a nonce i such that
    sha256(base64decode(c["v"]["a"]) + str(i)) equals the expected
    digest base64decode(c["v"]["c"]). The solved nonce is stored in a
    cookie so subsequent requests pass the check.

    html  -- HTML text of the challenge page
    Raises exception.ExtractionError when no matching nonce is found
    within 1,000,000 attempts.
    """
    # challenge parameters: base64-encoded JSON hidden in the
    # 'class' attribute of the id="cs" element
    # (appending '==' guarantees sufficient base64 padding;
    #  extra trailing padding is ignored by b64decode)
    cs = text.extr(text.extr(html, 'id="cs"', '>'), 'class="', '"')
    cs = base64.b64decode(cs + '==', validate=False).decode()
    c = util.json_loads(cs)

    expected = base64.b64decode(c["v"]["c"] + '==', validate=False)
    # pre-hash the fixed prefix once; each candidate only hashes the
    # nonce suffix via a cheap .copy() + .update()
    base = hashlib.sha256(base64.b64decode(
        c["v"]["a"] + '==', validate=False))

    for i in range(1_000_000):
        test = base.copy()
        test.update(str(i).encode())
        if test.digest() == expected:
            break
    else:
        raise exception.ExtractionError("failed to find matching digest")

    # extract cookie names, likewise hidden in 'class' attributes
    wci = text.extr(text.extr(html, 'id="wci"', '>'), 'class="', '"')
    rci = text.extr(text.extr(html, 'id="rci"', '>'), 'class="', '"')
    rs = text.extr(text.extr(html, 'id="rs"', '>'), 'class="', '"')

    # report the solved nonce back inside the challenge object
    c["d"] = base64.b64encode(str(i).encode()).decode()
    self.cookies.set(wci, base64.b64encode(
        util.json_dumps(c).encode()).decode())
    if rs:
        self.cookies.set(rci, rs)
    # NOTE(review): this sets a cookie literally named "Max-Age" rather
    # than a Max-Age attribute on the previous cookie — looks
    # intentional (mimicking the site's JS?) but worth confirming
    self.cookies.set("Max-Age", "1")
    return

def _extract_sec_uid(self, profile_url, user_name):
sec_uid = self._extract_id(
profile_url, user_name, r"MS4wLjABAAAA[\w-]{64}", "secUid")
Expand Down