forked from waku-org/go-waku
-
Notifications
You must be signed in to change notification settings - Fork 0
/
store.go
127 lines (106 loc) · 3.51 KB
/
store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package library
import (
"C"
"encoding/json"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store/pb"
wpb "github.com/waku-org/go-waku/waku/v2/protocol/pb"
)
import (
"context"
"time"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/waku-org/go-waku/waku/v2/protocol/legacy_store"
)
// storePagingOptions carries the paging parameters of a store query. It is
// decoded from the "pagingOptions" field of a request and encoded as the
// "pagingInfo" field of a reply.
type storePagingOptions struct {
	// PageSize is passed to legacy_store.WithPaging as the page size.
	PageSize uint64 `json:"pageSize,omitempty"`
	// Cursor is passed to legacy_store.WithCursor on requests; on replies it
	// holds the cursor reported by the query result.
	Cursor *pb.Index `json:"cursor,omitempty"`
	// Forward is passed to legacy_store.WithPaging as the paging direction flag.
	Forward bool `json:"forward,omitempty"`
}
// storeMessagesArgs is the JSON-decoded form of a store query request.
type storeMessagesArgs struct {
	// Topic is the pubsub topic, forwarded as the query's PubsubTopic.
	Topic string `json:"pubsubTopic,omitempty"`
	// ContentTopics is forwarded as the query's ContentTopics filter.
	ContentTopics []string `json:"contentTopics,omitempty"`
	// StartTime is forwarded as the query's StartTime; nil when absent from the JSON.
	StartTime *int64 `json:"startTime,omitempty"`
	// EndTime is forwarded as the query's EndTime; nil when absent from the JSON.
	EndTime *int64 `json:"endTime,omitempty"`
	// PagingOptions holds the requested paging parameters. It is a pointer and
	// is therefore nil when the JSON has no "pagingOptions" key (or it is null).
	PagingOptions *storePagingOptions `json:"pagingOptions,omitempty"`
}
// storeMessagesReply is the JSON reply produced for a store query.
type storeMessagesReply struct {
	// Messages holds the messages returned by the query.
	Messages []*wpb.WakuMessage `json:"messages,omitempty"`
	// PagingInfo describes the paging state to use for a follow-up query.
	// NOTE(review): if this is marshaled with encoding/json, omitempty has no
	// effect on a struct-valued field, so "pagingInfo" is always emitted.
	PagingInfo storePagingOptions `json:"pagingInfo,omitempty"`
	// Error is set to the query error's message when the query fails.
	Error string `json:"error,omitempty"`
}
// queryResponse runs a legacy store query and serializes the outcome as JSON.
//
// The query filter is built from args (pubsub topic, content topics and time
// bounds) and options are forwarded to the store unchanged.
//
// A failed query is not reported as a Go error: its message is placed in the
// reply's "error" field and the reply is marshaled as usual. On success the
// reply carries the retrieved messages and the paging info for a follow-up
// request: the cursor taken from the result and, when the caller supplied
// paging options, their page size and direction.
//
// The returned string and error are whatever marshalJSON yields for the reply.
func queryResponse(ctx context.Context, instance *WakuInstance, args storeMessagesArgs, options []legacy_store.HistoryRequestOption) (string, error) {
	res, err := instance.node.LegacyStore().Query(
		ctx,
		legacy_store.Query{
			PubsubTopic:   args.Topic,
			ContentTopics: args.ContentTopics,
			StartTime:     args.StartTime,
			EndTime:       args.EndTime,
		},
		options...,
	)
	if err != nil {
		return marshalJSON(storeMessagesReply{Error: err.Error()})
	}

	paging := storePagingOptions{Cursor: res.Cursor()}
	// PagingOptions is nil when the request JSON had no "pagingOptions" key;
	// dereferencing it unconditionally would panic after a successful query.
	if args.PagingOptions != nil {
		paging.PageSize = args.PagingOptions.PageSize
		paging.Forward = args.PagingOptions.Forward
	}

	return marshalJSON(storeMessagesReply{
		Messages:   res.Messages,
		PagingInfo: paging,
	})
}
// StoreQuery is used to retrieve historic messages using waku store protocol.
//
// queryJSON is a JSON-encoded storeMessagesArgs. Its "pagingOptions" field is
// optional; when it is missing or null, zero-value paging options are used.
//
// peerID selects the peer to query. When it is empty, automatic peer
// selection is requested instead.
//
// ms is the query timeout in milliseconds. When ms <= 0 no timeout is added
// and the instance context is used as is.
//
// An error is returned when the instance fails validation (it must be
// started), when queryJSON cannot be decoded, or when peerID cannot be
// decoded. Otherwise the result of queryResponse is returned.
func StoreQuery(instance *WakuInstance, queryJSON string, peerID string, ms int) (string, error) {
	if err := validateInstance(instance, MustBeStarted); err != nil {
		return "", err
	}

	var args storeMessagesArgs
	if err := json.Unmarshal([]byte(queryJSON), &args); err != nil {
		return "", err
	}
	// The pointer stays nil when the request omits "pagingOptions"; substitute
	// zero values so the dereferences below (and in queryResponse) cannot panic.
	if args.PagingOptions == nil {
		args.PagingOptions = &storePagingOptions{}
	}

	options := []legacy_store.HistoryRequestOption{
		legacy_store.WithAutomaticRequestID(),
		legacy_store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize),
		legacy_store.WithCursor(args.PagingOptions.Cursor),
	}

	if peerID == "" {
		options = append(options, legacy_store.WithAutomaticPeerSelection())
	} else {
		p, err := peer.Decode(peerID)
		if err != nil {
			return "", err
		}
		options = append(options, legacy_store.WithPeer(p))
	}

	ctx := instance.ctx
	if ms > 0 {
		var cancel context.CancelFunc
		ctx, cancel = context.WithTimeout(instance.ctx, time.Duration(ms)*time.Millisecond)
		defer cancel()
	}

	return queryResponse(ctx, instance, args, options)
}
// StoreLocalQuery is used to retrieve historic messages stored in the localDB
// using waku store protocol.
//
// queryJSON is a JSON-encoded storeMessagesArgs. Its "pagingOptions" field is
// optional; when it is missing or null, zero-value paging options are used.
// The query is issued with legacy_store.WithLocalQuery and runs under the
// instance context, without an additional timeout.
//
// An error is returned when the instance fails validation (it must be
// started) or when queryJSON cannot be decoded. Otherwise the result of
// queryResponse is returned.
func StoreLocalQuery(instance *WakuInstance, queryJSON string) (string, error) {
	if err := validateInstance(instance, MustBeStarted); err != nil {
		return "", err
	}

	var args storeMessagesArgs
	if err := json.Unmarshal([]byte(queryJSON), &args); err != nil {
		return "", err
	}
	// The pointer stays nil when the request omits "pagingOptions"; substitute
	// zero values so the dereferences below (and in queryResponse) cannot panic.
	if args.PagingOptions == nil {
		args.PagingOptions = &storePagingOptions{}
	}

	options := []legacy_store.HistoryRequestOption{
		legacy_store.WithAutomaticRequestID(),
		legacy_store.WithPaging(args.PagingOptions.Forward, args.PagingOptions.PageSize),
		legacy_store.WithCursor(args.PagingOptions.Cursor),
		legacy_store.WithLocalQuery(),
	}

	return queryResponse(instance.ctx, instance, args, options)
}