203 lines
7.4 KiB
Python
203 lines
7.4 KiB
Python
import logging
|
|
import json
|
|
import os
|
|
import base64
|
|
|
|
import gi
|
|
gi.require_version("Gtk", "3.0")
|
|
from gi.repository import Gtk, Gdk, GLib
|
|
|
|
class KlippyFiles():
|
|
thumbnail_dir = "/tmp/.KS-thumbnails"
|
|
|
|
def __init__(self, screen):
|
|
self.loop = None
|
|
self._poll_task = None
|
|
self._screen = screen
|
|
self.callbacks = []
|
|
self.files = {}
|
|
self.filelist = []
|
|
self.metadata_timeout = {}
|
|
|
|
if not os.path.exists(self.thumbnail_dir):
|
|
os.makedirs(self.thumbnail_dir)
|
|
|
|
self.gcodes_path = None
|
|
|
|
def initialize(self):
|
|
self.gcodes_path = None
|
|
if "virtual_sdcard" in self._screen.printer.get_config_section_list():
|
|
vsd = self._screen.printer.get_config_section("virtual_sdcard")
|
|
if "path" in vsd:
|
|
self.gcodes_path = vsd['path']
|
|
logging.info("Gcodes path: %s" % self.gcodes_path)
|
|
|
|
def _callback(self, result, method, params):
|
|
if method == "server.files.list":
|
|
if "result" in result and isinstance(result['result'], list):
|
|
newfiles = []
|
|
deletedfiles = self.filelist.copy()
|
|
for item in result['result']:
|
|
file = item['filename'] if "filename" in item else item['path']
|
|
if file in self.files:
|
|
deletedfiles.remove(file)
|
|
else:
|
|
newfiles.append(file)
|
|
self.add_file(item, False)
|
|
|
|
if len(newfiles) > 0 or len(deletedfiles) > 0:
|
|
self.run_callbacks(newfiles, deletedfiles)
|
|
|
|
if len(deletedfiles) > 0:
|
|
for file in deletedfiles:
|
|
self.remove_file(file)
|
|
elif method == "server.files.directory":
|
|
if "result" in result:
|
|
dir = params['path'][7:] if params['path'].startswith('gcodes/') else params['path']
|
|
if dir[-1] == '/':
|
|
dir = dir[:-1]
|
|
|
|
newfiles = []
|
|
for file in result['result']['files']:
|
|
fullpath = "%s/%s" % (dir, file['filename'])
|
|
if fullpath not in self.filelist:
|
|
newfiles.append(fullpath)
|
|
|
|
if len(newfiles) > 0:
|
|
self.run_callbacks(newfiles)
|
|
elif method == "server.files.metadata":
|
|
if "error" in result.keys():
|
|
logging.debug("Error in getting metadata for %s. Retrying in 6 seconds" % (params['filename']))
|
|
return
|
|
|
|
for x in result['result']:
|
|
self.files[params['filename']][x] = result['result'][x]
|
|
if "thumbnails" in self.files[params['filename']]:
|
|
self.files[params['filename']]['thumbnails'].sort(key=lambda x: x['size'], reverse=True)
|
|
|
|
for thumbnail in self.files[params['filename']]['thumbnails']:
|
|
thumbnail['local'] = False
|
|
if self.gcodes_path is not None:
|
|
fpath = os.path.join(self.gcodes_path, params['filename'])
|
|
fdir = os.path.dirname(fpath)
|
|
path = os.path.join(fdir, thumbnail['relative_path'])
|
|
if os.access(path, os.R_OK):
|
|
thumbnail['local'] = True
|
|
thumbnail['path'] = path
|
|
if thumbnail['local'] is False:
|
|
fdir = os.path.dirname(params['filename'])
|
|
thumbnail['path'] = os.path.join(fdir, thumbnail['relative_path'])
|
|
self.run_callbacks(mods=[params['filename']])
|
|
|
|
def add_file(self, item, notify=True):
|
|
if 'filename' not in item and 'path' not in item:
|
|
logging.info("Error adding item, unknown filename or path: %s" % item)
|
|
return
|
|
|
|
filename = item['path'] if "path" in item else item['filename']
|
|
if filename in self.filelist:
|
|
logging.info("File already exists: %s" % filename)
|
|
self.request_metadata(filename)
|
|
GLib.timeout_add(1000, self.run_callbacks, mods=[filename])
|
|
return
|
|
|
|
self.filelist.append(filename)
|
|
self.files[filename] = {
|
|
"size": item['size'],
|
|
"modified": item['modified']
|
|
}
|
|
self.request_metadata(filename)
|
|
if notify is True:
|
|
self.run_callbacks(newfiles=[filename])
|
|
|
|
def add_file_callback(self, callback):
|
|
try:
|
|
self.callbacks.append(callback)
|
|
except Exception:
|
|
logging.debug("Callback not found: %s" % callback)
|
|
|
|
def process_update(self, data):
|
|
if 'item' in data and data['item']['root'] != 'gcodes':
|
|
return
|
|
|
|
if data['action'] == "create_dir":
|
|
self._screen._ws.klippy.get_file_dir("gcodes/%s" % data['item']['path'], self._callback)
|
|
elif data['action'] == "create_file":
|
|
self.add_file(data['item'])
|
|
elif data['action'] == "delete_file":
|
|
self.remove_file(data['item']['path'])
|
|
elif data['action'] == "modify_file":
|
|
self.request_metadata(data['item']['path'])
|
|
elif data['action'] == "move_file":
|
|
self.add_file(data['item'], False)
|
|
self.remove_file(data['source_item']['path'], False)
|
|
self.run_callbacks(newfiles=[data['item']['path']], deletedfiles=[data['source_item']['path']])
|
|
|
|
def remove_file_callback(self, callback):
|
|
if callback in self.callbacks:
|
|
self.callbacks.pop(self.callbacks.index(callback))
|
|
|
|
def file_exists(self, filename):
|
|
return True if filename in self.filelist else False
|
|
|
|
def file_metadata_exists(self, filename):
|
|
if not self.file_exists(filename):
|
|
return False
|
|
if "slicer" in self.files[filename]:
|
|
return True
|
|
return False
|
|
|
|
def get_thumbnail_location(self, filename):
|
|
if not self.has_thumbnail(filename):
|
|
return None
|
|
|
|
thumb = self.files[filename]['thumbnails'][0]
|
|
if thumb['local'] is False:
|
|
return ['http', thumb['path']]
|
|
return ['file', thumb['path']]
|
|
|
|
def has_thumbnail(self, filename):
|
|
if filename not in self.files:
|
|
return False
|
|
return "thumbnails" in self.files[filename] and len(self.files[filename]) > 0
|
|
|
|
def request_metadata(self, filename):
|
|
if filename not in self.filelist:
|
|
return False
|
|
self._screen._ws.klippy.get_file_metadata(filename, self._callback)
|
|
|
|
def refresh_files(self):
|
|
self._screen._ws.klippy.get_file_list(self._callback)
|
|
|
|
def remove_file(self, filename, notify=True):
|
|
if filename not in self.filelist:
|
|
return
|
|
|
|
self.filelist.remove(filename)
|
|
self.files.pop(filename, None)
|
|
|
|
if notify is True:
|
|
self.run_callbacks(deletedfiles=[filename])
|
|
|
|
def ret_file_data(self, filename):
|
|
print("Getting file info for %s" % (filename))
|
|
self._screen._ws.klippy.get_file_metadata(filename, self._callback)
|
|
|
|
def run_callbacks(self, newfiles=[], deletedfiles=[], mods=[]):
|
|
if len(self.callbacks) <= 0:
|
|
return
|
|
|
|
for cb in self.callbacks:
|
|
GLib.idle_add(cb, newfiles, deletedfiles, mods)
|
|
|
|
return False
|
|
|
|
def get_file_list(self):
|
|
return self.filelist
|
|
|
|
def get_file_info(self, filename):
|
|
if filename not in self.files:
|
|
return {"path": None, "modified": 0, "size": 0}
|
|
|
|
return self.files[filename]
|