k8s kubernetes kafka zookeeper

kafka cluster on kubernetes

Bob Lien 2021/09/16 09:50:52

The ZooKeeper and Kafka images used in this article are both custom-built.

We will create a Kafka cluster and a ZooKeeper cluster.

kafka version: 2.8.0 (scala: 2.13)

zookeeper version: 3.7.0

Environment

Role                  OS        IP              Domain
master node           CentOS 8  192.168.218.44  k8s-01-master-node-01
worker node           CentOS 8  192.168.218.47  k8s-01-worker-node-01
worker node           CentOS 8  192.168.218.45  k8s-01-worker-node-02
worker node           CentOS 8  192.168.218.46  k8s-01-worker-node-03
nfs file server       CentOS 8  192.168.218.53  -
harbor (for testing)  CentOS 8  192.168.218.48  -


zookeeper

• step 1. Build the ZooKeeper image

start-zookeeper.sh

${HOSTNAME##*-}: takes the ordinal suffix of the StatefulSet pod hostname and uses it as the ZooKeeper id

echo $ZOOKEEPER_SERVER_ID > $ZOOKEEPER_DATA_DIRS/myid: writes that ZooKeeper id into the myid file

sh $ZOOKEEPER_HOME/bin/zkServer.sh start-foreground: starts ZooKeeper in the foreground

#!/bin/sh

export ZOOKEEPER_SERVER_ID=${HOSTNAME##*-}
export ZOOKEEPER_DATA_DIRS="$ZOOKEEPER_HOME/servers/zookeeper-$ZOOKEEPER_SERVER_ID/data"
export ZOOKEEPER_LOG_DIRS="$ZOOKEEPER_HOME/servers/zookeeper-$ZOOKEEPER_SERVER_ID/data/log"
echo "ZOOKEEPER_SERVER_ID =  $ZOOKEEPER_SERVER_ID"
echo "ZOOKEEPER_DATA_DIRS =  $ZOOKEEPER_DATA_DIRS"
echo "ZOOKEEPER_LOG_DIRS =  $ZOOKEEPER_LOG_DIRS"

cp /config/* $ZOOKEEPER_HOME/conf/

mkdir -p "$ZOOKEEPER_DATA_DIRS"
mkdir -p "$ZOOKEEPER_LOG_DIRS"

echo "dataDir=$ZOOKEEPER_DATA_DIRS" >> "$ZOOKEEPER_HOME/conf/zoo.cfg"
echo "dataLogDir=$ZOOKEEPER_LOG_DIRS" >> "$ZOOKEEPER_HOME/conf/zoo.cfg"

echo "$ZOOKEEPER_SERVER_ID" > "$ZOOKEEPER_DATA_DIRS/myid"

sh $ZOOKEEPER_HOME/bin/zkServer.sh start-foreground

Dockerfile

FROM centos:centos8

ARG zookeeper_version=3.7.0

ENV ZOOKEEPER_VERSION=$zookeeper_version \
    ZOOKEEPER_HOME=/opt/zookeeper
ENV PATH=${PATH}:${ZOOKEEPER_HOME}/bin/

ADD start-zookeeper.sh zookeeper-ready.sh  /tmp/

RUN yum install -y wget java-1.8.0-openjdk nc
RUN wget -O /tmp/apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz https://downloads.apache.org/zookeeper/zookeeper-${ZOOKEEPER_VERSION}/apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz

RUN tar -zxvf /tmp/apache-zookeeper-${ZOOKEEPER_VERSION}-bin.tar.gz -C /tmp/ \
    && mv /tmp/apache-zookeeper-${ZOOKEEPER_VERSION}-bin /opt/zookeeper \
    && chmod a+x /tmp/*.sh \
    && mv /tmp/*.sh /usr/bin

ENTRYPOINT ["start-zookeeper.sh"]
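
The Dockerfile also bakes a zookeeper-ready.sh into the image, but its contents are not listed above. A minimal sketch, assuming it is meant as a readiness check built on the whitelisted ruok four-letter-word command (nc is installed in the image):

#!/bin/sh
# zookeeper-ready.sh (sketch): send the whitelisted "ruok" command to the
# local client port; a healthy server answers "imok".
OK=$(echo ruok | nc 127.0.0.1 "${1:-2181}")
if [ "$OK" = "imok" ]; then
    exit 0
else
    exit 1
fi

It could be wired into the StatefulSet below as an exec readinessProbe, keeping a pod out of service until its ZooKeeper server responds.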

• step 2. Create the ZooKeeper headless service

zookeeper-service-headless.yaml

Every StatefulSet needs a headless service to give its pods stable network identities.

apiVersion: v1
kind: Service
metadata:
  name: zookeeper-service-headless
  labels:
    app: zookeeper-service-headless
spec:
  clusterIP: None
  selector:
    app: zookeeper
  ports:
  - port: 2181
    name: client
  - port: 2888
    name: server
  - port: 3888
    name: leader-election
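
Because clusterIP is None, DNS returns per-pod records instead of a single virtual IP. Once the pods exist, this can be spot-checked with a throwaway busybox pod:

kubectl apply -f zookeeper-service-headless.yaml

# each pod resolves as <pod>.<service>.<namespace>.svc.cluster.local
kubectl run -it --rm dns-test --image=busybox --restart=Never -- nslookup zookeeper-cluster-0.zookeeper-service-headless.default.svc.cluster.local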

• step 3. Create the ZooKeeper PVC

zookeeper-cluster-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: zookeeper-cluster-pvc
  namespace: default
spec:
  accessModes: [ "ReadWriteMany" ]
  storageClassName: "zookeeper-storage-class"
  resources:
    requests:
      storage: 15Gi

• step 4. Create the ZooKeeper PV

zookeeper-pv.yaml

The PV is backed by NFS.

apiVersion: v1
kind: PersistentVolume
metadata:
  name: zookeeper-pv
spec:
  storageClassName: zookeeper-storage-class
  capacity:
    storage: 15Gi
  accessModes:
    - ReadWriteMany
  nfs:
    path: "/var/nfsshare/zookeeper"
    server: 192.168.218.53

• step 5. Create the ZooKeeper ConfigMap

zookeeper-config-map.yaml

The ZooKeeper configuration parameters go into a ConfigMap.

apiVersion: v1
kind: ConfigMap
metadata:
  name: zookeeper-config-map
  namespace: default
data:
  zoo.cfg: |
    clientPort=2181
    tickTime=2000
    initLimit=10
    syncLimit=5
    maxClientCnxns=60
    minSessionTimeout=4000
    maxSessionTimeout=40000
    autopurge.snapRetainCount=3
    autopurge.purgeInterval=12
    reconfigEnabled=true
    standaloneEnabled=false
    4lw.commands.whitelist=ruok
    server.0=zookeeper-cluster-0.zookeeper-service-headless.default.svc.cluster.local:2888:3888
    server.1=zookeeper-cluster-1.zookeeper-service-headless.default.svc.cluster.local:2888:3888
    server.2=zookeeper-cluster-2.zookeeper-service-headless.default.svc.cluster.local:2888:3888
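
Once the ConfigMap is applied and, later, the pods are running, the whitelisted ruok command gives a one-line health check (nc is baked into the image):

kubectl apply -f zookeeper-config-map.yaml

# a healthy server answers "imok"
kubectl exec zookeeper-cluster-0 -- sh -c 'echo ruok | nc localhost 2181'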

• step 6. Create the ZooKeeper StatefulSet

zookeeper-cluster-statefulset.yaml

zookeeper-cluster-pvc: mounts the ZooKeeper data directory

zookeeper-config-map: mounts the ZooKeeper configuration files

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: zookeeper-cluster
spec:
  serviceName: zookeeper-service-headless
  replicas: 3
  selector:
    matchLabels:
      app: zookeeper
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - name: zookeeper
        image: 192.168.218.48:5000/test/zookeeper-custom-image:0.0.30
        ports:
        - containerPort: 2181
          name: client
        - containerPort: 2888
          name: server
        - containerPort: 3888
          name: leader-election
        volumeMounts:
        - name: zookeeper-cluster-pvc
          mountPath: /opt/zookeeper/servers
        - name: zookeeper-config-map
          mountPath: /config/
      volumes:
        - name: zookeeper-cluster-pvc
          persistentVolumeClaim:
            claimName: zookeeper-cluster-pvc
        - name: zookeeper-config-map
          configMap:
            name: zookeeper-config-map
      imagePullSecrets:
        - name: registry-key
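
With the StatefulSet applied, zkServer.sh status (on the image PATH) should report one leader and two followers, assuming it picks up the generated zoo.cfg:

kubectl apply -f zookeeper-cluster-statefulset.yaml
kubectl get pods -l app=zookeeper

# expect one "Mode: leader" and two "Mode: follower"
for i in 0 1 2; do kubectl exec zookeeper-cluster-$i -- zkServer.sh status; done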

 

kafka

• step 1. Build the Kafka image

start-kafka.sh

#!/bin/sh

# Derive the broker id from the StatefulSet hostname ordinal, and advertise
# the pod IP (injected via the downward API) on the PLAINTEXT listener.
export KAFKA_BROKER_ID=${HOSTNAME##*-}
export KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://${POD_IP}:9092
export KAFKA_LOG_DIRS="$KAFKA_HOME/kafka-log/kafka-$KAFKA_BROKER_ID/logs"

echo "KAFKA_BROKER_ID =  $KAFKA_BROKER_ID"
echo "KAFKA_ADVERTISED_LISTENERS =  $KAFKA_ADVERTISED_LISTENERS"
echo "KAFKA_LOG_DIRS =  $KAFKA_LOG_DIRS"

# Overlay the ConfigMap-provided server.properties, then append the
# per-broker settings. Vanilla kafka-server-start.sh does not read the
# KAFKA_* environment variables, so they must land in the properties file.
cp /config/* "$KAFKA_HOME/config/"

mkdir -p "$KAFKA_LOG_DIRS"
echo "broker.id=$KAFKA_BROKER_ID" >> "$KAFKA_HOME/config/server.properties"
echo "advertised.listeners=$KAFKA_ADVERTISED_LISTENERS" >> "$KAFKA_HOME/config/server.properties"
echo "log.dirs=$KAFKA_LOG_DIRS" >> "$KAFKA_HOME/config/server.properties"

sh "$KAFKA_HOME/bin/kafka-server-start.sh" "$KAFKA_HOME/config/server.properties"

Dockerfile

FROM centos:centos8

ARG kafka_version=2.8.0
ARG scala_version=2.13


ENV KAFKA_VERSION=$kafka_version \
    SCALA_VERSION=$scala_version \
    KAFKA_HOME=/opt/kafka
ENV PATH=${PATH}:${KAFKA_HOME}/bin/

ADD start-kafka.sh /tmp/

RUN yum install -y java-1.8.0-openjdk wget
RUN wget -O /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz https://downloads.apache.org/kafka/${KAFKA_VERSION}/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz

RUN tar -zxvf /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION}.tgz -C /tmp/ \
    && mv /tmp/kafka_${SCALA_VERSION}-${KAFKA_VERSION} /opt/kafka \
    && chmod a+x /tmp/*.sh \
    && mv /tmp/start-kafka.sh /usr/bin

ENTRYPOINT ["start-kafka.sh"]

• step 2. Create the Kafka headless service

kafka-service-headless.yaml

apiVersion: v1
kind: Service
metadata:
  name: kafka-service-headless
  labels:
    app: kafka-service-headless
spec:
  clusterIP: None
  selector:
    app: kafka
  ports:
  - port: 9092
    name: server

• step 3. Create the Kafka PVC

kafka-cluster-pvc.yaml

apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: kafka-cluster-pvc
  namespace: default
spec:
  accessModes: [ "ReadWriteMany" ]
  storageClassName: "kafka-storage-class"
  resources:
    requests:
      storage: 15Gi

• step 4. Create the Kafka PV

kafka-pv.yaml

apiVersion: v1
kind: PersistentVolume
metadata:
  name: kafka-pv
spec:
  storageClassName: kafka-storage-class
  capacity:
    storage: 15Gi
  accessModes:
    - ReadWriteMany
  nfs:
    path: "/var/nfsshare/kafka"
    server: 192.168.218.53

• step 5. Create the Kafka ConfigMap

kafka-config-map.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: kafka-config-map
  namespace: default
data:
  server.properties: |
    zookeeper.connect=zookeeper-cluster-0.zookeeper-service-headless.default.svc.cluster.local:2181,\
    zookeeper-cluster-1.zookeeper-service-headless.default.svc.cluster.local:2181,\
    zookeeper-cluster-2.zookeeper-service-headless.default.svc.cluster.local:2181
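
A trailing backslash in a Java properties file continues the entry on the next line, so the three lines above collapse into a single zookeeper.connect value. Applying and double-checking the rendered payload:

kubectl apply -f kafka-config-map.yaml
kubectl get configmap kafka-config-map -o yaml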

• step 6. Create the Kafka StatefulSet

kafka-broker.yaml

apiVersion: apps/v1
kind: StatefulSet
metadata:
  name: kafka-broker
spec:
  serviceName: kafka-service-headless
  replicas: 3
  selector:
    matchLabels:
      app: kafka
  template:
    metadata:
      labels:
        app: kafka
    spec:
      containers:
      - name: kafka
        image: 192.168.218.48:5000/test/kafka-custom-image:0.0.13
        ports:
        - containerPort: 9092
          name: server
        env:
        - name: POD_IP
          valueFrom:
            fieldRef:
              fieldPath: status.podIP
        volumeMounts:
        - name: kafka-cluster-pvc
          mountPath: /opt/kafka/kafka-log
        - name: kafka-config
          mountPath: /config/
      initContainers:
      - name: copy-ro-scripts
        image: busybox
        command: ['sh', '-c', 'cp /etc/pre-install/* /config/']
        volumeMounts:
          - name: kafka-config
            mountPath: /config/
          - name: kafka-config-map
            mountPath: /etc/pre-install
      volumes:
        - name: kafka-config
          emptyDir: {}
        - name: kafka-cluster-pvc
          persistentVolumeClaim:
            claimName: kafka-cluster-pvc
        - name: kafka-config-map
          configMap:
            name: kafka-config-map
      imagePullSecrets:
        - name: registry-key
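
Once applied, the brokers should register themselves in ZooKeeper; their ids can be listed from any ZooKeeper pod (zkCli.sh ships in the custom ZooKeeper image and is on its PATH):

kubectl apply -f kafka-broker.yaml
kubectl get pods -l app=kafka

# expect [0, 1, 2] once all three brokers are up
kubectl exec zookeeper-cluster-0 -- zkCli.sh ls /brokers/ids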

Results

Test 1

Create the test1 topic

kubectl exec -it kafka-broker-0 -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092 --topic test1 --create

Use the console producer from broker 0

kubectl exec -it kafka-broker-0 -- /opt/kafka/bin/kafka-console-producer.sh --bootstrap-server kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092 --topic test1

Use the console consumer from broker 0

kubectl exec -it kafka-broker-0 -- /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server kafka-broker-2.kafka-service-headless.default.svc.cluster.local:9092 --topic test1

(Screenshots: the test1 topic is created and described, data is produced in the console producer, and the same data arrives in the console consumer.)

Test 2

scale up kafka

kubectl scale sts kafka-broker --replicas=10

Create topic test with:

replication-factor: 5

partitions: 3

kubectl exec -it kafka-broker-0 -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092 --topic test --create --partitions 3 --replication-factor 5
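
Describing the topic should show 3 partitions with 5 replicas each, spread across the scaled-out brokers:

kubectl exec -it kafka-broker-0 -- /opt/kafka/bin/kafka-topics.sh --bootstrap-server kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092 --topic test --describe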

Create a scheduled test task that produces data to Kafka every second

// TestSchedule.java: produce a random UUID to topic "test" once per second
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@EnableScheduling
public class TestSchedule {

    @Autowired
    private TestProducer testProducer;

    @Scheduled(fixedRate = 1000)
    public void produceData() {
        testProducer.send(UUID.randomUUID().toString(), "test");
    }
}

// ProducerConfiguration.java: wire a KafkaTemplate against the three brokers
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

@Configuration
public class ProducerConfiguration {

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker-0.kafka-service-headless.default.svc.cluster.local:9092,kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092,kafka-broker-2.kafka-service-headless.default.svc.cluster.local:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

// TestProducer.java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

@Service
public class TestProducer {
    private static final Logger LOG = LoggerFactory.getLogger(TestProducer.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void send(String message, String topic) {
        LOG.info("sending message='{}' to topic='{}'", message, topic);
        kafkaTemplate.send(topic, message);
    }
}

Create a consumer that consumes topic test

// ConsumerConfiguration.java: consume from the same three brokers
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
@EnableKafka
public class ConsumerConfiguration {

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "kafka-broker-0.kafka-service-headless.default.svc.cluster.local:9092," +
                "kafka-broker-1.kafka-service-headless.default.svc.cluster.local:9092," +
                "kafka-broker-2.kafka-service-headless.default.svc.cluster.local:9092");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
        return props;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

// TestConsumer.java: log every message received on topic "test"
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;

@Service
public class TestConsumer {
    private static final Logger LOGGER = LoggerFactory.getLogger(TestConsumer.class);

    @KafkaListener(topics = "test")
    public void listen(@Payload String message) {
        LOGGER.info("received message='{}'", message);
    }
}

Deploy the producer and consumer above to Kubernetes

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-data-producer-deployment
  name: kafka-data-producer-deployment
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-data-producer-deployment
  template:
    metadata:
      labels:
        app: kafka-data-producer-deployment
    spec:
      containers:
        - name: kafka-data-producer-service
          image: 192.168.218.48:5000/test/kafka-producer:0.0.1
          imagePullPolicy: Always
      imagePullSecrets:
        - name: registry-key
---
apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: kafka-data-consumer-deployment
  name: kafka-data-consumer-deployment
  namespace: default
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka-data-consumer-deployment
  template:
    metadata:
      labels:
        app: kafka-data-consumer-deployment
    spec:
      containers:
        - name: kafka-data-consumer-service
          image: 192.168.218.48:5000/test/kafka-consumer:0.0.1
          imagePullPolicy: Always
      imagePullSecrets:
        - name: registry-key
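
Assuming the two Deployments above are saved in one manifest (the filename here is illustrative), apply and confirm:

kubectl apply -f kafka-producer-consumer.yaml

kubectl get deploy kafka-data-producer-deployment kafka-data-consumer-deployment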

Check the logs from the producer

kubectl logs -f kafka-data-producer-deployment-5dfcb5d7d7-4smhv

Check the logs from the consumer

kubectl logs -f kafka-data-consumer-deployment-7b86b4b7c6-rkq9x

Test 3

Delete one of the Kafka brokers

kubectl delete pod kafka-broker-0

The pod is recreated automatically.

Producing and consuming data continue to work normally.
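
A quick way to confirm both claims, reusing the pod names from the earlier steps:

# the StatefulSet controller recreates the pod under the same name
kubectl get pod kafka-broker-0

# the consumer keeps logging newly received messages throughout
kubectl logs -f kafka-data-consumer-deployment-7b86b4b7c6-rkq9x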

