Skip to content

Instantly share code, notes, and snippets.

@eros18123
Created January 8, 2026 19:00
Show Gist options
  • Select an option

  • Save eros18123/dc63002855e29c482e113d7c7ef2f96b to your computer and use it in GitHub Desktop.

Select an option

Save eros18123/dc63002855e29c482e113d7c7ef2f96b to your computer and use it in GitHub Desktop.
chat nova versao
import sys
import os
import uuid
import tempfile
import threading
import json
import time
import re
import base64
import mimetypes
import requests
from aqt import mw
from aqt.qt import (
QAction, QDialog, QWidget, QVBoxLayout,
QLineEdit, QPushButton, QLabel, pyqtSignal, QColor,
QDialogButtonBox, Qt, QMenu, QInputDialog, QHBoxLayout, QFormLayout,
QTableWidget, QTableWidgetItem, QHeaderView, QTextEdit, QSplitter,
QListWidget, QListWidgetItem, QTabWidget, QCheckBox
)
from aqt.utils import tooltip
from aqt import gui_hooks
from anki.sound import AVTag
from datetime import datetime, timedelta
try:
from aqt.qt import QWebEngineView
except ImportError:
QWebEngineView = None
# Helper classes shared with the previous version follow: FirebaseAPI, AuthManager, LiveViewWindow, SimpleChatWidget, NumericTableWidgetItem.
class FirebaseAPI:
    """Minimal REST client for Firebase Auth endpoints and a Realtime Database.

    Auth calls return a ``(json, error_message)`` pair; database helpers are
    best-effort: reads return ``None`` on any failure and writes only log.
    """

    def __init__(self, base_url, api_key):
        """Store the database root (forced to end with '/') and build the auth URLs."""
        if not base_url.endswith('/'):
            base_url += '/'
        self.base_url = base_url
        self.api_key = api_key
        self.auth_url_signup = f"https://identitytoolkit.googleapis.com/v1/accounts:signUp?key={self.api_key}"
        self.auth_url_signin = f"https://identitytoolkit.googleapis.com/v1/accounts:signInWithPassword?key={self.api_key}"
        self.auth_url_refresh = f"https://securetoken.googleapis.com/v1/token?key={self.api_key}"
        self.auth_url_change = f"https://identitytoolkit.googleapis.com/v1/accounts:update?key={self.api_key}"

    def _send_request(self, url, payload):
        """POST ``payload`` as JSON to an auth endpoint.

        Returns ``(response_json, None)`` on success or ``(None, message)`` on
        failure, where ``message`` is the server's ``error.message`` when the
        error response carries one and ``str(exception)`` otherwise.
        """
        try:
            r = requests.post(url, data=json.dumps(payload), timeout=10)
            r.raise_for_status()
            return r.json(), None
        except requests.exceptions.RequestException as e:
            # Connection errors and timeouts carry no response object.
            response = getattr(e, "response", None)
            if response is not None:
                try:
                    error_data = response.json()
                    return None, error_data.get("error", {}).get("message", "UNKNOWN_ERROR")
                except (ValueError, AttributeError, TypeError):
                    pass  # body was not the expected JSON shape; fall back to str(e)
            return None, str(e)
        except Exception as e:
            return None, str(e)

    def signup_user(self, email, password):
        """Create an account; see ``_send_request`` for the return shape."""
        return self._send_request(self.auth_url_signup, {"email": email, "password": password, "returnSecureToken": True})

    def signin_user(self, email, password):
        """Sign in with e-mail and password; see ``_send_request`` for the return shape."""
        return self._send_request(self.auth_url_signin, {"email": email, "password": password, "returnSecureToken": True})

    def refresh_token(self, refresh_token):
        """Exchange a refresh token; see ``_send_request`` for the return shape."""
        return self._send_request(self.auth_url_refresh, {"grant_type": "refresh_token", "refresh_token": refresh_token})

    def change_password(self, id_token, new_password):
        """Change the password of the account identified by ``id_token``."""
        return self._send_request(self.auth_url_change, {"idToken": id_token, "password": new_password, "returnSecureToken": False})

    def get_data(self, path="", id_token=None, params=""):
        """GET ``path`` from the database; returns the decoded JSON or ``None`` on any error.

        Note that callers cannot tell "node is empty" from "request failed":
        both yield ``None``.
        """
        try:
            url = f"{self.base_url}{path}.json?auth={id_token}{'&' if params else ''}{params}"
            r = requests.get(url, timeout=10)
            r.raise_for_status()
            return r.json()
        except Exception:
            return None

    def put_data(self, path, data, id_token=None):
        """PUT (overwrite) ``data`` at ``path``; failures are logged, never raised."""
        try:
            url = f"{self.base_url}{path}.json?auth={id_token}"
            requests.put(url, data=json.dumps(data), timeout=10).raise_for_status()
        except Exception as e:
            print(f"AnkiChat: Erro PUT em {path}: {e}")

    def patch_data(self, path, data, id_token=None):
        """PATCH (merge) ``data`` into ``path``; failures are logged, never raised."""
        try:
            url = f"{self.base_url}{path}.json?auth={id_token}"
            requests.patch(url, data=json.dumps(data), timeout=10).raise_for_status()
        except Exception as e:
            print(f"AnkiChat: Erro PATCH em {path}: {e}")

    def post_data(self, path, data, id_token=None, return_name=False):
        """POST ``data`` under ``path``.

        With ``return_name=True`` the decoded response is returned and any
        failure is re-raised after being logged; otherwise the call returns
        ``None`` and failures are only logged.
        """
        try:
            url = f"{self.base_url}{path}.json?auth={id_token}"
            response = requests.post(url, data=json.dumps(data), timeout=10)
            response.raise_for_status()
            if return_name:
                return response.json()
        except Exception as e:
            print(f"AnkiChat: Erro POST em {path}: {e}")
            if return_name:
                raise

    def delete_data(self, path, id_token=None):
        """DELETE the node at ``path``; failures are logged, never raised."""
        try:
            url = f"{self.base_url}{path}.json?auth={id_token}"
            requests.delete(url, timeout=10).raise_for_status()
        except Exception as e:
            # Logged like the other write helpers instead of being dropped silently.
            print(f"AnkiChat: Erro DELETE em {path}: {e}")
