forked from lighttpd/lighttpd2
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlogfile.py
154 lines (140 loc) · 5.12 KB
/
logfile.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# -*- coding: utf-8 -*-
import time
# Names exported by a star-import of this module: the two file-wrapper classes
# defined below.
__all__ = [ 'LogFile', 'RemoveEscapeSeq' ]
# Attribute names that the wrapper classes in this module forward to the
# wrapped file object instead of storing them on the wrapper itself.
ATTRS = [ 'closed', 'encoding', 'errors', 'mode', 'name', 'newlines', 'softspace' ]


class LogFile(object):
    """File-like wrapper that prefixes every written line with a timestamp.

    Everything written through :meth:`write` goes to the wrapped ``file`` and
    is additionally copied to each "clone" file.  Clones are passed as keyword
    arguments; the keyword name is used as a prefix that is inserted between
    the timestamp and the text in that clone's output.

    Read-style methods and the attributes listed in ``ATTRS`` are delegated to
    the wrapped file.  ``fileno``, ``seek`` and ``truncate`` are no-ops that
    return ``None``, and ``isatty`` always reports ``False``.
    """

    def __init__(self, file, **clones):
        self.file = file
        self.clones = clones
        # True while the next chunk written starts a new line (and therefore
        # needs a timestamp in front of it).
        self.newline = True

    # NOTE: the context-manager and iteration protocols are delegated as-is,
    # so ``with LogFile(f) as x`` binds whatever ``f.__enter__()`` returns
    # (not this wrapper).
    def __enter__(self, *args, **kwargs):
        return self.file.__enter__(*args, **kwargs)

    def __exit__(self, *args, **kwargs):
        return self.file.__exit__(*args, **kwargs)

    def __iter__(self, *args, **kwargs):
        return self.file.__iter__(*args, **kwargs)

    def __repr__(self, *args, **kwargs):
        return self.file.__repr__(*args, **kwargs)

    def __delattr__(self, name):
        """Delete ``name`` on the wrapped file if forwarded, else on self."""
        if name in ATTRS:
            return delattr(self.file, name)
        return super(LogFile, self).__delattr__(name)

    def __getattr__(self, name):
        """Fallback lookup, only reached when normal lookup already failed.

        Raises:
            AttributeError: if ``name`` is not a forwarded attribute, or the
                wrapped file does not have it either.
        """
        if name in ATTRS:
            return getattr(self.file, name)
        # ``object`` has no ``__getattr__`` to chain to; signal the miss the
        # way ``hasattr``/``getattr(obj, name, default)`` expect.
        raise AttributeError(
            "'%s' object has no attribute '%s'" % (type(self).__name__, name))

    def __getattribute__(self, name):
        """Forward the names in ``ATTRS`` to the wrapped file."""
        if name in ATTRS:
            return getattr(self.file, name)
        return object.__getattribute__(self, name)

    def __setattr__(self, name, value):
        """Set ``name`` on the wrapped file if forwarded, else on self."""
        if name in ATTRS:
            return setattr(self.file, name, value)
        return super(LogFile, self).__setattr__(name, value)

    def close(self, *args, **kwargs):
        """Close the wrapped file (clones are left open)."""
        return self.file.close(*args, **kwargs)

    def fileno(self, *args, **kwargs):
        """Do nothing; no file descriptor is exposed (returns ``None``)."""
        pass

    def flush(self, *args, **kwargs):
        """Flush every clone, then the wrapped file; return the latter's result."""
        for clone in self.clones.values():
            clone.flush(*args, **kwargs)
        return self.file.flush(*args, **kwargs)

    def isatty(self, *args, **kwargs):
        """Always report that this is not a terminal."""
        return False

    def next(self, *args, **kwargs):
        return self.file.next(*args, **kwargs)

    def read(self, *args, **kwargs):
        return self.file.read(*args, **kwargs)

    def readinto(self, *args, **kwargs):
        return self.file.readinto(*args, **kwargs)

    def readline(self, *args, **kwargs):
        return self.file.readline(*args, **kwargs)

    def readlines(self, *args, **kwargs):
        return self.file.readlines(*args, **kwargs)

    def seek(self, *args, **kwargs):
        """Do nothing; seeking is not supported (returns ``None``)."""
        pass

    def tell(self, *args, **kwargs):
        return self.file.tell(*args, **kwargs)

    def truncate(self, *args, **kwargs):
        """Do nothing; truncation is not supported (returns ``None``)."""
        pass

    def __write(self, str):
        """Write ``str`` without a timestamp; clones get their prefix prepended."""
        self.file.write(str)
        for (p, f) in self.clones.items():
            f.write(p + str)

    def _write(self, str):
        """Write one chunk that contains at most one, trailing, newline.

        A chunk that starts a new line gets ``"<timestamp>: "`` in front of it
        (``"<timestamp> <prefix>: "`` in the clones); a chunk continuing an
        unfinished line is written verbatim everywhere.
        """
        if "" == str:
            return
        if self.newline:
            # "%f" needs python 2.6
            # ts = time.strftime("%Y/%m/%d %H:%M:%S.%f %Z: ")
            ts = time.strftime("%Y/%m/%d %H:%M:%S %Z")
            self.file.write(ts + ": " + str)
            for (p, f) in self.clones.items():
                f.write(ts + " " + p + ": " + str)
        else:
            self.file.write(str)
            for f in self.clones.values():
                f.write(str)
        # Remember whether the line was completed so the next chunk knows
        # whether it needs a timestamp.
        self.newline = ('\n' == str[-1])

    def write(self, str):
        """Write ``str``, timestamping the start of every line in it."""
        lines = str.split('\n')
        # All pieces but the last were terminated by a newline in the input.
        for l in lines[:-1]:
            self._write(l + '\n')
        # The last piece is an unterminated remainder (possibly empty).
        self._write(lines[-1])

    def writelines(self, *args):
        """Write several strings in sequence.

        Accepts either the strings as separate positional arguments or, like
        the standard file API, a single iterable of strings.
        """
        if len(args) == 1 and not isinstance(args[0], str):
            # Standard ``writelines(iterable)`` call form.
            args = args[0]
        return self.write(''.join(args))

    def xreadlines(self, *args, **kwargs):
        return self.file.xreadlines(*args, **kwargs)
