forked from sijms/go-ora
-
Notifications
You must be signed in to change notification settings - Fork 0
/
aq.go
146 lines (138 loc) · 3.65 KB
/
aq.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package dbms
import (
"database/sql"
"database/sql/driver"
"errors"
go_ora "github.com/sijms/go-ora/v2"
)
// AQ describes one Oracle Advanced Queue together with the database handle
// used to administer it. Create and Drop hand the whole struct to Exec as the
// single bind argument; presumably the driver resolves the named placeholders
// in their PL/SQL from the `db` tags below (confirm against the go-ora docs).
type AQ struct {
	// conn is the handle every method executes its PL/SQL block on.
	conn *sql.DB
	// Name is the queue name, bound as :QUEUE_NAME.
	Name string `db:"QUEUE_NAME"`
	// TableName is the queue table name, tagged for the :TB_NAME placeholder.
	TableName string `db:"TB_NAME"`
	// TypeName is the payload type name, tagged for the :TYPE_NAME placeholder.
	TypeName string `db:"TYPE_NAME"`
	// Owner carries no db tag and is not read by any method visible in this file.
	Owner string
	// MaxRetry is tagged for the :MAX_RETRY placeholder used by Create.
	MaxRetry int64 `db:"MAX_RETRY"`
	// RetryDelay is tagged for the :RETRY_DELAY placeholder used by Create.
	RetryDelay int64 `db:"RETRY_DELAY"`
	// RetentionTime is tagged for the :RETENTION_TIME placeholder used by Create.
	RetentionTime int64 `db:"RETENTION_TIME"`
	// Comment is tagged for the :COMMENT placeholder used by Create.
	Comment string `db:"COMMENT"`
}
// NewAQ returns an AQ bound to conn for the queue called name whose payload
// type is typeName. The queue table name defaults to name + "_TB", the comment
// defaults to the queue name and RetentionTime starts at -1; every other field
// keeps its zero value.
func NewAQ(conn *sql.DB, name, typeName string) *AQ {
	queue := new(AQ)
	queue.conn = conn
	queue.Name = name
	queue.TypeName = typeName
	queue.TableName = name + "_TB"
	queue.Comment = name
	queue.RetentionTime = -1
	return queue
}
// validate reports the first missing prerequisite of the queue: a nil
// connection, an empty queue name or an empty type name, checked in that
// order. It returns nil when all three are present.
func (aq *AQ) validate() error {
	switch {
	case aq.conn == nil:
		return errors.New("no connection defined for AQ type")
	case aq.Name == "":
		return errors.New("queue name cannot be null")
	case aq.TypeName == "":
		return errors.New("type name cannot be null")
	}
	return nil
}
// Create validates the receiver and then runs one PL/SQL block that calls
// DBMS_AQADM.CREATE_QUEUE_TABLE followed by DBMS_AQADM.CREATE_QUEUE.
// The receiver itself is the only bind argument passed to Exec.
// It returns the validation error, or whatever error Exec reports.
func (aq *AQ) Create() error {
	if err := aq.validate(); err != nil {
		return err
	}
	const createBlock = `BEGIN
DBMS_AQADM.CREATE_QUEUE_TABLE (:TB_NAME, :TYPE_NAME);
DBMS_AQADM.CREATE_QUEUE (
:QUEUE_NAME, :TB_NAME, DBMS_AQADM.NORMAL_QUEUE,
:MAX_RETRY, :RETRY_DELAY, :RETENTION_TIME,
FALSE, :COMMENT);
END;`
	_, execErr := aq.conn.Exec(createBlock, aq)
	return execErr
}
// Drop validates the receiver and then runs one PL/SQL block that calls
// DBMS_AQADM.DROP_QUEUE (with FALSE as its second argument) followed by
// DBMS_AQADM.DROP_QUEUE_TABLE. The receiver itself is the only bind argument
// passed to Exec. It returns the validation error, or whatever error Exec
// reports.
func (aq *AQ) Drop() error {
	if err := aq.validate(); err != nil {
		return err
	}
	const dropBlock = `BEGIN
DBMS_AQADM.DROP_QUEUE(:QUEUE_NAME, FALSE);
DBMS_AQADM.DROP_QUEUE_TABLE(:TB_NAME);
END;`
	_, execErr := aq.conn.Exec(dropBlock, aq)
	return execErr
}
// Start calls dbms_aqadm.start_queue for this queue. The enqueue and dequeue
// flags are forwarded as PL/SQL booleans, so each operation is affected only
// according to the value the caller passes; nothing is enabled unconditionally.
// It returns the validation error, or whatever error Exec reports.
func (aq *AQ) Start(enqueue, dequeue bool) error {
	if err := aq.validate(); err != nil {
		return err
	}
	const startBlock = `BEGIN
dbms_aqadm.start_queue (queue_name => :QUEUE_NAME,
enqueue => :ENQUEUE ,
dequeue => :DEQUEUE);
END;`
	// Arguments are positional, in the order the placeholders appear.
	enqueueFlag := go_ora.PLBool(enqueue)
	dequeueFlag := go_ora.PLBool(dequeue)
	_, execErr := aq.conn.Exec(startBlock, aq.Name, enqueueFlag, dequeueFlag)
	return execErr
}
// Stop calls dbms_aqadm.stop_queue for this queue. The enqueue and dequeue
// flags are forwarded as PL/SQL booleans, so each operation is affected only
// according to the value the caller passes; nothing is disabled unconditionally.
// It returns the validation error, or whatever error Exec reports.
func (aq *AQ) Stop(enqueue, dequeue bool) error {
	if err := aq.validate(); err != nil {
		return err
	}
	const stopBlock = `BEGIN
dbms_aqadm.stop_queue(queue_name => :QUEUE_NAME,
enqueue => :ENQUEUE ,
dequeue => :DEQUEUE);
END;`
	// Arguments are positional, in the order the placeholders appear.
	enqueueFlag := go_ora.PLBool(enqueue)
	dequeueFlag := go_ora.PLBool(dequeue)
	_, execErr := aq.conn.Exec(stopBlock, aq.Name, enqueueFlag, dequeueFlag)
	return execErr
}
// Dequeue runs DBMS_AQ.DEQUEUE against this queue with the dequeue visibility
// set to DBMS_AQ.IMMEDIATE.
//
// message is used as the destination of the :MSG output bind and messageSize
// as that bind's size (presumably message must be a pointer the driver can
// write into; confirm against the go-ora Out documentation). The message id
// is received through an output bind of size 100 and returned as messageID.
//
// It returns the validation error, or whatever error Exec reports.
func (aq *AQ) Dequeue(message driver.Value, messageSize int) (messageID []byte, err error) {
	if err = aq.validate(); err != nil {
		return nil, err
	}
	const msgIDSize = 100
	const dequeueBlock = `DECLARE
dequeue_options dbms_aq.dequeue_options_t;
message_properties dbms_aq.message_properties_t;
BEGIN
dequeue_options.VISIBILITY := DBMS_AQ.IMMEDIATE;
DBMS_AQ.DEQUEUE (
queue_name => :QUEUE_NAME,
dequeue_options => dequeue_options,
message_properties => message_properties,
payload => :MSG,
msgid => :MSG_ID);
END;`
	queueArg := sql.Named("QUEUE_NAME", aq.Name)
	payloadArg := sql.Named("MSG", go_ora.Out{Dest: message, Size: messageSize})
	idArg := sql.Named("MSG_ID", go_ora.Out{Dest: &messageID, Size: msgIDSize})
	_, err = aq.conn.Exec(dequeueBlock, queueArg, payloadArg, idArg)
	return messageID, err
}
// Enqueue runs DBMS_AQ.ENQUEUE against this queue with default enqueue options
// and message properties (both are declared in the block and never modified).
//
// message is bound as the :MSG payload. The message id is received through an
// output bind of size 100 and returned as messageID.
//
// It returns the validation error, or whatever error Exec reports.
func (aq *AQ) Enqueue(message driver.Value) (messageID []byte, err error) {
	if err = aq.validate(); err != nil {
		return nil, err
	}
	const msgIDSize = 100
	const enqueueBlock = `DECLARE
enqueue_options dbms_aq.enqueue_options_t;
message_properties dbms_aq.message_properties_t;
BEGIN
DBMS_AQ.ENQUEUE (
queue_name => :QUEUE_NAME,
enqueue_options => enqueue_options,
message_properties => message_properties,
payload => :MSG,
msgid => :MSG_ID);
END;`
	queueArg := sql.Named("QUEUE_NAME", aq.Name)
	payloadArg := sql.Named("MSG", message)
	idArg := sql.Named("MSG_ID", go_ora.Out{Dest: &messageID, Size: msgIDSize})
	_, err = aq.conn.Exec(enqueueBlock, queueArg, payloadArg, idArg)
	return messageID, err
}