forked from aaPanel/BaoTa
-
Notifications
You must be signed in to change notification settings - Fork 0
/
flask_compress.py
146 lines (114 loc) · 4.98 KB
/
flask_compress.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import sys,os
from gzip import GzipFile
from io import BytesIO
from flask import request, current_app,session,Response,g
if sys.version_info[:2] == (2, 6):
class GzipFile(GzipFile):
""" Backport of context manager support for python 2.6"""
def __enter__(self):
if self.fileobj is None:
raise ValueError("I/O operation on closed GzipFile object")
return self
def __exit__(self, *args):
self.close()
class DictCache(object):
def __init__(self):
self.data = {}
def get(self, key):
return self.data.get(key)
def set(self, key, value):
self.data[key] = value
class Compress(object):
"""
The Compress object allows your application to use Flask-Compress.
When initialising a Compress object you may optionally provide your
:class:`flask.Flask` application object if it is ready. Otherwise,
you may provide it later by using the :meth:`init_app` method.
:param app: optional :class:`flask.Flask` application object
:type app: :class:`flask.Flask` or None
"""
def __init__(self, app=None):
"""
An alternative way to pass your :class:`flask.Flask` application
object to Flask-Compress. :meth:`init_app` also takes care of some
default `settings`_.
:param app: the :class:`flask.Flask` application object.
"""
self.app = app
if app is not None:
self.init_app(app)
def init_app(self, app):
defaults = [
('COMPRESS_MIMETYPES', ['text/html', 'text/css', 'text/xml',
'application/json',
'application/javascript']),
('COMPRESS_LEVEL', 3),
('COMPRESS_MIN_SIZE', 500),
('COMPRESS_CACHE_KEY', None),
('COMPRESS_CACHE_BACKEND', None),
('COMPRESS_REGISTER', True),
]
for k, v in defaults:
app.config.setdefault(k, v)
backend = app.config['COMPRESS_CACHE_BACKEND']
self.cache = backend() if backend else None
self.cache_key = app.config['COMPRESS_CACHE_KEY']
if (app.config['COMPRESS_REGISTER'] and
app.config['COMPRESS_MIMETYPES']):
app.after_request(self.after_request)
def after_request(self, response):
app = self.app or current_app
accept_encoding = request.headers.get('Accept-Encoding', '')
response.headers['Server'] = 'nginx'
response.headers['Connection'] = 'keep-alive'
if 'dologin' in g and app.config['SSL']:
try:
for k,v in request.cookies.items():
response.set_cookie(k,'',expires='Thu, 01-Jan-1970 00:00:00 GMT',path='/')
except:
pass
if 'rm_ssl' in g:
import public
try:
for k,v in request.cookies.items():
response.set_cookie(k,'',expires='Thu, 01-Jan-1970 00:00:00 GMT',path='/')
except:
pass
session_name = app.config['SESSION_COOKIE_NAME']
session_id = public.get_session_id()
response.set_cookie(session_name,'',expires='Thu, 01-Jan-1970 00:00:00 GMT',path='/')
response.set_cookie(session_name, session_id, path='/', max_age=86400 * 30,httponly=True)
request_token = request.cookies.get('request_token','')
if request_token:
response.set_cookie('request_token',request_token,path='/',max_age=86400 * 30)
if (response.mimetype not in app.config['COMPRESS_MIMETYPES'] or
'gzip' not in accept_encoding.lower() or
not 200 <= response.status_code < 300 or
(response.content_length is not None and
response.content_length < app.config['COMPRESS_MIN_SIZE']) or
'Content-Encoding' in response.headers):
return response
response.direct_passthrough = False
if self.cache:
key = self.cache_key(response)
gzip_content = self.cache.get(key) or self.compress(app, response)
self.cache.set(key, gzip_content)
else:
gzip_content = self.compress(app, response)
response.set_data(gzip_content)
response.headers['Content-Encoding'] = 'gzip'
response.headers['Content-Length'] = response.content_length
vary = response.headers.get('Vary')
if vary:
if 'accept-encoding' not in vary.lower():
response.headers['Vary'] = '{}, Accept-Encoding'.format(vary)
else:
response.headers['Vary'] = 'Accept-Encoding'
return response
def compress(self, app, response):
gzip_buffer = BytesIO()
with GzipFile(mode='wb',
compresslevel=app.config['COMPRESS_LEVEL'],
fileobj=gzip_buffer) as gzip_file:
gzip_file.write(response.get_data())
return gzip_buffer.getvalue()