Created January 3, 2026 14:17
Export Skype to RTF
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import json
import html
import re
import datetime
import logging

# Configure diagnostic logging.
logging.basicConfig(
    level=logging.DEBUG,  # Use DEBUG for detailed messages.
    format='%(asctime)s [%(levelname)s] %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

####################################
# Helper functions for RTF output  #
####################################

def rtf_escape(text):
    """Escape text for RTF output: backslashes and braces are escaped, CP1251-encodable
    characters pass through, and anything else becomes a signed \\uN? escape."""
    res = ""
    for ch in text:
        if ch in ['\\', '{', '}']:
            res += '\\' + ch
        else:
            try:
                ch.encode('cp1251')
                res += ch
            except UnicodeEncodeError:
                # \uN takes a *signed* 16-bit decimal, so characters outside the BMP
                # (e.g. emoji) are first split into a UTF-16 surrogate pair.
                encoded = ch.encode('utf-16-be')
                for i in range(0, len(encoded), 2):
                    unit = int.from_bytes(encoded[i:i + 2], 'big')
                    if unit > 0x7FFF:
                        unit -= 0x10000
                    res += r"\u" + str(unit) + "?"
    return res
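
# Illustrative examples (not part of the original gist) of what rtf_escape is expected
# to produce, with CP1251 as the target code page:
#   rtf_escape("Привет {друг}")  ->  "Привет \{друг\}"        (Cyrillic fits in CP1251)
#   rtf_escape("中")             ->  "\u20013?"                (BMP char outside CP1251)
#   rtf_escape("😀")             ->  "\u-10179?\u-8704?"       (surrogate pair, signed 16-bit)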

def convert_html_to_rtf(text):
    # Step 1. Normalize newlines and replace known newline markers.
    text = text.replace("\r\n", "\n")
    # Replace custom ">>" markers with a newline.
    text = text.replace(">>", "\n")
    # Replace <br> (or <br/>) tags with a newline.
    text = re.sub(r'<br\s*/?>', "\n", text, flags=re.IGNORECASE)
    # Step 2. Convert <b>...</b> into plain-text markers (DOTALL lets bold spans cross lines).
    # The markers contain no angle brackets, so the tag-stripping regex below leaves them
    # alone, and they are unlikely to appear in real message text.
    text = re.sub(r'<b>(.*?)</b>', r'___BSTART___\1___BEND___', text,
                  flags=re.IGNORECASE | re.DOTALL)
    # Step 3. Remove any other HTML tags: any substring starting with < and ending with >.
    text = re.sub(r'<[^>]+>', '', text)
    # Step 4. Unescape HTML entities.
    text = html.unescape(text)
    # At this point the text still contains literal newline characters and the bold markers.
    # Step 5. RTF-escape the text.
    # (Characters are output directly if they are CP1251-encodable.)
    escaped = rtf_escape(text)
    # Step 6. Replace literal newline characters with RTF line breaks.
    escaped = escaped.replace("\n", r'\line ')
    # Step 7. Restore the bold markers as RTF bold groups.
    escaped = escaped.replace("___BSTART___", r'{\b ')
    escaped = escaped.replace("___BEND___", r'}')
    return escaped
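
# Illustrative example (assumed input, not from the original gist): a typical Skype
# message fragment and the RTF it converts to.
#   convert_html_to_rtf('<b>Привет</b> &amp; hello<br>bye')
#   ->  '{\b Привет} & hello\line bye'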

##########################################
# JSON Loading and Conversation Handling #
##########################################

def load_endpoints(filename):
    """
    Load endpoints.json and build a mapping from endpointId to display name.
    Expects structure: { "endpoints": [ { "endpointId": "...", ... }, ... ] }
    """
    logging.info("Loading endpoints from %s", filename)
    try:
        with open(filename, "r", encoding="utf-8") as f:
            raw = f.read()
        logging.debug("Raw endpoints content (first 300 chars): %s", raw[:300])
        data = json.loads(raw)
    except Exception as e:
        logging.error("Error loading endpoints file: %s", e)
        return {}
    mapping = {}
    endpoints_list = []
    if isinstance(data, dict) and "endpoints" in data:
        endpoints_list = data["endpoints"]
    elif isinstance(data, list):
        endpoints_list = data
    else:
        logging.warning("Endpoints file has unexpected structure.")
    for ep in endpoints_list:
        ep_id = ep.get("endpointId")
        display_name = ep.get("displayName") or ep.get("username") or ep_id
        if ep_id:
            mapping[ep_id] = display_name
    logging.debug("Loaded endpoints mapping: %s", mapping)
    return mapping

def load_conversations(filename):
    """
    Load conversations from messages.json.
    Expects structure:
      {
        "userId": "...",
        "exportDate": "...",
        "conversations": [ { "id": "...", "displayName": "...", "MessageList": [ ... ] }, ... ]
      }
    """
    logging.info("Loading conversations from %s", filename)
    try:
        with open(filename, "r", encoding="utf-8") as f:
            raw = f.read()
        logging.debug("Raw messages content (first 300 chars): %s", raw[:300])
        data = json.loads(raw)
    except Exception as e:
        logging.error("Error loading messages file: %s", e)
        return []
    if isinstance(data, dict) and "conversations" in data:
        convs = data["conversations"]
    elif isinstance(data, list):
        convs = data
    else:
        logging.warning("Messages file has unexpected structure.")
        convs = []
    logging.info("Loaded %d conversation(s).", len(convs))
    return convs

def format_timestamp(ts):
    """
    Convert a timestamp (seconds or milliseconds) to a human-readable string.
    Timestamps with more than 10 digits are assumed to be in milliseconds.
    """
    if ts is None:
        return ""
    try:
        ts = int(ts)
    except (ValueError, TypeError):
        return ""
    if len(str(ts)) > 10:
        ts = ts / 1000.0
    dt = datetime.datetime.fromtimestamp(ts)
    return dt.strftime("%Y-%m-%d %H:%M:%S")
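
# For reference (not in the original gist): second- and millisecond-precision inputs
# denoting the same moment produce the same local-time string, e.g.
#   format_timestamp(1700000000) == format_timestamp(1700000000000)
# (both correspond to 2023-11-14 22:13:20 UTC, rendered in the local timezone).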

def sanitize_filename(name):
    """
    Remove or replace characters that are not allowed in file names.
    """
    return re.sub(r'[\\/*?:"<>|]', "_", name)

#######################################
#       RTF Generation Function       #
#######################################

def write_rtf_file(conversation_id, partner_name, messages, my_id, endpoints):
    """
    Generate an RTF file for a conversation.
    The header uses CP1251 (for Cyrillic) and Unicode directives.
    Each message line includes a timestamp and the sender in bold,
    and HTML formatting in the content is converted to RTF.
    """
    logging.info("Processing conversation %s with partner: %s", conversation_id, partner_name)
    # Sort messages by timestamp (oldest first); adjust order if necessary.
    #messages_sorted = sorted(messages, key=lambda m: m.get("timestamp", 0))
    # If the sorted order is reversed from what you expect, you can reverse the list:
    #messages_sorted = list(reversed(messages_sorted))
    messages_sorted = list(reversed(messages))
    # RTF header with Unicode and CP1251 info.
    rtf = (r"{\rtf1\ansi\ansicpg1251\uc1\deff0"
           r"{\fonttbl{\f0\fnil\fcharset204 Times New Roman;}}" "\n")
    rtf += r"\f0\fs24" "\n"
    header = r"{\b Conversation with " + rtf_escape(partner_name) + "}" + r"\line" + "\n\n"
    rtf += header
    for msg in messages_sorted:
        ts = format_timestamp(msg.get("timestamp"))
        sender_id = msg.get("sender") or msg.get("from") or "unknown"
        sender_name = "Me" if sender_id == my_id else endpoints.get(sender_id, sender_id)
        content = msg.get("content", "")
        # Convert HTML formatting (e.g. <b>, <br>) to RTF.
        content_rtf = convert_html_to_rtf(content)
        line = ""
        if ts:
            line += ts + " - "
        line += r"{\b " + rtf_escape(sender_name) + r":} " + content_rtf + r"\line"
        rtf += line + "\n"
    rtf += "}"
    filename = sanitize_filename("chat_" + partner_name + "_" + conversation_id + ".rtf")
    try:
        # Write the file using CP1251 encoding.
        with open(filename, "w", encoding="cp1251") as f:
            f.write(rtf)
        logging.info("Generated file: %s", filename)
    except Exception as e:
        logging.error("Error writing file %s: %s", filename, e)

def clean_message_endings(text):
    """
    Remove trailing formatting artifacts from a message: a terminating '...' together
    with any HTML tags that follow it. (The leading-tag cleanup is kept below but
    currently disabled.)
    """
    # Remove all tags from the beginning of the text
    #text = re.sub(r'^\s*(<[^>]+>\s*)+', '', text)
    # Remove '...' and everything after it, including trailing tags
    text = re.sub(r'\.\.\.\s*(<[^>]+>\s*)*$', '', text)
    return text.strip()
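
# Example (illustrative): a trailing "..." and any dangling tags after it are stripped,
# while messages without that pattern pass through unchanged.
#   clean_message_endings('Sure, here is the full text... <i>')  ->  'Sure, here is the full text'
#   clean_message_endings('Done.')                               ->  'Done.'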

def remove_redundant_messages(messages):
    """
    Remove redundant messages: if a message from the same sender is a prefix of the
    previously kept message (i.e. a shorter version of it), it is dropped.
    """
    cleaned_messages = []
    for msg in messages:
        if not cleaned_messages:
            cleaned_messages.append(msg)
            continue
        last_msg = cleaned_messages[-1]
        # Use a safer sender lookup (either "sender", "from" or "author")
        sender = msg.get("sender") or msg.get("from") or msg.get("author", "unknown")
        last_sender = last_msg.get("sender") or last_msg.get("from") or last_msg.get("author", "unknown")
        # Clean trailing artifacts before checking redundancy
        cleaned_content = clean_message_endings(msg.get("content", ""))
        last_cleaned = clean_message_endings(last_msg.get("content", ""))
        # Skip the message if it is just a truncated copy of the previous one
        if sender == last_sender and last_cleaned.startswith(cleaned_content):
            continue  # Skip redundant message
        cleaned_messages.append(msg)
    return cleaned_messages
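
# Illustrative example (hypothetical message dicts): the second message is a prefix of
# the previously kept one from the same sender, so it is dropped.
#   remove_redundant_messages([
#       {"sender": "8:alice", "content": "Full answer with details"},
#       {"sender": "8:alice", "content": "Full answer"},
#   ])
#   -> [{"sender": "8:alice", "content": "Full answer with details"}]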

def filter_until_stable(messages):
    """
    Runs remove_redundant_messages in both forward and reverse order until no more reductions occur.
    """
    while True:
        filtered = remove_redundant_messages(messages)
        filtered_reversed = remove_redundant_messages(list(reversed(filtered)))
        filtered_final = list(reversed(filtered_reversed))
        # Stop looping if no further reduction in message count
        if len(filtered_final) == len(messages):
            break
        messages = filtered_final  # Update messages for the next iteration
    return messages


def extract_bing_response_id(text):
    """Extracts the <bing-response id="..."> value if it exists."""
    match = re.search(r'<bing-response id="(\d+)"', text)
    return match.group(1) if match else None

def keep_largest_message_preserving_order(messages):
    """
    Keeps only the longest message for each <bing-response id="..."> tag while maintaining order.
    Messages without a bing-response ID are preserved.
    """
    seen_bing_ids = {}  # Track longest messages by bing-response ID
    result = []  # Ordered list to maintain sequence
    for msg in messages:
        content = msg.get("content", "")
        bing_id = extract_bing_response_id(content)
        if bing_id:
            # Keep only the longest message for each bing-response ID
            if bing_id not in seen_bing_ids or len(content) > len(seen_bing_ids[bing_id]["content"]):
                seen_bing_ids[bing_id] = msg
    # Add the longest message for each bing-response ID back in order
    for msg in messages:
        bing_id = extract_bing_response_id(msg.get("content", ""))
        if bing_id:
            # Identity check ("is") so exact duplicates of the winner are not re-added
            if seen_bing_ids[bing_id] is msg:
                result.append(msg)
        else:
            # Directly preserve messages without a bing-response ID
            result.append(msg)
    return result
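
# Illustrative example (hypothetical contents): of the messages sharing
# <bing-response id="7">, only the longest survives, keeping relative order;
# the plain message is preserved untouched.
#   keep_largest_message_preserving_order([
#       {"content": '<bing-response id="7">partial'},
#       {"content": "hello"},
#       {"content": '<bing-response id="7">partial, then the full streamed text'},
#   ])
#   -> [{"content": "hello"},
#       {"content": '<bing-response id="7">partial, then the full streamed text'}]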

##########################
# Main Processing Logic  #
##########################

def main():
    logging.info("Starting export_skype_to_rtf process.")
    endpoints_file = "endpoints.json"
    messages_file = "messages.json"
    endpoints = load_endpoints(endpoints_file)
    conversations = load_conversations(messages_file)
    if not conversations:
        logging.error("No conversations loaded. Exiting.")
        return
    # Replace this with your actual Skype ID.
    my_id = "lxa.sanko"
    conversation_count = 0
    for conv in conversations:
        conv_id = conv.get("id", "unknown")
        partner_name = conv.get("displayName") or conv_id
        # Retrieve messages from "messages" or "MessageList"
        messages = conv.get("messages") or conv.get("MessageList") or []
        if not messages:
            logging.warning("No messages found for conversation %s", conv_id)
            continue
        #filtered_messages = remove_redundant_messages(messages)
        #filtered_messages = remove_redundant_messages(reversed(filtered_messages))
        #filtered_messages = filter_until_stable(messages)
        filtered_messages = keep_largest_message_preserving_order(messages)
        write_rtf_file(conv_id, partner_name, filtered_messages, my_id, endpoints)
        conversation_count += 1
    logging.info("Export completed. Processed %d conversation(s).", conversation_count)


if __name__ == "__main__":
    main()
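
Usage (a sketch, assuming the gist is saved as export_skype_to_rtf.py): place the script next to the messages.json and endpoints.json files it expects (a missing endpoints.json only degrades sender names to raw IDs), set my_id in main() to your own Skype ID, and run python3 export_skype_to_rtf.py. One chat_<partner>_<conversation id>.rtf file is written per conversation into the current directory.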