我正在使用 Scala 处理 Spark Streaming。我需要使用此行从 HDFS 目录动态读取 .csv 文件:
val lines = ssc.textFileStream("/user/root/")
我使用以下命令行将文件放入 HDFS:
hdfs dfs -put ./head40k.csv
它适用于相对较小的文件。 当我尝试使用更大的一个时,出现此错误:
org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /user/root/head800k.csv._COPYING
我能理解为什么,但我不知道如何解决。我也试过这个解决方案:
hdfs dfs -put ./head800k.csv /user
hdfs dfs -mv /usr/head800k.csv /user/root
但我的程序不读取文件。 有任何想法吗? 提前致谢
程序:
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.rdd.RDDFunctions._
import scala.sys.process._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import java.util.HashMap
import org.apache.hadoop.io.{LongWritable, NullWritable, Text}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import kafka.serializer.StringDecoder
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.SparkConf
import StreamingContext._
object Traccia2014{
def main(args: Array[String]){
if (args.length < 2) {
System.err.println(s"""
|Usage: DirectKafkaWordCount <brokers> <test><topicRisultato>
| <brokers> is a list of one or more Kafka brokers
| <topics> is a list of one or more kafka topics to consume from
|
""".stripMargin)
System.exit(1)
}
val Array(brokers,risultato) = args
val sparkConf = new SparkConf().setAppName("Traccia2014")
val ssc = new StreamingContext(sparkConf, Seconds(5))
val lines = ssc.textFileStream("/user/root/")
//val lines= ssc.fileStream[LongWritable, Text, TextInputFormat](directory="/user/root/",
// filter = (path: org.apache.hadoop.fs.Path) => //(!path.getName.endsWith("._COPYING")),newFilesOnly = true)
//********** Definizioni Producer***********
val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val slice=30
lines.foreachRDD( rdd => {
if(!rdd.isEmpty){
val min=rdd.map(x => x.split(",")(0)).reduce((a, b) => if (a < b) a else b)
if(!min.isEmpty){
val ipDst= rdd.map(x => (((x.split(",")(0).toInt - min.toInt).toLong/slice).round*slice+" "+(x.split(",")(2)),1)).reduceByKey(_ + _)
if(!ipDst.isEmpty){
val ipSrc=rdd.map(x => (((x.split(",")(0).toInt - min.toInt).toLong/slice).round*slice+" "+(x.split(",")(1)),1)).reduceByKey(_ + _)
if(!ipSrc.isEmpty){
val Rapporto=ipSrc.leftOuterJoin(ipDst).mapValues{case (x,y) => x.asInstanceOf[Int] / y.getOrElse(1) }
val RapportoFiltrato=Rapporto.filter{case (key, value) => value > 100 }
println("###(ConsumerScala) CalcoloRapporti: ###")
Rapporto.collect().foreach(println)
val str = Rapporto.collect().mkString("\n")
println(s"###(ConsumerScala) Produco Risultato : ${str}")
val message = new ProducerRecord[String, String](risultato, null, str)
producer.send(message)
Thread.sleep(1000)
}else{
println("src vuoto")
}
}else{
println("dst vuoto")
}
}else{
println("min vuoto")
}
}else
{
println("rdd vuoto")
}
})//foreach
ssc.start()
ssc.awaitTermination()
} }
最佳答案
/user/root/head800k.csv._COPYING 是在复制过程中创建的临时文件。等待复制过程完成,如果没有 _COPYING 后缀即 /user/root/head800k.csv,您将失败。
要在您的 spark-streaming 作业中过滤这些瞬变,您可以使用记录在案的 fileStream 方法 here
例如如下所示
ssc.fileStream[LongWritable, Text, TextInputFormat](
directory="/user/root/",
filter = (path: org.apache.hadoop.fs.Path) => (!path.getName.endsWith("_COPYING")), // add other filters like files starting with dot etc
newFilesOnly = true)
编辑
由于您要将文件从本地文件系统移动到 HDFS,因此最好的解决方案是将文件移动到 HDFS 中的临时暂存位置,然后将它们移动到目标目录。在 HDFS 文件系统中复制或移动应该避免临时文件
关于scala - HDFS : java. io.FileNotFoundException : File does not exist: name. _COPYING,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42041110/
我正在使用这个:4.times{|i|assert_not_equal("content#{i+2}".constantize,object.first_content)}我之前声明过局部变量content1content2content3content4content5我得到的错误NameError:wrongconstantnamecontent2这个错误是什么意思?我很确定我想要content2=\ 最佳答案 你必须用一个大字母来调用ruby常量:Content2而不是content2。Aconstantnamestart
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
这里有一个很好的答案解释了如何在Ruby中下载文件而不将其加载到内存中:https://stackoverflow.com/a/29743394/4852737require'open-uri'download=open('http://example.com/image.png')IO.copy_stream(download,'~/image.png')我如何验证下载文件的IO.copy_stream调用是否真的成功——这意味着下载的文件与我打算下载的文件完全相同,而不是下载一半的损坏文件?documentation说IO.copy_stream返回它复制的字节数,但是当我还没有下
“输出”是一个序列化的OpenStruct。定义标题try(:output).try(:data).try(:title)结束什么会更好?:) 最佳答案 或者只是这样:deftitleoutput.data.titlerescuenilend 关于ruby-on-rails-更好的替代方法try(:output).try(:data).try(:name)?,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.c
我正在尝试解析一个文本文件,该文件每行包含可变数量的单词和数字,如下所示:foo4.500bar3.001.33foobar如何读取由空格而不是换行符分隔的文件?有什么方法可以设置File("file.txt").foreach方法以使用空格而不是换行符作为分隔符? 最佳答案 接受的答案将slurp文件,这可能是大文本文件的问题。更好的解决方案是IO.foreach.它是惯用的,将按字符流式传输文件:File.foreach(filename,""){|string|putsstring}包含“thisisanexample”结果的
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
我有一个涉及多台机器、消息队列和事务的问题。因此,例如用户点击网页,点击将消息发送到另一台机器,该机器将付款添加到用户的帐户。每秒可能有数千次点击。事务的所有方面都应该是容错的。我以前从未遇到过这样的事情,但一些阅读表明这是一个众所周知的问题。所以我的问题。我假设安全的方法是使用两阶段提交,但协议(protocol)是阻塞的,所以我不会获得所需的性能,我是否正确?我通常写Ruby,但似乎Redis之类的数据库和Rescue、RabbitMQ等消息队列系统对我的帮助不大——即使我实现某种两阶段提交,如果Redis崩溃,数据也会丢失,因为它本质上只是内存。所有这些让我开始关注erlang和
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/