Files
project-replica/app/Jobs/RunMigrationJob.php
2026-03-23 00:51:38 +09:00

256 lines
8.4 KiB
PHP

<?php
namespace App\Jobs;
use App\Models\MigrationRun;
use App\Models\MigrationSchedule;
use App\Models\SourceDatabase;
use App\Models\TargetDatabase;
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;
use Illuminate\Support\Facades\Log;
use Symfony\Component\Process\Process;
use Symfony\Component\Process\Exception\ProcessFailedException;
class RunMigrationJob implements ShouldQueue
{
use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;
public int $timeout = 3600; // 1 hour timeout
public int $tries = 1;
public int $backoff = 60; // Wait 60 seconds between retries
/**
* Create a new job instance.
*/
public function __construct(
public MigrationRun $migrationRun
) {}
/**
* Execute the job.
*/
public function handle(): void
{
$this->executeMigration();
}
/**
* Execute the migration logic
*/
private function executeMigration(): void
{
$run = $this->migrationRun;
$schedule = $run->schedule;
if (!$schedule) {
$run->update([
'status' => 'failed',
'error_message' => 'Schedule not found',
'completed_at' => now(),
]);
return;
}
// Check if this is an incremental migration
$isIncremental = $schedule->python_script_args['incremental'] ?? false;
// Update schedule last run
$schedule->update([
'last_run_at' => now(),
]);
// Start the migration run
$run->update([
'status' => 'running',
'started_at' => now(),
'total_tables' => count($schedule->tables),
]);
$logs = [];
$migratedRows = 0;
$failedRows = 0;
try {
$sourceDb = $schedule->sourceDatabase;
$targetDb = $schedule->targetDatabase;
if (!$sourceDb || !$targetDb) {
throw new \Exception('Source or target database not found');
}
// Get tables to migrate with their names
$tableIds = is_array($schedule->tables) ? $schedule->tables : json_decode($schedule->tables, true);
// Load table models to get schema and table names
$tables = \App\Models\Table::with('sourceDatabase')
->whereIn('id', $tableIds)
->get();
// Build table names as schema.table_name
$tableNames = $tables->map(fn($t) => "{$t->schema_name}.{$t->table_name}")->toArray();
// Prepare Python script arguments
$pythonScript = $schedule->python_script_path ?? base_path('scripts/migrate_data.py');
// Build command
$command = [
'python3',
$pythonScript,
'--source-host', $sourceDb->host,
'--source-port', (string) $sourceDb->port,
'--source-database', $sourceDb->database,
'--source-username', $sourceDb->username,
'--source-password', $sourceDb->password,
'--source-driver', $sourceDb->driver,
'--target-host', $targetDb->host,
'--target-port', (string) $targetDb->port,
'--target-database', $targetDb->database,
'--target-username', $targetDb->username,
'--target-password', $targetDb->password,
'--target-driver', $targetDb->driver ?? 'pgsql',
'--tables', implode(',', $tableNames),
'--batch-size', (string) $schedule->batch_size,
'--run-id', (string) $run->id,
];
// Add incremental migration flags
if ($isIncremental && $schedule->last_run_at) {
$command[] = '--incremental';
$command[] = '--since';
$command[] = $schedule->last_run_at->toIso8601String();
}
if ($schedule->truncate_before_migration) {
$command[] = '--truncate';
}
if (!$schedule->create_indexes_after) {
$command[] = '--skip-indexes';
}
// Add Life table parameters
if ($schedule->use_life_table) {
$command[] = '--use-life';
$command[] = '--life-table';
$command[] = $schedule->life_table_name ?? '';
$command[] = '--life-id-column';
$command[] = $schedule->life_id_column ?? '';
$command[] = '--base-id-column';
$command[] = $schedule->base_id_column ?? '';
$command[] = '--operation-column';
$command[] = $schedule->operation_column ?? 'x_Operation';
$command[] = '--datetime-column';
$command[] = $schedule->datetime_column ?? 'x_DateTime';
}
// Log the command (without passwords)
$logCommand = $command;
foreach ($logCommand as $i => $arg) {
if ($arg === '--source-password' || $arg === '--target-password') {
$logCommand[$i + 1] = '***';
}
}
$logs[] = 'Executing: ' . implode(' ', $logCommand);
$logs[] = 'Mode: ' . ($isIncremental ? 'Incremental' : 'Full');
$run->update(['logs' => $logs]);
// Execute Python script
$process = new Process($command);
$process->setTimeout($this->timeout);
$process->setIdleTimeout(300); // 5 minutes idle timeout
$process->run(function ($type, $buffer) use ($run, &$logs, &$migratedRows, &$failedRows) {
$logs[] = $buffer;
// Parse output for progress
if (preg_match('/Migrated (\d+) rows/', $buffer, $matches)) {
$migratedRows = (int) $matches[1];
}
if (preg_match('/Failed (\d+) rows/', $buffer, $matches)) {
$failedRows = (int) $matches[1];
}
// Update logs periodically
$run->update(['logs' => $logs]);
});
if (!$process->isSuccessful()) {
throw new ProcessFailedException($process);
}
// Mark as completed
$run->update([
'status' => 'completed',
'completed_at' => now(),
'migrated_rows' => $migratedRows,
'failed_rows' => $failedRows,
'processed_tables' => count($tableIds),
'logs' => $logs,
]);
// Update schedule - successful migration
$schedule->update([
'last_successful_migration_at' => now(),
'next_run_at' => $this->calculateNextRun($schedule),
]);
} catch (\Exception $e) {
$logs[] = 'ERROR: ' . $e->getMessage();
$run->update([
'status' => 'failed',
'error_message' => $e->getMessage(),
'completed_at' => now(),
'logs' => $logs,
'failed_rows' => $failedRows,
]);
Log::error('Migration job failed', [
'run_id' => $run->id,
'schedule_id' => $schedule?->id,
'error' => $e->getMessage(),
]);
}
}
/**
* Calculate next run time based on cron expression
*/
private function calculateNextRun(MigrationSchedule $schedule): ?\DateTime
{
if (!$schedule->is_active || !$schedule->cron_expression) {
return null;
}
try {
$cron = new \Cron\CronExpression($schedule->cron_expression);
return $cron->getNextRunDate(new \DateTime(), 0, true);
} catch (\Exception $e) {
return null;
}
}
/**
* Handle a job failure.
*/
public function failed(\Throwable $exception): void
{
$this->migrationRun->update([
'status' => 'failed',
'error_message' => $exception->getMessage(),
'completed_at' => now(),
]);
Log::error('Migration job failed', [
'run_id' => $this->migrationRun->id,
'error' => $exception->getMessage(),
]);
}
}