Go实现异步观察者模式的核心是用channel+goroutine解耦Subject与Observer:Notify()仅发事件到缓冲channel,dispatcher goroutine异步分发并并发调用各Observer.Update(),配合map存储+互斥锁支持安全注册/注销,Close()确保资源清理。

用 Go 实现观察者模式的异步通知,核心是解耦事件发布者(Subject)与订阅者(Observer),并通过 goroutine 避免阻塞主线程。Go 本身没有内置 Observer 接口,但可以轻量、清晰地自己构建。
定义 Observer 和 Subject 接口
先约定行为:Observer 要能接收事件;Subject 要支持注册、移除和广播。
- Observer 接口只需一个
Update(event interface{})方法 - Subject 可用结构体实现,内部维护
[]Observer切片 + 互斥锁(防止并发修改) - 为避免强耦合,event 类型建议用自定义 struct(如
OrderCreatedEvent{ID string, Time time.Time}),而非空接口
用 channel + goroutine 实现非阻塞通知
关键在 Notify() 方法:不直接调用每个 Observer 的 Update,而是把事件发到一个共享 channel,另起 goroutine 消费并分发。
- 启动一个长期运行的 dispatcher goroutine:
go func() { for event := range subject.eventCh { /* 遍历 observers 并 go obs.Update(event) */ } }() - Subject.Notify() 只需
subject.eventCh ,立刻返回 - 每个 observer 的 Update 在独立 goroutine 中执行,彼此不干扰,也不会拖慢发布者
支持取消订阅与资源清理
真实场景中 Observer 可能生命周期短于 Subject(比如 HTTP handler 注册后退出),需提供 Unsubscribe。
立即学习“go语言免费学习笔记(深入)”;
- 用
map[uintptr]Observer替代切片,key 可用unsafe.Pointer(reflect.ValueOf(obs).UnsafeAddr())或更稳妥的自定义 ID - Unsubscribe 时加锁删除 map 条目
- Subject 可暴露
Close()方法:关闭 eventCh,等待 dispatcher 退出,避免 goroutine 泄漏
简单可用示例(无第三方依赖)
以下是最小可运行骨架:
// Observer 定义
type Observer interface {
Update(event interface{})
}
// Subject 实现
type Subject struct {
mu sync.RWMutex
obs map[uintptr]Observer
eventCh chan interface{}
}
func NewSubject() *Subject {
s := &Subject{
obs: make(map[uintptr]Observer),
eventCh: make(chan interface{}, 100), // 缓冲防压垮
}
go s.dispatcher()
return s
}
func (s *Subject) Register(obs Observer) {
s.mu.Lock()
defer s.mu.Unlock()
ptr := uintptr(unsafe.Pointer(reflect.ValueOf(obs).UnsafeAddr()))
s.obs[ptr] = obs
}
func (s *Subject) Unregister(obs Observer) {
s.mu.Lock()
defer s.mu.Unlock()
ptr := uintptr(unsafe.Pointer(reflect.ValueOf(obs).UnsafeAddr()))
delete(s.obs, ptr)
}
func (s *Subject) Notify(event interface{}) {
select {
case s.eventCh <- event:
default:
// 可选:日志告警或丢弃,避免阻塞
}
}
func (s *Subject) dispatcher() {
for event := range s.eventCh {
s.mu.RLock()
obsList := make([]Observer, 0, len(s.obs))
for _, o := range s.obs {
obsList = append(obsList, o)
}
s.mu.RUnlock()
for _, o := range obsList {
go o.Update(event) // 异步触发
}
}}
func (s *Subject) Close() {
close(s.eventCh)
}
基本上就这些。不需要复杂框架,几份结构+channel+goroutine 就能稳稳跑起异步观察者。重点是别让 Update 同步阻塞 Notify,也别忘了清理。










