Apache Kafka是分布式发布-订阅消息系统。
它最初由LinkedIn公司开发,之后成为Apache项目的一部分。
Kafka是一种快速、可扩展的、设计内在就是分布式的,分区的和可复制的提交日志服务。
Apache Kafka与传统消息系统相比,有以下不同:
| 概念 | 解释 |
|---|---|
| Broker | 节点,一个Broker代表是一个Kafka实例节点,多个Broker可以组成Kafka集群 |
| Topic | 主题,等同于消息系统中的队列(queue),一个Topic中存在多个Partition |
| Partition | 分区,构成Kafka存储结构的最小单位 |
| Partition offset | offset为消息偏移量,以Partition为单位,即使在同一个Topic中,不同Partition的offset也是重新开始计算(也就是会重复) |
| Group | 消费者组,一个Group里面包含多个消费者 |
| Message | 消息,是队列中消息的承载体,也就是通信的基本单位,Producer可以向Topic中发送Message |
windows下安装kafka可以参考这一篇博客:https://blog.csdn.net/w546097639/article/details/88578635
在spring boot环境中使用,引入需要依赖的jar包,引入POM文件
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
spring:
kafka:
bootstrap-servers: ${KAFKA_SERVER:IP地址:端口号,IP地址:端口号,IP地址:端口号,IP地址:端口号}
consumer:
auto-commit-interval: ${KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL:5000}
auto-offset-reset: ${KAFKA_CONSUMER_AUTO_OFFSET_RESET:earliest}
enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:true}
group-id: ${KAFKA_CONSUMER_GROUPID:default_kafka_group}
producer:
acks: ${KAFKA_PRODUCER_ACKS:all}
batch-size: ${KAFKA_PRODUCER_BATCH:16384}
buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMORY:33554432}
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
retries: ${KAFKA_PRODUCER_RETRIES:0}
@RestController
@RequestMapping(value = "/kafka/v1")
@Slf4j
public class KafkaProducerController {
public static final String Upstream_C2S_Topic = "Upstream_C2S_Topic";
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@ResponseBody
@PostMapping(value = "/serviceListChanges", produces = "application/json")
public JSONObject serviceListChanges(@RequestBody JSONObject jsonData) {
log.info("URL = {},vin={}, 请求的jsonObject的值 = {}",
"serviceListChanges", jsonData.getStr("vin"), jsonData);
try {
kafkaTemplate.send(Upstream_C2S_Topic, jsonData.toString());
jsonData.set("success", true);
return jsonData;
} catch (Exception e) {
log.error("KafkaProducerController serviceListChanges error = {}", e.getMessage());
}
return new JSONObject();
}
}
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
</dependency>
spring:
kafka:
bootstrap-servers: ${KAFKA_SERVER:IP地址:端口号,IP地址:端口号,IP地址:端口号,IP地址:端口号}
consumer:
auto-commit-interval: ${KAFKA_CONSUMER_AUTO_COMMIT_INTERVAL:5000}
auto-offset-reset: ${KAFKA_CONSUMER_AUTO_OFFSET_RESET:earliest}
enable-auto-commit: ${KAFKA_CONSUMER_ENABLE_AUTO_COMMIT:true}
group-id: ${KAFKA_CONSUMER_GROUPID:default_kafka_group}
max-poll-records: ${KAFKA_CONSUMER_MAX_POLL_RECORDS:100}
properties:
session:
timeout:
ms: ${KAFKA_CONSUMER_PROPERTIES_SESSION_TIMEOUT_MS:10000}
listener:
concurrency: ${KAFKA_LISTENER_CONCURRENCY:4}
producer:
acks: ${KAFKA_PRODUCER_ACKS:all}
retries: ${KAFKA_PRODUCER_RETRIES:0}
batch-size: ${KAFKA_PRODUCER_BATCH:16384}
buffer-memory: ${KAFKA_PRODUCER_BUFFER_MEMORY:33554432}
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
autoCommitInterval: ${KAFKA_PRODUCER_PROPERTIES_AUTO_COMMIT_INTERVAL:100}
autoOffsetReset: ${KAFKA_PRODUCER_PROPERTIES_AUTO_OFFSET_RESET:latest}
concurrency: ${KAFKA_PRODUCER_PROPERTIES_CONCURRENCY:10}
enableAutoCommit: ${KAFKA_PRODUCER_PROPERTIES_ENABLE_AUTO_COMMIT:true}
groupId: ${KAFKA_PRODUCER_PROPERTIES_GROUPID:default_kafka_group}
linger: ${KAFKA_PRODUCER_PROPERTIES_LINGER:0}
maxPollRecords: ${KAFKA_PRODUCER_PROPERTIES_MAX_POLL_RECORDS:100}
sessionTimeout: ${KAFKA_PRODUCER_PROPERTIES_SESSION_TIMEOUT:60000}
见底部:配置文件的参数详解
@Slf4j
public abstract class Consumer {
public void listenTopic(ConsumerRecord<String, String> record) {
String topic = record.topic();
String value = record.value();
log.info("kafka的key:{},value:{} ", topic, value);
if (JSONUtil.isJson(value)) {
consumerTopic(topic, value);
}
}
public void add(String value) {
this.consumerTopic(null, value);
}
//执行消费逻辑
public abstract void consumerTopic(String topic, String value);
}
@Component
@Slf4j
public class ServiceListChangesConsumer extends Consumer {
public static final String Upstream_C2S_Topic = "Upstream_C2S_Topic";
@Override
@KafkaListener(topicPattern = Upstream_C2S_Topic)
public void listenTopic(ConsumerRecord<String, String> record) {
super.listenTopic(record);
}
// 执行消费逻辑
@Override
public void consumerTopic(String topic, String value) {
JSONObject jsonObject = JSONUtil.parseObj(value);
String vin = jsonObject.getStr("vin");
log.info("VIN={},ServiceListChangesConsumer消费成功,消息id={} ", vin, jsonObject.getStr("eventID"));
}
}
将两个服务正常运行,看到控制台中输出一些Kafka配置的相关参数
配置如下的ApiFox用于接口调试

点击发送之后,能正常收到后台反馈的业务响应

2023-01-04 22:42:51.805 INFO 10132 — [nio-9091-exec-6] c.d.t.l.g.c.KafkaProducerController : URL = serviceListChanges,vin=LFV2B20L3M4999999, 请求的jsonObject的值 = {“vin”:“LFV2B20L3M4999999”,“time”:“LFV2B20L3M4999999”}

2023-01-04 22:42:51.818 INFO 13800 — [ntainer#0-2-C-1] c.d.tsp.logic.analyze.kafka.Consumer : kafka的key:Upstream_C2S_Topic,value:{“vin”:“LFV2B20L3M4999999”,“time”:“LFV2B20L3M4999999”}
2023-01-04 22:42:51.818 INFO 13800 — [ntainer#0-2-C-1] c.d.t.l.a.c.ServiceListChangesConsumer : VIN=LFV2B20L3M4999999,ServiceListChangesConsumer消费成功,消息id=null

@Component
public class KafkaUtils {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
//发送消息到kafka
public boolean sendMsg(String topic, String json) {
if (StrUtil.isBlank(topic) || StrUtil.isBlank(json)) {
return false;
}
kafkaTemplate.send(topic, json);
return true;
}
//发送消息到kafka
public boolean sendMsg(String topic, String key, String json) {
if (StrUtil.isBlank(topic) || StrUtil.isBlank(json)) {
return false;
}
kafkaTemplate.send(topic, key, json);
return true;
}
//批量存储
public boolean sendBatchMsg(String topic, List<KafkaMsgBean> msgs) {
if (CollUtil.isEmpty(msgs)) {
return false;
}
msgs.forEach(msg -> {
sendMsg(topic, msg.getDeviceId(), msg.getMsgContent());
});
return true;
}
}
| 序号 | 内容 | 解释 |
|---|---|---|
| 1 | bootstrap-servers | 指定kafka 代理地址,可以多个 |
| 2 | producer | 定义生产者 |
| 3 | batch-size | 每次批量发送消息的数量 |
| 4 | key-serializer | 指定消息key的编解码方式 |
| 5 | value-serializer | 指定消息体的编解码方式 |
spring:
#重要提示:kafka配置,该配置属性将直接注入到KafkaTemplate中
kafka:
bootstrap-servers: 10.200.8.29:9092
#https://kafka.apache.org/documentation/#producerconfigs
producer:
bootstrap-servers: 10.200.8.29:9092
# 可重试错误的重试次数,例如“连接错误”、“无主且未选举出新Leader”
retries: 1 #生产者发送消息失败重试次数
# 多条消息放同一批次,达到多达就让Sender线程发送
batch-size: 16384 # 同一批次内存大小(默认16K)
# 发送消息的速度超过发送到服务器的速度,会导致空间不足。send方法要么被阻塞,要么抛异常
# 取决于如何设置max.block.ms,表示抛出异常前可以阻塞一段时间
buffer-memory: 314572800 #生产者内存缓存区大小(300M = 300*1024*1024)
#acks=0:无论成功还是失败,只发送一次。无需确认
#acks=1:即只需要确认leader收到消息
#acks=all或-1:ISR + Leader都确定收到
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer #key的编解码方法
value-serializer: org.apache.kafka.common.serialization.StringSerializer #value的编解码方法
#开启事务,但是要求ack为all,否则无法保证幂等性
#transaction-id-prefix: "COLA_TX"
#额外的,没有直接有properties对应的参数,将存放到下面这个Map对象中,一并初始化
properties:
#自定义拦截器,注意,这里结尾时classes(先于分区器,快递先贴了标签再指定地址)
interceptor.classes: cn.com.controller.TimeInterceptor
#自定义分区器
#partitioner.class: com.alibaba.cola.kafka.test.customer.inteceptor.MyPartitioner
#即使达不到batch-size设定的大小,只要超过这个毫秒的时间,一样会发送消息出去
linger.ms: 1000
#最大请求大小,200M = 200*1024*1024,与服务器broker的message.max.bytes最好匹配一致
max.request.size: 209715200
#Producer.send()方法的最大阻塞时间(115秒)
# 发送消息的速度超过发送到服务器的速度,会导致空间不足。send方法要么被阻塞,要么抛异常
# 取决于如何设置max.block.ms,表示抛出异常前可以阻塞一段时间
max.block.ms: 115000
#该配置控制客户端等待服务器的响应的最长时间。
#如果超时之前仍未收到响应,则客户端将在必要时重新发送请求,如果重试次数(retries)已用尽,则会使请求失败。
#此值应大于replica.lag.time.max.ms(broker配置),以减少由于不必要的生产者重试而导致消息重复的可能性。
request.timeout.ms: 115000
#等待send回调的最大时间。常用语重试,如果一定要发送,retries则配Integer.MAX
#如果超过该时间:TimeoutException: Expiring 1 record(s) .. has passed since batch creation
delivery.timeout.ms: 120000
# 生产者在服务器响应之前能发多少个消息,若对消息顺序有严格限制,需要配置为1
# max.in.flight.requests.per.connection: 1
spring:
kafka:
#https://kafka.apache.org/documentation/#consumerconfigs
consumer:
bootstrap-servers: 10.200.8.29:9092
group-id: auto-dev #消费者组
#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费
# earliest:无提交记录,从头开始消费
#latest:无提交记录,从最新的消息的下一条开始消费
auto-offset-reset: earliest
enable-auto-commit: false #是否自动提交偏移量offset
auto-commit-interval: 1S #前提是 enable-auto-commit=true。自动提交的频率
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
max-poll-records: 2
properties:
#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}
session.timeout.ms: 120000
#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡
max.poll.interval.ms: 300000
#配置控制客户端等待请求响应的最长时间。
#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,
#或者如果重试次数用尽,则请求失败。
request.timeout.ms: 60000
#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置false
allow.auto.create.topics: true
#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一
heartbeat.interval.ms: 40000
#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节
#0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制
#仍然会返回该消息,以确保消费者可以进行
#max.partition.fetch.bytes=1048576 #1M
listener:
#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效
#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交
ack-mode: manual_immediate
missing-topics-fatal: true #如果至少有一个topic不存在,true启动失败。false忽略
#type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-records
type: batch
concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲
template:
default-topic: "COLA"
如何在buildr项目中使用Ruby?我在很多不同的项目中使用过Ruby、JRuby、Java和Clojure。我目前正在使用我的标准Ruby开发一个模拟应用程序,我想尝试使用Clojure后端(我确实喜欢功能代码)以及JRubygui和测试套件。我还可以看到在未来的不同项目中使用Scala作为后端。我想我要为我的项目尝试一下buildr(http://buildr.apache.org/),但我注意到buildr似乎没有设置为在项目中使用JRuby代码本身!这看起来有点傻,因为该工具旨在统一通用的JVM语言并且是在ruby中构建的。除了将输出的jar包含在一个独特的、仅限ruby
我在我的Rails项目中使用Pow和powifygem。现在我尝试升级我的ruby版本(从1.9.3到2.0.0,我使用RVM)当我切换ruby版本、安装所有gem依赖项时,我通过运行railss并访问localhost:3000确保该应用程序正常运行以前,我通过使用pow访问http://my_app.dev来浏览我的应用程序。升级后,由于错误Bundler::RubyVersionMismatch:YourRubyversionis1.9.3,butyourGemfilespecified2.0.0,此url不起作用我尝试过的:重新创建pow应用程序重启pow服务器更新战俘
我已经像这样安装了一个新的Rails项目:$railsnewsite它执行并到达:bundleinstall但是当它似乎尝试安装依赖项时我得到了这个错误Gem::Ext::BuildError:ERROR:Failedtobuildgemnativeextension./System/Library/Frameworks/Ruby.framework/Versions/2.0/usr/bin/rubyextconf.rbcheckingforlibkern/OSAtomic.h...yescreatingMakefilemake"DESTDIR="cleanmake"DESTDIR="
我有一个用户工厂。我希望默认情况下确认用户。但是鉴于unconfirmed特征,我不希望它们被确认。虽然我有一个基于实现细节而不是抽象的工作实现,但我想知道如何正确地做到这一点。factory:userdoafter(:create)do|user,evaluator|#unwantedimplementationdetailshereunlessFactoryGirl.factories[:user].defined_traits.map(&:name).include?(:unconfirmed)user.confirm!endendtrait:unconfirmeddoenden
假设我有这个范围:("aaaaa".."zzzzz")如何在不事先/每次生成整个项目的情况下从范围中获取第N个项目? 最佳答案 一种快速简便的方法:("aaaaa".."zzzzz").first(42).last#==>"aaabp"如果出于某种原因你不得不一遍又一遍地这样做,或者如果你需要避免为前N个元素构建中间数组,你可以这样写:moduleEnumerabledefskip(n)returnto_enum:skip,nunlessblock_given?each_with_indexdo|item,index|yieldit
导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵
华为OD机试题本篇题目:明明的随机数题目输入描述输出描述:示例1输入输出说明代码编写思路最近更新的博客华为od2023|什么是华为od,od薪资待遇,od机试题清单华为OD机试真题大全,用Python解华为机试题|机试宝典【华为OD机试】全流程解析+经验分享,题型分享,防作弊指南华为o
C#实现简易绘图工具一.引言实验目的:通过制作窗体应用程序(C#画图软件),熟悉基本的窗体设计过程以及控件设计,事件处理等,熟悉使用C#的winform窗体进行绘图的基本步骤,对于面向对象编程有更加深刻的体会.Tutorial任务设计一个具有基本功能的画图软件**·包括简单的新建文件,保存,重新绘图等功能**·实现一些基本图形的绘制,包括铅笔和基本形状等,学习橡皮工具的创建**·设计一个合理舒适的UI界面**注明:你可能需要先了解一些关于winform窗体应用程序绘图的基本知识,以及关于GDI+类和结构的知识二.实验环境Windows系统下的visualstudio2017C#窗体应用程序三.
MIMO技术的优缺点优点通过下面三个增益来总体概括:阵列增益。阵列增益是指由于接收机通过对接收信号的相干合并而活得的平均SNR的提高。在发射机不知道信道信息的情况下,MIMO系统可以获得的阵列增益与接收天线数成正比复用增益。在采用空间复用方案的MIMO系统中,可以获得复用增益,即信道容量成倍增加。信道容量的增加与min(Nt,Nr)成正比分集增益。在采用空间分集方案的MIMO系统中,可以获得分集增益,即可靠性性能的改善。分集增益用独立衰落支路数来描述,即分集指数。在使用了空时编码的MIMO系统中,由于接收天线或发射天线之间的间距较远,可认为它们各自的大尺度衰落是相互独立的,因此分布式MIMO
@作者:SYFStrive @博客首页:HomePage📜:微信小程序📌:个人社区(欢迎大佬们加入)👉:社区链接🔗📌:觉得文章不错可以点点关注👉:专栏连接🔗💃:感谢支持,学累了可以先看小段由小胖给大家带来的街舞👉微信小程序(🔥)目录自定义组件-behaviors 1、什么是behaviors 2、behaviors的工作方式 3、创建behavior 4、导入并使用behavior 5、behavior中所有可用的节点 6、同名字段的覆盖和组合规则总结最后自定义组件-behaviors 1、什么是behaviorsbehaviors是小程序中,用于实现