Files
change-passwords-mis-py/main.py

408 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import uuid
import hashlib
import base64
import logging
import random
from sqlalchemy import create_engine, Column, Integer, String, MetaData, Table, text
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# Настройка логирования
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Настройка подключения к БД
DATABASE_URL = "mssql://mis_aokb_desktop_srv:HLqKhOMwPRjNAVvLIKtc@10.48.89.4:1433/amu_mis_AOKB_prod?driver=ODBC+Driver+18+for+SQL+Server&TrustServerCertificate=yes"
# Создаем движок
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# Базовый класс для моделей
Base = declarative_base()
# Модель пользователя
class XUser(Base):
__tablename__ = 'x_User'
UserID = Column(Integer, primary_key=True)
GeneralPassword = Column(String(255))
GUID = Column(String(36), nullable=True)
# Альтернативный способ через Table (если не хотите использовать ORM)
# metadata = MetaData()
# x_user_table = Table(
# 'x_User', metadata,
# Column('UserID', Integer, primary_key=True),
# Column('GeneralLogin', String(255)),
# Column('GeneralPassword', String(255)),
# Column('AuthMode', bool(0)),
# Column('FIO', String(255)),
# Column('GUID', String(36))
# )
def get_db():
"""Получение сессии БД"""
db = SessionLocal()
try:
yield db
finally:
db.close()
def generate_clear_password(length: int = 12) -> str:
"""
Генерирует случайный пароль без неоднозначных символов
Исключает:
- Символы, которые можно спутать: 1, l, I, 0, O, o
- Специальные символы
Args:
length: длина пароля
Returns:
Случайный пароль
"""
# Безопасные символы: только цифры и буквы (без неоднозначных)
safe_digits = '23456789' # без 0,1
safe_uppercase = 'ABCDEFGHJKLMNPQRSTUVWXYZ' # без I, O
safe_lowercase = 'abcdefghjkmnpqrstuvwxyz' # без l, o
all_chars = safe_digits + safe_uppercase + safe_lowercase
# Гарантируем, что пароль содержит хотя бы по одному символу из каждой категории
password_chars = [
random.choice(safe_digits),
random.choice(safe_uppercase),
random.choice(safe_lowercase)
]
# Заполняем оставшуюся длину случайными символами из всех категорий
for _ in range(length - 3):
password_chars.append(random.choice(all_chars))
# Перемешиваем символы
random.shuffle(password_chars)
return ''.join(password_chars)
def compute_hash(password: str, guid: str) -> str:
"""
Вычисляет хеш пароля по алгоритму из PHP метода
Args:
password: пароль
guid: GUID в строковом формате
Returns:
base64-encoded хеш
"""
try:
# Проверяем, что GUID валидный
uuid_obj = uuid.UUID(guid)
except ValueError:
logging.warning('Invalid GUID format provided', extra={'guid': guid})
raise ValueError('The provided GUID is not valid')
text = guid.upper()
bytes_data = (text + password + text).encode('utf-8')
# Хеширование SHA1
hash_bytes = hashlib.sha1(bytes_data).digest()
# 4-1 итерации повторного хеширования (3 дополнительные итерации)
for i in range(3):
hash_bytes = hashlib.sha1(hash_bytes).digest()
return base64.b64encode(hash_bytes).decode('utf-8')
def update_passwords_with_random(user_data_list: list, password_length: int = 12,
output_file: str = 'updated_passwords_with_random.txt') -> tuple:
"""
Обновляет GeneralPassword для списка пользователей с генерацией случайных паролей
Args:
user_data_list: список кортежей (UserID, GeneralLogin, GUID)
password_length: длина генерируемого пароля
output_file: имя файла для сохранения результатов
Returns:
tuple: (количество обновленных пользователей, список результатов)
"""
results = []
updated_count = 0
# Создаем сессию БД
db = SessionLocal()
try:
with open(output_file, 'w', encoding='utf-8') as f:
# Записываем заголовок
f.write("UserID\tGeneralLogin\tGUID\tNewPassword\tPasswordHash\tStatus\n")
for user_id, login, old_guid in user_data_list:
try:
# Получаем пользователя из базы данных через ORM
user = db.query(XUser).filter(XUser.UserID == user_id).first()
if not user:
error_msg = f"User with ID {user_id} not found"
result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: User not found"
f.write(result_line + "\n")
results.append({
'user_id': user_id,
'login': login,
'guid': old_guid,
'password': '',
'hash': '',
'status': 'ERROR: User not found'
})
logging.error(error_msg)
continue
# Генерируем случайный пароль
new_password = generate_clear_password(password_length)
# Вычисляем хеш для нового пароля
new_hash = compute_hash(new_password, old_guid)
# Обновляем пароль пользователя
user.GeneralPassword = new_hash
# Записываем результат
result_line = f"{user_id}\t{login}\t{old_guid}\t{new_password}\t{new_hash}\tSUCCESS"
f.write(result_line + "\n")
results.append({
'user_id': user_id,
'login': login,
'guid': old_guid,
'password': new_password,
'hash': new_hash,
'status': 'SUCCESS'
})
updated_count += 1
logging.info(f'Password updated for user ID: {user_id}, Login: {login}')
except Exception as e:
error_msg = f"Error updating user {user_id}: {str(e)}"
result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: {str(e)}"
f.write(result_line + "\n")
results.append({
'user_id': user_id,
'login': login,
'guid': old_guid,
'password': '',
'hash': '',
'status': f'ERROR: {str(e)}'
})
logging.error(error_msg)
# Коммитим все изменения в БД
db.commit()
logging.info(f'Successfully updated passwords for {updated_count} users')
logging.info(f'Results saved to {output_file}')
return updated_count, results
except Exception as e:
# Откатываем изменения в случае ошибки
db.rollback()
logging.error(f'Error processing users: {str(e)}')
raise
finally:
# Закрываем сессию
db.close()
# Альтернативная версия с использованием прямого SQL (без ORM)
# def update_passwords_direct_sql(user_data_list: list, password_length: int = 12,
# output_file: str = 'updated_passwords_direct.txt') -> tuple:
# """
# Обновляет пароли используя прямое SQL выполнение
# """
# results = []
# updated_count = 0
# # Создаем сессию БД
# db = SessionLocal()
# try:
# with open(output_file, 'w', encoding='utf-8') as f:
# f.write("UserID\tGeneralLogin\tGUID\tNewPassword\tPasswordHash\tStatus\n")
# for user_id, login, old_guid in user_data_list:
# try:
# # Генерируем случайный пароль
# new_password = generate_clear_password(password_length)
# # Вычисляем хеш
# new_hash = compute_hash(new_password, old_guid)
# # Прямой SQL запрос для обновления
# stmt = x_user_table.update().where(
# x_user_table.c.UserID == user_id
# ).values(
# GeneralPassword=new_hash
# )
# # Выполняем запрос
# result = db.execute(stmt)
# if result.rowcount == 0:
# # Пользователь не найден
# result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: User not found"
# results.append({
# 'user_id': user_id, 'login': login, 'guid': old_guid,
# 'password': '', 'hash': '', 'status': 'ERROR: User not found'
# })
# else:
# # Успешное обновление
# result_line = f"{user_id}\t{login}\t{old_guid}\t{new_password}\t{new_hash}\tSUCCESS"
# results.append({
# 'user_id': user_id, 'login': login, 'guid': old_guid,
# 'password': new_password, 'hash': new_hash, 'status': 'SUCCESS'
# })
# updated_count += 1
# f.write(result_line + "\n")
# logging.info(f'Processed user ID: {user_id}, Login: {login}')
# except Exception as e:
# error_msg = f"Error updating user {user_id}: {str(e)}"
# result_line = f"{user_id}\t{login}\t{old_guid}\t\t\tERROR: {str(e)}"
# f.write(result_line + "\n")
# results.append({
# 'user_id': user_id, 'login': login, 'guid': old_guid,
# 'password': '', 'hash': '', 'status': f'ERROR: {str(e)}'
# })
# logging.error(error_msg)
# # Коммитим изменения
# db.commit()
# logging.info(f'Successfully updated passwords for {updated_count} users')
# return updated_count, results
# except Exception as e:
# db.rollback()
# logging.error(f'Error processing users: {str(e)}')
# raise
# finally:
# db.close()
def create_user_list_from_data() -> list:
"""
Создает список пользователей из предоставленных данных
"""
user_data = [
(2679, 'АбулдиновАС', '83025555-FFB0-4AAD-82B7-BA990198749B'),
]
return user_data
def test_database_connection():
"""Тестирует подключение к БД"""
try:
db = SessionLocal()
# Простой запрос для проверки подключения
result = db.execute(text("SELECT 1"))
db.close()
print("✓ Подключение к БД успешно")
return True
except Exception as e:
print(f"✗ Ошибка подключения к БД: {e}")
return False
def test_password_generation():
"""Тестирует генерацию паролей"""
print("Тест генерации паролей:")
for i in range(5):
password = generate_clear_password(12)
print(f" Пароль {i+1}: {password}")
def main():
"""Основная функция для выполнения обновления паролей"""
print("Начало обновления паролей...")
# Тестируем подключение к БД
if not test_database_connection():
return
# Тестируем генерацию паролей
test_password_generation()
# Получаем список пользователей
user_list = create_user_list_from_data()
print(f"Найдено пользователей для обработки: {len(user_list)}")
# Выберите метод обновления:
# 1. Через ORM (рекомендуется)
print("\nИспользование ORM метода...")
try:
updated_count, results = update_passwords_with_random(
user_list,
password_length=12,
output_file='updated_passwords_orm.txt'
)
print(f"Результаты (ORM):")
print(f" Успешно обновлено: {updated_count} пользователей")
print(f" Ошибки: {len(user_list) - updated_count} пользователей")
except Exception as e:
print(f"Ошибка при обновлении паролей (ORM): {e}")
# 2. Через прямое SQL (альтернатива)
# print("\nИспользование прямого SQL метода...")
# try:
# updated_count, results = update_passwords_direct_sql(
# user_list,
# password_length=12,
# output_file='updated_passwords_sql.txt'
# )
# print(f"Результаты (SQL):")
# print(f" Успешно обновлено: {updated_count} пользователей")
# print(f" Ошибки: {len(user_list) - updated_count} пользователей")
# except Exception as e:
# print(f"Ошибка при обновлении паролей (SQL): {e}")
# Создаем удобный отчет
if 'results' in locals():
create_password_report(results)
def create_password_report(results: list, filename: str = 'password_report.txt'):
"""Создает удобный отчет с паролями"""
with open(filename, 'w', encoding='utf-8') as f:
f.write("ОТЧЕТ ОБ ОБНОВЛЕНИИ ПАРОЛЕЙ\n")
f.write("=" * 50 + "\n\n")
success_results = [r for r in results if r['status'] == 'SUCCESS']
error_results = [r for r in results if r['status'] != 'SUCCESS']
f.write(f"Всего пользователей: {len(results)}\n")
f.write(f"Успешно обновлено: {len(success_results)}\n")
f.write(f"Ошибок: {len(error_results)}\n\n")
f.write("СПИСОК ПАРОЛЕЙ:\n")
f.write("-" * 50 + "\n")
for result in success_results:
f.write(f"Логин: {result['login']}\n")
f.write(f"UserID: {result['user_id']}\n")
f.write(f"Пароль: {result['password']}\n")
f.write(f"GUID: {result['guid']}\n")
f.write(f"Хеш: {result['hash']}\n")
f.write("-" * 30 + "\n")
if error_results:
f.write("\nОШИБКИ:\n")
f.write("-" * 30 + "\n")
for error in error_results:
f.write(f"UserID: {error['user_id']}, Логин: {error['login']}\n")
f.write(f"Ошибка: {error['status']}\n")
f.write("-" * 20 + "\n")
if __name__ == "__main__":
main()