jjzjj

Scalding

全部标签

scala - Scalding:解析带 header 的逗号分隔数据

我有以下格式的数据:"header1","header2","header3",..."value11","value12","value13",..."value21","value22","value23",.......在Scalding中解析它的最佳方法是什么?我总共有50多个专栏,但我只对其中的一些感兴趣。我尝试使用Csv("file")导入它,但这不起作用。想到的唯一解决方案是使用TextLine手动解析它并忽略偏移量==0的行。但我相信一定有更好的解决方案。 最佳答案 最后我通过如下手动解析每一行解决了它:deftip

scala - 如何衡量一个scala烫伤程序的运行时间?

我有一个简单的scalding程序来转换我在本地模式下使用com.twitter.scalding.Tool执行的一些数据。valstart=System.nanoTimevalinputPaths=args("input").split(",").toListvalpipe=Tsv(inputPaths(0))//standardpipeoperationsonmydatalike.filter('myField),etc..write(Tsv(args("output")))println("runningtime:"+(System.nanoTime-start)/1e6+"ms

scala - Scalding 示例 WordCount 本地模式

我正在尝试运行Scalding示例字数统计示例。我已按照此github链接执行步骤:-https://github.com/twitter/scalding/wiki/Getting-Started但是我遇到了ClassNotFoundException。下面是我的StackTrace:-[cloudera@localhostscalding-develop]$**sudoscripts/scald.rb--localWordCount--inputinput.txt--output./someOutputFile.tsv**cannotfind/root/.sbt/boot/scal

scala - 使用 scalding 读取多个文件并输出单个文件

这些天我遇到了一个问题,我正在尝试使用scalding从多个文件中读取数据并使用单个文件创建输出。我的代码是这样的:defgetFilesSource(paths:Seq[String])={newMultipleTextLineFiles(paths:_*){overrideprotecteddefcreateHdfsReadTap(hdfsMode:Hdfs):Tap[JobConf,_,_]={valtaps=goodHdfsPaths(hdfsMode).toList.map{path=>CastHfsTap(newHfs(hdfsScheme,path,sinkMode))}

java - 以编程方式确定 Scalding/Cascading Pipe 的字段名称

我正在使用Scalding处理包含许多(>22)个字段的记录。在该过程结束时,我想将最终Pipe的字段名称写到一个文件中。我知道这是可能的,因为Mapper和Reducer日志会显示此信息。我想在工作本身中获取此信息,以将其用作穷人模式的基础。如果这不可能做到,那么是否有一种很好的方法可以将类型安全的PipesAPI用于大型记录(即,无需求助于任意嵌套的元组或案例类)? 最佳答案 .write(Tsv("filename.tsv"),writeHeader=true)通过设置writeHeader=true,您告诉.write函数也

scala 文件名太长

我正在使用scala2.10和gradle1.11我的问题是,当我尝试在hadoop集群中运行时,编译的jar会出现错误。我想在hadoop上运行,因为我使用scalding。异常(exception)情况是:Exceptioninthread"main"java.io.FileNotFoundException:/tmp/hadoop-root/hadoop-unjar6538587701808097105/com/twitter/bijection/GeneratedTupleCollectionInjections$$anon$31$$anonfun$invert$10$$ano

mongodb - 烫伤 MongoDB 连接器

我正在使用Scalding实现ETL,我正在寻找一种简单的方法将Scalding输出转发到MongoDB而不是HDFS。任何建议表示赞赏。谢谢。 最佳答案 这是最近discussedonTwitter.具体见examplecode.据我所知,截至撰写本文时,这还没有打包成随时可用的源代码。 关于mongodb-烫伤MongoDB连接器,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions

scala - 使用 Storehaus 存储 algebird Bloom Filter

我有一个Spark作业,其最终输出是一个Algebird布隆过滤器,我需要在另一个Spark作业中重用这个布隆过滤器。有没有办法使用TwitterStorehaus将此布隆过滤器存储在kv存储(例如:redis)中并在其他作业中检索它(反序列化为algebird布隆过滤器)? 最佳答案 如果您不打算对bloomfilter进行并发修改,最好的方法是将bloomfilter存储为分布式位集。将键空间视为数组分区的索引,而值是该索引的数组部分。然后你可以用更少的IO进行读写。这基本上需要您在storehausMergeableStore

scala - 使用 Storehaus 存储 algebird Bloom Filter

我有一个Spark作业,其最终输出是一个Algebird布隆过滤器,我需要在另一个Spark作业中重用这个布隆过滤器。有没有办法使用TwitterStorehaus将此布隆过滤器存储在kv存储(例如:redis)中并在其他作业中检索它(反序列化为algebird布隆过滤器)? 最佳答案 如果您不打算对bloomfilter进行并发修改,最好的方法是将bloomfilter存储为分布式位集。将键空间视为数组分区的索引,而值是该索引的数组部分。然后你可以用更少的IO进行读写。这基本上需要您在storehausMergeableStore

scala - 如何将 Scalding ValuePipe 加入 TypedPipe?

我已经改编了scaldingKMeans示例来执行KModes。问题是当作业完成后,我需要将聚类记录与匹配的质心连接起来。KMeans代码使用ValuePipe来保存质心。因此,为了从ValuePipe中取出质心,我对其进行了平面映射。然后我像这样加入:HVKModes(500000,inputSets,10).waitFor(Config.default,mode)match{caseSuccess((a,centroids:ValuePipe[List[LabeledCentroid]],points:TypedPipe[LabeledVector]))=>{valjoined=c