Notes![what is notes.io? What is notes.io?](/theme/images/whatisnotesio.png)
![]() ![]() Notes - notes.io |
import datetime
import logging

import pymongo
from pymongo import MongoClient

from sql_dbManagement import SQLDBManagement
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class DatabaseManagement:
    """Gateway to the MongoDB test-result collections and the SQL helper.

    MongoDB holds the ``TestCaseResults`` and ``RSP_Results`` collections.
    Comments, key IDs, test-case lookups by key ID and the non-RSP 30-day
    statistics are delegated to ``SQLDBManagement``.

    Exceptions raised while connecting are caught in ``__init__``: the error
    is logged, ``mongoClient`` and ``database`` stay ``None`` and the
    exception is stored in ``connectionError``.
    """

    # NOTE(review): this literal embeds credentials and looks mangled by
    # e-mail obfuscation ("[email protected]"). It is kept unchanged as the
    # fallback so existing callers behave as before; prefer passing
    # ``mongo_uri`` explicitly.
    DEFAULT_MONGO_URI = "mongodb://admin:[email protected]:27017/?authSource=admin"

    def __init__(self, mongo_uri=None, database_name="TestResultsDisplay"):
        """Connect to MongoDB, ensure the unique indexes and build the SQL helper.

        Args:
            mongo_uri: MongoDB connection URI. Falls back to
                ``DEFAULT_MONGO_URI`` when omitted or empty.
            database_name: Name of the MongoDB database to use.
        """
        # Define every attribute up front so a failed connection leaves the
        # instance in a predictable state instead of with missing attributes.
        self.mongoClient = None
        self.database = None
        self.TestCaseResults = None
        self.RSPTestCaseResults = None
        self.sqlDBManagement = None
        self.connectionError = None
        # Set to True once this instance has inserted or fully replaced an
        # RSP record; later RSP updates then append instead of replacing.
        self.testResultUpdateFlag = False

        try:
            logger.debug("Connecting to DB: %s", database_name)
            self.mongoClient = MongoClient(mongo_uri or self.DEFAULT_MONGO_URI)
            self.database = self.mongoClient[database_name]

            # Both collections are keyed by a unique testCaseUniqueID.
            for collection in (
                self.database.TestCaseResults,
                self.database.RSP_Results,
            ):
                collection.create_index(
                    [("testCaseUniqueID", pymongo.DESCENDING)],
                    unique=True,
                )

            self.TestCaseResults = self.database.TestCaseResults
            self.RSPTestCaseResults = self.database.RSP_Results
            self.sqlDBManagement = SQLDBManagement()
            logger.debug("Connected to mongo database successfully")
        except Exception as err:
            # Log once (with traceback) instead of logging the same error twice.
            logger.exception("%s", err)
            if self.mongoClient is not None:
                # Do not leak a half-initialised client.
                self.mongoClient.close()
            self.mongoClient = None
            self.database = None
            self.TestCaseResults = None
            self.RSPTestCaseResults = None
            self.connectionError = err

    def updatefilenameinDB(self, filename, uniqueid, index_to_update):
        """Store *filename* as ``reportFileName`` of one RSP result entry.

        Args:
            filename: Report file name to store.
            uniqueid: ``testCaseUniqueID`` of the RSP_Results document.
            index_to_update: Position inside the document's
                ``testCaseResults`` array whose ``reportFileName`` is set.
        """
        result = self.database.RSP_Results.update_one(
            {"testCaseUniqueID": uniqueid},
            {
                "$set": {
                    f"testCaseResults.{index_to_update}.reportFileName": filename
                }
            },
        )
        if result.modified_count > 0:
            logger.info("%s updated successfully", filename)

    def updateTestResults(self, testResults):
        """Upsert one TestCaseResults document from *testResults*.

        Args:
            testResults: Mapping that must contain every key read below;
                a missing key raises ``KeyError``.
        """
        self.database.TestCaseResults.update_one(
            {"testCaseUniqueID": testResults["testCaseUniqueID"]},
            {
                "$set": {
                    "testCaseResults": testResults["testCaseResults"],
                    "testCaseCount": testResults["testCaseCount"],
                    "reportFileName": testResults["fileName"],
                    "statistics": testResults["statistics"],
                    "safety_test_cases": testResults["safety_test_cases"],
                    "non_safety_test_cases": testResults["non_safety_test_cases"],
                    "safetyStat": testResults["safetyStat"],
                    "NonsafetyStat": testResults["NonsafetyStat"],
                    "testTimeStart": testResults["testTimeStart"],
                    "testTimeEnd": testResults["testTimeEnd"],
                    "softwareVersion": testResults["softwareVersion"],
                    "lastUpdated_IST": testResults["lastUpdated_IST"],
                    # NOTE(review): this resets the Mongo-side comment on
                    # every update; comments are written through the SQL
                    # helper in updateTestComment - confirm this is intended.
                    "comment": "",
                }
            },
            upsert=True,
        )

    def insertRSPTestResults(self, testResults):
        """Insert an RSP result document, or merge it into an existing one.

        When no document with the same ``testCaseUniqueID`` exists, a new one
        is inserted and ``testResultUpdateFlag`` is set. Otherwise the work is
        handed to ``updateRSPTestResults``.
        """
        existing = self.database.RSP_Results.find_one(
            {"testCaseUniqueID": testResults["testCaseUniqueID"]}
        )
        if existing is not None:
            self.updateRSPTestResults(existing, testResults)
            return

        self.database.RSP_Results.insert_one(
            {
                "testCaseUniqueID": testResults["testCaseUniqueID"],
                "testTimeStart": testResults["testTimeStart"],
                "testCaseResults": testResults["testCaseResults"],
            },
        )
        self.testResultUpdateFlag = True

    def updateRSPTestResults(self, isRecordAlreadyPresent, testResults):
        """Replace or extend the stored results of an existing RSP document.

        If the stored ``testTimeStart`` is earlier than the incoming one and
        this instance has not yet inserted/replaced a record
        (``testResultUpdateFlag`` is False), the stored ``testCaseResults``
        are replaced and the flag is set. Otherwise ``testTimeStart`` is
        updated and the first incoming result is added with ``$addToSet``.

        Args:
            isRecordAlreadyPresent: The document currently stored.
            testResults: Incoming results; in the append case
                ``testCaseResults`` must be non-empty (``IndexError``
                otherwise).
        """
        selector = {"testCaseUniqueID": testResults["testCaseUniqueID"]}
        is_newer_run = (
            isRecordAlreadyPresent["testTimeStart"] < testResults["testTimeStart"]
        )

        if is_newer_run and not self.testResultUpdateFlag:
            # $set overwrites the whole field, so a separate preceding $unset
            # is unnecessary; one update keeps the replacement atomic.
            self.database.RSP_Results.update_one(
                selector,
                {
                    "$set": {
                        "testTimeStart": testResults["testTimeStart"],
                        "testCaseResults": testResults["testCaseResults"],
                    },
                },
                upsert=True,
            )
            self.testResultUpdateFlag = True
            return

        self.database.RSP_Results.update_one(
            selector,
            {
                "$set": {
                    "testTimeStart": testResults["testTimeStart"],
                },
                "$addToSet": {
                    "testCaseResults": testResults["testCaseResults"][0],
                },
            },
            upsert=True,
        )

    def updateStatisticsToCurrentDate(self, testCaseUniqueID, data):
        """Set the ``statistics`` field of one RSP_Results document to *data*."""
        self.database.RSP_Results.update_one(
            {"testCaseUniqueID": testCaseUniqueID},
            {"$set": {"statistics": data}},
        )

    def updateTestComment(self, keyID, comment):
        """Store *comment* for *keyID* through the SQL helper.

        The earlier MongoDB-based implementation was superseded by SQL.
        """
        self.sqlDBManagement.updateTestcaseComment(keyID, comment)

    def getKeyID(self, dates, mainprd, subprd):
        """Return the key ID the SQL helper reports for the given arguments."""
        return self.sqlDBManagement.getKeyID(dates, mainprd, subprd)

    def getTestCaseResults(self, KeyID):
        """Return the test-case results for *KeyID* from the SQL helper.

        The earlier MongoDB-based implementation was superseded by SQL.
        """
        return self.sqlDBManagement.getTestCaseResultBykeyID(KeyID)

    def getCommentByKeyID(self, KeyID):
        """Return the comment stored for *KeyID* by the SQL helper."""
        return self.sqlDBManagement.getCommentByKeyID(KeyID)

    def RSPEnduranceRunResults(self, testCaseUniqueID):
        """Return the RSP_Results document for *testCaseUniqueID*, or None."""
        return self.database.RSP_Results.find_one(
            {"testCaseUniqueID": testCaseUniqueID}
        )

    def get30DaysTestStatistics(self, text):
        """Return test statistics for the last 30 days.

        Args:
            text: ``"RSP Endurance Test"`` selects the MongoDB RSP_Results
                collection; any other value is answered by the SQL helper.

        Returns:
            For RSP, a cursor over documents whose ``testTimeStart`` is later
            than 30 days ago, sorted ascending by ``testTimeStart``.
            Otherwise whatever the SQL helper returns.
        """
        now = datetime.datetime.now()
        window_start = now - datetime.timedelta(days=30)

        if text == "RSP Endurance Test":
            return self.database.RSP_Results.find(
                {"testTimeStart": {"$gt": window_start}}
            ).sort("testTimeStart", 1)

        # The earlier MongoDB query on TestCaseResults was superseded by SQL.
        return self.sqlDBManagement.get30DaysTestStatistics(
            now, window_start, "TBM_CHERRY"
        )

    def get_existing_report_filename(self, testcaseuniqueid, index):
        """Return the report file name stored for one RSP result entry.

        Reads the same location that ``updatefilenameinDB`` writes:
        ``testCaseResults[index]["reportFileName"]``.

        Args:
            testcaseuniqueid: ``testCaseUniqueID`` of the RSP_Results document.
            index: Position in ``testCaseResults`` (int or numeric string).

        Returns:
            The stored file name; ``False`` when the document exists but has
            no usable file name at *index*; ``None`` when no document matches.
        """
        document = self.database.RSP_Results.find_one(
            {"testCaseUniqueID": testcaseuniqueid},
        )
        if not document:
            return None

        try:
            position = int(index)
            if position < 0:
                # Negative values would silently wrap around in Python.
                return False
            filename = document["testCaseResults"][position]["reportFileName"]
        except (KeyError, IndexError, TypeError, ValueError):
            return False
        return filename or False
if __name__ == "__main__":
    # Ad-hoc run: construct the manager and look up one hard-coded key ID;
    # the returned value is discarded.
    manager = DatabaseManagement()
    manager.getTestCaseResults(
        "7524803c7b735085c5e9f6515d73185f839f0dfa40b521d37e4d7452"
    )
![]() |
Notes is a web-based application for taking notes online. You can take notes and share them with other people. If you like taking long notes, notes.io is designed for you. To date, over 8,000,000,000 notes have been created, and the number keeps growing.
With notes.io;
- * You can take a note from anywhere and any device with internet connection.
- * You can share your notes on social platforms (YouTube, Facebook, Twitter, Instagram, etc.).
- * You can quickly share your content without a website, blog, or e-mail.
- * You don't need to create an account to share a note. If you wish, you can share quick, easy, shortened note links via SMS, websites, e-mail, or messaging services (WhatsApp, iMessage, Telegram, Signal).
- * Notes.io has fabulous infrastructure design for a short link and allows you to share the note as an easy and understandable link.
Fast: Notes.io is built for speed and performance. You can take notes quickly and browse your archive.
Easy: Notes.io doesn’t require installation. Just write and share note!
Short: Notes.io’s URLs are just 8 characters long. You’ll get a shortened link to your note when you want to share it. (Ex: notes.io/q )
Free: Notes.io has been running for 14 years and has been free since the day it started.
You can create your first note right away and start sharing it with anyone you wish. If you want to contact us, you can use the following communication channels:
Email: [email protected]
Twitter: http://twitter.com/notesio
Instagram: http://instagram.com/notes.io
Facebook: http://facebook.com/notesio
Regards;
Notes.io Team