#!/usr/bin/env python3
"""
Database Migration Script

Migrates data from a source database (PostgreSQL, or MSSQL via pymssql) to a
target PostgreSQL database. It is called by the Laravel application via the
RunMigrationJob.

Usage:
    python migrate_data.py \
        --source-host localhost \
        --source-port 5432 \
        --source-database source_db \
        --source-username user \
        --source-password pass \
        --target-host localhost \
        --target-port 5432 \
        --target-database target_db \
        --target-username user \
        --target-password pass \
        --tables 1,2,3 \
        --batch-size 1000 \
        --run-id 123 \
        [--truncate] \
        [--skip-indexes]
"""

import argparse
import sys
from typing import Any, Dict, List, Optional

import psycopg2
from psycopg2 import extras, sql
from psycopg2.extensions import connection


def parse_args() -> argparse.Namespace:
    """Parse and return the command-line arguments for a migration run."""
    parser = argparse.ArgumentParser(description='Migrate data between PostgreSQL databases')

    # Source database
    parser.add_argument('--source-host', required=True, help='Source database host')
    parser.add_argument('--source-port', type=int, required=True, help='Source database port')
    parser.add_argument('--source-database', required=True, help='Source database name')
    parser.add_argument('--source-username', required=True, help='Source database username')
    parser.add_argument('--source-password', required=True, help='Source database password')
    parser.add_argument('--source-driver', default='pgsql', help='Source database driver (pgsql or mssql)')

    # Target database
    parser.add_argument('--target-host', required=True, help='Target database host')
    parser.add_argument('--target-port', type=int, required=True, help='Target database port')
    parser.add_argument('--target-database', required=True, help='Target database name')
    parser.add_argument('--target-username', required=True, help='Target database username')
    parser.add_argument('--target-password', required=True, help='Target database password')
    parser.add_argument('--target-driver', default='pgsql', help='Target database driver (pgsql or mssql)')

    # Migration options
    parser.add_argument('--tables', required=True,
                        help='Comma-separated list of tables to migrate (format: schema.table_name)')
    parser.add_argument('--batch-size', type=int, default=1000,
                        help='Number of rows to migrate per batch')
    parser.add_argument('--run-id', required=True, help='Migration run ID for logging')
    parser.add_argument('--truncate', action='store_true',
                        help='Truncate target tables before migration')
    parser.add_argument('--skip-indexes', action='store_true',
                        help='Skip creating indexes after migration')
    parser.add_argument('--incremental', action='store_true',
                        help='Enable incremental migration')
    parser.add_argument('--since', type=str,
                        help='Migrate only records modified since this ISO 8601 timestamp')
    parser.add_argument('--incremental-column', type=str, default='updated_at',
                        help='Column to use for incremental migration (default: updated_at)')
    parser.add_argument('--use-life', action='store_true',
                        help='Use MSSQL Life tables for incremental migration')
    parser.add_argument('--life-table', type=str,
                        help='Name of the Life table for change tracking')
    parser.add_argument('--life-id-column', type=str,
                        help='Name of the LifeID column (e.g., VisitResultLifeID)')
    parser.add_argument('--base-id-column', type=str,
                        help='Name of the base ID column (e.g., VisitResultID)')
    parser.add_argument('--operation-column', type=str, default='x_Operation',
                        help='Column name for operation type (i/u/d)')
    parser.add_argument('--datetime-column', type=str, default='x_DateTime',
                        help='Column name for change timestamp')

    return parser.parse_args()


def get_connection(host: str, port: int, database: str, username: str,
                   password: str, driver: str = 'pgsql') -> connection:
    """Create a database connection based on driver type.

    For ``mssql`` a pymssql connection is returned with dict-style rows
    (``as_dict=True``); for anything else a psycopg2 connection is returned.
    """
    if driver == 'mssql':
        import pymssql
        return pymssql.connect(
            server=host,
            port=port,
            database=database,
            user=username,
            password=password,
            as_dict=True
        )
    # pgsql (default)
    import psycopg2
    return psycopg2.connect(
        host=host,
        port=port,
        database=database,
        user=username,
        password=password
    )


def get_table_info(conn: connection, table_id: int) -> Optional[Dict[str, Any]]:
    """Get table information from the Laravel database.

    NOTE(review): currently a stub — it does not query anything and just
    echoes the id back; kept for interface compatibility.
    """
    return {'id': table_id}


def get_columns_for_table(source_conn: connection, table_name: str,
                          schema: str = 'public') -> List[Dict[str, Any]]:
    """Return column metadata for ``schema.table_name`` (PostgreSQL source).

    Each entry carries column_name, data_type, udt_name, is_nullable and
    column_default, in ordinal position order.
    """
    with source_conn.cursor(cursor_factory=extras.RealDictCursor) as cur:
        cur.execute("""
            SELECT column_name, data_type, udt_name, is_nullable, column_default
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
            ORDER BY ordinal_position
        """, (schema, table_name))
        return cur.fetchall()


def _row_values(row) -> tuple:
    """Normalize a cursor row (tuple or dict) to a tuple of values.

    pymssql connections are opened with as_dict=True and yield dict rows,
    while psycopg2's default cursor yields tuples; dict value order follows
    the SELECT column order (insertion-ordered dicts).
    """
    if isinstance(row, dict):
        return tuple(row.values())
    return tuple(row)


def migrate_table(
    source_conn: connection,
    target_conn: connection,
    table_name: str,
    schema: str,
    batch_size: int,
    truncate: bool,
    incremental: bool = False,
    since: Optional[str] = None,
    incremental_column: str = 'updated_at',
    use_life: bool = False,
    life_table: Optional[str] = None,
    life_id_column: Optional[str] = None,
    base_id_column: Optional[str] = None,
    operation_column: str = 'x_Operation',
    datetime_column: str = 'x_DateTime'
) -> Dict[str, int]:
    """Migrate a single table from source to target.

    Returns a dict with at least ``migrated`` and ``failed`` row counts.

    NOTE(review): the standard path uses psycopg2-only cursor features
    (RealDictCursor, sql composition), so it assumes a PostgreSQL source;
    MSSQL sources are only supported through the Life-table path.
    """
    migrated_rows = 0
    failed_rows = 0

    # Change-tracking (Life table) migration takes precedence when configured.
    if use_life and life_table and since:
        return migrate_via_life_table(
            source_conn, target_conn, table_name, schema,
            life_table, life_id_column, base_id_column,
            operation_column, datetime_column, since, batch_size
        )

    # STANDARD MIGRATION (full copy, optionally filtered by timestamp)
    with source_conn.cursor(cursor_factory=extras.RealDictCursor) as source_cur:
        # Optional incremental filter. The column name is escaped as an
        # identifier (previously it was interpolated raw into the SQL text).
        where_sql = sql.SQL("")
        params: List[Any] = []
        if incremental and since:
            where_sql = sql.SQL(" WHERE {} > %s").format(sql.Identifier(incremental_column))
            params = [since]
            print(f"Incremental migration: {incremental_column} > {since}")

        # Get total count
        count_query = sql.SQL("SELECT COUNT(*) FROM {}.{}").format(
            sql.Identifier(schema), sql.Identifier(table_name)
        ) + where_sql
        source_cur.execute(count_query, params)
        total_rows = source_cur.fetchone()['count']
        print(f"Table {schema}.{table_name}: {total_rows} rows to migrate")

        if total_rows == 0:
            return {'migrated': 0, 'failed': 0}

        # Truncate target table if requested (never for incremental runs).
        if truncate and not incremental:
            with target_conn.cursor() as target_cur:
                try:
                    target_cur.execute(sql.SQL("TRUNCATE TABLE {}.{} CASCADE").format(
                        sql.Identifier(schema), sql.Identifier(table_name)
                    ))
                    target_conn.commit()
                    print(f"Truncated {schema}.{table_name}")
                except Exception as e:
                    # Best effort: a missing table / FK restriction is not fatal.
                    print(f"Warning: Could not truncate {schema}.{table_name}: {e}")
                    target_conn.rollback()

        # Get columns
        columns = get_columns_for_table(source_conn, table_name, schema)
        column_names = [col['column_name'] for col in columns]
        column_list = sql.SQL(', ').join(sql.Identifier(c) for c in column_names)

        # NOTE(review): LIMIT/OFFSET paging without ORDER BY is not guaranteed
        # stable; rows can be skipped/duplicated if the source changes mid-run.
        # Kept as-is to preserve behavior — consider keyset pagination.
        offset = 0
        rows: List[Dict[str, Any]] = []
        while offset < total_rows:
            try:
                # Fetch batch from source
                select_query = sql.SQL("SELECT {} FROM {}.{}").format(
                    column_list, sql.Identifier(schema), sql.Identifier(table_name)
                ) + where_sql + sql.SQL(" LIMIT %s OFFSET %s")
                source_cur.execute(select_query, params + [batch_size, offset])
                rows = source_cur.fetchall()
                if not rows:
                    break

                # Insert batch into target
                with target_conn.cursor() as target_cur:
                    placeholders = sql.SQL(', ').join([sql.SQL('%s')] * len(column_names))
                    insert_query = sql.SQL("INSERT INTO {}.{} ({}) VALUES ({})").format(
                        sql.Identifier(schema),
                        sql.Identifier(table_name),
                        column_list,
                        placeholders
                    )
                    extras.execute_batch(
                        target_cur, insert_query,
                        [tuple(row[col] for col in column_names) for row in rows],
                        page_size=batch_size
                    )
                target_conn.commit()

                migrated_rows += len(rows)
                offset += batch_size
                print(f"Migrated {migrated_rows}/{total_rows} rows")
            except Exception as e:
                print(f"Error migrating batch: {e}")
                # Approximation: the failing batch size may not be known
                # exactly (e.g. the fetch itself failed).
                failed_rows += len(rows) if rows else batch_size
                target_conn.rollback()
                break

    return {'migrated': migrated_rows, 'failed': failed_rows}


def migrate_via_life_table(
    source_conn,
    target_conn,
    table_name: str,
    schema: str,
    life_table: str,
    life_id_column: str,
    base_id_column: str,
    operation_column: str,
    datetime_column: str,
    since: str,
    batch_size: int
) -> Dict[str, int]:
    """
    Migrate a table using the MSSQL "Life" change-tracking mechanism.

    Life tables track all changes (insert, update, delete) with:
    - x_Operation: 'i' = insert, 'u' = update, 'd' = delete
    - x_DateTime: timestamp of change
    - {Table}LifeID: unique ID in Life table
    - {Table}ID: reference to base table ID

    Returns a stats dict with 'inserted'/'updated'/'deleted'/'total' plus
    'migrated'/'failed' so callers can treat it like migrate_table()
    (previously the latter two were missing, which made main() raise
    KeyError on this path).
    """
    stats: Dict[str, Any] = {'inserted': 0, 'updated': 0, 'deleted': 0, 'total': 0}
    print(f"Life table migration: {schema}.{table_name} via {life_table}")
    print(f"Changes since: {since}")

    try:
        # Detect a pymssql connection by its module. The previous check
        # (dsn_parameters 'driver' == 'sqlsrv') never matched pymssql
        # connections, so MSSQL sources always fell into the psycopg2 path.
        is_mssql = 'pymssql' in type(source_conn).__module__

        if is_mssql:
            # MSSQL-specific query with CTE for latest versions
            cursor = source_conn.cursor(as_dict=True)
            # Table/column names come from trusted CLI configuration; T-SQL
            # cannot parameterize identifiers. pymssql's paramstyle is
            # %s-based — the original '?' placeholder would never bind.
            query = f"""
                WITH LatestLife AS (
                    SELECT {base_id_column}, MAX({life_id_column}) as MaxLifeID
                    FROM {life_table}
                    WHERE {datetime_column} > %s
                    GROUP BY {base_id_column}
                )
                SELECT dl.*
                FROM {life_table} dl
                INNER JOIN LatestLife ll ON dl.{life_id_column} = ll.MaxLifeID
                ORDER BY dl.{base_id_column}
            """
            cursor.execute(query, (since,))

            # Stream the result set in batches to bound memory use.
            rows = []
            for row in cursor:
                rows.append(dict(row))
                if len(rows) >= batch_size:
                    process_life_batch(
                        rows, target_conn, table_name, schema,
                        base_id_column, operation_column, stats
                    )
                    rows = []
            if rows:
                process_life_batch(
                    rows, target_conn, table_name, schema,
                    base_id_column, operation_column, stats
                )
            cursor.close()
        else:
            # PostgreSQL source (for testing)
            with source_conn.cursor(cursor_factory=extras.RealDictCursor) as cursor:
                # life_table may be schema-qualified, so it is injected as raw
                # SQL rather than a single Identifier.
                query = sql.SQL("""
                    WITH LatestLife AS (
                        SELECT {} as base_id, MAX({}) as MaxLifeID
                        FROM {}
                        WHERE {} > %s
                        GROUP BY {}
                    )
                    SELECT dl.*
                    FROM {} dl
                    INNER JOIN LatestLife ll ON dl.{} = ll.MaxLifeID
                    ORDER BY dl.{}
                """).format(
                    sql.Identifier(base_id_column),
                    sql.Identifier(life_id_column),
                    sql.SQL(life_table),
                    sql.Identifier(datetime_column),
                    sql.Identifier(base_id_column),
                    sql.SQL(life_table),
                    sql.Identifier(life_id_column),
                    sql.Identifier(base_id_column)
                )
                cursor.execute(query, (since,))
                rows = cursor.fetchall()
                process_life_batch(
                    rows, target_conn, table_name, schema,
                    base_id_column, operation_column, stats
                )

        print(f"Life migration complete: +{stats['inserted']} inserts, ~{stats['updated']} updates, -{stats['deleted']} deletes")

    except Exception as e:
        print(f"Error in Life table migration: {e}")
        import traceback
        traceback.print_exc()
        stats['error'] = str(e)

    # Expose the keys main() aggregates on.
    stats['migrated'] = stats['inserted'] + stats['updated'] + stats['deleted']
    stats.setdefault('failed', 0)
    return stats


def process_life_batch(
    rows: List[Dict],
    target_conn,
    table_name: str,
    schema: str,
    base_id_column: str,
    operation_column: str,
    stats: Dict[str, int]
):
    """Apply one batch of Life-table change records to the target table.

    Splits the batch by operation ('i'/'u'/'d'), performing upserts, updates
    and deletes respectively. Mutates ``stats`` in place.
    """
    if not rows:
        return

    # Separate by operation type
    inserts = [r for r in rows if r.get(operation_column) == 'i']
    updates = [r for r in rows if r.get(operation_column) == 'u']
    deletes = [r for r in rows if r.get(operation_column) == 'd']

    # Exclude Life-specific bookkeeping fields — they do not exist on the
    # target table.
    exclude_fields = {operation_column, 'x_DateTime', 'x_Seance', 'x_User'}
    for key in rows[0].keys():
        if key.endswith('LifeID'):
            exclude_fields.add(key)

    # Get columns to write
    write_columns = [k for k in rows[0].keys() if k not in exclude_fields]

    # Process inserts
    if inserts:
        with target_conn.cursor() as cur:
            for row in inserts:
                values = [row[col] for col in write_columns]
                placeholders = ','.join(['%s'] * len(values))
                columns_sql = ','.join([f'"{col}"' for col in write_columns])
                # Upsert so a replayed insert does not fail on an existing row.
                # NOTE(review): requires a unique constraint on the base id
                # column in the target — confirm schema.
                insert_sql = f"""
                    INSERT INTO "{schema}"."{table_name}" ({columns_sql})
                    VALUES ({placeholders})
                    ON CONFLICT ("{base_id_column}") DO UPDATE SET
                    {', '.join([f'"{col}" = EXCLUDED."{col}"' for col in write_columns if col != base_id_column])}
                """
                try:
                    cur.execute(insert_sql, values)
                    stats['inserted'] += 1
                except Exception as e:
                    print(f"Error inserting: {e}")
                    stats['failed'] = stats.get('failed', 0) + 1
        target_conn.commit()

    # Process updates
    if updates:
        with target_conn.cursor() as cur:
            for row in updates:
                # Bug fix: the SET clause skips the base id column, so the
                # bound values must skip it too (previously one more value
                # than placeholders was passed, failing every update).
                set_columns = [col for col in write_columns if col != base_id_column]
                values = [row[col] for col in set_columns]
                set_clause = ', '.join([f'"{col}" = %s' for col in set_columns])
                update_sql = f"""
                    UPDATE "{schema}"."{table_name}"
                    SET {set_clause}
                    WHERE "{base_id_column}" = %s
                """
                try:
                    cur.execute(update_sql, values + [row[base_id_column]])
                    stats['updated'] += 1
                except Exception as e:
                    print(f"Error updating: {e}")
                    stats['failed'] = stats.get('failed', 0) + 1
        target_conn.commit()

    # Process deletes
    if deletes:
        with target_conn.cursor() as cur:
            delete_ids = [r[base_id_column] for r in deletes]
            delete_sql = f"""
                DELETE FROM "{schema}"."{table_name}"
                WHERE "{base_id_column}" = ANY(%s)
            """
            try:
                cur.execute(delete_sql, (delete_ids,))
                stats['deleted'] += cur.rowcount
            except Exception as e:
                print(f"Error deleting: {e}")
                stats['failed'] = stats.get('failed', 0) + len(delete_ids)
        target_conn.commit()

    stats['total'] += len(rows)


def main() -> int:
    """Entry point: connect, ensure target tables exist, migrate each table.

    Returns 0 when no rows/tables failed, 1 otherwise.
    """
    args = parse_args()
    source_conn = None
    target_conn = None
    try:
        print(f"Starting migration run {args.run_id}")
        print(f"Source: {args.source_host}:{args.source_port}/{args.source_database}")
        print(f"Target: {args.target_host}:{args.target_port}/{args.target_database}")
        print(f"Tables: {args.tables}")
        print(f"Batch size: {args.batch_size}")
        print(f"Truncate before migration: {args.truncate}")

        # Parse table names (format: schema.table_name)
        table_names = [t.strip() for t in args.tables.split(',')]

        # Connect to databases
        source_conn = get_connection(
            args.source_host, args.source_port, args.source_database,
            args.source_username, args.source_password, args.source_driver
        )
        target_conn = get_connection(
            args.target_host, args.target_port, args.target_database,
            args.target_username, args.target_password, args.target_driver
        )
        print("Connected to both databases")

        total_migrated = 0
        total_failed = 0

        for table_full_name in table_names:
            try:
                # Parse schema and table name
                if '.' in table_full_name:
                    schema, table_name = table_full_name.split('.', 1)
                else:
                    schema = 'public'
                    table_name = table_full_name

                print(f"\nProcessing table: {schema}.{table_name}")

                # Check if table exists in source. _row_values() tolerates
                # both tuple rows (psycopg2) and dict rows (pymssql as_dict).
                with source_conn.cursor() as cur:
                    cur.execute("""
                        SELECT EXISTS (
                            SELECT FROM information_schema.tables
                            WHERE table_schema = %s AND table_name = %s
                        )
                    """, (schema, table_name))
                    exists = _row_values(cur.fetchone())[0]

                if not exists:
                    print(f"Table {schema}.{table_name} does not exist in source, skipping")
                    continue

                # Get table columns from source
                with source_conn.cursor() as cur:
                    cur.execute("""
                        SELECT column_name, data_type, character_maximum_length,
                               numeric_precision, numeric_scale
                        FROM information_schema.columns
                        WHERE table_schema = %s AND table_name = %s
                        ORDER BY ordinal_position
                    """, (schema, table_name))
                    columns = cur.fetchall()

                if not columns:
                    print(f"No columns found for {schema}.{table_name}, skipping")
                    continue

                # Check if table exists in target
                with target_conn.cursor() as target_cur:
                    target_cur.execute("""
                        SELECT EXISTS (
                            SELECT FROM information_schema.tables
                            WHERE table_schema = %s AND table_name = %s
                        )
                    """, (schema, table_name))
                    target_exists = _row_values(target_cur.fetchone())[0]

                # Create table in target if it doesn't exist
                if not target_exists:
                    print(f"Creating table {schema}.{table_name} in target...")

                    # Build CREATE TABLE statement
                    column_defs = []
                    for col in columns:
                        col_name, data_type, char_len, num_precision, num_scale = _row_values(col)[:5]
                        type_def = data_type
                        if char_len and data_type in ('character varying', 'character', 'text'):
                            type_def = f"{data_type}({char_len})"
                        elif num_precision and data_type in ('numeric', 'decimal'):
                            if num_scale:
                                type_def = f"{data_type}({num_precision},{num_scale})"
                            else:
                                type_def = f"{data_type}({num_precision})"
                        column_defs.append(f'"{col_name}" {type_def}')

                    create_sql = f'CREATE TABLE "{schema}"."{table_name}" ({", ".join(column_defs)})'
                    with target_conn.cursor() as target_cur:
                        target_cur.execute(create_sql)
                    target_conn.commit()
                    print(f"Table {schema}.{table_name} created in target")
                else:
                    print(f"Table {schema}.{table_name} already exists in target")

                # Migrate the data
                result = migrate_table(
                    source_conn, target_conn, table_name, schema,
                    args.batch_size, args.truncate,
                    incremental=args.incremental,
                    since=args.since,
                    incremental_column=args.incremental_column,
                    use_life=args.use_life,
                    life_table=args.life_table,
                    life_id_column=args.life_id_column,
                    base_id_column=args.base_id_column,
                    operation_column=args.operation_column,
                    datetime_column=args.datetime_column
                )
                total_migrated += result.get('migrated', 0)
                total_failed += result.get('failed', 0)

            except Exception as e:
                print(f"Error migrating table {table_full_name}: {e}")
                total_failed += 1

        print(f"\nMigration complete!")
        print(f"Migrated {total_migrated} rows")
        print(f"Failed {total_failed} rows")

        return 0 if total_failed == 0 else 1

    except Exception as e:
        print(f"Migration failed: {e}")
        import traceback
        traceback.print_exc()
        return 1

    finally:
        if source_conn:
            source_conn.close()
        if target_conn:
            target_conn.close()
# Script entry point: propagate main()'s exit status to the shell.
if __name__ == '__main__':
    sys.exit(main())