首页 > 后端开发 > Golang > 正文

Golang观察者模式怎么做 使用channel实现事件通知机制

P粉602998670
发布: 2025-08-20 11:44:01
原创
3236人浏览过
Go中用channel实现观察者模式,核心是Subject通过channel向Observer异步发送事件。定义Event接口和Observer接口,Observer持有eventCh接收事件,Subject维护observers映射并用sync.RWMutex保证并发安全。Notify时复制观察者列表并为每个Observer启动goroutine调用Update,避免阻塞。Observer用独立goroutine监听eventCh,处理事件。channel天然支持并发安全、解耦、非阻塞通信,符合Go“通过通信共享内存”理念。相比回调函数,channel避免阻塞通知者;相比互斥锁,channel更专注通信而非资源保护。缓冲channel和超时机制防阻塞,stopCh实现优雅退出。适用于进程内高并发事件通知,但不适合跨进程通信或需严格顺序处理场景。

golang观察者模式怎么做 使用channel实现事件通知机制

在Go语言中,实现观察者模式并利用channel来构建事件通知机制,核心思路其实挺直接的:让事件的发布者(Subject)持有一系列channel,每个channel代表一个订阅者(Observer)。当有事件发生时,发布者就将事件数据发送到这些channel中,而每个订阅者则在自己的goroutine里监听它专属的channel,接收并处理事件。这样一来,事件的发送和接收就自然而然地解耦了,而且Go的并发原语让这一切变得异常流畅。

解决方案

要搞定这个事儿,我们通常会定义一个事件接口,一个观察者接口,然后就是核心的被观察者(或称主题)结构体。

首先,我们来定义事件和观察者。事件可以是个空接口,这样可以传递任何类型的数据。观察者呢,就监听并处理事件。

package main

import (
    "fmt"
    "sync"
    "time"
)

// Event 是我们事件的通用接口,可以是任何类型
type Event interface{}

// Observer 是观察者接口,定义了接收事件的方法
type Observer interface {
    Update(event Event)
}

// Subject 是被观察者(主题)的接口,定义了注册、注销和通知观察者的方法
type Subject interface {
    Register(observer Observer)
    Unregister(observer Observer)
    Notify(event Event)
}

// ConcreteObserver 是一个具体的观察者实现
type ConcreteObserver struct {
    id      string
    eventCh chan Event // 每个观察者拥有一个独立的channel来接收事件
    stopCh  chan struct{} // 用于停止观察者的监听循环
}

// NewConcreteObserver 创建一个新的具体观察者
func NewConcreteObserver(id string) *ConcreteObserver {
    return &ConcreteObserver{
        id:      id,
        eventCh: make(chan Event, 5), // 带缓冲的channel,防止阻塞通知者
        stopCh:  make(chan struct{}),
    }
}

// Update 实现了Observer接口,将事件发送到自己的channel
func (o *ConcreteObserver) Update(event Event) {
    select {
    case o.eventCh <- event:
        // 事件成功发送
    case <-time.After(50 * time.Millisecond): // 设置一个超时,防止长时间阻塞
        fmt.Printf("Observer %s: send event timeout, event: %v\n", o.id, event)
    }
}

// StartListening 启动一个goroutine监听事件channel
func (o *ConcreteObserver) StartListening() {
    go func() {
        fmt.Printf("Observer %s: 开始监听事件...\n", o.id)
        for {
            select {
            case event := <-o.eventCh:
                fmt.Printf("Observer %s: 收到事件 -> %v\n", o.id, event)
                // 模拟处理事件的耗时操作
                time.Sleep(time.Millisecond * 100)
            case <-o.stopCh:
                fmt.Printf("Observer %s: 停止监听。\n", o.id)
                return
            }
        }
    }()
}

// StopListening 停止监听goroutine
func (o *ConcreteObserver) StopListening() {
    close(o.stopCh)
    close(o.eventCh) // 关闭channel,确保goroutine退出
}

// ConcreteSubject 是一个具体的被观察者实现
type ConcreteSubject struct {
    observers map[Observer]struct{} // 使用map来存储观察者,方便查找和删除
    mu        sync.RWMutex          // 读写锁,保护observers map的并发访问
}

// NewConcreteSubject 创建一个新的具体被观察者
func NewConcreteSubject() *ConcreteSubject {
    return &ConcreteSubject{
        observers: make(map[Observer]struct{}),
    }
}

