Initial Commit
This commit is contained in:
235
main.py
Normal file
235
main.py
Normal file
@@ -0,0 +1,235 @@
|
||||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
import datetime
|
||||
import sys
|
||||
import math
|
||||
import logging
|
||||
import psycopg2 # POSTGRE CONNECTOR
|
||||
import pypyodbc # HFSQL CONNECTOR
|
||||
import os
|
||||
from dotenv import load_dotenv
|
||||
from tqdm import tqdm
|
||||
|
||||
# ---------------- CONFIGURATION ----------------


# Load variables from .env
load_dotenv()

# Read config from .env
# HFSQL (source database) settings.  Note: only DSN, user and password are
# used to build the ODBC connection string below; server/port/database are
# read but not otherwise referenced in this file.
HFSQL_DSN = os.getenv("HFSQL_DSN")
HFSQL_SERVER = os.getenv("HFSQL_SERVER")
HFSQL_PORT = os.getenv("HFSQL_PORT")
HFSQL_DATABASE = os.getenv("HFSQL_DATABASE")
HFSQL_USER = os.getenv("HFSQL_USER")
HFSQL_PASSWORD = os.getenv("HFSQL_PASSWORD")

# HFSQL Connection String (ODBC DSN form, consumed by pypyodbc.connect)
HFSQL_CONN_STR = (f"DSN={HFSQL_DSN};UID={HFSQL_USER};PWD={HFSQL_PASSWORD}")

# PostgreSQL (destination database) settings.
PG_HOST = os.getenv("PG_HOST")
PG_PORT = os.getenv("PG_PORT")
PG_DB = os.getenv("PG_DB")
PG_USER = os.getenv("PG_USER")
PG_PASSWORD = os.getenv("PG_PASSWORD")
# libpq keyword/value connection string consumed by psycopg2.connect().
POSTGRES_CONN_STR = (
    f"host={PG_HOST} "
    f"port={PG_PORT} "
    f"dbname={PG_DB} "
    f"user={PG_USER} "
    f"password={PG_PASSWORD}"
)

# Rows copied per INSERT batch (overridable via BATCH_SIZE in .env).
BATCH_SIZE = int(os.getenv("BATCH_SIZE", 10000))
LOG_FILE = os.getenv("LOG_FILE", "migration.log")

# Data types mapping HFSQL → PostgreSQL.  Keys are the Python type codes
# reported by cursor.description; values are PostgreSQL column types.
TYPE_MAP = {
    int: 'INTEGER',
    str: 'TEXT',
    float: 'REAL',
    bool: 'BOOLEAN',
    bytes: 'BYTEA',
    datetime.datetime: 'TIMESTAMP',
    datetime.date: 'DATE',
}
|
||||
|
||||
# ---------------- LOGGING ----------------
# Append-mode file logger; one line per event with timestamp and severity.
logging.basicConfig(
    filename=LOG_FILE,
    filemode="a",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s"
)
|
||||
|
||||
# ---------------- FUNCTIONS ----------------
|
||||
|
||||
def get_hfsql_types(cursor, table_name):
    """Return the table's column layout as a list of (name, type_code) tuples.

    Issues a zero-row SELECT (``WHERE 1=0``) so the driver fills in
    ``cursor.description`` without transferring any data.
    """
    cursor.execute(f"SELECT * FROM {table_name} WHERE 1=0")
    return [(col[0], col[1]) for col in cursor.description]
|
||||
|
||||
|
||||
def count_records(cursor, table_name):
    """Return the total number of rows currently in *table_name*."""
    cursor.execute(f"SELECT COUNT(*) FROM {table_name}")
    first_row = cursor.fetchone()
    return first_row[0]
|
||||
|
||||
|
||||
def generate_postgres_insert(table_name, columns):
    """Build a parameterized INSERT statement for *table_name*.

    columns: list of (name, type_code) tuples as returned by
    get_hfsql_types().

    Identifiers are double-quoted to match the case-preserving quoted names
    used by create_postgres_table_if_not_exists(); unquoted names would be
    folded to lowercase by PostgreSQL and miss any mixed-case column the
    DDL created.
    """
    fields = ", ".join(f'"{name}"' for name, _ in columns)
    placeholders = ", ".join(["%s"] * len(columns))
    return f'INSERT INTO "{table_name}" ({fields}) VALUES ({placeholders})'
|
||||
|
||||
|
||||
#def convert_type(value, hfsql_type):
|
||||
# if value is None:
|
||||
# return None
|
||||
# if isinstance(value, bytearray):
|
||||
# return bytes(value) # convertir a bytes para PostgreSQL
|
||||
# tipo_str = str(hfsql_type).lower()
|
||||
# if "tinyint" in tipo_str or "boolean" in tipo_str:
|
||||
# return bool(value)
|
||||
# if "date" in tipo_str or "datetime" in tipo_str:
|
||||
# return value
|
||||
# return value
|
||||
|
||||
def convert_type(value, hfsql_type):
    """Convert a value fetched from HFSQL into a PostgreSQL-friendly one.

    *hfsql_type* may be either a type-name string (e.g. "varchar") or the
    Python type object that the ODBC driver places in cursor.description
    (which is what migrate_table actually passes); both forms are
    normalized to a lowercase string before matching, so the branches fire
    in either case.  Unknown types are passed through unchanged.
    """
    if value is None:
        return None
    # psycopg2 expects bytes (not bytearray) for BYTEA parameters.
    if isinstance(value, bytearray):
        return bytes(value)

    type_str = str(hfsql_type).lower()

    # Tinyint / boolean -> bool (checked before "int": "tinyint" contains it)
    if "tinyint" in type_str or "bool" in type_str:
        return bool(value)

    # Dates / timestamps pass through; psycopg2 adapts them natively.
    if "date" in type_str or "time" in type_str:
        return value

    # Decimal / float / real -> float
    if "float" in type_str or "decimal" in type_str or "real" in type_str:
        return float(value)

    # Int / bigint -> int
    if "int" in type_str:
        return int(value)

    # Character / text types -> str
    if "char" in type_str or "text" in type_str or "str" in type_str:
        return str(value)

    # Default: return the value as-is.
    return value
|
||||
|
||||
def create_postgres_table_if_not_exists(cur_pg, table_name, columns):
    """Create *table_name* in PostgreSQL if it does not already exist.

    columns: list of (column_name, type_code) tuples; each type code is
    mapped to a PostgreSQL column type via TYPE_MAP, defaulting to TEXT.
    Commits on success; prints the error and rolls back on failure.
    """
    columns_ddl = [
        f'"{col_name}" {TYPE_MAP.get(col_type, "TEXT")}'  # default to TEXT
        for col_name, col_type in columns
    ]

    # Join outside the f-string: backslash escapes inside an f-string
    # replacement field are a SyntaxError on Python < 3.12.
    body = ",\n    ".join(columns_ddl)
    ddl = f'CREATE TABLE IF NOT EXISTS "{table_name}" (\n    {body}\n);'

    try:
        cur_pg.execute(ddl)
        cur_pg.connection.commit()
        print(f"✅ Tabla {table_name} creada o ya existía en PostgreSQL")
    except Exception as e:
        print(f"❌ Error creando tabla {table_name}: {e}")
        cur_pg.connection.rollback()
|
||||
|
||||
|
||||
def migrate_table(table_name):
    """Copy every row of *table_name* from HFSQL into PostgreSQL.

    Creates the destination table if needed, then streams the data in
    batches of BATCH_SIZE rows, committing after each batch.  The first
    failing batch is rolled back and the migration stops; both connections
    are always closed, even on error.
    """
    logging.info(f"Starting migration of table {table_name}")

    # --- Source connection (HFSQL via ODBC) ---
    try:
        conn_hf = pypyodbc.connect(HFSQL_CONN_STR)
        cur_hf = conn_hf.cursor()
        print("✅ Connected to HFSQL")
    except Exception as e:
        print("Error connecting to HFSQL:", e)
        return

    # --- Destination connection (PostgreSQL) ---
    try:
        conn_pg = psycopg2.connect(POSTGRES_CONN_STR)
        cur_pg = conn_pg.cursor()
        print("✅ Connected to PostgreSQL")
    except Exception as e:
        print("Error connecting to POSTGRE:", e)
        # Don't leak the already-open HFSQL connection.
        cur_hf.close()
        conn_hf.close()
        return

    ok = True
    try:
        # Table structure and row count drive the batch loop below.
        columns = get_hfsql_types(cur_hf, table_name)
        total = count_records(cur_hf, table_name)
        logging.info(f"Table {table_name}: {total} records detected")

        create_postgres_table_if_not_exists(cur_pg, table_name, columns)
        insert_query = generate_postgres_insert(table_name, columns)

        total_batches = math.ceil(total / BATCH_SIZE)
        # Loop-invariant: the quoted SELECT column list never changes.
        columns_sql = ", ".join(f'"{col[0]}"' for col in columns)

        for batch_num in tqdm(range(total_batches), desc=f"Migrating {table_name}"):
            offset = batch_num * BATCH_SIZE
            cur_hf.execute(
                f"SELECT {columns_sql} FROM {table_name} "
                f"LIMIT {BATCH_SIZE} OFFSET {offset}"
            )
            rows = cur_hf.fetchall()

            converted_data = []
            for row in rows:
                row_values = [
                    convert_type(value, columns[i][1])
                    for i, value in enumerate(row)
                ]
                # Pad with NULLs if the driver returned fewer columns
                # than the table declares.
                row_values.extend([None] * (len(columns) - len(row_values)))
                converted_data.append(row_values)

            try:
                cur_pg.executemany(insert_query, converted_data)
                conn_pg.commit()
            except Exception as e:
                conn_pg.rollback()
                logging.error(f"Error in batch {batch_num}: {e}")
                print(f"❌ Error in batch {batch_num}: {e}")
                print("🛑 Stopping migration due to error.")
                ok = False
                # Stop the migration if a batch fails.
                break
    finally:
        cur_hf.close()
        conn_hf.close()
        cur_pg.close()
        conn_pg.close()

    # Only claim success when no batch failed.
    if ok:
        logging.info(f"Migration of {table_name} completed successfully.")
        print(f"✅ Table {table_name} migrated successfully.")
|
||||
|
||||
|
||||
# ---------------- MAIN ----------------

if __name__ == "__main__":
    # Expect exactly one CLI argument: the table to migrate.
    args = sys.argv[1:]
    if len(args) != 1:
        print("Usage: python migrate_table.py Table_Name")
        sys.exit(1)
    migrate_table(args[0])
|
||||
Reference in New Issue
Block a user