Skip to content

Commit

Permalink
Fix memory bloat when displaying plot logs in GUI by sending only new…
Browse files Browse the repository at this point in the history
… changes (Chia-Network#2847)

* Fix memory bloat when displaying plot logs in GUI by sending only new changes

* Add updated lines field to plot logs message

* Send full log during service connection, and send log updates every time

* updated plot logs
fixed removing of the plots from the queue

* used message from main process

* used just one plot for serial plotting

* lint format

Co-authored-by: Adam Kelly <[email protected]>
Co-authored-by: Zlatko <[email protected]>
  • Loading branch information
3 people authored Apr 28, 2021
1 parent 0810f4a commit 0f57b11
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 53 deletions.
9 changes: 9 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project does not yet adhere to [Semantic Versioning](https://semver.org/spec/v2.0.0.html)
for setuptools_scm/PEP 440 reasons.


## UNRELEASED

### Changed

### Fixed

- Fixed excess memory use when displaying plot logs in GUI

## 1.1.2 Chia Blockchain 2021-04-24

### Changed
Expand Down
125 changes: 72 additions & 53 deletions chia/daemon/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import time
import traceback
import uuid

from concurrent.futures import ThreadPoolExecutor
from enum import Enum
from pathlib import Path
Expand Down Expand Up @@ -65,11 +66,15 @@ async def fetch(url: str):
class PlotState(str, Enum):
SUBMITTED = "SUBMITTED"
RUNNING = "RUNNING"
ERROR = "ERROR"
DELETED = "DELETED"
REMOVING = "REMOVING"
FINISHED = "FINISHED"


class PlotEvent(str, Enum):
LOG_CHANGED = "log_changed"
STATE_CHANGED = "state_changed"


# determine if application is a script file or frozen exe
if getattr(sys, "frozen", False):
name_map = {
Expand Down Expand Up @@ -294,40 +299,48 @@ def get_status(self) -> Dict[str, Any]:
response = {"success": True, "genesis_initialized": True}
return response

def plot_queue_to_payload(self, plot_queue_item) -> Dict[str, Any]:
def plot_queue_to_payload(self, plot_queue_item, send_full_log: bool) -> Dict[str, Any]:
error = plot_queue_item.get("error")
has_error = error is not None

return {
item = {
"id": plot_queue_item["id"],
"queue": plot_queue_item["queue"],
"size": plot_queue_item["size"],
"parallel": plot_queue_item["parallel"],
"delay": plot_queue_item["delay"],
"state": plot_queue_item["state"],
"error": str(error) if has_error else None,
"log": plot_queue_item.get("log"),
"deleted": plot_queue_item["deleted"],
"log_new": plot_queue_item.get("log_new"),
}

if send_full_log:
item["log"] = plot_queue_item.get("log")
return item

def prepare_plot_state_message(self, state: PlotEvent, id):
message = {
"state": state,
"queue": self.extract_plot_queue(id),
}
return message

def extract_plot_queue(self) -> List[Dict]:
def extract_plot_queue(self, id=None) -> List[Dict]:
send_full_log = id is None
data = []
for item in self.plots_queue:
data.append(WebSocketServer.plot_queue_to_payload(self, item))
if id is None or item["id"] == id:
data.append(self.plot_queue_to_payload(item, send_full_log))
return data

async def _state_changed(self, service: str, state: str):
async def _state_changed(self, service: str, message: Dict[str, Any]):
"""If id is None, send the whole state queue"""
if service not in self.connections:
return

message = None
websockets = self.connections[service]

if service == service_plotter:
message = {
"state": state,
"queue": self.extract_plot_queue(),
}

if message is None:
return

Expand All @@ -342,41 +355,35 @@ async def _state_changed(self, service: str, state: str):
websockets.remove(websocket)
await websocket.close()

def state_changed(self, service: str, state: str):
asyncio.create_task(self._state_changed(service, state))

async def _watch_file_changes(self, id: str, loop: asyncio.AbstractEventLoop):
config = self._get_plots_queue_item(id)
def state_changed(self, service: str, message: Dict[str, Any]):
asyncio.create_task(self._state_changed(service, message))

if config is None:
raise Exception(f"Plot queue config with ID {id} is not defined")
async def _watch_file_changes(self, config, fp: TextIO, loop: asyncio.AbstractEventLoop):
id = config["id"]
final_words = ["Renamed final file"]

words = ["Renamed final file"]
file_path = config["out_file"]
fp = open(file_path, "r")
while True:
new = await loop.run_in_executor(io_pool_exc, fp.readline)
new_data = await loop.run_in_executor(io_pool_exc, fp.readline)

if config["state"] is not PlotState.RUNNING:
return

config["log"] = new if config["log"] is None else config["log"] + new
self.state_changed(service_plotter, "log_changed")
if new_data not in (None, ""):
config["log"] = new_data if config["log"] is None else config["log"] + new_data
config["log_new"] = new_data
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.LOG_CHANGED, id))

if new:
for word in words:
if word in new:
if new_data:
for word in final_words:
if word in new_data:
return
else:
time.sleep(0.5)

async def _track_plotting_progress(self, id: str, loop: asyncio.AbstractEventLoop):
config = self._get_plots_queue_item(id)

if config is None:
raise Exception(f"Plot queue config with ID {id} is not defined")

await self._watch_file_changes(id, loop)
async def _track_plotting_progress(self, config, loop: asyncio.AbstractEventLoop):
file_path = config["out_file"]
with open(file_path, "r") as fp:
await self._watch_file_changes(config, fp, loop)

def _build_plotting_command_args(self, request: Any, ignoreCount: bool) -> List[str]:
service_name = request["service"]
Expand Down Expand Up @@ -435,6 +442,9 @@ def _get_plots_queue_item(self, id: str):
def _run_next_serial_plotting(self, loop: asyncio.AbstractEventLoop, queue: str = "default"):
next_plot_id = None

if self._is_serial_plotting_running(queue) is True:
return

for item in self.plots_queue:
if item["queue"] == queue and item["state"] is PlotState.SUBMITTED and item["parallel"] is False:
next_plot_id = item["id"]
Expand All @@ -449,7 +459,7 @@ async def _start_plotting(self, id: str, loop: asyncio.AbstractEventLoop, queue:
config = self._get_plots_queue_item(id)

if config is None:
raise Exception(f"Plot queue with ID {id} does not exists")
raise Exception(f"Plot queue config with ID {id} does not exist")

state = config["state"]
if state is not PlotState.SUBMITTED:
Expand All @@ -459,7 +469,7 @@ async def _start_plotting(self, id: str, loop: asyncio.AbstractEventLoop, queue:
delay = config["delay"]
await asyncio.sleep(delay)

if config["state"] is PlotState.DELETED:
if config["state"] is not PlotState.SUBMITTED:
return

service_name = config["service_name"]
Expand All @@ -473,27 +483,24 @@ async def _start_plotting(self, id: str, loop: asyncio.AbstractEventLoop, queue:
config["state"] = PlotState.RUNNING
config["out_file"] = plotter_log_path(self.root_path, id).absolute()
config["process"] = process
self.state_changed(service_plotter, "state")
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))

if service_name not in self.services:
self.services[service_name] = []

self.services[service_name].append(process)

await self._track_plotting_progress(id, loop)

# (output, err) = process.communicate()
# await process.wait()
await self._track_plotting_progress(config, loop)

config["state"] = PlotState.FINISHED
self.state_changed(service_plotter, "state")
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))

except (subprocess.SubprocessError, IOError):
log.exception(f"problem starting {service_name}")
error = Exception("Start plotting failed")
config["state"] = PlotState.ERROR
config["state"] = PlotState.FINISHED
config["error"] = error
self.state_changed(service_plotter, "state")
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))
raise error

finally:
Expand Down Expand Up @@ -521,13 +528,17 @@ async def start_plotting(self, request: Dict[str, Any]):
"parallel": parallel,
"delay": delay * k if parallel is True else delay,
"state": PlotState.SUBMITTED,
"deleted": False,
"error": None,
"log": None,
"process": None,
}

self.plots_queue.append(config)

# notify GUI about new plot queue item
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))

# only first item can start when user selected serial plotting
can_start_serial_plotting = k == 0 and self._is_serial_plotting_running(queue) is False

Expand Down Expand Up @@ -556,27 +567,34 @@ async def stop_plotting(self, request: Dict[str, Any]) -> Dict[str, Any]:
process = config["process"]
queue = config["queue"]

# mark plot config as deleted for already running asyncio functions
config["state"] = PlotState.DELETED
if config["state"] is PlotState.REMOVING:
return {"success": False}

try:
run_next = False
if process is not None and state == PlotState.RUNNING:
run_next = True
config["state"] = PlotState.REMOVING
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))
await kill_process(process, self.root_path, service_plotter, id)

config["state"] = PlotState.FINISHED
config["deleted"] = True

self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))

self.plots_queue.remove(config)

if run_next:
loop = asyncio.get_event_loop()
self._run_next_serial_plotting(loop, queue)

self.state_changed(service_plotter, "removed")
return {"success": True}
except Exception as e:
log.error(f"Error during killing the plot process: {e}")
config["state"] = PlotState.ERROR
config["state"] = PlotState.FINISHED
config["error"] = str(e)
self.state_changed(service_plotter, "state")
self.state_changed(service_plotter, self.prepare_plot_state_message(PlotEvent.STATE_CHANGED, id))
return {"success": False}

async def start_service(self, request: Dict[str, Any]):
Expand Down Expand Up @@ -677,6 +695,7 @@ async def register_service(self, websocket: WebSocketServerProtocol, request: Di
if self.ping_job is None:
self.ping_job = asyncio.create_task(self.ping_task())
self.log.info(f"registered for service {service}")
log.info(f"{response}")
return response


Expand Down

0 comments on commit 0f57b11

Please sign in to comment.