赞
踩
本文会介绍:
1. 流和表的关系
1.1 数据更新流
假设我们有一个股票价格的数据流,每个数据包含股票的ID,timestamp和股价,要把这些数据写入到关系型数据库的表格里,如果使用股票的ID作为主键,那么具有相同ID的数据会被更新,我们可以把这种用于更新数据的流视为更新流。如下图所示:
这类似于changelog,具有相同key的数据只会保留最新的数据。而要保留每个key的最新数据,可以使用之前介绍过的compaction功能,旧的key/value会被删除,如下图所示:
对于changelog或更新流,我们会使用一个被称为KTable的抽象概念。
1.2 数据流和更新流的比较
我们会使用KStream和KTable来比较数据流和更新流。我们会通过运行一个简单的股票行情应用程序来说明,该应用程序会为三个虚构的公司生成三次股票报价,总共九条数据。KStream和KTable将读取这些数据并通过print()方法把它们输出打印到控制台。下图是打印的结果,KStream打印了所有九条数据,这是我们希望看到的结果,因为KStream视每一个数据都是独立的。而KTable只打印了三条数据,因为KTable视每一个数据都是对以前的更新。
注意:使用KTable时,数据必须要有key值,没有key是无法更新数据的。
从KTable的角度来看,它没有接收到9条单独的数据,它接收到的是三条原始数据和两轮的更新,它只打印最后一轮的更新。KTable的数据与KStream最后三条的数据是一样的,在后续部分会讲述KTable是如何仅仅输出更新数据的机制。
下面是上述应用程序的示例代码:
- StreamsBuilder builder = new StreamsBuilder();
- // 创建KTable实例
- KTable<String, StockTickerData> stockTickerTable = builder.table("stock-ticker-table");
- // 创建KStream实例
- KStream<String, StockTickerData> stockTickerStream = builder.stream("stock-ticker-stream");
- // 打印结果到控制台
- stockTickerTable.toStream().print(Printed.<String, StockTickerData>toSysOut().withLabel("Stocks-KTable"));
- stockTickerStream.print(Printed.<String, StockTickerData>toSysOut().withLabel("Stocks-KStream"));
注意:在创建KTable和KStream实例时没有指定任何serdes,之所以可以不指定是因为我们可以在配置里面先注册默认的serdes,例如:
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
- Serdes.String().getClass().getName());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
- StreamsSerdes.StockTickerSerde().getClass().getName());
如果要使用不同类型则需要通过Consumed<K, V>提供使用的serdes。
这里要说的是,在数据流中具有相同key的数据是更新。更新流是KTable背后的主要概念。
2. 数据更新和KTable的配置
要弄清楚KTable的功能,我们应该知道:
当调用StreamsBuilder.table(final String topic)创建KTable实例的同时,内部会创建一个StateStore来跟踪流的状态,但它不可用于交互式查询。StreamsBuilder.table有个重载的方法,第二个参数是Materialized的实例,允许自定义存储的类型并提供查询,在后续部分会讲述交互式查询。因此,KTable是使用与Kafka Streams集成的本地状态存储数据的。
要回答第二个问题,我们需要考虑以下几个因素:
从上述因素里面,我们只能控制配置参数的设置,所以本文只会介绍cache.max.bytes.buffering和commit.interval.ms。
2.1 设置缓存缓冲大小
KTable缓存用于对具有相同key的数据进行重复
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。