当前位置:   article > 正文

【数据平台】之Kafka+Minio数据埋点大数据利器_把kafka连接minio

把kafka连接minio

商城埋点数据,即客户在商城的购买行为的各种浏览数据,是对客户购买行为分析的重要资产。在大数据领域,可以通过收集客户行为数据,分析客户行为规律,进而指导商城各种营销活动制定。例如淘宝和京东展示的各种维度分析报表,可展现客户购买偏好,浏览商品偏好,地域性偏好等;另一个重要应用便是商城“商品推荐”,商城商品推荐便时对客户浏览和购买行为综合分析的结果,在亚马逊“商品推荐”能为企业商城来带40%左右的营收。基于此,对客户行为数据保存就显得至关重要。

流程图

kafka+minio+connector s3
1、通过kafka实时接收商城端行为(埋点)数据;
2、通过kafka插件connect-s3将数据转换为文本格式数据;
3、数据存储到文件存储Minio集群;
以上是通过3个步骤,便可将商城的客户行为数据,最终转换落地存储到文件系统MINIO集群。

1、Kafka部署

以kafka_2.12-2.4.x版本为例,先解压到/appuser/kafka_2.12-2.4.x目录,并说明kafka部署,部署集群节点为:

192.168.0.1
192.168.0.2
192.168.0.3
  • 1
  • 2
  • 3

1、系统参数修改

  • vi /etc/sysctl.conf增加如下配置项
 vm.swappiness=5
 net.core.wmem_default=256960
 net.core.rmem_default=256960
 vm.max_map_count=262144

 sysctl -p 重启使其生效;
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • vi /proc/sys/vm/dirty_background_ratio
    设置值为5
  • vi /etc/security/limits.conf
 *soft nofile 204800
 *hard nofile 204800
 *soft nproc 204800
 *hard nproc 204800
  • 1
  • 2
  • 3
  • 4

2、找到config目录下的server.properties文件

broker.id=1                     #三台机器分别为1、2、3
zookeeper.connect=192.168.0.1,192.168.0.2,192.168.0.3
listeners=PLAINTEXT://host:9092  #host为当前服务的实际IP地址
num.partitions=2                 #默认分区数为1,可根据实际调整
log.dirs=/appuser/data
num.network.threads=3
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
log.flush.interval.messages=15000
log.flush.interval.ms=3000
replica_lag_max_messages=7000
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

3、配置启动jvm参数
修改vi kafka-server-start.sh

export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G -server -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=70 -XX:G1RSetUpdatingPauseTimePercent=5 -XX:MAXGCPauseMillis=500 -XX:ParallelGCThreads=16 -XX:ConcGCThreads=16"
  • 1

其他可选参数:

 -XX: +PrintGCDetails
 -XX: +PrintGCDatestamps
 XX: +PrintHeapAtGC
 -XX: +PrintTenuringDistribution
 -XX: +PrintGCApplicationStoppedTime
 -XX: +PrintPromotionFailure
 #-XX: PrintFLSStatistics=1
 #-Xloggc: /applog/gc.log
 #-XX:HeapDumpPath:/heapdump
 -XX: +UseGCLogFileRotation
 -XX: NumberOfGCLogFiles=10
 -XX: GCLogFileSize=10M
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12

4、修改默认日志文件输出路径
配置kafka-run-class.sh,在文件最前面增加内容:LOG_DIR=/appuser/logs
修改日志级别:log4j.properties

1.1、启动

执行命令
./bin/kafka-server-start.sh -daemon config/server.properties

1.2、常用操作
  • 进入任意节点,创建topic
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --create --topic s3-minio --partitions 5 --replication-factor 2
    topic名称:s3-minio
    分区数:5
    副本数:2
  • 查看topic
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --list #查看集群可用的topics
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --describe #查看topics详情
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --describe --topic s3-minio #查看具体topic详情
  • 修改topic
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --alter --partitions n --topic s3-minio #修改s3-minio的分区数
    ./bin/kafka-reassian-partitions.sh --zookeeper zk集群地址 --reassignment-json-file decrease-replica-factor.json --execute #修改topic分区副本数
	decrease-replica-factor.json的json格式如下

	{"version":1,
	"partitions":[
		{"topic":"s3-minio","partition":0,"replicas":[1,2]},
		{"topic":"s3-minio","partition":1,"replicas":[1,3]}
		]
	}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 删除topic
    ./bin/kafka-topics.sh --bootstrap-server 192.168.0.1:9092 --delete --topic s3-minio
  • Producer发送消息
    ./bin/kafka-console-producer.sh --broker-list 192.168.0.1:9092 --topic s3-minio
  • Consumer消费消息
    ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic s3-minio
    #从开始处进行消费消息
    ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.0.1:9092 --topic s3-minio --from-beginning

2、connect-s3插件部署

