forked from Chia-Network/chia-blockchain
-
Notifications
You must be signed in to change notification settings - Fork 0
/
init_funcs.py
377 lines (323 loc) · 14.7 KB
/
init_funcs.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
import os
import shutil
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
import yaml
from chia import __version__
from chia.consensus.coinbase import create_puzzlehash_for_pk
from chia.ssl.create_ssl import (
ensure_ssl_dirs,
generate_ca_signed_cert,
get_chia_ca_crt_key,
make_ca_cert,
write_ssl_cert_and_key,
)
from chia.util.bech32m import encode_puzzle_hash
from chia.util.config import (
create_default_chia_config,
initial_config_file,
load_config,
save_config,
unflatten_properties,
)
from chia.util.ints import uint32
from chia.util.keychain import Keychain
from chia.util.path import mkdir
from chia.util.ssl import (
DEFAULT_PERMISSIONS_CERT_FILE,
DEFAULT_PERMISSIONS_KEY_FILE,
RESTRICT_MASK_CERT_FILE,
RESTRICT_MASK_KEY_FILE,
check_and_fix_permissions_for_ssl_file,
fix_ssl,
)
from chia.wallet.derive_keys import master_sk_to_pool_sk, master_sk_to_wallet_sk
private_node_names = {"full_node", "wallet", "farmer", "harvester", "timelord", "daemon"}
public_node_names = {"full_node", "wallet", "farmer", "introducer", "timelord"}
def dict_add_new_default(updated: Dict, default: Dict, do_not_migrate_keys: Dict[str, Any]):
for k in do_not_migrate_keys:
if k in updated and do_not_migrate_keys[k] == "":
updated.pop(k)
for k, v in default.items():
ignore = False
if k in do_not_migrate_keys:
do_not_data = do_not_migrate_keys[k]
if isinstance(do_not_data, dict):
ignore = False
else:
ignore = True
if isinstance(v, dict) and k in updated and ignore is False:
# If there is an intermediate key with empty string value, do not migrate all descendants
if do_not_migrate_keys.get(k, None) == "":
do_not_migrate_keys[k] = v
dict_add_new_default(updated[k], default[k], do_not_migrate_keys.get(k, {}))
elif k not in updated or ignore is True:
updated[k] = v
def check_keys(new_root: Path, keychain: Optional[Keychain] = None) -> None:
if keychain is None:
keychain = Keychain()
all_sks = keychain.get_all_private_keys()
if len(all_sks) == 0:
print("No keys are present in the keychain. Generate them with 'chia keys generate'")
return None
config: Dict = load_config(new_root, "config.yaml")
pool_child_pubkeys = [master_sk_to_pool_sk(sk).get_g1() for sk, _ in all_sks]
all_targets = []
stop_searching_for_farmer = "xch_target_address" not in config["farmer"]
stop_searching_for_pool = "xch_target_address" not in config["pool"]
number_of_ph_to_search = 500
selected = config["selected_network"]
prefix = config["network_overrides"]["config"][selected]["address_prefix"]
for i in range(number_of_ph_to_search):
if stop_searching_for_farmer and stop_searching_for_pool and i > 0:
break
for sk, _ in all_sks:
all_targets.append(
encode_puzzle_hash(create_puzzlehash_for_pk(master_sk_to_wallet_sk(sk, uint32(i)).get_g1()), prefix)
)
if all_targets[-1] == config["farmer"].get("xch_target_address"):
stop_searching_for_farmer = True
if all_targets[-1] == config["pool"].get("xch_target_address"):
stop_searching_for_pool = True
# Set the destinations, if necessary
updated_target: bool = False
if "xch_target_address" not in config["farmer"]:
print(
f"Setting the xch destination for the farmer reward (1/8 plus fees, solo and pooling) to {all_targets[0]}"
)
config["farmer"]["xch_target_address"] = all_targets[0]
updated_target = True
elif config["farmer"]["xch_target_address"] not in all_targets:
print(
f"WARNING: using a farmer address which we don't have the private"
f" keys for. We searched the first {number_of_ph_to_search} addresses. Consider overriding "
f"{config['farmer']['xch_target_address']} with {all_targets[0]}"
)
if "pool" not in config:
config["pool"] = {}
if "xch_target_address" not in config["pool"]:
print(f"Setting the xch destination address for pool reward (7/8 for solo only) to {all_targets[0]}")
config["pool"]["xch_target_address"] = all_targets[0]
updated_target = True
elif config["pool"]["xch_target_address"] not in all_targets:
print(
f"WARNING: using a pool address which we don't have the private"
f" keys for. We searched the first {number_of_ph_to_search} addresses. Consider overriding "
f"{config['pool']['xch_target_address']} with {all_targets[0]}"
)
if updated_target:
print(
f"To change the XCH destination addresses, edit the `xch_target_address` entries in"
f" {(new_root / 'config' / 'config.yaml').absolute()}."
)
# Set the pool pks in the farmer
pool_pubkeys_hex = set(bytes(pk).hex() for pk in pool_child_pubkeys)
if "pool_public_keys" in config["farmer"]:
for pk_hex in config["farmer"]["pool_public_keys"]:
# Add original ones in config
pool_pubkeys_hex.add(pk_hex)
config["farmer"]["pool_public_keys"] = pool_pubkeys_hex
save_config(new_root, "config.yaml", config)
def copy_files_rec(old_path: Path, new_path: Path):
if old_path.is_file():
print(f"{new_path}")
mkdir(new_path.parent)
shutil.copy(old_path, new_path)
elif old_path.is_dir():
for old_path_child in old_path.iterdir():
new_path_child = new_path / old_path_child.name
copy_files_rec(old_path_child, new_path_child)
def migrate_from(
old_root: Path,
new_root: Path,
manifest: List[str],
do_not_migrate_settings: List[str],
):
"""
Copy all the files in "manifest" to the new config directory.
"""
if old_root == new_root:
print("same as new path, exiting")
return 1
if not old_root.is_dir():
print(f"{old_root} not found - this is ok if you did not install this version")
return 0
print(f"\n{old_root} found")
print(f"Copying files from {old_root} to {new_root}\n")
for f in manifest:
old_path = old_root / f
new_path = new_root / f
copy_files_rec(old_path, new_path)
# update config yaml with new keys
config: Dict = load_config(new_root, "config.yaml")
config_str: str = initial_config_file("config.yaml")
default_config: Dict = yaml.safe_load(config_str)
flattened_keys = unflatten_properties({k: "" for k in do_not_migrate_settings})
dict_add_new_default(config, default_config, flattened_keys)
save_config(new_root, "config.yaml", config)
create_all_ssl(new_root)
return 1
def create_all_ssl(root_path: Path):
# remove old key and crt
config_dir = root_path / "config"
old_key_path = config_dir / "trusted.key"
old_crt_path = config_dir / "trusted.crt"
if old_key_path.exists():
print(f"Old key not needed anymore, deleting {old_key_path}")
os.remove(old_key_path)
if old_crt_path.exists():
print(f"Old crt not needed anymore, deleting {old_crt_path}")
os.remove(old_crt_path)
ssl_dir = config_dir / "ssl"
ca_dir = ssl_dir / "ca"
ensure_ssl_dirs([ssl_dir, ca_dir])
private_ca_key_path = ca_dir / "private_ca.key"
private_ca_crt_path = ca_dir / "private_ca.crt"
chia_ca_crt, chia_ca_key = get_chia_ca_crt_key()
chia_ca_crt_path = ca_dir / "chia_ca.crt"
chia_ca_key_path = ca_dir / "chia_ca.key"
write_ssl_cert_and_key(chia_ca_crt_path, chia_ca_crt, chia_ca_key_path, chia_ca_key)
if not private_ca_key_path.exists() or not private_ca_crt_path.exists():
# Create private CA
print(f"Can't find private CA, creating a new one in {root_path} to generate TLS certificates")
make_ca_cert(private_ca_crt_path, private_ca_key_path)
# Create private certs for each node
ca_key = private_ca_key_path.read_bytes()
ca_crt = private_ca_crt_path.read_bytes()
generate_ssl_for_nodes(ssl_dir, ca_crt, ca_key, True)
else:
# This is entered when user copied over private CA
print(f"Found private CA in {root_path}, using it to generate TLS certificates")
ca_key = private_ca_key_path.read_bytes()
ca_crt = private_ca_crt_path.read_bytes()
generate_ssl_for_nodes(ssl_dir, ca_crt, ca_key, True)
chia_ca_crt, chia_ca_key = get_chia_ca_crt_key()
generate_ssl_for_nodes(ssl_dir, chia_ca_crt, chia_ca_key, False, overwrite=False)
def generate_ssl_for_nodes(ssl_dir: Path, ca_crt: bytes, ca_key: bytes, private: bool, overwrite=True):
if private:
names = private_node_names
else:
names = public_node_names
for node_name in names:
node_dir = ssl_dir / node_name
ensure_ssl_dirs([node_dir])
if private:
prefix = "private"
else:
prefix = "public"
key_path = node_dir / f"{prefix}_{node_name}.key"
crt_path = node_dir / f"{prefix}_{node_name}.crt"
if key_path.exists() and crt_path.exists() and overwrite is False:
continue
generate_ca_signed_cert(ca_crt, ca_key, crt_path, key_path)
def copy_cert_files(cert_path: Path, new_path: Path):
for old_path_child in cert_path.glob("*.crt"):
new_path_child = new_path / old_path_child.name
copy_files_rec(old_path_child, new_path_child)
check_and_fix_permissions_for_ssl_file(new_path_child, RESTRICT_MASK_CERT_FILE, DEFAULT_PERMISSIONS_CERT_FILE)
for old_path_child in cert_path.glob("*.key"):
new_path_child = new_path / old_path_child.name
copy_files_rec(old_path_child, new_path_child)
check_and_fix_permissions_for_ssl_file(new_path_child, RESTRICT_MASK_KEY_FILE, DEFAULT_PERMISSIONS_KEY_FILE)
def init(create_certs: Optional[Path], root_path: Path, fix_ssl_permissions: bool = False):
if create_certs is not None:
if root_path.exists():
if os.path.isdir(create_certs):
ca_dir: Path = root_path / "config/ssl/ca"
if ca_dir.exists():
print(f"Deleting your OLD CA in {ca_dir}")
shutil.rmtree(ca_dir)
print(f"Copying your CA from {create_certs} to {ca_dir}")
copy_cert_files(create_certs, ca_dir)
create_all_ssl(root_path)
else:
print(f"** Directory {create_certs} does not exist **")
else:
print(f"** {root_path} does not exist. Executing core init **")
# sanity check here to prevent infinite recursion
if chia_init(root_path, fix_ssl_permissions=fix_ssl_permissions) == 0 and root_path.exists():
return init(create_certs, root_path, fix_ssl_permissions)
print(f"** {root_path} was not created. Exiting **")
return -1
else:
return chia_init(root_path, fix_ssl_permissions=fix_ssl_permissions)
def chia_version_number() -> Tuple[str, str, str, str]:
scm_full_version = __version__
left_full_version = scm_full_version.split("+")
version = left_full_version[0].split(".")
scm_major_version = version[0]
scm_minor_version = version[1]
if len(version) > 2:
smc_patch_version = version[2]
patch_release_number = smc_patch_version
else:
smc_patch_version = ""
major_release_number = scm_major_version
minor_release_number = scm_minor_version
dev_release_number = ""
# If this is a beta dev release - get which beta it is
if "0b" in scm_minor_version:
original_minor_ver_list = scm_minor_version.split("0b")
major_release_number = str(1 - int(scm_major_version)) # decrement the major release for beta
minor_release_number = scm_major_version
patch_release_number = original_minor_ver_list[1]
if smc_patch_version and "dev" in smc_patch_version:
dev_release_number = "." + smc_patch_version
elif "0rc" in version[1]:
original_minor_ver_list = scm_minor_version.split("0rc")
major_release_number = str(1 - int(scm_major_version)) # decrement the major release for release candidate
minor_release_number = str(int(scm_major_version) + 1) # RC is 0.2.1 for RC 1
patch_release_number = original_minor_ver_list[1]
if smc_patch_version and "dev" in smc_patch_version:
dev_release_number = "." + smc_patch_version
else:
major_release_number = scm_major_version
minor_release_number = scm_minor_version
patch_release_number = smc_patch_version
dev_release_number = ""
install_release_number = major_release_number + "." + minor_release_number
if len(patch_release_number) > 0:
install_release_number += "." + patch_release_number
if len(dev_release_number) > 0:
install_release_number += dev_release_number
return major_release_number, minor_release_number, patch_release_number, dev_release_number
def chia_minor_release_number():
res = int(chia_version_number()[2])
print(f"Install release number: {res}")
return res
def chia_full_version_str() -> str:
major, minor, patch, dev = chia_version_number()
return f"{major}.{minor}.{patch}{dev}"
def chia_init(root_path: Path, *, should_check_keys: bool = True, fix_ssl_permissions: bool = False):
"""
Standard first run initialization or migration steps. Handles config creation,
generation of SSL certs, and setting target addresses (via check_keys).
should_check_keys can be set to False to avoid blocking when accessing a passphrase
protected Keychain. When launching the daemon from the GUI, we want the GUI to
handle unlocking the keychain.
"""
if os.environ.get("CHIA_ROOT", None) is not None:
print(
f"warning, your CHIA_ROOT is set to {os.environ['CHIA_ROOT']}. "
f"Please unset the environment variable and run chia init again\n"
f"or manually migrate config.yaml"
)
print(f"Chia directory {root_path}")
if root_path.is_dir() and Path(root_path / "config" / "config.yaml").exists():
# This is reached if CHIA_ROOT is set, or if user has run chia init twice
# before a new update.
if fix_ssl_permissions:
fix_ssl(root_path)
if should_check_keys:
check_keys(root_path)
print(f"{root_path} already exists, no migration action taken")
return -1
create_default_chia_config(root_path)
create_all_ssl(root_path)
if fix_ssl_permissions:
fix_ssl(root_path)
if should_check_keys:
check_keys(root_path)
print("")
print("To see your keys, run 'chia keys show --show-mnemonic-seed'")
return 0