Building a Custom Flink SQL Connector in Practice (w/ Flink 1.11 & Redis)

Foreword

One big reason Flink SQL is both easy to use and powerful is its rich set of Connector components. A Connector is the vehicle through which Flink talks to external systems, and it comes in two flavors: Sources, which read data, and Sinks, which write it. The Connectors built into Flink SQL, however, may not cover every requirement that shows up in real business scenarios, so sometimes we have to build our own. Fortunately the community provides a standardized, easily extensible framework: as long as we program against the prescribed interfaces, we can put together our own Connector with little fuss. In this article we will implement a SQL-enabled Redis Connector step by step on top of the existing Bahir Flink project.

Introducing DynamicTableSource/Sink

The current (Flink 1.11+) Flink SQL Connector architecture is organized into Metadata, Planning and Runtime phases; the design document is FLIP-95.

Dynamic tables have long been a core concept of Flink SQL's unified stream/batch processing, and they sit at the heart of the Planning phase of the architecture above. The bulk of the work in writing a custom Connector is therefore implementing the dynamic-table-based Source/Sink, plus the factory that produces them upstream and the RuntimeProvider that actually executes the Source/Sink logic downstream in the Runtime phase. The table metadata of the Metadata phase is maintained by the Catalog.

Fair warning: a lot of code lies ahead.

Implementing RedisDynamicTableFactory

A DynamicTableFactory needs to provide the following capabilities:

  • Define and validate the options passed in when the table is created;
  • Obtain the table's metadata;
  • Define the encoding/decoding formats used when reading and writing data (optional);
  • Create ready-to-use DynamicTable[Source/Sink] instances.

The skeleton of a factory class implementing the DynamicTable[Source/Sink]Factory interfaces looks like this.

    public class RedisDynamicTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
        @Override
        public DynamicTableSource createDynamicTableSource(Context context) { }

        @Override
        public DynamicTableSink createDynamicTableSink(Context context) { }

        @Override
        public String factoryIdentifier() { }

        @Override
        public Set<ConfigOption<?>> requiredOptions() { }

        @Override
        public Set<ConfigOption<?>> optionalOptions() { }
    }

Let's start by defining the options the Redis Connector needs, using the built-in ConfigOption/ConfigOptions classes. Their meanings are self-explanatory, so we won't dwell on them.

    public static final ConfigOption<String> MODE = ConfigOptions
        .key("mode")
        .stringType()
        .defaultValue("single");
    public static final ConfigOption<String> SINGLE_HOST = ConfigOptions
        .key("single.host")
        .stringType()
        .defaultValue(Protocol.DEFAULT_HOST);
    public static final ConfigOption<Integer> SINGLE_PORT = ConfigOptions
        .key("single.port")
        .intType()
        .defaultValue(Protocol.DEFAULT_PORT);
    public static final ConfigOption<String> CLUSTER_NODES = ConfigOptions
        .key("cluster.nodes")
        .stringType()
        .noDefaultValue();
    public static final ConfigOption<String> SENTINEL_NODES = ConfigOptions
        .key("sentinel.nodes")
        .stringType()
        .noDefaultValue();
    public static final ConfigOption<String> SENTINEL_MASTER = ConfigOptions
        .key("sentinel.master")
        .stringType()
        .noDefaultValue();
    public static final ConfigOption<String> PASSWORD = ConfigOptions
        .key("password")
        .stringType()
        .noDefaultValue();
    public static final ConfigOption<String> COMMAND = ConfigOptions
        .key("command")
        .stringType()
        .noDefaultValue();
    public static final ConfigOption<Integer> DB_NUM = ConfigOptions
        .key("db-num")
        .intType()
        .defaultValue(Protocol.DEFAULT_DATABASE);
    public static final ConfigOption<Integer> TTL_SEC = ConfigOptions
        .key("ttl-sec")
        .intType()
        .noDefaultValue();
    public static final ConfigOption<Integer> CONNECTION_TIMEOUT_MS = ConfigOptions
        .key("connection.timeout-ms")
        .intType()
        .defaultValue(Protocol.DEFAULT_TIMEOUT);
    public static final ConfigOption<Integer> CONNECTION_MAX_TOTAL = ConfigOptions
        .key("connection.max-total")
        .intType()
        .defaultValue(GenericObjectPoolConfig.DEFAULT_MAX_TOTAL);
    public static final ConfigOption<Integer> CONNECTION_MAX_IDLE = ConfigOptions
        .key("connection.max-idle")
        .intType()
        .defaultValue(GenericObjectPoolConfig.DEFAULT_MAX_IDLE);
    public static final ConfigOption<Boolean> CONNECTION_TEST_ON_BORROW = ConfigOptions
        .key("connection.test-on-borrow")
        .booleanType()
        .defaultValue(GenericObjectPoolConfig.DEFAULT_TEST_ON_BORROW);
    public static final ConfigOption<Boolean> CONNECTION_TEST_ON_RETURN = ConfigOptions
        .key("connection.test-on-return")
        .booleanType()
        .defaultValue(GenericObjectPoolConfig.DEFAULT_TEST_ON_RETURN);
    public static final ConfigOption<Boolean> CONNECTION_TEST_WHILE_IDLE = ConfigOptions
        .key("connection.test-while-idle")
        .booleanType()
        .defaultValue(GenericObjectPoolConfig.DEFAULT_TEST_WHILE_IDLE);
    public static final ConfigOption<String> LOOKUP_ADDITIONAL_KEY = ConfigOptions
        .key("lookup.additional-key")
        .stringType()
        .noDefaultValue();
    public static final ConfigOption<Integer> LOOKUP_CACHE_MAX_ROWS = ConfigOptions
        .key("lookup.cache.max-rows")
        .intType()
        .defaultValue(-1);
    public static final ConfigOption<Integer> LOOKUP_CACHE_TTL_SEC = ConfigOptions
        .key("lookup.cache.ttl-sec")
        .intType()
        .defaultValue(-1);

Next, override requiredOptions() and optionalOptions(), which return the Connector's set of required options and set of optional options, respectively.

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        Set<ConfigOption<?>> requiredOptions = new HashSet<>();
        requiredOptions.add(MODE);
        requiredOptions.add(COMMAND);
        return requiredOptions;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        Set<ConfigOption<?>> optionalOptions = new HashSet<>();
        optionalOptions.add(SINGLE_HOST);
        optionalOptions.add(SINGLE_PORT);
        // the other 14 optional options are omitted for brevity ......
        optionalOptions.add(LOOKUP_CACHE_TTL_SEC);
        return optionalOptions;
    }

Then override createDynamicTableSource() and createDynamicTableSink() to create the DynamicTableSource and DynamicTableSink instances. Before creating them we can validate the supplied options with the built-in TableFactoryHelper utility, or write our own validation logic. The table's metadata is also available through the associated context object. The code is shown below; the concrete Source/Sink classes come later.

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        ReadableConfig options = helper.getOptions();
        validateOptions(options);

        TableSchema schema = context.getCatalogTable().getSchema();
        return new RedisDynamicTableSource(options, schema);
    }

    @Override
    public DynamicTableSink createDynamicTableSink(Context context) {
        FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        helper.validate();

        ReadableConfig options = helper.getOptions();
        validateOptions(options);

        TableSchema schema = context.getCatalogTable().getSchema();
        return new RedisDynamicTableSink(options, schema);
    }

    private void validateOptions(ReadableConfig options) {
        switch (options.get(MODE)) {
            case "single":
                if (StringUtils.isEmpty(options.get(SINGLE_HOST))) {
                    throw new IllegalArgumentException("Parameter single.host must be provided in single mode");
                }
                break;
            case "cluster":
                if (StringUtils.isEmpty(options.get(CLUSTER_NODES))) {
                    throw new IllegalArgumentException("Parameter cluster.nodes must be provided in cluster mode");
                }
                break;
            case "sentinel":
                if (StringUtils.isEmpty(options.get(SENTINEL_NODES)) || StringUtils.isEmpty(options.get(SENTINEL_MASTER))) {
                    throw new IllegalArgumentException("Parameters sentinel.nodes and sentinel.master must be provided in sentinel mode");
                }
                break;
            default:
                throw new IllegalArgumentException("Invalid Redis mode. Must be single/cluster/sentinel");
        }
    }

The factoryIdentifier() method specifies the factory's identifier, which is the value that must be supplied for the connector option when creating a table.

    @Override
    public String factoryIdentifier() {
        return "redis";
    }

As I described in an earlier article, Flink SQL uses the Java SPI mechanism to discover and load table factories. So, finally, don't forget to create a file named org.apache.flink.table.factories.Factory under the META-INF/services directory on the classpath and put the fully qualified name of our factory class into it, e.g. org.apache.flink.streaming.connectors.redis.dynamic.RedisDynamicTableFactory.
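
Assuming a standard Maven project layout (the directory structure is an assumption about how the project is packaged, not a Flink requirement), the registration file lives at

    src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory

and contains the single line

    org.apache.flink.streaming.connectors.redis.dynamic.RedisDynamicTableFactory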

Implementing RedisDynamicTableSink

The Bahir Flink project already ships a DataStream-API-based RedisSink, which we can reuse to build RedisDynamicTableSink and avoid duplicating work. The skeleton of the class implementing the DynamicTableSink interface is shown below.

    public class RedisDynamicTableSink implements DynamicTableSink {
        private final ReadableConfig options;
        private final TableSchema schema;

        public RedisDynamicTableSink(ReadableConfig options, TableSchema schema) {
            this.options = options;
            this.schema = schema;
        }

        @Override
        public ChangelogMode getChangelogMode(ChangelogMode changelogMode) { }

        @Override
        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { }

        @Override
        public DynamicTableSink copy() { }

        @Override
        public String asSummaryString() { }
    }

getChangelogMode() must return the kinds of changelog rows this Sink accepts. Since the data written to Redis may be append-only or may carry retract semantics (e.g. all kinds of aggregates), we accept the INSERT, UPDATE_BEFORE and UPDATE_AFTER kinds.

    @Override
    public ChangelogMode getChangelogMode(ChangelogMode changelogMode) {
        return ChangelogMode.newBuilder()
            .addContainedKind(RowKind.INSERT)
            .addContainedKind(RowKind.UPDATE_BEFORE)
            .addContainedKind(RowKind.UPDATE_AFTER)
            .build();
    }

Next we implement the SinkRuntimeProvider, i.e. write the SinkFunction that the underlying runtime will invoke. Since RedisSink already is a ready-made SinkFunction, all we need is a generic RedisMapper, plus some up-front validation (such as checking the table's column count and data types). The getSinkRuntimeProvider() method and the RedisMapper are shown below; they should be easy to follow.

    @Override
    public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
        Preconditions.checkNotNull(options, "No options supplied");

        FlinkJedisConfigBase jedisConfig = Util.getFlinkJedisConfig(options);
        Preconditions.checkNotNull(jedisConfig, "No Jedis config supplied");

        RedisCommand command = RedisCommand.valueOf(options.get(COMMAND).toUpperCase());

        int fieldCount = schema.getFieldCount();
        if (fieldCount != (needAdditionalKey(command) ? 3 : 2)) {
            throw new ValidationException("Redis sink only supports 2 or 3 columns");
        }

        DataType[] dataTypes = schema.getFieldDataTypes();
        for (int i = 0; i < fieldCount; i++) {
            if (!dataTypes[i].getLogicalType().getTypeRoot().equals(LogicalTypeRoot.VARCHAR)) {
                throw new ValidationException("Redis connector only supports STRING type");
            }
        }

        RedisMapper<RowData> mapper = new RedisRowDataMapper(options, command);
        RedisSink<RowData> redisSink = new RedisSink<>(jedisConfig, mapper);
        return SinkFunctionProvider.of(redisSink);
    }

    private static boolean needAdditionalKey(RedisCommand command) {
        return command.getRedisDataType() == RedisDataType.HASH || command.getRedisDataType() == RedisDataType.SORTED_SET;
    }

    public static final class RedisRowDataMapper implements RedisMapper<RowData> {
        private static final long serialVersionUID = 1L;

        private final ReadableConfig options;
        private final RedisCommand command;

        public RedisRowDataMapper(ReadableConfig options, RedisCommand command) {
            this.options = options;
            this.command = command;
        }

        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(command, "default-additional-key");
        }

        @Override
        public String getKeyFromData(RowData data) {
            return data.getString(needAdditionalKey(command) ? 1 : 0).toString();
        }

        @Override
        public String getValueFromData(RowData data) {
            return data.getString(needAdditionalKey(command) ? 2 : 1).toString();
        }

        @Override
        public Optional<String> getAdditionalKey(RowData data) {
            return needAdditionalKey(command) ? Optional.of(data.getString(0).toString()) : Optional.empty();
        }

        @Override
        public Optional<Integer> getAdditionalTTL(RowData data) {
            return options.getOptional(TTL_SEC);
        }
    }
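
As a quick sanity check of the column mapping, here is a hypothetical demo (RedisRowDataMapperDemo is not part of the connector; it is assumed to sit in the same package as RedisDynamicTableSink, and the Bahir import path for RedisCommand is assumed) showing how a three-column row is split for HSET: column 0 becomes the Redis hash key, column 1 the hash field, and column 2 the hash value.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.table.data.GenericRowData;
    import org.apache.flink.table.data.StringData;

    public class RedisRowDataMapperDemo {
        public static void main(String[] args) {
            // Empty options: getAdditionalTTL() would simply return Optional.empty().
            Configuration options = new Configuration();
            RedisDynamicTableSink.RedisRowDataMapper mapper =
                    new RedisDynamicTableSink.RedisRowDataMapper(options, RedisCommand.HSET);

            GenericRowData row = GenericRowData.of(
                    StringData.fromString("dashboard:city_stat:2021-03-04"), // column 0 -> Redis hash key
                    StringData.fromString("10264"),                          // column 1 -> hash field
                    StringData.fromString("{\"gmv\":\"100.00\"}"));          // column 2 -> hash value

            System.out.println(mapper.getAdditionalKey(row)); // Optional[dashboard:city_stat:2021-03-04]
            System.out.println(mapper.getKeyFromData(row));   // 10264
            System.out.println(mapper.getValueFromData(row)); // {"gmv":"100.00"}
        }
    }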

The remaining copy() and asSummaryString() methods are trivial.

    @Override
    public DynamicTableSink copy() {
        return new RedisDynamicTableSink(options, schema);
    }

    @Override
    public String asSummaryString() {
        return "Redis Dynamic Table Sink";
    }

Implementing RedisDynamicTableSource

Unlike DynamicTableSink, DynamicTableSource is further split into two flavors: ScanTableSource and LookupTableSource. As the names suggest, the former scans all or part of the data in the external system and supports features such as predicate pushdown and partition pushdown, whereas the latter never sees the full data set; it performs point lookups by one or more keys and returns the results.

Since Redis is typically used as a dimension store in a data warehouse, the interface we need is LookupTableSource. The RedisDynamicTableSource class implementing it is shown below; its overall structure is much like the Sink's.

    public class RedisDynamicTableSource implements LookupTableSource {
        private final ReadableConfig options;
        private final TableSchema schema;

        public RedisDynamicTableSource(ReadableConfig options, TableSchema schema) {
            this.options = options;
            this.schema = schema;
        }

        @Override
        public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
            Preconditions.checkArgument(context.getKeys().length == 1 && context.getKeys()[0].length == 1, "Redis source only supports lookup by single key");

            int fieldCount = schema.getFieldCount();
            if (fieldCount != 2) {
                throw new ValidationException("Redis source only supports 2 columns");
            }

            DataType[] dataTypes = schema.getFieldDataTypes();
            for (int i = 0; i < fieldCount; i++) {
                if (!dataTypes[i].getLogicalType().getTypeRoot().equals(LogicalTypeRoot.VARCHAR)) {
                    throw new ValidationException("Redis connector only supports STRING type");
                }
            }

            return TableFunctionProvider.of(new RedisRowDataLookupFunction(options));
        }

        @Override
        public DynamicTableSource copy() {
            return new RedisDynamicTableSource(options, schema);
        }

        @Override
        public String asSummaryString() {
            return "Redis Dynamic Table Source";
        }
    }

The Flink framework requires that the LookupRuntimeProvider used for point lookups be either a TableFunction (synchronous) or an AsyncTableFunction (asynchronous). Because the Jedis client used by the Bahir Flink project is synchronous, only the synchronous version is given here; for an asynchronous version, switch to another client such as Redisson or the Vert.x Redis Client. The code of RedisRowDataLookupFunction follows.

    public static class RedisRowDataLookupFunction extends TableFunction<RowData> {
        private static final long serialVersionUID = 1L;

        private final ReadableConfig options;
        private final String command;
        private final String additionalKey;
        private final int cacheMaxRows;
        private final int cacheTtlSec;

        private RedisCommandsContainer commandsContainer;
        private transient Cache<RowData, RowData> cache;

        public RedisRowDataLookupFunction(ReadableConfig options) {
            Preconditions.checkNotNull(options, "No options supplied");
            this.options = options;

            command = options.get(COMMAND).toUpperCase();
            Preconditions.checkArgument(command.equals("GET") || command.equals("HGET"), "Redis table source only supports GET and HGET commands");

            additionalKey = options.get(LOOKUP_ADDITIONAL_KEY);
            cacheMaxRows = options.get(LOOKUP_CACHE_MAX_ROWS);
            cacheTtlSec = options.get(LOOKUP_CACHE_TTL_SEC);
        }

        @Override
        public void open(FunctionContext context) throws Exception {
            super.open(context);

            FlinkJedisConfigBase jedisConfig = Util.getFlinkJedisConfig(options);
            commandsContainer = RedisCommandsContainerBuilder.build(jedisConfig);
            commandsContainer.open();

            if (cacheMaxRows > 0 && cacheTtlSec > 0) {
                cache = CacheBuilder.newBuilder()
                    .expireAfterWrite(cacheTtlSec, TimeUnit.SECONDS)
                    .maximumSize(cacheMaxRows)
                    .build();
            }
        }

        @Override
        public void close() throws Exception {
            if (cache != null) {
                cache.invalidateAll();
            }
            if (commandsContainer != null) {
                commandsContainer.close();
            }
            super.close();
        }

        public void eval(Object obj) {
            RowData lookupKey = GenericRowData.of(obj);
            if (cache != null) {
                RowData cachedRow = cache.getIfPresent(lookupKey);
                if (cachedRow != null) {
                    collect(cachedRow);
                    return;
                }
            }

            StringData key = lookupKey.getString(0);
            String value = command.equals("GET") ? commandsContainer.get(key.toString()) : commandsContainer.hget(additionalKey, key.toString());
            // Cache the row even when the key is missing in Redis (value == null) to prevent cache penetration.
            RowData result = GenericRowData.of(key, StringData.fromString(value));
            if (cache != null) {
                cache.put(lookupKey, result);
            }
            collect(result);
        }
    }

Three points deserve attention:

  • Redis dimension data is usually stored as the String or Hash type, so only the GET and HGET commands are supported. When the Hash type is used, its key has to be passed in as an extra option (lookup.additional-key) and cannot be determined dynamically per record the way the Sink does it;
  • To avoid hitting Redis for every incoming record, a cache is needed; Guava Cache is used above. Keys that are not found in Redis are cached as well, to prevent cache penetration;
  • A TableFunction must have a public method named eval, with a signature such as eval(Object) or eval(Object...). The rows actually emitted here have type ROW<STRING, STRING>, which in the Flink Table type system is represented as a RowData holding two StringData fields.

Using Redis SQL Connector

Let's put it to use. First, create a Redis Sink table backed by the Hash structure.

    CREATE TABLE rtdw_dws.redis_test_order_stat_dashboard (
      hashKey STRING,
      cityId STRING,
      data STRING,
      PRIMARY KEY (hashKey) NOT ENFORCED
    ) WITH (
      'connector' = 'redis',
      'mode' = 'single',
      'single.host' = '172.16.200.124',
      'single.port' = '6379',
      'db-num' = '10',
      'command' = 'HSET',
      'ttl-sec' = '86400',
      'connection.max-total' = '5',
      'connection.timeout-ms' = '5000',
      'connection.test-while-idle' = 'true'
    )

Then read the order stream from Kafka, compute some simple statistics, and write them to Redis.

    /*
    tableEnvConfig.setBoolean("table.dynamic-table-options.enabled", true)
    tableEnvConfig.setBoolean("table.exec.emit.early-fire.enabled", true)
    tableEnvConfig.setString("table.exec.emit.early-fire.delay", "5s")
    tableEnv.createTemporarySystemFunction("MapToJsonString", classOf[MapToJsonString])
    */
    INSERT INTO rtdw_dws.redis_test_order_stat_dashboard
    SELECT
      CONCAT('dashboard:city_stat:', p.orderDay) AS hashKey,
      CAST(p.cityId AS STRING) AS cityId,
      MapToJsonString(MAP[
        'subOrderNum', CAST(p.subOrderNum AS STRING),
        'buyerNum', CAST(p.buyerNum AS STRING),
        'gmv', CAST(p.gmv AS STRING)
      ]) AS data
    FROM (
      SELECT
        cityId,
        SUBSTR(tss, 0, 10) AS orderDay,
        COUNT(1) AS subOrderNum,
        COUNT(DISTINCT userId) AS buyerNum,
        SUM(quantity * merchandisePrice) AS gmv
      FROM rtdw_dwd.kafka_order_done_log /*+ OPTIONS('scan.startup.mode'='latest-offset','properties.group.id'='fsql_redis_test_order_stat_dashboard') */
      GROUP BY TUMBLE(procTime, INTERVAL '1' DAY), cityId, SUBSTR(tss, 0, 10)
    ) p

Run it and check the results in Redis.

Now let's look at Redis used as a dimension table, again taking the Hash structure as the example.

    CREATE TABLE rtdw_dim.redis_test_city_info (
      cityId STRING,
      cityName STRING
    ) WITH (
      'connector' = 'redis',
      'mode' = 'single',
      'single.host' = '172.16.200.124',
      'single.port' = '6379',
      'db-num' = '9',
      'command' = 'HGET',
      'connection.timeout-ms' = '5000',
      'connection.test-while-idle' = 'true',
      'lookup.additional-key' = 'rtdw_dim:test_city_info',
      'lookup.cache.max-rows' = '1000',
      'lookup.cache.ttl-sec' = '600'
    )
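
For the lookup to return anything, the dimension data must already exist in the hash named by lookup.additional-key, with city IDs as fields and city names as values. A hypothetical seeding snippet using plain Jedis (illustration only, not part of the connector) might look like this:

    import redis.clients.jedis.Jedis;

    public class SeedCityDim {
        public static void main(String[] args) {
            try (Jedis jedis = new Jedis("172.16.200.124", 6379)) {
                jedis.select(9); // same db-num as in the DDL above
                // field = cityId, value = cityName
                jedis.hset("rtdw_dim:test_city_info", "10264", "漳州市");
                jedis.hset("rtdw_dim:test_city_info", "10030", "常德市");
            }
        }
    }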

To make the results easy to observe, create a Print Sink table for the output, then temporal-join the Kafka stream table with the Redis dimension table. The SQL is as follows.

    CREATE TABLE test.print_redis_test_dim_join (
      tss STRING,
      cityId BIGINT,
      cityName STRING
    ) WITH (
      'connector' = 'print'
    )

    INSERT INTO test.print_redis_test_dim_join
    SELECT a.tss, a.cityId, b.cityName
    FROM rtdw_dwd.kafka_order_done_log /*+ OPTIONS('scan.startup.mode'='latest-offset','properties.group.id'='fsql_redis_source_test') */ AS a
    LEFT JOIN rtdw_dim.redis_test_city_info FOR SYSTEM_TIME AS OF a.procTime AS b ON CAST(a.cityId AS STRING) = b.cityId
    WHERE a.orderType = 12

The output:

    4> +I(2021-03-04 20:44:48,10264,漳州市)
    3> +I(2021-03-04 20:45:26,10030,常德市)
    4> +I(2021-03-04 20:45:23,10332,桂林市)
    7> +I(2021-03-04 20:45:26,10031,九江市)
    9> +I(2021-03-04 20:45:23,10387,惠州市)
    4> +I(2021-03-04 20:45:19,10607,芜湖市)
    3> +I(2021-03-04 20:45:25,10364,无锡市)

The End

With the examples above, you should be able to tailor Flink SQL Connectors to your own needs. ScanTableSource, the asynchronous LookupTableSource and Encoding/Decoding Formats, which were not covered in detail here, will be discussed in future articles.

The early-spring weather is still chilly; everyone, take care to dress for it.

Good night, good night.
