Created
September 24, 2025 01:34
-
-
Save leyuskckiran1510/4816f45ac8113d4394102bad7e304379 to your computer and use it in GitHub Desktop.
Read mail from servers using IMAP
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import email | |
| import imaplib | |
| import logging | |
| from dataclasses import dataclass | |
| from email.header import decode_header | |
| from email.message import Message | |
| from typing import Any, cast | |
| logger = logging.getLogger(__name__) | |
@dataclass
class FetchedEmails:
    """One fetched message: its decoded headers plus the parsed message."""

    # Decoded "From" header.
    a_from: str
    # Decoded "To" header.
    to: str
    # Decoded "Subject" header.
    subject: str
    # The parsed message object as produced by fetch_emails.
    body: Any
def get_email_header(msg: Message, header_name: str) -> str:
    """Return the fully decoded value of header *header_name* from *msg*.

    A header may consist of several chunks (plain text mixed with RFC 2047
    encoded words, possibly in different charsets). Every chunk is decoded
    and the results are concatenated, so nothing after the first chunk is
    lost.

    Args:
        msg: The parsed message to read the header from.
        header_name: Name of the header, e.g. ``"Subject"``.

    Returns:
        The decoded header text, or ``""`` when the header is absent.
    """
    decoded_chunks: list[str] = []
    for value, charset in decode_header(msg.get(header_name, "")):
        if isinstance(value, bytes):
            try:
                value = value.decode(charset or "utf-8", errors="ignore")
            except LookupError:
                # The message declared a charset Python has no codec for;
                # fall back to UTF-8 instead of failing the whole fetch.
                value = value.decode("utf-8", errors="ignore")
        decoded_chunks.append(value)
    return "".join(decoded_chunks)
def fetch_emails(
    host: str,
    username: str,
    password: str,
    *,
    _filter: str = "UNSEEN",
    mailbox: str = "Github",
) -> list[FetchedEmails]:
    """Fetch messages matching an IMAP search filter from a mailbox.

    Connects to *host* over SSL, logs in, selects *mailbox*, runs an IMAP
    SEARCH with *_filter* and downloads every matching message in full
    (RFC822). The connection is logged out on every exit path, including
    when an IMAP call raises.

    Args:
        host: IMAP server host name.
        username: Account name used to log in.
        password: Account password (never written to the log).
        _filter: IMAP SEARCH criterion, e.g. ``"UNSEEN"`` or ``"ALL"``.
        mailbox: Name of the mailbox to select before searching.

    Returns:
        One ``FetchedEmails`` per downloaded message; an empty list when the
        search fails or matches nothing.
    """
    # The context manager issues LOGOUT on exit, so the connection is not
    # leaked when login/select/search/fetch raises.
    with imaplib.IMAP4_SSL(host) as imap:
        # Deliberately log only the user name: credentials must not end up
        # in log files.
        logger.info("Logging in to account username=%r", username)
        imap.login(username, password)
        imap.select(mailbox)
        # NOTE(review): this subscribes to "Inbox" while a different mailbox
        # is selected above; kept to preserve existing behavior -- confirm
        # whether it is actually needed.
        imap.subscribe("Inbox")

        # for filters
        # https://www.marshallsoft.com/ImapSearch.htm
        status, messages = imap.search(None, _filter)
        if status != "OK" or not messages or not messages[0]:
            logger.warning("No new mail found.")
            return []

        # SEARCH returns a single space-separated bytes string of ids.
        email_ids: list[bytes] = messages[0].split()
        fetched_emails: list[FetchedEmails] = []
        for eid in email_ids:
            status, msg_data = imap.fetch(eid, "(RFC822)")
            if status != "OK" or not msg_data:
                continue
            for part in msg_data:
                # Only tuple items carry message bytes (in part[1]);
                # other items in the response are skipped.
                if not isinstance(part, tuple):
                    continue
                msg: Message = email.message_from_bytes(part[1])
                fetched_emails.append(
                    FetchedEmails(
                        subject=get_email_header(msg, "Subject"),
                        body=msg,
                        a_from=get_email_header(msg, "From"),
                        to=get_email_header(msg, "To"),
                    )
                )
        return fetched_emails
def _decode_bytes(raw: bytes, charset: str) -> str:
    """Decode *raw* with *charset*, falling back to UTF-8 for unknown codecs."""
    try:
        return raw.decode(charset, errors="replace")
    except LookupError:
        # The message declared a charset Python has no codec for.
        return raw.decode("utf-8", errors="replace")


def get_message_content(msg: Message) -> str:
    """Return the plain-text content of *msg*.

    For a non-multipart message the decoded payload is returned whatever its
    content type. For a multipart message every ``text/plain`` part is
    collected -- at any nesting depth, so layouts such as
    ``multipart/mixed`` -> ``multipart/alternative`` -> ``text/plain`` are
    handled -- and the parts are joined with newlines in document order.

    Args:
        msg: The parsed message.

    Returns:
        The decoded text, or ``""`` when no decodable text payload exists.
    """
    if not msg.is_multipart():
        payload = msg.get_payload(decode=True)
        if isinstance(payload, bytes):
            return _decode_bytes(payload, msg.get_content_charset() or "utf-8")
        return ""

    texts: list[str] = []
    # walk() visits the container itself and then all sub-parts depth-first;
    # containers are never "text/plain", so only leaf text parts are kept.
    for part in msg.walk():
        if part.get_content_type() != "text/plain":
            continue
        payload = part.get_payload(decode=True)
        if isinstance(payload, bytes):
            texts.append(
                _decode_bytes(payload, part.get_content_charset() or "utf-8")
            )
    return "\n".join(texts)
# Connection settings for the demo run below; EMAIL and PASSWORD are left
# blank in the source and must be filled in before running.
HOST_SERVER = "imap.gmail.com"
EMAIL = ""
PASSWORD = ""

# Fetch every message ("ALL" filter) and print its headers and text body.
for eml in fetch_emails(HOST_SERVER, EMAIL, PASSWORD, _filter="ALL"):
    body = get_message_content(eml.body)
    print(f"{eml.subject=},{eml.a_from=},{eml.to=},{body=}")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment