AdX (Google Ad Exchange) Reporting API – Part 2
January 4, 2025 | by Pythonister Mister

Hello, and welcome back to the second part of the AdX reporting API article.
As I promised earlier, we will discuss the adx_api_functinos module in this article. The module is a Python script file called, you guessed it, adx_api_functinos.py
The file itself contains imports from other modules and libraries, as well as a few functions. Let’s look into it closely.
The first three lines are the imports of the required modules and libraries.
import tempfile, os, boto3, gzip, csv
from datetime import datetime as datetime, timedelta
from googleads import ad_manager, common, errors
- tempfile – the module for creating temporary files; a temp file is needed to hold the downloaded report.
- os – the module for interacting with the operating system, used here to delete the report temp file after it is processed.
- boto3 – the library (AWS SDK) needed for interaction with AWS. I am not sure why someone decided to call it boto3… not just boto or aws_api_services… Anyway, the library will be used to send the downloaded report to the S3 bucket.
- gzip – the module for performing archiving operations on files. The downloaded report file arrives as a gzip archive (saved here with a .tsv.gz suffix), so it needs to be decompressed before it can be read as CSV.
- csv – the module reads and writes tabular data in CSV format.
- datetime – the module that supplies classes for manipulating dates and times.
- googleads – the client library for Google's ads APIs; we will use its Ad Manager client to download the report file.
Next, we download the googleads YAML file. It contains the basic connection configuration parameters: the application name, the network code, and the path to the private key file. An example of the YAML file's content is shown below.
ad_manager:
  application_name: your-app-name
  network_code: 123456
  path_to_private_key_file: /tmp/adx_creds.json
In my case, I keep the YAML file in an S3 bucket.
gpath = '/tmp/googleads.yaml'
ses = boto3.session.Session()
s3r = ses.resource('s3')
s3r.meta.client.download_file('ax-airflow',
                              'store/yaml/googleads.yaml',
                              gpath)
The file path for the downloaded file is defined first. Then, the session and s3r (resource) objects are instantiated. Finally, the download_file method is invoked on the meta.client attribute of the s3r object. The method accepts three parameters: the S3 bucket name, the object key (the file path inside the bucket), and the local destination path.
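One more note before moving on: the YAML file above points to a private key file at /tmp/adx_creds.json, so that credentials file also needs to exist locally before the client can authenticate. Below is a minimal sketch of fetching it the same way, assuming it lives in the same bucket; the object key is hypothetical and not taken from the original module.

# Hypothetical: fetch the service account key referenced by the YAML file.
# The object key below is a placeholder, not a path from the original module.
s3r.meta.client.download_file('ax-airflow',
                              'store/yaml/adx_creds.json',
                              '/tmp/adx_creds.json')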
At this point, we are done loading all the necessary data, and we are ready to define the first function. It is called open_connection and establishes a connection with the AdX server. This function will be using the YAML configuration file we downloaded earlier.
def open_connection():
    client = ad_manager.AdManagerClient.LoadFromStorage(gpath)
    client.cache = common.ZeepServiceProxy.NO_CACHE
    network_service = client.GetService('NetworkService')
    try:
        network_service.getCurrentNetwork()
    except errors.GoogleAdsServerFault:
        print('Network version not supported. May need to update googleads package')
        return -1
    return client
First, we create the client object by invoking the LoadFromStorage method, which accepts the path to the YAML file as a parameter. Then, the cache is disabled because an access denied error appears otherwise. Next, the network_service object is created, and the getCurrentNetwork() method is invoked on it inside a try-except block. If there are no errors during this step, the function returns the client object; otherwise, -1 is returned.
The second function is called api_call. It is the main function that will be used to download the report.
def api_call(statement = {'query': '', 'values': None},
             dims = ['AD_UNIT_NAME'],
             dim_attr = [],
             cols = ['AD_SERVER_IMPRESSIONS'],
             export_format = 'CSV_DUMP',
             start_date = datetime.now().date() - timedelta(days=7),
             end_date = datetime.now().date(),
             s3 = True,
             s3_bucket = 'airflow-api',
             s3_key = 'datadump/noname_dump1.csv.gz',
             ad_view = 'HIERARCHICAL',
             dimIds = None,
             tz = None):
The function signature contains 13 parameters with default values. Let’s go over each parameter one by one.
- statement – a dictionary used to filter the report (see the sketch after this list).
- dims – list of strings; report dimensions
- dim_attr – list of strings; report attributes
- cols – list of strings; report metrics
- export_format – string; output format from Ad Manager API. The valid formats are TSV, TSV_EXCEL, CSV_DUMP, XML, XLSX
- start_date – date; report start date
- end_date – date; report end date
- s3 – boolean flag; if True, the report is uploaded to S3, otherwise the local file path is returned
- s3_bucket – string; name of the S3 bucket to upload the report
- s3_key – string; S3 file path
- ad_view – string; a view for an ad unit report.
- dimIds – list; IDs of custom dimension keys
- tz – string; time zone type for the report
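Of these, statement is the least self-explanatory, so here is a minimal sketch of what a filter could look like. The dimension (ORDER_ID) and the bind variable below are purely illustrative; the default value in the signature simply means "no filter".

# Hypothetical filter: restrict the report to a single order.
# The field name and value are placeholders, not part of the original module.
example_statement = {
    'query': 'WHERE ORDER_ID = :orderId',
    'values': [{
        'key': 'orderId',
        'value': {'xsi_type': 'NumberValue', 'value': 123456789}
    }]
}

A dictionary like this could then be passed as the statement argument of api_call. With the parameters covered, let's continue with the function body.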
    client = open_connection()
    if client == -1:
        raise Exception("open_connection returned -1")
The first code block establishes a connection with the API. open_connection returns either the client object or -1; -1 means that the googleads package may need to be updated. An exception is raised to flag the error.
    report_job = {
        'reportQuery': {
            'dimensions': dims,
            'adUnitView': ad_view,
            'dimensionAttributes': dim_attr,
            'statement': statement,
            'columns': cols,
            'dateRangeType': 'CUSTOM_DATE',
            'startDate': start_date,
            'endDate': end_date
        }
    }
    if dimIds is not None:
        report_job['reportQuery']['customDimensionKeyIds'] = dimIds
    if tz is not None:
        report_job['reportQuery']['timeZoneType'] = tz
Next, the reporting job dictionary is created; the values for its keys come from the function signature. If the custom dimension IDs or the timezone parameters are supplied, they are added to the report query; otherwise they are left out.
    report_downloader = client.GetDataDownloader()
    report_job_id = report_downloader.WaitForReport(report_job)
    report_file = tempfile.NamedTemporaryFile(suffix='.tsv.gz', delete=False)
    report_downloader.DownloadReportToFile(
        report_job_id, export_format, report_file
    )
    report_file.close()
    print(f"Downloaded {report_file.name}")
Now, we are ready to download the report. First, the report downloader is initialized, and report_job is passed to WaitForReport, which runs the job and returns the report job ID once it completes. Then, a temporary report file is created. Finally, report_job_id, export_format, and report_file are supplied so that report_downloader can download the file. After the report is downloaded, the report file is closed, and its path is printed.
At this point, the report is downloaded, and all that’s left is to verify if the report has data and whether to upload it to S3 or not.
    n_rows = 0
    header = ''
    with gzip.open(report_file.name, 'rt') as rep:
        rr = csv.reader(rep)
        for row in rr:
            n_rows += 1
            if n_rows == 1:
                header = row
To validate the report, we want to make sure it has more than zero rows and that the header is present. To do so, we open the report with the gzip module and instantiate a csv reader. The reader then iterates through the rows, counting each one; the first row is kept as the header. The with statement closes the file automatically when the block exits.
    if s3:
        ses = boto3.session.Session()
        s3r = ses.resource('s3')
        s3r.meta.client.upload_file(report_file.name, s3_bucket, s3_key)
        os.remove(report_file.name)
        return {'n_rows': n_rows, 'header': header}
    else:
        return report_file.name
The final code block is a conditional based on the boolean s3 flag. If the flag is True, an AWS session and S3 resource are instantiated, the report is uploaded to S3, the local temp file is removed with os.remove, and the function returns a dictionary with the number of rows and the report header. If the flag is False, the function returns the local file path of the report.
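Before wrapping up, here is a minimal sketch of how api_call could be invoked end to end. The dimensions and metrics are standard Ad Manager report enums, but the bucket name and key below are placeholders, not the ones used in the original module.

result = api_call(dims=['DATE', 'AD_UNIT_NAME'],
                  cols=['AD_SERVER_IMPRESSIONS', 'AD_SERVER_CLICKS'],
                  start_date=datetime.now().date() - timedelta(days=1),
                  end_date=datetime.now().date() - timedelta(days=1),
                  s3=True,
                  s3_bucket='my-report-bucket',      # placeholder bucket
                  s3_key='adx/daily/report.csv.gz')  # placeholder key
print(result['n_rows'], result['header'])

If s3 were set to False, result would instead hold the local path of the downloaded temp file.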
This is it. That concludes our two-part article about AdX API reporting. Please feel free to comment or send me a message if you have any questions. Part one is HERE.