Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,31 @@ python krr.py simple --selector 'app.kubernetes.io/instance in (robusta, ingress
```
</details>

<details>
<summary>Group jobs by specific labels</summary>

Group jobs that have specific labels into GroupedJob objects for consolidated resource recommendations. This is useful for batch jobs, data processing pipelines, or any workload where you want to analyze resource usage across multiple related jobs.

```sh
krr simple --job-grouping-labels app,team
```

This will:
- Group jobs that have either `app` or `team` labels (or both)
- Create GroupedJob objects with names like `app=frontend`, `team=backend`, etc.
- Provide resource recommendations for the entire group instead of individual jobs
- Exclude jobs with the specified labels from the regular Job listing

You can specify multiple labels separated by commas:

```sh
krr simple --job-grouping-labels app,team,environment
```

Each job will be grouped by each label it has, so a job with `app=api,team=backend` will appear in both `app=api` and `team=backend` groups.

</details>

<details>
<summary>Override the kubectl context</summary>

Expand Down
8 changes: 4 additions & 4 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ kubernetes = "^26.1.0"
prometheus-api-client = "0.5.3"
numpy = ">=1.26.4,<1.27.0"
alive-progress = "^3.1.2"
prometrix = "0.2.5"
slack-sdk = "^3.21.3"
pandas = "2.2.2"
requests = ">2.32.4"
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ packaging==24.0 ; python_version >= "3.9" and python_full_version < "3.13"
pandas==2.2.2 ; python_version >= "3.9" and python_full_version < "3.13"
pillow==10.3.0 ; python_version >= "3.9" and python_full_version < "3.13"
prometheus-api-client==0.5.3 ; python_version >= "3.9" and python_full_version < "3.13"
prometrix==0.2.5 ; python_version >= "3.9" and python_full_version < "3.13"
pyasn1-modules==0.3.0 ; python_version >= "3.9" and python_full_version < "3.13"
pyasn1==0.5.1 ; python_version >= "3.9" and python_full_version < "3.13"
pydantic==1.10.15 ; python_version >= "3.9" and python_full_version < "3.13"
Expand Down
88 changes: 84 additions & 4 deletions robusta_krr/core/integrations/kubernetes/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ async def list_scannable_objects(self) -> list[K8sObjectData]:
self._list_all_daemon_set(),
self._list_all_jobs(),
self._list_all_cronjobs(),
self._list_all_groupedjobs(),
)

return [
Expand Down Expand Up @@ -146,6 +147,20 @@ async def list_pods(self, object: K8sObjectData) -> list[PodData]:
]
selector = f"batch.kubernetes.io/controller-uid in ({','.join(ownered_jobs_uids)})"

elif object.kind == "GroupedJob":
if not hasattr(object._api_resource, '_label_filter') or not object._api_resource._label_filter:
return []

# Use the label+value filter to get pods
ret: V1PodList = await loop.run_in_executor(
self.executor,
lambda: self.core.list_namespaced_pod(
namespace=object.namespace, label_selector=object._api_resource._label_filter
),
)

return [PodData(name=pod.metadata.name, deleted=False) for pod in ret.items]

else:
if object.selector is None:
return []
Expand Down Expand Up @@ -442,15 +457,24 @@ def _list_all_daemon_set(self) -> list[K8sObjectData]:
)

def _list_all_jobs(self) -> list[K8sObjectData]:
    """List all Job workloads, excluding CronJob-owned jobs and jobs that are
    claimed by label-based grouping (those are reported as GroupedJob instead).
    """

    def filter_jobs(item) -> bool:
        # Jobs created by a CronJob are reported under the CronJob workload,
        # so skip them here to avoid double counting.
        if any(owner.kind == "CronJob" for owner in item.metadata.owner_references or []):
            return False

        # Jobs carrying any of the configured grouping labels are listed as
        # part of a GroupedJob (see _list_all_groupedjobs), not individually.
        if settings.job_grouping_labels and item.metadata.labels:
            if any(label in item.metadata.labels for label in settings.job_grouping_labels):
                return False

        return True

    return self._list_scannable_objects(
        kind="Job",
        all_namespaces_request=self.batch.list_job_for_all_namespaces,
        namespaced_request=self.batch.list_namespaced_job,
        extract_containers=lambda item: item.spec.template.spec.containers,
        filter_workflows=filter_jobs,
    )

def _list_all_cronjobs(self) -> list[K8sObjectData]:
Expand All @@ -461,6 +485,62 @@ def _list_all_cronjobs(self) -> list[K8sObjectData]:
extract_containers=lambda item: item.spec.job_template.spec.template.spec.containers,
)

async def _list_all_groupedjobs(self) -> list[K8sObjectData]:
    """List GroupedJob pseudo-workloads by grouping jobs on the configured labels.

    Jobs owned by CronJobs are excluded. Each job joins one group per grouping
    label it carries (group key "label=value"); groups are then split per
    namespace, and one member job serves as the template for the scannable
    object. Member jobs and the label selector are stashed on the API resource
    so pod listing/loading can resolve the group's pods later.
    """
    if not settings.job_grouping_labels:
        logger.debug("No job grouping labels configured, skipping GroupedJob listing")
        return []

    if not self._should_list_resource("GroupedJob"):
        logger.debug("Skipping GroupedJob in cluster")
        return []

    # Lazy %-style args, consistent with the other logger calls in this module.
    logger.debug("Listing GroupedJobs with grouping labels: %s", settings.job_grouping_labels)

    # Fetch every job; filtering by label happens below.
    all_jobs = await self._list_namespaced_or_global_objects(
        kind="Job",
        all_namespaces_request=self.batch.list_job_for_all_namespaces,
        namespaced_request=self.batch.list_namespaced_job,
    )

    grouped_jobs = defaultdict(list)
    for job in all_jobs:
        # CronJob-owned jobs are handled via the CronJob workload.
        if (job.metadata.labels and
                not any(owner.kind == "CronJob" for owner in job.metadata.owner_references or [])):
            for label_name in settings.job_grouping_labels:
                if label_name in job.metadata.labels:
                    label_value = job.metadata.labels[label_name]
                    group_key = f"{label_name}={label_value}"
                    grouped_jobs[group_key].append(job)

    result = []
    for group_name, jobs in grouped_jobs.items():
        jobs_by_namespace = defaultdict(list)
        for job in jobs:
            jobs_by_namespace[job.metadata.namespace].append(job)

        for namespace, namespace_jobs in jobs_by_namespace.items():
            # Use the first job that actually has containers as the template;
            # indexing containers[0] on a container-less job would raise IndexError.
            template_job = next(
                (job for job in namespace_jobs if job.spec.template.spec.containers), None
            )
            if template_job is None:
                logger.debug("No job with containers in group %s/%s, skipping", namespace, group_name)
                continue
            template_container = template_job.spec.template.spec.containers[0]

            grouped_job = self.__build_scannable_object(
                item=template_job,
                container=template_container,
                kind="GroupedJob",
            )

            grouped_job.name = group_name
            grouped_job.namespace = namespace
            # group_name ("label=value") doubles as a Kubernetes label selector.
            grouped_job._api_resource._grouped_jobs = namespace_jobs
            grouped_job._api_resource._label_filter = group_name

            result.append(grouped_job)

    logger.debug("Found %d GroupedJob groups", len(result))
    return result

async def __list_hpa_v1(self) -> dict[HPAKey, HPAData]:
loop = asyncio.get_running_loop()
res = await loop.run_in_executor(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from ..prometheus_utils import ClusterNotSpecifiedException, generate_prometheus_config
from .base_metric_service import MetricsService

PROM_REFRESH_CREDS_SEC = int(os.environ.get("PROM_REFRESH_CREDS_SEC", "600")) # 10 minutes

logger = logging.getLogger("krr")


Expand Down Expand Up @@ -104,31 +106,43 @@ def __init__(
headers |= {"Authorization": self.auth_header}
elif not settings.inside_cluster and self.api_client is not None:
self.api_client.update_params_for_auth(headers, {}, ["BearerToken"])
self.prom_config = generate_prometheus_config(url=self.url, headers=headers, metrics_service=self)
self.prometheus = get_custom_prometheus_connect(self.prom_config)
self.headers = headers
self.prom_config = None
self.prometheus = None
self.get_prometheus()

def get_prometheus(self):
    """Return the Prometheus client, rebuilding it when credentials may be stale.

    The client and its config are (re)created on first use and again whenever
    more than PROM_REFRESH_CREDS_SEC seconds have elapsed since the last
    initialization, so short-lived credentials in the headers get refreshed.

    Returns:
        The (possibly freshly rebuilt) Prometheus connect client.
    """
    now = datetime.utcnow()
    # getattr guard: _last_init_at is only assigned inside this method, so a
    # caller path that reaches here with self.prometheus set but no prior
    # initialization would otherwise raise AttributeError.
    last_init = getattr(self, "_last_init_at", None)
    if (not self.prometheus
            or last_init is None
            or now - last_init >= timedelta(seconds=PROM_REFRESH_CREDS_SEC)):
        self.prom_config = generate_prometheus_config(url=self.url, headers=self.headers, metrics_service=self)  # type: ignore
        self.prometheus = get_custom_prometheus_connect(self.prom_config)
        self._last_init_at = now
    return self.prometheus

def check_connection(self):
    """
    Checks the connection to Prometheus.
    Raises:
        PrometheusNotFound: If the connection to Prometheus cannot be established.
    """
    # Go through get_prometheus() so stale credentials are refreshed first.
    self.get_prometheus().check_prometheus_connection()

@retry(wait=wait_random(min=2, max=10), stop=stop_after_attempt(5))
async def query(self, query: str) -> dict:
    """Run an instant PromQL query in the executor and return its 'result' list.

    Retries up to 5 times with random backoff on failure.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self.executor,
        # get_prometheus() (not self.prometheus) so credentials are refreshed.
        lambda: self.get_prometheus().safe_custom_query(query=query)["result"],
    )

@retry(wait=wait_random(min=2, max=10), stop=stop_after_attempt(5))
async def query_range(self, query: str, start: datetime, end: datetime, step: timedelta) -> dict:
    """Run a range PromQL query in the executor and return its 'result' list.

    Retries up to 5 times with random backoff on failure.

    NOTE(review): the step string uses `step.seconds`, which ignores the days
    component of the timedelta — presumably steps are always < 1 day; confirm
    with callers before changing to total_seconds().
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(
        self.executor,
        # get_prometheus() (not self.prometheus) so credentials are refreshed.
        lambda: self.get_prometheus().safe_custom_query_range(
            query=query, start_time=start, end_time=end, step=f"{step.seconds}s"
        )["result"],
    )
Expand All @@ -155,7 +169,7 @@ def validate_cluster_name(self):

def get_cluster_names(self) -> Optional[List[str]]:
    """Return the values of the configured cluster label from Prometheus,
    or an empty list when the labels API is not available.
    """
    try:
        # get_prometheus() (not self.prometheus) so credentials are refreshed.
        return self.get_prometheus().get_label_values(label_name=settings.prometheus_label)
    except PrometheusApiClientException:
        logger.error("Labels api not present on prometheus client")
        return []
Expand Down Expand Up @@ -194,7 +208,7 @@ async def gather_data(
"""
logger.debug(f"Gathering {LoaderClass.__name__} metric for {object}")
try:
metric_loader = LoaderClass(self.prometheus, self.name(), self.executor)
metric_loader = LoaderClass(self.get_prometheus(), self.name(), self.executor)
data = await metric_loader.load_data(object, period, step)
except Exception:
logger.exception("Failed to gather resource history data for %s", object)
Expand Down Expand Up @@ -334,6 +348,13 @@ async def load_pods(self, object: K8sObjectData, period: timedelta) -> list[PodD
pod_owner_kind = "Job"

del jobs
elif object.kind == "GroupedJob":
if hasattr(object._api_resource, '_grouped_jobs'):
pod_owners = [job.metadata.name for job in object._api_resource._grouped_jobs]
pod_owner_kind = "Job"
else:
pod_owners = [object.name]
pod_owner_kind = object.kind
else:
pod_owners = [object.name]
pod_owner_kind = object.kind
Expand Down
12 changes: 12 additions & 0 deletions robusta_krr/core/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ class Config(pd.BaseSettings):

# Threading settings
max_workers: int = pd.Field(6, ge=1)

# Job grouping settings
job_grouping_labels: Union[list[str], str, None] = pd.Field(None, description="Label name(s) to use for grouping jobs into GroupedJob workload type")

# Logging Settings
format: str
Expand Down Expand Up @@ -130,6 +133,15 @@ def validate_resources(cls, v: Union[list[str], Literal["*"]]) -> Union[list[str
# So this will preserve the big and small letters of the resource
return [next(r for r in KindLiteral.__args__ if r.lower() == val.lower()) for val in v]

@pd.validator("job_grouping_labels", pre=True)
def validate_job_grouping_labels(cls, v: Union[list[str], str, None]) -> Union[list[str], None]:
    """Normalize --job-grouping-labels input to a list of label names.

    Accepts a comma-separated string or a list. Whitespace is stripped and
    empty entries (e.g. from "app,,team", a trailing comma, or an empty
    string) are dropped so they can never match as a "" label; an
    effectively empty value normalizes to None (grouping disabled).
    """
    if v is None:
        return None
    if isinstance(v, str):
        labels = [label.strip() for label in v.split(",") if label.strip()]
        return labels or None
    return v

def create_strategy(self) -> AnyStrategy:
StrategyType = AnyStrategy.find(self.strategy)
StrategySettingsType = StrategyType.get_settings_type()
Expand Down
2 changes: 1 addition & 1 deletion robusta_krr/core/models/objects.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from robusta_krr.utils.batched import batched
from kubernetes.client.models import V1LabelSelector

# All workload kinds KRR can scan. "GroupedJob" is a synthetic kind produced by
# grouping Jobs on user-specified labels (see --job-grouping-labels).
KindLiteral = Literal["Deployment", "DaemonSet", "StatefulSet", "Job", "CronJob", "Rollout", "DeploymentConfig", "StrimziPodSet", "GroupedJob"]


class PodData(pd.BaseModel):
Expand Down
7 changes: 7 additions & 0 deletions robusta_krr/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,12 @@ def run_strategy(
help="Max workers to use for async requests.",
rich_help_panel="Threading Settings",
),
job_grouping_labels: Optional[str] = typer.Option(
None,
"--job-grouping-labels",
help="Label name(s) to use for grouping jobs into GroupedJob workload type. Can be a single label or comma-separated labels (e.g., 'app,team').",
rich_help_panel="Job Grouping Settings",
),
format: str = typer.Option(
"table",
"--formatter",
Expand Down Expand Up @@ -357,6 +363,7 @@ def run_strategy(
coralogix_token=coralogix_token,
openshift=openshift,
max_workers=max_workers,
job_grouping_labels=job_grouping_labels,
format=format,
show_cluster_name=show_cluster_name,
verbose=verbose,
Expand Down