Skip to content

Instantly share code, notes, and snippets.

@nodarai
Forked from bersena911/import_script.py
Last active September 25, 2020 12:02
Show Gist options
  • Select an option

  • Save nodarai/8babe132f4a2a15262e3d32b7f93656b to your computer and use it in GitHub Desktop.

Select an option

Save nodarai/8babe132f4a2a15262e3d32b7f93656b to your computer and use it in GitHub Desktop.
import script
#!/usr/bin/env python3
import csv
import json
import locale
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Thread
from time import time
import pandas as pd
from crawler.spider_runner import SpiderRunner
from crawler.spiders.main_spider import Spider
from helpers.mapping import US_STATES_MAP
from services.encryption import Encryption
locale.setlocale(locale.LC_ALL, 'en_US.utf8')
# Module-level logger; handlers/level are expected to be configured by the
# entry point (see the __main__ block).
logger = logging.getLogger()

# Column order for the detailed results CSV.
# NOTE: must stay in sync with the key insertion order used in
# ImportLicenses.add_to_detailed_results — the writer emits dict values
# positionally, with no header/value matching.
_DETAILED_RESULTS_HEADERS = [
    "initial_index", "category", "subcategory", "name_input", "address_input", "city_input", "state_input",
    "zip_input", "phone_input", "matching_strength", "overall_status", "license_status", "insurance_status",
    "bond_status", "workers_comp_status", "lawsuits_status", "violations_status", "website", "license_number",
    "license_expiry", "license_suspend_date", "license_type", "license_SpecialtyCode1Desc",
    "license_SpecialtyCode2Desc", "PrimaryPrincipalName", "business_name", "phone_number", "ubi_number",
    # "location",
    "address_line1", "address_line2", "city", "state", "zip",
    "last_updated", "InsuranceCompany",
    "InsurancePolicyNo", "InsuranceAmt", "InsuranceEffectiveDate", "InsuranceExpirationDate", "InsuranceCancelDate",
    "InsuranceCompany_previous", "InsurancePolicyNo_previous", "InsuranceAmt_previous",
    "InsuranceExpirationDate_previous", "BondFirmName", "BondAccountID", "BondAmt", "BondReceivedByL&I",
    "BondEffectiveDate", "BondExpirationDate", "BondCancelDate", "BondImpaired", "BondImpairedDate",
    "BondFirmName_previous", "BondAccountID_previous", "BondAmt_previous", "BondCancelDate_previous",
    "WorkersComp_L&I_AccountID", "WorkersCompCertificate", "lawsuits_count_total",
    "lawsuits_count_trailing_12_months", "latest_lawsuit_complaint_date", "latest_lawsuit_status",
    "latest_lawsuit_cause_number", "latest_lawsuit_complaint_amount", "violations_count_total",
    "violations_count_trailing_12_months", "latest_violation_issue_date", "latest_violation_status",
    "latest_violation_infraction_number", "latest_violation_amount", "latest_violation_type",
    "latest_violation_description",
    "matching_strength_details", "contractors_details",
]

# Maps raw 'category' values from the input file to the license_category
# names the spiders expect.
types = {
    'Beauty': 'Professional Beauty',
    'homeservices': 'Home Services',
    'Home': 'Home Services'
}

# Ranking of match strengths — higher wins when picking a row's best match.
# 'FAILED' is a sentinel that outranks everything, so rows whose search
# errored surface as FAILED rather than being hidden behind a weak match.
match_map = {
    'FAILED': 9999999,
    'EXACT': 3,
    'STRONG': 2,
    'WEAK': 1,
    'NO MATCH': 0,
}

