Created
December 28, 2020 11:44
-
-
Save adityaraute/6ad251cbf2e2993bb19d3b0f8876b048 to your computer and use it in GitHub Desktop.
Using PySpark in Google Colab
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| { | |
| "nbformat": 4, | |
| "nbformat_minor": 0, | |
| "metadata": { | |
| "colab": { | |
| "name": "Restaurant Recommender.ipynb", | |
| "provenance": [], | |
| "collapsed_sections": [ | |
| "0yR5iT_szD1o", | |
| "bIEdOY_2C5T4", | |
| "x_v6aF1HiQjw" | |
| ], | |
| "toc_visible": true | |
| }, | |
| "kernelspec": { | |
| "name": "python3", | |
| "display_name": "Python 3" | |
| } | |
| }, | |
| "cells": [ | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "n7opPpCWbDPh" | |
| }, | |
| "source": [ | |
| "## Restaurant Recommender Based on PySpark\n", | |
| "\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "0yR5iT_szD1o" | |
| }, | |
| "source": [ | |
| "#### Setting up the environment\r\n", | |
| "\r\n", | |
| "In Google Colab, we install Java, download and extract Spark, and install the `findspark` package. \r\n", | |
| "We also import libraries such as matplotlib to help us visualize the data.\r\n", | |
| "We then create a SparkContext object named 'sc'. The first cell may take a while to run.\r\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "id": "RpDA1bkZyGJ0" | |
| }, | |
| "source": [ | |
| "# Install Java 8, download Spark 2.4.7 and install findspark.\n", | |
| "# Old Spark releases are removed from the Apache dist mirrors but kept on archive.apache.org.\n", | |
| "# wget -nc skips the download when the archive already exists, so re-running this cell does not create .tgz.1 copies.\n", | |
| "!apt-get install openjdk-8-jdk-headless -qq > /dev/null\n", | |
| "!wget -q -nc https://archive.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz\n", | |
| "!tar xf spark-2.4.7-bin-hadoop2.7.tgz\n", | |
| "!pip install -q findspark" | |
| ], | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "id": "5l76xcy5FKhy" | |
| }, | |
| "source": [ | |
| "import os\n", | |
| "\n", | |
| "import findspark\n", | |
| "import matplotlib.pyplot as plt\n", | |
| "\n", | |
| "# pyspark is imported in the next cell, after findspark.init() makes the downloaded Spark importable." | |
| ], | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "id": "Kwpl_jKKFUBy" | |
| }, | |
| "source": [ | |
| "os.environ[\"JAVA_HOME\"] = \"/usr/lib/jvm/java-8-openjdk-amd64\"\n", | |
| "os.environ[\"SPARK_HOME\"] = \"/content/spark-2.4.7-bin-hadoop2.7\"\n", | |
| "# init() uses SPARK_HOME to add Spark's Python libraries to sys.path, so pyspark must be\n", | |
| "# imported after it (an earlier import may fail or pick up a different pyspark).\n", | |
| "findspark.init()\n", | |
| "\n", | |
| "from pyspark import SparkConf, SparkContext\n", | |
| "\n", | |
| "# getOrCreate() reuses an already running context, so re-running this cell does not fail\n", | |
| "# with 'Cannot run multiple SparkContexts at once'.\n", | |
| "sc = SparkContext.getOrCreate(SparkConf().setMaster(\"local\").setAppName(\"Restaurant Recommender\"))\n" | |
| ], | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "l48Duzhtze7I" | |
| }, | |
| "source": [ | |
| "Both the compressed archive 'entree_data.tar.gz' and its extracted 'entree' folder are provided; either one can be used to run the project. Run the cell below only if you are using the compressed archive.\r\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "colab": { | |
| "base_uri": "https://localhost:8080/" | |
| }, | |
| "id": "UGW-z9JMFUgn", | |
| "outputId": "d33b6cb1-2a54-471f-bb83-ce8f074337d0" | |
| }, | |
| "source": [ | |
| "# Extract the archive only if the 'entree' folder is not present yet, so this cell is\n", | |
| "# harmless when the already-extracted folder was provided instead of the archive.\n", | |
| "![ -d entree ] || tar xf entree_data.tar.gz" | |
| ], | |
| "execution_count": null, | |
| "outputs": [ | |
| { | |
| "output_type": "stream", | |
| "text": [ | |
| "entree\t\t sample_data\t\t spark-2.4.7-bin-hadoop2.7.tgz\n", | |
| "entree_data.tar.gz spark-2.4.7-bin-hadoop2.7 spark-2.4.7-bin-hadoop2.7.tgz.1\n" | |
| ], | |
| "name": "stdout" | |
| } | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "id": "TeT159JeFUmV" | |
| }, | |
| "source": [ | |
| "# List the working directory to check that the 'entree' data folder is present.\n", | |
| "!ls ." | |
| ], | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "7Sai5xlxCC3h" | |
| }, | |
| "source": [ | |
| "\r\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "bIEdOY_2C5T4" | |
| }, | |
| "source": [ | |
| "#### Processing the session files\r\n", | |
| "\r\n", | |
| "Here, we first use the session files to compute a popularity score for each restaurant.\r\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "id": "NWuZWNXD_hX0", | |
| "colab": { | |
| "base_uri": "https://localhost:8080/" | |
| }, | |
| "outputId": "4b4f05a1-84a8-4a19-8a1a-59a13f5248e7" | |
| }, | |
| "source": [ | |
| "# City code (trailing letter of a session entry) -> city name.\n", | |
| "CITY_BY_CODE = {'A': 'Atlanta', 'B': 'Boston', 'C': 'Chicago', 'D': 'Los Angeles',\n", | |
| "                'E': 'New Orleans', 'F': 'New York', 'G': 'San Francisco', 'H': 'Washington D.C.'}\n", | |
| "\n", | |
| "def trail(x):\n", | |
| "    # Left-pad a numeric id with zeros to 7 characters so it matches the ids used in the\n", | |
| "    # city data files (e.g. '12' -> '0000012'). zfill handles ids of every length;\n", | |
| "    # 4-digit ids such as '0001096' do occur in the data.\n", | |
| "    return x.zfill(7)\n", | |
| "\n", | |
| "def locmap(x):\n", | |
| "    # Split an entry such as '260C' into (city, padded id); returns None for an unknown city code.\n", | |
| "    city = CITY_BY_CODE.get(x[-1])\n", | |
| "    if city is None:\n", | |
| "        return None\n", | |
| "    return (city, trail(x[:-1]))\n", | |
| "\n", | |
| "# combineByKey helpers: start a list from one value, add a value, merge two partial lists.\n", | |
| "def to_list(a):\n", | |
| "    return [a]\n", | |
| "\n", | |
| "def append(a, b):\n", | |
| "    a.append(b)\n", | |
| "    return a\n", | |
| "\n", | |
| "def extend(a, b):\n", | |
| "    a.extend(b)\n", | |
| "    return a\n", | |
| "\n", | |
| "# Keep only the third tab-separated field of each session line.\n", | |
| "sess_entries = sc.textFile(\"entree/session/session.19*\").map(lambda s: s.split(\"\\t\")).map(lambda s: s[2])\n", | |
| "print(\"Number of rows before filtering: \", sess_entries.count())\n", | |
| "# Drop empty fields (x[0] would raise IndexError) and entries starting with '0'.\n", | |
| "sess_entries = sess_entries.filter(lambda x: len(x) > 0 and x[0] != '0')\n", | |
| "print(\"Number of rows after filtering: \", sess_entries.count())\n", | |
| "\n", | |
| "# Stay distributed (no collect()/parallelize round trip through the driver) and skip\n", | |
| "# entries with an unknown city code, whose None value would break combineByKey.\n", | |
| "sess = (sess_entries.map(locmap)\n", | |
| "        .filter(lambda pair: pair is not None)\n", | |
| "        .combineByKey(to_list, append, extend)\n", | |
| "        .collect())\n", | |
| "\n", | |
| "print(\"Example of data being stored in the 'sess' object\")\n", | |
| "for city, ids in sess:\n", | |
| "    print(city, ids[:5], \"\\t \\t Total restaurants: \", len(ids))" | |
| ], | |
| "execution_count": null, | |
| "outputs": [ | |
| { | |
| "output_type": "stream", | |
| "text": [ | |
| "Number of rows before filtering: 50672\n", | |
| "Number of rows after filtering: 5995\n", | |
| "\n", | |
| "Chicago ['0000260', '0000250', '0000010', '0000418', '0000418'] \t \t Total restaurants: 4990\n", | |
| "New York ['0000723', '0000816', '0000816', '0000017', '0000622'] \t \t Total restaurants: 278\n", | |
| "New Orleans ['0000024', '0000266', '0000221', '0000221', '0000174'] \t \t Total restaurants: 100\n", | |
| "Los Angeles ['0000180', '0000180', '0000180', '0000180', '0000180'] \t \t Total restaurants: 180\n", | |
| "Atlanta ['0000012', '0000012', '0000152', '0000245', '0000023'] \t \t Total restaurants: 71\n", | |
| "Washington D.C. ['0000325', '0000065', '0000365', '0000325', '0000372'] \t \t Total restaurants: 118\n", | |
| "San Francisco ['0000198', '0000078', '0000227', '0000406', '0000231'] \t \t Total restaurants: 129\n", | |
| "Boston ['0000312', '0000008', '0000403', '0000059', '0000140'] \t \t Total restaurants: 129\n" | |
| ], | |
| "name": "stdout" | |
| } | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "pBgdYO-qCTaU" | |
| }, | |
| "source": [ | |
| "As can be seen above, filtering discards almost 90% of the rows (50,672 before vs. 5,995 after in the run shown). This step therefore appears to be optional, and it may even reduce the accuracy of the popularity scores. It is kept because it demonstrates some important PySpark concepts." | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "x_v6aF1HiQjw" | |
| }, | |
| "source": [ | |
| "#### Onto the Data folder\r\n", | |
| "\r\n", | |
| "In the following function, we only keep 5 attributes of each restaurant, which it returns as the tuple (name, features, city, ID, popularity): \r\n", | |
| "\r\n", | |
| "\r\n", | |
| "\r\n", | |
| "1. The ID of the restaurant\r\n", | |
| "2. The list of features\r\n", | |
| "3. The city in which the restaurant is present\r\n", | |
| "4. The name of the restaurant\r\n", | |
| "5. The popularity score\r\n", | |
| "\r\n", | |
| "\r\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "id": "SVY4Q4OBuxCB" | |
| }, | |
| "source": [ | |
| "def stream(s, city):\n", | |
| "    # Build a (name, features, city, id, popularity) record from one tab-split data line s,\n", | |
| "    # where s[0] is the restaurant id, s[1] its name and s[2] its space-separated feature codes.\n", | |
| "    # popularity = how often the id appears among that city's session entries (global `sess`);\n", | |
| "    # it defaults to 0, so a city with no session data does not raise UnboundLocalError.\n", | |
| "    popularity = 0\n", | |
| "    for sess_city, ids in sess:\n", | |
| "        if sess_city == city:\n", | |
| "            popularity = ids.count(s[0])\n", | |
| "            break\n", | |
| "    return (s[1], s[2].split(), city, s[0], popularity)" | |
| ], | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "Rjzol6-pB-dz" | |
| }, | |
| "source": [ | |
| "Here, we create one RDD per city and transform each of its lines with `stream`. We also record the number of restaurants in each city in the `resto` list.\r\n" | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "id": "ZW4xZx6w2E4S" | |
| }, | |
| "source": [ | |
| "def load_city(filename, city):\n", | |
| "    # Read one city data file, turn every tab-split line into a record with stream(),\n", | |
| "    # and return the RDD together with its number of rows.\n", | |
| "    rdd = (sc.textFile('entree/data/' + filename)\n", | |
| "           .map(lambda s: s.split('\\t'))\n", | |
| "           .map(lambda s: stream(s, city)))\n", | |
| "    return rdd, rdd.count()\n", | |
| "\n", | |
| "atlanta, atcount = load_city('atlanta.txt', 'Atlanta')\n", | |
| "boston, bocount = load_city('boston.txt', 'Boston')\n", | |
| "chicago, chcount = load_city('chicago.txt', 'Chicago')\n", | |
| "los_angeles, lacount = load_city('los_angeles.txt', 'Los Angeles')\n", | |
| "new_orleans, nocount = load_city('new_orleans.txt', 'New Orleans')\n", | |
| "new_york, nycount = load_city('new_york.txt', 'New York')\n", | |
| "san_francisco, sfcount = load_city('san_francisco.txt', 'San Francisco')\n", | |
| "washington_dc, wacount = load_city('washington_dc.txt', 'Washington D.C.')\n", | |
| "\n", | |
| "# Restaurant counts per city, in the same order as the RDDs above.\n", | |
| "resto = [atcount, bocount, chcount, lacount, nocount, nycount, sfcount, wacount]" | |
| ], | |
| "execution_count": null, | |
| "outputs": [] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "Bg4zfNl4fxmU" | |
| }, | |
| "source": [ | |
| "Here we have a visualization of the data distribution. As we can see, the data is not necessarily distributed uniformly but the numbers are fairly comparable." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "colab": { | |
| "base_uri": "https://localhost:8080/", | |
| "height": 352 | |
| }, | |
| "id": "AsmZP_l8fCJI", | |
| "outputId": "6a6a3a5f-f25c-4436-a85f-1699e573eb4d" | |
| }, | |
| "source": [ | |
| "fig, ax = plt.subplots(figsize=(12, 4))\n", | |
| "cities = ['Atlanta', 'Boston', 'Chicago', 'Los Angeles', 'New Orleans', 'New York', 'San Francisco', 'Washington D.C.']\n", | |
| "ax.bar(cities, resto)\n", | |
| "ax.set_title('Number of restaurants in the given locations')\n", | |
| "ax.set_xlabel('City')\n", | |
| "ax.set_ylabel('Number of restaurants')\n", | |
| "plt.show()\n", | |
| "\n", | |
| "# Convert the sum to str before concatenating: str + int raises TypeError.\n", | |
| "total = atcount + bocount + chcount + lacount + nocount + nycount + sfcount + wacount\n", | |
| "print(\"Total Count: \" + str(total))\n" | |
| ], | |
| "execution_count": null, | |
| "outputs": [ | |
| { | |
| "output_type": "display_data", | |
| "data": { | |
| "image/png": "\n", | |
| "text/plain": [ | |
| "<Figure size 432x288 with 1 Axes>" | |
| ] | |
| }, | |
| "metadata": { | |
| "tags": [], | |
| "needs_background": "light" | |
| } | |
| } | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "yHd_7unO2Njx" | |
| }, | |
| "source": [ | |
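| "We combine the data by taking a union of all city RDDs and sort the result by restaurant name. \r\n", | |
| "We also filter out restaurants whose names contain an opening parenthesis '('. Presumably such names carry annotations like '(CLOSED)', so this keeps only restaurants that are still available." | |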
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "id": "C9NpaJEyXWMj", | |
| "colab": { | |
| "base_uri": "https://localhost:8080/" | |
| }, | |
| "outputId": "f9c7e213-24b4-422e-a4de-48cd8b36bddb" | |
| }, | |
| "source": [ | |
| "city_rdds = [atlanta, boston, chicago, los_angeles, new_orleans, new_york, san_francisco, washington_dc]\n", | |
| "# Union all cities, drop names containing '(', and sort by name (ignoring surrounding whitespace).\n", | |
| "final = (sc.union(city_rdds)\n", | |
| "         .filter(lambda rec: \"(\" not in rec[0])\n", | |
| "         .sortBy(lambda rec: rec[0].strip()))\n", | |
| "\n", | |
| "final.take(5)\n" | |
| ], | |
| "execution_count": null, | |
| "outputs": [ | |
| { | |
| "output_type": "stream", | |
| "text": [ | |
| "4160\n" | |
| ], | |
| "name": "stdout" | |
| }, | |
| { | |
| "output_type": "execute_result", | |
| "data": { | |
| "text/plain": [ | |
| "[('101', ['125', '075', '205', '053', '166'], 'New York', '0001096', 0),\n", | |
| " ('101 Seafood',\n", | |
| " ['125', '212', '075', '205', '053', '166'],\n", | |
| " 'New York',\n", | |
| " '0000163',\n", | |
| " 1),\n", | |
| " ('103 NYC', ['184', '075', '204', '052', '166'], 'New York', '0000542', 0),\n", | |
| " ('103 WEST',\n", | |
| " ['080',\n", | |
| " '253',\n", | |
| " '099',\n", | |
| " '200',\n", | |
| " '245',\n", | |
| " '196',\n", | |
| " '191',\n", | |
| " '192',\n", | |
| " '146',\n", | |
| " '024',\n", | |
| " '045',\n", | |
| " '076',\n", | |
| " '206',\n", | |
| " '054',\n", | |
| " '167'],\n", | |
| " 'Atlanta',\n", | |
| " '0000230',\n", | |
| " 1),\n", | |
| " ('107 West', ['075', '204', '052', '165'], 'New York', '0000010', 0)]" | |
| ] | |
| }, | |
| "metadata": { | |
| "tags": [] | |
| }, | |
| "execution_count": 54 | |
| } | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "XQ2yXwJognZI" | |
| }, | |
| "source": [ | |
| "Here we load all feature codes used in the data together with their descriptions (only the first five are shown). There is room for improvement here." | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "colab": { | |
| "base_uri": "https://localhost:8080/" | |
| }, | |
| "id": "n15SdZmGXCrP", | |
| "outputId": "58fd6ebe-c59d-43a0-be67-4612d2623386" | |
| }, | |
| "source": [ | |
| "# Each line holds a 3-digit feature code and its description, separated by a tab.\n", | |
| "features = sc.textFile('entree/data/features.txt').map(lambda x: x.split(\"\\t\"))\n", | |
| "features.take(5)" | |
| ], | |
| "execution_count": null, | |
| "outputs": [ | |
| { | |
| "output_type": "execute_result", | |
| "data": { | |
| "text/plain": [ | |
| "[['000', 'A'],\n", | |
| " ['001', 'Authentic'],\n", | |
| " ['002', 'Afghanistan'],\n", | |
| " ['003', 'African'],\n", | |
| " ['004', 'After Hours Dining']]" | |
| ] | |
| }, | |
| "metadata": { | |
| "tags": [] | |
| }, | |
| "execution_count": 56 | |
| } | |
| ] | |
| }, | |
| { | |
| "cell_type": "code", | |
| "metadata": { | |
| "id": "s4mRX7LtbgBL", | |
| "colab": { | |
| "base_uri": "https://localhost:8080/" | |
| }, | |
| "outputId": "414bfaf9-2786-4f0f-882b-09cf889de2f1" | |
| }, | |
| "source": [ | |
| "print(\"Input list of features wanted\")\n", | |
| "lis = input().split()\n", | |
| "print(\"\\n1. Atlanta\\n2. Boston\\n3. Chicago\\n4. Los Angeles\\n5. New Orleans\\n6. New York\\n7. San Francisco\\n8. Washington D.C.\\n\")\n", | |
| "location = int(input(\"Enter a number for location: \")) - 1\n", | |
| "# Reject out-of-range choices; e.g. 0 would otherwise silently select cities[-1] (Washington D.C.).\n", | |
| "if not 0 <= location < len(cities):\n", | |
| "    raise ValueError(\"Location must be a number between 1 and \" + str(len(cities)))\n", | |
| "chosen_city = cities[location]\n", | |
| "\n", | |
| "def score(x, lis):\n", | |
| "    # Number of requested feature codes in lis that appear in the record's feature list x[1].\n", | |
| "    return sum(1 for value in lis if value in x[1])\n", | |
| "\n", | |
| "# Keep restaurants in the chosen city with at least one matching feature, ranked by the\n", | |
| "# number of matching features and then by popularity, both descending.\n", | |
| "temp = (final.map(lambda x: (x[0], score(x, lis), x[2], x[4]))\n", | |
| "        .filter(lambda x: x[1] != 0 and x[2] == chosen_city)\n", | |
| "        .sortBy(lambda x: (x[1], x[3]), ascending=False))\n", | |
| "print(\"Restaurants with at least one feature matching: \" + str(temp.count()))\n", | |
| "temp.take(10)\n" | |
| ], | |
| "execution_count": null, | |
| "outputs": [ | |
| { | |
| "output_type": "stream", | |
| "text": [ | |
| "Input list of features wanted\n", | |
| "164 053 253\n", | |
| "\n", | |
| "1. Atlanta\n", | |
| "2. Boston\n", | |
| "3. Chicago\n", | |
| "4. Los Angeles\n", | |
| "5. New Orleans\n", | |
| "6. New York\n", | |
| "7. San Francisco\n", | |
| "8. Washington D.C.\n", | |
| "\n", | |
| "Enter a number for location: 1\n", | |
| "Hotels with at least one feature matching: 224\n" | |
| ], | |
| "name": "stdout" | |
| }, | |
| { | |
| "output_type": "execute_result", | |
| "data": { | |
| "text/plain": [ | |
| "[(\"Houston's\", 3, 'Atlanta', 5),\n", | |
| " (\"Babette's Cafe\", 3, 'Atlanta', 2),\n", | |
| " ('Chile Tree', 3, 'Atlanta', 1),\n", | |
| " ('Haveli', 3, 'Atlanta', 1),\n", | |
| " ('Longhorn Steaks', 3, 'Atlanta', 1),\n", | |
| " ('Outback Steaks', 3, 'Atlanta', 1),\n", | |
| " ('SURIN OF THAILAND', 3, 'Atlanta', 1),\n", | |
| " (\"Altobeli's Fine Italian Cuisine\", 3, 'Atlanta', 0),\n", | |
| " ('August Moon', 3, 'Atlanta', 0),\n", | |
| " ('Azio', 3, 'Atlanta', 0)]" | |
| ] | |
| }, | |
| "metadata": { | |
| "tags": [] | |
| }, | |
| "execution_count": 58 | |
| } | |
| ] | |
| }, | |
| { | |
| "cell_type": "markdown", | |
| "metadata": { | |
| "id": "6bYIzMk08Zid" | |
| }, | |
| "source": [ | |
| "The output displays the names of the restaurants, the number of matching features, their locations and their popularity scores, sorted by the number of matching features and then by popularity (both in descending order)." | |
| ] | |
| } | |
| ] | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment