jjzjj

java - Hadoop - reducer 未启动

coder 2024-01-08 原文

我正在尝试在 Hadoop 2.6.0 上为单节点集群运行开源 kNN 加入 MapReduce hbrj 算法 - 我的笔记本电脑 (OSX) 上安装了伪分布式操作。这是代码。

Mapper、reducer和主驱动:

public class RPhase2 extends Configured implements Tool 
{
    public static class MapClass extends MapReduceBase 
    implements Mapper<LongWritable, Text, IntWritable, RPhase2Value> 
    {
        public void map(LongWritable key, Text value, 
        OutputCollector<IntWritable, RPhase2Value> output, 
        Reporter reporter)  throws IOException 
        {
            String line = value.toString();
            String[] parts = line.split(" +");
            // key format <rid1>
            IntWritable mapKey = new IntWritable(Integer.valueOf(parts[0]));
            // value format <rid2, dist>
            RPhase2Value np2v = new RPhase2Value(Integer.valueOf(parts[1]), Float.valueOf(parts[2]));
            System.out.println("############### key:  " + mapKey.toString() + "   np2v:  " + np2v.toString());
            output.collect(mapKey, np2v);
        }
    }

    public static class Reduce extends MapReduceBase
    implements Reducer<IntWritable, RPhase2Value, NullWritable, Text> 
    {
        int numberOfPartition;  
        int knn;

        class Record {...}

        class RecordComparator implements Comparator<Record> {...}

        public void configure(JobConf job) 
        {
            numberOfPartition = job.getInt("numberOfPartition", 2); 
            knn = job.getInt("knn", 3);
            System.out.println("########## configuring!");
        }   

        public void reduce(IntWritable key, Iterator<RPhase2Value> values, 
        OutputCollector<NullWritable, Text> output, 
        Reporter reporter) throws IOException 
        {
            //initialize the pq
            RecordComparator rc = new RecordComparator();
            PriorityQueue<Record> pq = new PriorityQueue<Record>(knn + 1, rc);

            System.out.println("Phase 2 is at reduce");
            System.out.println("########## key: " + key.toString());

            // For each record we have a reduce task
            // value format <rid1, rid2, dist>
            while (values.hasNext()) 
            {
                RPhase2Value np2v = values.next();

                int id2 = np2v.getFirst().get();
                float dist = np2v.getSecond().get();
                Record record = new Record(id2, dist);
                pq.add(record);
                if (pq.size() > knn)
                    pq.poll();
            }

            while(pq.size() > 0) 
            {
                output.collect(NullWritable.get(), new Text(key.toString() + " " + pq.poll().toString()));
                //break; // only ouput the first record
            }

        } // reduce
    } // Reducer

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), RPhase2.class);
        conf.setJobName("RPhase2");

        conf.setMapOutputKeyClass(IntWritable.class);
        conf.setMapOutputValueClass(RPhase2Value.class);

        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(Text.class);   

        conf.setMapperClass(MapClass.class);        
        conf.setReducerClass(Reduce.class);

        int numberOfPartition = 0;  
        List<String> other_args = new ArrayList<String>();

        for(int i = 0; i < args.length; ++i) 
        {
            try {
                if ("-m".equals(args[i])) {
                    //conf.setNumMapTasks(Integer.parseInt(args[++i]));
                    ++i;
                } else if ("-r".equals(args[i])) {
                    conf.setNumReduceTasks(Integer.parseInt(args[++i]));
                } else if ("-p".equals(args[i])) {
                    numberOfPartition = Integer.parseInt(args[++i]);
                    conf.setInt("numberOfPartition", numberOfPartition);
                } else if ("-k".equals(args[i])) {
                    int knn = Integer.parseInt(args[++i]);
                    conf.setInt("knn", knn);
                    System.out.println(knn + "~ hi");
                } else {
                    other_args.add(args[i]);
                }
                conf.setNumReduceTasks(numberOfPartition * numberOfPartition);
                //conf.setNumReduceTasks(1);
            } catch (NumberFormatException except) {
                System.out.println("ERROR: Integer expected instead of " + args[i]);
                return printUsage();
            } catch (ArrayIndexOutOfBoundsException except) {
                System.out.println("ERROR: Required parameter missing from " + args[i-1]);
                return printUsage();
            }
        } 


        FileInputFormat.setInputPaths(conf, other_args.get(0));
        FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new RPhase2(), args);
    }
} // RPhase2

当我运行它时,映射器是成功的,但作业突然终止,而且 reducer 从未实例化。此外,不会打印任何错误(即使在日志文件中)。我知道这也是因为 Reducer 配置中的打印语句永远不会打印出来。输出:

15/06/15 14:00:37 INFO mapred.LocalJobRunner: map task executor complete.
15/06/15 14:00:38 INFO mapreduce.Job:  map 100% reduce 0%
15/06/15 14:00:38 INFO mapreduce.Job: Job job_local833125918_0001 completed successfully
15/06/15 14:00:38 INFO mapreduce.Job: Counters: 20
    File System Counters
        FILE: Number of bytes read=12505456
        FILE: Number of bytes written=14977422
        FILE: Number of read operations=0
        FILE: Number of large read operations=0
        FILE: Number of write operations=0
        HDFS: Number of bytes read=11408
        HDFS: Number of bytes written=8724
        HDFS: Number of read operations=216
        HDFS: Number of large read operations=0
        HDFS: Number of write operations=99
    Map-Reduce Framework
        Map input records=60
        Map output records=60
        Input split bytes=963
        Spilled Records=0
        Failed Shuffles=0
        Merged Map outputs=0
        GC time elapsed (ms)=14
        Total committed heap usage (bytes)=1717567488
    File Input Format Counters 
        Bytes Read=2153
    File Output Format Counters 
        Bytes Written=1645

到目前为止我做了什么:

  • 我一直在看类似的问题,我发现最常见的问题是当 mapper 和 reducer 的输出不同时没有配置输出格式,这是在上面的代码中完成的:conf.setMapOutputKeyClass(Class ); conf.setMapOutputValueClass(类);

  • 在另一篇文章中,我发现了将 reduce(..., Iterator <...>, ...) 更改为 (..., Iterable <...>, ...) 的建议给我编译带来了麻烦。我无法再使用 .getNext() 和 .next() 方法,并且出现了这个错误:

    错误:Reduce 不是抽象的,没有覆盖 Reducer 中的抽象方法 reduce(IntWritable,Iterator,OutputCollector,Reporter)

如果有人对我可以尝试找出问题所在有任何提示或建议,我将不胜感激!

请注意,我之前曾在这里 (Hadoop kNN join algorithm stuck at map 100% reduce 0%) 发布过关于我的问题的问题,但没有得到足够的关注,所以我想从不同的角度重新提出这个问题。您可以使用此链接了解有关我的日志文件的更多详细信息。

最佳答案

我已经弄清楚了问题所在,这很愚蠢。如果您在上面的代码中注意到,numberOfPartition 在读取参数之前设置为 0,并且 reducer 的数量设置为 numberOfPartition * numberOfPartition。我,因为用户没有更改分区参数的数量(主要是因为我只是从他们提供的 README 中复制粘贴了参数行)所以这就是 reducer 从未启动的原因。

关于java - Hadoop - reducer 未启动,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30853748/

有关java - Hadoop - reducer 未启动的更多相关文章

  1. java - 等价于 Java 中的 Ruby Hash - 2

    我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/

  2. ruby-on-rails - 启动 Rails 服务器时 ImageMagick 的警告 - 2

    最近,当我启动我的Rails服务器时,我收到了一长串警告。虽然它不影响我的应用程序,但我想知道如何解决这些警告。我的估计是imagemagick以某种方式被调用了两次?当我在警告前后检查我的git日志时。我想知道如何解决这个问题。-bcrypt-ruby(3.1.2)-better_errors(1.0.1)+bcrypt(3.1.7)+bcrypt-ruby(3.1.5)-bcrypt(>=3.1.3)+better_errors(1.1.0)bcrypt和imagemagick有关系吗?/Users/rbchris/.rbenv/versions/2.0.0-p247/lib/ru

  3. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  4. java - 我的模型类或其他类中应该有逻辑吗 - 2

    我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我

  5. java - 什么相当于 ruby​​ 的 rack 或 python 的 Java wsgi? - 2

    什么是ruby​​的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht

  6. UE4 源码阅读:从引擎启动到Receive Begin Play - 2

    一、引擎主循环UE版本:4.27一、引擎主循环的位置:Launch.cpp:GuardedMain函数二、、GuardedMain函数执行逻辑:1、EnginePreInit:加载大多数模块int32ErrorLevel=EnginePreInit(CmdLine);PreInit模块加载顺序:模块加载过程:(1)注册模块中定义的UObject,同时为每个类构造一个类默认对象(CDO,记录类的默认状态,作为模板用于子类实例创建)(2)调用模块的StartUpModule方法2、FEngineLoop::Init()1、检查Engine的配置文件找出使用了哪一个GameEngine类(UGame

  7. Observability:从零开始创建 Java 微服务并监控它 (二) - 2

    这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/

  8. 【Java 面试合集】HashMap中为什么引入红黑树,而不是AVL树呢 - 2

    HashMap中为什么引入红黑树,而不是AVL树呢1.概述开始学习这个知识点之前我们需要知道,在JDK1.8以及之前,针对HashMap有什么不同。JDK1.7的时候,HashMap的底层实现是数组+链表JDK1.8的时候,HashMap的底层实现是数组+链表+红黑树我们要思考一个问题,为什么要从链表转为红黑树呢。首先先让我们了解下链表有什么不好???2.链表上述的截图其实就是链表的结构,我们来看下链表的增删改查的时间复杂度增:因为链表不是线性结构,所以每次添加的时候,只需要移动一个节点,所以可以理解为复杂度是N(1)删:算法时间复杂度跟增保持一致查:既然是非线性结构,所以查询某一个节点的时候

  9. 【Java入门】使用Java实现文件夹的遍历 - 2

    遍历文件夹我们通常是使用递归进行操作,这种方式比较简单,也比较容易理解。本文为大家介绍另一种不使用递归的方式,由于没有使用递归,只用到了循环和集合,所以效率更高一些!一、使用递归遍历文件夹整体思路1、使用File封装初始目录,2、打印这个目录3、获取这个目录下所有的子文件和子目录的数组。4、遍历这个数组,取出每个File对象4-1、如果File是否是一个文件,打印4-2、否则就是一个目录,递归调用代码实现publicclassSearchFile{publicstaticvoidmain(String[]args){//初始目录Filedir=newFile("d:/Dev");Datebeg

  10. hadoop安装之保姆级教程(二)之YARN的配置 - 2

    1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模

随机推荐