# Overall-status text looked up by (license_status_id - 1).  The trailing ''
# also absorbs a missing id: the default of 0 yields index -1, i.e. the
# empty string.
_LICENSE_ID_TEXT = [
    'Verified License',
    'Consider inspection',
    'Inactive License',
    '',
]
def run_search(idx, item):
    """Run a license search for one input row.

    Builds a search request from the row, selects the state spider, and
    runs it once per license number found in the row — the primary and
    secondary license columns may each hold comma-separated lists.  When
    the row carries no usable license number at all, a single search on
    the business details alone is performed.

    Args:
        idx: position of the row in the input file; echoed back so the
            caller can re-order results gathered from worker threads.
        item: mapping-like row (a pandas Series from ``df.iloc``) with at
            least business_name, category, address, city, zip, phone and
            state; lic_number_primary / lic_number_secondary are optional.

    Returns:
        Tuple ``(idx, results)`` where ``results`` is a list of spider
        result dicts, each merged with its decrypted detail payload plus a
        'contractors' entry (populated from the local DB for CA only).
    """
    logger.debug(f'Starting search for idx={idx}')
    search_request = dict()
    search_request['business_name'] = item['business_name']
    search_request['license_category'] = types[item['category']]
    search_request['address'] = item['address']
    search_request['city'] = item['city']
    # Drop a trailing '.0' left over when the zip column was parsed as float.
    search_request['zip_code'] = str(item['zip']).split('.')[0]
    # Skips the first character of the phone value — presumably a leading
    # country-code digit or '+'; TODO confirm against the input format.
    search_request['phone_number'] = str(item['phone'])[1:]
    spider_name = US_STATES_MAP[item['state']]
    contractors = None
    # California only: look the contractor up in the local DB first so a
    # stored license number can seed the spider search.
    if item['state'] == 'CA':
        contractors = SpiderRunner(spider_name, **search_request).spider(**search_request).get_contractors_from_db()
        if contractors:
            logger.debug('Found a contractor in DB.')
            search_request.update(contractors[0])
            if search_request.get('license_number'):
                search_request['license'] = search_request['license_number']
    results = []
    lic_numbers_set = set()
    # Coerce numeric license values to a plain int string (e.g. 12345.0 ->
    # '12345'); anything int() rejects is kept as its str() form.
    try:
        item_lic_number_primary = str(int(item.get('lic_number_primary', '')))
    except ValueError:
        item_lic_number_primary = str(item.get('lic_number_primary', ''))
    try:
        item_lic_number_secondary = str(int(item.get('lic_number_secondary', '')))
    except ValueError:
        item_lic_number_secondary = str(item.get('lic_number_secondary', ''))
    # 'na' (primary) and '-' (secondary) act as empty-value markers: a value
    # that is blank after stripping them counts as "no license provided".
    if item_lic_number_primary.replace('na', '') or item_lic_number_secondary.replace('-', ''):
        for lic_number_primary in item_lic_number_primary.split(','):
            lic_number_primary = str(lic_number_primary).strip()
            if not lic_number_primary.replace('na', ''):
                # Placeholder value — nothing to search for.
                continue
            lic_number = lic_number_primary.replace('na', '').strip()
            lic_numbers_set.add(lic_number)
            search_request['license_number'] = lic_number
            search_request['license'] = lic_number
            spider_runner = SpiderRunner(spider_name, **search_request)
            spider_results = spider_runner.get_results()
            for i, spider_result in enumerate(spider_results):
                # Each hit carries an encrypted 'viewstate' holding the JSON
                # request needed to fetch its detail page.
                request = Encryption(spider_result['viewstate']).decrypt()
                request = json.loads(request)
                request['license'] = search_request['license_number']
                spider_details = spider_runner.get_details(request)
                spider_results[i].update(spider_details)
                spider_results[i]['contractors'] = contractors
            results += spider_results
        for lic_number_secondary in item_lic_number_secondary.split(','):
            lic_number = lic_number_secondary.replace('-', '')
            # Skip blanks and numbers already searched via the primary loop.
            if not lic_number.strip() or lic_number.strip() in lic_numbers_set:
                continue
            search_request['license_number'] = lic_number_secondary.replace('-', '').strip()
            search_request['license'] = search_request['license_number']
            # Flag the request as a secondary-license search.
            search_request['secondary'] = True
            spider_results = SpiderRunner(spider_name, **search_request).get_results()
            for i, spider_result in enumerate(spider_results):
                request = Encryption(spider_result['viewstate']).decrypt()
                request = json.loads(request)
                request['license'] = search_request['license_number']
                spider_runner = SpiderRunner(spider_name, **search_request)
                spider_details = spider_runner.get_details(request)
                spider_results[i].update(spider_details)
                spider_results[i]['contractors'] = contractors
            results += spider_results
    else:
        # No license number at all: search on the business details only.
        logger.debug('Primary and secondary licenses are not provided.')
        spider_results = SpiderRunner(spider_name, **search_request).get_results()
        for i, spider_result in enumerate(spider_results):
            request = Encryption(spider_result['viewstate']).decrypt()
            request = json.loads(request)
            request['license'] = request.get('license_number', '')
            spider_runner = SpiderRunner(spider_name, **search_request)
            spider_details = spider_runner.get_details(request)
            spider_results[i].update(spider_details)
            spider_results[i]['contractors'] = contractors
        results += spider_results
    return idx, results
