Configuring Oracle Data Queue for ANYDATA Payload
In this guide, we will walk through the process of creating and configuring an Oracle data queue for an ANYDATA payload. This setup allows seamless data transfer between two databases (DB1 and DB2) while ensuring data integrity and type flexibility.
Prerequisites
Two databases (DB1 and DB2) with connectivity.
The global_names initialization parameter must be set to TRUE on both databases.
A database link must be created between DB1 and DB2.
Step 1: Enable the GLOBAL_NAMES Parameter
Run the following command as SYSDBA to check the current value of the global_names parameter:
-- SQL*Plus command: display the current value of the GLOBAL_NAMES
-- initialization parameter (spelled out in full rather than relying on
-- SHOW PARAMETER's substring matching).
SHOW PARAMETER global_names;
If the value is FALSE, set it to TRUE using:
-- Turn on global naming enforcement. The initialization parameter is
-- GLOBAL_NAMES (plural); GLOBAL_NAME is the name of a view, not a parameter,
-- so "SET global_name" is rejected by ALTER SYSTEM.
ALTER SYSTEM SET global_names = TRUE;
Step 2: Create a Database Link
Create a database link from DB1 to DB2 for communication. Because global naming is enforced, the link name must match the global name of DB2:
-- Database link named DB2; sessions opened through it log in to the remote
-- database as STRMADMIN using the connect descriptor below.
CREATE DATABASE LINK "DB2"
    CONNECT TO "STRMADMIN" IDENTIFIED BY "strmadmin"
    USING '(DESCRIPTION =(ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.1.1)(PORT = 1521))(CONNECT_DATA = (SID=ORCL)))';
Test the connection:
-- Smoke test for the link: read the remote database's global name through it.
-- The column is named explicitly instead of using SELECT *.
SELECT global_name
FROM global_name@DB2;
If successful, this should return the global_name of the destination database (DB2).
Step 3: Create a User-Defined Type
The following user-defined type must be created on both databases:
-- Object type that is wrapped into ANYDATA and sent through the queue.
CREATE OR REPLACE EDITIONABLE TYPE STRMADMIN.MYTYPE AS OBJECT
(
    ID      NUMBER,
    REMARK  VARCHAR2(100 BYTE)
);
/
Step 4: Configure the Queue System
Drop Existing Queues (If Any)
Before creating a new queue, ensure that any existing queues are stopped and removed:
DECLARE
    -- Queue and queue table being torn down.
    c_queue       CONSTANT VARCHAR2(61) := 'strmadmin.test_q';
    c_queue_table CONSTANT VARCHAR2(61) := 'strmadmin.test_qt';
BEGIN
    -- Order matters: stop the queue, drop it, then drop its queue table.
    dbms_aqadm.stop_queue(queue_name => c_queue);
    dbms_aqadm.drop_queue(queue_name => c_queue);
    dbms_aqadm.drop_queue_table(queue_table => c_queue_table);
END;
/
If the above commands do not work, remove the queue using:
-- Fallback cleanup through the Streams administration package.
BEGIN
    dbms_streams_adm.remove_queue(
        queue_name              => 'strmadmin.test_q',
        cascade                 => TRUE,
        drop_unused_queue_table => TRUE
    );
END;
/
Create a Queue on DB1
-- Creates and starts the source queue on DB1.
-- Names are schema-qualified so the objects always land in STRMADMIN,
-- matching the strmadmin.test_q / strmadmin.test_qt names used by the
-- drop, propagation and enqueue steps, regardless of the connected user.
BEGIN
    -- Multi-consumer queue table whose payload type is SYS.ANYDATA.
    DBMS_AQADM.create_queue_table(
        queue_table        => 'strmadmin.test_qt',
        queue_payload_type => 'SYS.ANYDATA',
        multiple_consumers => TRUE
    );

    DBMS_AQADM.create_queue(
        queue_name  => 'strmadmin.test_q',
        queue_table => 'strmadmin.test_qt'
    );

    -- A newly created queue is stopped; start it to allow enqueue/dequeue.
    DBMS_AQADM.start_queue(queue_name => 'strmadmin.test_q');
END;
/
Create Propagation on DB1
Create the propagation only after the queue exists on both databases, so run the "Create a Queue on DB2" block below before executing this one.
-- Propagation TEST_PROP: forwards messages from strmadmin.test_q on this
-- database to strmadmin.test_q on the database reached via the DB2 link.
BEGIN
    dbms_propagation_adm.create_propagation(
        propagation_name   => 'TEST_PROP',
        source_queue       => 'strmadmin.test_q',
        destination_queue  => 'strmadmin.test_q',
        destination_dblink => 'DB2',
        queue_to_queue     => TRUE
    );
END;
/
Create a Queue on DB2 (for Receiving Messages)
-- Creates and starts the destination queue on DB2 and adds the RECIPIENT
-- subscriber used when dequeuing.
-- Names are schema-qualified so the objects always land in STRMADMIN,
-- matching the strmadmin.test_q destination named by the propagation.
BEGIN
    -- Multi-consumer queue table whose payload type is SYS.ANYDATA.
    DBMS_AQADM.create_queue_table(
        queue_table        => 'strmadmin.test_qt',
        queue_payload_type => 'SYS.ANYDATA',
        multiple_consumers => TRUE
    );

    DBMS_AQADM.create_queue(
        queue_name  => 'strmadmin.test_q',
        queue_table => 'strmadmin.test_qt'
    );

    -- A newly created queue is stopped; start it to allow enqueue/dequeue.
    DBMS_AQADM.start_queue(queue_name => 'strmadmin.test_q');

    -- Consumer name that the dequeue block and the notification
    -- registration refer to.
    DBMS_AQADM.add_subscriber(
        queue_name => 'strmadmin.test_q',
        subscriber => SYS.AQ$_AGENT('RECIPIENT', NULL, NULL)
    );
END;
/
Step 5: Testing the Queue System
Enqueue a Message from DB1
-- Enqueues one STRMADMIN.MYTYPE(1, 'TEST') object, wrapped in ANYDATA,
-- into STRMADMIN.test_q and commits.
DECLARE
    l_enq_opts   DBMS_AQ.ENQUEUE_OPTIONS_T;
    l_msg_props  DBMS_AQ.MESSAGE_PROPERTIES_T;
    l_msg_id     RAW(16);  -- receives the identifier of the enqueued message
    l_record     STRMADMIN.MYTYPE := STRMADMIN.MYTYPE(1, 'TEST');
    l_payload    ANYDATA          := ANYDATA.CONVERTOBJECT(l_record);
BEGIN
    DBMS_AQ.ENQUEUE(
        queue_name         => 'STRMADMIN.test_q',
        enqueue_options    => l_enq_opts,
        message_properties => l_msg_props,
        payload            => l_payload,
        msgid              => l_msg_id
    );

    -- The enqueue is transactional; the message is published on commit.
    COMMIT;
END;
/
Dequeue Manually from DB2
-- Dequeues every message listed in the queue table for consumer RECIPIENT
-- and prints the fields of payloads of type STRMADMIN.MYTYPE.
-- Run with SET SERVEROUTPUT ON to see the DBMS_OUTPUT lines.
DECLARE
    dequeue_options    DBMS_AQ.dequeue_options_t;
    message_properties DBMS_AQ.message_properties_t;
    msgid              RAW(16);
    MESSAGE            STRMADMIN.MYTYPE;
    msg                ANYDATA;
    rc                 PLS_INTEGER;  -- status returned by ANYDATA.GETOBJECT
BEGIN
    -- Only the message identifier is needed from the queue table.
    FOR rec IN (SELECT msgid FROM strmadmin.test_qt)
    LOOP
        -- Dequeue exactly this message on behalf of the RECIPIENT subscriber.
        dequeue_options.msgid         := rec.msgid;
        dequeue_options.consumer_name := 'RECIPIENT';

        DBMS_AQ.DEQUEUE(
            queue_name         => 'STRMADMIN.TEST_Q',
            dequeue_options    => dequeue_options,
            message_properties => message_properties,
            payload            => msg,
            msgid              => msgid
        );

        -- ANYDATA can hold any type; unpack only the payload type we expect.
        IF msg.GETTYPENAME() = 'STRMADMIN.MYTYPE' THEN
            rc := msg.GETOBJECT(MESSAGE);
            DBMS_OUTPUT.PUT_LINE('ID: ' || MESSAGE.ID);
            DBMS_OUTPUT.PUT_LINE('REMARK: ' || MESSAGE.REMARK);
        END IF;
    END LOOP;

    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        -- Undo any dequeues from this run, then surface the error instead of
        -- hiding it from the caller.
        ROLLBACK;
        RAISE;
END;
/
Step 6: Register a Callback Procedure
If you want automatic message dequeuing upon arrival, register a callback procedure:
-- Notification callback: dequeues the message described by DESCR and, when
-- the payload is a STRMADMIN.MYTYPE object, prints its fields.
--
-- Parameters:
--   context  - RAW context value supplied at registration (not used here)
--   reginfo  - registration information (not used here)
--   descr    - descriptor supplying the queue name, message id and consumer
--   payload  - notification payload (not used here; the message is dequeued)
--   payloadl - payload length (not used here)
CREATE OR REPLACE PROCEDURE DEQUEUE_MSG_CB(
    context  IN RAW,
    reginfo  IN SYS.AQ$_REG_INFO,
    descr    IN SYS.AQ$_DESCRIPTOR,
    payload  IN VARCHAR2,
    payloadl IN NUMBER
)
AS
    dequeue_options    DBMS_AQ.DEQUEUE_OPTIONS_T;
    message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    message_handle     RAW(16);
    MESSAGE            STRMADMIN.MYTYPE;
    msg                ANYDATA;
    rc                 PLS_INTEGER;  -- status returned by ANYDATA.GETOBJECT
BEGIN
    -- Get the message identifier and consumer name from the descriptor
    dequeue_options.msgid         := descr.msg_id;
    dequeue_options.consumer_name := descr.consumer_name;

    -- Dequeue the message
    DBMS_AQ.DEQUEUE(
        queue_name         => descr.queue_name,
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => msg,
        msgid              => message_handle
    );

    -- ANYDATA can hold any type; unpack only the payload type we expect.
    IF msg.GETTYPENAME() = 'STRMADMIN.MYTYPE' THEN
        rc := msg.GETOBJECT(MESSAGE);
        -- NOTE(review): a callback presumably runs outside any interactive
        -- session, so this output may not be visible anywhere; write to a
        -- table or log instead for real use. Confirm for your environment.
        DBMS_OUTPUT.PUT_LINE('ID ' || MESSAGE.ID);
        DBMS_OUTPUT.PUT_LINE('REMARK '|| MESSAGE.REMARK);
    END IF;

    -- if you are saving data
    COMMIT;
END;
/
Register the callback procedure:
-- Registers DEQUEUE_MSG_CB as the PL/SQL notification callback for consumer
-- RECIPIENT of the test queue.
BEGIN
    DBMS_AQ.REGISTER(
        SYS.AQ$_REG_INFO_LIST(
            SYS.AQ$_REG_INFO(
                -- Subscription name in schema.queue:consumer form, so it does
                -- not depend on which user performs the registration.
                'STRMADMIN.TEST_Q:RECIPIENT',
                DBMS_AQ.NAMESPACE_AQ,
                -- Callback to invoke, given as a plsql:// URL.
                'plsql://strmadmin.DEQUEUE_MSG_CB?PR=1',
                -- Context value handed to the callback's CONTEXT parameter.
                HEXTORAW('FF')
            )
        ),
        1  -- number of entries in the registration list
    );
END;
/
This completes the setup of an Oracle ANYDATA queue system, ensuring reliable data propagation between databases!
References
For more details, refer to the Oracle Advanced Queuing Guide.