<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-mongodb</artifactId>
    <version>1.1.0-1.18</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-base</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_2.12</artifactId>
    <version>1.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-core</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.0</version>
</dependency>
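The ${flink.version} placeholder used above must be defined in the POM's <properties> section. A minimal sketch, assuming Flink 1.18.0 to line up with the 1.1.0-1.18 MongoDB connector; adjust it to the version your cluster actually runs:

<properties>
    <!-- Assumed Flink version; the MongoDB connector above targets the 1.18.x line -->
    <flink.version>1.18.0</flink.version>
</properties>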
package com.wfg.flink.connector.redis;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

/**
 * @author wfg
 */
public class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "HASH_NAME");
    }

    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0;
    }

    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return data.f1.toString();
    }
}
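With RedisCommand.HSET and the additional key "HASH_NAME", every Tuple2 is stored as a field/value pair inside the Redis hash HASH_NAME (field = name, value = count). A minimal sketch for inspecting the result with the Jedis client that the Bahir connector already pulls in; the class is hypothetical, and the host and hash name simply mirror the example above:

package com.wfg.flink.connector.redis;

import java.util.Map;
import redis.clients.jedis.Jedis;

/**
 * Hypothetical helper for checking what the RedisSink wrote.
 */
public class RedisResultChecker {
    public static void main(String[] args) {
        try (Jedis jedis = new Jedis("127.0.0.1", 6379)) {
            // Read every field/value pair from the hash the sink writes to
            Map<String, String> counts = jedis.hgetAll("HASH_NAME");
            counts.forEach((name, count) -> System.out.println(name + " -> " + count));
        }
    }
}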
package com.wfg.flink.connector.mongodb;

import cn.hutool.core.date.DateUtil;
import com.alibaba.fastjson2.JSONObject;
import com.wfg.flink.connector.dto.KafkaPvDto;
import com.wfg.flink.connector.redis.RedisExampleMapper;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.mongodb.source.MongoSource;
import org.apache.flink.connector.mongodb.source.enumerator.splitter.PartitionStrategy;
import org.apache.flink.connector.mongodb.source.reader.deserializer.MongoDeserializationSchema;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.bson.BsonDocument;

import static com.wfg.flink.connector.constants.Constants.MONGO_TEST_PV_COLLECTION;

/**
 * Counts visits per user name.
 *
 * @author wfg
 */
public class MongoAllNameRedisCounts {
    public static void main(String[] args) throws Exception {
        String startTime = DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss");
        System.out.println("StartTime:" + startTime);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing with a 30s interval
        env.enableCheckpointing(30000);
        // Set the checkpointing mode
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Minimum pause between two checkpoints
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        // Allow at most one concurrent checkpoint
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Use the HashMap state backend
        env.setStateBackend(new HashMapStateBackend());
        env.setParallelism(10);

        // Configure the MongoDB source
        MongoSource<String> mongoSource = MongoSource.<String>builder()
                .setUri("mongodb://root:123456@127.0.0.1:27017,127.0.0.1:27018,127.0.0.1:27019/admin?replicaSet=rs0&authSource=admin")
                .setDatabase("sjzz")
                .setCollection(MONGO_TEST_PV_COLLECTION)
                .setFetchSize(2048)
                // .setLimit(1000)
                .setNoCursorTimeout(true)
                .setPartitionStrategy(PartitionStrategy.SINGLE)
                .setPartitionSize(MemorySize.ofMebiBytes(64))
                // .setSamplesPerPartition(10)
                .setDeserializationSchema(new MongoDeserializationSchema<>() {
                    @Override
                    public String deserialize(BsonDocument document) {
                        return document.toJson();
                    }

                    @Override
                    public TypeInformation<String> getProducedType() {
                        return BasicTypeInfo.STRING_TYPE_INFO;
                    }
                })
                .build();

        // Create the MongoDB data stream
        DataStream<String> sourceStream = env.fromSource(mongoSource, WatermarkStrategy.noWatermarks(), "kafka Mongo Source");

        // Transform the data, extracting the user name as the key
        DataStream<Tuple2<String, Integer>> nameCountStream = sourceStream
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        KafkaPvDto data = JSONObject.parseObject(value, KafkaPvDto.class);
                        return Tuple2.of(data.getUserName(), 1);
                    }
                })
                .keyBy(value -> value.f0)
                .reduce(new ReduceFunction<>() {
                    @Override
                    public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });

        // Write the running counts to Redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build();
        RedisSink<Tuple2<String, Integer>> sink = new RedisSink<>(conf, new RedisExampleMapper());
        nameCountStream.addSink(sink);

        // Run the job and print the elapsed time
        env.execute("Flink MongoDB Name Count");
        System.out.println("-----------------------------------");
        System.out.println("startTime: " + startTime);
        System.out.println("EndTime: " + DateUtil.format(DateUtil.date(), "yyyy-MM-dd HH:mm:ss"));
    }
}
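The job deserializes each MongoDB document's JSON into KafkaPvDto and reads the collection name from a Constants class; neither is shown in this post. A minimal sketch of the DTO, assuming only the userName field that the job actually reads (the real class may carry more fields):

package com.wfg.flink.connector.dto;

/**
 * Hypothetical shape of the PV record; only userName is required by the job above.
 * fastjson2 populates it through the default constructor and the setter.
 */
public class KafkaPvDto {
    private String userName;

    public String getUserName() {
        return userName;
    }

    public void setUserName(String userName) {
        this.userName = userName;
    }
}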
Note:
For generating test data in bulk, see: Batch Data Generation
For middleware deployment, see: