postgresql-migration-laravel/app/Console/Commands/ImportHfsqlDataCommand.php

<?php
namespace App\Console\Commands;
use Illuminate\Console\Command;
use Illuminate\Support\Facades\DB;
use Illuminate\Support\Facades\App;
use Illuminate\Support\Facades\File;
use App\Services\HfsqlDataConverter;
use Symfony\Component\Console\Command\Command as CommandAlias;
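/**
 * Imports one HFSQL table into PostgreSQL, choosing the strategy by size:
 * COPY from staged CSV files for large tables, chunked UPSERT for small ones.
 *
 * Example invocations (the table name "CLIENTS" is hypothetical):
 *   php artisan db:import-hfsql CLIENTS
 *   php artisan db:import-hfsql CLIENTS --no-convert
 */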
class ImportHfsqlDataCommand extends Command
{
const int RECORDS_THRESHOLD_FOR_COPY = 10000;
const int MAX_RECORDS_FOR_LOCAL_TEST = 1000000;
const int MAX_RECORDS_PER_FILE = 2000000;
const int CHUNK_SIZE = 200000; // Increased to reduce query overhead
// Command Properties
protected $signature = 'db:import-hfsql
{table : The name of the table to migrate}
{--no-convert : Skip data conversion for faster import}';
protected $description = 'Imports HFSQL data using COPY for large tables and UPSERT for small ones.';
protected string $baseTempDir = 'C:\Temp\pg_imports';
protected HfsqlDataConverter $converter;
// METRIC ACCUMULATION PROPERTIES
protected float $totalFetchTime = 0.0;
protected float $totalConvertTime = 0.0;
protected float $totalCsvWriteTime = 0.0;
protected float $totalCopyTime = 0.0;
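/**
 * Entry point: counts the source records, picks an import strategy,
 * and restores the PostgreSQL session settings even when the import fails.
 */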
public function handle(): int
{
ini_set('memory_limit', '2048M'); // Raise the memory limit for large result sets
set_time_limit(0); // No time limit
$tableName = $this->argument('table');
$this->converter = new HfsqlDataConverter();
$startTime = microtime(true);
$this->ensureTempDirectory();
$this->optimizePostgresForBulkImport();
try {
$totalRecords = $this->calculateRecordsToProcess($tableName);
if ($totalRecords === 0) {
$this->info("No records found. Finishing.");
return CommandAlias::SUCCESS;
}
$result = $this->importData($tableName, $totalRecords);
$this->restorePostgresSettings();
$this->displayDetailedDuration($startTime, $totalRecords);
return $result;
} catch (\Exception $e) {
$this->error("Critical error importing table {$tableName}: " . $e->getMessage());
$this->restorePostgresSettings();
$this->enablePostgresConstraints();
return CommandAlias::FAILURE;
}
}
protected function importData(string $tableName, int $totalRecords): int
{
if ($totalRecords >= self::RECORDS_THRESHOLD_FOR_COPY) {
$this->warn("Mode: COPY FROM MULTIPLE CSV FILES (Max " . self::MAX_RECORDS_PER_FILE . " per file)");
return $this->runCopyFromCsv($tableName, $totalRecords);
}
$this->warn("Mode: CHUNKED UPSERT");
return $this->runChunkedUpsert($tableName, $totalRecords);
}
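/**
 * Streams the HFSQL table in primary-key order using keyset pagination
 * (WHERE pk > :lastPk) rather than OFFSET, so each chunk query stays cheap
 * no matter how deep into the table it reads.
 */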
protected function runCopyFromCsv(string $tableName, int $totalRecords): int
{
$this->disablePostgresConstraints();
$context = $this->initializeCopyContext($tableName);
$progressBar = $this->output->createProgressBar($totalRecords);
$progressBar->start();
$pk = strtoupper($context['pkColumn']);
$lastPk = null;
$recordsFetched = 0;
$fetchStart = microtime(true);
while ($recordsFetched < $totalRecords) {
$remaining = $totalRecords - $recordsFetched;
$chunkSize = min(self::CHUNK_SIZE, $remaining);
$query = DB::connection('hfsql')
->table($tableName)
->select($context['columns'])
->orderBy($pk)
->limit($chunkSize);
if ($lastPk !== null) {
$query->where($pk, '>', $lastPk);
}
$records = $query->get();
if ($records->isEmpty()) {
break;
}
$chunkFetchTime = microtime(true) - $fetchStart;
$this->totalFetchTime += $chunkFetchTime;
$this->processChunkForCopy($records, $context, $tableName);
$progressBar->advance(count($records));
$recordsFetched += count($records);
$lastPk = $records->last()->{$context['pkColumn']};
if (gc_enabled()) {
gc_collect_cycles();
}
$fetchStart = microtime(true);
}
$progressBar->finish();
$this->line('');
$this->copyAndCleanupFile($context, $tableName);
$this->enablePostgresConstraints();
$this->resetPostgresSequences($tableName);
$this->info("Import by COPY completed ({$context['totalImported']} records).");
return Command::SUCCESS;
}
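/**
 * Splits a fetched chunk into micro-batches of 1,000 records before writing,
 * keeping per-record overhead low and bounding memory between CSV flushes.
 */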
protected function processChunkForCopy(iterable $records, array &$context, string $tableName): void
{
$skipConversion = $this->option('no-convert');
$batch = [];
foreach ($records as $record) {
// Batch conversion to reduce overhead
$batch[] = $record;
// Process in micro-batches
if (count($batch) >= 1000) {
$this->writeBatchToCsv($batch, $context, $tableName, $skipConversion);
$batch = [];
}
}
// Write remaining records
if (!empty($batch)) {
$this->writeBatchToCsv($batch, $context, $tableName, $skipConversion);
}
}
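/**
 * Writes a micro-batch to the current CSV file, rotating to a new file once
 * MAX_RECORDS_PER_FILE is reached, and accumulates the conversion and
 * fputcsv timings reported by displayDetailedDuration().
 */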
protected function writeBatchToCsv(array $batch, array &$context, string $tableName, bool $skipConversion): void
{
foreach ($batch as $record) {
// Check if we need a new file
if ($context['fileHandle'] === null || $context['recordsInFile'] >= self::MAX_RECORDS_PER_FILE) {
$this->copyAndCleanupFile($context, $tableName);
$this->openNewCsvFile($context, $tableName);
}
// START/END Measurement: Conversion
$startConvert = microtime(true);
// Optimized conversion
if ($skipConversion) {
// Fast mode: no conversion
$dataArray = array_values((array) $record);
} else {
// Normal mode: with conversion
$dataArray = $this->converter->convertToPostgres(
$tableName,
(array) $record,
$context['columns']
);
}
$this->totalConvertTime += (microtime(true) - $startConvert);
// START/END Measurement: CSV Writing
$startCsvWrite = microtime(true);
fputcsv($context['fileHandle'], $dataArray, "\t", '"', "\\");
$this->totalCsvWriteTime += (microtime(true) - $startCsvWrite);
$context['recordsInFile']++;
$context['totalImported']++;
}
}
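/**
 * Flushes the current CSV file into PostgreSQL via server-side COPY FROM.
 *
 * Note that COPY FROM '<path>' is read by the PostgreSQL *server* process,
 * not by this client: the file must be reachable from the database host and
 * the role needs superuser or pg_read_server_files rights. For a hypothetical
 * table "CLIENTS" the generated statement looks roughly like:
 *
 *   COPY "CLIENTS" ("IDCLIENTS", "NAME")
 *   FROM 'C:/Temp/pg_imports/CLIENTS_part_1_1700000000.csv'
 *   WITH (FORMAT csv, DELIMITER E'\t', ENCODING 'UTF-8', QUOTE '"', ESCAPE '\')
 */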
protected function copyAndCleanupFile(array &$context, string $tableName): void
{
if (!$context['fileHandle'] || !$context['filePath']) {
return;
}
fclose($context['fileHandle']);
// START/END Measurement: COPY
$startCopy = microtime(true);
// Execute COPY FROM with optimized options; double-quoted identifiers preserve the original case
$pgPath = str_replace('\\', '/', $context['filePath']);
$sql = "COPY \"{$tableName}\" ({$context['columnList']})
FROM '{$pgPath}'
WITH (FORMAT csv, DELIMITER E'\t', ENCODING 'UTF-8', QUOTE '\"', ESCAPE '\\')";
DB::statement($sql);
$copyDuration = microtime(true) - $startCopy;
$this->totalCopyTime += $copyDuration; // Accumulate here
unlink($context['filePath']);
// Reset file context
$context['fileHandle'] = null;
$context['filePath'] = null;
$context['recordsInFile'] = 0;
}
protected function openNewCsvFile(array &$context, string $tableName): void
{
$csvFileName = sprintf(
'%s_part_%d_%d.csv',
$tableName,
$context['fileCounter'],
time()
);
$context['filePath'] = $this->baseTempDir . DIRECTORY_SEPARATOR . $csvFileName;
$context['fileHandle'] = fopen($context['filePath'], 'w');
$context['fileCounter']++;
}
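/**
 * Imports smaller tables with Laravel's upsert(), which on PostgreSQL
 * compiles to INSERT ... ON CONFLICT (pk) DO UPDATE, so re-running the
 * command is idempotent for tables below the COPY threshold.
 */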
protected function runChunkedUpsert(string $tableName, int $totalRecords): int
{
$this->disablePostgresConstraints();
$pkColumn = $this->getPkColumn($tableName);
$allColumns = $this->getTableColumns($tableName);
$updateColumns = array_diff($allColumns, [$pkColumn]);
$totalImported = 0;
DB::connection('hfsql')
->table($tableName)
->orderBy(strtoupper($pkColumn))
->limit($totalRecords)
->chunk(self::CHUNK_SIZE, function ($records) use (
$tableName,
&$totalImported,
$pkColumn,
$updateColumns,
$allColumns
) {
$recordsToInsert = $this->prepareRecordsForUpsert($records, $tableName, $allColumns);
DB::table($tableName)->upsert(
$recordsToInsert,
[$pkColumn],
$updateColumns
);
$totalImported += count($recordsToInsert);
});
$this->enablePostgresConstraints();
$this->resetPostgresSequences($tableName);
$this->info("Import by UPSERT completed ({$totalImported} records).");
return Command::SUCCESS;
}
protected function prepareRecordsForUpsert(iterable $records, string $tableName, array $columns): array
{
$result = [];
foreach ($records as $record) {
$dataArray = $this->converter->convertToPostgres($tableName, (array) $record, $columns);
$result[] = array_combine($columns, $dataArray);
}
return $result;
}
/**
 * Gets the table's columns, preserving their original case
 */
protected function getTableColumns(string $tableName): array
{
$query = "
SELECT column_name
FROM information_schema.columns
WHERE table_name = ?
ORDER BY ordinal_position
";
$columns = DB::select($query, [$tableName]);
return array_map(function($col) {
return $col->column_name;
}, $columns);
}
protected function initializeCopyContext(string $tableName): array
{
$pkColumn = $this->getPkColumn($tableName);
$columns = $this->getTableColumns($tableName);
$columnList = implode(', ', array_map(fn($c) => "\"{$c}\"", $columns));
return [
'pkColumn' => $pkColumn,
'columns' => $columns,
'columnList' => $columnList,
'fileHandle' => null,
'filePath' => null,
'fileCounter' => 1,
'recordsInFile' => 0,
'totalImported' => 0,
];
}
protected function calculateRecordsToProcess(string $tableName): int
{
$realTotal = DB::connection('hfsql')->table($tableName)->count();
if ($realTotal === 0) {
return 0;
}
if (App::environment('local') && $realTotal > self::MAX_RECORDS_FOR_LOCAL_TEST) {
$this->warn("Testing mode: table has {$realTotal} records; capping at " . self::MAX_RECORDS_FOR_LOCAL_TEST . ".");
return self::MAX_RECORDS_FOR_LOCAL_TEST;
}
return $realTotal;
}
protected function ensureTempDirectory(): void
{
if (!File::isDirectory($this->baseTempDir)) {
File::makeDirectory($this->baseTempDir, 0777, true);
}
}
// DETAILED FUNCTION TO DISPLAY METRICS
protected function displayDetailedDuration(float $startTime, int $totalRecords): void
{
$totalDuration = microtime(true) - $startTime;
$recordsPerSecond = round($totalRecords / $totalDuration);
$this->info("\n--- IMPORT METRICS (Total: {$totalRecords} records) ---");
$fetchPercent = round(($this->totalFetchTime / $totalDuration) * 100, 2);
$convertPercent = round(($this->totalConvertTime / $totalDuration) * 100, 2);
$csvPercent = round(($this->totalCsvWriteTime / $totalDuration) * 100, 2);
$copyPercent = round(($this->totalCopyTime / $totalDuration) * 100, 2);
$otherTime = $totalDuration - ($this->totalFetchTime + $this->totalConvertTime + $this->totalCsvWriteTime + $this->totalCopyTime);
$otherPercent = round(($otherTime / $totalDuration) * 100, 2);
$metrics = [
'HFSQL Data Fetch Time' => [$this->totalFetchTime, $fetchPercent],
'Conversion Time (to Postg. Data)' => [$this->totalConvertTime, $convertPercent],
'CSV Write Time (fputcsv)' => [$this->totalCsvWriteTime, $csvPercent],
'COPY Execution Time (Postgres)' => [$this->totalCopyTime, $copyPercent],
'Other (Overhead, I/O, Init, etc.)' => [$otherTime, $otherPercent],
];
$this->table(
['Phase', 'Time (s)', 'Percentage (%)'],
array_map(fn($name, $data) => [
$name,
number_format($data[0], 4),
number_format($data[1], 2)
], array_keys($metrics), $metrics)
);
$this->info("\nProcess finished in **" . number_format($totalDuration, 3) . "s** ({$recordsPerSecond} records/second).");
}
// PostgreSQL optimizations for bulk import
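// NOTE: synchronous_commit = OFF trades durability for speed (an OS or
// database crash can lose the most recent commits). All of these SETs are
// session-scoped; restorePostgresSettings() reverts them afterwards.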
protected function optimizePostgresForBulkImport(): void
{
try {
// Parameters that CAN be changed at runtime
DB::statement("SET maintenance_work_mem = '512MB'");
DB::statement("SET work_mem = '256MB'");
DB::statement("SET synchronous_commit = OFF");
DB::statement("SET commit_delay = 100000");
DB::statement("SET temp_buffers = '256MB'");
$this->info("✓ PostgreSQL optimized for bulk import.");
} catch (\Exception $e) {
$this->warn("Could not set all optimization parameters: " . $e->getMessage());
}
}
protected function restorePostgresSettings(): void
{
try {
DB::statement("SET synchronous_commit = ON");
DB::statement("SET maintenance_work_mem = DEFAULT");
DB::statement("SET work_mem = DEFAULT");
DB::statement("SET commit_delay = DEFAULT");
DB::statement("SET temp_buffers = DEFAULT");
} catch (\Exception $e) {
// Silent fail on restore
}
}
protected function getPkColumn(string $tableName): string
{
$serialColumns = $this->getSerialColumns($tableName);
return empty($serialColumns)
? 'id' . $tableName // Keep the original case (no strtolower)
: $serialColumns[0]['column'];
}
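/**
 * session_replication_role = 'replica' makes PostgreSQL skip firing triggers
 * (including the system triggers that enforce foreign keys) for this session.
 * That speeds up bulk loads but also skips validation: the imported data is
 * assumed to already satisfy the constraints. 'origin' restores normal behavior.
 */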
protected function disablePostgresConstraints(): void
{
DB::statement("SET session_replication_role = 'replica'");
}
protected function enablePostgresConstraints(): void
{
DB::statement("SET session_replication_role = 'origin'");
}
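/**
 * After a COPY/UPSERT import the serial sequences lag behind the inserted
 * ids, so each sequence is advanced to MAX(column) (or 1 for an empty table)
 * to prevent duplicate-key errors on the next INSERT.
 */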
protected function resetPostgresSequences(string $tableName): void
{
$serialColumns = $this->getSerialColumns($tableName);
if (empty($serialColumns)) {
$this->warn("No serial columns found for {$tableName}. Sequence reset skipped.");
return;
}
foreach ($serialColumns as $columnInfo) {
$columnName = $columnInfo['column'];
$sequenceName = $columnInfo['sequence'];
// Double-quote identifiers to preserve their case
DB::statement(
"SELECT setval(:seq, COALESCE((SELECT MAX(\"{$columnName}\") FROM \"{$tableName}\"), 1))",
['seq' => $sequenceName]
);
}
}
protected function getSerialColumns(string $tableName): array
{
$quotedTableName = "\"{$tableName}\"";
$sql = "
SELECT
a.attname AS column_name,
pg_get_serial_sequence(:quoted_table, a.attname) AS sequence_name
FROM pg_attribute a
JOIN pg_class c ON c.oid = a.attrelid
WHERE c.relname = :table_name
AND a.attnum > 0
AND NOT a.attisdropped
AND (
pg_get_serial_sequence(:quoted_table, a.attname) IS NOT NULL
OR a.attidentity = 'a'
)
";
$results = DB::select($sql, [
'quoted_table' => $quotedTableName,
'table_name' => $tableName // Sin strtolower
]);
return array_map(fn($r) => [
'column' => $r->column_name,
'sequence' => $r->sequence_name,
], array_filter($results, fn($r) => $r->sequence_name));
}
}