I ran into an issue where using the Splunk Python SDK (splunklib, splunk-sdk) to obtain search results kept failing with permission-related errors. Multiple online posts suggested that one needs to be a Splunk admin to use it, and I was not able to obtain a Splunk admin user account. I therefore decided to write a small Python script that logs in as a regular user through the Splunk web login page, performs a search, and then returns the search results. This was done using the requests module. It can be optimized in numerous ways, but it does the job in its current form too. It was written against Splunk version 7.0.0. The final goal is to include it in an automated test suite that checks the Splunk logs after the automated tests have run and then fails if errors are present.
# The Beginning
# Some required imports
import requests,json, re, time
from bs4 import BeautifulSoup
# Root address of the Splunk web interface -- point this at your own set up.
Splunk_base_url = "https://your.splunk.url"
# The log in page sits under the locale-prefixed account path of that root.
login_page_url = "/".join((Splunk_base_url, "en-US", "account", "login"))
# Browser-like identification string sent with the requests.
_browser_user_agent = (
    "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36"
)
# Default headers used for the log in related requests below.
# NOTE(review): "Server" is normally a response header -- confirm Splunk really needs it here.
headers = {
    "User-Agent": _browser_user_agent,
    "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
    "Server": "Splunkd",
}
# Placeholder credentials of the regular (non-admin) Splunk user. Use your own,
# and avoid committing real credentials to version control.
password = "splunk_password"
username = "splunk_username"
# Start a session and GET the Splunk log in page. Using one session for all
# requests keeps the cookies handed out here available to the later calls.
s = requests.Session()
s.headers.update(headers)
login_page_get_request = s.get(login_page_url)
# Stop on an HTTP error status instead of trying to parse an error page.
login_page_get_request.raise_for_status()
# The log in page html carries a cval parameter that the log in POST must send back.
# The match is non-greedy so that only the text up to the first ',"time":' is captured.
result = re.search(r'"cval":(.*?),"time":', login_page_get_request.text)
if result is None:
    # Without this check a changed page layout surfaces as an opaque
    # "'NoneType' object has no attribute 'group'" error.
    raise RuntimeError(
        "Could not find the cval parameter in the Splunk log in page at " + login_page_url
    )
cval = result.group(1)
# Now let's log in.
# Form fields for the log in POST; cval is the value parsed from the log in page.
initial_login_data = {
    "cval": cval,
    "username": username,
    "password": password,
    "return_to": "/en-US/",
}
# Log in POST request is here
login_page_post_request = s.post(login_page_url, data=initial_login_data, headers=headers)
# Fail right away if Splunk answers with a 4xx/5xx status, so a rejected log in
# is reported here rather than as an unrelated error in a later step.
login_page_post_request.raise_for_status()
# Now that we are logged in, we need the FORM_KEY parameter, which is sent
# later as the X-Splunk-Form-Key header of the search POST.
# FORM_KEY can be obtained by GETting the JSON response from the url below.
get_config_url = Splunk_base_url + "/en-US/config"
get_config = s.get(get_config_url)
# Stop on an HTTP error status instead of trying to parse an error page as JSON.
get_config.raise_for_status()
# Parse returned JSON and define FORM_KEY
get_config_json = json.loads(get_config.text)
FORM_KEY = get_config_json.get("FORM_KEY")
if not FORM_KEY:
    # A missing or empty key would make the search POST fail later with a less obvious error.
    raise RuntimeError(
        "No FORM_KEY in the response from " + get_config_url + " - did the log in succeed?"
    )
# Define the URL for the search POST
post_search_url = Splunk_base_url + "/en-US/splunkd/__raw/services/search/jobs"
# Data required for the search POST
# Please note that this is where the actual search query is. You can replace it
# with anything, include variables and similar.
post_search_body = {'search': 'search index=* AND (level NOT (WARN OR INFO OR DEBUG)) earliest=-30minutes'}
# Headers required for the search POST. This rebinds the module-level name
# `headers`; the session defaults set earlier are not affected by it.
headers = {
    "X-Requested-With": "XMLHttpRequest",
    "User-Agent": "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/37.0.2062.120 Safari/537.36",
    "Content-Type": "application/x-www-form-urlencoded;charset=UTF-8",
    "X-Splunk-Form-Key": FORM_KEY,
}
# Execute the search POST here
post_search = s.post(url=post_search_url, data=post_search_body, headers=headers)
# Stop on an HTTP error status instead of parsing an error document.
post_search.raise_for_status()
# Get the search job sid from the search POST XML response
soup = BeautifulSoup(post_search.text, "lxml")
sid_tag = soup.find("sid")
if sid_tag is None:
    # find() returns None when the response has no <sid> element; report the
    # response body instead of failing with an AttributeError on None.
    raise RuntimeError("Splunk did not return a search job sid: " + post_search.text)
search_job_sid = sid_tag.text
# Now, one needs to wait for the search to be completed. Instead of sleeping
# for a fixed time (which returns missing or partial results whenever the
# search is slower than the sleep), poll the search job until it reports
# that it is done, and give up after a timeout.
search_job_url = Splunk_base_url + "/en-US/splunkd/__raw/services/search/jobs/" + search_job_sid
search_wait_timeout = 60  # seconds to wait for the search job before giving up
search_poll_interval = 1  # seconds between two status checks
search_deadline = time.time() + search_wait_timeout
while True:
    search_job_status = s.get(search_job_url, params={"output_mode": "json"})
    search_job_status.raise_for_status()
    # NOTE(review): assumes the documented job layout entry[0].content with the
    # isDone / isFailed flags -- confirm against your Splunk version.
    search_job_content = json.loads(search_job_status.text)["entry"][0]["content"]
    if search_job_content.get("isFailed"):
        raise RuntimeError("Splunk search job " + search_job_sid + " failed")
    if search_job_content.get("isDone"):
        break
    if time.time() >= search_deadline:
        raise RuntimeError(
            "Splunk search job " + search_job_sid + " did not finish within "
            + str(search_wait_timeout) + " seconds"
        )
    time.sleep(search_poll_interval)
# Now execute GET with that job sid to get the actual search results in JSON format.
# count=20 in the query string asks for at most 20 results.
get_search_results_url = Splunk_base_url + "/en-US/splunkd/__raw/services/search/jobs/" + search_job_sid + "/results?output_mode=json&offset=0&count=20"
get_search_results = s.get(get_search_results_url)
# Stop on an HTTP error status instead of parsing an error document as results.
get_search_results.raise_for_status()
# Load up the returned JSON from request above that contains the search results
get_search_results_json = json.loads(get_search_results.text)
# Some rudimentary formatting to parse the search result JSON and show the
# search results one by one, each between a start and an end marker.
# Better to use your own.
search_results = get_search_results_json["results"]
if not search_results:
    print("No search results returned! Bye ...")
else:
    # enumerate(..., 1) numbers the issues starting at 1.
    for issue_number, splunk_results in enumerate(search_results, 1):
        print("============== issue start ================")
        print("issue #" + str(issue_number))
        print("")
        if "_raw" in splunk_results:
            print(splunk_results["_raw"])
        else:
            # Fall back to the whole result when it has no "_raw" field
            # instead of raising KeyError.
            print(json.dumps(splunk_results, sort_keys=True))
        print("")
        print("============== issue end ================")
# The End