application: verify upload filename

Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
This commit is contained in:
Eric Callahan
2024-10-21 20:29:54 -04:00
parent 71f9e677b8
commit 4e00a0760e

@@ -980,10 +980,7 @@ class FileUploadHandler(AuthorizedRequestHandler):
async def post(self) -> None:
if self.parse_failed:
self._file.on_finish()
try:
os.remove(self._file.filename)
except Exception:
pass
self._remove_temp_file()
raise tornado.web.HTTPError(500, "File Upload Parsing Failed")
form_args = {}
chk_target = self._targets.pop('checksum')
@@ -992,20 +989,20 @@ class FileUploadHandler(AuthorizedRequestHandler):
# Validate checksum
recd_cksum = chk_target.value.decode().lower()
if calc_chksum != recd_cksum:
# remove temporary file
try:
os.remove(self._file.filename)
except Exception:
pass
self._remove_temp_file()
raise tornado.web.HTTPError(
422,
f"File checksum mismatch: expected {recd_cksum}, "
f"calculated {calc_chksum}"
)
mp_fname: Optional[str] = self._file.multipart_filename
if mp_fname is None or not mp_fname.strip():
self._remove_temp_file()
raise tornado.web.HTTPError(400, "Multipart filename omitted")
for name, target in self._targets.items():
if target.value:
form_args[name] = target.value.decode()
form_args['filename'] = self._file.multipart_filename
form_args['filename'] = mp_fname
form_args['tmp_file_path'] = self._file.filename
debug_msg = "\nFile Upload Arguments:"
for name, value in form_args.items():
@@ -1013,7 +1010,7 @@ class FileUploadHandler(AuthorizedRequestHandler):
debug_msg += f"\nChecksum: {calc_chksum}"
form_args["current_user"] = self.current_user
logging.debug(debug_msg)
logging.info(f"Processing Uploaded File: {self._file.multipart_filename}")
logging.info(f"Processing Uploaded File: {mp_fname}")
try:
result = await self.file_manager.finalize_upload(form_args)
except ServerError as e:
@@ -1041,6 +1038,12 @@ class FileUploadHandler(AuthorizedRequestHandler):
self.set_header("Content-Type", "application/json; charset=UTF-8")
self.finish(jsonw.dumps(result))
def _remove_temp_file(self) -> None:
try:
os.remove(self._file.filename)
except Exception:
pass
# Default Handler for unregistered endpoints
class AuthorizedErrorHandler(AuthorizedRequestHandler):
async def prepare(self) -> None: