import script

#!/usr/bin/env python3
import csv
import json
import locale
import logging
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import time

import pandas as pd

from crawler.spider_runner import SpiderRunner
from crawler.spiders.main_spider import Spider
from helpers.mapping import US_STATES_MAP
from services.encryption import Encryption

locale.setlocale(locale.LC_ALL, 'en_US.utf8')
logger = logging.getLogger()
_DETAILED_RESULTS_HEADERS = [
    "initial_index", "category", "subcategory", "name_input", "address_input", "city_input", "state_input",
    "zip_input", "phone_input", "matching_strength", "overall_status", "license_status", "insurance_status",
    "bond_status", "workers_comp_status", "lawsuits_status", "violations_status", "website", "license_number",
    "license_expiry", "license_suspend_date", "license_type", "license_SpecialtyCode1Desc",
    "license_SpecialtyCode2Desc", "PrimaryPrincipalName", "business_name", "phone_number", "ubi_number",
    # "location",
    "address_line1", "address_line2", "city", "state", "zip",
    "last_updated", "InsuranceCompany",
    "InsurancePolicyNo", "InsuranceAmt", "InsuranceEffectiveDate", "InsuranceExpirationDate", "InsuranceCancelDate",
    "InsuranceCompany_previous", "InsurancePolicyNo_previous", "InsuranceAmt_previous",
    "InsuranceExpirationDate_previous", "BondFirmName", "BondAccountID", "BondAmt", "BondReceivedByL&I",
    "BondEffectiveDate", "BondExpirationDate", "BondCancelDate", "BondImpaired", "BondImpairedDate",
    "BondFirmName_previous", "BondAccountID_previous", "BondAmt_previous", "BondCancelDate_previous",
    "WorkersComp_L&I_AccountID", "WorkersCompCertificate", "lawsuits_count_total",
    "lawsuits_count_trailing_12_months", "latest_lawsuit_complaint_date", "latest_lawsuit_status",
    "latest_lawsuit_cause_number", "latest_lawsuit_complaint_amount", "violations_count_total",
    "violations_count_trailing_12_months", "latest_violation_issue_date", "latest_violation_status",
    "latest_violation_infraction_number", "latest_violation_amount", "latest_violation_type",
    "latest_violation_description",
    "matching_strength_details", "contractors_details",
]
# Input category values mapped to the license category names the spiders expect.
types = {
    'Beauty': 'Professional Beauty',
    'homeservices': 'Home Services',
    'Home': 'Home Services',
}
# Ranking of match strengths; FAILED is ranked highest so a failed search
# always surfaces in the summary's best_match column.
match_map = {
    'FAILED': 9999999,
    'EXACT': 3,
    'STRONG': 2,
    'WEAK': 1,
    'NO MATCH': 0,
}
# Overall status labels; rows index this with license_status_id - 1
# (a missing id gives index -1, i.e. the trailing empty string).
_LICENSE_ID_TEXT = [
    'Verified License',
    'Consider inspection',
    'Inactive License',
    '',
]
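

# Builds a search request from one input row and queries the state spider for
# each primary/secondary license number on the row, or runs a single
# name/address search when no license number is provided. Returns (idx, results).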
def run_search(idx, item):
    logger.debug(f'Starting search for idx={idx}')
    search_request = dict()
    search_request['business_name'] = item['business_name']
    search_request['license_category'] = types[item['category']]
    search_request['address'] = item['address']
    search_request['city'] = item['city']
    search_request['zip_code'] = str(item['zip']).split('.')[0]
    search_request['phone_number'] = str(item['phone'])[1:]
    spider_name = US_STATES_MAP[item['state']]
    contractors = None
    if item['state'] == 'CA':
        # For California, try to enrich the request with contractors already stored in the DB.
        contractors = SpiderRunner(spider_name, **search_request).spider(**search_request).get_contractors_from_db()
        if contractors:
            logger.debug('Found a contractor in DB.')
            search_request.update(contractors[0])
            if search_request.get('license_number'):
                search_request['license'] = search_request['license_number']
    results = []
    lic_numbers_set = set()
    try:
        item_lic_number_primary = str(int(item.get('lic_number_primary', '')))
    except ValueError:
        item_lic_number_primary = str(item.get('lic_number_primary', ''))
    try:
        item_lic_number_secondary = str(int(item.get('lic_number_secondary', '')))
    except ValueError:
        item_lic_number_secondary = str(item.get('lic_number_secondary', ''))
    if item_lic_number_primary.replace('na', '') or item_lic_number_secondary.replace('-', ''):
        # Search once per primary license number (comma-separated; 'na' is treated as empty).
        for lic_number_primary in item_lic_number_primary.split(','):
            lic_number_primary = str(lic_number_primary).strip()
            if not lic_number_primary.replace('na', ''):
                continue
            lic_number = lic_number_primary.replace('na', '').strip()
            lic_numbers_set.add(lic_number)
            search_request['license_number'] = lic_number
            search_request['license'] = lic_number
            spider_runner = SpiderRunner(spider_name, **search_request)
            spider_results = spider_runner.get_results()
            for i, spider_result in enumerate(spider_results):
                request = Encryption(spider_result['viewstate']).decrypt()
                request = json.loads(request)
                request['license'] = search_request['license_number']
                spider_details = spider_runner.get_details(request)
                spider_results[i].update(spider_details)
                spider_results[i]['contractors'] = contractors
            results += spider_results
        # Then search any secondary license numbers not already covered above.
        for lic_number_secondary in item_lic_number_secondary.split(','):
            lic_number = lic_number_secondary.replace('-', '')
            if not lic_number.strip() or lic_number.strip() in lic_numbers_set:
                continue
            search_request['license_number'] = lic_number_secondary.replace('-', '').strip()
            search_request['license'] = search_request['license_number']
            search_request['secondary'] = True
            spider_results = SpiderRunner(spider_name, **search_request).get_results()
            for i, spider_result in enumerate(spider_results):
                request = Encryption(spider_result['viewstate']).decrypt()
                request = json.loads(request)
                request['license'] = search_request['license_number']
                spider_runner = SpiderRunner(spider_name, **search_request)
                spider_details = spider_runner.get_details(request)
                spider_results[i].update(spider_details)
                spider_results[i]['contractors'] = contractors
            results += spider_results
    else:
        logger.debug('Primary and secondary licenses are not provided.')
        spider_results = SpiderRunner(spider_name, **search_request).get_results()
        for i, spider_result in enumerate(spider_results):
            request = Encryption(spider_result['viewstate']).decrypt()
            request = json.loads(request)
            request['license'] = request.get('license_number', '')
            spider_runner = SpiderRunner(spider_name, **search_request)
            spider_details = spider_runner.get_details(request)
            spider_results[i].update(spider_details)
            spider_results[i]['contractors'] = contractors
        results += spider_results
    return idx, results


# Catches any exception from run_search, logs it, and returns a FAILED
# placeholder result so one bad row does not abort the whole run.
def safe_run_search(*args, **kwargs):
    try:
        res = run_search(*args, **kwargs)
    except Exception:
        idx = args[0] if args else kwargs.get('idx')
        logger.exception(f"Search failed for {idx}")
        res = (idx, [{'license_status': '', 'match_details': {'strength': 'FAILED'}}])
    return res
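

# Reads the input CSV, fans the rows out to a thread pool of searches, and
# writes two outputs: a per-row summary (result-<timestamp>.csv) and a
# per-match detail file (detailed_result-<timestamp>.csv).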
class ImportLicenses:
    def __init__(self, path):
        self.path = path

    def add_to_detailed_results(self, idx, search, found_result=None):
        found_result = found_result if found_result else {}
        insurance = found_result.get('insurance_information') or {}
        insurance_prev = insurance.get('previous') or {}
        bonding_information = found_result.get('bonding_information') or {}
        bonding_information_prev = bonding_information.get('previous') or {}
        workers_compensation = found_result.get('workers_compensation') or {}
        lawsuit_information = found_result.get('lawsuit_information') or {}
        violation_information = found_result.get('violation_information') or {}
        # Keys must stay in the same order as _DETAILED_RESULTS_HEADERS;
        # rows are written out with list(result.values()).
        result = {}
        result["initial_index"] = idx
        result["category"] = search[0]
        result["subcategory"] = search[1]
        result["name_input"] = search[2]
        result["address_input"] = search[4]
        result["city_input"] = search[5]
        result["state_input"] = search[6]
        result["zip_input"] = search[7]
        result["phone_input"] = search[8]
        result["matching_strength"] = found_result.get('match_details', {}).get("strength", '')
        result["overall_status"] = _LICENSE_ID_TEXT[found_result.get('license_status_id', 0) - 1]
        result["license_status"] = found_result.get('license_status', '')
        result["insurance_status"] = insurance.get('insurance_status', '')
        result["bond_status"] = bonding_information.get('bonding_status', '')
        result["workers_comp_status"] = workers_compensation.get('workers_status', '')
        result["lawsuits_status"] = lawsuit_information.get('complaint_status', '')
        result["violations_status"] = violation_information.get('violation_status', '')
        result["website"] = found_result.get('url', '')
        result["license_number"] = found_result.get('license_number', '')
        result["license_expiry"] = found_result.get('license_expiry', '')
        result["license_suspend_date"] = found_result.get('suspension_date', '')
        result["license_type"] = found_result.get('license_type', '')
        result["license_SpecialtyCode1Desc"] = found_result.get('specialty_code_1_desc', '')
        result["license_SpecialtyCode2Desc"] = found_result.get('specialty_code_2_desc', '')
        result["PrimaryPrincipalName"] = found_result.get('primary_principal_name', '')
        result["business_name"] = found_result.get('name', '')
        result["phone_number"] = found_result.get('phone_number', '')
        result["ubi_number"] = found_result.get('ubi_number', '')
        # result["location"] = found_result.get('location', '')
        result["address_line1"] = found_result.get('street', '').title()
        result["address_line2"] = found_result.get('', '').title()  # key name is blank in the original, so this always falls back to ''
        result["city"] = found_result.get('city', '').title()
        result["state"] = found_result.get('state_abb', '').upper()
        result["zip"] = found_result.get('zip_code', '')
        result["last_updated"] = found_result.get('last_updated', '')
        result["InsuranceCompany"] = insurance.get('company_name', '')
        result["InsurancePolicyNo"] = insurance.get('policy_number', '')
        result["InsuranceAmt"] = insurance.get('insurance_amount', '')
        result["InsuranceEffectiveDate"] = insurance.get('effective_date', '')
        result["InsuranceExpirationDate"] = insurance.get('expiry_date', '')
        result["InsuranceCancelDate"] = insurance.get('cancellation_date', '')
        result["InsuranceCompany_previous"] = insurance_prev.get('company_name', '')
        result["InsurancePolicyNo_previous"] = insurance_prev.get('policy_number', '')
        result["InsuranceAmt_previous"] = insurance_prev.get('insurance_amount', '')
        result["InsuranceExpirationDate_previous"] = insurance_prev.get('expiry_date', '')
        result["BondFirmName"] = bonding_information.get('company_name', '')
        result["BondAccountID"] = bonding_information.get('bond_number', '')
        result["BondAmt"] = bonding_information.get('bond_amount', '')
        result["BondReceivedByL&I"] = bonding_information.get('received_by_li', '')
        result["BondEffectiveDate"] = bonding_information.get('effective_date', '')
        result["BondExpirationDate"] = bonding_information.get('expiry_date', '')
        result["BondCancelDate"] = bonding_information.get('cancellation_date', '')
        result["BondImpaired"] = bonding_information.get('impaired', '')
        result["BondImpairedDate"] = bonding_information.get('impaired_date', '')
        result["BondFirmName_previous"] = bonding_information_prev.get('company_name', '')
        result["BondAccountID_previous"] = bonding_information_prev.get('bond_number', '')
        result["BondAmt_previous"] = bonding_information_prev.get('bond_amount', '')
        result["BondCancelDate_previous"] = bonding_information_prev.get('cancellation_date', '')
        result["WorkersComp_L&I_AccountID"] = workers_compensation.get('li_account', '')
        result["WorkersCompCertificate"] = workers_compensation.get('certificate', '')
        result["lawsuits_count_total"] = lawsuit_information.get('count', '')
        result["lawsuits_count_trailing_12_months"] = lawsuit_information.get('count_12_month', '')
        result["latest_lawsuit_complaint_date"] = lawsuit_information.get('complaint_date', '')
        result["latest_lawsuit_status"] = lawsuit_information.get('complaint_status', '')
        result["latest_lawsuit_cause_number"] = lawsuit_information.get('cause_number', '')
        result["latest_lawsuit_complaint_amount"] = lawsuit_information.get('complaint_amount', '')
        result["violations_count_total"] = violation_information.get('count', '')
        result["violations_count_trailing_12_months"] = violation_information.get('count_12_month', '')
        result["latest_violation_issue_date"] = violation_information.get('issue_date', '')
        result["latest_violation_status"] = violation_information.get('violation_status', '')
        result["latest_violation_infraction_number"] = violation_information.get('infraction_id', '')
        result["latest_violation_amount"] = violation_information.get('violation_amount', '')
        result["latest_violation_type"] = violation_information.get('violation_type', '')
        result["latest_violation_description"] = violation_information.get('violation_description', '')
        result["matching_strength_details"] = found_result.get('match_details', {})
        result["contractors_details"] = found_result.get('contractors', {})
        self.detailed_results.append(result)
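
    # Reads the input file (CSV only), runs safe_run_search for every row in a
    # thread pool, then writes the summary and detailed CSVs in input-row order.
    # run_search expects columns named business_name, category, address, city,
    # state, zip, phone, lic_number_primary and lic_number_secondary; the
    # *_input summary columns additionally assume the positional layout used in
    # add_to_detailed_results above (search[0] = category, search[2] = name, ...).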
    def read_file(self):
        if self.path.endswith('.csv'):
            df = pd.read_csv(self.path, dtype=object)
            df.fillna('', inplace=True)
        else:
            return
        timestamp = round(time())
        csvfile2 = open(f'result-{timestamp}.csv', 'w', newline='')
        writer2 = csv.writer(csvfile2)
        headers = list(df.columns.values)
        writer2.writerow(['initial_index'] + headers + ['results_count', 'best_match'])
        found_results = {}
        with ThreadPoolExecutor(10) as executor:
            threads = [executor.submit(safe_run_search, idx, item) for idx, item in enumerate(df.iloc)]
            for thread in as_completed(threads):
                res_idx, res = thread.result()
                found_results[res_idx] = res
        # To run in a single thread, use these lines instead of the executor block above:
        # for idx, item in enumerate(df.iloc):
        #     found_results[idx] = run_search(idx, item)[1]
        logger.debug('All threads finished running.')
        self.detailed_results = []
        for idx, results in sorted(found_results.items(), key=lambda x: x[0]):
            search = list(df.iloc[idx])
            best_result = 'NO MATCH'
            if results:
                for result in results:
                    if 'license_status' not in result:
                        # Raw spider result: decode the encrypted viewstate payload first.
                        viewstate = result['viewstate']
                        decoded_result = Encryption(viewstate).decrypt()
                        decoded_details = json.loads(decoded_result)
                    else:
                        decoded_details = result
                    if 'match_details' not in decoded_details:
                        decoded_details = Spider().pipeline(decoded_details)
                    strength = decoded_details['match_details']['strength']
                    self.add_to_detailed_results(idx, search, decoded_details)
                    if match_map[strength] > match_map[best_result]:
                        best_result = strength
            else:
                self.add_to_detailed_results(idx, search)
            writer2.writerow([idx] + search + [len(results), best_result])
        csvfile2.close()
        logger.debug('Finished writing results summary.')
        if self.detailed_results:
            with open(f'detailed_result-{timestamp}.csv', 'w', newline='') as csvfile1:
                writer1 = csv.writer(csvfile1)
                writer1.writerow(_DETAILED_RESULTS_HEADERS)
                for row in self.detailed_results:
                    writer1.writerow(list(row.values()))
            logger.debug('Finished writing detailed summary.')
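

# Usage (against a CSV export of the rows to check):
#   python3 <this_script>.py input.csv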
if __name__ == '__main__':
    import argparse

    # Assumption: logging is not configured elsewhere, so enable basic output
    # for the logger calls above; drop this if the crawler package configures
    # logging itself.
    logging.basicConfig(level=logging.INFO)

    parser = argparse.ArgumentParser(description='Run searches in bulk.')
    parser.add_argument('input_file', type=str, help='input file path')
    args = parser.parse_args()
    t = time()
    ImportLicenses(args.input_file).read_file()
    logger.info(f'Time elapsed: {time() - t}')