forked from cooperative-computing-lab/cctools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathbatch_file.c
202 lines (177 loc) · 6.28 KB
/
batch_file.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
/*
Copyright (C) 2022 The University of Notre Dame
This software is distributed under the GNU General Public License.
See the file COPYING for details.
*/
#include "batch_file.h"
#include "sha1.h"
#include "stringtools.h"
#include "xxmalloc.h"
#include "path.h"
#include "hash_table.h"
#include <time.h>
#include <sys/time.h>
#include <sys/types.h>
#include <dirent.h>
/* Cache of checksums already computed by the batch_file_generate_id*()
 * functions below: maps a path name to a heap-allocated string form of the
 * SHA1 digest. Created lazily the first time one of those functions runs. */
struct hash_table *check_sums = NULL;
/* Running total, in seconds, of the time measured around the sha1_file()
 * calls made by the batch_file_generate_id*() functions below. */
double total_checksum_time = 0.0;
/**
 * Create a batch_file from outer_name and inner_name.
 * Outer/DAG name indicates the name that will be on the host/submission side.
 * This is equivalent to the filename in Makeflow.
 * Inner/task name indicates the name that will be used for execution.
 * If no inner_name is given, or the specified batch_queue does not support
 * remote renaming, the outer_name is used for both.
 *
 * Both names are duplicated, so the caller keeps ownership of its arguments.
 * Returns the new batch_file (release it with batch_file_delete), or NULL if
 * the structure itself could not be allocated.
 **/
struct batch_file *batch_file_create(struct batch_queue *queue, const char * outer_name, const char * inner_name)
{
	struct batch_file *f = calloc(1, sizeof(*f));
	if(!f) {
		/* Do not write through a failed allocation. */
		return NULL;
	}

	f->outer_name = xxstrdup(outer_name);

	/* The task-side name only differs when the queue can rename remotely
	 * and the caller actually supplied a distinct name. */
	const char *task_side_name = outer_name;
	if(batch_queue_supports_feature(queue, "remote_rename") && inner_name) {
		task_side_name = inner_name;
	}
	f->inner_name = xxstrdup(task_side_name);

	return f;
}
/**
 * Release a batch_file along with the outer_name and inner_name strings it
 * holds. Passing NULL is a no-op.
 *
 * NOTE(review): only the two name strings are released here; the hash string
 * that batch_file_generate_id() stores in the struct is not. Confirm who owns
 * that field before adding a free for it.
 **/
void batch_file_delete(struct batch_file *f)
{
	if(f) {
		free(f->inner_name);
		free(f->outer_name);
		free(f);
	}
}
/**
 * Build the string that identifies a file for the given batch system.
 * When the queue supports "remote_rename" the result is "outer=inner";
 * otherwise it is just the outer name.
 * The result comes from string_format(); batch_files_to_string() releases
 * it with free().
 **/
char * batch_file_to_string(struct batch_queue *queue, struct batch_file *f )
{
	if(!batch_queue_supports_feature(queue,"remote_rename")) {
		return string_format("%s", f->outer_name);
	}

	return string_format("%s=%s", f->outer_name, f->inner_name);
}
/**
 * Join the string form of every file in a list into one comma-separated
 * string. Each element is rendered with batch_file_to_string().
 * Returns a newly allocated string (empty when files is NULL or has no
 * items); it starts as a strdup() and grows through string_combine().
 **/
char * batch_files_to_string(struct batch_queue *queue, struct list *files )
{
	char *joined = strdup("");
	if(!files) {
		return joined;
	}

	/* Empty before the first item, "," afterwards. This could be set using
	 * a batch_queue feature or option to allow for batch system specific
	 * separators. */
	const char *sep = "";
	struct batch_file *entry;

	list_first_item(files);
	while((entry = list_next_item(files)) != NULL) {
		char *piece = batch_file_to_string(queue, entry);

		joined = string_combine(joined, sep);
		joined = string_combine(joined, piece);

		free(piece);
		sep = ",";
	}

	return joined;
}
int batch_file_outer_compare(const void *file1, const void *file2) {
struct batch_file **f1 = (void *)file1;
struct batch_file **f2 = (void *)file2;
return strcmp((*f1)->outer_name, (*f2)->outer_name);
}
/* Return the content based ID for a file.
 * Looks f->outer_name up in the check_sums cache; if it is absent, computes
 * the SHA1 checksum of the file's contents, stores it in f->hash and in the
 * cache, and adds the time spent to total_checksum_time.
 * Returns a freshly duplicated string on both paths, or NULL if the file
 * could not be checksummed. */
char * batch_file_generate_id(struct batch_file *f) {
	if(check_sums == NULL){
		check_sums = hash_table_create(0,0);
	}

	char *check_sum_value = hash_table_lookup(check_sums, f->outer_name);
	if(check_sum_value != NULL){
		debug(D_MAKEFLOW,"Checksum already exists in hash table. Cached CHECKSUM hash of %s is: %s", f->outer_name, check_sum_value);
		return xxstrdup(check_sum_value);
	}

	unsigned char hash[SHA1_DIGEST_LENGTH];
	struct timeval start_time;
	struct timeval end_time;

	gettimeofday(&start_time,NULL);
	int success = sha1_file(f->outer_name, hash);
	gettimeofday(&end_time,NULL);

	/* Subtract field by field before scaling: multiplying tv_sec by
	 * 1000000 overflows where time_t/long is 32 bits wide. */
	double run_time = (double)(end_time.tv_sec - start_time.tv_sec)
		+ (double)(end_time.tv_usec - start_time.tv_usec) / 1000000.0;
	total_checksum_time += run_time;
	debug(D_MAKEFLOW_HOOK," The total checksum time is %lf",total_checksum_time);

	if(success == 0){
		debug(D_MAKEFLOW, "Unable to checksum this file: %s", f->outer_name);
		return NULL;
	}

	/* Three separate copies: one kept on the file, one owned by the
	 * cache, and one handed to the caller. */
	f->hash = xxstrdup(sha1_string(hash));
	hash_table_insert(check_sums, f->outer_name, xxstrdup(sha1_string(hash)));
	debug(D_MAKEFLOW,"Checksum hash of %s is: %s",f->outer_name,f->hash);
	return xxstrdup(f->hash);
}
/* Return the content based ID for a directory.
 * Looks file_name up in the check_sums cache; if it is absent, walks the
 * directory (recursing into subdirectories), concatenates one record per
 * entry, and takes the SHA1 of that concatenation as the directory's ID.
 * Entries that cannot be checksummed or scanned are skipped.
 * Returns a freshly duplicated string on both the cached and the computed
 * path, or NULL if the directory itself cannot be scanned.
 * TODO: symlinks are not accounted for yet. */
char * batch_file_generate_id_dir(char *file_name){
	if(check_sums == NULL){
		check_sums = hash_table_create(0,0);
	}

	char *check_sum_value = hash_table_lookup(check_sums, file_name);
	if(check_sum_value != NULL){
		debug(D_MAKEFLOW,"Checksum already exists in hash table. Cached CHECKSUM hash of %s is: %s", file_name, check_sum_value);
		/* Return a copy so the cached and computed paths give the caller
		 * the same ownership; the table keeps its own string. */
		return xxstrdup(check_sum_value);
	}

	/* scandir() returns the entries ordered by alphasort(); the loop below
	 * walks them from last to first. */
	struct dirent **dp;
	int num = scandir(file_name, &dp, NULL, alphasort);
	if(num < 0){
		debug(D_MAKEFLOW,"Unable to scan %s", file_name);
		return NULL;
	}

	/* Heap-allocated from the start so it can always be passed to free(),
	 * including when the directory contributes no records at all. */
	char *hash_sum = xxstrdup("");

	int i;
	for(i = num - 1; i >= 0; i--) {
		const char *entry_name = dp[i]->d_name;

		if(strcmp(entry_name,".") != 0 && strcmp(entry_name,"..") != 0){
			char *file_path = string_format("%s/%s",file_name,entry_name);
			char *extended = NULL;

			if(path_is_dir(file_path) == 1){
				char *sub_id = batch_file_generate_id_dir(file_path);
				/* A subdirectory that cannot be scanned yields NULL;
				 * skip it instead of passing NULL to a %s conversion. */
				if(sub_id != NULL){
					extended = string_format("%s%s",hash_sum,sub_id);
					free(sub_id);
				}
			}
			else{
				unsigned char file_digest[SHA1_DIGEST_LENGTH];
				struct timeval start_time;
				struct timeval end_time;

				gettimeofday(&start_time,NULL);
				int success = sha1_file(file_path, file_digest);
				gettimeofday(&end_time,NULL);

				/* Subtract field by field before scaling: multiplying
				 * tv_sec by 1000000 overflows where time_t/long is 32
				 * bits wide. */
				double run_time = (double)(end_time.tv_sec - start_time.tv_sec)
					+ (double)(end_time.tv_usec - start_time.tv_usec) / 1000000.0;
				total_checksum_time += run_time;
				debug(D_MAKEFLOW_HOOK," The total checksum time is %lf",total_checksum_time);

				if(success == 0){
					debug(D_MAKEFLOW, "Unable to checksum this file: %s", file_path);
				}
				else{
					/* NOTE(review): the record is tagged with file_name
					 * (the directory being scanned), not with this
					 * entry's own path. Possibly file_path was intended;
					 * left as is so computed IDs do not change. */
					extended = string_format("%s%s:%s",hash_sum,file_name,sha1_string(file_digest));
				}
			}

			if(extended != NULL){
				/* Drop the previous accumulator now that it has been
				 * copied into the longer string. */
				free(hash_sum);
				hash_sum = extended;
			}
			free(file_path);
		}
		free(dp[i]);
	}
	free(dp);

	unsigned char dir_digest[SHA1_DIGEST_LENGTH];
	sha1_buffer(hash_sum, strlen(hash_sum), dir_digest);
	free(hash_sum);

	hash_table_insert(check_sums, file_name, xxstrdup(sha1_string(dir_digest)));
	debug(D_MAKEFLOW,"Checksum hash of %s is: %s",file_name,sha1_string(dir_digest));
	return xxstrdup(sha1_string(dir_digest));
}