K8s ops: pitfalls we hit running Kafka and ZooKeeper in a k8s cluster

Network failures in ZooKeeper after adding an Istio sidecar

If ZooKeeper runs with an Istio sidecar, the servers fail with "Connection refused" errors during leader election, as shown in the figure below:

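This happens because, by default, ZooKeeper listens on the pod IP address for server-to-server (quorum) communication, whereas Istio requires applications to listen on 0.0.0.0. The fix is to set quorumListenOnAllIPs=true.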

For details, see: https://istio.io/latest/faq/applications/

The problem is not specific to ZooKeeper: Apache NiFi, Cassandra, Elasticsearch and Redis run into it as well when deployed with a sidecar.
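To see why the bind address matters, here is a minimal local simulation. It assumes Linux (where every 127.x.x.x address is loopback) and the pre-1.10 Istio behaviour of forwarding inbound traffic from the sidecar to 127.0.0.1; 127.0.0.2 stands in for the pod IP, and the helper name can_connect is hypothetical.

```python
import socket


def can_connect(bind_addr: str, connect_addr: str) -> bool:
    """Listen on bind_addr (ephemeral port), then try to reach that port via connect_addr."""
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        server.bind((bind_addr, 0))  # port 0: let the OS pick a free port
        server.listen(1)
        port = server.getsockname()[1]
        try:
            # Plays the sidecar: it forwards inbound traffic to connect_addr:<port>
            with socket.create_connection((connect_addr, port), timeout=2):
                return True
        except ConnectionRefusedError:
            return False
    finally:
        server.close()


# ZooKeeper default: bound only to the "pod IP", so traffic arriving on loopback is refused
print(can_connect("127.0.0.2", "127.0.0.1"))  # False (on Linux)
# quorumListenOnAllIPs=true: bound to 0.0.0.0, so loopback traffic is accepted too
print(can_connect("0.0.0.0", "127.0.0.1"))    # True
```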

The official Docker zookeeper image does not expose a setting for quorumListenOnAllIPs, so it has to be added to the configuration by hand; see this issue for details:
https://github.com/31z4/zookeeper-docker/issues/117
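If you stay with the official image, the manual step amounts to making sure zoo.cfg contains quorumListenOnAllIPs=true. A minimal sketch of that edit, assuming zoo.cfg is a plain key=value file; the function name is hypothetical, and where the file lives inside the container (for example /conf/zoo.cfg) should be checked against the image you actually use.

```python
import tempfile
from pathlib import Path

SETTING = "quorumListenOnAllIPs"


def ensure_quorum_listen_on_all_ips(cfg_path: Path) -> bool:
    """Force quorumListenOnAllIPs=true in a zoo.cfg file; return True if the file changed."""
    original = cfg_path.read_text()
    lines, found = [], False
    for line in original.splitlines():
        # zoo.cfg uses key=value lines; overwrite any existing value of the setting
        if line.split("=", 1)[0].strip() == SETTING:
            line, found = f"{SETTING}=true", True
        lines.append(line)
    if not found:
        lines.append(f"{SETTING}=true")
    updated = "\n".join(lines) + "\n"
    cfg_path.write_text(updated)
    return updated != original


# Demo on a throwaway file; inside a container you would point this at the real zoo.cfg
with tempfile.TemporaryDirectory() as tmp:
    cfg = Path(tmp) / "zoo.cfg"
    cfg.write_text("dataDir=/data\nclientPort=2181\nserver.1=zookeeper-1:2888:3888\n")
    first_run = ensure_quorum_listen_on_all_ips(cfg)   # appends the setting
    second_run = ensure_quorum_listen_on_all_ips(cfg)  # already set: no change
    final_text = cfg.read_text()

print(first_run, second_run)  # True False
print(final_text)
```

Running it twice is harmless, so it can sit in a container entrypoint before ZooKeeper starts.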

Alternatively, use the bitnami/zookeeper image, which supports quorumListenOnAllIPs through the ZOO_LISTEN_ALLIPS_ENABLED environment variable. Here is a simple set of Deployment manifests:

```yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-1
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-1
  template:
    metadata:
      labels:
        app: zookeeper-1
    spec:
      containers:
        - name: zookeeper
          image: bitnami/zookeeper:3.6.2
          imagePullPolicy: Always
          ports:
            - containerPort: 2181
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
            - name: ZOO_LISTEN_ALLIPS_ENABLED
              value: "true"
            - name: ZOO_SERVER_ID
              value: "1"
            - name: ZOO_SERVERS
              value: 0.0.0.0:2888:3888,zookeeper-2:2888:3888,zookeeper-3:2888:3888
---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-2
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-2
  template:
    metadata:
      labels:
        app: zookeeper-2
    spec:
      containers:
        - name: zookeeper
          image: bitnami/zookeeper:3.6.2
          imagePullPolicy: Always
          ports:
            - containerPort: 2181
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
            - name: ZOO_LISTEN_ALLIPS_ENABLED
              value: "true"
            - name: ZOO_SERVER_ID
              value: "2"
            - name: ZOO_SERVERS
              value: zookeeper-1:2888:3888,0.0.0.0:2888:3888,zookeeper-3:2888:3888
---
kind: Deployment
apiVersion: apps/v1
metadata:
  name: zookeeper-3
  namespace: rcmd
spec:
  replicas: 1
  selector:
    matchLabels:
      app: zookeeper-3
  template:
    metadata:
      labels:
        app: zookeeper-3
    spec:
      containers:
        - name: zookeeper
          image: bitnami/zookeeper:3.6.2
          imagePullPolicy: Always
          ports:
            - containerPort: 2181
          env:
            - name: ALLOW_ANONYMOUS_LOGIN
              value: "yes"
            - name: ZOO_LISTEN_ALLIPS_ENABLED
              value: "true"
            - name: ZOO_SERVER_ID
              value: "3"
            - name: ZOO_SERVERS
              value: zookeeper-1:2888:3888,zookeeper-2:2888:3888,0.0.0.0:2888:3888
```
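The three manifests differ only in ZOO_SERVER_ID and in which ZOO_SERVERS entry is replaced by 0.0.0.0 (the member's own). A small hypothetical helper reproduces those values, assuming server IDs follow the order of the host list; this is handy when templating the manifests or growing the ensemble.

```python
from typing import Sequence


def zoo_servers(server_id: int, hosts: Sequence[str],
                peer_port: int = 2888, election_port: int = 3888) -> str:
    """Build ZOO_SERVERS for one member: its own entry is 0.0.0.0, the others use Service names."""
    entries = []
    for member_id, host in enumerate(hosts, start=1):
        addr = "0.0.0.0" if member_id == server_id else host
        entries.append(f"{addr}:{peer_port}:{election_port}")
    return ",".join(entries)


HOSTS = ["zookeeper-1", "zookeeper-2", "zookeeper-3"]
for sid in range(1, len(HOSTS) + 1):
    print(f"ZOO_SERVER_ID={sid}  ZOO_SERVERS={zoo_servers(sid, HOSTS)}")
```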

Add the corresponding Services and the ensemble will be up and running.
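The Services themselves are not shown. A minimal sketch that generates them, assuming one ClusterIP Service per Deployment, named after it (so the hostnames in ZOO_SERVERS resolve inside the rcmd namespace) and exposing the client (2181), follower (2888) and election (3888) ports. The tcp- prefix follows Istio's port-naming convention for protocol selection; the port names themselves are hypothetical.

```python
import json


def zk_service(n: int, namespace: str = "rcmd") -> dict:
    """ClusterIP Service for the zookeeper-<n> Deployment (hypothetical port names)."""
    name = f"zookeeper-{n}"
    return {
        "apiVersion": "v1",
        "kind": "Service",
        "metadata": {"name": name, "namespace": namespace},
        "spec": {
            # Must match the pod label set in the Deployment template
            "selector": {"app": name},
            "ports": [
                {"name": "tcp-client", "port": 2181, "targetPort": 2181},
                {"name": "tcp-follower", "port": 2888, "targetPort": 2888},
                {"name": "tcp-election", "port": 3888, "targetPort": 3888},
            ],
        },
    }


manifest = {"apiVersion": "v1", "kind": "List", "items": [zk_service(n) for n in (1, 2, 3)]}
print(json.dumps(manifest, indent=2))
```

kubectl accepts JSON as well as YAML, so the output can be piped straight into kubectl apply -f -.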

Settings for accessing Kafka in k8s from outside the cluster

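In k8s, services are usually exposed to the outside through an ingress, but because of Kafka's own connection mechanism we have to configure Kafka explicitly so that clients learn which address to connect to.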

How a Kafka client establishes its connections:

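  • The client sends a request to the Kafka server it was bootstrapped with (a metadata request, or a findCoordinator request to locate its consumer group's coordinator); the server replies with the broker's connection address.
  • Once it has the address, the client opens a socket connection to that broker and keeps sending heartbeats; after this connection is established, the initial connection to the first (bootstrap) broker is closed.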
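The consequence of these steps is that the bootstrap address is only used for the first request; every later connection goes to whatever address the broker hands back. A toy model (not a real Kafka client; the function name and reachability set are hypothetical) makes the failure mode concrete for a client outside the cluster that can only reach the gateway:

```python
BOOTSTRAP = "kafka.db.tensorbytes.com:10000"
# Addresses an external client can actually reach: only the gateway
REACHABLE_FROM_OUTSIDE = {BOOTSTRAP}


def client_connect(bootstrap: str, advertised: str, reachable: set) -> str:
    """Return the broker address the client ends up using, or raise ConnectionError."""
    # Step 1: the bootstrap address must work for the initial request
    if bootstrap not in reachable:
        raise ConnectionError(f"cannot reach bootstrap address {bootstrap}")
    # Step 2: the broker replies with its advertised address; step 3: the client reconnects there
    if advertised not in reachable:
        raise ConnectionError(f"bootstrap worked, but advertised address {advertised} is unreachable")
    return advertised


results = {}
for advertised in ("kafka-broker0:9092", BOOTSTRAP):
    try:
        results[advertised] = client_connect(BOOTSTRAP, advertised, REACHABLE_FROM_OUTSIDE)
    except ConnectionError as err:
        results[advertised] = None
        print("failed:", err)
print(results)
```

Advertising the in-cluster name fails even though the bootstrap step succeeded, which is exactly the symptom of a misconfigured external listener.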

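Because Kafka actively tells the client which broker address to use, when exposing it to the external network we must set the broker address to one that is reachable from outside. Taking the wurstmeister/kafka image as an example, the following settings enable external access: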

```yaml
kind: Deployment
apiVersion: apps/v1
metadata:
  name: kafka-broker0
  namespace: databases
spec:
  replicas: 1
  selector:
    matchLabels:
      app: kafka
      id: "kafka-broker0"
  template:
    metadata:
      labels:
        app: kafka
        id: "kafka-broker0"
    spec:
      containers:
        - name: kafka
          image: "wurstmeister/kafka:2.12-2.5.0"
          imagePullPolicy: "IfNotPresent"
          env:
            # Address returned to clients per listener: in-cluster Service name vs. external gateway host
            - name: KAFKA_ADVERTISED_LISTENERS
              value: "INSIDE://kafka-broker0:9092,OUTSIDE://kafka.db.tensorbytes.com:10000"
            # Empty host: both listeners bind on all interfaces
            - name: KAFKA_LISTENERS
              value: "INSIDE://:9092,OUTSIDE://:10000"
            - name: KAFKA_LISTENER_SECURITY_PROTOCOL_MAP
              value: "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"
            # Brokers talk to each other over the in-cluster listener
            - name: KAFKA_INTER_BROKER_LISTENER_NAME
              value: "INSIDE"
            - name: KAFKA_ZOOKEEPER_CONNECT
              value: zookeeper-1:2181
            - name: KAFKA_BROKER_ID
              value: "0"
            - name: KAFKA_CREATE_TOPICS
              value: mp_post_slog:1:1
            - name: LOG4J_LOGGER_KAFKA_AUTHORIZER_LOGGER
              value: "DEBUG"
          resources:
            limits:
              cpu: 200m
              memory: 512Mi
```
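The lookup the broker performs can be sketched as follows: a request arriving on a port bound by KAFKA_LISTENERS is answered with the KAFKA_ADVERTISED_LISTENERS address of the same listener name. This parser is a simplified, hypothetical model of that lookup (no IPv6, no validation), fed with the values from the manifest above.

```python
def parse_listener_spec(spec: str) -> dict:
    """'NAME://host:port,...' -> {NAME: (host, port)}; an empty host means all interfaces."""
    result = {}
    for entry in spec.split(","):
        name, address = entry.split("://", 1)
        host, port = address.rsplit(":", 1)
        result[name] = (host, int(port))
    return result


def advertised_for_port(listeners: str, advertised: str, protocol_map: str, port: int):
    """Listener hit by a connection on `port`, its security protocol, and the address handed back."""
    protocols = dict(item.split(":", 1) for item in protocol_map.split(","))
    advertised_by_name = parse_listener_spec(advertised)
    for name, (_, bound_port) in parse_listener_spec(listeners).items():
        if bound_port == port:
            host, adv_port = advertised_by_name[name]
            return name, protocols[name], f"{host}:{adv_port}"
    raise KeyError(f"no listener bound to port {port}")


LISTENERS = "INSIDE://:9092,OUTSIDE://:10000"
ADVERTISED = "INSIDE://kafka-broker0:9092,OUTSIDE://kafka.db.tensorbytes.com:10000"
PROTOCOLS = "INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT"

for port in (9092, 10000):
    print(port, advertised_for_port(LISTENERS, ADVERTISED, PROTOCOLS, port))
```

Traffic coming through the gateway lands on container port 10000, so those clients get the external name back, while in-cluster clients on 9092 keep using the Service name.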

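The same environment variables in their generic form, where the image substitutes _{HOSTNAME_COMMAND} with the output of the HOSTNAME_COMMAND environment variable and the external listener uses port 9094:

```yaml
KAFKA_ADVERTISED_LISTENERS: INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094
KAFKA_LISTENERS: INSIDE://:9092,OUTSIDE://:9094
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INSIDE:PLAINTEXT,OUTSIDE:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: INSIDE
```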
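A rough emulation of that placeholder substitution, assuming it simply runs HOSTNAME_COMMAND in a shell and replaces _{HOSTNAME_COMMAND} with the trimmed output. The real substitution happens inside the image's startup script; the helper name and the echo command below are hypothetical.

```python
import subprocess

PLACEHOLDER = "_{HOSTNAME_COMMAND}"


def expand_hostname_command(value: str, hostname_command: str) -> str:
    """Replace the _{HOSTNAME_COMMAND} placeholder with the command's trimmed stdout."""
    if PLACEHOLDER not in value:
        return value  # nothing to substitute, so the command is never run
    result = subprocess.run(hostname_command, shell=True, check=True,
                            capture_output=True, text=True)
    return value.replace(PLACEHOLDER, result.stdout.strip())


advertised = expand_hostname_command(
    "INSIDE://:9092,OUTSIDE://_{HOSTNAME_COMMAND}:9094",
    "echo kafka.db.tensorbytes.com",  # hypothetical stand-in for a real hostname lookup
)
print(advertised)  # INSIDE://:9092,OUTSIDE://kafka.db.tensorbytes.com:9094
```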

Istio VirtualService and Service configuration:

```yaml
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: kafka-router
  namespace: databases
spec:
  gateways:
    - db-external-gateway
  hosts:
    - kafka.db.tensorbytes.com
  tcp:
    - match:
        - port: 10000
      route:
        - destination:
            host: kafka-broker0.databases.svc.cluster.local
            port:
              number: 10000
---
apiVersion: v1
kind: Service
metadata:
  name: kafka-broker0
  labels:
    name: kafka
  # Same namespace as the Deployment: a Service only selects pods in its own namespace
  namespace: databases
spec:
  ports:
    - port: 9092
      name: internal-port
      protocol: TCP
      targetPort: 9092
    - port: 10000
      name: external-port
      protocol: TCP
      targetPort: 10000
  selector:
    app: kafka
    id: "kafka-broker0"
  type: ClusterIP
```
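Because the VirtualService addresses the Service by its fully qualified name, and a Service only selects pods in its own namespace, name, namespace and port must agree across the Deployment, Service and VirtualService. A small consistency check catches a Service created in the wrong namespace (e.g. rcmd instead of databases). It transcribes only the relevant fields into dicts, since the standard library has no YAML parser; the function names are hypothetical.

```python
def service_fqdn(svc: dict) -> str:
    md = svc["metadata"]
    return f'{md["name"]}.{md["namespace"]}.svc.cluster.local'


def check_kafka_exposure(deployment_ns: str, svc: dict, vs: dict) -> list:
    """Return human-readable problems; an empty list means the objects line up."""
    problems = []
    if svc["metadata"]["namespace"] != deployment_ns:
        problems.append("Service is not in the Deployment's namespace, so its selector matches no pods")
    svc_ports = {p["port"] for p in svc["spec"]["ports"]}
    for rule in vs["spec"]["tcp"]:
        for route in rule["route"]:
            dest = route["destination"]
            if dest["host"] != service_fqdn(svc):
                problems.append(f'VirtualService routes to {dest["host"]}, Service is {service_fqdn(svc)}')
            if dest["port"]["number"] not in svc_ports:
                problems.append(f'port {dest["port"]["number"]} is not exposed by the Service')
    return problems


VS = {"spec": {"tcp": [{"match": [{"port": 10000}],
                        "route": [{"destination": {
                            "host": "kafka-broker0.databases.svc.cluster.local",
                            "port": {"number": 10000}}}]}]}}


def make_service(namespace: str) -> dict:
    return {"metadata": {"name": "kafka-broker0", "namespace": namespace},
            "spec": {"ports": [{"port": 9092}, {"port": 10000}]}}


print(check_kafka_exposure("databases", make_service("databases"), VS))  # []
for problem in check_kafka_exposure("databases", make_service("rcmd"), VS):
    print("-", problem)
```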

Test scripts for access from outside the cluster through the gateway

Producer:

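```python
#coding:utf-8
from kafka import KafkaProducer

# Bootstrap through the Istio gateway; the broker must advertise this same external address
producer = KafkaProducer(bootstrap_servers='kafka.db.tensorbytes.com:10000')
for i in range(10):
    producer.send('mp_post_slog', key=b'testping', value=b'bar')
# Block until all buffered messages have been sent (wait up to 10 s)
producer.flush(timeout=10)
```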

Consumer:

```python
#coding:utf-8
from kafka import KafkaConsumer

consumer = KafkaConsumer('mp_post_slog', bootstrap_servers='kafka.db.tensorbytes.com:10000', group_id='my_favorite_group')

# Blocks and prints every record received from the topic
for msg in consumer:
    print(msg)
```

Test scripts for access from inside the cluster

Producer:

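```python
#coding:utf-8
from kafka import KafkaProducer

# Inside the cluster, connect through the Service name and the INSIDE listener
producer = KafkaProducer(bootstrap_servers='kafka-broker0:9092')
for i in range(10):
    producer.send('mp_post_slog', key=b'testping', value=b'bar')
# Block until all buffered messages have been sent (wait up to 10 s)
producer.flush(timeout=10)
```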

Consumer:

```python
#coding:utf-8
from kafka import KafkaConsumer

# Inside the cluster, connect through the Service name and the INSIDE listener
consumer = KafkaConsumer('mp_post_slog', bootstrap_servers='kafka-broker0:9092', group_id='my_favorite_group')

# Blocks and prints every record received from the topic
for msg in consumer:
    print(msg)
```
