This method creates a PrintSinkFunction operator and passes it as the argument to addSink().
PrintSinkFunction extends the rich function class RichSinkFunction,
so the lifecycle methods of a rich function (a concrete implementation class) are available: open(), close(), access to the runtime context and execution environment, state definition, and so on.
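For intuition, DataStream's print() method does exactly this internally; the two calls below are equivalent (a fragment, assuming a stream variable of type DataStream<Event>):

    stream.print();                             // shorthand
    stream.addSink(new PrintSinkFunction<>());  // what print() does internally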
You can call DataStream's addSink() method
and pass in your own SinkFunction implementation:
extend RichSinkFunction, and implement CheckpointedFunction and CheckpointListener (for checkpointing).
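A minimal skeleton of that pattern, assuming a String input type (the class name MySink and the empty method bodies are illustrative only, not from the original):

    // Illustrative skeleton: extends the rich function class and implements
    // the checkpoint-related interfaces mentioned above.
    public static class MySink extends RichSinkFunction<String>
            implements CheckpointedFunction, CheckpointListener {

        @Override
        public void open(Configuration parameters) throws Exception {
            // lifecycle method: create connections / clients here
        }

        @Override
        public void invoke(String value, Context context) throws Exception {
            // called once per record: write `value` to the external system
        }

        @Override
        public void close() throws Exception {
            // lifecycle method: release resources here
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // flush buffered data when a checkpoint is taken
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            // restore operator state on (re)start
        }

        @Override
        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            // called once a checkpoint has been confirmed everywhere
        }
    }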
Calling forRowFormat() on StreamingFileSink with a Path and an Encoder returns a StreamingFileSink.DefaultRowFormatBuilder. DefaultRowFormatBuilder is a static class that extends RowFormatBuilder, which in turn extends BucketsBuilder. Under the hood the data is written into buckets; within each bucket the records are stored as part files split by size, which is what makes the storage distributed.
The sink instance is then built with the builder pattern.
RowFormatBuilder handles row encoding;
BulkFormatBuilder handles bulk (columnar) encoding formats.
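For the bulk variant, forBulkFormat() takes a BulkWriter.Factory instead of an Encoder, and part files roll on every checkpoint. A minimal sketch, assuming the flink-parquet dependency is on the classpath and reusing the Event POJO from the examples below:

    // Bulk-encoded sink: writes columnar Parquet files
    StreamingFileSink<Event> parquetSink = StreamingFileSink
            .<Event>forBulkFormat(
                    new Path("./output-parquet"),
                    ParquetAvroWriters.forReflectRecord(Event.class)) // Parquet writer factory
            .build();
    stream.addSink(parquetSink);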
Class relationship diagram (image not included)
Code
public class SinkToFileTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        // 2. Build a StreamingFileSink instance to pass in as the SinkFunction.
        // forRowFormat() (or forBulkFormat()) returns a builder; forRowFormat takes
        // a type parameter, a target directory and an encoder.
        // Writing to a file requires serialization: the encoder turns each record
        // into bytes written to the output stream. Finally, build() creates the instance.
        StreamingFileSink<String> streamingFileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy( // rolling policy: start a new part file by time or size
                        DefaultRollingPolicy.builder() // builder pattern again
                                .withMaxPartSize(1024 * 1024 * 1024)                   // max part file size: 1 GB
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))   // roll over every 15 minutes (argument in ms)
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(15)) // roll after 15 minutes without new data (ms)
                                .build())
                .build();

        // 1. Write to the file by calling addSink() with the SinkFunction
        stream.map(data -> data.toString()) // convert Event to String
                .addSink(streamingFileSink);

        env.execute();
    }
}
Construct a FlinkKafkaProducer with three arguments: brokerList (host + port), topicId (the topic name) and a serializationSchema (the serializer).
public class SinkToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Read data from Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "hadoop2:9092");
        properties.setProperty("group.id", "consumer-group");
        DataStreamSource<String> kafkaStream = env.addSource(
                new FlinkKafkaConsumer<String>("clicks", new SimpleStringSchema(), properties));

        // 2. A simple ETL transformation in Flink
        SingleOutputStreamOperator<String> result = kafkaStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                String[] fields = value.split(",");
                return new Event(fields[0].trim(), fields[1].trim(), Long.valueOf(fields[2].trim())).toString();
            }
        });

        // 3. Write the results back to Kafka
        // FlinkKafkaProducer takes the broker list, the topic id and a serialization schema
        result.addSink(new FlinkKafkaProducer<String>(
                "hadoop2:9092", "events", new SimpleStringSchema()));

        env.execute();
    }
}
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
The RedisSink class extends RichSinkFunction.
Call its constructor with the Redis cluster configuration (a FlinkJedisConfigBase) and a RedisMapper describing the write command.
Where a FlinkJedisConfigBase is needed, FlinkJedisPoolConfig works fine: it extends FlinkJedisConfigBase directly.
FlinkJedisConfigBase is an abstract class.
FlinkJedisPoolConfig is likewise created with the Builder() design pattern: configure the builder, then call build() to instantiate it.
RedisMapper is an interface.
Define a custom implementation class and override getCommandDescription(), getKeyFromData(Event data) and getValueFromData(Event data).
public class SinkToRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. ClickSource is a custom source
        DataStreamSource<Event> stream = env.addSource(new ClickSource());

        // 2. Create a Jedis connection configuration
        // FlinkJedisPoolConfig extends FlinkJedisConfigBase directly
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop2")
                .build();

        // 3. Write to Redis
        stream.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    // Custom implementation of the RedisMapper interface
    public static class MyRedisMapper implements RedisMapper<Event> {
        @Override
        // Returns a description of the Redis command to use
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "clicks"); // write into a hash table
        }

        @Override
        // Use user as the key
        public String getKeyFromData(Event data) {
            return data.user;
        }

        @Override
        // Use url as the value
        public String getValueFromData(Event data) {
            return data.url;
        }
    }
}
Run Redis:
[hadoop1@hadoop2 redis]$ ./src/redis-server
[hadoop1@hadoop2 bin]$ pwd
/usr/local/bin
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-elasticsearch6_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
The ElasticsearchSink class extends the abstract class ElasticsearchSinkBase, which in turn extends the RichSinkFunction class.
ElasticsearchSink is created through its Builder(), whose parameters are a List<HttpHost> and an ElasticsearchSinkFunction.
HttpHost takes a host name and a port number.
ElasticsearchSinkFunction is an interface: write an implementation class and override its method with the write logic.
public class SinToES {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Input
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        // 2. Define the list of hosts
        ArrayList<HttpHost> httpHosts = new ArrayList<>();
        httpHosts.add(new HttpHost("hadoop", 9200));

        // 3. Define the ElasticsearchSinkFunction<T>: an interface whose process
        // method sends the index requests that insert the data into Elasticsearch
        ElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {
            @Override
            // parameters: the input element, the runtime context, and the request indexer
            public void process(Event element, RuntimeContext ctx, RequestIndexer indexer) {
                HashMap<String, String> map = new HashMap<>();
                map.put(element.user, element.url);
                // build an IndexRequest
                IndexRequest request = Requests.indexRequest()
                        .index("clicks")
                        .type("types")
                        .source(map);
                indexer.add(request);
            }
        };

        // 4. Write to Elasticsearch
        // the Builder takes a List<HttpHost> and an ElasticsearchSinkFunction<T>
        stream.addSink(new ElasticsearchSink.Builder<>(httpHosts, elasticsearchSinkFunction).build());

        env.execute();
    }
}
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.47</version>
</dependency>
JdbcSink extends nothing and implements nothing.
It defines a sink() method with three parameters: the SQL statement, a JdbcStatementBuilder, and the JdbcConnectionOptions connection configuration; it returns a SinkFunction.
JdbcStatementBuilder is an interface that extends the BiConsumerWithException interface.
It has a single abstract method, accept(), so it can be written as a lambda.
The JdbcConnectionOptions constructor is private, so instantiation goes through JdbcConnectionOptionsBuilder and its build() method.
public class SinkToMysql {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 1. Input
        DataStreamSource<Event> stream = env.fromElements(
                new Event("Mary", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=100", 3000L),
                new Event("Bob", "./prod?id=1", 3300L),
                new Event("Alice", "./prod?id=200", 3000L),
                new Event("Bob", "./home", 3500L),
                new Event("Bob", "./prod?id=2", 3800L),
                new Event("Bob", "./prod?id=3", 4200L));

        // Three parameters: the SQL statement, a JdbcStatementBuilder (as a lambda),
        // and the JdbcConnectionOptions connection configuration
        stream.addSink(JdbcSink.sink(
                "INSERT INTO clicks (user, url) VALUES (?, ?)",
                ((statement, event) -> {
                    statement.setString(1, event.user);
                    statement.setString(2, event.url);
                }),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://localhost:3306/test2")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("123456")
                        .build()
        ));

        env.execute();
    }
}
mysql> create table clicks(
-> user varchar(20) not null,
-> url varchar(100) not null);
Query OK, 0 rows affected (0.02 sec)
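Once the job has run, the inserted rows can be checked in the same session, e.g.:

mysql> select user, url from clicks;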
Call DataStream's addSink() method and pass in a custom SinkFunction (based on the rich function class): override the key method invoke(), and also override the rich function lifecycle methods open() and close().
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
The full example is omitted; an illustrative sketch of the pattern appears below.
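A minimal sketch of the pattern just described, using the hbase-client API (the table name "clicks", the column family "info" and the ZooKeeper quorum address are assumptions, not from the original):

    // Illustrative HBase sink: rich function with open/invoke/close overridden
    public static class HBaseSink extends RichSinkFunction<Event> {
        private transient Connection connection; // org.apache.hadoop.hbase.client.Connection

        @Override
        public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
            // lifecycle method: build the HBase connection once per parallel instance
            org.apache.hadoop.conf.Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum", "hadoop2:2181"); // assumed address
            connection = ConnectionFactory.createConnection(conf);
        }

        @Override
        public void invoke(Event value, Context context) throws Exception {
            // key method: write one record per invocation
            Table table = connection.getTable(TableName.valueOf("clicks")); // assumed table
            Put put = new Put(Bytes.toBytes(value.user));                   // row key = user
            put.addColumn(Bytes.toBytes("info"),                            // assumed column family
                    Bytes.toBytes("url"),
                    Bytes.toBytes(value.url));
            table.put(put);
            table.close();
        }

        @Override
        public void close() throws Exception {
            // lifecycle method: release the connection
            if (connection != null) {
                connection.close();
            }
        }
    }

    // usage: stream.addSink(new HBaseSink());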