// Register 注册一个观察者
func (s *ConcreteSubject) Register(observer Observer) {
    s.mu.Lock()
    defer s.mu.Unlock()
    s.observers[observer] = struct{}{}
    fmt.Printf("Subject: 注册了观察者 %T\n", observer)
}

// Unregister 注销一个观察者
func (s *ConcreteSubject) Unregister(observer Observer) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if _, ok := s.observers[observer]; ok {
        delete(s.observers, observer)
        fmt.Printf("Subject: 注销了观察者 %T\n", observer)
    }
}

// Notify 通知所有注册的观察者
func (s *ConcreteSubject) Notify(event Event) {
    s.mu.RLock() // 使用读锁,允许多个goroutine同时读取observers map
    // 为了避免在通知过程中修改map导致panic,通常会复制一份观察者列表
    // 但在这里,我们只进行读取操作,所以直接遍历map是安全的
    // 并且,每个通知操作都在单独的goroutine中进行,进一步解耦
    observersToNotify := make([]Observer, 0, len(s.observers))
    for obs := range s.observers {
        observersToNotify = append(observersToNotify, obs)
    }
    s.mu.RUnlock()

    fmt.Printf("Subject: 正在通知事件 -> %v\n", event)
    for _, obs := range observersToNotify {
        // 每个通知操作在一个独立的goroutine中进行,防止一个慢速观察者阻塞所有通知
        go obs.Update(event)
    }
}

func main() {
    subject := NewConcreteSubject()

    obs1 := NewConcreteObserver("Observer-1")
    obs2 := NewConcreteObserver("Observer-2")
    obs3 := NewConcreteObserver("Observer-3")

    obs1.StartListening()
    obs2.StartListening()
    obs3.StartListening()

    subject.Register(obs1)
    subject.Register(obs2)
    subject.Register(obs3)

    fmt.Println("\n--- 第一次事件通知 ---")
    subject.Notify("系统启动完成")
    time.Sleep(time.Second * 1) // 等待事件处理

    fmt.Println("\n--- 注销Observer-2 ---")
    subject.Unregister(obs2)
    obs2.StopListening() // 停止注销的观察者监听

    fmt.Println("\n--- 第二次事件通知 ---")
    subject.Notify("用户登录成功")
    time.Sleep(time.Second * 1) // 等待事件处理

    fmt.Println("\n--- 注册Observer-4 ---")
    obs4 := NewConcreteObserver("Observer-4")
    obs4.StartListening()
    subject.Register(obs4)

    fmt.Println("\n--- 第三次事件通知 ---")
    subject.Notify("数据更新通知")
    time.Sleep(time.Second * 1) // 等待事件处理

    // 停止所有观察者
    obs1.StopListening()
    obs3.StopListening()
    obs4.StopListening()

    fmt.Println("\n所有操作完成。")
    time.Sleep(time.Millisecond * 500) // 确保所有goroutine有时间退出
}
登录后复制

这段代码里,

ConcreteObserver
登录后复制
内部有一个
eventCh
登录后复制
channel,它通过
Update
登录后复制
方法接收事件,然后在一个独立的
StartListening
登录后复制
goroutine 中持续从这个 channel 读取事件并处理。
ConcreteSubject
登录后复制
则维护了一个
observers
登录后复制
map,并通过
Register
登录后复制
Unregister
登录后复制
管理观察者。当
Notify
登录后复制
方法被调用时,它会遍历所有注册的观察者,并为每个观察者启动一个新的goroutine来调用其
Update
登录后复制
方法,从而将事件“投递”出去。

立即学习go语言免费学习笔记(深入)”;

为什么选择Channel而不是回调函数或互斥锁?

我个人觉得,在Go里搞事件通知,用channel简直是水到渠成,比传统的回调函数或者单纯依赖互斥锁要优雅得多。这事儿得从Go的并发哲学说起:“不要通过共享内存来通信,而要通过通信来共享内存。”

首先,回调函数虽然直观,但它本质上是在调用者的goroutine里直接执行被调用者的逻辑。这意味着如果一个回调函数执行得很慢,它会直接阻塞通知者,甚至可能阻塞整个事件链条。更麻烦的是,如果回调函数需要访问共享状态,你还得自己去操心加锁、解锁,很容易就搞出死锁或者竞态条件。Go里虽然也可以用

sync.Mutex
登录后复制
来保护,但当并发量上来,或者逻辑变得复杂时,管理这些锁会变得非常痛苦,代码可读性也会下降。

其次,互斥锁固然是并发控制的利器,但它更多是用来保护共享资源访问的。在观察者模式里,我们关注的是“事件的传递”和“通知的解耦”。如果单纯用互斥锁来保护一个共享的事件队列,虽然能保证数据安全,但事件处理的并发性可能得不到充分发挥,而且通知者和观察者之间的耦合度仍然存在,因为它们都得知道如何操作这个队列和锁。

channel呢?它就是为并发通信而生的。

  1. 天然的并发安全: Channel本身就是并发安全的队列,你往里发数据,它帮你处理好同步问题,不用自己手动加锁解锁。
  2. 解耦: Subject只管往channel里发事件,Observer只管从channel里收事件。它们之间不需要知道对方的具体实现细节,甚至不知道对方是哪个goroutine。这种“只知道channel,不知道对方”的模式,极大降低了耦合。
  3. 非阻塞通知(可选): 就像我代码里写的,你可以给channel加缓冲,甚至用
    select
    登录后复制
    语句实现非阻塞发送,这样即使某个观察者处理慢了,也不会阻塞Subject的通知流程。
  4. Go语言的惯用法: 使用channel来构建并发模型,是Go语言推荐且最符合其设计哲学的做法。代码写出来,Go开发者一看就懂,维护起来也方便。
  5. 易于控制生命周期: 通过关闭channel,可以很方便地通知监听goroutine退出,管理资源的生命周期。

说实话,每次遇到需要事件通知或者生产者-消费者模型的场景,我第一个想到的就是channel。它带来的那种心智负担的减轻,是其他方案很难比拟的。

实现细节:如何处理并发注册与通知?

在实际操作中,尤其是在高并发环境下,处理观察者的注册、注销以及事件通知,有几个细节是必须得注意的,否则很容易踩坑。

知我AI·PC客户端
知我AI·PC客户端

离线运行 AI 大模型,构建你的私有个人知识库,对话式提取文件知识,保证个人文件数据安全

知我AI·PC客户端0
查看详情 知我AI·PC客户端
  1. 保护

    Subject
    登录后复制
    observers
    登录后复制
    map:
    Subject
    登录后复制
    内部的
    observers
    登录后复制
    map是一个共享资源,多个goroutine可能会同时尝试注册、注销或遍历它。这时候,
    sync.RWMutex
    登录后复制
    (读写锁)就派上用场了。

    • Register
      登录后复制
      Unregister
      登录后复制
      方法中,因为涉及到对map的写入(添加或删除元素),我们必须使用
      s.mu.Lock()
      登录后复制
      s.mu.Unlock()
      登录后复制
      来获取写锁,确保同一时间只有一个goroutine能修改map。
    • Notify
      登录后复制
      方法中,我们需要遍历
      observers
      登录后复制
      map。理论上,遍历是读操作,可以用
      s.mu.RLock()
      登录后复制
      s.mu.RUnlock()
      登录后复制
      获取读锁。读写锁允许多个读操作同时进行,但读写操作是互斥的。这样做的好处是,当有大量事件通知时,多个
      Notify
      登录后复制
      调用可以并行地读取观察者列表,提高了并发效率。
  2. 通知时的并发投递:

    Notify
    登录后复制
    方法里,我特意用了一个
    for ... go obs.Update(event)
    登录后复制
    的模式。这意味着每当一个事件被通知时,
    Subject
    登录后复制
    会为每个观察者启动一个独立的goroutine去调用其
    Update
    登录后复制
    方法。这样做有几个非常重要的好处:

    • 防止阻塞: 如果某个观察者的
      Update
      登录后复制
      方法内部处理耗时,或者其内部的
      eventCh
      登录后复制
      满了导致发送阻塞,这个阻塞只会影响到那个特定的通知goroutine,而不会阻塞
      Subject
      登录后复制
      的主通知流程,也不会影响其他观察者的通知。
    • 最大化并发: 所有的观察者可以并行地接收和处理事件,大大提高了事件处理的吞吐量。
    • 读写分离:
      Notify
      登录后复制
      方法中,我首先获取读锁,然后将
      observers
      登录后复制
      map中的观察者复制到一个临时切片
      observersToNotify
      登录后复制
      中,然后立即释放读锁。接着,遍历这个临时切片并启动goroutine进行通知。这样做可以避免在遍历
      map
      登录后复制
      时,
      map
      登录后复制
      被其他goroutine修改(比如注销了一个观察者),从而引发
      panic
      登录后复制
      。虽然Go的map在并发读写时不会直接panic,但并发写入和并发遍历(读写交叉)是未定义的行为,最好避免。复制一份再遍历,是最稳妥的做法。
  3. 观察者内部的Channel管理:

    • 缓冲Channel:
      ConcreteObserver
      登录后复制
      内部的
      eventCh
      登录后复制
      我设置了缓冲(
      make(chan Event, 5)
      登录后复制
      )。这意味着即使
      Observer
      登录后复制
      处理事件的速度稍慢,
      Subject
      登录后复制
      也能连续发送几个事件而不会立即阻塞。合理设置缓冲大小很重要,太小容易阻塞,太大可能浪费内存。
    • 超时处理:
      ConcreteObserver.Update
      登录后复制
      中,我加了一个
      select
      登录后复制
      配合
      time.After
      登录后复制
      的超时机制。如果事件在50毫秒内未能成功发送到
      eventCh
      登录后复制
      (可能是因为channel已满且观察者处理太慢),它会打印一个警告并放弃发送。这是一种防止通知方被慢速观察者永久阻塞的策略。
    • 优雅退出:
      ConcreteObserver
      登录后复制
      还引入了一个
      stopCh chan struct{}
      登录后复制
      。当需要停止观察者时,通过
      close(o.stopCh)
      登录后复制
      发送信号,
      StartListening
      登录后复制
      中的goroutine会捕获到这个信号并退出循环,从而实现资源的优雅释放。同时,也要记得关闭
      eventCh
      登录后复制
      ,确保所有相关的goroutine都能感知到channel关闭并退出。

