jjzjj

Scalding

全部标签

scala - 下面的热烫预处理和后处理将在哪个hadoop节点上运行?

我有下面的example代码,用于在slading作业运行之前进行一些预处理和一些后处理。由于这些预处理和后处理正在调用一些mysql数据库,我想知道hadoop可能会在哪些hadoop节点上运行它们?(我需要打开从这些节点到数据库的端口)它可以运行任何hadoopdata-node的预处理和后处理吗?我尝试做一些研究但找不到任何迹象,如何通过文档/来源找到它将在哪个节点上运行?(PS工作安排在oozie)preProcessingBeforeJobRuns()//**inwhichhadoopnodewouldthisberun?coulditrunonanydatanode?**l

scala - 基于级联的烫伤(旧版本)计数器

在scalding的旧版本中,其API中仍然没有引入计数器。HadoopCountersInScalding建议如何在烫伤中回退到级联计数器defaddCounter(pipe:Pipe,group:String,counter:String)={pipe.each(()->('addCounter))(fields=>newBaseOperation[Any](fields)withFunction[Any]{defoperate(flowProcess:FlowProcess[_],functionCall:FunctionCall[Any]){try{flowProcess.as

scala - 烫伤:成对比较字符串?

使用Scalding我需要:按前3个字符对字符串字段进行分组使用edit-distance指标(http://en.wikipedia.org/wiki/Edit_distance)比较每组中所有对的字符串将结果写入CSV文件,记录为string;字符串;距离为了对字符串进行分组,我使用了map和groupBy,如下例所示:importcascading.tuple.Fieldsimportcom.twitter.scalding._classScan(args:Args)extendsJob(args){valoutput=TextLine("tmp/out.txt")valword

scala - groupBy toList 元素顺序

我有一个包含多个字段的RichPipe,比方说:'sex'weight'age我需要按“性别”分组,然后获取元组列表(“体重”和“年龄”)。然后我想对每个组的列表执行scanLeft操作,并获得带有“性别”和“结果”的管道。我目前通过这样做来做到这一点pipe.groupBy('sex){_.toList('weight->'weights).toList('age-'ages)}然后将两个列表压缩在一起。我不确定这是最好的方法,而且我不确定列表中值的顺序是否相同,所以当我压缩两个列表时,元组不会混淆错误值。我在文档中没有发现任何相关信息。 最佳答案

hadoop - 基于字段之一将 Scalding TypedPipe 输出到多个目录中的 SequenceFile

我在Hadoop上使用Scalding,我有一个TypedPipe形式的大型数据集,我希望根据其中一个数据字段以block的形式输出。例如数据是,我希望每个类别的数据存储在单独类别的SequenceFile中,例如outPath/cat1,outPath/cat2等。我想要一个MapReduce阶段(或避免循环)。我已阅读有关TemplatedTsv的信息选项在这里:HowtobucketoutputsinScalding这里:HowtooutputdatawithHive-styledirectorystructureinScalding?然而,这仅在您需要Tsv文件而不是Seque

hadoop - 如何找到运行我的工作的确切 hadoop jar 命令?

我正在使用CDH5.4。我正在运行一个从命令行看起来没问题的hadoop作业(当简单地使用hadoopjar运行时)。但是,如果我从yarn运行它,它会以单个映射器和没有缩减器静默完成。我真的怀疑这两个“运行”都在运行完全相同的命令。但是,我想确定这一点。所以我查看日志:(请注意它是一个使用自定义运行器的烫伤工作-当我从命令行运行它时一切正常)。/container_1432733015407_0953_01_000001/container_1432733015407_0953_01_000001/user/stdout/?start=0我看到了类似的东西:Mainclass:org

hadoop - 前 10 个路径缩减图 reduce

我正在做一个需要路径导航图的项目。问题描述:为了提供项目上下文,示例UI应类似于:http://bl.ocks.org/mbostock/4063570.区别在于它将用于站点导航。我的问题是在后端处理数据。对于用户路径A->B->C->D->E我预先计算的数据格式如下所示:Origin:Start:End:LevelAABL1ABCL2ACDL3ADEL4现在,假设我有数百万条这样的记录,其中有100个起源,我可以将它们分组,聚合大小并按大小desc排序并取前10个。因此对于每个起源、开始和级别,我应该有10个记录每一个。因此,对于4个级别的图表,对于图表中给定的起始节点,我将有10.

scala - 从 RichPipe 获取一个值

我有一个包含3个字段的RichPipe:名称:String、时间:Long和值:Int。我需要获取特定名称、时间对的值。我该怎么做?我无法从scalding文档中弄清楚,因为它非常神秘并且找不到任何这样做的例子。 最佳答案 RichPipe不是键值存储,这就是为什么没有关于用作键值存储的文档的原因:)应该考虑RichPipe作为管道-因此如果不首先进入管道的一端并遍历管道直到找到所需的元素,就无法在中间获取数据。此外,这在Scalding中有点痛苦,因为您必须将结果写入磁盘(因为它构建在Hadoop之上),然后从磁盘读取结果以便在您

scala - mutable.Buffer 不适用于类型安全 API 的 Scalding JobTest

我几乎完成了我的Scalding项目,该项目使用类型安全API而不是字段API。在整个项目设置中留给我的最后一个问题是整个Scalding作业本身的集成测试(我已经完成了类型安全外部操作模式的单元测试耶!)。这意味着运行完整的作业并测试我的作业的各种接收器的输出。然而,一些非常奇怪的事情正在发生。在我的typedSink{scala.collection.mutable.Buffer[]=>Unit}似乎我的程序没有看到缓冲区或对缓冲区做任何事情,所以集成测试总是通过,即使它不应该通过。下面是工作本身和有助于阐明正在发生的事情的测试:objectMyJob{valinputArgPat

hadoop - 如何在 Scalding 中一次对多列进行平均?

作为使用Scalding进行某些计算的最后一步,我想计算管道中列的多个平均值。但是下面的代码不起作用myPipe.groupAll{_average('col1,'col2,'col3)}有没有什么方法可以在不进行多次传递的情况下计算此类函数sum、max、average?我很关心性能,但也许Scalding足够聪明,可以通过编程方式检测到这一点。 最佳答案 这个问题在cascading-user中得到了回答论坛。在这里留下答案作为引用myPipe.groupAll{_.average('col1).average('col2).a