Skip to content

Commit

Permalink
Lock socket during sending data
Browse files Browse the repository at this point in the history
  • Loading branch information
shortcutme committed Oct 3, 2017
1 parent 20a0631 commit f4cdc31
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 7 deletions.
113 changes: 113 additions & 0 deletions plugins/FilePack/FilePackPlugin.py-
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
import os
import re
import itertools

from Plugin import PluginManager
from Config import config
from util import helper


# Keep archive open for faster reponse times for large sites
archive_cache = {}


def closeArchive(archive_path):
if archive_path in archive_cache:
del archive_cache[archive_path]


def openArchive(archive_path):
if archive_path not in archive_cache:
if archive_path.endswith("tar.gz"):
import tarfile
archive_cache[archive_path] = tarfile.open(archive_path, "r:gz")
elif archive_path.endswith("tar.bz2"):
import tarfile
archive_cache[archive_path] = tarfile.open(archive_path, "r:bz2")
else:
import zipfile
archive_cache[archive_path] = zipfile.ZipFile(archive_path)
helper.timer(5, lambda: closeArchive(archive_path)) # Close after 5 sec

archive = archive_cache[archive_path]
return archive

def openArchiveFile(archive_path, path_within):
archive = openArchive(archive_path)
if archive_path.endswith(".zip"):
return archive.open(path_within)
else:
return archive.extractfile(path_within.encode("utf8"))


@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
def actionSiteMedia(self, path, **kwargs):
if ".zip/" in path or ".tar.gz/" in path:
path_parts = self.parsePath(path)
file_path = u"%s/%s/%s" % (config.data_dir, path_parts["address"], path_parts["inner_path"].decode("utf8"))
match = re.match("^(.*\.(?:tar.gz|tar.bz2|zip))/(.*)", file_path)
archive_path, path_within = match.groups()
if not os.path.isfile(archive_path):
site = self.server.site_manager.get(path_parts["address"])
if not site:
self.error404(path)
# Wait until file downloads
result = site.needFile(site.storage.getInnerPath(archive_path), priority=10)
# Send virutal file path download finished event to remove loading screen
site.updateWebsocket(file_done=site.storage.getInnerPath(file_path))
if not result:
return self.error404(path)
try:
file = openArchiveFile(archive_path, path_within)
content_type = self.getContentType(file_path)
self.sendHeader(200, content_type=content_type, noscript=kwargs.get("header_noscript", False))
return self.streamFile(file)
except Exception, err:
self.log.debug("Error opening archive file: %s" % err)
return self.error404(path)

return super(UiRequestPlugin, self).actionSiteMedia(path, **kwargs)

def streamFile(self, file):
while 1:
try:
block = file.read(60 * 1024)
if block:
yield block
else:
raise StopIteration
except StopIteration:
file.close()
break


@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
def isFile(self, inner_path):
if ".zip/" in inner_path or ".tar.gz/" in inner_path or ".tar.bz2/" in inner_path:
match = re.match("^(.*\.(?:tar.gz|tar.bz2|zip))/(.*)", inner_path)
inner_archive_path, path_within = match.groups()
return super(SiteStoragePlugin, self).isFile(inner_archive_path)
else:
return super(SiteStoragePlugin, self).isFile(inner_path)

def getDbFiles(self):
for item in super(SiteStoragePlugin, self).getDbFiles():
yield item

# Search for archive files
for content_inner_path in self.site.content_manager.listContents():
content = self.site.content_manager.contents[content_inner_path]
if not content:
merged_site.log.error("[MISSING] %s" % content_inner_path)
continue

file_relative_paths = itertools.chain(
content.get("files", {}).iteritems(),
content.get("files_optional", {}).iteritems()
)

for file_relative_path, node in file_relative_paths:
if "zeronet-archive" in file_relative_path:
print node
22 changes: 15 additions & 7 deletions src/Connection/Connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@
import gevent
import msgpack
import msgpack.fallback
try:
from gevent.coros import RLock
except:
from gevent.lock import RLock

from Config import config
from Debug import Debug
Expand All @@ -15,7 +19,7 @@ class Connection(object):
__slots__ = (
"sock", "sock_wrapped", "ip", "port", "cert_pin", "target_onion", "id", "protocol", "type", "server", "unpacker", "req_id",
"handshake", "crypt", "connected", "event_connected", "closed", "start_time", "last_recv_time",
"last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "cpu_time",
"last_message_time", "last_send_time", "last_sent_time", "incomplete_buff_recv", "bytes_recv", "bytes_sent", "cpu_time", "send_lock",
"last_ping_delay", "last_req_time", "last_cmd", "bad_actions", "sites", "name", "updateName", "waiting_requests", "waiting_streams"
)

Expand Down Expand Up @@ -58,6 +62,7 @@ def __init__(self, server, ip, port, sock=None, target_onion=None):
self.bad_actions = 0
self.sites = 0
self.cpu_time = 0.0
self.send_lock = RLock()

self.name = None
self.updateName()
Expand Down Expand Up @@ -351,6 +356,7 @@ def handleStream(self, message):

# Send data to connection
def send(self, message, streaming=False):
self.last_send_time = time.time()
if config.debug_socket:
self.log("Send: %s, to: %s, streaming: %s, site: %s, inner_path: %s, req_id: %s" % (
message.get("cmd"), message.get("to"), streaming,
Expand All @@ -362,10 +368,10 @@ def send(self, message, streaming=False):
self.log("Send error: missing socket")
return False

self.last_send_time = time.time()
try:
if streaming:
bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall)
with self.send_lock:
bytes_sent = StreamingMsgpack.stream(message, self.sock.sendall)
message = None
self.bytes_sent += bytes_sent
self.server.bytes_sent += bytes_sent
Expand All @@ -374,7 +380,8 @@ def send(self, message, streaming=False):
message = None
self.bytes_sent += len(data)
self.server.bytes_sent += len(data)
self.sock.sendall(data)
with self.send_lock:
self.sock.sendall(data)
except Exception, err:
self.close("Send error: %s" % err)
return False
Expand All @@ -387,9 +394,10 @@ def sendRawfile(self, file, read_bytes):
bytes_left = read_bytes
while True:
self.last_send_time = time.time()
self.sock.sendall(
file.read(min(bytes_left, buff))
)
with self.send_lock:
self.sock.sendall(
file.read(min(bytes_left, buff))
)
bytes_left -= buff
if bytes_left <= 0:
break
Expand Down

0 comments on commit f4cdc31

Please sign in to comment.