forked from moggieuk/Happy-Hare
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Refactored MMU processor to work with moonraker
- Loading branch information
1 parent
af32c0d
commit d7ea2ee
Showing
6 changed files
with
144 additions
and
181 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
import logging, os, re, fileinput | ||
|
||
class MmuServer: | ||
TOOL_DISCOVERY_REGEX = r'((^MMU_CHANGE_TOOL.*?TOOL=)|(^T))(?P<tool>\d{1,2})' | ||
METADATA_REPLACEMENT_STRING = "!mmu_inject_tools_used!" | ||
|
||
def __init__(self, config): | ||
self.config = config | ||
self.server = config.get_server() | ||
self.file_manager = self.server.lookup_component('file_manager') | ||
self.enable_file_preprocessor = config.getboolean('enable_file_preprocessor', True) | ||
|
||
self.server.register_event_handler("file_manager:filelist_changed", self._filelist_changed) | ||
|
||
def _filelist_changed(self, response): | ||
if not self.enable_file_preprocessor: | ||
return | ||
|
||
if response['action'] == 'create_file': | ||
filepath = os.path.join(self.file_manager.get_directory(), response['item']['path']) | ||
|
||
if filepath.endswith(".gcode"): | ||
self._write_mmu_metadata(filepath) | ||
|
||
def _write_mmu_metadata(self, file_path): | ||
self._log('Checking for MMU metadata placeholder in file: ' + file_path) | ||
has_placeholder, tools_used = self._enumerate_used_tools(file_path) | ||
|
||
# An edit-in-place is seen by Moonraker as a file change (rightly so), | ||
# BUT it's seen this way even if no changes are made. We use `has_placeholder` | ||
# to determine whether there are any changes to make to prevent an infinite loop. | ||
if has_placeholder: | ||
self._log('Writing MMU metadata to file: ' + file_path) | ||
return self._inject_tool_usage(file_path, tools_used) | ||
else : | ||
self._log('No MMU metadata placeholder found in file: ' + file_path) | ||
return False | ||
|
||
def _log(self, message): | ||
logging.info('mmu_file_processor ' + message) | ||
|
||
def _enumerate_used_tools(self, file_path): | ||
regex = re.compile(self.TOOL_DISCOVERY_REGEX, re.IGNORECASE) | ||
tools_used = set() | ||
has_placeholder = False | ||
|
||
with open(file_path, 'r') as f: | ||
for line in f: | ||
if not has_placeholder and self.METADATA_REPLACEMENT_STRING in line: | ||
has_placeholder = True | ||
|
||
match = regex.match(line) | ||
if match: | ||
tool = match.group('tool') | ||
tools_used.add(int(tool)) | ||
|
||
return (has_placeholder, sorted(tools_used)) | ||
|
||
def _inject_tool_usage(self, file_path, tools_used): | ||
with fileinput.FileInput(file_path, inplace=1) as file: | ||
for line in file: | ||
if self.METADATA_REPLACEMENT_STRING in line: | ||
print(line.replace(self.METADATA_REPLACEMENT_STRING, ','.join(map(str, tools_used))), end='') | ||
else: | ||
print(line, end='') | ||
|
||
return True | ||
|
||
def load_component(config): | ||
return MmuServer(config) |
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,68 @@ | ||
import os | ||
import shutil | ||
import unittest | ||
from unittest.mock import MagicMock | ||
|
||
from components.mmu_server import MmuServer | ||
|
||
class TestMmuServerFileProcessor(unittest.TestCase): | ||
TOOLCHANGE_FILEPATH = 'tests/support/toolchange.gcode' | ||
NO_TOOLCHANGE_FILEPATH = 'tests/support/no_toolchange.gcode' | ||
|
||
def setUp(self): | ||
self.subject = MmuServer(MagicMock()) | ||
shutil.copyfile('tests/support/toolchange.orig.gcode', self.TOOLCHANGE_FILEPATH) | ||
shutil.copyfile('tests/support/no_toolchange.orig.gcode', self.NO_TOOLCHANGE_FILEPATH) | ||
|
||
def tearDown(self): | ||
os.remove(self.TOOLCHANGE_FILEPATH) | ||
os.remove(self.NO_TOOLCHANGE_FILEPATH) | ||
|
||
def test_filelist_callback_when_enabled(self): | ||
self.subject.enable_file_preprocessor = True | ||
self.subject._write_mmu_metadata = MagicMock() | ||
|
||
self.subject._filelist_changed({'action': 'create_file', 'item': {'path': 'test.gcode'}}) | ||
|
||
self.subject._write_mmu_metadata.assert_called_once() | ||
|
||
def test_filelist_callback_when_disabled(self): | ||
self.subject.enable_file_preprocessor = False | ||
self.subject._write_mmu_metadata = MagicMock() | ||
|
||
self.subject._filelist_changed({'action': 'create_file', 'item': {'path': 'test.gcode'}}) | ||
|
||
self.subject._write_mmu_metadata.assert_not_called() | ||
|
||
def test_filelist_callback_when_wrong_event(self): | ||
self.subject.enable_file_preprocessor = False | ||
self.subject._write_mmu_metadata = MagicMock() | ||
|
||
self.subject._filelist_changed({'action': 'move_file', 'item': {'path': 'test.gcode'}}) | ||
|
||
self.subject._write_mmu_metadata.assert_not_called() | ||
|
||
def test_filelist_callback_when_wrong_file_type(self): | ||
self.subject.enable_file_preprocessor = False | ||
self.subject._write_mmu_metadata = MagicMock() | ||
|
||
self.subject._filelist_changed({'action': 'create_file', 'item': {'path': 'test.txt'}}) | ||
|
||
self.subject._write_mmu_metadata.assert_not_called() | ||
|
||
def test_write_mmu_metadata_when_writing_to_files(self): | ||
self.subject._write_mmu_metadata(self.TOOLCHANGE_FILEPATH) | ||
|
||
with open(self.TOOLCHANGE_FILEPATH, 'r') as f: | ||
file_contents = f.read() | ||
self.assertIn('PRINT_START MMU_TOOLS_USED=0,1,2,5,11\n', file_contents) | ||
self.assertNotIn('[mmu_inject_tools_used]', file_contents) | ||
|
||
def test_write_mmu_metadata_when_no_toolchanges(self): | ||
self.subject._write_mmu_metadata(self.NO_TOOLCHANGE_FILEPATH) | ||
|
||
with open(self.NO_TOOLCHANGE_FILEPATH, 'r') as f: | ||
file_contents = f.read() | ||
self.assertIn('PRINT_START MMU_TOOLS_USED=\n', file_contents) | ||
self.assertNotIn('[mmu_inject_tools_used]', file_contents) | ||
|
This file was deleted.
Oops, something went wrong.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters