jjzjj

DataX更新null值到ElasticSearch不生效的问题

鸣宇淳 2023-04-20 原文

一、问题现象

我们用的 DataX 版本比较老,在推送数据到 Elasticsearch ,根据主键更新数据时,发现有 null 不能更新到 Elasticsearch 中的问题,Elasticsearch 中还保持原来的值。

具体情况如下:
1、Elasticsearch 索引中有个 double 类型的字段,比如字段名叫 guar_fee_rate (担保费率),原来是有值的,比如值为1。

## 查询索引结构
GET my_test_indice/_mapping

{
  "my_test_indice" : {
    "mappings" : {
      "properties" : {
        "guar_fee_rate" : {
          "type" : "long"
        },
        "guar_fee_rate " : {
          "type" : "double"  ## double类型的字段
        }
      }
    }
  }
}

## 查询数据
GET my_test_indice/_search

{
     ...... 
    "hits" : [
      {
        "_index" : "my_test_indice",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "guar_fee_rate" : 1  ## 当前有值,为1
        }
      }
    ]
  }
}

2、现在这个字段值变为 null,使用 DataX 推送更新到 Elasticsearch 中,预期是将ES中的 guar_fee_rate 值改为 null。

## 预期的值
"hits" : [
      {
        "_index" : "my_test_indice",
        "_type" : "_doc",
        "_id" : "1",
        "_score" : 1.0,
        "_source" : {
          "guar_fee_rate" : null  ## 想把字段值改为 null
        }
      }
    ]

3、但是发现, Elasticsearch 中的值没有变,还是保持原来的 1。
4、如果在 source 端将 null 值转换为空字符串后,则 DataX 运行时候会报错,报脏数据。

二、先说解决办法

经过一番排查问题,最终的解决方法是:

1、如果只是想解决 double 类型字段的问题,可以 在 DataX source 端将为 null 的字段值转换为字符串 “NaN”,DataX 才会将对应 ES 字段值更新为 null
2、如果要解决 integer、keyword 等其他类型字段问题,升级 DataX 版本,或者二次开发。

三、排查过程 & 原因分析

针对于这个 null 值不能更到到 Elasticsearch 中的问题,排查分析过程如下:

1、先排查是 ES 的问题还是 DataX 的问题

测试一下给 ES double 类型字段赋 null,能更新成功吗?

## 创建索引
PUT my_test_indice
{
  "mappings": {
    "properties": {
      "guar_fee_rate ": {
        "type":       "double"
      }
    }
  }
}
## 插入值1
PUT my_test_indice/_doc/1
{
  "guar_fee_rate":1
}
## 查询目前值为1
GET my_test_indice/_search
{
	"hits" : [
	 {
	   "_index" : "my_test_indice",
	   "_type" : "_doc",
	   "_id" : "1",
	   "_score" : 1.0,
	   "_source" : {
	     "guar_fee_rate" : 1
	   }
	 }
	]
}

## 更新为 null
POST my_test_indice/_update_by_query
{
  "script": {
    "source": "ctx._source['guar_fee_rate'] = null"
  }
}

## 查询目前值为null
GET my_test_indice/_search
{
	"hits" : [
	 {
	   "_index" : "my_test_indice",
	   "_type" : "_doc",
	   "_id" : "1",
	   "_score" : 1.0,
	   "_source" : {
	     "guar_fee_rate" : null
	   }
	 }
	]
}

至此,说明 ES 的 double 类型字段是可以赋值为 null 的,那肯定是 DataX 的问题导致不能更新。

2、查找资料

先后翻看了 DataX 官方文档、百度、谷歌,都没有找到解决方案,甚至连问这个问题的都没有。

3、查看 DataX ElasticsearchWriter 源码 double 字段类型更新部分

查看写入 Elasticsearch 数据的类:
最新版本的链接如下,(我生产环境用的不是最新版本,但是可以参考): ElasticSearchWriter.java

查看这个方法,

void doBatchInsert(final List<Record> writerBuffer)

找到字段类型为 double 时的代码,column调用 asDouble() 方法获取值。

case DOUBLE:
     data.put(columnName, column.asDouble());
     break;

找到对应重载的实现方法,在 com.alibaba.datax.common.element.DoubleColumn 类中,

@Override
public Double asDouble() {
	if (null == this.getRawData()) {
		return null;
	}

	String string = (String) this.getRawData();

	boolean isDoubleSpecific = string.equals("NaN")
			|| string.equals("-Infinity") || string.equals("+Infinity");
	if (isDoubleSpecific) {
		return Double.valueOf(string);
	}

	BigDecimal result = this.asBigDecimal();
	OverFlowUtil.validateDoubleNotOverFlow(result);

	return result.doubleValue();
}

这里当值为 “NaN”、“-Infinity”、“+Infinity”,分别表示 不是数字、负无穷、正无穷,如果是这三个字符串,不会返回null,是可以返回值的,这时候我将DataX 源端值为 null 的情况下转换为字符串 “NaN”,发现可以 将 Elasticsearch 里的字段值更新为 null 了,实现了目标。

4、查看 DataX ElasticsearchWriter 源码 序列化 JSON 串部分

double 类型字段值的问题解决了,Integer和其他类型 null 值肯定也存在不能更新的问题,怎么解决呢,继续看代码。

void doBatchInsert(final List<Record> writerBuffer)

在这个方法下面,当 DataX 配置为 Updata 模式时,序列化的地方。

case UPDATE:
    Map<String, Object> updateDoc = new HashMap<String, Object>();
    updateDoc.put("doc", data);
    updateDoc.put("doc_as_upsert", true);
    Update.Builder update = null;
    if (this.enableWriteNull)  //这个 if 是最新版本DataX的功能,我用的版本没有
    {
            // write: {a:"1",b:null}
        update = new Update.Builder(
                JSONObject.toJSONString(updateDoc, SerializerFeature.WriteMapNullValue,
                        SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
                        SerializerFeature.WriteEnumUsingToString, SerializerFeature.SortField));
        // 在DEFAULT_GENERATE_FEATURE基础上,只增加了SerializerFeature.WRITE_MAP_NULL_FEATURES
    } else {
        // write: {"a":"1"}
        update = new Update.Builder(JSONObject.toJSONString(updateDoc));
    }

这里的将对象序列化为JSON串用的 fastjson 工具,而 fastjson 对于值为 null 的节点,默认是丢弃掉了。比如:

    @Test
    public void test22() {
        Map<String, Object> objectMap = new HashMap<>();
        objectMap.put("a", 2);
        objectMap.put("b", null);
        objectMap.put("c", "abc");

        String jsonStr = JSON.toJSONString(objectMap);
        System.out.println(jsonStr);
        // 输出内容为:{"a":2,"c":"abc"},b节点会丢弃掉。
    }

我用的 DataX 版本是直接用 JSONObject.toJSONString(updateDoc) 序列化的,下一步用这个 json 串去更新 Elasticsearch 时,没有的字段,就会造成不更新,根本问题原因就在这里。

四、如何解决

彻底解决问题的办法就是修改对象序列化逻辑,让为 null 的节点也能序列化出来。我想到的解决方法有两个:

1、升级 DataX 版本

最新的 DataX 版本已经修复了这个 bug,就像前面代码里一样,增加了一个参数 enableWriteNull,默认是true,用这个参数控制是否将 null 值更新到 Elasticsearch 中,具体代码是这一段:

if (this.enableWriteNull)
{
        // write: {a:"1",b:null}
    update = new Update.Builder(
            JSONObject.toJSONString(updateDoc, SerializerFeature.WriteMapNullValue,
                    SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
                    SerializerFeature.WriteEnumUsingToString, SerializerFeature.SortField));
    // 在DEFAULT_GENERATE_FEATURE基础上,只增加了SerializerFeature.WRITE_MAP_NULL_FEATURES
}

在 toJSONString() 时指定 SerializerFeature.WriteMapNullValue 参数,这个是 fastjson 参数,指定是否将 null,也序列化出来。

2、修改源码

因为我们生产环境的 DataX 本身就是二次开发的,所以我们采用自己修改源码的方式,仿照最新版本的处理方式,解决这个问题。

至此,这个 DataX 更新 null 值到 Elasticsearch 不生效的问题算是完美解决了。

有关DataX更新null值到ElasticSearch不生效的问题的更多相关文章

  1. ruby-on-rails - 如何验证 update_all 是否实际在 Rails 中更新 - 2

    给定这段代码defcreate@upgrades=User.update_all(["role=?","upgraded"],:id=>params[:upgrade])redirect_toadmin_upgrades_path,:notice=>"Successfullyupgradeduser."end我如何在该操作中实际验证它们是否已保存或未重定向到适当的页面和消息? 最佳答案 在Rails3中,update_all不返回任何有意义的信息,除了已更新的记录数(这可能取决于您的DBMS是否返回该信息)。http://ar.ru

  2. ruby - 在 64 位 Snow Leopard 上使用 rvm、postgres 9.0、ruby 1.9.2-p136 安装 pg gem 时出现问题 - 2

    我想为Heroku构建一个Rails3应用程序。他们使用Postgres作为他们的数据库,所以我通过MacPorts安装了postgres9.0。现在我需要一个postgresgem并且共识是出于性能原因你想要pggem。但是我对我得到的错误感到非常困惑当我尝试在rvm下通过geminstall安装pg时。我已经非常明确地指定了所有postgres目录的位置可以找到但仍然无法完成安装:$envARCHFLAGS='-archx86_64'geminstallpg--\--with-pg-config=/opt/local/var/db/postgresql90/defaultdb/po

  3. ruby - 通过 rvm 升级 ruby​​gems 的问题 - 2

    尝试通过RVM将RubyGems升级到版本1.8.10并出现此错误:$rvmrubygemslatestRemovingoldRubygemsfiles...Installingrubygems-1.8.10forruby-1.9.2-p180...ERROR:Errorrunning'GEM_PATH="/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/ruby-1.9.2-p180@global:/Users/foo/.rvm/gems/ruby-1.9.2-p180:/Users/foo/.rvm/gems/rub

  4. ruby - 通过 RVM (OSX Mountain Lion) 安装 Ruby 2.0.0-p247 时遇到问题 - 2

    我的最终目标是安装当前版本的RubyonRails。我在OSXMountainLion上运行。到目前为止,这是我的过程:已安装的RVM$\curl-Lhttps://get.rvm.io|bash-sstable检查已知(我假设已批准)安装$rvmlistknown我看到当前的稳定版本可用[ruby-]2.0.0[-p247]输入命令安装$rvminstall2.0.0-p247注意:我也试过这些安装命令$rvminstallruby-2.0.0-p247$rvminstallruby=2.0.0-p247我很快就无处可去了。结果:$rvminstall2.0.0-p247Search

  5. ruby-on-rails - 使用 rails 4 设计而不更新用户 - 2

    我将应用程序升级到Rails4,一切正常。我可以登录并转到我的编辑页面。也更新了观点。使用标准View时,用户会更新。但是当我添加例如字段:name时,它​​不会在表单中更新。使用devise3.1.1和gem'protected_attributes'我需要在设备或数据库上运行某种更新命令吗?我也搜索过这个地方,找到了许多不同的解决方案,但没有一个会更新我的用户字段。我没有添加任何自定义字段。 最佳答案 如果您想允许额外的参数,您可以在ApplicationController中使用beforefilter,因为Rails4将参数

  6. ruby - Fast-stemmer 安装问题 - 2

    由于fast-stemmer的问题,我很难安装我想要的任何ruby​​gem。我把我得到的错误放在下面。Buildingnativeextensions.Thiscouldtakeawhile...ERROR:Errorinstallingfast-stemmer:ERROR:Failedtobuildgemnativeextension./System/Library/Frameworks/Ruby.framework/Versions/2.0/usr/bin/rubyextconf.rbcreatingMakefilemake"DESTDIR="cleanmake"DESTDIR=

  7. ruby - 安装 Ruby 时遇到问题(无法下载资源 "readline--patch") - 2

    当我尝试安装Ruby时遇到此错误。我试过查看this和this但无济于事➜~brewinstallrubyWarning:YouareusingOSX10.12.Wedonotprovidesupportforthispre-releaseversion.Youmayencounterbuildfailuresorotherbreakages.Pleasecreatepull-requestsinsteadoffilingissues.==>Installingdependenciesforruby:readline,libyaml,makedepend==>Installingrub

  8. java - 从 JRuby 调用 Java 类的问题 - 2

    我正在尝试使用boilerpipe来自JRuby。我看过guide从JRuby调用Java,并成功地将它与另一个Java包一起使用,但无法弄清楚为什么同样的东西不能用于boilerpipe。我正在尝试基本上从JRuby中执行与此Java等效的操作:URLurl=newURL("http://www.example.com/some-location/index.html");Stringtext=ArticleExtractor.INSTANCE.getText(url);在JRuby中试过这个:require'java'url=java.net.URL.new("http://www

  9. ruby-on-rails - 简单的 Ruby on Rails 问题——如何将评论附加到用户和文章? - 2

    我意识到这可能是一个非常基本的问题,但我现在已经花了几天时间回过头来解决这个问题,但出于某种原因,Google就是没有帮助我。(我认为部分问题在于我是一个初学者,我不知道该问什么......)我也看过O'Reilly的RubyCookbook和RailsAPI,但我仍然停留在这个问题上.我找到了一些关于多态关系的信息,但它似乎不是我需要的(尽管如果我错了请告诉我)。我正在尝试调整MichaelHartl'stutorial创建一个包含用户、文章和评论的博客应用程序(不使用脚手架)。我希望评论既属于用户又属于文章。我的主要问题是:我不知道如何将当前文章的ID放入评论Controller。

  10. 【高数】用拉格朗日中值定理解决极限问题 - 2

    首先回顾一下拉格朗日定理的内容:函数f(x)是在闭区间[a,b]上连续、开区间(a,b)上可导的函数,那么至少存在一个,使得:通过这个表达式我们可以知道,f(x)是函数的主体,a和b可以看作是主体函数f(x)中所取的两个值。那么可以有,  也就意味着我们可以用来替换 这种替换可以用在求某些多项式差的极限中。方法: 外层函数f(x)是一致的,并且h(x)和g(x)是等价无穷小。此时,利用拉格朗日定理,将原式替换为 ,再进行求解,往往会省去复合函数求极限的很多麻烦。使用要注意:1.要先找到主体函数f(x),即外层函数必须相同。2.f(x)找到后,复合部分是等价无穷小。3.要满足作差的形式。如果是加

随机推荐