Accrochage/convert2xml.py

370 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
Générateur XML pour certifications CPF - Version Optimisée
Génère un fichier XML conforme au schéma urn:cdc:cpf:pc5:schema:1.0.0
"""
import xml.etree.ElementTree as ET
from xml.dom import minidom
import csv
import logging
import datetime
import pytz
import os
from pathlib import Path
from typing import Optional, List, Dict
import sys
# Configuration
CSV_FILE = "Data.csv"
XML_FILE = "file.xml"
XSD_FILE = "validation.xsd"
TZ_PARIS = pytz.timezone('Europe/Paris')
# Configuration du logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('xml_generation.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class XMLValidator:
"""Classe pour valider le XML généré contre le XSD"""
@staticmethod
def validate_xml(xml_file: str, xsd_file: str) -> bool:
"""Valide un fichier XML contre un schéma XSD"""
try:
from lxml import etree
# Charger le schéma XSD
with open(xsd_file, 'rb') as f:
schema_root = etree.XML(f.read())
schema = etree.XMLSchema(schema_root)
# Charger le fichier XML
with open(xml_file, 'rb') as f:
xml_doc = etree.parse(f)
# Valider
is_valid = schema.validate(xml_doc)
if not is_valid:
logger.error("Erreurs de validation XSD:")
for error in schema.error_log:
logger.error(f" Ligne {error.line}: {error.message}")
return False
logger.info("✓ Le fichier XML est valide selon le schéma XSD")
return True
except ImportError:
logger.warning("Module lxml non disponible. Installation recommandée: pip install lxml")
return None
except Exception as e:
logger.error(f"Erreur lors de la validation: {e}")
return False
class CPFXMLGenerator:
"""Générateur de fichiers XML pour les certifications CPF"""
def __init__(self, csv_filepath: str, xml_filepath: str):
self.csv_filepath = Path(csv_filepath)
self.xml_filepath = Path(xml_filepath)
self.namespace = "urn:cdc:cpf:pc5:schema:1.0.0"
self.xsi_namespace = "http://www.w3.org/2001/XMLSchema-instance"
def validate_csv_structure(self, rows: List[List[str]]) -> bool:
"""Valide la structure du fichier CSV"""
if len(rows) < 6:
logger.error("Le fichier CSV doit contenir au moins 6 lignes")
return False
# Vérifier que la ligne d'en-tête existe
if not rows[5] or len(rows[5]) < 25:
logger.error("La ligne d'en-tête (ligne 6) est incomplète")
return False
return True
def clean_string(self, value: str, max_length: Optional[int] = None) -> str:
"""Nettoie et tronque une chaîne si nécessaire"""
cleaned = value.strip()
if max_length and len(cleaned) > max_length:
logger.warning(f"Valeur tronquée: '{cleaned}' -> '{cleaned[:max_length]}'")
cleaned = cleaned[:max_length]
return cleaned
def format_date(self, date_str: str) -> str:
"""
Convertit une date du format DD/MM/YYYY vers YYYY-MM-DD
Gère aussi les dates déjà au bon format
"""
date_str = date_str.strip()
# Si déjà au format ISO (YYYY-MM-DD)
if len(date_str) == 10 and date_str[4] == '-' and date_str[7] == '-':
return date_str
# Format DD/MM/YYYY
if '/' in date_str:
parts = date_str.split('/')
if len(parts) == 3 and len(parts[2]) == 4:
return f"{parts[2]}-{parts[1].zfill(2)}-{parts[0].zfill(2)}"
logger.warning(f"Format de date non reconnu: {date_str}")
return date_str
def get_current_timestamp(self) -> str:
"""Génère un horodatage au format requis par le XSD"""
now = datetime.datetime.now(TZ_PARIS)
return now.strftime("%Y-%m-%dT%H:%M:%S+01:00")
def create_element(self, parent: ET.Element, tag: str, text: Optional[str] = None,
attrib: Optional[Dict] = None) -> ET.Element:
"""Crée un élément XML avec le namespace cpf"""
full_tag = f"{{{self.namespace}}}{tag}"
elem = ET.SubElement(parent, full_tag, attrib or {})
if text is not None:
elem.text = str(text)
return elem
def add_titulaire(self, parent: ET.Element, row: List[str]) -> None:
"""Ajoute les informations du titulaire"""
titulaire = self.create_element(parent, "titulaire")
# Données obligatoires
self.create_element(titulaire, "nomNaissance", self.clean_string(row[17], 60))
# nomUsage (nillable)
if row[18].strip() and row[18].strip().lower() != 'nil':
self.create_element(titulaire, "nomUsage", self.clean_string(row[18], 60))
else:
self.create_element(titulaire, "nomUsage", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
self.create_element(titulaire, "prenom1", self.clean_string(row[19], 60))
self.create_element(titulaire, "anneeNaissance", self.clean_string(row[20]))
# moisNaissance (nillable)
if row[21].strip() and row[21].strip().lower() != 'nil':
self.create_element(titulaire, "moisNaissance", self.clean_string(row[21]))
else:
self.create_element(titulaire, "moisNaissance", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
# jourNaissance (nillable)
if row[22].strip() and row[22].strip().lower() != 'nil':
self.create_element(titulaire, "jourNaissance", self.clean_string(row[22]))
else:
self.create_element(titulaire, "jourNaissance", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
self.create_element(titulaire, "sexe", self.clean_string(row[23]))
# Code commune naissance
code_commune = self.create_element(titulaire, "codeCommuneNaissance")
code_postal_elem = self.create_element(code_commune, "codePostalNaissance")
if row[24].strip() and row[24].strip().lower() != 'nil':
self.create_element(code_postal_elem, "codePostal", self.clean_string(row[24], 9))
else:
self.create_element(code_postal_elem, "codePostal", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
def add_passage_certification(self, parent: ET.Element, row: List[str]) -> None:
"""Ajoute un passage de certification"""
passage = self.create_element(parent, "passageCertification")
# Données obligatoires
self.create_element(passage, "idTechnique", self.clean_string(row[7], 255))
self.create_element(passage, "obtentionCertification", row[8].upper())
self.create_element(passage, "donneeCertifiee", row[9].lower())
# Dates
self.create_element(passage, "dateDebutValidite", self.format_date(row[10]))
# dateFinValidite (nillable)
if row[11].strip().lower() == 'nil':
self.create_element(passage, "dateFinValidite", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
else:
self.create_element(passage, "dateFinValidite", self.format_date(row[11]))
# Niveaux européens
self.create_element(passage, "presenceNiveauLangueEuro", row[12].lower())
self.create_element(passage, "presenceNiveauNumeriqueEuro", row[13].lower())
# Scoring (nillable)
if row[14].strip().lower() == 'nil':
self.create_element(passage, "scoring", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
else:
self.create_element(passage, "scoring", self.clean_string(row[14], 255))
# Mention validée (nillable)
if row[15].strip().lower() == 'nil':
self.create_element(passage, "mentionValidee", attrib={f"{{{self.xsi_namespace}}}nil": "true"})
else:
self.create_element(passage, "mentionValidee", row[15].upper())
# Modalités inscription
modalites = self.create_element(passage, "modalitesInscription")
self.create_element(modalites, "modaliteAcces", row[16].upper())
# Identification titulaire
identification = self.create_element(passage, "identificationTitulaire")
self.add_titulaire(identification, row)
def generate_xml(self) -> bool:
"""Génère le fichier XML à partir du CSV"""
try:
logger.info(f"Lecture du fichier CSV: {self.csv_filepath}")
# Vérifier l'existence du fichier
if not self.csv_filepath.exists():
logger.error(f"Fichier CSV introuvable: {self.csv_filepath}")
return False
# Lire le CSV
with open(self.csv_filepath, 'r', encoding='utf-8') as f:
reader = csv.reader(f)
rows = list(reader)
# Valider la structure
if not self.validate_csv_structure(rows):
return False
logger.info("Création de la structure XML...")
# Créer l'élément racine
ET.register_namespace('cpf', self.namespace)
ET.register_namespace('xsi', self.xsi_namespace)
root = ET.Element(
f"{{{self.namespace}}}flux",
attrib={
f"{{{self.xsi_namespace}}}schemaLocation":
f"{self.namespace} validation.xsd"
}
)
# En-tête
headers = rows[5]
self.create_element(root, "idFlux", self.clean_string(headers[2], 50))
self.create_element(root, "horodatage", self.get_current_timestamp())
# Émetteur
emetteur = self.create_element(root, "emetteur")
self.create_element(emetteur, "idClient", self.clean_string(headers[3], 8))
# Certificateurs
certificateurs = self.create_element(emetteur, "certificateurs")
certificateur = self.create_element(certificateurs, "certificateur")
self.create_element(certificateur, "idClient", self.clean_string(headers[4], 8))
self.create_element(certificateur, "idContrat", self.clean_string(headers[5], 20))
# Certifications
certifications = self.create_element(certificateur, "certifications")
certification = self.create_element(certifications, "certification")
self.create_element(certification, "type", self.clean_string(headers[6], 255))
self.create_element(certification, "code", self.clean_string(headers[7], 100))
# Passages de certification
passages = self.create_element(certification, "passageCertifications")
# Traiter chaque ligne de données
data_rows = [row for row in rows[6:] if row and row[0].strip()]
logger.info(f"Traitement de {len(data_rows)} certifications...")
for idx, row in enumerate(data_rows, start=1):
try:
logger.info(f" [{idx}/{len(data_rows)}] Ajout: {row[19]} {row[17]}")
self.add_passage_certification(passages, row)
except Exception as e:
logger.error(f"Erreur ligne {idx + 6}: {e}")
continue
# Formater et sauvegarder
logger.info(f"Écriture du fichier XML: {self.xml_filepath}")
xml_str = ET.tostring(root, encoding='utf-8')
# Indentation avec minidom
dom = minidom.parseString(xml_str)
pretty_xml = dom.toprettyxml(indent=" ", encoding='utf-8')
# Supprimer les lignes vides
lines = [line for line in pretty_xml.decode('utf-8').split('\n') if line.strip()]
with open(self.xml_filepath, 'w', encoding='utf-8') as f:
f.write('\n'.join(lines))
logger.info("✓ Fichier XML créé avec succès")
return True
except Exception as e:
logger.error(f"Erreur lors de la génération XML: {e}", exc_info=True)
return False
def main():
"""Fonction principale"""
# Effacer la console
os.system('cls' if os.name == 'nt' else 'clear')
print("=" * 70)
print("GÉNÉRATEUR XML CPF - Version Professionnelle")
print("=" * 70)
print()
# Générer le XML
generator = CPFXMLGenerator(CSV_FILE, XML_FILE)
if not generator.generate_xml():
logger.error("❌ Échec de la génération du fichier XML")
sys.exit(1)
print()
print("-" * 70)
print("VALIDATION DU FICHIER XML")
print("-" * 70)
# Valider le XML
validator = XMLValidator()
if Path(XSD_FILE).exists():
validation_result = validator.validate_xml(XML_FILE, XSD_FILE)
if validation_result is True:
print()
print("" * 35)
print("SUCCESS: Le fichier XML est conforme au schéma XSD")
print("" * 35)
elif validation_result is False:
print()
print("" * 35)
print("ATTENTION: Le fichier XML contient des erreurs de validation")
print("Consultez les logs ci-dessus pour plus de détails")
print("" * 35)
else:
print()
print("" * 35)
print("INFO: Validation XSD non effectuée (module lxml non installé)")
print("Installez lxml pour activer la validation: pip install lxml")
print("" * 35)
else:
logger.warning(f"Fichier XSD non trouvé: {XSD_FILE}")
print()
print("" * 35)
print("ATTENTION: Fichier XSD non trouvé - validation impossible")
print("" * 35)
print()
print(f"Fichier généré: {XML_FILE}")
print(f"Logs disponibles: xml_generation.log")
if __name__ == "__main__":
main()