The previous article implemented a simple pipeline that synced MySQL data into Delta with Flink; this one does the same for Kafka.
First, set the checkpoint interval. Checkpointing needs to be enabled because the Delta sink commits data to the table on checkpoints.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
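If stricter guarantees are wanted, the checkpointing mode can be set explicitly as well; a minimal sketch, keeping the 5-second interval and assuming exactly-once semantics (this overload is standard Flink API, not something shown in the original post):
//Assumption: exactly-once checkpoints every 5 seconds
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);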
Next, build the Kafka source. It needs the topic and the consumer properties (broker address), and the key and value deserializers must be specified.
//Build the consumer properties: here "consumer" is the Kafka broker address, and the topic name is reused as the consumer group id
public static Properties getProperties(String topic, String consumer){
    Properties properties = new Properties();
    properties.put("bootstrap.servers", consumer);
    properties.put("group.id", topic);
    //Keys and values are plain strings, so use the String deserializer for both
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return properties;
}
//A single line then creates the Kafka source
FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
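For reference, a minimal sketch of how the two pieces above fit together; the topic name and broker address below are placeholders, not values from the original post:
//Hypothetical topic and broker address, for illustration only
String topic = "user-score-topic";
String brokers = "localhost:9092";
Properties properties = getProperties(topic, brokers);
FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);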
As with MySQL, when ingesting data into the lake with Flink, the Kafka records have to be converted to Flink's RowType.
This is done with RowType.RowField; here my Kafka messages consist of four fields.
public static RowType getKafkaRowType(){
return new RowType(Arrays.asList(
new RowType.RowField("userId", new VarCharType(VarCharType.MAX_LENGTH)),
new RowType.RowField("stationTime", new VarCharType(VarCharType.MAX_LENGTH)),
new RowType.RowField("score", new IntType()),
new RowType.RowField("localTime", new VarCharType(VarCharType.MAX_LENGTH))));
}
Use the DeltaSink.forRowData() method from the delta-flink dependency: pass the lake path, the Hadoop configuration, and the rowType to build the sink.
//Hadoop configuration used by the Delta sink; write Parquet files with Snappy compression
public static org.apache.hadoop.conf.Configuration getHadoopConf() {
    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    conf.set("parquet.compression", "SNAPPY");
    return conf;
}
//Build a DeltaSink that writes RowData to the given Delta table path
public static DeltaSink<RowData> createDeltaSink(String deltaTablePath, RowType rowType) {
    return DeltaSink
            .forRowData(
                    new Path(deltaTablePath),
                    getHadoopConf(),
                    rowType)
            .build();
}
The source side produces String records while the sink side expects RowData, so a map function performs the conversion: use fastjson to extract each field from the JSON, build a Flink Row, and then turn it into RowData with a converter.
//Provided by the flink-table-runtime-blink_2.12 dependency
public static final DataFormatConverters.DataFormatConverter<RowData, Row> CONVERTER =
DataFormatConverters.getConverterForDataType(
TypeConversions.fromLogicalToDataType(getKafkaRowType())
);
public static RowData kafkaJsonToRowData(String line){
    //Parse the JSON message once, then pull out each field
    JSONObject json = JSON.parseObject(line);
    String userId = json.getString("user_id");
    String stationTime = json.getString("station_time");
    Integer score = json.getInteger("score");
    String localTime = json.getString("local_time");
    Row row = Row.of(userId, stationTime, score, localTime);
    return CONVERTER.toInternal(row);
}
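For clarity, a small example of the kind of message this conversion expects; the values are invented, but the field names match what the Python producer at the end of the post sends:
//Hypothetical sample message, values invented for illustration
String sample = "{\"user_id\":\"42\",\"station_time\":\"35\",\"score\":87,\"local_time\":\"2022-05-01 12:00:00\"}";
RowData rowData = kafkaJsonToRowData(sample);
//rowData now holds (userId, stationTime, score, localTime) in the order declared by getKafkaRowType()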
Finally, add the source and the sink to the env and execute.
env.addSource(source)
.setParallelism(2)
.map(FlinkDeltaUtil::kafkaJsonToRowData)
.sinkTo(FlinkDeltaUtil.createDeltaSink(lakePathNoPartition, FlinkDeltaUtil.getKafkaRowType()))
.setParallelism(1);
env.execute("Flink-Read-Kafka-Json-To-Delta");
Attached is a Python script for sending messages to Kafka; specify the topic and the Kafka server address and it will produce test data.
# coding=utf-8
import json
import random
import time
import codecs
from kafka import KafkaProducer

log_file = "/opt/access.log"
topic = 'topic'
kafka_server = 'ip'
user_count = 100
log_count = 300
ip = [127, 156, 222, 105, 24, 192, 153, 127, 31, 168, 32, 10, 82, 77, 118, 228]
status_code = ("200",)
url_count = 10
content_uri_pattern = '/nanHu/contents/{content_id}?user_id={user_id}'


# Generate a timestamp
def sample_time():
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


# Randomly pick users
def sample_users():
    # Assume 10 million registered users, with 100K-500K visiting daily
    all_users = range(1, user_count)
    user_cont = random.randint(10, 50)
    users = random.sample(all_users, user_cont)
    return users


# Randomly generate an IP
def sample_ip():
    random_ip = random.sample(ip, 4)
    return ".".join([str(item) for item in random_ip])


# Randomly pick a status code
def sample_status_code():
    return random.sample(status_code, 1)[0]


# Randomly generate a dwell time
def sample_station_time():
    return random.randint(15, 60)


# Randomly generate a score
def sample_score():
    return random.randint(30, 100)


def generate_log(count=10):
    time_str = sample_time()
    users = sample_users()
    print('Start generate [%s] log..' % log_count)
    producer = KafkaProducer(bootstrap_servers=kafka_server)
    with codecs.open(log_file, "a+", encoding='utf-8') as f:
        while count >= 1:
            # Randomly choose a user
            user_id = random.choice(users)
            sample_content_id = str(random.randint(0, url_count))
            ret_url = content_uri_pattern.format(
                content_id=sample_content_id,
                user_id=user_id
            )
            query_log = u"{ip} [{local_time}] \"GET {url} HTTP/1.1\" {status_code}".format(
                url=ret_url,
                ip=sample_ip(),
                status_code=sample_status_code(),
                local_time=time_str
            )
            f.write(query_log + u'\n')
            event_log = {
                "station_time": str(sample_station_time()),
                "user_id": user_id,
                "score": sample_score(),
                "local_time": time_str
            }
            producer.send(topic, json.dumps(event_log).encode('utf-8'))
            if count % 100 == 0:
                print('generate msgs: [%s]' % count)
            count = count - 1
    producer.close()
    print('Finish generate log [%s]' % log_count)


if __name__ == '__main__':
    try:
        generate_log(log_count)
    except Exception as e:
        print(str(e))
        exit(-1)
Repository: https://gitee.com/zhiling-chen/demo-mysql-flink-delta