Skip to content

Commit

Permalink
chore: add pulsar cluster helm tests (apecloud#4353)
Browse files Browse the repository at this point in the history
  • Loading branch information
nashtsai authored Jul 19, 2023
1 parent 0cab392 commit 819a7b6
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 13 deletions.
2 changes: 2 additions & 0 deletions apis/apps/v1alpha1/clusterdefinition_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,8 @@ type ServiceSpec struct {
// +listMapKey=protocol
// +optional
Ports []ServicePort `json:"ports,omitempty" patchStrategy:"merge" patchMergeKey:"port" protobuf:"bytes,1,rep,name=ports"`

// NOTES: name also need to be key
}

func (r *ServiceSpec) toSVCPorts() []corev1.ServicePort {
Expand Down
8 changes: 8 additions & 0 deletions deploy/pulsar-cluster/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,11 @@ Create the name of the service account to use
{{- define "pulsar-cluster.serviceAccountName" -}}
{{- default (printf "kb-%s" (include "clustername" .)) .Values.serviceAccount.name }}
{{- end }}


{{/*
Pulsar broker FQDN
*/}}
{{- define "pulsar-cluster.brokerFQDN" -}}
{{ include "kblib.clusterName" . }}-broker.{{ .Release.Namespace }}.svc{{ .Values.clusterDomain }}
{{- end }}
134 changes: 134 additions & 0 deletions deploy/pulsar-cluster/templates/tests/benchmark.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
{{- if .Values.tests.benchmark.kafka2Kafka }}

apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "kblib.clusterName" . }}-omb-driver-cfg
labels: {{ include "kblib.clusterLabels" . | nindent 4 }}
annotations:
"helm.sh/hook": test
data:
kafka_to_pulsar.yaml: |-
name: Kafka producer and Pulsar consumer
driverClass: io.openmessaging.benchmark.driver.kop.KopBenchmarkDriver
producerType: kafka
consumerType: pulsar
# Pulsar configs
pulsarConfig:
serviceUrl: pulsar://{{ include "pulsar-cluster.brokerFQDN" . }}:6650
batchingMaxPublishDelayMs: 1
batchingMaxBytes: 1048576
# Kafka configs
kafkaConfig: |
bootstrap.servers={{ include "pulsar-cluster.brokerFQDN" . }}:9092
linger.ms=1
batch.size=1048576
kafka_to_kafka.yaml: |-
name: Kafka producer and Pulsar consumer
driverClass: io.openmessaging.benchmark.driver.kop.KopBenchmarkDriver
producerType: kafka
consumerType: kafka
# Pulsar configs
pulsarConfig:
serviceUrl: pulsar://{{ include "pulsar-cluster.brokerFQDN" . }}:6650
# producer configs
batchingEnabled: true
batchingMaxPublishDelayMs: 1
batchingMaxBytes: 1048576
blockIfQueueFull: true
pendingQueueSize: 1000
maxPendingMessagesAcrossPartitions: 50000
# consumer configs
maxTotalReceiverQueueSizeAcrossPartitions: 50000
receiverQueueSize: 1000
# Kafka configs
kafkaConfig: |
bootstrap.servers={{ include "pulsar-cluster.brokerFQDN" . }}:9092
linger.ms=1
batch.size=1048576
pulsar_to_kafka.yaml: |-
name: Pulsar producer and Kafka consumer
driverClass: io.openmessaging.benchmark.driver.kop.KopBenchmarkDriver
producerType: pulsar
consumerType: kafka
# Pulsar configs
pulsarConfig:
serviceUrl: pulsar://{{ include "pulsar-cluster.brokerFQDN" . }}:6650
batchingMaxPublishDelayMs: 1
batchingMaxBytes: 1048576
# Kafka configs
kafkaConfig: |
bootstrap.servers={{ include "pulsar-cluster.brokerFQDN" . }}:9092
linger.ms=1
batch.size=1048576
---
apiVersion: v1
kind: Pod
metadata:
name: {{ include "kblib.clusterName" . }}-omb-k2k
labels: {{ include "kblib.clusterLabels" . | nindent 4 }}
annotations:
"helm.sh/hook": test
spec:
containers:
- name: omb-k2k
image: docker.io/apecloud/omb:0.1
command: ['bin/benchmark']
args:
- --drivers
- driver-kop/kafka_to_kafka.yaml
- workloads/1-topic-16-partitions-1kb.yaml
env:
- name: BOOTSTRAP_SERVERS
value: "{{ include "pulsar-cluster.brokerFQDN" . }}:9092"
volumeMounts:
- name: out
mountPath: /out
- name: driver-cfg-k2k
mountPath: /benchmark/driver-kop/kafka_to_kafka.yaml
subPath: kafka_to_kafka.yaml
readOnly: true
- name: driver-cfg-k2p
mountPath: /benchmark/driver-kop/kafka_to_pulsar.yaml
subPath: kafka_to_pulsar.yaml
readOnly: true
- name: driver-cfg-p2k
mountPath: /benchmark/driver-kop/pulsar_to_kafka.yaml
subPath: pulsar_to_kafka.yaml
readOnly: true
restartPolicy: Never
volumes:
- name: out
emptyDir: {}
- name: driver-cfg-k2k
configMap:
name: {{ include "kblib.clusterName" . }}-omb-driver-cfg
items:
- key: kafka_to_kafka.yaml
path: kafka_to_kafka.yaml
- name: driver-cfg-k2p
configMap:
name: {{ include "kblib.clusterName" . }}-omb-driver-cfg
items:
- key: kafka_to_pulsar.yaml
path: kafka_to_pulsar.yaml
- name: driver-cfg-p2k
configMap:
name: {{ include "kblib.clusterName" . }}-omb-driver-cfg
items:
- key: pulsar_to_kafka.yaml
path: pulsar_to_kafka.yaml
{{- end }}
10 changes: 10 additions & 0 deletions deploy/pulsar-cluster/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -157,3 +157,13 @@ fullnameOverride: ""
# The RBAC permission used by cluster component pod, now include event.create
serviceAccount:
name: ""

# Cluster domain
#
clusterDomain: ".cluster.local"


# test suites
tests:
benchmark:
kafka2Kafka: true
14 changes: 1 addition & 13 deletions deploy/pulsar/config/broker-env.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,10 @@ PULSAR_PREFIX_subscriptionKeySharedEnable: "true"
PULSAR_PREFIX_messagingProtocols: kafka
#PULSAR_PREFIX_protocolHandlerDirectory: ./protocols
#PULSAR_PREFIX_narExtractionDirectory: /tmp/pulsar-nar
PULSAR_PREFIX_allowAutoTopicCreationType: partitioned
PULSAR_PREFIX_kafkaListeners: PLAINTEXT://0.0.0.0:9092
#PULSAR_PREFIX_kafkaListeners: kafka_external://0.0.0.0:9092
#PULSAR_PREFIX_kafkaProtocolMap: kafka_external:PLAINTEXT
{{- $clusterName := $.cluster.metadata.name }}
{{- $namespace := $.cluster.metadata.namespace }}
{{- $pulsar_broker_component := fromJson "{}" }}
{{- range $i, $e := $.cluster.spec.componentSpecs }}
{{- if eq $e.componentDefRef "pulsar-broker" }}
{{- $pulsar_broker_component = $e }}
{{- end }}
{{- end }}
{{- $brokerSvcFDQN := printf "%s-%s.%s.svc" $clusterName $pulsar_broker_component.name $namespace }}
#PULSAR_PREFIX_kafkaAdvertisedListeners: kafka_external://{{ $brokerSvcFDQN }}:9092

# Set offset management as below, since offset management for KoP depends on Pulsar "Broker Entry Metadata".
# Set offset management as below, since offset management for KoP depeocalnds on Pulsar "Broker Entry Metadata".
# It’s required for KoP 2.8.0 or higher version.
PULSAR_PREFIX_brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
# Disable the deletion of inactive topics. It’s not required but very important in KoP. Currently,
Expand Down
12 changes: 12 additions & 0 deletions deploy/pulsar/templates/clusterdefinition.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ spec:
- name: http
port: 80
targetPort: http
- name: http-alt
port: 8080
targetPort: http
- name: kafka-client
port: 9092
targetPort: kafka-client
Expand Down Expand Up @@ -148,6 +151,15 @@ spec:
- name: PULSAR_PREFIX_advertisedListeners
value: cluster:pulsar://$(POD_NAME).$(KB_CLUSTER_COMP_NAME)-headless.$(KB_NAMESPACE).svc{{ .Values.clusterDomain }}:6650
- name: PULSAR_PREFIX_bindAddresses

# KoP configs
- name: PULSAR_PREFIX_allowAutoTopicCreationType
value: partitioned
- name: PULSAR_PREFIX_kafkaListeners
value: PLAINTEXT://0.0.0.0:9092
- name: PULSAR_PREFIX_kafkaAdvertisedListeners
value: PLAINTEXT://$(POD_NAME).$(KB_CLUSTER_COMP_NAME)-headless.$(KB_NAMESPACE).svc{{ .Values.clusterDomain }}:9092

- name: brokerServiceUrl
value: pulsar://$(KB_CLUSTER_COMP_NAME).$(KB_NAMESPACE).svc{{ .Values.clusterDomain }}:6650
- name: clusterName
Expand Down

0 comments on commit 819a7b6

Please sign in to comment.