[optimize] Refactoring AbyssCli project to Python script

This commit is contained in:
acite
2025-09-20 14:41:47 +08:00
parent a7c522a61f
commit 87536a1508
3 changed files with 321 additions and 380 deletions

321
Abyss/Toolkits/abyss-cli.py Normal file
View File

@@ -0,0 +1,321 @@
#!/usr/bin/env python3
# abysscli.py
"""
Abyss CLI — Python 3 实现
Commands:
open <baseUrl> <user> <privateKeyBase64>
destroy <baseUrl> <token>
valid <baseUrl> <token>
create <baseUrl> <user> <privateKeyBase64> <newUsername> <privilege>
"""
from __future__ import annotations
import sys
import argparse
import base64
import json
import typing as t
from urllib.parse import quote
import requests
from requests import Session
from nacl.signing import SigningKey
from nacl.exceptions import BadSignatureError
# ---- Utilities for Ed25519 handling ----
def generate_keypair_base64() -> t.Tuple[str, str]:
"""
Generate Ed25519 keypair.
Returns (private_base64, public_base64).
private is encoded as 64 bytes: seed(32) || pub(32) to align with many raw-private formats.
public is 32 bytes.
"""
sk = SigningKey.generate()
seed = sk.encode() # 32 bytes seed
vk = sk.verify_key.encode() # 32 bytes pubkey
priv_raw = seed + vk # 64 bytes
return base64.b64encode(priv_raw).decode('ascii'), base64.b64encode(vk).decode('ascii')
def sign_with_private_base64(private_base64: str, data: bytes) -> str:
"""
Accept private key as base64. Supports:
- 32-byte seed (seed only)
- 64-byte raw private (seed + pub)
Returns base64(signature).
"""
try:
raw = base64.b64decode(private_base64)
except Exception as e:
raise ValueError(f"privateKeyBase64 is not valid base64: {e}")
if len(raw) == 32:
seed = raw
elif len(raw) == 64:
seed = raw[:32]
else:
raise ValueError(f"Unsupported private key length: {len(raw)} bytes (expected 32 or 64)")
sk = SigningKey(seed)
sig = sk.sign(data).signature # 64 bytes
return base64.b64encode(sig).decode('ascii')
# ---- HTTP helpers ----
def create_session(base_url: str) -> Session:
s = requests.Session()
base = base_url.rstrip('/')
s.headers.update({'User-Agent': 'AbyssCli-Python/1.0'})
s.base_url = base + '/' # attach attribute for convenience
return s
def _full_url(session: Session, path: str) -> str:
base = getattr(session, 'base_url', '')
# ensure no double slashes issues
return base + path.lstrip('/')
def try_read_response_text(resp: requests.Response) -> str:
try:
return resp.text or ""
except Exception:
return ""
def parse_possibly_json_string(text: str) -> str:
"""
Server sometimes returns a JSON-encoded string like: "username"
Try json.loads first, fall back to trimming quotes.
"""
if text is None:
return ""
text = text.strip()
if not text:
return ""
try:
parsed = json.loads(text)
# If parsed is a string, return it; otherwise return original trimmed
if isinstance(parsed, str):
return parsed
# otherwise return textual representation
return str(parsed)
except Exception:
# fallback trim quotes only at ends if present
if text.startswith('"') and text.endswith('"') and len(text) >= 2:
return text[1:-1]
return text
# ---- Command implementations ----
def cmd_open(args: argparse.Namespace) -> int:
base = args.baseUrl
user = args.user
priv_base64 = args.privateKeyBase64
sess = create_session(base)
# 1. GET challenge
url = _full_url(sess, f"api/user/{quote(user, safe='')}")
try:
r = sess.get(url, timeout=15)
except Exception as e:
print(f"Failed to GET challenge: {e}", file=sys.stderr)
return 1
if not r.ok:
print(f"Failed to get challenge: HTTP {r.status_code}", file=sys.stderr)
txt = try_read_response_text(r)
if txt:
print(txt, file=sys.stderr)
return 1
challenge_text = try_read_response_text(r)
challenge = parse_possibly_json_string(challenge_text)
# challenge is expected to be base64-encoded bytes
try:
challenge_bytes = base64.b64decode(challenge)
except Exception:
print("Challenge is not valid base64.", file=sys.stderr)
return 1
# 2. Sign
try:
signature_base64 = sign_with_private_base64(priv_base64, challenge_bytes)
except Exception as e:
print(f"Signing failed: {e}", file=sys.stderr)
return 1
# 3. POST response to get token
post_url = _full_url(sess, f"api/user/{quote(user, safe='')}")
payload = {"Response": signature_base64}
try:
r2 = sess.post(post_url, json=payload, timeout=15)
except Exception as e:
print(f"Failed to POST response: {e}", file=sys.stderr)
return 1
if not r2.ok:
print(f"Authentication failed: HTTP {r2.status_code}", file=sys.stderr)
txt = try_read_response_text(r2)
if txt:
print(txt, file=sys.stderr)
return 1
token_text = try_read_response_text(r2)
token = parse_possibly_json_string(token_text)
if not token:
print("Authentication failed or server returned no token.", file=sys.stderr)
return 1
print(token)
return 0
def cmd_destroy(args: argparse.Namespace) -> int:
base = args.baseUrl
token = args.token
sess = create_session(base)
url = _full_url(sess, f"api/user/destroy?token={quote(token, safe='')}")
try:
r = sess.post(url, timeout=15)
except Exception as e:
print(f"Destroy request failed: {e}", file=sys.stderr)
return 1
if not r.ok:
print(f"Destroy failed: HTTP {r.status_code}", file=sys.stderr)
txt = try_read_response_text(r)
if txt:
print(txt, file=sys.stderr)
return 1
# some servers return body, but original prints "Success"
print("Success")
return 0
def cmd_valid(args: argparse.Namespace) -> int:
base = args.baseUrl
token = args.token
sess = create_session(base)
url = _full_url(sess, f"api/user/validate?token={quote(token, safe='')}")
try:
r = sess.post(url, timeout=15)
except Exception as e:
print(f"Validate request failed: {e}", file=sys.stderr)
return 1
if not r.ok:
print("Invalid")
return 1
content = try_read_response_text(r)
username = parse_possibly_json_string(content)
if not username:
print("Invalid")
return 1
print(username)
return 0
def cmd_create(args: argparse.Namespace) -> int:
base = args.baseUrl
user = args.user
priv_base64 = args.privateKeyBase64
new_username = args.newUsername
privilege = args.privilege
sess = create_session(base)
# 1. Get challenge for creator
url = _full_url(sess, f"api/user/{quote(user, safe='')}")
try:
r = sess.get(url, timeout=15)
except Exception as e:
print(f"Failed to GET challenge for creator: {e}", file=sys.stderr)
return 1
if not r.ok:
print(f"Failed to get challenge for creator: HTTP {r.status_code}", file=sys.stderr)
txt = try_read_response_text(r)
if txt:
print(txt, file=sys.stderr)
return 1
challenge_text = try_read_response_text(r)
challenge = parse_possibly_json_string(challenge_text)
try:
challenge_bytes = base64.b64decode(challenge)
except Exception:
print("Challenge is not valid base64.", file=sys.stderr)
return 1
# 2. Sign challenge with creator private key
try:
signature_base64 = sign_with_private_base64(priv_base64, challenge_bytes)
except Exception as e:
print(f"Signing failed: {e}", file=sys.stderr)
return 1
# 3. Generate key pair for new user
new_priv_b64, new_pub_b64 = generate_keypair_base64()
# 4. Build create payload and PATCH
payload = {
"Response": signature_base64,
"Name": new_username,
"Parent": user,
"Privilege": int(privilege),
"PublicKey": new_pub_b64
}
patch_url = _full_url(sess, f"api/user/{quote(user, safe='')}")
try:
r2 = sess.request("PATCH", patch_url, json=payload, timeout=15)
except Exception as e:
print(f"Create request failed: {e}", file=sys.stderr)
return 1
resp_text = try_read_response_text(r2)
if not r2.ok:
print(f"Create failed: HTTP {r2.status_code}", file=sys.stderr)
if resp_text:
print(resp_text, file=sys.stderr)
return 1
print("Success")
print("NewUserPrivateKeyBase64:")
print(new_priv_b64)
print("NewUserPublicKeyBase64:")
print(new_pub_b64)
return 0
# ---- CLI entrypoint ----
def main(argv: t.Optional[t.List[str]] = None) -> int:
if argv is None:
argv = sys.argv[1:]
parser = argparse.ArgumentParser(prog="AbyssCli", description="Abyss CLI (Python)")
sub = parser.add_subparsers(dest="cmd")
p_open = sub.add_parser("open", help="open <baseUrl> <user> <privateKeyBase64>")
p_open.add_argument("baseUrl")
p_open.add_argument("user")
p_open.add_argument("privateKeyBase64")
p_destroy = sub.add_parser("destroy", help="destroy <baseUrl> <token>")
p_destroy.add_argument("baseUrl")
p_destroy.add_argument("token")
p_valid = sub.add_parser("valid", help="valid <baseUrl> <token>")
p_valid.add_argument("baseUrl")
p_valid.add_argument("token")
p_create = sub.add_parser("create", help="create <baseUrl> <user> <privateKeyBase64> <newUsername> <privilege>")
p_create.add_argument("baseUrl")
p_create.add_argument("user")
p_create.add_argument("privateKeyBase64")
p_create.add_argument("newUsername")
p_create.add_argument("privilege", type=int)
if not argv:
parser.print_help()
return 1
args = parser.parse_args(argv)
try:
if args.cmd == "open":
return cmd_open(args)
elif args.cmd == "destroy":
return cmd_destroy(args)
elif args.cmd == "valid":
return cmd_valid(args)
elif args.cmd == "create":
return cmd_create(args)
else:
print("Unknown command.", file=sys.stderr)
parser.print_help()
return 2
except Exception as ex:
print(f"Error: {ex}", file=sys.stderr)
return 3
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,16 +0,0 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net9.0</TargetFramework>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PublishAot>true</PublishAot>
<InvariantGlobalization>true</InvariantGlobalization>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="NSec.Cryptography" Version="25.4.0" />
</ItemGroup>
</Project>

