jjzjj

hadoop - 在 MapReduce 中因为/n 读取被分解成两行的记录

coder 2024-01-09 原文

我正在尝试编写一个自定义阅读器,用于读取具有定义字段数的记录(位于两行中)。

例如

1,2,3,4("," can be there or not)
,5,6,7,8

我的要求是读取记录并将其作为单个记录推送到映射器中,如 {1,2,3,4,5,6,7,8}。请提供一些意见。

更新:

public boolean nextKeyValue() throws IOException, InterruptedException {
    if(key == null) {
        key = new LongWritable();
    }

    //Current offset is the key
    key.set(pos); 

    if(value == null) {
        value = new Text();
    }

    int newSize = 0;
    int numFields = 0;
    Text temp = new Text();
    boolean firstRead = true;

    while(numFields < reqFields) {
        while(pos < end) {
            //Read up to the '\n' character and store it in 'temp'
            newSize = in.readLine(  temp, 
                                    maxLineLength, 
                                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), 
                                             maxLineLength));

            //If 0 bytes were read, then we are at the end of the split
            if(newSize == 0) {
                break;
            }

            //Otherwise update 'pos' with the number of bytes read
            pos += newSize;

            //If the line is not too long, check number of fields
            if(newSize < maxLineLength) {
                break;
            }

            //Line too long, try again
            LOG.info("Skipped line of size " + newSize + " at pos " + 
                        (pos - newSize));
        }

        //Exit, since we're at the end of split
        if(newSize == 0) {
            break;
        }
        else {
            String record = temp.toString();
            StringTokenizer fields = new StringTokenizer(record,"|");

            numFields += fields.countTokens();

            //Reset 'value' if this is the first append
            if(firstRead) {
                value = new Text();
                firstRead = false;
            }

            if(numFields != reqFields) {
                value.append(temp.getBytes(), 0, temp.getLength());
            }
            else {
                value.append(temp.getBytes(), 0, temp.getLength());
            }
        }
    }

    if(newSize == 0) {
        key = null;
        value = null;
        return false;
    }
    else {
        return true;
    }
}

这是我正在尝试处理的 nextKeyValue 方法。但是映射器仍然没有获得正确的值。 reqFields 为 4

最佳答案

看看 TextInputFormat 是如何实现的。看看它的父类(super class) FileInputFormat 也是如此。您必须继承 FileInputFormat 的 Either TextInputFormat 并实现您自己的记录处理。

实现任何类型的文件输入格式时要注意的是:

框架将拆分文件并为您提供您必须读取的文件片段的起始偏移量和字节长度。很可能会将文件拆分到一些记录中。这就是为什么如果该记录未完全包含在拆分中,您的读者必须跳过拆分开头的记录字节,以及读取拆分的最后一个字节以读取整个最后一条记录(如果该记录是完整的)未完全包含在拆分中。

例如,TextInoutFormat 将\n 字符视为记录定界符,因此当它进行拆分时,它会跳过字节直到第一个\n 字符并读取拆分的末尾直到\n 字符。

至于代码示例:

您需要问自己以下问题:假设您打开文件,找到一个随机位置并开始向前阅读。 你如何检测记录的开始?我在你的代码中没有看到任何处理它的东西,没有它,你不能写出好的输入格式,因为你不知道什么是记录边界。

现在,通过使 isSplittable(JobContext,Path) 方法返回 false,仍然可以使输入格式从头到尾读取整个文件。这使得文件完全由单个映射任务读取,从而降低了并行度。

您的内部 while 循环似乎有问题,因为它正在检查太长的行并跳过它们。鉴于您的记录是使用多行编写的,因此在阅读记录时可能会合并一条记录的一部分和另一条记录的另一部分。

关于hadoop - 在 MapReduce 中因为/n 读取被分解成两行的记录,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28068253/

有关hadoop - 在 MapReduce 中因为/n 读取被分解成两行的记录的更多相关文章

  1. ruby - 如何将脚本文件的末尾读取为数据文件(Perl 或任何其他语言) - 2

    我正在寻找执行以下操作的正确语法(在Perl、Shell或Ruby中):#variabletoaccessthedatalinesappendedasafileEND_OF_SCRIPT_MARKERrawdatastartshereanditcontinues. 最佳答案 Perl用__DATA__做这个:#!/usr/bin/perlusestrict;usewarnings;while(){print;}__DATA__Texttoprintgoeshere 关于ruby-如何将脚

  2. Ruby 写入和读取对象到文件 - 2

    好的,所以我的目标是轻松地将一些数据保存到磁盘以备后用。您如何简单地写入然后读取一个对象?所以如果我有一个简单的类classCattr_accessor:a,:bdefinitialize(a,b)@a,@b=a,bendend所以如果我从中非常快地制作一个objobj=C.new("foo","bar")#justgaveitsomerandomvalues然后我可以把它变成一个kindaidstring=obj.to_s#whichreturns""我终于可以将此字符串打印到文件或其他内容中。我的问题是,我该如何再次将这个id变回一个对象?我知道我可以自己挑选信息并制作一个接受该信

  3. ruby - Sinatra:运行 rspec 测试时记录噪音 - 2

    Sinatra新手;我正在运行一些rspec测试,但在日志中收到了一堆不需要的噪音。如何消除日志中过多的噪音?我仔细检查了环境是否设置为:test,这意味着记录器级别应设置为WARN而不是DEBUG。spec_helper:require"./app"require"sinatra"require"rspec"require"rack/test"require"database_cleaner"require"factory_girl"set:environment,:testFactoryGirl.definition_file_paths=%w{./factories./test/

  4. ruby-on-rails - Rails 5 Active Record 记录无效错误 - 2

    我有两个Rails模型,即Invoice和Invoice_details。一个Invoice_details属于Invoice,一个Invoice有多个Invoice_details。我无法使用accepts_nested_attributes_forinInvoice通过Invoice模型保存Invoice_details。我收到以下错误:(0.2ms)BEGIN(0.2ms)ROLLBACKCompleted422UnprocessableEntityin25ms(ActiveRecord:4.0ms)ActiveRecord::RecordInvalid(Validationfa

  5. 世界前沿3D开发引擎HOOPS全面讲解——集3D数据读取、3D图形渲染、3D数据发布于一体的全新3D应用开发工具 - 2

    无论您是想搭建桌面端、WEB端或者移动端APP应用,HOOPSPlatform组件都可以为您提供弹性的3D集成架构,同时,由工业领域3D技术专家组成的HOOPS技术团队也能为您提供技术支持服务。如果您的客户期望有一种在多个平台(桌面/WEB/APP,而且某些客户端是“瘦”客户端)快速、方便地将数据接入到3D应用系统的解决方案,并且当访问数据时,在各个平台上的性能和用户体验保持一致,HOOPSPlatform将帮助您完成。利用HOOPSPlatform,您可以开发在任何环境下的3D基础应用架构。HOOPSPlatform可以帮您打造3D创新型产品,HOOPSSDK包含的技术有:快速且准确的CAD

  6. python - 如何读取 MIDI 文件、更改其乐器并将其写回? - 2

    我想解析一个已经存在的.mid文件,改变它的乐器,例如从“acousticgrandpiano”到“violin”,然后将它保存回去或作为另一个.mid文件。根据我在文档中看到的内容,该乐器通过program_change或patch_change指令进行了更改,但我找不到任何在已经存在的MIDI文件中执行此操作的库.他们似乎都只支持从头开始创建的MIDI文件。 最佳答案 MIDIpackage会为您完成此操作,但具体方法取决于midi文件的原始内容。一个MIDI文件由一个或多个音轨组成,每个音轨是十六个channel中任何一个上的

  7. STM32读取串口传感器数据(颗粒物传感器,主动上传) - 2

    文章目录1.开发板选择*用到的资源2.串口通信(个人理解)3.代码分析(注释比较详细)1.主函数2.串口1配置3.串口2配置以及中断函数4.注意问题5.源码链接1.开发板选择我用的是STM32F103RCT6的板子,不过代码大概在F103系列的板子上都可以运行,我试过在野火103的霸道板上也可以,主要看一下串口对应的引脚一不一样就行了,不一样的就更改一下。*用到的资源keil5软件这里用到了两个串口资源,采集数据一个,串口通信一个,板子对应引脚如下:串口1,TX:PA9,RX:PA10串口2,TX:PA2,RX:PA32.串口通信(个人理解)我就从串口采集传感器数据这个过程说一下我自己的理解,

  8. hadoop安装之保姆级教程(二)之YARN的配置 - 2

    1.1.1 YARN的介绍 为克服Hadoop1.0中HDFS和MapReduce存在的各种问题⽽提出的,针对Hadoop1.0中的MapReduce在扩展性和多框架⽀持⽅⾯的不⾜,提出了全新的资源管理框架YARN. ApacheYARN(YetanotherResourceNegotiator的缩写)是Hadoop集群的资源管理系统,负责为计算程序提供服务器计算资源,相当于⼀个分布式的操作系统平台,⽽MapReduce等计算程序则相当于运⾏于操作系统之上的应⽤程序。 YARN被引⼊Hadoop2,最初是为了改善MapReduce的实现,但是因为具有⾜够的通⽤性,同样可以⽀持其他的分布式计算模

  9. ruby-on-rails - 事件记录 : Select max of limit - 2

    我正在尝试将以下SQL查询转换为ActiveRecord,它正在融化我的大脑。deletefromtablewhereid有什么想法吗?我想做的是限制表中的行数。所以,我想删除少于最近10个条目的所有内容。编辑:通过结合以下几个答案找到了解决方案。Temperature.where('id这给我留下了最新的10个条目。 最佳答案 从您的SQL来看,您似乎想要从表中删除前10条记录。我相信到目前为止的大多数答案都会如此。这里有两个额外的选择:基于MurifoX的版本:Table.where(:id=>Table.order(:id).

  10. ruby - 是否可以在不实际发送或读取数据的情况下查明 ruby​​ 套接字是否处于 ESTABLISHED 或 CLOSE_WAIT 状态? - 2

    s=Socket.new(Socket::AF_INET,Socket::SOCK_STREAM,0)s.connect(Socket.pack_sockaddr_in('port','hostname'))ssl=OpenSSL::SSL::SSLSocket.new(s,sslcert)ssl.connect从这里开始,如果ssl连接和底层套接字仍然是ESTABLISHED,或者它是否在默认值7200之后进入CLOSE_WAIT,我想检查一个线程几秒钟甚至更糟的是在实际上不需要.write()或.read()的情况下关闭。是用select()、IO.select()还是其他方法完成

随机推荐