class BackgroundUpdater:
    """Holds the logged-in session and publishes presence/stats to Firebase.

    A daemon "heartbeat" thread pushes the user's presence record at a fixed
    interval while the session is connected.
    """

    # Seconds between two presence pushes.
    HEARTBEAT_INTERVAL_S = 30

    def __init__(self):
        self.is_connected = False
        self.nickname = None
        self.id_token = None
        self.uid = None
        self.email = None
        self.refresh_token = None
        self.firebase = None
        self._heartbeat_thread = None
        self.stop_heartbeat = threading.Event()

    def initialize(self, firebase_api):
        """Attach the Firebase client used for all presence writes."""
        self.firebase = firebase_api

    def update_state(self, email, uid, id_token, refresh_token, expires_in):
        """Record a successful login and make sure the heartbeat is running.

        ``expires_in`` is accepted but not used by this method. The nickname is
        the part of ``email`` before the '@'.
        """
        self.email = email
        self.uid = uid
        self.id_token = id_token
        self.refresh_token = refresh_token
        self.nickname = email.split('@')[0]
        self.is_connected = True
        self.start_heartbeat()

    def clear_state(self):
        """Forget the session and signal the heartbeat thread to stop."""
        self.is_connected = False
        self.stop_heartbeat.set()
        self.nickname = None
        self.id_token = None
        self.uid = None
        self.email = None
        self.refresh_token = None

    def start_heartbeat(self):
        """Start the heartbeat thread unless one is already running and not stopping.

        Each thread gets its own stop event. A thread that has been told to stop
        but has not exited yet keeps seeing its own (set) event, so a quick
        logout/login cycle still ends with exactly one live heartbeat.
        """
        current = self._heartbeat_thread
        if current and current.is_alive() and not self.stop_heartbeat.is_set():
            return
        self.stop_heartbeat = threading.Event()
        self._heartbeat_thread = threading.Thread(
            target=self._presence_loop, args=(self.stop_heartbeat,), daemon=True
        )
        self._heartbeat_thread.start()

    def _get_anki_stats_safe(self):
        """Collect today's review statistics by running the queries on the main thread.

        Returns a dict of counters, or ``{}`` when no collection is open, a
        query fails, or the main thread does not answer within 5 seconds.
        """
        if not mw or not mw.col:
            return {}
        result = {}
        evt = threading.Event()

        def query_on_main():
            try:
                # Lower bound for revlog ids: one day before the scheduler's cutoff, in ms.
                start_of_today_ms = (mw.col.sched.day_cutoff - 86400) * 1000
                base_count_sql = "SELECT count() FROM revlog WHERE id > ?"

                def count(condition=""):
                    sql = f"{base_count_sql} AND {condition}" if condition else base_count_sql
                    return mw.col.db.scalar(sql, start_of_today_ms) or 0

                total_reviews = count()
                total_time_ms = mw.col.db.scalar("SELECT sum(time) FROM revlog WHERE id > ?", start_of_today_ms) or 0
                new_done = count("type = 0 AND lastIvl = 0")
                learn_done = count("type IN (0, 2) AND lastIvl > 0")
                review_done = count("type = 1")
                # Answer-button counts: ease 1 = Again, 2 = Hard, 3 = Good, 4 = Easy.
                again_count = count("ease = 1")
                hard_count = count("ease = 2")
                good_count = count("ease = 3")
                easy_count = count("ease = 4")
                # Retention over type = 1 entries: share answered with ease > 1.
                # The denominator is the same query as review_done, so it is reused.
                passes_count = count("type = 1 AND ease > 1")
                retention = (passes_count / review_done) if review_done > 0 else 0
                return {
                    "reviews_today": total_reviews,
                    "new_done": new_done,
                    "learn_done": learn_done,
                    "review_done": review_done,
                    "total_time_ms": total_time_ms,
                    "again_count": again_count,
                    "hard_count": hard_count,
                    "good_count": good_count,
                    "easy_count": easy_count,
                    "retention": retention,
                }
            except Exception as e:
                print(f"AnkiChat: Erro ao executar query SQL: {e}")
                return {}

        def wrapper():
            nonlocal result
            try:
                result = query_on_main()
            finally:
                evt.set()

        mw.taskman.run_on_main(wrapper)
        evt.wait(timeout=5.0)
        return result

    def update_presence_on_firebase(self):
        """Write the presence record (identity, state, timestamp, stats) to ``online/<uid>``."""
        if not self.is_connected:
            return
        stats = self._get_anki_stats_safe()
        presence_data = {
            "uid": self.uid,
            "nickname": self.nickname,
            "state": "online",
            "last_seen": int(time.time()),
            **stats,  # all collected statistics are sent alongside the presence fields
        }
        self.firebase.put_data(f"online/{self.uid}", presence_data, self.id_token)

    def trigger_immediate_update(self):
        """Push presence once, right now, on a throw-away daemon thread."""
        threading.Thread(target=self.update_presence_on_firebase, daemon=True).start()

    def _presence_loop(self, stop_event=None):
        """Push presence repeatedly until ``stop_event`` is set.

        ``stop_event`` defaults to ``self.stop_heartbeat`` as it is when the
        loop starts. Waiting on the event (rather than sleeping) lets the loop
        exit as soon as a stop is requested.
        """
        if stop_event is None:
            stop_event = self.stop_heartbeat
        while not stop_event.is_set():
            self.update_presence_on_firebase()
            stop_event.wait(self.HEARTBEAT_INTERVAL_S)


background_updater = BackgroundUpdater()
class AuthManager:
    """Login/registration flows plus on-disk login history and auto-login data.

    Network work runs on daemon threads; results are reported through the
    owning chat window's ``connection_succeeded`` / ``connection_failed``
    signals when a window was supplied.
    """

    def __init__(self, chat_window=None):
        self.cw = chat_window
        self.firebase = background_updater.firebase
        self.addon_path = os.path.dirname(os.path.abspath(__file__))
        self.history_file = os.path.join(self.addon_path, 'login_history.json')
        self.autologin_file = os.path.join(self.addon_path, 'autologin.json')

    # SECURITY NOTE(review): base64 is an encoding, not encryption. Passwords
    # written via _obfuscate are recoverable by anyone who can read
    # login_history.json. Left as-is so existing history files stay readable.
    def _obfuscate(self, data: str) -> str:
        """Return ``data`` base64-encoded (UTF-8)."""
        return base64.b64encode(data.encode('utf-8')).decode('utf-8')

    def _deobfuscate(self, data: str) -> str:
        """Reverse ``_obfuscate``; returns ``""`` when ``data`` cannot be decoded."""
        try:
            return base64.b64decode(data.encode('utf-8')).decode('utf-8')
        except (ValueError, AttributeError, TypeError):
            # ValueError covers binascii.Error and UnicodeDecodeError.
            return ""

    def load_login_history(self):
        """Return the saved ``{email: obfuscated_password}`` mapping, or ``{}``.

        A missing, unreadable, malformed or non-dict file yields ``{}`` so the
        caller can always assign into the result.
        """
        if not os.path.exists(self.history_file):
            return {}
        try:
            with open(self.history_file, 'r', encoding='utf-8') as f:
                history = json.load(f)
        except (OSError, ValueError):
            return {}
        return history if isinstance(history, dict) else {}

    def save_login_history(self, history):
        """Write ``history`` to disk; failures are ignored (best effort)."""
        try:
            with open(self.history_file, 'w', encoding='utf-8') as f:
                json.dump(history, f, indent=4)
        except (OSError, TypeError, ValueError):
            pass

    def start_login_thread(self, email, password):
        """Run ``_attempt_login`` on a daemon thread."""
        threading.Thread(target=self._attempt_login, args=(email, password), daemon=True).start()

    def _attempt_login(self, email, password):
        """Sign in, persist history/auto-login data, and notify the chat window."""
        response, error = self.firebase.signin_user(email, password)
        if response:
            uid = response.get('localId')
            history = self.load_login_history()
            history[email] = self._obfuscate(password)
            self.save_login_history(history)
            background_updater.update_state(email, uid, response.get('idToken'), response.get('refreshToken'), response.get('expiresIn', '3600'))
            if self.cw:
                self.cw.connection_succeeded.emit(email, uid, response.get('idToken'), password, response.get('refreshToken'), response.get('expiresIn', '3600'))
            self.save_autologin_info(email, uid, response.get('refreshToken'))
        else:
            if self.cw:
                self.cw.connection_failed.emit(f"Falha no login: {error}")

    def save_autologin_info(self, email, uid, refresh_token):
        """Write the auto-login record to disk; failures are ignored (best effort)."""
        try:
            with open(self.autologin_file, 'w', encoding='utf-8') as f:
                json.dump({'email': email, 'uid': uid, 'refreshToken': refresh_token}, f)
        except (OSError, TypeError, ValueError):
            pass

    def show_register_dialog(self):
        """Ask for e-mail and password and start registration when both are given."""
        dialog = QDialog(self.cw)
        dialog.setWindowTitle("Registrar")
        layout = QFormLayout(dialog)
        email_entry = QLineEdit()
        pass_entry = QLineEdit()
        pass_entry.setEchoMode(QLineEdit.EchoMode.Password)
        layout.addRow("E-mail:", email_entry)
        layout.addRow("Senha:", pass_entry)
        buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Ok | QDialogButtonBox.StandardButton.Cancel)
        buttons.accepted.connect(dialog.accept)
        buttons.rejected.connect(dialog.reject)
        layout.addWidget(buttons)
        if dialog.exec():
            email, password = email_entry.text().strip(), pass_entry.text()
            if not email or not password:
                return
            threading.Thread(target=self._attempt_register, args=(email, password), daemon=True).start()

    def _attempt_register(self, email, password):
        """Create the account, publish the user's nickname records, then log in."""
        signup_response, signup_error = self.firebase.signup_user(email, password)
        if signup_response:
            # Fall back to the typed e-mail if the response carries none.
            registered_email = signup_response.get('email') or email
            new_nick = registered_email.split('@')[0]
            uid = signup_response.get('localId')
            id_token = signup_response.get('idToken')
            self.firebase.put_data(f"users/{uid}", {"nickname": new_nick}, id_token)
            self.firebase.put_data(f"nick_to_uid/{new_nick}", uid, id_token)
            background_updater.update_state(email, uid, id_token, signup_response.get('refreshToken'), signup_response.get('expiresIn', '3600'))
            self.save_autologin_info(email, uid, signup_response.get('refreshToken'))
            background_updater.trigger_immediate_update()
            # Short pause before notifying the UI, after the immediate presence push was started.
            time.sleep(0.5)
            if self.cw:
                self.cw.connection_succeeded.emit(email, uid, id_token, password, signup_response.get('refreshToken'), signup_response.get('expiresIn', '3600'))
        else:
            if self.cw:
                self.cw.connection_failed.emit(f"Erro no registro: {signup_error}")

    def show_change_password_dialog(self):
        """Placeholder: no password-change UI is implemented here."""
        pass
