jjzjj

hadoop - 用于单行和多行日志的自定义 RecordReader

coder 2024-01-09 原文

我正在尝试创建一个 MR 作业,它将更改通过 Flume 加载到 HDFS 中的日志文件的格式。我正在尝试将日志转换为一种格式,其中字段由“:::”分隔。例如

date/timestamp:::log-level:::rest-of-log

我遇到的问题是有些日志是单行的,有些是多行的,我需要在日志的其余字段中保持多行日志的完整性。我已经编写了一个自定义的 InputFormatRecordReader 来尝试执行此操作(基本上只是修改了 NLineRecordReader 以追加行,直到它到达日期戳,而不是附加固定数量的行)。我用来格式化日志的 MR 作业似乎工作正常,但 RecordReader 似乎无法正常工作以传递多行,我不确定为什么。

这是我的 RecordReader 类:

public class LogRecordReader extends RecordReader<LongWritable, Text> {

private LineReader in;
private LongWritable key;
private Text value = new Text();
private long start = 0;
private long end = 0;
private long pos = 0;
private int maxLineLength;
private Text line = new Text(); // working line
private Text lineHasDate = new Text(); // if line encounters a date stamp, hold it here

public void close() throws IOException {
    if (in != null) {
        in.close();
    }
}

public LongWritable getCurrentKey() throws IOException,InterruptedException {
    return key;
}

public Text getCurrentValue() throws IOException, InterruptedException {
    return value;
}

public float getProgress() throws IOException, InterruptedException {
    if (start == end) {
        return 0.0f;
    }
    else {
        return Math.min(1.0f, (pos - start) / (float)(end - start));
    }
}

public void initialize(InputSplit genericSplit, TaskAttemptContext context) throws IOException, InterruptedException {

    FileSplit split = (FileSplit) genericSplit;
    final Path file = split.getPath();
    Configuration conf = context.getConfiguration();
    this.maxLineLength = conf.getInt("mapred.linerecordreader.maxlength",Integer.MAX_VALUE);
    FileSystem fs = file.getFileSystem(conf);
    start = split.getStart();
    end = start + split.getLength();
    boolean skipFirstLine = false;
    FSDataInputStream filein = fs.open(split.getPath());

    // if we're not starting at the beginning, we should skip the first line
    if (start != 0){
        skipFirstLine = true;
        --start;
        filein.seek(start);
    }

    in = new LineReader(filein, conf);

    // if we should skip the first line
    if(skipFirstLine){
        start += in.readLine(new Text(), 0, (int)Math.min((long)Integer.MAX_VALUE, end - start));
    }

    this.pos = start;
}

/**
 * create a complete log message from individual lines using date/time stamp as a breakpoint
 */
public boolean nextKeyValue() throws IOException, InterruptedException {

    // if key has not yet been initialized
    if (key == null) { 
        key = new LongWritable();
    }

    key.set(pos);

    // if value has not yet been initialized
    if (value == null) { 
        value = new Text();
    }

    value.clear();

    final Text endline = new Text("\n");
    int newSize = 0;

    // if a line with a date was encountered on the previous call
    if (lineHasDate.getLength() > 0) { 
        while (pos < end) {
            value.append(lineHasDate.getBytes(), 0, lineHasDate.getLength()); // append the line
            value.append(endline.getBytes(), 0, endline.getLength()); // append a line break
            pos += newSize;
            if (newSize == 0) break;
        }
        lineHasDate.clear(); // clean up
    }

    // to check buffer 'line' for date/time stamp
    Pattern regexDateTime = Pattern.compile("^\\d{2}\\s\\S+\\s\\d{4}\\s\\d{2}:\\d{2}:\\d{2},\\d{3}\\s");
    Matcher matcherDateTime = regexDateTime.matcher(line.toString());

    // read in a new line to the buffer 'line'
    newSize = in.readLine(line, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), maxLineLength));

    // if the line in the buffer contains a date/time stamp, append it
    if (matcherDateTime.find()) {
        while (pos < end) {
            newSize = in.readLine(line, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), maxLineLength));
            value.append(line.getBytes(), 0, line.getLength()); // append the line
            value.append(endline.getBytes(), 0, endline.getLength()); // append a line break
            if (newSize == 0) break;
            pos += newSize;
            if (newSize < maxLineLength) break;
        }
        // read in the next line to the buffer 'line'
        newSize = in.readLine(line, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), maxLineLength));
    }

    // while lines in the buffer do not contain date/time stamps, append them
    while(!matcherDateTime.find()) {
            newSize = in.readLine(line, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), maxLineLength));
            value.append(line.getBytes(), 0, line.getLength()); // append the line
            value.append(endline.getBytes(), 0, endline.getLength()); // append a line break
            if (newSize == 0) break;
            pos += newSize;
            if (newSize < maxLineLength) break;
        // read in the next line to the buffer 'line', and continue looping
        newSize = in.readLine(line, maxLineLength, Math.max((int)Math.min(Integer.MAX_VALUE, end-pos), maxLineLength));
    }

    // if the line in the buffer contains a date/time stamp (which it should since the loop broke) save it for next call
    if (matcherDateTime.find()) lineHasDate = line;

    // if there is no new line
    if (newSize == 0) {
        // TODO: if lineHasDate is the last line in the file, it must be appended (?)
        key = null;
        value = null;
        return false;
    } 

    return true;
}
}

这是格式化日志的 MR 作业:

public class FlumeLogFormat extends Configured implements Tool {

/**
 * Map class
 */
public static class Map extends MapReduceBase 
    implements Mapper<LongWritable, Text, Text, Text> {

    private Text formattedLog = new Text();
    private Text keyDateTime = new Text(); // no value

    public void map(LongWritable key, Text value, 
        OutputCollector<Text, Text> output, Reporter reporter) throws IOException {

        String log = value.toString();
        StringBuffer buffer = new StringBuffer();

        Pattern regex = Pattern.compile("^(\\d{2}\\s\\S+\\s\\d{4}\\s\\d{2}:\\d{2}:\\d{2},\\d{3})\\s([A-Z]{4,5})\\s([\\s\\S]+)");
        Matcher matcher = regex.matcher(log);
        if (matcher.find()) {
            buffer.append(matcher.group(1)+":::"+matcher.group(2)+":::"+matcher.group(3)); // insert ":::" between fields to serve as a delimiter

        formattedLog.set(buffer.toString());
        keyDateTime.set(matcher.group(1));
        output.collect(keyDateTime, formattedLog);
        }
    }
}

/**
 * run method
 * @param args
 * @return int
 * @throws Exception
 */
public int run(String[] args) throws Exception {

    JobConf conf = new JobConf(getConf(), FlumeLogFormat.class); // class is LogFormat
    conf.setJobName("FlumeLogFormat");

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(Text.class);

    conf.setMapperClass(Map.class);

    List<String> other_args = new ArrayList<String>();
    for(int i=0; i < args.length; ++i) {
      try {
        if ("-m".equals(args[i])) {
          conf.setNumMapTasks(Integer.parseInt(args[++i]));
        } else if ("-r".equals(args[i])) {
          conf.setNumReduceTasks(Integer.parseInt(args[++i]));
        } else {
          other_args.add(args[i]);
        }
      } catch (NumberFormatException exception) {
        System.out.println("Give int value instead of " + args[i]);
        //return printUsage();
      } catch (ArrayIndexOutOfBoundsException exception) {
        System.out.println("Parameter missing " +  args[i-1]);
        //return printUsage();
      }
    }

    if (other_args.size() != 2) {

      //return printUsage();
    }

    FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
    FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));

    conf.setInputFormat(LogInputFormat.class);
    conf.setOutputFormat(SequenceFileOutputFormat.class);

    JobClient.runJob(conf);
    return 0;
}


