# Backtest trading strategy
## Objectives
- overview of requirements
- implement the storing task
- test the backtesting
## Overview of requirements
In the last chapter, we started storing trade events and orders in the database, a prerequisite for the backtesting that we will focus on in this chapter.
Backtesting is the procedure of running historical data through the system and observing how our strategy would have performed, as if we had run it "in the past". Backtesting works on the assumption that the market will behave in a similar fashion in the future as it did in the past.
At this moment, we are receiving trade events from Binance through a WebSocket connection. The `Streamer.Binance` process handles those messages by parsing them from JSON strings to maps, converting them to structs and broadcasting them to the `TRADE_EVENTS:#{symbol}` PubSub topic. The `Naive.Trader` subscribes to the `TRADE_EVENTS:#{symbol}` topic and makes decisions based on the incoming data. As it places buy and sell orders, it broadcasts them to the `ORDERS:#{symbol}` PubSub topic. The `DataWarehouse.Subscriber.Worker` processes subscribe to both the trade events and the orders topics and store the incoming data inside the database. We can visualize that flow as follows:
```{r, fig.align="center", out.width="60%", echo=FALSE}
knitr::include_graphics("images/chapter_14_02_current_pubsub.png")
```
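Under the hood, this decoupling boils down to plain `Phoenix.PubSub` calls. As a minimal sketch (for illustration only, reusing the `Streamer.PubSub` server and the topic naming from the previous chapters), both sides of the topic look like this:
```{r, engine = 'elixir', eval = FALSE}
# Illustration only - neither side knows (or cares) who is on the other end.
topic = "TRADE_EVENTS:XRPUSDT"

# consumer side (what Naive.Trader effectively does on startup)
Phoenix.PubSub.subscribe(Streamer.PubSub, topic)

# producer side (Streamer.Binance today, our publisher task in a moment)
trade_event = %Streamer.Binance.TradeEvent{symbol: "XRPUSDT"}
Phoenix.PubSub.broadcast(Streamer.PubSub, topic, trade_event)
```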
To backtest, we can substitute the `Streamer.Binance` process with a `Task` that will stream trade events' data from the database and broadcast them to the `TRADE_EVENTS:#{symbol}` PubSub topic (the same topic that the `Streamer.Binance` process uses).
From the perspective of the `Naive.Trader`, it *does not* make any difference who is broadcasting those trade events. This should be a clear indication of the value of the publish/subscribe model that we implemented from the beginning. It allows us to swap producers and consumers freely to backtest our trading strategies:
```{r, fig.align="center", out.width="60%", echo=FALSE}
knitr::include_graphics("images/chapter_14_03_backtest_pubsub.png")
```
## Implement the storing task
We will start by creating a new file called `publisher.ex` inside the
`apps/data_warehouse/lib/data_warehouse` directory and implementing the basic `Task` behavior:
```{r, engine = 'elixir', eval = FALSE}
# /apps/data_warehouse/lib/data_warehouse/publisher.ex
defmodule DataWarehouse.Publisher do
  use Task

  def start_link(arg) do
    Task.start_link(__MODULE__, :run, [arg])
  end

  def run(arg) do
    # ...
  end
end
```
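As a side note, `use Task` also generates a `child_spec/1` for us (with `restart: :temporary` by default), so the publisher could just as well be started under a supervisor instead of ad hoc. A hypothetical sketch only - in this chapter we will start it straight from `iex` instead:
```{r, engine = 'elixir', eval = FALSE}
# Hypothetical sketch - not part of this chapter's code.
# child_spec/1 generated by `use Task` delegates to start_link/1,
# so the {module, arg} child shorthand works.
children = [
  {DataWarehouse.Publisher,
   %{
     type: :trade_events,
     symbol: "XRPUSDT",
     from: "2019-06-02",
     to: "2019-06-04",
     interval: 5
   }}
]

Supervisor.start_link(children, strategy: :one_for_one)
```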
To be able to query the database, we will import the `from/2` macro from `Ecto.Query` and require `Logger` for logging:
```{r, engine = 'elixir', eval = FALSE}
# /apps/data_warehouse/lib/data_warehouse/publisher.ex
  ...
  import Ecto.Query, only: [from: 2]

  require Logger
  ...
```
We can now modify the `run/1` function to expect a map containing the `type`, `symbol`, `from`, `to` and `interval` keys:
```{r, engine = 'elixir', eval = FALSE}
# /apps/data_warehouse/lib/data_warehouse/publisher.ex
  ...
  def run(%{
        type: :trade_events,
        symbol: symbol,
        from: from,
        to: to,
        interval: interval
      }) do
    ...
```
Inside the body of the `run/1` function, first, we will convert `from` and `to` to Unix timestamps (in milliseconds) using a private helper function, as well as make sure that the passed symbol is uppercase:
```{r, engine = 'elixir', eval = FALSE}
# /apps/data_warehouse/lib/data_warehouse/publisher.ex
  ...
  def run(%{
        ...
      }) do
    symbol = String.upcase(symbol)

    from_ts =
      "#{from}T00:00:00.000Z"
      |> convert_to_ms()

    to_ts =
      "#{to}T23:59:59.000Z"
      |> convert_to_ms()
  end

  ...

  defp convert_to_ms(iso8601DateString) do
    iso8601DateString
    |> NaiveDateTime.from_iso8601!()
    |> DateTime.from_naive!("Etc/UTC")
    |> DateTime.to_unix()
    |> Kernel.*(1000)
  end
```
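As a quick sanity check of that pipeline, we can run the same steps by hand in `iex` (the helper itself is private); the start of 2019-06-03 UTC, for example, becomes `1559520000000` milliseconds:
```{r, engine = 'elixir', eval = FALSE}
# The same steps as convert_to_ms/1, run by hand:
"2019-06-03T00:00:00.000Z"
|> NaiveDateTime.from_iso8601!()
|> DateTime.from_naive!("Etc/UTC")
|> DateTime.to_unix()
|> Kernel.*(1000)
# => 1559520000000
```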
Next, we will select data from the database. Because possibly hundreds of thousands of rows will be selected, and because we broadcast each of them to the PubSub only every `interval` milliseconds, it could take a substantial amount of time to `broadcast` all of them. Instead of `select`ing all of the data upfront and keeping it in memory, we will use the `Repo.stream/1` function to keep `broadcast`ing rows as they are fetched (note that `Repo.stream/1` needs to run inside a transaction). Additionally, we will add an `index` to the data to be able to log an info message every 10k broadcasted events. The last thing that we need to define is the timeout value - the default (a matter of seconds) would cut a long stream short, so we will change it to `:infinity`:
```{r, engine = 'elixir', eval = FALSE}
# /apps/data_warehouse/lib/data_warehouse/publisher.ex
  def run(%{
        ...
      }) do
    ...

    DataWarehouse.Repo.transaction(
      fn ->
        from(te in DataWarehouse.Schema.TradeEvent,
          where:
            te.symbol == ^symbol and
              te.trade_time >= ^from_ts and
              te.trade_time < ^to_ts,
          order_by: te.trade_time
        )
        |> DataWarehouse.Repo.stream()
        |> Enum.with_index()
        |> Enum.map(fn {row, index} ->
          :timer.sleep(interval)

          if rem(index, 10_000) == 0 do
            Logger.info("Publisher broadcasted #{index} events")
          end

          publish_trade_event(row)
        end)
      end,
      timeout: :infinity
    )

    Logger.info("Publisher finished streaming trade events")
  end
```
Finally, the above code uses the `publish_trade_event/1` helper function, which converts the DataWarehouse's TradeEvent struct to the Streamer's TradeEvent struct so that we broadcast the same structs as the `streamer` application:
```{r, engine = 'elixir', eval = FALSE}
# /apps/data_warehouse/lib/data_warehouse/publisher.ex
  ...
  defp publish_trade_event(%DataWarehouse.Schema.TradeEvent{} = trade_event) do
    new_trade_event =
      struct(
        Streamer.Binance.TradeEvent,
        trade_event |> Map.to_list()
      )

    Phoenix.PubSub.broadcast(
      Streamer.PubSub,
      "TRADE_EVENTS:#{trade_event.symbol}",
      new_trade_event
    )
  end
```
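The `struct/2` call works here because it copies only the keys that the target struct actually defines and quietly discards everything else (including the `:__struct__` key). A tiny, self-contained illustration - the `Source` and `Target` modules below are made up purely for this example:
```{r, engine = 'elixir', eval = FALSE}
# Made-up modules, purely to show how struct/2 maps one struct onto another.
defmodule Source do
  defstruct [:symbol, :price, :extra]
end

defmodule Target do
  defstruct [:symbol, :price, :other]
end

source = %Source{symbol: "XRPUSDT", price: "0.44391", extra: "dropped"}

struct(Target, source |> Map.to_list())
# => %Target{symbol: "XRPUSDT", price: "0.44391", other: nil}
# shared keys are copied; unknown keys (:extra, :__struct__) are ignored
```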
\newpage
We also need to remember to keep the interface tidy, so we will add a `publish_data/1` function to the `DataWarehouse` module:
```{r, engine = 'elixir', eval = FALSE}
# /apps/data_warehouse/lib/data_warehouse.ex
...
  def publish_data(args) do
    DataWarehouse.Publisher.start_link(args)
  end
...
```
This finishes our implementation - we should now be able to stream trade events from the database to the PubSub using the above `Task`, which is exactly what we will do below.
## Test the backtesting
For consistency and ease of testing/use, I prepared a compressed file containing a single day of trade events for XRPUSDT (2019-06-03). We can download that file from GitHub using `wget`:
```{r, engine = 'bash', eval = FALSE}
$ cd /tmp
$ wget https://github.com/Cinderella-Man/binance-trade-events \
/raw/master/XRPUSDT/XRPUSDT-2019-06-03.csv.gz
```
We can now uncompress the archive and load those trade events into our database:
```{r, engine = 'bash', eval = FALSE}
$ gunzip XRPUSDT-2019-06-03.csv.gz
$ PGPASSWORD=postgres psql -Upostgres -h localhost -ddata_warehouse \
-c "\COPY trade_events FROM '/tmp/XRPUSDT-2019-06-03.csv' WITH (FORMAT csv, delimiter ';');"
COPY 206115
```
The number after the word `COPY` in the response indicates the number of rows that got copied into the database.
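If you would like to double-check the import, counting the rows in the `trade_events` table should give the same number (assuming the table was empty before the import):
```{r, engine = 'bash', eval = FALSE}
$ PGPASSWORD=postgres psql -Upostgres -h localhost -ddata_warehouse \
-c "SELECT COUNT(*) FROM trade_events;"
 count
--------
 206115
(1 row)
```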
We can now give it a try and run a full backtest, but first let's clean the orders table:
```{r, engine = 'bash', eval = FALSE}
$ psql -Upostgres -h127.0.0.1
Password for user postgres:
...
postgres=# \c data_warehouse
You are now connected to database "data_warehouse" as user "postgres".
data_warehouse=# DELETE FROM orders;
DELETE ...
```
We can now start a new iex session where we will start trading (the `naive` application) as well as storing orders (the `data_warehouse` application). Instead of starting the `Streamer.Binance` worker, we will start the `DataWarehouse.Publisher` task with arguments matching the imported day and symbol:
```{r, engine = 'bash', eval = FALSE}
$ iex -S mix
...
iex(1)> DataWarehouse.start_storing("ORDERS", "XRPUSDT")
19:17:59.596 [info] Starting storing data from ORDERS:XRPUSDT topic
19:17:59.632 [info] DataWarehouse worker is subscribing to ORDERS:XRPUSDT
{:ok, #PID<0.417.0>}
iex(2)> Naive.start_trading("XRPUSDT")
19:18:16.293 [info] Starting Elixir.Naive.SymbolSupervisor worker for XRPUSDT
19:18:16.332 [info] Starting new supervision tree to trade on XRPUSDT
{:ok, #PID<0.419.0>}
19:18:18.327 [info] Initializing new trader(1615288698325) for XRPUSDT
iex(3)> DataWarehouse.publish_data(%{
type: :trade_events,
symbol: "XRPUSDT",
from: "2019-06-02",
to: "2019-06-04",
interval: 5
})
{:ok, #PID<0.428.0>}
19:19:07.532 [info] Publisher broadcasted 0 events
19:19:07.534 [info] The trader(1615288698325) is placing a BUY order for
XRPUSDT @ 0.44391, quantity: 450.5
19:19:07.749 [info] The trader(1615288698325) is placing a SELL order for
XRPUSDT @ 0.44426, quantity: 450.5.
...
19:20:07.568 [info] Publisher broadcasted 10000 events
...
19:21:07.571 [info] Publisher broadcasted 20000 events
19:22:07.576 [info] Publisher broadcasted 30000 events
...
19:39:07.875 [info] Publisher broadcasted 200000 events
19:39:44.576 [info] Publisher finished streaming trade events
```
From the above log, we can see that it took about 20 minutes to run 206k records through the system. The vast majority of that time (17+ minutes) was simply the 5 ms `sleep` between events: 206,115 events * 5 ms is roughly 1,031 seconds, or just over 17 minutes.
After the streaming has finished, we can check the orders table inside the database to figure out how many trades we made and what income they have generated:
```{r, engine = 'bash', eval = FALSE}
$ psql -Upostgres -h127.0.0.1
Password for user postgres:
...
postgres=# \c data_warehouse
You are now connected to database "data_warehouse" as user "postgres".
data_warehouse=# SELECT COUNT(*) FROM orders;
 count
-------
   224
(1 row)
```
By looking at the orders we can figure out some performance metrics, but that's a far from ideal way to get answers to simple questions like "what's the performance of my strategy?". We will address that and other concerns in future chapters.
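For example, a very crude way to start digging through the results would be a query along the lines of the one below. This is a sketch only - it assumes the `orders` table stores a `side` column with `BUY`/`SELL` values, so adjust the column names to your actual schema:
```{r, engine = 'bash', eval = FALSE}
data_warehouse=# SELECT side, COUNT(*) FROM orders GROUP BY side;
```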
[Note] Please remember to run `mix format` to keep things nice and tidy.
Source code for this chapter can be found on [GitHub](https://github.com/frathon/create-a-cryptocurrency-trading-bot-in-elixir-source-code/tree/chapter_15)