|
# Copyright 2019 Google LLC |
|
# |
|
# Licensed under the Apache License, Version 2.0 (the "License"); |
|
# you may not use this file except in compliance with the License. |
|
# You may obtain a copy of the License at |
|
# |
|
# https://www.apache.org/licenses/LICENSE-2.0 |
|
# |
|
# Unless required by applicable law or agreed to in writing, software |
|
# distributed under the License is distributed on an "AS IS" BASIS, |
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
# See the License for the specific language governing permissions and |
|
# limitations under the License. |
|
|
|
import datetime |
|
from googleads import ad_manager |
|
from googleads import oauth2 |
|
from google.oauth2 import service_account |
|
from google.cloud import bigquery |
|
from os import getenv |
|
import pymysql |
|
import pytz |
|
|
|
def handle_request(data, context):
    """Background Cloud Function to be triggered by Pub/Sub topic <fetch_line_items>.

    Reads the newest ``lastModifiedDateTime`` stored in the ``LineItems``
    table, asks the Ad Manager ``LineItemService`` for every line item
    modified after that moment minus one day (or for all line items when the
    table holds no usable timestamp), and upserts each page of results into
    ``LineItems``.

    Configuration is read from the environment: ``KEY_FILE``,
    ``APPLICATION_NAME``, ``NETWORK_CODE``, ``CLOUDSQL_INSTANCE_CONNECTION``,
    ``CLOUDSQL_USER``, ``CLOUDSQL_PASSWORD`` and ``CLOUDSQL_DATABASE``.

    Args:
        data: Event payload passed by the trigger (unused).
        context: Event metadata passed by the trigger (unused).

    Returns:
        dict: ``{'message': ...}`` reporting the ``totalResultSetSize`` of the
        last Ad Manager response.
    """
    # Step 1: Authenticate and load LineItemService
    oauth2_client = oauth2.GoogleServiceAccountClient(
        getenv('KEY_FILE'), oauth2.GetAPIScope('ad_manager'))
    ad_manager_client = ad_manager.AdManagerClient(
        oauth2_client, getenv('APPLICATION_NAME'), getenv('NETWORK_CODE'))
    line_item_service = ad_manager_client.GetService('LineItemService')

    # Step 2: Specify SQL config and connect
    connection_name = getenv('CLOUDSQL_INSTANCE_CONNECTION')
    mysql_config = {
        'unix_socket': f'/cloudsql/{connection_name}',
        'user': getenv('CLOUDSQL_USER'),
        'password': getenv('CLOUDSQL_PASSWORD'),
        'db': getenv('CLOUDSQL_DATABASE'),
        'charset': 'utf8mb4',
        'cursorclass': pymysql.cursors.DictCursor,
        'autocommit': True,
    }
    mysql_conn = pymysql.connect(**mysql_config)

    # The connection must be released even when a query or API call fails;
    # function instances are reused, so a leaked socket would accumulate.
    try:
        with mysql_conn.cursor() as cursor:
            # Step 3: Query database for the latest recorded modified date time
            cursor.execute('SELECT lastModifiedDateTime FROM LineItems '
                           'ORDER BY lastModifiedDateTime DESC LIMIT 1;')
            result = cursor.fetchone()
            statement = ad_manager.StatementBuilder()
            # A row whose timestamp is NULL carries no usable watermark, so it
            # is treated like an empty table (full fetch).
            if result is not None and result['lastModifiedDateTime'] is not None:
                # Re-fetch one extra day before the stored watermark; the
                # upsert below makes the overlap harmless.
                latest_modified = (
                    pytz.timezone('UTC').localize(result['lastModifiedDateTime'])
                    + datetime.timedelta(days=-1))
                statement = (statement
                             .Where('lastModifiedDateTime > :lastModifiedDateTime')
                             .WithBindVariable('lastModifiedDateTime', latest_modified))

            # Step 4: Query database for the columns
            # Restrict to the connected schema: INFORMATION_SCHEMA spans every
            # database on the instance, and a same-named table elsewhere would
            # otherwise add duplicate/foreign columns to the INSERT.
            cursor.execute(
                'SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS '
                'WHERE TABLE_NAME = %s AND TABLE_SCHEMA = DATABASE();',
                ('LineItems',))
            columns = [row['COLUMN_NAME'] for row in cursor.fetchall()]

            # Both fragments depend only on the column list, so build them once.
            column_list = ','.join(columns)
            update_clause = generate_sql_columns(columns)

            # Step 5: Fetch recently updated LineItems and update database
            while True:
                response = line_item_service.getLineItemsByStatement(
                    statement.ToStatement())
                # Truthiness covers both an empty list and a missing (None)
                # result set on the final page.
                if not ('results' in response and response['results']):
                    break
                line_items = response['results']
                # NOTE: values are inlined as SQL literals by
                # generate_sql_values rather than bound as parameters.
                cursor.execute(
                    'INSERT INTO LineItems (' + column_list + ') '
                    'VALUES ' + generate_sql_values(line_items, columns) + ' '
                    'ON DUPLICATE KEY UPDATE ' + update_clause + ';')
                statement.offset += statement.limit
    finally:
        mysql_conn.close()

    return {'message': 'Number of LineItems updated: ' + str(response['totalResultSetSize'])}
|
|
|
def generate_sql_columns(columns):
    """Build the assignment list for an ``ON DUPLICATE KEY UPDATE`` clause.

    Args:
        columns: Iterable of column-name strings.

    Returns:
        str: ``"<col> = VALUES(<col>)"`` fragments, one per column, joined
        with ``", "``. Empty input yields an empty string.
    """
    assignments = []
    for name in columns:
        # Each column is overwritten with the value proposed by the INSERT.
        assignments.append(''.join([name, ' = VALUES(', name, ')']))
    return ', '.join(assignments)
|
|
|
def generate_sql_values(line_items, columns):
    """Build the parenthesised row tuples for a multi-row ``INSERT ... VALUES``.

    Args:
        line_items: Sequence of objects indexable by column name.
        columns: Column names, in the order the values must be emitted.

    Returns:
        str: ``"(v1, v2),(v1, v2)..."`` with one tuple per line item; every
        value is rendered by ``convert_value_for_sql_query``.
    """
    rendered_rows = []
    for item in line_items:
        literals = [convert_value_for_sql_query(item[name]) for name in columns]
        rendered_rows.append(', '.join(literals))
    # Joining on "),(" and wrapping once yields "(row),(row),...".
    return '(' + '),('.join(rendered_rows) + ')'
|
|
|
def convert_value_for_sql_query(ad_manager_value):
    """Render an Ad Manager field value as text to inline into a MySQL query.

    Conversion rules:
      * ``None`` becomes ``NULL``.
      * ``str`` becomes a double-quoted literal with backslashes and double
        quotes backslash-escaped.
      * ``zeep.objects.Date`` becomes a quoted ISO date (``"YYYY-MM-DD"``).
      * ``zeep.objects.DateTime`` is interpreted in its own ``timeZoneId``,
        converted to UTC and rendered as a quoted naive ISO timestamp.
      * Anything else becomes ``str(value)`` (unquoted) with double quotes
        backslash-escaped.

    Args:
        ad_manager_value: Value taken from an Ad Manager response object.

    Returns:
        str: The SQL text for the value.
    """
    if ad_manager_value is None:
        return 'NULL'

    if isinstance(ad_manager_value, str):
        # Escape backslashes first; otherwise a trailing "\" (or an existing
        # '\"') would neutralise the escaping of the quote and let the value
        # break out of the string literal.
        escaped = ad_manager_value.replace('\\', '\\\\').replace('"', '\\"')
        return '"' + escaped + '"'

    # The zeep value classes are identified by their printed type name.
    type_name = str(type(ad_manager_value))

    if type_name == "<class 'zeep.objects.Date'>":
        date_value = datetime.date(
            ad_manager_value['year'],
            ad_manager_value['month'],
            ad_manager_value['day'])
        return '"' + date_value.isoformat() + '"'

    if type_name == "<class 'zeep.objects.DateTime'>":
        date_part = ad_manager_value['date']
        naive_local = datetime.datetime(
            date_part['year'], date_part['month'], date_part['day'],
            ad_manager_value['hour'], ad_manager_value['minute'],
            ad_manager_value['second'])
        # pytz zones must be attached with localize(); passing them as
        # tzinfo= to the datetime constructor applies the zone's historical
        # LMT offset and shifts the result by several minutes.
        aware_local = pytz.timezone(ad_manager_value['timeZoneId']).localize(naive_local)
        # Drop tzinfo so isoformat() emits no "+00:00" suffix.
        utc_naive = aware_local.astimezone(pytz.utc).replace(tzinfo=None)
        return '"' + utc_naive.isoformat() + '"'

    return str(ad_manager_value).replace('"', '\\"')