Compare commits
12 Commits
ff17f90098
..
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 540ecd6aa8 | |||
| d4c642c25e | |||
| 75a936ef29 | |||
| c7cc85d1e8 | |||
| 95f4735a94 | |||
| 9fd495c976 | |||
| 75a8770bf8 | |||
| 4a81b1617b | |||
| 449ebbcce5 | |||
| b9dd14d562 | |||
| 6bb1439323 | |||
| ef438b79b2 |
@@ -0,0 +1,143 @@
|
||||
|
||||
# cli decode
|
||||
def cli_decode(base64_encoded_script: str) -> str:
|
||||
return f"echo '{base64_encoded_script}' | base64 -d | gunzip "
|
||||
|
||||
# source now
|
||||
def source_now(base64_encoded_script: str) -> str:
|
||||
return f"source <({cli_decode(base64_encoded_script)})"
|
||||
|
||||
# Color codes for bash prompt
|
||||
def get_green_color() -> str:
|
||||
"""Returns ANSI escape sequence for green text"""
|
||||
return r"\[\e[0;32m\]"
|
||||
|
||||
def get_reset_color() -> str:
|
||||
"""Returns ANSI escape sequence to reset color back to normal"""
|
||||
return r"\[\e[0m\]"
|
||||
|
||||
# Get local IP address using hostname
|
||||
def get_local_ip_command() -> str:
|
||||
"""
|
||||
Returns a command that gets the local IP address using hostname -I
|
||||
which displays all network addresses of the host
|
||||
"""
|
||||
return r"hostname -I | awk '{print $1}'"
|
||||
|
||||
# Basic prompt components
|
||||
def get_user_host() -> str:
|
||||
"""Returns bash variables for username@hostname"""
|
||||
return r"\u@\h"
|
||||
|
||||
def get_working_directory() -> str:
|
||||
"""Returns bash variable for current working directory"""
|
||||
return r"\w"
|
||||
|
||||
def get_prompt_symbol() -> str:
|
||||
"""Returns the prompt symbol ($)"""
|
||||
return r"\$"
|
||||
|
||||
# Construct the complete PS1 prompt
|
||||
def build_ps1_prompt() -> str:
|
||||
"""
|
||||
Builds a colored bash prompt that shows:
|
||||
- username@hostname in green
|
||||
- local IP address in parentheses
|
||||
- current working directory
|
||||
- $ prompt symbol
|
||||
- resets color after
|
||||
"""
|
||||
green = get_green_color()
|
||||
reset = get_reset_color()
|
||||
user_host = get_user_host()
|
||||
local_ip_cmd = get_local_ip_command()
|
||||
work_dir = get_working_directory()
|
||||
prompt_symbol = get_prompt_symbol()
|
||||
|
||||
return f'{green}{user_host} ($({local_ip_cmd})):{work_dir}{prompt_symbol} {reset}'
|
||||
|
||||
def get_plain_user_host_ip() -> str:
|
||||
"""
|
||||
Returns a plain text command that outputs user@host (ip) format
|
||||
without PS1 formatting strings - just the actual values
|
||||
"""
|
||||
return 'echo "$(whoami)@$(hostname) ($(hostname -I | awk \'{print $1}\'))"'
|
||||
|
||||
# Alias building utilities
|
||||
def create_bash_function(func_name: str, func_body: str) -> str:
|
||||
"""
|
||||
Creates a bash function with the given name and body.
|
||||
The function body should be properly formatted bash code.
|
||||
"""
|
||||
return f'{func_name}() {{\n{func_body}\n}}'
|
||||
|
||||
def create_alias_with_function(alias_name: str, func_name: str, func_body: str) -> str:
|
||||
"""
|
||||
Creates a bash alias that defines and immediately calls a function.
|
||||
This pattern allows for complex logic within aliases.
|
||||
"""
|
||||
function_def = create_bash_function(func_name, func_body)
|
||||
return f"alias {alias_name}='{function_def}; {func_name}'"
|
||||
|
||||
# File finding utilities
|
||||
def find_last_modified_file_command() -> str:
|
||||
"""
|
||||
Returns a command that finds the last modified file in current directory.
|
||||
Uses find with printf to get modification time, sorts by time, and gets the newest.
|
||||
"""
|
||||
return 'find . -maxdepth 1 -type f -printf "%T@ %p\\0" | sort -znr | head -zn1 | cut -d" " -f2- --zero-terminated'
|
||||
|
||||
def create_conditional_message(condition: str, success_msg: str, failure_msg: str) -> str:
|
||||
"""
|
||||
Creates a bash conditional that shows different messages based on a condition.
|
||||
"""
|
||||
return f'''if [ {condition} ]; then
|
||||
{success_msg}
|
||||
else
|
||||
{failure_msg}
|
||||
fi'''
|
||||
|
||||
# Tail last modified file alias components
|
||||
def get_tail_last_logic() -> str:
|
||||
"""
|
||||
Creates the logic for tailing the last modified file in current directory.
|
||||
"""
|
||||
find_cmd = find_last_modified_file_command()
|
||||
success_action = '''echo "Tailing: \\"$LAST_MODIFIED_FILE\\"";
|
||||
tail "$@" "$LAST_MODIFIED_FILE"'''
|
||||
failure_action = 'echo "No files found in the current directory to tail."'
|
||||
|
||||
conditional = create_conditional_message(
|
||||
'-n "$LAST_MODIFIED_FILE"',
|
||||
success_action,
|
||||
failure_action
|
||||
)
|
||||
|
||||
return f'''LAST_MODIFIED_FILE=$({find_cmd});
|
||||
{conditional}'''
|
||||
|
||||
def build_tail_last_alias() -> str:
|
||||
"""
|
||||
Builds the 'tl' alias that tails the last modified file in current directory.
|
||||
"""
|
||||
func_body = get_tail_last_logic()
|
||||
return create_alias_with_function('tl', '_tl_func', func_body)
|
||||
|
||||
better_env = f'''
|
||||
export TERM=xterm
|
||||
export PS1="{build_ps1_prompt()}"
|
||||
|
||||
# Tail the last modified file in current directory
|
||||
{build_tail_last_alias()}
|
||||
|
||||
{create_alias_with_function('lss', '_lss_func', 'systemctl status lightspeed.service')}
|
||||
{create_alias_with_function('cdm', '_cdm_func', 'cd /home/multihomingproxy/logs')}
|
||||
|
||||
echo "Bash customizations sourced successfully!"
|
||||
'''
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
+47
-29
@@ -5,12 +5,17 @@ import pyperclip
|
||||
from typing import Dict, Any, List, Optional, Callable
|
||||
from pathlib import Path
|
||||
import re
|
||||
import googletrans
|
||||
import asyncio
|
||||
|
||||
console = Console()
|
||||
|
||||
# Create the main grafana subcommand
|
||||
grafana_app = typer.Typer()
|
||||
|
||||
# Global translator instance
|
||||
translator = googletrans.Translator()
|
||||
|
||||
def safe_get(data: Dict[str, Any], key: str, default: Any = None) -> Any:
|
||||
"""Safely get a value from a dictionary."""
|
||||
return data.get(key, default)
|
||||
@@ -19,12 +24,10 @@ def contains_chinese(text: str) -> bool:
|
||||
"""Check if text contains Chinese characters."""
|
||||
if not isinstance(text, str):
|
||||
return False
|
||||
# Unicode ranges for Chinese characters
|
||||
chinese_pattern = re.compile(r'[\u4e00-\u9fff\u3400-\u4dbf\u20000-\u2a6df\u2a700-\u2b73f\u2b740-\u2b81f\u2b820-\u2ceaf\uf900-\ufaff\u3300-\u33ff\ufe30-\ufe4f\uf900-\ufaff\u2f800-\u2fa1f]')
|
||||
return bool(chinese_pattern.search(text))
|
||||
return text != text.encode('ascii', 'ignore').decode('ascii')
|
||||
|
||||
def mutate_chinese_titles(data: Dict[str, Any]) -> None:
|
||||
"""Recursively mutate titles containing Chinese characters."""
|
||||
async def mutate_chinese_titles(data: Dict[str, Any]) -> None:
|
||||
"""Recursively translate Chinese titles to English."""
|
||||
if not isinstance(data, dict):
|
||||
return
|
||||
|
||||
@@ -33,13 +36,25 @@ def mutate_chinese_titles(data: Dict[str, Any]) -> None:
|
||||
|
||||
for key, value in data.items():
|
||||
if key in title_keys and isinstance(value, str) and contains_chinese(value):
|
||||
data[key] = "please_translate_me"
|
||||
try:
|
||||
chinese_value = value
|
||||
if chinese_value.contains("--"):
|
||||
idx = value.index("--")
|
||||
chinese_value, _ = value[:idx]
|
||||
|
||||
# Translate Chinese text to English
|
||||
translation = await translator.translate(chinese_value, dest='en')
|
||||
data[key] = chinese_value + "--" + translation.text
|
||||
|
||||
console.print(f"[dim]Translated '{chinese_value}' → '{translation.text}'[/dim]")
|
||||
except Exception as e:
|
||||
console.print(f"[yellow]Warning: Failed to translate '{value}': {e}[/yellow]")
|
||||
elif isinstance(value, dict):
|
||||
mutate_chinese_titles(value)
|
||||
await mutate_chinese_titles(value)
|
||||
elif isinstance(value, list):
|
||||
for item in value:
|
||||
if isinstance(item, dict):
|
||||
mutate_chinese_titles(item)
|
||||
await mutate_chinese_titles(item)
|
||||
|
||||
def mutate_panel(panel: Dict[str, Any]) -> None:
|
||||
"""Mutate a single panel by setting repeat and repeatDirection fields."""
|
||||
@@ -88,9 +103,9 @@ def mutate_grafana_json_repeat(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
|
||||
return mutated_data
|
||||
|
||||
def mutate_grafana_json_chinese(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
async def mutate_grafana_json_chinese(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Mutate Grafana JSON by replacing Chinese text in titles with 'please_translate_me'.
|
||||
Mutate Grafana JSON by translating Chinese text in titles to English.
|
||||
|
||||
Args:
|
||||
data: The Grafana dashboard JSON data
|
||||
@@ -101,12 +116,12 @@ def mutate_grafana_json_chinese(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
# Make a copy to avoid mutating the original
|
||||
mutated_data = json.loads(json.dumps(data))
|
||||
|
||||
# Apply Chinese title mutation recursively
|
||||
mutate_chinese_titles(mutated_data)
|
||||
# Apply Chinese title translation recursively
|
||||
await mutate_chinese_titles(mutated_data)
|
||||
|
||||
return mutated_data
|
||||
|
||||
def mutate_grafana_json_all(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
async def mutate_grafana_json_all(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""
|
||||
Apply both mutations: repeat fields and Chinese title translation.
|
||||
|
||||
@@ -121,12 +136,12 @@ def mutate_grafana_json_all(data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
console.print("[green]✓ Applied repeat field mutation[/green]")
|
||||
|
||||
# Then apply Chinese title translation
|
||||
mutated_data = mutate_grafana_json_chinese(mutated_data)
|
||||
mutated_data = await mutate_grafana_json_chinese(mutated_data)
|
||||
console.print("[green]✓ Applied Chinese title translation[/green]")
|
||||
|
||||
return mutated_data
|
||||
|
||||
def process_grafana_json(
|
||||
async def process_grafana_json(
|
||||
mutation_func: Callable[[Dict[str, Any]], Dict[str, Any]],
|
||||
input_file: Optional[Path],
|
||||
output_file: Optional[Path],
|
||||
@@ -173,7 +188,10 @@ def process_grafana_json(
|
||||
|
||||
# Apply mutation
|
||||
try:
|
||||
mutated_data = mutation_func(data)
|
||||
if asyncio.iscoroutinefunction(mutation_func):
|
||||
mutated_data = await mutation_func(data)
|
||||
else:
|
||||
mutated_data = mutation_func(data)
|
||||
console.print(f"[green]{success_message}[/green]")
|
||||
except Exception as e:
|
||||
console.print(f"[red]Error mutating JSON: {e}[/red]")
|
||||
@@ -213,15 +231,15 @@ def mutate_dashboard(
|
||||
- Set maxPerRow to 12
|
||||
- Remove repeat field from row-type panels
|
||||
"""
|
||||
process_grafana_json(
|
||||
asyncio.run(process_grafana_json(
|
||||
mutation_func=mutate_grafana_json_repeat,
|
||||
input_file=input_file,
|
||||
output_file=output_file,
|
||||
process_message="Processing Grafana dashboard JSON...",
|
||||
success_message="Successfully mutated Grafana JSON!"
|
||||
)
|
||||
))
|
||||
|
||||
@grafana_app.command("translate", short_help="Replace Chinese text in titles with 'please_translate_me'.")
|
||||
@grafana_app.command("translate", short_help="Translate Chinese text in titles to English.")
|
||||
def translate_dashboard(
|
||||
input_file: Optional[Path] = typer.Option(
|
||||
None, "--input-file", "-i", help="Input JSON file path. If not provided, reads from clipboard."
|
||||
@@ -231,19 +249,19 @@ def translate_dashboard(
|
||||
)
|
||||
):
|
||||
"""
|
||||
Replace Chinese text in titles with 'please_translate_me'.
|
||||
Translate Chinese text in titles to English using Google Translate.
|
||||
|
||||
This command processes Grafana dashboard JSON and replaces any Chinese characters
|
||||
This command processes Grafana dashboard JSON and translates any Chinese characters
|
||||
in title-related fields (title, name, description, tooltip, legendFormat, text)
|
||||
with the placeholder text 'please_translate_me'.
|
||||
to English using Google Translate.
|
||||
"""
|
||||
process_grafana_json(
|
||||
asyncio.run(process_grafana_json(
|
||||
mutation_func=mutate_grafana_json_chinese,
|
||||
input_file=input_file,
|
||||
output_file=output_file,
|
||||
process_message="Processing Grafana dashboard JSON for Chinese title translation...",
|
||||
success_message="Successfully replaced Chinese text in titles!"
|
||||
)
|
||||
success_message="Successfully translated Chinese text in titles!"
|
||||
))
|
||||
|
||||
@grafana_app.command("mutate-all", short_help="Apply both repeat field mutation and Chinese title translation.")
|
||||
def mutate_all_dashboard(
|
||||
@@ -255,16 +273,16 @@ def mutate_all_dashboard(
|
||||
)
|
||||
):
|
||||
"""
|
||||
Apply both mutations: set repeat fields and replace Chinese text in titles.
|
||||
Apply both mutations: set repeat fields and translate Chinese text in titles.
|
||||
|
||||
This command processes Grafana dashboard JSON and applies both:
|
||||
1. Repeat field mutation (repeat="instance", repeatDirection="h", maxPerRow=12)
|
||||
2. Chinese title translation (replaces Chinese characters with 'please_translate_me')
|
||||
2. Chinese title translation (translates Chinese characters to English using Google Translate)
|
||||
"""
|
||||
process_grafana_json(
|
||||
asyncio.run(process_grafana_json(
|
||||
mutation_func=mutate_grafana_json_all,
|
||||
input_file=input_file,
|
||||
output_file=output_file,
|
||||
process_message="Processing Grafana dashboard JSON with all mutations...",
|
||||
success_message="Successfully applied all mutations!"
|
||||
)
|
||||
))
|
||||
@@ -0,0 +1,66 @@
|
||||
"""
|
||||
Utility functions for string compression and encoding.
|
||||
|
||||
This module provides functions for compressing strings using gzip
|
||||
and encoding them with base64 for efficient storage and transmission.
|
||||
"""
|
||||
|
||||
import base64
|
||||
import gzip
|
||||
from typing import Final
|
||||
|
||||
|
||||
def compress_string(text: str, *, encoding: str = "utf-8") -> bytes:
|
||||
"""
|
||||
Compress a string using gzip with maximum compression.
|
||||
|
||||
Args:
|
||||
text: The string to compress
|
||||
encoding: The text encoding to use (default: utf-8)
|
||||
|
||||
Returns:
|
||||
The compressed data as bytes
|
||||
|
||||
Raises:
|
||||
UnicodeEncodeError: If the text cannot be encoded with the specified encoding
|
||||
"""
|
||||
text_bytes: bytes = text.encode(encoding)
|
||||
return gzip.compress(text_bytes, compresslevel=9)
|
||||
|
||||
|
||||
def encode_base64(data: bytes) -> str:
|
||||
"""
|
||||
Encode bytes to a base64 string.
|
||||
|
||||
Args:
|
||||
data: The bytes to encode
|
||||
|
||||
Returns:
|
||||
The base64-encoded string
|
||||
"""
|
||||
return base64.b64encode(data).decode("ascii")
|
||||
|
||||
|
||||
def compress_and_encode(text: str, *, encoding: str = "utf-8") -> str:
|
||||
"""
|
||||
Compress a string with gzip and encode it as base64.
|
||||
|
||||
This is a convenience function that combines compress_string() and encode_base64().
|
||||
|
||||
Args:
|
||||
text: The string to compress and encode
|
||||
encoding: The text encoding to use (default: utf-8)
|
||||
|
||||
Returns:
|
||||
The compressed and base64-encoded string
|
||||
|
||||
Raises:
|
||||
UnicodeEncodeError: If the text cannot be encoded with the specified encoding
|
||||
"""
|
||||
compressed_data: bytes = compress_string(text, encoding=encoding)
|
||||
return encode_base64(compressed_data)
|
||||
|
||||
|
||||
# Constants for common use cases
|
||||
MAX_COMPRESSION_LEVEL: Final[int] = 9
|
||||
DEFAULT_ENCODING: Final[str] = "utf-8"
|
||||
@@ -5,6 +5,8 @@ import gzip
|
||||
import pyperclip
|
||||
import io
|
||||
import base64
|
||||
from . import utils
|
||||
from . import better_bash
|
||||
|
||||
console = Console()
|
||||
|
||||
@@ -94,6 +96,30 @@ def decode_dir(
|
||||
console.print(f"[red]An unexpected error occurred during decode: {e}[/red]")
|
||||
raise typer.Exit(code=1)
|
||||
|
||||
@wrap_app.command("env", short_help="Compress and encode bash environment script for sourcing.")
|
||||
def env():
|
||||
"""
|
||||
Compresses the bash environment script and copies a source command to clipboard.
|
||||
"""
|
||||
console.print("[blue]Compressing bash environment script...[/blue]")
|
||||
|
||||
try:
|
||||
# Compress and encode the better_env script
|
||||
compressed_encoded = utils.compress_and_encode(better_bash.better_env)
|
||||
|
||||
# Create the source command
|
||||
source_command = better_bash.source_now(compressed_encoded)
|
||||
|
||||
# Copy to clipboard
|
||||
pyperclip.copy(source_command)
|
||||
|
||||
console.print("[green]Source command copied to clipboard![/green]")
|
||||
console.print(f"[dim]Command: {source_command}[/dim]")
|
||||
|
||||
except Exception as e:
|
||||
console.print(f"[red]An unexpected error occurred: {e}[/red]")
|
||||
raise typer.Exit(code=1)
|
||||
|
||||
# Add the encode subcommand to the wrap app
|
||||
wrap_app.add_typer(encode_app, name="encode")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user