Skip to content
Snippets Groups Projects
Commit 4c463ef1 authored by Flissi Areski's avatar Flissi Areski
Browse files

Merge branch 'fix_zip' into 'master'

Fix zip

See merge request !6
parents b4148138 150f1dca
No related branches found
No related tags found
1 merge request!6Fix zip
......@@ -27,44 +27,59 @@ def verif_mail(mail):
def extract_files_from_fileitems(fileitems, final_dir, extensions_list=None, unzip=False):
"""
Cette fonction permet l'extraction de fichiers et leur copie vers le répertoire de résultats de l'analyse (dossier 'result/{run_id}').
Ces fichiers ont été fournie par l'utilisateur, le paramètre 'fileitems' dont correspondre à un objet fileitems (type spécifique associé au téléchargement de fichiers). Ils serviront dans l'analyse de PAMPA.
Cette fonction permet de faire un filtre entre les fichiers dont l'extension est valide (extensions passés en arguments).
Les fichiers peuvent être contenus dans un fichier ZIP.
Gère les fichiers uploadés (dont ZIP) et les copie dans le dossier cible.
Args:
fileitems (list): Liste de fichiers type Werkzeug FileStorage
final_dir (str): Dossier de destination
extensions_list (list): Extensions autorisées (ex: ['.txt', '.csv'])
unzip (bool): Auto-extraction des ZIP si True
Returns:
tuple: (fichiers transférés, fichiers rejetés)
"""
files_moved = []
files_not_moved = []
final_dir = final_dir.rstrip('/')
if not os.path.exists(final_dir):
os.mkdir(final_dir)
os.makedirs(final_dir, exist_ok=True)
if extensions_list:
extensions = "|".join(extensions_list)
for f in fileitems:
fn = os.path.basename(f.filename)
re_extension = re.compile(os.path.splitext(fn)[1], re.IGNORECASE) # for extension check
re_extension = re.compile(os.path.splitext(fn)[1], re.IGNORECASE)
if unzip and re_extension.search(".zip"):
with zipfile.ZipFile(f.file, "r") as myzip:
for element in myzip.filelist:
fname = element.filename
re_extension = re.compile(os.path.splitext(fname)[1], re.IGNORECASE) # for extension check
if (extensions_list is None or re_extension.search(extensions)) and not os.path.exists(final_dir + '/' + fname):
open(final_dir + '/' + fname, 'wb').write(myzip.open(fname, "r").read()) # cp file
for element in myzip.infolist():
fname = os.path.basename(element.filename)
if element.is_dir():
continue
re_extension = re.compile(os.path.splitext(fname)[1], re.IGNORECASE)
if (extensions_list is None or re_extension.search(extensions)) and not os.path.exists(
os.path.join(final_dir, fname)):
with myzip.open(element.filename, "r") as src, open(os.path.join(final_dir, fname),
'wb') as dst:
dst.write(src.read())
files_moved.append(fname)
elif fname[-1] == "/":
pass # if a directory is in the zipfile.filelist: pass (no recursivity)
else:
files_not_moved.append(fname)
elif (extensions_list is None or re_extension.search(extensions)) and not os.path.exists(final_dir + '/' + fn):
open(final_dir + '/' + fn, 'wb').write(f.file.read()) # cp file
elif (extensions_list is None or re_extension.search(extensions)) and not os.path.exists(
os.path.join(final_dir, fn)):
with open(os.path.join(final_dir, fn), 'wb') as dst:
dst.write(f.file.read())
files_moved.append(fn)
else:
files_not_moved.append(fn)
return files_moved, files_not_moved
def extract_files_from_list(filename_list, final_dir):
"""
Déplacement de fichiers internes.
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please to comment