我创建了一个 hadoop 自定义可写对象,如下所示
public class ResultType implements Writable {
private Text xxxx;
private Text yyyy;
private Text zzzz;
public ResultType() {}
public ResultType(Text xxxx, Text yyyy, Text zzzz) {
this.xxxx = xxxx;
this.yyyy = yyyy;
this.zzzz = zzzz;
}
public Text getxxxx() {
return this.xxxx;
}
public Text getyyyy() {
return this.yyyy;
}
public Text getzzzz() {
return this.zzzz;
}
@Override
public void readFields(DataInput in) throws IOException {
xxxx.readFields(in);
yyyy.readFields(in);
zzzz.readFields(in);
}
@Override
public void write(DataOutput out) throws IOException {
xxxx.write(out);
yyyy.write(out);
zzzz.write(out);
}
}
我的映射器代码是
public static class Mapper1 extends TableMapper<Text, ResultType> {
private Text text = new Text();
@Override
public void map(ImmutableBytesWritable row, Result values, Context context)
throws IOException, InterruptedException {
// getting name value
String xxxx = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("xxxx")));
String yyyy = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("yyyy")));
String zzzz = new String(values.getValue(Bytes.toBytes("cf"), Bytes.toBytes("zzzz")));
text.set(xxxx);
context.write(text, new ResultType(new Text(xxxx), new Text(yyyy), new Text(zzzz)));
}
}
我的 Reducer 代码是
public static class Reducer1 extends Reducer<Text, ResultType, Text, ResultType> {
public void reduce(Text key, Iterable<ResultType> values, Context context)
throws IOException, InterruptedException {
List<ResultType> returnset = new ArrayList<ResultType>();
Map<String, ResultType> duplicatelist = new HashMap<String, ResultType>();
boolean iskeyadded = true;
for (ResultType val : values) {
Text yyyy = val.getyyyy();
Text zzzz = val.getzzzz();
String groupkey = yyyy + "," + zzzz ;
if (duplicatelist.containsKey(groupkey)) {
if (iskeyadded) {
context.write(key, new ResultType(new Text(key), new Text(yyyy),
new Text(zzzz)));
iskeyadded = false;
}
context.write(key, new ResultType(new Text(key), new Text(yyyy), new Text(zzzz)));
} else {
duplicatelist.put(groupkey, val);
}
}
}
}
当我运行这段代码时,我得到了
Ignoring exception during close for org.apache.hadoop.mapred.MapTask$NewOutputCollector@20890b6f
java.lang.NullPointerException
at test.ResultType.readFields(ResultType.java)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKeyValue(ReduceContextImpl.java:146)
at org.apache.hadoop.mapreduce.task.ReduceContextImpl.nextKey(ReduceContextImpl.java:121)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.nextKey(WrappedReducer.java:302)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:170)
at org.apache.hadoop.mapred.Task$NewCombinerRunner.combine(Task.java:1688)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1637)
at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1489)
at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:723)
at org.apache.hadoop.mapred.MapTask.closeQuietly(MapTask.java:2019)
at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:797)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
最佳答案
您得到一个 NullPointerException,因为您的自定义可写对象中的任何 Text 对象都不是在任何地方创建的。您可以只在类顶部声明它们的地方创建它们。
private Text xxxx = new Text();
private Text yyyy = new Text();
private Text zzzz = new Text();
我还建议您将设置它们的构造函数更改为:
public ResultType(Text xxxx, Text yyyy, Text zzzz) {
this.xxxx.set(xxxx);
this.yyyy.set(yyyy);
this.zzzz.set(zzzz);
}
与字符串不同,Text 对象不是不可变的,因此使它们相等不会创建新的 Text 对象。如果您尝试在别处重用 Text 对象,这将导致问题。
关于java - 在 org.apache.hadoop.mapred.MapTask$NewOutputCollector 关闭期间忽略异常,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44501999/
我主要使用Ruby来执行此操作,但到目前为止我的攻击计划如下:使用gemsrdf、rdf-rdfa和rdf-microdata或mida来解析给定任何URI的数据。我认为最好映射到像schema.org这样的统一模式,例如使用这个yaml文件,它试图描述数据词汇表和opengraph到schema.org之间的转换:#SchemaXtoschema.orgconversion#data-vocabularyDV:name:namestreet-address:streetAddressregion:addressRegionlocality:addressLocalityphoto:i
我真的很习惯使用Ruby编写以下代码:my_hash={}my_hash['test']=1Java中对应的数据结构是什么? 最佳答案 HashMapmap=newHashMap();map.put("test",1);我假设? 关于java-等价于Java中的RubyHash,我们在StackOverflow上找到一个类似的问题: https://stackoverflow.com/questions/22737685/
我知道我可以指定某些字段来使用pluck查询数据库。ids=Item.where('due_at但是我想知道,是否有一种方法可以指定我想避免从数据库查询的某些字段。某种反拔?posts=Post.where(published:true).do_not_lookup(:enormous_field) 最佳答案 Model#attribute_names应该返回列/属性数组。您可以排除其中一些并传递给pluck或select方法。像这样:posts=Post.where(published:true).select(Post.attr
我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www
下面的代码在我第一次运行它时就可以正常工作:require'rubygems'require'spreadsheet'book=Spreadsheet.open'/Users/me/myruby/Mywks.xls'sheet=book.worksheet0row=sheet.row(1)putsrow[1]book.write'/Users/me/myruby/Mywks.xls'当我再次运行它时,我会收到更多消息,例如:/Library/Ruby/Gems/1.8/gems/spreadsheet-0.6.5.9/lib/spreadsheet/excel/reader.rb:11
我想这样组织C源代码:+/||___+ext||||___+native_extension||||___+lib||||||___(Sourcefilesarekeptinhere-maycontainsub-folders)||||___native_extension.c||___native_extension.h||___extconf.rb||___+lib||||___(Rubysourcecode)||___Rakefile我无法使此设置与mkmf一起正常工作。native_extension/lib中的文件(包含在native_extension.c中)将被完全忽略。
我只想对我一直在思考的这个问题有其他意见,例如我有classuser_controller和classuserclassUserattr_accessor:name,:usernameendclassUserController//dosomethingaboutanythingaboutusersend问题是我的User类中是否应该有逻辑user=User.newuser.do_something(user1)oritshouldbeuser_controller=UserController.newuser_controller.do_something(user1,user2)我
什么是ruby的rack或python的Java的wsgi?还有一个路由库。 最佳答案 来自Python标准PEP333:Bycontrast,althoughJavahasjustasmanywebapplicationframeworksavailable,Java's"servlet"APImakesitpossibleforapplicationswrittenwithanyJavawebapplicationframeworktoruninanywebserverthatsupportstheservletAPI.ht
导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵
这篇文章是继上一篇文章“Observability:从零开始创建Java微服务并监控它(一)”的续篇。在上一篇文章中,我们讲述了如何创建一个Javaweb应用,并使用Filebeat来收集应用所生成的日志。在今天的文章中,我来详述如何收集应用的指标,使用APM来监控应用并监督web服务的在线情况。源码可以在地址 https://github.com/liu-xiao-guo/java_observability 进行下载。摄入指标指标被视为可以随时更改的时间点值。当前请求的数量可以改变任何毫秒。你可能有1000个请求的峰值,然后一切都回到一个请求。这也意味着这些指标可能不准确,你还想提取最小/