当前位置:   article > 正文

Flink异步IO落库Cassandra_flink table 落库

flink table 落库

 

 当需要可伸缩性和高可用性而不影响性能时,Apache Cassandra「数据库」是正确的选择。
 线性可伸缩性和在商用硬件或云基础设施上经过验证的容错能力使其成为关键任务数据的完美平台。
 Cassandra对跨多个数据中心复制的支持是同类中最好的,它为用户提供了更低的延迟,并可以在区域性中断(断电等)中存活下来,让用户高枕无忧。

Cassandra

Cassandra能够快速不间断地管理海量数据。
The Cassandra Query Language (CQL)
Cassandra Configuration File
DataStax Docs
spark-cassandra-connector
生产环境稳定版3.11.4。需要注意flink-connector-cassandra的客户端datastax版本。

源码

单机和集群配置

默认的配置文件在

1
./conf/cassandra.yaml

 

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
最少配置:  
cluster_name:集群名称
seeds:用逗号分隔的集群种子节点的IP地址列表
storage_port:集群节点间RPC通信的端口号。默认7000,出于安全考虑,不要暴露到外网。
listen_address:节点的IP地址,用于节点间相互通信。还可以设置listen_interface,指定具体使用哪个网卡,但不能和listen_address同时设置。
native_transport_port:客户端访问时用到的端口。

用户名密码配置(一般要创建不同角色的用户并分配相应的DDL和DML权限):  
authenticator: PasswordAuthenticator
authorizer: CassandraAuthorizer

目录配置:
data_file_directories:数据文件存储位置,可以是一个或多个目录,比如多个磁盘
commitlog_directory:commitlog文件所在位置,出于性能考虑,可以和数据文件目录存储在不同磁盘上
saved_caches_directory:缓存文件所在目录
hints_directory:hints所在的目录

环境变量配置在cassandra-env.sh文件中,如JVM_OPTS

日志输出:  
用的是logback,编辑logback.xml文件进行修改。默认INFO级别输出到system.log,debug级别输出到debug.log。前台运行Cassandra会把INFO级别的日志输出到控制台。

客户端API和客户端工具

Flink用的是flink-connector-cassandra的jar,里面还是基于datastax的。

API示例代码:
com.datastax.driver.core.querybuilder.Insert
com.datastax.driver.core.querybuilder.Update
com.datastax.driver.core.querybuilder.Select
com.datastax.driver.core.querybuilder.Delete

类似Navicat这种客户端使用RazorSQLQuery, Edit, Browse, and Manage Databases

建表、查询和插入式更新

建表

1
2
3
4
5
6
7
8
drop table excelsior.t;
create table excelsior.t(
	pk int,
	t int,
	s varchar,
	v varchar,
	primary key(pk, t)
);

image

查询

1
select * from excelsior.t;

插入式更新

1
2
insert into  excelsior.t (pk, t)
values (5,6);

image

1
2
insert into  excelsior.t (pk, t, s)
values (5,6,'s2');

image

1
2
insert into  excelsior.t (pk, t, s)
values (5,6,'s5');

image

1
2
insert into  excelsior.t (pk, t, s)
values (5,6,null);

image

1
2
insert into  excelsior.t (pk, t, s, v)
values (5,6,'s6','v8');

image

1
2
insert into  excelsior.t (pk, t, v)
values (5,6,'v9');

image

异步Job

创建表

image

构造10万条消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
for (int i = 0; i < 100000; i++) {
    POJO pojo = new POJO();
    int j = random.nextInt(5);
    pojo.setAid("ID000-" + i);
    pojo.setAname("NAME-" + i);
    pojo.setAstyle("STYLE000-" + j);
    pojo.setEnergy(new BigDecimal(random.nextInt(1000)).setScale(2, RoundingMode.HALF_UP));
    pojo.setAge((j == 0 ? 1 : j) * 13);
    long time = System.currentTimeMillis();
    pojo.setTt(new Date(time));
    pojo.setLogTime(time);

    ZoneId shanghaiZoneId = ZoneId.of("Asia/Shanghai");
    ZonedDateTime shanghaiZonedDateTime = ZonedDateTime.now(shanghaiZoneId);

    DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS");

    pojo.setCreateTime(shanghaiZonedDateTime.format(formatter));
    pojo.setUpdateTime(shanghaiZonedDateTime.format(formatter));

    pojo.setAstatus("0" + j);

    String value = gson.toJson(pojo);

    producer.send(new ProducerRecord<String, String>(AsyncCassandraJob.class.getSimpleName(), Integer.toString(i), value));

    System.out.println(value);
}

AsyncCassandraRequest

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
/**
 * AsyncCassandraRequest
 */
public class AsyncCassandraRequest extends RichAsyncFunction<POJOWrapper, String> {

    private static final long serialVersionUID = -3713249881014861537L;

    Logger logger = LoggerFactory.getLogger(AsyncCassandraRequest.class);

    private transient Cluster cluster;
    private transient ListenableFuture<Session> session;

    @Override
    public void open(Configuration parameters) throws Exception {
        // 配置与Cassandra集群之间的Socket连接参数
        SocketOptions so = new SocketOptions()
                // 读取数据超时时间 默认12000
                .setReadTimeoutMillis(30000)
                // 连接超时时间 默认5000
                .setConnectTimeoutMillis(30000);

        // 连接池相关的参数
        PoolingOptions poolingOptions = new PoolingOptions()
                // 每个连接的最大并发请求数 默认-2147483648
                .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
                // 每个Cassandra节点多少个连接 核心连接数和最大连接数
                .setConnectionsPerHost(HostDistance.LOCAL, 4, 10)
                // 心跳间隔 默认30秒
                .setHeartbeatIntervalSeconds(60)
                // 连接池超时时间 默认5000
                .setPoolTimeoutMillis(30000);

        // 写数据时的数据一致性级别 QUORUM=至少成功写入Q个复制节点(Q=N/2+1)
        QueryOptions queryOptions = new QueryOptions().setConsistencyLevel(ConsistencyLevel.QUORUM);

        // 重试策略 如果做了预合并 加上Cassandra的插入式更新,可以多尝试几次直到成功
        // DowngradingConsistencyRetryPolicy:会降级的一致性,意思是重试之后的一致性要比初次的一致性级别低,也就是会保证最终一致性
        // 使用LoggingRetryPolicy包装一下
        RetryPolicy retryPolicy = new LoggingRetryPolicy(DowngradingConsistencyRetryPolicy.INSTANCE);

        cluster = Cluster.builder().addContactPoint("127.0.0.1").withSocketOptions(so)
                .withClusterName("Test Cluster")
//                .withCredentials("username", "password")
                .withPoolingOptions(poolingOptions)
                .withQueryOptions(queryOptions)
                .withRetryPolicy(retryPolicy)
                // 负载均衡策略 RoundRobinPolicy Hash取模
                .withLoadBalancingPolicy(new RoundRobinPolicy())
                .withPort(9042).build();

        session = cluster.connectAsync();
    }

    @Override
    public void close() throws Exception {
        if (cluster != null) {
            cluster.close();
        }
    }

    @Override
    public void timeout(POJOWrapper input, ResultFuture<String> resultFuture) {
        logger.info("timeout: " + input);
    }

    @Override
    public void asyncInvoke(POJOWrapper input, ResultFuture<String> resultFuture) throws Exception {
        System.out.println("asyncInvoke: " + Thread.currentThread().getName());

        ListenableFuture<ResultSet> resultSet = Futures.transform(session, new AsyncFunction<Session, ResultSet>() {
            @Override
            public ListenableFuture<ResultSet> apply(Session session) throws Exception {
                System.out.println(CqlHelper.createInsertQuery(input, false));
                return session.executeAsync(CqlHelper.createInsertQuery(input, false));
            }
        });

        Futures.addCallback(resultSet, new FutureCallback<ResultSet>() {
            @Override
            public void onSuccess(@Nullable ResultSet rows) {
                System.out.println("rows: " + rows);
                resultFuture.complete(Arrays.asList("success"));
            }

            @Override
            public void onFailure(Throwable throwable) {
                throwable.printStackTrace();
            }
        });
    }
}

10万条数据入库

image

时间

image

声明:本文内容由网友自发贡献,不代表【wpsshop博客】立场,版权归原作者所有,本站不承担相应法律责任。如您发现有侵权的内容,请联系我们。转载请注明出处:https://www.wpsshop.cn/w/我家自动化/article/detail/126286
推荐阅读