参考:Kafka参数
@KafkaListener(id = "11111", groupId = "demo-group",topics = Constants.TOPIC)
public void listen(String msgData) {
LOGGER.info("收到消息" + msgData);
}
@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix",
topics = Constants.TOPIC)
public void listen2(String msgData) {
LOGGER.info("收到消息" + msgData);
}
@KafkaListener(id = "3333", groupId = "demo-group2", topics = Constants.TOPIC)
public void listen3(String msgData) {
LOGGER.info("收到消息" + msgData);
}
@KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC)
public void listen4(String msgData) {
LOGGER.info("收到消息" + msgData);
}
(1) id: 默认是每个Listener实例的重要标识。
对于整个日志的排查起着至关重要的作用。如果不指定groupId,那么id将直接作为groupId。可以使用另外一个属性 idIsGroup=false关闭,默认是true。
(2) goupId: 每个消费者所属的组。
每个消费者都有自己所属的组。一个组中可以有多个消费者。
一个Topic的分区只能被同一个组下的某个消费者消费。从日志上来看,侧面也反映的消费模式是 Subscribed 订阅模式,不是手动的assign模式。
[Consumer clientId=consumer-1, groupId=demo-group2] Subscribed to topic(s): COLA
[Consumer clientId=consumer-2, groupId=demo-group] Subscribed to topic(s): COLA
[Consumer clientId=consumer-3, groupId=demo-group2] Subscribed to topic(s): COLA
[Consumer clientId=prefix-0, groupId=demo-group] Subscribed to topic(s): COLA
(3) clientIdPrefix: 消费者clientId前缀
@KafkaListener(id = "22222", groupId = "demo-group", clientIdPrefix = "prefix", topics = Constants.TOPIC)
public void listen2(String msgData) {
LOGGER.info("收到消息" + msgData);
}
如下图,共有4个消费者。有个消费者配置了clientIdPrefix属性为"prefix",所以该消费者的clientId以指定的"prefix"开头。如果没有配置,该实例的clientId默认为"consumer"。同时,每个实例的clientId后面挂了一个数字,来标示它在整个kafka集群中的消费者编号,编号从0开始。这里配置了4个消费者,所以消费者实例编号有0、 1、 2、 3。
(4) autoStartup
public @interface KafkaListener ...
/**
* Set to true or false, to override the default setting in the container factory. May
* be a property placeholder or SpEL expression that evaluates to a {@link Boolean} or
* a {@link String}, in which case the {@link Boolean#parseBoolean(String)} is used to
* obtain the value.
* <p>SpEL {@code #{...}} and property place holders {@code ${...}} are supported.
* @return true to auto start, false to not auto start.
* @since 2.2
*/
String autoStartup() default "";
是否自动启动,如果是 false,默认不生效,需要手动唤醒。
看源代码上作者给的的注释:该注解指定的值优先级比工厂里指定的高。
另外可以使用 ${} 占位符的形式,支持配置。
application.yaml:
listener:
auto:
startup: true
java :
@KafkaListener(... containerFactory = "batchContainerFactory",
autoStartup = "${listener.auto.startup}")
public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment)...
注:每个消费者实例对象内部持有两个属性。
boolean running
boolean paused
有几个改变状态的方法:
调用start()方法后,running转为true
调用stop()方法后,running转为false
调用pause()方法后,paused转为true
调用resume()方法后,paused转为false
只有running=true 、 paused=false 的消费者实例才能正常消费数据。
注解上的autoStartup改变的是running属性。
@KafkaListener(id = "11111", groupId = "demo-group",
topics = Constants.TOPIC, autoStartup = "false")
public void listen(String msgData) throws InterruptedException {
LOGGER.info("收到消息" + msgData);
Thread.sleep(1000);
}
@Autowired
private KafkaListenerEndpointRegistry registry;
// 获取到id="11111" 的消费实例对象
MessageListenerContainer listenerContainer =
this.registry.getListenerContainer("11111");
listenerContainer.pause(); //paused ==> true
// listenerContainer.stop(); //running==> false
@Autowired
private KafkaListenerEndpointRegistry registry;
// 获取到id="11111" 的消费实例对象
MessageListenerContainer listenerContainer =
this.registry.getListenerContainer("11111");
listenerContainer.pause(); //paused ==> true
// listenerContainer.stop(); //running==> false
@Autowired
private KafkaListenerEndpointRegistry registry;
// 定时器,每天凌晨0点开启监听
@Scheduled(cron = "0 0 0 * * ?")
public void startListener() {
log.info("开启监听");
// 判断监听容器是否启动,未启动则将其启动
if (!registry.getListenerContainer("11111").isRunning()) {
registry.getListenerContainer("11111").start();
}
registry.getListenerContainer("11111").resume();
}
// 定时器,每天早上10点关闭监听
@Scheduled(cron = "0 0 10 * * ?")
public void shutDownListener() {
log.info("关闭监听");
registry.getListenerContainer("11111").pause();
}
@KafkaListener注解能够使用到如下8种方法上面。至于监听单条数据的前4种方法,与批量监听多条数据的后4种方法,主要依据kafka的具体配置。
@KafkaListener(....)
public void listen1(String data)
@KafkaListener(....)
public void listen2(ConsumerRecord<K,V> data)
@KafkaListener(....)
public void listen3(ConsumerRecord<K,V> data, Acknowledgment acknowledgment)
@KafkaListener(....)
public void listen4(ConsumerRecord<K,V> data,
Acknowledgment acknowledgment, Consumer<K,V> consumer)
@KafkaListener(....)
public void listen5(List<String> data)
@KafkaListener(....)
public void listen6(List<ConsumerRecord<K,V>> data)
@KafkaListener(....)
public void listen7(List<ConsumerRecord<K,V>> data, Acknowledgment acknowledgment)
@KafkaListener(....)
public void listen8(List<ConsumerRecord<K,V>> data,
Acknowledgment acknowledgment, Consumer<K,V> consumer)
在application.yaml中配置的kafka参数,以spring.kafka开头的参数族,全部用于kafka默认对象的创建。
所有kafka参数默认封装到对象:KafkaProperties对象中,可使用@Autowired自动注入。
@Autowired
private KafkaProperties properties;
如不特殊指定,默认使用在yaml中的所有spring.kafka.consumer与spring.kafka.listener下的参数。
监听器实例对象自动绑定到上述配置文件,是由于它默认使用的"containerFactory" 是名为"kafkaListenerContainerFactory"的bean。
源码注释如下,如果不特殊指定,则默认的容器工厂将会被使用。
package org.springframework.kafka.annotation;
public @interface KafkaListener ...
/**
* The bean name of the {@link
org.springframework.kafka.config.KafkaListenerContainerFactory}
* to use to create the message listener container
responsible to serve this endpoint.
* <p>If not specified, the default container factory is used, if any.
* @return the container factory bean name.
*/
String containerFactory() default "";
默认的容器工厂代码如下,均为Springboot与Kafka框架提供的类。
这两个bean将spring.kafka.listener与spring.kafka.consumer下的参数全部组装到名为"kafkaListenerContainerFactory"这个bean中。该bean供@KafkaListener标记的监听实例使用。
因此可以得出结论:
如果不想使用默认的"kafkaListenerContainerFactory"容器工厂,则必须手动创建一个"ConcurrentKafkaListenerContainerFactory"类的实例,并且其bean name 不能叫"kafkaListenerContainerFactory"(不然与默认的工厂实例重名了),然后把该对象加入spring容器中。当在使用@KafkaListener标注的监听实例对象时,手动指定该注解"containerFactory"属性为刚才自定义的容器工厂实例bean name。
package org.springframework.boot.autoconfigure.kafka;
class KafkaAnnotationDrivenConfiguration {
@Bean
@ConditionalOnMissingBean
ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer() {
ConcurrentKafkaListenerContainerFactoryConfigurer configurer =
new ConcurrentKafkaListenerContainerFactoryConfigurer();
configurer.setKafkaProperties(this.properties);
MessageConverter messageConverterToUse =
(this.properties.getListener().getType().equals(Type.BATCH))
? this.batchMessageConverter : this.messageConverter;
configurer.setMessageConverter(messageConverterToUse);
configurer.setReplyTemplate(this.kafkaTemplate);
configurer.setTransactionManager(this.transactionManager);
configurer.setRebalanceListener(this.rebalanceListener);
configurer.setErrorHandler(this.errorHandler);
configurer.setBatchErrorHandler(this.batchErrorHandler);
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
configurer.setRecordInterceptor(this.recordInterceptor);
return configurer;
}
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ObjectProvider<ConsumerFactory<Object, Object>> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory
.getIfAvailable(() ->
new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties())));
return factory;
}
@Autowired
private KafkaProperties properties;
@Bean("batchContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> listenerContainer() {
ConcurrentKafkaListenerContainerFactory<?, ?> container =
new ConcurrentKafkaListenerContainerFactory<>();
Map<String, Object> stringObjectMap = this.properties.buildConsumerProperties();
stringObjectMap.put("enable.auto.commit", false);
container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(stringObjectMap));
// 没有topic是否禁止系统启动
container.setMissingTopicsFatal(true);
// 并发
container.setConcurrency(1);
// 批量接收
container.setBatchListener(true);
// 如果消息队列中没有消息,等待timeout毫秒后,调用poll()方法。
container.getContainerProperties().setPollTimeout(5000);
// 设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 设置kafka 异常重试次数 第一个参数等待重试时间,第二个参数数提交次数,这里设置不重试,默认重试10次 抛出异常后调用
// factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 0L)));
return container;
}
@KafkaListener(id = "4444", groupId = "demo-group2", topics = Constants.TOPIC, containerFactory = "batchContainerFactory")
public void listen4(List<ConsumerRecord> list, Acknowledgment acknowledgment) {
LOGGER.info("4444收到消息" + list.size());
acknowledgment.acknowledge();
}
如下,这里我只列出了影响本例的几条参数。
spring:
kafka:
consumer:
enable-auto-commit: true
# max-poll-records: 20
listener:
ack-mode: batch
type: batch
concurrency: 5
如果设置spring.kafka.listener.concurrency为5,共两个消费者,Topic名为"COLA",共8个分区。代码如下。
@KafkaListener(id = "4444", groupId = "demo-group2", topics = "COLA")
public void listen4(List<String> msgData) {
LOGGER.info("收到消息" + msgData);
}
@KafkaListener(id = "5555", groupId = "demo-group2", topics = "COLA")
public void listen5(List<String> msgData) {
LOGGER.info("收到消息" + msgData);
}
@Bean
public NewTopic newTopic() {
return new NewTopic(Constants.TOPIC, 8, (short) 1);
}
系统每个消费者都创建了5个线程,共10个线程。换句话说,每个消费者实例(@KafkaListener标记的方法)同时都会有5个线程在跑。每个线程接收的分区都不一样。
另外,这两个消费者属于同一个组,Topic只有8个分区,2个消费者共10个线程,一个线程消费一个分区,所以必然有两个线程最后属于空闲状态。
从实际结果上来看(下面的日志),没想到系统为id="4444"的消费者实际只分配到了3个分区,有两个线程处于空闲状态。id="5555"的消费者达到了预期,共消费了5个分区,分配到了5个线程!
[4444-2-C-1]: demo-group2: partitions assigned: []
[4444-3-C-1]: demo-group2: partitions assigned: []
[4444-4-C-1]: demo-group2: partitions assigned: [COLA-1]
[4444-1-C-1]: demo-group2: partitions assigned: [COLA-7]
[5555-2-C-1]: demo-group2: partitions assigned: [COLA-3]
[5555-4-C-1]: demo-group2: partitions assigned: [COLA-5]
[5555-3-C-1]: demo-group2: partitions assigned: [COLA-4]
[4444-0-C-1]: demo-group2: partitions assigned: [COLA-6]
[5555-0-C-1]: demo-group2: partitions assigned: [COLA-0]
[5555-1-C-1]: demo-group2: partitions assigned: [COLA-2]
concurrency值对应@KafkaListener的消费者实例线程数目,如果concurrency数量大于partition数量,多出的部分分配不到partition,会被闲置。
设置的并发量不能大于partition的数量,如果需要提高吞吐量,可以通过增加partition的数量达到快速提升吞吐量的效果。
当我在Rails控制台中按向上或向左箭头时,出现此错误:irb(main):001:0>/Users/me/.rvm/gems/ruby-2.0.0-p247/gems/rb-readline-0.4.2/lib/rbreadline.rb:4269:in`blockin_rl_dispatch_subseq':invalidbytesequenceinUTF-8(ArgumentError)我使用rvm来管理我的ruby安装。我正在使用=>ruby-2.0.0-p247[x86_64]我使用bundle来管理我的gem,并且我有rb-readline(0.4.2)(人们推荐的最少
我正在使用Ruby2.1.1和Rails4.1.0.rc1。当执行railsc时,它被锁定了。使用Ctrl-C停止,我得到以下错误日志:~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`gets':Interruptfrom~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`verify_server_version'from~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.
我将我的Rails应用程序部署到OpenShift,它运行良好,但我无法在生产服务器上运行“Rails控制台”。它给了我这个错误。我该如何解决这个问题?我尝试更新rubygems,但它也给出了权限被拒绝的错误,我也无法做到。railsc错误:Warning:You'reusingRubygems1.8.24withSpring.UpgradetoatleastRubygems2.1.0andrun`gempristine--all`forbetterstartupperformance./opt/rh/ruby193/root/usr/share/rubygems/rubygems
我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c
我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=
我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi
说在前面这部分我本来是合为一篇来写的,因为目的是一样的,都是通过独立按键来控制LED闪灭本质上是起到开关的作用,即调用函数和中断函数。但是写一篇太累了,我还是决定分为两篇写,这篇是调用函数篇。在本篇中你主要看到这些东西!!!1.调用函数的方法(主要讲语法和格式)2.独立按键如何控制LED亮灭3.程序中的一些细节(软件消抖等)1.调用函数的方法思路还是比较清晰地,就是通过按下按键来控制LED闪灭,即每按下一次,LED取反一次。重要的是,把按键与LED联系在一起。我打算用K1来作为开关,看了一下开发板原理图,K1连接的是单片机的P31口,当按下K1时,P31是与GND相连的,也就是说,当我按下去时
如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]
RSpec似乎按顺序匹配方法接收的消息。我不确定如何使以下代码工作:allow(a).toreceive(:f)expect(a).toreceive(:f).with(2)a.f(1)a.f(2)a.f(3)我问的原因是a.f的一些调用是由我的代码的上层控制的,所以我不能对这些方法调用添加期望。 最佳答案 RSpecspy是测试这种情况的一种方式。要监视一个方法,用allowstub,除了方法名称之外没有任何约束,调用该方法,然后expect确切的方法调用。例如:allow(a).toreceive(:f)a.f(2)a.f(1)
在我的Character模型中,我添加了:字符.rbbefore_savedoself.profile_picture_url=asset_path('icon.png')end但是,对于数据库中已存在的所有角色,它们的profile_picture_url为nil。因此,我想进入控制台并遍历所有这些并进行设置。在我试过的控制台中:Character.find_eachdo|c|c.profile_picture_url=asset_path('icon.png')end但这给出了错误:NoMethodError:undefinedmethod`asset_path'formain:O