@KafkaListener原理和动态监听topic
当使用Kafka时可以使用@KafkaListener很方便的对topic进行监听。但是对于在项目启动时,动态增加topic的监听,这种方式就无法实现,因此需要一种动态监听kafka topic的方式。
这种方式需要读取新增的kafka topic,这个不是难点,使用@Schedule注解轮询就可实现,难点在于如何通过代码监听,实现和@KafkaListener同样的效果。

从图中不难理解@KafkaListener从启动到拉取消息的过程,可以看到最终是调用KafkaMessageListenerContainer的start()方法,启动线程调用kafkaConsumer的poll()方法和被注解的方法。
从上面已经可以看出最终是调用KafkaMessageListenerContainer的start()方法进行监听kafka topic的消息,那么我们将动态变化的kafka配置生成一个KafkaMessageListenerContainer,并启动即可。
以下源码是KafkaMessageListenerContainer的构造函数
public KafkaMessageListenerContainer(ConsumerFactory<? super K, ? super V> consumerFactory,
ContainerProperties containerProperties) {
this(null, consumerFactory, containerProperties, (TopicPartitionInitialOffset[]) null);
}
因此我们需要构建ConsumerFactory和ContainerProperties,对于ConsumerFactory,其实现类为DefaultKafkaConsumerFactory,构造函数为:
public DefaultKafkaConsumerFactory(Map<String, Object> configs,
@Nullable Deserializer<K> keyDeserializer,
@Nullable Deserializer<V> valueDeserializer) {
this.configs = new HashMap<>(configs);
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
}
通过kafka的属性和序列化方式即可初始化DefaultKafkaConsumerFactory。
ContainerProperties存放了kafka监听器运行时的相关属性,因此在初始化后,还需要将kafka的相关属性赋值进去。
最后示例代码:
// consumer配置
Map<String, Object> configMap = Maps.newHashMap();
// 采用手动提交的方式
configMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
configMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5000);
configMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
configMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
configMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "xxxxxx");
configMap.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
// kafka监听器
Deserializer<String> stringDeserializer = new StringDeserializer();
DefaultKafkaConsumerFactory<String, String> factory = new DefaultKafkaConsumerFactory<>(configMap, stringDeserializer, stringDeserializer);
ContainerProperties props = new ContainerProperties("test-topic");
props.setMessageListener(new CustomerMsgHandler("test-topic"));
props.setGroupId(configMap.get(ConsumerConfig.GROUP_ID_CONFIG).toString());
props.setAckMode(ContainerProperties.AckMode.MANUAL);
KafkaMessageListenerContainer<String, String> container = new KafkaMessageListenerContainer<>(factory, props);
// 启动
container.start();
对于自己处理消息的类,需要实现AcknowledgingMessageListener的onMessage方法:
@Slf4j
public class CustomerMsgHandler implements AcknowledgingMessageListener<String, String> {
private String topic;
public CustomerMsgHandler(String topic) {
this.topic = topic;
}
@Override
public void onMessage(ConsumerRecord<String, String> data, Acknowledgment acknowledgment) {
// doSomething
// 因为前面设置了手动提交ack的方式,这里需要在消息处理完成后提交ack
acknowledgment.acknowledge();
}
}
以上,可以通过读取配置,实例化KafkaMessageListenerContainer并调用其start()方法,实现动态kafka topic的监听。
有没有办法在Ruby中动态创建数组?例如,假设我想遍历用户输入的书籍数组:books=gets.chomp用户输入:"TheGreatGatsby,CrimeandPunishment,Dracula,Fahrenheit451,PrideandPrejudice,SenseandSensibility,Slaughterhouse-Five,TheAdventuresofHuckleberryFinn"我把它变成一个数组:books_array=books.split(",")现在,对于用户输入的每一本书,我想用Ruby创建一个数组。伪代码来做到这一点:x=0books_array.
我想在IRB中浏览文件系统并让提示更改以反射(reflect)当前工作目录,但我不知道如何在每个命令后进行提示更新。最终,我想在日常工作中更多地使用IRB,让bash溜走。我在我的.irbrc中试过这个:require'fileutils'includeFileUtilsIRB.conf[:PROMPT][:CUSTOM]={:PROMPT_N=>"\e[1m:\e[m",:PROMPT_I=>"\e[1m#{pwd}>\e[m",:PROMPT_S=>"FOO",:PROMPT_C=>"\e[1m#{pwd}>\e[m",:RETURN=>""}IRB.conf[:PROMPT_MO
首先,我使用的是rails3.1.3和来自master的carrierwavegithub仓库的分支。我使用after_init钩子(Hook)来确定基于属性的字段页面模型实例并为这些字段定义属性访问器将值存储在序列化哈希中(希望它清楚我是什么谈论)。这是我正在做的事情的精简版:classPage省略mount_uploader命令让我可以访问我想要的属性。但是当我安装uploader时出现错误消息说“nil类的未定义新方法”我在源代码中读到有方法read_uploader和扩展模块中的write_uploader。我如何必须覆盖这些来制作mount_uploader命令使用我的“虚拟
我正在尝试动态构建一个多维数组。我想要的基本上是这样的(为简单起见写出来):b=0test=[[]]test[b]这给了我错误:NoMethodError:undefinedmethod`test=[[],[],[]]而且它工作正常,但在我的实际使用中,我不会事先知道需要多少个数组。有一个更好的方法吗?谢谢 最佳答案 不需要像您正在使用的索引变量。只需将每个数组附加到您的test数组:irb>test=[]=>[]irb>test[["a","b","c"]]irb>test[["a","b","c"],["d","e","f"]]
如何只加载map边界内的标记gmaps4rails?当然,在平移和/或缩放后加载新的。与此直接相关的是,如何获取map的当前边界和缩放级别? 最佳答案 我是这样做的,我只在用户完成平移或缩放后替换标记,如果您需要不同的行为,请使用不同的事件监听器:在你看来(index.html.erb):{"zoom"=>15,"auto_adjust"=>false,"detect_location"=>true,"center_on_user"=>true}},false,true)%>在View的底部添加:functiongmaps4rail
如何在对象上调用方法名称的嵌套哈希?例如,给定以下哈希:hash={:a=>{:b=>{:c=>:d}}}我想创建一个方法,给定上面的散列,执行以下操作:object.send(:a).send(:b).send(:c).send(:d)我的想法是我需要从一个未知的关联中获取一个特定的属性(这个方法不知道,但程序员知道)。我希望能够指定一个方法链来以嵌套哈希的形式检索该属性。例如:hash={:manufacturer=>{:addresses=>{:first=>:postal_code}}}car.execute_method_hash(hash)=>90210
我有一个ruby程序,我想接受用户创建的方法,并使用该名称创建一个新方法。我试过这个:defmethod_missing(meth,*args,&block)name=meth.to_sclass我收到以下错误:`define_method':interningemptystring(ArgumentError)in'method_missing'有什么想法吗?谢谢。编辑:我以不同的方式让它工作,但我仍然很好奇如何以这种方式做到这一点。这是我的代码:defmethod_missing(meth,*args,&block)Adder.class_evaldodefine_method
假设我们有A、B、C类。Adefself.inherited(sub)#metaprogramminggoeshere#takeclassthathasjustinheritedclassA#andforfooclassesinjectprepare_foo()as#firstlineofmethodthenrunrestofthecodeenddefprepare_foo#=>prepare_foo()neededhere#somecodeendendBprepare_foo()neededhere#somecodeendend如您所见,我正在尝试将foo_prepare()调用注入
这里我想输出带有动态组名的json而不是单词组@tickets.eachdo|group,v|json.group{json.array!vdo|ticket|json.partial!'tickets/ticket',ticket:ticketend}end@ticket是这样的散列{a:[....],b:[.....]}我想要这样的输出{a:[.....],b:[....]} 最佳答案 感谢@AntarrByrd,这个问题有类似的答案:JBuilderdynamickeysformodelattributes使用上面的逻辑我已经
我正在根据Rakefile中的现有测试文件动态生成测试任务。假设您有各种以模式命名的单元测试文件test_.rb.所以我正在做的是创建一个以“测试”命名空间内的文件名命名的任务。使用下面的代码,我可以用raketest:调用所有测试require'rake/testtask'task:default=>'test:all'namespace:testdodesc"Runalltests"Rake::TestTask.new(:all)do|t|t.test_files=FileList['test_*.rb']endFileList['test_*.rb'].eachdo|task|n