def safe_run_search(*args, **kwargs):
    """Exception-safe wrapper around run_search for use in worker threads.

    On success returns run_search's (idx, results) tuple unchanged.  On any
    exception, logs the traceback and returns a single placeholder result
    whose match strength is 'FAILED' so the failure surfaces in the report
    instead of killing the thread pool.
    """
    try:
        return run_search(*args, **kwargs)
    except Exception:
        if args:
            idx = args[0]
        else:
            idx = kwargs.get('idx')
        logger.exception(f"Search failed for {idx}")
        placeholder = {'license_status': '', 'match_details': {'strength': 'FAILED'}}
        return idx, [placeholder]
class ImportLicenses:
    """Bulk license-search driver.

    Reads an input CSV of businesses, runs one license search per row (in a
    thread pool), and writes two reports next to the working directory:

    * ``result-<timestamp>.csv`` — one row per input row, echoing the input
      columns plus the result count and the best match strength.
    * ``detailed_result-<timestamp>.csv`` — one row per spider result, with
      the columns listed in ``_DETAILED_RESULTS_HEADERS``.
    """

    def __init__(self, path):
        """Args:
            path: path to the input file; only ``.csv`` is supported.
        """
        self.path = path
        # Rows accumulated for the detailed report.  read_file() resets it
        # per run; initialised here so the attribute always exists even if
        # add_to_detailed_results() is called first (the original raised
        # AttributeError in that case).
        self.detailed_results = []

    def add_to_detailed_results(self, idx, search, found_result=None):
        """Append one row to the detailed report.

        Args:
            idx: initial row index in the input file.
            search: the input row as a plain list (positional: category,
                subcategory, name, ?, address, city, state, zip, phone, ...).
            found_result: one decoded spider result dict, or None when the
                row produced no results (emits a mostly-blank row).

        NOTE: the key insertion order below is load-bearing — it must stay
        in sync with _DETAILED_RESULTS_HEADERS, because the CSV writer
        emits ``dict.values()`` positionally.
        """
        found_result = found_result if found_result else {}
        # Nested sections may be absent or None; normalise to empty dicts.
        insurance = found_result.get('insurance_information') or {}
        insurance_prev = insurance.get('previous') or {}
        bonding_information = found_result.get('bonding_information') or {}
        bonding_information_prev = bonding_information.get('previous') or {}
        workers_compensation = found_result.get('workers_compensation') or {}
        lawsuit_information = found_result.get('lawsuit_information') or {}
        violation_information = found_result.get('violation_information') or {}
        result = {}
        result["initial_index"] = idx
        result["category"] = search[0]
        result["subcategory"] = search[1]
        result["name_input"] = search[2]
        result["address_input"] = search[4]
        result["city_input"] = search[5]
        result["state_input"] = search[6]
        result["zip_input"] = search[7]
        result["phone_input"] = search[8]
        result["matching_strength"] = found_result.get('match_details', {}).get("strength", '')
        # Default id 0 yields index -1 — the trailing '' in _LICENSE_ID_TEXT.
        result["overall_status"] = _LICENSE_ID_TEXT[found_result.get('license_status_id', 0) - 1]
        result["license_status"] = found_result.get('license_status', '')
        result["insurance_status"] = insurance.get('insurance_status', '')
        result["bond_status"] = bonding_information.get('bonding_status', '')
        result["workers_comp_status"] = workers_compensation.get('workers_status', '')
        result["lawsuits_status"] = lawsuit_information.get('complaint_status', '')
        result["violations_status"] = violation_information.get('violation_status', '')
        result["website"] = found_result.get('url', '')
        result["license_number"] = found_result.get('license_number', '')
        result["license_expiry"] = found_result.get('license_expiry', '')
        result["license_suspend_date"] = found_result.get('suspension_date', '')
        result["license_type"] = found_result.get('license_type', '')
        result["license_SpecialtyCode1Desc"] = found_result.get('specialty_code_1_desc', '')
        result["license_SpecialtyCode2Desc"] = found_result.get('specialty_code_2_desc', '')
        result["PrimaryPrincipalName"] = found_result.get('primary_principal_name', '')
        result["business_name"] = found_result.get('name', '')
        result["phone_number"] = found_result.get('phone_number', '')
        result["ubi_number"] = found_result.get('ubi_number', '')
        # result["location"] = found_result.get('location', '')
        result["address_line1"] = found_result.get('street', '').title()
        # NOTE(review): lookup key is '' — address_line2 is effectively
        # always blank; confirm the intended source key.
        result["address_line2"] = found_result.get('', '').title()
        result["city"] = found_result.get('city', '').title()
        result["state"] = found_result.get('state_abb', '').upper()
        result["zip"] = found_result.get('zip_code', '')
        result["last_updated"] = found_result.get('last_updated', '')
        result["InsuranceCompany"] = insurance.get('company_name', '')
        result["InsurancePolicyNo"] = insurance.get('policy_number', '')
        result["InsuranceAmt"] = insurance.get('insurance_amount', '')
        result["InsuranceEffectiveDate"] = insurance.get('effective_date', '')
        result["InsuranceExpirationDate"] = insurance.get('expiry_date', '')
        result["InsuranceCancelDate"] = insurance.get('cancellation_date', '')
        result["InsuranceCompany_previous"] = insurance_prev.get('company_name', '')
        result["InsurancePolicyNo_previous"] = insurance_prev.get('policy_number', '')
        result["InsuranceAmt_previous"] = insurance_prev.get('insurance_amount', '')
        result["InsuranceExpirationDate_previous"] = insurance_prev.get('expiry_date', '')
        result["BondFirmName"] = bonding_information.get('company_name', '')
        result["BondAccountID"] = bonding_information.get('bond_number', '')
        result["BondAmt"] = bonding_information.get('bond_amount', '')
        result["BondReceivedByL&I"] = bonding_information.get('received_by_li', '')
        result["BondEffectiveDate"] = bonding_information.get('effective_date', '')
        result["BondExpirationDate"] = bonding_information.get('expiry_date', '')
        result["BondCancelDate"] = bonding_information.get('cancellation_date', '')
        result["BondImpaired"] = bonding_information.get('impaired', '')
        result["BondImpairedDate"] = bonding_information.get('impaired_date', '')
        result["BondFirmName_previous"] = bonding_information_prev.get('company_name', '')
        result["BondAccountID_previous"] = bonding_information_prev.get('bond_number', '')
        result["BondAmt_previous"] = bonding_information_prev.get('bond_amount', '')
        result["BondCancelDate_previous"] = bonding_information_prev.get('cancellation_date', '')
        result["WorkersComp_L&I_AccountID"] = workers_compensation.get('li_account', '')
        result["WorkersCompCertificate"] = workers_compensation.get('certificate', '')
        result["lawsuits_count_total"] = lawsuit_information.get('count', '')
        result["lawsuits_count_trailing_12_months"] = lawsuit_information.get('count_12_month', '')
        result["latest_lawsuit_complaint_date"] = lawsuit_information.get('complaint_date', '')
        result["latest_lawsuit_status"] = lawsuit_information.get('complaint_status', '')
        result["latest_lawsuit_cause_number"] = lawsuit_information.get('cause_number', '')
        result["latest_lawsuit_complaint_amount"] = lawsuit_information.get('complaint_amount', '')
        result["violations_count_total"] = violation_information.get('count', '')
        result["violations_count_trailing_12_months"] = violation_information.get('count_12_month', '')
        result["latest_violation_issue_date"] = violation_information.get('issue_date', '')
        result["latest_violation_status"] = violation_information.get('violation_status', '')
        result["latest_violation_infraction_number"] = violation_information.get('infraction_id', '')
        result["latest_violation_amount"] = violation_information.get('violation_amount', '')
        result["latest_violation_type"] = violation_information.get('violation_type', '')
        result["latest_violation_description"] = violation_information.get('violation_description', '')
        result["matching_strength_details"] = found_result.get('match_details', {})
        result["contractors_details"] = found_result.get('contractors', {})
        self.detailed_results.append(result)

    def read_file(self):
        """Read the input CSV, run all searches, and write both reports.

        Returns early (writing nothing) when the input path is not a .csv.
        """
        if self.path.endswith('.csv'):
            # dtype=object keeps zip/license columns as strings; blank out
            # NaNs so downstream string handling is uniform.
            df = pd.read_csv(self.path, dtype=object)
            df.fillna('', inplace=True)
        else:
            return
        timestamp = round(time())
        # newline='' is required by the csv module (avoids blank lines on
        # Windows); the context manager also fixes the original's leaked
        # file handle.
        with open(f'result-{timestamp}.csv', 'w', newline='') as summary_file:
            writer2 = csv.writer(summary_file)
            headers = list(df.columns.values)
            writer2.writerow(['initial_index'] + headers + ['results_count', 'best_match'])
            found_results = {}
            # Fan the per-row searches out over 10 worker threads; collect
            # results keyed by row index since completion order is arbitrary.
            with ThreadPoolExecutor(10) as executor:
                threads = [executor.submit(safe_run_search, idx, item) for idx, item in enumerate(df.iloc)]
                for thread in as_completed(threads):
                    res_idx, res = thread.result()
                    found_results[res_idx] = res
            # To run in single thread use the below lines
            # for idx, item in enumerate(df.iloc):
            #     found_results[idx] = run_search(idx, item)[1]
            logger.debug('All threads finished running.')
            self.detailed_results = []
            for idx, results in sorted(found_results.items(), key=lambda x: x[0]):
                search = list(df.iloc[idx])
                best_result = 'NO MATCH'
                if results:
                    for result in results:
                        if 'license_status' not in result:
                            # Raw spider hit: decrypt its viewstate payload.
                            viewstate = result['viewstate']
                            decoded_result = Encryption(viewstate).decrypt()
                            decoded_details = json.loads(decoded_result)
                        else:
                            decoded_details = result
                        if 'match_details' not in decoded_details:
                            # Compute the matching strength when missing.
                            decoded_details = Spider().pipeline(decoded_details)
                        strength = decoded_details['match_details']['strength']
                        self.add_to_detailed_results(idx, search, decoded_details)
                        # 'FAILED' outranks every real strength, so errored
                        # rows surface in the summary.
                        if match_map[strength] > match_map[best_result]:
                            best_result = strength
                else:
                    # No results: still emit a (mostly blank) detailed row.
                    self.add_to_detailed_results(idx, search)
                writer2.writerow([idx] + search + [len(results), best_result])
        logger.debug('Finished writing results summary.')
        if self.detailed_results:
            with open(f'detailed_result-{timestamp}.csv', 'w', newline='') as detail_file:
                writer1 = csv.writer(detail_file)
                writer1.writerow(_DETAILED_RESULTS_HEADERS)
                for row in self.detailed_results:
                    writer1.writerow(list(row.values()))
        logger.debug('Finished writing detailed summary.')
if __name__ == '__main__':
    import argparse

    # Configure a handler/level for the root logger; without this the
    # module's logger.debug/info calls (including the timing line below)
    # are never emitted.
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(description='Run searches in bulk.')
    parser.add_argument('input_file', type=str, help='input file path')
    args = parser.parse_args()

    t = time()
    ImportLicenses(args.input_file).read_file()
    logger.info(f'Time elapsed: {time() - t}')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment