Skip to content

Commit

Permalink
updated the plsql samples (oracle#437)
Browse files Browse the repository at this point in the history
Signed-off-by: Mark Nelson <[email protected]>
  • Loading branch information
markxnelson authored Jun 15, 2022
1 parent 567aac0 commit b582c8c
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 235 deletions.
27 changes: 15 additions & 12 deletions code-teq/plsqlTeq/cleanupTEQ.sql
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
--Clean up all objects related to the obj type: */
Execute DBMS_AQADM.STOP_QUEUE ( queue_name => 'objType_TEQ');
Execute DBMS_AQADM.drop_transactional_event_queue(queue_name =>'objType_TEQ',force=> TRUE);
--
-- This sample demonstrates how to remove (clean up) a TEQ using PL/SQL
--

--Cleans up all objects related to the RAW type: */
Execute DBMS_AQADM.STOP_QUEUE ( queue_name => 'rawType_TEQ');
Execute DBMS_AQADM.drop_transactional_event_queue(queue_name =>'rawType_TEQ',force=> TRUE);
-- Execute permission on dbms_aqadm is required.

--Cleans up all objects related to the priority queue: */
Execute DBMS_AQADM.STOP_QUEUE ( queue_name => 'jsonType_TEQ');
Execute DBMS_AQADM.drop_transactional_event_queue(queue_name =>'jsonType_TEQ',force=> TRUE);
/
select name, queue_table, dequeue_enabled,enqueue_enabled, sharded, queue_category, recipients from all_queues where OWNER='DBUSER' and QUEUE_TYPE<>'EXCEPTION_QUEUE';
begin
-- first we need to stop the TEQ
dbms_aqadm.stop_queue(
queue_name => 'my_json_teq'
);

-- now we can drop the TEQ
dbms_aqadm.drop_transactional_event_queue(
queue_name => 'my_json_teq'
);
end;
/
EXIT;
145 changes: 44 additions & 101 deletions code-teq/plsqlTeq/createTEQ.sql
Original file line number Diff line number Diff line change
@@ -1,107 +1,50 @@
--
-- This sample demonstrates how to create a TEQ using PL/SQL
--

CREATE type Message_type as object (subject VARCHAR2(30), text VARCHAR2(80));
/
-- Creating an Object type queue
BEGIN
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
queue_name =>'objType_TEQ',
storage_clause =>null,
multiple_consumers =>true,
max_retries =>10,
comment =>'ObjectType for TEQ',
queue_payload_type =>'Message_type',
queue_properties =>null,
replication_mode =>null);
DBMS_AQADM.START_QUEUE (queue_name=> 'objType_TEQ', enqueue =>TRUE, dequeue=> True);
END;
/
-- There are various payload types supported, including user-defined object, raw, JMS and JSON.
-- This sample uses the JSON payload type.

-- Creating a RAW type queue:
BEGIN
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
queue_name =>'rawType_TEQ',
storage_clause =>null,
multiple_consumers =>true,
max_retries =>10,
comment =>'RAW type for TEQ',
queue_payload_type =>'RAW',
queue_properties =>null,
replication_mode =>null);
DBMS_AQADM.START_QUEUE (queue_name=> 'rawType_TEQ', enqueue =>TRUE, dequeue=> True);
END;
/
-- Execute permission on dbms_aqadm is required.

--Creating JSON type queue:
BEGIN
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
queue_name =>'jsonType_TEQ',
storage_clause =>null,
multiple_consumers =>true,
max_retries =>10,
comment =>'jsonType for TEQ',
queue_payload_type =>'JSON',
queue_properties =>null,
replication_mode =>null);
DBMS_AQADM.START_QUEUE (queue_name=> 'jsonType_TEQ', enqueue =>TRUE, dequeue=> True);
END;
/
BEGIN
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
queue_name =>'JAVA_TEQ_PUBSUB_QUEUE',
storage_clause =>null,
multiple_consumers=>true,
max_retries =>10,
comment =>'JAVA_TEQ_PUBSUB_QUEUE',
queue_payload_type=>'JMS',
queue_properties =>null,
replication_mode =>null);
DBMS_AQADM.START_QUEUE (queue_name=> 'JAVA_TEQ_PUBSUB_QUEUE', enqueue =>TRUE, dequeue=> True);
END;
begin
-- create the TEQ
dbms_aqadm.create_transactional_event_queue(
-- note, in Oracle 19c this is called create_sharded_queue() but has the same parameters
queue_name => 'my_json_teq',
queue_payload_type => 'JSON',
-- when mutiple_consumers is true, this will create a pub/sub "topic" - the default is false
multiple_consumers => true,
max_retries => 10,
comment => 'A TEQ with JSON payload'
);

