Skip to content

Instantly share code, notes, and snippets.

@HStep20
Last active December 31, 2025 19:12
Show Gist options
  • Select an option

  • Save HStep20/d6c5350bbcc12e40b1c9cdf7d9178c16 to your computer and use it in GitHub Desktop.

Select an option

Save HStep20/d6c5350bbcc12e40b1c9cdf7d9178c16 to your computer and use it in GitHub Desktop.
This script will read your Overseer data and create/apply user tags to all of your sonarr/radarr instances
"""
This script will read your Overseer data and create/apply user tags to all of your sonarr/radarr instances, then create a filter in each connected -arr application for the users you specify.
It is forward compatible with the future User Tagging feature of Overseerr, and formats the tag in the same 'id - lowercase username' pattern Overseerr will use.
Apart from the third-party `requests` package (pip install requests), it only uses built-in Python libraries, so you should be able to download and run without much hassle.
NOTE: YOU ARE REQUIRED TO USE IP:PORT CONNECTIONS FOR YOUR SONARR/RADARR INSTANCES INSIDE OF OVERSEERR
This will NOT utilize docker-compose style hostnames at the moment, and I don't use them personally, so I don't see myself adding them
Steps to use:
1. Add your Overseer API key
2. Add your Overseer Internal URL (might work with external domain url, but I didn't test)
2.5 Edit the Default values for number of users/requests
3. Press Play and wait
"""
import requests
from requests import HTTPError
from typing import Any
import re
import logging
from requests.models import Response
from urllib3.util import Retry
OVERSEER_API_KEY = "YOURAPIKEY"
OVERSEER_URL = "http://YOURIP:PORT/api/v1"
# I didn't want to figure out Pagination, so I set defaults to what I felt would be the maximum someone could have.
# If you have more than 100 users, or a user has more than 1000 requests, you'll need to update these values to reflect that
NUM_USERS_TO_PULL = 100
NUM_MAX_USER_REQUESTS = 1000
def handle_response(response: Response, *args: Any, **kwargs: Any) -> None:
    """Response hook that logs and re-raises any HTTP error status.

    Relies on the module-global ``logger`` that the ``__main__`` block creates.

    Args:
        response (Response): The response of the call being made
        *args (Any): Extra positional hook arguments; ignored
        **kwargs (Any): Extra keyword hook arguments; ignored

    Raises:
        requests.exceptions.HTTPError: Raised when ``raise_for_status`` fails.
            The message carries the response body and the response object is
            attached to the exception.
    """
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        logger.error(
            f"{response.status_code} - {response.request.url} - {response.text}"
        )
        # Attach the response so callers can still inspect ``e.response``
        raise requests.exceptions.HTTPError(
            f"{str(e)}: {response.text}", response=response
        ) from e
def make_session(api_key: str) -> requests.Session:
    """Creates a Requests Session with the API key header, error hook and retries

    Args:
        api_key (str): API key of service being accessed with the session

    Returns:
        requests.Session: Session that sends ``X-Api-Key`` on every request,
            runs ``handle_response`` on every response, and retries 429/500
            answers for GET/POST/PUT
    """
    session = requests.Session()
    session.hooks["response"] = [handle_response]
    adapter = requests.adapters.HTTPAdapter(
        max_retries=Retry(
            total=10,
            backoff_factor=5,
            status_forcelist=[429, 500],
            allowed_methods=["GET", "POST", "PUT"],
        )
    )
    # Mount on both schemes. This script builds http:// URLs, so mounting the
    # adapter on https:// alone meant the retry policy was never applied.
    for scheme in ("http://", "https://"):
        session.mount(scheme, adapter)
    session.headers.update({"X-Api-Key": api_key})
    return session
def map_arr_api_keys() -> dict[str, str]:
    """Gets all sonarr/radarr servers + api keys from Overseerr and returns a map of them

    Returns:
        dict[str,str]: A map of "hostname:port" -> api key for every configured server
    """
    requests_session = make_session(api_key=OVERSEER_API_KEY)
    sonarr_servers = requests_session.get(url=OVERSEER_URL + "/settings/sonarr").json()
    radarr_servers = requests_session.get(url=OVERSEER_URL + "/settings/radarr").json()

    # Key format must match the IP:PORT string extracted from each request's serviceUrl
    return {
        server["hostname"] + ":" + str(server["port"]): server["apiKey"]
        for server in sonarr_servers + radarr_servers
    }
def tag_requests_from_user(
    arr_api_key_map: dict[str, str],
    user_requests: dict[str, Any],
    user_tag_string: str,
) -> None:
    """Tags every request of one user, skipping the ones that cannot be tagged

    Args:
        arr_api_key_map (dict[str, str]): Map of the server URL and API key for each service connected to Overseerr
        user_requests (dict[str, Any]): The user's requests; iterated one request at a time
        user_tag_string (str): Formatted user tag name. Follows "id - lowercase username" format
    """
    for media_request in user_requests:
        try:
            tag_request_from_user(
                media_request=media_request,
                arr_api_key_map=arr_api_key_map,
                user_tag_string=user_tag_string,
            )
        except ValueError as e:
            # ValueError is the "skip this request" signal; log it and keep going
            logger.error(e)
