Skip to content

Instantly share code, notes, and snippets.

@manelio
Created March 7, 2023 13:16
Show Gist options
  • Select an option

  • Save manelio/89d3e02cec6f5132e06fb2b2f09ee7ec to your computer and use it in GitHub Desktop.

Select an option

Save manelio/89d3e02cec6f5132e06fb2b2f09ee7ec to your computer and use it in GitHub Desktop.
# -*- coding: utf-8 -*-
import pyodbc
import requests
import logging
import time, datetime, re
import csv
# Backoff time sets how many minutes to wait between google pings when your API limit is hit
BACKOFF_TIME = 10
# Return Full Google Results? If True, full JSON results from Gooñgle are included in output
RETURN_FULL_RESULTS = False
# Nº Direcciones to Geocoding (max. 2.500 per day)
maxDirecciones = 500
# Datos acceso SQL Sever
server = "SQL_SERVER_NAME"
database = "DB_ NAME"
user = "USERNAME"
password = "PASSWORD"
logger = logging.getLogger("root")
logger.setLevel(logging.DEBUG)
# create console handler
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
logger.addHandler(ch)
# fecha = ("Fecha: aaaa-mm")
now = datetime.datetime.now()
fecha = (str(now.year) + "-" + str(now.month))
y = now.year
m = now.month
if m == 12:
FirstDayMonth = str(datetime.date(y, m, 1).replace(day=1, month=1, year=datetime.date(y, 1, 1).year + 1)).replace(
"-", "")
else:
FirstDayMonth = str(datetime.date(y, m, 1).replace(day=1, month=datetime.date(y, m, 1).month + 1)).replace("-", "")
if m == 12:
LastDayMonth = str(
datetime.date(y, m, 1).replace(day=1, month=1, year=datetime.date(y, 1, 1).year + 1) - datetime.timedelta(
days=1)).replace("-", "")
else:
LastDayMonth = str(
datetime.date(y, m, 1).replace(day=1, month=datetime.date(y, m, 1).month + 1) - datetime.timedelta(
days=1)).replace("-", "")
API_KEY = "XXXXXXXXXXXXXXXXXXXXXXX" # cuenta nueva google
# ------------------ FUNCTION DEFINITIONS ------------------------
def print2csv(data):
f = open('result.csv', 'w')
writer = csv.writer(f)
writer.writerow(data)
f.close()
def get_google_results(address, api_key=None, return_full_response=False):
"""
Get geocode results from Google Maps Geocoding API.
Note, that in the case of multiple google geocode reuslts, this function returns details of the FIRST result.
@param address: String address as accurate as possible. For Example "18 Grafton Street, Dublin, Ireland"
@param api_key: String API key if present from google.
If supplied, requests will use your allowance from the Google API. If not, you
will be limited to the free usage of 2500 requests per day.
@param return_full_response: Boolean to indicate if you'd like to return the full response from google. This
is useful if you'd like additional location details for storage or parsing later.
"""
# Set up your Geocoding url
geocode_url = "https://maps.googleapis.com/maps/api/geocode/json?address=%s" % (address)
if api_key is not None:
geocode_url = geocode_url + "&key={}".format(api_key)
# Ping google for the reuslts:
results = requests.get(geocode_url)
# Results will be in JSON format - convert to dict using requests functionality
results = results.json()
# if there's no results or an error, return empty results.
if len(results['results']) == 0:
output = {
"formatted_address": None,
"latitude": None,
"longitude": None,
"accuracy": None,
"google_place_id": None,
"type": None,
"postcode": None
}
else:
answer = results['results'][0]
output = {
"formatted_address": answer.get('formatted_address'),
"latitude": answer.get('geometry').get('location').get('lat'),
"longitude": answer.get('geometry').get('location').get('lng'),
"accuracy": answer.get('geometry').get('location_type'),
"google_place_id": answer.get("place_id"),
"type": ",".join(answer.get('types')),
"postcode": ",".join([x['long_name'] for x in answer.get('address_components')
if 'postal_code' in x.get('types')])
}
# Append some other details:
output['input_string'] = address
output['number_of_results'] = len(results['results'])
output['status'] = results.get('status')
if return_full_response is True:
output['response'] = results
return output
# ---------------------------------------------------------------------
start_time = time.time()
# Crear conexion a SQL Server
# cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=%s;DATABASE=%s;UID=%s;PWD=%s' %(server, database, user, password))
# Crear conexion a SQL Server
print('Creating Connection to Server')
cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=%s;DATABASE=%s;UID=%s;PWD=%s' % (server, database, user, password))
# cnxn = pyodbc.connect('DRIVER={SQL Server};SERVER=%s;DATABASE=%s;Trusted_Connection=yes;' %(server, database))
print('Connected to SQL Server in %s seconds' % (time.time() - start_time))
# Create a cursor from the connection
cursor = cnxn.cursor()
# Seleccionar Direcciones sin Geocodificar
# cursor.execute("SELECT TOP(?) I.[IdCentro] ,I.[IdPersona] ,I.[IdAbono] , I.FechaAlta, I.FechaBaja ,[DireccionCompleta], [OX] ,[OY], UtmGeocoded FROM [DeporForus].[dbo].[Integrantes] I LEFT JOIN [Mapas].[dbo].[Personas_OX_OY] PP ON I.IdCentro = PP.IdCentro AND I.IdPersona = PP.IdPersona AND I.IdAbono = PP.IdAbono LEFT JOIN [Mapas].[dbo].[Personas] P ON I.IdCentro = P.IdCentro AND I.IdPersona = P.IdPersona LEFT JOIN [DeporForus].[dbo].[Personas] F ON I.IdCentro = F.IdCentro AND I.[IdPersona] = F.[IdPersona] WHERE i.FechaBaja >= DATEADD(YEAR, -2, GETDATE()) AND [DireccionCompleta] IS NOT NULL AND (UtmGeocoded IS NULL OR UtmGeocoded =0) AND I.IdCentro <> '07' AND I.IdCentro <> '23'", maxDirecciones)
cursor.execute(
"SELECT TOP(?) I.[IdCentro] ,I.[IdPersona] ,I.[IdAbono] , I.FechaAlta, I.FechaBaja ,[DireccionCompleta], [OX] ,[OY], UtmGeocoded FROM [DeporForus].[dbo].[Integrantes_Test] I LEFT JOIN [Mapas].[dbo].[Personas_OX_OY] PP ON I.IdCentro = PP.IdCentro AND I.IdPersona = PP.IdPersona AND I.IdAbono = PP.IdAbono LEFT JOIN [Mapas].[dbo].[Personas] P ON I.IdCentro = P.IdCentro AND I.IdPersona = P.IdPersona LEFT JOIN [DeporForus].[dbo].[Personas] F ON I.IdCentro = F.IdCentro AND I.[IdPersona] = F.[IdPersona] WHERE I.FechaAlta < GETDATE() AND (I.FechaBaja > GETDATE() OR I.FechaBaja IS NULL) AND [DireccionCompleta] IS NOT NULL and [DireccionCompleta] not like ' ,%' and [DireccionCompleta] not like 'indef%' AND (UtmGeocoded IS NULL OR UtmGeocoded =0)",
maxDirecciones)
# cursor.execute("SELECT TOP(?) I.[IdCentro] ,I.[IdPersona] ,I.[IdAbono] , I.FechaAlta, I.FechaBaja ,[DireccionCompleta], [OX] ,[OY], UtmGeocoded FROM [DeporForus].[dbo].[Integrantes] I LEFT JOIN [Mapas].[dbo].[Personas_OX_OY] PP ON I.IdCentro = PP.IdCentro AND I.IdPersona = PP.IdPersona AND I.IdAbono = PP.IdAbono LEFT JOIN [Mapas].[dbo].[Personas] P ON I.IdCentro = P.IdCentro AND I.IdPersona = P.IdPersona LEFT JOIN [DeporForus].[dbo].[Personas] F ON I.IdCentro = F.IdCentro AND I.[IdPersona] = F.[IdPersona] WHERE I.FechaAlta < GETDATE() AND (I.FechaBaja > GETDATE() OR I.FechaBaja IS NULL) AND [DireccionCompleta] IS NOT NULL AND (OX IS NULL OR UtmGeocoded =0)", maxDirecciones)
addresses = []
idPersonas = []
idCentros = []
idAbonos = []
for row in cursor.fetchall():
# Para eliminar "ruido" del nº de la dirección
# print(row.IdPersona, row.DireccionCompleta)
comaSplit = re.split(r"[,]", row.DireccionCompleta) # Separa string por ,
digitSplit = re.split('(\d+)', comaSplit[1]) # Separa por digitos 0,1, ..., 9
# addresJoin = (comaSplit[0]+", "+ digitSplit[1]+","+comaSplit[-2]+","+comaSplit[-1]).strip()
try:
addressesesJoin = (comaSplit[0] + ", " + digitSplit[1] + "," + comaSplit[-2] + "," + comaSplit[-1]).strip()
# addresJoin = (comaSplit[0]+", "+ digitSplit[1]+","+comaSplit[2]+","+comaSplit[3]).strip()
except Exception:
addressesesJoin = (comaSplit[0] + "," + comaSplit[-2] + "," + comaSplit[-1]).strip()
addresses.append(addressesesJoin)
idPersonas.append(row.IdPersona)
idCentros.append(row.IdCentro)
idAbonos.append(row.IdAbono)
print('Data fetch after %s seconds' % (time.time() - start_time))
# ------------------------ PROCESSING LOOP -----------------------------
'''
# Ensure, before we start, that the API key is ok/valid, and internet access is ok
test_result = get_google_results("London, England", API_KEY, RETURN_FULL_RESULTS)
if (test_result['status'] != 'OK') or (test_result['formatted_address'] != 'London, UK'):
logger.warning("There was an error when testing the Google Geocoder.")
raise ConnectionError('Problem with test results from Google Geocode - check your API key and internet connection.')
'''
# Create a list to hold results
results = []
resultsID = []
resultsOX = []
resultsOY = []
resultsIdCentros = []
resultsIdAbonos = []
resultsUtmGeocoded = []
# header = ['Centro', 'Persona', 'Lon', 'Lat', 'Google_Result']
# f = open('result.csv', 'w')
# writer = csv.writer(f)
# writer.writerow(header)
# f.close()
# Go through each address in turn
for address, idPersona, idCentro, idAbono in zip(addresses, idPersonas, idCentros, idAbonos):
# While the address geocoding is not finished:
geocoded = False
time.sleep(0.02)
while geocoded is not True:
# Geocode the address with google
try:
geocode_result = get_google_results(address, API_KEY, return_full_response=RETURN_FULL_RESULTS)
except Exception as e:
logger.exception(e)
logger.error("Major error with {}".format(address))
logger.error("Skipping!")
geocoded = True
# If we're over the API limit, backoff for a while and try again later.
if geocode_result['status'] == 'OVER_QUERY_LIMIT':
logger.info("Hit Query Limit! Backing off for a bit.")
time.sleep(BACKOFF_TIME * 60) # en seg.
geocoded = False
else:
# If we're ok with API use, save the results
# Note that the results might be empty / non-ok - log this
if geocode_result['status'] != 'OK':
logger.warning("Error geocoding IdCentro = {} - IdPersona = {} ".format(idCentro, idPersona))
resultsOX.append(geocode_result["longitude"])
resultsOY.append(geocode_result["latitude"])
resultsID.append(idPersona)
resultsIdCentros.append(idCentro)
resultsIdAbonos.append(idAbono)
resultsUtmGeocoded.append(1)
# si falla, lo registramos en la excel
# print2csv([idCentro, idPersona, geocode_result["longitude"], geocode_result["latitude"], geocode_result['status']])
geocoded = True
else:
resultsOX.append(geocode_result["longitude"])
resultsOY.append(geocode_result["latitude"])
resultsID.append(idPersona)
resultsIdCentros.append(idCentro)
resultsIdAbonos.append(idAbono)
resultsUtmGeocoded.append(1)
geocoded = True
# Print status every 100 addresses
if len(results) % 100 == 0:
logger.info("Completed {} of {} address".format(len(resultsID), len(addresses)))
## Every 500 addresses, save progress to file(in case of a failure so you have something!)
# if len(results) % 500 == 0:
# All done
logger.info("Finished geocoding all addresses")
print('Finished geocoding after %s seconds' % (time.time() - start_time))
## Update DB longitude
# for id, abono, centro, long, utm in zip(resultsID, resultsIdAbonos, resultsIdCentros, resultsOX, resultsUtmGeocoded):
# cursor.execute("UPDATE Personas_OX_OY SET OX = ?, UtmGeocoded = ? WHERE IdPersona = ? AND IdCentro = ? AND IdAbono = ?", long, utm, id, centro, abono)
## Update DB latitude
# for id, abono, centro, lat, utm in zip(resultsID, resultsIdAbonos, resultsIdCentros, resultsOY, resultsUtmGeocoded):
# cursor.execute("UPDATE Personas_OX_OY SET OY = ?, UtmGeocoded = ? WHERE IdPersona = ? AND IdCentro = ? AND IdAbono = ?", lat, utm, id, centro, abono)
# unificamos la consulta en una, no hacen falta 2 loops
for id, abono, centro, lat, long, utm in zip(resultsID, resultsIdAbonos, resultsIdCentros, resultsOY, resultsOX,
resultsUtmGeocoded):
cursor.execute(
"UPDATE Personas_OX_OY SET OX = ?, OY = ?, UtmGeocoded = ? WHERE IdPersona = ? AND IdCentro = ? AND IdAbono = ?",
long, lat, utm, id, centro, abono)
## Call cnxn.commit() or your work will be lost!
cnxn.commit()
cnxn.close()
print('Updated DB after %s seconds' % (time.time() - start_time))
logger.info('DONE!')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment