赞
踩
简而言之,最大的好处是可以直接在数据库服务器运行,减少了客户端和服务器端之间的网络连接、以及数据传输时间
场景设定是:现在有user表和organization表,两张表的数据量都非常大,比如说几个亿。现在需要搞一张宽表来存放user表的全部信息和user关联的organization表的organization_name。
我们使用PL/pgSQL中的Procedure来完成
根据上面的描述,我们即使不清楚PL/pgSQL要怎么写。我们也要知道我们的需求是什么。需求是把这有几个亿的两张表的数据搞到另外一张表里去。
我们刚才说到,PL/pgSQL是跑在数据库服务器上的程序,那么我们就要思考,对于这么大数据量的迁移工作,它的运行程序时间必然是非常长的,有可能它占用的IO也很高(因为几个亿的数据重新insert到一张表里去,数据库要不停的刷IO)。同时我们还要考虑到数据库服务器还要正常处理业务,不能对系统的业务产生影响。
怎么办?我们决定使用分时、分批次处理。也就是可以在凌晨期间,用户量相对少的时候,跑他个五六个小时。等到白天用户上来了,那我们就不运行这个迁移程序。
根据上面的思路,写出下面的Procedure
CREATE OR REPLACE PROCEDURE POSTGRES.REDUNDANT_USER_ORG_INFO() LANGUAGE 'plpgsql' AS $BODY$ DECLARE -- 声明变量 last_id bigint; row_num integer := 0; -- 声明游标 row_cursor CURSOR FOR SELECT t.*, org.organization_name FROM postgres.user t INNER JOIN postgres.organization org ON t.organization_id = org.id WHERE t.id > last_id ORDER BY t.id ASC ; BEGIN -- 查询同步记录表,查询上一次同步结束的ID并把查询结果赋值给last_id变量 SELECT last_sync_id INTO last_id FROM postgres.redundant_user_org_info_migration_record; -- 打印日志 RAISE NOTICE 'start init redundant data at % and last_id is %', now(), last_id; -- 循环提交 FOR row_rec IN row_cursor LOOP last_id := row_rec.id; BEGIN -- INSERT SQL INSERT INTO postgres.user_org_info ( id,user_name,organization_name,creation_date,last_updated ) VALUES ( row_rec.id,row_rec.user_name,row_rec.organization_name,row_rec.creation_date,clock_timestamp() ); row_num := row_num + 1; EXCEPTION -- 碰到唯一约束异常,直接忽略 WHEN unique_violation THEN -- do nothing, just ignore END; -- 每10万次提交一次事务,同时更新同步记录表 IF mod(row_num, 100000) = 0 THEN RAISE NOTICE 'reach 100000 records commit transaction at % and update redundant_user_org_info_migration_record and last_id is % ', now(), last_id; UPDATE postgres.redundant_user_org_info_migration_record SET last_sync_id = last_id; COMMIT; END IF; END LOOP; -- 最后结束时,更新同步记录表 RAISE NOTICE 'final commit transaction at % and update redundant_user_org_info_migration_record and last_id is %', now(), last_id; UPDATE postgres.redundant_contact_migration_record SET last_sync_id = last_id; COMMIT; RAISE NOTICE 'end init data...... at %', now(); END; $BODY$;
Procedure是没有返回值的,在这里示例中我们不需要什么返回值.所以使用Procedure
A procedure does not have a return value. A procedure can therefore end without a RETURN statement. If you wish to use a RETURN statement to exit the code early, write just RETURN with no expression.
从上面的实例程序中可以看得出来,基本结构是
DECLARE
变量名 变量类型 := 默认值;
BEGIN
-- 具体的SQL
-- 这里面还可以继续嵌套DECLARE-BEGIN-ENG结构
ENG;
处理上述变量的使用方式,示例程序中还使用了
SELECT 字段名 INTO 变量名 FROM 具体查询;
这种方式。因为我们是多次运行的程序,所以每次程序开始时都要知道上次的结束点在哪里。
上述SQL中查询还使用排序,所以这就会保证每次执行都不会重复扫描已处理过的数据。
你可能会说,那第一次查不到数据怎么办?有两种处理方式,一种是编程时考虑到这种情况,给他一个默认值。二是我们把迁移记录表里提前插入一条数据就可以了。这里使用的是第二种方式,也就是肯定能查出来数据。关于第一种方式,官方文档也有提到,可以去找一下文档
对应的官方文档
平时做过开发的同学应该有所了解,当使用jdbc从数据库里查询出特别大的结果集时,如果不进行流式读取,很可能把应用的JVM给撑爆。数据库游标也是同理
Rather than executing a whole query at once, it is possible to set up a cursor that encapsulates the query, and then read the query result a few rows at a time. One reason for doing this is to avoid memory overrun when the result contains a large number of rows.
如上所述,声明游标格式是
游标名字 CURSOR FOR 具体的查询语句;
一般来说,游标声明之后,还要打开游标(OPEN)、然后从移动游标从而在结果集中读取数据(FETCH)、处理完毕之后要关闭游标(CLOSE)
不过我们可以和FOR循环一起使用,上面的这些工作FOR循环内部都帮我们做了
FOR 行变量名(这里加入叫recordvar) IN 游标名字 LOOP
-- 具体业务SQL
END LOOP;
The FOR statement automatically opens the cursor, and it closes the cursor again when the loop exit
The variable recordvar is automatically defined as type record and exists only inside the loop (any existing definition of the variable name is ignored within the loop). Each row returned by the cursor is successively assigned to this record variable and the loop body is executed
对于FOR循环返回的行变量类型是record类型的
就像jdbc里的ResutSet一样,当处理查询结果集时,我们需要有一个变量能够访问结果集的每一行,从而可以访问每一行对应的字段。那么在PL/pgSQL中也有类似的东西,就是Row Type和Record Type.
name table_name%ROWTYPE;
name RECORD;
二者的区别是,Row Type一旦声明,那么这一行的每个字段的类型就固定了;而Row Record不一样,结果集的每条数据的每个字段的类型是这个变量的真正类型
Record variables are similar to row-type variables, but they have no predefined structure. They take on the actual row structure of the row they are assigned during a SELECT or FOR command. The substructure of a record variable can change each time it is assigned to
注意我们的提交逻辑,在FOR循环中是将每10万次提交一次事务的逻辑放在嵌套BEGIN-END的外面,也就是和BEGIN-END平级。
很多同学会觉得应该把这部分逻辑放到里面,从而造成执行Procedure时会报错。
同时对于事务,我们并没有主动开启事务,只是有一个COMMIT的提交
it is possible to end transactions using the commands COMMIT and ROLLBACK. A new transaction is started automatically after a transaction is ended using these commands, so there is no separate START TRANSACTION command
而且我们在外面又进行了一次事务的提交。这是因为,如果我们的数据是10万零一条,显然它是不会进入到IF中去,所以我们还是要最后再提交一次事务
在上述程序中,我们捕获了唯一约束异常,并且什么都没做。因为我们的数据可能会存在唯一约束异常,但是我们对于这些数据不care。所以针对这种情况,为了让程序继续执行,我们选择不回滚事务,让事务继续。
当然如果实际中,你的业务需要你进行回滚事务或其他操作,你可以在异常里面处理
对于的官方文档
可能实际中要处理的异常不止唯一约束异常,还有各种奇奇怪怪的异常。我们写程序的时候只有去官方文档里的附录去按图索骥就可以了
官方error code文档
我们使用Condition Name那一列就可以捕获异常了
关于日志打印,其实不光是记录程序执行的情况。更多的场景是,我们调试的时候更需要日志。比如,我们写了一个PL/pgSQL,执行的时候发现没有我们最终要的结果。我们又不知道程序里发生了什么,所以我们可以把自己需要的东西打印出来,看看哪里有问题。
一般格式采用示例程序里的写法就可以
对应的官方文档
到此为止,所有的语法、以及业务逻辑、以及为什么要这么写我们都明白并清楚了,程序也调试好了。那么接下来就是运行它
CALL POSTGRES.REDUNDANT_USER_ORG_INFO();
程序里的postgres是我自己创建的一个schema,你也可以自己创建一个,叫什么名字都可以。
截止到现在,我们基本完成了百分之90的工作,还有最后一个就是让这个Procedure分时运行。我们想到可不可以设置一个定时任务让它默认跑?类似于Linux下的crontab
这种事情,我们想到了,那我们的前辈们肯定也早考虑到了,使用pg_cron即可完成。具体使用,不在此篇博文中介绍
好了,到这一步,我们的所有需求都被很好的满足了。我们很开心。但是对于上面这个程序,我们可不可以再优化一下呢?
哪里优化?在描述里我提到,两张表都是好几亿的数据,同时程序里又使用了inner join。那么可想而知,两张几个亿的大表inner join,即使索引设计的不错,但是数据量还是很大,实际情况中,user表的数据列可能非常多,那么我们的SQL执行时间非常长。如果优化这一点呢?
我在示例程序里的SQL中有一个creation_data的字段,它属于user表,我们可以把一张大表认为的把它搞小。那就是分区,可以将user表按creation_date进行分区,比如说按年分区,那么我这个SQL每次处理一年的数据那应该很快了。
当然这只是一种思路,你可能会说,有可能我一年就几亿用户啊,这你怎么办?那我们只好再想其他办法来处理。不过就我实际工作遇到的情况,几个亿的用户数据,基本不可能在一年之内,可能都要四五年甚至六七年。
关于进一步优化就谈到这,这是说一个思路
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。