文章目录
本文主要是使用 Java 语言中 spring-kafka 依赖 对 Kafka 进行使用。
使用以下依赖对 Kafka 进行操作:
<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.11</version>
</dependency>
需要更改版本的话,可以前往:Maven 仓库
创建项目,先创建一个简单的 Maven 项目,删除无用的包、类之后,使用其作为一个父级项目。
以下内容如果在项目启动时报这个错:
org.yaml.snakeyaml.error.YAMLException: java.nio.charset.MalformedInputException: Input length = 1
把注释删除就可以了。
随后创建SpringBoot模块。选择 Kafka 组件

随后调整该项目的POM依赖为:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>11</source>
<target>11</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
注意这一步同时需要将Java版本、Maven版本都调整好。我这里目前使用的是Java11。
我们先看 spring-kafka-demo模块的内容

主要指定集群信息、生产者信息、消费者信息。尤其重要的是序列化方式。
server:
# 优雅停机
shutdown: graceful
spring:
kafka:
# kafka集群信息,多个用逗号间隔
bootstrap-servers: localhost:9092
# 生产者
producer:
# 重试次数,设置大于0的值,则客户端会将发送失败的记录重新发送
retries: 3
#批量处理大小,16K
batch-size: 16384
#缓冲存储大,32M
buffer-memory: 33554432
acks: 1
# 指定消息key和消息体的编码方式:字符串序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 消费者
consumer:
# 消费者组
group-id: TestGroup
# 是否自动提交
enable-auto-commit: false
# 消费偏移配置
# none:如果没有为消费者找到先前的offset的值,即没有自动维护偏移量,也没有手动维护偏移量,则抛出异常
# earliest:在各分区下有提交的offset时:从offset处开始消费;在各分区下无提交的offset时:从头开始消费
# latest:在各分区下有提交的offset时:从offset处开始消费;在各分区下无提交的offset时:从最新的数据开始消费
auto-offset-reset: latest
# 指定消息key和消息体的解码方式:字符串反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 监听
listener:
# record:当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
# batch:当每一批poll()的数据被ListenerConsumer处理之后提交
# time:当每一批poll()的数据被ListenerConsumer处理之后,距离上次提交时间大于TIME时提交
# count:当每一批poll()的数据被ListenerConsumer处理之后,被处理record数量大于等于COUNT时提交
# count_time:TIME或COUNT中有一个条件满足时提交
# manual:当每一批poll()的数据被ListenerConsumer处理之后, 手动调用Acknowledgment.acknowledge()后提交
# manual_immediate:手动调用Acknowledgment.acknowledge()后立即提交,一般推荐使用这种
ack-mode: manual_immediate
package org.feng.kafka.sender;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.lang.NonNull;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
/**
* Kafka消息生产者组件
*
* @version v1.0
* @author: fengjinsong
* @date: 2023年03月16日 23时26分
*/
@Slf4j
@Component
public class KafkaProducerComponent {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
/**
* 预先在 Kafka 中创建好的 topic
*/
public static final String TOPIC = "testTopic";
public void send(String topic, String data) {
kafkaTemplate.send(topic, data)
// 回调
.addCallback(new ListenableFutureCallback<>() {
@Override
public void onFailure(@NonNull Throwable throwable) {
log.error("主题[{}]发送消息[{}]失败", topic, data, throwable);
}
@Override
public void onSuccess(SendResult<String, String> result) {
log.info("主题[{}]发送消息[{}]成功", topic, data);
}
});
}
}
package org.feng.kafka.receiver;
import lombok.extern.slf4j.Slf4j;
import org.feng.kafka.sender.KafkaProducerComponent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* 监听消息:消费端
*
* @version v1.0
* @author: fengjinsong
* @date: 2023年03月17日 19时54分
*/
@Slf4j
@Component
public class KafkaConsumerComponent {
@KafkaListener(topics = KafkaProducerComponent.TOPIC)
public void consumerTestTopic(String data) {
log.info("消费者监听到数据:{}", data);
}
}
package org.feng.kafka.controller;
import lombok.extern.slf4j.Slf4j;
import org.feng.kafka.sender.KafkaProducerComponent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* 发送消息控制器
*
* @version v1.0
* @author: fengjinsong
* @date: 2023年03月17日 19时56分
*/
@Slf4j
@RestController
public class SendMessageController {
@Resource
private KafkaProducerComponent kafkaProducerComponent;
@GetMapping("/send/{data}")
public String send(@PathVariable("data") String data) {
log.info("即将把数据【{}】发送到消息队列", data);
kafkaProducerComponent.send(KafkaProducerComponent.TOPIC, data);
return "send ok";
}
}
package org.feng.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class SpringKafkaDemoApplication {
public static void main(String[] args) {
SpringApplication.run(SpringKafkaDemoApplication.class, args);
}
}
我这边已经启动了 Kafka ,随后在本地再启动本项目,待项目启动后,使用 GET 请求给 Kafka 中扔消息。

使用以上的链接触发。
可以依次观察到日志记录:
即将把数据【测试呢111】发送到消息队列
消费者监听到数据:测试呢111
主题[testTopic]发送消息[测试呢111]成功
其实就是自定义一个对象,直接扔到消息队列里。然后再使用监听器监听到,并作出处理。
核心改变的地方是消息Value 的序列化方式、反序列化方式,更改为:

修改了 group-id、值的序列化、反序列化,以及增加了属性“信任的包”。
你想把哪个类的对象放消息队列,就得在这个包下进行定义这个类。
项目的版本和写法一保持一致。
包括 POM 文件也是一致的。

PS:这里将注释几乎全部去掉了
server:
shutdown: graceful
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
retries: 3
batch-size: 16384
buffer-memory: 33554432
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值序列化:使用Json
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: TestObjectGroup
enable-auto-commit: false
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值反序列化:使用Json
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# 信任的包
properties:
spring:
json:
trusted:
packages: org.feng.entity
listener:
ack-mode: manual_immediate
package org.feng.entity;
/**
* kafka 消息
*
* @version v1.0
* @author: fengjinsong
* @date: 2023年03月17日 20时16分
*/
public interface KafkaMessage {
}
实体实现了 KafkaMessage 规则。
并定义了简单的属性值。
package org.feng.entity;
import lombok.Data;
import java.time.LocalDateTime;
import java.util.Locale;
import java.util.UUID;
/**
* 测试kafka消息对象
*
* @version v1.0
* @author: fengjinsong
* @date: 2023年03月17日 20时18分
*/
@Data
public class TestKafkaMessage implements KafkaMessage {
private LocalDateTime time = LocalDateTime.now();
private String message;
private String business = "test";
private String messageId = UUID.randomUUID().toString().toLowerCase(Locale.ROOT).replaceAll("-", "");
}
package org.feng.producer;
import lombok.NonNull;
import lombok.extern.slf4j.Slf4j;
import org.feng.entity.KafkaMessage;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFutureCallback;
import javax.annotation.Resource;
/**
* 生产者
*
* @version v1.0
* @author: fengjinsong
* @date: 2023年03月17日 20时21分
*/
@Slf4j
@Component
public class KafkaObjectSerializerProducerComponent {
/**
* 预先在 Kafka 中创建好的 topic
*/
public static final String TOPIC = "testObjectTopic";
@Resource
private KafkaTemplate<String, ? super KafkaMessage> kafkaTemplate;
public void sendTest(String topic, KafkaMessage kafkaMessage) {
kafkaTemplate.send(topic, kafkaMessage)
// 回调
.addCallback(new ListenableFutureCallback<SendResult<String, ? super KafkaMessage>>() {
@Override
public void onFailure(@NonNull Throwable throwable) {
log.error("主题[{}]发送消息[{}]失败", topic, kafkaMessage, throwable);
}
@Override
public void onSuccess(SendResult<String, ? super KafkaMessage> result) {
log.info("主题[{}]发送消息[{}]成功,发送结果:{}", topic, kafkaMessage, result);
}
});
}
}
package org.feng.consumer;
import lombok.extern.slf4j.Slf4j;
import org.feng.entity.TestKafkaMessage;
import org.feng.producer.KafkaObjectSerializerProducerComponent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
/**
* 消费者
*
* @version v1.0
* @author: fengjinsong
* @date: 2023年03月17日 20时30分
*/
@Component
@Slf4j
public class KafkaObjectSerializerConsumerComponent {
@KafkaListener(topics = KafkaObjectSerializerProducerComponent.TOPIC)
public void consumerTestTopic(TestKafkaMessage data) {
log.info("消费者监听到数据:{}", data);
}
}
重点在于,消息内容是自定义的 TestKafkaMessage 实例。
package org.feng.controller;
import lombok.extern.slf4j.Slf4j;
import org.feng.entity.TestKafkaMessage;
import org.feng.producer.KafkaObjectSerializerProducerComponent;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
/**
* 发送消息控制器
*
* @version v1.0
* @author: fengjinsong
* @date: 2023年03月17日 19时56分
*/
@Slf4j
@RestController
public class SendMessageController {
@Resource
private KafkaObjectSerializerProducerComponent kafkaObjectSerializerProducerComponent;
@GetMapping("/send")
public String send(@RequestBody TestKafkaMessage data) {
log.info("即将把数据【{}】发送到消息队列", data);
kafkaObjectSerializerProducerComponent.sendTest(KafkaObjectSerializerProducerComponent.TOPIC, data);
return "send ok";
}
}
和写法一基本一致(除了类名不同)
我这边已经启动了 Kafka ,随后在本地再启动本项目,待项目启动后,使用 GET 请求给 Kafka 中扔消息。

使用以上的链接触发。
可以依次观察到日志记录:
即将把数据【TestKafkaMessage(time=2023-03-17T21:29:21.564849900, message=消息内容就是我了, business=test, messageId=aa12c2e29bed431090918d971477de16)】发送到消息队列
消费者监听到数据:TestKafkaMessage(time=2023-03-17T21:29:21.564849900, message=消息内容就是我了, business=test, messageId=aa12c2e29bed431090918d971477de16)
主题[testObjectTopic]发送消息[TestKafkaMessage(time=2023-03-17T21:29:21.564849900, message=消息内容就是我了, business=test, messageId=aa12c2e29bed431090918d971477de16)]成功,发送结果:SendResult [producerRecord=ProducerRecord(topic=testObjectTopic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [111, 114, 103, 46, 102, 101, 110, 103, 46, 101, 110, 116, 105, 116, 121, 46, 84, 101, 115, 116, 75, 97, 102, 107, 97, 77, 101, 115, 115, 97, 103, 101])], isReadOnly = true), key=null, value=TestKafkaMessage(time=2023-03-17T21:29:21.564849900, message=消息内容就是我了, business=test, messageId=aa12c2e29bed431090918d971477de16), timestamp=null), recordMetadata=testObjectTopic-0@1]
默认情况下,Kafka的日志很多都会打印出来,但是又与我们业务本身无关。需要屏蔽一下。
这里做了简单的处理,使用 logback 设置了日志级别。

logback 文件内容如下:
<?xml version="1.0" encoding="UTF-8" ?>
<configuration debug="false">
<!-- 配置控制台输出 -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder class="ch.qos.logback.classic.encoder.PatternLayoutEncoder">
<!-- 格式化输出: %d表示日期, %thread表示线程名, %-5level: 级别从左显示5个字符宽度 %msg:日志消息, %n是换行符 -->
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS}[%thread] %-5level %logger{50} - %msg%n</pattern>
</encoder>
</appender>
<!-- 日志输出级别 -->
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
<!-- 定制化某些包的日志输出级别 -->
<logger name="org.apache.kafka" level="warn" additivity="false"/>
<logger name="org.springframework" level="info">
<appender-ref ref="STDOUT" />
</logger>
</configuration>
效果如下:

发现日志确实少了很多。这样也方便我们后续开发。
细心的朋友们可能已经发现了,以上的实例中,在项目重新启动时,会自动消费几条数据,这主要是因为我们设置了“不自动提交偏移量”,但是程序中又没有去手动提交。
现在我们来处理这个问题,首先是对原先的配置进行微调:
server:
shutdown: graceful
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
retries: 3
batch-size: 16384
buffer-memory: 33554432
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
consumer:
group-id: TestObjectGroup
# 依然使用非自动提交
enable-auto-commit: false
# 修改读取的偏移量的方式
auto-offset-reset: earliest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring:
json:
trusted:
packages: org.feng.entity
listener:
ack-mode: manual
# 设置并发量
concurrency: 3
以上修改了读取偏移量的方式为:在各分区下有提交的offset时,从offset处开始消费;在各分区下无提交的offset时:从头开始消费
然后调整监听者的配置 ack-mode: manual,当每一批poll()的数据被消费端处理之后, 手动调用Acknowledgment.acknowledge()后提交。
监听器的写法上也做一下调整:
package org.feng.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.feng.entity.TestKafkaMessage;
import org.feng.producer.KafkaObjectSerializerProducerComponent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Component;
/**
* 消费者
*
* @version v1.0
* @author: fengjinsong
* @date: 2023年03月17日 20时30分
*/
@Component
@Slf4j
public class KafkaObjectSerializerConsumerComponent {
@KafkaListener(topics = KafkaObjectSerializerProducerComponent.TOPIC)
public void consumeTestTopicAndCommit(ConsumerRecord<String, TestKafkaMessage> record, Acknowledgment ack) {
try {
log.info("消费者监听到数据:{}", record.value());
// 手动提交
ack.acknowledge();
} catch (Exception e) {
log.info("消费失败,数据:{}", record.value(), e);
}
}
}
监听器的主要调整在于方法入参、消费者处理消息后增加手动提交的操作。
我正在使用Ruby2.1.1和Rails4.1.0.rc1。当执行railsc时,它被锁定了。使用Ctrl-C停止,我得到以下错误日志:~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`gets':Interruptfrom~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.2/lib/spring/client/run.rb:47:in`verify_server_version'from~/.rvm/gems/ruby-2.1.1/gems/spring-1.1.
我是rails的新手,想在form字段上应用验证。myviewsnew.html.erb.....模拟.rbclassSimulation{:in=>1..25,:message=>'Therowmustbebetween1and25'}end模拟Controller.rbclassSimulationsController我想检查模型类中row字段的整数范围,如果不在范围内则返回错误信息。我可以检查上面代码的范围,但无法返回错误消息提前致谢 最佳答案 关键是您使用的是模型表单,一种显示ActiveRecord模型实例属性的表单。c
rails中是否有任何规定允许站点的所有AJAXPOST请求在没有authenticity_token的情况下通过?我有一个调用Controller方法的JqueryPOSTajax调用,但我没有在其中放置任何真实性代码,但调用成功。我的ApplicationController确实有'request_forgery_protection'并且我已经改变了config.action_controller.consider_all_requests_local在我的environments/development.rb中为false我还搜索了我的代码以确保我没有重载ajaxSend来发送
我的工作要求我为某些测试自动生成电子邮件。我一直在四处寻找,但未能找到可以快速实现的合理解决方案。它需要在outlook而不是其他邮件服务器中,因为我们有一些奇怪的身份验证规则,我们需要保存草稿而不是仅仅发送邮件的选项。显然win32ole可以做到这一点,但我找不到任何相当简单的例子。 最佳答案 假设存储了Outlook凭据并且您设置为自动登录到Outlook,WIN32OLE可以很好地完成此操作:require'win32ole'outlook=WIN32OLE.new('Outlook.Application')message=
我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi
如果我在模型中设置验证消息validates:name,:presence=>{:message=>'Thenamecantbeblank.'}我如何让该消息显示在闪光警报中,这是我迄今为止尝试过的方法defcreate@message=Message.new(params[:message])if@message.valid?ContactMailer.send_mail(@message).deliverredirect_to(root_path,:notice=>"Thanksforyourmessage,Iwillbeintouchsoon")elseflash[:error]
转自:spring.profiles.active和spring.profiles.include的使用及区别说明下文笔者讲述spring.profiles.active和spring.profiles.include的区别简介说明,如下所示我们都知道,在日常开发中,开发|测试|生产环境都拥有不同的配置信息如:jdbc地址、ip、端口等此时为了避免每次都修改全部信息,我们则可以采用以上的属性处理此类异常spring.profiles.active属性例:配置文件,可使用以下方式定义application-${profile}.properties开发环境配置文件:application-dev
RSpec似乎按顺序匹配方法接收的消息。我不确定如何使以下代码工作:allow(a).toreceive(:f)expect(a).toreceive(:f).with(2)a.f(1)a.f(2)a.f(3)我问的原因是a.f的一些调用是由我的代码的上层控制的,所以我不能对这些方法调用添加期望。 最佳答案 RSpecspy是测试这种情况的一种方式。要监视一个方法,用allowstub,除了方法名称之外没有任何约束,调用该方法,然后expect确切的方法调用。例如:allow(a).toreceive(:f)a.f(2)a.f(1)
s=Socket.new(Socket::AF_INET,Socket::SOCK_STREAM,0)s.connect(Socket.pack_sockaddr_in('port','hostname'))ssl=OpenSSL::SSL::SSLSocket.new(s,sslcert)ssl.connect从这里开始,如果ssl连接和底层套接字仍然是ESTABLISHED,或者它是否在默认值7200之后进入CLOSE_WAIT,我想检查一个线程几秒钟甚至更糟的是在实际上不需要.write()或.read()的情况下关闭。是用select()、IO.select()还是其他方法完成
我以为它们存储在cookie中-但不,检查cookie没有任何结果。session也不存储它们。那么,我在哪里可以找到它们?我需要这个来直接设置它们(而不是通过flashhash)。 最佳答案 它们存储在inyoursessionstore.自rails2.0以来的默认设置是cookie存储,但请检查config/initializers/session_store.rb以检查您是否使用默认设置以外的东西。 关于ruby-on-rails-闪存消息存储在哪里?,我们在StackOverf