#!/usr/bin/env python3 # -*- coding: utf-8 -*- import datetime import sys import math import logging import psycopg2 # POSTGRE CONNECTOR import pypyodbc # HFSQL CONNECTOR import os from dotenv import load_dotenv from tqdm import tqdm # ---------------- CONFIGURATION ---------------- # Load variables from .env load_dotenv() # Read config from .env HFSQL_DSN = os.getenv("HFSQL_DSN") HFSQL_SERVER = os.getenv("HFSQL_SERVER") HFSQL_PORT = os.getenv("HFSQL_PORT") HFSQL_DATABASE = os.getenv("HFSQL_DATABASE") HFSQL_USER = os.getenv("HFSQL_USER") HFSQL_PASSWORD = os.getenv("HFSQL_PASSWORD") # HFSQL Connection String HFSQL_CONN_STR = (f"DSN={HFSQL_DSN};UID={HFSQL_USER};PWD={HFSQL_PASSWORD}") PG_HOST = os.getenv("PG_HOST") PG_PORT = os.getenv("PG_PORT") PG_DB = os.getenv("PG_DB") PG_USER = os.getenv("PG_USER") PG_PASSWORD = os.getenv("PG_PASSWORD") POSTGRES_CONN_STR = ( f"host={PG_HOST} " f"port={PG_PORT} " f"dbname={PG_DB} " f"user={PG_USER} " f"password={PG_PASSWORD}" ) BATCH_SIZE = int(os.getenv("BATCH_SIZE", 10000)) LOG_FILE = os.getenv("LOG_FILE", "migration.log") # Data types mapping HFSQL → PostgreSQL TYPE_MAP = { int: 'INTEGER', str: 'TEXT', float: 'REAL', bool: 'BOOLEAN', bytes: 'BYTEA', datetime.datetime: 'TIMESTAMP', datetime.date: 'DATE', } # ---------------- LOGGING ---------------- logging.basicConfig( filename=LOG_FILE, filemode="a", level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" ) # ---------------- FUNCTIONS ---------------- def get_hfsql_types(cursor, table_name): """ Returns a list of columns [(name, hfsql_type)] """ cursor.execute(f"SELECT * FROM {table_name} WHERE 1=0") columns = [(desc[0],desc[1]) for desc in cursor.description] return columns def count_records(cursor, table_name): cursor.execute(f"SELECT COUNT(*) FROM {table_name}") return cursor.fetchone()[0] def generate_postgres_insert(table_name, columns): names = [col[0] for col in columns] placeholders = ", ".join(["%s"] * len(columns)) fields = ", ".join(names) query = f"INSERT INTO {table_name} ({fields}) VALUES ({placeholders})" #print(fields) return query #def convert_type(value, hfsql_type): # if value is None: # return None # if isinstance(value, bytearray): # return bytes(value) # convertir a bytes para PostgreSQL # tipo_str = str(hfsql_type).lower() # if "tinyint" in tipo_str or "boolean" in tipo_str: # return bool(value) # if "date" in tipo_str or "datetime" in tipo_str: # return value # return value def convert_type(value, hfsql_type): """Convierte un valor HFSQL a PostgreSQL según TYPE_MAP""" if value is None: return None # Tinyint / boolean if hfsql_type in ["tinyint", "boolean"]: return bool(value) # Decimal / float if hfsql_type in ["float", "decimal"]: return float(value) # Int / bigint if hfsql_type in ["int", "bigint"]: return int(value) # Strings if hfsql_type in ["varchar", "char", "text"]: return str(value) # Por defecto, devolver tal cual return value def create_postgres_table_if_not_exists(cur_pg, table_name, columns): """ Crea la tabla en PostgreSQL si no existe, usando la estructura pasada en columns. columns: lista de tuplas (nombre_columna, tipo_columna) """ # Mapeo básico de tipos HFSQL a PostgreSQL (ajusta según tus tipos) columns_ddl = [] for col_name, col_type in columns: pg_type = TYPE_MAP.get(col_type, 'TEXT') # default a TEXT columns_ddl.append(f'"{col_name}" {pg_type}') ddl = f'CREATE TABLE IF NOT EXISTS "{table_name}" (\n {",\n ".join(columns_ddl)}\n);' #print(f"dll Create ->{ddl}") try: cur_pg.execute(ddl) cur_pg.connection.commit() print(f"✅ Tabla {table_name} creada o ya existía en PostgreSQL") except Exception as e: print(f"❌ Error creando tabla {table_name}: {e}") cur_pg.connection.rollback() def migrate_table(table_name): logging.info(f"Starting migration of table {table_name}") # HFSQL Connection try: conn_hf = pypyodbc.connect(HFSQL_CONN_STR) cur_hf = conn_hf.cursor() print("✅ Connected to HFSQL") except Exception as e: print("Error connecting to HFSQL:", e) return # POSTGRE Connection try: conn_pg = psycopg2.connect(POSTGRES_CONN_STR) cur_pg = conn_pg.cursor() print("✅ Connected to PostgreSQL") except Exception as e: print("Error connecting to POSTGRE:", e) return # Get table info columns = get_hfsql_types(cur_hf, table_name) #print(f"Structure -> \n{columns}") total = count_records(cur_hf, table_name) #print(f"Total rows -> \n {total}") logging.info(f"Table {table_name}: {total} records detected") #CREATE TABLE IF NOT EXISTS create_postgres_table_if_not_exists(cur_pg, table_name, columns) #CREATE INSERT insert_query = generate_postgres_insert(table_name, columns) total_batches = math.ceil(total / BATCH_SIZE) for batch_num in tqdm(range(total_batches), desc=f"Migrating {table_name}"): offset = batch_num * BATCH_SIZE columns_sql = ", ".join([f'"{col[0]}"' for col in columns]) cur_hf.execute(f"SELECT {columns_sql} FROM {table_name} LIMIT {BATCH_SIZE} OFFSET {offset}") print(f"curfDesc-> {cur_hf}") # print(f"SELECT {columns_sql} FROM {table_name} LIMIT {BATCH_SIZE} OFFSET {offset}") rows = cur_hf.fetchall() print(f"row1 = {rows[0]}") converted_data = [] for row_num, row in enumerate(rows, 1): row_values = [] for i, value in enumerate(row): col_type = columns[i][1] # obtener el tipo de la columna por índice row_values.append(convert_type(value, col_type)) print(f"Columna {i} :{value}") # Rellenar con None si falta alguna columna while len(row_values) < len(columns): row_values.append(None) converted_data.append(row_values) print(f"Converted Data -> {converted_data}") try: cur_pg.executemany(insert_query, converted_data) conn_pg.commit() except Exception as e: conn_pg.rollback() logging.error(f"Error in batch {batch_num}: {e}") print(f"❌ Error in batch {batch_num}: {e}") print("🛑 Stopping migration due to error.") break # Detener la migración si falla un batch cur_hf.close() conn_hf.close() cur_pg.close() conn_pg.close() logging.info(f"Migration of {table_name} completed successfully.") print(f"✅ Table {table_name} migrated successfully.") # ---------------- MAIN ---------------- if __name__ == "__main__": if len(sys.argv) != 2: print("Usage: python migrate_table.py Table_Name") sys.exit(1) table_name = sys.argv[1] migrate_table(table_name)