Source code for ESSArch_Core.fixity.validation.backends.structure

import logging
import os
import traceback
from os import walk

from django.utils import timezone
from glob2 import glob, iglob

from ESSArch_Core.exceptions import ValidationError
from ESSArch_Core.fixity.models import Validation
from ESSArch_Core.fixity.validation.backends.base import BaseValidator
from ESSArch_Core.util import normalize_path


[docs] class StructureValidator(BaseValidator): """ Validates that the directory has all the required files and no invalid extensions. The ``tree`` option specifies a set of rules for directories: * ``type`` must be either ``root`` or ``folder`` and is only used to specify if we are at the root or not. If ``folder`` is specified then we also require the ``name`` option to be specified. * ``name`` tells us which folder is being described * ``required_files`` is a list with files that must be in the directory * ``valid_paths`` is a list of strings and/or lists that are allowed in the directory. Wildcards can be specified using ``*``. Inner lists are used to specify valid file groups. E.g. ``[['*.doc', '*.pdf']]`` says that for each .doc file there must be a .pdf file with the same name. """ file_validator = False def update_required_files(self, rel_dir, filename, required_files): if rel_dir == '.': rel_file = filename else: rel_file = os.path.join(rel_dir, filename) required_files.remove(rel_file) def in_valid_paths(self, root, path, valid_paths): for valid_path in [p for p in valid_paths if isinstance(p, str)]: if path in list(map(normalize_path, glob(valid_path))): return True for valid_path in [p for p in valid_paths if not isinstance(p, str)]: for nested_valid_path in valid_path: for found_nested_path, matches in iglob(nested_valid_path, with_matches=True): found_nested_path = normalize_path(found_nested_path) if found_nested_path == path: # check matches matches = map(normalize_path, matches) for match in matches: for related_path in valid_path: if related_path != found_nested_path: related_path = related_path.replace('*', match, 1) if not os.path.isfile(related_path): rel_path = normalize_path(os.path.relpath(path, root)) rel_related_path = normalize_path(os.path.relpath(related_path, root)) raise ValidationError('{file} missing related file {related}'.format( file=rel_path, related=rel_related_path )) return True raise ValidationError('{file} is not allowed'.format(file=path)) def validate_folder(self, path, node): valid_paths = node.get('valid_paths', []) allow_empty = node.get('allow_empty', True) required_files = list(map(normalize_path, [req.format(**self.data) for req in node.get('required_files', [])])) file_count = 0 for idx, valid in enumerate(valid_paths): if isinstance(valid, str): valid_paths[idx] = normalize_path(os.path.join(path, valid).format(**self.data)) else: for nested_idx, nested_valid in enumerate(valid): valid[nested_idx] = normalize_path(os.path.join(path, nested_valid).format(**self.data)) for root, _dirs, files in walk(path): for f in files: file_count += 1 if len(valid_paths): try: self.in_valid_paths(path, normalize_path(os.path.join(root, f)), valid_paths) except ValidationError as validation_exc: try: self.update_required_files(os.path.relpath(root, path), f, required_files) except ValueError: raise validation_exc if len(required_files): try: self.update_required_files(os.path.relpath(root, path), f, required_files) except ValueError: pass if not allow_empty and file_count == 0: raise ValidationError('{path} is not allowed to be empty'.format(path=path)) if len(required_files): raise ValidationError('Missing {files} in {path}'.format(files=','.join(required_files), path=path)) def validate(self, filepath, expected=None): logger = logging.getLogger('essarch.fixity.validation.structure') root = self.options.get('tree', []) filepath = normalize_path(filepath) logger.debug("Validating structure of %s" % filepath) val_obj = Validation.objects.create( filename=filepath, time_started=timezone.now(), validator=self.__class__.__name__, required=self.required, task=self.task, information_package=self.ip, responsible=self.responsible, specification={ 'context': self.context, 'options': self.options, } ) passed = False try: for node in root: if node['type'] == 'root': self.validate_folder(filepath, node) elif node['type'] == 'folder': self.validate_folder(os.path.join(filepath, node['name']), node) passed = True except Exception: logger.exception("Structure validation of {} failed".format(filepath)) val_obj.message = traceback.format_exc() raise else: message = "Successful structure validation of %s" % filepath val_obj.message = message logger.info(message) finally: val_obj.time_done = timezone.now() val_obj.passed = passed val_obj.save(update_fields=['time_done', 'passed', 'message']) return message