当前位置:   article > 正文

使用flink将kafka数据同步到delta数据湖中_flink-connector-kafka_2.12

flink-connector-kafka_2.12

使用flink将kafka数据同步到delta中

1.简介

上篇文章简单实现了mysql数据使用flink同步到delta中,现在写一个关于kafka

  • Flink 1.13.0
  • delta 1.0.0
  • flink-connector-kafka_2.12 1.13.0

2.Kafka入湖代码

2.1 Flink运行环境

设置下checkpoint的时间大小

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
  • 1
  • 2

2.2 构建KafkaSouce

构建kafkasource,需要用到topic和consumer,指定key和value的序列化

public static Properties getProperties(String topic, String consumer){
    Properties properties = new Properties();
    properties.put("bootstrap.servers", consumer);
    properties.put("group.id", topic);
    properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    return properties;
}
//一行代码就可以实现
FlinkKafkaConsumer<String> source = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), properties);
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

2.3 Kafka消息的Schema转变成Flink-RowType

同mysql一样,使用flink将数据入湖时,需要将kafka的数据格式进行转换成Flink的RowType

通过RowType.RowField实现,这里我的kafka消息由四个字段的数据组成

public static RowType getKafkaRowType(){
    return new RowType(Arrays.asList(
        new RowType.RowField("userId", new VarCharType(VarCharType.MAX_LENGTH)),
        new RowType.RowField("stationTime", new VarCharType(VarCharType.MAX_LENGTH)),
        new RowType.RowField("score", new IntType()),
        new RowType.RowField("localTime", new VarCharType(VarCharType.MAX_LENGTH))));
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

2.4 构建Sink

使用delta-flink依赖中的DeltaSink
.forRowData()方法,指定lakePath,hadoop-conf,rowType,生成Sink

public static org.apache.hadoop.conf.Configuration getHadoopConf() {
    org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
    conf.set("parquet.compression", "SNAPPY");
    return conf;
}

public static DeltaSink<RowData> createDeltaSink(String deltaTablePath, RowType rowType) {
    return DeltaSink
        .forRowData(
        new Path(deltaTablePath),
        getHadoopConf(),
        rowType).build();
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2.5 String转为RowData

Source端使用String类型,Sink端使用RowData类型,所以需要使用Map函数进行一次转换。

使用fastJson获取每个字段的值,然后变成Flink row类型,最后使用convertor转换为RowData

//存在于flink-table-runtime-blink_2.12依赖中 
public static final DataFormatConverters.DataFormatConverter<RowData, Row> CONVERTER =
    DataFormatConverters.getConverterForDataType(
    TypeConversions.fromLogicalToDataType(getKafkaRowType())
);
public static RowData kafkaJsonToRowData(String line){
    String userId = JSON.parseObject(line).getString("user_id");
    String stationTime = JSON.parseObject(line).getString("station_time");
    Integer score = JSON.parseObject(line).getInteger("score");
    String localTime = JSON.parseObject(line).getString("local_time");
    Row row = Row.of(userId, stationTime, score, localTime);
    return CONVERTER.toInternal(row);
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13

2.6 执行

依次将source,sink放入env中执行即可

env.addSource(source)
    .setParallelism(2)
    .map(FlinkDeltaUtil::kafkaJsonToRowData)
    .sinkTo(FlinkDeltaUtil.createDeltaSink(lakePathNoPartition, FlinkDeltaUtil.getKafkaRowType()))
    .setParallelism(1);
env.execute("Flink-Read-Kafka-Json-To-Delta");
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

2.7 kafka脚本

附上一个向kafka发送消息的脚本,由python实现,指定topic和kafka-server的ip就可以发送

# coding=utf-8
import json
import random
import time
import codecs
from kafka import KafkaProducer


log_file = "/opt/access.log"
topic = 'topic'
kafka_server = 'ip'

user_count = 100
log_count = 300
ip = [127, 156, 222, 105, 24, 192, 153, 127, 31, 168, 32, 10, 82, 77, 118, 228]
status_code = ("200",)
url_count = 10
content_uri_pattern = '/nanHu/contents/{content_id}?user_id={user_id}'


# 随机生成时间
def sample_time():
    return time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())


# 随机生成用户
def sample_users():
    # 假设有1000W注册用户,每日访问用户10W-50W人
    all_users = range(1, user_count)
    user_cont = random.randint(10, 50)
    users = random.sample(all_users, user_cont)
    return users


# 随机生成ip
def sample_ip():
    random_ip = random.sample(ip, 4)
    return ".".join([str(item) for item in random_ip])


# 随机生成状态码
def sample_status_code():
    return random.sample(status_code, 1)[0]


# 随机生成停留时间
def sample_station_time():
    return random.randint(15, 60)


# 随机生成分数
def sample_score():
    return random.randint(30, 100)


def generate_log(count=10):
    time_str = sample_time()
    users = sample_users()
    print('Start generate [%s] log..' % log_count)
    producer = KafkaProducer(bootstrap_servers=kafka_server)

    with codecs.open(log_file, "a+", encoding='utf-8') as f:

        while count >= 1:
            # 随机选择一个用户
            user_id = random.choice(users)
            sample_content_id = str(random.randint(0, url_count))

            ret_url = content_uri_pattern.format(
                content_id=sample_content_id,
                user_id=user_id
            )
            query_log = u"{ip} [{local_time}] \"GET {url} HTTP/1.1\" {status_code}".format(
                url=ret_url,
                ip=sample_ip(),
                status_code=sample_status_code(),
                local_time=time_str
            )
            f.write(query_log + u'\n')

            event_log = {
                "station_time": str(sample_station_time()),
                "user_id": user_id,
                "score": sample_score(),
                "local_time": time_str
            }

            producer.send(topic, json.dumps(event_log).encode('utf-8'))
            if count % 100 == 0:
                print('generate msgs: [%s]' % count)
            count = count - 1

    producer.close()
    print('Finish generate log [%s]' % log_count)


if __name__ == '__main__':
    try:
        generate_log(log_count)
    except Exception as e:
        print(str(e))
        exit(-1)
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102

3. 源码

仓库地址 (https://gitee.com/zhiling-chen/demo-mysql-flink-delta)

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/blog/article/detail/44078
推荐阅读
  • 文章浏览阅读1w次,点赞32次,收藏100次。我们知道,kafka中每个topic被划分为多个分区,那么生产者将消息发送到topic时,具体追加到哪个分区呢?这就是所谓的分区策略,Kafka为我们提供了默认的分区策略,同时它也支持自定义分区... [详细]

  • 这里我们的配置是选择的是KRaft,因为Kafka官方已经计划在Kafak中移除Zookeeper。对于UI配置项没什么特别要说的,这里只是提一下,注意这里的docker-compose.yml中environment的写法,和上面的Kaf... [详细]

  • Flink提供了丰富的客户端操作来提交任务和与任务进行交互。下面主要从Flink命令行、SQLClient和Web五个方面进行整理。在Flink安装目录的bin目录下可以看到flink和等文件,这些都是客户端操作的入口。Flink客户端操作... [详细]

  • kafka分布式的情况下,如何保证消息的顺序消费?_kafka顺序消费kafka顺序消费目录一、什么是分布式二、kafka介绍三、消息的顺序消费四、如何保证消息的顺序消费 一、什么是分布式分布式是指将计算任务分散到多个计算节点上进... [详细]

  • 在讲Kafka日志源码之前,我们要先对Kafka日志有一个大体的认识这也是阅读源码的关键,一步一步来前面我们聊到了Kafka的生产端的整体架构可以看到,我们每一个Topic都可以分为多个Partition,而每一个Partition对应着一... [详细]

  • 更准确地说,数据记录中的value被解释为同一key的最后一个value的UPDATE,如果有这个key(如果不存在相应的key,则该更新被视为INSERT)。总之,这段代码的作用是通过Kafka连接器创建两个表,并将"pageviews"... [详细]

  • Flink 输出至 Elasticsearch
    Flink输出至Elasticsearch。Flink输出至Elasticsearch【1】引入pom.xml依赖<dependency><groupId>org.apache.flink</groupId&g... [详细]

  • 都是从中scan出来的。而这个myTable又是我们注册进去的。问题就是有哪些方式可以注册Table。类似于上述的WordCount,指定一个文件系统fs,也可以是kafka等,还需要一些格式和Schema等。//将source注册到env... [详细]

  • Flink CDC、OGG、Debezium等基于日志开源CDC方案对比_flink cdc 2.4不同
    CDC的全称是,在广义的概念上,只要能捕获数据变更的技术,我们都可以称为CDC。我们目前通常描述的CDC技术主要面向数据库的变更,是一种用于捕获数据库中数据变更的技术。_flinkcdc2.4不同flinkcdc2.4不同先上一张图,后面再... [详细]

  • 状态精准一次是Flink流式计算引擎的一大特色,本章节从状态、状态后端、checkpoint算法逐步为读者展示了Flink状态机制设计的绝妙之处。深入理解Flink(二)FlinkStateBackendCheckpoint容错深入分析... [详细]

  • 2-1启动gateway之前先启动一个flinksession./bin/yarn-session.sh-d2-2启动命令:2-3查看日志观察是否启动成功:查看日志出现这个条信息就证明已经找到了flinksessionapplication... [详细]

  • Flink 的时间属性及原理解析
    FlinkAPI大体上可以划分为三个层次:处于,这三层中每一层都非常依赖于时间属性时间FlinkAPI。在这一层中因为封装方面原因,我们能够接触到时间地方不是很多,所以我们将重点放在底层和最上层Flink时间属性及原理解... [详细]

  • flink读取文件数据写到elasticsearch_flink写入esflink写入es前言es是大数据存储的必备中间件之一,通过flink可以读取来自日志文件,kafka等外部数据源的数据,然后写入es中,本篇将通过实例演示下完整的操... [详细]

  • 文章目录ElasticSearch是一个分布式的开源搜索和分析引擎,适用于所有类型的数据。ElasticSearch有着简洁的REST风格的API,以良好的分布式特性、速度和可扩展性而闻名,在大数据领域应用非常广泛。Flink为Elasti... [详细]

  • Flink数据下沉到Elasticsearch示例简介  当初做课程设计的时候,找到的flink接入elasticsearch的文章除了flink的文档示例之外版本都挺老的,所以自己按照flink的文档把原来的改造了一下。现再更新最新版本... [详细]

  • Elasticsearch连接器的使用与JDBC连接器非常相似,写入数据的模式同样是由创建表的DDL中是否有主键定义决定的。​除了所有ROW类型的字段(对应着HBase中的family),表中还应有一个原子类型的字段,它就会被识别为HBas... [详细]

  • 模式【1】;生成,然后转化为JobGraph;【2】依次启动三者都服从分布式协同一致的策略;将JobGraph转化为,然后转化为物理执行任务Execution,然后进行deploydeploy过程会向请求slot,如果有直接deploy到对... [详细]

  • 除了使用已有的所定义的数据格式类型之外,用户也可以自定义实现,来满足的不同的数据类型定义需求。Flink提供了可插拔的让用户将自定义的注册到Flink类型系统中。如下代码所示只需要通过实现接口,返回相应的类型信息。通过@TypeInfo注解... [详细]

  • 在分布式系统中,消息的丢失、错乱不可避免,这些问题会在分布式系统的组件中引入不一致状态,如果没有定消息,那么组件无法从这些不一致状态中恢复。作为分布式数据处理框架,Flink提供了支撑流计算和批计算的接口,同在此基础之上抽象出不同的... [详细]

  • 大家都应该清楚Task和StreamTask两个概念,Task是直接受TaskManager管理和调度的,而Task又会调用StreamTask,而StreamTask中真正封装了算子的处理逻辑。在run()方法中,首先将反序列化后的数据封... [详细]

相关标签
  

闽ICP备14008679号