Hello stranger! If you are looking for an article on how to query the RiseCodes.com API using the Python programming language, then you are in the right place.
Let’s begin with some general information about the company. RiseCodes uses AI and machine learning to optimize programmatic advertising for publishers, enhancing website traffic and maximizing revenue. Their platform is designed to improve auction logic, search quality, and ad relevance, driving a 30% uplift in publisher revenue. They are also involved in collective efforts to fight ad fraud as a founding member of the Human Collective. Rise focuses on delivering efficient, AI-driven solutions to elevate the digital advertising landscape.
REMINDER: When you copy the code, make sure to keep the proper indentation, otherwise the code won’t work. You probably know that Python loves indentation; why am I telling you this… Okay, let’s begin.
As always, we will start by importing the necessary libraries.
import os
import csv
import json
import time
import requests
import datetime
from custom_library import get_secret, get_bucket, upload_s3
The block above imports the necessary libraries for file handling (os, csv, json), API interaction (requests), time manipulation (datetime, time), and the custom helper functions from custom_library. The custom library will be discussed in a separate article.
def api_query(**context):
Next, we define the function where all the code is going to live. The function accepts **context, keyword arguments that hold data passed in from Apache Airflow.
    try:
        today = context['ti'].xcom_pull(key="init_today")
    except:
        today = str(datetime.date.today())
    end = datetime.datetime.strptime(today, '%Y-%m-%d').date() - datetime.timedelta(days=1)
    start = end - datetime.timedelta(days=7)
    print(f'start: {start} end: {end}')
In this example, we pull today’s date from the context variable. If there is an issue pulling the data from the context, the value is generated with the datetime library instead.
Then the start and end dates are generated and printed out. These dates define the date range for the report: the week ending on the day before today. For example, if today is 2024-05-10, then end is 2024-05-09 and start is 2024-05-02.
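As an aside, here is a minimal sketch of how a function like this might be wired into an Airflow DAG so that context['ti'].xcom_pull finds an init_today value. The DAG id, schedule, and the set_init_date task are my own illustrative assumptions, not the author’s actual pipeline, and it assumes a recent Airflow 2.x.
from airflow import DAG
from airflow.operators.python import PythonOperator
import datetime

def set_init_date(**context):
    # Hypothetical upstream task: push the run date into XCom under the key
    # that api_query pulls ("init_today").
    context['ti'].xcom_push(key='init_today', value=str(datetime.date.today()))

with DAG(
    dag_id='rise_report',  # illustrative DAG id
    start_date=datetime.datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
) as dag:
    init = PythonOperator(task_id='init', python_callable=set_init_date)
    report = PythonOperator(task_id='rise_api_query', python_callable=api_query)
    init >> report
With wiring like this, api_query receives the Airflow context automatically; when the function runs outside Airflow, the except branch above simply falls back to today’s date. Now back to the function body.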
    # get_secret('rise-api-creds', 'creds')
    # key = eval(os.environ['creds'])['api_key']
    key = 'I_AM_THE_KEY'
    print('requesting report')
    body = {
        "name": "the_report",
        "activityType": "hb",
        "dimensions": ["date", "domain", "mediaType", "deviceType"],
        "metrics": ["impressions", "revenue", "clicks"],
        "filters": [{
            "key": "impressions",
            "rel": "gt",
            "values": ["0"]
        }],
        "timeZone": "Etc/UTC",
        "endDate": str(end),
        "startDate": str(start)
    }
    url = 'https://reports-api.yellowblue.io/reports'
    headers = {'x-api-key': key}
    res = requests.post(url, headers=headers, json=body)
    id = json.loads(res.text)['id']
After the dates are set up, we set the key variable to hold the API key. I use a custom function in my code to pull API keys from AWS Secrets Manager; I am going to discuss that function in detail in another article. So, for simplicity’s sake, I hardcoded the key variable.
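For readers who want to reproduce the commented-out lines above, here is a minimal sketch of what a helper like get_secret might look like, assuming boto3 and AWS Secrets Manager. The region default and the idea of stashing the secret string in an environment variable are my assumptions, not the actual implementation from the custom library.
import os
import boto3

def get_secret(secret_name, env_var, region='us-east-1'):
    # Hypothetical stand-in for the custom helper: fetch a secret from AWS
    # Secrets Manager and stash the raw secret string in an environment variable.
    client = boto3.client('secretsmanager', region_name=region)
    resp = client.get_secret_value(SecretId=secret_name)
    os.environ[env_var] = resp['SecretString']
With a helper along these lines, get_secret('rise-api-creds', 'creds') would put the secret string into os.environ['creds'], and the eval call would parse out the api_key (json.loads would be the safer choice if the secret is stored as JSON).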
Next, as the printed message indicates, we build a request body to request a report ID for the next API call.
The report body has the following attributes: report name, activity type (header bidding), dimensions, metrics, filters (impressions > 0), time zone, and report start/end dates. The start and end dates are supplied dynamically from the start and end variables.
Then we define the URL endpoint to query and the headers variable. The headers variable holds a dictionary with a single key-value pair carrying the API key.
Finally, we send a POST request to the endpoint using the requests library. The URL, headers, and body are supplied as parameters to the post method.
    print('fetching report')
    url = f'https://reports-api.yellowblue.io/reports/{id}'
    headers = {'x-api-key': key}
    res = requests.get(url, headers=headers)
    res = json.loads(res.text)
    status = res.get('status')
    count = 1
    while status != 'DONE':
        print(f'report status: {status}\n\twaiting 15 sec\n\tattempt {count}')
        time.sleep(15)
        res = requests.get(url, headers=headers)
        res = json.loads(res.text)
        status = res.get("status")
        count += 1
        if count == 10:
            raise Exception(f'API ERROR! Loop counter {count}, status {status}')
After we requested the report (told the API ‘Hey, start cooking the report’) and received the report ID, we should check whether the report is ready. To do that, we query the same endpoint with the report ID appended and see what status it returns. If the status is “DONE”, the report is ready; otherwise the code waits 15 seconds and sends the request again. After 10 attempts an exception is raised to flag a potential infinite loop.
    raw_data = res['data']
This extracts the report data from the API response dictionary. By the way, besides the data key, the res dictionary also has totalRows and status keys. We used the status key earlier, and totalRows won’t be used. A nextPageToken key may be present if the report has more than one page; in that case, we need to handle the pagination.
    npt = res.get('nextPageToken')
    while npt:
        params = {'pageToken': npt}
        res = requests.get(url, headers=headers, params=params)
        res = json.loads(res.text)
        raw_data += res['data']
        npt = res.get('nextPageToken')
The pagination logic is the following:
Get the nextPageToken value from the response. If the nextPageToken key does not exist in the res dictionary, the get method returns None.
If npt is not None, meaning there is a next page, the while loop is triggered.
Inside the loop:
A request with the nextPageToken (passed as pageToken) is sent to the endpoint.
The response is converted into a dictionary.
The data from that page is appended to the raw_data list.
Then the npt variable is assigned again, and if it is not None the loop starts over.
    print('processing report')
    report = []
    for i in raw_data:
        line = {
            'date': i['date'],
            'domain': i['domain'],
            'deviceType': i['deviceType'],
            'mediaType': i['mediaType'],
            'impressions': i['impressions'],
            'revenue': i['revenue'],
            'clicks': i.get('clicks', 0)
        }
        report.append(line)
After the report is completely downloaded, we process it. Processing means iterating over the raw_data list of dictionaries line by line, rebuilding each line as a new dictionary, and appending that dictionary to the report list. Some line dictionaries come without the “clicks” key, so we use the get method with a default of 0 to handle the missing keys. In other words, missing clicks are defaulted to 0.
    print('validating report')
    n_rows = len(report)
    dats = {datetime.datetime.strptime(row['date'], "%Y-%m-%d").date()
            for row in report}
    try:
        start = min(dats)
        end = max(dats)
    except:
        raise Exception('failed to extract dates from data..')
Next, we validate the report by checking the number of rows and extracting the date values to make sure the start and end dates align with expectations. A set comprehension extracts the dates from the report list and converts them into date objects so that the minimum and maximum dates can be determined. The start (min) and end (max) variables are reused later in the final CSV report name.
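Note that the block above only computes n_rows and the min/max dates; nothing actually fails if the report comes back empty or covers the wrong window. If you want this step to guard the pipeline, a small extra check along these lines could be added right after it. This is my suggestion, not part of the original code.
    # Illustrative extra checks (not in the original code): fail fast on an empty
    # report or on a date span wider than the requested 8-day window.
    if n_rows == 0:
        raise Exception('report is empty..')
    if (end - start).days > 7:
        raise Exception(f'unexpected date span: {start} to {end}')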
    print('writing to disk')
    report_header = ['date', 'domain', 'deviceType', 'mediaType', 'impressions', 'revenue', 'clicks']
    fp = f'/tmp/{today}_rise.csv'
    with open(fp, mode='w', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=report_header)
        writer.writeheader()
        writer.writerows(report)
Here we write the report to a CSV file in a temporary directory (/tmp), making sure the correct header and rows are included. The csv library is used for the file writing. You can read about it HERE.
    print('uploading to s3')
    s3_key = f'temp/{today} - rise - {start} to {end}.csv'
    bk = get_bucket()
    upload_s3(fp, bk, s3_key)
Lastly, we upload the temporary CSV file to an S3 bucket using the custom upload_s3 function, with the S3 path formatted to include the date range and the report date. The custom upload function will be discussed in another article.
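As with get_secret, the real upload helpers live in the custom library and will get their own article. For completeness, here is a minimal boto3-based sketch of what get_bucket and upload_s3 might look like; the environment variable and fallback bucket name are purely my assumptions.
import os
import boto3

def get_bucket():
    # Hypothetical stand-in: the real helper presumably resolves the target
    # bucket from configuration; here it simply reads an environment variable.
    return os.environ.get('REPORTS_BUCKET', 'my-reports-bucket')

def upload_s3(file_path, bucket, s3_key):
    # Upload a local file to s3://<bucket>/<s3_key> using boto3.
    s3 = boto3.client('s3')
    s3.upload_file(file_path, bucket, s3_key)
Any helpers with the same signatures will do; the function above only cares that upload_s3(fp, bk, s3_key) pushes the local file to the right key.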
Thank you for stopping by! Please feel free to reach out if you have any questions.