Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
ba4ecf4
feat/added-retries-for-platform-service-calls [FEAT] Added exponentia…
chandrasekharan-zipstack Sep 9, 2025
403492c
Delete mypy-errors.txt
chandrasekharan-zipstack Sep 9, 2025
8e305d7
Apply suggestions from code review
chandrasekharan-zipstack Sep 9, 2025
565078a
UN-2793 Moved ConnectionError handling to allow the retry decorator a…
chandrasekharan-zipstack Sep 9, 2025
ae9de7a
Merge branch 'main' into feat/added-retries-for-platform-service-calls
chandrasekharan-zipstack Sep 22, 2025
fd4240f
UN-2793 [FEAT] Refactor retry mechanism to use backoff library with c…
chandrasekharan-zipstack Sep 25, 2025
8d50eca
Apply suggestion from @coderabbitai[bot]
chandrasekharan-zipstack Sep 26, 2025
2c50779
Apply suggestion from @chandrasekharan-zipstack
chandrasekharan-zipstack Sep 26, 2025
4452b09
Apply suggestion from @chandrasekharan-zipstack
chandrasekharan-zipstack Sep 26, 2025
3649d4a
UN-2793 [FEAT] Added retry decorator for prompt service calls
chandrasekharan-zipstack Sep 26, 2025
5fbc56b
UN-2793 Removed use of backoff lib and added own decorator for retries
chandrasekharan-zipstack Sep 28, 2025
31f7497
minor: Removed a default argument to make calls to decorator explicit
chandrasekharan-zipstack Sep 28, 2025
2039147
misc: Raised err to validate envs for retry
chandrasekharan-zipstack Oct 1, 2025
089cac3
Update src/unstract/sdk/utils/retry_utils.py
chandrasekharan-zipstack Oct 1, 2025
bab02a3
Apply suggestion from @coderabbitai[bot]
chandrasekharan-zipstack Oct 1, 2025
c86fa90
Apply suggestion from @coderabbitai[bot]
chandrasekharan-zipstack Oct 1, 2025
987f11f
Apply suggestion from @coderabbitai[bot]
chandrasekharan-zipstack Oct 1, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ dependencies = [
"jsonschema>=4.18.6,<5.0",
"python-magic~=0.4.27",
"python-dotenv==1.0.1",
# Retry mechanism
"backoff>=2.2.1",
# Adapter changes
"llama-index==0.13.2",
"tiktoken~=0.9.0",
Expand Down
3 changes: 2 additions & 1 deletion src/unstract/sdk/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
__version__ = "v0.77.3"
__version__ = "v0.78.0"


def get_sdk_version() -> str:
"""Returns the SDK version."""
Expand Down
35 changes: 24 additions & 11 deletions src/unstract/sdk/adapter.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import json
import logging
from typing import Any

import requests
Expand All @@ -9,6 +10,9 @@
from unstract.sdk.helper import SdkHelper
from unstract.sdk.platform import PlatformBase
from unstract.sdk.tool.base import BaseTool
from unstract.sdk.utils.retry_utils import retry_platform_service_call

logger = logging.getLogger(__name__)


class ToolAdapter(PlatformBase):
Expand All @@ -24,10 +28,12 @@ def __init__(
platform_host: str,
platform_port: str,
) -> None:
"""Args:
"""Constructor for ToolAdapter.

Args:
tool (AbstractTool): Instance of AbstractTool
platform_host (str): Host of platform service
platform_port (str): Port of platform service
platform_port (str): Port of platform service.

Notes:
- PLATFORM_SERVICE_API_KEY environment variable is required.
Expand All @@ -38,14 +44,19 @@ def __init__(
tool=tool, platform_host=platform_host, platform_port=platform_port
)

@retry_platform_service_call
def _get_adapter_configuration(
self,
adapter_instance_id: str,
) -> dict[str, Any]:
"""Get Adapter
1. Get the adapter config from platform service
using the adapter_instance_id
"""Get Adapter.

Get the adapter config from platform service
using the adapter_instance_id. This method automatically
retries on connection errors with exponential backoff.

Retry behavior is configurable via environment variables:
Check decorator for details
Args:
adapter_instance_id (str): Adapter instance ID

Expand All @@ -70,18 +81,14 @@ def _get_adapter_configuration(
f"'{adapter_type}', provider: '{provider}', name: '{adapter_name}'",
level=LogLevel.DEBUG,
)
except ConnectionError:
raise SdkError(
"Unable to connect to platform service, please contact the admin."
)
except HTTPError as e:
default_err = (
"Error while calling the platform service, please contact the admin."
)
msg = AdapterUtils.get_msg_from_request_exc(
err=e, message_key="error", default_err=default_err
)
raise SdkError(f"Error retrieving adapter. {msg}")
raise SdkError(f"Error retrieving adapter. {msg}") from e
return adapter_data

@staticmethod
Expand Down Expand Up @@ -121,4 +128,10 @@ def get_adapter_config(
platform_host=platform_host,
platform_port=platform_port,
)
return tool_adapter._get_adapter_configuration(adapter_instance_id)

try:
return tool_adapter._get_adapter_configuration(adapter_instance_id)
except ConnectionError as e:
raise SdkError(
"Unable to connect to platform service, please contact the admin."
) from e
21 changes: 17 additions & 4 deletions src/unstract/sdk/platform.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import logging
from typing import Any

import requests
from requests import ConnectionError, RequestException, Response
from unstract.sdk.constants import (
LogLevel,
MimeType,
PromptStudioKeys,
RequestHeader,
ToolEnv,
)
from unstract.sdk.helper import SdkHelper
from unstract.sdk.tool.base import BaseTool
from unstract.sdk.utils.retry_utils import retry_platform_service_call

logger = logging.getLogger(__name__)


class PlatformBase:
Expand Down Expand Up @@ -86,6 +91,7 @@ def _get_headers(self, headers: dict[str, str] | None = None) -> dict[str, str]:
request_headers.update(headers)
return request_headers

@retry_platform_service_call
def _call_service(
self,
url_path: str,
Expand All @@ -97,6 +103,10 @@ def _call_service(
"""Talks to platform-service to make GET / POST calls.

Only GET calls are made to platform-service though functionality exists.
This method automatically retries on connection errors with exponential backoff.

Retry behavior is configurable via environment variables.
Check decorator for details

Args:
url_path (str): URL path to the service endpoint
Expand Down Expand Up @@ -130,9 +140,13 @@ def _call_service(

response.raise_for_status()
except ConnectionError as connect_err:
msg = "Unable to connect to platform service. Please contact admin."
msg += " \n" + str(connect_err)
self.tool.stream_error_and_exit(msg)
logger.exception("Connection error to platform service")
msg = (
"Unable to connect to platform service. Will retry with backoff, "
"please contact admin if retries ultimately fail."
)
self.tool.stream_log(msg, level=LogLevel.ERROR)
raise ConnectionError(msg) from connect_err
except RequestException as e:
# Extract error information from the response if available
error_message = str(e)
Expand Down Expand Up @@ -200,4 +214,3 @@ def get_llm_profile(self, llm_profile_id: str) -> dict[str, Any]:
headers=None,
method="GET",
)

10 changes: 10 additions & 0 deletions src/unstract/sdk/prompt.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from unstract.sdk.platform import PlatformHelper
from unstract.sdk.tool.base import BaseTool
from unstract.sdk.utils.common_utils import log_elapsed
from unstract.sdk.utils.retry_utils import retry_prompt_service_call

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -185,6 +186,7 @@ def _get_headers(self, headers: dict[str, str] | None = None) -> dict[str, str]:
request_headers.update(headers)
return request_headers

@retry_prompt_service_call
def _call_service(
self,
url_path: str,
Expand All @@ -196,6 +198,14 @@ def _call_service(
"""Communicates to prompt service to fetch response for the prompt.

Only POST calls are made to prompt-service though functionality exists.
This method automatically retries on connection errors with exponential backoff.

Retry behavior is configurable via environment variables:
- PROMPT_SERVICE_MAX_RETRIES (default: 3)
- PROMPT_SERVICE_MAX_TIME (default: 60s)
- PROMPT_SERVICE_BASE_DELAY (default: 1.0s)
- PROMPT_SERVICE_MULTIPLIER (default: 2.0)
- PROMPT_SERVICE_JITTER (default: true)

Args:
url_path (str): URL path to the service endpoint
Expand Down
149 changes: 149 additions & 0 deletions src/unstract/sdk/utils/retry_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""Generic retry utilities using backoff library with configurable prefixes."""

import errno
import logging
import os
from collections.abc import Callable
from typing import Any

import backoff
from requests.exceptions import ConnectionError, HTTPError, Timeout

logger = logging.getLogger(__name__)


def is_retryable_error(error: Exception) -> bool:
"""Check if an error is retryable (preserving existing logic).

Handles:
- ConnectionError and Timeout from requests
- HTTPError with status codes 502, 503, 504
- OSError with specific errno codes (ECONNREFUSED, ECONNRESET, etc.)

Args:
error: The exception to check

Returns:
True if the error should trigger a retry
"""
# Requests connection and timeout errors
if isinstance(error, (ConnectionError, Timeout)):
return True

# HTTP errors with specific status codes
if isinstance(error, HTTPError):
if hasattr(error, "response") and error.response is not None:
status_code = error.response.status_code
# Retry on server errors and bad gateway
if status_code in [502, 503, 504]:
return True

# OS-level connection failures (preserving existing errno checks)
if isinstance(error, OSError) and error.errno in {
errno.ECONNREFUSED, # Connection refused
getattr(errno, "ECONNRESET", 104), # Connection reset by peer
getattr(errno, "ETIMEDOUT", 110), # Connection timed out
getattr(errno, "EHOSTUNREACH", 113), # No route to host
getattr(errno, "ENETUNREACH", 101), # Network is unreachable
}:
return True

return False


def create_retry_decorator(
prefix: str = "PLATFORM_SERVICE",
exceptions: tuple[type[Exception], ...] | None = None,
logger_instance: logging.Logger | None = None,
) -> Callable:
"""Create a configured backoff decorator for a specific service.

Args:
prefix: Environment variable prefix for configuration
logger_instance: Optional logger for retry events
exceptions: Tuple of exception types to retry on.
Defaults to (ConnectionError, HTTPError, Timeout, OSError)

Environment variables (using prefix):
{prefix}_MAX_RETRIES: Maximum retry attempts (default: 3)
{prefix}_MAX_TIME: Maximum total time in seconds (default: 60)
{prefix}_BASE_DELAY: Initial delay in seconds (default: 1.0)
{prefix}_MULTIPLIER: Backoff multiplier (default: 2.0)
{prefix}_JITTER: Enable jitter true/false (default: true)

Returns:
Configured backoff decorator
"""
# Set default exceptions if not provided
if exceptions is None:
exceptions = (ConnectionError, HTTPError, Timeout, OSError)

# Load configuration from environment
max_tries = int(os.getenv(f"{prefix}_MAX_RETRIES", "3")) + 1 # +1 for initial attempt
max_time = float(os.getenv(f"{prefix}_MAX_TIME", "60"))
base = float(os.getenv(f"{prefix}_BASE_DELAY", "1.0"))
factor = float(os.getenv(f"{prefix}_MULTIPLIER", "2.0"))
use_jitter = os.getenv(f"{prefix}_JITTER", "true").strip().lower() in {
"true",
"1",
"yes",
"on",
}

if logger_instance is None:
logger_instance = logger

def on_backoff_handler(details: dict[str, Any]) -> None:
"""Log retry attempts with useful context."""
exception = details["exception"]
tries = details["tries"]
wait = details.get("wait", 0)

logger_instance.warning(
"Retry %d/%d for %s: %s (waiting %.1fs)",
tries,
max_tries - 1,
prefix,
exception,
wait,
)

def on_giveup_handler(details: dict[str, Any]) -> None:
"""Log when giving up after all retries."""
exception = details["exception"]
tries = details["tries"]

logger_instance.exception(
"Giving up after %d retries for %s: %s", tries, prefix, exception
)

# Create the decorator with backoff
return backoff.on_exception(
backoff.expo,
exceptions, # Use the configurable exceptions
max_tries=max_tries,
max_time=max_time,
base=base,
factor=factor,
jitter=backoff.full_jitter if use_jitter else None,
giveup=lambda e: not is_retryable_error(e),
on_backoff=on_backoff_handler,
on_giveup=on_giveup_handler,
)


# Retry configured through below envs.
# - PLATFORM_SERVICE_MAX_RETRIES (default: 3)
# - PLATFORM_SERVICE_MAX_TIME (default: 60s)
# - PLATFORM_SERVICE_BASE_DELAY (default: 1.0s)
# - PLATFORM_SERVICE_MULTIPLIER (default: 2.0)
# - PLATFORM_SERVICE_JITTER (default: true)
retry_platform_service_call = create_retry_decorator("PLATFORM_SERVICE")

# Retry configured through below envs.
# - PROMPT_SERVICE_MAX_RETRIES (default: 3)
# - PROMPT_SERVICE_MAX_TIME (default: 60s)
# - PROMPT_SERVICE_BASE_DELAY (default: 1.0s)
# - PROMPT_SERVICE_MULTIPLIER (default: 2.0)
# - PROMPT_SERVICE_JITTER (default: true)
retry_prompt_service_call = create_retry_decorator("PROMPT_SERVICE")
Loading