PythonEnthusiast.com

Criteo Reporting API

September 19, 2024 | by Pythonister Mister


Hello Python Enthusiasts! Welcome back. Today we will go over the Criteo reporting API. Criteo is a global technology company specializing in performance marketing, data-driven advertising, and commerce media. It provides advertisers and publishers with tools to optimize their marketing and monetization strategies through personalized advertising, machine learning, and real-time data analytics.

As always, I want to remind you to set the indentation correctly; otherwise, your code won't work. I am sure you already know this, but I will keep reminding you about it over and over.

Let’s start with the imports:

import subprocess
import json
import csv
from datetime import datetime, timedelta
from analytics_library import get_secret, upload_s3, get_bucket

import subprocess: Imports the subprocess module, which allows the code to run system commands (like curl) from within the script.

import json: Imports the json module for working with JSON data (parsing and converting JSON to Python objects).

import csv: Imports the csv module to work with CSV files.

from datetime import datetime, timedelta: Imports the datetime and timedelta classes from the datetime module to work with dates and time spans. Alternatively, you can import the datetime module by itself and access the classes through it, as datetime.datetime and datetime.timedelta, like I did in the Assertive Yield and AppNexus articles.
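
For reference, here is what that alternative import style looks like, a minimal sketch using only the standard library:

import datetime

today = datetime.date.today()                                   # today's date
yesterday = today - datetime.timedelta(days=1)                  # one day back
parsed = datetime.datetime.strptime('2024-09-19', '%Y-%m-%d')   # string to datetime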

from analytics_library import get_secret, upload_s3, get_bucket: Imports functions from a custom library (analytics_library). We will discuss how to write those custom functions in some other article (rough sketches appear below). The custom functions do the following:

  • get_secret: Retrieves a stored secret (likely an API key or token).
  • upload_s3: Uploads files to an S3 bucket.
  • get_bucket: Retrieves the S3 bucket to which files will be uploaded.
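
Since we will not cover analytics_library until a later article, here is a rough sketch of what get_bucket and upload_s3 might look like under the hood. This is my assumption built on boto3, not the actual library code, and the environment variable and bucket names are made up:

import os
import boto3

def get_bucket():
    # pick the bucket name from an environment variable so staging and
    # production can point at different buckets (names are hypothetical)
    return os.environ.get('ANALYTICS_BUCKET', 'my-staging-bucket')

def upload_s3(local_path, bucket, key):
    # push a local file to S3 with boto3
    boto3.client('s3').upload_file(local_path, bucket, key)

We will get to get_secret a bit later, next to the code that actually uses it.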

After the imports, we define the function; the rest of the code will live inside it.

def api_query(**context):

def api_query(**context): Defines a function named api_query, which accepts arbitrary keyword arguments collected into the context dictionary. The dictionary holds Airflow job-related variables, like today's date, which we will try to pull later in the code. For now, you can ignore the context variable; I will explain how to use it with Airflow in some other article.
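
If the **context syntax is new to you, here is a tiny illustration of how arbitrary keyword arguments get collected into a dictionary (plain Python, nothing Airflow-specific; the argument names are made up):

def demo(**context):
    print(context)

demo(ti='task-instance', ds='2024-09-19')
# prints: {'ti': 'task-instance', 'ds': '2024-09-19'}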

    print('setting up variables..')
    try:
        today = context['ti'].xcom_pull(key='init_today')
    except:
        today = str(datetime.today().date())
    end = datetime.strptime(today, '%Y-%m-%d').date() - timedelta(days=1)
    start = end - timedelta(days=6)

Right after the function definition, we set up the dates. The try-except block follows the print statement. It attempts to pull the value of init_today from the context using the xcom_pull method of an Apache Airflow task instance (to be discussed in a separate article).

today = context['ti'].xcom_pull(key='init_today'): Tries to get a value from XCom using the key init_today.

except: If xcom_pull fails (for instance, if init_today is missing), it falls back to setting today to the current date (datetime.today().date()).

end: Converts today from a string to a date object and subtracts one day (yesterday).

start: Sets start to six days before end, creating a 7-day window (from start to end).
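
To make the window concrete, assume the fallback path runs on 2024-09-19 (the date of this post):

today = '2024-09-19'
end = datetime.strptime(today, '%Y-%m-%d').date() - timedelta(days=1)   # 2024-09-18
start = end - timedelta(days=6)                                         # 2024-09-12
# start..end covers seven days: 2024-09-12 through 2024-09-18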

Next, we are going to set up the access token variable, tk. For my projects I use AWS Secrets Manager to store secret values (like passwords or tokens), and I wrote a custom library function to pull them. I will discuss that function in a separate article; for that reason the call is commented out below and the tk value is hardcoded.

    # tk = json.loads(get_secret('analytics-api-criteo', osvar=False))['token']
    tk = 'TOKEN_361283621'
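
For the curious, a get_secret built on AWS Secrets Manager might look roughly like the sketch below. This is my assumption of the shape, not the actual analytics_library code; the osvar flag is accepted only to match the call above:

import boto3

def get_secret(secret_name, osvar=False):
    # fetch the secret string from AWS Secrets Manager
    client = boto3.client('secretsmanager')
    response = client.get_secret_value(SecretId=secret_name)
    return response['SecretString']

With a secret stored as {"token": "..."}, the commented-out line would then produce the same kind of value as the hardcoded tk.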

Then we define the report's dimensions and metrics as two lists and join them into the curl request string along with the start and end dates. crl: Constructs a curl command that sends an HTTP GET request to the Criteo API, including the token, dimensions, metrics, and date range (from start to end).

After the curl request is defined, subprocess executes it and the response is loaded into a list. I used the same technique in the AppNexus and Assertive Yield API calls.

    dim = ['Domain', 'DeviceType']
    met = ['CriteoDisplays', 'Revenue', 'Clicks']

    print('getting report..')
    crl = "curl -XGET 'https://pmc.criteo.com/api/stats?"
    crl += f"apitoken={tk}"
    crl += f"&dimensions={(',').join(dim)}"
    crl += f"&metrics={(',').join(met)}"
    crl += f"&begindate={str(start)}"
    crl += f"&enddate={str(end)}' "

    res = subprocess.check_output(crl, shell=True)
    res = json.loads(res)
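
As an aside, the same request could be made with the requests package instead of shelling out to curl. A minimal sketch, assuming the endpoint accepts the same query parameters as the curl version:

import requests

params = {
    'apitoken': tk,
    'dimensions': ','.join(dim),
    'metrics': ','.join(met),
    'begindate': str(start),
    'enddate': str(end),
}
resp = requests.get('https://pmc.criteo.com/api/stats', params=params)
resp.raise_for_status()   # fail loudly on HTTP errors
res = resp.json()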

After that, we validate the report by checking its length and dates. n_rows: Stores the number of rows in the response, i.e. the report length. In future articles, I will explain how to use the n_rows variable to validate your report. dats: A set comprehension that extracts the TimeId field from each row and converts it into a date object, so we can check how many report dates were received.

Then we extract the max and min dates from dats to use later in the report's CSV filename.

    print('validating report..')
    n_rows = len(res)
    dats = {datetime.strptime(row['TimeId'], "%Y-%m-%dT%H:%M:%S").date()
            for row in res}
    try:
        start = min(dats)
        end = max(dats)
    except:
        raise Exception('failed to extract dates from data..')
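
As a small teaser for the future validation article, here is the kind of check that n_rows and dats make possible. The thresholds are my assumptions, not Criteo requirements:

    if n_rows == 0:
        raise Exception('empty report received..')
    if (end - start).days != 6:
        print(f'warning: expected a 7-day window, got {start} to {end}')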

Now we are ready to write the report to disk. Traditionally we will write it to the /tmp/ directory, using the csv module (you can read more about it in the Python documentation). I want to point out that the report header is defined dynamically by extracting the keys from the first item (dictionary) of the list and converting them into a list.

    print('writing report to disk..')
    csv_fp = '/tmp/criteo_api.csv'

    with open(csv_fp, 'w', newline='') as file:
        header = list(res[0].keys())
        writer = csv.DictWriter(file, fieldnames=header)
        writer.writeheader()
        writer.writerows(res)
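
To make the dynamic header concrete, each row of res presumably looks something like the dictionary below. The keys follow from TimeId plus the dimensions and metrics we requested; the values are invented for illustration:

row = {'TimeId': '2024-09-18T00:00:00', 'Domain': 'example.com', 'DeviceType': 'Desktop',
       'CriteoDisplays': 1200, 'Revenue': 3.42, 'Clicks': 15}

list(row.keys())
# ['TimeId', 'Domain', 'DeviceType', 'CriteoDisplays', 'Revenue', 'Clicks']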

The last step after the report is saved is to push it to the cloud. In my example I am using a custom function that uploads the CSV report to AWS S3, sending the file to a temp (temporary) folder on my S3 bucket. The reason it goes to a temporary folder first is that the report will be validated one more time for consistency and then moved to its final folder.

The report name includes three date variables: today, start, and end, indicating the date the report was downloaded and the start and end dates of the report.

bk = get_bucket(): Retrieves the S3 bucket to which the file will be uploaded, depending on the environment. This custom function separates the staging and production environments; I will discuss it in some other article.

    print('uploading report to s3..')
    s3_key = f'temp/{today} - criteo - {start} to {end}.csv'
    bk = get_bucket()
    upload_s3(csv_fp, bk, s3_key)
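
With the example dates used earlier, the key would render as something like:

    # s3_key -> 'temp/2024-09-19 - criteo - 2024-09-12 to 2024-09-18.csv'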

And that is it! At this point the report is extracted and saved on S3 for future ingestion into a database. If you have any questions, please feel free to send me a message at help@pythonenthusiast.com. Good luck!