class LiveViewWindow(QDialog):
    """Dialog that shows another user's current review card by polling ``live_view/<uid>``."""

    def __init__(self, parent, target_uid, target_nick, request_id):
        """Build the view and start the polling thread for ``target_uid``."""
        super().__init__(parent)
        self.target_uid, self.target_nick, self.request_id = target_uid, target_nick, request_id
        self.firebase, self.id_token, self.is_listening, self.last_html_content = background_updater.firebase, background_updater.id_token, True, ""
        self.setWindowTitle(f"Visualizando: {self.target_nick}")
        self.setMinimumSize(400, 300)
        # NOTE(review): this attribute shadows QWidget.layout(); kept to avoid
        # changing the attributes the class exposes.
        self.layout = QVBoxLayout(self)
        if QWebEngineView:
            self.webview = QWebEngineView()
            self.layout.addWidget(self.webview)
            self.webview.hide()
        else:
            self.webview = QLabel("Módulo WebEngine não disponível.")
            self.layout.addWidget(self.webview)
        self.status_label = QLabel("Aguardando o usuário iniciar a revisão...")
        self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)
        self.layout.addWidget(self.status_label)
        threading.Thread(target=self._listen_for_updates, daemon=True).start()

    def _listen_for_updates(self):
        """Poll the target's live-view node every 0.5 s and forward results to the GUI thread.

        A falsy result ends the stream. ``get_data`` is called without checking
        why it returned nothing, so a failed request is treated the same way as
        a removed node.
        """
        path = f"live_view/{self.target_uid}"
        while self.is_listening:
            try:
                card_data = self.firebase.get_data(path, self.id_token)
                if card_data:
                    # Bind the payload as a default so the lambda keeps this iteration's data.
                    mw.taskman.run_on_main(lambda d=card_data: self.update_card_view(d))
                else:
                    mw.taskman.run_on_main(self.show_stream_ended_message)
                    break
            except Exception as e:
                print(f"LiveView: Erro ao buscar dados: {e}")
            time.sleep(0.5)

    def update_card_view(self, data):
        """Render the question (and the answer when ``state == "answer"``) in the web view."""
        if not self.is_listening or not QWebEngineView:
            return
        state = data.get("state", "question")
        if state == "waiting":
            self.show_waiting_message()
            return
        self.status_label.hide()
        self.webview.show()
        # NOTE(review): q/a are HTML supplied by the remote user and are rendered as-is.
        q_html, a_html = data.get("q", ""), data.get("a", "")
        full_html = f"<html><body><div id='qa'>{q_html}</div>"
        if state == "answer":
            full_html += f"<hr id=answer>{a_html}"
        full_html += "</body></html>"
        # Skip the reload when nothing changed since the previous poll.
        if full_html != self.last_html_content:
            self.last_html_content = full_html
            self.webview.setHtml(full_html)

    def show_waiting_message(self):
        """Hide the card and show the "waiting for the user to start" status."""
        if QWebEngineView:
            self.webview.hide()
        self.status_label.setText(f"Aguardando {self.target_nick} iniciar a revisão...")
        self.status_label.show()

    def show_stream_ended_message(self):
        """Hide the card, show the "stream ended" status and stop listening."""
        if QWebEngineView:
            self.webview.hide()
        self.status_label.setText(f"{self.target_nick} encerrou a transmissão.")
        self.status_label.show()
        self.is_listening = False

    def closeEvent(self, event):
        """Stop polling and mark the view request as closed.

        The status write runs on a daemon thread so that closing the window
        does not block the GUI thread for the duration of the HTTP request.
        """
        self.is_listening = False
        threading.Thread(
            target=self.firebase.put_data,
            args=(f"view_requests/{self.target_uid}/{self.request_id}/status", "closed", self.id_token),
            daemon=True,
        ).start()
        super().closeEvent(event)
class SimpleChatWidget(QWidget):
    """Read-only message log with an input line for the shared chat room."""

    # Emitted when at least one newly displayed message came from another user.
    new_message_received = pyqtSignal()

    def __init__(self, parent=None):
        super().__init__(parent)
        self.displayed_message_ids = set()
        self.setup_ui()

    def setup_ui(self):
        """Create the message display, the input row, and wire the send actions."""
        layout = QVBoxLayout(self)
        layout.setContentsMargins(0, 0, 0, 0)
        self.chat_display = QTextEdit()
        self.chat_display.setReadOnly(True)
        input_layout = QHBoxLayout()
        self.chat_input = QLineEdit()
        self.chat_input.setPlaceholderText("Digite sua mensagem...")
        self.send_button = QPushButton("Enviar")
        input_layout.addWidget(self.chat_input)
        input_layout.addWidget(self.send_button)
        layout.addWidget(self.chat_display)
        layout.addLayout(input_layout)
        self.send_button.clicked.connect(self.handle_send_message)
        self.chat_input.returnPressed.connect(self.handle_send_message)

    def handle_send_message(self):
        """Post the typed text to ``messages`` on a worker thread; no-op when offline or empty."""
        if not background_updater.is_connected:
            return
        text = self.chat_input.text().strip()
        if not text:
            return
        self.chat_input.clear()
        message_data = {"uid": background_updater.uid, "nick": background_updater.nickname, "text": text, "timestamp": int(time.time() * 1000)}
        threading.Thread(target=background_updater.firebase.post_data, args=("messages", message_data, background_updater.id_token)).start()

    def update_messages(self, messages):
        """Append messages not shown yet, oldest first.

        Entries flagged ``quiz_event`` are skipped, and entries that are not
        dicts are ignored. The view scrolls to the bottom only when something
        was appended, so polling does not pull the user away from older
        messages they are reading.
        """
        ordered = sorted(
            ((msg_id, msg) for msg_id, msg in messages.items() if isinstance(msg, dict)),
            key=lambda item: item[1].get('timestamp', 0),
        )
        new_message_arrived = False
        appended_any = False
        for msg_id, msg_data in ordered:
            if msg_id in self.displayed_message_ids:
                continue
            if msg_data.get("quiz_event"):
                continue
            is_own = msg_data.get('uid') == background_updater.uid
            if not is_own:
                new_message_arrived = True
            self._append_message_to_chat(msg_data.get('nick', '...'), msg_data.get('text', ''), is_own)
            self.displayed_message_ids.add(msg_id)
            appended_any = True
        if new_message_arrived:
            self.new_message_received.emit()
        if appended_any:
            scrollbar = self.chat_display.verticalScrollBar()
            scrollbar.setValue(scrollbar.maximum())

    def _append_message_to_chat(self, nick, text, is_own_message):
        """Append one line; own messages are green, others blue.

        ``nick`` and ``text`` come from other users, so both are HTML-escaped
        before being placed in the rich-text line.
        """
        import html  # local import: only needed here

        color = "green" if is_own_message else "blue"
        safe_nick = html.escape(str(nick))
        safe_text = html.escape(str(text))
        self.chat_display.append(f"<b><font color='{color}'>{safe_nick}:</font></b> {safe_text}")

    def clear_chat(self):
        """Empty the display and forget which message ids were shown."""
        self.chat_display.clear()
        self.displayed_message_ids.clear()
class NumericTableWidgetItem(QTableWidgetItem):
    """Table item ordered by the number stored under ``UserRole`` instead of its text."""

    def __lt__(self, other):
        """Compare by numeric ``UserRole`` data; defer to the base class when either value is not numeric."""
        role = Qt.ItemDataRole.UserRole
        try:
            own_value = float(self.data(role))
            other_value = float(other.data(role))
        except (ValueError, TypeError):
            return super().__lt__(other)
        return own_value < other_value
# Main AnkiChat dialog: login form, user statistics table and shared chat.
# The signals declared below are connected to this dialog's slots in __init__.
class ChatWindow(QDialog):
# Emitted by AuthManager after login/registration with
# (email, uid, idToken, password, refreshToken, expiresIn).
connection_succeeded = pyqtSignal(str, str, str, str, str, str)
# Carries the error message shown to the user.
connection_failed = pyqtSignal(str)
# (master user list, online stats) as fetched by poll_for_updates.
user_update_received = pyqtSignal(dict, dict)
# The "messages" node as fetched by poll_for_updates.
message_update_received = pyqtSignal(dict)
# Text to display through aqt.utils.tooltip; emitted from worker threads.
show_tooltip_signal = pyqtSignal(str)
# (invite_id, invite_data) -> handle_deck_invite.
deck_invite_received = pyqtSignal(str, dict)
# (recipient_uid, invite_id, invite_data) -> handle_accepted_invite.
sent_deck_invite_accepted = pyqtSignal(str, str, dict)
# (invite_id, invite_data) -> handle_ready_for_download.
deck_ready_for_download = pyqtSignal(str, dict)
# Connected to handle_view_request; presumably (request_id, request_data) — TODO confirm.
view_request_received = pyqtSignal(str, dict)
# Connected to handle_accepted_view_request; payload meaning not visible here — TODO confirm.
view_request_accepted = pyqtSignal(str, str, dict)
def __init__(self, parent=None):
    """Create the dialog, restore saved filters, wire signals and pick the initial view."""
    super().__init__(parent)
    title_bar_buttons = Qt.WindowType.WindowMinimizeButtonHint | Qt.WindowType.WindowMaximizeButtonHint
    self.setWindowFlags(self.windowFlags() | title_bar_buttons)

    # Session-independent state.
    self.RELAY_SERVER_URL = "https://eros18123.pythonanywhere.com"
    self.pending_deck_transfers = {}
    self.is_being_watched = False
    self.active_live_view_window = None
    self.pending_view_requests = {}
    self.viewers = {}
    self.blocked_users = set()
    self.last_master_list = {}
    self.last_online_stats = {}
    self.addon_path = os.path.dirname(os.path.abspath(__file__))
    self.settings_file = os.path.join(self.addon_path, 'ankichat_settings.json')
    self.auth_manager = AuthManager(self)

    self.setup_ui()
    self._load_settings()

    # Signal -> slot wiring, in the same order as the declarations.
    self.connection_succeeded.connect(self.on_connection_success)
    self.connection_failed.connect(self.on_connection_failure)
    self.user_update_received.connect(self.update_user_table)
    self.message_update_received.connect(self.simple_chat_widget.update_messages)
    self.show_tooltip_signal.connect(tooltip)
    self.deck_invite_received.connect(self.handle_deck_invite)
    self.sent_deck_invite_accepted.connect(self.handle_accepted_invite)
    self.deck_ready_for_download.connect(self.handle_ready_for_download)
    self.view_request_received.connect(self.handle_view_request)
    self.view_request_accepted.connect(self.handle_accepted_view_request)
    self.simple_chat_widget.new_message_received.connect(self.handle_new_chat_message_notification)

    # Already logged in (session lives in background_updater): go straight to the chat.
    if background_updater.is_connected:
        self.on_connection_success()
        return
    self.login_widget.show()
    self.status_label.hide()
    self.main_chat_widget.hide()
def setup_ui(self):
    """Build the login form, the status label and the main view, then connect widget signals."""
    self.setWindowTitle("AnkiChat")
    self.resize(1000, 600)  # wide default so every statistics column fits
    root_layout = QVBoxLayout(self)

    # Login form.
    self.login_widget = QWidget()
    form = QFormLayout(self.login_widget)
    self.email_entry = QLineEdit()
    self.password_entry = QLineEdit()
    self.password_entry.setEchoMode(QLineEdit.EchoMode.Password)
    form.addRow("E-mail:", self.email_entry)
    form.addRow("Senha:", self.password_entry)
    button_row = QWidget()
    button_row_layout = QHBoxLayout(button_row)
    self.login_button = QPushButton("Login")
    self.register_button = QPushButton("Registrar")
    for button in (self.login_button, self.register_button):
        button_row_layout.addWidget(button)
    form.addRow(button_row)

    # "Logging in" status line.
    self.status_label = QLabel("Logando...")
    self.status_label.setAlignment(Qt.AlignmentFlag.AlignCenter)

    # Main view: top bar + tabs.
    self.main_chat_widget = QWidget()
    main_view_layout = QVBoxLayout(self.main_chat_widget)
    top_bar = QHBoxLayout()
    self.logout_button = QPushButton("Logout")
    self.viewers_label = QLabel("Assistindo você: Ninguém")
    self.viewers_label.hide()
    top_bar.addWidget(self.logout_button)
    top_bar.addStretch()
    top_bar.addWidget(self.viewers_label)
    main_view_layout.addLayout(top_bar)

    # Users tab: filters above the statistics table.
    self.tabs = QTabWidget()
    users_page = QWidget()
    users_page_layout = QVBoxLayout(users_page)
    users_page_layout.setContentsMargins(0, 0, 0, 0)
    filters_row = QHBoxLayout()
    self.show_active_today_checkbox = QCheckBox("Mostrar somente quem estudou hoje")
    self.show_online_only_checkbox = QCheckBox("Mostrar somente online")
    for checkbox in (self.show_active_today_checkbox, self.show_online_only_checkbox):
        filters_row.addWidget(checkbox)
    filters_row.addStretch()

    self.user_table = QTableWidget()
    self.user_table.setColumnCount(12)
    column_titles = [
        "Usuário", "Total Rev.", "Novos", "Aprend.", "A Revisar",
        "De Novo", "Difícil", "Bom", "Fácil", "Retenção %",
        "Tempo Total", "Tempo Médio",
    ]
    self.user_table.setHorizontalHeaderLabels(column_titles)
    self.user_table.setContextMenuPolicy(Qt.ContextMenuPolicy.CustomContextMenu)
    self.user_table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
    self.user_table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
    self.user_table.verticalHeader().setVisible(False)
    # All columns stretch; the first (user name) is then made user-resizable.
    self.user_table.horizontalHeader().setSectionResizeMode(QHeaderView.ResizeMode.Stretch)
    self.user_table.horizontalHeader().setSectionResizeMode(0, QHeaderView.ResizeMode.Interactive)
    self.user_table.setSortingEnabled(True)
    users_page_layout.addLayout(filters_row)
    users_page_layout.addWidget(self.user_table)

    # Chat tab.
    self.simple_chat_widget = SimpleChatWidget(self)
    self.tabs.addTab(users_page, "Usuários")
    self.tabs.addTab(self.simple_chat_widget, "Chat Geral")
    self.chat_tab_index = 1
    self.tabs.currentChanged.connect(self.on_tab_switched)
    main_view_layout.addWidget(self.tabs)

    # Stack the three top-level areas; __init__ decides which ones are visible.
    for area in (self.login_widget, self.status_label, self.main_chat_widget):
        root_layout.addWidget(area)

    # Widget signal wiring.
    self.login_button.clicked.connect(self.handle_login)
    self.register_button.clicked.connect(self.auth_manager.show_register_dialog)
    self.logout_button.clicked.connect(self.logout)
    self.user_table.customContextMenuRequested.connect(self.show_user_context_menu)
    self.show_active_today_checkbox.stateChanged.connect(self._filters_changed)
    self.show_online_only_checkbox.stateChanged.connect(self._filters_changed)
def _load_settings(self):
    """Restore the two filter checkboxes from the settings file, if it exists.

    Any error is printed and otherwise ignored; missing keys default to unchecked.
    """
    try:
        if not os.path.exists(self.settings_file):
            return
        with open(self.settings_file, 'r', encoding='utf-8') as handle:
            saved = json.load(handle)
        self.show_active_today_checkbox.setChecked(saved.get('show_active_today', False))
        self.show_online_only_checkbox.setChecked(saved.get('show_online_only', False))
    except Exception as e:
        print(f"AnkiChat: Erro ao carregar configurações: {e}")
def _save_settings(self):
    """Write the current state of the two filter checkboxes to the settings file.

    Any error is printed and otherwise ignored.
    """
    active_only = self.show_active_today_checkbox.isChecked()
    online_only = self.show_online_only_checkbox.isChecked()
    payload = {'show_active_today': active_only, 'show_online_only': online_only}
    try:
        with open(self.settings_file, 'w', encoding='utf-8') as handle:
            json.dump(payload, handle, indent=4)
    except Exception as e:
        print(f"AnkiChat: Erro ao salvar configurações: {e}")
def _filters_changed(self):
    """Persist the filter choice and re-render the table from the last fetched data, if any."""
    self._save_settings()
    if not self.last_master_list:
        return
    self.update_user_table(self.last_master_list, self.last_online_stats)
def on_connection_success(self, *args):
    """Switch to the chat view and make sure the polling thread is running.

    ``*args`` absorbs the ``connection_succeeded`` signal payload, which is not used here.
    """
    self.login_widget.hide()
    self.status_label.hide()
    self.main_chat_widget.show()
    self.setWindowTitle(f"AnkiChat - {background_updater.nickname}")
    self.load_blocked_users()
    self.simple_chat_widget.clear_chat()

    # Keep at most one poller alive per window.
    already_polling = hasattr(self, '_polling_thread') and self._polling_thread.is_alive()
    if already_polling:
        return
    self._polling_thread = threading.Thread(target=self.poll_for_updates, daemon=True)
    self._polling_thread.start()
def poll_for_updates(self):
    """Worker loop: every 2.5 s fetch users, presence and messages and run the invite/view polls.

    Runs until ``background_updater.is_connected`` turns false. Results reach
    the GUI through signals; any exception in one cycle is printed and the
    loop continues.
    """
    while background_updater.is_connected:
        try:
            uid = background_updater.uid
            token = background_updater.id_token

            users = background_updater.firebase.get_data("users", token) or {}
            presence = background_updater.firebase.get_data("online", token) or {}
            self.user_update_received.emit(users, presence)

            chat_messages = background_updater.firebase.get_data("messages", token) or {}
            self.message_update_received.emit(chat_messages)

            self._poll_deck_invites(uid, token)
            self._poll_view_requests(uid, token)
            self._check_viewer_status(uid, token)
        except Exception as e:
            print(f"AnkiChat poll error: {e}")
        time.sleep(2.5)
# --- CHANGE: table-filling logic updated for the new statistics columns ---
def update_user_table(self, master_user_list, online_stats):
    """Rebuild the user table from the user list and the presence/statistics records.

    Args:
        master_user_list: mapping of uid -> user record; records without a
            ``nickname`` (or that are not dicts) are skipped.
        online_stats: mapping of uid -> presence record with the counters
            published by the heartbeat; missing counters count as 0.

    The two filter checkboxes decide which users get a row. Both inputs are
    cached so the table can be re-rendered when a filter changes.
    """
    self.last_master_list = master_user_list
    self.last_online_stats = online_stats
    show_active_today = self.show_active_today_checkbox.isChecked()
    show_online_only = self.show_online_only_checkbox.isChecked()

    def create_numeric_item(value, display_text):
        # Display text for the user, raw number under UserRole for sorting.
        item = NumericTableWidgetItem(display_text)
        item.setData(Qt.ItemDataRole.UserRole, value)
        return item

    # Columns 1-8 are plain counters, in this order.
    counter_keys = (
        'reviews_today', 'new_done', 'learn_done', 'review_done',
        'again_count', 'hard_count', 'good_count', 'easy_count',
    )

    # Sorting is suspended while rows are inserted, so rows do not move mid-fill.
    self.user_table.setSortingEnabled(False)
    try:
        self.user_table.setRowCount(0)
        for uid, user_data in master_user_list.items():
            if not isinstance(user_data, dict):
                continue
            nick = user_data.get('nickname')
            if not nick:
                continue
            stats_data = online_stats.get(uid, {})
            if not isinstance(stats_data, dict):
                stats_data = {}
            state = stats_data.get('state', 'offline')
            reviews_today = stats_data.get('reviews_today', 0)
            if show_online_only and state != 'online':
                continue
            if show_active_today and reviews_today == 0:
                continue

            row = self.user_table.rowCount()
            self.user_table.insertRow(row)

            # Column 0: nickname, with the uid under UserRole for the context menu.
            nick_item = QTableWidgetItem(nick)
            nick_item.setData(Qt.ItemDataRole.UserRole, uid)
            if uid == background_updater.uid:
                nick_item.setForeground(QColor("green"))
            elif state == 'offline':
                nick_item.setForeground(QColor("gray"))
            if uid in self.blocked_users:
                nick_item.setForeground(QColor("red"))  # blocked overrides the other colours
            self.user_table.setItem(row, 0, nick_item)

            for column, key in enumerate(counter_keys, start=1):
                value = stats_data.get(key, 0)
                self.user_table.setItem(row, column, create_numeric_item(value, str(value)))

            # Column 9: retention. It is computed from type = 1 reviews only, so the
            # percentage is shown only when such reviews exist; otherwise "-".
            review_done = stats_data.get('review_done', 0)
            retention = stats_data.get('retention', 0)
            retention_str = f"{retention * 100:.1f}%" if review_done > 0 else "-"
            self.user_table.setItem(row, 9, create_numeric_item(retention, retention_str))

            # Columns 10-11: total and average answer time.
            total_time_ms = stats_data.get('total_time_ms', 0)
            avg_time_s = self._format_avg_time(total_time_ms, reviews_today)
            self.user_table.setItem(row, 10, create_numeric_item(total_time_ms, self._format_time(total_time_ms)))
            self.user_table.setItem(row, 11, create_numeric_item(avg_time_s, f"{avg_time_s:.1f}s/card"))
    finally:
        # Re-enable sorting even if a malformed record aborted the fill.
        self.user_table.setSortingEnabled(True)
# --- The rest of the file is unchanged from the previous version ---
def on_connection_failure(self, err_msg):
    """Show ``err_msg`` as a tooltip and return to the login form."""
    tooltip(err_msg)
    # Drop the "logging in" status and bring the form back.
    self.status_label.hide()
    self.login_widget.show()
def logout(self):
    """End the session: publish "offline", clear state, drop auto-login and show the login form.

    The "offline" write runs on a daemon thread with the credentials captured
    beforehand, so the GUI thread does not wait for the HTTP request.
    """
    if background_updater.is_connected:
        firebase = background_updater.firebase
        uid, token = background_updater.uid, background_updater.id_token
        threading.Thread(
            target=firebase.patch_data,
            args=(f"online/{uid}", {"state": "offline"}, token),
            daemon=True,
        ).start()
    background_updater.clear_state()

    autologin_file = os.path.join(self.addon_path, 'autologin.json')
    try:
        os.remove(autologin_file)
    except FileNotFoundError:
        pass  # nothing to remove
    except OSError as e:
        # A locked or read-only file must not leave the UI half logged out.
        print(f"AnkiChat: Erro ao remover autologin: {e}")

    self.simple_chat_widget.clear_chat()
    self.main_chat_widget.hide()
    self.login_widget.show()
    self.setWindowTitle("AnkiChat")
def handle_new_chat_message_notification(self):
    """Colour the chat tab label orange when a message arrives while another tab is open."""
    if self.tabs.currentIndex() == self.chat_tab_index:
        return
    self.tabs.tabBar().setTabTextColor(self.chat_tab_index, QColor("orange"))
def on_tab_switched(self, index):
    """Set the chat tab's text colour to a default-constructed QColor once that tab is opened."""
    if index != self.chat_tab_index:
        return
    self.tabs.tabBar().setTabTextColor(self.chat_tab_index, QColor())
def _format_time(self, ms):
if ms is None: return "-"
seconds = ms / 1000
if seconds < 60: return f"{seconds:.1f}s"
return f"{seconds / 60:.1f} min"
def _format_avg_time(self, total_ms, count):
if not count or not total_ms: return 0
return (total_ms / count) / 1000
def show_user_context_menu(self, pos):
    """Show the context menu for the table row under ``pos``.

    The user's own row offers account actions; any other row offers deck
    sending, live viewing and block/unblock. "Own row" is decided by uid:
    nicknames are the local part of the e-mail address and can therefore be
    shared by two accounts.
    """
    item = self.user_table.itemAt(pos)
    if not item:
        return
    nick_item = self.user_table.item(item.row(), 0)
    nick = nick_item.text()
    uid = nick_item.data(Qt.ItemDataRole.UserRole)

    menu = QMenu()
    if uid == background_updater.uid:
        menu.addAction("Mudar Senha").triggered.connect(self.auth_manager.show_change_password_dialog)
        menu.addAction("Desbloquear Usuário...").triggered.connect(self.unblock_user_dialog)
        if self.is_being_watched:
            menu.addAction("Parar Transmissão").triggered.connect(self._stop_broadcast)
    else:
        menu.addAction("Enviar Deck").triggered.connect(lambda: self.send_deck_invite(nick))
        menu.addAction("Assistir Revisão").triggered.connect(lambda: self.send_view_request(nick))
        menu.addSeparator()
        if uid in self.blocked_users:
            menu.addAction("Desbloquear Usuário").triggered.connect(lambda: self.unblock_user(uid, nick))
        else:
            menu.addAction("Bloquear Usuário").triggered.connect(lambda: self.block_user(uid, nick))
    menu.exec(self.user_table.mapToGlobal(pos))
def send_deck_invite(self, nick):
    """Let the user pick one of their decks and send an invitation for it to ``nick``."""
    deck_names = sorted(mw.col.decks.all_names())
    if not deck_names:
        tooltip("Você não tem decks para enviar.")
        return
    chosen_deck, accepted = QInputDialog.getItem(
        self, "Enviar Deck", f"Escolha o deck para enviar para {nick}:", deck_names, 0, False
    )
    if not (accepted and chosen_deck):
        return
    # The invitation involves network calls, so it runs off the GUI thread.
    worker = threading.Thread(target=self._send_deck_invitation, args=(nick, chosen_deck), daemon=True)
    worker.start()
def _send_deck_invitation(self, target_nick, deck_name):
    """Worker: resolve ``target_nick`` to a uid and create a pending deck invite for it.

    Every outcome, including a failed request, is reported to the user through
    ``show_tooltip_signal``. Nothing is raised, because this runs on a
    background thread where an exception would go unnoticed by the user.
    """
    firebase = background_updater.firebase
    my_uid, token = background_updater.uid, background_updater.id_token

    target_uid = firebase.get_data(f"nick_to_uid/{target_nick}", token)
    if not target_uid:
        self.show_tooltip_signal.emit(f"Usuário {target_nick} não encontrado.")
        return
    # The recipient's block list is checked before anything is written.
    if firebase.get_data(f"users/{target_uid}/blocked/{my_uid}", token):
        self.show_tooltip_signal.emit(f"Você não pode interagir com {target_nick}.")
        return

    invite_data = {"sender_uid": my_uid, "sender_nick": background_updater.nickname, "deck_name": deck_name, "status": "pending"}
    try:
        # With return_name=True, post_data re-raises request failures.
        res = firebase.post_data(f"deck_invites/{target_uid}", invite_data, token, return_name=True)
    except Exception as e:
        self.show_tooltip_signal.emit(f"Erro ao enviar convite: {e}")
        return

    invite_id = res.get('name') if isinstance(res, dict) else None
    if not invite_id:
        self.show_tooltip_signal.emit("Erro ao enviar convite: resposta inválida do servidor.")
        return
    firebase.put_data(f"sent_invites/{my_uid}/{invite_id}", {"recipient_uid": target_uid}, token)
    self.show_tooltip_signal.emit(f"Convite para o deck '{deck_name}' enviado para {target_nick}.")
def handle_deck_invite(self, invite_id, invite_data):
    """Ask the user whether to accept an incoming deck invite and record the answer.

    Accepting marks the invite as "accepted"; declining deletes it. The sender
    and deck names come from another user and are shown in a rich-text label,
    so both are HTML-escaped first.
    """
    import html  # local import: only needed here

    sender = html.escape(str(invite_data.get("sender_nick", "Alguém")))
    deck = html.escape(str(invite_data.get("deck_name", "um deck")))

    dialog = QDialog(self)
    dialog.setWindowTitle("Convite de Deck")
    layout = QVBoxLayout(dialog)
    layout.addWidget(QLabel(f"<b>{sender}</b> quer te enviar o deck <b>'{deck}'</b>.<br>Aceitar?"))
    buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Yes | QDialogButtonBox.StandardButton.No)
    buttons.accepted.connect(dialog.accept)
    buttons.rejected.connect(dialog.reject)
    layout.addWidget(buttons)

    invite_path = f"deck_invites/{background_updater.uid}/{invite_id}"
    if dialog.exec():
        background_updater.firebase.patch_data(invite_path, {"status": "accepted"}, background_updater.id_token)
        self.show_tooltip_signal.emit("Convite aceito. Aguardando transferência...")
    else:
        background_updater.firebase.delete_data(invite_path, background_updater.id_token)
        self.show_tooltip_signal.emit("Convite recusado.")
def handle_accepted_invite(self, recipient_uid, invite_id, invite_data):
    """Announce that an invite was accepted and start the deck upload on a worker thread."""
    # NOTE(review): the message names invite_data's 'sender_nick' as the one who accepted — confirm that is intended.
    accepted_by = invite_data.get('sender_nick')
    self.show_tooltip_signal.emit(f"{accepted_by} aceitou. Iniciando envio...")
    uploader = threading.Thread(
        target=self._process_deck_upload_p2p,
        args=(recipient_uid, invite_id, invite_data),
        daemon=True,
    )
    uploader.start()
def _process_deck_upload_p2p(self, recipient_uid, invite_id, invite_data):
    """Worker: export the invited deck to a temp .apkg and stream it to the relay server.

    The invite is first switched to "transferring" with a fresh transfer id.
    The file is uploaded in 256 KiB chunks followed by an end marker request.
    Each relay response is checked, so "sent successfully" is reported only
    when the relay accepted every request. The temp file is always removed.
    """
    export_path = None
    try:
        transfer_id = str(uuid.uuid4())
        background_updater.firebase.patch_data(f"deck_invites/{recipient_uid}/{invite_id}", {"transfer_id": transfer_id, "status": "transferring"}, background_updater.id_token)

        # NOTE(review): this runs on a worker thread yet touches mw.col — confirm that is safe.
        deck_id = mw.col.decks.id(invite_data['deck_name'])
        # Only the unique path is needed; the exporter writes the file itself.
        with tempfile.NamedTemporaryFile(suffix=".apkg", delete=False) as tmp:
            export_path = tmp.name
        from anki.exporting import AnkiPackageExporter
        exporter = AnkiPackageExporter(mw.col)
        exporter.did = deck_id
        exporter.exportInto(export_path)

        chunk_size = 1024 * 256
        with open(export_path, 'rb') as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                requests.post(f"{self.RELAY_SERVER_URL}/upload/{transfer_id}", data=chunk, timeout=20).raise_for_status()
        requests.post(f"{self.RELAY_SERVER_URL}/end/{transfer_id}", timeout=10).raise_for_status()
        self.show_tooltip_signal.emit("Deck enviado com sucesso.")
    except Exception as e:
        self.show_tooltip_signal.emit(f"Erro no envio: {e}")
    finally:
        if export_path and os.path.exists(export_path):
            try:
                os.remove(export_path)
            except OSError:
                pass  # best effort: a leftover temp file must not mask the real outcome
def handle_ready_for_download(self, invite_id, invite_data):
    """Announce that a deck is ready and start downloading it in the background.

    Args:
        invite_id: Key of the invite under the current user's invite node.
        invite_data: Invite payload, passed through to the download worker.
    """
    self.show_tooltip_signal.emit("Deck pronto para baixar. Iniciando download...")
    # Daemon thread: the download must not keep the application alive on exit.
    downloader = threading.Thread(
        target=self._process_deck_download_p2p,
        args=(invite_id, invite_data),
        daemon=True,
    )
    downloader.start()
def _process_deck_download_p2p(self, invite_id, invite_data):
    """Download a deck from the relay server and hand it to Anki's importer.

    Polls ``<RELAY_SERVER_URL>/download/<transfer_id>``: a 200 response carries
    the next chunk (or the ``--EOF--`` end marker), 204 means "nothing yet"
    and is retried after one second, anything else aborts. The finished file
    is imported on the main thread and the invite node is deleted.

    The success tooltip is emitted right after the import has been *scheduled*,
    not after it has finished.

    Args:
        invite_id: Key of the invite under the current user's invite node.
        invite_data: Invite payload; ``transfer_id`` and ``deck_name`` are read.
    """
    save_path = None
    import_scheduled = False
    try:
        transfer_id = invite_data['transfer_id']
        download_url = f"{self.RELAY_SERVER_URL}/download/{transfer_id}"
        # Only the unique name is needed here; the file is reopened below.
        with tempfile.NamedTemporaryFile(suffix=".apkg", delete=False) as tmp:
            save_path = tmp.name
        with open(save_path, "wb") as f:
            while True:
                response = requests.get(download_url, timeout=20)
                if response.status_code == 200:
                    if response.content == b'--EOF--':
                        break
                    f.write(response.content)
                elif response.status_code == 204:
                    time.sleep(1)
                else:
                    raise Exception(f"Erro no servidor: {response.status_code}")
        from aqt.importing import importFile
        mw.taskman.run_on_main(lambda: importFile(mw, save_path))
        import_scheduled = True
        self.show_tooltip_signal.emit(f"Deck '{invite_data['deck_name']}' importado!")
        background_updater.firebase.delete_data(
            f"deck_invites/{background_updater.uid}/{invite_id}",
            background_updater.id_token,
        )
    except Exception as e:
        # Thread boundary: report every failure to the user.
        self.show_tooltip_signal.emit(f"Erro no download: {e}")
        # Do not leave a partial download behind. Once the import has been
        # scheduled the importer still needs the file, so keep it then.
        if save_path is not None and not import_scheduled:
            try:
                os.remove(save_path)
            except OSError:
                pass  # best-effort cleanup; the error was already reported
def send_view_request(self, nick):
    """Send a live-view request to *nick* on a daemon thread."""
    worker = threading.Thread(
        target=self._send_view_request_thread,
        args=(nick,),
        daemon=True,
    )
    worker.start()
