Optimize CSV to XML generation and validation

Refactor CSV to XML generator with improved structure and validation.
This commit is contained in:
jfontaine35 2025-11-19 14:30:46 +01:00 committed by GitHub
parent 8beb298a0c
commit fedb0b3c7a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 356 additions and 146 deletions

View File

@ -1,159 +1,369 @@
"""
Générateur XML pour certifications CPF - Version Optimisée
Génère un fichier XML conforme au schéma urn:cdc:cpf:pc5:schema:1.0.0
"""
import xml.etree.ElementTree as ET
from xml.dom import minidom
import csv
import random
import logging
import datetime
import subprocess
import pytz
import os
import os
from pathlib import Path
from typing import Optional, List, Dict
import sys
# Emplacement du fichier CSV et XML
csv_file = "Dico2.csv"
xml_file = "file.xml"
tz_paris = pytz.timezone('Europe/Paris')
#Efface la console
def clear_console():
os.system('cls' if os.name == 'nt' else 'clear')
clear_console()
# Configuration
CSV_FILE = "Data.csv"
XML_FILE = "file.xml"
XSD_FILE = "validation.xsd"
TZ_PARIS = pytz.timezone('Europe/Paris')
# Configuration du logging
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('xml_generation.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def indent(elem, level=0):
""" Ajoute des indentations aux éléments pour une meilleure lisibilité du fichier XML. """
i = "\n" + level * " "
if len(elem):
if not elem.text or not elem.text.strip():
elem.text = i + " "
if not elem.tail or not elem.tail.strip():
elem.tail = i
for elem in elem:
indent(elem, level + 1)
if not elem.tail or not elem.tail.strip():
elem.tail = i
class XMLValidator:
"""Classe pour valider le XML généré contre le XSD"""
@staticmethod
def validate_xml(xml_file: str, xsd_file: str) -> bool:
"""Valide un fichier XML contre un schéma XSD"""
try:
from lxml import etree
# Charger le schéma XSD
with open(xsd_file, 'rb') as f:
schema_root = etree.XML(f.read())
schema = etree.XMLSchema(schema_root)
# Charger le fichier XML
with open(xml_file, 'rb') as f:
xml_doc = etree.parse(f)
# Valider
is_valid = schema.validate(xml_doc)
if not is_valid:
logger.error("Erreurs de validation XSD:")
for error in schema.error_log:
logger.error(f" Ligne {error.line}: {error.message}")
return False
logger.info("✓ Le fichier XML est valide selon le schéma XSD")
return True
except ImportError:
logger.warning("Module lxml non disponible. Installation recommandée: pip install lxml")
return None
except Exception as e:
logger.error(f"Erreur lors de la validation: {e}")
return False
class CPFXMLGenerator:
"""Générateur de fichiers XML pour les certifications CPF"""
def __init__(self, csv_filepath: str, xml_filepath: str):
self.csv_filepath = Path(csv_filepath)
self.xml_filepath = Path(xml_filepath)
self.namespace = "urn:cdc:cpf:pc5:schema:1.0.0"
self.xsi_namespace = "http://www.w3.org/2001/XMLSchema-instance"
def validate_csv_structure(self, rows: List[List[str]]) -> bool:
"""Valide la structure du fichier CSV"""
if len(rows) < 6:
logger.error("Le fichier CSV doit contenir au moins 6 lignes")
return False
# Vérifier que la ligne d'en-tête existe
if not rows[5] or len(rows[5]) < 25:
logger.error("La ligne d'en-tête (ligne 6) est incomplète")
return False
return True
def clean_string(self, value: str, max_length: Optional[int] = None) -> str:
"""Nettoie et tronque une chaîne si nécessaire"""
cleaned = value.strip()
if max_length and len(cleaned) > max_length:
logger.warning(f"Valeur tronquée: '{cleaned}' -> '{cleaned[:max_length]}'")
cleaned = cleaned[:max_length]
return cleaned
def format_date(self, date_str: str) -> str:
"""
Convertit une date du format DD/MM/YYYY vers YYYY-MM-DD
Gère aussi les dates déjà au bon format
"""
date_str = date_str.strip()
# Si déjà au format ISO (YYYY-MM-DD)
if len(date_str) == 10 and date_str[4] == '-' and date_str[7] == '-':
return date_str
# Format DD/MM/YYYY
if '/' in date_str:
parts = date_str.split('/')
if len(parts) == 3 and len(parts[2]) == 4:
return f"{parts[2]}-{parts[1].zfill(2)}-{parts[0].zfill(2)}"
logger.warning(f"Format de date non reconnu: {date_str}")
return date_str
def get_current_timestamp(self) -> str:
"""Génère un horodatage au format requis par le XSD"""
now = datetime.datetime.now(TZ_PARIS)
return now.strftime("%Y-%m-%dT%H:%M:%S+01:00")
def create_element(self, parent: ET.Element, tag: str, text: Optional[str] = None,
attrib: Optional[Dict] = None) -> ET.Element:
"""Crée un élément XML avec le namespace cpf"""
full_tag = f"{{{self.namespace}}}{tag}"
elem = ET.SubElement(parent, full_tag, attrib or {})
if text is not None:
elem.text = str(text)
return elem
def add_titulaire(self, parent: ET.Element, row: List[str]) -> None:
"""Ajoute les informations du titulaire"""
titulaire = self.create_element(parent, "titulaire")
# Données obligatoires
self.create_element(titulaire, "nomNaissance", self.clean_string(row[17], 60))
# nomUsage (nillable)
if row[18].strip() and row[18].strip().lower() != 'nil':
self.create_element(titulaire, "nomUsage", self.clean_string(row[18], 60))
else:
self.create_element(titulaire, "nomUsage", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
self.create_element(titulaire, "prenom1", self.clean_string(row[19], 60))
self.create_element(titulaire, "anneeNaissance", self.clean_string(row[20]))
# moisNaissance (nillable)
if row[21].strip() and row[21].strip().lower() != 'nil':
self.create_element(titulaire, "moisNaissance", self.clean_string(row[21]))
else:
self.create_element(titulaire, "moisNaissance", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
# jourNaissance (nillable)
if row[22].strip() and row[22].strip().lower() != 'nil':
self.create_element(titulaire, "jourNaissance", self.clean_string(row[22]))
else:
self.create_element(titulaire, "jourNaissance", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
self.create_element(titulaire, "sexe", self.clean_string(row[23]))
# Code commune naissance
code_commune = self.create_element(titulaire, "codeCommuneNaissance")
code_postal_elem = self.create_element(code_commune, "codePostalNaissance")
if row[24].strip() and row[24].strip().lower() != 'nil':
self.create_element(code_postal_elem, "codePostal", self.clean_string(row[24], 9))
else:
self.create_element(code_postal_elem, "codePostal", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
def add_passage_certification(self, parent: ET.Element, row: List[str]) -> None:
"""Ajoute un passage de certification"""
passage = self.create_element(parent, "passageCertification")
# Données obligatoires
self.create_element(passage, "idTechnique", self.clean_string(row[7], 255))
self.create_element(passage, "obtentionCertification", row[8].upper())
self.create_element(passage, "donneeCertifiee", row[9].lower())
# Dates
self.create_element(passage, "dateDebutValidite", self.format_date(row[10]))
# dateFinValidite (nillable)
if row[11].strip().lower() == 'nil':
self.create_element(passage, "dateFinValidite", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
else:
self.create_element(passage, "dateFinValidite", self.format_date(row[11]))
# Niveaux européens
self.create_element(passage, "presenceNiveauLangueEuro", row[12].lower())
self.create_element(passage, "presenceNiveauNumeriqueEuro", row[13].lower())
# Scoring (nillable)
if row[14].strip().lower() == 'nil':
self.create_element(passage, "scoring", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
else:
self.create_element(passage, "scoring", self.clean_string(row[14], 255))
# Mention validée (nillable)
if row[15].strip().lower() == 'nil':
self.create_element(passage, "mentionValidee", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
else:
self.create_element(passage, "mentionValidee", row[15].upper())
# Modalités inscription
modalites = self.create_element(passage, "modalitesInscription")
self.create_element(modalites, "modaliteAcces", row[16].upper())
# Identification titulaire
identification = self.create_element(passage, "identificationTitulaire")
self.add_titulaire(identification, row)
def generate_xml(self) -> bool:
"""Génère le fichier XML à partir du CSV"""
try:
logger.info(f"Lecture du fichier CSV: {self.csv_filepath}")
# Vérifier l'existence du fichier
if not self.csv_filepath.exists():
logger.error(f"Fichier CSV introuvable: {self.csv_filepath}")
return False
# Lire le CSV
with open(self.csv_filepath, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
rows = list(reader)
# Valider la structure
if not self.validate_csv_structure(rows):
return False
logger.info("Création de la structure XML...")
# Créer l'élément racine
ET.register_namespace('cpf', self.namespace)
ET.register_namespace('xsi', self.xsi_namespace)
root = ET.Element(
f"{{{self.namespace}}}flux",
attrib={
f"{{{self.xsi_namespace}}}schemaLocation":
f"{self.namespace} validation.xsd"
}
)
# En-tête
headers = rows[5]
self.create_element(root, "idFlux", self.clean_string(headers[2], 50))
self.create_element(root, "horodatage", self.get_current_timestamp())
# Émetteur
emetteur = self.create_element(root, "emetteur")
self.create_element(emetteur, "idClient", self.clean_string(headers[3], 8))
# Certificateurs
certificateurs = self.create_element(emetteur, "certificateurs")
certificateur = self.create_element(certificateurs, "certificateur")
self.create_element(certificateur, "idClient", self.clean_string(headers[4], 8))
self.create_element(certificateur, "idContrat", self.clean_string(headers[5], 20))
# Certifications
certifications = self.create_element(certificateur, "certifications")
certification = self.create_element(certifications, "certification")
self.create_element(certification, "type", self.clean_string(headers[6], 255))
self.create_element(certification, "code", self.clean_string(headers[7], 100))
# Passages de certification
passages = self.create_element(certification, "passageCertifications")
# Traiter chaque ligne de données
data_rows = [row for row in rows[6:] if row and row[0].strip()]
logger.info(f"Traitement de {len(data_rows)} certifications...")
for idx, row in enumerate(data_rows, start=1):
try:
logger.info(f" [{idx}/{len(data_rows)}] Ajout: {row[19]} {row[17]}")
self.add_passage_certification(passages, row)
except Exception as e:
logger.error(f"Erreur ligne {idx + 6}: {e}")
continue
# Formater et sauvegarder
logger.info(f"Écriture du fichier XML: {self.xml_filepath}")
xml_str = ET.tostring(root, encoding='utf-8')
# Indentation avec minidom
dom = minidom.parseString(xml_str)
pretty_xml = dom.toprettyxml(indent=" ", encoding='utf-8')
# Supprimer les lignes vides
lines = [line for line in pretty_xml.decode('utf-8').split('\n') if line.strip()]
with open(self.xml_filepath, 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))
logger.info("✓ Fichier XML créé avec succès")
return True
except Exception as e:
logger.error(f"Erreur lors de la génération XML: {e}", exc_info=True)
return False
def main():
"""Fonction principale"""
# Effacer la console
os.system('cls' if os.name == 'nt' else 'clear')
print("=" * 70)
print("GÉNÉRATEUR XML CPF - Version Professionnelle")
print("=" * 70)
print()
# Générer le XML
generator = CPFXMLGenerator(CSV_FILE, XML_FILE)
if not generator.generate_xml():
logger.error("❌ Échec de la génération du fichier XML")
sys.exit(1)
print()
print("-" * 70)
print("VALIDATION DU FICHIER XML")
print("-" * 70)
# Valider le XML
validator = XMLValidator()
if Path(XSD_FILE).exists():
validation_result = validator.validate_xml(XML_FILE, XSD_FILE)
if validation_result is True:
print()
print("" * 35)
print("SUCCESS: Le fichier XML est conforme au schéma XSD")
print("" * 35)
elif validation_result is False:
print()
print("" * 35)
print("ATTENTION: Le fichier XML contient des erreurs de validation")
print("Consultez les logs ci-dessus pour plus de détails")
print("" * 35)
else:
print()
print("" * 35)
print("INFO: Validation XSD non effectuée (module lxml non installé)")
print("Installez lxml pour activer la validation: pip install lxml")
print("" * 35)
else:
if level and (not elem.tail or not elem.tail.strip()):
elem.tail = i
# Fonction pour lire les données CSV et générer le fichier XML
def create_xml_from_csv(csv_filepath, xml_filepath):
logging.info("Ouverture du CSV...")
# Ouvrir le fichier CSV pour la lecture
with open(csv_filepath, newline='', encoding='utf-8') as csvfile:
reader = csv.reader(csvfile)
Allrows = list(reader)
logger.warning(f"Fichier XSD non trouvé: {XSD_FILE}")
print()
print("" * 35)
print("ATTENTION: Fichier XSD non trouvé - validation impossible")
print("" * 35)
# Vérification de la présence des données nécessaires
if len(Allrows) < 5:
logging.error("Les données CSV requises sont manquantes.")
exit()
headers = Allrows[4][2:] # Sauter les deux premiers en-têtes
logging.info("Création de l'arborescence du fichier XML...")
# Créer l'élément racine avec l'espace de noms 'cpf'
root = ET.Element("cpf:flux")
root.set("xmlns:cpf", "urn:cdc:cpf:pc5:schema:1.0.0")
root.set("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance")
#Reglage du fuseau horaire
now = datetime.datetime.now(tz_paris)
value = now.strftime("%Y-%m-%dT%H:%M:%S+01:00")
ET.SubElement(root, "cpf:idFlux").text = headers[0]
#str(random.randint(1000000000, 99999999999))
ET.SubElement(root, "cpf:horodatage").text = value
# Création et ajout de l'élément emetteur
emetteur = ET.SubElement(root, "cpf:emetteur")
logging.info("Emmetteur du fchier ...")
# Création de la structure des certificateurs
idClient = str(random.randint(10000000, 99999999)) if Allrows[5][2] == '' else Allrows[5][2]
ET.SubElement(emetteur, "cpf:idClient").text = idClient
certificateurs = ET.SubElement(emetteur, "cpf:certificateurs")
certificateur = ET.SubElement(certificateurs, "cpf:certificateur")
logging.info(f"Certificateur {idClient}...")
# Traitement pour NumClient pour le certificateur
idClient2 = str(random.randint(10000000, 99999999)) if Allrows[5][3] == '' else Allrows[5][3]
ET.SubElement(certificateur, "cpf:idClient").text = idClient2
# Traitement pour NumContrat pour le certificateur
idContrat = str(random.randint(10000000, 99999999)) if Allrows[5][4] == '' else Allrows[5][4]
ET.SubElement(certificateur, "cpf:idContrat").text = idContrat
# Création de la structure des certifications
certifications = ET.SubElement(certificateur, "cpf:certifications")
certification = ET.SubElement(certifications, "cpf:certification")
ET.SubElement(certification, "cpf:type").text = headers[5]
ET.SubElement(certification, "cpf:code").text = headers[6]
# Ajout d'un seul passage de certification
passage_certifications = ET.SubElement(certification, "cpf:passageCertifications")
# Itérer sur chaque ligne du fichier CSV
for row in Allrows[5:]:
# Sauter les lignes vides
if not row[0].strip():
continue
#Ajout des certifications
logging.info(f"Ajout des certifications : {row[7]}")
passage_certification = ET.SubElement(passage_certifications, "cpf:passageCertification")
ET.SubElement(passage_certification, "cpf:idTechnique").text = row[7]
ET.SubElement(passage_certification, "cpf:obtentionCertification").text = row[8]
ET.SubElement(passage_certification, "cpf:donneeCertifiee").text = row[9]
ET.SubElement(passage_certification, "cpf:dateDebutValidite").text = row[10]
# Vérification si la date de fin de validité est 'nil;'
if row[11].strip().lower() == 'nil':
ET.SubElement(passage_certification, "cpf:dateFinValidite", {"xsi:nil": "true"})
else:
ET.SubElement(passage_certification, "cpf:dateFinValidite").text = row[11]
ET.SubElement(passage_certification, "cpf:presenceNiveauLangueEuro").text = row[12]
ET.SubElement(passage_certification, "cpf:presenceNiveauNumeriqueEuro").text = row[13]
if row[14].strip().lower() == 'nil':
ET.SubElement(passage_certification, "cpf:scoring", {"xsi:nil": "true"})
else:
ET.SubElement(passage_certification, "cpf:scoring").text = row[14]
if row[15].strip().lower() == 'nil':
ET.SubElement(passage_certification, "cpf:mentionValidee", {"xsi:nil": "true"})
else:
ET.SubElement(passage_certification, "cpf:mentionValidee").text = row[15]
# Modalite inscription
modalites_inscription = ET.SubElement(passage_certification, "cpf:modalitesInscription")
ET.SubElement(modalites_inscription, "cpf:modaliteAcces").text = row[16]
#Identification Titulaire
identification_titulaire = ET.SubElement(passage_certification, "cpf:identificationTitulaire")
titulaire = ET.SubElement(identification_titulaire, "cpf:titulaire")
logging.info(f"Titulaire : {row[18]} {row[19]}")
ET.SubElement(titulaire, "cpf:nomNaissance").text = row[17]
ET.SubElement(titulaire, "cpf:nomUsage").text = row[18]
ET.SubElement(titulaire, "cpf:prenom1").text = row[19]
ET.SubElement(titulaire, "cpf:anneeNaissance").text = row[20]
ET.SubElement(titulaire, "cpf:moisNaissance").text = row[21]
ET.SubElement(titulaire, "cpf:jourNaissance").text = row[22]
ET.SubElement(titulaire, "cpf:sexe").text = row[23]
code_commune_naissance = ET.SubElement(titulaire, "cpf:codeCommuneNaissance")
code_postal_naissance = ET.SubElement(code_commune_naissance, "cpf:codePostalNaissance")
ET.SubElement(code_postal_naissance, "cpf:codePostal").text = row[24]
print()
print(f"Fichier généré: {XML_FILE}")
print(f"Logs disponibles: xml_generation.log")
# Appliquer l'indentation
indent(root)
# Enregistrement du fichier XML
logging.info(f"Ecriture du XML...")
tree = ET.ElementTree(root)
with open(xml_filepath, "wb") as file:
tree.write(file, encoding="utf-8", xml_declaration=True)
# Appel de la fonction pour créer le fichier XML
create_xml_from_csv(csv_file, xml_file)
logging.info("Fichier XML créé avec succès.")
if __name__ == "__main__":
main()