-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathdriver.py
171 lines (129 loc) · 6.48 KB
/
driver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
import os
import sys
from pathlib import Path
import re
import numpy as np
import tqdm as tqdm
import audio_converters
from verbose_print import vprint1, vprint2, vprint3
class Converter:
"""
Converter Driver, runs the whole operation
See convert.py for a description of the args param for __init__
"""
def __init__(self, args):
self.args = args
self.use_stdout = args["-s"] or False
self.use_glue = args["-g"] or False
self.use_glue_for_labels = args["-l"] or False
self.dir_mode = args["dir"] or False
self.file_mode = args["file"] or False
self.module_mode = args["--modules"] or False
self.input = args["<input_directory>"] or args["<input_file>"]
self.output = args["<output_directory>"] or args["<output_file>"]
self.filter = args["-f"] or None
if self.filter is not None:
self.filter = re.compile(str(self.filter).lstrip('='))
self.glued = []
self.labels = []
def run(self):
vprint2(f"[startup] {self.input} -> {self.output}")
if self.dir_mode is True:
# converting a directory and copy it to a different dir
vprint1("[startup] converting directory")
self.cvt_tree(self.input, self.output)
elif self.file_mode is True:
# convert just a single file
vprint1("[startup] converting file")
self.cvt_file(self.input, self.output)
if self.module_mode is True:
# this prints the currently installed modules
print("\n".join([f"{v.__name__} -> \"{k}\"" for k, v in audio_converters.available.items()]))
def cvt_file(self, input_path: str, output_path: str) -> None:
"""Convert a single audio file to a numpy array, uses available converters.
:param input_path: the path to the input file
:param output_path: the path to the output file
"""
# check all available converters
for key, value in audio_converters.available.items():
# prepare some reusable debug info
converter_name = value.__name__
check_string = f"\"{input_path}\".endswith(\"{key}\")"
vprint2(f"[cvt_file] checking converter {converter_name} with type \"{key}\"")
# check if the converter matches the file ending of the input file
vprint3("[cvt_file] checking if " + check_string)
if input_path.endswith(key):
vprint3(f"[cvt_file] {check_string} = True")
# instantiate converter
vprint2("[cvt_file] creating instance of " + converter_name)
converter = value()
# tell the converter to create a spectrogram from the audio file
try:
vprint2("[cvt_file] converting with " + converter_name)
array = converter.convert(input_path)
except FileNotFoundError as e:
# if the input file does not exist we error out
# and return early
print("Failed to convert file: File not found!")
return
# check if we should glue all the individual files into one
if self.use_glue:
self.glued += [array]
self.labels += [input_path + ".label.npy"]
return
# save numpy array to disk in binary format
if not self.use_stdout:
np.save(output_path, array)
else:
# write header
sys.stdout.write(f"============== BEGIN SHAPE ==============\n# shape={array.shape}\n")
# iterate 3rd dim
for data_slice in array:
# write slice
np.savetxt(sys.stdout.buffer, data_slice, fmt='%-7.4f')
# write slice footer
sys.stdout.write("# slice\n")
# write footer
sys.stdout.write("=============== END SHAPE ===============")
# stop looking for other converters
return
def cvt_tree(self, input_dir: str, output_dir: str) -> None:
"""Convert an entire folder from audio to numpy arrays.
Uses all available converters.
:param input_dir: the input directory containing the audio files
:param output_dir: the directory to put the numpy arrays into,
this folder will be created if it does not exist.
"""
# create output_dir and all parents if not exist
vprint3(f"[cvt_tree] Ensuring that {output_dir} exists")
Path(output_dir).mkdir(parents=True, exist_ok=True)
# make sure we have a fresh array to glue together if required
self.glued = []
# iterate over all files in the input directory
for filename in tqdm.tqdm(os.listdir(input_dir),desc="Walking files"):
vprint3(f"[cvt_tree] processing {filename} in {input_dir}")
if self.filter is None or self.filter.match(filename) is not None:
# get the "filename without extension" from filename
pre, _ = os.path.splitext(filename)
# put input_dir and filename together
input_file = os.path.join(input_dir, filename)
# put output_dir and "filename with new extension together"
output_file = os.path.join(output_dir, pre + ".npy")
vprint3(f"[cvt_tree] converting {input_file} to {output_file}")
self.cvt_file(input_file, output_file)
else:
vprint3(f"[cvt_tree] file {filename} was skipped since it did not match {self.filter}")
# check if there is something over to save to disk
if len(self.glued) > 0:
# save the glued together arrays to disk
out = os.path.join(output_dir, "glued.npy")
np.save(out, np.array(self.glued))
# also glue labels if required
labels = []
if self.use_glue_for_labels:
for x in self.labels:
labels += [np.load(x)]
out = os.path.join(output_dir, "labels.npy")
np.save(out, np.array(labels))
# clear array
self.glued = []