def _send_view_request_thread(self, target_nick):
    """Create a pending view request for *target_nick* and record it as sent.

    Resolves the nick through ``nick_to_uid``, stops if the user is unknown or
    has blocked the current user, then posts the request under
    ``view_requests/<target uid>`` and mirrors its id under
    ``sent_view_requests/<own uid>``. Each outcome is reported via tooltip.

    Args:
        target_nick: Nickname of the user whose review should be watched.
    """
    updater = background_updater
    peer_uid = updater.firebase.get_data(f"nick_to_uid/{target_nick}", updater.id_token)
    if not peer_uid:
        self.show_tooltip_signal.emit(f"Usuário {target_nick} não encontrado.")
        return

    blocked_by_peer = updater.firebase.get_data(
        f"users/{peer_uid}/blocked/{updater.uid}", updater.id_token
    )
    if blocked_by_peer:
        self.show_tooltip_signal.emit(f"Você não pode interagir com {target_nick}.")
        return

    payload = {
        "requester_uid": updater.uid,
        "requester_nick": updater.nickname,
        "status": "pending",
    }
    created = updater.firebase.post_data(
        f"view_requests/{peer_uid}", payload, updater.id_token, return_name=True
    )
    if not created:
        return

    new_id = created['name']
    updater.firebase.put_data(
        f"sent_view_requests/{updater.uid}/{new_id}",
        {"target_uid": peer_uid},
        updater.id_token,
    )
    self.show_tooltip_signal.emit(f"Pedido de visualização enviado para {target_nick}.")
def handle_view_request(self, request_id, request_data):
    """Ask the user whether another user may watch the review session.

    On "Yes" the request under ``view_requests/<own uid>/<request_id>`` is
    patched to ``status: accepted``, the requester is added to
    ``self.viewers``, the viewers label is refreshed and broadcasting starts.
    On "No" (or when the dialog is dismissed) the request node is deleted and
    a tooltip is shown.

    Args:
        request_id: Key of the request under the current user's request node.
        request_data: Request payload; ``requester_nick`` and
            ``requester_uid`` are read.
    """
    # Local import so this block does not depend on the file-level imports.
    from html import escape

    requester_nick = request_data.get("requester_nick", "Alguém")
    requester_uid = request_data.get("requester_uid")

    # The nick is supplied by another user and is shown in a label containing
    # markup, so escape it for display. The raw nick is kept for self.viewers.
    shown_nick = escape(str(requester_nick), quote=False)

    dialog = QDialog(self)
    dialog.setWindowTitle("Pedido de Visualização")
    layout = QVBoxLayout(dialog)
    layout.addWidget(QLabel(f"<b>{shown_nick}</b> quer assistir à sua revisão.<br>Aceitar?"))
    buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Yes | QDialogButtonBox.StandardButton.No)
    buttons.accepted.connect(dialog.accept)
    buttons.rejected.connect(dialog.reject)
    layout.addWidget(buttons)

    request_path = f"view_requests/{background_updater.uid}/{request_id}"
    if dialog.exec():
        background_updater.firebase.patch_data(request_path, {"status": "accepted"}, background_updater.id_token)
        self.viewers[requester_uid] = requester_nick
        self._update_viewers_display()
        self._start_broadcast()
    else:
        background_updater.firebase.delete_data(request_path, background_updater.id_token)
        self.show_tooltip_signal.emit("Pedido de visualização recusado.")
def handle_accepted_view_request(self, target_uid, request_id, request_data):
    """Open the live-view window for an accepted view request.

    Does nothing when a live-view window is already open and visible.

    NOTE(review): the nick passed to the window is read from
    ``requester_nick``; confirm this is the name meant to be displayed for
    ``target_uid``.
    """
    nick_for_window = request_data.get("requester_nick", "Usuário")
    current = self.active_live_view_window
    if current and current.isVisible():
        return
    live_window = LiveViewWindow(self, target_uid, nick_for_window, request_id)
    self.active_live_view_window = live_window
    self.active_live_view_window.show()
def _start_broadcast(self):
    """Start publishing the reviewer state; no-op when already broadcasting.

    Sets ``is_being_watched``, shows the viewers label, registers
    ``_broadcast_card_state`` on the question-shown and answer-shown reviewer
    hooks, and shows a tooltip.
    """
    if self.is_being_watched:
        return
    self.is_being_watched = True
    self.viewers_label.show()
    reviewer_hooks = (
        gui_hooks.reviewer_did_show_question,
        gui_hooks.reviewer_did_show_answer,
    )
    for hook in reviewer_hooks:
        hook.append(self._broadcast_card_state)
    self.show_tooltip_signal.emit("Você está transmitindo sua revisão!")
def _stop_broadcast(self):
    """Stop publishing the reviewer state; no-op when not broadcasting.

    Clears ``is_being_watched`` and the viewer map, hides the viewers label,
    unregisters the reviewer hooks, deletes ``live_view/<own uid>`` and shows
    a tooltip.
    """
    if not self.is_being_watched:
        return
    self.is_being_watched = False
    self.viewers_label.hide()
    self.viewers = {}
    reviewer_hooks = (
        gui_hooks.reviewer_did_show_question,
        gui_hooks.reviewer_did_show_answer,
    )
    try:
        for hook in reviewer_hooks:
            hook.remove(self._broadcast_card_state)
    except ValueError:
        # Tolerated: the callback was not registered on the hook.
        pass
    live_path = f"live_view/{background_updater.uid}"
    background_updater.firebase.delete_data(live_path, background_updater.id_token)
    self.show_tooltip_signal.emit("Transmissão encerrada.")
def _prepare_html_for_viewing(self, html: str, card, question_side: bool) -> str:
    """Return *html* with local images and audio inlined as base64 data URIs.

    ``<img src="...">`` references that resolve to a file inside the
    collection's media folder are rewritten to ``data:`` URIs, and each
    ``[anki:play:<side>:<index>]`` marker whose AV tag has a media file is
    replaced with an ``<audio controls>`` element. Anything that cannot be
    embedded is left untouched.

    Args:
        html: Rendered card HTML for one side.
        card: Card providing ``question_av_tags()`` / ``answer_av_tags()``.
        question_side: True for the question side, False for the answer side.

    Returns:
        The processed HTML, or *html* unchanged when no collection/media
        manager is available.
    """
    if not mw or not mw.col or not mw.col.media:
        return html
    media_root = os.path.abspath(mw.col.media.dir())

    def resolve_media(name):
        """Return the path of *name* inside the media folder, or None."""
        candidate = os.path.abspath(os.path.join(media_root, name))
        try:
            inside = os.path.commonpath([media_root, candidate]) == media_root
        except ValueError:
            # e.g. paths on different drives: certainly not inside the folder
            inside = False
        # Card HTML can come from imported decks; never embed (and thereby
        # broadcast) files that live outside the media folder.
        if inside and os.path.exists(candidate):
            return candidate
        return None

    def to_data_uri(path):
        """Read *path* and return its content as a base64 data URI."""
        mime_type, _ = mimetypes.guess_type(path)
        if not mime_type:
            mime_type = "application/octet-stream"
        with open(path, "rb") as media_file:
            encoded = base64.b64encode(media_file.read()).decode('utf-8')
        return f"data:{mime_type};base64,{encoded}"

    processed_html = html
    for img_filename in re.findall(r'<img src="([^"]+)"[^>]*>', processed_html):
        img_path = resolve_media(img_filename)
        if img_path is None:
            continue
        try:
            data_uri = to_data_uri(img_path)
        except Exception as e:
            # Best effort: an unreadable image must not break the broadcast.
            print(f"AnkiChat: Não foi possível embutir a imagem {img_filename}: {e}")
            continue
        # count=1: findall yields one entry per occurrence, so repeated
        # references are replaced one at a time.
        processed_html = processed_html.replace(f'src="{img_filename}"', f'src="{data_uri}"', 1)

    av_tags = card.question_av_tags() if question_side else card.answer_av_tags()
    side_char = 'q' if question_side else 'a'
    for i, av_tag in enumerate(av_tags):
        # NOTE(review): not every AV tag necessarily has a file (text-to-speech
        # tags presumably do not) — skip those instead of raising.
        filename = getattr(av_tag, "filename", None)
        if not filename:
            continue
        av_path = resolve_media(filename)
        if av_path is None:
            continue
        try:
            data_uri = to_data_uri(av_path)
        except Exception as e:
            # Best effort: an unreadable audio file must not break the broadcast.
            print(f"AnkiChat: Não foi possível embutir o áudio {filename}: {e}")
            continue
        anki_play_tag = f"[anki:play:{side_char}:{i}]"
        processed_html = processed_html.replace(anki_play_tag, f'<audio controls src="{data_uri}"></audio>')
    return processed_html
def _broadcast_card_state(self, card):
    """Publish the current card's HTML to ``live_view/<own uid>``.

    The answer HTML is only included while the reviewer is showing the answer;
    otherwise an empty string is sent. No-op when nobody is watching.
    """
    if not self.is_being_watched:
        return
    is_answer = mw.state == "review" and mw.reviewer.state == "answer"
    payload = {
        "q": self._prepare_html_for_viewing(card.q(), card, question_side=True),
        "a": self._prepare_html_for_viewing(card.a(), card, question_side=False) if is_answer else "",
        "state": "answer" if is_answer else "question",
    }
    live_path = f"live_view/{background_updater.uid}"
    background_updater.firebase.put_data(live_path, payload, background_updater.id_token)
def load_blocked_users(self):
    """Load the blocked-user UIDs from ``users/<own uid>/blocked``.

    ``self.blocked_users`` is only replaced when the lookup returns data; an
    empty or missing result leaves it untouched.
    """
    remote_blocked = background_updater.firebase.get_data(
        f"users/{background_updater.uid}/blocked", background_updater.id_token
    )
    if not remote_blocked:
        return
    self.blocked_users = set(remote_blocked.keys())
def block_user(self, target_uid, target_nick):
    """Block *target_uid* remotely and locally, then re-apply the filters.

    Args:
        target_uid: UID stored under the current user's ``blocked`` node.
        target_nick: Nickname shown in the confirmation tooltip.
    """
    blocked_path = f"users/{background_updater.uid}/blocked/{target_uid}"
    background_updater.firebase.put_data(blocked_path, True, background_updater.id_token)
    self.blocked_users.add(target_uid)
    self.show_tooltip_signal.emit(f"{target_nick} foi bloqueado.")
    self._filters_changed()
def unblock_user(self, target_uid, target_nick):
    """Unblock *target_uid* remotely and locally, then re-apply the filters.

    Args:
        target_uid: UID removed from the current user's ``blocked`` node.
        target_nick: Nickname shown in the confirmation tooltip.
    """
    blocked_path = f"users/{background_updater.uid}/blocked/{target_uid}"
    background_updater.firebase.delete_data(blocked_path, background_updater.id_token)
    # discard(): not an error if the UID was not in the local set
    self.blocked_users.discard(target_uid)
    self.show_tooltip_signal.emit(f"{target_nick} foi desbloqueado.")
    self._filters_changed()
def unblock_user_dialog(self):
    """Prompt for a nickname and unblock that user if currently blocked.

    Nothing happens when the prompt is cancelled or left empty. A tooltip is
    shown when the nick cannot be resolved or the user is not blocked.
    """
    nick, ok = QInputDialog.getText(self, "Desbloquear Usuário", "Digite o nick do usuário a ser desbloqueado:")
    if not (ok and nick):
        return
    resolved_uid = background_updater.firebase.get_data(f"nick_to_uid/{nick}", background_updater.id_token)
    if resolved_uid and resolved_uid in self.blocked_users:
        self.unblock_user(resolved_uid, nick)
        return
    self.show_tooltip_signal.emit(f"Usuário '{nick}' não encontrado ou não está bloqueado.")
def _update_viewers_display(self):
if not self.viewers: self.viewers_label.setText("Assistindo você: Ninguém")
else: self.viewers_label.setText(f"Assistindo você: {', '.join(self.viewers.values())}")
def _check_viewer_status(self, uid, token):
    """Drop viewers that no longer have an accepted view request.

    For every known viewer, the requests under ``view_requests/<uid>`` made by
    that viewer are queried. A viewer is removed only when the query returns
    data and none of the returned requests has ``status == "accepted"``; an
    empty result leaves the viewer in place.

    Args:
        uid: UID whose ``view_requests`` node is queried.
        token: Auth token passed to the lookup.
    """
    if not self.is_being_watched:
        return
    departed = []
    for viewer_uid in self.viewers:
        matching = background_updater.firebase.get_data(
            f"view_requests/{uid}",
            token,
            params=f'orderBy="requester_uid"&equalTo="{viewer_uid}"',
        )
        if not matching:
            continue
        if all(entry.get("status") != "accepted" for entry in matching.values()):
            departed.append(viewer_uid)
    if not departed:
        return
    # Deleted after the loop: the dict must not change size while iterated.
    for viewer_uid in departed:
        del self.viewers[viewer_uid]
    self._update_viewers_display()
def closeEvent(self, event):
    """Hide the window instead of closing it.

    The close event is ignored, so the window object stays alive and is only
    made invisible.
    """
    self.hide()
    event.ignore()
# Single shared chat window, created on first use by launch_window().
window_instance = None


def launch_window():
    """Show the chat window, creating it the first time, and activate it."""
    global window_instance
    if window_instance is None:
        window_instance = ChatWindow(mw)
    chat_window = window_instance
    chat_window.show()
    chat_window.activateWindow()
def on_review_for_background_update(reviewer, card, ease):
    """Hook callback: trigger an immediate background update while connected.

    The hook arguments are accepted but not used.
    """
    if not background_updater.is_connected:
        return
    background_updater.trigger_immediate_update()
def attempt_background_autologin():
    """Log in silently using the refresh token stored in ``autologin.json``.

    The file is looked up next to this module. When it is missing nothing
    happens. When it cannot be read, is not valid JSON, or lacks one of the
    required fields, a message is printed and no login is attempted.
    Otherwise the token is refreshed on a daemon thread and, on success, the
    background updater's state is updated.
    """
    addon_path = os.path.dirname(os.path.abspath(__file__))
    autologin_file = os.path.join(addon_path, 'autologin.json')
    if not os.path.exists(autologin_file):
        return
    try:
        with open(autologin_file, 'r', encoding='utf-8') as f:
            data = json.load(f)
    except Exception as e:
        print(f"AnkiChat: Erro ao ler arquivo de autologin: {e}")
        return

    # Validate up front: a missing key would otherwise only surface as an
    # unhandled exception inside the worker thread.
    required_keys = ('refreshToken', 'email', 'uid')
    if not isinstance(data, dict):
        missing = list(required_keys)
    else:
        missing = [key for key in required_keys if key not in data]
    if missing:
        print(f"AnkiChat: Erro ao ler arquivo de autologin: campos ausentes: {', '.join(missing)}")
        return

    def _bg_login():
        """Refresh the token and update the background updater's state."""
        try:
            res, err = background_updater.firebase.refresh_token(data['refreshToken'])
            if res:
                background_updater.update_state(
                    data['email'], data['uid'],
                    res['id_token'], res['refresh_token'],
                    res.get('expires_in', '3600'),
                )
                print(f"AnkiChat: Login automático em segundo plano bem-sucedido para {data['email']}")
            else:
                print(f"AnkiChat: Falha no login automático em segundo plano: {err}")
        except Exception as e:
            # Thread boundary: report instead of dying with a bare traceback.
            print(f"AnkiChat: Falha no login automático em segundo plano: {e}")

    threading.Thread(target=_bg_login, daemon=True).start()
def init_addon():
    """Wire the add-on into Anki once the main window exists.

    Initializes the background updater with a Firebase client, adds the
    "AnkiChat" action to the menu bar, attempts a background auto-login and
    registers the card-answered hook.
    """
    FIREBASE_URL = "https://ankichatapp-default-rtdb.firebaseio.com/"
    API_KEY = "AIzaSyDNSI7R6GX9B5PCGIwvAoKM_5uen7BU_C0"
    background_updater.initialize(FirebaseAPI(FIREBASE_URL, API_KEY))

    menu_action = QAction("AnkiChat", mw)
    menu_action.triggered.connect(launch_window)
    mw.form.menubar.addAction(menu_action)

    attempt_background_autologin()
    gui_hooks.reviewer_did_answer_card.append(on_review_for_background_update)
def cleanup_on_exit():
    """Mark the current user as offline when connected."""
    if not background_updater.is_connected:
        return
    presence_path = f"online/{background_updater.uid}"
    background_updater.firebase.patch_data(presence_path, {"state": "offline"}, background_updater.id_token)
# Register the add-on's entry points with Anki's GUI hooks:
# init_addon on main_window_did_init, cleanup_on_exit on profile_will_close.
gui_hooks.main_window_did_init.append(init_addon)
gui_hooks.profile_will_close.append(cleanup_on_exit)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment