Created
March 7, 2023 13:16
-
-
Save manelio/89d3e02cec6f5132e06fb2b2f09ee7ec to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # -*- coding: utf-8 -*- | |
| import pyodbc | |
| import requests | |
| import logging | |
| import time, datetime, re | |
| import csv | |
# Minutes to wait between Google requests once the API limit has been hit.
BACKOFF_TIME = 10
# Return full Google results? If True, the full JSON response from Google is
# included in the output of each geocoding call.
RETURN_FULL_RESULTS = False
# Number of addresses to geocode per run (max. 2,500 per day).
maxDirecciones = 500

# SQL Server access details.
server = "SQL_SERVER_NAME"
database = "DB_ NAME"
user = "USERNAME"
password = "PASSWORD"

# Logger that sends everything from DEBUG upwards to a console handler.
logger = logging.getLogger("root")
logger.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(ch)

# fecha is "<year>-<month>" for the current date; the month is NOT zero-padded
# (e.g. "2023-3").
now = datetime.datetime.now()
y, m = now.year, now.month
fecha = "{}-{}".format(y, m)

# First day of the month that FOLLOWS the current one (December rolls over to
# January of the next year).
if m == 12:
    _next_month_start = datetime.date(y + 1, 1, 1)
else:
    _next_month_start = datetime.date(y, m + 1, 1)

# Both values are formatted as "YYYYMMDD".
# NOTE: despite its name, FirstDayMonth holds the first day of the NEXT month;
# LastDayMonth holds the last day of the current month.
FirstDayMonth = str(_next_month_start).replace("-", "")
LastDayMonth = str(_next_month_start - datetime.timedelta(days=1)).replace("-", "")

API_KEY = "XXXXXXXXXXXXXXXXXXXXXXX"  # new Google account
| # ------------------ FUNCTION DEFINITIONS ------------------------ | |
def print2csv(data, path='result.csv'):
    """
    Append one row to a CSV file.

    The file is opened in append mode so that successive calls accumulate rows;
    opening it with 'w' (as before) truncated the file on every call and only
    the last row survived.

    @param data: Iterable of values forming a single CSV row.
    @param path: Destination file. Defaults to 'result.csv' in the current directory.
    """
    # newline='' is what the csv module asks for so that it controls line
    # endings itself; the with-block closes the file even if writerow raises.
    with open(path, 'a', newline='', encoding='utf-8') as f:
        csv.writer(f).writerow(data)
def get_google_results(address, api_key=None, return_full_response=False, timeout=10):
    """
    Get geocode results from the Google Maps Geocoding API.

    Note that when Google returns several geocode results, this function
    reports the details of the FIRST result only.

    @param address: String address as accurate as possible. For example "18 Grafton Street, Dublin, Ireland".
    @param api_key: String API key from Google, if present. It is sent as the
                    "key" query parameter; when None, no key is sent.
    @param return_full_response: Boolean to indicate if you'd like the full response from Google
                    added to the output under "response". Useful for storing or parsing
                    additional location details later.
    @param timeout: Seconds to wait for Google before requests gives up and raises.
    @return: dict with the keys formatted_address, latitude, longitude, accuracy,
                    google_place_id, type, postcode (all None when there is no result),
                    plus input_string, number_of_results and status.
    @raise: Any exception raised by requests.get or by decoding the JSON body
                    propagates to the caller.
    """
    query = {"address": address}
    if api_key is not None:
        query["key"] = api_key

    # Let requests build the query string: interpolating the raw address into
    # the URL broke on characters such as '&', '#' or '+'.
    response = requests.get(
        "https://maps.googleapis.com/maps/api/geocode/json",
        params=query,
        timeout=timeout,
    )
    # Results are in JSON format - convert to dict using requests functionality.
    payload = response.json()
    # Tolerate payloads without a "results" list (treated as "no results").
    matches = payload.get('results') or []

    if not matches:
        output = {
            "formatted_address": None,
            "latitude": None,
            "longitude": None,
            "accuracy": None,
            "google_place_id": None,
            "type": None,
            "postcode": None,
        }
    else:
        answer = matches[0]
        # Missing sections degrade to None / "" instead of raising.
        geometry = answer.get('geometry') or {}
        location = geometry.get('location') or {}
        components = answer.get('address_components') or []
        output = {
            "formatted_address": answer.get('formatted_address'),
            "latitude": location.get('lat'),
            "longitude": location.get('lng'),
            "accuracy": geometry.get('location_type'),
            "google_place_id": answer.get("place_id"),
            "type": ",".join(answer.get('types') or []),
            "postcode": ",".join(x['long_name'] for x in components
                                 if 'postal_code' in (x.get('types') or [])),
        }

    # Append some other details:
    output['input_string'] = address
    output['number_of_results'] = len(matches)
    output['status'] = payload.get('status')
    if return_full_response is True:
        output['response'] = payload
    return output
| # --------------------------------------------------------------------- | |
start_time = time.time()

# Open the SQL Server connection using SQL authentication.
# Alternative kept for reference (Windows trusted connection):
#   'DRIVER={SQL Server};SERVER=%s;DATABASE=%s;Trusted_Connection=yes;' % (server, database)
print('Creating Connection to Server')
connection_string = 'DRIVER={SQL Server};SERVER=%s;DATABASE=%s;UID=%s;PWD=%s' % (server, database, user, password)
cnxn = pyodbc.connect(connection_string)
print('Connected to SQL Server in %s seconds' % (time.time() - start_time))

# Create a cursor from the connection.
cursor = cnxn.cursor()

# Select the addresses that have not been geocoded yet: at most TOP(?) rows
# (bound to maxDirecciones) whose FechaAlta is in the past, whose FechaBaja is
# in the future or NULL, whose DireccionCompleta is not NULL and does not start
# with ' ,' or 'indef', and whose UtmGeocoded is NULL or 0.
# This query reads [DeporForus].[dbo].[Integrantes_Test]. Two disabled variants
# read [DeporForus].[dbo].[Integrantes] instead: one filtered on
# FechaBaja >= DATEADD(YEAR, -2, GETDATE()) and excluded IdCentro '07' and '23';
# the other used (OX IS NULL OR UtmGeocoded = 0) as the pending condition.
pending_addresses_sql = "SELECT TOP(?) I.[IdCentro] ,I.[IdPersona] ,I.[IdAbono] , I.FechaAlta, I.FechaBaja ,[DireccionCompleta], [OX] ,[OY], UtmGeocoded FROM [DeporForus].[dbo].[Integrantes_Test] I LEFT JOIN [Mapas].[dbo].[Personas_OX_OY] PP ON I.IdCentro = PP.IdCentro AND I.IdPersona = PP.IdPersona AND I.IdAbono = PP.IdAbono LEFT JOIN [Mapas].[dbo].[Personas] P ON I.IdCentro = P.IdCentro AND I.IdPersona = P.IdPersona LEFT JOIN [DeporForus].[dbo].[Personas] F ON I.IdCentro = F.IdCentro AND I.[IdPersona] = F.[IdPersona] WHERE I.FechaAlta < GETDATE() AND (I.FechaBaja > GETDATE() OR I.FechaBaja IS NULL) AND [DireccionCompleta] IS NOT NULL and [DireccionCompleta] not like ' ,%' and [DireccionCompleta] not like 'indef%' AND (UtmGeocoded IS NULL OR UtmGeocoded =0)"
cursor.execute(pending_addresses_sql, maxDirecciones)
# Parallel lists: position i of each list describes the same fetched row.
addresses = []
idPersonas = []
idCentros = []
idAbonos = []

# First run of digits in a string; compiled once, outside the loop.
_FIRST_DIGITS = re.compile(r"\d+")

for row in cursor.fetchall():
    # Remove "noise" from the street-number part of the address: keep the
    # first comma-separated piece, the first number found in the second piece,
    # and the last two pieces.
    raw_address = row.DireccionCompleta
    pieces = raw_address.split(",")
    if len(pieces) < 2:
        # No comma at all: there is nothing to trim, so send the address as is
        # (indexing pieces[1] used to raise IndexError and abort the run).
        cleaned_address = raw_address.strip()
    else:
        street_number = _FIRST_DIGITS.search(pieces[1])
        if street_number is not None:
            cleaned_address = (pieces[0] + ", " + street_number.group() + "," + pieces[-2] + "," + pieces[-1]).strip()
        else:
            # The second piece holds no digits: leave the number out.
            cleaned_address = (pieces[0] + "," + pieces[-2] + "," + pieces[-1]).strip()
    addresses.append(cleaned_address)
    idPersonas.append(row.IdPersona)
    idCentros.append(row.IdCentro)
    idAbonos.append(row.IdAbono)
