概述
kube-proxy踩坑日志(三)
根据某同事反馈,在某环境中,创建容器化的kafka集群的时候,发现调度到节点上的kafka的容器起不来,通过查看kafka容器的日志,发现日志中报错,端口xxxx已经被使用,通过 ss -lptn | grep xxxx发现对应的进程居然是kube-proxy。(环境中kube-proxy使用的是iptables模式)
关于kube-proxy为什么要监听除他自己本身意外的端口?
通过仔细阅读对应的iptables proxy代码,kube-proxy 除了syncService,syncEndpoint,syncEndpointSlice,更新iptables规则外,还会对特殊的NodePort的service做HeathCheck,我们环境中用的service类型是Loadbalancer, 每次创建一个Loadbalancer的service后,对应会给每个servicePort都回写一个NodePort。
上面说到kube-proxy会对特殊的NodePort的service做健康检查,在做健康检查的时候,会通过内部goroutine 启动一个httpserver去监听对的service的NodePort,大家都知道,启动一个httpserver就相当于本地启动一个服务,会占用对应的端口(service port的NodePort)的。此外kube-proxy不是对所有的有NodePort的service进行健康检查,而是对特殊的service才会进行健康检查,特殊之处在于要满足做健康检查的service需要满足三个条件,只有同时满足三个条件时才会做健康检查。这就是这个service的特殊之处。
1、service port中有nodePort
2、service.spec.externalTrafficPolicy: Local
3、service类型是LoadBalancer
下面结合下实际代码来看看
// This is where all of the iptables-save/restore calls happen.
// The only other iptables rules are those that are setup in iptablesInit()
// This assumes proxier.mu is NOT held
func (proxier *Proxier) syncProxyRules() {
......................................
serviceUpdateResult := proxy.UpdateServiceMap(proxier.serviceMap, proxier.serviceChanges)
......................................
if err := proxier.serviceHealthServer.SyncServices(serviceUpdateResult.HCServiceNodePorts); err != nil {
klog.Errorf("Error syncing healthcheck services: %v", err)
}
......................................
}
-------------------------------------------------------------------
func UpdateServiceMap(serviceMap ServiceMap, changes *ServiceChangeTracker) (result UpdateServiceMapResult) {
result.UDPStaleClusterIP = sets.NewString()
serviceMap.apply(changes, result.UDPStaleClusterIP)
result.HCServiceNodePorts = make(map[types.NamespacedName]uint16)
for svcPortName, info := range serviceMap {
if info.HealthCheckNodePort() != 0 { // 核心是这个HealthCheckNodePort方法
result.HCServiceNodePorts[svcPortName.NamespacedName] = uint16(info.HealthCheckNodePort())
}
}
return result
}
-------------------------------------------------------------------
// HealthCheckNodePort is part of ServicePort interface.
func (info *BaseServiceInfo) HealthCheckNodePort() int {
return info.healthCheckNodePort
}
-------------------------------------------------------------------
func (sct *ServiceChangeTracker) newBaseServiceInfo(port *v1.ServicePort, service *v1.Service) *BaseServiceInfo {
....................................
// 最核心的一个地方,就是这个代码块的逻辑判断,确定了是否需要给service添加heathCheck
if apiservice.NeedsHealthCheck(service) {
p := service.Spec.HealthCheckNodePort
if p == 0 {
klog.Errorf("Service %s/%s has no healthcheck nodeport", service.Namespace, service.Name)
} else {
info.healthCheckNodePort = int(p)
}
}
return info
}
-------------------------------------------------------------------
// RequestsOnlyLocalTraffic checks if service requests OnlyLocal traffic.
func RequestsOnlyLocalTraffic(service *v1.Service) bool {
if service.Spec.Type != v1.ServiceTypeLoadBalancer &&
service.Spec.Type != v1.ServiceTypeNodePort {
return false
}
return service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal
}
// NeedsHealthCheck checks if service needs health check.
func NeedsHealthCheck(service *v1.Service) bool {
if service.Spec.Type != v1.ServiceTypeLoadBalancer {
return false
}
return RequestsOnlyLocalTraffic(service)
}
-------------------------------------------------------------------
// 最后是heathCheck的代码,会创建一个httpserver进行监听
func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error {
hcs.lock.Lock()
defer hcs.lock.Unlock()
// Remove any that are not needed any more.
for nsn, svc := range hcs.services {
if port, found := newServices[nsn]; !found || port != svc.port {
klog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port)
if err := svc.listener.Close(); err != nil {
klog.Errorf("Close(%v): %v", svc.listener.Addr(), err)
}
delete(hcs.services, nsn)
}
}
// Add any that are needed.
for nsn, port := range newServices {
if hcs.services[nsn] != nil {
klog.V(3).Infof("Existing healthcheck %q on port %d", nsn.String(), port)
continue
}
klog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port)
svc := &hcInstance{port: port}
addr := fmt.Sprintf(":%d", port)
svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs})
var err error
svc.listener, err = hcs.listener.Listen(addr)
if err != nil {
msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err)
if hcs.recorder != nil {
hcs.recorder.Eventf(
&v1.ObjectReference{
Kind: "Service",
Namespace: nsn.Namespace,
Name: nsn.Name,
UID: types.UID(nsn.String()),
}, api.EventTypeWarning, "FailedToStartServiceHealthcheck", msg)
}
klog.Error(msg)
continue
}
hcs.services[nsn] = svc
go func(nsn types.NamespacedName, svc *hcInstance) {
// Serve() will exit when the listener is closed.
klog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port)
if err := svc.server.Serve(svc.listener); err != nil {
klog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err)
return
}
klog.V(3).Infof("Healthcheck %q closed", nsn.String())
}(nsn, svc)
}
return nil
}
-------------------------------------------------------------------
在我们生产环境中,通过operator创建出来的kafka集群的service,恰好都满足这三个条件,豌豆滚屁股,遇缘了,巧了又巧,我们kafka集群使用的是hostnetwork模式,kafka容器的conatinerPort.hostport使用的端口xxxx,恰好在创建kafka的lb的service的时候给servicePort设置的nodePort就恰好是的kafka容器的conatinerPort.hostport,导致端口被占用,kafka容器一直起就报错端口被占用。
聊聊externalTrafficPolicy
一般没用到LB类型的svc没太关注这个属性,这个属性主要是用来对lb service和nodePort service做route 规则的,只有两种类型的policy
1、Local local的意思是通过service请求,直接就到svc关联的pod的节点上,由于没有经过其他节点的转发,所以保留了client端的源ip,但是这种类型有个缺点,是没有负载均衡,直接链接
client
^ /
/ /
/ v X
node 1 node 2
^ |
| |
| v
endpoint
2、Cluster Cluster的意思是可以可以通过链接到其他的非Pod所在的node上,通过其他的node进行转发在到pod所在的node,从而到达pod,这种就相当于给一个pod多个负载均衡器,通过各国node都能访问到pod,缺点是由于转发,改变了client的源ip为转发的节点的ip,对于需要保留源ip的服务来说不太合适
client
^
v
node 1 <--- node 2
| ^ SNAT
| | --->
v |
endpoint
思考?
为什么端口都被占用了,k8s的调度器却还要把pod网节点上调度呢? 看过k8s源码的调度的应该知道,因为调度器内部维持的nodeCache虽然也计算了节点使用的hostPort,但是它计算的是已经在该节点上,或者经过它调度到该节点上的那些的pod(pod的containerPort中的hostport不为空值)。除此之外的node上的其他服务占用的node port并没有进行计算,并缓存到调度器的nodeCache。个人感觉这个有点不合理,虽然节点上的hostport可能会随着节点上的服务服务不断的暂用hostport,释放port(主要是tcp的链接建立,释放),但是应该在调度的nodeCache中统计把hostPort的使用也考虑上,不然当hostnetwork的pod调度到节点上后,端口被占用了,却起不来
最后
以上就是酷酷玉米为你收集整理的kube-proxy踩坑日记(三)kube-proxy踩坑日志(三)的全部内容,希望文章能够帮你解决kube-proxy踩坑日记(三)kube-proxy踩坑日志(三)所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复