Skip to content

Commit

Permalink
WMS# 8021: OracleAQ and TEQ (oracle#390)
Browse files Browse the repository at this point in the history
* update for node.js and python

* update node and python bugs

* update teardown and setup

* python bug fix

* Update nodeEnqDeqAQ.js

* Update nodeEnqDeqTEQ.js
  • Loading branch information
matayal authored Apr 26, 2022
1 parent 26fe165 commit 92a7f7e
Show file tree
Hide file tree
Showing 48 changed files with 761 additions and 90 deletions.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
const oracledb = require('oracledb');

async function run() {

let connection;

try {
const config = { connectString: process.env.DB_ALIAS, externalAuth: true };
const connection = await oracledb.getConnection(config);

createQueue(connection,"NODE_TEQ_ADT" );
createQueue(connection,"NODE_TEQ_RAW" );
createQueue(connection,"NODE_TEQ_JMS" );

} catch (err) {
console.error(err);
} finally {
if (connection) {
try {
await connection.close();
} catch (err) {
console.error(err);
}
}
}
}
run();

async function cleanUp(conn, queueTable, queueName) {

DBMS_AQADM.STOP_QUEUE ( queue_name => 'objType_TEQ');
DBMS_AQADM.drop_transactional_event_queue(queue_name =>'objType_TEQ',force=> TRUE);

await conn.execute(`
BEGIN
DBMS_AQADM.STOP_QUEUE( QUEUE_NAME => '`.concat(queueName).concat(`');
END;`)
);

await conn.execute(`
BEGIN
DBMS_AQADM.DROP_TRANSACTIONAL_QUEUE(
QUEUE_NAME => '`.concat(queueName).concat(`',
FORCE => TRUE
);
END;`)
);

}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
24 changes: 24 additions & 0 deletions workshops/oracleAQ/aqPython/aq/pythonCleanUpAQ.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env python
import os
import logging
from os import environ as env
import cx_Oracle
import oci

connection = cx_Oracle.connect(dsn=env.get('DB_ALIAS'))
cursor = connection.cursor()
dropQuery="""
BEGIN
DBMS_AQADM.STOP_QUEUE( queue_name => 'PYTHON_AQ_ADT');
DBMS_AQADM.DROP_QUEUE( queue_name => 'PYTHON_AQ_ADT');
DBMS_AQADM.DROP_QUEUE_TABLE ( queue_table => 'PYTHON_AQ_ADT_Table');
DBMS_AQADM.STOP_QUEUE( queue_name =>'PYTHON_AQ_RAW');
DBMS_AQADM.DROP_QUEUE( queue_name =>'PYTHON_AQ_RAW');
DBMS_AQADM.DROP_QUEUE_TABLE ( queue_table =>'PYTHON_AQ_RAW_Table');
DBMS_AQADM.STOP_QUEUE( queue_name =>'PYTHON_AQ_JMS');
DBMS_AQADM.DROP_QUEUE( queue_name =>'PYTHON_AQ_JMS');
DBMS_AQADM.DROP_QUEUE_TABLE ( queue_table =>'PYTHON_AQ_JMS_Table');
END;"""
cursor.execute(dropQuery)
42 changes: 42 additions & 0 deletions workshops/oracleAQ/aqPython/aq/pythonCreateAQ.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env PYTHON_AQ
import os
import logging
from os import environ as env
import cx_Oracle
import threading
import time
import oci
import base64

connection = cx_Oracle.connect(dsn=env.get('DB_ALIAS'))
cursor = connection.cursor()

cursor.execute("CREATE OR REPLACE TYPE PYTHON_AQ_MESSAGE_TYPE AS OBJECT (Title VARCHAR2(100), Authors VARCHAR2(100),Price NUMBER(5,2))");

adtQuery="""
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'PYTHON_AQ_ADT_Table', queue_payload_type => 'PYTHON_AQ_MESSAGE_TYPE');
DBMS_AQADM.CREATE_QUEUE ( queue_name => 'PYTHON_AQ_ADT', queue_table => 'PYTHON_AQ_ADT_Table');
DBMS_AQADM.START_QUEUE ( queue_name => 'PYTHON_AQ_ADT');
END;"""
cursor.execute(adtQuery)

rawQuery="""
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'PYTHON_AQ_RAW_Table', queue_payload_type => 'RAW');
DBMS_AQADM.CREATE_QUEUE ( queue_name => 'PYTHON_AQ_RAW', queue_table => 'PYTHON_AQ_RAW_Table');
DBMS_AQADM.START_QUEUE ( queue_name => 'PYTHON_AQ_RAW');
END;"""
cursor.execute(rawQuery)

jmsQuery="""
BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE ( queue_table => 'PYTHON_AQ_JMS_Table', queue_payload_type => 'SYS.AQ$_JMS_TEXT_MESSAGE');
DBMS_AQADM.CREATE_QUEUE ( queue_name => 'PYTHON_AQ_JMS', queue_table => 'PYTHON_AQ_JMS_Table');
DBMS_AQADM.START_QUEUE ( queue_name => 'PYTHON_AQ_JMS');
END;"""
cursor.execute(jmsQuery)

query= "select name, queue_table, dequeue_enabled,enqueue_enabled, sharded, queue_category, recipients from all_queues where OWNER='JAVAUSER' and QUEUE_TYPE<>'EXCEPTION_QUEUE'";
for i in cursor.execute(query):
print(i)
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,14 @@
import logging
from os import environ as env
import cx_Oracle
import threading
import time
import oci
import base64

connection = cx_Oracle.connect(dsn=env.get('DB_ALIAS'))
cursor = connection.cursor()

#ADT payload
book_type = connection.gettype("ADT_BOOK")
queue = connection.queue("PYTHON_ADT_Q", book_type)
book_type = connection.gettype("PYTHON_AQ_MESSAGE_TYPE")
queue = connection.queue("PYTHON_AQ_ADT", book_type)

book = book_type.newobject()
book.TITLE = "Quick Brown Fox"
Expand All @@ -28,8 +25,8 @@

#deqOptions should have consumername in case of multiconsumer queue
#queue.deqOptions.consumername = "PYTHON_ADT_SUBSCIBER"
options = connection.deqoptions()
options.wait = cx_Oracle.DEQ_NO_WAIT
queue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
queue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
msg = queue.deqOne()
connection.commit()
print("Dequeued message with ADT payload : ",msg.payload.TITLE)
Expand All @@ -38,8 +35,7 @@

#RAW PAYLOAD
print("\n2) Sample for Classic queue : RAW payload")

queue = connection.queue("PYTHON_RAW_Q")
queue = connection.queue("PYTHON_AQ_RAW")
PAYLOAD_DATA = [
"The first message"
]
Expand All @@ -63,7 +59,7 @@
headerType = connection.gettype("SYS.AQ$_JMS_HEADER")
user_prop_Type = connection.gettype("SYS.AQ$_JMS_USERPROPARRAY")

queue = connection.queue("PYTHON_JMS_Q",jmsType)
queue = connection.queue("PYTHON_AQ_JMS",jmsType)
#create python object for JMS type
text = jmsType.newobject()
text.HEADER = headerType.newobject()
Expand Down
24 changes: 24 additions & 0 deletions workshops/oracleAQ/aqPython/teq/pythonCleanupTEQ.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env python
import os
import logging
from os import environ as env
import cx_Oracle
import oci

connection = cx_Oracle.connect(dsn=env.get('DB_ALIAS'))
cursor = connection.cursor()
dropQuery="""
BEGIN
DBMS_AQADM.STOP_QUEUE ( queue_name => 'PYTHON_TEQ_ADT');
DBMS_AQADM.drop_transactional_event_queue(queue_name =>'PYTHON_TEQ_ADT',force=> TRUE);
DBMS_AQADM.STOP_QUEUE ( queue_name => 'PYTHON_TEQ_RAW');
DBMS_AQADM.drop_transactional_event_queue(queue_name =>'PYTHON_TEQ_RAW',force=> TRUE);
DBMS_AQADM.STOP_QUEUE ( queue_name => 'PYTHON_TEQ_JMS');
DBMS_AQADM.drop_transactional_event_queue(queue_name =>'PYTHON_TEQ_JMS',force=> TRUE);
DBMS_AQADM.STOP_QUEUE ( queue_name => 'PYTHON_TEQ_JSON');
DBMS_AQADM.drop_transactional_event_queue(queue_name =>'PYTHON_TEQ_JSON',force=> TRUE);
END;"""
cursor.execute(dropQuery)
88 changes: 88 additions & 0 deletions workshops/oracleAQ/aqPython/teq/pythonCreateTEQ.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
#!/usr/bin/env python
import os
import logging
from os import environ as env
import cx_Oracle
import threading
import time
import oci
import base64

connection = cx_Oracle.connect(dsn=env.get('DB_ALIAS'))
cursor = connection.cursor()

cursor.execute("CREATE OR REPLACE TYPE PYTHON_TEQ_MESSAGE_TYPE AS OBJECT (Title VARCHAR2(100), Authors VARCHAR2(100),Price NUMBER(5,2))");

adtQuery="""
BEGIN
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
queue_name =>'PYTHON_TEQ_ADT',
storage_clause =>null,
multiple_consumers =>true,
max_retries =>10,
comment =>'ObjectType for TEQ',
queue_payload_type =>'PYTHON_TEQ_MESSAGE_TYPE',
queue_properties =>null,
replication_mode =>null);
DBMS_AQADM.START_QUEUE (queue_name=> 'PYTHON_TEQ_ADT', enqueue =>TRUE, dequeue=> True);
COMMIT;
DBMS_AQADM.add_subscriber(queue_name => 'PYTHON_TEQ_ADT', subscriber => sys.aq$_agent('PYTHON_TEQ_SUBSCIBER_ADT', null ,0)); END;"""
cursor.execute(adtQuery)

rawQuery = """
DECLARE
subscriber sys.aq$_agent;
BEGIN
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
queue_name =>'PYTHON_TEQ_RAW',
storage_clause =>null,
multiple_consumers =>true,
max_retries =>10,
comment =>'TEQ samples using PYTHON',
queue_payload_type =>'RAW',
queue_properties =>null,
replication_mode =>null);
DBMS_AQADM.START_QUEUE (queue_name=>'PYTHON_TEQ_RAW', enqueue =>TRUE, dequeue=> True);
COMMIT;
DBMS_AQADM.add_subscriber(queue_name => 'PYTHON_TEQ_RAW', subscriber => sys.aq$_agent('PYTHON_TEQ_SUBSCIBER_RAW', null ,0)); END;"""
cursor.execute(rawQuery)

jsonQuery = """
DECLARE
subscriber sys.aq$_agent;
BEGIN
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
queue_name =>'PYTHON_TEQ_JSON',
storage_clause =>null,
multiple_consumers =>true,
max_retries =>10,
comment =>'TEQ samples using PYTHON',
queue_payload_type =>'JSON',
queue_properties =>null,
replication_mode =>null);
DBMS_AQADM.START_QUEUE (queue_name=>'PYTHON_TEQ_JSON', enqueue =>TRUE, dequeue=> True);
COMMIT;
DBMS_AQADM.add_subscriber(queue_name => 'PYTHON_TEQ_JSON', subscriber => sys.aq$_agent('PYTHON_TEQ_SUBSCIBER_JSON', null ,0)); END;"""
cursor.execute(jsonQuery)

jmsQuery = """
DECLARE
subscriber sys.aq$_agent;
BEGIN
DBMS_AQADM.CREATE_TRANSACTIONAL_EVENT_QUEUE(
queue_name =>'PYTHON_TEQ_JMS',
storage_clause =>null,
multiple_consumers =>true,
max_retries =>10,
comment =>'TEQ samples using PYTHON',
queue_payload_type =>'JMS',
queue_properties =>null,
replication_mode =>null);
DBMS_AQADM.START_QUEUE (queue_name=>'PYTHON_TEQ_JMS', enqueue =>TRUE, dequeue=> True);
COMMIT;
DBMS_AQADM.add_subscriber(queue_name => 'PYTHON_TEQ_JMS', subscriber => sys.aq$_agent('PYTHON_TEQ_SUBSCIBER_JMS', null ,0)); END;"""
cursor.execute(jmsQuery)

query= "select name, queue_table, dequeue_enabled,enqueue_enabled, sharded, queue_category, recipients from all_queues where OWNER='JAVAUSER' and QUEUE_TYPE<>'EXCEPTION_QUEUE'";
for i in cursor.execute(query):
print(i)
90 changes: 90 additions & 0 deletions workshops/oracleAQ/aqPython/teq/pythonEnqDeqTEQ.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import os
import logging
from os import environ as env
import cx_Oracle
import threading
import time
import oci
import base64

connection = cx_Oracle.connect(dsn=env.get('DB_ALIAS'))
cursor = connection.cursor()

#ADT payload
book_type = connection.gettype("PYTHON_TEQ_MESSAGE_TYPE")
adtQueue = connection.queue("PYTHON_TEQ_ADT", book_type)

book = book_type.newobject()
book.TITLE = "Quick Brown Fox"
book.AUTHORS = "The Dog"
book.PRICE = 123
props = connection.msgproperties(payload=book, correlation="correlation-py", expiration=30, priority=7)

print("1) Sample for Classic Queue : ADT payload")
print("Enqueue one message with ADT payload : ",book.TITLE)
adtQueue.enqOne(props)
connection.commit()
print("Enqueue Done!!!")

#deqOptions should have consumername in case of multiconsumer queue
adtQueue.deqOptions.consumername = "PYTHON_TEQ_SUBSCIBER_ADT"
adtQueue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
adtQueue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
adtMsg = adtQueue.deqOne()
connection.commit()
print("Dequeued message with ADT payload : ",adtMsg.payload.TITLE)
print("Dequeue Done!!!")
print("-----------------------------------------------------------------")

#RAW PAYLOAD
print("\n2) Sample for Classic queue : RAW payload")

rawQueue = connection.queue("PYTHON_TEQ_RAW")
PAYLOAD_DATA = [
"The first message"
]
for data in PAYLOAD_DATA:
print("Enqueue message with RAW payload : ",data)
rawQueue.enqone(connection.msgproperties(payload=data))
connection.commit()
print("Enqueue Done!!!")

rawQueue.deqOptions.consumername = "PYTHON_TEQ_SUBSCIBER_RAW"
rawQueue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
rawQueue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
rawMsg = rawQueue.deqOne()
connection.commit()
print(rawMsg.payload.decode(connection.encoding))
print("Dequeued message with RAW payload : ",rawMsg.payload)
print("Dequeue Done!!!")
print("-----------------------------------------------------------------")

print("\n3) Sample for Classic queue : JMS payload")
#get the JMS type
jmsType = connection.gettype("SYS.AQ$_JMS_TEXT_MESSAGE")
headerType = connection.gettype("SYS.AQ$_JMS_HEADER")
user_prop_Type = connection.gettype("SYS.AQ$_JMS_USERPROPARRAY")

jmsQueue = connection.queue("PYTHON_TEQ_JMS",jmsType)
#create python object for JMS type
text = jmsType.newobject()
text.HEADER = headerType.newobject()
text.TEXT_VC = "JMS text message"
text.TEXT_LEN = 20
text.HEADER.APPID = "py-app-1"
text.HEADER.GROUPID = "py-grp-1"
text.HEADER.GROUPSEQ = 1
text.HEADER.PROPERTIES = user_prop_Type.newobject()
print("Enqueue one message with JMS payload : ",text.TEXT_VC)
jmsQueue.enqOne(connection.msgproperties(payload=text))
connection.commit()
print("Enqueue Done!!!")

jmsQueue.deqOptions.consumername = "PYTHON_TEQ_SUBSCIBER_JMS"
jmsQueue.deqOptions.wait = cx_Oracle.DEQ_NO_WAIT
jmsQueue.deqOptions.navigation = cx_Oracle.DEQ_FIRST_MSG
jmsMsg = jmsQueue.deqOne()
connection.commit()
print("Dequeued message with JMS payload :",jmsMsg.payload.TEXT_VC)
print("Dequeue Done!!!")
print("-----------------------------------------------------------------")
2 changes: 0 additions & 2 deletions workshops/oracleAQ/cleanupAQPython.sh

This file was deleted.

18 changes: 0 additions & 18 deletions workshops/oracleAQ/cleanupAQPython.sql

This file was deleted.

Loading

0 comments on commit 92a7f7e

Please sign in to comment.