ATS : Using the HTTP DB Logger's API via Python

#!/usr/bin/python3

import http.client
import json
import time
import os
import requests

# HttpDbLogger application host (it will be some kind of Java applications server - Tomcat/Jetty/etc
WEBSERVER_URL = "localhost:8080"
# ats-httpdblogger-4.0.6 is the name of the deployed .war file
# and /service/logger is the full path to the logger sevice
HTTP_DB_LOGGER_LOGGER_URL = "/ats-httpdblogger-4.0.6/service/logger"

# Relative URLs for each logging operation
START_RUN_REL_URL = "startRun"
START_SUITE_REL_URL = "startSuite"
START_TESTCASE_REL_URL = "startTestcase"
INSERT_MESSAGE_REL_URL = "insertMessage"
INSERT_MESSAGES_REL_URL = "insertMessages"
ADD_RUN_METAINFO_REL_URL = "addRunMetainfo"
ATTACH_FILE_REL_URL = "attachFile"
ADD_SCENARIO_METAINFO_REL_URL = "addScenarioMetainfo"
ADD_TESTCASE_METAINFO_REL_URL = "addTestcaseMetainfo"
UPDATE_RUN_REL_URL = "updateRun"
UPDATE_SUITE_REL_URL = "updateSuite"
END_TESTCASE_REL_URL = "endTestcase"
END_SUITE_REL_URL = "endSuite"
END_RUN_REL_URL = "endRun"

# JSON keys, used in the creation the requests' bodies
START_RUN_KEYS = ["dbHost", "dbPort", "dbName", "dbUser", "dbPassword", "runName", "osName", "productName", "buildName", "versionName", "userNote", "hostName", "timestamp"]
START_SUITE_KEYS = ["suiteName", "packageName", "timestamp", "parentId", "parentType","sessionId"]
START_TESTCASE_KEYS = ["sessionId", "testcaseName", "scenarioName", "scenarioDescription", "timestamp", "parentId", "parentType"]
INSERT_MESSAGE_KEYS = ["sessionId", "machineName", "threadName", "level", "message", "timestamp", "parentId", "parentType"]
INSERT_MESSAGES_KEYS = ["sessionId", "parentType", "parentId", "timestamp", "messages"]
ADD_RUN_METAINFO_KEYS = ["sessionId","metaKey","metaValue"]
ATTACH_FILE_KEYS = ["sessionId","parentId","fileName"]
ADD_SCENARIO_METAINFO_KEYS = ["sessionId","parentId","metaKey","metaValue"]
ADD_TESTCASE_METAINFO_KEYS = ["sessionId","parentId","metaKey","metaValue"]
UPDATE_RUN_KEYS = ["sessionId", "runName", "osName", "productName", "buildName", "versionName", "userNote", "hostName"]
UPDATE_SUITE_KEYS = ["sessionId", "suiteId", "suiteName", "userNote"]
END_TESTCASE_KEYS = ["sessionId", "result", "timestamp", "testcaseId"]
END_SUITE_KEYS = ["suiteId", "sessionId", "timestamp"]
END_RUN_KEYS = ["sessionId", "timestamp"]

# enable logging of message on each HTTP method execution
DEBUG=True

# this function will additionally process the json value
def checkJsonValue(value):
	if value == None:
		return "null"
	elif type(value) is list:
		return json.dumps(value)
	else:
		return "\""+str(value)+"\""


# this can be replaced with the built-in JSON functionality
def createJson(keys, values):
	json = "{"
	for i in range(len(keys)):
		json += "\"" + keys[i] + "\":" + checkJsonValue(values[i]) + ","
	json = json[0:len(json)-1] # remove trailing comma
	json += "}"
	return json

def doHttpMethod(methodName, baseUrl, relUrl, headers={}, body=None):
	if DEBUG:
		print("Executing '{}' HTTP method to URL '{}' with headers '{}' and body '{}'".format(methodName, baseUrl+"/"+relUrl, headers, body))
	conn = http.client.HTTPConnection(baseUrl)
	conn.request(methodName, relUrl, body, headers)
	resp = conn.getresponse().read().decode("utf-8")
	conn.close()
	return resp
	
def httpPost(baseUrl, relUrl, headers, body):
	return doHttpMethod("POST", baseUrl, relUrl, headers, body)
	
def httpGet(baseUrl, relUrl, headers):
	return doHttpMethod("GET", baseUrl, relUrl, headers)
	
def httpPut(baseUrl, relUrl, headers, body):
	return doHttpMethod("PUT", baseUrl, relUrl, headers, body)


def startRun(dbHost, dbPort, dbName, dbUser, dbPassword, runName, osName, productName, buildName, versionName, userNote, hostName, timestamp):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+START_RUN_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(START_RUN_KEYS, [dbHost, dbPort, dbName, dbUser, dbPassword, runName, osName, productName, buildName, versionName, userNote, hostName, timestamp])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)
	
def startSuite(suiteName, packageName, timestamp, parentId, parentType, sessionId):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+START_SUITE_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(START_SUITE_KEYS, [suiteName, packageName, timestamp, parentId, parentType, sessionId])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)
	
def startTestcase(sessionId, testcaseName, scenarioName, description, timestamp, parentId, parentType):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+START_TESTCASE_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(START_TESTCASE_KEYS, [sessionId, testcaseName, scenarioName, description, timestamp, parentId, parentType])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)
	
def endTestcase(sessionId, result, timestamp, testcaseId):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+END_TESTCASE_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(END_TESTCASE_KEYS, [sessionId, result, timestamp, testcaseId])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)
	
def endSuite(suiteId, sessionId, timestamp):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+END_SUITE_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(END_SUITE_KEYS, [suiteId, sessionId, timestamp])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)

def endRun(sessionId, timestamp):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+END_RUN_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(END_RUN_KEYS, [sessionId, timestamp])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)
	
def insertMessage(sessionId, machineName, threadName, level, message, timestamp, parentId, parentType):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+INSERT_MESSAGE_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(INSERT_MESSAGE_KEYS, [sessionId, machineName, threadName, level, message, timestamp, parentId, parentType])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)
	
def insertMessages(sessionId, parentType, parentId, timestamp, messages):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+INSERT_MESSAGES_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(INSERT_MESSAGES_KEYS, [sessionId, parentType, parentId, timestamp, messages])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)
	
def updateRun(sessionId, runName, osName, productName, buildName, versionName, userNote, hostName):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+UPDATE_RUN_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(UPDATE_RUN_KEYS, [sessionId, runName, osName, productName, buildName, versionName, userNote, hostName])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)
	
def updateSuite(sessionId, suiteId, suiteName, userNote):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+UPDATE_SUITE_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(UPDATE_SUITE_KEYS, [sessionId, suiteId, suiteName, userNote])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)
	
def addRunMetaInfo(sessionId, metaKey, metaValue):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+ADD_RUN_METAINFO_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(ADD_RUN_METAINFO_KEYS, [sessionId, metaKey, metaValue])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)
	
def attachFile(sessionId, parentId, fileName, fileStream):
	relUrl = 'http://' + WEBSERVER_URL + HTTP_DB_LOGGER_LOGGER_URL + "/" + ATTACH_FILE_REL_URL
	print("Executing '{}' HTTP method to URL '{}'".format('PUT', relUrl))
	attach_file_details = json.dumps(createJson(ATTACH_FILE_KEYS, [sessionId, parentId, fileName]))
	return requests.put(relUrl, files={'attach_file_details': attach_file_details, 'stream' : fileStream})

def addScenarioMetaInfo(sessionId, testcaseId, metaKey, metaValue):
	relUrl = HTTP_DB_LOGGER_LOGGER_URL+"/"+ADD_SCENARIO_METAINFO_REL_URL
	headers = {"Content-type": "application/json", "Accept": "application/json"}
	body = createJson(ADD_SCENARIO_METAINFO_KEYS, [sessionId, testcaseId, metaKey, metaValue])
	return httpPost(WEBSERVER_URL, relUrl, headers, body)


# variables for storing responses' data
sessionId = None
runIds = []
suiteIds = []
testcaseIds = []

# start a run
responseBody = startRun("localhost", "", "TestDb","AtsUser","AtsPassword","testRun1","LINUX","Ats HttpDbLogger","1","4.0.6",
						"first run","ws",str(int(round(time.time() * 1000)))) # if the db port is skipped, the default one for MSSQL will be used (1433)
print(responseBody)
responseBody = json.loads(responseBody)
sessionId = responseBody['sessionId']
runId = responseBody['runId']
runIds.append(runId)

# start a suite
responseBody = startSuite("SuiteOne", "com.example.ats", str(int(round(time.time() * 1000))), "-1", "RUN", sessionId)
responseBody = json.loads(responseBody)
suiteId = responseBody['suiteId']
suiteIds.append(suiteId)

# start a testcase
responseBody = startTestcase(sessionId, "TestCase1", "Scenario1", "The first scenario/testcase", str(int(round(time.time() * 1000))), "-1", "SUITE")
responseBody = json.loads(responseBody)
testcaseId = responseBody['testcaseId']
testcaseIds.append(testcaseId)

# insert single run and suite message
insertMessage(sessionId, "ws", "main", "INFO", "Some run message with log level INFO", str(int(round(time.time() * 1000))), str(runId), "RUN")
for i in suiteIds:
	insertMessage(sessionId, "ws", "main", "INFO", "Some suite message with log level INFO", str(int(round(time.time() * 1000))), str(i), "SUITE")
	

# simulate some work by sleeping for 1 minute
os.system("sleep 10")

# create simple file for each testcase and attach that file to the corresponsing testcase
for i in testcaseIds:
	f = open('log_file_' + str(i) + '.txt', 'w+');
	f.write('Some log file');
	f.flush()
	f.close()
	f = open('log_file_' + str(i) + '.txt', 'rb');
	attachFile(sessionId, str(i),'log_file_' + str(i) + '.txt', f);
	f.close()

# end all testcases
# in order to do that, we have to disable/skip the default event state checking
# and this is done by simply providing testcaseId, instead of -1
# now instead of checking whether we can end a testcase
# the HttpDbLogger will just check if the testcase, which we want to end is indeed already running and whether it was started by the current session
for i in testcaseIds:
		endTestcase(sessionId, "PASSED", str(int(round(time.time() * 1000))), str(i)) # here we are providing the testcaseId
		

# insert more than one message for the first testcase
# as you can see, by providing parentType and parentId,
# we can insert messages in already ended/closed testcases
# we also provide custom time stamp, which is the current time of the test executor (e.g. our machine where this script will be executed)
message1 = {
	'machineName':'ws',
	'threadName':'main',
	'level':'ERROR',
	'message':'1/2 testcase messages logged at once',
	'timestamp':str(int(round(time.time() * 1000))),
	'parentId':str(testcaseIds[0]),
	'parentType':'TESTCASE'
}
message2 = {
	'machineName':'ws',
	'threadName':'main',
	'level':'ERROR',
	'message':'2/2 testcase messages logged at once',
	'timestamp':'-1',
	'parentId':str(testcaseIds[0]),
	'parentType':'TESTCASE'
}
messages = [
	message1,
	message2
] # TODO dump messages list to a valid JSON string
insertMessages(sessionId, "TESTCASE", str(i), '-1', messages)


# end all suites
# again as with the ending of all the testcases,
# we are providing suiteId, so the HttpDbLogger will skip any event logging state checking, check if the requested suite (refered by the suiteId) is started by the current session
# and if everything is OK, it will stop the suite
# if an error occurred, it's cause and Java stacktrace will be returned with the response's body
for i in suiteIds:
	endSuite(str(i), sessionId, str(int(round(time.time() * 1000))))
	
# end the run
endRun(sessionId, str(int(round(time.time() * 1000))))

# update the already ended run
updateRun(sessionId, "New testRun1", None, None, None, None, None, "my workstation at home")

# update each suite, that was started by this session
for i in suiteIds:
	updateSuite(sessionId, str(i), "Some new suite name for suite with ID " + str(i), "Suite with ID " + str(i))

# add run metainfo
# currently the only available metaKey value is type
addRunMetaInfo(sessionId, "type", "progression")

# add scenario metainfo for each testcase
for i in testcaseIds:
	if i % 2 == 0:
		addScenarioMetaInfo(sessionId, str(i), "group", "even")
		addTestcaseMetaInfo(sessionId, str(i), "group", "even")
	else:
		addScenarioMetaInfo(sessionId, str(i), "group", "odd")
		addTestcaseMetaInfo(sessionId, str(i), "group", "odd")





Go to parent page

Go to Table of Contents