argument('table');
        $this->converter = new HfsqlDataConverter();
        $startTime = microtime(true);

        $this->ensureTempDirectory();
        $this->optimizePostgresForBulkImport();

        try {
            $totalRecords = $this->calculateRecordsToProcess($tableName);

            if ($totalRecords === 0) {
                $this->info("No records found. Finishing.");
                return CommandAlias::SUCCESS;
            }

            $result = $this->importData($tableName, $totalRecords);

            $this->restorePostgresSettings();
            $this->displayDetailedDuration($startTime, $totalRecords);

            return $result;
        } catch (\Exception $e) {
            $this->error("Critical error importing table {$tableName}: " . $e->getMessage());
            $this->restorePostgresSettings();
            $this->enablePostgresConstraints();

            return CommandAlias::FAILURE;
        }
    }

    protected function importData(string $tableName, int $totalRecords): int
    {
        if ($totalRecords >= self::RECORDS_THRESHOLD_FOR_COPY) {
            $this->warn("Mode: COPY FROM MULTIPLE CSV FILES (Max " . self::MAX_RECORDS_PER_FILE . " per file)");
            return $this->runCopyFromCsv($tableName, $totalRecords);
        }

        $this->warn("Mode: CHUNKED UPSERT");
        return $this->runChunkedUpsert($tableName, $totalRecords);
    }

    protected function runCopyFromCsv(string $tableName, int $totalRecords): int
    {
        $this->disablePostgresConstraints();

        $context = $this->initializeCopyContext($tableName);
        $progressBar = $this->output->createProgressBar($totalRecords);
        $progressBar->start();

        $pk = strtoupper($context['pkColumn']);
        $lastPk = null;
        $recordsFetched = 0;
        $fetchStart = microtime(true);

        while ($recordsFetched < $totalRecords) {
            $remaining = $totalRecords - $recordsFetched;
            $chunkSize = min(self::CHUNK_SIZE, $remaining);

            $query = DB::connection('hfsql')
                ->table($tableName)
                ->select($context['columns'])
                ->orderBy($pk)
                ->limit($chunkSize);

            if ($lastPk !== null) {
                $query->where($pk, '>', $lastPk);
            }

            $records = $query->get();

            if ($records->isEmpty()) {
                break;
            }

            $chunkFetchTime = microtime(true) - $fetchStart;
            $this->totalFetchTime += $chunkFetchTime;

            $this->processChunkForCopy($records, $context, $tableName);

            $progressBar->advance(count($records));
            $recordsFetched += count($records);
            $lastPk = $records->last()->{$context['pkColumn']};

            if (gc_enabled()) {
                gc_collect_cycles();
            }

            $fetchStart = microtime(true);
        }

        $progressBar->finish();
        $this->line('');

        $this->copyAndCleanupFile($context, $tableName);
        $this->enablePostgresConstraints();
        $this->resetPostgresSequences($tableName);

        $this->info("Import by COPY completed ({$context['totalImported']} records).");

        return Command::SUCCESS;
    }

    protected function processChunkForCopy(iterable $records, array &$context, string $tableName): void
    {
        $skipConversion = $this->option('no-convert');
        $batch = [];

        foreach ($records as $record) {
            // Batch conversion to reduce overhead
            $batch[] = $record;

            // Process in micro-batches
            if (count($batch) >= 1000) {
                $this->writeBatchToCsv($batch, $context, $tableName, $skipConversion);
                $batch = [];
            }
        }

        // Write remaining records
        if (!empty($batch)) {
            $this->writeBatchToCsv($batch, $context, $tableName, $skipConversion);
        }
    }

    protected function writeBatchToCsv(array $batch, array &$context, string $tableName, bool $skipConversion): void
    {
        foreach ($batch as $record) {
            // Check if we need a new file
            if ($context['fileHandle'] === null || $context['recordsInFile'] >= self::MAX_RECORDS_PER_FILE) {
                $this->copyAndCleanupFile($context, $tableName);
                $this->openNewCsvFile($context, $tableName);
            }

            // START/END Measurement: Conversion
            $startConvert = microtime(true);

            // Optimized conversion
            if ($skipConversion) {
                // Fast mode: no conversion
                $dataArray = array_values((array) $record);
            } else {
                // Normal mode: with conversion
                $dataArray = $this->converter->convertToPostgres(
                    $tableName,
                    (array) $record,
                    $context['columns']
                );
            }

            $this->totalConvertTime += (microtime(true) - $startConvert);

            // START/END Measurement: CSV Writing
            $startCsvWrite = microtime(true);
            fputcsv($context['fileHandle'], $dataArray, "\t", '"', "\\");
            $this->totalCsvWriteTime += (microtime(true) - $startCsvWrite);

            $context['recordsInFile']++;
            $context['totalImported']++;
        }
    }

    protected function copyAndCleanupFile(array &$context, string $tableName): void
    {
        if (!$context['fileHandle'] || !$context['filePath']) {
            return;
        }

        fclose($context['fileHandle']);

        // START/END Measurement: COPY
        $startCopy = microtime(true);

        // Execute COPY FROM with optimized options - double-quote identifiers to preserve case
        $pgPath = str_replace('\\', '/', $context['filePath']);
        $sql = "COPY \"{$tableName}\" ({$context['columnList']}) FROM '{$pgPath}' WITH (FORMAT csv, DELIMITER E'\t', ENCODING 'UTF-8', QUOTE '\"', ESCAPE '\\')";
        DB::statement($sql);

        $copyDuration = microtime(true) - $startCopy;
        $this->totalCopyTime += $copyDuration; // Accumulate here

        unlink($context['filePath']);

        // Reset file context
        $context['fileHandle'] = null;
        $context['filePath'] = null;
        $context['recordsInFile'] = 0;
    }

    protected function openNewCsvFile(array &$context, string $tableName): void
    {
        $csvFileName = sprintf(
            '%s_part_%d_%d.csv',
            $tableName,
            $context['fileCounter'],
            time()
        );

        $context['filePath'] = $this->baseTempDir . DIRECTORY_SEPARATOR . $csvFileName;
        $context['fileHandle'] = fopen($context['filePath'], 'w');
        $context['fileCounter']++;
    }

    protected function runChunkedUpsert(string $tableName, int $totalRecords): int
    {
        $this->disablePostgresConstraints();

        $pkColumn = $this->getPkColumn($tableName);
        $allColumns = $this->getTableColumns($tableName);
        $updateColumns = array_diff($allColumns, [$pkColumn]);
        $totalImported = 0;

        DB::connection('hfsql')
            ->table($tableName)
            ->orderBy(strtoupper($pkColumn))
            ->limit($totalRecords)
            ->chunk(self::CHUNK_SIZE, function ($records) use (
                $tableName,
                &$totalImported,
                $pkColumn,
                $updateColumns,
                $allColumns
            ) {
                $recordsToInsert = $this->prepareRecordsForUpsert($records, $tableName, $allColumns);

                DB::table($tableName)->upsert(
                    $recordsToInsert,
                    [$pkColumn],
                    $updateColumns
                );

                $totalImported += count($recordsToInsert);
            });

        $this->enablePostgresConstraints();
        $this->resetPostgresSequences($tableName);

        $this->info("Import by UPSERT completed ({$totalImported} records).");

        return Command::SUCCESS;
    }

    protected function prepareRecordsForUpsert(iterable $records, string $tableName, array $columns): array
    {
        $result = [];

        foreach ($records as $record) {
            $dataArray = $this->converter->convertToPostgres($tableName, (array) $record, $columns);
            $result[] = array_combine($columns, $dataArray);
        }

        return $result;
    }

    /**
     * Gets the table columns, preserving their original case.
     */
    protected function getTableColumns(string $tableName): array
    {
        $query = "
            SELECT column_name
            FROM information_schema.columns
            WHERE table_name = ?
            ORDER BY ordinal_position
        ";

        $columns = DB::select($query, [$tableName]);

        return array_map(function ($col) {
            return $col->column_name;
        }, $columns);
    }

    protected function initializeCopyContext(string $tableName): array
    {
        $pkColumn = $this->getPkColumn($tableName);
        $columns = $this->getTableColumns($tableName);
        $columnList = implode(', ', array_map(fn($c) => "\"{$c}\"", $columns));

        return [
            'pkColumn' => $pkColumn,
            'columns' => $columns,
            'columnList' => $columnList,
            'fileHandle' => null,
            'filePath' => null,
            'fileCounter' => 1,
            'recordsInFile' => 0,
            'totalImported' => 0,
        ];
    }

    protected function calculateRecordsToProcess(string $tableName): int
    {
        $realTotal = DB::connection('hfsql')->table($tableName)->count();

        if ($realTotal === 0) {
            return 0;
        }

        if (App::environment('local')
            && self::MAX_RECORDS_FOR_LOCAL_TEST !== null
            && $realTotal > self::MAX_RECORDS_FOR_LOCAL_TEST
        ) {
            $this->warn("Testing mode: Processing {$realTotal} → " . self::MAX_RECORDS_FOR_LOCAL_TEST . " records.");
            return self::MAX_RECORDS_FOR_LOCAL_TEST;
        }

        return $realTotal;
    }

    protected function ensureTempDirectory(): void
    {
        if (!File::isDirectory($this->baseTempDir)) {
            File::makeDirectory($this->baseTempDir, 0777, true);
        }
    }

    // DETAILED FUNCTION TO DISPLAY METRICS
    protected function displayDetailedDuration(float $startTime, int $totalRecords): void
    {
        $totalDuration = microtime(true) - $startTime;
        $recordsPerSecond = round($totalRecords / $totalDuration);

        $this->info("\n--- IMPORT METRICS (Total: {$totalRecords} records) ---");

        $fetchPercent = round(($this->totalFetchTime / $totalDuration) * 100, 2);
        $convertPercent = round(($this->totalConvertTime / $totalDuration) * 100, 2);
        $csvPercent = round(($this->totalCsvWriteTime / $totalDuration) * 100, 2);
        $copyPercent = round(($this->totalCopyTime / $totalDuration) * 100, 2);

        $otherTime = $totalDuration - ($this->totalFetchTime + $this->totalConvertTime + $this->totalCsvWriteTime + $this->totalCopyTime);
        $otherPercent = round(($otherTime / $totalDuration) * 100, 2);

        $metrics = [
            'HFSQL Data Fetch Time' => [$this->totalFetchTime, $fetchPercent],
            'Conversion Time (to Postg. Data)' => [$this->totalConvertTime, $convertPercent],
            'CSV Write Time (fputcsv)' => [$this->totalCsvWriteTime, $csvPercent],
            'COPY Execution Time (Postgres)' => [$this->totalCopyTime, $copyPercent],
            'Other (Overhead, I/O, Init, etc.)' => [$otherTime, $otherPercent],
        ];

        $this->table(
            ['Phase', 'Time (s)', 'Percentage (%)'],
            array_map(fn($name, $data) => [
                $name,
                number_format($data[0], 4),
                number_format($data[1], 2)
            ], array_keys($metrics), $metrics)
        );

        $this->info("\nProcess finished in **" . number_format($totalDuration, 3) . "s** ({$recordsPerSecond} records/second).");
    }

    // PostgreSQL optimizations for bulk import
    protected function optimizePostgresForBulkImport(): void
    {
        try {
            // Parameters that CAN be changed at runtime
            DB::statement("SET maintenance_work_mem = '512MB'");
            DB::statement("SET work_mem = '256MB'");
            DB::statement("SET synchronous_commit = OFF");
            DB::statement("SET commit_delay = 100000");
            DB::statement("SET temp_buffers = '256MB'");

            $this->info("✓ PostgreSQL optimized for bulk import.");
        } catch (\Exception $e) {
            $this->warn("Could not set all optimization parameters: " . $e->getMessage());
        }
    }

    protected function restorePostgresSettings(): void
    {
        try {
            DB::statement("SET synchronous_commit = ON");
            DB::statement("SET maintenance_work_mem = DEFAULT");
            DB::statement("SET work_mem = DEFAULT");
            DB::statement("SET commit_delay = DEFAULT");
            DB::statement("SET temp_buffers = DEFAULT");
        } catch (\Exception $e) {
            // Silent fail on restore
        }
    }

    protected function getPkColumn(string $tableName): string
    {
        $serialColumns = $this->getSerialColumns($tableName);

        return empty($serialColumns)
            ? 'id' . $tableName // Keep the original case, no strtolower
            : $serialColumns[0]['column'];
    }

    protected function disablePostgresConstraints(): void
    {
        DB::statement("SET session_replication_role = 'replica'");
    }

    protected function enablePostgresConstraints(): void
    {
        DB::statement("SET session_replication_role = 'origin'");
    }

    protected function resetPostgresSequences(string $tableName): void
    {
        $serialColumns = $this->getSerialColumns($tableName);

        if (empty($serialColumns)) {
            $this->warn("No serial columns found for {$tableName}. Sequence reset skipped.");
            return;
        }

        foreach ($serialColumns as $columnInfo) {
            $columnName = $columnInfo['column'];
            $sequenceName = $columnInfo['sequence'];

            // Use double quotes to preserve identifier case
            DB::statement(
                "SELECT setval(:seq, COALESCE((SELECT MAX(\"{$columnName}\") FROM \"{$tableName}\"), 1))",
                ['seq' => $sequenceName]
            );
        }
    }

    protected function getSerialColumns(string $tableName): array
    {
        $quotedTableName = "\"{$tableName}\"";

        $sql = "
            SELECT
                a.attname AS column_name,
                pg_get_serial_sequence(:quoted_table, a.attname) AS sequence_name
            FROM pg_attribute a
            JOIN pg_class c ON c.oid = a.attrelid
            WHERE c.relname = :table_name
              AND a.attnum > 0
              AND NOT a.attisdropped
              AND (
                  pg_get_serial_sequence(:quoted_table, a.attname) IS NOT NULL
                  OR a.attidentity = 'a'
              )
        ";

        $results = DB::select($sql, [
            'quoted_table' => $quotedTableName,
            'table_name' => $tableName // No strtolower
        ]);

        return array_map(fn($r) => [
            'column' => $r->column_name,
            'sequence' => $r->sequence_name,
        ], array_filter($results, fn($r) => $r->sequence_name));
    }
}
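
/*
 * Usage sketch (assumptions: the command's $signature, defined earlier in this class,
 * exposes the "table" argument and a "--no-convert" flag, matching the
 * $this->argument('table') and $this->option('no-convert') calls above; the command
 * name "hfsql:import" and the table name "CLIENT" are hypothetical placeholders):
 *
 *   php artisan hfsql:import CLIENT                # picks COPY or chunked UPSERT based on row count
 *   php artisan hfsql:import CLIENT --no-convert   # skips HfsqlDataConverter and writes raw values
 */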