不能直接用 clientset.CoreV1().Pods().Watch(),因其需手动维护 resourceVersion、无自动重连、事件积压易断连、无法感知全量同步完成;生产环境必须用 SharedInformer。

Go 语言监听 Kubernetes 资源变更,核心是用 client-go 的 Watch 机制配合 Informer —— 直接调 Watch() 容易丢事件、不重连、难处理资源版本(resourceVersion),生产环境必须用 SharedInformer。
为什么不能直接用 clientset.CoreV1().Pods().Watch()?
裸 Watch() 调用看似简单,但实际落地会立刻遇到几个硬伤:
-
resourceVersion需手动维护:首次 Watch 要传"",后续断连重试必须带上次收到的最新resourceVersion,否则报"too old resource version" - 连接中断后无自动重试:HTTP 连接可能被 kube-apiserver 关闭或网络抖动中断,裸 Watch 不会重连,监听直接静默失效
- 事件积压无缓冲:如果处理逻辑慢(比如写 DB、发 HTTP),
watch.Event会阻塞在 channel 上,导致 apiserver 主动关闭连接(默认 5 分钟无读) - 无法感知“全量同步完成”:你不知道什么时候缓存已热,适合开始业务逻辑
用 SharedInformer 实现稳定监听(推荐方式)
SharedInformer 封装了重试、reflector、本地缓存、事件分发等全部逻辑,是 client-go 官方唯一推荐的监听模式。关键步骤如下:
- 用
cache.NewSharedIndexInformer()或更常用的cache.NewSharedInformer()构建实例,传入ListWatch和resyncPeriod -
ListWatch必须包含ListFunc(拉全量)和WatchFunc(建立 Watch 流),client-go 提供了cache.DefaultListWatch简化构造 - 通过
AddEventHandler()注册回调,四个标准方法:OnAdd、OnUpdate、OnDelete、OnSynced - 调用
informer.Run(stopCh)启动 —— 它会在后台开 goroutine 拉取 + Watch,并把事件分发到所有 handler
informer := cache.NewSharedInformer(
&cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
return clientset.CoreV1().Pods("default").List(context.TODO(), options)
},
WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
return clientset.CoreV1().Pods("default").Watch(context.TODO(), options)
},
},
&corev1.Pod{},
0, // resyncPeriod: 0 表示不周期性 resync
)
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
pod := obj.(*corev1.Pod)
log.Printf("Pod added: %s", pod.Name)
},
UpdateFunc: func(old, new interface{}) {
oldPod := old.(*corev1.Pod)
newPod := new.(*corev1.Pod)
if oldPod.ResourceVersion != newPod.ResourceVersion {
log.Printf("Pod updated: %s", newPod.Name)
}
},
DeleteFunc: func(obj interface{}) {
pod, ok := obj.(*corev1.Pod)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
return
}
pod, ok = tombstone.Obj.(*corev1.Pod)
if !ok {
return
}
}
log.Printf("Pod deleted: %s", pod.Name)
},
OnSynced: func() {
log.Println("Informer synced — cache is ready")
},
})
stopCh := make(chan struct{})
go informer.Run(stopCh)
监听特定命名空间 or 集群范围资源?看 ListWatch 构造
资源作用域完全由 ListFunc 和 WatchFunc 决定:
立即学习“go语言免费学习笔记(深入)”;
- 监听单 namespace:用
clientset.CoreV1().Pods("myns")—— 如上例中的"default" - 监听所有 namespace:用
clientset.CoreV1().Pods("")(注意不是"all") - 监听集群级资源(如
Node,Namespace):直接调clientset.CoreV1().Nodes()等,无需指定 ns - 加 label/field selector:在
ListOptions中设置LabelSelector或FieldSelector,例如options.LabelSelector = "app=nginx"
Informer 启动后,如何安全访问本地缓存?
别在 OnAdd/OnUpdate 回调里直接用 obj 做长期持有 —— 它可能被 informer 内部复用。需要深拷贝或转成不可变结构;更重要的是,若需查其他关联资源(比如 Pod 对应的 Node),应从 informer 的 GetStore() 或 GetIndexer() 取缓存,而不是再发 API 请求:
-
informer.GetIndexer().List()返回当前全部对象快照(注意是[]interface{},需类型断言) -
informer.GetIndexer().GetByKey("default/my-pod")按 key 查单个对象(key 格式为"namespace/name") - 所有缓存读操作都是线程安全的,无需额外锁
- 但写操作(如
Add/Update)只能由 informer 自己执行,外部禁止修改缓存内容
最常被忽略的一点:如果你在 OnSynced 触发后立刻调 GetIndexer().List(),拿到的未必是最终一致状态 —— 因为 OnSynced 只表示“初始 list 已完成”,而 list 到 watch 切换之间仍有极小窗口可能漏掉新事件。真正安全的做法是:把初始化逻辑放在 OnSynced 之后,并接受后续 OnAdd/OnUpdate 补充更新。










