Notes — What is notes.io?

Notes brand slogan

Notes - notes.io

import boto3
import gzip
from sqlalchemy import create_engine, text
import urllib3
import logging

# Suppress urllib3 warnings process-wide. The S3 client below is created
# with verify=False, which would otherwise emit a warning per request.
urllib3.disable_warnings()

# AWS S3 configuration
# NOTE(review): the bucket name contains "dev" while DB_HOST below contains
# "prd" -- confirm that mixing these environments is intended.
S3_BUCKET_NAME = 'aa00002695-quanthub-market-data-s3-dev-us-east-1'
S3_PATH = 'Datastore/Xpressfeed/compustatmarketcap/'
# NOTE(review): verify=False turns off TLS certificate verification for all
# S3 calls made through this client.
S3_CLIENT = boto3.client('s3', verify=False)

# Database configuration
# NOTE(review): credentials are hard-coded in source; consider loading them
# from the environment or a secrets store and rotating this password.
DB_USER = 'consumg'
DB_PASSWORD = 'consumg#8894'
DB_HOST = 'aa00002695-qhub-prd-clus.cluster-c0wjzrxtrwvo.us-east-1.rds.amazonaws.com'
DB_PORT = '5440'
DB_NAME = 'qhub'
SCHEMA_NAME = 'consumg'
# Name of the table that the staging table is renamed to after a
# successful load.
FINAL_TABLE_NAME = 'compustatmarketcap'

# Database connection string
# NOTE(review): DB_PASSWORD is interpolated without URL-encoding; characters
# such as '@' or '/' in a future password would break URL parsing.
DB_URL = f'postgresql://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}'
# Module-level engine shared by every function in this script.
ENGINE = create_engine(DB_URL)

# Root logger configuration: INFO level, timestamped messages.
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def list_files(bucket_name, prefix, suffix='.csv.gz'):
    """List the object keys under an S3 prefix that end with ``suffix``.

    The listing goes through the ``list_objects_v2`` paginator, so prefixes
    holding more objects than fit in a single response (1,000 keys) are
    returned in full instead of being silently truncated.

    Args:
        bucket_name: Name of the S3 bucket to list.
        prefix: Key prefix that restricts the listing.
        suffix: Only keys ending with this string are returned.

    Returns:
        A list of matching object keys. If the listing fails for any
        reason the error is logged and an empty list is returned.
    """
    logging.info("Fetching list of files from S3")
    try:
        paginator = S3_CLIENT.get_paginator('list_objects_v2')
        files = []
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            # 'Contents' is absent from a page that holds no objects.
            files.extend(
                obj['Key']
                for obj in page.get('Contents', [])
                if obj['Key'].endswith(suffix)
            )
        return files
    except Exception as e:
        logging.error(f"Error listing files from S3: {str(e)}")
        return []

def create_temp_table(engine, schema_name, table_name):
    """Drop and recreate the staging table used for the bulk load.

    The table is a regular (non-TEMPORARY) table created as
    ``schema_name.table_name`` with the four market-cap columns. Any
    existing table of that name is dropped first.

    Args:
        engine: SQLAlchemy engine used to open the connection.
        schema_name: Schema in which the staging table lives.
        table_name: Name of the staging table.

    Raises:
        Exception: Any error raised while connecting or executing the DDL
            is logged and then re-raised, so the caller does not go on to
            load data into a table that was never created.
    """
    try:
        with engine.connect() as connection:
            logging.info("Database connection successful")

            drop_statement = text(f"DROP TABLE IF EXISTS {schema_name}.{table_name}")
            connection.execute(drop_statement)
            connection.commit()
            logging.info(f"Temporary table {table_name} dropped if it existed")

            sql_statement = text(f"""
                CREATE TABLE {schema_name}.{table_name} (
                    GVKEY VARCHAR(6),
                    DATADATE DATE,
                    CURCDD VARCHAR(3),
                    MKVAL DOUBLE PRECISION
                );
            """)
            connection.execute(sql_statement)
            connection.commit()
            logging.info("Temporary table created successfully")
    except Exception as e:
        logging.error(f"Error creating temporary table: {str(e)}")
        # Propagate: continuing without the staging table is pointless.
        raise

