executeMigration();
    }

    /**
     * Execute the migration logic.
     *
     * Marks the run as running, builds the command line for the Python
     * migration script from the schedule's settings, streams the script's
     * output into the run's logs, and finally records the run as completed
     * or failed. Exceptions raised while preparing or running the script are
     * caught here, recorded on the run and logged; they are not re-thrown.
     *
     * NOTE(review): database credentials are passed to the script as
     * command-line arguments, which are typically visible in the host's
     * process list. Consider environment variables or a config file if the
     * script supports them.
     */
    private function executeMigration(): void
    {
        $run = $this->migrationRun;
        $schedule = $run->schedule;

        if (!$schedule) {
            $run->update([
                'status' => 'failed',
                'error_message' => 'Schedule not found',
                'completed_at' => now(),
            ]);

            return;
        }

        // Check if this is an incremental migration
        $isIncremental = $schedule->python_script_args['incremental'] ?? false;

        // Capture the previous run time BEFORE it is overwritten below: the
        // incremental cut-off must be the last run, not the start of this one.
        $previousRunAt = $schedule->last_run_at;
        $runsIncrementally = $isIncremental && $previousRunAt;

        // The table list may be stored as an array or as a JSON string;
        // normalise it once so count() never receives a non-countable value.
        $tableIds = $schedule->tables;
        if (!is_array($tableIds)) {
            $decodedTables = json_decode((string) $tableIds, true);
            $tableIds = is_array($decodedTables) ? $decodedTables : [];
        }

        // Update schedule last run
        $schedule->update([
            'last_run_at' => now(),
        ]);

        // Start the migration run
        $run->update([
            'status' => 'running',
            'started_at' => now(),
            'total_tables' => count($tableIds),
        ]);

        $logs = [];
        $migratedRows = 0;
        $failedRows = 0;

        // Values that must never reach stored error messages or logs.
        $secrets = [];

        try {
            $sourceDb = $schedule->sourceDatabase;
            $targetDb = $schedule->targetDatabase;

            if (!$sourceDb || !$targetDb) {
                throw new \Exception('Source or target database not found');
            }

            // Process exceptions embed the full command line, so collect the
            // passwords both verbatim and in POSIX shell-escaped form.
            foreach ([$sourceDb->password, $targetDb->password] as $password) {
                $password = (string) $password;
                if ($password === '') {
                    continue;
                }
                $secrets[] = $password;
                $secrets[] = str_replace("'", "'\\''", $password);
            }

            // Load table models to get schema and table names
            $tables = \App\Models\Table::with('sourceDatabase')
                ->whereIn('id', $tableIds)
                ->get();

            // Build table names as schema.table_name
            $tableNames = $tables->map(fn($t) => "{$t->schema_name}.{$t->table_name}")->toArray();

            // Prepare Python script arguments
            $pythonScript = $schedule->python_script_path ?? base_path('scripts/migrate_data.py');

            // Build command
            $command = [
                'python3',
                $pythonScript,
                '--source-host', $sourceDb->host,
                '--source-port', (string) $sourceDb->port,
                '--source-database', $sourceDb->database,
                '--source-username', $sourceDb->username,
                '--source-password', $sourceDb->password,
                '--source-driver', $sourceDb->driver,
                '--target-host', $targetDb->host,
                '--target-port', (string) $targetDb->port,
                '--target-database', $targetDb->database,
                '--target-username', $targetDb->username,
                '--target-password', $targetDb->password,
                '--target-driver', $targetDb->driver ?? 'pgsql',
                '--tables', implode(',', $tableNames),
                '--batch-size', (string) $schedule->batch_size,
                '--run-id', (string) $run->id,
            ];

            // Add incremental migration flags (only when a previous run exists)
            if ($runsIncrementally) {
                $command[] = '--incremental';
                $command[] = '--since';
                $command[] = $previousRunAt->toIso8601String();
            }

            if ($schedule->truncate_before_migration) {
                $command[] = '--truncate';
            }

            if (!$schedule->create_indexes_after) {
                $command[] = '--skip-indexes';
            }

            // Add Life table parameters
            if ($schedule->use_life_table) {
                $command[] = '--use-life';
                $command[] = '--life-table';
                $command[] = $schedule->life_table_name ?? '';
                $command[] = '--life-id-column';
                $command[] = $schedule->life_id_column ?? '';
                $command[] = '--base-id-column';
                $command[] = $schedule->base_id_column ?? '';
                $command[] = '--operation-column';
                $command[] = $schedule->operation_column ?? 'x_Operation';
                $command[] = '--datetime-column';
                $command[] = $schedule->datetime_column ?? 'x_DateTime';
            }

            // Log the command (without passwords)
            $logCommand = $command;
            foreach ($logCommand as $i => $arg) {
                if ($arg === '--source-password' || $arg === '--target-password') {
                    $logCommand[$i + 1] = '***';
                }
            }

            $logs[] = 'Executing: ' . implode(' ', $logCommand);
            $logs[] = 'Mode: ' . ($runsIncrementally ? 'Incremental' : 'Full');
            $run->update(['logs' => $logs]);

            // Execute Python script
            $process = new Process($command);
            $process->setTimeout($this->timeout);
            $process->setIdleTimeout(300); // 5 minutes idle timeout

            $process->run(function ($type, $buffer) use ($run, &$logs, &$migratedRows, &$failedRows) {
                $logs[] = $buffer;

                // A chunk may contain several progress lines; the most recent
                // (last) figure in the chunk is the one to keep.
                if (preg_match_all('/Migrated (\d+) rows/', $buffer, $matches) > 0) {
                    $migratedRows = (int) end($matches[1]);
                }

                if (preg_match_all('/Failed (\d+) rows/', $buffer, $matches) > 0) {
                    $failedRows = (int) end($matches[1]);
                }

                // Persist the logs after every output chunk so progress is
                // visible while the script is still running.
                $run->update(['logs' => $logs]);
            });

            if (!$process->isSuccessful()) {
                throw new ProcessFailedException($process);
            }

            // Mark as completed
            $run->update([
                'status' => 'completed',
                'completed_at' => now(),
                'migrated_rows' => $migratedRows,
                'failed_rows' => $failedRows,
                'processed_tables' => count($tableIds),
                'logs' => $logs,
            ]);

            // Update schedule - successful migration
            $schedule->update([
                'last_successful_migration_at' => now(),
                'next_run_at' => $this->calculateNextRun($schedule),
            ]);
        } catch (\Exception $e) {
            // Scrub credentials before the message is stored or logged.
            $errorMessage = str_replace($secrets, '***', $e->getMessage());

            $logs[] = 'ERROR: ' . $errorMessage;

            $run->update([
                'status' => 'failed',
                'error_message' => $errorMessage,
                'completed_at' => now(),
                'logs' => $logs,
                'failed_rows' => $failedRows,
            ]);

            Log::error('Migration job failed', [
                'run_id' => $run->id,
                'schedule_id' => $schedule?->id,
                'error' => $errorMessage,
            ]);
        }
    }

    /**
     * Calculate next run time based on cron expression.
     *
     * @return \DateTime|null The next run date, or null when the schedule is
     *                        inactive, has no cron expression, or evaluating
     *                        the expression throws an exception.
     */
    private function calculateNextRun(MigrationSchedule $schedule): ?\DateTime
    {
        if (!$schedule->is_active || !$schedule->cron_expression) {
            return null;
        }

        try {
            $cron = new \Cron\CronExpression($schedule->cron_expression);

            // Third argument (true) lets the current date count as a match.
            return $cron->getNextRunDate(new \DateTime(), 0, true);
        } catch (\Exception $e) {
            return null;
        }
    }

    /**
     * Handle a job failure.
     *
     * Marks the migration run as failed with the exception's message and
     * writes an error entry to the application log.
     */
    public function failed(\Throwable $exception): void
    {
        $this->migrationRun->update([
            'status' => 'failed',
            'error_message' => $exception->getMessage(),
            'completed_at' => now(),
        ]);

        Log::error('Migration job failed', [
            'run_id' => $this->migrationRun->id,
            'error' => $exception->getMessage(),
        ]);
    }
}