From 6c812944ea19559f7878aea212ff36c9baf68042 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=90=D0=BD=D0=B4=D1=80=D0=B5=D0=B9=20=D0=91=D1=80=D1=83?= =?UTF-8?q?=D1=81=D0=BD=D0=B8=D1=86=D1=8B=D0=BD?= Date: Mon, 10 Nov 2025 19:24:11 +0900 Subject: [PATCH] =?UTF-8?q?=D0=94=D0=BE=D0=B1=D0=B0=D0=B2=D0=B8=D1=82?= =?UTF-8?q?=D1=8C=20main.py?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 412 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 412 insertions(+) create mode 100644 main.py diff --git a/main.py b/main.py new file mode 100644 index 0000000..0c8b488 --- /dev/null +++ b/main.py @@ -0,0 +1,412 @@ +import uuid +import hashlib +import base64 +import logging +import random +from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table +from sqlalchemy.orm import sessionmaker +from sqlalchemy.ext.declarative import declarative_base + +# Настройка логирования +logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + +# Настройка подключения к БД +DATABASE_URL = "mssql://mis_aokb_desktop_srv:HLqKhOMwPRjNAVvLIKtc@10.48.89.4:1433/amu_mis_AOKB_prod?" +DRIVER_URL = "driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes" + +# Создаем движок +engine = create_engine( + DATABASE_URL, + DRIVER_URL +) +SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) + +# Базовый класс для моделей +Base = declarative_base() + +# Модель пользователя +class XUser(Base): + __tablename__ = 'x_User' + + UserID = Column(Integer, primary_key=True) + GeneralPassword = Column(String(255)) + GUID = Column(String(36), nullable=True) + +# Альтернативный способ через Table (если не хотите использовать ORM) +# metadata = MetaData() +# x_user_table = Table( +# 'x_User', metadata, +# Column('UserID', Integer, primary_key=True), +# Column('GeneralLogin', String(255)), +# Column('GeneralPassword', String(255)), +# Column('AuthMode', bool(0)), +# Column('FIO', String(255)), +# Column('GUID', String(36)) +# ) + +def get_db(): + """Получение сессии БД""" + db = SessionLocal() + try: + yield db + finally: + db.close() + +def generate_clear_password(length: int = 12) -> str: + """ + Генерирует случайный пароль без неоднозначных символов + + Исключает: + - Символы, которые можно спутать: 1, l, I, 0, O, o + - Специальные символы + + Args: + length: длина пароля + + Returns: + Случайный пароль + """ + # Безопасные символы: только цифры и буквы (без неоднозначных) + safe_digits = '23456789' # без 0,1 + safe_uppercase = 'ABCDEFGHJKLMNPQRSTUVWXYZ' # без I, O + safe_lowercase = 'abcdefghjkmnpqrstuvwxyz' # без l, o + + all_chars = safe_digits + safe_uppercase + safe_lowercase + + # Гарантируем, что пароль содержит хотя бы по одному символу из каждой категории + password_chars = [ + random.choice(safe_digits), + random.choice(safe_uppercase), + random.choice(safe_lowercase) + ] + + # Заполняем оставшуюся длину случайными символами из всех категорий + for _ in range(length - 3): + password_chars.append(random.choice(all_chars)) + + # Перемешиваем символы + random.shuffle(password_chars) + + return ''.join(password_chars) + +def compute_hash(password: str, guid: str) -> str: + """ + Вычисляет хеш пароля по алгоритму из PHP метода + + Args: + password: пароль + guid: GUID в строковом формате + + Returns: + base64-encoded хеш + """ + try: + # Проверяем, что GUID валидный + uuid_obj = uuid.UUID(guid) + except ValueError: + logging.warning('Invalid GUID format provided', extra={'guid': guid}) + raise ValueError('The provided GUID is not valid') + + text = guid.upper() + bytes_data = (text + password + text).encode('utf-8') + + # Хеширование SHA1 + hash_bytes = hashlib.sha1(bytes_data).digest() + + # 4-1 итерации повторного хеширования (3 дополнительные итерации) + for i in range(3): + hash_bytes = hashlib.sha1(hash_bytes).digest() + + return base64.b64encode(hash_bytes).decode('utf-8') + +def update_passwords_with_random(user_data_list: list, password_length: int = 12, + output_file: str = 'updated_passwords_with_random.txt') -> tuple: + """ + Обновляет GeneralPassword для списка пользователей с генерацией случайных паролей + + Args: + user_data_list: список кортежей (UserID, GeneralLogin, GUID) + password_length: длина генерируемого пароля + output_file: имя файла для сохранения результатов + + Returns: + tuple: (количество обновленных пользователей, список результатов) + """ + results = [] + updated_count = 0 + + # Создаем сессию БД + db = SessionLocal() + + try: + with open(output_file, 'w', encoding='utf-8') as f: + # Записываем заголовок + f.write("UserID\tGeneralLogin\tGUID\tNewPassword\tPasswordHash\tStatus\n") + + for user_id, login, old_guid in user_data_list: + try: + # Получаем пользователя из базы данных через ORM + user = db.query(XUser).filter(XUser.UserID == user_id).first() + + if not user: + error_msg = f"User with ID {user_id} not found" + result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: User not found" + f.write(result_line + "\n") + results.append({ + 'user_id': user_id, + 'login': login, + 'guid': old_guid, + 'password': '', + 'hash': '', + 'status': 'ERROR: User not found' + }) + logging.error(error_msg) + continue + + # Генерируем случайный пароль + new_password = generate_clear_password(password_length) + + # Вычисляем хеш для нового пароля + new_hash = compute_hash(new_password, old_guid) + + # Обновляем пароль пользователя + user.GeneralPassword = new_hash + + # Записываем результат + result_line = f"{user_id}\t{login}\t{old_guid}\t{new_password}\t{new_hash}\tSUCCESS" + f.write(result_line + "\n") + results.append({ + 'user_id': user_id, + 'login': login, + 'guid': old_guid, + 'password': new_password, + 'hash': new_hash, + 'status': 'SUCCESS' + }) + updated_count += 1 + + logging.info(f'Password updated for user ID: {user_id}, Login: {login}') + + except Exception as e: + error_msg = f"Error updating user {user_id}: {str(e)}" + result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: {str(e)}" + f.write(result_line + "\n") + results.append({ + 'user_id': user_id, + 'login': login, + 'guid': old_guid, + 'password': '', + 'hash': '', + 'status': f'ERROR: {str(e)}' + }) + logging.error(error_msg) + + # Коммитим все изменения в БД + db.commit() + + logging.info(f'Successfully updated passwords for {updated_count} users') + logging.info(f'Results saved to {output_file}') + + return updated_count, results + + except Exception as e: + # Откатываем изменения в случае ошибки + db.rollback() + logging.error(f'Error processing users: {str(e)}') + raise + finally: + # Закрываем сессию + db.close() + +# Альтернативная версия с использованием прямого SQL (без ORM) +# def update_passwords_direct_sql(user_data_list: list, password_length: int = 12, +# output_file: str = 'updated_passwords_direct.txt') -> tuple: +# """ +# Обновляет пароли используя прямое SQL выполнение +# """ +# results = [] +# updated_count = 0 + +# # Создаем сессию БД +# db = SessionLocal() + +# try: +# with open(output_file, 'w', encoding='utf-8') as f: +# f.write("UserID\tGeneralLogin\tGUID\tNewPassword\tPasswordHash\tStatus\n") + +# for user_id, login, old_guid in user_data_list: +# try: +# # Генерируем случайный пароль +# new_password = generate_clear_password(password_length) + +# # Вычисляем хеш +# new_hash = compute_hash(new_password, old_guid) + +# # Прямой SQL запрос для обновления +# stmt = x_user_table.update().where( +# x_user_table.c.UserID == user_id +# ).values( +# GeneralPassword=new_hash +# ) + +# # Выполняем запрос +# result = db.execute(stmt) + +# if result.rowcount == 0: +# # Пользователь не найден +# result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: User not found" +# results.append({ +# 'user_id': user_id, 'login': login, 'guid': old_guid, +# 'password': '', 'hash': '', 'status': 'ERROR: User not found' +# }) +# else: +# # Успешное обновление +# result_line = f"{user_id}\t{login}\t{old_guid}\t{new_password}\t{new_hash}\tSUCCESS" +# results.append({ +# 'user_id': user_id, 'login': login, 'guid': old_guid, +# 'password': new_password, 'hash': new_hash, 'status': 'SUCCESS' +# }) +# updated_count += 1 + +# f.write(result_line + "\n") +# logging.info(f'Processed user ID: {user_id}, Login: {login}') + +# except Exception as e: +# error_msg = f"Error updating user {user_id}: {str(e)}" +# result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: {str(e)}" +# f.write(result_line + "\n") +# results.append({ +# 'user_id': user_id, 'login': login, 'guid': old_guid, +# 'password': '', 'hash': '', 'status': f'ERROR: {str(e)}' +# }) +# logging.error(error_msg) + +# # Коммитим изменения +# db.commit() + +# logging.info(f'Successfully updated passwords for {updated_count} users') +# return updated_count, results + +# except Exception as e: +# db.rollback() +# logging.error(f'Error processing users: {str(e)}') +# raise +# finally: +# db.close() + +def create_user_list_from_data() -> list: + """ + Создает список пользователей из предоставленных данных + """ + user_data = [ + (2679, 'АбулдиновАС', '83025555-FFB0-4AAD-82B7-BA990198749B'), + ] + + return user_data + +def test_database_connection(): + """Тестирует подключение к БД""" + try: + db = SessionLocal() + # Простой запрос для проверки подключения + result = db.execute("SELECT 1") + db.close() + print("✓ Подключение к БД успешно") + return True + except Exception as e: + print(f"✗ Ошибка подключения к БД: {e}") + return False + +def test_password_generation(): + """Тестирует генерацию паролей""" + print("Тест генерации паролей:") + for i in range(5): + password = generate_clear_password(12) + print(f" Пароль {i+1}: {password}") + +def main(): + """Основная функция для выполнения обновления паролей""" + print("Начало обновления паролей...") + + # Тестируем подключение к БД + if not test_database_connection(): + return + + # Тестируем генерацию паролей + test_password_generation() + + # Получаем список пользователей + user_list = create_user_list_from_data() + print(f"Найдено пользователей для обработки: {len(user_list)}") + + # Выберите метод обновления: + # 1. Через ORM (рекомендуется) + print("\nИспользование ORM метода...") + try: + updated_count, results = update_passwords_with_random( + user_list, + password_length=12, + output_file='updated_passwords_orm.txt' + ) + + print(f"Результаты (ORM):") + print(f" Успешно обновлено: {updated_count} пользователей") + print(f" Ошибки: {len(user_list) - updated_count} пользователей") + + except Exception as e: + print(f"Ошибка при обновлении паролей (ORM): {e}") + + # 2. Через прямое SQL (альтернатива) + # print("\nИспользование прямого SQL метода...") + # try: + # updated_count, results = update_passwords_direct_sql( + # user_list, + # password_length=12, + # output_file='updated_passwords_sql.txt' + # ) + + # print(f"Результаты (SQL):") + # print(f" Успешно обновлено: {updated_count} пользователей") + # print(f" Ошибки: {len(user_list) - updated_count} пользователей") + + # except Exception as e: + # print(f"Ошибка при обновлении паролей (SQL): {e}") + + # Создаем удобный отчет + if 'results' in locals(): + create_password_report(results) + +def create_password_report(results: list, filename: str = 'password_report.txt'): + """Создает удобный отчет с паролями""" + with open(filename, 'w', encoding='utf-8') as f: + f.write("ОТЧЕТ ОБ ОБНОВЛЕНИИ ПАРОЛЕЙ\n") + f.write("=" * 50 + "\n\n") + + success_results = [r for r in results if r['status'] == 'SUCCESS'] + error_results = [r for r in results if r['status'] != 'SUCCESS'] + + f.write(f"Всего пользователей: {len(results)}\n") + f.write(f"Успешно обновлено: {len(success_results)}\n") + f.write(f"Ошибок: {len(error_results)}\n\n") + + f.write("СПИСОК ПАРОЛЕЙ:\n") + f.write("-" * 50 + "\n") + + for result in success_results: + f.write(f"Логин: {result['login']}\n") + f.write(f"UserID: {result['user_id']}\n") + f.write(f"Пароль: {result['password']}\n") + f.write(f"GUID: {result['guid']}\n") + f.write(f"Хеш: {result['hash']}\n") + f.write("-" * 30 + "\n") + + if error_results: + f.write("\nОШИБКИ:\n") + f.write("-" * 30 + "\n") + for error in error_results: + f.write(f"UserID: {error['user_id']}, Логин: {error['login']}\n") + f.write(f"Ошибка: {error['status']}\n") + f.write("-" * 20 + "\n") + +if __name__ == "__main__": + main() \ No newline at end of file