Created December 13, 2025 12:00
Save xonsh history to a searchable index
# ~/.xonshrc: wire the search backend into xonsh and define the search aliases.
import sys
import os

# Add the extensions directory to the path (if it is not there yet)
ext_dir = os.path.expanduser('~/.xonsh')
if ext_dir not in sys.path:
    sys.path.append(ext_dir)

# Import our class
from history_search import SearchEngineHistory

# Tell xonsh to use our backend
$XONSH_HISTORY_BACKEND = SearchEngineHistory

# Search function
def _hsearch(args):
    if not args:
        print("Usage: hsearch <query>")
        return
    query = " ".join(args)
    # Grab the current xonsh history object
    hist = __xonsh__.history
    # Make sure it is our custom engine
    if hasattr(hist, 'search'):
        print(f"Searching for: {query}...")
        results = hist.search(query, limit=5)
        if not results:
            print("No matches found.")
        # Pretty output (best matches on top)
        for i, doc in enumerate(results):
            print(f"{i+1}. {doc.get('inp', '')}")
    else:
        print("Error: Current history backend does not support search.")

# Register the 'hsearch' and 'hs' aliases
aliases['hsearch'] = _hsearch
aliases['hs'] = _hsearch
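
In an interactive xonsh session the alias can then be used like this (an illustrative transcript; the commands and output depend entirely on your own history):

    $ hs docker
    Searching for: docker...
    1. docker ps -a
    2. docker compose up -d

Next, a standalone PyQt6 viewer for browsing the same on-disk index.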
# Standalone PyQt6 viewer for the on-disk history index.
import datetime
import json
import os
import sys
import zlib
from collections import Counter
from typing import List, Dict, Any

from PyQt6.QtCore import Qt, QAbstractTableModel, QModelIndex, QSortFilterProxyModel
from PyQt6.QtGui import QAction
from PyQt6.QtWidgets import (
    QApplication,
    QMainWindow,
    QWidget,
    QVBoxLayout,
    QHBoxLayout,
    QTableView,
    QLineEdit,
    QLabel,
    QGroupBox,
    QHeaderView,
    QToolBar,
    QStatusBar,
    QSplitter,
)

class HistoryReader:
    def __init__(self, db_path: str):
        self.db_path = os.path.expanduser(db_path)
        self.documents: List[Dict[str, Any]] = []

    def load_data(self) -> List[Dict[str, Any]]:
        if not os.path.exists(self.db_path):
            return []
        all_docs = []
        try:
            segments = [f for f in os.listdir(self.db_path) if f.startswith('seg_')]
        except OSError:
            return []
        for seg_name in segments:
            seg_path = os.path.join(self.db_path, seg_name)
            docs = self._read_segment(seg_path)
            all_docs.extend(docs)
        all_docs.sort(key=lambda x: x.get('id', 0), reverse=True)
        self.documents = all_docs
        return all_docs

    def _read_segment(self, seg_path: str) -> List[Dict]:
        idx_path = os.path.join(seg_path, 'doc_idx.json')
        bin_path = os.path.join(seg_path, 'docs.bin')
        if not os.path.exists(idx_path) or not os.path.exists(bin_path):
            return []
        results = []
        try:
            with open(idx_path, 'r') as f:
                doc_index = json.load(f)
            if os.path.getsize(bin_path) > 0:
                with open(bin_path, 'rb') as f_bin:
                    for doc_id, meta in doc_index.items():
                        offset, length, _ = meta
                        f_bin.seek(offset)
                        compressed_data = f_bin.read(length)
                        try:
                            json_bytes = zlib.decompress(compressed_data)
                            doc = json.loads(json_bytes.decode('utf-8'))
                            if 'id' not in doc:
                                doc['id'] = int(doc_id)
                            results.append(doc)
                        except Exception:
                            continue
        except Exception as e:
            print(f'Error reading segment {seg_path}: {e}')
            return []
        return results

    def get_stats(self) -> Dict[str, Any]:
        if not self.documents:
            return {'total': 0, 'unique': 0, 'top_cmd': '-'}
        total = len(self.documents)
        cmds = [d.get('inp', '') for d in self.documents]
        unique = len(set(cmds))
        if cmds:
            top_cmd = Counter(cmds).most_common(1)[0]
            top_cmd_str = f'{top_cmd[0]} ({top_cmd[1]})'
        else:
            top_cmd_str = '-'
        return {'total': total, 'unique': unique, 'top_cmd': top_cmd_str}

class HistoryTableModel(QAbstractTableModel):
    def __init__(self, data=None):
        super().__init__()
        self._data = data or []
        self._headers = ['Time', 'Command', 'Return', 'Session ID']

    def rowCount(self, parent=QModelIndex()):
        return len(self._data)

    def columnCount(self, parent=QModelIndex()):
        return len(self._headers)

    def data(self, index, role=Qt.ItemDataRole.DisplayRole):
        if not index.isValid():
            return None
        row, col = index.row(), index.column()
        item = self._data[row]
        if role == Qt.ItemDataRole.DisplayRole:
            if col == 0:
                # Document IDs are nanosecond timestamps
                ts = item.get('id', 0)
                try:
                    return datetime.datetime.fromtimestamp(ts / 1e9).strftime('%Y-%m-%d %H:%M:%S')
                except (OSError, OverflowError, ValueError):
                    return str(ts)
            elif col == 1:
                return item.get('inp', '')
            elif col == 2:
                return str(item.get('rtn', ''))
            elif col == 3:
                return str(item.get('sessionid', ''))[:8] + '...'
        elif role == Qt.ItemDataRole.TextAlignmentRole:
            if col == 0 or col == 2:
                return Qt.AlignmentFlag.AlignCenter
            return Qt.AlignmentFlag.AlignLeft | Qt.AlignmentFlag.AlignVCenter
        return None

    def headerData(self, section, orientation, role=Qt.ItemDataRole.DisplayRole):
        if role == Qt.ItemDataRole.DisplayRole and orientation == Qt.Orientation.Horizontal:
            return self._headers[section]
        return None

class FilterWidget(QWidget):
    def __init__(self, parent=None):
        super().__init__(parent)
        layout = QHBoxLayout(self)
        layout.setContentsMargins(0, 0, 0, 0)
        self.lbl = QLabel('Filter Commands:')
        self.search_input = QLineEdit()
        self.search_input.setPlaceholderText('Type to search history...')
        self.search_input.setClearButtonEnabled(True)
        layout.addWidget(self.lbl)
        layout.addWidget(self.search_input)


class StatsWidget(QGroupBox):
    def __init__(self, parent=None):
        super().__init__('Statistics', parent)
        self.layout = QVBoxLayout(self)
        self.lbl_total = QLabel('Total: 0')
        self.lbl_unique = QLabel('Unique: 0')
        self.lbl_top = QLabel('Top: -')
        self.lbl_top.setWordWrap(True)
        self.layout.addWidget(self.lbl_total)
        self.layout.addWidget(self.lbl_unique)
        self.layout.addWidget(QLabel('Most Frequent:'))
        self.layout.addWidget(self.lbl_top)
        self.layout.addStretch()

    def update_stats(self, stats: Dict):
        self.lbl_total.setText(f"Total Commands: {stats['total']}")
        self.lbl_unique.setText(f"Unique Commands: {stats['unique']}")
        self.lbl_top.setText(f"{stats['top_cmd']}")

class HistoryPanel(QGroupBox):
    def __init__(self, parent=None):
        super().__init__('Details', parent)
        layout = QVBoxLayout(self)
        self.info_label = QLabel('Select a command')
        self.info_label.setAlignment(Qt.AlignmentFlag.AlignTop)
        self.info_label.setWordWrap(True)
        self.info_label.setTextInteractionFlags(Qt.TextInteractionFlag.TextSelectableByMouse)
        layout.addWidget(self.info_label)
        layout.addStretch()

    def set_data(self, data: Dict):
        if not data:
            self.info_label.setText('No selection')
            return
        ts = data.get('id', 0)
        try:
            dt_str = datetime.datetime.fromtimestamp(ts / 1e9).strftime('%Y-%m-%d %H:%M:%S.%f')
        except (OSError, OverflowError, ValueError):
            dt_str = str(ts)
        text = (
            f"<b>ID:</b> {data.get('id')}<br><b>Date:</b> {dt_str}<br>"
            f"<b>Session:</b> {data.get('sessionid')}<br><b>Return:</b> {data.get('rtn')}<br>"
            f"<hr><b>Command:</b><br><pre>{data.get('inp')}</pre>"
        )
        self.info_label.setText(text)

class MainWindow(QMainWindow):
    def __init__(self):
        super().__init__()
        self.setWindowTitle('Xonsh History Viewer')
        self.resize(1000, 600)
        db_path = '~/.xonsh/history_search_db'
        self.reader = HistoryReader(db_path)
        self._init_ui()
        self._load_data()

    def _init_ui(self):
        toolbar = QToolBar('Main')
        self.addToolBar(toolbar)
        refresh_action = QAction('Refresh', self)
        refresh_action.triggered.connect(self._load_data)
        toolbar.addAction(refresh_action)
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QHBoxLayout(central_widget)
        splitter = QSplitter(Qt.Orientation.Horizontal)
        main_layout.addWidget(splitter)
        left_widget = QWidget()
        left_layout = QVBoxLayout(left_widget)
        left_layout.setContentsMargins(0, 0, 0, 0)
        self.filter_widget = FilterWidget()
        self.filter_widget.search_input.textChanged.connect(self._on_search_changed)
        self.table_view = QTableView()
        self.table_view.setSelectionBehavior(QTableView.SelectionBehavior.SelectRows)
        self.table_view.setAlternatingRowColors(True)
        self.table_view.verticalHeader().setVisible(False)
        self.table_view.clicked.connect(self._on_table_click)
        self.proxy_model = QSortFilterProxyModel()
        self.proxy_model.setFilterCaseSensitivity(Qt.CaseSensitivity.CaseInsensitive)
        self.proxy_model.setFilterKeyColumn(1)
        left_layout.addWidget(self.filter_widget)
        left_layout.addWidget(self.table_view)
        right_widget = QWidget()
        right_layout = QVBoxLayout(right_widget)
        right_layout.setContentsMargins(0, 0, 0, 0)
        self.stats_widget = StatsWidget()
        self.history_panel = HistoryPanel()
        right_layout.addWidget(self.stats_widget)
        right_layout.addWidget(self.history_panel, stretch=1)
        splitter.addWidget(left_widget)
        splitter.addWidget(right_widget)
        splitter.setSizes([700, 300])
        self.setStatusBar(QStatusBar())

    def _load_data(self):
        self.statusBar().showMessage('Loading data...')
        data = self.reader.load_data()
        self.model = HistoryTableModel(data)
        self.proxy_model.setSourceModel(self.model)
        self.table_view.setModel(self.proxy_model)
        selection_model = self.table_view.selectionModel()
        if selection_model:
            selection_model.selectionChanged.connect(self._on_selection_changed)
        header = self.table_view.horizontalHeader()
        header.setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents)
        header.setSectionResizeMode(1, QHeaderView.ResizeMode.Stretch)
        header.setSectionResizeMode(2, QHeaderView.ResizeMode.ResizeToContents)
        header.setSectionResizeMode(3, QHeaderView.ResizeMode.Fixed)
        self.table_view.setColumnWidth(3, 80)
        stats = self.reader.get_stats()
        self.stats_widget.update_stats(stats)
        self.statusBar().showMessage(f'Loaded {len(data)} records.')

    def _on_search_changed(self, text):
        self.proxy_model.setFilterFixedString(text)

    def _on_table_click(self, index):
        self._update_details(index)

    def _on_selection_changed(self, selected, deselected):
        indexes = self.table_view.selectionModel().selectedRows()
        if indexes:
            self._update_details(indexes[0])

    def _update_details(self, proxy_index):
        source_index = self.proxy_model.mapToSource(proxy_index)
        if source_index.isValid():
            row = source_index.row()
            raw_data = self.model._data[row]
            self.history_panel.set_data(raw_data)


if __name__ == '__main__':
    app = QApplication(sys.argv)
    app.setStyle('Fusion')
    window = MainWindow()
    window.show()
    sys.exit(app.exec())
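
The viewer only reads the index files, so it runs in any ordinary Python interpreter. A hypothetical launch, assuming the code above is saved as history_viewer.py (the file name is an assumption) and PyQt6 is installed:

    pip install PyQt6
    python history_viewer.py

Finally, the backend itself: the history_search module that the .xonshrc snippet imports, i.e. ~/.xonsh/history_search.py.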
# ~/.xonsh/history_search.py: the search-engine history backend for xonsh.
import os
import sys
import re
import json
import zlib
import mmap
import math
import struct
import shutil
import threading
import heapq
import time
import uuid
import collections
from collections import defaultdict, Counter
from typing import Dict, List, Tuple, Type, Optional, Set

# Import the xonsh base history class (fall back to a stub outside xonsh)
try:
    from xonsh.history.base import History
except ImportError:
    class History:
        def __init__(self, **kwargs):
            pass

# --- Global registry ---
_REGISTRY: Dict[str, 'IndexEngine'] = {}
_REGISTRY_LOCK = threading.Lock()

# Posting format: DocID (8 bytes, unsigned long long) + TF (4 bytes, unsigned int)
POSTING_STRUCT = struct.Struct('<QI')
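# Illustrative sketch of the encoding used below: the postings for a term with
# doc IDs [100, 105, 112] are written as deltas [100, 5, 7], each delta packed
# with its term frequency via POSTING_STRUCT and the whole run zlib-compressed
# (see SegmentWriter.write and DiskSegment.get_postings).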

# --- Text processor ---
class TextProcessor:
    # Very crude suffix stripping for common Russian and English word endings
    ENDINGS = re.compile(r'(ыми|ых|ого|ому|ые|ый|ая|ой|ь|ы|и|а|е|у|ю|ом|ем|ам|ал|ил|ть|ing|ed|es|ly)$')
    TOKEN_RE = re.compile(r'\w+')  # Capture any word, even short ones (ls, cd)

    @staticmethod
    def stem(word: str) -> str:
        if len(word) > 4:
            return TextProcessor.ENDINGS.sub('', word)
        return word

    @staticmethod
    def process(text: str) -> List[str]:
        if not text:
            return []
        raw_words = TextProcessor.TOKEN_RE.findall(text.lower())
        return [TextProcessor.stem(w) for w in raw_words]
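
# Example behaviour of the processor above:
#   TextProcessor.process('Listing Files')  ->  ['list', 'fil']
# Words longer than four characters get the suffix strip ('ing', 'es', ...);
# short tokens such as 'ls' or 'cd' pass through unchanged.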

# --- BM25 (ranking algorithm) ---
class BM25:
    def __init__(self, k1: float = 1.5, b: float = 0.75):
        self.k1 = k1
        self.b = b

    def score(self, tf: int, doc_len: int, avg_dl: float, idf: float) -> float:
        numerator = tf * (self.k1 + 1)
        denominator = tf + self.k1 * (1 - self.b + self.b * (doc_len / avg_dl))
        return idf * (numerator / denominator)
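
# Worked example with made-up numbers (defaults k1=1.5, b=0.75):
#   tf=2, doc_len=4, avg_dl=5.0, idf=1.2
#   numerator   = 2 * (1.5 + 1)                     = 5.0
#   denominator = 2 + 1.5 * (1 - 0.75 + 0.75 * 4/5) = 3.275
#   score       = 1.2 * 5.0 / 3.275                 ≈ 1.83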

# --- Disk storage ---
class DiskSegment:
    def __init__(self, dir_path: str):
        self.dir_path = dir_path
        self.vocab: Dict[str, Tuple[int, int]] = {}
        self.doc_index: Dict[int, Tuple[int, int, int]] = {}
        self.files = {}
        self.mm_postings = None
        self.mm_docs = None
        try:
            p_path = os.path.join(dir_path, 'postings.bin')
            d_path = os.path.join(dir_path, 'docs.bin')
            self.files['postings'] = open(p_path, 'rb')
            self.files['docs'] = open(d_path, 'rb')
            # Guard against empty files (mmap rejects length 0)
            if os.path.getsize(p_path) > 0:
                self.mm_postings = mmap.mmap(self.files['postings'].fileno(), 0, access=mmap.ACCESS_READ)
            if os.path.getsize(d_path) > 0:
                self.mm_docs = mmap.mmap(self.files['docs'].fileno(), 0, access=mmap.ACCESS_READ)
        except Exception:
            self.close()
            raise
        self._load_vocab()
        self._load_doc_index()

    def _load_vocab(self):
        vp = os.path.join(self.dir_path, 'vocab.json')
        if os.path.exists(vp):
            with open(vp, 'r', encoding='utf-8') as f:
                self.vocab = json.load(f)

    def _load_doc_index(self):
        dp = os.path.join(self.dir_path, 'doc_idx.json')
        if os.path.exists(dp):
            with open(dp, 'r') as f:
                raw = json.load(f)
            self.doc_index = {int(k): tuple(v) for k, v in raw.items()}

    def get_postings(self, term: str) -> List[Tuple[int, int]]:
        if self.mm_postings is None or term not in self.vocab:
            return []
        offset, length = self.vocab[term]
        try:
            raw_bytes = zlib.decompress(self.mm_postings[offset : offset + length])
            results = []
            last_doc_id = 0
            for delta_id, tf in POSTING_STRUCT.iter_unpack(raw_bytes):
                doc_id = last_doc_id + delta_id
                results.append((doc_id, tf))
                last_doc_id = doc_id
            return results
        except Exception:
            return []

    def get_document(self, doc_id: int) -> Optional[Dict]:
        if self.mm_docs is None or doc_id not in self.doc_index:
            return None
        offset, length, _ = self.doc_index[doc_id]
        try:
            return json.loads(zlib.decompress(self.mm_docs[offset : offset + length]).decode('utf-8'))
        except Exception:
            return None

    def get_doc_len(self, doc_id: int) -> int:
        return self.doc_index[doc_id][2] if doc_id in self.doc_index else 0

    def close(self):
        if self.mm_postings:
            self.mm_postings.close()
        if self.mm_docs:
            self.mm_docs.close()
        for f in self.files.values():
            f.close()

class SegmentWriter:
    @staticmethod
    def write(base_dir: str, seg_id: str, inverted_index: Dict, docs: Dict, doc_lens: Dict):
        seg_dir = os.path.join(base_dir, f'seg_{seg_id}')
        os.makedirs(seg_dir, exist_ok=True)
        vocab, doc_index = {}, {}
        with open(os.path.join(seg_dir, 'postings.bin'), 'wb') as f_post:
            curr = 0
            for term in sorted(inverted_index.keys()):
                # Delta-encode the postings as they are written
                postings = sorted(inverted_index[term], key=lambda x: x[0])
                buf = bytearray()
                last = 0
                for doc_id, tf in postings:
                    buf.extend(POSTING_STRUCT.pack(doc_id - last, tf))
                    last = doc_id
                comp = zlib.compress(buf)
                f_post.write(comp)
                vocab[term] = (curr, len(comp))
                curr += len(comp)
        with open(os.path.join(seg_dir, 'docs.bin'), 'wb') as f_docs:
            curr = 0
            for doc_id, data in docs.items():
                comp = zlib.compress(json.dumps(data).encode('utf-8'))
                f_docs.write(comp)
                doc_index[doc_id] = (curr, len(comp), doc_lens.get(doc_id, 0))
                curr += len(comp)
        with open(os.path.join(seg_dir, 'vocab.json'), 'w', encoding='utf-8') as f:
            json.dump(vocab, f)
        with open(os.path.join(seg_dir, 'doc_idx.json'), 'w') as f:
            json.dump(doc_index, f)
        return seg_dir
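
# On-disk layout produced by the writer above:
#   seg_<id>/
#     postings.bin   zlib-compressed, delta-encoded (doc_id, tf) run per term
#     docs.bin       one zlib-compressed JSON blob per document
#     vocab.json     term   -> [offset, length] into postings.bin
#     doc_idx.json   doc_id -> [offset, length, doc_len] into docs.bin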

# --- Index engine with search ---
class IndexEngine:
    def __init__(self, name: str, path: str):
        self.path = path
        self.mem_docs = {}
        self.mem_doc_lens = {}
        self.mem_inverted = defaultdict(lambda: defaultdict(int))
        self.stats = {'total_docs': 0, 'total_len': 0, 'doc_freqs': Counter()}
        self.deleted_ids = set()
        self.segments = []
        self._lock = threading.RLock()
        if self.path:
            if not os.path.exists(self.path):
                os.makedirs(self.path, exist_ok=True)
            self._load_stats()
            self._load_segments()

    def _load_stats(self):
        try:
            with open(os.path.join(self.path, 'stats.json'), 'r') as f:
                d = json.load(f)
            self.stats.update(d)
            self.stats['doc_freqs'] = Counter(d['doc_freqs'])
            self.deleted_ids = set(d.get('deleted_ids', []))
        except Exception:
            pass

    def _save_stats(self):
        with open(os.path.join(self.path, 'stats.json'), 'w') as f:
            json.dump({**self.stats, 'deleted_ids': list(self.deleted_ids)}, f)

    def _load_segments(self):
        for name in sorted(os.listdir(self.path)):
            if name.startswith('seg_'):
                try:
                    self.segments.append(DiskSegment(os.path.join(self.path, name)))
                except Exception:
                    pass

    def add(self, doc: Dict):
        doc_id = doc['id']
        with self._lock:
            self.mem_docs[doc_id] = doc
            tokens = TextProcessor.process(doc.get('inp', ''))
            self.mem_doc_lens[doc_id] = len(tokens)
            for t in tokens:
                self.mem_inverted[t][doc_id] += 1
                self.stats['doc_freqs'][t] += 1
            self.stats['total_docs'] += 1
            self.stats['total_len'] += len(tokens)

    def flush(self):
        with self._lock:
            if not self.mem_docs:
                return
            inv = {t: list(d.items()) for t, d in self.mem_inverted.items()}
            # Use a nanosecond timestamp as the segment ID
            path = SegmentWriter.write(self.path, str(time.time_ns()), inv, self.mem_docs, self.mem_doc_lens)
            self.segments.append(DiskSegment(path))
            self.mem_docs.clear()
            self.mem_doc_lens.clear()
            self.mem_inverted.clear()
            self._save_stats()

    # --- The search method ---
    def search(self, query: str, limit: int = 10) -> List[Dict]:
        tokens = TextProcessor.process(query)
        if not tokens:
            return []
        bm25 = BM25()
        avg_dl = self.stats['total_len'] / max(1, self.stats['total_docs'])
        scores = defaultdict(float)
        for term in tokens:
            idf = math.log(1 + (self.stats['total_docs'] - self.stats['doc_freqs'][term] + 0.5) / (self.stats['doc_freqs'][term] + 0.5))
            # 1. Search the in-memory index
            for doc_id, tf in self.mem_inverted[term].items():
                scores[doc_id] += bm25.score(tf, self.mem_doc_lens[doc_id], avg_dl, idf)
            # 2. Search the on-disk segments
            for seg in self.segments:
                for doc_id, tf in seg.get_postings(term):
                    scores[doc_id] += bm25.score(tf, seg.get_doc_len(doc_id), avg_dl, idf)
        # Collect the top results
        top_ids = heapq.nlargest(limit, scores.keys(), key=lambda k: scores[k])
        results = []
        for doc_id in top_ids:
            if doc_id in self.mem_docs:
                results.append(self.mem_docs[doc_id])
            else:
                for seg in reversed(self.segments):
                    doc = seg.get_document(doc_id)
                    if doc:
                        results.append(doc)
                        break
        return results

# --- Xonsh backend ---
class SearchEngineHistory(History):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.sessionid = str(uuid.uuid4())
        self.data_dir = os.path.expanduser('~/.xonsh/history_search_db')
        with _REGISTRY_LOCK:
            if 'xonsh_search' not in _REGISTRY:
                _REGISTRY['xonsh_search'] = IndexEngine('xonsh_search', self.data_dir)
            self.engine = _REGISTRY['xonsh_search']

    def append(self, cmd):
        # ID = current time in nanoseconds
        doc = cmd.copy()
        doc['id'] = time.time_ns()
        doc['sessionid'] = self.sessionid
        doc.pop('out', None)
        self.engine.add(doc)
        try:
            self.engine.flush()
        except Exception as e:
            print(f"History Err: {e}", file=sys.stderr)

    def items(self, newest_first=False):
        all_docs = list(self.engine.mem_docs.values())
        for seg in self.engine.segments:
            for doc_id in seg.doc_index:
                d = seg.get_document(doc_id)
                if d:
                    all_docs.append(d)
        all_docs.sort(key=lambda x: x['id'], reverse=newest_first)
        yield from all_docs

    def all_items(self, newest_first=False):
        yield from self.items(newest_first)

    # The fixed info() method
    def info(self):
        data = collections.OrderedDict()
        data['backend'] = 'custom_search_engine'
        data['sessionid'] = self.sessionid
        data['location'] = self.data_dir
        data['docs_in_index'] = self.engine.stats['total_docs']
        return data

    # Method called from the hsearch alias
    def search(self, query, limit=10):
        return self.engine.search(query, limit)
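
A minimal sketch of exercising the engine outside xonsh, assuming the module above is importable as history_search (the .xonshrc snippet puts ~/.xonsh on sys.path) and that /tmp/idx_demo is a hypothetical throwaway scratch directory:

    import time
    from history_search import IndexEngine

    engine = IndexEngine('demo', '/tmp/idx_demo')  # hypothetical scratch path
    engine.add({'id': time.time_ns(), 'inp': 'git status'})
    engine.add({'id': time.time_ns(), 'inp': 'git commit -m "fix"'})
    engine.flush()  # persists the in-memory index into a seg_* directory
    for doc in engine.search('git commit', limit=2):
        print(doc['inp'])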