def tag_request_from_user(
    media_request: dict[str, Any], arr_api_key_map: dict[str, str], user_tag_string: str
):
    """Applies the user's tag to the Sonarr/Radarr item behind one Overseerr request

    Looks the media up on the server named by the request's ``serviceUrl``,
    creates the user tag on that server when it is missing, and adds the tag
    to the media item unless it already carries it.

    Args:
        media_request (dict[str, Any]): The Media Request metadata provided by Overseerr API
        arr_api_key_map (dict[str, str]): Map of all servers connected to Overseerr, and their API keys
        user_tag_string (str): Formatted user tag name. Follows "id - lowercase username" format

    Raises:
        ValueError: The request should be skipped (status 4, no usable
            serviceUrl, unknown server, or media not found on the server)
        HTTPError: Creating the tag or updating the media item failed
    """
    # The media title is not known before the -arr lookup, so the skip
    # messages identify the request by its id instead.
    request_id = media_request.get("id")
    if media_request["status"] == 4:
        raise ValueError(
            f"{user_tag_string} - Request {request_id} has ERROR request status - Skipping"
        )
    # "media" may be None or lack a serviceUrl; treat both as "nothing to tag"
    media = media_request.get("media")
    if not media or not media.get("serviceUrl"):
        raise ValueError(
            f"{user_tag_string} - Request {request_id} has no ServiceURL associated with it - Skipping"
        )

    # Unfortunately the provided service URL doesn't include the /api/v3 slug, so we have to build our own
    non_api_url = media["serviceUrl"]
    ip_port_match = re.search(r"[0-9]+(?:\.[0-9]+){3}:[0-9]+", non_api_url)
    if ip_port_match is None:
        raise ValueError(f"Service URL {non_api_url} does not contain a valid IP:PORT")
    ip_port = ip_port_match.group(0)
    base_url = "http://" + ip_port
    service_url = base_url + "/api/v3"
    if ip_port not in arr_api_key_map:
        raise ValueError(
            f"{base_url} - No API key found for this server in the Overseerr settings - Skipping"
        )

    if media_request["type"] == "tv":
        request_path = "/series"
        request_params = {"tvdbId": media["tvdbId"]}
    else:
        request_path = "/movie"
        request_params = {"tmdbId": media["tmdbId"]}

    requests_session = make_session(api_key=arr_api_key_map[ip_port])
    arr_matches = requests_session.get(
        url=service_url + request_path,
        params=request_params,
    ).json()
    if len(arr_matches) == 0:
        raise ValueError(
            f"{base_url} - {media.get('externalServiceSlug')} is in the user's request list, but not found on server - Skipping"
        )
    arr_object_data = arr_matches[0]

    # Because each request has its own server associated with it, we check for the tag each time.
    # The alternate way would be to group by server, then do one check per server.
    tag_data = requests_session.get(url=service_url + "/tag").json()
    tag_id = get_tag_id(tag_data, user_tag_string)
    if tag_id == -1:
        logger.warning(f'{base_url} - Tag "{user_tag_string}" not found in server.')
        tag_creation_response = create_user_tag(
            requests_session=requests_session,
            service_url=service_url,
            user_tag_string=user_tag_string,
        )
        if tag_creation_response.ok:
            tag_id = tag_creation_response.json()["id"]
            logger.info(f"{base_url} - Created tag {user_tag_string} with id: {tag_id}")
        else:
            raise HTTPError(f'{base_url} - Failed to create tag "{user_tag_string}"')

    if tag_id in arr_object_data["tags"]:
        logger.info(
            f"{base_url} - {user_tag_string} - {arr_object_data['title']} already has user tag"
        )
    else:
        tag_addition_response = tag_media_with_user_data(
            requests_session=requests_session,
            service_url=service_url,
            request_path=request_path,
            request_params=request_params,
            arr_object_data=arr_object_data,
            tag_id=tag_id,
        )
        if tag_addition_response.ok:
            logger.info(
                f"{base_url} - {user_tag_string} - Tagged {arr_object_data['title']}"
            )
        else:
            raise HTTPError(tag_addition_response.text)
def get_tag_id(tag_data: list[dict[str, Any]], user_tag_string: str) -> int:
    """Gets the tagId of the user's tag from the respective server.

    Args:
        tag_data (list[dict[str, Any]]): The tag objects from the -arr api, each with "label" and "id"
        user_tag_string (str): The tag name for the current overseer user

    Returns:
        int: The id of the first tag whose label equals user_tag_string. Returns -1 if it doesn't exist
    """
    for tag in tag_data:
        if tag["label"] == user_tag_string:
            return tag["id"]
    return -1
def create_user_tag(
    requests_session: requests.Session,
    service_url: str,
    user_tag_string: str,
) -> requests.Response:
    """Create a user tag in Sonarr/Radarr

    Args:
        requests_session (requests.Session): Requests session for app you are creating tag in
        service_url (str): the URL of the app you are creating the tag in
        user_tag_string (str): tag string, which will be the tag name

    Returns:
        requests.Response: Response of the tag creation call; the caller reads
            the new tag's "id" from its JSON body
    """
    return requests_session.post(
        url=service_url + "/tag",
        json={"label": user_tag_string},
    )
def tag_media_with_user_data(
    requests_session: requests.Session,
    service_url: str,
    request_path: str,
    request_params: dict[str, Any],
    arr_object_data: dict[str, Any],
    tag_id: int,
) -> requests.Response:
    """Applies tag to selected media object

    Note that ``arr_object_data["tags"]`` is modified in place.

    Args:
        requests_session (requests.Session): Requests session for app you are apply tag in
        service_url (str): URL of app
        request_path (str): Slug to interact with media object
        request_params (dict[str, Any]): Extra request params to dictate the media object
        arr_object_data (dict[str, Any]): Media Object metadata from Sonarr/Radarr
        tag_id (int): Tag ID to apply to arr_object_data

    Returns:
        requests.Response: Response from tag call
    """
    if tag_id not in arr_object_data["tags"]:
        arr_object_data["tags"].append(tag_id)

    # The whole media object is sent back with the extended tag list
    return requests_session.put(
        url=service_url + request_path,
        params=request_params,
        json=arr_object_data,
    )
def create_tag_filter_in_application(
    arr_api_key_map: dict[str, str], user_tag_string: str
):
    """Create a custom filter in each server for the user tag

    A server is skipped when it already has a filter with this label, or
    when the user's tag does not exist on it (nothing was tagged there).

    Args:
        arr_api_key_map (dict[str, str]): Map of -arr URLs:API Keys
        user_tag_string (str): Tag Name for the current user
    """
    for server, api_key in arr_api_key_map.items():
        base_url = "http://" + server + "/api/v3"
        requests_session = make_session(api_key=api_key)

        current_filters = requests_session.get(url=base_url + "/customfilter").json()
        if any(x["label"] == user_tag_string for x in current_filters):
            logger.warning(
                f"http://{server} - {user_tag_string} - Filter Already Exists - Skipping"
            )
            continue

        tag_info = requests_session.get(url=base_url + "/tag").json()
        tag_id = get_tag_id(tag_data=tag_info, user_tag_string=user_tag_string)
        if tag_id == -1:
            # Without this guard a filter on tag id -1 would be created, and the
            # "already exists" check above would then keep it from being repaired.
            logger.warning(
                f"http://{server} - {user_tag_string} - Tag not found on server, no filter created - Skipping"
            )
            continue

        server_info = requests_session.get(url=base_url + "/system/status").json()
        if server_info["appName"].lower() == "sonarr":
            filter_type = "series"
        else:
            filter_type = "movieIndex"

        tag_filter = {
            "type": filter_type,
            "label": user_tag_string,
            "filters": [{"key": "tags", "value": [tag_id], "type": "contains"}],
        }
        requests_session.post(url=base_url + "/customfilter", json=tag_filter)
        logger.info(f"http://{server} - {user_tag_string} - Created Filter")
def main():
    """Tags the requested media of every Overseerr user and creates a filter per user"""
    arr_api_key_map = map_arr_api_keys()
    overseer_requests_session = make_session(api_key=OVERSEER_API_KEY)
    all_users = overseer_requests_session.get(
        url=OVERSEER_URL + "/user", params={"take": NUM_USERS_TO_PULL}
    ).json()["results"]

    for user in all_users:
        user_data = overseer_requests_session.get(
            url=OVERSEER_URL + f"/user/{user['id']}"
        ).json()
        # No pagination: a single page of up to NUM_MAX_USER_REQUESTS requests is pulled per user
        user_requests = overseer_requests_session.get(
            url=OVERSEER_URL + f"/user/{user['id']}/requests",
            params={"take": NUM_MAX_USER_REQUESTS},
        ).json()["results"]
        user_tag_string = (
            str(user_data["id"]) + " - " + user_data["displayName"].lower()
        )
        separator = "\n==============================================\n"
        print(
            separator
            + f" Tagging {user_data['displayName']}'s Media"
            + separator
        )

        if len(user_requests) > 0:
            tag_requests_from_user(arr_api_key_map, user_requests, user_tag_string)
            create_tag_filter_in_application(arr_api_key_map, user_tag_string)
        else:
            logger.warning(f"{user['displayName']} has no requests - Skipping")
if __name__ == "__main__":
    # Module-global logger; the functions above look it up by the name ``logger``
    logger = logging.getLogger("overseer_tagger")
    logger.setLevel(logging.INFO)

    # A single stream handler. A second StreamHandler at ERROR level wrote to
    # the same stream, so every error line was printed twice.
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    logger.addHandler(handler)
    main()
@stagenethome
Copy link

This script is amazing. I managed to get it working, but for anyone else finding this I'll paste the full code below.

--

"""

This script will read your Overseer data and create/apply user tags to all of your sonarr/radarr instances, then create a filter in each connected -arr application for the users you specify. 
It is forward compatible with the future User Tagging feature of Overseerr, and formats the tag in the same 'id - lowercase username' pattern Overseerr will use.
Apart from the third-party `requests` package (pip install requests), it only uses built-in Python libraries, so you should be able to download and run without much hassle.

NOTE:   YOU ARE REQUIRED TO USE IP:PORT CONNECTIONS FOR YOUR SONARR/RADARR INSTANCES INSIDE OF OVERSEERR
        This will NOT utilize docker-compose style hostnames at the moment, and I don't use them personally, so I don't see myself adding them

Steps to use:
1. Add your Overseer API key
2. Add your Overseer Internal URL (might work with external domain url, but I didn't test)
2.5 Edit the Default values for number of users/requests
3. Press Play and wait

https://gist.github.com/HStep20/d6c5350bbcc12e40b1c9cdf7d9178c16

"""


import requests
from requests import HTTPError
from typing import Any
import re
import logging
from requests.models import Response
from urllib3.util import Retry


OVERSEER_API_KEY = "YOURAPIKEY"
OVERSEER_URL = "http://YOURIP:PORT/api/v1"


# I didn't want to figure out Pagination, so I set defaults to what I felt would be the maximum someone could have.
# If you have more than 100 users, or a user has more than 1000 requests, you'll need to update these values to reflect that
NUM_USERS_TO_PULL = 100
NUM_MAX_USER_REQUESTS = 1000


def handle_response(response: Response, *args: Any, **kwargs: Any) -> None:
    """Response hook that logs and re-raises any HTTP error status.

    Relies on the module-global ``logger`` that the ``__main__`` block creates.

    Args:
        response (Response): The response of the call being made
        *args (Any): Extra positional hook arguments; ignored
        **kwargs (Any): Extra keyword hook arguments; ignored

    Raises:
        requests.exceptions.HTTPError: Raised when ``raise_for_status`` fails.
            The message carries the response body and the response object is
            attached to the exception.
    """
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        logger.error(
            f"{response.status_code} - {response.request.url} - {response.text}"
        )
        # Attach the response so callers can still inspect ``e.response``
        raise requests.exceptions.HTTPError(
            f"{str(e)}: {response.text}", response=response
        ) from e


def make_session(api_key: str) -> requests.Session:
    """Creates a Requests Session with the API key header, error hook and retries

    Args:
        api_key (str): API key of service being accessed with the session

    Returns:
        requests.Session: Session that sends ``X-Api-Key`` on every request,
            runs ``handle_response`` on every response, and retries 429/500
            answers for GET/POST/PUT
    """
    session = requests.Session()
    session.hooks["response"] = [handle_response]
    adapter = requests.adapters.HTTPAdapter(
        max_retries=Retry(
            total=10,
            backoff_factor=5,
            status_forcelist=[429, 500],
            allowed_methods=["GET", "POST", "PUT"],
        )
    )

    # Mount on both schemes. This script builds http:// URLs, so mounting the
    # adapter on https:// alone meant the retry policy was never applied.
    for scheme in ("http://", "https://"):
        session.mount(scheme, adapter)
    session.headers.update({"X-Api-Key": api_key})
    return session


def map_arr_api_keys() -> dict[str, str]:
    """Builds a lookup of every Sonarr/Radarr server configured in Overseerr

    Returns:
        dict[str,str]: "hostname:port" of each server mapped to its api key
    """
    session = make_session(api_key=OVERSEER_API_KEY)
    # Sonarr settings are fetched first, then Radarr
    sonarr, radarr = (
        session.get(url=OVERSEER_URL + "/settings/" + app).json()
        for app in ("sonarr", "radarr")
    )
    return {
        entry["hostname"] + ":" + str(entry["port"]): entry["apiKey"]
        for entry in sonarr + radarr
    }


def tag_requests_from_user(
    arr_api_key_map: dict[str, str],
    user_requests: dict[str, Any],
    user_tag_string: str,
) -> None:
    """Runs the tagging step for each of a user's requests

    A request that raises ValueError is logged as an error and skipped; the
    remaining requests are still processed.

    Args:
        arr_api_key_map (dict[str, str]): Server URL -> API key for each service connected to overseer
        user_requests (dict[str, Any]): The user's requests, iterated one by one
        user_tag_string (str): Formatted user tag name. Follows "id - lowercase username" format
    """
    for request in user_requests:
        try:
            tag_request_from_user(request, arr_api_key_map, user_tag_string)
        except ValueError as skip_reason:
            logger.error(skip_reason)


def tag_request_from_user(
    media_request: dict[str, Any], arr_api_key_map: dict[str, str], user_tag_string: str
):
    """Applies the user's tag to the Sonarr/Radarr item behind one Overseerr request

    Looks the media up on the server named by the request's ``serviceUrl``,
    creates the user tag on that server when it is missing, and adds the tag
    to the media item unless it already carries it.

    Args:
        media_request (dict[str, Any]): The Media Request metadata provided by Overseerr API
        arr_api_key_map (dict[str, str]): Map of all servers connected to Overseerr, and their API keys
        user_tag_string (str): Formatted user tag name. Follows "id - lowercase username" format

    Raises:
        ValueError: The request should be skipped (status 4, no usable
            serviceUrl, unknown server, or media not found on the server)
        HTTPError: Creating the tag or updating the media item failed
    """
    # The media title is not known before the -arr lookup, so the skip
    # messages identify the request by its id instead.
    request_id = media_request.get("id")
    if media_request["status"] == 4:
        raise ValueError(
            f"{user_tag_string} - Request {request_id} has ERROR request status - Skipping"
        )
    # "media" may be None or lack a serviceUrl; treat both as "nothing to tag"
    media = media_request.get("media")
    if not media or not media.get("serviceUrl"):
        raise ValueError(
            f"{user_tag_string} - Request {request_id} has no ServiceURL associated with it - Skipping"
        )

    # Unfortunately the provided service URL doesn't include the /api/v3 slug, so we have to build our own
    non_api_url = media["serviceUrl"]
    ip_port_match = re.search(r"[0-9]+(?:\.[0-9]+){3}:[0-9]+", non_api_url)
    if ip_port_match is None:
        raise ValueError(f"Service URL {non_api_url} does not contain a valid IP:PORT")
    ip_port = ip_port_match.group(0)
    base_url = "http://" + ip_port
    service_url = base_url + "/api/v3"
    if ip_port not in arr_api_key_map:
        raise ValueError(
            f"{base_url} - No API key found for this server in the Overseerr settings - Skipping"
        )

    if media_request["type"] == "tv":
        request_path = "/series"
        request_params = {"tvdbId": media["tvdbId"]}
    else:
        request_path = "/movie"
        request_params = {"tmdbId": media["tmdbId"]}

    requests_session = make_session(api_key=arr_api_key_map[ip_port])
    arr_matches = requests_session.get(
        url=service_url + request_path,
        params=request_params,
    ).json()
    if len(arr_matches) == 0:
        raise ValueError(
            f"{base_url} - {media.get('externalServiceSlug')} is in the user's request list, but not found on server - Skipping"
        )
    arr_object_data = arr_matches[0]

    # Because each request has its own server associated with it, we check for the tag each time.
    # The alternate way would be to group by server, then do one check per server.
    tag_data = requests_session.get(url=service_url + "/tag").json()
    tag_id = get_tag_id(tag_data, user_tag_string)
    if tag_id == -1:
        logger.warning(f'{base_url} - Tag "{user_tag_string}" not found in server.')
        tag_creation_response = create_user_tag(
            requests_session=requests_session,
            service_url=service_url,
            user_tag_string=user_tag_string,
        )
        if tag_creation_response.ok:
            tag_id = tag_creation_response.json()["id"]
            logger.info(f"{base_url} - Created tag {user_tag_string} with id: {tag_id}")
        else:
            raise HTTPError(f'{base_url} - Failed to create tag "{user_tag_string}"')

    if tag_id in arr_object_data["tags"]:
        logger.info(
            f"{base_url} - {user_tag_string} - {arr_object_data['title']} already has user tag"
        )
    else:
        tag_addition_response = tag_media_with_user_data(
            requests_session=requests_session,
            service_url=service_url,
            request_path=request_path,
            request_params=request_params,
            arr_object_data=arr_object_data,
            tag_id=tag_id,
        )
        if tag_addition_response.ok:
            logger.info(
                f"{base_url} - {user_tag_string} - Tagged {arr_object_data['title']}"
            )
        else:
            raise HTTPError(tag_addition_response.text)


def get_tag_id(tag_data: dict[str, Any], user_tag_string: str) -> int:
    """Looks up the id of the tag whose label equals the user's tag name.

    Args:
        tag_data (dict[str, Any]): Tag objects from the -arr api; each one is read for "label" and "id"
        user_tag_string (str): The tag name for the current overseer user

    Returns:
        int: Id of the first matching tag, or -1 when no label matches
    """
    matching_ids = (
        entry["id"] for entry in tag_data if entry["label"] == user_tag_string
    )
    return next(matching_ids, -1)


def create_user_tag(
    requests_session: requests.Session,
    service_url: str,
    user_tag_string: str,
) -> requests.Response:
    """Create a user tag in Sonarr/Radarr

    Args:
        requests_session (requests.Session): Requests session for app you are creating tag in
        service_url (str): the URL of the app you are creating the tag in
        user_tag_string (str): tag string, which will be the tag name

    Returns:
        requests.Response: Response of the tag creation call; the caller reads
            the new tag's "id" from its JSON body
    """
    return requests_session.post(
        url=service_url + "/tag",
        json={"label": user_tag_string},
    )


def tag_media_with_user_data(
    requests_session: requests.Session,
    service_url: str,
    request_path: str,
    request_params: dict[str, Any],
    arr_object_data: dict[str, Any],
    tag_id: int,
) -> requests.Response:
    """Adds the tag id to a media object and sends the object back to its server

    The "tags" list of ``arr_object_data`` is extended in place when the id is
    not already in it.

    Args:
        requests_session (requests.Session): Session for the app the media lives in
        service_url (str): URL of app
        request_path (str): Slug of the media endpoint
        request_params (dict[str, Any]): Query params sent along with the update
        arr_object_data (dict[str, Any]): Media Object metadata from Sonarr/Radarr
        tag_id (int): Tag ID to apply to arr_object_data

    Returns:
        requests.Response: Response from the PUT call
    """
    current_tags = arr_object_data["tags"]
    if tag_id not in current_tags:
        current_tags.append(tag_id)

    endpoint = service_url + request_path
    return requests_session.put(
        url=endpoint, params=request_params, json=arr_object_data
    )


def create_tag_filter_in_application(
    arr_api_key_map: dict[str, str], user_tag_string: str
):
    """Create a custom filter in each server for the user tag

    A server is skipped when it already has a filter with this label, or
    when the user's tag does not exist on it (nothing was tagged there).

    Args:
        arr_api_key_map (dict[str, str]): Map of -arr URLs:API Keys
        user_tag_string (str): Tag Name for the current user
    """
    for server, api_key in arr_api_key_map.items():
        base_url = "http://" + server + "/api/v3"
        requests_session = make_session(api_key=api_key)

        current_filters = requests_session.get(url=base_url + "/customfilter").json()
        if any(x["label"] == user_tag_string for x in current_filters):
            logger.warning(
                f"http://{server} - {user_tag_string} - Filter Already Exists - Skipping"
            )
            continue

        tag_info = requests_session.get(url=base_url + "/tag").json()
        tag_id = get_tag_id(tag_data=tag_info, user_tag_string=user_tag_string)
        if tag_id == -1:
            # Without this guard a filter on tag id -1 would be created, and the
            # "already exists" check above would then keep it from being repaired.
            logger.warning(
                f"http://{server} - {user_tag_string} - Tag not found on server, no filter created - Skipping"
            )
            continue

        server_info = requests_session.get(url=base_url + "/system/status").json()
        if server_info["appName"].lower() == "sonarr":
            filter_type = "series"
        else:
            filter_type = "movieIndex"

        tag_filter = {
            "type": filter_type,
            "label": user_tag_string,
            "filters": [{"key": "tags", "value": [tag_id], "type": "contains"}],
        }
        requests_session.post(url=base_url + "/customfilter", json=tag_filter)
        logger.info(f"http://{server} - {user_tag_string} - Created Filter")


def main():
    """Walks every Overseerr user, tags their requested media and adds a filter per user"""
    key_map = map_arr_api_keys()
    session = make_session(api_key=OVERSEER_API_KEY)
    user_page = session.get(
        url=OVERSEER_URL + "/user", params={"take": NUM_USERS_TO_PULL}
    ).json()
    banner_rule = "\n==============================================\n"

    for user in user_page["results"]:
        user_url = OVERSEER_URL + f"/user/{user['id']}"
        user_data = session.get(url=user_url).json()
        # No pagination: one page of up to NUM_MAX_USER_REQUESTS requests per user
        request_page = session.get(
            url=user_url + "/requests",
            params={"take": NUM_MAX_USER_REQUESTS},
        ).json()
        user_requests = request_page["results"]
        user_tag_string = f"{str(user_data['id'])} - " + user_data["displayName"].lower()
        print(
            banner_rule
            + f"         Tagging {user_data['displayName']}'s Media"
            + banner_rule
        )

        if len(user_requests) <= 0:
            logger.warning(f"{user['displayName']} has no requests - Skipping")
            continue
        tag_requests_from_user(key_map, user_requests, user_tag_string)
        create_tag_filter_in_application(key_map, user_tag_string)


if __name__ == "__main__":
    # Module-global logger; the functions above look it up by the name ``logger``
    logger = logging.getLogger("overseer_tagger")
    logger.setLevel(logging.INFO)

    # A single stream handler. A second StreamHandler at ERROR level wrote to
    # the same stream, so every error line was printed twice.
    handler = logging.StreamHandler()
    handler.setLevel(logging.DEBUG)
    handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    logger.addHandler(handler)
    main()

@FoxxMD
Copy link

FoxxMD commented Dec 3, 2024

@crodgers89 and others experiencing NoneType and arr_object_data errors -- replace the two if statements at the beginning of tag_request_from_user (lines 131-138) with this:

    if media_request["status"] == 4:
        raise ValueError(
            f"{user_tag_string} - In Status 4 (cannot use) for Request {media_request['id']} - Skipping"
        )
    if media_request['media'] == None or "serviceUrl" not in media_request["media"]:
        raise ValueError(
            f"{user_tag_string} - Request {media_request['id']} has no ServiceURL associated with it - Skipping"
        )

@nemchik
Copy link

nemchik commented Dec 31, 2025

For anyone who finds this now and has moved to seerr https://github.com/seerr-team/seerr they changed the tag naming to not include spaces.

Here is a modified version of the script that:

  • No longer includes the spaces in the tag names
  • Can be used with IP or HOSTNAME (ex: docker)
  • Can be used with External URLs
  • Supports https when useSsl is checked in seerr (still supports http also)
  • Replaces all instances of overseer (missing r) with seerr (nothing functional here, just spelling/naming)
  • Includes a shebang at the top of the file so it can be more easily executed as a shell command
  • Includes @FoxxMD 's adjustments (verifying media exists on the request)
  • Includes @northirid / @stagenethome 's adjustments (verifying the regex on the serviceUrl actually finds a match)

If I got the crediting wrong (as far as who came up with the fixes above) I am sorry in advance.

I tested all of the changes with internal IP:PORT, internal HOSTNAME:PORT (docker), and external URL (reverse proxy). I just swapped my own settings around to test everything. No harm in more testing if anyone else wants to. Probably DO NOT use this on the original Overseerr since they still have spaces in tag names, but at this point it's pretty safe to migrate to seerr.

@HStep20 It would be great if you added a revision so this new version appears right up top for future users.

#!/usr/bin/env python3

"""

https://gist.github.com/HStep20/d6c5350bbcc12e40b1c9cdf7d9178c16

This script will read your Seerr data and create/apply user tags to all of your sonarr/radarr instances, then create a filter in each connected -arr application for the users you specify.
It is forward compatible with the future User Tagging feature of Seerr, and formats the tag in the same 'id-lowercase username' pattern Seerr will use.
Apart from the third-party `requests` package (pip install requests), it only uses built-in Python libraries, so you should be able to download and run without much hassle.

Steps to use:
1. Add your Seerr API key
2. Add your Seerr URL (works with external domain url if needed)
2.5 Edit the Default values for number of users/requests
3. Press Play and wait

"""


import requests
from requests import HTTPError
from typing import Any
import re
import logging
from requests.models import Response
from urllib3.util import Retry


SEERR_API_KEY = "YOURAPIKEY"
SEERR_URL = "http://YOURIP:PORT/api/v1"


# I didn't want to figure out Pagination, so I set defaults to what I felt would be the maximum someone could have.
# If you have more than 100 users, or a user has more than 1000 requests, you'll need to update these values to reflect that
NUM_USERS_TO_PULL = 100
NUM_MAX_USER_REQUESTS = 1000


def handle_response(response: Response, *args: Any, **kwargs: Any) -> None:
    """Response hook that logs and re-raises any HTTP error status.

    Uses the module-global ``logger``, which is not defined in this function.

    Args:
        response (Response): The response of the call being made
        *args (Any): Extra positional hook arguments; ignored
        **kwargs (Any): Extra keyword hook arguments; ignored

    Raises:
        requests.exceptions.HTTPError: Raised when ``raise_for_status`` fails.
            The message carries the response body and the response object is
            attached to the exception.
    """
    try:
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        logger.error(
            f"{response.status_code} - {response.request.url} - {response.text}"
        )
        # Attach the response so callers can still inspect ``e.response``
        raise requests.exceptions.HTTPError(
            f"{str(e)}: {response.text}", response=response
        ) from e


def make_session(api_key: str) -> requests.Session:
    """Creates a Requests Session with the API key header, error hook and retries

    Args:
        api_key (str): API key of service being accessed with the session

    Returns:
        requests.Session: Session that sends ``X-Api-Key`` on every request,
            runs ``handle_response`` on every response, and retries 429/500
            answers for GET/POST/PUT
    """
    session = requests.Session()
    session.hooks["response"] = [handle_response]
    adapter = requests.adapters.HTTPAdapter(
        max_retries=Retry(
            total=10,
            backoff_factor=5,
            status_forcelist=[429, 500],
            allowed_methods=["GET", "POST", "PUT"],
        )
    )

    # Mount on both schemes. Servers may be reached over plain http as well as
    # https, and mounting on https:// alone left http requests without retries.
    for scheme in ("http://", "https://"):
        session.mount(scheme, adapter)
    session.headers.update({"X-Api-Key": api_key})
    return session


def map_arr_api_keys() -> dict[str, str]:
    """Gets all sonarr/radarr servers + api keys from Seerr and returns a map of them

    Returns:
        dict[str,str]: A map of -arr base URL -> api key. The key is the
            server's externalUrl (trailing slashes removed) when one is set,
            otherwise "protocol://hostname:port"
    """
    requests_session = make_session(api_key=SEERR_API_KEY)
    sonarr_servers = requests_session.get(url=SEERR_URL + "/settings/sonarr").json()
    radarr_servers = requests_session.get(url=SEERR_URL + "/settings/radarr").json()

    api_key_map = {}
    for server in sonarr_servers + radarr_servers:
        # A missing, None or empty externalUrl all mean "no external URL set"
        external_url = server.get("externalUrl")
        if external_url:
            map_key = external_url.rstrip("/")
        else:
            protocol = "https" if server.get("useSsl") else "http"
            map_key = f"{protocol}://{server['hostname']}:{server['port']}"
        api_key_map[map_key] = server["apiKey"]
    return api_key_map


def tag_requests_from_user(
    arr_api_key_map: dict[str, str],
    user_requests: dict[str, Any],
    user_tag_string: str,
) -> None:
    """Applies the user's tag to every request in user_requests

    Each request is handed to tag_request_from_user. A request that raises
    ValueError is logged as an error and skipped; the remaining requests are
    still processed.

    Args:
        arr_api_key_map (dict[str, str]): Map of the server URL and API key for each service connected to seerr
        user_requests (dict[str, Any]): The user's requests; iterated one request at a time
        user_tag_string (str): Formatted user tag name. Follows "id-lowercase username" format
    """
    for single_request in user_requests:
        try:
            tag_request_from_user(single_request, arr_api_key_map, user_tag_string)
        except ValueError as skip_reason:
            # ValueError means "this request cannot be tagged"; report it and move on.
            logger.error(skip_reason)


def tag_request_from_user(
    media_request: dict[str, Any], arr_api_key_map: dict[str, str], user_tag_string: str
) -> None:
    """Reads request data from Seerr, and finds the media within Sonarr/Radarr, and applies a user tag to that item in its respective server

    Args:
        media_request (dict[str, Any]): The Media Request metadata provided by Seerr API
        arr_api_key_map (dict[str, str]): Map of all servers connected to Seerr, and their API keys
        user_tag_string (str): Formatted user tag name. Follows "id-lowercase username" format

    Raises:
        ValueError: The request cannot be tagged and should be skipped (status 4,
            no service URL, no tvdb/tmdb id, unknown server, or media not found
            on the server).
        HTTPError: Creating the tag or updating the media item failed.
    """
    if media_request["status"] == 4:
        raise ValueError(
            f"{user_tag_string} - In Status 4 (cannot use) for Request {media_request['id']} - Skipping"
        )
    media = media_request["media"]
    if media is None or not media.get("serviceUrl"):
        raise ValueError(
            f"{user_tag_string} - Request {media_request['id']} has no ServiceURL associated with it - Skipping"
        )

    if media_request["type"] == "tv":
        request_path, id_field = "/series", "tvdbId"
    else:
        request_path, id_field = "/movie", "tmdbId"
    external_id = media.get(id_field)
    if external_id is None:
        # requests leaves None-valued params out of the query string, so the lookup
        # below would be sent without a filter and its first result would be tagged
        # instead of the requested item.
        raise ValueError(
            f"{user_tag_string} - Request {media_request['id']} has no {id_field} - Skipping"
        )
    request_params = {id_field: external_id}

    # Unfortunately the provided service URL doesn't include the /v3/api slug, so we have to build our own
    non_api_url = media["serviceUrl"]
    logger.info(f"Non API URL: {non_api_url}")
    base_url = _match_arr_server(non_api_url, arr_api_key_map)
    logger.info(f"Base URL: {base_url}")
    service_url = base_url + "/api/v3"

    requests_session = make_session(api_key=arr_api_key_map[base_url])
    matching_media = requests_session.get(
        url=service_url + request_path,
        params=request_params,
    ).json()
    if not matching_media:
        raise ValueError(
            f"{base_url} - {media.get('externalServiceSlug')} is in the user's request list, but not found on server - Skipping"
        )
    arr_object_data = matching_media[0]

    tag_data = requests_session.get(
        url=service_url + "/tag",
    ).json()
    # Because each request has its own server associated with it, we should check for the tag each time.
    # The alternate way would be to group by server, then do one check per server, but we don't need to worry about api calls here
    tag_id = get_tag_id(tag_data, user_tag_string)
    if tag_id == -1:
        logger.warning(f'{base_url} - Tag "{user_tag_string}" not found in server.')
        tag_creation_response = create_user_tag(
            requests_session=requests_session,
            service_url=service_url,
            user_tag_string=user_tag_string,
        )
        if tag_creation_response.ok:
            tag_id = tag_creation_response.json()["id"]
            logger.info(f"{base_url} - Created tag {user_tag_string} with id: {tag_id}")
        else:
            raise HTTPError(f'{base_url} - Failed to create tag "{user_tag_string}"')

    if tag_id in arr_object_data["tags"]:
        logger.info(
            f"{base_url} - {user_tag_string} - {arr_object_data['title']} already has user tag"
        )
        return

    tag_addition_response = tag_media_with_user_data(
        requests_session=requests_session,
        service_url=service_url,
        request_path=request_path,
        request_params=request_params,
        arr_object_data=arr_object_data,
        tag_id=tag_id,
    )
    if tag_addition_response.ok:
        logger.info(
            f"{base_url} - {user_tag_string} - Tagged {arr_object_data['title']}"
        )
    else:
        raise HTTPError(tag_addition_response.text)


def _match_arr_server(non_api_url: str, arr_api_key_map: dict[str, str]) -> str:
    """Finds the key of arr_api_key_map that a media service URL belongs to

    The "scheme://host[:port]" part of the URL is tried first. If that is not a
    key of the map, the longest key that is a path prefix of the URL is used,
    which covers servers registered under an external URL that contains a path.

    Args:
        non_api_url (str): Service URL of the media item, as provided by Seerr
        arr_api_key_map (dict[str, str]): Map of -arr URLs:API Keys

    Returns:
        str: The matching key of arr_api_key_map

    Raises:
        ValueError: The URL has no http(s) base, or no server in the map matches it
    """
    base_url_matches = re.findall(r"https?://[^/]+", non_api_url)
    if not base_url_matches:
        raise ValueError(f"Service URL {non_api_url} does not contain a valid base URL")
    origin = base_url_matches[0]
    if origin in arr_api_key_map:
        return origin

    # Require a "/" boundary so "http://host:78" never matches "http://host:7878/...".
    prefix_matches = [
        server_url
        for server_url in arr_api_key_map
        if non_api_url == server_url or non_api_url.startswith(server_url + "/")
    ]
    if not prefix_matches:
        # Raise ValueError rather than letting a KeyError escape, so the caller can
        # skip this request instead of aborting the whole run.
        raise ValueError(
            f"{origin} - No API key found for the server behind {non_api_url} - Skipping"
        )
    return max(prefix_matches, key=len)


def get_tag_id(tag_data: dict[str, Any], user_tag_string: str) -> int:
    """Looks up the id of the tag whose label equals the user's tag name.

    Args:
        tag_data (dict[str, Any]): The Tag Data from the -arr api; iterated as a
            sequence of tag entries that each carry "label" and "id"
        user_tag_string (str): The tag name for the current seerr user

    Returns:
        int: The "id" of the first entry whose "label" matches exactly, or -1 when
            no entry matches
    """
    matching_ids = (
        entry["id"] for entry in tag_data if entry["label"] == user_tag_string
    )
    # -1 is the sentinel callers check for "tag does not exist yet".
    return next(matching_ids, -1)


def create_user_tag(
    requests_session: requests.Session,
    service_url: str,
    user_tag_string: str,
) -> requests.Response:
    """Create a user tag in Sonarr/Radarr

    Args:
        requests_session (requests.Session): Requests session for app you are creating tag in
        service_url (str): the URL of the app you are creating the tag in
        user_tag_string (str): tag string, which will be the tag name

    Returns:
        requests.Response: Response of the tag creation call. The caller checks
            `.ok` and reads the new tag's "id" from `.json()`.
    """
    # The response object itself is returned (not its decoded body), so the return
    # type is a Response, matching tag_media_with_user_data.
    return requests_session.post(
        url=service_url + "/tag",
        json={"label": user_tag_string},
    )


def tag_media_with_user_data(
    requests_session: requests.Session,
    service_url: str,
    request_path: str,
    request_params: dict[str, Any],
    arr_object_data: dict[str, Any],
    tag_id: int,
) -> requests.Response:
    """Adds the tag to a media object and sends the updated object to the server

    Note that arr_object_data["tags"] is extended in place when the tag is not
    already present.

    Args:
        requests_session (requests.Session): Requests session for the app the media lives in
        service_url (str): API URL of the app
        request_path (str): Path appended to service_url to address the media object
        request_params (dict[str, Any]): Query parameters sent along with the update
        arr_object_data (dict[str, Any]): Media Object metadata from Sonarr/Radarr
        tag_id (int): Tag ID to apply to arr_object_data

    Returns:
        requests.Response: Response of the PUT call
    """
    existing_tags = arr_object_data["tags"]
    if tag_id not in existing_tags:
        existing_tags.append(tag_id)

    # The whole media object is sent back as the request body, including the tag list.
    endpoint = service_url + request_path
    return requests_session.put(
        url=endpoint, params=request_params, json=arr_object_data
    )


def create_tag_filter_in_application(
    arr_api_key_map: dict[str, str], user_tag_string: str
) -> None:
    """Create a custom filter in each server for the user tag

    A server is skipped when it already has a custom filter labelled with the
    tag name, or when the tag itself does not exist on that server.

    Args:
        arr_api_key_map (dict[str, str]): Map of -arr URLs:API Keys
        user_tag_string (str): Tag Name for the current user
    """
    for server, api_key in arr_api_key_map.items():
        base_url = server + "/api/v3"
        requests_session = make_session(api_key=api_key)

        current_filters = requests_session.get(url=base_url + "/customfilter").json()
        if any(existing["label"] == user_tag_string for existing in current_filters):
            logger.warning(
                f"{server} - {user_tag_string} - Filter Already Exists - Skipping"
            )
            continue

        tag_info = requests_session.get(url=base_url + "/tag").json()
        tag_id = get_tag_id(tag_data=tag_info, user_tag_string=user_tag_string)
        if tag_id == -1:
            # The user has nothing tagged on this server (e.g. only movie requests
            # while this is a Sonarr instance). Creating the filter anyway would
            # store a filter on the non-existent tag id -1.
            logger.warning(
                f"{server} - {user_tag_string} - Tag not found on server, no filter created - Skipping"
            )
            continue

        server_info = requests_session.get(url=base_url + "/system/status").json()
        # Any app that does not report itself as Sonarr gets the movie filter type.
        if server_info["appName"].lower() == "sonarr":
            filter_type = "series"
        else:
            filter_type = "movieIndex"

        tag_filter = {
            "type": filter_type,
            "label": user_tag_string,
            "filters": [{"key": "tags", "value": [tag_id], "type": "contains"}],
        }
        requests_session.post(url=base_url + "/customfilter", json=tag_filter)
        logger.info(f"{server} - {user_tag_string} - Created Filter")


def main():
    """Tags the requested media of every Seerr user and creates a filter per user

    Builds the -arr server/API key map, pulls up to NUM_USERS_TO_PULL users from
    Seerr, and for each user pulls up to NUM_MAX_USER_REQUESTS requests. Users
    with at least one request get their media tagged and a custom filter
    created; users without requests are logged and skipped.
    """
    arr_api_key_map = map_arr_api_keys()
    seerr_requests_session = make_session(api_key=SEERR_API_KEY)
    user_listing = seerr_requests_session.get(
        url=SEERR_URL + "/user", params={"take": NUM_USERS_TO_PULL}
    ).json()
    banner_rule = "\n==============================================\n"

    for user in user_listing["results"]:
        user_endpoint = SEERR_URL + f"/user/{user['id']}"
        user_data = seerr_requests_session.get(url=user_endpoint).json()
        # No pagination here: a single call with a large "take" is expected to
        # return every request of the user.
        user_requests = seerr_requests_session.get(
            url=user_endpoint + "/requests",
            params={"take": NUM_MAX_USER_REQUESTS},
        ).json()["results"]

        user_tag_string = f"{user_data['id']}-{user_data['displayName'].lower()}"
        print(
            f"{banner_rule}         Tagging {user_data['displayName']}'s Media{banner_rule}"
        )

        if len(user_requests) == 0:
            logger.warning(f"{user['displayName']} has no requests - Skipping")
            continue

        tag_requests_from_user(arr_api_key_map, user_requests, user_tag_string)
        create_tag_filter_in_application(arr_api_key_map, user_tag_string)


if __name__ == "__main__":
    # Create the script-wide logger; the functions above log through this global.
    logger = logging.getLogger("seerr_tagger")
    logger.setLevel(logging.INFO)

    # Use a single console handler. Two StreamHandlers attached to the same logger
    # both write to stderr, so every ERROR record would be printed twice.
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)
    console_handler.setFormatter(
        logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    )
    logger.addHandler(console_handler)
    main()

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment