forked from freqtrade/freqtrade
-
Notifications
You must be signed in to change notification settings - Fork 0
/
convert_backtestdata.py
executable file
·201 lines (166 loc) · 6.55 KB
/
convert_backtestdata.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
#!/usr/bin/env python3
"""
Script to convert backtest data files from the old format to the format used with ccxt
Optional Cli parameters
-d / --datadir: path to backtest data
-n / --norename: don't rename files from BTC_<PAIR> to <PAIR>_BTC
                 (not renaming will overwrite the source files)
"""
import glob
import gzip
import json
import logging
import re
import sys
from argparse import Namespace
from os import path
from typing import Dict, List, Optional, Tuple

import dateutil.parser
from pandas import DataFrame

from freqtrade import misc, constants
from freqtrade.arguments import Arguments
logger = logging.getLogger('freqtrade')
def load_old_file(filename) -> Tuple[Optional[List[Dict]], bool]:
    """
    Load ticker data from a plain or gzip-compressed JSON file.
    :param filename: path of the file to read; a name ending in '.gz' is read through gzip
    :return: tuple (pairdata, is_zip) - pairdata is the decoded JSON content and is_zip
             tells whether the file name ended in '.gz'.
             (None, False) is returned when the file does not exist.
    """
    if not path.isfile(filename):
        logger.warning("filename %s does not exist", filename)
        return (None, False)

    logger.debug('Loading ticker data from file %s', filename)

    is_zip = filename.endswith('.gz')
    # Both openers are used with their default mode; json.load reads text and binary files
    opener = gzip.open if is_zip else open
    with opener(filename) as tickerdata:
        pairdata = json.load(tickerdata)
    return (pairdata, is_zip)
def parse_old_backtest_data(ticker) -> Optional[DataFrame]:
    """
    Reads old backtest data
    Format: "O": 8.794e-05,
            "H": 8.948e-05,
            "L": 8.794e-05,
            "C": 8.88e-05,
            "V": 991.09056638,
            "T": "2017-11-26T08:50:00",
            "BV": 0.0877869
    :param ticker: data in the old format shown above, passed to the DataFrame constructor
    :return: DataFrame with the columns renamed to open/high/low/close/volume/date,
             without the 'BV' column and sorted by date.
             None if the resulting frame has no 'date' column.
    """
    columns = {'C': 'close', 'V': 'volume', 'O': 'open',
               'H': 'high', 'L': 'low', 'T': 'date'}

    frame = DataFrame(ticker).rename(columns=columns)
    if 'BV' in frame:
        # Use the keyword form: pandas >= 2.0 no longer accepts axis as a positional argument
        frame = frame.drop(columns='BV')
    if 'date' not in frame:
        logger.warning("Date not in frame - probably not a Ticker file")
        return None
    return frame.sort_values('date')
def convert_dataframe(frame: DataFrame):
    """
    Convert dataframe to new format
    :param frame: DataFrame with 'date', 'open', 'high', 'low', 'close' and 'volume' columns.
                  The 'date' values are strings; '+00:00' is appended to each before parsing.
    :return: list of rows [date, open, high, low, close, volume], where date is an integer
             timestamp in milliseconds (truncated to whole seconds)
    """
    # reorder columns; the explicit copy keeps the assignments below from writing
    # into a selection of the caller's frame (avoids pandas' SettingWithCopyWarning)
    cols = ['date', 'open', 'high', 'low', 'close', 'volume']
    frame = frame[cols].copy()

    # Make sure parsing/printing data is assumed to be UTC
    frame['date'] = frame['date'].apply(
        lambda d: int(dateutil.parser.parse(d + '+00:00').timestamp()) * 1000)
    frame['date'] = frame['date'].astype('int64')

    # Convert columns one by one to preserve type.
    by_column = [frame[x].values.tolist() for x in frame.columns]
    return [list(row) for row in zip(*by_column)]
def convert_file(filename: str, filename_new: str) -> None:
    """
    Converts a file from old format to ccxt format
    :param filename: path of the file holding data in the old format
    :param filename_new: path the converted data is written to
    :return: None
    """
    (pairdata, is_zip) = load_old_file(filename)
    # Missing file, empty content or content that is not a list: nothing to convert
    if not pairdata or not isinstance(pairdata, list):
        return
    # Rows that are lists (rather than dicts) mean the file was converted before
    if isinstance(pairdata[0], list):
        logger.error("pairdata for %s already in new format", filename)
        return

    frame = parse_old_backtest_data(pairdata)
    if frame is None:
        return
    # Convert frame to new format
    misc.file_dump_json(filename_new, convert_dataframe(frame), is_zip)
def convert_main(args: Namespace) -> None:
    """
    Converts every '*.json' file of the folder given in --datadir from the old format
    to the new one to support ccxt, renaming the files unless --norename is set.
    :param args: parsed Cli arguments, read for 'datadir' and 'norename'
    :return: None
    """
    workdir = path.join(args.datadir, "")
    logger.info("Workdir: %s", workdir)

    for filename in glob.glob(workdir + "*.json"):
        basename = path.basename(filename)

        if args.norename:
            # Keep the name: the converted data replaces the source file
            target = filename
        else:
            # The pair is expected as two upper case currency names joined by '_'
            pair_match = re.search(r'[A-Z_]{7,}', basename)
            currencies = pair_match.group(0).split("_") if pair_match else []
            if len(currencies) != 2:
                logger.warning("file %s could not be converted, could not extract currencies",
                               filename)
                continue

            minutes_match = re.search(r'\d+(?=\.json)', basename)
            label_match = re.search(r'(\d+[mhdw])(?=\.json)', basename)

            if minutes_match:
                # Old naming: plain number of minutes. Use the matching interval name
                # if there is one, otherwise fall back to '<minutes>m'.
                minutes = int(minutes_match.group(0))
                interval = next(
                    (name for name, value in constants.TICKER_INTERVAL_MINUTES.items()
                     if value == minutes),
                    str(minutes) + 'm')
                # change order on pairs if old ticker interval found
                first, second = currencies[1], currencies[0]
            elif label_match:
                # Interval already carries a unit suffix: keep the currency order
                interval = label_match.group(0)
                first, second = currencies[0], currencies[1]
            else:
                logger.warning("file %s could not be converted, interval not found", filename)
                continue

            target = path.join(path.dirname(filename),
                               "{}_{}-{}.json".format(first, second, interval))

        logger.debug("Converting and renaming %s to %s", filename, target)
        convert_file(filename, target)
def convert_parse_args(args: List[str]) -> Namespace:
    """
    Parse args passed to the script
    :param args: Cli arguments
    :return: Namespace holding the parsed arguments
    """
    arguments = Arguments(args, 'Convert datafiles')
    arguments.parser.add_argument(
        '-d', '--datadir',
        help='path to backtest data (default: %(default)s)',
        dest='datadir',
        default=path.join('freqtrade', 'tests', 'testdata'),
        type=str,
        metavar='PATH',
    )
    arguments.parser.add_argument(
        '-n', '--norename',
        # Double-quoted so the apostrophe survives: 'don''t' is two adjacent literals
        # and evaluates to "dont"
        help="don't rename files from BTC_<PAIR> to <PAIR>_BTC - "
             'Note that not renaming will overwrite source files',
        dest='norename',
        default=False,
        action='store_true'
    )

    return arguments.parse_args()
def main(sysargv: List[str]) -> None:
    """
    Parse the Cli arguments and run the conversion of the data files.
    :param sysargv: Cli arguments, without the program name
    :return: None
    """
    logger.info('Starting Dataframe conversion')
    convert_main(convert_parse_args(sysargv))


if __name__ == '__main__':
    main(sys.argv[1:])