当前位置:   article > 正文

Kafka Streams实战-KTable API

ktable

本文会介绍:

  • 流和表的关系
  • 数据更新和KTable的配置
  • 聚合、窗口和流表连接
  • 全局KTable

1. 流和表的关系

1.1 数据更新流

假设我们有一个股票价格的数据流,每个数据包含股票的ID,timestamp和股价,要把这些数据写入到关系型数据库的表格里,如果使用股票的ID作为主键,那么具有相同ID的数据会被更新,我们可以把这种用于更新数据的流视为更新流。如下图所示:

这类似于changelog,具有相同key的数据只会保留最新的数据。而要保留每个key的最新数据,可以使用之前介绍过的compaction功能,旧的key/value会被删除,如下图所示:

对于changelog或更新流,我们会使用一个被称为KTable的抽象概念。

1.2 数据流和更新流的比较

我们会使用KStream和KTable来比较数据流和更新流。我们会通过运行一个简单的股票行情应用程序来说明,该应用程序会为三个虚构的公司生成三次股票报价,总共九条数据。KStream和KTable将读取这些数据并通过print()方法把它们输出打印到控制台。下图是打印的结果,KStream打印了所有九条数据,这是我们希望看到的结果,因为KStream视每一个数据都是独立的。而KTable只打印了三条数据,因为KTable视每一个数据都是对以前的更新。

注意:使用KTable时,数据必须要有key值,没有key是无法更新数据的。

从KTable的角度来看,它没有接收到9条单独的数据,它接收到的是三条原始数据和两轮的更新,它只打印最后一轮的更新。KTable的数据与KStream最后三条的数据是一样的,在后续部分会讲述KTable是如何仅仅输出更新数据的机制。

下面是上述应用程序的示例代码:

  1. StreamsBuilder builder = new StreamsBuilder();
  2. // 创建KTable实例
  3. KTable<String, StockTickerData> stockTickerTable = builder.table("stock-ticker-table");
  4. // 创建KStream实例
  5. KStream<String, StockTickerData> stockTickerStream = builder.stream("stock-ticker-stream");
  6. // 打印结果到控制台
  7. stockTickerTable.toStream().print(Printed.<String, StockTickerData>toSysOut().withLabel("Stocks-KTable"));
  8. stockTickerStream.print(Printed.<String, StockTickerData>toSysOut().withLabel("Stocks-KStream"));

注意:在创建KTable和KStream实例时没有指定任何serdes,之所以可以不指定是因为我们可以在配置里面先注册默认的serdes,例如:

  1. props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
  2.         Serdes.String().getClass().getName());
  3. props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
  4.         StreamsSerdes.StockTickerSerde().getClass().getName());

如果要使用不同类型则需要通过Consumed<K, V>提供使用的serdes。

这里要说的是,在数据流中具有相同key的数据是更新。更新流是KTable背后的主要概念。

2. 数据更新和KTable的配置

要弄清楚KTable的功能,我们应该知道:

  • 数据是存储在哪里?
  • KTable是如何确定输出数据?

当调用StreamsBuilder.table(final String topic)创建KTable实例的同时,内部会创建一个StateStore来跟踪流的状态,但它不可用于交互式查询。StreamsBuilder.table有个重载的方法,第二个参数是Materialized的实例,允许自定义存储的类型并提供查询,在后续部分会讲述交互式查询。因此,KTable是使用与Kafka Streams集成的本地状态存储数据的。

要回答第二个问题,我们需要考虑以下几个因素:

  • 应用程序的数据量,数据速率越快会增加输出更新数据的速度
  • 数据中有多少个不同的key,越多数量的不同key会导致向下游发送更多更新数据
  • 配置参数cache.max.bytes.buffering和commit.interval.ms的设置

从上述因素里面,我们只能控制配置参数的设置,所以本文只会介绍cache.max.bytes.buffering和commit.interval.ms。

2.1 设置缓存缓冲大小

KTable缓存用于对具有相同key的数据进行重复

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/你好赵伟/article/detail/590896
推荐阅读
相关标签
  

闽ICP备14008679号