jjzjj

Streaming

全部标签

scala - Spark Streaming textFileStream 复制

我正在尝试监视HDFS中的存储库以读取和处理复制到它的文件中的数据(将文件从本地系统复制到HDFS我使用hdfsdfs-put),有时它会产生问题:SparkStreaming:java.io.FileNotFoundException:Filedoesnotexist:.COPYING所以我阅读了论坛中的问题和此处的问题SparkStreaming:java.io.FileNotFoundException:Filedoesnotexist:._COPYING_根据我读到的内容,问题与Spark流式传输在文件完成复制到HDFS和Github之前读取文件有关:https://githu

apache-spark - Spark Streaming 创建许多小文件

我已经实现了一个SparkStreaming作业,它将过去6个月收到的事件流式传输到HDFS。它在HDFS中创建许多小文件,我希望它们的每个文件大小为HDFS的128MB(block大小)。如果我使用追加模式,所有数据都将写入一个parquet文件。如何配置Spark为每128MB数据创建一个新的HDFSparquet文件? 最佳答案 Spark会在写入之前在对象上写入与分区一样多的文件。这可能真的很低效。要减少部分文件的总数,试试这个,它会检查对象的总字节大小并将其重新调整为+1最佳大小。importorg.apache.spar

python - 如何为 Amazon EMR 上的 Hadoop Streaming 作业加载额外的 JAR

长话短说我如何上传或指定额外的JAR到AmazonElasticMapReduce(AmazonEMR)上的Hadoop流作业?长版我想分析一组Avro文件(>2000个文件)在AmazonElasticMapReduce(AmazonEMR)上使用Hadoop。这应该是一个简单的练习,通过它我应该对MapReduce和AmazonEMR有一定的信心(我对这两个都是新手)。因为python是我最喜欢的语言,所以我决定使用HadoopStreaming.我在python中构建了一个简单的映射器和缩减器,并在本地Hadoop(单节点安装)上对其进行了测试。我在本地Hadoop安装上发出的命

hadoop - Hadoop Streaming 的向后兼容性

AFAK,HadoopStreaming只支持文本输入,这意味着数据是按行组织的。但是如果我们想要向后兼容,映射器代码将变得困惑,在用C++编写的同一个映射器程序中支持不同版本的日志行。之前考虑过avro或者protobuf,但是streaming模式好像不支持,是这样吗?还有其他解决办法吗? 最佳答案 其他输入/输出格式也可以是used以及Hadoop流。Avrosupport已为HadoopStreaming添加。参见AVRO-808&AVRO-830.还有这个Thread可能会有用。我找不到ProtoBuf的InputForm

apache-spark - Spark Streaming to Hive,每个分区的小文件太多

我有一个批处理间隔为2分钟(可配置)的Spark流作业。此作业从Kafka主题读取并创建数据集并在其上应用模式并将这些记录插入到Hive表中。Spark作业在Hive分区中每个批处理间隔创建一个文件,如下所示:dataset.coalesce(1).write().mode(SaveMode.Append).insertInto(targetEntityName);现在传入的数据不是那么大,如果我将批处理持续时间增加到10分钟左右,那么即使我最终也可能只获得2-3mb的数据,这远小于block大小。这是SparkStreaming中的预期行为。我正在寻找有效的方法来进行后处理以合并所有

python - # 失败的映射任务超出了允许的限制

我正在尝试使用Python进行Hadoop流式处理。我在here的帮助下编写了简单的map和减少脚本。map脚本如下:#!/usr/bin/envpythonimportsys,urllib,retitle_re=re.compile("(.*?)",re.MULTILINE|re.DOTALL|re.IGNORECASE)forlineinsys.stdin:url=line.strip()match=title_re.search(urllib.urlopen(url).read())ifmatch:printurl,"\t",match.group(1).strip()和redu

hadoop - 用于 hadoop 流的组合器 hack

当前版本的hadoop-streaming需要一个用于组合器的Java类,但我在某处读到我们可以使用如下hack:hadoopjar./contrib/streaming/hadoop-0.20.2-streaming.jar-input/testinput-output/testoutput-mapper"python/code/triples-mapper.py|sort|python/code/triples-reducer.py"-reducer/code/triples-reducer.py但是,这似乎行不通。我做错了什么? 最佳答案

ruby - 如何使用 Hadoop 将 XML 转换为 TSV?

我有一个格式非常简单的XML文档,我想将其转换为适合导入Hive的TSV。本文档的格式很简单:00000我有一个有效的Ruby脚本,可以将上述格式的文档正确转换为TSV。就在这里:require"rubygems"require"crack"xml=Crack::XML.parse(File.read("sample.xml"))xml['root']['row'].each{|i|puts"#{i['ID']}#{i['ParentID']}#{i['Url']}#{i['Title']}..."}不幸的是,我需要翻译的文件远远大于此脚本可以处理的文件(>1GB)。这就是Hadoop

java - FSDataInputStream 是否仅限于创建时已经写入的那些字节?

所以我试图了解HDFS中的一些行为。我的目标是设置一个配置,在该配置中我将FSDataOutputStream打开到某个位置,然后在我写入任何字节之前,我的应用程序的其他部分立即将FSDataInputStream打开到同一位置。我的想法是,当我将字节写入FSDataOutputStream、刷新它们并调用“sync()”时,任何有权访问相同位置的FSDataInputStream的人都应该能够读取这些字节。可悲的是,它似乎并没有那样工作。当我以这种方式设置我的代码时:FSDataOutputStreamwriter=fs.create(newPath("/foo/bar"));FSD

用于 Flume 接收器文件的 Hadoop Streaming MapReduce - FileNotFoundException

我遇到以下异常:java.io.FileNotFoundException:Filedoesnotexist:/log1/20131025/2013102509_at1.1382659200021.tmpatorg.apache.hadoop.hdfs.DFSClient$DFSInputStream.fetchLocatedBlocks(DFSClient.java:2006)atorg.apache.hadoop.hdfs.DFSClient$DFSInputStream.openInfo(DFSClient.java:1975)...当MR作业正在运行时。Flume将文件名从xx