Skip to content

Instantly share code, notes, and snippets.

@donovanfm
Created May 2, 2019 02:56
Show Gist options
  • Select an option

  • Save donovanfm/b4e6f0517e601a91e93ec32dd0313e10 to your computer and use it in GitHub Desktop.

Select an option

Save donovanfm/b4e6f0517e601a91e93ec32dd0313e10 to your computer and use it in GitHub Desktop.
Code for a Google Cloud Function that updates a Cloud SQL instance with recently modified Ad Manager LineItems.
import datetime
from googleads import ad_manager
from googleads import oauth2
from google.oauth2 import service_account
from google.cloud import bigquery
from os import getenv
import pymysql
import pytz
def handle_request(data, context):
"""Background Cloud Function to be triggered by Pub/Sub topic <fetch_line_items>."""
# Step 1: Authenticate and load LineItemService
oauth2_client = oauth2.GoogleServiceAccountClient(getenv('KEY_FILE'), oauth2.GetAPIScope("ad_manager"))
ad_manager_client = ad_manager.AdManagerClient(oauth2_client, getenv('APPLICATION_NAME'), getenv('NETWORK_CODE'))
line_item_service = ad_manager_client.GetService("LineItemService")
# Step 2: Specify SQL config and connect
CONNECTION_NAME = getenv('CLOUDSQL_INSTANCE_CONNECTION')
DB_USER = getenv('CLOUDSQL_USER')
DB_PASSWORD = getenv('CLOUDSQL_PASSWORD')
DB_NAME = getenv('CLOUDSQL_DATABASE')
mysql_config = {
'unix_socket': f'/cloudsql/{CONNECTION_NAME}',
'user': DB_USER,
'password': DB_PASSWORD,
'db': DB_NAME,
'charset': 'utf8mb4',
'cursorclass': pymysql.cursors.DictCursor,
'autocommit': True
}
mysql_conn = pymysql.connect(**mysql_config)
with mysql_conn.cursor() as cursor:
# Step 3: Query database for the latest recorded modified date time
cursor.execute("SELECT lastModifiedDateTime FROM LineItems " +
"ORDER BY lastModifiedDateTime DESC LIMIT 1;")
result = cursor.fetchone()
if result is not None:
latestModifiedDateTime = pytz.timezone('UTC').localize(result["lastModifiedDateTime"]) + datetime.timedelta(days=-1)
statement = (ad_manager.StatementBuilder()
.Where('lastModifiedDateTime > :lastModifiedDateTime')
.WithBindVariable('lastModifiedDateTime', latestModifiedDateTime))
else:
statement = ad_manager.StatementBuilder()
# Step 4: Query database for the columns
cursor.execute("SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='LineItems';")
columns = [row["COLUMN_NAME"] for row in cursor.fetchall()]
# Step 5: Fetch recently updated LineItems and update database
while True:
response = line_item_service.getLineItemsByStatement(statement.ToStatement())
print("Response: " + str(response))
if 'results' in response and len(response['results']):
line_items = response['results']
cursor.execute("INSERT INTO LineItems "
"(" + ",".join(columns) + ") "
"VALUES " + generate_sql_values(line_items, columns) + " "
"ON DUPLICATE KEY UPDATE"
" " + generate_sql_columns(columns) + ";"
)
statement.offset += statement.limit
else:
break
return {'message': 'Number of LineItems updated: ' + str(response['totalResultSetSize'])}
def generate_sql_columns(columns):
return ", ".join([column + " = VALUES(" + column + ")" for column in columns])
def generate_sql_values(line_items, columns):
sql_values = ("(" +
"),(".join([
", ".join([convert_value_for_sql_query(line_item[column]) for column in columns])
for line_item in line_items]
) + ")")
return sql_values
def convert_value_for_sql_query(ad_manager_value):
value_type = type(ad_manager_value)
date_keys = ['year', 'month', 'day']
datetime_keys = ['hour', 'minute', 'second']
if ad_manager_value is None:
return "NULL"
elif value_type == str:
return "\"" + str(ad_manager_value).replace("\"", "\\\"") + "\""
elif str(value_type) == "<class 'zeep.objects.Date'>":
return "\"" + datetime.date(*[ad_manager_value[key] for key in date_keys]).isoformat() + "\""
elif str(value_type) == "<class 'zeep.objects.DateTime'>":
datetime_value = datetime.datetime(
ad_manager_value['date']['year'], ad_manager_value['date']['month'], ad_manager_value['date']['day'],
ad_manager_value['hour'], ad_manager_value['minute'], ad_manager_value['second'], tzinfo=pytz.timezone(ad_manager_value['timeZoneId'])
)
utc_datetime_value = datetime_value.astimezone(pytz.timezone('UTC'))
utc_datetime_value = datetime.datetime(utc_datetime_value.year, utc_datetime_value.month, utc_datetime_value.day, utc_datetime_value.hour, utc_datetime_value.minute, utc_datetime_value.second)
return "\"" + utc_datetime_value.isoformat() + "\""
else:
return str(ad_manager_value).replace("\"", "\\\"")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment