Notes![what is notes.io? What is notes.io?](/theme/images/whatisnotesio.png)
![]() ![]() Notes - notes.io |
import gzip
import logging
from urllib.parse import quote

import boto3
import pandas as pd
import urllib3
from sqlalchemy import create_engine, text
# Silence urllib3 warnings. NOTE(review): presumably this is here to hide the
# InsecureRequestWarning triggered by verify=False on the S3 client below.
urllib3.disable_warnings()

# Configure logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# AWS S3 configuration
S3_BUCKET_NAME = 'aa00002695-quanthub-market-data-s3-dev-us-east-1'
S3_PATH = 'Datastore/Xpressfeed/compustatmarketcap/'
# NOTE(review): verify=False disables TLS certificate verification for every
# S3 request made through this client - confirm this is intentional.
S3_CLIENT = boto3.client('s3', verify=False)

# Database configuration
# SECURITY(review): credentials are hard-coded in source; they should be moved
# to environment variables or a secrets manager and rotated.
DB_USER = 'consumg'
DB_PASSWORD = 'consumg#8894'
DB_HOST = 'aa00002695-qhub-prd-clus.cluster-c0wjzrxtrwvo.us-east-1.rds.amazonaws.com'
DB_PORT = '5440'
DB_NAME = 'qhub'
SCHEMA_NAME = 'consumg'
FINAL_TABLE_NAME = 'compustatmarketcap'

# Database connection string.
# The user and password are percent-encoded so that reserved URL characters
# (the password contains '#') can never be misread as URL structure.
DB_URL = (
    f'postgresql://{quote(DB_USER, safe="")}:{quote(DB_PASSWORD, safe="")}'
    f'@{DB_HOST}:{DB_PORT}/{DB_NAME}'
)
ENGINE = create_engine(DB_URL)
def list_files(bucket_name, prefix, suffix='.csv.gz'):
    """List the object keys under an S3 prefix that end with ``suffix``.

    The listing is paginated, so prefixes holding more objects than a single
    ``list_objects_v2`` response can carry (1000 keys) are returned in full.

    Args:
        bucket_name: Name of the S3 bucket to list.
        prefix: Key prefix that restricts the listing.
        suffix: Only keys ending with this string are returned.

    Returns:
        A list of matching keys; empty when the prefix holds no objects or
        none of them match ``suffix``.

    Raises:
        Exception: Any error raised by the S3 client is logged and re-raised.
    """
    try:
        logging.info("Fetching list of files from S3")
        paginator = S3_CLIENT.get_paginator('list_objects_v2')
        files = []
        found_any_object = False
        for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
            # A page without 'Contents' holds no objects.
            contents = page.get('Contents')
            if not contents:
                continue
            found_any_object = True
            files.extend(obj['Key'] for obj in contents if obj['Key'].endswith(suffix))
        if not found_any_object:
            logging.warning("No files found in S3 with the specified prefix")
            return []
        logging.info(f"Found {len(files)} files in S3 with prefix {prefix}")
        return files
    except Exception as e:
        logging.error(f"Error fetching list of files: {e}")
        raise
def create_temp_table(engine, schema_name, table_name):
    """Drop and recreate the staging table ``schema_name.table_name``.

    Despite the name, this creates a regular table (no ``TEMPORARY`` keyword)
    with the columns GVKEY, DATADATE, CURCDD and MKVAL. The drop and the
    create are each committed separately.

    Args:
        engine: SQLAlchemy engine used to open the connection.
        schema_name: Schema that holds the table.
        table_name: Name of the table to (re)create.

    Raises:
        Exception: Any database error is logged and re-raised.
    """
    try:
        qualified_name = f"{schema_name}.{table_name}"
        drop_ddl = text(f"DROP TABLE IF EXISTS {qualified_name}")
        create_ddl = text(f"""
            CREATE TABLE {qualified_name} (
                GVKEY VARCHAR(6) NOT NULL,
                DATADATE DATE,
                CURCDD VARCHAR,
                MKVAL DOUBLE PRECISION
            )
        """)
        with engine.connect() as conn:
            logging.info("Creating temporary table")
            # Run the drop first, then the create, committing after each.
            for ddl in (drop_ddl, create_ddl):
                conn.execute(ddl)
                conn.commit()
            logging.info("Temporary table created successfully")
    except Exception as exc:
        logging.error(f"Error creating temporary table: {exc}")
        raise
def copy_data_to_temp_table(engine, schema_name, table_name, files, batch_size=500000):
    """Load gzipped CSV objects from S3 into a database table in batches.

    Each key in ``files`` is fetched from ``S3_BUCKET_NAME``, decompressed as
    a text stream and read with pandas in chunks of ``batch_size`` rows. Every
    chunk is appended to ``schema_name.table_name`` through
    ``DataFrame.to_sql`` (this does not issue a COPY command). After all files
    are processed, the table's total row count is logged.

    Args:
        engine: SQLAlchemy engine used for the inserts and the final count.
        schema_name: Schema that holds the target table.
        table_name: Name of the target table.
        files: Iterable of S3 object keys to load.
        batch_size: Number of CSV rows per chunk.

    Raises:
        Exception: Any S3, parsing or database error is logged and re-raised;
            loading stops at the first failing file.
    """
    try:
        logging.info("Starting to copy data to the database table")
        for file_key in files:
            try:
                logging.info(f"Copying data from {file_key}")
                obj = S3_CLIENT.get_object(Bucket=S3_BUCKET_NAME, Key=file_key)
                body = obj['Body']
                try:
                    with gzip.open(body, 'rt') as gz:
                        # NOTE(review): read_csv infers column dtypes; if GVKEY
                        # values are zero-padded digit strings they would be
                        # parsed as integers and lose the padding - confirm
                        # against the actual CSV contents.
                        for chunk in pd.read_csv(gz, chunksize=batch_size):
                            chunk.to_sql(table_name, engine, schema=schema_name, if_exists='append', index=False)
                            logging.info(f"Copied batch of size {len(chunk)} from {file_key}")
                finally:
                    # Always release the S3 response stream, including when
                    # parsing or inserting fails part-way through the object.
                    body.close()
            except Exception as e:
                logging.error(f"Error copying data from file {file_key}: {e}")
                raise
        # Verify that all data is copied successfully
        with engine.connect() as connection:
            count_statement = text(f"SELECT COUNT(*) FROM {schema_name}.{table_name}")
            total_records = connection.execute(count_statement).scalar()
            logging.info(f"Total records copied to {schema_name}.{table_name}: {total_records}")
    except Exception as e:
        logging.error(f"Error copying data to temporary table: {e}")
        raise
def rename_and_replace_table(engine, schema_name, temp_table_name, final_table_name):
    """Replace the final table with the staging table in one transaction.

    Drops ``schema_name.final_table_name`` if it exists and renames
    ``schema_name.temp_table_name`` to ``final_table_name``. Both statements
    are committed together, so a failed rename does not leave the final table
    dropped. This assumes the backend supports transactional DDL, as
    PostgreSQL does.

    Args:
        engine: SQLAlchemy engine used to open the connection.
        schema_name: Schema that holds both tables.
        temp_table_name: Name of the freshly loaded staging table.
        final_table_name: Name the staging table takes over.

    Raises:
        Exception: Any database error is logged and re-raised.
    """
    try:
        with engine.connect() as connection:
            # Drop the final table if it exists
            drop_statement = text(f"DROP TABLE IF EXISTS {schema_name}.{final_table_name}")
            connection.execute(drop_statement)
            # Rename the temporary table to the final table name
            rename_statement = text(f"ALTER TABLE {schema_name}.{temp_table_name} RENAME TO {final_table_name}")
            connection.execute(rename_statement)
            # Single commit for both statements: if the rename raises, the
            # connection closes without committing and the drop is discarded.
            connection.commit()
            logging.info(f"Existing table {final_table_name} dropped")
            logging.info(f"Temporary table renamed to {final_table_name}")
    except Exception as e:
        logging.error(f"Error renaming and replacing table: {e}")
        raise
def main():
    """Run the full load: list S3 files, stage them, then swap the tables.

    The steps are: list the CSV files under ``S3_PATH``, recreate the staging
    table, load every file into it, and only then replace
    ``FINAL_TABLE_NAME`` with the staging table.

    Returns:
        0 when the load completed, 1 when no files were found or any step
        raised. Failures are logged (with traceback) rather than propagated.
    """
    try:
        logging.info("Starting the file listing process")
        files = list_files(S3_BUCKET_NAME, S3_PATH)
        if not files:
            logging.error("No CSV files found")
            return 1
        temp_table_name = 'temp_table'
        create_temp_table(ENGINE, SCHEMA_NAME, temp_table_name)
        # Copy data from all files into the temporary table
        try:
            copy_data_to_temp_table(ENGINE, SCHEMA_NAME, temp_table_name, files, batch_size=500000)
        except Exception as e:
            logging.error(f"Error during data copy: {e}")
            raise
        # Rename and replace the final table only if data copy is successful
        rename_and_replace_table(ENGINE, SCHEMA_NAME, temp_table_name, FINAL_TABLE_NAME)
        logging.info("Process completed successfully")
        return 0
    except Exception as e:
        # logging.exception keeps the traceback, which would otherwise be
        # lost because the exception is not re-raised from here.
        logging.exception(f"Process failed: {e}")
        return 1
if __name__ == "__main__":
    # Use main()'s return value as the process exit status; a None return
    # yields exit status 0.
    raise SystemExit(main())
![]() |
Notes.io is a web-based application for taking notes. You can take notes and share them with other people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000 notes have been created, and the number keeps growing...
With notes.io;
- You can take a note from anywhere and on any device with an internet connection.
- You can share your notes on social platforms (YouTube, Facebook, Twitter, Instagram, etc.).
- You can quickly share your content without a website, blog, or e-mail.
- You don't need to create an account to share a note. You can share quick, easy, shortened notes via SMS, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
- Notes.io has a fabulous infrastructure design for short links and lets you share a note as an easy, understandable link.
Fast: Notes.io is built for speed and performance. You can take notes quickly and browse your archive.
Easy: Notes.io doesn’t require installation. Just write and share note!
Short: Notes.io’s URLs are just 8 characters long. You’ll get a shortened link to your note when you want to share it. (Ex: notes.io/q )
Free: Notes.io has been running for 12 years and has been free since the day it started.
You can create your first note right away and start sharing it with anyone you wish. If you want to contact us, you can use the following communication channels:
Email: [email protected]
Twitter: http://twitter.com/notesio
Instagram: http://instagram.com/notes.io
Facebook: http://facebook.com/notesio
Regards;
Notes.io Team