赞
踩
介绍
Cassandra是一套开源分布式NoSQL数据库系统。它最初由Facebook开发,用于储存收件箱等简单格式数据,集GoogleBigTable的数据模型与Amazon
Dynamo的完全分布式的架构于一身Facebook于2008将 Cassandra
开源,此后,由于Cassandra良好的可扩展性,被Digg、Twitter等知名Web
2.0网站所采纳,成为了一种流行的分布式结构化数据存储方案
下载:cassandra下载地址(以3.9版本为例)
https://archive.apache.org/dist/cassandra/3.9/
我下载linux版,安装在centos7,下载apache-cassandra-3.9-bin.tar.gz
上传到服务器/usr/local/cassandra目录
tar -xzvf apache-cassandra-3.9-bin.tar.gz
在解压目录下/usr/local/cassandra/apache-cassandra-3.9 创建3个目录
commitlog data saved_caches
打开/usr/local/cassandra/apache-cassandra-3.9/conf下的cassandra.yaml
基本配置
数据存储目录
data_file_directories:
- /usr/local/apache-cassandra-3.9/data
#提交记录日志
commitlog_directory: /usr/local/apache-cassandra-3.9/commitlog
#缓存配置
saved_caches_directory: /usr/local/apache-cassandra-3.9/saved_caches
集群配置(
打开每台服务器的7000端口
,这是每台的通信端口)
我以3台服务器为例:192.168.10.128、192.168.10.129、192.168.10.130
以192.168.10.128、192.168.10.129为seed节点:
192.168.10.130的配置:
cluster_name: 'Test Cluster'
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- seeds: "192.168.10.128,192.168.10.129"
listen_address: 192.168.10.130
rpc_address: 192.168.10.130
另外两台也差不多,只是把listen_address和rpc_address换成自己的ip,节点之间必须保证cluster_name一致。如果改了cluster_name,那么需要删除data里面的数据再重启
在/usr/local/cassandra/apache-cassandra-3.9/bin目录下执行
./cassandra -R
常用命令:
查看集群状态: ./nodetool status
连接客户端: ./cqlsh 如果配置了listen_address为自己服务器的ip 那么就用
./cqlsh ip
(自己服务器ip)
生成还有加上账号密码
7199 - JMX
7000 - 节点间通信(如果启用了TLS,则不使用)
7001 - TLS节点间通信(使用TLS时使用)
9160 - Thrift客户端API
9042 - CQL本地传输端口
cluster_name 集群的名字,默认情况下是TestCluster。对于这个属性的配置可以防止某个节点加入到其他集群中去,所以一个集群中的节点必须有相同的cluster_name属性。 listen_address Cassandra需要监听的IP或主机名,默认是localhost。建议配置私有IP,不要用0.0.0.0。 commitlog_directory commit log的保存目录,压缩包安装方式默认是/var/lib/cassandra/commitlog。通过前面的了解,我们可以知道,把这个目录和数据目录分开存放到不同的物理磁盘可以提高性能。 data_file_directories 数据文件的存放目录,压缩包安装方式默认是/var/lib/cassandra/data。为了更好的效果,建议使用RAID 0或SSD。 save_caches_directory 保存表和行的缓存,压缩包安装方式默认是/var/lib/cassandra/saved_caches。 通常使用:用得比较频繁的属性 在启动节点前,需要仔细评估你的需求。 commit_failure_policy 提交失败时的策略(默认stop): stop:关闭gossip和Thrift,让节点挂起,但是可以通过JMX进行检测。 sto_commit:关闭commit log,整理需要写入的数据,但是提供读数据服务。 ignore:忽略错误,使得该处理失败。 disk_failure_policy 设置Cassandra如何处理磁盘故障(默认stop)。 stop:关闭gossip和Thrift,让节点挂起,但是可以通过JMX进行检测。 stop_paranoid:在任何SSTable错误时就闭gossip和Thrift。 best_effort:这是Cassandra处理磁盘错误最好的目标。如果Cassandra不能读取磁盘,那么它就标记该磁盘为黑名单,可以继续在其他磁盘进行写入数据。如果Cassandra不能从磁盘读取数据,那个这些SSTable就标记为不可读,其他可用的继续堆外提供服务。所以就有可能在一致性水平为ONE时会读取到过期的数据。 ignore:用于升级情况。 endpoint_snitch 用于设置Cassandra定位节点和路由请求的snitch(默认org.apache.cassandra.locator.SimpleSnitch),必须设置为实现了IEndpointSnitch的类。 rpc_address 一般填写本机ip 用于监听客户端连接的地址。可用的包括: 0.0.0.0监听所有地址 IP地址 主机名 不设置:使用hosts文件或DNS seed_provider 需要联系的节点地址。Cassandra使用-seeds集合找到其他节点并学习其整个环中的网络拓扑。 class_name:(默认org.apache.cassandra.locator.SimpleSeedProvider),可用自定义,但通常不必要。 – seeds:(默认127.0.0.1)逗号分隔的IP列表。 compaction_throughput_mb_per_sec 限制特定吞吐量下的压缩速率。如果插入数据的速度越快,越应该压缩SSTable减少其数量。推荐16-32倍于写入速度(MB/s)。如果是0表示不限制。 memtable_total_space_in_mb 指定节点中memables最大使用的内存数(默认1/4heap)。 concurrent_reads (默认32)读取数据的瓶颈是在磁盘上,设置16倍于磁盘数量可以减少操作队列。 concurrent_writes (默认32)在Cassandra里写很少出现I/O不稳定,所以并发写取决于CPU的核心数量。推荐8倍于CPU数。 incremental_backups (默认false)最后一次快照发生时备份更新的数据(增量备份)。当增量备份可用时,Cassandra创建一个到SSTable的的硬链接或者流式存储到本地的备份/子目录。删除这些硬链接是操作员的责任。 snapshot_before_compaction (默认false)启用或禁用在压缩前执行快照。这个选项在数据格式改变的时候来备份数据是很有用的。注意使用这个选项,因为Cassandra不会自动删除过期的快照。 phi_convict_threshold (默认8)调整失效检测器的敏感度。较小的值增加了把未响应的节点标注为挂掉的可能性,反之就会降低其可能性。在不稳定的网络环境下(比如EC2),把这个值调整为10或12有助于防止错误的失效判断。大于12或小于5的值不推荐! 性能调优 commit_sync (默认:periodic)Cassandra用来确认每毫秒写操作的方法。 periodic:和commitlog_sync_period_in_ms(默认10000 – 10 秒)一起控制把commit log同步到磁盘的频繁度。周期性的同步会立即确认。 batch:和commitlog_sync_batch_window_in_ms(默认disabled)一起控制Cassandra在执行同步前要等待其他写操作多久时间。当使用该方法时,写操作在同步数据到磁盘前不会被确认。 commitlog_periodic_queue_size (默认1024*CPU的数量)commit log队列上的等待条目。当写入非常大的blob时,请减少这个数值。比如,16倍于CPU对于1MB的Blob工作得很好。这个设置应该至少和concurrent_writes一样大。 commitlog_segment_size_in_mb (默认32)设置每个commit log文件段的大小。一个commit log段在其所有数据刷新到SSTable后可能会被归档、删除或回收。数据的总数可以潜在的包含系统中所有表的commit log段。默认值适合大多数情况,当然你也可以修改,比如8或16MB。 commitlog_total_space_in_mb (默认32位JVM为32,64位JVM为1024)commit log使用的总空间。如果使用的空间达到以上指定的值,Cassandra进入下一个临近的部分,或者把旧的commit log刷新到磁盘,删除这些日志段。该个操作减少了在启动时加载过多数据引起的延迟,防止了把无限更新的表保存到有限的commit log段中。 compaction_preheat_key_cache (默认true)当设置为true的时候,缓存的row key在压缩期间被跟踪,并且重新缓存其在新压缩的SSTable中的位置。如果有极其大的key要缓存,把这个值设为false。 concurrent_compactors (默认每个CPU一个)设置每个节点并发压缩处理的值,不包含验证修复逆商。并发压缩可以在混合读写工作下帮助保持读的性能——通过减缓把一堆小的SSTable压缩而进行的长时间压缩。如果压缩运行得太慢或太快,请首先修改compaction_throughput_mb_per_sec的值。 in_memory_compaction_limit_in_mb (默认64)针对数据行在内存中的压缩限制。超大的行会溢出磁盘并且使用更慢的二次压缩。当这个情况发生时,会对特定的行的key记录一个消息。推荐5-10%的Java对内存大小。 multithreaded_compaction (默认false)当设置为true的时候,每个压缩操作使用一个线程,一个线程用于合并SSTable。典型的,这个只在使用SSD的时候有作用。使用HDD的时候,受限于磁盘I/O(可参考compaction_throughput_mb_per_sec)。 preheat_kernel_page_cache (默认false) 启用或禁用内核页面缓存预热压缩后的key缓存。当启用的时候会预热第一个页面(4K)用于由每个数据行的顺序访问。对于大的数据行通常是有危害的。 file_cache_size_in_mb (小于1/4堆内存或512)用于SSTable读取的缓存内存大小。 memtable_flush_queue_size (默认4)等待刷新的满的memtable的数量(等待写线程的memtable)。最小是设置一个table上索引的最大数量。 memtable_flush_writers (默认每数据目录一个)设置用于刷新memtable的线程数量。这些线程是磁盘I/O阻塞的,每个线程在阻塞的情况下都保持了memtable。如果有大的堆内存和很多数据目录,可以增加该值提升刷新性能。 column_index_size_in_kb (默认64)当数据到达这个值的时候添加列索引到行上。这个值定义了多少数据行必须被反序列化来读取列。如果列的值很大或有很多列,那么就需要增加这个值。 populate_io_cache_on_flush (默认false)添加新刷新或压缩的SSTable到操作系统的页面缓存。 reduce_cache_capacity_to (默认0.6)设置由reduce_cache_sizes_at定义的Java对内存达到限制时的最大缓存容量百分比。 reduce_cache_sizes_at (默认0.85)当Java对内存使用率达到这个百分比,Cassandra减少通过reduce_cache_capacity_to定义的缓存容量。禁用请使用1.0。 stream_throughput_outbound_megabits_per_sec (默认200)限制所有外出的流文件吞吐量。Cassandra在启动或修复时使用很多顺序I/O来流化数据,这些可以导致网络饱和以及降低RPC的性能。 trickle_fsync (默认false)当使用顺序写的时候,启用该选项就告诉fsync强制操作系统在trickle_fsync_interval_in_kb设定的间隔刷新脏缓存。建议在SSD启用。 trickle_fsync_interval_in_kb (默认10240)设置fsync的大小
cassandra的键空间(KeySpace)
相当于数据库
,我们创建一个键空间就是创建了一个数据库
副本因子,副本策略,Durable_writes(是否启用 CommitLog 机制)
副本因子决定数据有几份副本。例如:
副本因子为1表示每一行只有一个副,。副本因子为2表示每一行有两个副本,每个副本位于不同的节点上。在实际应用中为了避免单点故障,会配置为3以上。所有的副本都同样重要,没有主从之分。可以为每个数据中心定义副本因子。副本策略设置应大于1,但是不能超过集群中的节点数
描述的是副本放在集群中的策略
查看集群
desc cluster;
查看键空间(类似于mysql查看有哪些库)
desc Keyspaces;
进入键空间(类似mysql的选择哪个库)
use 键空间名;
查看该库下有哪些表desc tables;
查看表结构
desc 表名;
创建一个键空间名字为:school,副本策略选择:简单策略 SimpleStrategy,副本因子:3
CREATE KEYSPACE school WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 3};
修改键空间,修改键空间语句,修改school键空间,把副本引子 从3 改为1
ALTER KEYSPACE school WITH replication = {'class':'SimpleStrategy', 'replication_factor' : 1};
删除键空间
DROP KEYSPACE school
CREATE TABLE student(
id int PRIMARY KEY,
name text,
age int,
gender tinyint,
address text ,
interest set<text>,
phone list<text>,
education map<text, text>
);
查看创建的表: DESCRIBE TABLE student;
CREATE INDEX sname ON student (name);
drop index sname;
查询:
Primary Key
只能用 = 号查询 第二主键 支持= > < >= <= 索引列 只支持 = 号
非索引非主键字段过滤可以使用ALLOW FILTERING
第一主键 只能用=号查询 例如:
create table testTab (
key_one int,
key_two int,
name text,
age int,
PRIMARY KEY(key_one, key_two) );
create INDEX tage ON testTab (age);
key_one
是第一主键,key_two
是第二主键,age
是索引列,name
是普通列
1.key_one列是第一主键 对key_one进行 = 号查询,可以查出结果,对key_one 进行范围查询使用 > 号,无法查出结果.它是分区键,就是要指定哪个区先
2.不要单独对key_two 进行 查询,加上ALLOW FILTERING 后确实可以查询出数据,但是不建议这么做
select * from testtab where key_two = 8 ALLOW FILTERING;
正确的做法是,在查询第二主键时,前面先写上第一主键
select * from testtab where key_one=12 and key_two > 7;
3.索引列 只支持=号
select * from testtab where age = 19;
4.普通列,非索引非主键字段 name是普通列,在查询时需要使用ALLOW FILTERING
select * from testtab where key_one=12 and name='张小仙' allow filtering; --可以查询
不写allow filtering会报错
ALLOW FILTERING是一种非常消耗计算机资源的查询方式。
如果表包含例如100万行,并且其中95%具有满足查询条件的值,则查询仍然相对有效,这时应该使用ALLOW FILTERING。
如果表包含100万行,并且只有2行包含满足查询条件值,则查询效率极低。Cassandra将无需加载999,998行。如果经常使用查询,则最好在列上添加索引。
ALLOW FILTERING在表数据量小的时候没有什么问题,但是数据量过大就会使查询变得缓慢
更新列:
UPDATE <tablename> SET <column name> = <new value> <column name> = <value>....WHERE <condition> UPDATE student set gender = 1 where student_id= 1012;
批量操作: 语法:
BEGIN BATCH <insert-stmt>/ <update-stmt>/ <delete-stmt> APPLY BATCH;
示例:
BEGIN BATCH INSERT INTO
student(id,address,age,education,gender,interest,name,phone)VALUES(1013,{'忠县中学'},20,{'小学':'山河小学','初中':'来仪小学'},1,{'打牌','泡妞'},'黎飞',['110','119']);
UPDATE student set age=11 where id= 1012; DELETE FROM student WHERE id >=1011;
APPLY BATCH;
删除行
语法:DELETE FROM <identifier> WHERE <condition>;
示例:DELETE FROM student WHERE student_id=1012;
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-mapping</artifactId>
<version>3.9.0</version>
</dependency>
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.9.0</version>
</dependency>
public class TestKeySpace { Session session = null; /** * 连接cassandra */ @Before public void init() { String host = "192.168.10.130"; int port = 9042; Cluster cluster = Cluster.builder() .addContactPoint(host) .withPort(port) .withCredentials("cassandra", "cassandra") .withoutJMXReporting() .build(); session = cluster.connect(); } }
/** * 查找键空间 */ @Test public void findKeySpace() { List<KeyspaceMetadata> keyspaces = session.getCluster().getMetadata().getKeyspaces(); for (KeyspaceMetadata keyspaceMetadata : keyspaces) { System.out.println(keyspaceMetadata.getName()); } } /** * 创建键空间 */ @Test public void createKeySpace() { //方法1 使用CQL语句 // String createKeySpaceCQL = "create keyspace if not exists liaochao001 with replication={'class':'SimpleStrategy', 'replication_factor': 1}"; // session.execute(createKeySpaceCQL); //方法2 使用面向对象 Map<String, Object> map = new HashMap<>(); map.put("class", "SimpleStrategy"); map.put("replication_factor", 2); KeyspaceOptions options = SchemaBuilder .createKeyspace("girls") .ifNotExists() .with() .replication(map); session.execute(options); } /** * 删除键空间 */ @Test public void deleteKeySpace() { //方法 1 使用CQL //String createKeySpaceCQL = "drop keyspace liaochao001"; //session.execute(createKeySpaceCQL); //方法2 使用面向对象的方式 DropKeyspace dropKeyspace = SchemaBuilder .dropKeyspace("mumu") .ifExists(); session.execute(dropKeyspace); } /** * 修改键空间 */ @Test public void alterKeySpace() { Map<String, Object> map = new HashMap<>(); map.put("class", "SimpleStrategy"); map.put("replication_factor", 1); KeyspaceOptions keyspaceOptions = SchemaBuilder .alterKeyspace("mumu") .with() .replication(map); session.execute(keyspaceOptions); } 4.表(列族)的操作 /** * 创建列族(类似于创建表) */ @Test public void createColumn() { //方法1 使用CQL String createTableCQL = "create table if not exists school.student(name varchar primary key, age int)"; session.execute(createTableCQL); //方法2 使用schemaBuilder Create create = SchemaBuilder .createTable("girls", "student") .addPartitionKey("id", DataType.bigint()) .addColumn("address", DataType.set(DataType.text())) .addColumn("age", DataType.cint()) .addColumn("name", DataType.text()) .addColumn("gender", DataType.cint()) .addColumn("interest", DataType.set(DataType.text())) .addColumn("phone", DataType.list(DataType.text())) .addColumn("education", DataType.map(DataType.text(), DataType.text())) .ifNotExists(); session.execute(create); } /** * 修改表 */ @Test public void alterTable() { //新增一个字段 SchemaStatement schemaStatement1 = SchemaBuilder .alterTable("girls", "student") .addColumn("email") .type(DataType.text()); session.execute(schemaStatement1); //修改一个字段 // SchemaStatement schemaStatement2 = SchemaBuilder.alterTable("girls", "student") // .alterColumn("email") // .type(DataType.varchar()); // session.execute(schemaStatement2); //删除一个字段 // SchemaStatement schemaStatement3 = SchemaBuilder // .alterTable("girls", "student") // .dropColumn("email"); // session.execute(schemaStatement3); } /** * 删除表 */ @Test public void removeTable(){ Drop drop = SchemaBuilder .dropTable("school", "student") .ifExists(); session.execute(drop); }
/** * 插入数据 */ @Test public void insertData() { //方法1 使用CQL // String insertCQL = "insert into liaochao001.student(name,age) values('文璺',20)"; //session.execute(insertCQL); //方法2 使用 Set<String> interestSet = new HashSet<>(); interestSet.add("看书"); interestSet.add("约p"); List<String> phoneList = new ArrayList<>(); phoneList.add("110"); phoneList.add("119"); Map<String,String> map = new HashMap<>(); map.put("小学","三河小学"); map.put("中学","来仪小学"); Set<String> addressSet = new HashSet<>(); addressSet.add("校园路129号"); Student student = Student .builder() .id(1012) .address(addressSet) .name("薛天霸") .age(19) .gender(1) .phone(phoneList) .interest(interestSet) .education(map) .email("123@qq.com") .build(); Mapper<Student> mapper = new MappingManager(session).mapper(Student.class); mapper.save(student); } /** * 查询数据 */ @Test public void queryData() { String queryCQL = "select * from liaochao001.student"; ResultSet rs = session.execute(queryCQL); List<Row> dataList = rs.all(); for (Row row : dataList) { System.out.println("===>name:" + row.getString("name")); System.out.println("===>age:" + row.getInt("age")); } } /** * 修改数据 */ @Test public void updateData() { //方法1 CQL String updateCQL = "update liaochao001.student set age =32 where name='zhangsan'"; session.execute(updateCQL); } /** * 删除数据 */ @Test public void deleteData() { String deleteCQL = "delete from liaochao001.student where name = 'zhangsan'"; session.execute(deleteCQL);
/** * 创建索引 */ @Test public void createIndex() { // SchemaStatement statement = SchemaBuilder // .createIndex("nameindex") // .onTable("school", "student") // .andColumn("name"); // session.execute(statement); //给map类型常见索引 SchemaStatement schemaStatement = SchemaBuilder.createIndex("educationindex") .onTable("school", "student") .andKeysOfColumn("education"); session.execute(schemaStatement); } /** * 删除索引 */ @Test public void dropIndex() { Drop mymap = SchemaBuilder.dropIndex("school","mymap"); session.execute(mymap); }
预编译statement的时候,Cassandra会解析query语句,缓存解析的结果并返回一个唯一的标志。当绑定并且执行预编译statement的时候,驱动只会发送这个标志,那么Cassandra就会跳过解析query语句的过程。
应保证query语句只应该被预编译一次,缓存PreparedStatement 到我们的应用中(PreparedStatement
是线程安全的);如果我们对同一个query语句预编译了多次,那么驱动输出印警告日志;
如果一个query语句只执行一次,那么预编译不会提供性能上的提高,反而会降低性能,因为是两次请求,那么此时可以考虑用 simple statement 来代替
/** * 预编译:数据量大时提高性能,数据量小会拖慢性能 * https://docs.datastax.com/en/developer/java-driver/3.0/manual/statements/prepared/ */ @Test public void batchPrepare() { BatchStatement batchStatement = new BatchStatement(); //预编译语句 PreparedStatement prepare = session.prepare("INSERT INTO school.student ( id,address, age , education, gender, interest, name, phone)VALUES(?,?,?,?,?,?,?,?)"); for (int i = 0; i < 100; i++) { Set<String> interestSet = new HashSet<>(); interestSet.add("看书"); interestSet.add("约炮"); List<String> phoneList = new ArrayList<>(); phoneList.add("0"+i+"110"); phoneList.add("0"+i+"119"); Map<String,String> map = new HashMap<>(); map.put("小学","三河小学"+i); map.put("中学","来仪小学"+i); Set<String> addressSet = new HashSet<>(); addressSet.add("校园路"+i+"号"); Student student = Student .builder() .id((1014+i)) .address(addressSet) .name("薛雅天"+i) .age(19+i) .gender(1) .phone(phoneList) .interest(interestSet) .education(map) .email("123@qq.com") .build(); //设置预编译占位符对应的值 BoundStatement boundStatement = prepare.bind(student.getId(), student.getAddress(), student.getAge(), student.getEducation(), student.getGender(), student.getInterest(), student.getName(), student.getPhone() ); //添加数据到批量操作Statement batchStatement.add(boundStatement); } //执行批量插入操作 session.execute(batchStatement); //情空 batchStatement.clear(); }
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。