赞
踩
在程序监听数据库中数据变化做出及时响应的需求,很难找到一种可靠并且可行的成熟方案。很多情况,不惜牺牲性能,启动定时任务不停的扫描数据库来获取数据变更,这样既牺牲性能,而且无法做到及时监听数据变化。本文将针对如何在程序中及时监听数据库变化且不牺牲性能的前提下实现这样的需求。
本文将给出数据库主动发送Http请求通知应用程序;数据库主动执行Java代码;以及Mysql Salve伪装实时获取数据变化。
看完这篇文章,从此放弃Quartz、轮询、定时任务等方案!
关系型数据库的触发器可以再数据库insert、delete、update事件触发,此时再触发器中由数据库主动发送Http请求,而程序中只需要提供Http API 接口即可实现对数据库数据变更即时捕获,再也不用搞定时任务不断的扫表,不断的查询数据库,性能高、无延迟。
已经查明Oracle、SQL Server 支持 Http调用,MySQL5.1-5.5 需要在Linux环境下安装插件mysql-udf-http (暂不支持Windows)
@RestController public class TestController { @GetMapping("/getUser/{id}") public String getUser(@PathVariable("id") String id){ System.out.println("获取参数id="+id); return "ok"; } @PostMapping("/getUserPost/{id}") public String getUserPost(@PathVariable("id") String id, HttpServletRequest request) { String name = request.getParameter("name"); String age = request.getParameter("age"); System.out.println("Post 获取参数id="+id+",name="+name+",age="+age); return "ok"; } } # SpringBoot 配置文件 server.port=80 server.servlet.context-path=/
启动本机服务
这里需要说明本地实验的工具是DbVisual、Oracle版本是11g 11.2.0.1 ,分别使用到SYS用户和SCOTT用户。
已SYS用户登录Oracle数据服务,为实验用户SCOTT授权对UTL_HTTP包的执行权限,需要使用命令:
GRANT EXECUTE ON "UTL_HTTP" TO "SCOTT";
commit;
-- 查询ACL控制文件
SELECT * FROM resource_view WHERE any_path like '/sys/acls/%.xml';
-- 创建控制文件:DBMS_NETWORK_ACL_ADMIN.CREATE_ACL
-- 删除控制文件:DBMS_NETWORK_ACL_ADMIN.DROP_ACL( acl => 'www.xml');
这里需要使用到两条命令,分别是创建ACL文件和删除ACL文件。DBMS_NETWORK_ACL_ADMIN.CREATE_ACL 和DBMS_NETWORK_ACL_ADMIN.DROP_ACL。注意对于本文给出的命令执行需要在PL/SQL 块中执行,就是需要在BEGIN END 中执行,需要说明一点,本文使用的工具是DbVisual 所以执行代码块使用了符号“–/”和 “ /”包裹代码块,这只是DbVisual工具的特性要求,其他工具请忽视。
--/ BEGIN -- 创建文件 dbms_network_acl_admin.create_acl( acl => 'data_exchange_server_acl.xml', -- 新文件名 DESCRIPTION => 'Normal Access', principal => 'CONNECT', -- 赋予角色 CONNECT is_grant => TRUE, PRIVILEGE => 'connect', start_date => NULL, end_date => NULL); commit; END; / --/ BEGIN -- 赋予权限 dbms_network_acl_admin.add_privilege( acl => 'data_exchange_server_acl.xml', principal => 'SCOTT',-- 赋予权限的大写用户名 is_grant => TRUE, privilege => 'connect', start_date => null, end_date => null); commit; END; /
可以使用命令查询DBA_NETWORK_ACL_PRIVILEGES ,对访问控制列表的管理可以借助DBMS_NETWORK_ACL_ADMIN.ASSIGN_ACL和DBMS_NETWORK_ACL_ADMIN.UNASSIGN_ACL来完成增加和删除。
select * from dba_network_acl_privileges;
-- 增加访问控制: DBMS_NETWORK_ACL_ADMIN.ASSIGN_ACL(acl => 'www.xml',host => '*.qq.com'); -- 删除访问控制: DBMS_NETWORK_ACL_ADMIN.UNASSIGN_ACL(host => 'www.qq.com'); --/ BEGIN dbms_network_acl_admin.assign_acl( acl => 'data_exchange_server_acl.xml', -- 如果使用本机测试的话,不要写localhost 或 127.0.0.1 -- 一定写IP地址 -- 域名,ip地址,ip地址和域名允许使用通配符 -- 通配符可以是 192.168.1.* 或 *.baidu.com host => '192.168.1.1', lower_port => 80, -- 指定端口号 upper_port => NULL); commit; END; /
-- 查看配置的信息
select * from SYS.DBA_NETWORK_ACLS;
至此,我们已经完成对SCOTT用户使用UTL_HTTP包的前置准备工作,下面切换用户SCOTT登录Oracle服务。
SCOTT用户登录Oracle服务,创建函数完成对HTTP 的GET请求。此例中是最基本的GET请求,需要说明,虽然在设置ACL访问控制列表时可以对域名、I P设置通配符,但是经过实际测试 *.com * 是不支持url传参的例如:“www.test.com?a=1&b=2”这种形式的。但是像Rest方式的 “www.test.com/a”、“www.test.com/a/b/c” 是支持的
对HTTP请求返回的的解析,注意毕竟是数据库,不要幻想对HTTP支持有多么强大,对接口返回JSON、XML格式的相应需要自己想办法解析。所以业务设计中针对数据库请求的HTTP接口无论从请求参数还是返回数据都应该尽量简单。
HTTPS如何请求,这个ORACLE 是支持的,需要先设置SSL 证书。可以参考:
https://oracle-base.com/articles/misc/configure-tcpip-with-ssl-and-tls-for-database-connections
https://docs.oracle.com/cd/E11882_01/network.112/e40393/asoappf.htm#ASOAG9835
CREATE OR REPLACE FUNCTION "SCOTT"."HTTP_GET"(url varchar2) RETURN CLOB AS BEGIN DECLARE -- 声明HttpRequest request UTL_HTTP.REQ; -- 声明HttpResponse response UTL_HTTP.RESP; v_readline CLOB; v_html CLOB; BEGIN v_html := ''; BEGIN -- 访问指定的网址,并指定第二个参数为 GET 请求。另外还可以设置第三个参数来决定使用协议 HTTP 1.0还是 HTTP1.1 request := UTL_HTTP.BEGIN_REQUEST (url,'GET'); -- 常规设置字符集 UTL_HTTP.SET_BODY_CHARSET('UTF-8'); -- 常规设置Request Head 可根据具体场景决定,这里请求网页。 UTL_HTTP.SET_HEADER(request, 'Content-Type', 'text/html;charset=utf-8'); -- 开始请求并获取返回 response := UTL_HTTP.GET_RESPONSE ( request ); -- 逐行读取相应信息 LOOP UTL_HTTP.READ_LINE ( response, v_readline, TRUE ); v_html := v_html || v_readline; END LOOP; -- 关闭response UTL_HTTP.END_RESPONSE(response); -- 关闭request UTL_HTTP.END_REQUEST(request); -- 异常处理 EXCEPTION WHEN UTL_HTTP.END_OF_BODY THEN UTL_HTTP.END_RESPONSE (response); WHEN OTHERS THEN UTL_HTTP.END_RESPONSE(response); UTL_HTTP.END_REQUEST(request); END; -- 返回全部相应 return v_html; END; END;
执行函数HTTP_GET()向本机发送请求,后台输入打印日志,并返回“ok” 数据库接收并且解析。
select HTTP_GET('http://192.168.1.1/getUser/1') from dual;
CREATE OR REPLACE FUNCTION "SCOTT"."HTTP_POST" (url IN VARCHAR2) RETURN CLOB AS BEGIN DECLARE request UTL_HTTP.REQ; response UTL_HTTP.RESP; v_readline CLOB; v_html CLOB; v_params varchar(300); BEGIN v_html :=''; BEGIN request := UTL_HTTP.BEGIN_REQUEST(url,'POST'); UTL_HTTP.SET_BODY_CHARSET('UTF-8'); UTL_HTTP.SET_HEADER(request, 'Content-Type', 'application/x-www-form-urlencoded;charset=UTF-8'); -- 中文参数要转义 v_params := 'age=20&name=' || utl_url.escape('中国zhangsan', true, 'UTF8') || '&sex=1'; -- 先设置Content-Length 再设置参数, 注意使用 LENGTHB 而不是 LENGTH UTL_HTTP.SET_HEADER(request,'Content-Length', LENGTHB(v_params)); UTL_HTTP.WRITE_TEXT(request,v_params); --发送请求获取返回 response := UTL_HTTP.GET_RESPONSE(request); LOOP UTL_HTTP.READ_LINE(response, v_readline, TRUE ); v_html := v_html || v_readline; END LOOP; UTL_HTTP.END_RESPONSE(response); UTL_HTTP.END_REQUEST(request); EXCEPTION WHEN UTL_HTTP.END_OF_BODY THEN UTL_HTTP.END_RESPONSE (response); WHEN OTHERS THEN UTL_HTTP.END_RESPONSE(response); UTL_HTTP.END_REQUEST(request); END; return v_html; END; END;
一定要注意,这里有几个坑,
1.中文参数乱码问题,需要使用UTL_URL.ESCAPE(‘中文’,true,‘UTF8’)转义后拼接参数,而不能使用UTL_URL.ESCAPE将整个参数字符串转义后再调用UTL_HTTP.WRITE_TEXT(request,v_params),这样后台无法获取参数。
2.一定要先设置Content-Length后再调用UTL_HTTP.WRITE_TEXT(request,v_params),顺序不能错,否则运行时报错。
3.设置Content-Length 再设置参数, 注意使用 LENGTHB 而不是 LENGTH具体的,可以使用如下命令查看本地字符集判断中文占用字节数:
select userenv('language') from dual
SIMPLIFIED CHINESE_CHINA.ZHS16GBK 一个汉字占用2各字节
SIMPLIFIED CHINESE_CHINA.AL32UTF8 一个汉字占用3各字节
select LENGTHB('中文') from dual;
执行函数HTTP_POST()向本机发送请求,后台输入打印日志,并返回“ok” 数据库接收并且解析。
如果不希望使用UTL_HTTP或因其他原因不能使用UTL_HTTP的,还可以使用Oracle支持的Java方式访问Http。
这里可以阅读Oralce的官方文档
https://docs.oracle.com/cd/B28359_01/java.111/b31225/appendixa.htm
https://docs.oracle.com/javase/1.5.0/docs/guide/security/permissions.html
需要在前面做的工作是检查当前用户所拥有的Java权限。请先查看Oracle数据库支持的Java相关的角色,并确定当前用户具有的Java角色,保证用户具备必须的权限。
权限这一块网上资料不全,有些还有错误,一般关注:
java.security.AllPermission 暗含所有其他权限
java.net.SocketPermission 网络访问权限
java.io.FilePermission 文件访问权限
要理解Oracle官方定义的PROCEDURE grant_permission(grantee VARCHAR2, permission_type VARCHAR2, permission_name VARCHAR2, permission_action VARCHAR2);
grantee :大写用户名
permission_type包含:
java.security.AllPermission 暗含所有其他权限
java.net.SocketPermission 网络访问权限
java.io.FilePermission 文件访问权限
java.security.SecurityPermission
java.security.UnresolvedPermission
java.awt.AWTPermission
java.io.SerializablePermission
java.lang.reflect.ReflectPermission
java.lang.RuntimePermission
java.net.NetPermission
java.sql.SQLPermission
java.util.PropertyPermission
java.util.logging.LoggingPermission
javax.net.ssl.SSLPermission
javax.security.auth.AuthPermission
javax.security.auth.PrivateCredentialPermission
javax.security.auth.kerberos.DelegationPermission
javax.security.auth.kerberos.ServicePermission
javax.sound.sampled.AudioPermission
permission_name和permission_action 根据permission_type不同而不同。
JAVADEBUGPRIV JAVA调试权限
JAVAIDPRIV JAVA ID权限
JAVASYSPRIV JAVA系统权限
JAVAUSERPRIV JAVA用户权限
JAVA_ADMIN JAVA管理员
JAVA_DEPLOY JAVA调度
select * from dba_roles where role like '%JAVA%'; JAVADEBUGPRIV JAVA调试权限 JAVAIDPRIV JAVA ID权限 JAVASYSPRIV JAVA系统权限 JAVAUSERPRIV JAVA用户权限 JAVA_ADMIN JAVA管理员 JAVA_DEPLOY JAVA调度 -- 授权用户 GRANT JAVASYSPRIV TO SCOTT; GRANT JAVAUSERPRIV TO SCOTT; GRANT JAVADEBUGPRIV TO SCOTT; GRANT JAVAIDPRIV TO SCOTT; GRANT JAVA_DEPLOY TO SCOTT; GRANT JAVA_ADMIN TO SCOTT; -- DBMS.GRANT_PERMISSION 的正确使用姿势,网上的文章大多错误,片面。 -- 官方定义: -- PROCEDURE grant_permission(grantee VARCHAR2, permission_type VARCHAR2, permission_name VARCHAR2, permission_action VARCHAR2) -- 参数意义: -- grantee 大写用户名 -- permission_type 权限类型,可参考下面列举权限(实际根据需要设置) -- property_name 权限名,不同权限类型,该参数设置有所不同 -- permission_action 根据权限有所不同,文件权限:read,write,execute,delete ,网络权限:accept,connect,listen,resolve -- dbms_java.grant_permission 常见权限名: java.security.AllPermission 暗含所有其他权限 java.net.SocketPermission 网络访问权限 java.io.FilePermission 文件访问权限 java.security.SecurityPermission java.security.UnresolvedPermission java.awt.AWTPermission java.io.SerializablePermission java.lang.reflect.ReflectPermission java.lang.RuntimePermission java.net.NetPermission java.sql.SQLPermission java.util.PropertyPermission java.util.logging.LoggingPermission javax.net.ssl.SSLPermission javax.security.auth.AuthPermission javax.security.auth.PrivateCredentialPermission javax.security.auth.kerberos.DelegationPermission javax.security.auth.kerberos.ServicePermission javax.sound.sampled.AudioPermission -- 实用赋权案例: -- 对文件或目录的访问权限 -- property_name 参数: -- 以/*结尾表示目录下所有文件不递归 -- 以/-结尾表示递归所有文件和子目录 -- <<ALL FILES>>匹配任何文件 -- permission_action :read,write,execute,delete call dbms_java.grant_permission('SCOTT', 'java.io.FilePermission', '<<ALL FILES>>', 'read ,write, execute,delete' ); -- 网络访问权限 -- property_name 参数: -- test.com, 192.168.1.1:8080 , *.test.com 使用*必须位于最左侧如:“*.test.com”而不是“y.test.*” -- 192.168.1.1:8080- 表示端口从8080到65535 -- 192.168.1.1:-8080 表示端口从1到8080 -- permission_action :accept,connect,listen,resolve call dbms_java.grant_permission('SCOTT', 'java.net.SocketPermission', '10.1.3.231:-9999', 'accept,connect,listen,resolve');
//使用工具编写并测试代码保证没有错误,能正确运行! //仅使用JDK自带类,不要使用三方库 //注意:因为是在Oracle中运行 //代码复制到Oracle时,删除package com.example.demo; package com.example.demo; import java.io.*; import java.net.HttpURLConnection; import java.net.URL; /** * @remark 类:Oracle 中执行的Http连接类 */ public class HttpConnection { public static String httpGet(String url,String param,Integer requestTimeOut){ return request(url,param,0,requestTimeOut); } public static String httpPost(String url,String param,Integer requestTimeOut){ return request(url,param,1,requestTimeOut); } /** * 用来发送Http请求 * @param url 请求地址 * @param param 请求参数 * @param httpMethod http请求方法 0表示GET请求,1表示POST * @param requestTimeOut 请求超时毫秒数 * @return 响应 */ private static String request(String url,String param,Integer httpMethod,Integer requestTimeOut) { HttpURLConnection httpConn=null; ByteArrayOutputStream byteArrayOutputStream = null; BufferedInputStream bufferedInputStream = null; String response = ""; try { httpConn = (HttpURLConnection) new URL(url).openConnection(); httpConn.setReadTimeout(requestTimeOut); httpConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"); httpConn.setRequestProperty("Charset", "UTF-8"); httpConn.setDoOutput(true); httpConn.setDoInput(true); httpConn.setUseCaches(false); //设置post请求方式 if(httpMethod==1) { httpConn.setRequestMethod("POST"); }else if(httpMethod==0){ httpConn.setRequestMethod("GET"); } //连接 httpConn.connect(); //发送参数 if(param!=null && param.trim().equals("")==false) { OutputStream outputStream = new BufferedOutputStream(httpConn.getOutputStream()); byte[] body = param.trim().getBytes("utf-8"); outputStream.write(body, 0, body.length); outputStream.flush(); outputStream.close(); } //获取返回 int statCode = httpConn.getResponseCode(); if (200 == statCode) { if (httpConn.getContentLength() > 0) { bufferedInputStream = new BufferedInputStream(httpConn.getInputStream()); byteArrayOutputStream = new ByteArrayOutputStream(); byte[] b = new byte[httpConn.getContentLength()]; int length=0; while ((length = bufferedInputStream.read(b)) > 0) { byteArrayOutputStream.write(b, 0, length); } response = byteArrayOutputStream.toString("utf-8"); } }else{ response = ""+statCode; } } catch (IOException e) { e.printStackTrace(); } finally { try { if (byteArrayOutputStream != null) { byteArrayOutputStream.close(); } if(bufferedInputStream!=null){ bufferedInputStream.close(); } if ( httpConn != null) { httpConn.disconnect(); } } catch (Exception e) { e.printStackTrace(); } } return response; } }
使用如下脚本在Oracle中创建并编译Java source ,可通过命令查看:
select ***** from dba_objects where object_type**=**‘JAVA CLASS’ AND OBJECT_NAME = ‘HttpConnection’
CREATE OR REPLACE AND COMPILE java source named HttpConnection AS import java.io.*; import java.net.HttpURLConnection; import java.net.URL; public class HttpConnection { public static String httpGet(String url,String param,Integer requestTimeOut){ return request(url,param,0,requestTimeOut); } public static String httpPost(String url,String param,Integer requestTimeOut){ return request(url,param,1,requestTimeOut); } private static String request(String url,String param,Integer httpMethod,Integer requestTimeOut) { HttpURLConnection httpConn=null; ByteArrayOutputStream byteArrayOutputStream = null; BufferedInputStream bufferedInputStream = null; String response = ""; try { httpConn = (HttpURLConnection) new URL(url).openConnection(); httpConn.setReadTimeout(requestTimeOut); httpConn.setRequestProperty("Content-Type", "application/x-www-form-urlencoded; charset=UTF-8"); httpConn.setRequestProperty("Charset", "UTF-8"); httpConn.setDoOutput(true); httpConn.setDoInput(true); httpConn.setUseCaches(false); if(httpMethod==1) { httpConn.setRequestMethod("POST"); }else if(httpMethod==0){ httpConn.setRequestMethod("GET"); } httpConn.connect(); if(param!=null && param.trim().equals("")==false) { OutputStream outputStream = new BufferedOutputStream(httpConn.getOutputStream()); byte[] body = param.trim().getBytes("utf-8"); outputStream.write(body, 0, body.length); outputStream.flush(); outputStream.close(); } int statCode = httpConn.getResponseCode(); if (200 == statCode) { if (httpConn.getContentLength() > 0) { bufferedInputStream = new BufferedInputStream(httpConn.getInputStream()); byteArrayOutputStream = new ByteArrayOutputStream(); byte[] b = new byte[httpConn.getContentLength()]; int length=0; while ((length = bufferedInputStream.read(b)) > 0) { byteArrayOutputStream.write(b, 0, length); } response = byteArrayOutputStream.toString("utf-8"); } }else{ response = ""+statCode; } } catch (IOException e) { e.printStackTrace(); } finally { try { if (byteArrayOutputStream != null) { byteArrayOutputStream.close(); } if(bufferedInputStream!=null){ bufferedInputStream.close(); } if ( httpConn != null) { httpConn.disconnect(); } } catch (Exception e) { e.printStackTrace(); } } return response; } }
创建函数发起对Oracle中编译的java代码的调用,从创建函数的脚本不难看出,Oracle函数的参数声明与Java方法参数声明的顺序,类型一致;Oracle函数的返回值类型与Java方法的返回值类型一致。
下面脚本中对java方法的调用语法:‘类名.方法名(参数1类型,参数2类型,参数3类型) return 返回值类型’
-- 对java 访问Http Get请求的静态方法调用函数 CREATE OR REPLACE FUNCTION "SCOTT"."HTTP_GET_JAVA"(url VARCHAR2, param VARCHAR2 ,requestTimeOut number) RETURN VARCHAR2 AS LANGUAGE java name 'HttpConnection.httpGet(java.lang.String,java.lang.String,java.lang.Integer) return java.lang.String'; -- 对java 访问http Post请求的静态方法调用函数 CREATE OR REPLACE FUNCTION "SCOTT"."HTTP_POST_JAVA"(url VARCHAR2, param VARCHAR2 ,requestTimeOut number) RETURN VARCHAR2 AS LANGUAGE java name 'HttpConnection.httpPost(java.lang.String,java.lang.String,java.lang.Integer) return java.lang.String';
启动java服务
观察运行结果
笔者实际测试使用的数据库版本是2008 R2
OLE对象调用:OLE从多媒体借鉴而来,是Windows的一组服务功能,提供了一种以源于不同应用软件的信息建立复合文档的强有力方法。Sql Server中常用的OLE调用如下:
-- EXEC sp_OACreate 'WinHttp.WinHttpRequest.5.1', @obj OUT
-- EXEC sp_OACreate 'Scripting.FileSystemObject', @fso OUT
-- EXEC SP_OACreate 'Msxml2.ServerXMLHTTP.3.0', @object OUT
-- Exec sp_OACreate 'MSXML2.XMLHTTP', @Object OUT
exec sp_configure 'show advanced options',1;
go
reconfigure;
go
sp_configure 'Ole Automation Procedures',1;
go
reconfigure;
go
-- HTTP GFT请求 IF EXISTS (select * from sysobjects where name='HTTP_GET' and type='FN') DROP function HTTP_GET GO CREATE FUNCTION [dbo].[HTTP_GET] (@url VARCHAR(500)) RETURNS VARCHAR(8000) AS BEGIN DECLARE @status int, -- 状态 @statusText varchar(1000), @responseText varchar(8000), @objectID int, @hResult int, @len int EXEC @hResult = sp_OACreate 'MSXML2.ServerXMLHTTP', @objectID OUT EXEC @hResult = sp_OAMethod @objectID, 'open', null, 'GET', @url, false -- EXEC @hResult = sp_OAMethod @objectID, 'setRequestHeader', null, 'Content-Type', 'text/xml;charset=UTF-8' EXEC SP_OAMethod @objectID,'setRequestHeader',null,'Content-Type','application/x-www-form-urlencoded;charset=UTF-8' EXEC @hResult = sp_OAMethod @objectID, 'send', null -- EXEC sp_OAGetProperty @objectID, 'StatusText', @statusText out -- EXEC sp_OAGetProperty @objectID, 'Status', @status out EXEC sp_OAGetProperty @objectID, 'responseText', @responseText out exec sp_OADestroy @objectID RETURN @responseText END GO
-- HTTP POST 请求 IF EXISTS (select * from sysobjects where name='HTTP_POST' and type='FN') DROP function HTTP_POST GO CREATE FUNCTION [dbo].[HTTP_POST] (@url VARCHAR(500),@requestParams varchar(4000)) RETURNS VARCHAR(8000) AS BEGIN DECLARE @status int, -- 状态 @statusText varchar(1000), @responseText varchar(8000), @objectID int, @hResult int, @len int EXEC @hResult = sp_OACreate 'MSXML2.ServerXMLHTTP', @objectID OUT EXEC @hResult = sp_OAMethod @objectID, 'open', null,'POST', @url,'false' EXEC SP_OAMethod @objectID,'setRequestHeader',null,'Content-Type','application/x-www-form-urlencoded;charset=UTF-8' set @len = len(@requestParams) EXEC @hResult = sp_OAMethod @objectID, 'setRequestHeader',null,'Content-Length', @len EXEC sp_OAMethod @objectID, 'send', null,@requestParams EXEC sp_OAGetProperty @objectID, 'StatusText', @statusText out EXEC sp_OAGetProperty @objectID, 'Status', @status out EXEC sp_OAGetProperty @objectID, 'responseText', @responseText out exec sp_OADestroy @objectID RETURN @responseText END GO
阿里开源Canel原理:
需要安装Canel服务伪装成Mysql Slave节点,通过读取Mysql bin log实现数据监听。
下载地址:https://github.com/alibaba/canal/releases
文档地址:https://github.com/alibaba/canal/wiki/QuickStart
对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下。修改完成后重启MySql服务。
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
由于测试时机器中JDK 1.7 导致无法正常启动,需要手动修改指定JDK 安装目录:
## set java path
if [ -z "$JAVA" ] ; then
# JAVA=$(which java)
JAVA=/yourpath/jdk1.8.0_261/bin/java
fi
启动/canal/bin/startup.sh
停止/canal/bin/ stop.sh
添加依赖: <dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.0</version> </dependency> 编写客户端监听代码: //canal 服务连接参数 static String ip ="10.1.1.243"; static int port =11111;//默认端口 static String destination ="example"; static String username ="canal"; static String password ="canal"; /** * client 数据监听 * @param args */ public static void main(String args[]) { // 创建链接 CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress( ip, port), destination, username, password); // 获取指定数量的数据 int batchSize = 1000; try { //连接 connector.connect(); //监听的数据库、表 // connector.subscribe(".*\\..*"); connector.subscribe("sys\\..*"); connector.rollback(); while (true) { Message message = connector.getWithoutAck(batchSize); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { //没有数据变更 try { System.out.println("当前没有数据变更 sleep"); Thread.sleep(1000); } catch (InterruptedException e) { } } else { //数据变更 //打印监听到的数据变更(inser、update、delete) printEntry(message.getEntries()); } // 提交确认 connector.ack(batchId); // 处理失败, 回滚数据 // connector.rollback(batchId); } } finally { //销毁连接 connector.disconnect(); } } /** * 打印数据行 * @param entrys */ private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } //数据行变更 RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } //数据行变更类型 EventType eventType = rowChage.getEventType(); System.out.println(String.format("================> binlog[%s:%s] , 数据库:%s,表:%s] , 数据变更类型 : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); //遍历数据 for (RowData rowData : rowChage.getRowDatasList()) { //删除行 if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { //插入行 printColumn(rowData.getAfterColumnsList()); } else { //更新行 //更新之前的数据 System.out.println("-------> 变更前数据:"); printColumn(rowData.getBeforeColumnsList()); //更新之后的数据 System.out.println("-------> 变更后数据:"); printColumn(rowData.getAfterColumnsList()); } } } } /** * 打印数据 column * @param columns */ private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } }
当前没有数据变更 sleep ================> binlog[mysql-bin.000001:9472782] , 数据库:sys,表:test] , 数据变更类型 : UPDATE -------> 变更前数据: age : 55555 update=false fix : 1 update=false -------> 变更后数据: age : 55555 update=false fix : 2 update=true 当前没有数据变更 sleep 当前没有数据变更 sleep ================> binlog[mysql-bin.000001:9485376] , 数据库:sys,表:test] , 数据变更类型 : INSERT age : 1 update=true fix : 1 update=true 当前没有数据变更 sleep 当前没有数据变更 sleep ================> binlog[mysql-bin.000001:9488377] , 数据库:sys,表:test] , 数据变更类型 : DELETE age : 1 update=false fix : 1 update=false 当前没有数据变更 sleep
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。