|
1 | 1 | import os
|
2 | 2 |
|
3 |
| -from django.core.files.storage import DefaultStorage |
4 | 3 | from django.core.management.base import BaseCommand
|
5 | 4 | from django.utils.module_loading import import_string
|
6 | 5 |
|
7 |
| -from PIL import UnidentifiedImageError |
8 |
| - |
9 | 6 | from filer import settings as filer_settings
|
| 7 | +from filer.models.filemodels import File |
10 | 8 | from filer.utils.loader import load_model
|
11 | 9 |
|
| 10 | +from PIL import UnidentifiedImageError |
| 11 | + |
12 | 12 |
|
13 | 13 | class Command(BaseCommand):
|
14 |
| - help = "Look for orphaned files in media folders." |
15 |
| - storage = DefaultStorage() |
16 |
| - prefix = filer_settings.FILER_STORAGES['public']['main']['UPLOAD_TO_PREFIX'] |
| 14 | + help = "Check for orphaned files, missing file references, and set image dimensions." |
17 | 15 |
|
18 | 16 | def add_arguments(self, parser):
|
19 | 17 | parser.add_argument(
|
20 | 18 | '--orphans',
|
21 | 19 | action='store_true',
|
22 | 20 | dest='orphans',
|
23 | 21 | default=False,
|
24 |
| - help="Walk through the media folders and look for orphaned files.", |
| 22 | + help="Scan media folders for orphaned files.", |
25 | 23 | )
|
26 | 24 | parser.add_argument(
|
27 | 25 | '--delete-orphans',
|
28 | 26 | action='store_true',
|
29 | 27 | dest='delete_orphans',
|
30 | 28 | default=False,
|
31 |
| - help="Delete orphaned files from their media folders.", |
| 29 | + help="Delete orphaned files from storage.", |
32 | 30 | )
|
33 | 31 | parser.add_argument(
|
34 | 32 | '--missing',
|
35 | 33 | action='store_true',
|
36 | 34 | dest='missing',
|
37 | 35 | default=False,
|
38 |
| - help="Verify media folders and report about missing files.", |
| 36 | + help="Check file references and report missing files.", |
39 | 37 | )
|
40 | 38 | parser.add_argument(
|
41 | 39 | '--delete-missing',
|
42 | 40 | action='store_true',
|
43 | 41 | dest='delete_missing',
|
44 | 42 | default=False,
|
45 |
| - help="Delete references in database if files are missing in media folder.", |
| 43 | + help="Delete database entries if files are missing in the media folder.", |
46 | 44 | )
|
47 | 45 | parser.add_argument(
|
48 | 46 | '--image-dimensions',
|
49 | 47 | action='store_true',
|
50 | 48 | dest='image_dimensions',
|
51 | 49 | default=False,
|
52 |
| - help="Look for images without dimensions set, set them accordingly.", |
| 50 | + help="Set image dimensions if they are not set.", |
53 | 51 | )
|
54 | 52 | parser.add_argument(
|
55 | 53 | '--noinput',
|
56 | 54 | '--no-input',
|
57 | 55 | action='store_false',
|
58 | 56 | dest='interactive',
|
59 | 57 | default=True,
|
60 |
| - help="Do NOT prompt the user for input of any kind." |
| 58 | + help="Do not prompt the user for any interactive input.", |
61 | 59 | )
|
62 | 60 |
|
63 | 61 | def handle(self, *args, **options):
|
64 | 62 | if options['missing']:
|
65 | 63 | self.verify_references(options)
|
66 | 64 | if options['delete_missing']:
|
67 | 65 | if options['interactive']:
|
68 |
| - msg = "\nThis will delete entries from your database. Are you sure you want to do this?\n\n" \ |
69 |
| - "Type 'yes' to continue, or 'no' to cancel: " |
70 |
| - if input(msg) != 'yes': |
71 |
| - self.stdout.write("Aborted: Delete missing file entries from database.") |
| 66 | + if input( |
| 67 | + "\nThis will delete missing file references from the database.\n" |
| 68 | + "Type 'yes' to continue, or 'no' to cancel: " |
| 69 | + ) != 'yes': |
| 70 | + self.stdout.write("Aborted: Missing file references were not deleted.\n") |
| 71 | + self.stdout.flush() |
72 | 72 | return
|
73 | 73 | self.verify_references(options)
|
74 | 74 |
|
75 |
| - if options['orphans']: |
76 |
| - self.verify_storages(options) |
77 |
| - if options['delete_orphans']: |
78 |
| - if options['interactive']: |
79 |
| - msg = "\nThis will delete orphaned files from your storage. Are you sure you want to do this?\n\n" \ |
80 |
| - "Type 'yes' to continue, or 'no' to cancel: " |
81 |
| - if input(msg) != 'yes': |
82 |
| - self.stdout.write("Aborted: Delete orphaned files from storage.") |
| 75 | + if options['orphans'] or options['delete_orphans']: |
| 76 | + if options['delete_orphans'] and options['interactive']: |
| 77 | + if input( |
| 78 | + "\nThis will delete orphaned files from storage.\n" |
| 79 | + "Type 'yes' to continue, or 'no' to cancel: " |
| 80 | + ) != 'yes': |
| 81 | + self.stdout.write("Aborted: Orphaned files were not deleted.\n") |
| 82 | + self.stdout.flush() |
83 | 83 | return
|
84 | 84 | self.verify_storages(options)
|
| 85 | + |
85 | 86 | if options['image_dimensions']:
|
86 | 87 | self.image_dimensions(options)
|
87 | 88 |
|
88 | 89 | def verify_references(self, options):
|
89 |
| - from filer.models.filemodels import File |
90 |
| - |
| 90 | + """ |
| 91 | + Checks that every file reference in the database exists in storage. |
| 92 | + If a file is missing, either report it or delete the reference based on the provided options. |
| 93 | + """ |
91 | 94 | for file in File.objects.all():
|
92 | 95 | if not file.file.storage.exists(file.file.name):
|
93 | 96 | if options['delete_missing']:
|
94 | 97 | file.delete()
|
95 |
| - msg = "Delete missing file reference '{}/{}' from database." |
| 98 | + verbose_msg = f"Deleted missing file reference '{file.folder}/{file}' from the database." |
96 | 99 | else:
|
97 |
| - msg = "Referenced file '{}/{}' is missing in media folder." |
98 |
| - if options['verbosity'] > 2: |
99 |
| - self.stdout.write(msg.format(str(file.folder), str(file))) |
100 |
| - elif options['verbosity']: |
101 |
| - self.stdout.write(os.path.join(str(file.folder), str(file))) |
| 100 | + verbose_msg = f"File reference '{file.folder}/{file}' is missing in storage." |
| 101 | + if options.get('verbosity', 1) > 2: |
| 102 | + self.stdout.write(verbose_msg + "\n") |
| 103 | + self.stdout.flush() |
| 104 | + elif options.get('verbosity'): |
| 105 | + self.stdout.write(os.path.join(str(file.folder), str(file)) + "\n") |
| 106 | + self.stdout.flush() |
102 | 107 |
|
103 | 108 | def verify_storages(self, options):
|
104 |
| - from filer.models.filemodels import File |
105 |
| - |
106 |
| - def walk(prefix): |
| 109 | + """ |
| 110 | + Scans all storages defined in FILER_STORAGES (e.g., public and private) |
| 111 | + for orphaned files, then reports or deletes them based on the options. |
| 112 | + """ |
| 113 | + |
| 114 | + def walk(storage, prefix, label_prefix): |
| 115 | + # If the directory does not exist, there is nothing to scan |
| 116 | + if not storage.exists(prefix): |
| 117 | + return |
107 | 118 | child_dirs, files = storage.listdir(prefix)
|
108 | 119 | for filename in files:
|
109 |
| - relfilename = os.path.join(prefix, filename) |
110 |
| - if not File.objects.filter(file=relfilename).exists(): |
| 120 | + actual_path = os.path.join(prefix, filename) |
| 121 | + relfilename = os.path.join(label_prefix, filename) |
| 122 | + if not File.objects.filter(file=actual_path).exists(): |
111 | 123 | if options['delete_orphans']:
|
112 |
| - storage.delete(relfilename) |
113 |
| - msg = "Deleted orphaned file '{}'" |
| 124 | + storage.delete(actual_path) |
| 125 | + message = f"Deleted orphaned file '{relfilename}'" |
114 | 126 | else:
|
115 |
| - msg = "Found orphaned file '{}'" |
116 |
| - if options['verbosity'] > 2: |
117 |
| - self.stdout.write(msg.format(relfilename)) |
118 |
| - elif options['verbosity']: |
119 |
| - self.stdout.write(relfilename) |
120 |
| - |
| 127 | + message = f"Found orphaned file '{relfilename}'" |
| 128 | + if options.get('verbosity', 1) > 2: |
| 129 | + self.stdout.write(message + "\n") |
| 130 | + self.stdout.flush() |
| 131 | + elif options.get('verbosity'): |
| 132 | + self.stdout.write(relfilename + "\n") |
| 133 | + self.stdout.flush() |
121 | 134 | for child in child_dirs:
|
122 |
| - walk(os.path.join(prefix, child)) |
123 |
| - |
124 |
| - filer_public = filer_settings.FILER_STORAGES['public']['main'] |
125 |
| - storage = import_string(filer_public['ENGINE'])() |
126 |
| - walk(filer_public['UPLOAD_TO_PREFIX']) |
| 135 | + walk(storage, os.path.join(prefix, child), os.path.join(label_prefix, child)) |
| 136 | + |
| 137 | + # Loop through each storage configuration (e.g., public, private, etc.) |
| 138 | + for storage_name, storage_config in filer_settings.FILER_STORAGES.items(): |
| 139 | + storage_settings = storage_config.get('main') |
| 140 | + if not storage_settings: |
| 141 | + continue |
| 142 | + storage = import_string(storage_settings['ENGINE'])() |
| 143 | + if storage_settings.get('OPTIONS', {}).get('location'): |
| 144 | + storage.location = storage_settings['OPTIONS']['location'] |
| 145 | + # Set label_prefix: for public and private storages, use their names. |
| 146 | + label_prefix = storage_name if storage_name in ['public', 'private'] else storage_settings.get('UPLOAD_TO_PREFIX', '') |
| 147 | + walk(storage, storage_settings.get('UPLOAD_TO_PREFIX', ''), label_prefix) |
127 | 148 |
|
128 | 149 | def image_dimensions(self, options):
|
| 150 | + """ |
| 151 | + For images without set dimensions (_width == 0 or None), try to read their dimensions |
| 152 | + and save them, handling SVG files and possible image errors. |
| 153 | + """ |
129 | 154 | from django.db.models import Q
|
130 |
| - |
131 | 155 | import easy_thumbnails
|
132 | 156 | from easy_thumbnails.VIL import Image as VILImage
|
133 |
| - |
134 | 157 | from filer.utils.compatibility import PILImage
|
135 | 158 |
|
136 |
| - no_dimensions = load_model(filer_settings.FILER_IMAGE_MODEL).objects.filter( |
137 |
| - Q(_width=0) | Q(_width__isnull=True) |
138 |
| - ) |
139 |
| - self.stdout.write(f"trying to set dimensions on {no_dimensions.count()} files") |
140 |
| - for image in no_dimensions: |
141 |
| - if image.file_ptr: |
142 |
| - file_holder = image.file_ptr |
143 |
| - else: |
144 |
| - file_holder = image |
| 159 | + ImageModel = load_model(filer_settings.FILER_IMAGE_MODEL) |
| 160 | + images_without_dimensions = ImageModel.objects.filter(Q(_width=0) | Q(_width__isnull=True)) |
| 161 | + self.stdout.write(f"Setting dimensions for {images_without_dimensions.count()} images" + "\n") |
| 162 | + self.stdout.flush() |
| 163 | + for image in images_without_dimensions: |
| 164 | + file_holder = image.file_ptr if getattr(image, 'file_ptr', None) else image |
145 | 165 | try:
|
146 | 166 | imgfile = file_holder.file
|
147 | 167 | imgfile.seek(0)
|
148 |
| - except (FileNotFoundError): |
149 |
| - pass |
| 168 | + except FileNotFoundError: |
| 169 | + continue |
| 170 | + if image.file.name.lower().endswith('.svg'): |
| 171 | + # For SVG files, use VILImage (invalid SVGs do not throw errors) |
| 172 | + with VILImage.load(imgfile) as vil_image: |
| 173 | + image._width, image._height = vil_image.size |
150 | 174 | else:
|
151 |
| - if image.file.name.lower().endswith('.svg'): |
152 |
| - with VILImage.load(imgfile) as vil_image: |
153 |
| - # invalid svg doesnt throw errors |
154 |
| - image._width, image._height = vil_image.size |
155 |
| - else: |
156 |
| - try: |
157 |
| - with PILImage.open(imgfile) as pil_image: |
158 |
| - image._width, image._height = pil_image.size |
159 |
| - image._transparent = easy_thumbnails.utils.is_transparent(pil_image) |
160 |
| - except UnidentifiedImageError: |
161 |
| - continue |
162 |
| - image.save() |
163 |
| - return |
| 175 | + try: |
| 176 | + with PILImage.open(imgfile) as pil_image: |
| 177 | + image._width, image._height = pil_image.size |
| 178 | + image._transparent = easy_thumbnails.utils.is_transparent(pil_image) |
| 179 | + except UnidentifiedImageError: |
| 180 | + continue |
| 181 | + image.save() |
0 commit comments