View File

@@ -1,364 +0,0 @@
// Program.cs
using System;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading.Tasks;
using System.Collections.Generic;
using System.Text.Json.Serialization.Metadata;
using NSec.Cryptography;
public class ChallengeRequestBody
{
public string Response { get; set; } = "";
}
public class CreateRequestBody
{
public string Response { get; set; } = "";
public string Name { get; set; } = "";
public string Parent { get; set; } = "";
public int Privilege { get; set; }
public string PublicKey { get; set; } = "";
}
public static class Ed25519Utils
{
public static (string privateBase64, string publicBase64) GenerateKeyPairBase64()
{
var algorithm = SignatureAlgorithm.Ed25519;
var creationParameters = new KeyCreationParameters
{
ExportPolicy = KeyExportPolicies.AllowPlaintextExport
};
using var key = Key.Create(algorithm, creationParameters);
var priv = key.Export(KeyBlobFormat.RawPrivateKey);
var pub = key.Export(KeyBlobFormat.RawPublicKey);
return (Convert.ToBase64String(priv), Convert.ToBase64String(pub));
}
public static string SignBase64PrivateKey(string privateKeyBase64, byte[] dataToSign)
{
var algorithm = SignatureAlgorithm.Ed25519;
var privateBytes = Convert.FromBase64String(privateKeyBase64);
using var key = Key.Import(algorithm, privateBytes, KeyBlobFormat.RawPrivateKey);
var sig = algorithm.Sign(key, dataToSign);
return Convert.ToBase64String(sig);
}
}
public static class Program
{
static async Task<int> Main(string[] args)
{
if (args == null || args.Length == 0)
{
PrintUsage();
return 1;
}
var cmd = args[0].ToLowerInvariant();
try
{
switch (cmd)
{
case "open":
return await CmdOpen(args);
case "destroy":
return await CmdDestroy(args);
case "valid":
return await CmdValid(args);
case "create":
return await CmdCreate(args);
default:
Console.Error.WriteLine("Unknown command.");
PrintUsage();
return 2;
}
}
catch (Exception ex)
{
Console.Error.WriteLine($"Error: {ex.Message}");
return 3;
}
}
static void PrintUsage()
{
Console.WriteLine("Usage:");
Console.WriteLine(" AbyssCli open <baseUrl> <user> <privateKeyBase64>");
Console.WriteLine(" AbyssCli destroy <baseUrl> <token>");
Console.WriteLine(" AbyssCli valid <baseUrl> <token>");
Console.WriteLine(" AbyssCli create <baseUrl> <user> <privateKeyBase64> <newUsername> <privilege>");
}
static HttpClient CreateHttpClient(string baseUrl)
{
var client = new HttpClient();
client.BaseAddress = new Uri(baseUrl.TrimEnd('/') + "/");
return client;
}
static async Task<int> CmdOpen(string[] args)
{
if (args.Length != 4)
{
Console.Error.WriteLine("open requires 3 arguments: <baseUrl> <user> <privateKeyBase64>");
return 1;
}
var baseUrl = args[1];
var user = args[2];
var privateKeyBase64 = args[3];
using var client = CreateHttpClient(baseUrl);
// 1. GET challenge
var challenge = await GetChallenge(client, user);
if (challenge == null)
{
Console.Error.WriteLine("Failed to get challenge.");
return 1;
}
// 2. Sign challenge (challenge is base64 string)
byte[] challengeBytes;
try
{
challengeBytes = Convert.FromBase64String(challenge);
}
catch
{
Console.Error.WriteLine("Challenge is not valid base64.");
return 1;
}
string signatureBase64;
try
{
signatureBase64 = Ed25519Utils.SignBase64PrivateKey(privateKeyBase64, challengeBytes);
}
catch (Exception ex)
{
Console.Error.WriteLine($"Signing failed: {ex.Message}");
return 1;
}
// 3. POST response to get token
var token = await PostResponseForToken(client, user, signatureBase64);
if (token == null)
{
Console.Error.WriteLine("Authentication failed or server returned no token.");
return 1;
}
Console.WriteLine(token);
return 0;
}
static async Task<int> CmdDestroy(string[] args)
{
if (args.Length != 3)
{
Console.Error.WriteLine("destroy requires 2 arguments: <baseUrl> <token>");
return 1;
}
var baseUrl = args[1];
var token = args[2];
using var client = CreateHttpClient(baseUrl);
var resp = await client.PostAsync($"api/user/destroy?token={token}", null);
if (!resp.IsSuccessStatusCode)
{
Console.Error.WriteLine($"Destroy failed: {resp.StatusCode}");
var txt = await TryReadResponseText(resp);
if (!string.IsNullOrEmpty(txt)) Console.Error.WriteLine(txt);
return 1;
}
var body = await resp.Content.ReadAsStringAsync();
Console.WriteLine("Success");
return 0;
}
static async Task<int> CmdValid(string[] args)
{
if (args.Length != 3)
{
Console.Error.WriteLine("valid requires 2 arguments: <baseUrl> <token>");
return 1;
}
var baseUrl = args[1];
var token = args[2];
using var client = CreateHttpClient(baseUrl);
var resp = await client.PostAsync($"api/user/validate?token={token}", null);
if (!resp.IsSuccessStatusCode)
{
Console.WriteLine("Invalid");
return 1;
}
var content = await resp.Content.ReadAsStringAsync();
// server likely returns JSON string (e.g. "username"), try to parse JSON string
try
{
var username = JsonSerializer.Deserialize<string>(content, jsonOptions);
if (username == null)
{
Console.WriteLine("Invalid");
return 1;
}
Console.WriteLine(username);
return 0;
}
catch
{
// fallback
Console.WriteLine(content.Trim('"'));
return 0;
}
}
static async Task<int> CmdCreate(string[] args)
{
if (args.Length != 6)
{
Console.Error.WriteLine("create requires 5 arguments: <baseUrl> <user> <privateKeyBase64> <newUsername> <privilege>");
return 1;
}
var baseUrl = args[1];
var user = args[2];
var privateKeyBase64 = args[3];
var newUsername = args[4];
if (!int.TryParse(args[5], out var privilege))
{
Console.Error.WriteLine("Privilege must be an integer.");
return 1;
}
using var client = CreateHttpClient(baseUrl);
// 1. Get challenge for creator user
var challenge = await GetChallenge(client, user);
if (challenge == null)
{
Console.Error.WriteLine("Failed to get challenge for creator.");
return 1;
}
byte[] challengeBytes;
try
{
challengeBytes = Convert.FromBase64String(challenge);
}
catch
{
Console.Error.WriteLine("Challenge is not valid base64.");
return 1;
}
string signatureBase64;
try
{
signatureBase64 = Ed25519Utils.SignBase64PrivateKey(privateKeyBase64, challengeBytes);
}
catch (Exception ex)
{
Console.Error.WriteLine($"Signing failed: {ex.Message}");
return 1;
}
// 2. Generate key pair for new user
var (newPrivBase64, newPubBase64) = Ed25519Utils.GenerateKeyPairBase64();
// 3. Build create payload
var payload = new CreateRequestBody
{
Response = signatureBase64,
Name = newUsername,
Parent = user,
Privilege = privilege,
PublicKey = newPubBase64
};
var json = JsonSerializer.Serialize(payload, jsonOptions);
var request = new HttpRequestMessage(new HttpMethod("PATCH"), $"api/user/{Uri.EscapeDataString(user)}")
{
Content = new StringContent(json, Encoding.UTF8, "application/json")
};
var resp = await client.SendAsync(request);
var respText = await TryReadResponseText(resp);
if (!resp.IsSuccessStatusCode)
{
Console.Error.WriteLine($"Create failed: {resp.StatusCode}");
if (!string.IsNullOrEmpty(respText)) Console.Error.WriteLine(respText);
return 1;
}
Console.WriteLine("Success");
Console.WriteLine("NewUserPrivateKeyBase64:");
Console.WriteLine(newPrivBase64);
Console.WriteLine("NewUserPublicKeyBase64:");
Console.WriteLine(newPubBase64);
return 0;
}
static async Task<string?> GetChallenge(HttpClient client, string user)
{
var resp = await client.GetAsync($"api/user/{Uri.EscapeDataString(user)}");
if (!resp.IsSuccessStatusCode) return null;
var content = await resp.Content.ReadAsStringAsync();
// server probably returns JSON string; try to deserialize to string
try
{
var s = JsonSerializer.Deserialize<string>(content, jsonOptions);
if (s != null) return s;
}
catch { /* ignore */ }
// fallback: trim quotes
return content.Trim('"');
}
static async Task<string?> PostResponseForToken(HttpClient client, string user, string signatureBase64)
{
var body = new ChallengeRequestBody { Response = signatureBase64 };
var json = JsonSerializer.Serialize(body, jsonOptions);
var resp = await client.PostAsync($"api/user/{Uri.EscapeDataString(user)}",
new StringContent(json, Encoding.UTF8, "application/json"));
if (!resp.IsSuccessStatusCode) return null;
var content = await resp.Content.ReadAsStringAsync();
try
{
var token = JsonSerializer.Deserialize<string>(content, jsonOptions);
if (!string.IsNullOrEmpty(token)) return token;
}
catch { /* ignore */ }
return content.Trim('"');
}
static async Task<string> TryReadResponseText(HttpResponseMessage resp)
{
try
{
return await resp.Content.ReadAsStringAsync();
}
catch
{
return "";
}
}
static readonly JsonSerializerOptions jsonOptions = new()
{
TypeInfoResolver = new DefaultJsonTypeInfoResolver()
};
}