A cloud disk storage system: files are stored into Hadoop (HDFS) or retrieved from Hadoop back to the local machine.
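At its core, storing to and fetching from Hadoop is the HDFS FileSystem API. Below is a minimal sketch, assuming the HA nameservice hdfs://xrlhadoop that appears later in this article; the class, user, and paths here are purely illustrative and are not the project's actual service code.

package com.xrl.hdfs; // hypothetical package, for illustration only

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;

/** Illustrative HDFS upload/download helper; assumes core-site.xml/hdfs-site.xml
 *  defining the xrlhadoop nameservice are on the classpath. */
public class HdfsFileSketch {
    public static void main(String[] args) throws IOException, InterruptedException {
        FileSystem fs = FileSystem.get(URI.create("hdfs://xrlhadoop"), new Configuration(), "root");

        // Upload: copy a local file into a cloud-disk directory on HDFS.
        fs.copyFromLocalFile(new Path("/tmp/demo.txt"), new Path("/clouddisk/user1/demo.txt"));

        // Download: copy the HDFS file back to the local filesystem.
        fs.copyToLocalFile(new Path("/clouddisk/user1/demo.txt"), new Path("/tmp/demo-copy.txt"));

        fs.close();
    }
}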
# Run as this user
user root;
# Number of worker processes; usually set equal to the number of CPUs
worker_processes 1;
# Global error log and PID file
error_log /usr/local/nginx/logs/error.log;
pid /usr/local/nginx/logs/nginx.pid;

# Event model and connection limit
events {
    # epoll is one form of I/O multiplexing; it is only available on Linux kernels 2.6+ and greatly improves nginx performance
    use epoll;
    # Maximum number of concurrent connections per worker process
    worker_connections 1024;
}

# HTTP server; its reverse-proxy feature provides load balancing
http {
    include /usr/local/nginx/conf/mime.types;
    default_type application/octet-stream;
    access_log /usr/local/nginx/logs/access.log;
    sendfile on;
    keepalive_timeout 65;

    # Enable gzip compression
    gzip on;
    gzip_disable "MSIE [1-6]\.(?!.*SV1)";

    upstream dc {
        server 192.168.65.103:8061;
        server 192.168.65.104:8061;
    }

    server {
        listen 81;
        server_name localhost;

        location / {
            proxy_pass http://dc;
        }

        # Static files are handled by nginx itself
        location ~ ^/(images|javascript|js|css|flash|media|static)/ {
            root /opt/module/nginx-1.22.0/static;
            # Expiry: static files rarely change, so this can be set high; use a smaller value if they are updated frequently.
            expires 3d;
        }

        error_page 500 502 503 504 /50x.html;
        location = /50x.html {
            root html;
        }
    }
}
1. Comment out the local address binding
2. Turn off protected mode
3. Set the listening port
4. Run as a background daemon
5. Set an access password
# bind * -::* # like the default, all available interfaces
protected-mode no
port 6379
requirepass xrl
daemonize yes
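With the bind restriction commented out, protected mode off, and a password set, clients on other hosts can connect. A minimal connection sketch using the Jedis client follows; the host (192.168.65.102) and the choice of client library are assumptions, only the port and password come from the config above.

import redis.clients.jedis.Jedis;

public class RedisConnectSketch {
    public static void main(String[] args) {
        // Host is an assumption; port and password match the redis.conf settings above.
        try (Jedis jedis = new Jedis("192.168.65.102", 6379)) {
            jedis.auth("xrl");            // requirepass xrl
            jedis.set("clouddisk:ping", "ok");
            System.out.println(jedis.get("clouddisk:ping"));
        }
    }
}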
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /opt/item/xrlcloud.log
a1.sources.r1.shell = /bin/bash -c

# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141

# Describe the channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
On hadoop102, collect the logs sent from hadoop103 and hadoop104 together with the local nginx logs, and upload them to HDFS.
# Name the components on this agent
a3.sources = r1 r2
a3.sinks = k1 k2
a3.channels = c1 c2

# Describe/configure the sources
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop102
a3.sources.r1.port = 4141

a3.sources.r2.type = TAILDIR
a3.sources.r2.positionFile = /opt/item/tail_dir.json
a3.sources.r2.filegroups = f1 f2
a3.sources.r2.filegroups.f1 = /usr/local/nginx/logs/access.log
a3.sources.r2.filegroups.f2 = /usr/local/nginx/logs/error.log

# Describe the sinks
a3.sinks.k1.type = hdfs
a3.sinks.k1.hdfs.path = hdfs://xrlhadoop/clouddisk/flume/itemlogs/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k1.hdfs.filePrefix = item-
# Roll folders by time
a3.sinks.k1.hdfs.round = true
# Number of time units before a new folder is created
a3.sinks.k1.hdfs.roundValue = 1
# The time unit
a3.sinks.k1.hdfs.roundUnit = hour
# Use the local timestamp
a3.sinks.k1.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a3.sinks.k1.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k1.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a3.sinks.k1.hdfs.rollInterval = 30
# Roll size per file, roughly 128 MB
a3.sinks.k1.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a3.sinks.k1.hdfs.rollCount = 0

a3.sinks.k2.type = hdfs
a3.sinks.k2.hdfs.path = hdfs://xrlhadoop/clouddisk/flume/nginxlogs/%Y%m%d/%H
# Prefix for uploaded files
a3.sinks.k2.hdfs.filePrefix = nginx-
# Roll folders by time
a3.sinks.k2.hdfs.round = true
# Number of time units before a new folder is created
a3.sinks.k2.hdfs.roundValue = 1
# The time unit
a3.sinks.k2.hdfs.roundUnit = hour
# Use the local timestamp
a3.sinks.k2.hdfs.useLocalTimeStamp = true
# Number of events to accumulate before flushing to HDFS
a3.sinks.k2.hdfs.batchSize = 100
# File type; compression is supported
a3.sinks.k2.hdfs.fileType = DataStream
# How often (seconds) to roll a new file
a3.sinks.k2.hdfs.rollInterval = 60
# Roll size per file, roughly 128 MB
a3.sinks.k2.hdfs.rollSize = 134217700
# Rolling is independent of the number of events
a3.sinks.k2.hdfs.rollCount = 0

# Describe the channels
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100

a3.channels.c2.type = memory
a3.channels.c2.capacity = 1000
a3.channels.c2.transactionCapacity = 100

# Bind the sources and sinks to the channels
a3.sources.r1.channels = c1
a3.sinks.k1.channel = c1
a3.sources.r2.channels = c2
a3.sinks.k2.channel = c2
hadoop103
flume-item1.sh
#!/bin/bash
/opt/module/flume-1.10.0/bin/flume-ng agent -c /opt/module/flume-1.10.0/conf -n a1 -f /opt/item/flume/flume-item1.conf
hadoop104
flume-item2.sh
#!/bin/bash
/opt/module/flume-1.10.0/bin/flume-ng agent -c /opt/module/flume-1.10.0/conf -n a2 -f /opt/item/flume/flume-item2.conf
hadoop102
flume-cloudddisk.sh
#!/bin/bash
/opt/module/flume-1.10.0/bin/flume-ng agent -c /opt/module/flume-1.10.0/conf -n a3 -f /opt/item/flume/flume-cloudddisk.conf
flume-item.sh: starts the Flume agents on all three nodes
#!/bin/bash
sh /opt/item/flume/flume-cloudddisk.sh
ssh root@hadoop103 sh /opt/item/flume/flume-item1.sh
ssh root@hadoop104 sh /opt/item/flume/flume-item2.sh
--User and file information
sqoop import --connect jdbc:mysql://192.168.65.102:3306/clouddis?serverTimezone=UTC --username root --password root --table user --target-dir /clouddisk/sqoop/user
sqoop import --connect jdbc:mysql://192.168.65.102:3306/clouddis?serverTimezone=UTC --username root --password root --table diskfile --target-dir /clouddisk/sqoop/diskfile
--Incremental import
sqoop job --create cditem-job1 -- import --connect jdbc:mysql://192.168.65.102:3306/clouddis?serverTimezone=UTC --username root --table user --password-file file:root/.mysql.password --target-dir /clouddisk/sqoop/user -m 1 --check-column uid --incremental append --last-value 3
sqoop job --create cditem-job2 -- import --connect jdbc:mysql://192.168.65.102:3306/clouddis?serverTimezone=UTC --username root --table diskfile --password-file file:root/.mysql.password --target-dir /clouddisk/sqoop/diskfile -m 1 --check-column f_id --incremental append --last-value 6
--User access records
sqoop import --connect jdbc:mysql://192.168.65.102:3306/clouddis?serverTimezone=UTC --username root --password root --table dcrequestmsg --target-dir /clouddisk/sqoop/dcrequestmsg
--Incremental import
sqoop job --create cditem-job3 -- import --connect jdbc:mysql://192.168.65.102:3306/clouddis?serverTimezone=UTC --username root --table dcrequestmsg --password-file file:root/.mysql.password --target-dir /clouddisk/sqoop/dcrequestmsg -m 1 --check-column d_id --incremental append --last-value 36
Script that runs the sqoop jobs --sqoop_incremental.sh
#!/bin/bash
/opt/module/sqoop-1.4.7/bin/sqoop job --exec cditem-job1>>/opt/module/sqoop-1.4.7/sqoopTasks/cditem-job1.out 2>&1
/opt/module/sqoop-1.4.7/bin/sqoop job --exec cditem-job2>>/opt/module/sqoop-1.4.7/sqoopTasks/cditem-job2.out 2>&1
/opt/module/sqoop-1.4.7/bin/sqoop job --exec cditem-job3>>/opt/module/sqoop-1.4.7/sqoopTasks/cditem-job3.out 2>&1
<plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>3.2.4</version>
    <executions>
        <execution>
            <phase>package</phase>
            <goals>
                <goal>shade</goal>
            </goals>
        </execution>
    </executions>
    <configuration>
        <artifactSet>
            <!-- Bundle the MySQL JDBC driver into the job jar so the DAO can reach the database from the cluster -->
            <includes>
                <include>mysql:mysql-connector-java</include>
            </includes>
        </artifactSet>
    </configuration>
</plugin>
mrfileranking.sh
#!/bin/bash
hadoop jar /opt/item/mapreduce/dcmapreduce.jar com.xrl.dcmsgmapreduce.fileranking.FileRankingMapReduce /clouddisk/sqoop/dcrequestmsg /dcmapreduce/FileRanking
package com.xrl.dao;

import java.sql.*;

/**
 * @author xrl
 * @create 2022-07-05 15:10
 */
public class FileRankingDao {
    private Connection conn = null;
    private PreparedStatement pstmt = null;
    private int count = 0;
    private final String sql = "insert into fileranking values(null ,?,?)";

    public FileRankingDao() {
        innit();
    }

    public void formatting() {
        String sql = "delete from fileranking";
        Statement st = null;
        try {
            st = conn.createStatement();
            st.executeUpdate(sql);
        } catch (SQLException throwables) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                e.printStackTrace();
            }
            throwables.printStackTrace();
        }
    }

    private void innit() {
        try {
            conn = DriverManager.getConnection("jdbc:mysql://192.168.65.102:3306/clouddis?serverTimezone=UTC", "root", "root");
            conn.setAutoCommit(false); // commit transactions manually
            pstmt = conn.prepareStatement(sql);
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

    public void addFileRanking(int count, String name) {
        if (conn == null || pstmt == null) {
            innit();
        }
        if (conn == null || pstmt == null) {
            return;
        }
        try {
            pstmt.setInt(1, count);
            pstmt.setString(2, name);
            // Batch the insert
            pstmt.addBatch();
            // Flush every 5 rows, using the instance counter (not the row-count parameter)
            if (this.count++ % 5 == 0) {
                pstmt.executeBatch();
                conn.commit();
                pstmt.clearBatch();
            }
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
        try {
            pstmt.executeBatch();
            conn.commit();
            pstmt.clearBatch();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            try {
                conn.rollback();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }

    public void close() {
        if (conn != null) {
            try {
                conn.setAutoCommit(true);
                conn.close();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
        }
    }
}
Mapper code
package com.xrl.dcmsgmapreduce.fileranking;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;

/**
 * @author xrl
 * @create 2022-07-04 11:36
 */
class FileRankingMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    IntWritable iw = new IntWritable(1);
    SimpleDateFormat format = new SimpleDateFormat("yyyy MM dd");

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] split = value.toString().trim().split(",");
        if (split.length > 5 && split[5] != null) {
            context.write(new Text(split[5]), iw);
        }
    }

    // TODO: use FileSplit inputSplit = (FileSplit) context.getInputSplit();
    //       String filename = inputSplit.getPath().getName(); to get the file name.
    //       Store the largest processed file-name suffix in HDFS; before each run, read that
    //       maximum back, build the file name from it, and only read files newer than it,
    //       so each run only counts the newly generated file contents.
}
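A minimal sketch of the idea in the TODO above: obtaining the current input file's name from the split inside the mapper. The class name is illustrative and the HDFS checkpoint of the largest processed suffix is omitted. Note also that the driver below uses CombineTextInputFormat, under which the split handed to the mapper is a CombineFileSplit rather than a FileSplit, so the cast would need adjusting there.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

// Illustrative mapper that tags every record with the name of the file it came from.
class FileNameAwareMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private String filename;
    private final IntWritable one = new IntWritable(1);

    @Override
    protected void setup(Context context) {
        // With a plain TextInputFormat job the input split is a FileSplit, so the path is available here.
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        filename = inputSplit.getPath().getName();
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // A real implementation would compare `filename` against the last processed
        // suffix loaded from HDFS and skip records from files that were already counted.
        context.write(new Text(filename), one);
    }
}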
Reducer code
package com.xrl.dcmsgmapreduce.fileranking;

import com.xrl.dao.FileRankingDao;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * @author xrl
 * @create 2022-07-04 11:37
 */
class FileRankingReduce extends Reducer<Text, IntWritable, IntWritable, Text> {
    FileRankingDao dao = new FileRankingDao();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        dao.formatting();
    }

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        dao.addFileRanking(sum, key.toString());
        context.write(new IntWritable(sum), key);
    }

    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        dao.close();
    }
}
Driver (main) code
package com.xrl.dcmsgmapreduce.fileranking;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * @author xrl
 * @create 2022-07-04 10:48
 */
public class FileRankingMapReduce {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        // Class used to locate the job jar
        job.setJarByClass(FileRankingMapReduce.class);

        // Change the split mechanism to combine small files
        job.setInputFormatClass(CombineTextInputFormat.class);
        // Maximum virtual-storage split size: 20 MB
        CombineTextInputFormat.setMaxInputSplitSize(job, 20971520);

        // Mapper and reducer classes
        job.setMapperClass(FileRankingMapper.class);
        job.setReducerClass(FileRankingReduce.class);

        // Output types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(IntWritable.class);
        job.setOutputValueClass(Text.class);

        // Input and output paths
        /*
        FileInputFormat.addInputPath(job, new Path("data/dcrequestmsg"));
        Path path = new Path("data/FileRanking");
        */
        FileInputFormat.addInputPath(job, new Path(args[0]));
        Path path = new Path(args[1]);
        FileSystem fs = FileSystem.get(conf);
        if (fs.exists(path)) {
            fs.delete(path, true);
        }
        FileOutputFormat.setOutputPath(job, path);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
# Set the time zone
default.timezone.id=Asia/Shanghai
azkaban.webserver.url=http://hadoop102:8081
executor.port=12321
# Database type
database.type=mysql
# Port
mysql.port=3306
# Database host
mysql.host=hadoop102
# Database (schema) name
mysql.database=azkaban
mysql.user=root
mysql.password=root
/opt/module/azkaban/azkaban-web/conf/azkaban.properties
default.timezone.id=Asia/Shanghai
jetty.port=8081
database.type=mysql
mysql.port=3306
mysql.host=hadoop102
mysql.database=azkaban
mysql.user=root
mysql.password=root
azkaban.executorselector.filters=StaticRemainingFlowSize,CpuStatus
/opt/module/azkaban/azkaban-web/conf/azkaban-users.xml
<azkaban-users>
<user groups="azkaban" password="azkaban" roles="admin" username="azkaban"/>
<user password="metrics" roles="metrics" username="metrics"/>
<user password="xrl200166" roles="admin,metrics" username="root"/>
<role name="admin" permissions="ADMIN"/>
<role name="metrics" permissions="METRICS"/>
</azkaban-users>
flume-item.job
#flume-item.job
type=command
command=sh /opt/item/flume/flume-cloudddisk.sh
mrfileranking.job
#mrfileranking.job
type=command
dependencies=sqoopfileranking
command=sh /opt/item/mapreduce/mrfileranking.sh
sqoopfileranking.job
#sqoopfileranking.job
type=command
command=sh /opt/item/sqoop/sqoop_fileranking.sh
sqoop.job
#sqoop.job
type=command
command=sh /opt/item/sqoop/sqoop_incremental.sh