我有一个需要从 Redis 实例流式传输其数据的 Storm 拓扑,我尝试运行从单个 Redis 实例读取的拓扑,但似乎没有从 Redis 读取任何内容,当我检查返回的队列时它是空的。我使用的是 Storm 版本 0.9.3。
这是我的 RedisQueueSpout这是一个 Storm spout,它将使用指定的模式(也称为 key )将您的拓扑插入 Redis,每次 Storm 轮询它时,它都会在其中查找输入数据。 spout 将带有 ID 消息的单个字段发送到它后面的任何一个 bolt。
package storm.starter.spout;
import java.util.List;
import java.util.Map;
import redis.clients.jedis.Jedis;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
public class RedisQueueSpout extends BaseRichSpout {
static final long serialVersionUID = 737015318988609460L;
private SpoutOutputCollector _collector;
private final String host;
private final int port;
private final String pattern;
private transient JedisQueue jq;
public RedisQueueSpout(String host, int port, String pattern) {
this.host = host;
this.port = port;
this.pattern = pattern;
}
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
Jedis newJedis = new Jedis(host, port);
newJedis.connect();
this.jq = new JedisQueue(newJedis, pattern);
}
public void close() {}
public void nextTuple() {
List<String> ret = this.jq.dequeue();
if (ret == null) {
Utils.sleep(5L);
}
else {
System.out.println(ret);
_collector.emit(new Values(ret));
}
}
@Override
public void ack(Object msgId) {}
@Override
public void fail(Object msgId) {}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("name"));
}
}
这是我的 JedisQueue这是由 Redis 支持的标准队列数据结构的实现。请注意,dequeue 方法有些不同寻常地返回 List<String>。因为这是底层 Jedis 实现返回的内容:这是由于 Redis 能够为单个键存储多个值。
package storm.starter;
import java.util.List;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.exceptions.JedisDataException;
public class JedisQueue {
private transient Jedis jedis;
private final String pattern;
public JedisQueue(Jedis jedis, String pattern) {
this.jedis = jedis;
this.pattern = pattern;
}
public void clear() {
this.jedis.del(this.pattern);
}
public boolean isEmpty() {
return (this.size() == 0);
}
public int size() {
return new Integer(this.jedis.llen(this.pattern).toString());
}
public List<String> toArray() {
return this.jedis.lrange(this.pattern, 0, -1);
}
public void enqueue(String... elems) {
this.jedis.rpush(this.pattern, elems);
}
public List<String> dequeue() {
List<String> out = null;
try {
out = this.jedis.blpop(0, this.pattern);
}
catch (JedisDataException e) {
// It wasn't a list of strings
}
return out;
}
}
代码取自Storm-jedis有关更多信息,您可以查看链接。
这是我的拓扑结构:
package storm.starter;
import org.tomdz.storm.esper.EsperBolt;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import storm.starter.spout.RedisQueueSpout;;
public class NameCountTopology {
public static void main (String[] args) throws Exception {
String host = "10.0.0.251";
int port = 6379;
String pattern = "Name:*";
TopologyBuilder builder = new TopologyBuilder();
EsperBolt bolt = new EsperBolt.Builder().inputs().aliasComponent("spout").toEventType("names").outputs()
.onDefaultStream().emit("nps").statements()
.add("select count(*) as nps from names.win:time_batch(1 sec)").build();
builder.setSpout("spout", new RedisQueueSpout(host,port,pattern),1);
builder.setBolt("count-bolt", bolt, 1).fieldsGrouping("spout", new Fields("name"));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(1);
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} else {
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("name-count-topology", conf, builder.createTopology());
Utils.sleep(300000);
cluster.killTopology("name-count-topology");
cluster.shutdown();
}
}
我的 Redis 键值是使用 HMSET 按以下格式存储的:
HMSET Name:1 NAME Mary YEAR 1880 GENDER F COUNT 7065
HMSET Name:2 NAME Anna YEAR 1880 GENDER F COUNT 2604
...
这是我的主管节点的日志:
2016-05-04 07:37:56 b.s.d.executor [INFO] Opened spout spout:(3)
2016-05-04 07:37:56 b.s.d.executor [INFO] Activating spout spout:(3)
2016-05-04 07:37:56 STDIO [INFO] Queue is empty...
2016-05-04 07:37:56 c.e.e.c.EPServiceProviderImpl [INFO] Initializing engine URI 'org.tomdz.storm.esper.EsperBolt@44d83ea0' version 4.3.0
2016-05-04 07:37:58 b.s.d.executor [INFO] Prepared bolt count-bolt:(2)
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __system __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@70f9b3ee> [#<DataPoint [__ack-count = {}]> #<DataPoint [memory/heap = {unusedBytes=9418640, usedBytes=14710896, maxBytes=259522560, initBytes=8035520, virtualFreeBytes=244811664, committedBytes=24129536}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [newWorkerEvent = 1]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [memory/nonHeap = {unusedBytes=1218808, usedBytes=36529928, maxBytes=224395264, initBytes=24313856, virtualFreeBytes=187865336, committedBytes=37748736}]> #<DataPoint [uptimeSecs = 77.358]> #<DataPoint [__transfer = {write_pos=0, read_pos=0, capacity=1024, population=0}]> #<DataPoint [startTimeSecs = 1.462347457159E9]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]>]]
2016-05-04 07:38:54 b.s.d.executor [INFO] Processing received message source: __system:-1, stream: __metrics_tick, id: {}, [60]
2016-05-04 07:38:54 b.s.d.task [INFO] Emitting: __acker __metrics [#<TaskInfo backtype.storm.metric.api.IMetricsConsumer$TaskInfo@19940834> [#<DataPoint [__ack-count = {}]> #<DataPoint [__sendqueue = {write_pos=-1, read_pos=-1, capacity=1024, population=0}]> #<DataPoint [__receive = {write_pos=1, read_pos=0, capacity=1024, population=1}]> #<DataPoint [__process-latency = {}]> #<DataPoint [__transfer-count = {}]> #<DataPoint [__execute-latency = {}]> #<DataPoint [__fail-count = {}]> #<DataPoint [__emit-count = {}]> #<DataPoint [__execute-count = {}]>]]
并且日志一直这样重复。 这是运行拓扑后我的用户界面: storm UI
现在我的问题是为什么 spout 不工作并且没有发出任何东西,似乎没有从 Redis 中获取任何东西。
PS:我已经检查了主机和端口,我可以从 Redis 获取数据,所以我认为与 Redis 的连接没有任何问题。
最佳答案
希望对您有所帮助。
关于redis - 使用带有 Redis 的 Storm 作为数据源,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37039034/
我正在学习如何使用Nokogiri,根据这段代码我遇到了一些问题:require'rubygems'require'mechanize'post_agent=WWW::Mechanize.newpost_page=post_agent.get('http://www.vbulletin.org/forum/showthread.php?t=230708')puts"\nabsolutepathwithtbodygivesnil"putspost_page.parser.xpath('/html/body/div/div/div/div/div/table/tbody/tr/td/div
我有一个Ruby程序,它使用rubyzip压缩XML文件的目录树。gem。我的问题是文件开始变得很重,我想提高压缩级别,因为压缩时间不是问题。我在rubyzipdocumentation中找不到一种为创建的ZIP文件指定压缩级别的方法。有人知道如何更改此设置吗?是否有另一个允许指定压缩级别的Ruby库? 最佳答案 这是我通过查看rubyzip内部创建的代码。level=Zlib::BEST_COMPRESSIONZip::ZipOutputStream.open(zip_file)do|zip|Dir.glob("**/*")d
类classAprivatedeffooputs:fooendpublicdefbarputs:barendprivatedefzimputs:zimendprotecteddefdibputs:dibendendA的实例a=A.new测试a.foorescueputs:faila.barrescueputs:faila.zimrescueputs:faila.dibrescueputs:faila.gazrescueputs:fail测试输出failbarfailfailfail.发送测试[:foo,:bar,:zim,:dib,:gaz].each{|m|a.send(m)resc
很好奇,就使用rubyonrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提
假设我做了一个模块如下:m=Module.newdoclassCendend三个问题:除了对m的引用之外,还有什么方法可以访问C和m中的其他内容?我可以在创建匿名模块后为其命名吗(就像我输入“module...”一样)?如何在使用完匿名模块后将其删除,使其定义的常量不再存在? 最佳答案 三个答案:是的,使用ObjectSpace.此代码使c引用你的类(class)C不引用m:c=nilObjectSpace.each_object{|obj|c=objif(Class===objandobj.name=~/::C$/)}当然这取决于
我正在尝试使用ruby和Savon来使用网络服务。测试服务为http://www.webservicex.net/WS/WSDetails.aspx?WSID=9&CATID=2require'rubygems'require'savon'client=Savon::Client.new"http://www.webservicex.net/stockquote.asmx?WSDL"client.get_quotedo|soap|soap.body={:symbol=>"AAPL"}end返回SOAP异常。检查soap信封,在我看来soap请求没有正确的命名空间。任何人都可以建议我
关闭。这个问题是opinion-based.它目前不接受答案。想要改进这个问题?更新问题,以便editingthispost可以用事实和引用来回答它.关闭4年前。Improvethisquestion我想在固定时间创建一系列低音和高音调的哔哔声。例如:在150毫秒时发出高音调的蜂鸣声在151毫秒时发出低音调的蜂鸣声200毫秒时发出低音调的蜂鸣声250毫秒的高音调蜂鸣声有没有办法在Ruby或Python中做到这一点?我真的不在乎输出编码是什么(.wav、.mp3、.ogg等等),但我确实想创建一个输出文件。
我在我的项目目录中完成了compasscreate.和compassinitrails。几个问题:我已将我的.sass文件放在public/stylesheets中。这是放置它们的正确位置吗?当我运行compasswatch时,它不会自动编译这些.sass文件。我必须手动指定文件:compasswatchpublic/stylesheets/myfile.sass等。如何让它自动运行?文件ie.css、print.css和screen.css已放在stylesheets/compiled。如何在编译后不让它们重新出现的情况下删除它们?我自己编译的.sass文件编译成compiled/t
我想将html转换为纯文本。不过,我不想只删除标签,我想智能地保留尽可能多的格式。为插入换行符标签,检测段落并格式化它们等。输入非常简单,通常是格式良好的html(不是整个文档,只是一堆内容,通常没有anchor或图像)。我可以将几个正则表达式放在一起,让我达到80%,但我认为可能有一些现有的解决方案更智能。 最佳答案 首先,不要尝试为此使用正则表达式。很有可能你会想出一个脆弱/脆弱的解决方案,它会随着HTML的变化而崩溃,或者很难管理和维护。您可以使用Nokogiri快速解析HTML并提取文本:require'nokogiri'h
我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po