Thursday, 6 March 2025

Oracle Advanced Queuing configuration on Oracle 19c for the ANYDATA type

Configuring Oracle Data Queue for ANYDATA Payload

In this guide, we will walk through creating and configuring an Oracle Advanced Queuing (AQ) queue with an ANYDATA payload. The setup propagates messages from a queue on one database (DB1) to a queue on another (DB2). Because the payload is ANYDATA, the same queue can carry messages of different types, and the receiver checks the type before unpacking each message.

Prerequisites

  • Two databases (DB1 and DB2) with connectivity.

  • The global_names parameter must be set to TRUE on both databases.

  • A database link must be created between DB1 and DB2.


Step 1: Enable the Global Names Parameter

Run the following command as SYSDBA to check the current value of the global_names parameter:

SHOW PARAMETER global_names;

If the value is FALSE, set it to TRUE using:

ALTER SYSTEM SET global_names = TRUE;
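As a quick cross-check, the same setting can be read from the data dictionary, together with the database's own global name. This is only a sketch; it assumes the session has SELECT access to V$PARAMETER.

```sql
-- Current value of the initialization parameter (TRUE or FALSE)
SELECT name, value
FROM   v$parameter
WHERE  name = 'global_names';

-- Global name of the database you are connected to
SELECT global_name FROM global_name;
```

Running both queries on DB1 and DB2 shows the names that the database link in the next step has to agree with.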

Step 2: Create a Database Link

Create a database link from DB1 to DB2 for communication. When global_names is TRUE, the link name must match the global name of the destination database:

CREATE DATABASE LINK "DB2"
CONNECT TO "STRMADMIN"
IDENTIFIED BY "strmadmin"
USING '(DESCRIPTION =(ADDRESS = (PROTOCOL = TCP)(HOST = 192.168.1.1)(PORT = 1521))(CONNECT_DATA = (SID=ORCL)))';

Test the connection:

SELECT * FROM global_name@DB2;

If successful, this should return the global_name of the destination database (DB2).
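To confirm how the link was stored, the link definitions owned by the current user can be listed. This sketch assumes it is run on DB1 by the user that created the link.

```sql
-- Database links owned by the current user, with the remote account and connect string
SELECT db_link, username, host, created
FROM   user_db_links;
```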


Step 3: Create a User-Defined Type

The following user-defined type must be created on both databases:

CREATE OR REPLACE EDITIONABLE TYPE STRMADMIN.MYTYPE AS OBJECT (
    ID NUMBER,
    REMARK VARCHAR2(100 BYTE)
);
/
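Before involving any queue, it can help to see how an object of this type is wrapped into ANYDATA and unwrapped again, since the enqueue and dequeue steps later rely on exactly these calls. The block below is a local round trip only; the variable names src, dst, and rc are illustrative and not part of the original setup.

```sql
SET SERVEROUTPUT ON

DECLARE
    src  STRMADMIN.MYTYPE := STRMADMIN.MYTYPE(1, 'TEST');
    dst  STRMADMIN.MYTYPE;
    msg  ANYDATA;
    rc   PLS_INTEGER;  -- return code of GETOBJECT
BEGIN
    -- Wrap the object; the ANYDATA value remembers the original type
    msg := ANYDATA.CONVERTOBJECT(src);
    DBMS_OUTPUT.PUT_LINE('Type: ' || msg.GETTYPENAME());

    -- Unwrap into a variable of the same type
    rc := msg.GETOBJECT(dst);
    DBMS_OUTPUT.PUT_LINE('ID: ' || dst.ID || ', REMARK: ' || dst.REMARK);
END;
/
```

Running it on both databases is a simple way to check that the type exists and is usable on each side.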

Step 4: Configure the Queue System

Drop Existing Queues (If Any)

Before creating a new queue, ensure that any existing queues are stopped and removed:

BEGIN
    DBMS_AQADM.STOP_QUEUE(queue_name => 'strmadmin.test_q');
    DBMS_AQADM.DROP_QUEUE(queue_name => 'strmadmin.test_q');
    DBMS_AQADM.DROP_QUEUE_TABLE(queue_table => 'strmadmin.test_qt');
END;
/

If the above commands do not work, remove the queue using:

BEGIN
    DBMS_STREAMS_ADM.REMOVE_QUEUE(
        queue_name               =>  'strmadmin.test_q',
        cascade                  =>  TRUE,
        drop_unused_queue_table  =>  TRUE);
END;
/

Create a Queue on DB1

BEGIN
    DBMS_AQADM.create_queue_table(
        queue_table => 'test_qt',
        queue_payload_type => 'SYS.ANYDATA',
        multiple_consumers => TRUE
    );

    DBMS_AQADM.create_queue(
        queue_name => 'test_q',
        queue_table => 'test_qt'
    );

    DBMS_AQADM.start_queue(queue_name => 'test_q');
END;
/
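Whether the queue was created and started can be checked in the USER_QUEUES view. The sketch assumes you are connected as the queue owner (STRMADMIN in this guide).

```sql
-- ENQUEUE_ENABLED and DEQUEUE_ENABLED show YES once the queue is started
SELECT name, queue_table, enqueue_enabled, dequeue_enabled
FROM   user_queues
WHERE  queue_table = 'TEST_QT';
```

The exception queue that Oracle creates alongside the queue table may also appear in the result.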

Create Propagation on DB1

Create the propagation only after the queue exists on both databases, so create the queue on DB2 (see the following subsection) before running this block on DB1.

BEGIN
    DBMS_PROPAGATION_ADM.CREATE_PROPAGATION(
        propagation_name   => 'TEST_PROP',
        source_queue       => 'strmadmin.test_q',
        destination_queue  => 'strmadmin.test_q',
        destination_dblink => 'DB2',
        queue_to_queue     => TRUE
    );
END;
/
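The state of the propagation, including the last error if any, can be inspected in DBA_PROPAGATION. This is a sketch that assumes the user has access to the DBA views.

```sql
-- STATUS is ENABLED, DISABLED, or ABORTED; ERROR_MESSAGE holds the last failure, if any
SELECT propagation_name, destination_dblink, status, error_message
FROM   dba_propagation
WHERE  propagation_name = 'TEST_PROP';
```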

Create a Queue on DB2 (for Receiving Messages)

BEGIN
    DBMS_AQADM.create_queue_table(
        queue_table => 'test_qt',
        queue_payload_type => 'SYS.ANYDATA',
        multiple_consumers => TRUE
    );

    DBMS_AQADM.create_queue(
        queue_name => 'test_q',
        queue_table => 'test_qt'
    );

    DBMS_AQADM.start_queue(queue_name => 'test_q');
    
    DBMS_AQADM.add_subscriber(
        queue_name => 'test_q',
        subscriber => SYS.AQ$_AGENT('RECIPIENT', NULL, NULL)
    );
END;
/
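The subscriber added above can be verified on DB2 with the USER_QUEUE_SUBSCRIBERS view, again assuming a connection as the queue owner.

```sql
-- Expect one row with CONSUMER_NAME = 'RECIPIENT'
SELECT queue_name, consumer_name, address
FROM   user_queue_subscribers
WHERE  queue_name = 'TEST_Q';
```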

Step 5: Testing the Queue System

Enqueue a Message from DB1

declare
    enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
    message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    message_handle     RAW(16);
    MESSAGE            STRMADMIN.MYTYPE;
    msg                ANYDATA;
begin
    MESSAGE := STRMADMIN.MYTYPE(1, 'TEST');
    msg := ANYDATA.CONVERTOBJECT(MESSAGE);

    DBMS_AQ.ENQUEUE(
        queue_name         => 'STRMADMIN.test_q',
        enqueue_options    => enqueue_options,
        message_properties => message_properties,
        payload            => msg,
        msgid              => message_handle
    );
    COMMIT;
end;
/
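Oracle creates a view named AQ$ followed by the queue table name, which gives a readable picture of the messages in the queue. The query below is a sketch for following a message: run it on DB1 right after the enqueue, then on DB2 to see whether propagation has delivered it.

```sql
-- One row per message and consumer; MSG_STATE is READY until the message is dequeued
SELECT msg_id, msg_state, consumer_name, enq_time
FROM   aq$test_qt
ORDER  BY enq_time;
```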

Dequeue Manually from DB2

-- Run SET SERVEROUTPUT ON first to see the DBMS_OUTPUT lines
DECLARE
    dequeue_options     DBMS_AQ.dequeue_options_t;
    message_properties  DBMS_AQ.message_properties_t;
    msgid               RAW(16);
    MESSAGE             STRMADMIN.MYTYPE;
    msg                 ANYDATA;
    rc                  PLS_INTEGER;  -- return code of ANYDATA.GETOBJECT
BEGIN
    -- Walk the queue table and dequeue each message by its identifier
    FOR rec IN (SELECT msgid FROM test_qt)
    LOOP
        dequeue_options.msgid := rec.msgid;
        dequeue_options.consumer_name := 'RECIPIENT';

        DBMS_AQ.DEQUEUE(
            queue_name         => 'STRMADMIN.TEST_Q',
            dequeue_options    => dequeue_options,
            message_properties => message_properties,
            payload            => msg,
            msgid              => msgid
        );

        -- Unpack only payloads of the expected type
        IF msg.GETTYPENAME() = 'STRMADMIN.MYTYPE' THEN
            rc := msg.GETOBJECT(MESSAGE);
            DBMS_OUTPUT.PUT_LINE('ID: ' || MESSAGE.ID);
            DBMS_OUTPUT.PUT_LINE('REMARK: ' || MESSAGE.REMARK);
        END IF;
    END LOOP;
    COMMIT;
EXCEPTION
    WHEN OTHERS THEN
        ROLLBACK;
        RAISE;  -- re-raise so the failure is not silently discarded
END;
/

Step 6: Register a Callback Procedure

If you want automatic message dequeuing upon arrival, register a callback procedure:

CREATE OR REPLACE PROCEDURE DEQUEUE_MSG_CB(
  context  IN RAW,
  reginfo  IN SYS.AQ$_REG_INFO, 
  descr    IN SYS.AQ$_DESCRIPTOR,
  payload  IN VARCHAR2,
  payloadl IN NUMBER
)
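AS
    dequeue_options     DBMS_AQ.DEQUEUE_OPTIONS_T;
    message_properties  DBMS_AQ.MESSAGE_PROPERTIES_T;
    message_handle      RAW(16);
    MESSAGE             STRMADMIN.MYTYPE;
    msg                 ANYDATA;
    rc                  PLS_INTEGER;  -- return code of ANYDATA.GETOBJECT
BEGIN
    -- Get the message identifier and consumer name from the descriptor
    dequeue_options.msgid := descr.msg_id;
    dequeue_options.consumer_name := descr.consumer_name;

    -- Dequeue the message
    DBMS_AQ.DEQUEUE(
        queue_name         => descr.queue_name,
        dequeue_options    => dequeue_options,
        message_properties => message_properties,
        payload            => msg,
        msgid              => message_handle);

    -- Unpack only payloads of the expected type.
    -- The callback runs outside your session, so DBMS_OUTPUT text is
    -- generally not displayed; write to a table if you need to see results.
    IF msg.GETTYPENAME() = 'STRMADMIN.MYTYPE' THEN
        rc := msg.GETOBJECT(MESSAGE);
        DBMS_OUTPUT.PUT_LINE('ID ' || MESSAGE.ID);
        DBMS_OUTPUT.PUT_LINE('REMARK ' || MESSAGE.REMARK);
    END IF;

    -- Commit to complete the dequeue (and any data you saved)
    COMMIT;
END;
/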

Register the callback procedure:

BEGIN  
    DBMS_AQ.REGISTER(
        SYS.AQ$_REG_INFO_LIST(
            SYS.AQ$_REG_INFO(
                'STRMADMIN.TEST_Q:RECIPIENT',
                DBMS_AQ.NAMESPACE_AQ,
                'plsql://strmadmin.DEQUEUE_MSG_CB?PR=1',
                HEXTORAW('FF')
            )
        ), 1
    );
END;
/
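To check that the registration was recorded, the current user's registrations can be listed from USER_SUBSCR_REGISTRATIONS. This is a sketch that assumes it is run on DB2 by the user that called DBMS_AQ.REGISTER.

```sql
-- SUBSCRIPTION_NAME holds the queue and consumer; LOCATION_NAME holds the plsql:// callback
SELECT subscription_name, location_name, namespace
FROM   user_subscr_registrations;
```

A registration can later be removed with DBMS_AQ.UNREGISTER, which takes the same registration list and count that were passed to DBMS_AQ.REGISTER.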

This completes the setup of an Oracle ANYDATA queue system, ensuring reliable data propagation between databases!

References

For more details, refer to the Oracle Advanced Queuing Guide.