Skip to content

Instantly share code, notes, and snippets.

@zzggbb
Last active December 2, 2021 21:53
Show Gist options
  • Select an option

  • Save zzggbb/ef4505af70cc80e44d1f928c32b266c8 to your computer and use it in GitHub Desktop.

Select an option

Save zzggbb/ef4505af70cc80e44d1f928c32b266c8 to your computer and use it in GitHub Desktop.
# standard imports
import logging
# 3rd party imports
import requests
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s.%(msecs)03d %(message)s',
datefmt='%Y-%m-%d %H:%M:%S',
)
LOGGER = logging.getLogger()
API_DOMAIN = 'webapi.depop.com'
API_PREFIX = f'https://{API_DOMAIN}/api'
class DepopAPI:
def __init__(self, auth_token):
self.auth_token = auth_token
self.auth_header = {'authorization': f'Bearer {self.auth_token}'}
@staticmethod
def get_user_id(username):
"""
Converts the specified username into an integer ID
:username: str: The specified username
:returns: int: Integer ID of specified user
:returns: None: If lookup of the specified user fails
"""
response = requests.get(
f'{API_PREFIX}/v1/shop/{username}'
)
if response.status_code == 200:
user_id = response.json()['id']
LOGGER.info(f"successfully found user '{username}' with ID '{user_id}'")
return user_id
else:
LOGGER.info(f"failed to find user '{username}'")
return None
@staticmethod
def check_username(username):
return DepopAPI.get_user_id(username) is not None
def get_item_data(self, item_slug):
"""
Convert the specified item-slug (eg. 'username-some-item-name')
into a dictionary containing information about the specified item.
:item_slug: str: The specified item
:returns: dict: Dictionary containing information about item slug
:returns: None: If item slug is invalid
"""
response = requests.get(
f'{API_PREFIX}/v2/products/{item_slug}',
headers=self.auth_header
)
if response.status_code == 200:
LOGGER.info(f"successfully found item '{item_slug}'")
return response.json()
else:
LOGGER.info(f"failed to find item '{item_slug}'")
return None
def get_item_page(self, index, user_id, limit=200, offset_id=None):
"""
Fetches a page from the specified user. Use the offset_id to fetch
sequential pages.
:index: int: Index of page relative to other pages; only used for logging purposes
:user_id: int: ID of the specified user
:limit: int: Maximum number of items to return. [default=24]
:offset_id: str|None: Returned by calls to this function in the 'meta->last_offset_id' field.
Used to get sequential pages. See `get_unsold_items` for example
:returns: dict: Page of item data
:returns: None: If the page request fails
"""
params = {'limit': limit}
if offset_id:
params |= {'offset_id': offset_id}
response = requests.get(
f'{API_PREFIX}/v1/shop/{user_id}/products/',
headers=self.auth_header,
params=params
)
if response.status_code == 200:
item_page = response.json()
n_loaded = len(item_page['products'])
LOGGER.info(f"successfully loaded {n_loaded} items from page {index} of user {user_id}")
return item_page
else:
LOGGER.info(f"failed to load product page of user {user_id}")
return None
def get_unsold_items(self, user_id):
"""
Fetches the item-slugs of all unsold items for the specified user
:user_id: int: ID of the specified user
:returns: [str]: List of item-slugs
"""
unsold_items = []
n_total_items = 0
n_sold_items = 0
page_index = 1
LOGGER.info("gathering list of unsold items...")
item_page = self.get_item_page(page_index, user_id)
if not item_page:
return None
while True:
n_total_items += len(item_page['products'])
for product in item_page['products']:
if product['sold']:
n_sold_items += 1
else:
unsold_items.append(product['slug'])
offset_id = item_page['meta']['last_offset_id']
if item_page['meta']['end']:
break
page_index += 1
item_page = self.get_item_page(page_index, user_id, offset_id=offset_id)
LOGGER.info("finished gathering list of unsold items!")
LOGGER.info(f'total items: {n_total_items}')
LOGGER.info(f'total unsold items: {len(unsold_items)}')
LOGGER.info(f'total sold items: {n_sold_items}')
return unsold_items
def refresh_item(self, item_slug):
"""
Refresh the specified item. Assumes user has authority to do so.
:item_slug: str: The specified item
:raises: ValueError: If item slug is invalid
:raises: RuntimeError: If request fails
"""
item_data = self.get_item_data(item_slug)
if not item_data:
raise ValueError(f"couldn't look up item '{item_slug}'")
response = requests.put(
f'{API_PREFIX}/v1/products/{item_slug}',
headers=self.auth_header,
json={
'variants': item_data['variants'],
"address": item_data['address'],
'countryCode': item_data['countryCode'],
}
)
if response.status_code != 201:
raise RuntimeError(f"depop returned error code {response.status_code}")
LOGGER.info(f"successfully refreshed item '{item_slug}'")
# standard imports
import time
import logging
# local imports
from depop_api import DepopAPI, LOGGER
REFRESH_INTERVAL = 0.5
AUTH_TOKEN_PROMPT = """
Please provide a depop authentication token. To obtain this token:
1. Open chrome://settings/cookies/detail?site=depop.com
2. Expand the section labeled "access_token"
3. Copy the 40-character key within the "Content" section
Please paste the 40 character key and then press enter:
> """
USERNAME_PROMPT = """
Please provide your depop username:
> """
MODE_PROMPT = """
What would you like to do? Press one of the following keys:
q - quit
r - refresh unsold items within a specified range
a - refresh all unsold items
s - refresh a single item
p - print full list of unsold items
> """
def main():
username = input(USERNAME_PROMPT)
if not DepopAPI.check_username(username):
return
auth_token = input(AUTH_TOKEN_PROMPT)
print()
API = DepopAPI(auth_token)
user_id = DepopAPI.get_user_id(username)
unsold_items = API.get_unsold_items(user_id)
while True:
mode = input(MODE_PROMPT).lower()
if mode == 'q':
break
elif mode == 'r':
while True:
try:
index_a = unsold_items.index(input('first item slug: '))
break
except ValueError:
continue
while True:
try:
index_b = unsold_items.index(input('last item slug: ')) + 1
break
except ValueError:
continue
elif mode == 'a':
index_a = 0
index_b = len(unsold_items)
elif mode == 's':
while True:
try:
index_a = unsold_items.index(input('item slug: '))
index_b = index_a + 1
break
except ValueError:
continue
elif mode == 'p':
for i, name in enumerate(unsold_items):
print(i, name)
continue
else:
print(f"unrecognized mode '{mode}', try again")
continue
for i in range(index_a, index_b)[::-1]:
item_slug = unsold_items[i]
LOGGER.info(f'refreshing {item_slug}, then sleeping {REFRESH_INTERVAL}')
API.refresh_item(item_slug)
time.sleep(REFRESH_INTERVAL)
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment