jjzjj

kafka中topic的创建和查询

jjt_!! 2023-10-12 原文

启动

kafka的启动依赖zookeeper,先启动zookeeper,再启动kafka

  1. bin/zookeeper-server-start.sh config/zookeeper.properties
  2. bin/kafka-server-start.sh config/server.properties

topic

topic使用文件kafka-topics.sh,基本使用:
命令必须包含一个操作: - list,–describe, - create, --alter或–delete

创建

./kafka-topics.sh --create --zookeeper "kafka001:2181,kafka002:2181,kafka003:2181"  --partitions 1 --replication-factor 1  --topic zaj

新版本需要使用命令:

./kafka-topics.sh --create --bootstrap-server  10.130.44.103:9092   --partitions 1 --replication-factor 1  --topic zaj

和端口
其中参数

  • –create 是创建topic特有的;
  • –zookeeper是一个必须的参数;新版本被--bootstrap-server替换。--bootstrap-server 填写的是kafka server的ip和端口,--zookeeper 填写的是zookeeper的ip
  • –partitions表示创建的消息分区数量;分区数量在集群上平均分配;
  • –replication-factor表示每一个消息分区复制的分数,1表示不复制;当有3个broder时,复制3份则每个broder上一份;

  1. 从命令行中获取要创建的topic名称
  2. 解析命令行指定的topic配置(如果存在的话),配置都是x=a的格式
  3. 若指定了replica-assignment参数表明用户想要自己分配分区副本与broker的映射——通常都不这么做,如果不提供该参数Kafka帮你做这件事情
  4. 检查必要的参数是否已指定,包括:zookeeper, replication-factor,partition和topic
  5. 获取/brokers/ids下所有broker并按照broker id进行升序排序
  6. 在broker上分配各个分区的副本映射 (没有指定replica-assignment参数,这也是默认的情况)
  7. 检查topic名字合法性、自定义配置的合法性,并且要保证每个分区都必须有相同的副本数
  8. 若zookeeper上已有对应的路径存在,直接抛出异常表示该topic已经存在
  9. 确保某个分区的多个副本不会被分配到同一个broker
  10. 若提供了自定义的配置,更新zookeeper的/config/topics/[topic]节点的数据
  11. 创建/brokers/topics/[topic]节点,并将分区副本分配映射数据写入该节点

查看

./kafka-topics.sh --zookeeper "kafka001:2181,kafka002:2181,kafka003:2181" --list\
  • 从zookeeper的/brokers/topics节点下获取所有topic封装成topic集合
  • 遍历该集合,查看每个topic是否是待删除topic——即在/admin/delete_topics下是否存在同名节点。如果是,打印topic已经被标记为删除;否则直接打印topic名称

删除

./kafka-topics.sh --zookeeper "kafka001:2181,kafka002:2181,kafka003:2181" --delete --topic zaj13,zaj14

查看topic详情

[kafka@kafka003 bin]$ ./kafka-topics.sh --zookeeper "kafka001:2181,kafka002:2181,kafka003:2181" --describe --topic topic_test
Topic:topic_test	PartitionCount:3	ReplicationFactor:3	Configs:
	Topic: topic_test	Partition: 0	Leader: 2	Replicas: 2,0,1	Isr: 0,2,1
	Topic: topic_test	Partition: 1	Leader: 0	Replicas: 0,1,2	Isr: 0,2,1
	Topic: topic_test	Partition: 2	Leader: 1	Replicas: 1,2,0	Isr: 0,2,1
[kafka@kafka003 bin]$ ./kafka-topics.sh --zookeeper "kafka001:2181,kafka002:2181,kafka003:2181" --describe --topic sd_call_result
Topic:sd_call_result	PartitionCount:3	ReplicationFactor:1	Configs:
	Topic: sd_call_result	Partition: 0	Leader: 2	Replicas: 2	Isr: 2
	Topic: sd_call_result	Partition: 1	Leader: 0	Replicas: 0	Isr: 0
	Topic: sd_call_result	Partition: 2	Leader: 1	Replicas: 1	Isr: 1

展示信息:

topic名称,有几个分区,几个复制版本

每个消息分区的配置:消息分区的id,消息分区的leader处于哪一个broder,该消息分区的分布情况

“replicas” 信息,在节点1,2,0上,不管node死活,只是列出信息而已.
“isr” 工作中的复制节点的集合. 也就是活的节点的集合.

参考链接:

http://www.cnblogs.com/hopelee/p/7285340.html

https://github.com/Parsely/pykafka

https://cloud.tencent.com/developer/article/1010955


有关kafka中topic的创建和查询的更多相关文章

  1. ruby - ECONNRESET (Whois::ConnectionError) - 尝试在 Ruby 中查询 Whois 时出错 - 2

    我正在用Ruby编写一个简单的程序来检查域列表是否被占用。基本上它循环遍历列表,并使用以下函数进行检查。require'rubygems'require'whois'defcheck_domain(domain)c=Whois::Client.newc.query("google.com").available?end程序不断出错(即使我在google.com中进行硬编码),并打印以下消息。鉴于该程序非常简单,我已经没有什么想法了-有什么建议吗?/Library/Ruby/Gems/1.8/gems/whois-2.0.2/lib/whois/server/adapters/base.

  2. ruby-on-rails - 在 Rails 和 ActiveRecord 中查询时忽略某些字段 - 2

    我知道我可以指定某些字段来使用pluck查询数据库。ids=Item.where('due_at但是我想知道,是否有一种方法可以指定我想避免从数据库查询的某些字段。某种反拔?posts=Post.where(published:true).do_not_lookup(:enormous_field) 最佳答案 Model#attribute_names应该返回列/属性数组。您可以排除其中一些并传递给pluck或select方法。像这样:posts=Post.where(published:true).select(Post.attr

  3. sql - 查询忽略时间戳日期的时间范围 - 2

    我正在尝试查询我的Rails数据库(Postgres)中的购买表,我想查询时间范围。例如,我想知道在所有日期的下午2点到3点之间进行了多少次购买。此表中有一个created_at列,但我不知道如何在不搜索特定日期的情况下完成此操作。我试过:Purchases.where("created_atBETWEEN?and?",Time.now-1.hour,Time.now)但这最终只会搜索今天与那些时间的日期。 最佳答案 您需要使用PostgreSQL'sdate_part/extractfunction从created_at中提取小时

  4. ruby-on-rails - solr 清理查询 - 2

    我在Rails上使用带有ruby​​的solr。一切正常,我只需要知道是否有任何现有代码来清理用户输入,比如以?开头的查询。或* 最佳答案 我不知道执行此操作的任何代码,但理论上可以通过查看parsingcodeinLucene来完成并搜索thrownewParseException(只有16个匹配!)。在实践中,我认为您最好只捕获代码中的任何solr异常并显示“无效查询”消息或类似信息。编辑:这里有几个“sanitizer”:http://pivotallabs.com/users/zach/blog/articles/937-s

  5. ruby-on-rails - Rails 3 在一个查询中包含多个表 - 2

    我正在为锦标赛开发一个Rails应用程序。我在这个查询中使用了三个模型:classPlayertruehas_and_belongs_to_many:tournamentsclassTournament:destroyclassPlayerMatch"Player",:foreign_key=>"player_one"belongs_to:player_two,:class_name=>"Player",:foreign_key=>"player_two"在tournaments_controller的显示操作中,我调用以下查询:Tournament.where(:id=>params

  6. ruby-on-rails - Sunspot:如何对具有不同值的多个字段进行全文查询? - 2

    我想用sunspot重现以下原始solr查询q=exact_term_text:fooORterm_textv:foo*ORalternate_text:bar*但我无法通过标准的太阳黑子界面理解这是否可能以及如何实现,因为看起来:fulltext方法似乎不接受多个文本/搜索字段参数我不知道将什么参数作为第一个参数传递给fulltext,就好像我通过了"foo"或"bar"结果不匹配如果我传递一个空参数,我得到一个q=*:*范围过滤器(例如with(:term).starting_with('foo*')(顾名思义)作为过滤器查询应用,因此不参与评分。似乎可以手动编写字符串(或者可能使

  7. ruby-on-rails - 在不重新查询数据库的情况下重新排序 Rails 中的事件记录? - 2

    例如,假设我有一个名为Products的模型,并且在ProductsController中,我有以下代码用于product_listView以显示已排序的产品。@products=Product.order(params[:order_by])让我们想象一下,在product_listView中,用户可以使用下拉菜单按价格、评级、重量等进行排序。数据库中的产品不会经常更改。我很难理解的是,每次用户选择新的order_by过滤器时,rails是否必须查询,或者rails是否能够以某种方式缓存事件记录以在服务器端重新排序?有没有一种方法可以编写它,以便在用户排序时rails不会重新查询结果

  8. ruby-on-rails - 带句点(或句号)的 Rails 查询字符串。 - 2

    我目前正在尝试了解RoR。我将两个字符串传递到我的Controller中。一个是随机的十六进制字符串,另一个是电子邮件。该项目用于对数据库进行简单的电子邮件验证。我遇到的问题是当我输入如下内容来测试我的页面时:http://signup.testsite.local/confirm/da2fdbb49cf32c6848b0aba0f80fb78c/bob.villa@gmailcom我在:email的参数散列中得到的全部是'bob'。我在gmail和com之间留下了.,因为那样会导致匹配根本不起作用。我的路由匹配如下:match"confirm/:code/:email"=>"conf

  9. ruby - 如何将编码的查询值添加到 URL? - 2

    我正在寻找一种方便实用的方法来将编码值添加到Ruby中的URL查询字符串。目前,我有:require'open-uri'u=URI::HTTP.new("http",nil,"mydomain.example",nil,nil,"/tv",nil,"show="+URI::encode("Rosie&Jim"),nil)pu.to_s#=>"http://mydomain.example/tv?show=Rosie%20&%20Jim"这不是我要找的,因为我需要得到“http://mydomain.example/tv?show=Rosie%20%26%20Jim”,这样show=值就

  10. Linux磁盘分区中物理卷(PV)、卷组(VG)、逻辑卷(LV)创建和(LVM)管理 - 2

    文章目录一基础定义二创建逻辑卷2-1准备物理设备2-2创建物理卷2-3创建卷组2-4创建逻辑卷2-5创建文件系统并挂载文件三扩展卷组和缩减卷组3-1准备物理设备3-2创建物理卷3-3扩展卷组3-4查看卷组的详细信息以验证3-5缩减卷组四扩展逻辑卷4-1检查卷组是否有可用的空间4-2扩展逻辑卷4-3扩展文件系统五删除逻辑卷5-1备份数据5-2卸载文件系统5-3删除逻辑卷5-4删除卷组5-5删除物理卷六LVM逻辑卷缩容6-1缩容注意事项6-2标准缩容步骤一基础定义LVM,LogicalVolumeManger,逻辑卷管理,Linux磁盘分区管理的一种机制,建立在硬盘和分区上的一个逻辑层,提高磁盘分

随机推荐