The Go Programming Language
http://golang.org/
Go Playground
Go Projects
Revel Web Framework
weiweiwitch

go 的 channel 的一个疑问

  •  1
     
  •   weiweiwitch · Mar 20, 2017 · 2619 views
    This topic created in 3367 days ago, the information mentioned may be changed or developed.

    我们现在项目中使用 channel 来实现多个协程间通讯时遇到一个问题,想请教下这里的大牛。

    我们有多个生产端协程(数千个)会不断的产生数据,塞入到 channel 中,然后一个消费端协程不断的从 channel 中获取数据。

    现在,我们的多个生产端协程会不定时的产生短暂的峰值,这个峰值的量很大,而 channel 是有容量的。当 channel 的容量满了后,生产端就阻塞住了。这影响到了生产端的正常逻辑的执行。

    我们考虑过使用 select 和 default 的方式来避免阻塞。但如何在 channel 有空余容量时,生产端协程及时将积压的消息再次推入 channel ?

    由于消费端逻辑的特殊性,我们无法创建多个消费端协程来提高消费的速度。

    13 replies    2017-03-30 21:28:22 +08:00
    Muninn
        1
    Muninn  
       Mar 20, 2017 via Android   ❤️ 2
    这和 golang 没关系 传统的队列一样的 只能加大缓冲或者提高消费能力

    评估下内存够用不 不够了需要改架构持久化
    weiweiwitch
        2
    weiweiwitch  
    OP
       Mar 20, 2017
    @Muninn 内存是够的。我们现在暂时使用了 go-datastructures 的无边界 queue 来缓解这个问题。只是这个数据结构和其他 channel 没法太好的搭配使用。

    绝大部分时间,生产端的产生速度是很缓慢的,所以为了偶尔的波峰为 channel 分配巨量的缓冲,感觉比较浪费。
    znood
        3
    znood  
       Mar 20, 2017   ❤️ 1
    如 1 楼所说,加大缓冲,如果消费能力不足的情况下最好在 channel 和消费者中间加个持久化队列,如 kafka ,如果对延迟要求不是很高可以直接把 channel 换成 kafka
    pkking
        4
    pkking  
       Mar 20, 2017
    可以借鉴下拥塞算法
    PhilC
        5
    PhilC  
       Mar 20, 2017
    你可以看看 nsq 的代码,用 select ,当 channel 满了就写到文件里
    jiumingmao
        6
    jiumingmao  
       Mar 20, 2017
    使用 channel 的 channel a , a 不满的时候生产者定义一个长度为 1 的子 channel b ,往 b 中放一个元素,然后放到 a ; b 满的时候,生产者定义一个长度比较大(需要估计一下峰值大概多大)的子 channel c ,然后数据放入 c ,直接 a 不满,把 c 放入 a 。
    jiumingmao
        7
    jiumingmao  
       Mar 20, 2017
    不过 channel 都会有一个问题,进程挂了就啥都没有了,使用 kafka 可以防止数据丢失。
    iot
        8
    iot  
       Mar 20, 2017
    生产者写入 channel 时候能不能判断下, 如果快满了就再创建一个更大的 channel 替换旧的
    ghbai
        9
    ghbai  
       Mar 21, 2017
    gocrawl(开源爬虫类库)的一种方案
    https://github.com/PuerkitoBio/gocrawl/blob/master/popchannel.go

    ```
    type popChannel chan []*URLContext
    // The stack function ensures the specified URLs are added to the pop channel
    // with minimal blocking (since the channel is stacked, it is virtually equivalent
    // to an infinitely buffered channel).
    func (pc popChannel) stack(cmd ...*URLContext) {
    toStack := cmd
    for {
    select {
    case pc <- toStack:
    return
    case old := <-pc:
    // Content of the channel got emptied and is now in old, so append whatever
    // is in toStack to it, so that it can either be inserted in the channel,
    // or appended to some other content that got through in the meantime.
    toStack = append(old, toStack...)
    }
    }
    }
    ```
    weiweiwitch
        10
    weiweiwitch  
    OP
       Mar 21, 2017
    @ghbai 如果我理解的没问题的话,这个方案是无法保证同一个生产者产生的 cmd 被有序的消费。
    ghbai
        11
    ghbai  
       Mar 21, 2017
    @weiweiwitch 对,是不能保证有序的。
    khowarizmi
        12
    khowarizmi  
       Mar 21, 2017
    在你的 unbounded queue 前后接上两个 channel ,然后用两个 worker 搬数据,伪装成 unbounded channel 。
    gwind
        13
    gwind  
       Mar 30, 2017
    好比一条 TCP 连接达到最大吞吐,你再塞就没有意义。
    建议考虑下 ZeroMQ, nanomsg 等,重新定义模型。
    纯 golang 的 nanomsg : https://github.com/go-mangos/mangos
    About   ·   Help   ·   Advertise   ·   Blog   ·   API   ·   FAQ   ·   Solana   ·   3388 Online   Highest 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 62ms · UTC 11:30 · PVG 19:30 · LAX 04:30 · JFK 07:30
    ♥ Do have faith in what you're doing.