下载confluent-connect-s3,下载地址地址,下载confluentinc-kafka-connect-s3-5.5.x.zip完成后进行解压
mkdir -r /appuser/kafka_2.12-2.4.x/plugins 创建文件夹
将文件解压到/appuser/kafka_2.12-2.4.x/plugins/kafka-connect-s3-5.5.x
1、配置connect-distributed-s3.properties文件
找到connect-distributed.properties文件,执行 cp connect-distributed.peroperties connect-distributed-s3.properties命令,拷贝一份文件。

vi connect-distributed-s3.properties

bootstrap.servers=192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092
plugin.path=/appuser/kafka_2.12-2.4.x/plugins/kafka-connect-s3-5.5.x
  • 1
  • 2
  • 3
  • 4
2.1 启动connect-s3

bin/kafka-distributes.sh -daemon config/connect-distributed-s3.properties

2.2 部署验证connect-s3

应能正常访问:http://host:8083/connectors

其中host可以是任意节点IP地址:192.168.0.1,192.168.0.2,192.168.0.3

2.3 创建s3的sink插件

为了完成kafka和minio之间的数据转换,需要创建s3的sink插件,通过插件将实时流数据转换为文本数据存储到文件系统,参考资料详细参考资料,以下以JSON格式埋点数据为例:

  • 创建s3-sink
    最大tasks.max和 connector分区数保持一致,192.168.0.1为kafka服务器IP地址,可以是任意节点的IP地址。
    f5-host:为minio集群的F5地址或者ng的地址;
    flush.size:标识多少数据量生成一个文件;
    aws.access.key.id:为minio集群设置用户名;
    aws.secret.access.key:为minio集群设置密码;
    s3.bucket.name:为minio集群设置的bucket名称;
 curl -X POST -H "Content-Type:application/json" \--data'{
"name":s3-sink",
"config":{
 "name":"s3-sink",
 "connector.class":"io.confluent.connects3.S3SinkConnector",
 "tasks.max"5",
 "topics":"s3-minio",
 "topics.dir":"",
 "s3.bucket.name":"s3-bucket",
 "s3.part.size:"26214400",
 "flush.size":"4500",
 "store.url":"http://f5-host:9000",
 "aws.access.key.id":"minio",
 "aws.secret.access.key":"minio",
 "key.converter":"org.apache.kafka.connect.storage.StringConverter",
 "key.converter.schemas.enable":"false",
 "value.converter":"org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable":"false",
 "storage.class":"io.confluent.connect.s3.storage.S3Storage",
 "schema.compatibility":"NONE",
 "schemas.enable":"false",
 "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
 "partitioner.class": "io.confluent.connect.storage.partitioner.Time.BasedPartitioner",
 "locale":"zh-CN",
 "timezone":"Asia/Shanghai",
 "timestamp.extractor":"Record",
 "path.format":"YYYY-MM-dd",
 "partition.duration.ms":"600000"
}
}'\http://192.168.0.1:8083/connectors
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 修改s3-sink
建议最tasks max和 connector分区数保持一致,f5-host为minio集群的F5地址或者ng的地址。

 curl -X PUT -H "Content-Type:application/json" \--data'{
 "name":"s3-sink",
 "connector.class":"io.confluent.connects3.S3SinkConnector",
 "tasks.max"5",
 "topics":"s3-minio",
 "topics.dir":"",
 "s3.bucket.name":"s3-bucket",
 "s3.part.size:"26214400",
 "flush.size":"4500",
 "store.url":"http://f5-host:9000",
 "aws.access.key.id":"minio",
 "aws.secret.access.key":"minio",
 "key.converter":"org.apache.kafka.connect.storage.StringConverter",
 "key.converter.schemas.enable":"false",
 "value.converter":"org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable":"false",
 "storage.class":"io.confluent.connect.s3.storage.S3Storage",
 "schema.compatibility":"NONE",
 "schemas.enable":"false",
 "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
 "partitioner.class": "io.confluent.connect.storage.partitioner.Time.BasedPartitioner",
 "locale":"zh-CN",
 "timezone":"Asia/Shanghai",
 "timestamp.extractor":"Record",
 "path.format":"YYYY-MM-dd",
 "partition.duration.ms":"600000"
}'\http://192.168.0.1:8083/connectors/s3-sink/config|jq
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 删除s3-sink
    curl -X DELETE http://host:8083/connectors/s3-sink
  • 检查s3-sink
    查看工作状态:http://host:8083/connectors/s3-sink/status
    查看配置:http://host:8083/connectors/s3-sink/config

通过以上部署步骤,即可完成kafka,kafka-connect-s3插件部署;

3、MINIO部署

请参考我的另一篇文章:【商城】Minio+ImgProxy商城图片一站式处理

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/盐析白兔/article/detail/712023
推荐阅读
相关标签
  

闽ICP备14008679号