`
wodentt
  • 浏览: 2439 次
  • 性别: Icon_minigender_1
  • 来自: 济南
最近访客 更多访客>>
文章分类
社区版块
存档分类
最新评论

Java实时获取oracle变更

 
阅读更多
http://www.iteye.com/topic/267893


在一个基于数据库的“实时系统”里面,“实时”获取数据库变化是非常重要的,本文主要描述通过Oracle中的捕获进程实时获取数据库变化。

背景:

       要做一个车辆GPS监控系统,主要分两块:

    1.采集。由GPS厂商提供实时数据,通过UDP包接收
    2.展示。前端程序获取到最新GPS数据后,在地图上模拟车辆的运行情况
      

备选方案:

     1.采集程序接收到UDP包并解析后,将数据放入数据库;前端程序轮询数据库以获取最新数据。
     2.采集程序接收到UDP包并解析后,将数据放入数据库,同时向前端程序发送一条消息,传递最新数据。

方案评估:



    方案1:最简单,最传统;增加不必要的数据库查询,并且非实时,轮询时间间隔不好确定。

    方案2:可实现“实时”,但增加采集程序职责,采集程序本不知道前端系统的存在。



最终方案:



    最后采取了另一种方案:通过oracle捕获进程捕获数据库变更(采集程序insert或update一条记录时,捕获进程即时获取到该条记录),将变更记录发送到AQ(oracle高级队列,JMS的oracle实现),前端程序只关注AQ,当有新消息到来时,即刻可收到并做相应处理,反映出实时状态。

    关于捕获进程,请参考《Streams概述》,《Streams捕获进程》



实现:



  

SQL代码


    创建表空间和用户:

   




Sql代码 
1.Create tablespace streams_tbs datafile 'E:\DBSERVER\ORACLE9I\ORADATA\TESTDB\stream_tbs.dbf' size 25M Reuse autoextend on maxsize unlimited; 
2. 
3.--修改目标表(要捕获变更的表)追加日志 
4.ALTER TABLE myoracle.TEST_GPS_STATUS ADD SUPPLEMENTAL LOG GROUP log_group_gpsstatus_pk (DEVICEID) ALWAYS; 
5. 
6. 
7.create user strmadmin identified by strmadmin default tablespace streams_tbs quota unlimited on streams_tbs; 
8. 
9. 
10.grant connect, resource, select_catalog_role to strmadmin; 




授予相应权限




Sql代码 
1.grant execute on dbms_aqadm to strmadmin; 
2. 
3.grant execute on dbms_capture_adm to strmadmin; 
4. 
5.grant execute on dbms_propagation_adm to strmadmin; 
6. 
7.grant execute on dbms_streams_adm to strmadmin; 
8. 
9.grant execute on dbms_apply_adm to strmadmin; 
10. 
11.grant execute on dbms_flashback to strmadmin; 
12. 
13.grant execute on dbms_aq to strmadmin; 
14. 
15.grant execute on dbms_aqjms to strmadmin; 
16. 
17.grant execute on dbms_aqin to strmadmin; 
18. 
19.grant execute on dbms_aqjms_internal to strmadmin; 
20.  



执行系统存储过程分配权限


   



Sql代码 
1.BEGIN 
2.DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( 
3.privilege => DBMS_RULE_ADM.CREATE_RULE_SET_OBJ, 
4.grantee => 'strmadmin', 
5.grant_option => FALSE); 
6.END; 
7./ 
8. 
9. 
10. 
11.BEGIN 
12.DBMS_RULE_ADM.GRANT_SYSTEM_PRIVILEGE( 
13.privilege => DBMS_RULE_ADM.CREATE_RULE_OBJ, 
14.grantee => 'strmadmin', 
15.grant_option => FALSE); 
16.END; 
17./ 
18.  





以strmadmin帐户登录oracle




创建AQ,类型为JMS消息






Sql代码 
1.BEGIN 
2.   DBMS_AQADM.CREATE_QUEUE_TABLE( 
3.        Queue_table            => 'gpsstatus_queue_table', 
4.        Queue_payload_type     => 'SYS.AQ$_JMS_MESSAGE', 
5.        multiple_consumers  => false, 
6.        compatible             => '8.1.5'); 
7.   DBMS_AQADM.CREATE_QUEUE( 
8.      Queue_name          => 'gpsstatus_queue', 
9.      Queue_table         => 'gpsstatus_queue_table'); 
10.   DBMS_AQADM.START_QUEUE( 
11.      queue_name         => 'gpsstatus_queue'); 
12.END; 
13./ 
14.BEGIN 
15.DBMS_STREAMS_ADM.SET_UP_QUEUE( 
16.    queue_table  => 'gps_temp_queue_table', 
17.    queue_name   => 'gps_temp_queue'); 
18.END; 
19./ 




为目标表创建捕获进程






Sql代码 
1.BEGIN 
2.DBMS_STREAMS_ADM.ADD_TABLE_RULES( 
3.table_name => 'myoracle.TEST_GPS_STATUS', 
4.streams_type => 'capture', 
5.streams_name => 'capture_gps', 
6.queue_name => 'gps_temp_queue', 
7.include_dml => true, 
8.include_ddl => false); 
9.END; 
10./ 




初始化scn





Sql代码 
1.DECLARE 
2.iscn NUMBER; -- Variable to hold instantiation SCN value 
3.BEGIN 
4.iscn := DBMS_FLASHBACK.GET_SYSTEM_CHANGE_NUMBER(); 
5.DBMS_APPLY_ADM.SET_TABLE_INSTANTIATION_SCN( 
6.source_object_name => 'myoracle.TEST_GPS_STATUS', 
7.source_database_name => 'TESTdb', 
8.instantiation_scn => iscn); 
9.END; 
10./ 
11.  



为消息队列创建代理






Sql代码 
1.BEGIN 
2.DBMS_AQADM.CREATE_AQ_AGENT( 
3.agent_name => 'gpsstatus_agent'); 
4.DBMS_AQADM.ENABLE_DB_ACCESS( 
5.agent_name => 'gpsstatus_agent', 
6.db_username => 'strmadmin'); 
7.END; 
8./ 
9.DECLARE 
10.subscriber SYS.AQ$_AGENT; 
11.BEGIN 
12.subscriber := SYS.AQ$_AGENT('gpsstatus_agent', NULL, NULL); 
13.SYS.DBMS_AQADM.ADD_SUBSCRIBER( 
14.queue_name => 'strmadmin.gpsstatus_queue', 
15.subscriber => subscriber, 
16.rule => NULL, 
17.transformation => NULL); 
18.END; 
19./ 




创建存储过程以决定将哪些信息放到消息队列里面







Sql代码 
1.CREATE OR REPLACE PROCEDURE enq_gps_lcr(in_any IN SYS.ANYDATA) IS 
2.--agent sys.aq$_agent := sys.aq$_agent('gpsstatus_agent', null, 0); 
3.message sys.aq$_jms_message; 
4.enqueue_options dbms_aq.enqueue_options_t; 
5.message_properties dbms_aq.message_properties_t; 
6.msgid raw(16); 
7.lcr SYS.LCR$_ROW_RECORD; 
8.rc PLS_INTEGER; 
9.DEVICEID varchar2(11); 
10.GATHERDATETIME date; 
11.LONGITUDETYPE char(1); 
12.LONGITUDEVALUE number ; 
13.LATITUDETYPE char(1); 
14.LATITUDEVALUE number ; 
15.SPEED number ; 
16.DIRECTION number ; 
17.BEGIN 
18.rc := in_any.GETOBJECT(lcr); 
19.DEVICEID:=lcr.get_value('new','DEVICEID').ACCESSvarchar2(); 
20.GATHERDATETIME := lcr.GET_VALUE('new','GATHERDATETIME').ACCESSdate(); 
21.LONGITUDETYPE := lcr.GET_VALUE('new','LONGITUDETYPE').ACCESSchar(); 
22.LONGITUDEVALUE := lcr.GET_VALUE('new','LONGITUDEVALUE').ACCESSnumber(); 
23.LATITUDETYPE := lcr.GET_VALUE('new','LATITUDETYPE').ACCESSchar(); 
24.LATITUDEVALUE := lcr.GET_VALUE('new','LATITUDEVALUE').ACCESSnumber(); 
25.SPEED := lcr.GET_VALUE('new','SPEED').ACCESSnumber(); 
26.DIRECTION := lcr.GET_VALUE('new','DIRECTION').ACCESSnumber(); 
27.message := sys.aq$_jms_message.construct(1); 
28.--message.set_replyto(agent); 
29.message.set_type(''); 
30.message.set_userid('strmadmin'); 
31.message.set_appid(''); 
32.message.set_groupid(''); 
33.message.set_groupseq(''); 
34.message.set_string_property('DEVICEID', DEVICEID); 
35.message.set_string_property('GATHERDATETIME', to_char(GATHERDATETIME,'yyyy-MM-dd hh24:mi:ss')); 
36.message.set_string_property('LONGITUDETYPE', LONGITUDETYPE); 
37.message.set_string_property('LONGITUDEVALUE', to_char(LONGITUDEVALUE) ); 
38.message.set_string_property('LATITUDETYPE', LATITUDETYPE); 
39.message.set_string_property('LATITUDEVALUE', to_char(LATITUDEVALUE)); 
40.message.set_string_property('SPEED', to_char(SPEED) ); 
41.message.set_string_property('DIRECTION', to_char(DIRECTION) ); 
42.--指定消息生存时间 
43.message_properties.expiration:=60; 
44.dbms_aq.enqueue(queue_name => 'strmadmin.gpsstatus_queue', 
45.enqueue_options => enqueue_options, 
46.message_properties => message_properties, 
47.payload => message, 
48.msgid => msgid); 
49.COMMIT; 
50.END; 
51./ 




为目标表配置处理器






Sql代码 
1.BEGIN 
2.DBMS_APPLY_ADM.SET_DML_HANDLER( 
3.object_name => 'myoracle.TEST_GPS_STATUS', 
4.object_type => 'TABLE', 
5.operation_name => 'UPDATE',  --可配置为insert,update,delete等 
6.error_handler => false, 
7.user_procedure => 'strmadmin.enq_gps_lcr', 
8.apply_database_link => NULL); 
9.END; 
10./ 




设定参数及启动捕获进程








Sql代码 
1.BEGIN 
2.DBMS_STREAMS_ADM.ADD_TABLE_RULES( 
3.table_name => 'myoracle.TEST_GPS_STATUS', 
4.streams_type => 'apply', 
5.streams_name => 'apply_gps', 
6.queue_name => 'strmadmin.gps_temp_queue', 
7.include_dml => true, 
8.include_ddl => false, 
9.source_database => 'TESTdb'); 
10.END; 
11./ 
12.BEGIN 
13.DBMS_APPLY_ADM.SET_PARAMETER( 
14.apply_name => 'apply_gps', 
15.parameter => 'disable_on_error', 
16.value => 'n'); 
17.END; 
18./ 
19.BEGIN 
20.DBMS_APPLY_ADM.START_APPLY( 
21.apply_name => 'apply_gps'); 
22.END; 
23./ 
24.BEGIN 
25.DBMS_CAPTURE_ADM.START_CAPTURE( 
26.capture_name => 'capture_gps'); 
27.END; 
28./ 




至此,捕获进程配置完毕

可update一条myoracle.TEST_GPS_STATUS 中的记录,再查询gpsstatus_queue_table中是否有对应的一条记录。如果有,则配置成功。





下面是java处理代码,可直接使用JMS接口

本例使用oracle提供的API








Java代码 
1.QueueConnectionFactory queueConnectionFactory = null; 
2.QueueConnection queueConnection = null; 
3.QueueSession queueSession = null; 
4. 
5.Queue queue = null; 
6.QueueReceiver subscriber = null; 
7.Message message = null; 













Java代码 
1.log.info("开始连接 "); 
2.queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(ip,sid, port, "thin"); 
3.queueConnection = queueConnectionFactory.createQueueConnection(userName, password); 
4.log.info("创建Queue Connection 成功"); 
5.queueConnection.start(); 
6.log.info("connection started"); 
7.queueSession = queueConnection.createQueueSession(false, 
8.Session.AUTO_ACKNOWLEDGE); 
9..info("Queue session created"); 
10.queue = ((AQjmsSession) queueSession).getQueue(userName, queueName); 
11.log.info("Queue getted"); 
12.subscriber = queueSession.createReceiver(queue); 
13.log.info("初始化完成"); 




开始取消息,本例采取while(true)的方式获取消息。当没有消息的时候,线程会一直阻塞,直到有新的消息到来,立即取出。





   




Java代码 
1.while (true) { 
2.         message = subscriber.receive();//receive方法使没有新消息时,线程挂起 
3.      //do something... 
4.} 
5.  







最后:



      本文只是试图探求一种比较好的获取实时数据方法,并不适用于所有场合,但在处理实时告警,订单等方面,应该是有一定的用武之地,若结合comet等技术,完全可以实现真正的实时。
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics