Skip to content

Instantly share code, notes, and snippets.

@Hammer2900
Created December 13, 2025 12:00
Show Gist options
  • Select an option

  • Save Hammer2900/180165d884767228e3e744034d1366ed to your computer and use it in GitHub Desktop.

Select an option

Save Hammer2900/180165d884767228e3e744034d1366ed to your computer and use it in GitHub Desktop.
Save xonsh history to a searchable index.
import sys
import os

# Make sure the extensions folder (~/.xonsh) is importable.
ext_dir = os.path.expanduser('~/.xonsh')
if ext_dir not in sys.path:
    sys.path.append(ext_dir)

# Import our custom history backend class.
from history_search import SearchEngineHistory

# Tell xonsh to use our backend.
$XONSH_HISTORY_BACKEND = SearchEngineHistory
# Search helper invoked through the xonsh aliases below.
def _hsearch(args):
    """Alias body: query the custom history index and print the top matches."""
    if not args:
        print("Usage: hsearch <query>")
        return
    query = " ".join(args)
    # Grab the live xonsh history object for this session.
    hist = __xonsh__.history
    # Only our custom engine exposes a `search` method.
    if not hasattr(hist, 'search'):
        print("Error: Current history backend does not support search.")
        return
    print(f"Searching for: {query}...")
    results = hist.search(query, limit=5)
    if not results:
        print("No matches found.")
    # Pretty listing, numbered from 1.
    for position, doc in enumerate(results, start=1):
        print(f"{position}. {doc.get('inp', '')}")
# Register the aliases 'hsearch' and its short form 'hs'
aliases['hsearch'] = _hsearch
aliases['hs'] = _hsearch
import datetime
import json
import os
import sys
from collections import Counter
from typing import List, Dict, Any
import zlib
from PyQt6.QtCore import Qt, QAbstractTableModel, QModelIndex, QSortFilterProxyModel
from PyQt6.QtGui import QAction
from PyQt6.QtWidgets import (
QApplication,
QMainWindow,
QWidget,
QVBoxLayout,
QHBoxLayout,
QTableView,
QLineEdit,
QLabel,
QGroupBox,
QHeaderView,
QToolBar,
QStatusBar,
QSplitter,
)
class HistoryReader:
    """Loads command documents from the on-disk search-index database."""

    def __init__(self, db_path: str):
        # Expand `~` so the viewer works regardless of how the path was given.
        self.db_path = os.path.expanduser(db_path)
        self.documents: List[Dict[str, Any]] = []

    def load_data(self) -> List[Dict[str, Any]]:
        """Read every `seg_*` directory and return all documents, newest first."""
        if not os.path.exists(self.db_path):
            return []
        try:
            segment_names = [n for n in os.listdir(self.db_path) if n.startswith('seg_')]
        except OSError:
            return []
        collected: List[Dict[str, Any]] = []
        for name in segment_names:
            collected.extend(self._read_segment(os.path.join(self.db_path, name)))
        # Ids are nanosecond timestamps, so descending id = newest first.
        collected.sort(key=lambda d: d.get('id', 0), reverse=True)
        self.documents = collected
        return collected

    def _read_segment(self, seg_path: str) -> List[Dict]:
        """Decode every zlib-compressed JSON document stored in one segment."""
        idx_path = os.path.join(seg_path, 'doc_idx.json')
        bin_path = os.path.join(seg_path, 'docs.bin')
        if not (os.path.exists(idx_path) and os.path.exists(bin_path)):
            return []
        docs: List[Dict] = []
        try:
            with open(idx_path, 'r') as f:
                doc_index = json.load(f)
            if os.path.getsize(bin_path) > 0:
                with open(bin_path, 'rb') as f_bin:
                    for doc_id, (offset, length, _doc_len) in doc_index.items():
                        f_bin.seek(offset)
                        blob = f_bin.read(length)
                        try:
                            doc = json.loads(zlib.decompress(blob).decode('utf-8'))
                        except Exception:
                            continue  # skip a single corrupt record
                        doc.setdefault('id', int(doc_id))
                        docs.append(doc)
        except Exception as e:
            print(f'Error reading segment {seg_path}: {e}')
            return []
        return docs

    def get_stats(self) -> Dict[str, Any]:
        """Summarize loaded documents: totals plus the most frequent command."""
        if not self.documents:
            return {'total': 0, 'unique': 0, 'top_cmd': '-'}
        commands = [d.get('inp', '') for d in self.documents]
        (top_value, top_count), = Counter(commands).most_common(1)
        return {
            'total': len(commands),
            'unique': len(set(commands)),
            'top_cmd': f'{top_value} ({top_count})',
        }
class HistoryTableModel(QAbstractTableModel):
    """Read-only table model exposing history documents as four columns."""

    def __init__(self, data=None):
        super().__init__()
        self._data = data or []
        self._headers = ['Time', 'Command', 'Return', 'Session ID']

    def rowCount(self, parent=QModelIndex()):
        return len(self._data)

    def columnCount(self, parent=QModelIndex()):
        return len(self._headers)

    def data(self, index, role=Qt.ItemDataRole.DisplayRole):
        """Return the display text or alignment for one cell."""
        if not index.isValid():
            return None
        row, col = index.row(), index.column()
        item = self._data[row]
        if role == Qt.ItemDataRole.DisplayRole:
            if col == 0:
                # Document ids are timestamps in nanoseconds.
                ts = item.get('id', 0)
                try:
                    return datetime.datetime.fromtimestamp(ts / 1e9).strftime('%Y-%m-%d %H:%M:%S')
                except (OSError, OverflowError, ValueError, TypeError):
                    # Was a bare `except:`, which also swallowed KeyboardInterrupt;
                    # fall back to the raw value when it is not a valid timestamp.
                    return str(ts)
            elif col == 1:
                return item.get('inp', '')
            elif col == 2:
                return str(item.get('rtn', ''))
            elif col == 3:
                return str(item.get('sessionid', ''))[:8] + '...'
        elif role == Qt.ItemDataRole.TextAlignmentRole:
            if col in (0, 2):
                return Qt.AlignmentFlag.AlignCenter
            return Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignVCenter
        return None

    def headerData(self, section, orientation, role=Qt.ItemDataRole.DisplayRole):
        if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
            return self._headers[section]
        return None
class FilterWidget(QWidget):
    """Single-row widget: a label next to a clearable search input."""

    def __init__(self, parent=None):
        super().__init__(parent)
        row = QHBoxLayout(self)
        row.setContentsMargins(0, 0, 0, 0)
        self.lbl = QLabel('Filter Commands:')
        self.search_input = QLineEdit()
        self.search_input.setPlaceholderText('Type to search history...')
        self.search_input.setClearButtonEnabled(True)
        row.addWidget(self.lbl)
        row.addWidget(self.search_input)
class StatsWidget(QGroupBox):
    """Group box showing totals and the most frequent command."""

    def __init__(self, parent=None):
        super().__init__('Statistics', parent)
        self.layout = QVBoxLayout(self)
        self.lbl_total = QLabel('Total: 0')
        self.lbl_unique = QLabel('Unique: 0')
        self.lbl_top = QLabel('Top: -')
        self.lbl_top.setWordWrap(True)
        for widget in (self.lbl_total, self.lbl_unique, QLabel('Most Frequent:'), self.lbl_top):
            self.layout.addWidget(widget)
        self.layout.addStretch()

    def update_stats(self, stats: Dict):
        """Refresh the labels from a dict with `total`, `unique` and `top_cmd` keys."""
        self.lbl_total.setText(f"Total Commands: {stats['total']}")
        self.lbl_unique.setText(f"Unique Commands: {stats['unique']}")
        self.lbl_top.setText(f"{stats['top_cmd']}")
class HistoryPanel(QGroupBox):
    """Right-hand panel showing the details of the selected command."""

    def __init__(self, parent=None):
        super().__init__('Details', parent)
        layout = QVBoxLayout(self)
        self.info_label = QLabel('Select a command')
        self.info_label.setAlignment(Qt.AlignmentFlag.AlignTop)
        self.info_label.setWordWrap(True)
        self.info_label.setTextInteractionFlags(Qt.TextInteractionFlag.TextSelectableByMouse)
        layout.addWidget(self.info_label)
        layout.addStretch()

    def set_data(self, data: Dict):
        """Render one history document as rich text in the label."""
        import html  # local import; only this method needs it

        if not data:
            self.info_label.setText('No selection')
            return
        ts = data.get('id', 0)
        try:
            dt_str = datetime.datetime.fromtimestamp(ts / 1e9).strftime('%Y-%m-%d %H:%M:%S.%f')
        except (OSError, OverflowError, ValueError, TypeError):
            # Not a nanosecond timestamp; show the raw id (was a bare `except:`).
            dt_str = str(ts)
        # Escape the command so shell text such as `a < b && c` cannot break
        # (or inject) the rich-text markup inside <pre>.
        cmd = html.escape(str(data.get('inp')))
        text = (
            f"<b>ID:</b> {data.get('id')}<br><b>Date:</b> {dt_str}<br>"
            f"<b>Session:</b> {data.get('sessionid')}<br><b>Return:</b> {data.get('rtn')}<br>"
            f"<hr><b>Command:</b><br><pre>{cmd}</pre>"
        )
        self.info_label.setText(text)
class MainWindow(QMainWindow):
    """Main window: filterable command table on the left, stats/details on the right."""

    def __init__(self):
        super().__init__()
        self.setWindowTitle('Xonsh History Viewer')
        self.resize(1000, 600)
        self.reader = HistoryReader('~/.xonsh/history_search_db')
        self._init_ui()
        self._load_data()

    def _init_ui(self):
        """Build the toolbar, splitter layout and all child widgets."""
        toolbar = QToolBar('Main')
        self.addToolBar(toolbar)
        refresh_action = QAction('Refresh', self)
        refresh_action.triggered.connect(self._load_data)
        toolbar.addAction(refresh_action)

        central = QWidget()
        self.setCentralWidget(central)
        root_layout = QHBoxLayout(central)
        splitter = QSplitter(Qt.Orientation.Horizontal)
        root_layout.addWidget(splitter)

        # Left side: filter row above the history table.
        left = QWidget()
        left_layout = QVBoxLayout(left)
        left_layout.setContentsMargins(0, 0, 0, 0)
        self.filter_widget = FilterWidget()
        self.filter_widget.search_input.textChanged.connect(self._on_search_changed)
        self.table_view = QTableView()
        self.table_view.setSelectionBehavior(QTableView.SelectionBehavior.SelectRows)
        self.table_view.setAlternatingRowColors(True)
        self.table_view.verticalHeader().setVisible(False)
        self.table_view.clicked.connect(self._on_table_click)
        self.proxy_model = QSortFilterProxyModel()
        self.proxy_model.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.proxy_model.setFilterKeyColumn(1)  # filter on the Command column
        left_layout.addWidget(self.filter_widget)
        left_layout.addWidget(self.table_view)

        # Right side: statistics on top, selected-command details below.
        right = QWidget()
        right_layout = QVBoxLayout(right)
        right_layout.setContentsMargins(0, 0, 0, 0)
        self.stats_widget = StatsWidget()
        self.history_panel = HistoryPanel()
        right_layout.addWidget(self.stats_widget)
        right_layout.addWidget(self.history_panel, stretch=1)

        splitter.addWidget(left)
        splitter.addWidget(right)
        splitter.setSizes([700, 300])
        self.setStatusBar(QStatusBar())

    def _load_data(self):
        """(Re)load all history documents into a fresh model."""
        self.statusBar().showMessage('Loading data...')
        data = self.reader.load_data()
        self.model = HistoryTableModel(data)
        self.proxy_model.setSourceModel(self.model)
        self.table_view.setModel(self.proxy_model)
        selection_model = self.table_view.selectionModel()
        if selection_model:
            selection_model.selectionChanged.connect(self._on_selection_changed)
        header = self.table_view.horizontalHeader()
        header.setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents)
        header.setSectionResizeMode(1, QHeaderView.ResizeMode.Stretch)
        header.setSectionResizeMode(2, QHeaderView.ResizeMode.ResizeToContents)
        header.setSectionResizeMode(3, QHeaderView.ResizeMode.Fixed)
        self.table_view.setColumnWidth(3, 80)
        self.stats_widget.update_stats(self.reader.get_stats())
        self.statusBar().showMessage(f'Loaded {len(data)} records.')

    def _on_search_changed(self, text):
        self.proxy_model.setFilterFixedString(text)

    def _on_table_click(self, index):
        self._update_details(index)

    def _on_selection_changed(self, selected, deselected):
        rows = self.table_view.selectionModel().selectedRows()
        if rows:
            self._update_details(rows[0])

    def _update_details(self, proxy_index):
        """Map a proxy index back to the source row and show its raw document."""
        source_index = self.proxy_model.mapToSource(proxy_index)
        if source_index.isValid():
            self.history_panel.set_data(self.model._data[source_index.row()])
if __name__ == '__main__':
    # Launch the Qt application with the cross-platform Fusion theme.
    app = QApplication(sys.argv)
    app.setStyle('Fusion')
    main_window = MainWindow()
    main_window.show()
    sys.exit(app.exec())
import os
import sys
import re
import json
import zlib
import mmap
import math
import struct
import shutil
import threading
import heapq
import time
import uuid
import collections
from collections import defaultdict, Counter
from typing import Dict, List, Tuple, Type, Optional, Set
# Import xonsh's history base class; fall back to a minimal stub so this
# module can still be imported (e.g. by tooling) outside a xonsh session.
try:
    from xonsh.history.base import History
except ImportError:
    class History:
        """Stand-in accepting the same constructor keywords as the real base."""

        def __init__(self, **kwargs):
            pass
# --- Global registry: one shared IndexEngine instance per backend name ---
_REGISTRY: Dict[str, 'IndexEngine'] = {}
_REGISTRY_LOCK = threading.Lock()
# Posting entry layout: DocID (8-byte unsigned long long), TF (4-byte unsigned int)
POSTING_STRUCT = struct.Struct('<QI')
# --- Text processing ---
class TextProcessor:
    """Tokenizes and crudely stems command text (Russian and English suffixes)."""

    # Suffixes stripped from words longer than four characters.
    ENDINGS = re.compile(r'(ыми|ых|ого|ому|ые|ый|ая|ой|ь|ы|и|а|е|у|ю|ом|ем|ам|ал|ил|ть|ing|ed|es|ly)$')
    # Match any word, including very short ones such as `ls` or `cd`.
    TOKEN_RE = re.compile(r'\w+')

    @staticmethod
    def stem(word: str) -> str:
        """Strip one known suffix; short words pass through unchanged."""
        return TextProcessor.ENDINGS.sub('', word) if len(word) > 4 else word

    @staticmethod
    def process(text: str) -> List[str]:
        """Lowercase, tokenize and stem *text*; empty input yields an empty list."""
        if not text:
            return []
        return [TextProcessor.stem(token) for token in TextProcessor.TOKEN_RE.findall(text.lower())]
# --- BM25 ranking algorithm ---
class BM25:
    """Okapi BM25 term weighting with the standard k1/b parameters."""

    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1  # term-frequency saturation
        self.b = b    # document-length normalization strength

    def score(self, tf: int, doc_len: int, avg_dl: float, idf: float) -> float:
        """Return the BM25 contribution of a single term for one document."""
        length_norm = 1 - self.b + self.b * (doc_len / avg_dl)
        return idf * (tf * (self.k1 + 1)) / (tf + self.k1 * length_norm)
# --- Disk I/O ---
class DiskSegment:
    """Read-only, mmap-backed view over one immutable on-disk index segment."""

    def __init__(self, dir_path: str):
        self.dir_path = dir_path
        self.vocab: Dict[str, Tuple[int, int]] = {}            # term -> (offset, length) in postings.bin
        self.doc_index: Dict[int, Tuple[int, int, int]] = {}   # doc_id -> (offset, length, doc_len)
        self.files = {}
        self.mm_postings = None
        self.mm_docs = None
        try:
            postings_path = os.path.join(dir_path, 'postings.bin')
            docs_path = os.path.join(dir_path, 'docs.bin')
            self.files['postings'] = open(postings_path, 'rb')
            self.files['docs'] = open(docs_path, 'rb')
            # mmap rejects zero-length files, so guard against empty ones.
            if os.path.getsize(postings_path) > 0:
                self.mm_postings = mmap.mmap(self.files['postings'].fileno(), 0, access=mmap.ACCESS_READ)
            if os.path.getsize(docs_path) > 0:
                self.mm_docs = mmap.mmap(self.files['docs'].fileno(), 0, access=mmap.ACCESS_READ)
        except Exception:
            self.close()
            raise
        self._load_vocab()
        self._load_doc_index()

    def _load_vocab(self):
        """Load the term -> (offset, length) table if present."""
        vocab_path = os.path.join(self.dir_path, 'vocab.json')
        if os.path.exists(vocab_path):
            with open(vocab_path, 'r', encoding='utf-8') as f:
                self.vocab = json.load(f)

    def _load_doc_index(self):
        """Load the document index, converting JSON string keys back to ints."""
        idx_path = os.path.join(self.dir_path, 'doc_idx.json')
        if os.path.exists(idx_path):
            with open(idx_path, 'r') as f:
                self.doc_index = {int(k): tuple(v) for k, v in json.load(f).items()}

    def get_postings(self, term: str) -> List[Tuple[int, int]]:
        """Decode the delta-encoded (doc_id, tf) posting list for *term*."""
        if self.mm_postings is None or term not in self.vocab:
            return []
        offset, length = self.vocab[term]
        try:
            decoded = zlib.decompress(self.mm_postings[offset:offset + length])
            postings: List[Tuple[int, int]] = []
            previous = 0
            for delta, tf in POSTING_STRUCT.iter_unpack(decoded):
                previous += delta
                postings.append((previous, tf))
            return postings
        except Exception:
            return []

    def get_document(self, doc_id: int) -> Optional[Dict]:
        """Return the stored document for *doc_id*, or None when unavailable."""
        if self.mm_docs is None or doc_id not in self.doc_index:
            return None
        offset, length, _ = self.doc_index[doc_id]
        try:
            return json.loads(zlib.decompress(self.mm_docs[offset:offset + length]).decode('utf-8'))
        except Exception:
            return None

    def get_doc_len(self, doc_id: int) -> int:
        """Token count of the document, 0 for unknown ids."""
        entry = self.doc_index.get(doc_id)
        return entry[2] if entry else 0

    def close(self):
        """Release the memory maps and file handles."""
        if self.mm_postings:
            self.mm_postings.close()
        if self.mm_docs:
            self.mm_docs.close()
        for handle in self.files.values():
            handle.close()
class SegmentWriter:
    """Serializes an in-memory index into one immutable on-disk segment."""

    @staticmethod
    def write(base_dir: str, seg_id: str, inverted_index: Dict, docs: Dict, doc_lens: Dict):
        """Write postings, documents and their JSON indexes; return the segment dir."""
        seg_dir = os.path.join(base_dir, f'seg_{seg_id}')
        os.makedirs(seg_dir, exist_ok=True)
        vocab: Dict = {}
        doc_index: Dict = {}

        with open(os.path.join(seg_dir, 'postings.bin'), 'wb') as f_post:
            position = 0
            for term in sorted(inverted_index):
                # Delta-encode doc ids so the postings compress well.
                buf = bytearray()
                previous = 0
                for doc_id, tf in sorted(inverted_index[term], key=lambda pair: pair[0]):
                    buf.extend(POSTING_STRUCT.pack(doc_id - previous, tf))
                    previous = doc_id
                compressed = zlib.compress(buf)
                f_post.write(compressed)
                vocab[term] = (position, len(compressed))
                position += len(compressed)

        with open(os.path.join(seg_dir, 'docs.bin'), 'wb') as f_docs:
            position = 0
            for doc_id, payload in docs.items():
                compressed = zlib.compress(json.dumps(payload).encode('utf-8'))
                f_docs.write(compressed)
                doc_index[doc_id] = (position, len(compressed), doc_lens.get(doc_id, 0))
                position += len(compressed)

        with open(os.path.join(seg_dir, 'vocab.json'), 'w', encoding='utf-8') as f:
            json.dump(vocab, f)
        with open(os.path.join(seg_dir, 'doc_idx.json'), 'w') as f:
            json.dump(doc_index, f)
        return seg_dir
# --- Index engine with SEARCH ---
class IndexEngine:
    """In-memory + on-disk inverted index with BM25-ranked search.

    The *name* parameter identifies the engine in the global registry and is
    kept for interface compatibility; it is not used internally.
    """

    def __init__(self, name: str, path: str):
        self.path = path
        self.mem_docs = {}       # doc_id -> raw document (not yet flushed)
        self.mem_doc_lens = {}   # doc_id -> token count
        self.mem_inverted = defaultdict(lambda: defaultdict(int))  # term -> {doc_id: tf}
        self.stats = {'total_docs': 0, 'total_len': 0, 'doc_freqs': Counter()}
        self.deleted_ids = set()
        self.segments = []
        self._lock = threading.RLock()
        if self.path:
            if not os.path.exists(self.path):
                os.makedirs(self.path, exist_ok=True)
            self._load_stats()
            self._load_segments()

    def _load_stats(self):
        """Restore persisted collection statistics; missing/corrupt file keeps defaults."""
        try:
            with open(os.path.join(self.path, 'stats.json'), 'r') as f:
                d = json.load(f)
            self.stats.update(d)
            self.stats['doc_freqs'] = Counter(d['doc_freqs'])
            self.deleted_ids = set(d.get('deleted_ids', []))
        except (OSError, ValueError, KeyError):
            # First run or unreadable stats file (was a bare `except:`).
            pass

    def _save_stats(self):
        """Persist stats plus the deleted-id set as JSON."""
        with open(os.path.join(self.path, 'stats.json'), 'w') as f:
            json.dump({**self.stats, 'deleted_ids': list(self.deleted_ids)}, f)

    def _load_segments(self):
        """Open every existing `seg_*` directory; a broken one must not block startup."""
        for name in sorted(os.listdir(self.path)):
            if name.startswith('seg_'):
                try:
                    self.segments.append(DiskSegment(os.path.join(self.path, name)))
                except Exception:
                    pass  # was a bare `except:`; skip unreadable segments

    def add(self, doc: Dict):
        """Index one document (must carry an `id` key); thread-safe."""
        doc_id = doc['id']
        with self._lock:
            self.mem_docs[doc_id] = doc
            tokens = TextProcessor.process(doc.get('inp', ''))
            self.mem_doc_lens[doc_id] = len(tokens)
            for t in tokens:
                self.mem_inverted[t][doc_id] += 1
                # NOTE: counts every occurrence, so this is a collection term
                # frequency rather than a true document frequency — preserved
                # as-is because the persisted stats depend on it.
                self.stats['doc_freqs'][t] += 1
            self.stats['total_docs'] += 1
            self.stats['total_len'] += len(tokens)

    def flush(self):
        """Persist the in-memory index as a new immutable segment."""
        with self._lock:
            if not self.mem_docs:
                return
            inv = {t: list(d.items()) for t, d in self.mem_inverted.items()}
            # Nanosecond timestamp keeps segment ids unique and sortable.
            seg_path = SegmentWriter.write(self.path, str(time.time_ns()), inv, self.mem_docs, self.mem_doc_lens)
            self.segments.append(DiskSegment(seg_path))
            self.mem_docs.clear()
            self.mem_doc_lens.clear()
            self.mem_inverted.clear()
            self._save_stats()

    # --- SEARCH ---
    def search(self, query: str, limit: int = 10) -> List[Dict]:
        """Return up to *limit* documents ranked by BM25 against *query*."""
        tokens = TextProcessor.process(query)
        if not tokens:
            return []
        bm25 = BM25()
        avg_dl = self.stats['total_len'] / max(1, self.stats['total_docs'])
        scores = defaultdict(float)
        for term in tokens:
            df = self.stats['doc_freqs'][term]
            idf = math.log(1 + (self.stats['total_docs'] - df + 0.5) / (df + 0.5))
            # 1. In-memory postings. Use .get(): indexing the defaultdict would
            # silently insert an empty entry for every unknown query term.
            for doc_id, tf in self.mem_inverted.get(term, {}).items():
                scores[doc_id] += bm25.score(tf, self.mem_doc_lens[doc_id], avg_dl, idf)
            # 2. On-disk postings for the same term.
            for seg in self.segments:
                for doc_id, tf in seg.get_postings(term):
                    scores[doc_id] += bm25.score(tf, seg.get_doc_len(doc_id), avg_dl, idf)
        # Take the top-scoring ids.
        top_ids = heapq.nlargest(limit, scores.keys(), key=lambda k: scores[k])
        results = []
        for doc_id in top_ids:
            if doc_id in self.mem_docs:
                results.append(self.mem_docs[doc_id])
                continue
            # Newest segments first so the freshest copy of a document wins.
            for seg in reversed(self.segments):
                doc = seg.get_document(doc_id)
                if doc:
                    results.append(doc)
                    break
        return results
# --- XONSH BACKEND ---
class SearchEngineHistory(History):
    """Xonsh history backend that stores commands in a local search index."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.sessionid = str(uuid.uuid4())
        self.data_dir = os.path.expanduser('~/.xonsh/history_search_db')
        # All sessions share one engine instance through the global registry.
        with _REGISTRY_LOCK:
            if 'xonsh_search' not in _REGISTRY:
                _REGISTRY['xonsh_search'] = IndexEngine('xonsh_search', self.data_dir)
            self.engine = _REGISTRY['xonsh_search']

    def append(self, cmd):
        """Index one executed command; its id is the current time in nanoseconds."""
        doc = cmd.copy()
        doc['id'] = time.time_ns()
        doc['sessionid'] = self.sessionid
        doc.pop('out', None)  # command output is not indexed
        self.engine.add(doc)
        try:
            self.engine.flush()
        except Exception as e:
            print(f"History Err: {e}", file=sys.stderr)

    def items(self, newest_first=False):
        """Yield every stored command, sorted by id."""
        docs = list(self.engine.mem_docs.values())
        for seg in self.engine.segments:
            for doc_id in seg.doc_index:
                stored = seg.get_document(doc_id)
                if stored:
                    docs.append(stored)
        docs.sort(key=lambda d: d['id'], reverse=newest_first)
        yield from docs

    def all_items(self, newest_first=False):
        yield from self.items(newest_first)

    def info(self):
        """Describe this backend for xonsh's `history info`."""
        data = collections.OrderedDict()
        data['backend'] = 'custom_search_engine'
        data['sessionid'] = self.sessionid
        data['location'] = self.data_dir
        data['docs_in_index'] = self.engine.stats['total_docs']
        return data

    # Entry point used by the `hsearch`/`hs` aliases.
    def search(self, query, limit=10):
        return self.engine.search(query, limit)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment