赞
踩
创建一个具备kinesis读写权限的role
我这边是搭建了一台linux-centos的机器,当然也可以是其他的服务器
搭建的服务器必须安装java,且版本要大于8的。
Java Archive Downloads - Java SE 8u211 and later
我这是通过python 程序发送数据到log,也可以通过其他方式
https://www.python.org/downloads/windows/
非常简单,就不截图说明了
说下分片:分片一共有两种模式按需和预置
按需:Kinesis Data Streams 自动管理分片以提供必要的吞吐量。您只需为使用的实际吞吐量付费,而 Kinesis Data Streams 在工作负载上升或下降时自动适应它们的吞吐量需求,默认情况下,采用按需模式的数据流会自动扩展吞吐量,以便容纳最高每秒 200MiB 的流量和每秒 200,000 条记录的写入容量。如果流量超出容量,您的数据流将受到限制。
预置:必须指定数据流的分片数。数据流的总容量是其分片容量的总和。您可以根据需要增加或减少数据流中的分片数,并按小时费率支付分片数。
个人见解:如果流的数据小于1000条/s,那就选预置1-2分片,如果流的数据不确定,设备太多,一秒几千上万 那就选按需的。
Data Firehose 安装选项比较多,但是也比较简单没有啥。firehose不管分区,但是可以创建分区路径。所以glue table 需要分区查询就需要自己创建。
安装:
- {
- "cloudwatch.emitMetrics": true,
- "kinesis.endpoint": "kinesis.ap-northeast-1.amazonaws.com",
- "firehose.endpoint": "firehose.ap-northeast-1.amazonaws.com",
- “awsAccessKeyId”: “************",
- "awsSecretAccessKey": “***********",
- "flows": [
- {
- "filePattern": "/tmp/agent_data/*.log*",
- "kinesisStream": "poc_test_2",
- "partitionKeyOption": "RANDOM",
- "kinesis.roleArn": "arn:aws:iam::11111111:role/kinesis_DefaultRole"
- }
- ]
- }}
kinesis.endpoint :kinesis data stream endpoint ,如果不是EC2服务器,可能前面还需要加http:
firehose.endpoint :firehose endpoint ,如果不是EC2服务器,可能前面还需要加http:
flows 是数组 所以内部配置可以是多个。
filePattern:需要监测的的文件路径,比如上面就是监测/tmp/agent_data目录下所有后缀包含log的文件。
kinesisStream:kinesis data stream 流名称
partitionKeyOption:分片设置,默认随机
kinesis.roleArn :具有kinesis 读写权限的role
配置文件还有很多其他选项,这些算是常用的
其余命令可参考文档:使用 Kinesis 代理写入 Amazon Kinesis Data Streams - Amazon Kinesis Data Streams
命令:
python写的
- import logging
- import json
- logger = logging.getLogger(__name__)
- logger.setLevel(logging.INFO)
- handler = logging.FileHandler(‘test_data.log’)
- handler.setLevel(logging.INFO)
- formatter = logging.Formatter(‘%(message)s’)
- handler.setFormatter(formatter)
- logger.addHandler(handler)
- # data_json.json is sample json format data
- with open(‘data_json.json’,‘rb’) as f:
- js = json.loads(f.read().decode('utf-8'))
- # for 100 times and put it in the test_data.log file
- for num in range(1, 100):
- json_str = json.dumps(js)
- logger.info(json_str)

以下测试都是个人结果,可能存在问题。如有不同,可留言告知。
1.覆写或者修改某一行数据 agent 是否会重复发送
答:不会
如:比如先输入一条,被datastream消费走了,然后覆盖写入4条,结果是只会发送3条,因为第一条数据被覆盖,会报错:Error when processing current input file or when tracking its status. 覆盖写入会缺少一条数据。
2.创建新的流,是否会拉取同路径下其他的 agent log文件里的数据
答:不会
如:创建一个新的datastream,向旧的log文件(有数据)中插入新的数据,新的datastream并不会将旧的数据重新读取,而是只会将新插入的数据传入到datastream-firehose中。
但是如果新建的firehose会消费输入datastream内的所有数据,不管该datastream流是否被消费过,所以datastream有存储功能,数据被消费依然存在。
3.当agent服务意外停止后,继续输入数据,当agent启动后是否会传输这些数据
答:会
当服务停止后,启动服务后,agent会将监测路径下的log文件内的新内容发送到datastream。
意味着,当监测文件中的数据被读取后,将会被标记,将不会被agent二次读取,如果修改了已处理的数据,则会报错,而替换的数据将不会被传输,所以个人感觉agent监测的行号
4.将已传输完成的文件名修改后,agent是否会是认为新文件重新读取
答:不会,mv 修改log文件名称,agent 不会重新读取
5. 直接将数据文件拖到监测文件目录是否可以传输
答:会
6. datastream 会不会自动删除重复数据
答:不会
重复数据需要在consumer端自行处理
7.如果传输数据大于限制(比如一个分片,1s 大于1000条或1MB),数据是否会丢失
答:个人测试不会,只不过速度会慢,分批发送,但是查阅过外网资料,有人测试过会丢失数据,也有可能我测试的数据量不够大,所以尽量还是按照分片的限制来传输,如果数据量超过,添加分片数即可。
8.agent 能否在windows上使用
答:可以,我用的测试版本是EC2-Windows Server 2022 Datacenter
必须安装java8以上版本和.NET Framework 大于4.6。如果默认大于4.6 ,但是启动服务错误,就卸载然后重装就行了。
role权限需要加上个sts:AssumeRole
信任关系也需要加上该帐号:这个问题研究了1天,不知道你们需要不需要,就是把自己的帐号加到这个role的信任关系里,当然你的帐号是root的话 应该就不会有这个问题。
- {
- "Version": "2012-10-17",
- "Statement": [
- {
- "Sid": "",
- "Effect": "Allow",
- "Principal": {"Service": "ec2.amazonaws.com",
- "AWS": "arn:aws:iam::<deployment-acct-id>:user/用户名"},
- "Action": "sts:AssumeRole"
- }
- ]
- }
配置文件也跟linux不一样:
- {
- "Sources": [
- {
- "Id": "JsonLogSource", #source唯一标识
- "SourceType": "DirectorySource", #类型
- "RecordParser": "SingleLineJson", # 数据类型
- "Directory": "C:\\LogSource\\", #监测的文件路径
- "FileNameFilter": "*.log" #监测的文件格式
- }
- ],
- "Sinks": [
- {
- "Id": "TestKinesisStreamSink", #sink的唯一标识
- "SinkType": "KinesisStream", # skin的类型,是datastream
- "StreamName": "muji_win_stream", #datastream 名称
- "AccessKey":"*************",
- "SecretKey":"*************",
- "RoleARN":"arn:aws:iam::11111111:role/kinesis_DefaultRole",
- "Region": "ap-northeast-1"
- }
- ],
- "Pipes": [
- {
- "Id": "JsonLogSourceToLogStream",
- "SourceRef": "JsonLogSource", ## source id
- "SinkRef": "TestKinesisStreamSink" #sink id
- }
- ]
- }

启动的话,可以使用windows服务启动也可以使用命令
启动命令:Start-Service -Name AWSKinesisTap
无论是linux 还是windows 只要修改过配置文件都需要重新启动服务
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。