/**
 * Main method
 * @param args
 * @throws Exception
 */
public static void main(String[] args) throws Exception {

    int res = ToolRunner.run(new Configuration(), new FlumeLogFormat(), args);
    System.exit(res);
}
}

日志如下:

21 July 2013 17:35:51,334 INFO  [conf-file-poller-0] (org.apache.flume.node.Application.startAllComponents:173)  - Starting Sink k1

25 May 2013 06:33:36,795 ERROR [lifecycleSupervisor-1-7] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:253)  - Unable to start EventDrivenSourceRunner: { source:org.apache.flume.source.SpoolDirectorySource{name:r1,state:IDLE} } - Exception follows.
java.lang.IllegalStateException: Directory does not exist: /root/FlumeTest
        at com.google.common.base.Preconditions.checkState(Preconditions.java:145)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:129)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:72)
        at org.apache.flume.client.avro.ReliableSpoolingFileEventReader$Builder.build(ReliableSpoolingFileEventReader.java:556)
        at org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:75)
        at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:679)

01 June 2012 12:35:22,222 INFO noiweoqierwnvoirenvoiernv iorenvoiernve irnvoirenv

最佳答案

FWIW,我看到您正在处理多行堆栈跟踪。

目前,我正在使用与 Log4j2 一起使用的自定义 Flume 构建来处理这些问题,我使用 {separator(|)} 语法将 PatternLayout 中的 %mEX 替换为 |。

关于hadoop - 用于单行和多行日志的自定义 RecordReader,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/17097622/

有关hadoop - 用于单行和多行日志的自定义 RecordReader的更多相关文章

  1. ruby - Facter::Util::Uptime:Module 的未定义方法 get_uptime (NoMethodError) - 2

    我正在尝试设置一个puppet节点,但ruby​​gems似乎不正常。如果我通过它自己的二进制文件(/usr/lib/ruby/gems/1.8/gems/facter-1.5.8/bin/facter)在cli上运行facter,它工作正常,但如果我通过由ruby​​gems(/usr/bin/facter)安装的二进制文件,它抛出:/usr/lib/ruby/1.8/facter/uptime.rb:11:undefinedmethod`get_uptime'forFacter::Util::Uptime:Module(NoMethodError)from/usr/lib/ruby

  2. ruby-on-rails - Rails 常用字符串(用于通知和错误信息等) - 2

    大约一年前,我决定确保每个包含非唯一文本的Flash通知都将从模块中的方法中获取文本。我这样做的最初原因是为了避免一遍又一遍地输入相同的字符串。如果我想更改措辞,我可以在一个地方轻松完成,而且一遍又一遍地重复同一件事而出现拼写错误的可能性也会降低。我最终得到的是这样的:moduleMessagesdefformat_error_messages(errors)errors.map{|attribute,message|"Error:#{attribute.to_s.titleize}#{message}."}enddeferror_message_could_not_find(obje

  3. ruby-on-rails - Rails 3.2.1 中 ActionMailer 中的未定义方法 'default_content_type=' - 2

    我在我的项目中添加了一个系统来重置用户密码并通过电子邮件将密码发送给他,以防他忘记密码。昨天它运行良好(当我实现它时)。当我今天尝试启动服务器时,出现以下错误。=>BootingWEBrick=>Rails3.2.1applicationstartingindevelopmentonhttp://0.0.0.0:3000=>Callwith-dtodetach=>Ctrl-CtoshutdownserverExiting/Users/vinayshenoy/.rvm/gems/ruby-1.9.3-p0/gems/actionmailer-3.2.1/lib/action_mailer

  4. ruby-on-rails - form_for 中不在模型中的自定义字段 - 2

    我想向我的Controller传递一个参数,它是一个简单的复选框,但我不知道如何在模型的form_for中引入它,这是我的观点:{:id=>'go_finance'}do|f|%>Transferirde:para:Entrada:"input",:placeholder=>"Quantofoiganho?"%>Saída:"output",:placeholder=>"Quantofoigasto?"%>Nota:我想做一个额外的复选框,但我该怎么做,模型中没有一个对象,而是一个要检查的对象,以便在Controller中创建一个ifelse,如果没有检查,请帮助我,非常感谢,谢谢

  5. ruby - 主要 :Object when running build from sublime 的未定义方法 `require_relative' - 2

    我已经从我的命令行中获得了一切,所以我可以运行rubymyfile并且它可以正常工作。但是当我尝试从sublime中运行它时,我得到了undefinedmethod`require_relative'formain:Object有人知道我的sublime设置中缺少什么吗?我正在使用OSX并安装了rvm。 最佳答案 或者,您可以只使用“require”,它应该可以正常工作。我认为“require_relative”仅适用于ruby​​1.9+ 关于ruby-主要:Objectwhenrun

  6. ruby-on-rails - 如何在 ruby​​ 交互式 shell 中有多行? - 2

    这可能是个愚蠢的问题。但是,我是一个新手......你怎么能在交互式ruby​​shell中有多行代码?好像你只能有一条长线。按回车键运行代码。无论如何我可以在不运行代码的情况下跳到下一行吗?再次抱歉,如果这是一个愚蠢的问题。谢谢。 最佳答案 这是一个例子:2.1.2:053>a=1=>12.1.2:054>b=2=>22.1.2:055>a+b=>32.1.2:056>ifa>b#Thecode‘if..."startsthedefinitionoftheconditionalstatement.2.1.2:057?>puts"f

  7. Ruby Sinatra 配置用于生产和开发 - 2

    我已经在Sinatra上创建了应用程序,它代表了一个简单的API。我想在生产和开发上进行部署。我想在部署时选择,是开发还是生产,一些方法的逻辑应该改变,这取决于部署类型。是否有任何想法,如何完成以及解决此问题的一些示例。例子:我有代码get'/api/test'doreturn"Itisdev"end但是在部署到生产环境之后我想在运行/api/test之后看到ItisPROD如何实现? 最佳答案 根据SinatraDocumentation:EnvironmentscanbesetthroughtheRACK_ENVenvironm

  8. ruby - 在 Ruby 中有条件地定义函数 - 2

    我有一些代码在几个不同的位置之一运行:作为具有调试输出的命令行工具,作为不接受任何输出的更大程序的一部分,以及在Rails环境中。有时我需要根据代码的位置对代码进行细微的更改,我意识到以下样式似乎可行:print"Testingnestedfunctionsdefined\n"CLI=trueifCLIdeftest_printprint"CommandLineVersion\n"endelsedeftest_printprint"ReleaseVersion\n"endendtest_print()这导致:TestingnestedfunctionsdefinedCommandLin

  9. ruby - 定义方法参数的条件 - 2

    我有一个只接受一个参数的方法:defmy_method(number)end如果使用number调用方法,我该如何引发错误??通常,我如何定义方法参数的条件?比如我想在调用的时候报错:my_method(1) 最佳答案 您可以添加guard在函数的开头,如果参数无效则引发异常。例如:defmy_method(number)failArgumentError,"Inputshouldbegreaterthanorequalto2"ifnumbereputse.messageend#=>Inputshouldbegreaterthano

  10. ruby - 如何在 Grape 中定义哈希数组? - 2

    我使用Ember作为我的前端和GrapeAPI来为我的API提供服务。前端发送类似:{"service"=>{"name"=>"Name","duration"=>"30","user"=>nil,"organization"=>"org","category"=>nil,"description"=>"description","disabled"=>true,"color"=>nil,"availabilities"=>[{"day"=>"Saturday","enabled"=>false,"timeSlots"=>[{"startAt"=>"09:00AM","endAt"=>

随机推荐