Добавить main.py

This commit is contained in:
2025-11-10 19:24:11 +09:00
commit 6c812944ea

412
main.py Normal file
View File

@@ -0,0 +1,412 @@
import uuid
import hashlib
import base64
import logging
import random
from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Настройка подключения к БД
DATABASE_URL = "mssql://mis_aokb_desktop_srv:HLqKhOMwPRjNAVvLIKtc@10.48.89.4:1433/amu_mis_AOKB_prod?"
DRIVER_URL = "driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"
# Создаем движок
engine = create_engine(
DATABASE_URL,
DRIVER_URL
)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Базовый класс для моделей
Base = declarative_base()
# Модель пользователя
class XUser(Base):
__tablename__ = 'x_User'
UserID = Column(Integer, primary_key=True)
GeneralPassword = Column(String(255))
GUID = Column(String(36), nullable=True)
# Альтернативный способ через Table (если не хотите использовать ORM)
# metadata = MetaData()
# x_user_table = Table(
# 'x_User', metadata,
# Column('UserID', Integer, primary_key=True),
# Column('GeneralLogin', String(255)),
# Column('GeneralPassword', String(255)),
# Column('AuthMode', bool(0)),
# Column('FIO', String(255)),
# Column('GUID', String(36))
# )
def get_db():
"""Получение сессии БД"""
db = SessionLocal()
try:
yield db
finally:
db.close()
def generate_clear_password(length: int = 12) -> str:
"""
Генерирует случайный пароль без неоднозначных символов
Исключает:
- Символы, которые можно спутать: 1, l, I, 0, O, o
- Специальные символы
Args:
length: длина пароля
Returns:
Случайный пароль
"""
# Безопасные символы: только цифры и буквы (без неоднозначных)
safe_digits = '23456789' # без 0,1
safe_uppercase = 'ABCDEFGHJKLMNPQRSTUVWXYZ' # без I, O
safe_lowercase = 'abcdefghjkmnpqrstuvwxyz' # без l, o
all_chars = safe_digits + safe_uppercase + safe_lowercase
# Гарантируем, что пароль содержит хотя бы по одному символу из каждой категории
password_chars = [
random.choice(safe_digits),
random.choice(safe_uppercase),
random.choice(safe_lowercase)
]
# Заполняем оставшуюся длину случайными символами из всех категорий
for _ in range(length - 3):
password_chars.append(random.choice(all_chars))
# Перемешиваем символы
random.shuffle(password_chars)
return ''.join(password_chars)
def compute_hash(password: str, guid: str) -> str:
"""
Вычисляет хеш пароля по алгоритму из PHP метода
Args:
password: пароль
guid: GUID в строковом формате
Returns:
base64-encoded хеш
"""
try:
# Проверяем, что GUID валидный
uuid_obj = uuid.UUID(guid)
except ValueError:
logging.warning('Invalid GUID format provided', extra={'guid': guid})
raise ValueError('The provided GUID is not valid')
text = guid.upper()
bytes_data = (text + password + text).encode('utf-8')
# Хеширование SHA1
hash_bytes = hashlib.sha1(bytes_data).digest()
# 4-1 итерации повторного хеширования (3 дополнительные итерации)
for i in range(3):
hash_bytes = hashlib.sha1(hash_bytes).digest()
return base64.b64encode(hash_bytes).decode('utf-8')
def update_passwords_with_random(user_data_list: list, password_length: int = 12,
output_file: str = 'updated_passwords_with_random.txt') -> tuple:
"""
Обновляет GeneralPassword для списка пользователей с генерацией случайных паролей
Args:
user_data_list: список кортежей (UserID, GeneralLogin, GUID)
password_length: длина генерируемого пароля
output_file: имя файла для сохранения результатов
Returns:
tuple: (количество обновленных пользователей, список результатов)
"""
results = []
updated_count = 0
# Создаем сессию БД
db = SessionLocal()
try:
with open(output_file, 'w', encoding='utf-8') as f:
# Записываем заголовок
f.write("UserID\tGeneralLogin\tGUID\tNewPassword\tPasswordHash\tStatus\n")
for user_id, login, old_guid in user_data_list:
try:
# Получаем пользователя из базы данных через ORM
user = db.query(XUser).filter(XUser.UserID == user_id).first()
if not user:
error_msg = f"User with ID {user_id} not found"
result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: User not found"
f.write(result_line + "\n")
results.append({
'user_id': user_id,
'login': login,
'guid': old_guid,
'password': '',
'hash': '',
'status': 'ERROR: User not found'
})
logging.error(error_msg)
continue
# Генерируем случайный пароль
new_password = generate_clear_password(password_length)
# Вычисляем хеш для нового пароля
new_hash = compute_hash(new_password, old_guid)
# Обновляем пароль пользователя
user.GeneralPassword = new_hash
# Записываем результат
result_line = f"{user_id}\t{login}\t{old_guid}\t{new_password}\t{new_hash}\tSUCCESS"
f.write(result_line + "\n")
results.append({
'user_id': user_id,
'login': login,
'guid': old_guid,
'password': new_password,
'hash': new_hash,
'status': 'SUCCESS'
})
updated_count += 1
logging.info(f'Password updated for user ID: {user_id}, Login: {login}')
except Exception as e:
error_msg = f"Error updating user {user_id}: {str(e)}"
result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: {str(e)}"
f.write(result_line + "\n")
results.append({
'user_id': user_id,
'login': login,
'guid': old_guid,
'password': '',
'hash': '',
'status': f'ERROR: {str(e)}'
})
logging.error(error_msg)
# Коммитим все изменения в БД
db.commit()
logging.info(f'Successfully updated passwords for {updated_count} users')
logging.info(f'Results saved to {output_file}')
return updated_count, results
except Exception as e:
# Откатываем изменения в случае ошибки
db.rollback()
logging.error(f'Error processing users: {str(e)}')
raise
finally:
# Закрываем сессию
db.close()
# Альтернативная версия с использованием прямого SQL (без ORM)
# def update_passwords_direct_sql(user_data_list: list, password_length: int = 12,
# output_file: str = 'updated_passwords_direct.txt') -> tuple:
# """
# Обновляет пароли используя прямое SQL выполнение
# """
# results = []
# updated_count = 0
# # Создаем сессию БД
# db = SessionLocal()
# try:
# with open(output_file, 'w', encoding='utf-8') as f:
# f.write("UserID\tGeneralLogin\tGUID\tNewPassword\tPasswordHash\tStatus\n")
# for user_id, login, old_guid in user_data_list:
# try:
# # Генерируем случайный пароль
# new_password = generate_clear_password(password_length)
# # Вычисляем хеш
# new_hash = compute_hash(new_password, old_guid)
# # Прямой SQL запрос для обновления
# stmt = x_user_table.update().where(
# x_user_table.c.UserID == user_id
# ).values(
# GeneralPassword=new_hash
# )
# # Выполняем запрос
# result = db.execute(stmt)
# if result.rowcount == 0:
# # Пользователь не найден
# result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: User not found"
# results.append({
# 'user_id': user_id, 'login': login, 'guid': old_guid,
# 'password': '', 'hash': '', 'status': 'ERROR: User not found'
# })
# else:
# # Успешное обновление
# result_line = f"{user_id}\t{login}\t{old_guid}\t{new_password}\t{new_hash}\tSUCCESS"
# results.append({
# 'user_id': user_id, 'login': login, 'guid': old_guid,
# 'password': new_password, 'hash': new_hash, 'status': 'SUCCESS'
# })
# updated_count += 1
# f.write(result_line + "\n")
# logging.info(f'Processed user ID: {user_id}, Login: {login}')
# except Exception as e:
# error_msg = f"Error updating user {user_id}: {str(e)}"
# result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: {str(e)}"
# f.write(result_line + "\n")
# results.append({
# 'user_id': user_id, 'login': login, 'guid': old_guid,
# 'password': '', 'hash': '', 'status': f'ERROR: {str(e)}'
# })
# logging.error(error_msg)
# # Коммитим изменения
# db.commit()
# logging.info(f'Successfully updated passwords for {updated_count} users')
# return updated_count, results
# except Exception as e:
# db.rollback()
# logging.error(f'Error processing users: {str(e)}')
# raise
# finally:
# db.close()
def create_user_list_from_data() -> list:
"""
Создает список пользователей из предоставленных данных
"""
user_data = [
(2679, 'АбулдиновАС', '83025555-FFB0-4AAD-82B7-BA990198749B'),
]
return user_data
def test_database_connection():
"""Тестирует подключение к БД"""
try:
db = SessionLocal()
# Простой запрос для проверки подключения
result = db.execute("SELECT 1")
db.close()
print("✓ Подключение к БД успешно")
return True
except Exception as e:
print(f"✗ Ошибка подключения к БД: {e}")
return False
def test_password_generation():
"""Тестирует генерацию паролей"""
print("Тест генерации паролей:")
for i in range(5):
password = generate_clear_password(12)
print(f" Пароль {i+1}: {password}")
def main():
"""Основная функция для выполнения обновления паролей"""
print("Начало обновления паролей...")
# Тестируем подключение к БД
if not test_database_connection():
return
# Тестируем генерацию паролей
test_password_generation()
# Получаем список пользователей
user_list = create_user_list_from_data()
print(f"Найдено пользователей для обработки: {len(user_list)}")
# Выберите метод обновления:
# 1. Через ORM (рекомендуется)
print("\nИспользование ORM метода...")
try:
updated_count, results = update_passwords_with_random(
user_list,
password_length=12,
output_file='updated_passwords_orm.txt'
)
print(f"Результаты (ORM):")
print(f" Успешно обновлено: {updated_count} пользователей")
print(f" Ошибки: {len(user_list) - updated_count} пользователей")
except Exception as e:
print(f"Ошибка при обновлении паролей (ORM): {e}")
# 2. Через прямое SQL (альтернатива)
# print("\nИспользование прямого SQL метода...")
# try:
# updated_count, results = update_passwords_direct_sql(
# user_list,
# password_length=12,
# output_file='updated_passwords_sql.txt'
# )
# print(f"Результаты (SQL):")
# print(f" Успешно обновлено: {updated_count} пользователей")
# print(f" Ошибки: {len(user_list) - updated_count} пользователей")
# except Exception as e:
# print(f"Ошибка при обновлении паролей (SQL): {e}")
# Создаем удобный отчет
if 'results' in locals():
create_password_report(results)
def create_password_report(results: list, filename: str = 'password_report.txt'):
"""Создает удобный отчет с паролями"""
with open(filename, 'w', encoding='utf-8') as f:
f.write("ОТЧЕТ ОБ ОБНОВЛЕНИИ ПАРОЛЕЙ\n")
f.write("=" * 50 + "\n\n")
success_results = [r for r in results if r['status'] == 'SUCCESS']
error_results = [r for r in results if r['status'] != 'SUCCESS']
f.write(f"Всего пользователей: {len(results)}\n")
f.write(f"Успешно обновлено: {len(success_results)}\n")
f.write(f"Ошибок: {len(error_results)}\n\n")
f.write("СПИСОК ПАРОЛЕЙ:\n")
f.write("-" * 50 + "\n")
for result in success_results:
f.write(f"Логин: {result['login']}\n")
f.write(f"UserID: {result['user_id']}\n")
f.write(f"Пароль: {result['password']}\n")
f.write(f"GUID: {result['guid']}\n")
f.write(f"Хеш: {result['hash']}\n")
f.write("-" * 30 + "\n")
if error_results:
f.write("\nОШИБКИ:\n")
f.write("-" * 30 + "\n")
for error in error_results:
f.write(f"UserID: {error['user_id']}, Логин: {error['login']}\n")
f.write(f"Ошибка: {error['status']}\n")
f.write("-" * 20 + "\n")
if __name__ == "__main__":
main()