Mall tracking (event) data, i.e. the browsing and purchasing behavior records that customers generate in an online mall, is an important asset for analyzing purchase behavior. In the big-data domain, collecting customer behavior data and analyzing behavioral patterns can guide the design of the mall's marketing activities. For example, the multi-dimensional analysis reports shown by Taobao and JD.com reveal customers' purchasing preferences, product-browsing preferences, regional preferences, and so on. Another important application is product recommendation: a mall's "recommended products" are the result of jointly analyzing customers' browsing and purchase behavior, and at Amazon product recommendations are reported to bring in roughly 40% of revenue. For these reasons, persisting customer behavior data is critical.
1. Receive the mall's behavioral (tracking) data in real time through Kafka;
2. Convert the data into text-format files with the Kafka connect-s3 plugin;
3. Store the data on a MinIO file-storage cluster.
With these three steps, the mall's customer behavior data is converted and finally persisted to the MinIO file-storage cluster.
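For reference, a single tracking event written to Kafka might look like the hypothetical JSON payload below; the field names (userId, eventType, itemId, and so on) are illustrative assumptions, not a schema from the original article:
{"userId":"u10001","eventType":"view","itemId":"sku-8831","channel":"app","timestamp":1595751600000}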
Taking kafka_2.12-2.4.x as an example, first extract it to the /appuser/kafka_2.12-2.4.x directory. The Kafka deployment is described below; the cluster nodes are:
192.168.0.1
192.168.0.2
192.168.0.3
1. Modify system parameters
vm.swappiness=5
net.core.wmem_default=256960
net.core.rmem_default=256960
vm.max_map_count=262144
Append the lines above to /etc/sysctl.conf, then run sysctl -p to make them take effect;
Also raise the open-file and process limits in /etc/security/limits.conf:
* soft nofile 204800
* hard nofile 204800
* soft nproc 204800
* hard nproc 204800
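These limits apply to new sessions; after logging in again they can be verified with the following commands (both should report 204800):
ulimit -n    # max open files
ulimit -u    # max user processes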
2. Edit the server.properties file in the config directory
broker.id=1 #set to 1, 2, 3 on the three nodes respectively
zookeeper.connect=192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
listeners=PLAINTEXT://host:9092 #host is this node's actual IP address
num.partitions=2 #default is 1; adjust as needed
log.dirs=/appuser/data
num.network.threads=3
socket.send.buffer.bytes=1048576
socket.receive.buffer.bytes=1048576
log.flush.interval.messages=15000
log.flush.interval.ms=3000
replica.lag.max.messages=7000 #removed in newer Kafka versions in favor of replica.lag.time.max.ms
3. Configure JVM startup parameters
Edit kafka-server-start.sh:
export KAFKA_HEAP_OPTS="-Xmx4G -Xms4G -server -XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=70 -XX:G1RSetUpdatingPauseTimePercent=5 -XX:MaxGCPauseMillis=500 -XX:ParallelGCThreads=16 -XX:ConcGCThreads=16"
Other optional parameters:
-XX:+PrintGCDetails
-XX:+PrintGCDateStamps
-XX:+PrintHeapAtGC
-XX:+PrintTenuringDistribution
-XX:+PrintGCApplicationStoppedTime
-XX:+PrintPromotionFailure
#-XX:PrintFLSStatistics=1
#-Xloggc:/applog/gc.log
#-XX:HeapDumpPath=/heapdump
-XX:+UseGCLogFileRotation
-XX:NumberOfGCLogFiles=10
-XX:GCLogFileSize=10M
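Note that the GC log rotation flags only take effect together with -Xloggc. Appending them to KAFKA_HEAP_OPTS could look like this (the log path /applog/gc.log is taken from the commented option above):
export KAFKA_HEAP_OPTS="$KAFKA_HEAP_OPTS -Xloggc:/applog/gc.log -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=10M"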
4. Change the default log output path
Edit kafka-run-class.sh and add at the very top of the file: LOG_DIR=/appuser/logs
To adjust log levels, edit log4j.properties.
Run the startup command:
./bin/kafka-server-start.sh -daemon config/server.properties
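If the broker starts cleanly, the topic used later by the S3 sink can be created up front; the partition and replication counts below are assumptions chosen to match the reassignment example that follows, not values from the original article:
bin/kafka-topics.sh --create --zookeeper 192.168.0.1:2181 --topic s3-minio --partitions 2 --replication-factor 3
bin/kafka-topics.sh --describe --zookeeper 192.168.0.1:2181 --topic s3-minio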
To decrease the replication factor of the s3-minio topic, prepare a reassignment plan; the JSON format of decrease-replica-factor.json is as follows (it is applied with kafka-reassign-partitions.sh, as shown after the JSON):
{"version":1,
"partitions":[
{"topic":"s3-minio","partition":0,"replicas":[1,2]},
{"topic":"s3-minio","partition":1,"replicas":[1,3]}
]
}
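A minimal sketch of applying the plan, assuming ZooKeeper listens on the default port 2181 on the first node:
bin/kafka-reassign-partitions.sh --zookeeper 192.168.0.1:2181 --reassignment-json-file decrease-replica-factor.json --execute
bin/kafka-reassign-partitions.sh --zookeeper 192.168.0.1:2181 --reassignment-json-file decrease-replica-factor.json --verify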
Download confluent-connect-s3 from its download page; after confluentinc-kafka-connect-s3-5.5.x.zip has finished downloading, unzip it.
mkdir -p /appuser/kafka_2.12-2.4.x/plugins    # create the plugins directory
Unzip the archive into /appuser/kafka_2.12-2.4.x/plugins/kafka-connect-s3-5.5.x
1. Configure the connect-distributed-s3.properties file
Locate connect-distributed.properties and copy it: cp connect-distributed.properties connect-distributed-s3.properties
vi connect-distributed-s3.properties
bootstrap.servers=192.168.0.1:9092,192.168.0.2:9092,192.168.0.3:9092
plugin.path=/appuser/kafka_2.12-2.4.x/plugins/kafka-connect-s3-5.5.x
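The remaining distributed-worker settings can usually stay at the defaults that ship with connect-distributed.properties; for reference, the most relevant keys look like this (these are the stock default values, not settings from the original article):
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets
config.storage.topic=connect-configs
status.storage.topic=connect-status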
bin/connect-distributed.sh -daemon config/connect-distributed-s3.properties
The Connect REST endpoint http://host:8083/connectors should now respond, where host can be any of the node IP addresses: 192.168.0.1, 192.168.0.2, or 192.168.0.3.
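To confirm that the S3 plugin was loaded from plugin.path, the list of available connector plugins can also be queried (jq is optional and only formats the output):
curl -s http://192.168.0.1:8083/connector-plugins | jq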
To move data from Kafka to MinIO, create an S3 sink connector; the plugin turns the real-time stream into text files stored on the file system (see the Confluent kafka-connect-s3 documentation for details). The example below uses JSON-formatted tracking data:
curl -X POST -H "Content-Type: application/json" --data '{
  "name":"s3-sink",
  "config":{
    "name":"s3-sink",
    "connector.class":"io.confluent.connect.s3.S3SinkConnector",
    "tasks.max":"5",
    "topics":"s3-minio",
    "topics.dir":"",
    "s3.bucket.name":"s3-bucket",
    "s3.part.size":"26214400",
    "flush.size":"4500",
    "store.url":"http://f5-host:9000",
    "aws.access.key.id":"minio",
    "aws.secret.access.key":"minio",
    "key.converter":"org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable":"false",
    "value.converter":"org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable":"false",
    "storage.class":"io.confluent.connect.s3.storage.S3Storage",
    "schema.compatibility":"NONE",
    "schemas.enable":"false",
    "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
    "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
    "locale":"zh-CN",
    "timezone":"Asia/Shanghai",
    "timestamp.extractor":"Record",
    "path.format":"YYYY-MM-dd",
    "partition.duration.ms":"600000"
  }
}' http://192.168.0.1:8083/connectors
It is recommended to keep tasks.max consistent with the number of partitions of the connector's topic; f5-host is the F5 (or nginx) address in front of the MinIO cluster.
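Once created, the connector's running state can be checked through the Connect REST API:
curl -s http://192.168.0.1:8083/connectors/s3-sink/status | jq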
To update the configuration of an existing connector, PUT the bare config map to its config endpoint:
curl -X PUT -H "Content-Type: application/json" --data '{
  "name":"s3-sink",
  "connector.class":"io.confluent.connect.s3.S3SinkConnector",
  "tasks.max":"5",
  "topics":"s3-minio",
  "topics.dir":"",
  "s3.bucket.name":"s3-bucket",
  "s3.part.size":"26214400",
  "flush.size":"4500",
  "store.url":"http://f5-host:9000",
  "aws.access.key.id":"minio",
  "aws.secret.access.key":"minio",
  "key.converter":"org.apache.kafka.connect.storage.StringConverter",
  "key.converter.schemas.enable":"false",
  "value.converter":"org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable":"false",
  "storage.class":"io.confluent.connect.s3.storage.S3Storage",
  "schema.compatibility":"NONE",
  "schemas.enable":"false",
  "format.class":"io.confluent.connect.s3.format.json.JsonFormat",
  "partitioner.class":"io.confluent.connect.storage.partitioner.TimeBasedPartitioner",
  "locale":"zh-CN",
  "timezone":"Asia/Shanghai",
  "timestamp.extractor":"Record",
  "path.format":"YYYY-MM-dd",
  "partition.duration.ms":"600000"
}' http://192.168.0.1:8083/connectors/s3-sink/config | jq
The steps above complete the deployment of Kafka and the kafka-connect-s3 plugin.
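After the sink has been running for a while, objects should appear in the s3-bucket bucket on MinIO. With topics.dir empty and the time-based partitioner configured above, the object keys typically follow a topic/date/file layout; the example below is only an assumed illustration, with the actual date and offsets depending on the data:
s3-bucket/s3-minio/2020-07-26/s3-minio+0+0000000000.json
s3-bucket/s3-minio/2020-07-26/s3-minio+1+0000000000.json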
For the MinIO side, please refer to my other article: [Mall] Minio + ImgProxy one-stop processing for mall images.