print('Data fetch after %s seconds' % (time.time() - start_time))
# ------------------------ PROCESSING LOOP -----------------------------
# Optional start-up check, currently disabled: geocode a known address to make
# sure the API key is valid and internet access works before starting.
# test_result = get_google_results("London, England", API_KEY, RETURN_FULL_RESULTS)
# if (test_result['status'] != 'OK') or (test_result['formatted_address'] != 'London, UK'):
#     logger.warning("There was an error when testing the Google Geocoder.")
#     raise ConnectionError('Problem with test results from Google Geocode - check your API key and internet connection.')

# Lists that hold the results. Position i of every results* list belongs to
# the same person. `results` itself is never filled by this script.
results = []
resultsID = []
resultsOX = []
resultsOY = []
resultsIdCentros = []
resultsIdAbonos = []
resultsUtmGeocoded = []

# Writing each result to result.csv through print2csv is currently disabled.

total_addresses = len(addresses)
# Go through each address in turn.
for processed, (address, idPersona, idCentro, idAbono) in enumerate(
        zip(addresses, idPersonas, idCentros, idAbonos), start=1):
    time.sleep(0.02)
    geocode_result = None
    # Keep asking until Google answers with something other than
    # OVER_QUERY_LIMIT, or the request itself fails.
    while geocode_result is None:
        try:
            attempt = get_google_results(address, API_KEY, return_full_response=RETURN_FULL_RESULTS)
        except Exception as e:
            logger.exception(e)
            logger.error("Major error with {}".format(address))
            logger.error("Skipping!")
            # Really skip: the old code fell through and either raised
            # NameError (first address) or stored the PREVIOUS address's
            # coordinates for this person.
            break
        if attempt['status'] == 'OVER_QUERY_LIMIT':
            # Over the API limit: back off for a while and try again later.
            logger.info("Hit Query Limit! Backing off for a bit.")
            time.sleep(BACKOFF_TIME * 60)  # seconds
            continue
        geocode_result = attempt

    if geocode_result is not None:
        # Results might be empty / non-OK - log this. The row is still stored
        # (with whatever coordinates came back, possibly None) and flagged as
        # processed, exactly as before.
        if geocode_result['status'] != 'OK':
            logger.warning("Error geocoding IdCentro = {} - IdPersona = {} ".format(idCentro, idPersona))
        resultsOX.append(geocode_result["longitude"])
        resultsOY.append(geocode_result["latitude"])
        resultsID.append(idPersona)
        resultsIdCentros.append(idCentro)
        resultsIdAbonos.append(idAbono)
        resultsUtmGeocoded.append(1)

    # Print status every 100 addresses. The old check used len(results), which
    # is always 0, so it logged on every single address.
    if processed % 100 == 0:
        logger.info("Completed {} of {} address".format(len(resultsID), total_addresses))

# All done
logger.info("Finished geocoding all addresses")
print('Finished geocoding after %s seconds' % (time.time() - start_time))
# Write the coordinates back to the database. Longitude (OX), latitude (OY) and
# the UtmGeocoded flag are set with a single UPDATE per person; an earlier
# version used two separate loops, one for OX and one for OY.
update_sql = "UPDATE Personas_OX_OY SET OX = ?, OY = ?, UtmGeocoded = ? WHERE IdPersona = ? AND IdCentro = ? AND IdAbono = ?"
# Each tuple is already in the order of the statement's placeholders.
update_params = zip(resultsOX, resultsOY, resultsUtmGeocoded, resultsID, resultsIdCentros, resultsIdAbonos)
for lon, lat, geocoded_flag, persona_id, centro_id, abono_id in update_params:
    cursor.execute(update_sql, lon, lat, geocoded_flag, persona_id, centro_id, abono_id)

# Call cnxn.commit() or your work will be lost!
cnxn.commit()
cnxn.close()
print('Updated DB after %s seconds' % (time.time() - start_time))
logger.info('DONE!')
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment