Notes![what is notes.io? What is notes.io?](/theme/images/whatisnotesio.png)
![]() ![]() Notes - notes.io |
from io import StringIO
from typing import Any, Dict, Sequence, Union
import pandas as pd
from jinja2 import Template
from sqlalchemy import create_engine, text
from sqlalchemy.engine import URL, Engine
from sqlalchemy.pool import NullPool
from datetime import date, datetime
from decimal import Decimal
def resolve_dir(dirname: str):
for sql_path in [dirname, f"application-source/{dirname}"]:
if os.path.isdir(sql_path):
return os.path.abspath(sql_path)
raise RuntimeError(f"Unable to locate directory '{dirname}' from working directory '{os.curdir}'")
def _convert_to_date(d):
if isinstance(d, str):
return date(9999, 12, 31) if d == "9999-12-31" else pd.to_datetime(d).date()
if isinstance(d, datetime):
return d.date()
if isinstance(d, date):
return d
return pd.NaT
def load_from_env(*vars: str) -> Dict[str, str]:
missing_vars: List[str] = [var for var in vars if var not in os.environ]
if len(missing_vars) > 0:
raise RuntimeError("Please set the environment variable(s): %s" % ", ".join(sorted(missing_vars)))
return {var: os.environ[var] for var in vars}
def connect_postgresql(source: str) -> Engine:
"""Configure a PostgreSQL database connection
Returns:
sqlalchemy.Engine: A database connection engine
"""
prefix = source.upper()
config: Dict[str, str] = load_from_env(
f"{prefix}_HOST",
f"{prefix}_PORT",
f"{prefix}_DATABASE",
f"{prefix}_USERNAME",
f"{prefix}_PASSWORD",
)
host: str = config[f"{prefix}_HOST"]
port: int = int(config[f"{prefix}_PORT"])
database: str = config[f"{prefix}_DATABASE"]
username: str = config[f"{prefix}_USERNAME"]
password: str = config[f"{prefix}_PASSWORD"]
connection_url: URL = URL.create(
"postgresql+psycopg2",
host=host,
port=port,
database=database,
username=username,
password=password,
)
# Disable connection pooling to avoid timeout issues
engine: Engine = create_engine(connection_url, poolclass=NullPool)
return engine
def connect_sql_server(source: str, trusted: bool = False) -> Engine:
"""Configure a Microsoft SQL Server database connection
Returns:
sqlalchemy.Engine: A database connection engine
"""
prefix = source.upper()
config: Dict[str, str] = load_from_env(
f"{prefix}_HOST",
f"{prefix}_PORT",
f"{prefix}_DATABASE",
)
host: str = config[f"{prefix}_HOST"]
port: int = int(config[f"{prefix}_PORT"])
database: str = config[f"{prefix}_DATABASE"]
driver: str = "ODBC Driver 17 for SQL Server"
opts: Dict[str, str] = {
"driver": driver,
"autocommit": "True", # Required for compatibility with Azure SQL Data Warehouse
}
username: Union[str, None] = None
password: Union[str, None] = None
if not trusted:
config = load_from_env(
f"{prefix}_USERNAME",
f"{prefix}_PASSWORD",
)
username = config[f"{prefix}_USERNAME"]
password = config[f"{prefix}_PASSWORD"]
else:
opts = {
"trusted_connection": "yes",
"kerberos": "yes",
}
connection_url: URL = URL.create(
"mssql+pyodbc",
host=host,
port=port,
database=database,
username=username,
password=password,
query=opts,
)
# Disable connection pooling to avoid timeout issues
engine: Engine = create_engine(connection_url, poolclass=NullPool)
return engine
upstream_db = connect_sql_server("LASR")
downstream_db = connect_postgresql("QRA")
def bulk_insert(df: pd.DataFrame, schema: str, table: str, engine: Engine, with_analyze: bool):
if len(df) == 0:
return
buffer = StringIO()
df.to_csv(buffer, header=False, index=False)
buffer.seek(0)
sql: str = f"COPY {schema}.{table} ({','.join(df.columns.to_list())}) FROM STDIN WITH CSV;"
print(sql)
connection = engine.raw_connection()
try:
cursor = connection.cursor()
try:
cursor.copy_expert(sql, buffer)
connection.commit()
if with_analyze:
execute_query_raw(f"ANALYZE {schema}.{table};", engine)
except Exception:
connection.rollback()
raise
finally:
cursor.close()
finally:
connection.close() # This releases the connection back into the engine’s connection pool.
def execute_named_query(source: str, name: str, engine: Engine, params: Dict[str, Any] = {}) -> pd.DataFrame:
sql_dir = resolve_dir(f"{source}")
filepath = os.path.join(sql_dir, f"{name}.sql")
print(params)
if not os.path.isfile(filepath):
raise RuntimeError(f"Unable to locate file '{name}.sql'")
with open(filepath, mode="r") as f:
sql_template = f.read()
sql = Template(sql_template).render(params)
print(sql)
return execute_query(sql, engine)
def execute_query(sql: str, engine: Engine) -> pd.DataFrame:
with engine.connect() as connection:
df = pd.read_sql_query(text(sql), connection, coerce_float=False)
return df
def ensure_schema( df: pd.DataFrame, schema: Dict[str, Any]) -> pd.DataFrame:
name = schema["name"]
if schema['include_id_column']=='Y':
all_fields = schema["id_cols"] + schema["data_cols"]
else:
all_fields = schema["data_cols"]
print(all_fields)
assert df.columns.is_unique
#print(sorted(field[0] for field in all_fields))
assert sorted(df.columns.to_list()) == sorted(field[0] for field in all_fields), f"Schema mismatch for {name}."
assert not df[[field[0] for field in all_fields if not field[1].startswith("nullable ")]].isna().any().any()
for field in all_fields:
if field[1] in ["bool", "nullable bool"]:
df[field[0]] = df[field[0]].astype("boolean")
elif field[1] in ["date" , "nullable date"]:
df[field[0]] = df[field[0]].transform(_convert_to_date)
elif field[1] in ["datetime" , "nullable datetime"]:
df[field[0]] = pd.to_datetime(df[field[0]])
elif field[1] in ["decimal" , "nullable decimal"]:
df[field[0]] = df[field[0]].transform(
lambda x: Decimal(x) if (x is not None and x != "None") else Decimal("NaN")
)
elif field[1] in ["nullable float64"]:
df[field[0]] = df[field[0]].astype("float64")
elif field[1] in ["nullable int16"]:
df[field[0]] = df[field[0]].astype(pd.Int16Dtype())
elif field[1] in ["nullable int32"]:
df[field[0]] = df[field[0]].astype(pd.Int32Dtype())
elif field[1] in ["nullable int64"]:
df[field[0]] = df[field[0]].astype(pd.Int64Dtype())
elif field[1] in ["float64" , "int16" , "int32" , "int64"]:
df[field[0]] = df[field[0]].astype(field[1])
elif field[1] in ["str"]:
# Remove leading and trailing whitespace.
df[field[0]] = df[field[0]].str.strip()
elif field[1] in ["nullable str"]:
# Remove leading and trailing whitespace.
df[field[0]] = df[field[0]].astype(pd.StringDtype(storage="python")).str.strip()
else :
raise RuntimeError(f"{name}: Column '{field[0]}' has unsupported type '{field[1]}'")
return df
from datetime import datetime, timedelta
#will change it to as of now date
current_date = '2024-06-22'
current_date =datetime.strptime(current_date, "%Y-%m-%d")
current_day_prev_date = current_date - timedelta(days=1)
# # Find the first day of the current month
first_day_prev_date_month = current_day_prev_date.replace(day=1) # 2024-06-01
# # Subtract one day to get the last day of the previous month
first_day_current_month = current_date.replace(day=1) # 2024-06-01
last_month_end_date = first_day_current_month - timedelta(days=1)#2024-05-31
first_day_last_month = last_month_end_date.replace(day=1)
current_date = datetime.strptime(current_date.strftime("%Y-%m-%d"), '%Y-%m-%d').date()
current_day_prev_date = datetime.strptime(current_day_prev_date.strftime("%Y-%m-%d"), '%Y-%m-%d').date()
first_day_prev_date_month = datetime.strptime(first_day_prev_date_month.strftime("%Y-%m-%d"), '%Y-%m-%d').date()
last_month_end_date = datetime.strptime(last_month_end_date.strftime("%Y-%m-%d"), '%Y-%m-%d').date()
first_day_last_month = datetime.strptime(first_day_last_month.strftime("%Y-%m-%d"), '%Y-%m-%d').date()
print(current_date, current_day_prev_date, first_day_prev_date_month, last_month_end_date)
qra_universe_latest_month_end_date = execute_named_query("../sql/etl","qra_universe_latest_date", downstream_db, {"start_date":first_day_last_month, "end_date": last_month_end_date })
qra_universe_latest_month_end_date = qra_universe_latest_month_end_date.loc[0, 'latest_month_end_date']
assert last_month_end_date == qra_universe_latest_month_end_date, "last month end date is not present in qra_universe table"
# load qra_universe_security_ciq_price_creation
qra_universe_security_ciq_price = {
"schema": "qra_scratch",
"name": "qra_universe_security_ciq_price",
"name_plural": "qra_universe_security_ciq_price",
"include_id_column":"N",
"id_cols": [
("id", "int64"),
],
"data_cols": [
("as_of_date", "date"),
("datadate", "date"),
("xf_trading_item_id", "nullable int64"),
("price_local", "nullable float64"),
("price_usd", "nullable float64"),
("price_usd_div_adjusted", "nullable float64"),
],
}
qra_universe_security_ciq_price_df = execute_named_query("../sql/etl","qra_universe_security_ciq_price_creation_updated", downstream_db,
{"last_month_end_date": last_month_end_date, "current_day_prev_date": current_day_prev_date,
"first_day_prev_date_month": first_day_prev_date_month})
qra_universe_security_ciq_price_df = qra_universe_security_ciq_price_df.rename(columns={'pricingdate':'datadate',
'tradingitemid': 'xf_trading_item_id',
'ciq_price_local': 'price_local',
'ciq_price_usd': 'price_usd',
'divadjpriceusd': 'price_usd_div_adjusted',
})
qra_universe_security_ciq_price_df = ensure_schema(qra_universe_security_ciq_price_df, qra_universe_security_ciq_price)
qra_universe_security_ciq_price_df
last_month_end_date_datetime = pd.to_datetime(last_month_end_date) # all securities pulled date
first_day_prev_date_month_datetime = pd.to_datetime(first_day_prev_date_month)
current_day_prev_date_datetime = pd.to_datetime(current_day_prev_date)
assert len(qra_universe_security_ciq_price_df) != 0
duplicates = qra_universe_security_ciq_price_df.groupby(['xf_trading_item_id', 'datadate'])['xf_trading_item_id'].nunique()
assert not duplicates[duplicates > 1].any(), "more than one market cap value for a company is found"
as_of_dates_df = qra_universe_security_ciq_price_df.copy()
as_of_dates_df['as_of_date'] = pd.to_datetime(qra_universe_security_ciq_price_df['as_of_date'])
all_in_range = as_of_dates_df[as_of_dates_df['as_of_date']==last_month_end_date_datetime].empty
assert all_in_range == False , 'Not all dates are in given start and end date'
as_of_dates_df['datadate'] = pd.to_datetime(qra_universe_security_ciq_price_df['datadate'])
all_in_range = as_of_dates_df['datadate'].between(first_day_prev_date_month_datetime, current_day_prev_date_datetime).all()
assert all_in_range == True , 'Not all dates are in given start and end date'
#bulk_insert(qra_universe_security_ciq_price_df, qra_universe_security_ciq_price['schema'],qra_universe_security_ciq_price['name'], downstream_db, False)
print(f"insert {qra_universe_security_ciq_price_df.shape[0]} records into {qra_universe_security_ciq_price['schema']}.{qra_universe_security_ciq_price['name']}")
# load qra_universe_security_compustat_price_creation
qra_universe_security_compustat_price = {
"schema": "qra_scratch",
"name": "qra_universe_security_compustat_price",
"name_plural": "qra_universe_security_compustat_price",
"include_id_column":"N",
"id_cols": [
("id", "int64"),
],
"data_cols": [
("as_of_date", "date"),
("datadate", "date"),
("gvkey", "str"),
("iid", "str"),
("xf_trading_item_id", "nullable int64"),
("price_local_unadjusted", "nullable float64"),
("price_usd_unadjusted", "nullable float64"),
("ajexdi", "nullable float64"),
("divadjfactor", "nullable float64"),
("curcdd", "nullable str"),
],
}
qra_universe_security_compustat_price_df = execute_named_query("../sql/etl","qra_universe_security_compustat_price_creation_updated", downstream_db, {"last_month_end_date": last_month_end_date, "current_day_prev_date": current_day_prev_date,
"first_day_prev_date_month": first_day_prev_date_month})
qra_universe_security_compustat_price_df
qra_universe_security_compustat_price_df = qra_universe_security_compustat_price_df.rename(columns = {'tradingitemid':'xf_trading_item_id'})
qra_universe_security_compustat_price_df = ensure_schema(qra_universe_security_compustat_price_df, qra_universe_security_compustat_price)
assert len(qra_universe_security_compustat_price_df) != 0
duplicates = qra_universe_security_compustat_price_df.groupby(['xf_trading_item_id', 'datadate'])['xf_trading_item_id'].nunique()
assert not duplicates[duplicates > 1].any(), "more than one price value for a tradingitemid is found on a given date"
as_of_dates_df = qra_universe_security_compustat_price_df.copy()
as_of_dates_df['as_of_date'] = pd.to_datetime(qra_universe_security_compustat_price_df['as_of_date'])
all_in_range = as_of_dates_df[as_of_dates_df['as_of_date']==last_month_end_date_datetime].empty
assert all_in_range == False , 'Not all dates are in given last month end date'
as_of_dates_df['datadate'] = pd.to_datetime(qra_universe_security_compustat_price_df['datadate'])
all_in_range = as_of_dates_df['datadate'].between(first_day_prev_date_month_datetime, current_day_prev_date_datetime).all()
assert all_in_range == True , 'Not all dates are in given start and end date'
#bulk_insert(qra_universe_security_compustat_price_df, qra_universe_security_compustat_price['schema'],qra_universe_security_compustat_price['name'], downstream_db, False)
print(f"insert {qra_universe_security_compustat_price_df.shape[0]} records into {qra_universe_security_compustat_price['schema']}.{qra_universe_security_compustat_price['name']}")
# load qra_universe_asset_daily_return_creation
qra_universe_asset_daily_return = {
"schema": "qra_scratch",
"name": "qra_universe_security_daily_return",
"name_plural": "qra_universe_security_daily_return",
"include_id_column":"N",
"id_cols": [
("id", "int64"),
],
"data_cols": [
("date", "date"),
("cal_month_end_date", "date"),
("xf_trading_item_id", "nullable int64"),
("gvkey", "nullable str"),
("iid", "nullable str"),
("msci_price_return_local", "nullable float64"),
("msci_price_return_usd", "nullable float64"),
("msci_gross_total_return_local", "nullable float64"),
("msci_gross_total_return_usd", "nullable float64"),
("compustat_price_return_local", "nullable float64"),
("compustat_price_return_usd", "nullable float64"),
("compustat_total_return_local", "nullable float64"),
("compustat_total_return_usd", "nullable float64"),
("compustat_total_log_return_local", "nullable float64"),
("compustat_total_log_return_usd", "nullable float64"),
("ciq_price_return_local", "nullable float64"),
("ciq_price_return_usd", "nullable float64"),
("ciq_total_return_local", "nullable float64"),
("ciq_total_return_usd", "nullable float64"),
("ciq_total_log_return_local", "nullable float64"),
("ciq_total_log_return_usd", "nullable float64"),
("qra_total_return_local", "nullable float64"),
("qra_total_return_usd", "nullable float64"),
("qra_total_log_return_local", "nullable float64"),
("qra_total_log_return_usd", "nullable float64"),
],
}
qra_universe_asset_daily_return_df = execute_named_query("../sql/etl","qra_universe_asset_daily_return_creation_updated", downstream_db,
{"last_month_end_date": last_month_end_date, "current_day_prev_date": current_day_prev_date,
"first_day_prev_date_month": first_day_prev_date_month})
qra_universe_asset_daily_return_df = qra_universe_asset_daily_return_df.rename(columns={'datadate':'date',
'month_end': 'cal_month_end_date',
'msci_total_return_local': 'msci_gross_total_return_local',
'msci_total_return_usd': 'msci_gross_total_return_usd',
'comp_price_return_local': 'compustat_price_return_local',
'comp_price_return_usd': 'compustat_price_return_usd',
'comp_total_return_local': 'compustat_total_return_local',
'comp_total_return_usd': 'compustat_total_return_usd',
'comp_total_log_return_local': 'compustat_total_log_return_local',
'comp_total_log_return_usd': 'compustat_total_log_return_usd',
'ciq_price_return_usd': 'ciq_price_return_usd',
'ciq_total_return_usd': 'ciq_total_return_usd',
'ciq_total_log_return_usd': 'ciq_total_log_return_usd',
'qra_total_return_usd': 'qra_total_return_usd',
'qra_total_log_return_usd': 'qra_total_log_return_usd',
})
qra_universe_asset_daily_return_df
qra_universe_asset_daily_return_df = ensure_schema(qra_universe_asset_daily_return_df, qra_universe_asset_daily_return)
assert len(qra_universe_asset_daily_return_df) != 0, "qra_universe_asset_daily_return_df is empty"
duplicates = qra_universe_asset_daily_return_df.groupby(['xf_trading_item_id', 'date'])['xf_trading_item_id'].nunique()
assert not duplicates[duplicates > 1].any(), "more than one return value for a tradingItemId is found"
as_of_dates_df = qra_universe_asset_daily_return_df.copy()
as_of_dates_df['date'] = pd.to_datetime(qra_universe_asset_daily_return_df['date'])
all_in_range = as_of_dates_df['date'].between(first_day_prev_date_month_datetime, current_day_prev_date_datetime).all()
assert all_in_range == True , 'Not all dates are in given start and end date'
#bulk_insert(qra_universe_asset_daily_return_df, qra_universe_asset_daily_return['schema'],qra_universe_asset_daily_return['name'], downstream_db, False)
print(f"insert {qra_universe_asset_daily_return_df.shape[0]} records into {qra_universe_asset_daily_return['schema']}.{qra_universe_asset_daily_return['name']}")
![]() |
Notes is a web-based application for online taking notes. You can take your notes and share with others people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000+ notes created and continuing...
With notes.io;
- * You can take a note from anywhere and any device with internet connection.
- * You can share the notes in social platforms (YouTube, Facebook, Twitter, instagram etc.).
- * You can quickly share your contents without website, blog and e-mail.
- * You don't need to create any Account to share a note. As you wish you can use quick, easy and best shortened notes with sms, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
- * Notes.io has fabulous infrastructure design for a short link and allows you to share the note as an easy and understandable link.
Fast: Notes.io is built for speed and performance. You can take a notes quickly and browse your archive.
Easy: Notes.io doesn’t require installation. Just write and share note!
Short: Notes.io’s url just 8 character. You’ll get shorten link of your note when you want to share. (Ex: notes.io/q )
Free: Notes.io works for 14 years and has been free since the day it was started.
You immediately create your first note and start sharing with the ones you wish. If you want to contact us, you can use the following communication channels;
Email: [email protected]
Twitter: http://twitter.com/notesio
Instagram: http://instagram.com/notes.io
Facebook: http://facebook.com/notesio
Regards;
Notes.io Team