Hello everyone and welcome! This article explores how to connect to the Assertive Yield reporting API and pull data from it. Assertive Yield is a company that provides advanced ad revenue management solutions for publishers. Founded in 2019 by Nils Lind in Amsterdam, the company helps digital publishers optimize and maximize their advertising revenue through AI-driven technology and real-time analytics. Its flagship product, Yield Manager, is designed to streamline complex ad infrastructures and provide a unified platform for managing ad revenue.
Let's dive into the code. When you copy the snippets, make sure the indentation is correct, otherwise the code won't run. You probably know that already, but it is worth a reminder.
First, let's import all the required libraries:
import datetime
import subprocess
import json
import os
import boto3
from analytics_library import get_secret, get_bucket
- datetime: Used to manipulate and format dates and times. You can read more about it in the official Python documentation.
- subprocess: Allows running shell commands from within the Python script. We will use it for cURL commands.
- json: Used to work with JSON data (parsing, serializing, converting binary strings to dictionaries, etc.).
- os: Accesses environment variables and interacts with the operating system, for example to delete local temporary files.
- boto3: AWS SDK for Python, used for interacting with AWS services like S3.
- analytics_library: A custom library from which we import get_secret (for retrieving API secrets) and get_bucket (to get the S3 bucket name). We will review the custom library in another article.
def api_query(**context):
The api_query(**context) function takes a context argument, which is supplied by the Apache Airflow workflow system. The context dictionary provides execution information such as today's date. The function can be named anything; api_query simply describes what it does.
Next, we are going to set up date variables inside the api_query function.
try:
    today = context['ti'].xcom_pull(key='init_today')
except:
    today = str(datetime.date.today())
end = datetime.datetime.strptime(today, '%Y-%m-%d').date() - datetime.timedelta(days=1)
start = end - datetime.timedelta(days=3)
print(f'\nToday: {today}, start: {start}, end: {end}\n')
The code block retrieves or calculates the current date, derives a reporting window that ends yesterday and starts three days before that, and prints the range. If the date cannot be pulled via XCom, the current date is used. Here's a breakdown of the snippet:
- try: today = context['ti'].xcom_pull(key='init_today')
The code attempts to pull the today value from an Airflow task instance (ti) using the XCom mechanism, where XCom (cross-communication) is used to pass small pieces of data between tasks. Here, xcom_pull() retrieves the value stored under the key init_today (from a prior task).
- except: today = str(datetime.date.today())
If an error occurs (for example, context has no ti entry because the function runs outside Airflow), the code catches the exception and sets today to the current date using datetime.date.today(). The result is converted to a string to stay consistent with the expected format. Note that xcom_pull() returns None rather than raising when nothing is stored under the key, so in that case today stays None and the strptime() call on the next line fails.
- end = datetime.datetime.strptime(today, '%Y-%m-%d').date() - datetime.timedelta(days=1)
This line converts the today string (in YYYY-MM-DD format) into a date object using strptime(), subtracts one day from it using timedelta(days=1), and assigns the result to the variable end.
- start = end - datetime.timedelta(days=3)
The start date is set to three days before the end date by subtracting another timedelta(days=3).
- print(f'\nToday: {today}, start: {start}, end: {end}\n')
Finally, the code prints the today, start, and end dates, formatted for readability. This helps track the date range being processed.
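To make the date logic easier to try outside Airflow, here is a minimal sketch that wraps it in a function and treats a missing XCom value (None) the same way as a missing task instance. The names resolve_window and FakeTI are hypothetical and exist only for this illustration; FakeTI is a stand-in for the Airflow task instance, not part of Airflow.

```python
import datetime

def resolve_window(context, lookback_days=3):
    """Return (today, start, end): today as a string, start and end as dates."""
    ti = context.get('ti')
    # xcom_pull returns None when nothing is stored under the key,
    # so we check the value instead of relying on an exception.
    today = ti.xcom_pull(key='init_today') if ti is not None else None
    if not today:
        today = str(datetime.date.today())
    end = datetime.datetime.strptime(today, '%Y-%m-%d').date() - datetime.timedelta(days=1)
    start = end - datetime.timedelta(days=lookback_days)
    return today, start, end

class FakeTI:
    """Stand-in for an Airflow task instance (illustration only)."""
    def __init__(self, value):
        self.value = value

    def xcom_pull(self, key=None):
        return self.value

today, start, end = resolve_window({'ti': FakeTI('2024-03-10')})
print(today, start, end)  # 2024-03-10 2024-03-06 2024-03-09
```

Calling resolve_window({}) or passing a FakeTI(None) falls back to the current date, which is the behaviour the try/except block is aiming for.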
Next, we will be pulling credentials from a repository.
print('Getting API creds...')
#get_secret('assertive-yield-creds', 'creds')
#creds = eval(os.environ['creds'])
creds = {'email':'my_email@provider.com','password':'Pa$$w0r4'}
email = creds['email']
password = creds['password']
print('DONE\n')
After printing a message, the next two lines are commented out. Those two lines retrieve the API credentials from the environment with the help of the get_secret function: eval(os.environ['creds']) evaluates the creds environment variable (assuming it contains a dictionary-like string) and assigns the result to creds. Keep in mind that eval executes whatever the string contains, so it should only be used on values you fully trust. I will go into detail about the custom helper functions in another article. For now, let's just hardcode the credentials in the creds = {'email':'my_email@provider.com','password':'Pa$$w0r4'} variable.
Then we extract the email and password for later use and print the DONE message.
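If the secret is stored as a JSON string, json.loads can take the place of eval. The sketch below assumes the creds environment variable holds a JSON object with email and password fields; load_creds is a hypothetical helper, not part of analytics_library, and the value set on os.environ is only there so the example runs on its own.

```python
import json
import os

def load_creds(env_var='creds'):
    """Parse credentials from an environment variable holding a JSON object."""
    raw = os.environ.get(env_var)
    if raw is None:
        raise RuntimeError(f'environment variable {env_var!r} is not set')
    creds = json.loads(raw)
    # Fail early with a clear message if a field is missing.
    missing = {'email', 'password'} - set(creds)
    if missing:
        raise KeyError(f'missing credential fields: {sorted(missing)}')
    return creds

# Illustration only: in the real pipeline the variable would already be set.
os.environ['creds'] = '{"email": "my_email@provider.com", "password": "placeholder"}'
creds = load_creds()
print(creds['email'])  # my_email@provider.com
```

Unlike eval, json.loads only parses data and never executes code, so a malformed or malicious value ends in an exception rather than in arbitrary Python being run.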
We are done setting up dates and login credentials, so now it is time to connect to the API. We are going to use the same technique as in the AppNexus article.
First, we start by getting the auth token and user ID:
print('\nGetting auth token and client id...')
crl = "curl --location --request POST 'https://suite.assertiveyield.com/api/v2/login/' "
crl += "--header 'Content-Type: application/x-www-form-urlencoded' "
crl += f"--data-urlencode 'email={email}' "
crl += f"--data-urlencode 'password={password}' "
res = subprocess.check_output(crl, shell=True)
res = json.loads(res)
auth_tok = res['data']['authToken']
user_id = res['data']['userId']
print('DONE')
After the print statement, we construct a curl command that sends a POST request to the API to authenticate with the email and password. Then we execute the curl command using subprocess.check_output(). Finally, we parse the JSON response to extract the authToken and userId.
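Because the command above is assembled as one string and run with shell=True, a password containing a single quote would break the quoting. One possible alternative, shown here as a sketch rather than as the method used in this article, is to pass curl its arguments as a list so no shell is involved. The function names build_login_command and login are hypothetical; the URL, headers, and response fields are the ones used above.

```python
import json
import subprocess

LOGIN_URL = 'https://suite.assertiveyield.com/api/v2/login/'

def build_login_command(email, password):
    """Return the curl invocation as an argument list (no shell quoting needed)."""
    return [
        'curl', '--silent', '--location', '--request', 'POST', LOGIN_URL,
        '--header', 'Content-Type: application/x-www-form-urlencoded',
        '--data-urlencode', f'email={email}',
        '--data-urlencode', f'password={password}',
    ]

def login(email, password):
    """Run the login request and return (auth_token, user_id)."""
    raw = subprocess.check_output(build_login_command(email, password))
    data = json.loads(raw)['data']
    return data['authToken'], data['userId']

# Special characters reach curl untouched because no shell parses them.
cmd = build_login_command('my_email@provider.com', "it's $ecret")
print(cmd[-1])  # password=it's $ecret
```

Calling login(email, password) would then replace the string-building block, with the rest of the script unchanged.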
Next, we are fetching publisher information by constructing and executing a curl command.
crl = "curl https://suite.assertiveyield.com/api/v2/publisher "
crl += "-H 'Content-Type: application/x-www-form-urlencoded' "
crl += f'-H "X-Auth-Token: {auth_tok}" '
crl += f'-H "X-User-Id: {user_id}" '
pubs = subprocess.check_output(crl, shell=True)
pubs = json.loads(pubs)
entities = [i['entities'] for i in pubs]
entities = [ entity for lst in entities for entity in lst ]
entity_ids = [ e['id'] for e in entities]
The first four lines construct a curl command that fetches the list of publishers using the auth token and user ID. The next two lines execute the request and parse the response. The last three lines flatten the per-publisher entity lists into a single list and extract the entity IDs.
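The flattening step is easier to follow on a small example. The sample below is made up and only mirrors the shape the code above relies on (a list of publishers, each with an entities list whose items have an id); the name field is hypothetical. It uses itertools.chain.from_iterable, which does the same job as the nested list comprehension.

```python
from itertools import chain

# Hypothetical sample shaped like the structure the code above expects
pubs = [
    {'name': 'Publisher A', 'entities': [{'id': 'e1'}, {'id': 'e2'}]},
    {'name': 'Publisher B', 'entities': [{'id': 'e3'}]},
    {'name': 'Publisher C', 'entities': []},
]

# Flatten the per-publisher lists into one list, then keep only the ids
entities = list(chain.from_iterable(p['entities'] for p in pubs))
entity_ids = [e['id'] for e in entities]
print(entity_ids)  # ['e1', 'e2', 'e3']
```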
data = {
    "entities": entity_ids,
    "timezone": "UTC",
    "dimensions": ["host","adUnit","dfpAdvertiser"],
    "metrics": ["impressions", "direct_impressions",
                "prebid_won_impressions", "dynamicAllocation_impressions",
                "prebid_revenue", "prebid_won_revenue",
                "dynamicAllocation_revenue","direct_revenue",
                "lineItem_revenue",
                "pageViews",
                "viewable","clicks"],
    "timeGranularity": "by_hour",
    "applyRevenueBias": False,
    "dimensionsLimit": 10000,
    "filter": [
        {
            "id": "startDate",
            "value": f"{start}T00:00:00.000Z"
        },
        {
            "id": "endDate",
            "value": f"{end}T23:59:59.999Z"
        }
    ]
}
The block above builds the data payload for the report query. It specifies the entities, dimensions, metrics, time granularity, and date range (filled in from the start and end variables).
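To see what the two filter entries look like once the dates are filled in, here is a small sketch. build_date_filter is a hypothetical helper introduced only for this illustration; the id names and timestamp suffixes are copied from the payload above.

```python
import datetime

def build_date_filter(start, end):
    """Build the startDate/endDate filter entries covering whole UTC days."""
    return [
        {'id': 'startDate', 'value': f'{start}T00:00:00.000Z'},
        {'id': 'endDate', 'value': f'{end}T23:59:59.999Z'},
    ]

end = datetime.date(2024, 3, 9)
start = end - datetime.timedelta(days=3)
for item in build_date_filter(start, end):
    print(item)
# {'id': 'startDate', 'value': '2024-03-06T00:00:00.000Z'}
# {'id': 'endDate', 'value': '2024-03-09T23:59:59.999Z'}
```

Formatting a date object inside an f-string yields its ISO form (YYYY-MM-DD), which is why the start and end variables can be dropped into the payload directly.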
sdata = json.dumps(data)
crl = f'curl -H "X-Auth-Token: {auth_tok}" '
crl += f'-H "X-User-Id: {user_id}" https://suite.assertiveyield.com/api/v2/report '
crl += '-H "Content-Type: application/json" '
crl += '--request POST '
crl += f'--data \'{sdata}\''
res = subprocess.check_output(crl, shell=True)
res = json.loads(res)
print('DONE\n')
The block above converts the data dictionary to a JSON string, embeds it in the curl command, and sends it as a POST request. The response is then parsed into a dictionary with json.loads.
We now have the data in the res variable, but we need to “clean” it. For that we will create two helper functions. The first formats the site, adunit and dfpAdvertiser values for CSV by doubling any double quotes and wrapping each value in quotes. The second converts None values to '0' and ensures all other values are strings.
def strchk(lst):
    site, adunit, dfpAdvertiser = lst
    site = site.replace('"','""')
    adunit = adunit.replace('"','""')
    dfpAdvertiser = dfpAdvertiser.replace('"','""')
    return([f'"{site}"',f'"{adunit}"', f'"{dfpAdvertiser}"'])
def strval(val):
    val = '0' if val is None else str(val)
    return(val)
Then we prepare the output: we take the metric names from the first row of the response, combine the dimension values and metrics into CSV rows, and finally join each row into a comma-separated string:
keys = [k for k in res['data'][0].keys() if k != '_group']
out = [strchk(d['_group']) + [strval(d[k]) for k in keys] for d in res['data']]
cols = ['site','adunit','dfpAdvertiser'] + keys
out = [cols] + out
out = [','.join(i) for i in out ]
n_rows = len(out)
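As a point of comparison, the same rows can be produced with Python's built-in csv module, which handles the quote escaping for us. This is an alternative sketch, not the approach used in this article, and the sample response is made up; it only mirrors the shape read above (a data list whose rows have a _group list plus metric fields).

```python
import csv
import io

# Hypothetical response rows shaped like the ones the code above reads
res = {'data': [
    {'_group': ['example.com', 'top "banner"', 'Acme, Inc.'], 'impressions': 120, 'clicks': None},
    {'_group': ['example.org', 'sidebar', 'Globex'], 'impressions': 45, 'clicks': 3},
]}

keys = [k for k in res['data'][0] if k != '_group']
buf = io.StringIO()
writer = csv.writer(buf, lineterminator='\n')
writer.writerow(['site', 'adunit', 'dfpAdvertiser'] + keys)
for d in res['data']:
    # Same rule as strval: None becomes '0'
    metrics = ['0' if d[k] is None else d[k] for k in keys]
    writer.writerow(list(d['_group']) + metrics)

print(buf.getvalue())
# site,adunit,dfpAdvertiser,impressions,clicks
# example.com,"top ""banner""","Acme, Inc.",120,0
# example.org,sidebar,Globex,45,3
```

One difference to be aware of: strchk always wraps the three dimension values in quotes, while csv.writer with its default settings quotes a field only when it contains a comma, a quote, or a line break. Both forms are read back identically by a CSV parser.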
The report is now processed and ready to be written to disk and then uploaded to the S3 bucket for permanent storage:
print('Writing CSV API report...')
fn = '/tmp/assertive_yeild_report.csv'
wt = open(fn, 'w')
wt.write( '\n'.join(out))
wt.close()
print('report saved locally\n')
print('Uploading to S3...')
s3_key = f'temp/{today} - assertive_yeild - {start} to {end}.csv'
bk = get_bucket()
s3r = boto3.resource('s3')
s3r.meta.client.upload_file(fn, bk, s3_key)
os.remove(fn)
print('\ndone\n')
After printing the message, we assign a temporary CSV file path to the fn variable and write the report to that file. Notice that '\n'.join(out) puts each row on its own line. The file is then uploaded to S3 under the s3_key name, and the local copy is removed with os.remove.
At this point, the report is processed and saved in an S3 bucket. get_bucket() is a custom function that returns a bucket name depending on the environment. I will discuss it in another article. Thank you for stopping by, and feel free to write a comment or email me directly if you have any questions.