|
| 1 | +# -*- coding: utf-8 -*- |
| 2 | +from __future__ import absolute_import, division, print_function, unicode_literals |
| 3 | + |
| 4 | +__license__ = 'GNU Affero General Public License http://www.gnu.org/licenses/agpl.html' |
| 5 | +__copyright__ = "Copyright (C) 2018 The OctoPrint Project - Released under terms of the AGPLv3 License" |
| 6 | + |
| 7 | +import octoprint.plugin |
| 8 | + |
| 9 | +from octoprint.events import Events |
| 10 | +from octoprint.access import USER_GROUP, ADMIN_GROUP |
| 11 | +from octoprint.access.permissions import Permissions |
| 12 | +from octoprint.util import to_unicode |
| 13 | + |
| 14 | +from .checks import Severity |
| 15 | +from .checks.firmware_unsafe import FirmwareUnsafeChecks |
| 16 | +from .checks.firmware_broken import FirmwareBrokenChecks |
| 17 | + |
| 18 | +import flask |
| 19 | +from flask_babel import gettext |
| 20 | + |
| 21 | +import textwrap |
| 22 | + |
| 23 | +TERMINAL_WARNING = """ |
| 24 | +!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! |
| 25 | +{message} |
| 26 | +
|
| 27 | +Learn more at {url} |
| 28 | +!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! |
| 29 | +
|
| 30 | +""" |
| 31 | + |
| 32 | +FIRMWARE_CHECKS = { |
| 33 | + "firmware-unsafe": FirmwareUnsafeChecks.as_dict(), |
| 34 | + "firmware-broken": FirmwareBrokenChecks.as_dict() |
| 35 | +} |
| 36 | + |
| 37 | +class FirmwareCheckPlugin(octoprint.plugin.AssetPlugin, |
| 38 | + octoprint.plugin.EventHandlerPlugin, |
| 39 | + octoprint.plugin.SimpleApiPlugin, |
| 40 | + octoprint.plugin.TemplatePlugin): |
| 41 | + |
| 42 | + # noinspection PyMissingConstructor |
| 43 | + def __init__(self): |
| 44 | + self._warnings = dict() |
| 45 | + self._scan_received = True |
| 46 | + |
| 47 | + ##~~ TemplatePlugin API |
| 48 | + |
| 49 | + def get_template_configs(self): |
| 50 | + return [ |
| 51 | + dict(type="sidebar", |
| 52 | + name=gettext("Attention!"), |
| 53 | + data_bind="visible: printerState.isOperational() && loginState.isAdmin() && warnings().length > 0", |
| 54 | + icon="exclamation-triangle", |
| 55 | + styles_wrapper=["display: none"]) |
| 56 | + ] |
| 57 | + |
| 58 | + ##~~ AssetPlugin API |
| 59 | + |
| 60 | + def get_assets(self): |
| 61 | + return dict(js=("js/firmware_check.js",), |
| 62 | + clientjs=("clientjs/firmware_check.js",), |
| 63 | + css=("css/firmware_check.css",), |
| 64 | + less=("less/firmware_check.less",)) |
| 65 | + |
| 66 | + ##~~ EventHandlerPlugin API |
| 67 | + |
| 68 | + def on_event(self, event, payload): |
| 69 | + if event == Events.DISCONNECTED: |
| 70 | + self._reset_warnings() |
| 71 | + self._reset_state() |
| 72 | + self._reset_checks() |
| 73 | + |
| 74 | + ##~~ SimpleApiPlugin API |
| 75 | + |
| 76 | + def on_api_get(self, request): |
| 77 | + if not Permissions.PLUGIN_FIRMWARE_CHECK_DISPLAY.can(): |
| 78 | + return flask.make_response("Insufficient rights", 403) |
| 79 | + return flask.jsonify(self._warnings) |
| 80 | + |
| 81 | + ##~~ GCODE received hook handler |
| 82 | + |
| 83 | + def on_gcode_received(self, comm_instance, line, *args, **kwargs): |
| 84 | + if self._scan_received: |
| 85 | + self._run_checks("received", to_unicode(line, errors="replace")) |
| 86 | + return line |
| 87 | + |
| 88 | + ##~~ Firmware info hook handler |
| 89 | + |
| 90 | + def on_firmware_info_received(self, comm_instance, firmware_name, firmware_data): |
| 91 | + self._run_checks("m115", |
| 92 | + to_unicode(firmware_name, errors="replace"), |
| 93 | + dict((to_unicode(key, errors="replace"), to_unicode(value, errors="replace")) |
| 94 | + for key, value in firmware_data.items())) |
| 95 | + self._scan_received = False |
| 96 | + |
| 97 | + ##~~ Firmware capability hook handler |
| 98 | + |
| 99 | + def on_firmware_cap_received(self, comm_instance, cap, enabled, all_caps): |
| 100 | + self._run_checks("cap", |
| 101 | + to_unicode(cap, errors="replace"), |
| 102 | + enabled) |
| 103 | + |
| 104 | + ##~~ Additional permissions hook handler |
| 105 | + |
| 106 | + def get_additional_permissions(self): |
| 107 | + return [ |
| 108 | + dict(key="DISPLAY", |
| 109 | + name="Display firmware check warnings", |
| 110 | + description=gettext("Allows to see firmware check warnings"), |
| 111 | + roles=["display"], |
| 112 | + default_groups=[USER_GROUP]) |
| 113 | + ] |
| 114 | + |
| 115 | + ##~~ Helpers |
| 116 | + |
| 117 | + def _run_checks(self, check_type, *args, **kwargs): |
| 118 | + changes = False |
| 119 | + |
| 120 | + for warning_type, check_data in FIRMWARE_CHECKS.items(): |
| 121 | + checks = check_data.get("checks") |
| 122 | + message = check_data.get("message") |
| 123 | + severity = check_data.get("severity", Severity.CRITICAL) |
| 124 | + url = "https://faq.octoprint.org/warning-{warning_type}".format(warning_type=warning_type) |
| 125 | + if not checks or not message: |
| 126 | + continue |
| 127 | + |
| 128 | + for check in checks: |
| 129 | + if not check.active: |
| 130 | + # skip non active checks |
| 131 | + continue |
| 132 | + |
| 133 | + method = getattr(check, check_type, None) |
| 134 | + if not callable(method): |
| 135 | + # skip uncallable checks |
| 136 | + continue |
| 137 | + |
| 138 | + # execute method |
| 139 | + try: |
| 140 | + method(*args, **kwargs) |
| 141 | + except Exception: |
| 142 | + self._logger.exception("There was an error running method {} on check {!r}".format(check_type, check)) |
| 143 | + continue |
| 144 | + |
| 145 | + # check if now triggered |
| 146 | + if check.triggered: |
| 147 | + if check.url is not None: |
| 148 | + url = check.url |
| 149 | + |
| 150 | + self._register_warning(warning_type, message, severity, url) |
| 151 | + |
| 152 | + # noinspection PyUnresolvedReferences |
| 153 | + self._event_bus.fire(Events.PLUGIN_FIRMWARE_CHECK_WARNING, dict(check_name=check.name, |
| 154 | + warning_type=warning_type, |
| 155 | + severity=severity, |
| 156 | + url=url)) |
| 157 | + changes = True |
| 158 | + break |
| 159 | + |
| 160 | + if changes: |
| 161 | + self._ping_clients() |
| 162 | + |
| 163 | + def _register_warning(self, warning_type, message, severity, url): |
| 164 | + self._log_to_terminal(TERMINAL_WARNING.format(message="\n".join(textwrap.wrap(message, 75)), |
| 165 | + warning_type=warning_type, |
| 166 | + url=url)) |
| 167 | + self._warnings[warning_type] = dict(message=message, |
| 168 | + severity=severity, |
| 169 | + url=url) |
| 170 | + |
| 171 | + def _reset_warnings(self): |
| 172 | + self._warnings.clear() |
| 173 | + self._ping_clients() |
| 174 | + |
| 175 | + def _reset_state(self): |
| 176 | + self._scan_received = True |
| 177 | + |
| 178 | + def _reset_checks(self): |
| 179 | + for warning_type, check_data in FIRMWARE_CHECKS.items(): |
| 180 | + checks = check_data.get("checks") |
| 181 | + if not checks: |
| 182 | + continue |
| 183 | + |
| 184 | + for check in checks: |
| 185 | + check.reset() |
| 186 | + |
| 187 | + def _log_to_terminal(self, message): |
| 188 | + if self._printer: |
| 189 | + lines = message.split("\n") |
| 190 | + self._printer.log_lines(*lines) |
| 191 | + |
| 192 | + def _ping_clients(self): |
| 193 | + self._plugin_manager.send_plugin_message(self._identifier, dict(type="update")) |
| 194 | + |
| 195 | + |
| 196 | +def register_custom_events(*args, **kwargs): |
| 197 | + return ["warning",] |
| 198 | + |
| 199 | + |
| 200 | +__plugin_name__ = "Firmware Check" |
| 201 | +__plugin_pythoncompat__ = ">=2.7,<4" # python 2 and 3 |
| 202 | +__plugin_disabling_discouraged__ = gettext("Without this plugin OctoPrint will no longer be able to " |
| 203 | + "check if the printer it is connected to has a known safety " |
| 204 | + "issue or otherwise broken firmware and inform you about that fact.") |
| 205 | +__plugin_implementation__ = FirmwareCheckPlugin() |
| 206 | +__plugin_hooks__ = { |
| 207 | + "octoprint.comm.protocol.gcode.received": (__plugin_implementation__.on_gcode_received, 100), |
| 208 | + "octoprint.comm.protocol.firmware.info": (__plugin_implementation__.on_firmware_info_received, 100), |
| 209 | + "octoprint.comm.protocol.firmware.capabilities": (__plugin_implementation__.on_firmware_cap_received, 100), |
| 210 | + "octoprint.events.register_custom_events": register_custom_events, |
| 211 | + "octoprint.access.permissions": __plugin_implementation__.get_additional_permissions |
| 212 | +} |
| 213 | + |
0 commit comments