Skip to content

Commit

Permalink
added browse history cli
Browse files Browse the repository at this point in the history
  • Loading branch information
TyberiusPrime committed Nov 13, 2024
1 parent 7520afe commit 14c9575
Show file tree
Hide file tree
Showing 2 changed files with 216 additions and 5 deletions.
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,4 @@ addopts="--ignore=dev_utils"
ppg2-filter-constraint-violations = "pypipegraph2.cli:main_filter_constraint_violations"
ppg2-browse-graph = "pypipegraph2.cli:view_graph_ml"
ppg2-hash-file = "pypipegraph2.cli:hash_file"
ppg2-browse-history= "pypipegraph2.cli:browse_history"
220 changes: 215 additions & 5 deletions python/pypipegraph2/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""CLI interface to ppg2"""

import subprocess
import pyzstd
import re
import sys
Expand Down Expand Up @@ -141,9 +142,7 @@ def update_filtered_nodes(self):
def update_ancestors_descendants(self):
if self.selected_node:
self.ancestors = sorted(self.graph.predecessors(self.selected_node))
self.descendants = sorted(
self.graph.successors(self.selected_node)
)
self.descendants = sorted(self.graph.successors(self.selected_node))
ancestors_label = self.query_one("#ancestors-label", Static)
ancestors_label.update(f"Upstream ({len(self.ancestors)})")
# Populate ancestor list
Expand Down Expand Up @@ -193,19 +192,230 @@ def on_key(self, event):

GraphExplorerApp(graph).run()

def hash_file(filename= None):

def hash_file(filename=None):
from pathlib import Path
import hashers

if filename is None:
filename = sys.argv[1]
filename = Path(filename)
h = hashers.hash_file(filename)
import pprint

pprint.pprint(h)


def browse_history(dot_ppg_path=None, dot_ppg_path_old=None):
from pathlib import Path

if dot_ppg_path is None:
try:
dot_ppg_path = Path(sys.argv[1])
except:
print(
"usage: ppg2-browse-history <.ppp/per_script/whatever> [<old/.ppp/per_script/whatever] [search_text]"
)
print(
"second one is for diffs. Might alse be a 'snapshot' directory, then path1 is appended for maximum convience."
)
sys.exit(1)
filename = Path(dot_ppg_path) / "history/ppg_history.2.zstd"
if dot_ppg_path_old is None:
try:
dot_ppg_path_old = Path(sys.argv[2])
except IndexError:
pass
import networkx as nx
import pyzstd
import tempfile
import subprocess
import json

with pyzstd.ZstdFile(filename, "rb") as op:
history = json.loads(op.read().decode("utf-8"))
history_old = None
if dot_ppg_path_old is not None:
filename_old = Path(dot_ppg_path_old) / "history/ppg_history.2.zstd"
if filename_old.exists():
with pyzstd.ZstdFile(filename_old, "rb") as op:
history_old = json.loads(op.read().decode("utf-8"))
else:
filename_old = (
Path(dot_ppg_path_old) / dot_ppg_path / "history/ppg_history.2.zstd"
)
with pyzstd.ZstdFile(filename_old, "rb") as op:
history_old = json.loads(op.read().decode("utf-8"))
try:
start_search_term = sys.argv[3]
except:
start_search_term = ""

from textual.app import App, ComposeResult
from textual.containers import Container
from textual.widgets import (
Input,
ListView,
Static,
ListItem,
Label,
TextArea,
TabbedContent,
)
from textual.reactive import reactive
import difflib
import networkx as nx

class HistoryExplorerApp(App):
# CSS_PATH = "styles.css" # Optional: Define custom CSS here for styling
ancestors = reactive([])
descendants = reactive([])

def __init__(self, history, history_old, start_search_term):
super().__init__()
self.history = history
self.history_old = history_old
self.start_search = start_search_term
self.filtered_nodes = []

def compose(self) -> ComposeResult:
# Build the UI structure
with Container(id="main-layout"):
yield Container(
Input(placeholder="Search history...", id="node-input"),
Static("Filtered history", id="nodes-label"),
ListView(id="node-list"),
Static("Selected node", id="nodes-selected-label"),
TextArea("Selected Node", id="node-selected"),
id="middle-pane",
)
if self.history_old:
with Container(id="right-pane"):
with TabbedContent("Content", "Content old", "Diff"):
yield TextArea(id="node_history")
yield TextArea(id="node_history_old")
yield TextArea(id="node_history_diff")
else:
yield Container(
Static(
"Content | Content Old | Diff"
if self.history_old
else "Content",
id="descendants-label",
),
Label(id="node_history"),
id="right-pane",
)

def on_mount(self):
ni = self.query_one("#node-input")
ni.focus() # Start focus on text input
ni.value = self.start_search

def on_input_changed(self, event: Input.Changed):
# Filter nodes based on input text
search_text = event.value.strip().lower()
# so we can copy paste into this.
search_text = search_text.replace("'", "")
search_text = search_text.replace("->", "!!!")
self.filtered_nodes = [
node
for node in self.history
if not search_text or search_text in str(node).lower()
]
self.update_filtered_nodes()
self.selected_node = None
self.update_node()

def update_filtered_nodes(self):
node_list_view = self.query_one("#node-list", ListView)
node_list_view.clear()
for node in self.filtered_nodes[:100]:
node_list_view.append(ListItem(Label(node), name=node))

def update_node(self):
label_node = self.query_one("#node-selected", TextArea)
label_node.text = (
(self.selected_node or "")
.replace(":::", ":::\n")
.replace("!!!", "\n!!!\n")
.strip()
)
text_box = self.query_one("#node_history", TextArea)
if self.selected_node:
text = self.history[self.selected_node]
else:
text = "(no node selected)"
text_box.text = text

if self.history_old:
if self.selected_node:
old_text = self.history_old.get(
self.selected_node, "(no history available)"
)
else:
old_text = "(no node selected)"
text_box = self.query_one("#node_history_old", TextArea)
text_box.text = old_text
if old_text == text:
diff_text = "No changes"
else:
diff_text = ""
tf = open("a.txt",'w')
tf2 = open('b.txt', 'w')
tf.write(old_text)
tf2.write(text)
tf.flush()
tf2.flush()
try:
diff_text = subprocess.check_output(['icdiff', tf.name, tf2.name]).decode('utf-8')
except FileNotFoundError:
diff_text = "(install icdiff for pretty diff)\n"

for line in difflib.unified_diff(
old_text.splitlines(), text.splitlines()
):
diff_text += line + "\n"
text_box = self.query_one("#node_history_diff", TextArea)
text_box.text = diff_text

def on_list_view_highlighted(self, event: ListView.Selected):
# Update selected node when a node is chosen from the list
if event.item:
selected_node = event.item.name
else:
selected_node = None
self.selected_node = selected_node
self.update_node()

def on_key(self, event):
# Tab to switch focus between input, node list, ancestors, and descendants
if event.key == "tab":
if self.query_one("#node-input").has_focus:
self.query_one("#node-list").focus()
elif self.query_one("#node-list").has_focus:
self.query_one("#node_history").focus()
elif self.query_one("#node_history").has_focus:
if self.history_old:
self.query_one("#node_history_old").focus()
else:
self.query_one("#node-list").focus()
elif self.query_one("#node_history_old").has_focus:
self.query_one("#node_history_diff").focus()
else:
self.query_one("#node-input").focus()
elif event.key == "down" and self.query_one("#node-input").has_focus:
self.query_one("#node-list").focus()

elif event.key == "escape" or event.key == 'q':
sys.exit(0)

# Run the app

HistoryExplorerApp(history, history_old, start_search_term).run()


if __name__ == "__main__":
import sys

view_graph_ml(filename=sys.argv[1])
browse_history()

0 comments on commit 14c9575

Please sign in to comment.