这些细节,虽然看起来有点繁琐,但它们是构建健壮、高效并发系统的基石。没有它们,代码在高并发场景下很可能出现各种意想不到的问题。

什么时候不适合用Channel实现观察者模式?

尽管用Go的channel来实现观察者模式在多数场景下都显得非常自然和高效,但凡事没有银弹,总有些情况,它可能不是最优解,或者说,你得考虑其他的方案:

  1. 极高扇出且事件数据量微乎其微: 比如说,你有上百万个观察者,而每次通知的事件仅仅是一个布尔值或者一个非常小的整数。在这种极端情况下,为每个通知启动一个goroutine、以及channel本身的开销(虽然Go的goroutine和channel已经非常轻量了),累积起来可能会比直接在一个共享的、经过精心优化且加锁的切片上循环调用回调函数要略高一些。但这通常是过度优化,而且维护起来会更复杂。对于绝大多数业务场景,这点性能差异几乎可以忽略不计。

  2. 跨进程/跨网络通信的“观察者”: Go的channel是进程内的通信机制。如果你需要实现的是一个分布式系统中的事件通知,比如一个服务发布事件,另一个完全独立的微服务订阅并处理,那么channel就无能为力了。这时候,你需要的是专业的消息队列(如Kafka、RabbitMQ、NATS等),或者RPC框架(如gRPC)配合发布订阅模式。它们提供了持久化、可靠传输、负载均衡、服务发现等channel无法提供的特性。

  3. 复杂的双向或多轮交互: 观察者模式的核心是单向的事件通知——Subject通知Observer。如果你的“观察者”和“被观察者”之间需要进行频繁的、复杂的双向通信,或者Observer需要主动查询Subject的内部状态并根据查询结果进行多轮交互,那么单纯的事件通知模式可能就不够了。这种情况下,你可能需要设计更复杂的API接口,或者引入更高级的并发模式(如Actor模型),甚至直接暴露一个包含共享状态的服务,并通过更细粒度的锁来管理。虽然channel可以用来构建复杂的通信流,但如果核心需求是请求-响应而非简单的通知,直接的函数调用或RPC可能更直观。

  4. 严格的事件顺序保证且不允许并发处理: 尽管channel保证了发送和接收的顺序,但如果你在

    Notify
    登录后复制
    方法中为每个观察者启动了独立的goroutine来处理事件(就像我示例中做的那样),那么不同观察者之间对事件的处理顺序就无法保证了,甚至单个观察者内部,如果其
    Update
    登录后复制
    方法没有严格同步,也可能出现乱序。如果业务逻辑要求所有观察者必须严格按照事件发生的顺序,且不能并发处理,那么你就得牺牲一部分并发性,比如让
    Subject
    登录后复制
    在一个goroutine里顺序通知,或者让观察者在一个队列里顺序处理。但这通常是业务约束,而非channel本身的缺陷。

总的来说,channel在Go中实现观察者模式,其优势在于简洁、并发安全、解耦和Go-idiomatic。只有当遇到非常特定的、边缘的性能需求或者跨进程/网络通信时,才需要考虑其他方案。

以上就是Golang观察者模式怎么做 使用channel实现事件通知机制的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号