Notes
![]() ![]() Notes - notes.io |
import sys
import pandas as pd
import subprocess
from datetime import datetime
import logging
import traceback
import requests
import json
import os
import time
from io import BytesIO, StringIO
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
from concurrent.futures import ThreadPoolExecutor
import gc
# Logger -- timestamped INFO-level logging for the whole cronjob.
logging.basicConfig(level=logging.INFO, format='%(asctime)s :: %(levelname)s :: %(message)s')
logger = logging.getLogger('qrt-qh-frg-storeleads-data-cronjob-logs')
cronjob_name = 'aa00002695-qrt-qh-frg-storeleads-data-cronjob'
logger.info("aa00002695-qrt-qh-frg-storeleads-data-cronjob CronJob Started")
# Deployment configuration -- all supplied via environment variables;
# a missing variable raises KeyError and aborts the job at import time.
ENV = os.environ["ENV"]  # 'prod' enables email notifications
BUCKET_NAME = os.environ['BUCKET_NAME']  # target S3 bucket
FOLDER_NAME_RAW = os.environ['FOLDER_NAME_RAW']  # S3 prefix for raw-data CSVs
FOLDER_NAME_TECH = os.environ['FOLDER_NAME_TECH']  # S3 prefix for technologies CSVs
email_api_url = os.environ['email_api_url']  # base URL of the internal email API
# StoreLeads API auth header, sent with every storeleads.app request.
# SECURITY(review): hard-coded bearer token in source control -- should be
# moved to an environment variable or secret store.
headers = {
    'Authorization': 'Bearer 946fb9b2-81da-4fc2-5141-53d36b5a',
}
# EmailFunction
logger.info(f"Setting email function")
def send_email_with_attachment(body, subject):
    """Send a notification email through the internal email API.

    Parameters
    ----------
    body : str
        Plain-text message body.
    subject : str
        Email subject line.

    Never raises: any failure is logged and swallowed so that an email
    problem cannot abort the cronjob itself.
    """
    try:
        # Fixed: the Session was never closed; the context manager
        # releases the underlying connections deterministically.
        with requests.Session() as s:
            # Obtain a short-lived bearer token for the email API.
            # NOTE(review): verify=False disables TLS verification -- kept
            # for parity with the existing environment, but should be fixed.
            res = s.get(f'{email_api_url}get_token/[email protected]',
                        verify=False, timeout=60)
            res.raise_for_status()  # fail fast if the token endpoint errors
            token = res.json()["token"]
            s.headers.update({"Authorization": f"Bearer {token}"})
            payload = {
                "to_list": ['[email protected]', '[email protected]', '[email protected]', '[email protected]'],
                "from_id": '[email protected]',
                "sub": subject,
                "message": body,
                "cc_list": [],
                "cc_qh": False,
            }
            resp = s.post(f"{email_api_url}send_email_with_attachments",
                          data=payload,
                          verify=False, timeout=60)
            logger.info(f"Email API response code : {resp.status_code}")
    except Exception as e:
        # Fixed: log at ERROR (was INFO) so failed notifications are visible.
        logger.error(f"Email API Failed : {e}")
# function to validate AWS credentials
def validate_aws_credentials():
    """Log the identity behind the current AWS credentials via STS.

    Best-effort startup check: any failure is logged and swallowed so the
    cronjob continues (later S3 calls will surface real credential errors).
    """
    try:
        logger.info('Validating AWS credentials')
        # Create a session from the ambient AWS credentials
        session = boto3.Session()
        # Ask STS who we are authenticated as
        sts_client = session.client("sts")
        response = sts_client.get_caller_identity()
        # Log the details of the authenticated identity
        logger.info("Authenticated Identity:")
        logger.info(f"User ID: {response['UserId']}")
        logger.info(f"Account ID: {response['Account']}")
        logger.info(f"ARN: {response['Arn']}")
    except Exception as e:
        # Fixed: was print(); route through the logger like everything else.
        logger.error(f"Error validating credentials: {e}")
# Run the credential check once at startup (best-effort; it never raises).
validate_aws_credentials()
logger.info("DONE WITH AWS CREDENTIALS VALIDATION")
logger.info("AWS CLI command Process starting...")
# List all objects from the folder
def list_s3_files(prefix):
    """Return the base file names of every S3 object under ``prefix``.

    On any failure, a one-element list containing the exception text is
    returned instead; callers detect this by substring-matching the message.
    """
    bucket = boto3.Session().resource('s3').Bucket(BUCKET_NAME)
    try:
        # Keep only the final path component of each object key.
        return [obj.key.rsplit('/', 1)[-1]
                for obj in bucket.objects.filter(Prefix=prefix)]
    except Exception as e:
        # NB: this exact message text is matched by save_dataframe().
        return [f'Exception occured at list_s3_files :{str(e)}']
'''
Figure out the missing files from the folder and save them into it as csv files.
Send mail notification if we are in Production.
'''
def save_dataframe(df, prefix):
    """Download StoreLeads historical data for every label in ``df`` not yet
    present under ``prefix`` in S3, and store each one as a CSV object.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain a 'labels' column of domain names to fetch.
    prefix : str
        S3 key prefix (folder) to compare against and write into.

    Sends a summary email when ENV == 'prod'. Returns None.
    """
    s3_files = list_s3_files(prefix)
    # list_s3_files signals failure via a sentinel message in element 0.
    if 'Exception occured at list_s3_files' not in s3_files[0]:
        exist_files = [file.split('.')[0] for file in s3_files if file != '']
        missing_files = df[~df['labels'].isin(exist_files)]
        session = boto3.Session()
        s3 = session.resource('s3')
        bucket = s3.Bucket(BUCKET_NAME)
        failure_files = []
        logger.info(f'missing files in {prefix} are {missing_files["labels"]}')
        for file in missing_files['labels']:
            status = 429
            logger.info(f"file : {file}")
            # Retry only while the API keeps answering 429 (rate limited).
            while status == 429:
                try:
                    if prefix == 'Datastore/StoreLeads/raw_data_technologies':
                        response = requests.get(
                            f'https://storeleads.app/json/api/v1/all/historical/domain/{file}?fields=platform,name,technologies.name',
                            headers=headers,
                        )
                    else:
                        # NOTE(review): 'gggggggg' in the field list looks
                        # like a placeholder/typo -- confirm the intended
                        # StoreLeads field name before changing it.
                        response = requests.get(
                            f'https://storeleads.app/json/api/v1/all/historical/domain/{file}?fields=platform,name,rank,estimated_page_views,gggggggg,last_platform_change_at,country_code',
                            headers=headers,
                        )
                    logger.info(f"status :{response.status_code}")
                    if response.status_code == 200:
                        # Parse the JSON-lines payload in chunks to keep
                        # peak memory bounded for large domains.
                        chunk_size = 10000
                        parts = []
                        for chunk in pd.read_json(StringIO(response.text), lines=True, chunksize=chunk_size):
                            parts.append(chunk)
                        # Fixed: use a local name instead of shadowing (and
                        # deleting) the `df` parameter.
                        file_df = pd.concat(parts)
                        logger.info("df is generated!!")
                        location = f'{prefix}/{file}.csv'
                        csv_buffer = StringIO()
                        file_df.to_csv(csv_buffer, index=False)
                        logger.info("df converted to csv.")
                        bucket.Object(key=location).put(Body=csv_buffer.getvalue())
                        logger.info("object is created in s3 successfully!!")
                        # Release the large intermediates before the next file.
                        del file_df
                        del response
                        gc.collect()
                        status = 200
                        logger.info('AWS command executed successfully.')
                    elif response.status_code == 429:
                        # Rate limited: wait as instructed, then retry.
                        # Fixed: no longer recorded as a failure here -- a
                        # later retry may still succeed. Also tolerate a
                        # missing Retry-After header (default 5s).
                        logger.info(f'429 state {file}')
                        time.sleep(int(response.headers.get("Retry-After", 5)) + 1)
                    else:
                        # Permanent failure for this file: record and move on.
                        # Fixed: the original `continue` left status == 429
                        # and re-requested this file forever.
                        failure_files.append(file)
                        logger.info(f'failure case of {file}, {response.status_code} ')
                        break
                except Exception as e:
                    # Fixed: an exception used to leave status == 429 and
                    # retry indefinitely; record the failure and stop instead.
                    failure_files.append(file)
                    logger.error(f"Error executing AWS command: {e}")
                    break
        subject = f"{cronjob_name} successful"
        logger.info(f"{cronjob_name} executed successfully")
        body = f"Cronjob {cronjob_name} executed successfully, not processed files are {failure_files}"
        if ENV == 'prod':
            send_email_with_attachment(body, subject)
    else:
        subject = f"{cronjob_name} Failed"
        # Fixed: the original f-strings were missing braces and emitted the
        # literal text 's3_files[0]' instead of the error message.
        logger.error(f"Cronjob failed : {s3_files[0]}")
        body = f"Cronjob failed : {s3_files[0]}"
        if ENV == 'prod':
            send_email_with_attachment(body, subject)
try:
    # Fetch the list of all domains with historical data available.
    # (Fixed: removed the unused s3/bucket/client/paginator objects the
    # original created here -- save_dataframe builds its own sessions.)
    response = requests.get(
        'https://storeleads.app/json/api/v1/all/historical/domain',
        headers=headers,
        timeout=300,  # fail instead of hanging forever on a dead endpoint
    )
    labels = pd.DataFrame(json.loads(response.content))
    # Populate both the raw-data folder and the technologies folder.
    save_dataframe(labels, FOLDER_NAME_RAW)
    save_dataframe(labels, FOLDER_NAME_TECH)
except Exception as e:
    # Fixed: was a duplicate print() + logger.info(); one ERROR entry now.
    logger.error(f"Error executing AWS command: {e}")
    subject = f"{cronjob_name} Failed"
    body = f"Cronjob failed : {e}"
    if ENV == 'prod':
        send_email_with_attachment(body, subject)
![]() |
Notes is a web-based application for taking notes online. You can take your notes and share them with other people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000+ notes have been created and counting...
With notes.io;
- * You can take a note from anywhere and any device with internet connection.
- * You can share the notes in social platforms (YouTube, Facebook, Twitter, instagram etc.).
- * You can quickly share your contents without website, blog and e-mail.
- * You don't need to create any Account to share a note. As you wish you can use quick, easy and best shortened notes with sms, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
- * Notes.io has fabulous infrastructure design for a short link and allows you to share the note as an easy and understandable link.
Fast: Notes.io is built for speed and performance. You can take notes quickly and browse your archive.
Easy: Notes.io doesn’t require installation. Just write and share note!
Short: Notes.io’s URL is just 8 characters. You’ll get a shortened link to your note when you want to share it. (Ex: notes.io/q )
Free: Notes.io works for 14 years and has been free since the day it was started.
You immediately create your first note and start sharing with the ones you wish. If you want to contact us, you can use the following communication channels;
Email: [email protected]
Twitter: http://twitter.com/notesio
Instagram: http://instagram.com/notes.io
Facebook: http://facebook.com/notesio
Regards;
Notes.io Team