Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions hparams/hparams.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,8 @@
"eval_lr_factor": 0.2,
"openskill_beta": 7,
"openskill_tau": 0.1,
"checkpoint_init_version": "2.1.18",
"checkpoint_init_window": 62834,
"checkpoint_init_version": "2.1.24",
"checkpoint_init_window": 63571,
"num_evaluation_bins": 5,
"quantization_bins": 4,
"quantization_range": 6,
Expand Down
2 changes: 1 addition & 1 deletion src/tplr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
# mypy: ignore-errors
# type: ignore

__version__ = "2.1.24"
__version__ = "2.1.25"

# Import package.
from .chain import *
Expand Down
49 changes: 43 additions & 6 deletions src/tplr/comms.py
Original file line number Diff line number Diff line change
Expand Up @@ -1095,13 +1095,39 @@ async def download_chunk_streaming(chunk_number: int, max_retries: int = 3):

except asyncio.CancelledError:
raise
except Exception as e:
tplr.logger.error(
f"Error downloading chunk {chunk_number} (attempt {attempt + 1}/{max_retries}): {e}"
except asyncio.TimeoutError:
tplr.logger.warning(
f"Timeout downloading chunk {chunk_number} (attempt {attempt + 1}/{max_retries}) - "
f"possible rate limiting or slow network"
)
if attempt == max_retries - 1:
raise
await asyncio.sleep((2**attempt) + random.uniform(0, 1))
except Exception as e:
# Heuristic check for rate-limiting / service-overload indicators:
# a case-insensitive substring match against the exception text.
error_str = str(e).lower()
if any(
indicator in error_str
for indicator in [
"429",
"throttl",
"slow down",
"rate limit",
"503",
"service unavailable",
"too many",
]
):
tplr.logger.error(
f"RATE LIMITING DETECTED downloading chunk {chunk_number}: {e}"
)
else:
tplr.logger.error(
f"Error downloading chunk {chunk_number} (attempt {attempt + 1}/{max_retries}): {e}"
)
if attempt == max_retries - 1:
raise
await asyncio.sleep((2**attempt) + random.uniform(0, 1))

try:
# Download remaining chunks
Expand Down Expand Up @@ -1700,9 +1726,20 @@ async def gather(

try:
download_start = tplr.T()
batch_responses = await asyncio.gather(
*batch_tasks, return_exceptions=True
)
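# Overall timeout for all downloads (the batch_tasks awaited together): 10 minutes max.
# This prevents indefinite hangs from rate limiting or network issues; when the
# timeout expires, an error is logged and None is returned.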
gather_timeout = 600 # 10 minutes
try:
batch_responses = await asyncio.wait_for(
asyncio.gather(*batch_tasks, return_exceptions=True),
timeout=gather_timeout,
)
except asyncio.TimeoutError:
tplr.logger.error(
f"Gather download phase timed out after {gather_timeout}s - "
f"possible rate limiting or network issue. Aborting gather."
)
return None
tplr.logger.info(
f"{tplr.P(window, tplr.T() - download_start)} Downloaded peer gradients <--"
)
Expand Down