jjzjj

go - 暂停 RabbitMQ 消费者

coder 2024-07-08 原文

我正在使用 Go 为 RabbitMQ 编写消费者,它必须暂停消息消费一段时间,然后恢复以再次使用队列中的消息。 在阅读文档时 https://godoc.org/github.com/streadway/amqp我无法确定我需要在我的代码中实现的机制。

这有可能吗?有例子吗?

我的代码片段:

rabbitMQMessages, err = ch.Consume(
        "TestQ",
        "testConsumer",
        false,
        true,
        false,
        false,
        nil,
    )
    failOnError(err, "Failed to register a consumer")

    forever := make(chan bool)

    go func() {
        select {
        case d := <-rabbitMQMessages: // Cheking if messge was recieved
            log.Printf("Received a message: %s", d.Body)
            dotcount := bytes.Count(d.Body, []byte("."))

            err = ch.Flow(false) // Returns error: Exception (540) Reason: "NOT_IMPLEMENTED - active=false
            failOnError(err, "Failed to close channel")

            t := time.Duration(dotcount)
            time.Sleep(t * time.Second)
            log.Printf("Done")

            err = ch.Flow(true)

            d.Ack(false)
        default:
            log.Println("Default section")
        }
    }()

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
    <-forever

最佳答案

我想通了。我需要关闭 连接然后重新打开它。这可以防止消息被提前阅读。不确定它是否正确,但它对我有用。添加我的测试代码片段。

func main() {
    var rabbitMQMessages <-chan amqp.Delivery
    var err error
    var rabbitMQ RabbitMQ

    err = rabbitMQ.dial()
    failOnError(err, "Failed to connect to RabbitMQ")
    defer rabbitMQ.Close()

    err = rabbitMQ.setUpChannel()
    failOnError(err, "Failed to open a channel")

    err = rabbitMQ.Consumme()
    failOnError(err, "Failed to consume")

    log.Printf(" [*] Waiting for messages. To exit press CTRL+C")

    rabbitMQMessages = rabbitMQ.GetChan()

    for {
        select {
        case d, ok := <-rabbitMQMessages: // Cheking if messge was recieved
            log.Printf("Chan status at start of function %t", ok)

            if !ok {
                err = rabbitMQ.setUpChannel()
                failOnError(err, "Unable to open channel")
                defer rabbitMQ.Close()

                err = rabbitMQ.Consumme()
                failOnError(err, "Recover. Failed to register a consumer")

                rabbitMQMessages = rabbitMQ.GetChan()

                continue
            }

            log.Printf("Chan status at later of function %t", ok)

            log.Printf("Received a message: %s", d.Body)
            dotcount := bytes.Count(d.Body, []byte("."))
            d.Ack(false)

            err = rabbitMQ.CloseChannel()
            failOnError(err, "Failed to close channel")
            t := time.Duration(dotcount)
            time.Sleep(t * time.Second)
            log.Printf("Done")
        }
    }

}

关于go - 暂停 RabbitMQ 消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55512484/

有关go - 暂停 RabbitMQ 消费者的更多相关文章

  1. ruby-on-rails - Rails 3 - 过滤器链暂停为 :authentication rendered or redirected - 2

    我仍然收到标题中的“错误”消息,但不知道如何解决。在ApplicationController中,classApplicationController在routes.rb#match'set_activity_account/:id/:value'=>'users#account_activity',:as=>:set_activity_account--thisdoesn'tworkaswell..resources:usersdomemberdoget:action_a,:action_bendcollectiondoget'account_activity'endend和User

  2. ruby-on-rails - Textmate 'Go to symbol' 相当于 Vim - 2

    在Railcasts上,我注意到一个非常有趣的功能“转到符号”窗口。它像Command-T一样工作,但显示当前文件中可用的类和方法。如何在vim中获取它? 最佳答案 尝试:helptags有各种程序和脚本可以生成标记文件。此外,标记文件格式非常简单,因此很容易将sed(1)或类似的脚本组合在一起,无论您使用何种语言,它们都可以生成标记文件。轻松获取标记文件(除了下载生成器之外)的关键在于格式化样式而不是实际解析语法。 关于ruby-on-rails-Textmate'Gotosymbol

  3. ruby - 在 Ruby 中实现生产者消费者模式 - 2

    假设我有200个昂贵的方法调用(每个都有不同的参数)。出于某种原因,我可以并行执行其中的5个调用,但不能更多。我可以一次执行一个,但一次执行5个要快5倍。我想一直执行五件事。不想排五个,等五个都排完了,再排五个。如果我排队A、B、C、D、E并且C先完成,我想立即用F替换它,即使A和B还没有完成。我一直在研究这个问题,因为我可以想象它会定期发生。解决方案似乎是生产者-消费者模式,Ruby在其标准库中内置了一些用于该模式的结构(Queue和SizedQueue)。我玩过代码示例,阅读了一些文档,我想我对它有一个粗略的了解。但是我有一些问题我对我的解决方案没有信心,而且多线程的整个领域对我来

  4. kafka如何动态消费新增topic主题 - 2

    一、解决痛点使用spring-kafka客户端,每次新增topic主题,都需要硬编码客户端并重新发布服务,操作麻烦耗时长。kafkaListener虽可以支持通配符消费topic,缺点是并发数需要手动改并且重启服务。对于业务逻辑相似场景,创建新主题动态监听可以用kafka-batch-starter组件二、组件能力1、新增topic名称为:auto.topic1(由于配置spring.kafka.consumer.prefix为auto,因此只有auto前缀的topic,才会被组件动态监听。)2、应用输出日志,监听到新增auto.topic1,并初始化客户端(主题刷新间隔为10s)3、发新的消

  5. ruby-on-rails - 用于 Ruby 的 vim 中的全局 "Go to definition"? - 2

    自97年以来我一直在使用vi/vim进行各种快速编辑和管理任务,但最近才考虑使用它来替换Netbeans作为我选择的ruby​​编辑器。我发现一件事在Netbeans和Eclipse中非常有用的是Ctrl+Click“转到定义”功能,您可以在其中按住Ctrl键并单击一个类或方法,然后它将带您了解定义。现在,我玩过丰富的ctags和rails.vim,而且很接近,但没有雪茄。这就是我想要的:默认情况下在Netbeans和Eclipse中,您可以在本地rails中按住ctrl并单击本地方法或类项目,但你也可以ctrl+click定义在gems或用Ruby编写的系统库。以Netbeans为例

  6. 绝对详细的 RabbitMQ 实践操作手册(一) - 2

    绝对详细的RabbitMQ实践操作手册,看完本系列就够了。一、什么是MQ?1、MQ的概念2、理解消息队列二、MQ的优势和劣势1、优势和作用2、劣势三、MQ的应用场景四、AMQP五、工作原理一、什么是MQ?1、MQ的概念MQ全称MessageQueue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。下面用图来理解异步通信,并阐明与同步通信的区别。同步通信:甲乙两人面对面交流,你一句我一句必须同步进行,两人除此之外不做任何事情异步通信:异步通信相当于通过第三方转述对话,可能有消息的延迟,但不需要二人时刻保持联系,消息传给第三方后,两人可以做其他自己想做的事情,当需要获取

  7. ruby-on-rails - Rails 上的 ruby : How to have multiple submit buttons going to different methods (maybe with with_action? ) - 2

    这个问题在这里已经有了答案:HowdoIcreatemultiplesubmitbuttonsforthesameforminRails?(7个答案)关闭9年前。所以..'save'%>'library'%>然后在我的Controller中:with_actiondo|a|a.savedoenda.librarydoendend问题是只有一个操作被调用...两个submit_tags调用相同的操作...知道为什么吗?或者我如何获得两个按钮以将表单提交给两种不同的方法?

  8. 手把手教你搭建SpringCloud Alibaba之生产者与消费者 - 2

      SpringCloudAlibaba全集文章目录:零、手把手教你搭建SpringCloudAlibaba项目一、手把手教你搭建SpringCloudAlibaba之生产者与消费者二、手把手教你搭建SpringCloudAlibaba之Nacos服务注册中心三、手把手教你搭建SpringCloudAlibaba之Nacos服务配置中心四、手把手教你搭建SpringCloudAlibaba之Nacos服务集群配置五、手把手教你搭建SpringCloudAlibaba之Nacos服务持久化配置六、手把手教你搭建SpringCloudAlibaba之Sentinel实现流量实时监控七、手把手教你搭

  9. DiFi: A Go-as-You-Pay Wi-Fi Access System 精读笔记(三) - 2

    IV.SYSTEMIMPLEMENTATIONWeadoptmodulardesignfollowingtheintegrationofblockchain.Itbringsmoreflexibilitybyseparatingtheimplementationofdifferentfunctionalities,sowecouldleveragetheadvantagesoftheblockchain-basedsmartcontractwhilereducingoverhead.Figure3illustrateshowdifferentmodulesareinvolvedintheint

  10. go-templates - 如何根据表达式有条件地在 Go 模板中设置变量,如果不使用 if 语句包装可能会导致错误 - 2

    问题我该如何做这样的事情:{{$use_ssl:=(ne$.Env.CERT_NAME"")}}其中$.Env.CERT_NAME可能为零/未定义。如果它是零,它给出这个错误:at:errorcallingne:invalidtypeforcomparison注意:我无法控制传递给Go模板的对象,因此必须完全在模板本身内解决这个问题。我尝试过的我试图通过首先检查它是否为非空来变通:{{$use_ssl:=(($.Env.CERT_NAME)&&(ne$.Env.CERT_NAME""))}}但它给出了这个错误:unexpected"&"inoperand所以我切换到这个,这在语法上是允

随机推荐