贝利信息

标题:Go 中实现单通道多消费者(广播式事件分发)的正确方法

日期:2026-01-20 00:00 / 作者:花韻仙語

在 go 中,一个 channel 无法被多个 goroutine 同时“接收”同一消息;默认行为是竞争式消费。要实现“一个事件通知所有监听者”,需通过 fan-out 模式手动广播——即从源 channel 读取一次,再分别写入多个目标 channel。

Go 的 channel 是点对点通信原语,不具备内置广播能力。当你将同一个 incoming channel 同时传给 processEmail 和 processPagerDuty,两个 goroutine 实际上在竞争接收——每次仅有一个能成功读到事件,这正是你观察到“只有第一个 goroutine 收到事件”的根本原因。

要实现真正的“一对多”事件分发(即每个监听者都收到同一份事件副本),必须引入显式广播逻辑。推荐采用经典的 fan-out 模式:由一个中央分发 goroutine 从源 channel 读取事件,然后并发地、独立地将该事件

发送至多个专用 consumer channel。以下是改造后的完整、可运行示例:

package main

import (
    "fmt"
    "time"
)

type Event struct {
    Host    string
    Command string
    Output  string
}

// 全局事件源(只供写入)
var incoming = make(chan Event, 10)

// 各服务专属接收 channel(缓冲避免阻塞分发器)
var (
    emailChan     = make(chan Event, 10)
    pagerDutyChan = make(chan Event, 10)
)

// 【关键】广播分发器:读取一次,发给所有订阅者
func broadcast() {
    for e := range incoming {
        // 并发发送,确保各 consumer 独立接收(不相互阻塞)
        go func(event Event) {
            select {
            case emailChan <- event:
            default:
                fmt.Println("⚠️  emailChan full, dropped event")
            }
        }(e)

        go func(event Event) {
            select {
            case pagerDutyChan <- event:
            default:
                fmt.Println("⚠️  pagerDutyChan full, dropped event")
            }
        }(e)
    }
}

func processEmail(ticker *time.Ticker) {
    for {
        select {
        case t := <-ticker.C:
            fmt.Println("? Email Tick at", t)
        case e := <-emailChan:
            fmt.Println("? EMAIL GOT AN EVENT!")
            fmt.Printf("%+v\n", e)
        }
    }
}

func processPagerDuty(ticker *time.Ticker) {
    for {
        select {
        case t := <-ticker.C:
            fmt.Println("? PagerDuty Tick at", t)
        case e := <-pagerDutyChan:
            fmt.Println("? PAGERDUTY GOT AN EVENT!")
            fmt.Printf("%+v\n", e)
        }
    }
}

func eventAdd() {
    e := Event{
        Host:    "web01-east.domain.com",
        Command: "foo",
        Output:  "bar",
    }
    incoming <- e // 写入源 channel,触发广播
}

func main() {
    // 启动广播器(必须在任何写入前启动)
    go broadcast()

    // 启动各处理器
    emailTicker := time.NewTicker(10 * time.Second)
    go processEmail(emailTicker)

    pdTicker := time.NewTicker(1 * time.Second)
    go processPagerDuty(pdTicker)

    // 模拟 API 调用
    time.AfterFunc(2*time.Second, eventAdd)
    time.AfterFunc(5*time.Second, eventAdd)

    // 保持主 goroutine 运行
    select {}
}

关键设计要点说明:

⚠️ 注意事项:

通过此模式,你既能保持 Go channel 的简洁性,又能精准实现事件广播语义——每个监听者都获得完整、独立的事件副本,真正达成“一个事件,多方响应”。