Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES/+no-traceback-task.removal
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Stopped leaking sensitive information of failures in the task API.
10 changes: 7 additions & 3 deletions pulpcore/app/models/task.py
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,7 @@ def set_completed(self, result=None):
)
self._cleanup_progress_reports(TASK_STATES.COMPLETED)

def set_failed(self, exc, tb):
def set_failed(self, exc, tb=None):
"""
Set this Task to the failed state and save it.

Expand All @@ -257,8 +257,12 @@ def set_failed(self, exc, tb):
tb (traceback): Traceback instance for the current exception.
"""
finished_at = timezone.now()
tb_str = "".join(traceback.format_tb(tb))
error = exception_to_dict(exc, tb_str)
error = {}
if tb:
tb_str = "".join(traceback.format_tb(tb))
error = exception_to_dict(exc, tb_str)
else:
error = exception_to_dict(exc)
rows = Task.objects.filter(
pk=self.pk,
state=TASK_STATES.RUNNING,
Expand Down
17 changes: 11 additions & 6 deletions pulpcore/download/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
DigestValidationError,
SizeValidationError,
TimeoutException,
DnsDomainNameException,
)


Expand Down Expand Up @@ -236,6 +237,7 @@ async def run(self, extra_data=None):
aiohttp.ClientPayloadError,
aiohttp.ClientResponseError,
aiohttp.ServerDisconnectedError,
DnsDomainNameException,
TimeoutError,
TimeoutException,
DigestValidationError,
Expand Down Expand Up @@ -289,12 +291,15 @@ async def _run(self, extra_data=None):
"""
if self.download_throttler:
await self.download_throttler.acquire()
async with self.session.get(
self.url, proxy=self.proxy, proxy_auth=self.proxy_auth, auth=self.auth
) as response:
self.raise_for_status(response)
to_return = await self._handle_response(response)
await response.release()
try:
async with self.session.get(
self.url, proxy=self.proxy, proxy_auth=self.proxy_auth, auth=self.auth
) as response:
self.raise_for_status(response)
to_return = await self._handle_response(response)
await response.release()
except aiohttp.ClientConnectorDNSError:
raise DnsDomainNameException(self.url)
if self._close_session_on_finalize:
await self.session.close()
return to_return
Expand Down
3 changes: 3 additions & 0 deletions pulpcore/exceptions/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@
TimeoutException,
exception_to_dict,
DomainProtectedError,
DnsDomainNameException,
ImmediateTaskTimeoutError,
NonAsyncImmediateTaskError,
)
from .validation import (
DigestValidationError,
Expand Down
61 changes: 61 additions & 0 deletions pulpcore/exceptions/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,3 +94,64 @@ def __init__(self):

def __str__(self):
return _("You cannot delete a domain that still contains repositories with content.")


class DnsDomainNameException(PulpException):
"""
Exception to signal that dns could not resolve the domain name for specified url.
"""

def __init__(self, url):
"""
:param url: the url that dns could not resolve
:type url: str
"""
super().__init__("PLP0008")
self.url = url

def __str__(self):
return _("Domain name was not found for {}. Check if specified url is valid.").format(
self.url
)


class ImmediateTaskTimeoutError(PulpException):
"""
Exception to signal that an immediate task timed out.
"""

def __init__(self, task_pk, timeout_seconds):
"""
:param task_pk: The PK of the task that timed out.
:type task_pk: str
:param timeout_seconds: The timeout duration.
:type timeout_seconds: int or str
"""
super().__init__("PLP0009")
self.task_pk = task_pk
self.timeout_seconds = timeout_seconds

def __str__(self):
return _("Immediate task {task_pk} timed out after {timeout_seconds} seconds.").format(
task_pk=self.task_pk, timeout_seconds=self.timeout_seconds
)


class NonAsyncImmediateTaskError(PulpException):
"""
Exception raised when a task is marked as 'immediate' but is not
an async coroutine function.
"""

def __init__(self, task_name):
"""
:param task_name: The name of the task that caused the error.
:type task_name: str
"""
super().__init__("PLP0010")
self.task_name = task_name

def __str__(self):
return _("Immediate task '{task_name}' must be an async function.").format(
task_name=self.task_name
)
29 changes: 24 additions & 5 deletions pulpcore/tasking/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,15 @@
TASK_WAKEUP_HANDLE,
TASK_WAKEUP_UNBLOCK,
)
from pulpcore.exceptions.base import (
PulpException,
ImmediateTaskTimeoutError,
NonAsyncImmediateTaskError,
)
from pulpcore.middleware import x_task_diagnostics_var
from pulpcore.tasking.kafka import send_task_notification


_logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -75,10 +81,16 @@ def _execute_task(task):
log_task_start(task, domain)
task_function = get_task_function(task)
result = task_function()
except PulpException:
exc_type, exc, _ = sys.exc_info()
task.set_failed(exc)
send_task_notification(task)
except Exception:
exc_type, exc, tb = sys.exc_info()
task.set_failed(exc, tb)
log_task_failed(task, exc_type, exc, tb, domain)
# Generic exception for user
safe_exc = Exception("An internal error occured.")
task.set_failed(safe_exc)
send_task_notification(task)
else:
task.set_completed(result)
Expand All @@ -96,10 +108,16 @@ async def _aexecute_task(task):
try:
coroutine = get_task_function(task, ensure_coroutine=True)
result = await coroutine
except PulpException:
exc_type, exc, _ = sys.exc_info()
await sync_to_async(task.set_failed)(exc)
send_task_notification(task)
except Exception:
exc_type, exc, tb = sys.exc_info()
await sync_to_async(task.set_failed)(exc, tb)
log_task_failed(task, exc_type, exc, tb, domain)
# Generic exception for user
safe_exc = Exception("An internal error occured.")
await sync_to_async(task.set_failed)(safe_exc)
send_task_notification(task)
else:
await sync_to_async(task.set_completed)(result)
Expand Down Expand Up @@ -145,7 +163,8 @@ def log_task_failed(task, exc_type, exc, tb, domain):
domain=domain.name,
)
)
_logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb))))
if tb:
_logger.info("\n".join(traceback.format_list(traceback.extract_tb(tb))))


def get_task_function(task, ensure_coroutine=False):
Expand All @@ -158,7 +177,7 @@ def get_task_function(task, ensure_coroutine=False):
is_coroutine_fn = asyncio.iscoroutinefunction(func)

if immediate and not is_coroutine_fn:
raise ValueError("Immediate tasks must be async functions.")
raise NonAsyncImmediateTaskError(task_name=task.name)

if ensure_coroutine:
if not is_coroutine_fn:
Expand All @@ -181,7 +200,7 @@ async def task_wrapper(): # asyncio.wait_for + async_to_sync requires wrapping
msg_template = "Immediate task %s timed out after %s seconds."
error_msg = msg_template % (task.pk, IMMEDIATE_TIMEOUT)
_logger.info(error_msg)
raise RuntimeError(error_msg)
raise ImmediateTaskTimeoutError(task_pk=task.pk, timeout_seconds=IMMEDIATE_TIMEOUT)

return async_to_sync(task_wrapper)

Expand Down
Loading