-- start the TEQ
dbms_aqadm.start_queue(
queue_name => 'my_json_teq',
-- these two parameters control whether enqueueing and dequeueing will be allowed
enqueue => true,
dequeue => true
);
end;
/
DECLARE
subscriber sys.aq$_agent;
BEGIN
dbms_aqadm.add_subscriber(queue_name => 'objType_TEQ' , subscriber => sys.aq$_agent('teqBasicObjSubscriber' , null ,0), rule => 'correlation = ''teqBasicObjSubscriber''');

dbms_aqadm.add_subscriber(queue_name => 'rawType_TEQ' , subscriber => sys.aq$_agent('teqBasicRawSubscriber' , null ,0), rule => 'correlation = ''teqBasicRawSubscriber''');
--
-- You may also want to create a subscriber for the TEQ, pub/sub topics normally deliver
-- messages only when the consumer/subscriber is present.
--

dbms_aqadm.add_subscriber(queue_name => 'jsonType_TEQ' , subscriber => sys.aq$_agent('teqBasicJsonSubscriber' , null ,0), rule => 'correlation = ''teqBasicJsonSubscriber''');

END;
/
CREATE OR REPLACE FUNCTION enqueueDequeueTEQ(subscriber varchar2, queueName varchar2, message Message_Typ) RETURN Message_Typ
IS
enqueue_options DBMS_AQ.enqueue_options_t;
message_properties DBMS_AQ.message_properties_t;
message_handle RAW(16);
dequeue_options DBMS_AQ.dequeue_options_t;
messageData Message_Typ;

BEGIN
messageData := message;
message_properties.correlation := subscriber;
DBMS_AQ.ENQUEUE(
queue_name => queueName,
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => messageData,
msgid => message_handle);
COMMIT;
DBMS_OUTPUT.PUT_LINE ('----------ENQUEUE Message: ' || 'ORDERID: ' || messageData.ORDERID || ', OTP: ' || messageData.OTP ||', DELIVERY_STATUS: ' || messageData.DELIVERY_STATUS );

dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
dequeue_options.wait := DBMS_AQ.NO_WAIT;
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
dequeue_options.consumer_name := subscriber;
DBMS_AQ.DEQUEUE(
queue_name => queueName,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => messageData,
msgid => message_handle);
COMMIT;
DBMS_OUTPUT.PUT_LINE ('----------DEQUEUE Message: ' || 'ORDERID: ' || messageData.ORDERID || ', OTP: ' || messageData.OTP ||', DELIVERY_STATUS: ' || messageData.DELIVERY_STATUS );
RETURN messageData;
END;
/
EXIT;
declare
subscriber sys.aq$_agent;
begin
dbms_aqadm.add_subscriber(
queue_name => 'my_json_teq',
subscriber => sys.aq$_agent(
'my_subscriber', -- the subscriber name
null, -- address, only used for notifications
0 -- protocol
),
rule => 'correlation = ''my_subscriber'''
);
end;
/
92 changes: 31 additions & 61 deletions code-teq/plsqlTeq/dequeueTEQ.sql
Original file line number Diff line number Diff line change
@@ -1,72 +1,42 @@
--Dequeue from obj Type Messages */
DECLARE
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message Message_type;

BEGIN
dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
dequeue_options.wait := DBMS_AQ.NO_WAIT;
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
dequeue_options.consumer_name := 'teqBasicObjSubscriber';

DBMS_AQ.DEQUEUE(
queue_name => 'objType_TEQ',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);

DBMS_OUTPUT.PUT_LINE ('Message: ' || message.subject || ' ... ' || message.text );
COMMIT;
END;
/
--
-- This sample demonstrates how to enqueue a message onto a TEQ using PL/SQL
--

--Dequeue from RAW Type Messages */
DECLARE
dequeue_options DBMS_AQ.dequeue_options_t;
message_properties DBMS_AQ.message_properties_t;
message_handle RAW(16);
message RAW(4096);

BEGIN
dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
dequeue_options.wait := DBMS_AQ.NO_WAIT;
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
dequeue_options.consumer_name := 'teqBasicRawSubscriber';
-- There are various payload types supported, including user-defined object, raw, JMS and JSON.
-- This sample uses the JSON payload type.

DBMS_AQ.DEQUEUE(
queue_name => 'rawType_TEQ',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
COMMIT;
END;
/
-- Execute permission on dbms_aq is required.

--Dequeue from JSON TEQ
DECLARE
set serveroutput on;
declare
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message JSON;
message_handle raw(16);
message json;

BEGIN
dequeue_options.dequeue_mode := DBMS_AQ.REMOVE;
dequeue_options.wait := DBMS_AQ.NO_WAIT;
dequeue_options.navigation := DBMS_AQ.FIRST_MESSAGE;
dequeue_options.consumer_name := 'teqBasicJsonSubscriber';
begin
-- dequeue_mode determines whether we will consume the message or just browse it and leave it there
dequeue_options.dequeue_mode := dbms_aq.remove;
-- wait controls how long to wait for a message to arrive before giving up
dequeue_options.wait := dbms_aq.no_wait;
-- we must specify navigation so we know where to look in the TEQ
dequeue_options.navigation := dbms_aq.first_message;
-- set the consumer name
dequeue_options.consumer_name := 'my_subscriber';

DBMS_AQ.DEQUEUE(
queue_name => 'jsonType_TEQ',
-- perform the dequeue
dbms_aq.dequeue(
queue_name => 'my_json_teq',
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
dbms_output.put_line(json_serialize(message));
COMMIT;
END;
msgid => message_handle
);

-- print out the message payload
dbms_output.put_line(json_serialize(message));

-- commit the transaction
commit;
end;
/
EXIT;
94 changes: 33 additions & 61 deletions code-teq/plsqlTeq/enqueueTEQ.sql
Original file line number Diff line number Diff line change
@@ -1,66 +1,38 @@
--Enqueue to objType Message
DECLARE
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message Message_type;
--
-- This sample demonstrates how to enqueue a message onto a TEQ using PL/SQL
--

BEGIN
message := Message_type('NORMAL MESSAGE','enqueue objType_TEQ');
message_properties.correlation := 'teqBasicObjSubscriber';
-- There are various payload types supported, including user-defined object, raw, JMS and JSON.
-- This sample uses the JSON payload type.

DBMS_AQ.ENQUEUE(
queue_name => 'objType_TEQ',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
COMMIT;
END;
/
-- Execute permission on dbms_aq is required.

--Enqueue to rawType Message
DECLARE
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message RAW(4096);

BEGIN
message := HEXTORAW(RPAD('FF',4095,'FF'));
message_properties.correlation := 'teqBasicRawSubscriber';

DBMS_AQ.ENQUEUE(
queue_name => 'rawType_TEQ',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
COMMIT;
END;
/
-- Enqueue for JSON Message
DECLARE
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle RAW(16);
message json;
BEGIN
message:= json('
declare
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
message_handle raw(16);
message json;
begin
-- create the message payload
message := json('
{
"ORDERID":12345,
"USERNAME":"name"
}');
message_properties.correlation := 'teqBasicJsonSubscriber';
"orderid": 12345,
"username": "Jessica Smith"
}
');

-- set the consumer name
message_properties.correlation := 'my_subscriber';

DBMS_AQ.ENQUEUE(
queue_name => 'jsonType_TEQ',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);
dbms_output.put_line(json_serialize(message));
COMMIT;
END;
/
EXIT;
-- enqueue the message
dbms_aq.enqueue(
queue_name => 'my_json_teq',
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => message,
msgid => message_handle);

-- commit the transaction
commit;
end;
/

0 comments on commit b582c8c

Please sign in to comment.