Data files exported to HDFS or to the local filesystem sometimes contain special characters. When such a file is loaded directly into Hive and split on the declared single-character field delimiter or on the default line delimiter, the resulting rows may not be what we expect, so we need to customize Hive's delimiters.

Hive also only supports single-character delimiters by default. To support a multi-character delimiter you can either override the input format class, or rely on a few settings plus the right declaration in the CREATE TABLE statement. Both approaches are described below. Before that, we prepare the environment by installing Hadoop and Hive.
The environment uses CentOS 7 with the JDK already installed, and we assume the cluster itself is already configured (SSH, NTP, firewall, networking, and so on). Hive depends on Hadoop, so Hadoop must be installed first. This walkthrough uses Hadoop 2.7.7 and Hive 1.2.2.
Cluster plan
Node | HDFS | YARN | Hive |
node1 | NameNode, SecondaryNameNode, DataNode | ResourceManager, NodeManager | yes |
node2 | DataNode | NodeManager | |
node3 | DataNode | NodeManager | |
1. Download and extract Hadoop
- wget http://archive.apache.org/dist/hadoop/common/hadoop-2.7.7/hadoop-2.7.7.tar.gz
-
- tar -zxf hadoop-2.7.7.tar.gz -C /opt/
2. Configure the Hadoop environment variables
Edit the user's environment file with vim ~/.bash_profile, add the following configuration, and apply it immediately with source ~/.bash_profile
- # Hadoop configuration
- export HADOOP_HOME=/opt/hadoop-2.7.7
- export PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
After this, run hadoop version in a terminal to check that the configuration took effect; if it did, the Hadoop version information is printed.
3. Create the required directories
- # The directory names are arbitrary, as long as they match the configuration files
- mkdir -p /opt/hadoop/dfs/dn
- mkdir -p /opt/hadoop/dfs/nn
- mkdir -p /opt/hadoop/dfs/snn
- mkdir -p /opt/hadoop/yarn/container-logs
4. Edit the hadoop-env.sh configuration file
-
vim $HADOOP_HOME/etc/hadoop/hadoop-env.sh
Add the following configuration; the key setting is JAVA_HOME.
- export JAVA_HOME=/usr/local/zulu8
- export HADOOP_HOME=/opt/hadoop-2.7.7
- export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
- export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
- export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib:$HADOOP_COMMON_LIB_NATIVE_DIR"
- # User settings; if the daemons are started as root, these must be set to root
- export HDFS_NAMENODE_USER=root
- export HDFS_DATANODE_USER=root
- export HDFS_SECONDARYNAMENODE_USER=root
- export YARN_RESOURCEMANAGER_USER=root
- export YARN_NODEMANAGER_USER=root
5. Edit the core-site.xml configuration file
vim $HADOOP_HOME/etc/hadoop/core-site.xml
- <configuration>
- <property>
- <name>fs.defaultFS</name>
- <value>hdfs://node1:8020</value>
- </property>
- <!-- Trash retention time in minutes; 0 disables the trash -->
- <property>
- <name>fs.trash.interval</name>
- <value>60</value>
- </property>
- <property>
- <name>fs.trash.checkpoint.interval</name>
- <value>0</value>
- </property>
- <property>
- <name>hadoop.proxyuser.root.groups</name>
- <value>*</value>
- </property>
- <property>
- <name>hadoop.proxyuser.root.hosts</name>
- <value>*</value>
- </property>
- </configuration>
6. Edit the hdfs-site.xml configuration file
vim $HADOOP_HOME/etc/hadoop/hdfs-site.xml
- <configuration>
- <property>
- <name>dfs.namenode.name.dir</name>
- <value>file:///opt/hadoop/dfs/nn</value>
- </property>
- <property>
- <name>dfs.datanode.data.dir</name>
- <value>file:///opt/hadoop/dfs/dn</value>
- </property>
- <property>
- <name>dfs.namenode.checkpoint.dir</name>
- <value>file:///opt/hadoop/dfs/snn</value>
- </property>
- <!-- Number of block replicas; the default is 3 -->
- <property>
- <name>dfs.replication</name>
- <value>1</value>
- </property>
- <property>
- <name>dfs.namenode.http-address</name>
- <value>node1:50070</value>
- </property>
- <property>
- <name>dfs.namenode.secondary.http-address</name>
- <value>node1:50090</value>
- </property>
- <property>
- <name>dfs.permissions</name>
- <value>false</value>
- </property>
- </configuration>
7. Edit the mapred-site.xml configuration file
vim $HADOOP_HOME/etc/hadoop/mapred-site.xml
- <configuration>
- <property>
- <name>mapreduce.framework.name</name>
- <value>yarn</value>
- </property>
- <property>
- <name>mapreduce.jobhistory.address</name>
- <value>node1:10020</value>
- </property>
- <property>
- <name>mapreduce.jobhistory.webapp.address</name>
- <value>node1:19888</value>
- </property>
- <property>
- <name>mapreduce.jobhistory.webapp.https.address</name>
- <value>node1:19890</value>
- </property>
- </configuration>
8. Edit the yarn-site.xml configuration file
vim $HADOOP_HOME/etc/hadoop/yarn-site.xml
- <configuration>
-
- <!-- Site specific YARN configuration properties -->
-
- <property>
- <name>yarn.resourcemanager.hostname</name>
- <value>node1</value>
- </property>
- <property>
- <name>yarn.nodemanager.aux-services</name>
- <value>mapreduce_shuffle</value>
- </property>
- <property>
- <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>
- <value>org.apache.hadoop.mapred.ShuffleHandler</value>
- </property>
-
- <!-- NodeManager local directory -->
- <property>
- <name>yarn.nodemanager.local-dirs</name>
- <value>file:///opt/hadoop/yarn</value>
- </property>
- <!-- NodeManager container log directory -->
- <property>
- <name>yarn.nodemanager.log-dirs</name>
- <value>file:///opt/hadoop/yarn/container-logs</value>
- </property>
- <property>
- <name>yarn.log-aggregation-enable</name>
- <value>true</value>
- </property>
- <property>
- <name>yarn.log.server.url</name>
- <value>http://node1:19888/jobhistory/logs/</value>
- </property>
- <property>
- <name>yarn.nodemanager.vmem-check-enabled</name>
- <value>false</value>
- </property>
- <property>
- <name>yarn.application.classpath</name>
- <value>
- $HADOOP_HOME/etc/hadoop,
- $HADOOP_HOME/share/hadoop/common/*,
- $HADOOP_HOME/share/hadoop/common/lib/*,
- $HADOOP_HOME/share/hadoop/hdfs/*,
- $HADOOP_HOME/share/hadoop/hdfs/lib/*,
- $HADOOP_HOME/share/hadoop/mapreduce/*,
- $HADOOP_HOME/share/hadoop/mapreduce/lib/*,
- $HADOOP_HOME/share/hadoop/yarn/*,
- $HADOOP_HOME/share/hadoop/yarn/lib/*
- </value>
- </property>
- </configuration>
9. Edit the slaves configuration file
- #vim $HADOOP_HOME/etc/hadoop/workers
- vim $HADOOP_HOME/etc/hadoop/slaves
- node1
- node2
- node3
10. Distribute to the other nodes
- scp -r $HADOOP_HOME/ root@node2:/opt/
- scp -r $HADOOP_HOME/ root@node3:/opt/
11. Format the NameNode
$HADOOP_HOME/bin/hdfs namenode -format
12. Start or stop Hadoop
- #$HADOOP_HOME/sbin/start-dfs.sh
- #$HADOOP_HOME/sbin/start-yarn.sh
- $HADOOP_HOME/sbin/start-all.sh
-
- # To stop
- $HADOOP_HOME/sbin/stop-all.sh
13. Check and test
Check the running Java processes with jps.
Open http://node1:50070 in a browser to see detailed information about the Hadoop cluster.
Run the bundled WordCount example to verify that jobs can be submitted and computed correctly:
- hadoop fs -mkdir -p /tmp/input
- hadoop fs -put $HADOOP_HOME/README.txt /tmp/input
- hadoop jar $HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.7.7.jar wordcount /tmp/input /tmp/output
- hadoop fs -tail /tmp/output/part-r-00000
1. Download and extract Hive
- wget http://archive.apache.org/dist/hive/hive-1.2.2/apache-hive-1.2.2-bin.tar.gz
-
- tar -zxf apache-hive-1.2.2-bin.tar.gz -C /opt/
2. Configure the Hive environment variables
vim ~/.bash_profile
Add the following configuration, then save and exit.
- # Hive configuration
- export HIVE_HOME=/opt/apache-hive-1.2.2-bin
- export PATH=$PATH:$HIVE_HOME/bin
3. In MySQL, create an account that allows remote access, then create a database for the Hive metastore
- mysql> use mysql;
- mysql> select host,user from user;
- mysql> grant all privileges on *.* to 'hive'@'%' identified by '<remote MySQL password>' with grant option;
- mysql> flush privileges;
- mysql> create database metastore;
- mysql> exit;
Add the MySQL JDBC driver to $HIVE_HOME/lib:
wget https://repo1.maven.org/maven2/mysql/mysql-connector-java/5.1.47/mysql-connector-java-5.1.47.jar -P $HIVE_HOME/lib/
4. Copy and rename the Hive configuration template files
- cd $HIVE_HOME/conf
- cp hive-env.sh.template hive-env.sh
- cp hive-default.xml.template hive-site.xml
- cp hive-log4j.properties.template hive-log4j.properties
- cp hive-exec-log4j.properties.template hive-exec-log4j.properties
- cp beeline-log4j.properties.template beeline-log4j.properties
5. Edit the contents of hive-env.sh:
- export JAVA_HOME=/usr/local/zulu8
- export HADOOP_HOME=/opt/hadoop-2.7.7
- export HIVE_HOME=/opt/apache-hive-1.2.2-bin
- export HIVE_CONF_DIR=$HIVE_HOME/conf
6. Configure hive-log4j.properties
Find the line log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter, comment it out, and change it to:
- #log4j.appender.EventCounter=org.apache.hadoop.hive.shims.HiveEventCounter
- log4j.appender.EventCounter=org.apache.hadoop.log.metrics.EventCounter
7. Edit hive-site.xml
vim $HIVE_HOME/conf/hive-site.xml
- <configuration>
- <!-- HDFS directory used to store Hive databases, tables, and other data -->
- <property>
- <name>hive.metastore.warehouse.dir</name>
- <value>/hive/warehouse</value>
- <description>location of default database for the warehouse</description>
- </property>
- <!-- Host that the remote HiveServer2 service binds to -->
- <property>
- <name>hive.server2.thrift.bind.host</name>
- <value>node1</value>
- <description>Bind host on which to run the HiveServer2 Thrift service.</description>
- </property>
- <property>
- <name>hive.metastore.uris</name>
- <!-- <value/> -->
- <value>thrift://node1:9083</value>
- <description>Thrift URI for the remote metastore. Used by metastore client to connect to remote metastore.</description>
- </property>
- <!-- JDBC connection used to store the metastore information -->
- <property>
- <name>javax.jdo.option.ConnectionURL</name>
- <!--<value>jdbc:derby:;databaseName=metastore_db;create=true</value>-->
- <value>jdbc:mysql://node1:3306/metastore?createDatabaseIfNotExist=true&amp;useUnicode=true&amp;characterEncoding=utf8&amp;useSSL=false</value>
- <description>
- JDBC connect string for a JDBC metastore.
- To use SSL to encrypt/authenticate the connection, provide database-specific SSL flag in the connection URL.
- For example, jdbc:postgresql://myhost/db?ssl=true for postgres database.
- </description>
- </property>
- <!-- MySQL JDBC driver class name -->
- <property>
- <name>javax.jdo.option.ConnectionDriverName</name>
- <!--<value>org.apache.derby.jdbc.EmbeddedDriver</value>-->
- <value>com.mysql.jdbc.Driver</value>
- <description>Driver class name for a JDBC metastore</description>
- </property>
- <!-- MySQL username -->
- <property>
- <name>javax.jdo.option.ConnectionUserName</name>
- <!--<value>APP</value>-->
- <value>hive</value>
- <description>Username to use against metastore database</description>
- </property>
- <!-- MySQL password -->
- <property>
- <name>javax.jdo.option.ConnectionPassword</name>
- <!--<value>mine</value>-->
- <value>123456</value>
- <description>password to use against metastore database</description>
- </property>
- <!-- Automatically create the metastore schema on startup -->
- <property>
- <name>datanucleus.schema.autoCreateAll</name>
- <value>true</value>
- <description>Auto creates necessary schema on a startup if one doesn't exist. Set this to false, after creating it once.To enable auto create also set hive.metastore.schema.verification=false. Auto creation is not recommended for production use cases, run schematool command instead.</description>
- </property>
- <property>
- <name>hive.metastore.schema.verification</name>
- <value>false</value>
- <description>
- Enforce metastore schema version consistency.
- True: Verify that version information stored in is compatible with one from Hive jars. Also disable automatic
- schema migration attempt. Users are required to manually migrate schema after Hive upgrade which ensures
- proper metastore schema migration. (Default)
- False: Warn if the version information stored in metastore doesn't match with one from in Hive jars.
- </description>
- </property>
- <!-- Local scratch space for Hive jobs -->
- <property>
- <name>hive.exec.local.scratchdir</name>
- <!-- <value>${system:java.io.tmpdir}/${system:user.name}</value> -->
- <value>/tmp/hive/exec/${user.name}</value>
- <description>Local scratch space for Hive jobs</description>
- </property>
- <property>
- <name>hive.downloaded.resources.dir</name>
- <!-- <value>${system:java.io.tmpdir}/${hive.session.id}_resources</value> -->
- <value>/tmp/hive/${hive.session.id}_resources</value>
- <description>Temporary local directory for added resources in the remote file system.</description>
- </property>
- <property>
- <name>hive.querylog.location</name>
- <!-- <value>${system:java.io.tmpdir}/${system:user.name}</value> -->
- <value>/tmp/hive/log</value>
- <description>Location of Hive run time structured log file</description>
- </property>
- <property>
- <name>hive.server2.logging.operation.log.location</name>
- <!-- <value>${system:java.io.tmpdir}/${system:user.name}/operation_logs</value> -->
- <value>/tmp/hive/server2/${user.name}/operation_logs</value>
- <description>Top level directory where operation logs are stored if logging functionality is enabled</description>
- </property>
- <!-- Dynamic partition execution mode: nonstrict or strict -->
- <property>
- <name>hive.exec.dynamic.partition.mode</name>
- <value>nonstrict</value>
- <description>
- In strict mode, the user must specify at least one static partition
- in case the user accidentally overwrites all partitions.
- In nonstrict mode all partitions are allowed to be dynamic.
- </description>
- </property>
-
- <property>
- <name>hive.server2.authentication</name>
- <value>NONE</value>
- <description>
- Expects one of [nosasl, none, ldap, kerberos, pam, custom].
- Client authentication types.
- NONE: no authentication check
- LDAP: LDAP/AD based authentication
- KERBEROS: Kerberos/GSSAPI authentication
- CUSTOM: Custom authentication provider
- (Use with property hive.server2.custom.authentication.class)
- PAM: Pluggable authentication module
- NOSASL: Raw transport
- </description>
- </property>
- <property>
- <name>hive.server2.thrift.client.user</name>
- <!--<value>anonymous</value>-->
- <value>hive</value>
- <description>Username to use against thrift client</description>
- </property>
- <property>
- <name>hive.server2.thrift.client.password</name>
- <!--<value>anonymous</value>-->
- <value>hive</value>
- <description>Password to use against thrift client</description>
- </property>
- </configuration>
8. Initialize Hive
This step creates the metastore tables in the MySQL metastore database.
$HIVE_HOME/bin/schematool -dbType mysql -initSchema
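Optionally, you can verify the initialization by listing the tables that schematool just created in MySQL:
- mysql> use metastore;
- mysql> show tables;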
9. Start Hive
If remote connections are needed (JDBC, database tools, beeline, and so on), both of the following services must be started; a sample beeline connection is shown after the commands.
- hive --service metastore >/dev/null 2>&1 &
- hive --service hiveserver2 >/dev/null 2>&1 &
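As a quick check of the remote connection, here is a minimal beeline sketch. It assumes the default HiveServer2 Thrift port 10000 and the hive/hive Thrift client credentials configured in hive-site.xml above:
- # connect to HiveServer2 over JDBC and list the databases
- beeline -u jdbc:hive2://node1:10000 -n hive -p hive -e "show databases;"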
10. Use Hive and exit
- hive
- hive> show databases;
- OK
- default
- Time taken: 0.786 seconds, Fetched: 1 row(s)
- hive> quit;
By default Hive only supports single-character field delimiters, but it ships with a contrib jar that lets delimiters be defined with regular expressions or multi-character strings (a multi-character example follows; a regex sketch is shown after it).
1. Locate Hive's bundled contrib jar
find / -name hive-contrib-*.jar
2. Register the jar found above in hive-site.xml
- <property>
- <name>hive.aux.jars.path</name>
- <value>file:///opt/apache-hive-1.2.2-bin/lib/hive-contrib-1.2.2.jar</value>
- <description>Added by tiger.zeng on 20120202. These JAR files are available to all users for all jobs</description>
- </property>
With this configuration in place there is no need to restart the Hive services; simply re-entering the Hive CLI makes it take effect, and the setting is permanent. The jar can also be loaded temporarily by running the following after entering the Hive CLI:
hive> add jar file:///opt/apache-hive-1.2.2-bin/lib/hive-contrib-1.2.2.jar;
3. Usage
Prepare the following data, using |#| as the field delimiter:
- 3324|#|003|#|20190816 09:16:18|#|0.00|#|2017-11-13 12:00:00
- 3330|#|009|#|20190817 15:21:03|#|1234.56|#|2017-11-14 12:01:00
Declare the SerDe as follows when creating the table, then load and query the data:
- drop table if exists split_test;
- CREATE TABLE split_test(
- id INT COMMENT 'loan query ID',
- number STRING COMMENT 'serial number',
- `date` STRING COMMENT 'query return date',
- loanamount DOUBLE COMMENT 'loan amount range',
- createtime TIMESTAMP COMMENT 'creation time'
- )ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.MultiDelimitSerDe'
- WITH SERDEPROPERTIES ("field.delim"="|#|")
- STORED AS TEXTFILE;
-
- -- load the data
- LOAD DATA LOCAL INPATH '/root/split_test.txt' OVERWRITE INTO TABLE split_test;
-
The query results are as follows:
- -- query the data
- hive> select * from split_test;
- OK
- 3324 003 20190816 09:16:18 0.0 2017-11-13 12:00:00
- 3330 009 20190817 15:21:03 1234.56 2017-11-14 12:01:00
- Time taken: 0.11 seconds, Fetched: 2 row(s)
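Besides MultiDelimitSerDe, the contrib jar also contains a regex-based SerDe, org.apache.hadoop.hive.contrib.serde2.RegexSerDe. The following is only a rough sketch of how the same |#|-delimited data could be parsed with a regular expression; the table name regex_split_test is illustrative, and the contrib RegexSerDe only supports STRING columns:
- -- hypothetical alternative: parse the |#|-delimited file with a regex
- drop table if exists regex_split_test;
- CREATE TABLE regex_split_test(
- id STRING,
- number STRING,
- `date` STRING,
- loanamount STRING,
- createtime STRING
- )ROW FORMAT SERDE 'org.apache.hadoop.hive.contrib.serde2.RegexSerDe'
- WITH SERDEPROPERTIES ("input.regex"="(.*)\\|#\\|(.*)\\|#\\|(.*)\\|#\\|(.*)\\|#\\|(.*)")
- STORED AS TEXTFILE;
-
- LOAD DATA LOCAL INPATH '/root/split_test.txt' OVERWRITE INTO TABLE regex_split_test;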
The custom part is written in Java; create a new Maven project in IDEA with the following pom.xml properties and dependencies.
- <properties>
- <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
- <maven.compiler.source>1.8</maven.compiler.source>
- <maven.compiler.target>1.8</maven.compiler.target>
- <java.version>1.8</java.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>2.7.7</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>2.7.7</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>2.7.7</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- <version>2.7.7</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.hive</groupId>
- <artifactId>hive-exec</artifactId>
- <version>1.2.2</version>
- <!--<exclusions>
- <exclusion>
- <groupId>org.pentaho</groupId>
- <artifactId>pentaho-aggdesigner-algorithm</artifactId>
- </exclusion>
- </exclusions>-->
- </dependency>
-
- </dependencies>
- package yore.hive;
-
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.compress.CompressionCodec;
- import org.apache.hadoop.io.compress.CompressionCodecFactory;
- import org.apache.hadoop.io.compress.SplittableCompressionCodec;
- import org.apache.hadoop.mapred.*;
-
- import java.io.IOException;
-
- /**
- * Custom TextInputFormat, rewritten to support multi-character delimiters.
- * This class is a modified copy of the org.apache.hadoop.mapred.TextInputFormat source.
- *
- * <pre>
- * When Hive imports a file from HDFS it is processed as follows:
- * the InputFormat is called to split the file into records, each record being one row (Row);
- * the SerDe's Deserializer is then called to split each row (Row) into individual fields.
- *
- * See the class TextInputFormat under org.apache.hadoop.mapred in hadoop-mapreduce-client-core-2.7.7.jar.
- * Before creating a table, a custom multi-character record delimiter can also be set in the Hive CLI with:
- * set textinputformat.record.delimiter=<custom record delimiter string>;
- * Add the required dependencies to the Maven project's pom.xml as shown above.
- *
- * </pre>
- *
- * Created by yore on 2019/4/3 17:56
- */
- public class SQPTextInputFormat extends FileInputFormat<LongWritable, Text> implements JobConfigurable {
-
- private CompressionCodecFactory compressionCodecs = null;
- //"US-ASCII""ISO-8859-1""UTF-8""UTF-16BE""UTF-16LE""UTF-16"
- private final static String defaultEncoding = "UTF-8";
- private String encoding = null;
-
- public void configure(JobConf jobConf) {
- this.compressionCodecs = new CompressionCodecFactory(jobConf);
- }
-
- @Override
- protected boolean isSplitable(FileSystem fs, Path filename) {
- CompressionCodec codec = this.compressionCodecs.getCodec(filename);
- if (null == codec) {
- return true;
- }
- return codec instanceof SplittableCompressionCodec;
- }
-
- public RecordReader<LongWritable, Text> getRecordReader(InputSplit inputSplit, JobConf jobConf, Reporter reporter) throws IOException {
- reporter.setStatus(inputSplit.toString());
- String delimiter = jobConf.get("textinputformat.record.linesep");
- this.encoding = jobConf.get("textinputformat.record.encoding",defaultEncoding);
- byte[] recordDelimiterBytes = null;
- if (null != delimiter) {//Charsets.UTF_8
- recordDelimiterBytes = delimiter.getBytes(this.encoding);
- }
- return new SQPRecordReader(jobConf, (FileSplit)inputSplit, recordDelimiterBytes);
- }
-
- }
- package yore.hive;
-
- import org.apache.commons.logging.Log;
- import org.apache.commons.logging.LogFactory;
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.FSDataInputStream;
- import org.apache.hadoop.fs.FileSystem;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.fs.Seekable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.io.compress.*;
- import org.apache.hadoop.mapred.FileSplit;
- import org.apache.hadoop.mapred.RecordReader;
- import org.apache.hadoop.util.LineReader;
-
- import java.io.IOException;
- import java.io.InputStream;
-
- /**
- * Custom LineRecordReader.
- * This class is a modified copy of the org.apache.hadoop.mapred.LineRecordReader source.
- *
- * Created by yore on 2019/4/3 18:03
- */
- public class SQPRecordReader implements RecordReader<LongWritable, Text> {
-
- private static final Log LOG = LogFactory.getLog(SQPRecordReader.class.getName());
-
- private CompressionCodecFactory compressionCodecs = null;
- private long start;
- private long pos;
- private long end;
- private LineReader in;
- private FSDataInputStream fileIn;
- private final Seekable filePosition;
- int maxLineLength;
- private CompressionCodec codec;
- private Decompressor decompressor;
- //field separator
- private String FieldSep;
- private static final String defaultFSep="\001";
- //"US-ASCII""ISO-8859-1""UTF-8""UTF-16BE""UTF-16LE""UTF-16"
- private final static String defaultEncoding = "UTF-8";
- private String encoding = null;
-
- public SQPRecordReader(Configuration job, FileSplit split) throws IOException {
- this(job, split, null);
- }
-
- public SQPRecordReader(Configuration job, FileSplit split, byte[] recordDelimiter) throws IOException {
- this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647);
- this.FieldSep = job.get("textinputformat.record.fieldsep",defaultFSep);
- this.encoding = job.get("textinputformat.record.encoding",defaultEncoding);
- this.start = split.getStart();
- this.end = (this.start + split.getLength());
- Path file = split.getPath();
- this.compressionCodecs = new CompressionCodecFactory(job);
- this.codec = this.compressionCodecs.getCodec(file);
-
- FileSystem fs = file.getFileSystem(job);
- this.fileIn = fs.open(file);
- if (isCompressedInput()) {
- this.decompressor = CodecPool.getDecompressor(this.codec);
- if ((this.codec instanceof SplittableCompressionCodec)) {
- SplitCompressionInputStream cIn = ((SplittableCompressionCodec)this.codec).createInputStream(this.fileIn, this.decompressor, this.start, this.end, SplittableCompressionCodec.READ_MODE.BYBLOCK);
-
- this.in = new LineReader(cIn, job, recordDelimiter);
- this.start = cIn.getAdjustedStart();
- this.end = cIn.getAdjustedEnd();
- this.filePosition = cIn;
- } else {
- this.in = new LineReader(this.codec.createInputStream(this.fileIn, this.decompressor), job, recordDelimiter);
- this.filePosition = this.fileIn;
- }
- } else {
- this.fileIn.seek(this.start);
- this.in = new LineReader(this.fileIn, job, recordDelimiter);
- this.filePosition = this.fileIn;
- }
-
- if (this.start != 0L) {
- this.start += this.in.readLine(new Text(), 0, maxBytesToConsume(this.start));
- }
- this.pos = this.start;
- }
-
- public SQPRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
- this(in, offset, endOffset, maxLineLength, null);
- }
-
- public SQPRecordReader(InputStream in, long offset, long endOffset, int maxLineLength, byte[] recordDelimiter) {
- this.maxLineLength = maxLineLength;
- this.in = new LineReader(in, recordDelimiter);
- this.start = offset;
- this.pos = offset;
- this.end = endOffset;
- this.filePosition = null;
- }
-
- public SQPRecordReader(InputStream in, long offset, long endOffset, Configuration job) throws IOException {
- this(in, offset, endOffset, job, null);
- }
-
- public SQPRecordReader(InputStream in, long offset, long endOffset, Configuration job, byte[] recordDelimiter) throws IOException {
- this.maxLineLength = job.getInt("mapreduce.input.linerecordreader.line.maxlength", 2147483647);
- this.in = new LineReader(in, job, recordDelimiter);
- this.start = offset;
- this.pos = offset;
- this.end = endOffset;
- this.filePosition = null;
- }
-
- public LongWritable createKey() {
- return new LongWritable();
- }
-
- public Text createValue() {
- return new Text();
- }
-
- private boolean isCompressedInput() {
- return this.codec != null;
- }
-
- private int maxBytesToConsume(long pos) {
- return isCompressedInput() ? 2147483647 : (int)Math.min(2147483647L, this.end - pos);
- }
-
- private long getFilePosition() throws IOException {
- long retVal;
- if ((isCompressedInput()) && (null != this.filePosition))
- retVal = this.filePosition.getPos();
- else {
- retVal = this.pos;
- }
- return retVal;
- }
-
-
- public boolean next(LongWritable longWritable, Text text) throws IOException {
- while (getFilePosition() <= this.end) {
- longWritable.set(this.pos);
-
- int newSize = this.in.readLine(text, this.maxLineLength, Math.max(maxBytesToConsume(this.pos), this.maxLineLength));
-
- if (newSize == 0) {
- return false;
- }
-
- if (encoding.compareTo(defaultEncoding) != 0) {
- String str = new String(text.getBytes(), 0, text.getLength(), encoding);
- text.set(str);
- }
-
- if (FieldSep.compareTo(defaultFSep) != 0) {
- String replacedValue = text.toString().replace(FieldSep, defaultFSep);
- text.set(replacedValue);
- }
-
- this.pos += newSize;
- if (newSize < this.maxLineLength) {
- return true;
- }
-
- LOG.info("Skipped line of size " + newSize + " at pos " + (this.pos - newSize));
- }
-
- return false;
- }
-
- public long getPos() throws IOException {
- return this.pos;
- }
-
- public void close() throws IOException {
- try {
- if (this.in != null)
- this.in.close();
- }
- finally {
- if (this.decompressor != null)
- CodecPool.returnDecompressor(this.decompressor);
- }
- }
-
- public float getProgress() throws IOException {
- if(this.start == this.end){
- return 0.0F;
- }
- return Math.min(1.0F, (float)(getFilePosition() - this.start) / (float)(this.end - this.start));
- }
-
- }
After starting the Hive CLI, first test it and inspect the current Hive table information by entering the following commands:
- hive> create table test1(id int);
- OK
- Time taken: 0.442 seconds
- hive> show tables;
- OK
- test1
- Time taken: 0.031 seconds, Fetched: 1 row(s)
- hive> describe extended test1;
- OK
- id int
-
- Detailed Table Information Table(tableName:test1, dbName:default, owner:root, createTime:1554317591, lastAccessTime:0, retention:0, sd:StorageDescriptor(cols:[FieldSchema(name:id, type:int, comment:null)], location:hdfs://node1:8020/hive/warehouse/test1, inputFormat:org.apache.hadoop.mapred.TextInputFormat, outputFormat:org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat, compressed:false, numBuckets:-1, serdeInfo:SerDeInfo(name:null, serializationLib:org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe, parameters:{serialization.format=1}), bucketCols:[], sortCols:[], parameters:{}, skewedInfo:SkewedInfo(skewedColNames:[], skewedColValues:[], skewedColValueLocationMaps:{}), storedAsSubDirectories:false), partitionKeys:[], parameters:{transient_lastDdlTime=1554317591}, viewOriginalText:null, viewExpandedText:null, tableType:MANAGED_TABLE)
- Time taken: 0.126 seconds, Fetched: 3 row(s)
- hive>
From the output above we can see the classes Hive uses for input and output: the inputFormat is org.apache.hadoop.mapred.TextInputFormat and the outputFormat is org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.
Next we set Hive's input format to the custom class we just developed; if it is configured correctly, inspecting the table again will show our class as the inputFormat.
3.4.1 Package the source code
Since this is a Maven project, it can be packaged directly with mvn clean package, or from IDEA via the Maven Projects panel on the right: Lifecycle --> package.
3.4.2 Add the built jar to the lib directories
Copy the built jar into the Hive and Hadoop lib directories. If the cluster has multiple nodes, the jar must be uploaded to every node, for example as sketched below.
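This is a minimal sketch; the jar name hive-custom-inputformat-1.0.jar is only a placeholder for whatever your Maven build produces under target/:
- # copy the custom InputFormat jar into the local Hive and Hadoop lib directories
- cp target/hive-custom-inputformat-1.0.jar /opt/apache-hive-1.2.2-bin/lib/
- cp target/hive-custom-inputformat-1.0.jar /opt/hadoop-2.7.7/lib/
- # distribute it to the other nodes (same paths on every node in this setup)
- scp target/hive-custom-inputformat-1.0.jar root@node2:/opt/hadoop-2.7.7/lib/
- scp target/hive-custom-inputformat-1.0.jar root@node3:/opt/hadoop-2.7.7/lib/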
3.4.3 Set the input encoding, the custom field delimiter, and the custom record delimiter
After adding the jar you need to re-enter the Hive CLI, then run the following commands:
- //"US-ASCII""ISO-8859-1""UTF-8""UTF-16BE""UTF-16LE""UTF-16"
- set textinputformat.record.encoding=UTF-8;
- // field delimiter between columns
- set textinputformat.record.fieldsep=,;
- // record (line) delimiter
- set textinputformat.record.linesep=|+|;
The commands above set the input file encoding to UTF-8, the field delimiter to a comma, and the record delimiter to |+|.
3.4.4 Create a local file with test data
For example, create a file named hive_separator.txt in the home directory with the following test data:
3,Yore|+|9,Yuan|+|11,東
3.4.5 Create a test table
Here INPUTFORMAT is the fully qualified name of our custom TextInputFormat class, yore.hive.SQPTextInputFormat:
- create table test (
- id string,
- name string
- ) stored as
- INPUTFORMAT 'yore.hive.SQPTextInputFormat'
- OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' ;
3.4.6 Load the test data into the table
- load data local inpath '/root/hive_separator.txt'
- overwrite into table test;
3.4.7 Query the data
- hive> select * from test;
- OK
- 3 Yore
- 9 Yuan
- 11 東
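If the custom input format is working, describing the table again should now report yore.hive.SQPTextInputFormat as the inputFormat instead of the default TextInputFormat shown earlier:
- hive> describe formatted test;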