370 lines
14 KiB
Python
370 lines
14 KiB
Python
"""
|
||
Générateur XML pour certifications CPF - Version Optimisée
|
||
Génère un fichier XML conforme au schéma urn:cdc:cpf:pc5:schema:1.0.0
|
||
"""
|
||
|
||
import xml.etree.ElementTree as ET
|
||
from xml.dom import minidom
|
||
import csv
|
||
import logging
|
||
import datetime
|
||
import pytz
|
||
import os
|
||
from pathlib import Path
|
||
from typing import Optional, List, Dict
|
||
import sys
|
||
|
||
# Configuration
|
||
CSV_FILE = "Data.csv"
|
||
XML_FILE = "file.xml"
|
||
XSD_FILE = "validation.xsd"
|
||
TZ_PARIS = pytz.timezone('Europe/Paris')
|
||
|
||
# Configuration du logging
|
||
logging.basicConfig(
|
||
level=logging.INFO,
|
||
format='%(asctime)s - %(levelname)s - %(message)s',
|
||
handlers=[
|
||
logging.FileHandler('xml_generation.log', encoding='utf-8'),
|
||
logging.StreamHandler()
|
||
]
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
|
||
class XMLValidator:
|
||
"""Classe pour valider le XML généré contre le XSD"""
|
||
|
||
@staticmethod
|
||
def validate_xml(xml_file: str, xsd_file: str) -> bool:
|
||
"""Valide un fichier XML contre un schéma XSD"""
|
||
try:
|
||
from lxml import etree
|
||
|
||
# Charger le schéma XSD
|
||
with open(xsd_file, 'rb') as f:
|
||
schema_root = etree.XML(f.read())
|
||
schema = etree.XMLSchema(schema_root)
|
||
|
||
# Charger le fichier XML
|
||
with open(xml_file, 'rb') as f:
|
||
xml_doc = etree.parse(f)
|
||
|
||
# Valider
|
||
is_valid = schema.validate(xml_doc)
|
||
|
||
if not is_valid:
|
||
logger.error("Erreurs de validation XSD:")
|
||
for error in schema.error_log:
|
||
logger.error(f" Ligne {error.line}: {error.message}")
|
||
return False
|
||
|
||
logger.info("✓ Le fichier XML est valide selon le schéma XSD")
|
||
return True
|
||
|
||
except ImportError:
|
||
logger.warning("Module lxml non disponible. Installation recommandée: pip install lxml")
|
||
return None
|
||
except Exception as e:
|
||
logger.error(f"Erreur lors de la validation: {e}")
|
||
return False
|
||
|
||
|
||
class CPFXMLGenerator:
|
||
"""Générateur de fichiers XML pour les certifications CPF"""
|
||
|
||
def __init__(self, csv_filepath: str, xml_filepath: str):
|
||
self.csv_filepath = Path(csv_filepath)
|
||
self.xml_filepath = Path(xml_filepath)
|
||
self.namespace = "urn:cdc:cpf:pc5:schema:1.0.0"
|
||
self.xsi_namespace = "http://www.w3.org/2001/XMLSchema-instance"
|
||
|
||
def validate_csv_structure(self, rows: List[List[str]]) -> bool:
|
||
"""Valide la structure du fichier CSV"""
|
||
if len(rows) < 6:
|
||
logger.error("Le fichier CSV doit contenir au moins 6 lignes")
|
||
return False
|
||
|
||
# Vérifier que la ligne d'en-tête existe
|
||
if not rows[5] or len(rows[5]) < 25:
|
||
logger.error("La ligne d'en-tête (ligne 6) est incomplète")
|
||
return False
|
||
|
||
return True
|
||
|
||
def clean_string(self, value: str, max_length: Optional[int] = None) -> str:
|
||
"""Nettoie et tronque une chaîne si nécessaire"""
|
||
cleaned = value.strip()
|
||
if max_length and len(cleaned) > max_length:
|
||
logger.warning(f"Valeur tronquée: '{cleaned}' -> '{cleaned[:max_length]}'")
|
||
cleaned = cleaned[:max_length]
|
||
return cleaned
|
||
|
||
def format_date(self, date_str: str) -> str:
|
||
"""
|
||
Convertit une date du format DD/MM/YYYY vers YYYY-MM-DD
|
||
Gère aussi les dates déjà au bon format
|
||
"""
|
||
date_str = date_str.strip()
|
||
|
||
# Si déjà au format ISO (YYYY-MM-DD)
|
||
if len(date_str) == 10 and date_str[4] == '-' and date_str[7] == '-':
|
||
return date_str
|
||
|
||
# Format DD/MM/YYYY
|
||
if '/' in date_str:
|
||
parts = date_str.split('/')
|
||
if len(parts) == 3 and len(parts[2]) == 4:
|
||
return f"{parts[2]}-{parts[1].zfill(2)}-{parts[0].zfill(2)}"
|
||
|
||
logger.warning(f"Format de date non reconnu: {date_str}")
|
||
return date_str
|
||
|
||
def get_current_timestamp(self) -> str:
|
||
"""Génère un horodatage au format requis par le XSD"""
|
||
now = datetime.datetime.now(TZ_PARIS)
|
||
return now.strftime("%Y-%m-%dT%H:%M:%S+01:00")
|
||
|
||
def create_element(self, parent: ET.Element, tag: str, text: Optional[str] = None,
|
||
attrib: Optional[Dict] = None) -> ET.Element:
|
||
"""Crée un élément XML avec le namespace cpf"""
|
||
full_tag = f"{{{self.namespace}}}{tag}"
|
||
elem = ET.SubElement(parent, full_tag, attrib or {})
|
||
if text is not None:
|
||
elem.text = str(text)
|
||
return elem
|
||
|
||
def add_titulaire(self, parent: ET.Element, row: List[str]) -> None:
|
||
"""Ajoute les informations du titulaire"""
|
||
titulaire = self.create_element(parent, "titulaire")
|
||
|
||
# Données obligatoires
|
||
self.create_element(titulaire, "nomNaissance", self.clean_string(row[17], 60))
|
||
|
||
# nomUsage (nillable)
|
||
if row[18].strip() and row[18].strip().lower() != 'nil':
|
||
self.create_element(titulaire, "nomUsage", self.clean_string(row[18], 60))
|
||
else:
|
||
self.create_element(titulaire, "nomUsage", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
|
||
|
||
self.create_element(titulaire, "prenom1", self.clean_string(row[19], 60))
|
||
self.create_element(titulaire, "anneeNaissance", self.clean_string(row[20]))
|
||
|
||
# moisNaissance (nillable)
|
||
if row[21].strip() and row[21].strip().lower() != 'nil':
|
||
self.create_element(titulaire, "moisNaissance", self.clean_string(row[21]))
|
||
else:
|
||
self.create_element(titulaire, "moisNaissance", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
|
||
|
||
# jourNaissance (nillable)
|
||
if row[22].strip() and row[22].strip().lower() != 'nil':
|
||
self.create_element(titulaire, "jourNaissance", self.clean_string(row[22]))
|
||
else:
|
||
self.create_element(titulaire, "jourNaissance", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
|
||
|
||
self.create_element(titulaire, "sexe", self.clean_string(row[23]))
|
||
|
||
# Code commune naissance
|
||
code_commune = self.create_element(titulaire, "codeCommuneNaissance")
|
||
code_postal_elem = self.create_element(code_commune, "codePostalNaissance")
|
||
|
||
if row[24].strip() and row[24].strip().lower() != 'nil':
|
||
self.create_element(code_postal_elem, "codePostal", self.clean_string(row[24], 9))
|
||
else:
|
||
self.create_element(code_postal_elem, "codePostal", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
|
||
|
||
def add_passage_certification(self, parent: ET.Element, row: List[str]) -> None:
|
||
"""Ajoute un passage de certification"""
|
||
passage = self.create_element(parent, "passageCertification")
|
||
|
||
# Données obligatoires
|
||
self.create_element(passage, "idTechnique", self.clean_string(row[7], 255))
|
||
self.create_element(passage, "obtentionCertification", row[8].upper())
|
||
self.create_element(passage, "donneeCertifiee", row[9].lower())
|
||
|
||
# Dates
|
||
self.create_element(passage, "dateDebutValidite", self.format_date(row[10]))
|
||
|
||
# dateFinValidite (nillable)
|
||
if row[11].strip().lower() == 'nil':
|
||
self.create_element(passage, "dateFinValidite", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
|
||
else:
|
||
self.create_element(passage, "dateFinValidite", self.format_date(row[11]))
|
||
|
||
# Niveaux européens
|
||
self.create_element(passage, "presenceNiveauLangueEuro", row[12].lower())
|
||
self.create_element(passage, "presenceNiveauNumeriqueEuro", row[13].lower())
|
||
|
||
# Scoring (nillable)
|
||
if row[14].strip().lower() == 'nil':
|
||
self.create_element(passage, "scoring", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
|
||
else:
|
||
self.create_element(passage, "scoring", self.clean_string(row[14], 255))
|
||
|
||
# Mention validée (nillable)
|
||
if row[15].strip().lower() == 'nil':
|
||
self.create_element(passage, "mentionValidee", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
|
||
else:
|
||
self.create_element(passage, "mentionValidee", row[15].upper())
|
||
|
||
# Modalités inscription
|
||
modalites = self.create_element(passage, "modalitesInscription")
|
||
self.create_element(modalites, "modaliteAcces", row[16].upper())
|
||
|
||
# Identification titulaire
|
||
identification = self.create_element(passage, "identificationTitulaire")
|
||
self.add_titulaire(identification, row)
|
||
|
||
def generate_xml(self) -> bool:
|
||
"""Génère le fichier XML à partir du CSV"""
|
||
try:
|
||
logger.info(f"Lecture du fichier CSV: {self.csv_filepath}")
|
||
|
||
# Vérifier l'existence du fichier
|
||
if not self.csv_filepath.exists():
|
||
logger.error(f"Fichier CSV introuvable: {self.csv_filepath}")
|
||
return False
|
||
|
||
# Lire le CSV
|
||
with open(self.csv_filepath, 'r', encoding='utf-8') as f:
|
||
reader = csv.reader(f)
|
||
rows = list(reader)
|
||
|
||
# Valider la structure
|
||
if not self.validate_csv_structure(rows):
|
||
return False
|
||
|
||
logger.info("Création de la structure XML...")
|
||
|
||
# Créer l'élément racine
|
||
ET.register_namespace('cpf', self.namespace)
|
||
ET.register_namespace('xsi', self.xsi_namespace)
|
||
|
||
root = ET.Element(
|
||
f"{{{self.namespace}}}flux",
|
||
attrib={
|
||
f"{{{self.xsi_namespace}}}schemaLocation":
|
||
f"{self.namespace} validation.xsd"
|
||
}
|
||
)
|
||
|
||
# En-tête
|
||
headers = rows[5]
|
||
self.create_element(root, "idFlux", self.clean_string(headers[2], 50))
|
||
self.create_element(root, "horodatage", self.get_current_timestamp())
|
||
|
||
# Émetteur
|
||
emetteur = self.create_element(root, "emetteur")
|
||
self.create_element(emetteur, "idClient", self.clean_string(headers[3], 8))
|
||
|
||
# Certificateurs
|
||
certificateurs = self.create_element(emetteur, "certificateurs")
|
||
certificateur = self.create_element(certificateurs, "certificateur")
|
||
|
||
self.create_element(certificateur, "idClient", self.clean_string(headers[4], 8))
|
||
self.create_element(certificateur, "idContrat", self.clean_string(headers[5], 20))
|
||
|
||
# Certifications
|
||
certifications = self.create_element(certificateur, "certifications")
|
||
certification = self.create_element(certifications, "certification")
|
||
|
||
self.create_element(certification, "type", self.clean_string(headers[6], 255))
|
||
self.create_element(certification, "code", self.clean_string(headers[7], 100))
|
||
|
||
# Passages de certification
|
||
passages = self.create_element(certification, "passageCertifications")
|
||
|
||
# Traiter chaque ligne de données
|
||
data_rows = [row for row in rows[6:] if row and row[0].strip()]
|
||
logger.info(f"Traitement de {len(data_rows)} certifications...")
|
||
|
||
for idx, row in enumerate(data_rows, start=1):
|
||
try:
|
||
logger.info(f" [{idx}/{len(data_rows)}] Ajout: {row[19]} {row[17]}")
|
||
self.add_passage_certification(passages, row)
|
||
except Exception as e:
|
||
logger.error(f"Erreur ligne {idx + 6}: {e}")
|
||
continue
|
||
|
||
# Formater et sauvegarder
|
||
logger.info(f"Écriture du fichier XML: {self.xml_filepath}")
|
||
xml_str = ET.tostring(root, encoding='utf-8')
|
||
|
||
# Indentation avec minidom
|
||
dom = minidom.parseString(xml_str)
|
||
pretty_xml = dom.toprettyxml(indent=" ", encoding='utf-8')
|
||
|
||
# Supprimer les lignes vides
|
||
lines = [line for line in pretty_xml.decode('utf-8').split('\n') if line.strip()]
|
||
|
||
with open(self.xml_filepath, 'w', encoding='utf-8') as f:
|
||
f.write('\n'.join(lines))
|
||
|
||
logger.info("✓ Fichier XML créé avec succès")
|
||
return True
|
||
|
||
except Exception as e:
|
||
logger.error(f"Erreur lors de la génération XML: {e}", exc_info=True)
|
||
return False
|
||
|
||
|
||
def main():
|
||
"""Fonction principale"""
|
||
# Effacer la console
|
||
os.system('cls' if os.name == 'nt' else 'clear')
|
||
|
||
print("=" * 70)
|
||
print("GÉNÉRATEUR XML CPF - Version Professionnelle")
|
||
print("=" * 70)
|
||
print()
|
||
|
||
# Générer le XML
|
||
generator = CPFXMLGenerator(CSV_FILE, XML_FILE)
|
||
|
||
if not generator.generate_xml():
|
||
logger.error("❌ Échec de la génération du fichier XML")
|
||
sys.exit(1)
|
||
|
||
print()
|
||
print("-" * 70)
|
||
print("VALIDATION DU FICHIER XML")
|
||
print("-" * 70)
|
||
|
||
# Valider le XML
|
||
validator = XMLValidator()
|
||
|
||
if Path(XSD_FILE).exists():
|
||
validation_result = validator.validate_xml(XML_FILE, XSD_FILE)
|
||
|
||
if validation_result is True:
|
||
print()
|
||
print("✓" * 35)
|
||
print("SUCCESS: Le fichier XML est conforme au schéma XSD")
|
||
print("✓" * 35)
|
||
elif validation_result is False:
|
||
print()
|
||
print("⚠" * 35)
|
||
print("ATTENTION: Le fichier XML contient des erreurs de validation")
|
||
print("Consultez les logs ci-dessus pour plus de détails")
|
||
print("⚠" * 35)
|
||
else:
|
||
print()
|
||
print("ℹ" * 35)
|
||
print("INFO: Validation XSD non effectuée (module lxml non installé)")
|
||
print("Installez lxml pour activer la validation: pip install lxml")
|
||
print("ℹ" * 35)
|
||
else:
|
||
logger.warning(f"Fichier XSD non trouvé: {XSD_FILE}")
|
||
print()
|
||
print("⚠" * 35)
|
||
print("ATTENTION: Fichier XSD non trouvé - validation impossible")
|
||
print("⚠" * 35)
|
||
|
||
print()
|
||
print(f"Fichier généré: {XML_FILE}")
|
||
print(f"Logs disponibles: xml_generation.log")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|