class RemoveEscapeSeq(object):
    """File-like wrapper that strips terminal escape sequences on write.

    Text passed to :meth:`write` is forwarded to the wrapped ``file`` with
    every span from an ESC character (``'\\033'``) up to and including the
    next ``'m'`` removed.  A sequence may be split across several ``write``
    calls; the "inside a sequence" state is kept in ``escape_open``.

    Read-style methods and the attributes listed in the module-level ``ATTRS``
    are delegated to the wrapped file.  ``fileno``, ``seek`` and ``truncate``
    are no-ops that return ``None``, and ``isatty`` always reports ``False``.
    """

    def __init__(self, file):
        self.file = file
        # True while we are in the middle of an escape sequence whose
        # terminating 'm' has not been seen yet.
        self.escape_open = False

    # NOTE: the context-manager and iteration protocols are delegated as-is,
    # so ``with RemoveEscapeSeq(f) as x`` binds whatever ``f.__enter__()``
    # returns (not this wrapper).
    def __enter__(self, *args, **kwargs):
        return self.file.__enter__(*args, **kwargs)

    def __exit__(self, *args, **kwargs):
        return self.file.__exit__(*args, **kwargs)

    def __iter__(self, *args, **kwargs):
        return self.file.__iter__(*args, **kwargs)

    def __repr__(self, *args, **kwargs):
        return self.file.__repr__(*args, **kwargs)

    def __delattr__(self, name):
        """Delete ``name`` on the wrapped file if forwarded, else on self."""
        if name in ATTRS:
            return delattr(self.file, name)
        return super(RemoveEscapeSeq, self).__delattr__(name)

    def __getattr__(self, name):
        """Fallback lookup, only reached when normal lookup already failed.

        Raises:
            AttributeError: if ``name`` is not a forwarded attribute, or the
                wrapped file does not have it either.
        """
        if name in ATTRS:
            return getattr(self.file, name)
        # ``object`` has no ``__getattr__`` to chain to; signal the miss the
        # way ``hasattr``/``getattr(obj, name, default)`` expect.
        raise AttributeError(
            "'%s' object has no attribute '%s'" % (type(self).__name__, name))

    def __getattribute__(self, name):
        """Forward the names in ``ATTRS`` to the wrapped file."""
        if name in ATTRS:
            return getattr(self.file, name)
        return object.__getattribute__(self, name)

    def __setattr__(self, name, value):
        """Set ``name`` on the wrapped file if forwarded, else on self."""
        if name in ATTRS:
            return setattr(self.file, name, value)
        return super(RemoveEscapeSeq, self).__setattr__(name, value)

    def close(self, *args, **kwargs):
        return self.file.close(*args, **kwargs)

    def fileno(self, *args, **kwargs):
        """Do nothing; no file descriptor is exposed (returns ``None``)."""
        pass

    def flush(self, *args, **kwargs):
        return self.file.flush(*args, **kwargs)

    def isatty(self, *args, **kwargs):
        """Always report that this is not a terminal."""
        return False

    def next(self, *args, **kwargs):
        return self.file.next(*args, **kwargs)

    def read(self, *args, **kwargs):
        return self.file.read(*args, **kwargs)

    def readinto(self, *args, **kwargs):
        return self.file.readinto(*args, **kwargs)

    def readline(self, *args, **kwargs):
        return self.file.readline(*args, **kwargs)

    def readlines(self, *args, **kwargs):
        return self.file.readlines(*args, **kwargs)

    def seek(self, *args, **kwargs):
        """Do nothing; seeking is not supported (returns ``None``)."""
        pass

    def tell(self, *args, **kwargs):
        return self.file.tell(*args, **kwargs)

    def truncate(self, *args, **kwargs):
        """Do nothing; truncation is not supported (returns ``None``)."""
        pass

    def write(self, str):
        """Write ``str`` to the wrapped file with escape sequences removed.

        Only ``'m'`` is recognised as the end of a sequence, so everything
        between an ESC and the following ``'m'`` is dropped.
        """
        while str != "":
            if self.escape_open:
                # Discard everything up to and including the terminating 'm'.
                _dropped, terminator, str = str.partition('m')
                if not terminator:
                    # Sequence continues in a later write() call.
                    return
                self.escape_open = False
            else:
                # Pass through the text in front of the next ESC, if any.
                text, esc, str = str.partition('\033')
                self.file.write(text)
                if not esc:
                    return
                self.escape_open = True

    def writelines(self, *args):
        """Write several strings in sequence.

        Accepts either the strings as separate positional arguments or, like
        the standard file API, a single iterable of strings.
        """
        if len(args) == 1 and not isinstance(args[0], str):
            # Standard ``writelines(iterable)`` call form.
            args = args[0]
        return self.write(''.join(args))

    def xreadlines(self, *args, **kwargs):
        return self.file.xreadlines(*args, **kwargs)