-
Notifications
You must be signed in to change notification settings - Fork 1
/
examples.go
155 lines (137 loc) · 5.03 KB
/
examples.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
package fastce
import (
"fmt"
"log"
"net"
"net/http"
j "github.com/creativecactus/fast-cloudevents-go/jsonce"
"github.com/valyala/fasthttp"
)
// ExampleCEClientCEServerImplementation shows a predictable example using ExampleCEClientCEServer
func ExampleCEClientCEServerImplementation() {
ces := []j.CloudEvent{
j.CloudEvent{
Source: "Example",
},
}
mode := j.ModeBinary
ces, err := ExampleCEClientCEServer(ces, mode)
if err != nil {
fmt.Println(err.Error())
return
}
fmt.Printf("Received: %d, first has Source:%s\n", len(ces), ces[0].Source)
// Output: Received: 1, first has Source:Example
}
// ExampleCEClientCEServer shows an example of using both a CEClient and CEServer together
// Notice that some pointer shuffling is needed to have the client response (inbound) point
// to the server response (outbound). This is where HTTP would usually sit.
// This is not needed on the requesting side because of how RequestCtx is used.
func ExampleCEClientCEServer(ces []j.CloudEvent, mode j.Mode) (result []j.CloudEvent, err error) {
// Init client
cec, err := NewCEClient("PUT", "")
if err != nil {
err = fmt.Errorf("Example failed to Init: %s", err.Error())
return
}
defer cec.Release()
// Client send request to server
err = cec.SendEvents(j.DefaultCEToMap, ces, mode)
if err != nil {
err = fmt.Errorf("Example failed to Send: %s", err.Error())
return
}
// Server receive and respond
// err = cec.Send() // No actual HTTP request in this example
ces, mode, err = GetEvents(j.DefaultMapToCE, cec.Request)
if err != nil {
err = fmt.Errorf("Example failed to Get: %s", err.Error())
return
}
if len(ces) == 0 {
err = fmt.Errorf("Example Get returned 0 events")
return
}
err = SetEvents(j.DefaultCEToMap, cec.Response, ces, mode)
if err != nil {
err = fmt.Errorf("Example failed to Set: %s", err.Error())
return
}
// Client receive response
ces, mode, err = cec.RecvEvents(j.DefaultMapToCE)
if err != nil {
err = fmt.Errorf("Example failed to Recv: %s", err.Error())
return
}
result = ces
return
}
// SimpleServer shows the highest level interface for fastce, as well as a mechanism for low level configuration
// listenAddr should be an interface:port such as 0.0.0.0:0. If port is 0, next available free port is used
func SimpleServer(listenAddr string) error {
// An example of a custom unmarshal function
MyMapToCE := func(cm j.CEMap) (ce j.CloudEvent, err error) {
// In this example, we still want to perform the DefaultCEToMap validation
// But we will automatically generate an ID if it is not present
if id, ok := cm["id"].(string); !ok || len(id) < 1 {
cm["id"] = "SomeRandomRuntimeGeneratedID"
}
return j.DefaultMapToCE(cm)
}
handler := func(ces j.CloudEvents) (res j.CloudEvents, err error) {
// This is a simple echo server
res = ces
return
}
// In cases where HTTP level actions need to be taken (setting headers, routing), use ListenAndServeHTTP
// In cases where an external server is used and you don't want to pass the request down to this server,
// you can use the fastce.Get*/Set* functions directly (Send*/Recv* for clients).
return CEServer{}.ListenAndServeCE(listenAddr, j.DefaultCEToMap, MyMapToCE, handler)
}
// ExampleServer shows an example implementation with a fasthttp server.
// listenAddr should be an interface:port such as 0.0.0.0:0. If port is 0, next available free port is used
// handler is a function to handle fasthttp.RequestCtx, such as ExampleHandler
// Returns the server created, a channel for receiving fatal errors (nil if .Shutdown() gracefully),
// the address used to listen (useful if the provided listenAddr has a 0 port), any init error.
func ExampleServer(listenAddr string, handler func(ctx *fasthttp.RequestCtx)) (server *fasthttp.Server, shutdownErr <-chan error, addr string, err error) {
server = &fasthttp.Server{
Handler: handler,
}
listener, err := net.Listen("tcp", listenAddr)
if err != nil {
err = fmt.Errorf("Listen error: %s", err.Error())
return
}
listenAddr = fmt.Sprintf("http://%s", listener.Addr().String())
shutdown := make(chan error)
log.Printf("Listening on %s", listenAddr)
go func() {
err := server.Serve(listener)
shutdown <- err
}()
shutdownErr = shutdown
return server, shutdownErr, listenAddr, nil
}
// ExampleHandler shows an example implementation of a fasthttp requestCtx handler.
// It responds with a hard coded string if the url is /info (any method)
// Otherwise it acts as a CloudEvents echo server.
func ExampleHandler(ctx *fasthttp.RequestCtx) {
switch p := string(ctx.Path()); p {
case "/info":
// If we had a storage interface here, we could show
// a projected count of events stored, for example.
ctx.Write([]byte("Example Server"))
break
default:
ces, mode, err := GetEvents(j.DefaultMapToCE, &ctx.Request)
if err != nil {
log.Printf("ERR: %s", err.Error())
ctx.Error(err.Error(), http.StatusBadRequest)
return
} else {
log.Printf("OK : Received %d events in mode %d\n", len(ces), mode)
}
// log.Printf("\tData: %#v\n", ces)
SetEvents(j.DefaultCEToMap, &ctx.Response, ces, mode)
}
}