Last active
December 2, 2021 21:53
-
-
Save zzggbb/ef4505af70cc80e44d1f928c32b266c8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # standard imports | |
| import logging | |
| # 3rd party imports | |
| import requests | |
| logging.basicConfig( | |
| level=logging.INFO, | |
| format='%(asctime)s.%(msecs)03d %(message)s', | |
| datefmt='%Y-%m-%d %H:%M:%S', | |
| ) | |
| LOGGER = logging.getLogger() | |
| API_DOMAIN = 'webapi.depop.com' | |
| API_PREFIX = f'https://{API_DOMAIN}/api' | |
| class DepopAPI: | |
| def __init__(self, auth_token): | |
| self.auth_token = auth_token | |
| self.auth_header = {'authorization': f'Bearer {self.auth_token}'} | |
| @staticmethod | |
| def get_user_id(username): | |
| """ | |
| Converts the specified username into an integer ID | |
| :username: str: The specified username | |
| :returns: int: Integer ID of specified user | |
| :returns: None: If lookup of the specified user fails | |
| """ | |
| response = requests.get( | |
| f'{API_PREFIX}/v1/shop/{username}' | |
| ) | |
| if response.status_code == 200: | |
| user_id = response.json()['id'] | |
| LOGGER.info(f"successfully found user '{username}' with ID '{user_id}'") | |
| return user_id | |
| else: | |
| LOGGER.info(f"failed to find user '{username}'") | |
| return None | |
| @staticmethod | |
| def check_username(username): | |
| return DepopAPI.get_user_id(username) is not None | |
| def get_item_data(self, item_slug): | |
| """ | |
| Convert the specified item-slug (eg. 'username-some-item-name') | |
| into a dictionary containing information about the specified item. | |
| :item_slug: str: The specified item | |
| :returns: dict: Dictionary containing information about item slug | |
| :returns: None: If item slug is invalid | |
| """ | |
| response = requests.get( | |
| f'{API_PREFIX}/v2/products/{item_slug}', | |
| headers=self.auth_header | |
| ) | |
| if response.status_code == 200: | |
| LOGGER.info(f"successfully found item '{item_slug}'") | |
| return response.json() | |
| else: | |
| LOGGER.info(f"failed to find item '{item_slug}'") | |
| return None | |
| def get_item_page(self, index, user_id, limit=200, offset_id=None): | |
| """ | |
| Fetches a page from the specified user. Use the offset_id to fetch | |
| sequential pages. | |
| :index: int: Index of page relative to other pages; only used for logging purposes | |
| :user_id: int: ID of the specified user | |
| :limit: int: Maximum number of items to return. [default=24] | |
| :offset_id: str|None: Returned by calls to this function in the 'meta->last_offset_id' field. | |
| Used to get sequential pages. See `get_unsold_items` for example | |
| :returns: dict: Page of item data | |
| :returns: None: If the page request fails | |
| """ | |
| params = {'limit': limit} | |
| if offset_id: | |
| params |= {'offset_id': offset_id} | |
| response = requests.get( | |
| f'{API_PREFIX}/v1/shop/{user_id}/products/', | |
| headers=self.auth_header, | |
| params=params | |
| ) | |
| if response.status_code == 200: | |
| item_page = response.json() | |
| n_loaded = len(item_page['products']) | |
| LOGGER.info(f"successfully loaded {n_loaded} items from page {index} of user {user_id}") | |
| return item_page | |
| else: | |
| LOGGER.info(f"failed to load product page of user {user_id}") | |
| return None | |
| def get_unsold_items(self, user_id): | |
| """ | |
| Fetches the item-slugs of all unsold items for the specified user | |
| :user_id: int: ID of the specified user | |
| :returns: [str]: List of item-slugs | |
| """ | |
| unsold_items = [] | |
| n_total_items = 0 | |
| n_sold_items = 0 | |
| page_index = 1 | |
| LOGGER.info("gathering list of unsold items...") | |
| item_page = self.get_item_page(page_index, user_id) | |
| if not item_page: | |
| return None | |
| while True: | |
| n_total_items += len(item_page['products']) | |
| for product in item_page['products']: | |
| if product['sold']: | |
| n_sold_items += 1 | |
| else: | |
| unsold_items.append(product['slug']) | |
| offset_id = item_page['meta']['last_offset_id'] | |
| if item_page['meta']['end']: | |
| break | |
| page_index += 1 | |
| item_page = self.get_item_page(page_index, user_id, offset_id=offset_id) | |
| LOGGER.info("finished gathering list of unsold items!") | |
| LOGGER.info(f'total items: {n_total_items}') | |
| LOGGER.info(f'total unsold items: {len(unsold_items)}') | |
| LOGGER.info(f'total sold items: {n_sold_items}') | |
| return unsold_items | |
| def refresh_item(self, item_slug): | |
| """ | |
| Refresh the specified item. Assumes user has authority to do so. | |
| :item_slug: str: The specified item | |
| :raises: ValueError: If item slug is invalid | |
| :raises: RuntimeError: If request fails | |
| """ | |
| item_data = self.get_item_data(item_slug) | |
| if not item_data: | |
| raise ValueError(f"couldn't look up item '{item_slug}'") | |
| response = requests.put( | |
| f'{API_PREFIX}/v1/products/{item_slug}', | |
| headers=self.auth_header, | |
| json={ | |
| 'variants': item_data['variants'], | |
| "address": item_data['address'], | |
| 'countryCode': item_data['countryCode'], | |
| } | |
| ) | |
| if response.status_code != 201: | |
| raise RuntimeError(f"depop returned error code {response.status_code}") | |
| LOGGER.info(f"successfully refreshed item '{item_slug}'") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| # standard imports | |
| import time | |
| import logging | |
| # local imports | |
| from depop_api import DepopAPI, LOGGER | |
| REFRESH_INTERVAL = 0.5 | |
| AUTH_TOKEN_PROMPT = """ | |
| Please provide a depop authentication token. To obtain this token: | |
| 1. Open chrome://settings/cookies/detail?site=depop.com | |
| 2. Expand the section labeled "access_token" | |
| 3. Copy the 40-character key within the "Content" section | |
| Please paste the 40 character key and then press enter: | |
| > """ | |
| USERNAME_PROMPT = """ | |
| Please provide your depop username: | |
| > """ | |
| MODE_PROMPT = """ | |
| What would you like to do? Press one of the following keys: | |
| q - quit | |
| r - refresh unsold items within a specified range | |
| a - refresh all unsold items | |
| s - refresh a single item | |
| p - print full list of unsold items | |
| > """ | |
| def main(): | |
| username = input(USERNAME_PROMPT) | |
| if not DepopAPI.check_username(username): | |
| return | |
| auth_token = input(AUTH_TOKEN_PROMPT) | |
| print() | |
| API = DepopAPI(auth_token) | |
| user_id = DepopAPI.get_user_id(username) | |
| unsold_items = API.get_unsold_items(user_id) | |
| while True: | |
| mode = input(MODE_PROMPT).lower() | |
| if mode == 'q': | |
| break | |
| elif mode == 'r': | |
| while True: | |
| try: | |
| index_a = unsold_items.index(input('first item slug: ')) | |
| break | |
| except ValueError: | |
| continue | |
| while True: | |
| try: | |
| index_b = unsold_items.index(input('last item slug: ')) + 1 | |
| break | |
| except ValueError: | |
| continue | |
| elif mode == 'a': | |
| index_a = 0 | |
| index_b = len(unsold_items) | |
| elif mode == 's': | |
| while True: | |
| try: | |
| index_a = unsold_items.index(input('item slug: ')) | |
| index_b = index_a + 1 | |
| break | |
| except ValueError: | |
| continue | |
| elif mode == 'p': | |
| for i, name in enumerate(unsold_items): | |
| print(i, name) | |
| continue | |
| else: | |
| print(f"unrecognized mode '{mode}', try again") | |
| continue | |
| for i in range(index_a, index_b)[::-1]: | |
| item_slug = unsold_items[i] | |
| LOGGER.info(f'refreshing {item_slug}, then sleeping {REFRESH_INTERVAL}') | |
| API.refresh_item(item_slug) | |
| time.sleep(REFRESH_INTERVAL) | |
| if __name__ == '__main__': | |
| main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment