import asyncio import smtplib from email.mime.text import MIMEText from email.mime.multipart import MIMEMultipart from email.mime.application import MIMEApplication from typing import Dict, List, Optional from datetime import datetime from app.core.config import settings from app.core.logging import migration_logger class EmailSender: """Класс для отправки email уведомлений""" def __init__(self): self.smtp_server = settings.EMAIL_HOST self.smtp_port = settings.EMAIL_PORT self.username = settings.EMAIL_USER self.password = settings.EMAIL_PASSWORD self.from_addr = settings.EMAIL_FROM self.to_addrs = settings.EMAIL_TO def send_email(self, subject: str, body: str, attachments: Optional[List[str]] = None) -> bool: """Отправка email с вложениями""" if not all([self.smtp_server, self.username, self.password, self.from_addr]): migration_logger.warning("Настройки email не заполнены. Отправка email пропущена.") return False try: # Создаем сообщение msg = MIMEMultipart() msg['From'] = self.from_addr msg['To'] = ', '.join(self.to_addrs) msg['Subject'] = subject msg['Date'] = datetime.now().strftime('%a, %d %b %Y %H:%M:%S %z') # Добавляем текст msg.attach(MIMEText(body, 'plain', 'utf-8')) # Добавляем вложения if attachments: for file_path in attachments: try: with open(file_path, 'rb') as f: part = MIMEApplication(f.read(), Name=file_path.split('/')[-1]) part['Content-Disposition'] = f'attachment; filename="{file_path.split("/")[-1]}"' msg.attach(part) except Exception as e: migration_logger.error(f"Ошибка при добавлении вложения {file_path}: {e}") # Отправляем with smtplib.SMTP_SSL(self.smtp_server, self.smtp_port) as server: server.login(self.username, self.password) server.send_message(msg) migration_logger.info(f"Email успешно отправлен на {', '.join(self.to_addrs)}") return True except Exception as e: migration_logger.error(f"Ошибка при отправке email: {e}") return False def send_error_notification(self, error_message: str, traceback_str: str = None, table_name: str = None): """Отправить уведомление об ошибке""" subject = f"ОШИБКА МИГРАЦИИ - {datetime.now().strftime('%Y-%m-%d %H:%M')}" body = f""" 🚨 ОШИБКА В ПРОЦЕССЕ МИГРАЦИИ ДАННЫХ {'='*60} Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} Сервис: Migration Service """ if table_name: body += f"Таблица: {table_name}\n" body += f""" Ошибка: {error_message} """ if traceback_str: body += f""" 📎 Детали: {traceback_str} """ body += """ 🔧 Необходимо проверить логи и устранить проблему. """ return self.send_email(subject, body) def send_success_notification(self, stats: dict, duration: float): """Отправить уведомление об успешной миграции""" subject = f"УСПЕШНАЯ МИГРАЦИЯ - {datetime.now().strftime('%Y-%m-%d %H:%M')}" body = f""" МИГРАЦИЯ ДАННЫХ УСПЕШНО ЗАВЕРШЕНА {'='*60} Время: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} Длительность: {duration:.1f} сек СТАТИСТИКА: {'='*40} Всего строк в БД: {stats.get('total_rows', 0)} Таблиц обработано: {stats.get('total_tables', 0)} Последняя репликация: {stats.get('last_replication', 'Нет данных')} Миграция выполнена успешно! """ return self.send_email(subject, body) def send_start_notification(self, tables: List[str], full_reload: bool): """Отправить уведомление о начале миграции""" subject = f"НАЧАЛО МИГРАЦИИ - {datetime.now().strftime('%Y-%m-%d %H:%M')}" body = f""" НАЧАЛО МИГРАЦИИ ДАННЫХ {'='*60} Время старта: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} Режим: {'ПОЛНАЯ ПЕРЕЗАГРУЗКА' if full_reload else 'ИНКРЕМЕНТАЛЬНАЯ'} Таблиц для обработки: {len(tables)} СПИСОК ТАБЛИЦ: {chr(10).join([f' • {t}' for t in tables])} Миграция запущена... """ return self.send_email(subject, body) async def send_migration_summary_email(self, email_data: Dict) -> bool: """ Отправить сводный email о результатах миграции. Args: email_data: Словарь с данными: - batch_id: ID батча - total: Всего таблиц - completed: Завершено - successful: Успешно - failed: Ошибок - results: Список результатов по таблицам - timestamp: Время отчёта Returns: bool: True если email отправлен успешно """ try: # Формируем HTML-отчёт html_content = self._create_email_html(email_data) # Создаём письмо msg = MIMEMultipart('alternative') msg['Subject'] = f"Отчёт о миграции: {email_data.get('successful', 0)} успешно, {email_data.get('failed', 0)} ошибок" msg['From'] = self.from_addr msg['To'] = ', '.join(self.to_addrs) # Текстовая версия text_content = self._create_email_text(email_data) msg.attach(MIMEText(text_content, 'plain', 'utf-8')) # HTML версия msg.attach(MIMEText(html_content, 'html', 'utf-8')) # Отправляем await asyncio.to_thread( self._send_email_sync, smtp_server=self.smtp_server, smtp_port=self.smtp_port, login=self.username, password=self.password, from_addr=self.from_addr, to_addrs=self.to_addrs, message=msg.as_string() ) migration_logger.info(f"Email отправлен: {len(self.to_addrs)} получателей") return True except Exception as e: migration_logger.error(f"Ошибка отправки email: {e}") return False def _create_email_html(self, data: Dict) -> str: """Создать HTML-версию письма""" batch_id = data.get('batch_id', 'N/A') total = data.get('total', 0) successful = data.get('successful', 0) failed = data.get('failed', 0) timestamp = data.get('timestamp', 'N/A') results = data.get('results', []) # Цвет статуса if failed == 0: status_color = "#28a745" status_text = "✅ Все успешно" elif failed < total: status_color = "#ffc107" status_text = "⚠️ Частичный успех" else: status_color = "#dc3545" status_text = "❌ Все плохо" # Таблица результатов rows = "" for r in results: table_name = r.get('table', 'Unknown') success = r.get('success', False) error = r.get('error', '') if success: row_color = "#d4edda" status_icon = "✅" else: row_color = "#f8d7da" status_icon = "❌" rows += f""" {table_name} {"Успешно" if success else "Ошибка"} {error[:50] if error else "-"} """ html = f"""

Репликация данных

Batch ID: {batch_id}

Время: {timestamp}

{status_text}

Детали по таблицам

{rows}
Таблица Результат Ошибка
""" return html def _create_email_text(self, data: Dict) -> str: """Создать текстовую версию письма""" batch_id = data.get('batch_id', 'N/A') total = data.get('total', 0) successful = data.get('successful', 0) failed = data.get('failed', 0) timestamp = data.get('timestamp', 'N/A') results = data.get('results', []) text = f""" ОТЧЁТ О МИГРАЦИИ ДАННЫХ ======================= Batch ID: {batch_id} Время: {timestamp} СТАТИСТИКА: - Всего таблиц: {total} - Успешно: {successful} - Ошибок: {failed} ДЕТАЛИ ПО ТАБЛИЦАМ: """ for r in results: table_name = r.get('table', 'Unknown') success = r.get('success', False) error = r.get('error', '') status = "✅ Успешно" if success else f"❌ Ошибка: {error}" text += f"\n • {table_name}: {status}" text += """ ======================= Автоматическое сообщение от системы репликации Не отвечайте на это письмо """ return text def _send_email_sync( self, smtp_server: str, smtp_port: int, login: str, password: str, from_addr: str, to_addrs: List[str], message: str ): """Синхронная отправка email (запускается в потоке)""" with smtplib.SMTP_SSL(smtp_server, smtp_port) as server: server.login(login, password) server.sendmail(from_addr, to_addrs, message) # server = smtplib.SMTP_SSL(smtp_server, smtp_port) # try: # server.starttls() # server.login(login, password) # server.sendmail(from_addr, to_addrs, message) # finally: # server.quit() email_sender = EmailSender()