
Commit 536aa2d

Merge branch 'apache:master' into master
2 parents: dbd7818 + 1c5f0f3

66 files changed: 5163 additions, 838 deletions


.asf.yaml

Lines changed: 1 addition & 0 deletions
@@ -51,6 +51,7 @@ github:
 
   protected_branches:
     master: {}
+    release-2.69.0-postrelease: {}
     release-2.69: {}
     release-2.68.0-postrelease: {}
     release-2.68: {}

.github/workflows/beam_PerformanceTests_xlang_KafkaIO_Python.yml

Lines changed: 75 additions & 7 deletions
@@ -83,20 +83,85 @@ jobs:
       - name: Install Kafka
         id: install_kafka
         run: |
-          kubectl apply -k ${{ github.workspace }}/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced
-          kubectl wait kafka beam-testing-cluster --for=condition=Ready --timeout=1800s
+          echo "Deploying Kafka cluster using existing .test-infra/kubernetes/kafka-cluster configuration..."
+          kubectl apply -R -f ${{ github.workspace }}/.test-infra/kubernetes/kafka-cluster/
+
+          # Wait for pods to be created and ready
+          echo "Waiting for Kafka cluster to be ready..."
+          sleep 180
+
+          # Check pod status
+          echo "Checking pod status..."
+          kubectl get pods -l app=kafka
+          kubectl get pods -l app=zookeeper
+
+          # Wait for at least one Kafka pod to be ready
+          echo "Waiting for Kafka pods to be ready..."
+          kubectl wait --for=condition=ready pod -l app=kafka --timeout=300s || echo "Kafka pods not ready, continuing anyway"
+
+          # Wait for Zookeeper to be ready
+          echo "Waiting for Zookeeper pods to be ready..."
+          kubectl wait --for=condition=ready pod -l app=zookeeper --timeout=300s || echo "Zookeeper pods not ready, continuing anyway"
+
       - name: Set up Kafka brokers
         id: set_brokers
         run: |
+          echo "Setting up Kafka brokers for existing cluster configuration..."
           declare -a kafka_service_brokers
           declare -a kafka_service_brokers_ports
+
           for INDEX in {0..2}; do
-            kubectl wait svc/beam-testing-cluster-kafka-${INDEX} --for=jsonpath='{.status.loadBalancer.ingress[0].ip}' --timeout=1200s
-            kafka_service_brokers[$INDEX]=$(kubectl get svc beam-testing-cluster-kafka-${INDEX} -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
-            kafka_service_brokers_ports[$INDEX]=$(kubectl get svc beam-testing-cluster-kafka-${INDEX} -o jsonpath='{.spec.ports[0].port}')
+            echo "Setting up broker ${INDEX}..."
+
+            # Try to get LoadBalancer IP
+            LB_IP=$(kubectl get svc outside-${INDEX} -o jsonpath='{.status.loadBalancer.ingress[0].ip}' 2>/dev/null || echo "")
+
+            if [ -n "$LB_IP" ] && [ "$LB_IP" != "null" ]; then
+              echo "Using LoadBalancer IP: $LB_IP"
+              kafka_service_brokers[$INDEX]=$LB_IP
+            else
+              echo "LoadBalancer IP not available, using NodePort approach..."
+              # Get the first node's internal IP
+              NODE_IP=$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')
+              kafka_service_brokers[$INDEX]=$NODE_IP
+            fi
+
+            # Get the port
+            PORT=$(kubectl get svc outside-${INDEX} -o jsonpath='{.spec.ports[0].port}' 2>/dev/null || echo "9094")
+            kafka_service_brokers_ports[$INDEX]=$PORT
+
             echo "KAFKA_SERVICE_BROKER_${INDEX}=${kafka_service_brokers[$INDEX]}" >> $GITHUB_OUTPUT
             echo "KAFKA_SERVICE_BROKER_PORTS_${INDEX}=${kafka_service_brokers_ports[$INDEX]}" >> $GITHUB_OUTPUT
+
+            echo "Broker ${INDEX}: ${kafka_service_brokers[$INDEX]}:${kafka_service_brokers_ports[$INDEX]}"
           done
+
+      - name: Create Kafka topic
+        id: create_topic
+        run: |
+          echo "Creating Kafka topic 'beam'..."
+
+          # Get the first available Kafka pod
+          KAFKA_POD=$(kubectl get pods -l app=kafka -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")
+
+          if [ -z "$KAFKA_POD" ]; then
+            echo "No Kafka pods found, skipping topic creation"
+            exit 0
+          fi
+
+          echo "Using Kafka pod: $KAFKA_POD"
+
+          # Wait a bit more for the pod to be fully operational
+          echo "Waiting for pod to be fully operational..."
+          sleep 60
+
+          # Create the topic using the correct container and path
+          echo "Creating topic 'beam'..."
+          kubectl exec $KAFKA_POD -c broker -- /opt/kafka/bin/kafka-topics.sh --create --topic beam --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 || echo "Topic may already exist"
+
+          # Verify topic was created
+          echo "Verifying topic creation..."
+          kubectl exec $KAFKA_POD -c broker -- /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181 || echo "Could not list topics"
       - name: Prepare test arguments
         uses: ./.github/actions/test-arguments-action
         with:
@@ -105,8 +170,11 @@ jobs:
           argument-file-paths: |
             ${{ github.workspace }}/.github/workflows/performance-tests-pipeline-options/xlang_KafkaIO_Python.txt
           arguments: |
-            --filename_prefix=gs://temp-storage-for-perf-tests/${{ matrix.job_name }}/${{github.run_id}}/
+            --test_class=KafkaIOPerfTest
+            --kafka_topic=beam
             --bootstrap_servers=${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_0 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_0 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_1 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_1 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_2 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_2 }}
+            --read_timeout=3000
+            --filename_prefix=gs://temp-storage-for-perf-tests/${{ matrix.job_name }}/${{github.run_id}}/
       - name: run shadowJar
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
@@ -120,4 +188,4 @@ jobs:
             -Prunner=DataflowRunner \
             -PloadTest.mainClass=apache_beam.io.external.xlang_kafkaio_perf_test \
             -PpythonVersion=3.9 \
-            '-PloadTest.args=${{ env.beam_PerfTests_xlang_KafkaIO_Python_test_arguments_1 }}'
+            '-PloadTest.args=${{ env.beam_PerfTests_xlang_KafkaIO_Python_test_arguments_1 }}'
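
Note: the following is a minimal verification sketch, not part of the commit. It spot-checks the broker endpoints that the "Set up Kafka brokers" step exports, assuming the outside-{0..2} services from .test-infra/kubernetes/kafka-cluster are deployed and that nc is available where this runs.

# Hypothetical helper: spot-check the advertised broker endpoints (not in the workflow).
for INDEX in {0..2}; do
  # Same jsonpath lookups the workflow step uses, with the same 9094 fallback port.
  IP=$(kubectl get svc outside-${INDEX} -o jsonpath='{.status.loadBalancer.ingress[0].ip}' 2>/dev/null || echo "")
  PORT=$(kubectl get svc outside-${INDEX} -o jsonpath='{.spec.ports[0].port}' 2>/dev/null || echo "9094")
  if [ -n "$IP" ]; then
    # -z: connect without sending data; -w 5: five-second timeout
    nc -z -w 5 "$IP" "$PORT" && echo "outside-${INDEX} reachable at ${IP}:${PORT}" || echo "outside-${INDEX} unreachable at ${IP}:${PORT}"
  else
    echo "outside-${INDEX} has no LoadBalancer IP yet"
  fi
done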

.github/workflows/beam_StressTests_Java_KafkaIO.yml

Lines changed: 71 additions & 6 deletions
@@ -80,21 +80,86 @@ jobs:
       - name: Install Kafka
         id: install_kafka
         run: |
-          kubectl apply -k ${{ github.workspace }}/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced
-          kubectl wait kafka beam-testing-cluster --for=condition=Ready --timeout=1800s
+          echo "Deploying Kafka cluster using existing .test-infra/kubernetes/kafka-cluster configuration..."
+          kubectl apply -R -f ${{ github.workspace }}/.test-infra/kubernetes/kafka-cluster/
+
+          # Wait for pods to be created and ready
+          echo "Waiting for Kafka cluster to be ready..."
+          sleep 180
+
+          # Check pod status
+          echo "Checking pod status..."
+          kubectl get pods -l app=kafka
+          kubectl get pods -l app=zookeeper
+
+          # Wait for at least one Kafka pod to be ready
+          echo "Waiting for Kafka pods to be ready..."
+          kubectl wait --for=condition=ready pod -l app=kafka --timeout=300s || echo "Kafka pods not ready, continuing anyway"
+
+          # Wait for Zookeeper to be ready
+          echo "Waiting for Zookeeper pods to be ready..."
+          kubectl wait --for=condition=ready pod -l app=zookeeper --timeout=300s || echo "Zookeeper pods not ready, continuing anyway"
+
       - name: Set up Kafka brokers
         id: set_brokers
         run: |
+          echo "Setting up Kafka brokers for existing cluster configuration..."
           declare -a kafka_service_brokers
           declare -a kafka_service_brokers_ports
+
           for INDEX in {0..2}; do
-            kubectl wait svc/beam-testing-cluster-kafka-${INDEX} --for=jsonpath='{.status.loadBalancer.ingress[0].ip}' --timeout=1200s
-            kafka_service_brokers[$INDEX]=$(kubectl get svc beam-testing-cluster-kafka-${INDEX} -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
-            kafka_service_brokers_ports[$INDEX]=$(kubectl get svc beam-testing-cluster-kafka-${INDEX} -o jsonpath='{.spec.ports[0].port}')
+            echo "Setting up broker ${INDEX}..."
+
+            # Try to get LoadBalancer IP
+            LB_IP=$(kubectl get svc outside-${INDEX} -o jsonpath='{.status.loadBalancer.ingress[0].ip}' 2>/dev/null || echo "")
+
+            if [ -n "$LB_IP" ] && [ "$LB_IP" != "null" ]; then
+              echo "Using LoadBalancer IP: $LB_IP"
+              kafka_service_brokers[$INDEX]=$LB_IP
+            else
+              echo "LoadBalancer IP not available, using NodePort approach..."
+              # Get the first node's internal IP
+              NODE_IP=$(kubectl get nodes -o jsonpath='{.items[0].status.addresses[?(@.type=="InternalIP")].address}')
+              kafka_service_brokers[$INDEX]=$NODE_IP
+            fi
+
+            # Get the port
+            PORT=$(kubectl get svc outside-${INDEX} -o jsonpath='{.spec.ports[0].port}' 2>/dev/null || echo "9094")
+            kafka_service_brokers_ports[$INDEX]=$PORT
+
             echo "KAFKA_SERVICE_BROKER_${INDEX}=${kafka_service_brokers[$INDEX]}" >> $GITHUB_OUTPUT
             echo "KAFKA_SERVICE_BROKER_PORTS_${INDEX}=${kafka_service_brokers_ports[$INDEX]}" >> $GITHUB_OUTPUT
+
+            echo "Broker ${INDEX}: ${kafka_service_brokers[$INDEX]}:${kafka_service_brokers_ports[$INDEX]}"
           done
+
+      - name: Create Kafka topic
+        id: create_topic
+        run: |
+          echo "Creating Kafka topic 'beam'..."
+
+          # Get the first available Kafka pod
+          KAFKA_POD=$(kubectl get pods -l app=kafka -o jsonpath='{.items[0].metadata.name}' 2>/dev/null || echo "")
+
+          if [ -z "$KAFKA_POD" ]; then
+            echo "No Kafka pods found, skipping topic creation"
+            exit 0
+          fi
+
+          echo "Using Kafka pod: $KAFKA_POD"
+
+          # Wait a bit more for the pod to be fully operational
+          echo "Waiting for pod to be fully operational..."
+          sleep 60
+
+          # Create the topic using the correct container and path
+          echo "Creating topic 'beam'..."
+          kubectl exec $KAFKA_POD -c broker -- /opt/kafka/bin/kafka-topics.sh --create --topic beam --zookeeper zookeeper:2181 --partitions 1 --replication-factor 1 || echo "Topic may already exist"
+
+          # Verify topic was created
+          echo "Verifying topic creation..."
+          kubectl exec $KAFKA_POD -c broker -- /opt/kafka/bin/kafka-topics.sh --list --zookeeper zookeeper:2181 || echo "Could not list topics"
       - name: run Kafka StressTest Large
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :it:kafka:KafkaStressTestLarge --info -DbootstrapServers="${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_0 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_0 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_1 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_1 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_2 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_2 }}" -DinfluxHost="http://10.128.0.96:8086" -DinfluxDatabase="beam_test_metrics" -DinfluxMeasurement="java_stress_test_kafka"
+          gradle-command: :it:kafka:KafkaStressTestLarge --info -DbootstrapServers="${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_0 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_0 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_1 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_1 }},${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_2 }}:${{ steps.set_brokers.outputs.KAFKA_SERVICE_BROKER_PORTS_2 }}" -DinfluxHost="http://10.128.0.96:8086" -DinfluxDatabase="beam_test_metrics" -DinfluxMeasurement="java_stress_test_kafka"
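
Note: a minimal sketch (not part of the commit) for confirming the 'beam' topic after the "Create Kafka topic" step, reusing the same pod label (app=kafka), container name (broker), and kafka-topics.sh path that the step itself uses.

# Describe the 'beam' topic from inside the first Kafka broker pod.
KAFKA_POD=$(kubectl get pods -l app=kafka -o jsonpath='{.items[0].metadata.name}')
kubectl exec "$KAFKA_POD" -c broker -- \
  /opt/kafka/bin/kafka-topics.sh --describe --topic beam --zookeeper zookeeper:2181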

.test-infra/kubernetes/kafka-cluster/03-zookeeper/50pzoo.yml

Lines changed: 63 additions & 58 deletions
@@ -36,65 +36,70 @@ spec:
     spec:
       terminationGracePeriodSeconds: 10
       initContainers:
-      - name: init-config
-        image: solsson/kafka-initutils@sha256:2cdb90ea514194d541c7b869ac15d2d530ca64889f56e270161fe4e5c3d076ea
-        command: ['/bin/bash', '/etc/kafka-configmap/init.sh']
-        volumeMounts:
-        - name: configmap
-          mountPath: /etc/kafka-configmap
-        - name: config
-          mountPath: /etc/kafka
-        - name: data
-          mountPath: /var/lib/zookeeper
+        - name: init-config
+          image: solsson/kafka-initutils@sha256:2cdb90ea514194d541c7b869ac15d2d530ca64889f56e270161fe4e5c3d076ea
+          command: ['/bin/bash', '/etc/kafka-configmap/init.sh']
+          volumeMounts:
+            - name: configmap
+              mountPath: /etc/kafka-configmap
+            - name: config
+              mountPath: /etc/kafka
+            - name: data
+              mountPath: /var/lib/zookeeper
       containers:
-      - name: zookeeper
-        image: solsson/kafka:2.1.1@sha256:8bc8242c649c395ab79d76cc83b1052e63b4efea7f83547bf11eb3ef5ea6f8e1
-        env:
-        - name: KAFKA_LOG4J_OPTS
-          value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties
-        command:
-        - ./bin/zookeeper-server-start.sh
-        - /etc/kafka/zookeeper.properties
-        lifecycle:
-          preStop:
+        - name: zookeeper
+          image: solsson/kafka:2.1.1@sha256:8bc8242c649c395ab79d76cc83b1052e63b4efea7f83547bf11eb3ef5ea6f8e1
+          env:
+            - name: KAFKA_LOG4J_OPTS
+              value: -Dlog4j.configuration=file:/etc/kafka/log4j.properties
+          command:
+            - ./bin/zookeeper-server-start.sh
+            - /etc/kafka/zookeeper.properties
+          lifecycle:
+            preStop:
+              exec:
+                command: ["sh", "-ce", "kill -s TERM 1; while $(kill -0 1 2>/dev/null); do sleep 1; done"]
+          ports:
+            - containerPort: 2181
+              name: client
+            - containerPort: 2888
+              name: peer
+            - containerPort: 3888
+              name: leader-election
+          resources:
+            requests:
+              cpu: 50m
+              memory: 128Mi
+            limits:
+              cpu: 500m
+              memory: 512Mi
+          readinessProbe:
             exec:
-              command: ["sh", "-ce", "kill -s TERM 1; while $(kill -0 1 2>/dev/null); do sleep 1; done"]
-        ports:
-        - containerPort: 2181
-          name: client
-        - containerPort: 2888
-          name: peer
-        - containerPort: 3888
-          name: leader-election
-        resources:
-          requests:
-            cpu: 10m
-            memory: 100Mi
-          limits:
-            memory: 120Mi
-        readinessProbe:
-          exec:
-            command:
-            - /bin/sh
-            - -c
-            - '[ "imok" = "$(echo ruok | nc -w 1 -q 1 127.0.0.1 2181)" ]'
-        volumeMounts:
-        - name: config
-          mountPath: /etc/kafka
-        - name: data
-          mountPath: /var/lib/zookeeper
+              command:
+                - /bin/sh
+                - -c
+                - '[ "imok" = "$(echo ruok | nc -w 1 -q 1 127.0.0.1 2181)" ]'
+            initialDelaySeconds: 60
+            periodSeconds: 10
+            timeoutSeconds: 5
+            failureThreshold: 3
+          volumeMounts:
+            - name: config
+              mountPath: /etc/kafka
+            - name: data
+              mountPath: /var/lib/zookeeper
       volumes:
-      - name: configmap
-        configMap:
-          name: zookeeper-config
-      - name: config
-        emptyDir: {}
+        - name: configmap
+          configMap:
+            name: zookeeper-config
+        - name: config
+          emptyDir: {}
   volumeClaimTemplates:
-  - metadata:
-      name: data
-    spec:
-      accessModes: [ "ReadWriteOnce" ]
-      storageClassName: kafka-zookeeper
-      resources:
-        requests:
-          storage: 1Gi
+    - metadata:
+        name: data
+      spec:
+        accessModes: [ "ReadWriteOnce" ]
+        storageClassName: kafka-zookeeper
+        resources:
+          requests:
+            storage: 1Gi
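
Note: a minimal sketch (not part of the commit) for exercising the same four-letter-word check that the readinessProbe above runs. The pod name pzoo-0 is an assumption based on this StatefulSet's usual naming; the container name zookeeper comes from the manifest.

# Run the probe command manually inside the Zookeeper container (pzoo-0 is assumed).
kubectl exec pzoo-0 -c zookeeper -- \
  sh -c '[ "imok" = "$(echo ruok | nc -w 1 -q 1 127.0.0.1 2181)" ] && echo ready || echo not ready'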
