storage.go (forked from lysu/go-saga)

package saga

import (
    "fmt"
    "log"

    "go.intra.xiaojukeji.com/golang/sarama"
)

// Storage is used to save and look up saga logs.
type Storage interface {
    // AppendLog appends log data to the log stored under the given logID.
    AppendLog(logID string, data string) error
    // Lookup returns all log entries stored under the given logID.
    Lookup(logID string) ([]string, error)
    // Close closes the storage and releases its resources.
    Close() error
}
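
// appendAndReplay is a usage sketch added for illustration (it is not part of
// the original file): it shows how a caller might drive any Storage
// implementation through the interface above. The saga ID "saga-1001" and the
// JSON payload are made-up example values.
func appendAndReplay(storage Storage) error {
    if err := storage.AppendLog("saga-1001", `{"state":"ActionStart"}`); err != nil {
        return err
    }
    entries, err := storage.Lookup("saga-1001")
    if err != nil {
        return err
    }
    for _, entry := range entries {
        log.Println(entry)
    }
    return nil
}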

type memStorage struct {
    data map[string][]string
}

// NewMemStorage creates a log storage backed by memory.
// It keeps everything in a plain `map[string][]string` and is meant for test
// cases only; do NOT use it in production.
func NewMemStorage() (Storage, error) {
    return &memStorage{
        data: make(map[string][]string),
    }, nil
}

// AppendLog appends a log entry to the queue under the given logID.
func (s *memStorage) AppendLog(logID string, data string) error {
    // append handles the nil slice for a first-seen logID, so no explicit
    // initialization of the queue is needed.
    s.data[logID] = append(s.data[logID], data)
    return nil
}

// Lookup returns all log entries stored under the given logID.
func (s *memStorage) Lookup(logID string) ([]string, error) {
    return s.data[logID], nil
}

// Close closes the storage and releases its resources; the in-memory
// implementation has nothing to release.
func (s *memStorage) Close() error {
    return nil
}

type kafkaStorage struct {
    producer sarama.SyncProducer
    consumer sarama.Consumer
}

// NewKafkaStorage creates a log storage backed by Kafka.
func NewKafkaStorage(addrs []string) (Storage, error) {
    producer, err := sarama.NewSyncProducer(addrs, nil)
    if err != nil {
        return nil, fmt.Errorf("start Kafka storage producer failure: %v", err)
    }
    consumer, err := sarama.NewConsumer(addrs, nil)
    if err != nil {
        return nil, fmt.Errorf("start Kafka storage consumer failure: %v", err)
    }
    return &kafkaStorage{
        producer: producer,
        consumer: consumer,
    }, nil
}

// AppendLog publishes a log entry to the Kafka topic named after logID.
func (s *kafkaStorage) AppendLog(logID string, data string) error {
    msg := &sarama.ProducerMessage{Topic: logID, Value: sarama.StringEncoder(data)}
    partition, offset, err := s.producer.SendMessage(msg)
    if err != nil {
        log.Printf("FAILED to send message: %s\n", err)
        return err
    }
    log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
    return nil
}

// Lookup replays all log entries stored under the given logID by consuming
// the topic from its oldest offset.
func (s *kafkaStorage) Lookup(logID string) ([]string, error) {
    partitionConsumer, err := s.consumer.ConsumePartition(logID, 0, sarama.OffsetOldest)
    if err != nil {
        return nil, err
    }
    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Println(err)
        }
    }()

    data := []string{}
    consumed := 0
    // Like the original loop, this blocks until at least one message arrives.
    for msg := range partitionConsumer.Messages() {
        log.Printf("Consumed message offset %d\n", msg.Offset)
        consumed++
        data = append(data, string(msg.Value))
        // Stop once we have caught up with the latest offset the consumer
        // knows about; otherwise the loop would wait forever for new messages.
        if msg.Offset+1 >= partitionConsumer.HighWaterMarkOffset() {
            break
        }
    }
    log.Printf("Consumed: %d\n", consumed)
    return data, nil
}

// Close closes the storage and releases the underlying Kafka producer and consumer.
func (s *kafkaStorage) Close() error {
    if err := s.producer.Close(); err != nil {
        log.Println(err)
    }
    if err := s.consumer.Close(); err != nil {
        log.Println(err)
    }
    return nil
}