赞
踩
目前版本中,在迁移 MongoDB 时,若列的类型为二进制(Binary),mongodbreader 未做处理。相关源码位于 src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java,原代码如下:
if (tempCol == null) { //continue; 这个不能直接continue会导致record到目的端错位 record.addColumn(new StringColumn(null)); }else if (tempCol instanceof Double) { //TODO deal with Double.isNaN() record.addColumn(new DoubleColumn((Double) tempCol)); } else if (tempCol instanceof Boolean) { record.addColumn(new BoolColumn((Boolean) tempCol)); } else if (tempCol instanceof Date) { record.addColumn(new DateColumn((Date) tempCol)); } else if (tempCol instanceof Integer) { record.addColumn(new LongColumn((Integer) tempCol)); }else if (tempCol instanceof Long) { record.addColumn(new LongColumn((Long) tempCol)); } else { if(KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) { String splitter = column.getString(KeyConstant.COLUMN_SPLITTER); if(Strings.isNullOrEmpty(splitter)) { throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE, MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription()); } else { ArrayList array = (ArrayList)tempCol; String tempArrayStr = Joiner.on(splitter).join(array); record.addColumn(new StringColumn(tempArrayStr)); } } else { record.addColumn(new StringColumn(tempCol.toString())); } }
修改为:
if (tempCol == null) { //continue; 这个不能直接continue会导致record到目的端错位 record.addColumn(new StringColumn(null)); }else if (tempCol instanceof Double) { //TODO deal with Double.isNaN() record.addColumn(new DoubleColumn((Double) tempCol)); } else if (tempCol instanceof Boolean) { record.addColumn(new BoolColumn((Boolean) tempCol)); } else if (tempCol instanceof Date) { record.addColumn(new DateColumn((Date) tempCol)); } else if (tempCol instanceof Integer) { record.addColumn(new LongColumn((Integer) tempCol)); }else if (tempCol instanceof Long) { record.addColumn(new LongColumn((Long) tempCol)); } else if (tempCol instanceof Binary) { // 处理 MongoDB 的 Binary 类型数据 Binary binaryData = (Binary) tempCol; byte[] binaryBytes = binaryData.getData(); // 将字节数组添加到 DataX 中的二进制列 record.addColumn(new BytesColumn(binaryBytes)); } else { if(KeyConstant.isArrayType(column.getString(KeyConstant.COLUMN_TYPE))) { String splitter = column.getString(KeyConstant.COLUMN_SPLITTER); if(Strings.isNullOrEmpty(splitter)) { throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE, MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription()); } else { ArrayList array = (ArrayList)tempCol; String tempArrayStr = Joiner.on(splitter).join(array); record.addColumn(new StringColumn(tempArrayStr)); } } else { record.addColumn(new StringColumn(tempCol.toString())); } }
编写job脚本:1.json
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mongodbreader",
          "parameter": {
            "address": ["ip1:27017"],
            "collectionName": "data",
            "column": [
              { "name": "_id", "type": "long" },
              { "name": "fileContent", "type": "bytes" }
            ],
            "dbName": "monitor",
            "userName": "root",
            "userPassword": "123456",
            "query": { "_id": { "$lt": 21 } }
          }
        },
        "writer": {
          "name": "mongodbwriter",
          "parameter": {
            "address": ["ip2:27017"],
            "collectionName": "data",
            "column": [
              { "name": "_id", "type": "long" },
              { "name": "fileContent", "type": "bytes" }
            ],
            "writeMode": { "isReplace": "true", "replaceKey": "_id" },
            "dbName": "test",
            "userName": "root",
            "userPassword": "123456"
          }
        }
      }
    ],
    "setting": {
      "speed": { "channel": "2" }
    }
  }
}
reader 中的 query 节点为查询条件,上述 demo 中是查询 _id 小于 21 的记录。执行命令:
python datax.py G:\Code\1.json
datax.py 位于打包后的 target 目录下,相对路径:target\datax\datax\bin
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。