This repository contains the source code for a buildable container image of an autoscaler that horizontally scales Kafka consumers running on Cloud Run, based on consumer lag and CPU utilization. The autoscaler reads consumer lag metrics directly from your Kafka cluster and uses Cloud Run manual scaling to adjust the instance count of the consumer workload.
Please send any questions or feedback related to this autoscaler to run-oss- [email protected].
This autoscaler can be used with consumers running on Cloud Run services or the new worker pools resource (in Public Preview).
The following diagram provides a visual overview of the system's architecture.
Here’s how it works:
- Scheduled Trigger: A Cloud Scheduler job is configured to periodically trigger the autoscaling logic at a defined interval (e.g., every minute). For scaling checks more frequent than once per minute, the scheduler can be configured to push a task to a Cloud Tasks queue.
- Read Kafka Lag: The trigger invokes the Kafka Autoscaler Cloud Run service. The autoscaler connects directly to your Kafka cluster to read the consumer offset lag for your specified consumer group. It can also optionally fetch CPU utilization metrics for the consumer from Cloud Monitoring.
- Set instances: Based on the collected metrics and the user-defined scaling policies in the scaler_config.yaml secret, the autoscaler calculates the optimal number of consumer instances required to handle the current load. It then uses the Cloud Run Admin API's manual scaling feature to adjust the instance count of the target Kafka consumer workload (which can be a Cloud Run service or a worker pool).
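To make the calculation concrete, here's a rough, illustrative sketch of the kind of recommendation it produces (the exact formula lives in the autoscaler source): with an averageValue target of 1000 for consumer_lag, a total lag of 4,500 would suggest ceil(4500 / 1000) = 5 instances. The CPU metric is evaluated against its utilization target in the same way, and, following Kubernetes HPA semantics, the larger per-metric recommendation is used before the configured behavior policies limit how fast the instance count may change.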
A container image of Kafka Autoscaler can be built from this source code using Cloud Build.
Update the included cloudbuild.yaml to specify the output image name by
updating %ARTIFACT_REGISTRY_IMAGE%. Example:
us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler
gcloud builds submit --tag \
  us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler

This will build the container image and push it to Artifact Registry. Record the full image path (SCALER_IMAGE_PATH), since we'll need it later.
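For example, you might export it now so it's ready for the setup steps later in this guide:

export SCALER_IMAGE_PATH=us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler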
Note that the resulting image will not run locally. It is intended to be layered on top of a Java base image. See Automatic base image updates for more information including how to reassemble the container image to run locally.
You can set up the necessary Google Cloud components and deploy the Kafka Autoscaler service using one of two methods:
- Manual Setup: Using gcloud CLI commands and the provided shell script (setup_kafka_scaler.sh). This gives you fine-grained control over each step.
- Automated Setup (Terraform): Using the provided Terraform module (terraform/) to automate the creation of all required resources.
Before deploying the autoscaler using either the manual or Terraform method, ensure the following prerequisites are met:
- A Kafka cluster must be running and accessible (e.g., via VPC or Managed Kafka).
- A Kafka topic must be configured, with events being published to that topic.
- Your Kafka consumers should specify a Consumer Group ID when connecting to the cluster.
- A Kafka consumer workload must be deployed to Cloud Run (as a service or worker pool) and configured to connect to your Kafka cluster, topic, and consumer group.
- Identify the service account email used by this consumer workload (CONSUMER_SA_EMAIL). This service account needs permissions to interact with Kafka (e.g., read offsets).
- (Best Practice) Connect your Kafka consumers to your VPC network using Direct VPC. This lets you reach your Kafka cluster over private IP addresses and keeps traffic on your VPC network (see the example after this list).
- (Best Practice) Configure a liveness healthcheck for your Kafka consumers that checks that the consumer is pulling events. This ensures unhealthy instances automatically restart if they stop processing events, even if the container doesn't crash.
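For reference, a minimal sketch of deploying a consumer as a Cloud Run service with Direct VPC egress might look like the following; the service name, image, network, and subnet are placeholders, and your actual consumer deployment will need its own Kafka connection settings:

gcloud run deploy kafka-consumer \
  --image=us-central1-docker.pkg.dev/my-project/my-repo/my-consumer \
  --region=us-central1 \
  --network=my-vpc \
  --subnet=my-subnet \
  --vpc-egress=private-ranges-only \
  --no-allow-unauthenticated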
To get the permissions that you need to deploy and run this service, ask your administrator to grant you the following IAM roles:
- Cloud Run Developer (roles/run.developer)
- Service Account User (roles/iam.serviceAccountUser)
- Artifact Registry Reader (roles/artifactregistry.reader) to access the Scaler image
- Cloud Scheduler Admin (roles/cloudscheduler.admin) to create the Cloud Scheduler job to trigger autoscaling checks
- Cloud Tasks Queue Admin (roles/cloudtasks.queueAdmin) to create the Cloud Tasks queue for autoscaling checks
- Security Admin (roles/iam.securityAdmin) to grant permissions to service accounts
The autoscaler requires two secrets containing configuration:
The Kafka autoscaler connects to the Kafka cluster using the configuration provided in a secret that will be mounted as a volume on the Kafka autoscaler service.
At a minimum, the bootstrap.servers property must be configured with the
bootstrap servers as a list of HOST:PORT.
bootstrap.servers=%BOOTSTRAP-SERVER-LIST%
This secret can contain any of the Kafka Admin client properties.

For example, to connect to a Google Managed Service for Apache Kafka cluster using application default credentials with Google OAuth, this secret should include the following properties, substituting the %BOOTSTRAP-SERVER-LIST% placeholder with the HOST:PORT list:
bootstrap.servers=%BOOTSTRAP-SERVER-LIST%
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.login.callback.handler.class=com.google.cloud.hosted.kafka.auth.GcpLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
To create the secret volume, copy the configuration into a file named
kafka_auth_config.txt. From the same directory containing the file, run:
gcloud secrets create $ADMIN_CLIENT_SECRET --data-file=kafka_auth_config.txt

The Kafka autoscaler uses the configuration provided in this secret to specify the Kafka consumer to autoscale, as well as to adjust scaling behavior. Like the Kafka Admin client secret, this secret will be mounted as a volume on the Kafka autoscaler service.
Create a file scaler_config.yaml and copy the configuration below,
substituting the following placeholders:
- %FULLY_QUALIFIED_CONSUMER_NAME% - The Kafka consumer workload to be autoscaled. (Format: projects/$PROJECT_ID/locations/$REGION/[workerpools|services]/$CONSUMER_SERVICE_NAME)
- %TARGET_CPU_UTILIZATION% - Target CPU utilization for autoscaling calculations (e.g. 60)
- %LAG_THRESHOLD% - Threshold for the consumer_lag metric to trigger autoscaling (e.g. 1000)
You may also change any of the settings under behavior, as desired, or you may
leave the defaults. This section supports many of the same properties as
Kubernetes HPA scaling
policies.
The following elements must be configured for the Kafka autoscaler to function:
spec:
scaleTargetRef:
name: %FULLY_QUALIFIED_CONSUMER_NAME%
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: %TARGET_CPU_UTILIZATION%
- type: External
external:
metric:
name: consumer_lag
target:
type: AverageValue
averageValue: %LAG_THRESHOLD%

Note: While you may choose to configure consumer_lag only, we recommend starting with both consumer_lag and cpu, as using only consumer_lag can have unexpected results.
In addition to the metric targets, you can optionally configure additional elements to adjust scaling behavior. If not configured, the default values are used.
- %CPU_ACTIVATION_THRESHOLD% (default: 0) - The metric is considered "inactive" when below this threshold. When all metrics are "inactive", the target consumer is scaled to zero.
- %CPU_TOLERANCE% (default: 0.1) - Prevents scaling changes if within the specified range (as a percent of the configured TARGET_CPU_UTILIZATION).
- %CPU_METRIC_WINDOW% (default: 120) - Period, in seconds, over which the average CPU utilization is calculated.
- %LAG_ACTIVATION_THRESHOLD% (default: 0) - The metric is considered "inactive" when below this threshold. When all metrics are "inactive", the target consumer is scaled to zero.
- %LAG_TOLERANCE% (default: 0.1) - Prevents scaling changes if within the specified range (as a percent of the configured LAG_THRESHOLD).
spec:
scaleTargetRef:
name: %FULLY_QUALIFIED_CONSUMER_NAME%
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: %TARGET_CPU_UTILIZATION%
activationThreshold: %CPU_ACTIVATION_THRESHOLD%
tolerance: %CPU_TOLERANCE%
windowSeconds: %CPU_METRIC_WINDOW%
- type: External
external:
metric:
name: consumer_lag
target:
type: AverageValue
averageValue: %LAG_THRESHOLD%
activationThreshold: %LAG_ACTIVATION_THRESHOLD%
tolerance: %LAG_TOLERANCE%

If only the required elements are set, the default scaling policies, shown below, are used. Alternatively, you can configure either or both of the scaleDown and scaleUp policies. If only one of them is configured, the other will use the default configuration.
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 30
selectPolicy: Min
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
- type: Instances
value: 4
periodSeconds: 15
selectPolicy: Max

For example, a complete configuration with all elements set:

spec:
scaleTargetRef:
name: projects/PROJECT-ID/locations/us-central1/workerpools/kafka-consumer-worker
metrics:
- type: Resource
resource:
name: cpu
target:
type: Utilization
averageUtilization: 60
activationThreshold: 0
tolerance: 0.1
windowSeconds: 120
- type: External
external:
metric:
name: consumer_lag
target:
type: AverageValue
averageValue: 1000
activationThreshold: 0
tolerance: 0.1
behavior:
scaleDown:
stabilizationWindowSeconds: 300
policies:
- type: Percent
value: 50
periodSeconds: 30
selectPolicy: Min
scaleUp:
stabilizationWindowSeconds: 0
policies:
- type: Percent
value: 100
periodSeconds: 15
- type: Instances
value: 4
periodSeconds: 15
selectPolicy: Max

To create the secret, from the same directory containing the
scaler_config.yaml file, run:
gcloud secrets create $SCALER_CONFIG_SECRET --data-file=scaler_config.yaml

With the prerequisites completed, you can deploy the Kafka Autoscaler service and its supporting infrastructure using one of the following methods:
- Manual Deployment: Using gcloud CLI commands and the provided shell script.
- Automated Deployment (Terraform): Using the provided Terraform module.
Choose the method that best suits your workflow.
This approach uses the setup_kafka_scaler.sh script to create and configure the necessary resources.
Before running the script, ensure that you have set all of the environment variables listed below:
# Details for already-deployed Kafka consumer
export PROJECT_ID=<project-id>
export REGION=<region>
export CONSUMER_SERVICE_NAME=<deployed-kafka-consumer>
export CONSUMER_SA_EMAIL=<kafka-consumer-service-account-email i.e. [email protected]>
export CONSUMER_GROUP_ID=<kafka-consumer-group-id>
export NETWORK=<vpc-network>
export SUBNET=<vpc-subnet>
# Details for new items to be created during this setup
export SCALER_SERVICE_NAME=<kafka-autoscaler-service-name>
export SCALER_IMAGE_PATH=<kafka-autoscaler-image-URI>
export SCALER_CONFIG_SECRET=<kafka-autoscaler-config-secret-name>
export CYCLE_SECONDS=<scaler-check-frequency e.g. 15> # Note: this should be at least 5 seconds
export CLOUD_TASKS_QUEUE_NAME=<cloud-tasks-queue-for-scaling-checks>
export TASKS_SERVICE_ACCOUNT=<tasks-service-account-name>
export OUTPUT_SCALER_METRICS=false # To output scaling metrics to Cloud Monitoring, set this to true and ensure your scaler service account has permission to write metrics (e.g. via roles/monitoring.metricWriter)

By default, this autoscaler will scale on the combined lag across all topics that the specified consumer group is subscribed to. You can optionally specify a single topic ID, which causes the scaler to scale solely on the lag of that topic.
export TOPIC_ID=<kafka-topic-id>

Execute the provided setup_kafka_scaler.sh script, as follows:
./setup_kafka_scaler.sh

The script performs these actions:
- Creates the Cloud Tasks queue used to trigger autoscaling checks
- Creates the Kafka autoscaler service account, and grants necessary permissions
- Configures and deploys the Kafka autoscaler
- Creates the Cloud Scheduler job that periodically triggers autoscaling checks
When run, setup_kafka_scaler.sh will output the configured environment
variables. Please verify they are correct before continuing.
In order to change the instance count of the Kafka consumer, the Kafka autoscaler service account must have view permission on the deployed container image. For example, if the consumer image was deployed from Artifact Registry, please run:
gcloud projects add-iam-policy-binding $PROJECT_ID \
--member="serviceAccount:$SCALER_SA_NAME@$PROJECT_ID.iam.gserviceaccount.com" \
--role="roles/artifactregistry.reader" # Or appropriate role for your registryThe terraform/ directory contains a reusable Terraform module to provision
the Kafka Autoscaler and its associated resources.
This module automates the creation of:
- The Kafka Autoscaler Cloud Run service
- Supporting Service Accounts and IAM bindings
- Cloud Tasks queue
- Cloud Scheduler job
For detailed instructions, usage examples, and descriptions of all input/output
variables, please refer to terraform/README.md.
Remember to provide the necessary variables to the Terraform module, including details from the prerequisites (project ID, region, consumer SA email, secret names, scaler image path, topic ID, etc.).
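As a rough sketch, invoking the module from your own Terraform configuration might look like the snippet below. The variable names here are illustrative only; see terraform/README.md for the module's actual inputs.

module "kafka_autoscaler" {
  # Illustrative variable names; check terraform/README.md for the real inputs.
  source = "./terraform"

  project_id           = "my-project"
  region               = "us-central1"
  scaler_image_path    = "us-central1-docker.pkg.dev/my-project/my-repo/my_kafka_autoscaler"
  scaler_config_secret = "kafka-scaler-config"
  consumer_sa_email    = "kafka-consumer-sa@my-project.iam.gserviceaccount.com"
}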
The Kafka autoscaler service's scaling is triggered with a request to the service URL.
POST requests will trigger the autoscaling calculation, output the result to logging, AND change the instance count based on the recommendation.
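For example, to trigger a check manually while testing, you can send an authenticated POST to the scaler's URL. This assumes the service requires IAM authentication and that your account has run.invoker on it; $SCALER_URL is a placeholder for the deployed autoscaler service URL:

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  "$SCALER_URL"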
In the logs of your Kafka autoscaler service, you should see messages like There are currently X consumers and Y total lag. Recommending Z consumers.
If the OUTPUT_SCALER_METRICS flag is enabled, you can also find scaler Cloud
Monitoring metrics under custom.googleapis.com/cloud-run-kafkascaler/.
This solution uses the following billable Google Cloud components:
- Cloud Run (Kafka consumer workload and the autoscaler service)
- Cloud Scheduler
- Cloud Tasks
- Secret Manager
The deployed Cloud Run resources are billed for the compute time used at the standard Cloud Run prices.
The autoscaler runs as a request-billed service with max-instances=1, so its
costs are minimal (typically less than $1 per month).
To stop all billing, you must delete the created resources. The cleanup.sh
script in the root of this repository can be used to delete the resources
created by the setup script. If you used Terraform to deploy the resources,
run terraform destroy to remove all associated infrastructure.
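For example, from the repository root, and assuming the same environment variables used during setup are still exported:

./cleanup.sh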
Note: See the Kubernetes Horizontal Pod Autoscaling documentation for a detailed explanation of each element.
behavior:
scaleDown:
stabilizationWindowSeconds: [INT]
policies:
- type: [Percent, Instances]
value: [INT]
periodSeconds: [INT]
selectPolicy: [Min, Max]
scaleUp:
stabilizationWindowSeconds: [INT]
policies:
- type: [Percent, Instances]
value: [INT]
periodSeconds: [INT]
selectPolicy: [Min, Max]

- scaleDown: Behavior when reducing instance count (scaling down)
- scaleUp: Behavior when increasing instance count (scaling up)
- stabilizationWindowSeconds: Uses the highest (scaleDown) or lowest (scaleUp) calculated instance count over a rolling period. Setting this to 0 means the latest calculated value is used.
- selectPolicy: Which outcome to enforce when multiple policies are configured. Min: smallest change, Max: largest change
- Percent: Changes per period are limited to the configured percent of total instances
- Instances: Changes per period are limited to the configured number of instances
- periodSeconds: Length of time over which the policy is enforced
If you're seeing java.lang.OutOfMemoryError on startup, verify that your Kafka Admin Client secret matches your broker's SSL configuration, as the error may be a red herring. See KAFKA-4493
for details.
Q: How do I decide whether to use services or worker pools for my Kafka Consumers?
A: Kafka Autoscaler can be used to scale your Kafka Consumers running as either Cloud Run services or worker pools, but we strongly recommend using worker pools. They are purpose-built for non-HTTP, pull-based workloads and are 40% cheaper than comparable instance-billed services. Also, with no HTTP endpoint, worker pools reduce the attack surface and simplify application code.
If you already have existing Kafka consumers running as Cloud Run services, you can still use the Kafka Autoscaler for queue-aware autoscaling with the same configuration.
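If you are starting fresh with worker pools, deploying a consumer might look roughly like the following. Worker pools are in Public Preview, so treat this as a sketch: the gcloud command surface may change, and the names here are placeholders:

gcloud beta run worker-pools deploy kafka-consumer-worker \
  --image=us-central1-docker.pkg.dev/my-project/my-repo/my-consumer \
  --region=us-central1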
