forked from dr-who/stutools
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathflushTestReader.c
137 lines (116 loc) · 3.32 KB
/
flushTestReader.c
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>
#include <getopt.h>
#include <sys/uio.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <signal.h>
#include "utils.h"
#include "logSpeed.h"
// Marker text searched for in the data read from each path; runThread
// appends the thread id (formatted with "%2d") before searching.
#define NEEDLE    "!!!### 3.14159265 ###!!!"

// Number of bytes read from the start of each path on every pass (10 MiB).
#define READCHUNK (10 * 1024 * 1024)
// Per-thread arguments and state; one instance is passed to runThread for
// every path given on the command line.
typedef struct {
    int          threadid;      // index of the thread; also the numeric suffix of its marker
    char        *path;          // file or device opened by the thread (points into argv)
    size_t       total;         // initialised to 0 and summed after the threads are joined
    size_t       startPosition; // set by startThreads; not read in this file
    size_t       exclusive;     // set to 0 by startThreads; not read in this file
    logSpeedType logSpeed;      // initialised, queried and freed via the logSpeed* helpers
} threadInfoType;
// Global run flag, initialised to 1. Nothing in this file reads or clears it.
// NOTE(review): it has external linkage, so it is presumably referenced from
// another translation unit (e.g. via utils.h) — confirm before changing its
// type or making it static.
int keepRunning = 1;
/*
 * Signal handler: reports on stderr that a signal arrived.
 *
 * d is the signal number; it is not used.
 *
 * Only async-signal-safe calls are allowed inside a signal handler, so the
 * message is emitted with write(2) rather than through stdio.
 */
void intHandler(int d) {
    static const char msg[] = "got signal\n";

    (void)d;
    if (write(STDERR_FILENO, msg, sizeof msg - 1) < 0) {
        // Nothing safe can be done about a failed write from inside a handler.
    }
}
/*
 * Thread body: repeatedly re-reads the first READCHUNK bytes of
 * threadContext->path (opened with O_DIRECT) and searches them for this
 * thread's marker, i.e. NEEDLE followed by the thread id formatted as "%2d".
 *
 * When the marker is found, the number stored 24 bytes before it is parsed
 * and compared with the time of the read (timedouble()). Every time that
 * number changes, a line with the difference, the running average and the
 * maximum is printed to stderr.
 *
 * arg points to the threadInfoType for this thread.
 * Always returns NULL. The loop only ends if repositioning the file fails;
 * read errors are reported and retried.
 */
static void *runThread(void *arg) {
    threadInfoType *threadContext = arg; // per-thread arguments

    enum {
        VALUE_OFFSET = 24,     // distance from the parsed number back to the marker
        DIRECT_IO_ALIGN = 4096 // buffer alignment used for O_DIRECT reads
    };
    // Differences at or above this bound are printed but kept out of the
    // average and maximum. NOTE(review): units are those of timedouble(),
    // presumably seconds — confirm.
    const double maxCountedDelta = 87600;

    int fd = open(threadContext->path, O_RDONLY | O_DIRECT);
    if (fd < 0) {
        perror(threadContext->path);
        return NULL;
    }

    // O_DIRECT transfers generally need an aligned user buffer, which a plain
    // calloc does not guarantee. READCHUNK is a multiple of the alignment, as
    // aligned_alloc requires.
    char *haystack = aligned_alloc(DIRECT_IO_ALIGN, READCHUNK);
    if (haystack == NULL) {
        fprintf(stderr, "OOM(haystack): \n");
        close(fd);
        exit(-1);
    }
    memset(haystack, 0, READCHUNK);

    // The marker never changes for a given thread, so build it once.
    char actualneed[100];
    snprintf(actualneed, sizeof actualneed, "%s%2d", NEEDLE, threadContext->threadid);
    const size_t needlelen = strlen(actualneed);

    double lastnum = 0;
    double maxdelta = 0;
    double totaldelta = 0;
    double totalN = 0;

    while (1) {
        if (lseek(fd, 0, SEEK_SET) < 0) {
            perror("lseek");
            break; // cannot rewind, so further reads would be meaningless
        }

        ssize_t bytes = read(fd, haystack, READCHUNK);
        double readtime = timedouble();
        if (bytes < 0) {
            // Report and retry; a negative length must never reach memmem.
            perror("read");
            continue;
        }

        char *pos = memmem(haystack, (size_t)bytes, actualneed, needlelen);
        if (pos == NULL) {
            continue;
        }

        // Only parse when the number lies fully inside the buffer. Parsing
        // cannot run past the marker because the marker starts with '!'.
        if (pos - haystack >= VALUE_OFFSET) {
            double r = atof(pos - VALUE_OFFSET);
            if (r != lastnum) {
                double delta = readtime - r;
                if (delta < maxCountedDelta) {
                    totalN++;
                    totaldelta += delta;
                    if (delta > maxdelta) {
                        maxdelta = delta;
                    }
                }
                // Avoid printing NaN before the first counted sample.
                double avgdelta = (totalN > 0) ? totaldelta / totalN : 0;
                fprintf(stderr,"thread %d, pos=%10td read %f (delta from now %f, avg delta %f, max delta %f)\n", threadContext->threadid, pos - haystack, r, delta, avgdelta, maxdelta);
                lastnum = r;
            }
        }
        usleep(10);
    }

    free(haystack);
    close(fd);
    return NULL;
}
/*
 * Starts one runThread per command-line argument that does not begin with
 * '-', then waits for all started threads to finish.
 *
 * argc/argv are the program arguments; argv[1..argc-1] are treated as paths.
 * Exits the process with status -1 if the bookkeeping arrays cannot be
 * allocated. A thread that fails to start is reported on stderr and skipped.
 */
void startThreads(int argc, char *argv[]) {
    if (argc < 2) {
        return; // no paths given, nothing to start
    }

    size_t threads = (size_t)argc - 1;
    pthread_t *pt = calloc(threads, sizeof *pt);
    if (pt == NULL) {fprintf(stderr, "OOM(pt): \n");exit(-1);}
    threadInfoType *threadContext = calloc(threads, sizeof *threadContext);
    if (threadContext == NULL) {fprintf(stderr,"OOM(threadContext): \n");exit(-1);}

    for (size_t i = 0; i < threads; i++) {
        threadInfoType *tc = &threadContext[i];

        // A NULL path marks a slot with no running thread.
        tc->path = NULL;
        if (argv[i + 1][0] == '-') {
            continue; // option-like arguments are not paths
        }

        tc->path = argv[i + 1];
        tc->threadid = (int)i;
        tc->exclusive = 0;
        tc->startPosition = 0;
        tc->total = 0;
        logSpeedInit(&tc->logSpeed);

        int rc = pthread_create(&pt[i], NULL, runThread, tc);
        if (rc != 0) {
            fprintf(stderr, "pthread_create(%s): %s\n", tc->path, strerror(rc));
            logSpeedFree(&tc->logSpeed);
            tc->path = NULL; // never started, so it must not be joined
        }
    }

    // NOTE(review): these totals are accumulated but never reported.
    size_t allbytes = 0;
    double maxtime = 0;
    double allmb = 0;
    for (size_t i = 0; i < threads; i++) {
        threadInfoType *tc = &threadContext[i];
        if (tc->path == NULL) {
            continue;
        }
        pthread_join(pt[i], NULL);
        allbytes += tc->total;
        allmb += logSpeedMean(&tc->logSpeed) / 1024.0 / 1024;
        if (logSpeedTime(&tc->logSpeed) > maxtime) {
            maxtime = logSpeedTime(&tc->logSpeed);
        }
        logSpeedFree(&tc->logSpeed);
    }
    (void)allbytes;
    (void)maxtime;
    (void)allmb;

    free(threadContext);
    free(pt);
}
/*
 * Program entry point: passes the command-line arguments straight to
 * startThreads and reports success once it returns.
 */
int main(int argc, char *argv[])
{
    startThreads(argc, argv);

    return EXIT_SUCCESS;
}