概述
一、问题现象
我们用的 DataX 版本比较老,在推送数据到 Elasticsearch ,根据主键更新数据时,发现有 null 不能更新到 Elasticsearch 中的问题,Elasticsearch 中还保持原来的值。
具体情况如下:
1、Elasticsearch 索引中有个 double 类型的字段,比如字段名叫 guar_fee_rate (担保费率),原来是有值的,比如值为1。
## 查询索引结构
GET my_test_indice/_mapping
{
"my_test_indice" : {
"mappings" : {
"properties" : {
"guar_fee_rate" : {
"type" : "long"
},
"guar_fee_rate " : {
"type" : "double" ## double类型的字段
}
}
}
}
}
## 查询数据
GET my_test_indice/_search
{
...略...
"hits" : [
{
"_index" : "my_test_indice",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"guar_fee_rate" : 1 ## 当前有值,为1
}
}
]
}
}
2、现在这个字段值变为 null,使用 DataX 推送更新到 Elasticsearch 中,预期是将ES中的 guar_fee_rate 值改为 null。
## 预期的值
"hits" : [
{
"_index" : "my_test_indice",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"guar_fee_rate" : null ## 想把字段值改为 null
}
}
]
3、但是发现, Elasticsearch 中的值没有变,还是保持原来的 1。
4、如果在 source 端将 null 值转换为空字符串后,则 DataX 运行时候会报错,报脏数据。
二、先说解决办法
经过一番排查问题,最终的解决方法是:
1、如果只是想解决 double 类型字段的问题,可以 在 DataX source 端将为 null 的字段值转换为字符串 “NaN”,DataX 才会将对应 ES 字段值更新为 null。
2、如果要解决 integer、keyword 等其他类型字段问题,升级 DataX 版本,或者二次开发。
三、排查过程 & 原因分析
针对于这个 null 值不能更到到 Elasticsearch 中的问题,排查分析过程如下:
1、先排查是 ES 的问题还是 DataX 的问题
测试一下给 ES double 类型字段赋 null,能更新成功吗?
## 创建索引
PUT my_test_indice
{
"mappings": {
"properties": {
"guar_fee_rate ": {
"type": "double"
}
}
}
}
## 插入值1
PUT my_test_indice/_doc/1
{
"guar_fee_rate":1
}
## 查询目前值为1
GET my_test_indice/_search
{
"hits" : [
{
"_index" : "my_test_indice",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"guar_fee_rate" : 1
}
}
]
}
## 更新为 null
POST my_test_indice/_update_by_query
{
"script": {
"source": "ctx._source['guar_fee_rate'] = null"
}
}
## 查询目前值为null
GET my_test_indice/_search
{
"hits" : [
{
"_index" : "my_test_indice",
"_type" : "_doc",
"_id" : "1",
"_score" : 1.0,
"_source" : {
"guar_fee_rate" : null
}
}
]
}
至此,说明 ES 的 double 类型字段是可以赋值为 null 的,那肯定是 DataX 的问题导致不能更新。
2、查找资料
先后翻看了 DataX 官方文档、百度、谷歌,都没有找到解决方案,甚至连问这个问题的都没有。
3、查看 DataX ElasticsearchWriter 源码 double 字段类型更新部分
查看写入 Elasticsearch 数据的类:
最新版本的链接如下,(我生产环境用的不是最新版本,但是可以参考): ElasticSearchWriter.java
查看这个方法,
void doBatchInsert(final List<Record> writerBuffer)
找到字段类型为 double 时的代码,column调用 asDouble() 方法获取值。
case DOUBLE:
data.put(columnName, column.asDouble());
break;
找到对应重载的实现方法,在 com.alibaba.datax.common.element.DoubleColumn 类中,
@Override
public Double asDouble() {
if (null == this.getRawData()) {
return null;
}
String string = (String) this.getRawData();
boolean isDoubleSpecific = string.equals("NaN")
|| string.equals("-Infinity") || string.equals("+Infinity");
if (isDoubleSpecific) {
return Double.valueOf(string);
}
BigDecimal result = this.asBigDecimal();
OverFlowUtil.validateDoubleNotOverFlow(result);
return result.doubleValue();
}
这里当值为 “NaN”、“-Infinity”、“+Infinity”,分别表示 不是数字、负无穷、正无穷,如果是这三个字符串,不会返回null,是可以返回值的,这时候我将DataX 源端值为 null 的情况下转换为字符串 “NaN”,发现可以 将 Elasticsearch 里的字段值更新为 null 了,实现了目标。
4、查看 DataX ElasticsearchWriter 源码 序列化 JSON 串部分
double 类型字段值的问题解决了,Integer和其他类型 null 值肯定也存在不能更新的问题,怎么解决呢,继续看代码。
void doBatchInsert(final List<Record> writerBuffer)
在这个方法下面,当 DataX 配置为 Updata 模式时,序列化的地方。
case UPDATE:
Map<String, Object> updateDoc = new HashMap<String, Object>();
updateDoc.put("doc", data);
updateDoc.put("doc_as_upsert", true);
Update.Builder update = null;
if (this.enableWriteNull) //这个 if 是最新版本DataX的功能,我用的版本没有
{
// write: {a:"1",b:null}
update = new Update.Builder(
JSONObject.toJSONString(updateDoc, SerializerFeature.WriteMapNullValue,
SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
SerializerFeature.WriteEnumUsingToString, SerializerFeature.SortField));
// 在DEFAULT_GENERATE_FEATURE基础上,只增加了SerializerFeature.WRITE_MAP_NULL_FEATURES
} else {
// write: {"a":"1"}
update = new Update.Builder(JSONObject.toJSONString(updateDoc));
}
这里的将对象序列化为JSON串用的 fastjson 工具,而 fastjson 对于值为 null 的节点,默认是丢弃掉了。比如:
@Test
public void test22() {
Map<String, Object> objectMap = new HashMap<>();
objectMap.put("a", 2);
objectMap.put("b", null);
objectMap.put("c", "abc");
String jsonStr = JSON.toJSONString(objectMap);
System.out.println(jsonStr);
// 输出内容为:{"a":2,"c":"abc"},b节点会丢弃掉。
}
我用的 DataX 版本是直接用 JSONObject.toJSONString(updateDoc) 序列化的,下一步用这个 json 串去更新 Elasticsearch 时,没有的字段,就会造成不更新,根本问题原因就在这里。
四、如何解决
彻底解决问题的办法就是修改对象序列化逻辑,让为 null 的节点也能序列化出来。我想到的解决方法有两个:
1、升级 DataX 版本
最新的 DataX 版本已经修复了这个 bug,就像前面代码里一样,增加了一个参数 enableWriteNull,默认是true,用这个参数控制是否将 null 值更新到 Elasticsearch 中,具体代码是这一段:
if (this.enableWriteNull)
{
// write: {a:"1",b:null}
update = new Update.Builder(
JSONObject.toJSONString(updateDoc, SerializerFeature.WriteMapNullValue,
SerializerFeature.QuoteFieldNames, SerializerFeature.SkipTransientField,
SerializerFeature.WriteEnumUsingToString, SerializerFeature.SortField));
// 在DEFAULT_GENERATE_FEATURE基础上,只增加了SerializerFeature.WRITE_MAP_NULL_FEATURES
}
在 toJSONString() 时指定 SerializerFeature.WriteMapNullValue 参数,这个是 fastjson 参数,指定是否将 null,也序列化出来。
2、修改源码
因为我们生产环境的 DataX 本身就是二次开发的,所以我们采用自己修改源码的方式,仿照最新版本的处理方式,解决这个问题。
至此,这个 DataX 更新 null 值到 Elasticsearch 不生效的问题算是完美解决了。
最后
以上就是幽默野狼为你收集整理的DataX更新null值到ElasticSearch不生效的问题的全部内容,希望文章能够帮你解决DataX更新null值到ElasticSearch不生效的问题所遇到的程序开发问题。
如果觉得靠谱客网站的内容还不错,欢迎将靠谱客网站推荐给程序员好友。
发表评论 取消回复