Files
project-replica/scripts/migrate_data.py
2026-03-23 00:51:38 +09:00

611 lines
24 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Database Migration Script
This script migrates data from a source PostgreSQL database to a target PostgreSQL database.
It is called by the Laravel application via the RunMigrationJob.
Usage:
python migrate_data.py \
--source-host localhost \
--source-port 5432 \
--source-database source_db \
--source-username user \
--source-password pass \
--target-host localhost \
--target-port 5432 \
--target-database target_db \
--target-username user \
--target-password pass \
--tables public.users,public.orders \
--batch-size 1000 \
--run-id 123 \
[--truncate] \
[--skip-indexes]
"""
import argparse
import psycopg2
from psycopg2 import sql, extras
from psycopg2.extensions import connection
import sys
import json
from typing import List, Dict, Any, Optional
def parse_args() -> argparse.Namespace:
    """Parse and return the command-line arguments for a migration run."""
    parser = argparse.ArgumentParser(description='Migrate data between PostgreSQL databases')
    # Source and target connection settings have the same shape, so declare
    # both sets in a single pass.
    for role, label in (('source', 'Source'), ('target', 'Target')):
        parser.add_argument(f'--{role}-host', required=True, help=f'{label} database host')
        parser.add_argument(f'--{role}-port', type=int, required=True, help=f'{label} database port')
        parser.add_argument(f'--{role}-database', required=True, help=f'{label} database name')
        parser.add_argument(f'--{role}-username', required=True, help=f'{label} database username')
        parser.add_argument(f'--{role}-password', required=True, help=f'{label} database password')
        parser.add_argument(f'--{role}-driver', default='pgsql', help=f'{label} database driver (pgsql or mssql)')
    # Migration options
    parser.add_argument('--tables', required=True, help='Comma-separated list of tables to migrate (format: schema.table_name)')
    parser.add_argument('--batch-size', type=int, default=1000, help='Number of rows to migrate per batch')
    parser.add_argument('--run-id', required=True, help='Migration run ID for logging')
    parser.add_argument('--truncate', action='store_true', help='Truncate target tables before migration')
    parser.add_argument('--skip-indexes', action='store_true', help='Skip creating indexes after migration')
    # Incremental / change-tracking options
    parser.add_argument('--incremental', action='store_true', help='Enable incremental migration')
    parser.add_argument('--since', type=str, help='Migrate only records modified since this ISO 8601 timestamp')
    parser.add_argument('--incremental-column', type=str, default='updated_at', help='Column to use for incremental migration (default: updated_at)')
    parser.add_argument('--use-life', action='store_true', help='Use MSSQL Life tables for incremental migration')
    parser.add_argument('--life-table', type=str, help='Name of the Life table for change tracking')
    parser.add_argument('--life-id-column', type=str, help='Name of the LifeID column (e.g., VisitResultLifeID)')
    parser.add_argument('--base-id-column', type=str, help='Name of the base ID column (e.g., VisitResultID)')
    parser.add_argument('--operation-column', type=str, default='x_Operation', help='Column name for operation type (i/u/d)')
    parser.add_argument('--datetime-column', type=str, default='x_DateTime', help='Column name for change timestamp')
    return parser.parse_args()
def get_connection(host: str, port: int, database: str, username: str, password: str, driver: str = 'pgsql') -> connection:
    """Open and return a database connection for the requested driver.

    'mssql' uses pymssql (dict rows enabled); any other value falls back to
    PostgreSQL via psycopg2. The driver modules are imported lazily so only
    the one actually needed must be installed.
    """
    if driver == 'mssql':
        import pymssql
        return pymssql.connect(
            server=host, port=port, database=database,
            user=username, password=password, as_dict=True,
        )
    # Default path: PostgreSQL.
    import psycopg2
    return psycopg2.connect(
        host=host, port=port, database=database,
        user=username, password=password,
    )
def get_table_info(conn: "connection", table_id: int) -> Optional[Dict[str, Any]]:
    """Return metadata for the table identified by *table_id*.

    Placeholder implementation: a full version would query the Laravel
    application database for the table's details. For now it simply wraps
    the ID in a dict.
    """
    return {'id': table_id}
def get_columns_for_table(source_conn: connection, table_name: str, schema: str = 'public') -> List[Dict[str, Any]]:
    """Return column metadata dicts for schema.table_name, in ordinal order.

    Each dict carries column_name, data_type, udt_name, is_nullable and
    column_default, as reported by information_schema.
    """
    query = """
        SELECT column_name, data_type, udt_name, is_nullable, column_default
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
    """
    with source_conn.cursor(cursor_factory=extras.RealDictCursor) as cursor:
        cursor.execute(query, (schema, table_name))
        return cursor.fetchall()
def migrate_table(
    source_conn: connection,
    target_conn: connection,
    table_name: str,
    schema: str,
    batch_size: int,
    truncate: bool,
    incremental: bool = False,
    since: Optional[str] = None,
    incremental_column: str = 'updated_at',
    use_life: bool = False,
    life_table: Optional[str] = None,
    life_id_column: Optional[str] = None,
    base_id_column: Optional[str] = None,
    operation_column: str = 'x_Operation',
    datetime_column: str = 'x_DateTime'
) -> Dict[str, int]:
    """Migrate a single table from source to target.

    Three modes:
      * Life-table mode (use_life + life_table + since): delegates to
        migrate_via_life_table for change-tracking replication.
      * Incremental mode (incremental + since): copies only rows where
        incremental_column > since.
      * Full copy (default): copies every row, optionally truncating the
        target first.

    Returns:
        {'migrated': <rows copied>, 'failed': <rows attributed to failures>}
        (Life-table mode returns its own stats dict instead.)
    """
    migrated_rows = 0
    failed_rows = 0
    if use_life and life_table and since:
        # MIGRATION VIA LIFE TABLE
        return migrate_via_life_table(
            source_conn, target_conn, table_name, schema,
            life_table, life_id_column, base_id_column,
            operation_column, datetime_column, since, batch_size
        )
    # STANDARD MIGRATION (by ID or timestamp)
    with source_conn.cursor(cursor_factory=extras.RealDictCursor) as source_cur:
        # Build WHERE clause for incremental migration. The column name is
        # interpolated directly, so it must come from trusted configuration.
        where_clause = ""
        params = []
        if incremental and since:
            where_clause = f" WHERE {incremental_column} > %s"
            params = [since]
            print(f"Incremental migration: {incremental_column} > {since}")
        # Count rows up front so progress can be reported and empty tables
        # can be skipped cheaply.
        count_query = sql.SQL("SELECT COUNT(*) FROM {}.{}").format(
            sql.Identifier(schema),
            sql.Identifier(table_name)
        )
        if where_clause:
            count_query += sql.SQL(where_clause)
        source_cur.execute(count_query, params)
        total_rows = source_cur.fetchone()['count']
        print(f"Table {schema}.{table_name}: {total_rows} rows to migrate")
        if total_rows == 0:
            return {'migrated': 0, 'failed': 0}
        # Truncate target table if requested (never for incremental runs,
        # which must preserve previously migrated data).
        if truncate and not incremental:
            with target_conn.cursor() as target_cur:
                try:
                    target_cur.execute(sql.SQL("TRUNCATE TABLE {}.{} CASCADE").format(
                        sql.Identifier(schema),
                        sql.Identifier(table_name)
                    ))
                    target_conn.commit()
                    print(f"Truncated {schema}.{table_name}")
                except Exception as e:
                    # Best effort: a failed truncate is reported but does not
                    # abort the migration.
                    print(f"Warning: Could not truncate {schema}.{table_name}: {e}")
                    target_conn.rollback()
        # Column list drives both the SELECT and the INSERT so source and
        # target stay aligned.
        columns = get_columns_for_table(source_conn, table_name, schema)
        column_names = [col['column_name'] for col in columns]
        # Fetch and insert in batches.
        # NOTE(review): LIMIT/OFFSET without ORDER BY gives no stable row
        # order in PostgreSQL, so concurrent writes to the source can cause
        # skipped or duplicated rows across batches — consider ordering by a
        # primary key if one is known.
        offset = 0
        while offset < total_rows:
            # Reset per-iteration so the except-branch never mis-attributes a
            # previously *successful* batch as failed (the old
            # `'rows' in locals()` check did exactly that).
            rows = []
            try:
                # Fetch batch from source
                select_query = sql.SQL("""
                    SELECT {} FROM {}.{}
                """).format(
                    sql.SQL(', ').join([sql.Identifier(col) for col in column_names]),
                    sql.Identifier(schema),
                    sql.Identifier(table_name)
                )
                if where_clause:
                    select_query += sql.SQL(where_clause)
                select_query += sql.SQL(" LIMIT %s OFFSET %s")
                fetch_params = params + [batch_size, offset]
                source_cur.execute(select_query, fetch_params)
                rows = source_cur.fetchall()
                if not rows:
                    break
                # Insert batch into target in one round-trip per page.
                with target_conn.cursor() as target_cur:
                    placeholders = sql.SQL(', ').join([sql.SQL('%s')] * len(column_names))
                    insert_query = sql.SQL("INSERT INTO {}.{} ({}) VALUES ({})").format(
                        sql.Identifier(schema),
                        sql.Identifier(table_name),
                        sql.SQL(', ').join([sql.Identifier(col) for col in column_names]),
                        placeholders
                    )
                    extras.execute_batch(target_cur, insert_query, [tuple(row[col] for col in column_names) for row in rows], page_size=batch_size)
                target_conn.commit()
                migrated_rows += len(rows)
                offset += batch_size
                print(f"Migrated {migrated_rows}/{total_rows} rows")
            except Exception as e:
                print(f"Error migrating batch: {e}")
                # If the failure happened before/during the fetch, rows is
                # empty — attribute a whole batch to the failure count.
                failed_rows += len(rows) if rows else batch_size
                target_conn.rollback()
                break
    return {'migrated': migrated_rows, 'failed': failed_rows}
def migrate_via_life_table(
    source_conn,
    target_conn,
    table_name: str,
    schema: str,
    life_table: str,
    life_id_column: str,
    base_id_column: str,
    operation_column: str,
    datetime_column: str,
    since: str,
    batch_size: int
) -> Dict[str, int]:
    """
    Migrate table using MSSQL Life table mechanism.

    Life tables track all changes (insert, update, delete) with:
    - x_Operation: 'i' = insert, 'u' = update, 'd' = delete
    - x_DateTime: timestamp of change
    - {Table}LifeID: unique ID in Life table
    - {Table}ID: reference to base table ID

    Only the newest Life row per base ID (MAX LifeID) is replayed, so a
    record changed several times since `since` is applied exactly once.

    Returns a stats dict with 'inserted'/'updated'/'deleted'/'total' (and
    'error' when the run failed).
    """
    stats = {'inserted': 0, 'updated': 0, 'deleted': 0, 'total': 0}
    print(f"Life table migration: {schema}.{table_name} via {life_table}")
    print(f"Changes since: {since}")
    try:
        # Detect an MSSQL source by the connection object's module.
        # FIX: the previous probe (`source_conn.dsn_parameters.get('driver')
        # == 'sqlsrv'`) could never match — neither pymssql nor psycopg2
        # connections expose a `dsn_parameters` attribute — so MSSQL sources
        # always fell through to the PostgreSQL branch and crashed on
        # psycopg2-specific calls.
        is_mssql = type(source_conn).__module__.split('.')[0] == 'pymssql'
        if is_mssql:
            # MSSQL path: identifiers are interpolated into the SQL text, so
            # life_table and the column names must come from trusted config.
            cursor = source_conn.cursor(as_dict=True)
            # Latest version of each record changed since `since`.
            # FIX: pymssql's paramstyle is 'pyformat' (%s), not qmark (?);
            # the old '?' placeholder raised on execute.
            query = f"""
                WITH LatestLife AS (
                    SELECT
                        {base_id_column},
                        MAX({life_id_column}) as MaxLifeID
                    FROM {life_table}
                    WHERE {datetime_column} > %s
                    GROUP BY {base_id_column}
                )
                SELECT dl.*
                FROM {life_table} dl
                INNER JOIN LatestLife ll
                    ON dl.{life_id_column} = ll.MaxLifeID
                ORDER BY dl.{base_id_column}
            """
            cursor.execute(query, (since,))
            # Stream rows from the cursor, applying them batch_size at a time
            # to bound memory use.
            rows = []
            for row in cursor:
                rows.append(dict(row))
                if len(rows) >= batch_size:
                    process_life_batch(
                        rows, target_conn, table_name, schema,
                        base_id_column, operation_column, stats
                    )
                    rows = []
            # Process remaining rows
            if rows:
                process_life_batch(
                    rows, target_conn, table_name, schema,
                    base_id_column, operation_column, stats
                )
            cursor.close()
        else:
            # PostgreSQL source (for testing)
            with source_conn.cursor(cursor_factory=extras.RealDictCursor) as cursor:
                # life_table is spliced in as raw SQL because it may carry a
                # schema qualifier; every other identifier is safely quoted.
                query = sql.SQL("""
                    WITH LatestLife AS (
                        SELECT
                            {} as base_id,
                            MAX({}) as MaxLifeID
                        FROM {}
                        WHERE {} > %s
                        GROUP BY {}
                    )
                    SELECT dl.*
                    FROM {} dl
                    INNER JOIN LatestLife ll
                        ON dl.{} = ll.MaxLifeID
                    ORDER BY dl.{}
                """).format(
                    sql.Identifier(base_id_column),
                    sql.Identifier(life_id_column),
                    sql.SQL(life_table),
                    sql.Identifier(datetime_column),
                    sql.Identifier(base_id_column),
                    sql.SQL(life_table),
                    sql.Identifier(life_id_column),
                    sql.Identifier(base_id_column)
                )
                cursor.execute(query, (since,))
                rows = cursor.fetchall()
                process_life_batch(
                    rows, target_conn, table_name, schema,
                    base_id_column, operation_column, stats
                )
        print(f"Life migration complete: +{stats['inserted']} inserts, ~{stats['updated']} updates, -{stats['deleted']} deletes")
    except Exception as e:
        # Best effort: report the error in the stats rather than crashing the
        # whole multi-table run.
        print(f"Error in Life table migration: {e}")
        import traceback
        traceback.print_exc()
        stats['error'] = str(e)
    return stats
def process_life_batch(
    rows: List[Dict],
    target_conn,
    table_name: str,
    schema: str,
    base_id_column: str,
    operation_column: str,
    stats: Dict[str, int]
):
    """Apply a batch of Life-table change records to the target table.

    Rows are partitioned by operation ('i' insert, 'u' update, 'd' delete)
    and applied in that order. Counters accumulate into `stats`
    (inserted/updated/deleted/total, plus 'failed' on per-row errors).

    Each row-level statement runs inside a SAVEPOINT. Previously a single
    failed INSERT/UPDATE aborted the whole PostgreSQL transaction, so every
    subsequent statement in the batch failed with "current transaction is
    aborted"; rolling back to the savepoint keeps the rest of the batch
    alive while preserving the original best-effort semantics.

    Identifiers (schema, table, columns) are interpolated into the SQL, so
    they must come from trusted configuration, never user input.
    """
    if not rows:
        return
    # Separate by operation type
    inserts = [r for r in rows if r.get(operation_column) == 'i']
    updates = [r for r in rows if r.get(operation_column) == 'u']
    deletes = [r for r in rows if r.get(operation_column) == 'd']
    # Exclude Life-bookkeeping fields from what we write to the target.
    exclude_fields = {operation_column, 'x_DateTime', 'x_Seance', 'x_User'}
    for key in rows[0].keys():
        if key.endswith('LifeID'):
            exclude_fields.add(key)
    write_columns = [k for k in rows[0].keys() if k not in exclude_fields]
    # Process inserts as upserts so replayed changes are idempotent.
    if inserts:
        columns_sql = ','.join([f'"{col}"' for col in write_columns])
        placeholders = ','.join(['%s'] * len(write_columns))
        conflict_set = ', '.join([f'"{col}" = EXCLUDED."{col}"' for col in write_columns if col != base_id_column])
        insert_sql = f"""
            INSERT INTO "{schema}"."{table_name}" ({columns_sql})
            VALUES ({placeholders})
            ON CONFLICT ("{base_id_column}") DO UPDATE SET
                {conflict_set}
        """
        with target_conn.cursor() as cur:
            for row in inserts:
                values = [row[col] for col in write_columns]
                cur.execute("SAVEPOINT life_batch_row")
                try:
                    cur.execute(insert_sql, values)
                    cur.execute("RELEASE SAVEPOINT life_batch_row")
                    stats['inserted'] += 1
                except Exception as e:
                    print(f"Error inserting: {e}")
                    # Undo only this row; the transaction stays usable.
                    cur.execute("ROLLBACK TO SAVEPOINT life_batch_row")
                    stats['failed'] = stats.get('failed', 0) + 1
        target_conn.commit()
    # Process updates
    if updates:
        set_clause = ', '.join([f'"{col}" = %s' for col in write_columns if col != base_id_column])
        update_sql = f"""
            UPDATE "{schema}"."{table_name}"
            SET {set_clause}
            WHERE "{base_id_column}" = %s
        """
        with target_conn.cursor() as cur:
            for row in updates:
                # FIX: values must mirror set_clause, which excludes the base
                # ID column; the old code included it, producing one more
                # parameter than placeholders and failing every UPDATE.
                values = [row[col] for col in write_columns if col != base_id_column]
                cur.execute("SAVEPOINT life_batch_row")
                try:
                    cur.execute(update_sql, values + [row[base_id_column]])
                    cur.execute("RELEASE SAVEPOINT life_batch_row")
                    # NOTE(review): counted even when rowcount == 0 (row
                    # missing in target), matching the original accounting.
                    stats['updated'] += 1
                except Exception as e:
                    print(f"Error updating: {e}")
                    cur.execute("ROLLBACK TO SAVEPOINT life_batch_row")
                    stats['failed'] = stats.get('failed', 0) + 1
        target_conn.commit()
    # Process deletes in a single statement; rowcount reflects rows actually
    # removed.
    if deletes:
        with target_conn.cursor() as cur:
            delete_ids = [r[base_id_column] for r in deletes]
            delete_sql = f"""
                DELETE FROM "{schema}"."{table_name}"
                WHERE "{base_id_column}" = ANY(%s)
            """
            try:
                cur.execute(delete_sql, (delete_ids,))
                stats['deleted'] += cur.rowcount
            except Exception as e:
                print(f"Error deleting: {e}")
                stats['failed'] = stats.get('failed', 0) + len(delete_ids)
        target_conn.commit()
    stats['total'] += len(rows)
def main() -> int:
    """Entry point: connect to both databases, ensure each requested table
    exists in the target (creating it from source column metadata if
    missing), migrate the data, and report totals.

    Returns 0 when no failures were recorded, 1 otherwise.
    """
    args = parse_args()
    source_conn = None
    target_conn = None
    try:
        print(f"Starting migration run {args.run_id}")
        print(f"Source: {args.source_host}:{args.source_port}/{args.source_database}")
        print(f"Target: {args.target_host}:{args.target_port}/{args.target_database}")
        print(f"Tables: {args.tables}")
        print(f"Batch size: {args.batch_size}")
        print(f"Truncate before migration: {args.truncate}")
        # Parse table names (format: schema.table_name)
        table_names = [t.strip() for t in args.tables.split(',')]
        # Connect to databases
        source_conn = get_connection(
            args.source_host, args.source_port, args.source_database,
            args.source_username, args.source_password, args.source_driver
        )
        target_conn = get_connection(
            args.target_host, args.target_port, args.target_database,
            args.target_username, args.target_password, args.target_driver
        )
        print("Connected to both databases")
        total_migrated = 0
        total_failed = 0
        for table_full_name in table_names:
            try:
                # Parse schema and table name; bare names default to 'public'
                if '.' in table_full_name:
                    schema, table_name = table_full_name.split('.', 1)
                else:
                    schema = 'public'
                    table_name = table_full_name
                print(f"\nProcessing table: {schema}.{table_name}")
                # Check if table exists in source.
                # NOTE(review): `SELECT EXISTS (...)` plus positional
                # `fetchone()[0]` assumes a PostgreSQL source; an MSSQL
                # source (--source-driver mssql) would need different SQL and
                # dict-style row access — confirm before using mssql here.
                with source_conn.cursor() as cur:
                    cur.execute("""
                        SELECT EXISTS (
                            SELECT FROM information_schema.tables
                            WHERE table_schema = %s AND table_name = %s
                        )
                    """, (schema, table_name))
                    exists = cur.fetchone()[0]
                if not exists:
                    print(f"Table {schema}.{table_name} does not exist in source, skipping")
                    continue
                # Get table columns from source (used to build CREATE TABLE)
                with source_conn.cursor() as cur:
                    cur.execute("""
                        SELECT column_name, data_type, character_maximum_length, numeric_precision, numeric_scale
                        FROM information_schema.columns
                        WHERE table_schema = %s AND table_name = %s
                        ORDER BY ordinal_position
                    """, (schema, table_name))
                    columns = cur.fetchall()
                if not columns:
                    print(f"No columns found for {schema}.{table_name}, skipping")
                    continue
                # Check if table exists in target
                target_cur = target_conn.cursor()
                target_cur.execute("""
                    SELECT EXISTS (
                        SELECT FROM information_schema.tables
                        WHERE table_schema = %s AND table_name = %s
                    )
                """, (schema, table_name))
                target_exists = target_cur.fetchone()[0]
                target_cur.close()
                # Create table in target if it doesn't exist
                if not target_exists:
                    print(f"Creating table {schema}.{table_name} in target...")
                    # Build CREATE TABLE statement from source metadata.
                    # Only length/precision qualifiers are copied; defaults,
                    # NOT NULL, keys and indexes are NOT recreated here.
                    column_defs = []
                    for col in columns:
                        col_name, data_type, char_len, num_precision, num_scale = col
                        type_def = data_type
                        if char_len and data_type in ('character varying', 'character', 'text'):
                            type_def = f"{data_type}({char_len})"
                        elif num_precision and data_type in ('numeric', 'decimal'):
                            if num_scale:
                                type_def = f"{data_type}({num_precision},{num_scale})"
                            else:
                                type_def = f"{data_type}({num_precision})"
                        column_defs.append(f'"{col_name}" {type_def}')
                    create_sql = f'CREATE TABLE "{schema}"."{table_name}" ({", ".join(column_defs)})'
                    target_cur = target_conn.cursor()
                    target_cur.execute(create_sql)
                    target_conn.commit()
                    target_cur.close()
                    print(f"Table {schema}.{table_name} created in target")
                else:
                    print(f"Table {schema}.{table_name} already exists in target")
                # Migrate the data (mode chosen inside migrate_table from the
                # incremental / use_life flags)
                result = migrate_table(
                    source_conn, target_conn,
                    table_name, schema,
                    args.batch_size, args.truncate,
                    incremental=args.incremental,
                    since=args.since,
                    incremental_column=args.incremental_column,
                    use_life=args.use_life,
                    life_table=args.life_table,
                    life_id_column=args.life_id_column,
                    base_id_column=args.base_id_column,
                    operation_column=args.operation_column,
                    datetime_column=args.datetime_column
                )
                total_migrated += result['migrated']
                total_failed += result['failed']
            except Exception as e:
                # Per-table failures are logged and counted; remaining tables
                # still run. NOTE(review): this adds 1 (a table) to a
                # row-based counter, so "Failed N rows" below mixes units.
                print(f"Error migrating table {table_full_name}: {e}")
                total_failed += 1
        print(f"\nMigration complete!")
        print(f"Migrated {total_migrated} rows")
        print(f"Failed {total_failed} rows")
        return 0 if total_failed == 0 else 1
    except Exception as e:
        print(f"Migration failed: {e}")
        import traceback
        traceback.print_exc()
        return 1
    finally:
        # Always release both connections, even on failure.
        if source_conn:
            source_conn.close()
        if target_conn:
            target_conn.close()
# Script entry point: propagate main()'s exit code (0 = success, 1 = any
# failures) to the calling process.
if __name__ == '__main__':
    sys.exit(main())