Created
May 2, 2019 02:56
-
-
Save donovanfm/b4e6f0517e601a91e93ec32dd0313e10 to your computer and use it in GitHub Desktop.
Code for a Google Cloud Function that updates a Cloud SQL instance with recently modified Ad Manager LineItems.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import datetime | |
| from googleads import ad_manager | |
| from googleads import oauth2 | |
| from google.oauth2 import service_account | |
| from google.cloud import bigquery | |
| from os import getenv | |
| import pymysql | |
| import pytz | |
| def handle_request(data, context): | |
| """Background Cloud Function to be triggered by Pub/Sub topic <fetch_line_items>.""" | |
| # Step 1: Authenticate and load LineItemService | |
| oauth2_client = oauth2.GoogleServiceAccountClient(getenv('KEY_FILE'), oauth2.GetAPIScope("ad_manager")) | |
| ad_manager_client = ad_manager.AdManagerClient(oauth2_client, getenv('APPLICATION_NAME'), getenv('NETWORK_CODE')) | |
| line_item_service = ad_manager_client.GetService("LineItemService") | |
| # Step 2: Specify SQL config and connect | |
| CONNECTION_NAME = getenv('CLOUDSQL_INSTANCE_CONNECTION') | |
| DB_USER = getenv('CLOUDSQL_USER') | |
| DB_PASSWORD = getenv('CLOUDSQL_PASSWORD') | |
| DB_NAME = getenv('CLOUDSQL_DATABASE') | |
| mysql_config = { | |
| 'unix_socket': f'/cloudsql/{CONNECTION_NAME}', | |
| 'user': DB_USER, | |
| 'password': DB_PASSWORD, | |
| 'db': DB_NAME, | |
| 'charset': 'utf8mb4', | |
| 'cursorclass': pymysql.cursors.DictCursor, | |
| 'autocommit': True | |
| } | |
| mysql_conn = pymysql.connect(**mysql_config) | |
| with mysql_conn.cursor() as cursor: | |
| # Step 3: Query database for the latest recorded modified date time | |
| cursor.execute("SELECT lastModifiedDateTime FROM LineItems " + | |
| "ORDER BY lastModifiedDateTime DESC LIMIT 1;") | |
| result = cursor.fetchone() | |
| if result is not None: | |
| latestModifiedDateTime = pytz.timezone('UTC').localize(result["lastModifiedDateTime"]) + datetime.timedelta(days=-1) | |
| statement = (ad_manager.StatementBuilder() | |
| .Where('lastModifiedDateTime > :lastModifiedDateTime') | |
| .WithBindVariable('lastModifiedDateTime', latestModifiedDateTime)) | |
| else: | |
| statement = ad_manager.StatementBuilder() | |
| # Step 4: Query database for the columns | |
| cursor.execute("SELECT COLUMN_NAME FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_NAME='LineItems';") | |
| columns = [row["COLUMN_NAME"] for row in cursor.fetchall()] | |
| # Step 5: Fetch recently updated LineItems and update database | |
| while True: | |
| response = line_item_service.getLineItemsByStatement(statement.ToStatement()) | |
| print("Response: " + str(response)) | |
| if 'results' in response and len(response['results']): | |
| line_items = response['results'] | |
| cursor.execute("INSERT INTO LineItems " | |
| "(" + ",".join(columns) + ") " | |
| "VALUES " + generate_sql_values(line_items, columns) + " " | |
| "ON DUPLICATE KEY UPDATE" | |
| " " + generate_sql_columns(columns) + ";" | |
| ) | |
| statement.offset += statement.limit | |
| else: | |
| break | |
| return {'message': 'Number of LineItems updated: ' + str(response['totalResultSetSize'])} | |
| def generate_sql_columns(columns): | |
| return ", ".join([column + " = VALUES(" + column + ")" for column in columns]) | |
| def generate_sql_values(line_items, columns): | |
| sql_values = ("(" + | |
| "),(".join([ | |
| ", ".join([convert_value_for_sql_query(line_item[column]) for column in columns]) | |
| for line_item in line_items] | |
| ) + ")") | |
| return sql_values | |
| def convert_value_for_sql_query(ad_manager_value): | |
| value_type = type(ad_manager_value) | |
| date_keys = ['year', 'month', 'day'] | |
| datetime_keys = ['hour', 'minute', 'second'] | |
| if ad_manager_value is None: | |
| return "NULL" | |
| elif value_type == str: | |
| return "\"" + str(ad_manager_value).replace("\"", "\\\"") + "\"" | |
| elif str(value_type) == "<class 'zeep.objects.Date'>": | |
| return "\"" + datetime.date(*[ad_manager_value[key] for key in date_keys]).isoformat() + "\"" | |
| elif str(value_type) == "<class 'zeep.objects.DateTime'>": | |
| datetime_value = datetime.datetime( | |
| ad_manager_value['date']['year'], ad_manager_value['date']['month'], ad_manager_value['date']['day'], | |
| ad_manager_value['hour'], ad_manager_value['minute'], ad_manager_value['second'], tzinfo=pytz.timezone(ad_manager_value['timeZoneId']) | |
| ) | |
| utc_datetime_value = datetime_value.astimezone(pytz.timezone('UTC')) | |
| utc_datetime_value = datetime.datetime(utc_datetime_value.year, utc_datetime_value.month, utc_datetime_value.day, utc_datetime_value.hour, utc_datetime_value.minute, utc_datetime_value.second) | |
| return "\"" + utc_datetime_value.isoformat() + "\"" | |
| else: | |
| return str(ad_manager_value).replace("\"", "\\\"") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment