赞
踩
日志服务 SLS 是云原生观测与分析平台,为 Log、Metric、Trace 等数据提供大规模、低成本、实时的平台化服务,基于日志服务的便捷的数据接入能力,可以将系统日志、业务日志等接入 SLS 进行存储、分析;阿里云 Flink 是阿里云基于 Apache Flink 构建的大数据分析平台,在实时数据分析、风控检测等场景应用广泛。阿里云 Flink 原生支持阿里云日志服务 SLS 的 Connector,用户可以在阿里云 Flink 平台将 SLS 作为源表或者结果表使用。
阿里云 Flink SLS Connector 对于结构化的日志非常直接,通过配置,SLS 的日志字段可以与 Flink SQL 的 Table 字段列一一映射;然后仍有大量的业务日志并非完全的结构化,例如会将所有日志内容写入一个字段中,需要正则提前、分隔符拆分等手段才可以提取出结构化的字段,基于这个场景,本文介绍一种使用 SLS SPL 配置 SLS Connector 完成数据结构化的方案,覆盖日志清洗与格式规整场景。
日志数据往往是多种来源,多种格式,往往没有固定的 Schema,所以在数据处理前,需要先对数据进行清洗、格式规整,然后在进行数据分析;这类数据内容格式是不固定的,可能是 JSON 字符串、CSV 格式,甚至是不规则的 Java 堆栈日志。
Flink SQL 是一种兼容 SQL 语法的实时计算模型,可以基于 SQL 对结构化数据进行分析,但同时也要求源数据模式固定:字段名称、类型、数量是固定;这也是 SQL 计算模型的基础。
日志数据的弱结构化特点与 Flink SQL 结构化分析之间有着一道鸿沟,跨越这道鸿沟需要一个中间层来进行数据清洗、规整;这个中间层的方案有多种选择可以使用,下面会对不同的方案做简单对比,并提出一种新的基于 SLS SPL 的方案来轻量化完成解决数据清洗规整的工作。
下面是一条日志示例,日志格式较为复杂,既有 JSON 字符串,又有字符串与 JSON 混合的场景。其中:
- {
- "Payload": "{\"lastNotified\": 1705030483, \"serverUri\": \"http://test.alert.com/alert-api/tasks\", \"jobID\": \"44d6ce47bb4995ef0c8052a9a30ed6d8\", \"alertName\": \"alert-12345678-123456\", \"project\": \"test-sls-project\", \"projectId\": 123, \"aliuid\": \"1234567890\", \"alertDisplayName\": \"\\u6d4b\\u8bd5\\u963f\\u91cc\\u4e91\\u544a\\u8b66\", \"checkJobUri\": \"http://test.alert.com/alert-api/task_check\", \"schedule\": {\"timeZone\": \"\", \"delay\": 0, \"runImmediately\": false, \"type\": \"FixedRate\", \"interval\": \"1m\"}, \"jobRunID\": \"bf86aa5e67a6891d-61016da98c79b-5071a6b\", \"firedNotNotified\": 25161}",
- "TaskID": "bf86aa5e67a6891d-61016da98c79b-5071a6b-334f81a-5c38aaa1-9354-43ec-8369-4f41a7c23887",
- "TaskType": "ALERT",
- "__source__": "11.199.97.112",
- "__tag__:__hostname__": "iabcde12345.cloud.abc121",
- "__tag__:__path__": "/var/log/service_a.LOG",
- "caller": "executor/pool.go:64",
- "error": "CouldNotExecuteQuery : {\n \"httpCode\": 404,\n \"errorCode\": \"LogStoreNotExist\",\n \"errorMessage\": \"logstore k8s-event does not exist\",\n \"requestID\": \"65B7C10AB43D9895A8C3DB6A\"\n}",
- "requestURL": "/apis/autoscaling/v2beta1/namespaces/python-etl/horizontalpodautoscalers/cn-shenzhen-56492-1234567890123?timeout=30s",
- "ts": "2024-01-29 22:57:13"
- }
对于这样的日志提取出更有价值的信息需要进行数据清洗,首先需要提取重要的字段,然后对这些字段进行数据分析;本篇关注重要字段的提取,分析仍然可以在 Flink 中进行。
假设提取字段具体需求如下:
最终需要的字段列表如下,基于这样一个表格模型,我们可以便捷的使用 Flink SQL 进行数据分析。
实现这样的数据清洗,有很多种方法,这里列举几种基于 SLS 与 Flink 的方案,不同方案之间没有绝对的优劣,需要根据不同的场景选择不同的方案。
数据加工方案:在 SLS 控制台创建目标 Logstore,通过创建数据加工任务,完成对数据的清洗。
Flink 方案:将 error 和 payload 指定为源表字段,通过 SQL 正则函数、JSON 函数对字段进行解析,解析后的字段写入临时表,然后对临时表进行分析。
SPL 方案:在 Flink SLS Connector 中配置 SPL 语句,对数据进行清洗,Flink 中源表字段定义为清洗后的数据结构。
从上述三种方案的原理不难看出,在需要数据清洗的场景中,在 SLS Connector 中配置 SPL 是一种更轻量化的方案,具有轻量化、易维护、易扩展的特点。
在日志数据弱结构化的场景中,SPL 方案既避免了方案一中创建临时中间 Logstore,也避免了方案二中在 Flink 中创建临时表,在离数据源更近的位置进行数据清洗,在计算平台关注业务逻辑,职责分离更加清晰。
接下来以一段弱结构化日志为例,来介绍基于 SLS SPL 的能力来使用 Flink。为了便于演示,这里在 Flink 控制台配置 SLS 的源表,然后开启一个连续查询以观察效果。在实际使用过程中,仅需修改 SLS 源表配置,即可完成数据清洗与字段规整。
在 Logstore 可以可以开启扫描模式,SLS SPL 管道式语法使用丨分隔符分割不同的指令,每次输入一个指令可以即时查看结果,然后增加管道数,渐进式、探索式获取最终结果。
对上图中的 SPL 进行简单描述:
- * | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller
- | parse-json Payload
- | project-away Payload
- | parse-regexp error, 'CouldNotExecuteQuery : ({[\w":\s,\-}]+)' as errorJson
- | parse-json errorJson
- | parse-regexp "__tag__:__path__", '\/var\/log\/([\w\_]+).LOG' as serviceName
- | parse-regexp caller, '\w+/([\w\.]+):(\d+)' as fileName, fileNo
- | project-rename serviceHost="__tag__:__hostname__"
- | extend scheduleType = json_extract_scalar(schedule, '$.type')
- | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType, project
在阿里云 Flink 控制台创建一个空白的 SQL 的流作业草稿,点击下一步,进入作业编写。
在作业草稿中输入如下创建临时表的语句:
- CREATE TEMPORARY TABLE sls_input_complex (
- errorCode STRING,
- errorMessage STRING,
- fileName STRING,
- fileNo STRING,
- httpCode STRING,
- requestID STRING,
- scheduleType STRING,
- serviceHost STRING,
- project STRING,
- proctime as PROCTIME()
- ) WITH (
- 'connector' = 'sls',
- 'endpoint' ='cn-beijing-intranet.log.aliyuncs.com',
- 'accessId' = '${ak}',
- 'accessKey' = '${sk}',
- 'starttime' = '2024-02-01 10:30:00',
- 'project' ='${project}',
- 'logstore' ='${logtore}',
- 'query' = '* | project Payload, error, "__tag__:__path__", "__tag__:__hostname__", caller | parse-json Payload | project-away Payload | parse-regexp error, ''CouldNotExecuteQuery : ({[\w":\s,\-}]+)'' as errorJson | parse-json errorJson | parse-regexp "__tag__:__path__", ''\/var\/log\/([\w\_]+).LOG'' as serviceName | parse-regexp caller, ''\w+/([\w\.]+):(\d+)'' as fileName, fileNo | project-rename serviceHost="__tag__:__hostname__" | extend scheduleType = json_extract_scalar(schedule, ''$.type'') | project httpCode, errorCode,errorMessage,requestID,fileName, fileNo, serviceHost,scheduleType,project'
- );
在作业中输入分析语句,查看结果数据:
SELECT * FROM sls_input_complex
点击右上角调试按钮,进行调试,可以看到 TABLE 中每一列的值,对应 SPL 处理后的结果。
为了适应弱结构化日志数据的需求,Flink SLS Connector 进行了升级,支持直接通过 Connector配置 SPL 的方式实现 SLS 数据源的清洗下推,特别是需要正则字段提取、JSON 字段提取、CSV 字段提取场景下,相较原数据加工方案和原 Flink SLS Connector 方案更轻量级,让数据清洗的职责更加清晰,在数据源端完成数据清洗工作,也可以减少数据的网络传输流量,使得到达 Flink 的数据已经是规整好的数据,可以更加专注在 Flink 中进行业务数据分析。
同时为了便于 SPL 验证测试,SLS 扫描查询也已支持使用 SPL 进行查询,可以实时看到 SPL 管道式语法执行结果。
参考链接:
[1] 日志服务概述
https://help.aliyun.com/zh/sls/product-overview/what-is-log-service
[2] SPL 概述
https://help.aliyun.com/zh/sls/user-guide/spl-overview
[3] 阿里云 Flink Connector SLS
https://help.aliyun.com/zh/flink/developer-reference/log-service-connector
[4] SLS 扫描查询
https://help.aliyun.com/zh/sls/user-guide/scan-based-query-overview
作者:潘伟龙(豁朗)
本文为阿里云原创内容,未经允许不得转载。
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。