Go中用channel实现观察者模式,核心是Subject通过channel向Observer异步发送事件。定义Event接口和Observer接口,Observer持有eventCh接收事件,Subject维护observers映射并用sync.RWMutex保证并发安全。Notify时复制观察者列表并为每个Observer启动goroutine调用Update,避免阻塞。Observer用独立goroutine监听eventCh,处理事件。channel天然支持并发安全、解耦、非阻塞通信,符合Go“通过通信共享内存”理念。相比回调函数,channel避免阻塞通知者;相比互斥锁,channel更专注通信而非资源保护。缓冲channel和超时机制防阻塞,stopCh实现优雅退出。适用于进程内高并发事件通知,但不适合跨进程通信或需严格顺序处理场景。
在Go语言中,实现观察者模式并利用channel来构建事件通知机制,核心思路其实挺直接的:让事件的发布者(Subject)持有一系列channel,每个channel代表一个订阅者(Observer)。当有事件发生时,发布者就将事件数据发送到这些channel中,而每个订阅者则在自己的goroutine里监听它专属的channel,接收并处理事件。这样一来,事件的发送和接收就自然而然地解耦了,而且Go的并发原语让这一切变得异常流畅。
要搞定这个事儿,我们通常会定义一个事件接口,一个观察者接口,然后就是核心的被观察者(或称主题)结构体。
首先,我们来定义事件和观察者。事件可以是个空接口,这样可以传递任何类型的数据。观察者呢,就监听并处理事件。
package main import ( "fmt" "sync" "time" ) // Event 是我们事件的通用接口,可以是任何类型 type Event interface{} // Observer 是观察者接口,定义了接收事件的方法 type Observer interface { Update(event Event) } // Subject 是被观察者(主题)的接口,定义了注册、注销和通知观察者的方法 type Subject interface { Register(observer Observer) Unregister(observer Observer) Notify(event Event) } // ConcreteObserver 是一个具体的观察者实现 type ConcreteObserver struct { id string eventCh chan Event // 每个观察者拥有一个独立的channel来接收事件 stopCh chan struct{} // 用于停止观察者的监听循环 } // NewConcreteObserver 创建一个新的具体观察者 func NewConcreteObserver(id string) *ConcreteObserver { return &ConcreteObserver{ id: id, eventCh: make(chan Event, 5), // 带缓冲的channel,防止阻塞通知者 stopCh: make(chan struct{}), } } // Update 实现了Observer接口,将事件发送到自己的channel func (o *ConcreteObserver) Update(event Event) { select { case o.eventCh <- event: // 事件成功发送 case <-time.After(50 * time.Millisecond): // 设置一个超时,防止长时间阻塞 fmt.Printf("Observer %s: send event timeout, event: %v\n", o.id, event) } } // StartListening 启动一个goroutine监听事件channel func (o *ConcreteObserver) StartListening() { go func() { fmt.Printf("Observer %s: 开始监听事件...\n", o.id) for { select { case event := <-o.eventCh: fmt.Printf("Observer %s: 收到事件 -> %v\n", o.id, event) // 模拟处理事件的耗时操作 time.Sleep(time.Millisecond * 100) case <-o.stopCh: fmt.Printf("Observer %s: 停止监听。\n", o.id) return } } }() } // StopListening 停止监听goroutine func (o *ConcreteObserver) StopListening() { close(o.stopCh) close(o.eventCh) // 关闭channel,确保goroutine退出 } // ConcreteSubject 是一个具体的被观察者实现 type ConcreteSubject struct { observers map[Observer]struct{} // 使用map来存储观察者,方便查找和删除 mu sync.RWMutex // 读写锁,保护observers map的并发访问 } // NewConcreteSubject 创建一个新的具体被观察者 func NewConcreteSubject() *ConcreteSubject { return &ConcreteSubject{ observers: make(map[Observer]struct{}), } } // Register 注册一个观察者 func (s *ConcreteSubject) Register(observer Observer) { s.mu.Lock() defer s.mu.Unlock() s.observers[observer] = struct{}{} fmt.Printf("Subject: 注册了观察者 %T\n", observer) } // Unregister 注销一个观察者 func (s *ConcreteSubject) Unregister(observer Observer) { s.mu.Lock() defer s.mu.Unlock() if _, ok := s.observers[observer]; ok { delete(s.observers, observer) fmt.Printf("Subject: 注销了观察者 %T\n", observer) } } // Notify 通知所有注册的观察者 func (s *ConcreteSubject) Notify(event Event) { s.mu.RLock() // 使用读锁,允许多个goroutine同时读取observers map // 为了避免在通知过程中修改map导致panic,通常会复制一份观察者列表 // 但在这里,我们只进行读取操作,所以直接遍历map是安全的 // 并且,每个通知操作都在单独的goroutine中进行,进一步解耦 observersToNotify := make([]Observer, 0, len(s.observers)) for obs := range s.observers { observersToNotify = append(observersToNotify, obs) } s.mu.RUnlock() fmt.Printf("Subject: 正在通知事件 -> %v\n", event) for _, obs := range observersToNotify { // 每个通知操作在一个独立的goroutine中进行,防止一个慢速观察者阻塞所有通知 go obs.Update(event) } } func main() { subject := NewConcreteSubject() obs1 := NewConcreteObserver("Observer-1") obs2 := NewConcreteObserver("Observer-2") obs3 := NewConcreteObserver("Observer-3") obs1.StartListening() obs2.StartListening() obs3.StartListening() subject.Register(obs1) subject.Register(obs2) subject.Register(obs3) fmt.Println("\n--- 第一次事件通知 ---") subject.Notify("系统启动完成") time.Sleep(time.Second * 1) // 等待事件处理 fmt.Println("\n--- 注销Observer-2 ---") subject.Unregister(obs2) obs2.StopListening() // 停止注销的观察者监听 fmt.Println("\n--- 第二次事件通知 ---") subject.Notify("用户登录成功") time.Sleep(time.Second * 1) // 等待事件处理 fmt.Println("\n--- 注册Observer-4 ---") obs4 := NewConcreteObserver("Observer-4") obs4.StartListening() subject.Register(obs4) fmt.Println("\n--- 第三次事件通知 ---") subject.Notify("数据更新通知") time.Sleep(time.Second * 1) // 等待事件处理 // 停止所有观察者 obs1.StopListening() obs3.StopListening() obs4.StopListening() fmt.Println("\n所有操作完成。") time.Sleep(time.Millisecond * 500) // 确保所有goroutine有时间退出 }
这段代码里,
ConcreteObserver
eventCh
Update
StartListening
ConcreteSubject
observers
Register
Unregister
Notify
Update
立即学习“go语言免费学习笔记(深入)”;
我个人觉得,在Go里搞事件通知,用channel简直是水到渠成,比传统的回调函数或者单纯依赖互斥锁要优雅得多。这事儿得从Go的并发哲学说起:“不要通过共享内存来通信,而要通过通信来共享内存。”
首先,回调函数虽然直观,但它本质上是在调用者的goroutine里直接执行被调用者的逻辑。这意味着如果一个回调函数执行得很慢,它会直接阻塞通知者,甚至可能阻塞整个事件链条。更麻烦的是,如果回调函数需要访问共享状态,你还得自己去操心加锁、解锁,很容易就搞出死锁或者竞态条件。Go里虽然也可以用
sync.Mutex
其次,互斥锁固然是并发控制的利器,但它更多是用来保护共享资源访问的。在观察者模式里,我们关注的是“事件的传递”和“通知的解耦”。如果单纯用互斥锁来保护一个共享的事件队列,虽然能保证数据安全,但事件处理的并发性可能得不到充分发挥,而且通知者和观察者之间的耦合度仍然存在,因为它们都得知道如何操作这个队列和锁。
而channel呢?它就是为并发通信而生的。
select
说实话,每次遇到需要事件通知或者生产者-消费者模型的场景,我第一个想到的就是channel。它带来的那种心智负担的减轻,是其他方案很难比拟的。
在实际操作中,尤其是在高并发环境下,处理观察者的注册、注销以及事件通知,有几个细节是必须得注意的,否则很容易踩坑。
保护Subject
observers
Subject
observers
sync.RWMutex
Register
Unregister
s.mu.Lock()
s.mu.Unlock()
Notify
observers
s.mu.RLock()
s.mu.RUnlock()
Notify
通知时的并发投递: 在
Notify
for ... go obs.Update(event)
Subject
Update
Update
eventCh
Subject
Notify
observers
observersToNotify
map
map
panic
观察者内部的Channel管理:
ConcreteObserver
eventCh
make(chan Event, 5)
Observer
Subject
ConcreteObserver.Update
select
time.After
eventCh
ConcreteObserver
stopCh chan struct{}
close(o.stopCh)
StartListening
eventCh
这些细节,虽然看起来有点繁琐,但它们是构建健壮、高效并发系统的基石。没有它们,代码在高并发场景下很可能出现各种意想不到的问题。
尽管用Go的channel来实现观察者模式在多数场景下都显得非常自然和高效,但凡事没有银弹,总有些情况,它可能不是最优解,或者说,你得考虑其他的方案:
极高扇出且事件数据量微乎其微: 比如说,你有上百万个观察者,而每次通知的事件仅仅是一个布尔值或者一个非常小的整数。在这种极端情况下,为每个通知启动一个goroutine、以及channel本身的开销(虽然Go的goroutine和channel已经非常轻量了),累积起来可能会比直接在一个共享的、经过精心优化且加锁的切片上循环调用回调函数要略高一些。但这通常是过度优化,而且维护起来会更复杂。对于绝大多数业务场景,这点性能差异几乎可以忽略不计。
跨进程/跨网络通信的“观察者”: Go的channel是进程内的通信机制。如果你需要实现的是一个分布式系统中的事件通知,比如一个服务发布事件,另一个完全独立的微服务订阅并处理,那么channel就无能为力了。这时候,你需要的是专业的消息队列(如Kafka、RabbitMQ、NATS等),或者RPC框架(如gRPC)配合发布订阅模式。它们提供了持久化、可靠传输、负载均衡、服务发现等channel无法提供的特性。
复杂的双向或多轮交互: 观察者模式的核心是单向的事件通知——Subject通知Observer。如果你的“观察者”和“被观察者”之间需要进行频繁的、复杂的双向通信,或者Observer需要主动查询Subject的内部状态并根据查询结果进行多轮交互,那么单纯的事件通知模式可能就不够了。这种情况下,你可能需要设计更复杂的API接口,或者引入更高级的并发模式(如Actor模型),甚至直接暴露一个包含共享状态的服务,并通过更细粒度的锁来管理。虽然channel可以用来构建复杂的通信流,但如果核心需求是请求-响应而非简单的通知,直接的函数调用或RPC可能更直观。
严格的事件顺序保证且不允许并发处理: 尽管channel保证了发送和接收的顺序,但如果你在
Notify
Update
Subject
总的来说,channel在Go中实现观察者模式,其优势在于简洁、并发安全、解耦和Go-idiomatic。只有当遇到非常特定的、边缘的性能需求或者跨进程/网络通信时,才需要考虑其他方案。
以上就是Golang观察者模式怎么做 使用channel实现事件通知机制的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号