def copy_data_to_temp_table(engine, schema_name, table_name, files):
    """Bulk-load gzip-compressed CSV objects from S3 into the staging table.

    Each object is streamed twice from S3: once to count its data lines
    (total lines minus the header) and once to feed PostgreSQL ``COPY ...
    FROM stdin``. After every file the table's row count is compared with
    the expected line count, and the load is aborted on a mismatch.

    Args:
        engine: SQLAlchemy engine; its raw DBAPI connection is used for COPY.
        schema_name: Schema of the staging table.
        table_name: Name of the staging table.
        files: Iterable of S3 object keys in the module-level bucket.

    Returns:
        True if every file was copied and verified, False if any step
        failed (the error is logged, not raised).
    """
    total_copied = 0
    copy_sql = f"COPY {schema_name}.{table_name} FROM stdin WITH CSV HEADER DELIMITER ','"

    # The context manager guarantees the connection is released, which the
    # previous bare engine.connect() never did.
    with engine.connect() as conn:
        try:
            logging.info("Starting to copy data to the database table")
            for file_key in files:
                logging.info(f"Copying data from {file_key}")
                try:
                    # First pass: count data lines (all lines minus header).
                    obj = S3_CLIENT.get_object(Bucket=S3_BUCKET_NAME, Key=file_key)
                    with gzip.open(obj['Body'], 'rt') as gz:
                        record_count = sum(1 for _ in gz) - 1
                    logging.info(f"Total source record count in {file_key}:{record_count}")

                    # Second pass: the S3 body stream was consumed above, so
                    # fetch the object again and stream it into COPY.
                    obj = S3_CLIENT.get_object(Bucket=S3_BUCKET_NAME, Key=file_key)
                    with gzip.open(obj['Body'], 'rt') as gz:
                        with conn.connection.cursor() as cur:
                            cur.copy_expert(sql=copy_sql, file=gz)
                            conn.connection.commit()
                            # NOTE(review): 1 is presumably psycopg2's
                            # READ COMMITTED level -- confirm it is needed.
                            conn.connection.set_isolation_level(1)
                            cur.execute(f"SELECT COUNT(*) FROM {schema_name}.{table_name}")
                            # Table total minus rows from earlier files.
                            copied_count = cur.fetchone()[0] - total_copied
                    logging.info(f"Total records copied from {file_key}:{copied_count}")

                    if copied_count != record_count:
                        mismatch = (
                            f"Record count mismatch for {file_key}: "
                            f"Expected {record_count},but copied {copied_count}"
                        )
                        logging.error(mismatch)
                        # Explicit exception: a bare `raise` here has no
                        # active exception to re-raise.
                        raise ValueError(mismatch)
                    logging.info(f"Record count verification passed for {file_key}")

                    total_copied += copied_count
                except Exception as e:
                    logging.error(f"Failed to copy data from {file_key}: {str(e)}")
                    raise
            logging.info(f"Total records copied from all files: {total_copied}")
            return True
        except Exception as e:
            logging.error(f"Error copying data to temporary table: {str(e)}")
            return False


def drop_and_replace_final_table(engine,schema_name,temp_table_name,final_table_name):
    """Replace the final table with the freshly loaded staging table.

    The DROP of the old final table and the RENAME of the staging table are
    executed in one transaction and committed together. If the rename
    fails, the connection is closed without a commit and the DROP is rolled
    back, so the previous final table is kept.

    Args:
        engine: SQLAlchemy engine used to open the connection.
        schema_name: Schema containing both tables.
        temp_table_name: Name of the staging table to promote.
        final_table_name: Name the staging table is renamed to.

    Raises:
        Exception: Any database error is logged and re-raised.
    """
    try:
        with engine.connect() as connection:
            drop_statement = text(f"DROP TABLE IF EXISTS {schema_name}.{final_table_name}")
            rename_statement = text(
                f"ALTER TABLE {schema_name}.{temp_table_name} RENAME TO {final_table_name}"
            )

            connection.execute(drop_statement)
            connection.execute(rename_statement)
            # Single commit: both DDL statements take effect together.
            connection.commit()

            logging.info(f"Table dropped {final_table_name}")
            logging.info(f"Temporary table renamed to {final_table_name}")
    except Exception as e:
        logging.error(f"Error renaming and replacing table: {str(e)}")
        raise


def create_indexes(engine, schema_name, table_name):
    """Create the indexes on the loaded table and commit them.

    Two indexes are created if they do not already exist: a unique index on
    (datadate, gvkey, curcdd) and a non-unique index on (gvkey, datadate).

    Args:
        engine: SQLAlchemy engine used to open the connection.
        schema_name: Schema containing the table.
        table_name: Table to index; also used as the index-name prefix.

    Raises:
        Exception: Database errors (for example duplicate keys that violate
            the unique index) propagate to the caller.
    """
    with engine.connect() as connection:
        # Create unique index
        unique_index_statement = text(f"""
            CREATE UNIQUE INDEX IF NOT EXISTS {table_name}_unique_idx
            ON {schema_name}.{table_name} (datadate, gvkey, curcdd);
        """)
        connection.execute(unique_index_statement)
        # rowcount is not meaningful for DDL, so success is signalled by the
        # statement not raising rather than by a row count.
        logging.info(f"Unique index {table_name}_unique_idx is in place")

        # Create non-unique index
        index_statement = text(f"""
            CREATE INDEX IF NOT EXISTS {table_name}_index
            ON {schema_name}.{table_name} (gvkey, datadate);
        """)
        connection.execute(index_statement)
        logging.info(f"Index {table_name}_index is in place")

        # The rest of this script commits explicitly after each statement;
        # without a commit here the index DDL would be rolled back when the
        # connection closes.
        connection.commit()

def main(max_files=2):
    """Run the full load: list S3 files, stage them, then swap the tables.

    Steps: list the ``.csv.gz`` objects under ``S3_PATH``; recreate the
    staging table; copy the files into it; on success replace the final
    table with the staging table and build its indexes. Every failure is
    logged and the function returns normally.

    Args:
        max_files: Maximum number of listed files to load. The default of 2
            preserves the script's existing behavior of loading only the
            first two files; pass ``None`` to load every listed file.
    """
    logging.info("Starting the file listing process")
    files = list_files(S3_BUCKET_NAME, S3_PATH)
    if not files:
        logging.info("No CSV files found")
        return

    # NOTE(review): the cap of two files looks like a debugging leftover --
    # confirm whether production runs should pass max_files=None.
    files = files[:max_files]
    temp_table_name = 'temp_table'
    try:
        create_temp_table(ENGINE, SCHEMA_NAME, temp_table_name)
        success = copy_data_to_temp_table(ENGINE, SCHEMA_NAME, temp_table_name, files)
        logging.info(f"data copy success status:{success}")
        if success:
            logging.info("data copied to temporary table successfully")
            try:
                # Drop and replace final table after successful data copy
                drop_and_replace_final_table(ENGINE, SCHEMA_NAME, temp_table_name, FINAL_TABLE_NAME)
                create_indexes(ENGINE, SCHEMA_NAME, FINAL_TABLE_NAME)
            except Exception as e:
                logging.error(f"Error during final table update: {str(e)}")
                raise
        else:
            logging.error("Copying data to temporary table failed. Skipping renaming.")
    except Exception as e:
        logging.error(f"Process failed: {str(e)}. Final table remains unchanged.")


if __name__ == "__main__":
    main()

     
 
what is notes.io
 

Notes.io is a web-based application for taking notes. You can take notes and share them with other people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000 notes have been created, and the number keeps growing.

With notes.io;

  • You can take a note from anywhere, on any device with an internet connection.
  • You can share your notes on social platforms (YouTube, Facebook, Twitter, Instagram, etc.).
  • You can quickly share your content without a website, blog, or e-mail.
  • You don't need to create an account to share a note. If you wish, you can send quick, easy, shortened note links via SMS, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
  • Notes.io has a fabulous infrastructure design for short links and lets you share a note as an easy, understandable link.

Fast: Notes.io is built for speed and performance. You can take notes quickly and browse your archive.

Easy: Notes.io doesn’t require installation. Just write and share note!

Short: A Notes.io URL is just 8 characters long. You'll get a shortened link to your note when you want to share it. (Ex: notes.io/q )

Free: Notes.io has been running for 12 years and has been free since the day it started.


You can immediately create your first note and start sharing it with anyone you wish. If you want to contact us, you can use the following communication channels:


Email: [email protected]

Twitter: http://twitter.com/notesio

Instagram: http://instagram.com/notes.io

Facebook: http://facebook.com/notesio



Regards;
Notes.io Team

     
 
Shortened Note Link
 
 
Loading Image
 
     
 
Long File
 
 

Notes larger than 18KB cannot be shortened.

To get under 18KB, please trim your notes, or sign in.