Files
postgresql-migration/main_simple_copia.py
2025-11-11 10:04:17 +01:00

230 lines
6.8 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import datetime
import decimal
import logging
import math
import os
import sys

import psycopg2 # POSTGRE CONNECTOR
import pypyodbc # HFSQL CONNECTOR
from dotenv import load_dotenv
from tqdm import tqdm
# ---------------- CONFIGURATION ----------------
# Load variables from .env before any setting below is read.
load_dotenv()

# HFSQL (source) settings. Only the DSN, user and password are placed in
# HFSQL_CONN_STR; server, port and database are read but not used in it.
HFSQL_DSN = os.environ.get("HFSQL_DSN")
HFSQL_SERVER = os.environ.get("HFSQL_SERVER")
HFSQL_PORT = os.environ.get("HFSQL_PORT")
HFSQL_DATABASE = os.environ.get("HFSQL_DATABASE")
HFSQL_USER = os.environ.get("HFSQL_USER")
HFSQL_PASSWORD = os.environ.get("HFSQL_PASSWORD")

# HFSQL connection string (ODBC "KEY=value;" form).
HFSQL_CONN_STR = "DSN={};UID={};PWD={}".format(
    HFSQL_DSN, HFSQL_USER, HFSQL_PASSWORD
)

# PostgreSQL (target) settings.
PG_HOST = os.environ.get("PG_HOST")
PG_PORT = os.environ.get("PG_PORT")
PG_DB = os.environ.get("PG_DB")
PG_USER = os.environ.get("PG_USER")
PG_PASSWORD = os.environ.get("PG_PASSWORD")

# PostgreSQL connection string (space-separated "key=value" pairs).
# NOTE(review): values are inserted verbatim; a value containing spaces or
# quotes would presumably need escaping -- confirm against the deployment.
POSTGRES_CONN_STR = (
    f"host={PG_HOST} port={PG_PORT} dbname={PG_DB} "
    f"user={PG_USER} password={PG_PASSWORD}"
)

# Number of rows copied per batch (default 10000) and the log file path
# (default "migration.log").
BATCH_SIZE = int(os.environ.get("BATCH_SIZE", 10000))
LOG_FILE = os.environ.get("LOG_FILE", "migration.log")
# Data types mapping HFSQL → PostgreSQL
# Keys are Python types (looked up with the type reported for each source
# column); values are the PostgreSQL column types written into the DDL.
# Types that are not listed here fall back to TEXT at the lookup site.
TYPE_MAP = {
    # BIGINT so that integers wider than 32 bits are not rejected on insert.
    int: 'BIGINT',
    str: 'TEXT',
    # Python floats are double precision; REAL would keep only single precision.
    float: 'DOUBLE PRECISION',
    # Exact decimals keep their precision instead of degrading to TEXT.
    decimal.Decimal: 'NUMERIC',
    bool: 'BOOLEAN',
    bytes: 'BYTEA',
    # Binary data may also arrive as bytearray.
    bytearray: 'BYTEA',
    datetime.datetime: 'TIMESTAMP',
    datetime.date: 'DATE',
    datetime.time: 'TIME',
}
# ---------------- LOGGING ----------------
# Records at INFO level and above are appended to LOG_FILE, one line per
# record in the form "<timestamp> - <level> - <message>".
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
logging.basicConfig(
    format=_LOG_FORMAT,
    level=logging.INFO,
    filemode="a",
    filename=LOG_FILE,
)
# ---------------- FUNCTIONS ----------------
def get_hfsql_types(cursor, table_name):
    """
    Describe the columns of ``table_name`` without reading any of its rows.

    Executes a ``SELECT *`` whose ``WHERE 1=0`` filter is always false and
    then reads ``cursor.description``.

    Returns a list of ``(name, type_code)`` tuples, built from the first two
    fields of every description entry, in column order.

    NOTE(review): ``table_name`` is interpolated into the SQL text as-is, so
    callers are presumably expected to pass a trusted identifier.
    """
    probe_sql = f"SELECT * FROM {table_name} WHERE 1=0"
    cursor.execute(probe_sql)

    described = []
    for entry in cursor.description:
        described.append((entry[0], entry[1]))
    return described
def count_records(cursor, table_name):
    """
    Return the number of rows in ``table_name``.

    Runs ``SELECT COUNT(*)`` on the given cursor and returns the first field
    of the single row it fetches.

    NOTE(review): ``table_name`` is interpolated into the SQL text as-is.
    """
    count_sql = f"SELECT COUNT(*) FROM {table_name}"
    cursor.execute(count_sql)
    first_row = cursor.fetchone()
    return first_row[0]
def generate_postgres_insert(table_name, columns):
    """
    Build the parameterized INSERT statement for ``table_name``.

    Args:
        table_name: name of the target PostgreSQL table.
        columns: sequence whose items carry the column name at index 0
            (for example ``(name, type)`` tuples).

    Returns:
        An ``INSERT INTO ... VALUES (...)`` string with one ``%s``
        placeholder per column.

    The table and column names are emitted as double-quoted identifiers so
    that the statement targets exactly the relation created by the quoted
    ``CREATE TABLE`` in this file. Unquoted, a mixed-case name would be
    folded to lower case and a reserved word would fail to parse.
    """
    def quote_ident(identifier):
        # Double any embedded quote so it cannot terminate the identifier.
        return '"' + str(identifier).replace('"', '""') + '"'

    fields = ", ".join(quote_ident(col[0]) for col in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    return (
        f"INSERT INTO {quote_ident(table_name)} ({fields}) "
        f"VALUES ({placeholders})"
    )
#def convert_type(value, hfsql_type):
# if value is None:
# return None
# if isinstance(value, bytearray):
# return bytes(value) # convertir a bytes para PostgreSQL
# tipo_str = str(hfsql_type).lower()
# if "tinyint" in tipo_str or "boolean" in tipo_str:
# return bool(value)
# if "date" in tipo_str or "datetime" in tipo_str:
# return value
# return value
# Casts used when the column type is given as a Python type object.
_PYTHON_TYPE_CASTS = {
    bool: bool,
    int: int,
    float: float,
    str: str,
    bytearray: bytes,
}

# Casts used when the column type is given as an HFSQL type name.
_TYPE_NAME_CASTS = {
    "tinyint": bool,
    "boolean": bool,
    "float": float,
    "decimal": float,
    "int": int,
    "bigint": int,
    "varchar": str,
    "char": str,
    "text": str,
}


def convert_type(value, hfsql_type):
    """
    Convert one HFSQL value to the Python value sent to PostgreSQL.

    Args:
        value: the raw value read from HFSQL; ``None`` is passed through.
        hfsql_type: the column type, either a Python type object or an
            HFSQL type name such as ``"int"`` or ``"VARCHAR"``.

    Returns:
        ``value`` cast according to ``hfsql_type``, or ``value`` unchanged
        when the type is not recognised.

    The caller in this file passes the second field of each
    ``cursor.description`` entry, the same value that is used as a key into
    ``TYPE_MAP`` (whose keys are Python types). Matching only lower-case
    strings therefore never converted anything. Type objects are matched by
    identity, so ``bool`` is never mistaken for ``int``; type names are
    matched case-insensitively.
    """
    if value is None:
        return None

    if isinstance(hfsql_type, type):
        cast = _PYTHON_TYPE_CASTS.get(hfsql_type)
    else:
        cast = _TYPE_NAME_CASTS.get(str(hfsql_type).strip().lower())

    # Unknown types: return the value as-is.
    return value if cast is None else cast(value)
def create_postgres_table_if_not_exists(cur_pg, table_name, columns):
    """
    Create ``table_name`` in PostgreSQL unless it already exists.

    Args:
        cur_pg: open PostgreSQL cursor; its ``connection`` attribute is used
            to commit or roll back.
        table_name: name of the table to create.
        columns: list of ``(column_name, column_type)`` tuples. Each
            ``column_type`` is looked up in ``TYPE_MAP``; unknown types
            become ``TEXT``.

    A failure is logged, printed and rolled back, but not re-raised, so the
    caller continues (best-effort behaviour kept from the original).
    """
    def quote_ident(identifier):
        # Double any embedded quote so it cannot terminate the identifier.
        return '"' + str(identifier).replace('"', '""') + '"'

    columns_ddl = [
        f"{quote_ident(col_name)} {TYPE_MAP.get(col_type, 'TEXT')}"
        for col_name, col_type in columns
    ]
    # Joined outside the f-string: a backslash inside an f-string expression
    # is a SyntaxError on Python versions before 3.12.
    column_block = ",\n ".join(columns_ddl)
    ddl = (
        f"CREATE TABLE IF NOT EXISTS {quote_ident(table_name)} "
        f"(\n {column_block}\n);"
    )
    try:
        cur_pg.execute(ddl)
        cur_pg.connection.commit()
        print(f"✅ Tabla {table_name} creada o ya existía en PostgreSQL")
    except Exception as e:
        logging.error(f"Error creating table {table_name}: {e}")
        print(f"❌ Error creando tabla {table_name}: {e}")
        cur_pg.connection.rollback()
def _close_quietly(*resources):
    """Close each non-None cursor/connection, logging failures instead of raising."""
    for resource in resources:
        if resource is None:
            continue
        try:
            resource.close()
        except Exception as e:
            logging.warning(f"Error while closing {resource!r}: {e}")


def _iter_batches(cursor, batch_size):
    """Yield lists of up to ``batch_size`` rows from ``cursor`` until it is exhausted."""
    while True:
        rows = cursor.fetchmany(batch_size)
        if not rows:
            return
        yield rows


def _convert_row(row, column_types):
    """Convert one source row to a list of values, padded with None to the column count."""
    values = [
        convert_type(value, column_types[i]) for i, value in enumerate(row)
    ]
    values.extend([None] * (len(column_types) - len(values)))
    return values


def migrate_table(table_name):
    """
    Copy every row of the HFSQL table ``table_name`` into PostgreSQL.

    Steps: connect to both databases, read the column list and row count
    from HFSQL, create the PostgreSQL table if needed, then insert the rows
    in batches of ``BATCH_SIZE``, committing after each batch.

    Args:
        table_name: name of the table, used for both source and target.

    Returns:
        None. Progress and errors are printed and written to the log.

    Raises:
        ValueError: if ``BATCH_SIZE`` is not a positive integer.

    A connection failure is reported and ends the function early. The first
    failing batch is rolled back and stops the migration; batches committed
    before it stay in PostgreSQL. Both connections are closed on every exit
    path, including exceptions raised while reading the table structure.
    """
    if BATCH_SIZE <= 0:
        raise ValueError(
            f"BATCH_SIZE must be a positive integer, got {BATCH_SIZE}"
        )

    logging.info(f"Starting migration of table {table_name}")
    conn_hf = cur_hf = conn_pg = cur_pg = None
    failed = False
    try:
        # HFSQL Connection
        try:
            conn_hf = pypyodbc.connect(HFSQL_CONN_STR)
            cur_hf = conn_hf.cursor()
            print("✅ Connected to HFSQL")
        except Exception as e:
            logging.error(f"Error connecting to HFSQL: {e}")
            print("Error connecting to HFSQL:", e)
            return

        # POSTGRE Connection
        try:
            conn_pg = psycopg2.connect(POSTGRES_CONN_STR)
            cur_pg = conn_pg.cursor()
            print("✅ Connected to PostgreSQL")
        except Exception as e:
            logging.error(f"Error connecting to POSTGRE: {e}")
            print("Error connecting to POSTGRE:", e)
            return

        # Get table info
        columns = get_hfsql_types(cur_hf, table_name)
        print(f"Structure -> \n{columns}")
        total = count_records(cur_hf, table_name)
        print(f"Total rows -> \n {total}")
        logging.info(f"Table {table_name}: {total} records detected")

        # CREATE TABLE IF NOT EXISTS
        create_postgres_table_if_not_exists(cur_pg, table_name, columns)
        # CREATE INSERT
        insert_query = generate_postgres_insert(table_name, columns)

        column_types = [col[1] for col in columns]
        # Only used to size the progress bar.
        total_batches = math.ceil(total / BATCH_SIZE)

        # Read the table with ONE query and pull it in chunks. Paging with
        # LIMIT/OFFSET and no ORDER BY has no guaranteed row order (rows can
        # be repeated or skipped between pages) and rescans the skipped rows
        # for every batch.
        cur_hf.execute(f"SELECT * FROM {table_name}")
        batches = tqdm(
            _iter_batches(cur_hf, BATCH_SIZE),
            total=total_batches,
            desc=f"Migrating {table_name}",
        )
        for batch_num, rows in enumerate(batches):
            converted_data = [_convert_row(row, column_types) for row in rows]
            try:
                cur_pg.executemany(insert_query, converted_data)
                conn_pg.commit()
            except Exception as e:
                conn_pg.rollback()
                logging.error(f"Error in batch {batch_num}: {e}")
                print(f"❌ Error in batch {batch_num}: {e}")
                print("🛑 Stopping migration due to error.")
                # Stop the migration as soon as one batch fails.
                failed = True
                break
    finally:
        _close_quietly(cur_hf, conn_hf, cur_pg, conn_pg)

    # Report success only when every batch was committed.
    if failed:
        logging.error(f"Migration of {table_name} stopped after a failed batch.")
        print(f"❌ Table {table_name} was NOT fully migrated.")
    else:
        logging.info(f"Migration of {table_name} completed successfully.")
        print(f"✅ Table {table_name} migrated successfully.")
# ---------------- MAIN ----------------
if __name__ == "__main__":
    # Exactly one positional argument is expected: the table to migrate.
    if len(sys.argv) != 2:
        # Show the name the script was actually started with, so the hint
        # stays correct when the file is renamed or copied.
        print(f"Usage: python {os.path.basename(sys.argv[0])} Table_Name")
        sys.exit(1)
    table_name = sys.argv[1]
    migrate_table(table_name)