jjzjj

Hadoop MultipleOutPutFormat 和连接查询

coder 2024-01-06 原文

我正在处理一个 hadoop 任务,该任务之前在目录中填充了几个文件说

部分-o 第1部分 第二部分

我根据要求修改了此任务,并使用 MultipleOutputs 捕获更多输出。所以现在目录结构看起来像

第0部分 第1部分 第2部分 输出-1 输出2 输出-3

问题:之前很少有作业使用此目录进行映射侧外部连接,但现在该作业必须仅采用部分 -* 文件进行连接并丢弃其余文件。

我尝试将输入作为“,”分隔的目录 即 /part-1,/part-2,*/part-3

并根据以下表达式 jobConf.set("mapred.join.expr", CompositeInputFormat.compose(outer, KeyValueTextInputFormat.class, path[]))

现在我的路径 [] 包含 5 个整体,以前是 3 个,在某种程度上,起始三个索引具有 /part-1、/part-2、*/part-3 路径和像之前一样休息两个。

我不确定我这样做是否正确,请建议我应该怎么做才能使此连接像以前一样在没有输出-* 文件的情况下工作。

使用上述方法抛出以下异常。

java.io.IOException:子 1 的不一致拆分基数 (12/6) org.apache.hadoop.mapred.join.Parser$CNode.getSplits(Parser.java:369) org.apache.hadoop.mapred.join.CompositeInputFormat.getSplits(CompositeInputFormat.java:117) org.apache.hadoop.mapred.JobClient.writeOldSplits(JobClient.java:810) org.apache.hadoop.mapred.JobClient.submitJobInternal(JobClient.java:781) org.apache.hadoop.mapred.JobClient.submitJob(JobClient.java:730)

JobConf 值如下。

 jobConf.setMapperClass(MyMapper.class);
    jobConf.setReducerClass(MyReducer.class);

  String[] foldersToJoin = StringUtils.split(getInputString(), Constants.COMMA);
    Path[] pathsToJoin =  new Path[foldersToJoin.length];
    int i = 0;
    for(String folder : foldersToJoin){
        pathsToJoin[i++] = new Path(folder);
    }

 FileOutputFormat.setOutputPath(jobConf, new Path("/MyOutPUT"));

    jobConf.setInputFormat(CompositeInputFormat.class);
    jobConf.set("mapred.join.expr", CompositeInputFormat.compose(Constants.OUTER_JOIN_OP,
            KeyValueTextInputFormat.class, pathsToJoin));

    jobConf.setOutputFormat(TextOutputFormat.class);
    jobConf.setOutputKeyClass(Text.class);
    jobConf.setOutputValueClass(Text.class);

    MultipleOutputs.addNamedOutput(jobConf, CHANGE_SET_A,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(jobConf, CHANGE_SET_B,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(jobConf,CHANGE_SET_C,
            TextOutputFormat.class, Text.class, Text.class);
    MultipleOutputs.addNamedOutput(jobConf, CHANGE_SET_D,
            TextOutputFormat.class, Text.class, Text.class);

我将文件夹中的所有文件附加为“,”分隔的字符串,后面是代码

  mapred.join.expr = CompositeInputFormat.compose(Constants.OUTER_JOIN_OP,
        KeyValueTextInputFormat.class, pathsToJoin)

   where pathsToJoin =  new Path[]{new Path["/home/hadoop/folder1/part-1"],
  new Path["/home/hadoop/folder1/part-2"],new Path["/home/hadoop/folder1/part-3"],
   new Path["/home/hadoop/folder2"],new Path["/home/hadoop/folder3"] }

所以基本上我只是尝试将 folder1 中存在的 part-* 文件与 folder2 和 folder3 合并

任何文档或指向此类场景的任何链接都会有很大帮助。

最佳答案

好的,我想我现在明白了。

你已经执行了一些将它们的内容输出到一个文件夹的作业:

job1 -> folder1
job2 -> folder2
job3 -> folder3

现在您想使用 CompositeInputFormat 合并每个文件夹中每个 part-r-x 的输出,并在单个映射器中处理

map0 - merged contents of folder1/part-r-0, folder2/part-r-0, folder3/part-r-0
map1 - merged contents of folder1/part-r-1, folder2/part-r-1, folder3/part-r-1
.. and so on ..

增加的复杂性是一个或多个作业使用了 MultipleOutputs,因此您拥有的不是文件夹 1 中的 part-r-x 文件

job1 -> folder1/part-x and folder1/output-x

当您开始使用 CompositeInputFormat 时,它会出错,因为 folder1 的文件比 folder2 和 3 多

在这种情况下,我认为您需要修改 mapred.join.expr 值以使用一些 glob:

// use glob for folder1, to only include the part-x files (ignoring the output-x files)
CompositeInputFormat.compose(Constants.OUTER_JOIN_OP, KeyValueTextInputFormat.class,
   new Path[] {
       new Path('folder1/part-*'),
       new Path('folder2/part-r-*'),
       new Path('folder3/part-r-*'),
   });

关于Hadoop MultipleOutPutFormat 和连接查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9840451/

有关Hadoop MultipleOutPutFormat 和连接查询的更多相关文章

  1. ruby - ECONNRESET (Whois::ConnectionError) - 尝试在 Ruby 中查询 Whois 时出错 - 2

    我正在用Ruby编写一个简单的程序来检查域列表是否被占用。基本上它循环遍历列表,并使用以下函数进行检查。require'rubygems'require'whois'defcheck_domain(domain)c=Whois::Client.newc.query("google.com").available?end程序不断出错(即使我在google.com中进行硬编码),并打印以下消息。鉴于该程序非常简单,我已经没有什么想法了-有什么建议吗?/Library/Ruby/Gems/1.8/gems/whois-2.0.2/lib/whois/server/adapters/base.

  2. ruby - 续集在添加关联时访问many_to_many连接表 - 2

    我正在使用Sequel构建一个愿望list系统。我有一个wishlists和itemstable和一个items_wishlists连接表(该名称是续集选择的名称)。items_wishlists表还有一个用于facebookid的额外列(因此我可以存储opengraph操作),这是一个NOTNULL列。我还有Wishlist和Item具有续集many_to_many关联的模型已建立。Wishlist类也有:selectmany_to_many关联的选项设置为select:[:items.*,:items_wishlists__facebook_action_id].有没有一种方法可以

  3. ruby-on-rails - 在 Rails 和 ActiveRecord 中查询时忽略某些字段 - 2

    我知道我可以指定某些字段来使用pluck查询数据库。ids=Item.where('due_at但是我想知道,是否有一种方法可以指定我想避免从数据库查询的某些字段。某种反拔?posts=Post.where(published:true).do_not_lookup(:enormous_field) 最佳答案 Model#attribute_names应该返回列/属性数组。您可以排除其中一些并传递给pluck或select方法。像这样:posts=Post.where(published:true).select(Post.attr

  4. ruby - 无法在 60 秒内获得稳定的 Firefox 连接 (127.0.0.1 :7055) - 2

    我使用的是Firefox版本36.0.1和Selenium-Webdrivergem版本2.45.0。我能够创建Firefox实例,但无法使用脚本继续进行进一步的操作无法在60秒内获得稳定的Firefox连接(127.0.0.1:7055)错误。有人能帮帮我吗? 最佳答案 我遇到了同样的问题。降级到firefoxv33后一切正常。您可以找到旧版本here 关于ruby-无法在60秒内获得稳定的Firefox连接(127.0.0.1:7055),我们在StackOverflow上找到一个类

  5. sql - 查询忽略时间戳日期的时间范围 - 2

    我正在尝试查询我的Rails数据库(Postgres)中的购买表,我想查询时间范围。例如,我想知道在所有日期的下午2点到3点之间进行了多少次购买。此表中有一个created_at列,但我不知道如何在不搜索特定日期的情况下完成此操作。我试过:Purchases.where("created_atBETWEEN?and?",Time.now-1.hour,Time.now)但这最终只会搜索今天与那些时间的日期。 最佳答案 您需要使用PostgreSQL'sdate_part/extractfunction从created_at中提取小时

  6. ruby - 我的 Ruby IRC 机器人没有连接到 IRC 服务器。我究竟做错了什么? - 2

    require"socket"server="irc.rizon.net"port="6667"nick="RubyIRCBot"channel="#0x40"s=TCPSocket.open(server,port)s.print("USERTesting",0)s.print("NICK#{nick}",0)s.print("JOIN#{channel}",0)这个IRC机器人没有连接到IRC服务器,我做错了什么? 最佳答案 失败并显示此消息::irc.shakeababy.net461*USER:Notenoughparame

  7. ruby-on-rails - 连接字符串时如何在 <%=%> block 内输出 html_safe? - 2

    考虑一下:现在这些情况:#output:http://domain.com/?foo=1&bar=2#output:http://domain.com/?foo=1&bar=2#output:http://domain.com/?foo=1&bar=2#output:http://domain.com/?foo=1&bar=2我需要用其他字符串输出URL。我如何保证&符号不会被转义?由于我无法控制的原因,我无法发送&。求助!把我的头发拉到这里:\编辑:为了澄清,我实际上有一个像这样的数组:@images=[{:id=>"fooid",:url=>"http://

  8. ruby - Faye WebSocket,关闭处理程序被触发后重新连接到套接字 - 2

    我有一个super简单的脚本,它几乎包含了FayeWebSocketGitHub页面上用于处理关闭连接的内容:ws=Faye::WebSocket::Client.new(url,nil,:headers=>headers)ws.on:opendo|event|p[:open]#sendpingcommand#sendtestcommand#ws.send({command:'test'}.to_json)endws.on:messagedo|event|#hereistheentrypointfordatacomingfromtheserver.pJSON.parse(event.d

  9. ruby-on-rails - solr 清理查询 - 2

    我在Rails上使用带有ruby​​的solr。一切正常,我只需要知道是否有任何现有代码来清理用户输入,比如以?开头的查询。或* 最佳答案 我不知道执行此操作的任何代码,但理论上可以通过查看parsingcodeinLucene来完成并搜索thrownewParseException(只有16个匹配!)。在实践中,我认为您最好只捕获代码中的任何solr异常并显示“无效查询”消息或类似信息。编辑:这里有几个“sanitizer”:http://pivotallabs.com/users/zach/blog/articles/937-s

  10. ruby-on-rails - Rails 3 在一个查询中包含多个表 - 2

    我正在为锦标赛开发一个Rails应用程序。我在这个查询中使用了三个模型:classPlayertruehas_and_belongs_to_many:tournamentsclassTournament:destroyclassPlayerMatch"Player",:foreign_key=>"player_one"belongs_to:player_two,:class_name=>"Player",:foreign_key=>"player_two"在tournaments_controller的显示操作中,我调用以下查询:Tournament.where(:id=>params

随机推荐