新建会话
Ctrl
K
Kimi Code 限时活动
历史会话
查看手机应用

PBKDF2 100k Attack Time

```py """ Cryptographic utilities for encrypting and decrypting agent data. This module provides functions for deriving encryption keys from passwords using PBKDF2 and encrypting/decrypting data using Fernet symmetric encryption. Logging in this module is intentionally minimal and NEVER includes any secret material such as passwords, salts, or ciphertext. Only operation types and high-level status are logged. """ import base64 import logging import os from typing import Optional, Tuple from cryptography.fernet import Fernet, InvalidToken from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC from .exceptions import ( DecryptionError, EncryptionError, InvalidPasswordError, KeyDerivationError, ValidationError, ) logger = logging.getLogger(__name__) def derive_key(password: str, salt: Optional[bytes] = None) -> Tuple[bytes, bytes]: """ Derive an encryption key from a password using PBKDF2. Args: password: The password to derive the key from salt: Optional salt bytes. If None, a random salt is generated. Returns: A tuple of (key, salt) where key is a base64-encoded Fernet key and salt is the salt bytes used (or generated). 
Raises: InvalidPasswordError: If password is empty or None KeyDerivationError: If key derivation fails """ if not password: raise InvalidPasswordError("Password cannot be empty or None") if not isinstance(password, str): raise ValidationError( "Password must be a string", f"Got type: {type(password).__name__}", ) try: if salt is None: salt = os.urandom(16) if not isinstance(salt, bytes) or len(salt) < 8: raise ValidationError( "Invalid salt", "Salt must be at least 8 bytes", ) kdf = PBKDF2HMAC( algorithm=hashes.SHA256(), length=32, salt=salt, iterations=100000, ) key = base64.urlsafe_b64encode(kdf.derive(password.encode())) logger.debug("Derived encryption key using PBKDF2", extra={"salt_len": len(salt)}) return key, salt except Exception as e: if isinstance(e, (InvalidPasswordError, ValidationError)): raise raise KeyDerivationError("Failed to derive encryption key", str(e)) from e def encrypt_data(data: str, password: str) -> dict: """ Encrypt a string using PBKDF2 key derivation and Fernet encryption. 
Args: data: The plaintext string to encrypt password: The password to use for key derivation Returns: A dictionary containing: - 'data': Base64-encoded encrypted data - 'salt': Base64-encoded salt used for key derivation Raises: ValidationError: If data is not a string or is None EncryptionError: If encryption fails """ if data is None: raise ValidationError("Data cannot be None", "Provide a valid string to encrypt") if not isinstance(data, str): raise ValidationError("Data must be a string", f"Got type: {type(data).__name__}") try: key, salt = derive_key(password) f = Fernet(key) encrypted = f.encrypt(data.encode()) logger.debug("Encrypted data string", extra={"cipher_len": len(encrypted)}) return { "data": base64.b64encode(encrypted).decode(), "salt": base64.b64encode(salt).decode(), } except (InvalidPasswordError, KeyDerivationError, ValidationError): raise except Exception as e: raise EncryptionError("Failed to encrypt data", str(e)) from e def decrypt_data(encrypted_dict: dict, password: str) -> str: """ Decrypt data that was encrypted with encrypt_data(). Args: encrypted_dict: A dictionary containing: - 'data': Base64-encoded encrypted data - 'salt': Base64-encoded salt used for key derivation password: The password used for encryption Returns: The decrypted plaintext string Raises: ValidationError: If encrypted_dict is invalid or missing required keys DecryptionError: If decryption fails (wrong password, corrupted data, etc.) 
""" if not isinstance(encrypted_dict, dict): raise ValidationError( "Encrypted data must be a dictionary", f"Got type: {type(encrypted_dict).__name__}", ) if "data" not in encrypted_dict or "salt" not in encrypted_dict: raise ValidationError( "Encrypted dictionary missing required keys", "Expected keys: 'data' and 'salt'", ) try: salt = base64.b64decode(encrypted_dict["salt"]) key, _ = derive_key(password, salt) f = Fernet(key) encrypted_data = base64.b64decode(encrypted_dict["data"]) decrypted = f.decrypt(encrypted_data) return decrypted.decode("utf-8") except InvalidToken: raise DecryptionError( "Decryption failed - invalid token", "The password may be incorrect or the data may be corrupted", ) from None except (UnicodeDecodeError, ValueError) as e: raise DecryptionError("Decryption failed - invalid data format", str(e)) from e except (InvalidPasswordError, KeyDerivationError, ValidationError): raise except Exception as e: raise DecryptionError("Decryption failed", str(e)) from e ``` ```audit.py """ Encrypted audit logging for agent activity. This module provides the AuditLogger class for creating an encrypted, tamper-evident log of sensitive operations like key injection and lock file access. """ import json import logging import os import time from typing import Any, Dict, List, Optional from .crypto import DecryptionError, decrypt_data, encrypt_data logger = logging.getLogger(__name__) class AuditLogger: """ Manages an encrypted append-only audit log. Each log entry is individually encrypted and signed (via authenticated encryption in crypto.py) to ensure integrity and confidentiality. """ def __init__(self, file_path: str = "agent_audit.log"): """ Initialize an AuditLogger instance. 
Args: file_path: Path to the audit log file (default: "agent_audit.log") """ self.file_path = file_path self.master_key = os.environ.get("AGENT_MASTER_KEY", "default-key") def log_event(self, event_type: str, details: Optional[Dict[str, Any]] = None) -> None: """ Log an event to the encrypted audit log. Args: event_type: Identifier for the type of event (e.g., "key_access", "lock_created") details: Optional dictionary containing non-sensitive event details """ if details is None: details = {} entry = { "timestamp": time.time(), "iso_time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()), "event_type": event_type, "details": details, } try: # Serialize and encrypt the entry json_str = json.dumps(entry) encrypted_data = encrypt_data(json_str, self.master_key) # We store the encrypted dict as a single line JSON # encrypt_data returns {'data': '...', 'salt': '...'} log_line = json.dumps(encrypted_data) with open(self.file_path, "a") as f: f.write(log_line + "\n") except Exception as e: # We explicitly catch errors here to prevent audit logging failures # from crashing the main application, but we log the error to system logs. logger.error(f"Failed to write to audit log: {e}") def read_logs(self) -> List[Dict[str, Any]]: """ Read and decrypt all entries from the audit log. Returns: List of decrypted log entries sorted by timestamp. 
""" if not os.path.exists(self.file_path): return [] entries = [] try: with open(self.file_path, "r") as f: for line_num, line in enumerate(f, 1): line = line.strip() if not line: continue try: encrypted_dict = json.loads(line) decrypted_json = decrypt_data(encrypted_dict, self.master_key) entry = json.loads(decrypted_json) entries.append(entry) except (json.JSONDecodeError, DecryptionError) as e: logger.warning(f"Failed to decrypt audit log line {line_num}: {e}") # We continue reading other lines continue except Exception as e: logger.error(f"Error reading audit log: {e}") return [] return entries def clear(self) -> None: """Clear the audit log file.""" if os.path.exists(self.file_path): os.remove(self.file_path) ``` ```keychain.py """ OS keychain integration for secure credential storage. This module provides functions for storing, retrieving, and managing API keys and other credentials using the platform's native keyring service. Logging in this module NEVER records secret key values. Only key names and high-level operation results are logged. """ import json import logging from typing import Dict, Optional, cast import keyring import keyring.errors from .audit import AuditLogger from .exceptions import ( InvalidKeyNameError, KeychainAccessError, KeychainDeletionError, KeychainStorageError, ValidationError, ) SERVICE_NAME = "backpack-agent" logger = logging.getLogger(__name__) audit_logger = AuditLogger() def _validate_key_name(key_name: str) -> None: """ Validate a key name. 
Args: key_name: The key name to validate Raises: InvalidKeyNameError: If the key name is invalid """ if not key_name: raise InvalidKeyNameError("", "Key name cannot be empty") if not isinstance(key_name, str): raise InvalidKeyNameError(str(key_name), f"Key name must be a string, got {type(key_name).__name__}") # Key names should not start with underscore (reserved for internal use) # except for the registry key if key_name.startswith("_") and key_name != "_registry": raise InvalidKeyNameError(key_name, "Key names starting with '_' are reserved for internal use") def store_key(key_name: str, key_value: str) -> None: """ Store a key-value pair in the OS keychain. Args: key_name: The name/identifier of the key key_value: The secret value to store Raises: InvalidKeyNameError: If key_name is invalid KeychainStorageError: If storing the key fails """ _validate_key_name(key_name) if key_value is None: raise ValidationError("Key value cannot be None", "Provide a valid string value to store") if not isinstance(key_value, str): raise ValidationError("Key value must be a string", f"Got type: {type(key_value).__name__}") try: keyring.set_password(SERVICE_NAME, key_name, key_value) logger.info("Stored key in keychain", extra={"service": SERVICE_NAME, "key_name": key_name}) audit_logger.log_event("store_key", {"service": SERVICE_NAME, "key_name": key_name}) except keyring.errors.KeyringError as e: raise KeychainStorageError(key_name, f"Keyring error: {str(e)}") from e except Exception as e: raise KeychainStorageError(key_name, f"Unexpected error: {str(e)}") from e def get_key(key_name: str) -> Optional[str]: """ Retrieve a key value from the OS keychain. 
Args: key_name: The name/identifier of the key to retrieve Returns: The stored key value, or None if not found Raises: InvalidKeyNameError: If key_name is invalid KeychainAccessError: If accessing the keychain fails """ _validate_key_name(key_name) try: value = keyring.get_password(SERVICE_NAME, key_name) logger.debug( "Retrieved key from keychain", extra={"service": SERVICE_NAME, "key_name": key_name, "found": bool(value)}, ) if value: audit_logger.log_event("get_key", {"service": SERVICE_NAME, "key_name": key_name}) return value except keyring.errors.KeyringError as e: raise KeychainAccessError(f"Failed to retrieve key '{key_name}' from keychain", str(e)) from e except Exception as e: raise KeychainAccessError(f"Unexpected error retrieving key '{key_name}'", str(e)) from e def list_keys() -> Dict[str, bool]: """ List all keys registered in the keychain. Note: The OS keyring doesn't provide native list functionality, so we maintain a registry of keys. Returns: A dictionary mapping key names to True (indicating they exist) Raises: KeychainAccessError: If accessing the keychain fails """ try: registry = get_key("_registry") if registry: try: return cast(Dict[str, bool], json.loads(registry)) except json.JSONDecodeError: return {} return {} except KeychainAccessError: return {} def register_key(key_name: str) -> None: """ Register a key name in the keychain registry. This is used to track which keys exist, since the OS keyring doesn't provide native list functionality. 
Args: key_name: The name of the key to register Raises: InvalidKeyNameError: If key_name is invalid KeychainStorageError: If storing the registry fails """ _validate_key_name(key_name) try: registry = list_keys() registry[key_name] = True keyring.set_password(SERVICE_NAME, "_registry", json.dumps(registry)) except (KeychainAccessError, KeychainStorageError): # Registry failures are non-critical (key itself is already stored) pass except Exception as e: raise KeychainStorageError("_registry", f"Failed to update registry: {str(e)}") from e def delete_key(key_name: str) -> None: """ Delete a key from the keychain and registry. Args: key_name: The name of the key to delete Raises: InvalidKeyNameError: If key_name is invalid KeychainDeletionError: If deletion fails """ _validate_key_name(key_name) # Deletion is intentionally idempotent: # - If the secret doesn't exist in the keychain, we still remove it from the registry. # - Tests expect delete_key() to not raise for missing keys. try: keyring.delete_password(SERVICE_NAME, key_name) audit_logger.log_event("delete_key", {"service": SERVICE_NAME, "key_name": key_name}) except keyring.errors.PasswordDeleteError: pass except keyring.errors.KeyringError as e: raise KeychainDeletionError(key_name, f"Keyring error: {str(e)}") from e except Exception as e: raise KeychainDeletionError(key_name, f"Unexpected error: {str(e)}") from e # Update registry try: registry = list_keys() registry.pop(key_name, None) keyring.set_password(SERVICE_NAME, "_registry", json.dumps(registry)) except Exception: pass ``` ```exceptions.py """ Custom exception classes for Backpack Agent Container System. This module defines specific exception types for different error scenarios, providing better error handling and more informative error messages. 
""" from typing import Optional class BackpackError(Exception): """Base exception class for all Backpack-related errors.""" def __init__(self, message: str, details: Optional[str] = None): """ Initialize a Backpack error. Args: message: Human-readable error message details: Optional additional details for debugging """ self.message = message self.details = details super().__init__(self.message) def __str__(self): if self.details: return f"{self.message} (Details: {self.details})" return self.message class CryptoError(BackpackError): """Exception raised for cryptographic operation errors.""" class DecryptionError(CryptoError): """Exception raised when decryption fails.""" def __init__(self, message: str = "Decryption failed", details: Optional[str] = None): super().__init__( message or "Failed to decrypt data. This may be due to an incorrect password, corrupted data, or invalid format.", details, ) class EncryptionError(CryptoError): """Exception raised when encryption fails.""" def __init__(self, message: str = "Encryption failed", details: Optional[str] = None): super().__init__( message or "Failed to encrypt data. Please check your input and try again.", details, ) class KeyDerivationError(CryptoError): """Exception raised when key derivation fails.""" def __init__(self, message: str = "Key derivation failed", details: Optional[str] = None): super().__init__( message or "Failed to derive encryption key. 
This may indicate a problem with the password or salt.", details, ) class KeychainError(BackpackError): """Exception raised for keychain operation errors.""" class KeyNotFoundError(KeychainError): """Exception raised when a key is not found in the keychain.""" def __init__(self, key_name: str): super().__init__( f"Key '{key_name}' not found in keychain", f"Use 'backpack key add {key_name}' to add this key to your vault", ) self.key_name = key_name class KeychainAccessError(KeychainError): """Exception raised when keychain access fails.""" def __init__(self, message: str = "Failed to access keychain", details: Optional[str] = None): super().__init__( message or "Unable to access the OS keychain. Please check your system's keychain permissions.", details, ) class KeychainStorageError(KeychainError): """Exception raised when storing to keychain fails.""" def __init__(self, key_name: str, details: Optional[str] = None): super().__init__( f"Failed to store key '{key_name}' in keychain", details or "Please check your system's keychain permissions and available storage", ) self.key_name = key_name class KeychainDeletionError(KeychainError): """Exception raised when deleting from keychain fails.""" def __init__(self, key_name: str, details: Optional[str] = None): super().__init__( f"Failed to delete key '{key_name}' from keychain", details or "The key may not exist or keychain access may be denied", ) self.key_name = key_name class AgentLockError(BackpackError): """Exception raised for agent.lock file operation errors.""" class AgentLockNotFoundError(AgentLockError): """Exception raised when agent.lock file is not found.""" def __init__(self, file_path: str = "agent.lock"): super().__init__( f"Agent lock file not found: {file_path}", "Run 'backpack init' to create an agent.lock file", ) self.file_path = file_path class AgentLockCorruptedError(AgentLockError): """Exception raised when agent.lock file is corrupted or invalid.""" def __init__(self, file_path: str = 
"agent.lock", details: Optional[str] = None): super().__init__( f"Agent lock file is corrupted or invalid: {file_path}", details or "The file may have been modified or encrypted with a different key", ) self.file_path = file_path class AgentLockReadError(AgentLockError): """Exception raised when reading agent.lock file fails.""" def __init__(self, file_path: str, details: Optional[str] = None): super().__init__( f"Failed to read agent lock file: {file_path}", details or "Please check file permissions and ensure the file is not locked by another process", ) self.file_path = file_path class AgentLockWriteError(AgentLockError): """Exception raised when writing agent.lock file fails.""" def __init__(self, file_path: str, details: Optional[str] = None): super().__init__( f"Failed to write agent lock file: {file_path}", details or "Please check file permissions and available disk space", ) self.file_path = file_path class ValidationError(BackpackError): """Exception raised for input validation errors.""" class InvalidPathError(ValidationError): """Exception raised when a file path is invalid.""" def __init__(self, path: str, reason: Optional[str] = None): super().__init__( f"Invalid path: {path}", reason or "The path does not exist or is not accessible", ) self.path = path class InvalidKeyNameError(ValidationError): """Exception raised when a key name is invalid.""" def __init__(self, key_name: str, reason: Optional[str] = None): super().__init__( f"Invalid key name: {key_name}", reason or "Key names must be non-empty and contain only alphanumeric characters and underscores", ) self.key_name = key_name class InvalidPasswordError(ValidationError): """Exception raised when a password is invalid.""" def __init__(self, reason: Optional[str] = None): super().__init__( "Invalid password", reason or "Password cannot be empty", ) class ScriptExecutionError(BackpackError): """Exception raised when agent script execution fails.""" def __init__(self, script_path: str, details: 
Optional[str] = None): super().__init__( f"Failed to execute agent script: {script_path}", details or "Please check that the script exists and is executable", ) self.script_path = script_path ``` ```cli.py """ Command-line interface for Backpack Agent Container System. This module provides CLI commands for managing agents, keys, and running agents with JIT variable injection. Logging is configured here for CLI usage. By default it logs at INFO level, and can be controlled via the BACKPACK_LOG_LEVEL environment variable. """ import json import logging import os import platform import shutil import subprocess import sys import zipfile from typing import Dict, List import click from . import __version__ from .agent_lock import AgentLock from .exceptions import AgentLockNotFoundError, AgentLockReadError, BackpackError, KeyNotFoundError, ValidationError from .keychain import ( InvalidKeyNameError, KeychainDeletionError, KeychainStorageError, delete_key, get_key, list_keys, register_key, store_key, ) def _configure_logging() -> logging.Logger: """ Configure a default logger for CLI usage if none is configured. The log level can be overridden with BACKPACK_LOG_LEVEL (e.g. DEBUG, INFO). Can log to a file if BACKPACK_LOG_FILE is set. """ root = logging.getLogger() if not root.handlers: level_name = os.environ.get("BACKPACK_LOG_LEVEL", "INFO").upper() level = getattr(logging, level_name, logging.INFO) handlers: List[logging.Handler] = [logging.StreamHandler(sys.stderr)] log_file = os.environ.get("BACKPACK_LOG_FILE") if log_file: handlers.append(logging.FileHandler(log_file)) logging.basicConfig( level=level, format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", datefmt="%H:%M:%S", handlers=handlers ) return logging.getLogger(__name__) logger = _configure_logging() def handle_error(e: Exception, exit_code: int = 1) -> None: """ Handle and display errors in a user-friendly way. 
Args: e: The exception to handle exit_code: Exit code to use (default: 1) """ if isinstance(e, BackpackError): logger.error("Backpack error", extra={"type": type(e).__name__, "error_message": e.message}) click.echo(click.style(f"Error: {e.message}", fg="red"), err=True) if e.details: click.echo(click.style(f" {e.details}", fg="yellow"), err=True) # Add helpful tips based on exception type if isinstance(e, KeyNotFoundError): click.echo(click.style(" 💡 Tip: List available keys with 'backpack key list'", fg="cyan"), err=True) elif isinstance(e, AgentLockNotFoundError): click.echo(click.style(" 💡 Tip: Initialize an agent with 'backpack init' or 'backpack quickstart'", fg="cyan"), err=True) elif isinstance(e, AgentLockReadError): click.echo(click.style(" 💡 Tip: Check file permissions or if the file is corrupted.", fg="cyan"), err=True) elif isinstance(e, click.ClickException): raise e else: logger.error("Unexpected error in CLI", extra={"type": type(e).__name__, "error": str(e)}) click.echo(click.style(f"Unexpected error: {str(e)}", fg="red"), err=True) if hasattr(e, "__cause__") and e.__cause__: click.echo(click.style(f" Caused by: {str(e.__cause__)}", fg="yellow"), err=True) click.echo(click.style(" 💡 Tip: Run with BACKPACK_LOG_LEVEL=DEBUG for more details.", fg="cyan"), err=True) sys.exit(exit_code) @click.group() def cli(): """ Backpack Agent Container CLI. A secure system for managing AI agents with encrypted state, credentials, and personality configurations. """ @cli.command() @click.option("--non-interactive", is_flag=True, help="Skip prompts; use defaults (for scripts)") def quickstart(non_interactive): """ Interactive wizard to create your first agent in under 2 minutes. Guides you through creating an agent.lock and a starter agent script. Use --non-interactive for CI/scripts (uses sensible defaults). 
""" try: click.echo(click.style("\n>> Backpack Quick Start\n", fg="cyan", bold=True)) click.echo("Create a working agent in under 2 minutes.\n") agent_lock = AgentLock() if os.path.exists(agent_lock.file_path) and not non_interactive: if not click.confirm("agent.lock already exists here. Overwrite and continue?"): click.echo("Cancelled.") return # Agent name / description if non_interactive: agent_name = "My Agent" creds_input = "OPENAI_API_KEY" personality_text = "You are a helpful AI assistant. Use a professional tone." else: agent_name = click.prompt("What's your agent's name?", default="My Agent", show_default=True) click.echo("\nCommon credentials (comma-separated): OPENAI_API_KEY, ANTHROPIC_API_KEY, TWITTER_TOKEN") creds_input = click.prompt("Required credentials", default="OPENAI_API_KEY", show_default=True) personality_text = click.prompt( "Personality / system prompt", default="You are a helpful AI assistant. Use a professional tone.", show_default=True, ) creds: Dict[str, str] = {} for cred in creds_input.replace(",", " ").split(): c = cred.strip() if not c: continue if not c.replace("_", "").isalnum(): continue creds[c] = f"placeholder_{c.lower()}" if not creds: creds = {"OPENAI_API_KEY": "placeholder_openai_api_key"} personality_data = {"system_prompt": personality_text or "You are a helpful AI assistant.", "tone": "professional"} agent_lock.create(creds, personality_data) click.echo(click.style(f"\n[OK] Created agent.lock with {len(creds)} credential(s)", fg="green")) # Generate starter agent.py agent_py = _QUICKSTART_AGENT_SCRIPT.format(agent_name=agent_name or "My Agent", creds_list=", ".join(repr(k) for k in creds)) agent_path = "agent.py" if os.path.exists(agent_path) and not non_interactive: if not click.confirm(f"{agent_path} already exists. 
Overwrite?"): agent_path = "agent_quickstart.py" click.echo(f"Writing starter script to {agent_path} instead.") with open(agent_path, "w") as f: f.write(agent_py) click.echo(click.style(f"[OK] Created {agent_path}", fg="green")) click.echo(click.style("\nNext steps:", fg="cyan", bold=True)) click.echo(" 1. Add your keys: backpack key add OPENAI_API_KEY (and any others)") click.echo(f" 2. Run your agent: backpack run {agent_path}") click.echo("") except BackpackError as e: handle_error(e) except Exception as e: handle_error(e) _QUICKSTART_AGENT_SCRIPT = '''""" {agent_name} - Generated by Backpack quickstart. Uses credentials and personality from agent.lock (injected by Backpack). """ import os def main(): # Credentials are injected by Backpack when you run: backpack run agent.py keys = [{creds_list}] for key_name in keys: value = os.environ.get(key_name) print(f"{{key_name}} available: {{'Yes' if value else 'No'}}") system_prompt = os.environ.get("AGENT_SYSTEM_PROMPT", "") print(f"Personality: {{system_prompt[:60]}}...") print("Agent is ready. Replace this with your real logic.") if __name__ == "__main__": main() ''' @cli.command() @click.option("--credentials", help="Comma-separated list of required credentials (e.g., OPENAI_API_KEY,TWITTER_TOKEN)") @click.option("--personality", help="Agent personality prompt") def init(credentials, personality): """ Initialize a new agent.lock file. 
""" try: creds: Dict[str, str] = {} if credentials: for cred in credentials.split(","): cred_name = cred.strip() if not cred_name: continue if not cred_name.replace("_", "").isalnum(): raise ValidationError( f"Invalid credential name: {cred_name}", "Credential names must contain only alphanumeric characters and underscores", ) creds[cred_name] = f"placeholder_{cred_name.lower()}" personality_data = {"system_prompt": personality or "You are a helpful AI assistant.", "tone": "professional"} agent_lock = AgentLock() if os.path.exists(agent_lock.file_path): if not click.confirm("agent.lock already exists. Overwrite it?"): click.echo("Cancelled.") return agent_lock.create(creds, personality_data) click.echo(click.style(f"[OK] Created agent.lock with {len(creds)} credential placeholders", fg="green")) except BackpackError as e: handle_error(e) except Exception as e: handle_error(e) @cli.command() @click.option("--new-key", help="New master key (prompted if not provided)") @click.option("--key-file", default="agent.lock", help="Path to agent.lock file") def rotate(new_key, key_file): """ Rotate the master encryption key for agent.lock. Decrypts the current agent.lock with the current key (AGENT_MASTER_KEY), and re-encrypts it with the new key. """ if not os.path.exists(key_file): click.echo(click.style(f"File {key_file} not found.", fg="red")) sys.exit(1) # 1. Read with current key current_lock = AgentLock(key_file) data = current_lock.read() if data is None: click.echo(click.style("Failed to decrypt agent.lock with current key.", fg="red")) click.echo("Check if AGENT_MASTER_KEY is set correctly.") sys.exit(1) click.echo(click.style(f"Successfully decrypted {key_file}", fg="green")) # 2. Get new key if not new_key: click.echo("\nYou are about to rotate the master key.") click.echo("Please provide a new secure master key.") new_key = click.prompt("New Master Key", hide_input=True, confirmation_prompt=True) if not new_key: click.echo("Key cannot be empty.") sys.exit(1) # 3. 
Write with new key try: # Create a new lock instance with the NEW key new_lock = AgentLock(key_file, master_key=new_key) new_lock.create(data["credentials"], data["personality"], data["memory"]) click.echo(click.style(f"\n[OK] Re-encrypted {key_file} with new key.", fg="green")) click.echo(click.style("\nIMPORTANT:", fg="yellow", bold=True)) click.echo("You MUST update your AGENT_MASTER_KEY environment variable to the new key.") click.echo("If you lose this key, you cannot access the agent.lock file again.") except Exception as e: click.echo(click.style(f"Failed to rotate key: {e}", fg="red")) sys.exit(1) @cli.command() @click.argument("script_path") @click.option("--non-interactive", is_flag=True, help="Skip prompts (auto-approve keys) - enabled automatically if AGENT_MASTER_KEY is set") def run(script_path, non_interactive): """ Run an agent with JIT variable injection. """ agent_lock = AgentLock() agent_data = agent_lock.read() if not agent_data: raise click.ClickException("No agent.lock found. Run 'backpack init' first.") # Detect Cloud/Non-interactive mode # We assume non-interactive if the flag is passed OR if AGENT_MASTER_KEY is explicitly set # (implying a managed environment like Vercel/Railway) is_cloud_mode = non_interactive or (os.environ.get("AGENT_MASTER_KEY") is not None) env_vars: Dict[str, str] = {} # Get required keys directly from the loaded data creds_layer = agent_data.get("credentials", {}) required_keys = list(creds_layer.keys()) for key_name in required_keys: value_to_inject = None source = None # 1. Check Environment (already set?) if key_name in os.environ: source = "environment" # 2. Check Lock File (Encrypted Portability) # If the value in agent.lock is NOT a placeholder, it's a real encrypted key elif key_name in creds_layer: val = creds_layer[key_name] if val and not val.startswith("placeholder_"): value_to_inject = val source = "agent.lock" # 3. 
Check Local Vault (Keychain) if not source and not value_to_inject: stored_key = get_key(key_name) if stored_key: value_to_inject = stored_key source = "vault" # Decision Logic if source == "environment": # Already satisfied. if not is_cloud_mode: click.echo(f"Key {key_name} found in environment.") continue if value_to_inject: if is_cloud_mode: # Cloud/Non-interactive: Auto-approve env_vars[key_name] = value_to_inject else: # Local/Interactive: Prompt user msg = f"This agent requires access to {key_name}" if source == "agent.lock": msg += " (found in agent.lock)" elif source == "vault": msg += " (found in vault)" if click.confirm(f"{msg}. Allow access?"): env_vars[key_name] = value_to_inject else: click.echo(f"Access denied for {key_name}. Agent may not function properly.") else: # Key not found anywhere click.echo(f"Key {key_name} not found in environment, agent.lock, or vault.") click.echo(f" Add it with 'backpack key add {key_name}' or set it as an environment variable.") env_vars["AGENT_SYSTEM_PROMPT"] = agent_data["personality"]["system_prompt"] env_vars["AGENT_TONE"] = agent_data["personality"]["tone"] # Merge injected env vars with current environment env = os.environ.copy() env.update(env_vars) click.echo(f"Running {script_path} with {len(env_vars)} injected variables...") # Use subprocess.run() instead of os.system() for better control and security # sys.executable ensures we use the same Python interpreter result = subprocess.run([sys.executable, script_path], env=env) # Exit with the script's return code sys.exit(result.returncode) @cli.group() def key(): """Manage keys in personal vault""" @key.command("add") @click.argument("key_name") @click.option("--value", prompt=True, hide_input=True, help="Key value") def add_key(key_name, value): """ Add a key to personal vault. 
""" try: if not value or not value.strip(): raise ValidationError("Key value cannot be empty", "Please provide a non-empty value") existing_key = get_key(key_name) if existing_key: if not click.confirm(f"Key '{key_name}' already exists. Overwrite it?"): click.echo("Cancelled.") return store_key(key_name, value) register_key(key_name) click.echo(click.style(f"[OK] Added {key_name} to vault", fg="green")) except (InvalidKeyNameError, KeychainStorageError, ValidationError) as e: handle_error(e) except Exception as e: handle_error(e) @key.command("list") def list_keys_cmd(): """List keys in personal vault.""" keys = list_keys() if keys: click.echo("Keys in vault:") for key_name in keys: click.echo(f" - {key_name}") else: click.echo("No keys in vault") @key.command("remove") @click.argument("key_name") def remove_key(key_name): """Remove a key from personal vault.""" try: delete_key(key_name) click.echo(click.style(f"[OK] Removed {key_name} from vault", fg="green")) except (KeychainDeletionError, InvalidKeyNameError) as e: handle_error(e) except Exception as e: handle_error(e) def _get_templates_dir() -> str: """Return path to backpack/templates (works when installed or run from source).""" try: # Prefer stdlib resource APIs when available. try: from importlib import resources as importlib_resources # py3.7+ # `files()` is py3.9+, so guard it. files = getattr(importlib_resources, "files", None) if files is not None: return str(files("backpack").joinpath("templates")) except Exception: pass # Fallback for older environments / packaging edge-cases. 
import pkg_resources # type: ignore return pkg_resources.resource_filename("backpack", "templates") except Exception: pass return os.path.join(os.path.dirname(os.path.abspath(__file__)), "templates") def _list_template_names() -> List[str]: """Return list of template directory names.""" root = _get_templates_dir() if not os.path.isdir(root): return [] return [d for d in os.listdir(root) if os.path.isdir(os.path.join(root, d)) and not d.startswith(".")] @cli.group() def template(): """Use ready-made agent templates.""" @template.command("list") def template_list(): """List available agent templates.""" names = _list_template_names() if not names: click.echo("No templates found.") return click.echo(click.style("Available templates:\n", fg="cyan", bold=True)) root = _get_templates_dir() for name in sorted(names): manifest_path = os.path.join(root, name, "manifest.json") if os.path.isfile(manifest_path): try: with open(manifest_path) as f: m = json.load(f) desc = m.get("description", "") click.echo(f" {name}") if desc: click.echo(click.style(f" {desc}", fg="white")) except (json.JSONDecodeError, OSError): click.echo(f" {name}") else: click.echo(f" {name}") @template.command("use") @click.argument("name") @click.option("--dir", "target_dir", type=click.Path(), default=".", help="Directory to copy template into (default: current)") def template_use(name, target_dir): """Copy a template into the current (or given) directory and create agent.lock.""" root = _get_templates_dir() template_path = os.path.join(root, name) if not os.path.isdir(template_path): click.echo(click.style(f"Template '{name}' not found.", fg="red")) click.echo("Run 'backpack template list' to see available templates.") sys.exit(1) manifest_path = os.path.join(template_path, "manifest.json") if not os.path.isfile(manifest_path): click.echo(click.style(f"Template '{name}' has no manifest.json.", fg="red")) sys.exit(1) try: with open(manifest_path) as f: manifest = json.load(f) except 
(json.JSONDecodeError, OSError) as e: click.echo(click.style(f"Invalid manifest: {e}", fg="red")) sys.exit(1) creds_list = manifest.get("credentials", []) personality = manifest.get("personality", {}) creds = {c: f"placeholder_{c.lower()}" for c in creds_list} personality_data = { "system_prompt": personality.get("system_prompt", "You are a helpful AI assistant."), "tone": personality.get("tone", "professional"), } os.makedirs(target_dir, exist_ok=True) agent_lock = AgentLock(file_path=os.path.join(target_dir, "agent.lock")) if os.path.exists(agent_lock.file_path) and not click.confirm(f"{agent_lock.file_path} exists. Overwrite?"): click.echo("Skipped agent.lock.") else: agent_lock.create(creds, personality_data) click.echo(click.style(f"[OK] Created {agent_lock.file_path}", fg="green")) agent_src = os.path.join(template_path, "agent.py") agent_dst = os.path.join(target_dir, "agent.py") if os.path.isfile(agent_src): if os.path.exists(agent_dst) and not click.confirm("agent.py exists. Overwrite?"): click.echo("Skipped agent.py.") else: shutil.copy2(agent_src, agent_dst) click.echo(click.style(f"[OK] Created {agent_dst}", fg="green")) click.echo(click.style("\nNext steps:", fg="cyan", bold=True)) for c in creds_list: click.echo(f" backpack key add {c}") click.echo(" backpack run agent.py") click.echo("") @cli.group() def config(): """Manage agent configuration.""" @config.command("personality") @click.option("--system-prompt", help="New system prompt") @click.option("--tone", help="New tone (e.g., professional, friendly)") def config_personality(system_prompt, tone): """ Update agent personality. 
""" agent_lock = AgentLock() if not os.path.exists(agent_lock.file_path): click.echo(click.style("No agent.lock found.", fg="red")) sys.exit(1) try: # Read current personality to preserve existing values if only one is updated current_data = agent_lock.read() if not current_data: click.echo(click.style("Failed to read agent.lock", fg="red")) sys.exit(1) current_personality = current_data.get("personality", {}) new_personality = current_personality.copy() if system_prompt: new_personality["system_prompt"] = system_prompt if tone: new_personality["tone"] = tone if new_personality == current_personality: click.echo("No changes specified.") return agent_lock.update_personality(new_personality) click.echo(click.style("[OK] Updated personality", fg="green")) except BackpackError as e: handle_error(e) except Exception as e: handle_error(e) @cli.command() @click.option("--fast", is_flag=True, help="Skip pauses (for scripting)") def demo(fast): """Show a short before/after demo of Backpack's value.""" click.echo(click.style("\n +============================================================+", fg="cyan")) click.echo(click.style(" | BACKPACK DEMO: Before vs After |", fg="cyan")) click.echo(click.style(" +============================================================+\n", fg="cyan")) click.echo(click.style(" BEFORE (Naked Agent):", fg="red", bold=True)) click.echo(" - Agent code + scattered .env / dashboard secrets") click.echo(" - 'TWITTER_API_KEY not set' -> crash -> find key -> paste -> restart") click.echo(" - Personality in code or random config files") click.echo("") click.echo(click.style(" AFTER (Backpack):", fg="green", bold=True)) click.echo(" - agent.lock travels with code (encrypted credentials + personality)") click.echo(" - Run: backpack run agent.py") click.echo(" - Prompt: 'This agent needs TWITTER_API_KEY. You have it. Allow? 
(Y/n)' -> Y") click.echo(" - Key injected into process only; never plain text on disk") click.echo(" - Personality versioned in Git with the agent") click.echo("") click.echo(click.style(" JIT INJECTION:", fg="cyan", bold=True)) click.echo(" 1. Backpack reads agent.lock -> sees required keys") click.echo(" 2. Checks your OS keychain (vault)") click.echo(" 3. Asks once per key -> injects into env for this run only") click.echo(" 4. Agent runs with credentials + AGENT_SYSTEM_PROMPT, AGENT_TONE") click.echo("") click.echo(click.style(" Try it:", fg="yellow", bold=True)) click.echo(" backpack quickstart # 2-minute setup") click.echo(" backpack template list # ready-made agents") click.echo(" backpack run agent.py # run with JIT injection") click.echo("") @cli.command() @click.argument("output_file", required=False) def export(output_file): """Export the current agent to a zip file.""" if not output_file: output_file = "backpack_agent.zip" if not output_file.endswith(".zip"): output_file += ".zip" files_to_export = ["agent.lock", "agent.py", "requirements.txt", "README.md"] found_files = [] try: with zipfile.ZipFile(output_file, "w") as zf: for f in files_to_export: if os.path.exists(f): zf.write(f) found_files.append(f) if found_files: click.echo(click.style(f"[OK] Exported {len(found_files)} files to {output_file}", fg="green")) for f in found_files: click.echo(f" - {f}") else: click.echo(click.style("No agent files found to export.", fg="yellow")) # Clean up empty zip if os.path.exists(output_file): os.remove(output_file) except Exception as e: click.echo(click.style(f"Export failed: {e}", fg="red")) sys.exit(1) @cli.command("import") @click.argument("input_file") @click.option("--dir", "target_dir", default=".", help="Target directory") def import_agent(input_file, target_dir): """Import an agent from a zip file.""" if not os.path.exists(input_file): click.echo(click.style(f"File {input_file} not found.", fg="red")) sys.exit(1) try: os.makedirs(target_dir, 
exist_ok=True) with zipfile.ZipFile(input_file, "r") as zf: zf.extractall(target_dir) click.echo(click.style(f"[OK] Imported agent to {target_dir}", fg="green")) click.echo("Files:") for name in zf.namelist(): click.echo(f" - {name}") except zipfile.BadZipFile: click.echo(click.style("Invalid zip file.", fg="red")) sys.exit(1) except Exception as e: click.echo(click.style(f"Import failed: {e}", fg="red")) sys.exit(1) @cli.command() def tutorial(): """Interactive tutorial to learn Backpack.""" click.echo(click.style("\n🎓 Backpack Interactive Tutorial", fg="cyan", bold=True)) click.echo("Welcome! This tutorial will guide you through the core concepts.\n") if not click.confirm("Ready to start?", default=True): click.echo("Maybe later! 👋") return # Step 1: Concepts click.echo(click.style("\n1. The Problem: The Naked Agent 😱", fg="yellow", bold=True)) click.echo("Agents usually have their keys scattered in .env files and config hardcoded.") click.echo("This makes them hard to share and insecure.") click.pause(info="Press any key to continue...") # Step 2: Agent Lock click.echo(click.style("\n2. The Solution: agent.lock 🔒", fg="yellow", bold=True)) click.echo("Backpack creates an encrypted file that travels with your code.") click.echo("It contains: Credentials (placeholders), Personality, and Memory.") click.pause(info="Press any key to continue...") # Step 3: JIT Injection click.echo(click.style("\n3. JIT Variable Injection 💉", fg="yellow", bold=True)) click.echo("When you run an agent, Backpack:") click.echo(" a. Reads agent.lock") click.echo(" b. Asks for permission to use keys from your secure vault") click.echo(" c. Injects them directly into the process memory") click.echo(" d. NEVER writes them to disk in plain text") click.pause(info="Press any key to continue...") # Step 4: Hands on click.echo(click.style("\n4. Let's try it! 
🚀", fg="yellow", bold=True)) click.echo("We'll create a dummy agent now.") if click.confirm("Create 'tutorial_agent' directory?"): target_dir = "tutorial_agent" try: os.makedirs(target_dir, exist_ok=True) # Simulate init click.echo(f"\nRunning: backpack init (in {target_dir})") agent_lock = AgentLock(os.path.join(target_dir, "agent.lock")) if not os.path.exists(agent_lock.file_path): agent_lock.create({"OPENAI_API_KEY": "placeholder"}, {"system_prompt": "You are a student."}) click.echo(click.style("[OK] Created agent.lock", fg="green")) click.echo("\nNow you would run: backpack run agent.py") click.echo("And Backpack would ask to inject OPENAI_API_KEY.") except Exception as e: click.echo(click.style(f"Failed to create tutorial agent: {e}", fg="red")) click.echo(click.style("\n🎉 Tutorial Complete!", fg="green", bold=True)) click.echo("You are ready to use Backpack.") click.echo("Run 'backpack quickstart' to build your real agent.") @cli.command() @click.option("--json", "json_output", is_flag=True, help="Output in JSON format") def status(json_output): """Show current agent status.""" agent_lock = AgentLock() if not os.path.exists(agent_lock.file_path): if json_output: click.echo(json.dumps({"error": "No agent.lock found"})) else: click.echo(click.style("No agent.lock found in current directory.", fg="yellow")) return try: data = agent_lock.read() if not data: if json_output: click.echo(json.dumps({"error": "agent.lock is corrupted or unreadable"})) else: click.echo(click.style("agent.lock is corrupted or unreadable.", fg="red")) return if json_output: # Return pure data for machine consumption output = { "file_path": agent_lock.file_path, "size": os.stat(agent_lock.file_path).st_size, "layers": { "credentials": list(data.get("credentials", {}).keys()), "personality": data.get("personality", {}), "memory": data.get("memory", {}) } } click.echo(json.dumps(output, indent=2)) return click.echo(click.style(f"Agent Status ({agent_lock.file_path})", fg="cyan", 
bold=True)) # File info stat = os.stat(agent_lock.file_path) click.echo(f" Size: {stat.st_size} bytes") # Layers click.echo("\n Layers:") creds = data.get("credentials", {}) click.echo(f" - Credentials: {len(creds)} keys defined") for k in creds: click.echo(f" - {k}") personality = data.get("personality", {}) click.echo(f" - Personality: {len(personality)} items") memory = data.get("memory", {}) click.echo(f" - Memory: {len(memory)} items") except Exception as e: if json_output: click.echo(json.dumps({"error": str(e)})) else: click.echo(click.style(f"Error reading status: {e}", fg="red")) @cli.command() def info(): """Show system information.""" click.echo(click.style("Backpack Information", fg="cyan", bold=True)) click.echo(f" Version: {__version__}") click.echo(f" Python: {platform.python_version()} ({sys.executable})") click.echo(f" Platform: {platform.platform()}") click.echo(f" CWD: {os.getcwd()}") @cli.command() def version(): """Show version information.""" click.echo(f"backpack version {__version__}") @cli.command() def doctor(): """Check for common issues.""" click.echo(click.style("Running Backpack Doctor...", fg="cyan", bold=True)) issues = [] # Check Python version py_ver = sys.version_info if py_ver < (3, 7): issues.append("Python version is too old. Backpack requires 3.7+") else: click.echo(" [OK] Python version") # Check keyring try: import keyring try: keyring.get_keyring() click.echo(" [OK] Keyring backend found") except Exception as e: issues.append(f"Keyring error: {e}") except ImportError: issues.append("keyring library not installed") # Check cryptography try: import cryptography click.echo(f" [OK] Cryptography library ({cryptography.__version__})") except ImportError: issues.append("cryptography library not installed") if issues: click.echo(click.style("\nIssues found:", fg="red", bold=True)) for issue in issues: click.echo(f" - {issue}") sys.exit(1) else: click.echo(click.style("\nEverything looks good! 
🎒", fg="green", bold=True)) if __name__ == "__main__": cli() ``` ```agent_lock.py """ Agent lock file management. This module provides the AgentLock class for creating, reading, and updating encrypted agent.lock files that contain credentials, personality, and memory. Logging in this module focuses on file paths and operation outcomes only. No decrypted contents are ever logged. """ import json import logging import os from typing import Dict, Any, Optional from .audit import AuditLogger from .crypto import encrypt_data, decrypt_data, DecryptionError, EncryptionError from .exceptions import ( AgentLockNotFoundError, AgentLockReadError, AgentLockWriteError, InvalidPathError, ValidationError, ) logger = logging.getLogger(__name__) class AgentLock: """ Manages encrypted agent.lock files containing agent configuration and state. The agent.lock file contains three encrypted layers: 1. Credentials: Placeholders for required API keys 2. Personality: System prompts and agent configuration 3. Memory: Ephemeral agent state All data is encrypted using a master key (from AGENT_MASTER_KEY env var). """ def __init__(self, file_path: str = "agent.lock", master_key: str = None): """ Initialize an AgentLock instance. Args: file_path: Path to the agent.lock file (default: "agent.lock") master_key: Optional master key to use (overrides env var) """ self.file_path = file_path self.master_key = master_key or os.environ.get("AGENT_MASTER_KEY", "default-key") self.audit_logger = AuditLogger() def create(self, credentials: Dict[str, str], personality: Dict[str, str], memory: Dict[str, Any] = None) -> None: """ Create a new agent.lock file with encrypted layers. 
Args: credentials: Dictionary mapping credential names to placeholder values personality: Dictionary containing system prompts and configuration memory: Optional dictionary for ephemeral agent state (default: empty dict) Raises: ValidationError: If input data is invalid EncryptionError: If encryption fails AgentLockWriteError: If writing the file fails """ if memory is None: memory = {} # Validate inputs if not isinstance(credentials, dict): raise ValidationError("Credentials must be a dictionary", f"Got type: {type(credentials).__name__}") if not isinstance(personality, dict): raise ValidationError("Personality must be a dictionary", f"Got type: {type(personality).__name__}") if not isinstance(memory, dict): raise ValidationError("Memory must be a dictionary", f"Got type: {type(memory).__name__}") try: data = { "version": "1.0", "layers": { "credentials": encrypt_data(json.dumps(credentials), self.master_key), "personality": encrypt_data(json.dumps(personality), self.master_key), "memory": encrypt_data(json.dumps(memory), self.master_key), }, } except (EncryptionError, ValidationError) as e: raise AgentLockWriteError(self.file_path, f"Failed to encrypt data: {str(e)}") from e try: # Ensure directory exists directory = os.path.dirname(self.file_path) if directory and not os.path.exists(directory): os.makedirs(directory, exist_ok=True) with open(self.file_path, "w") as f: json.dump(data, f, indent=2) logger.info("Created agent.lock file", extra={"path": self.file_path}) self.audit_logger.log_event("lock_created", {"path": self.file_path}) except PermissionError as e: raise AgentLockWriteError(self.file_path, f"Permission denied: {str(e)}") from e except OSError as e: raise AgentLockWriteError(self.file_path, f"OS error: {str(e)}") from e except Exception as e: raise AgentLockWriteError(self.file_path, f"Unexpected error: {str(e)}") from e def read(self) -> Optional[Dict[str, Dict[str, Any]]]: """ Read and decrypt the agent.lock file. 
Returns: A dictionary with keys 'credentials', 'personality', and 'memory', each containing the decrypted data. Returns None if the file doesn't exist or decryption fails. Raises: AgentLockReadError: If reading the file fails (I/O/permissions). InvalidPathError: If the path exists but is not a file. """ if not os.path.exists(self.file_path): logger.debug("agent.lock file not found", extra={"path": self.file_path}) return None if not os.path.isfile(self.file_path): raise InvalidPathError(self.file_path, "Path exists but is not a file") try: with open(self.file_path, "r") as f: data = json.load(f) except json.JSONDecodeError: # Corrupted file (or wrong content) -> treat as unreadable logger.warning("agent.lock file is not valid JSON", extra={"path": self.file_path}) return None except PermissionError as e: raise AgentLockReadError(self.file_path, f"Permission denied: {str(e)}") from e except OSError as e: raise AgentLockReadError(self.file_path, f"OS error: {str(e)}") from e except Exception as e: raise AgentLockReadError(self.file_path, f"Unexpected error reading file: {str(e)}") from e # Validate file structure (treat invalid as unreadable) if not isinstance(data, dict): logger.warning("agent.lock file has invalid structure", extra={"path": self.file_path}) return None if "layers" not in data or not isinstance(data["layers"], dict): logger.warning("agent.lock missing 'layers' section", extra={"path": self.file_path}) return None required_layers = ["credentials", "personality", "memory"] for layer in required_layers: if layer not in data["layers"]: logger.warning( "agent.lock missing required layer", extra={"path": self.file_path, "layer": layer}, ) return None try: result = { "credentials": json.loads(decrypt_data(data["layers"]["credentials"], self.master_key)), "personality": json.loads(decrypt_data(data["layers"]["personality"], self.master_key)), "memory": json.loads(decrypt_data(data["layers"]["memory"], self.master_key)), } logger.debug("Successfully read 
agent.lock file", extra={"path": self.file_path}) self.audit_logger.log_event("lock_read", {"path": self.file_path}) return result except DecryptionError: logger.warning("Failed to decrypt agent.lock file", extra={"path": self.file_path}) return None except json.JSONDecodeError: logger.warning("Decrypted agent.lock contents are not valid JSON", extra={"path": self.file_path}) return None except Exception as e: logger.error("Unexpected error reading agent.lock file", extra={"path": self.file_path, "error": str(e)}) return None def update_memory(self, memory: Dict[str, Any]) -> None: """ Update the memory layer of the agent.lock file. This preserves existing credentials and personality while updating only the ephemeral memory state. Args: memory: New memory dictionary to store Raises: AgentLockNotFoundError: If agent.lock file doesn't exist ValidationError: If memory is not a dictionary AgentLockWriteError: If writing the updated file fails """ if not isinstance(memory, dict): raise ValidationError("Memory must be a dictionary", f"Got type: {type(memory).__name__}") agent_data = self.read() if agent_data is None: raise AgentLockNotFoundError(self.file_path) try: agent_data["memory"] = memory self.create(agent_data["credentials"], agent_data["personality"], memory) self.audit_logger.log_event("lock_memory_updated", {"path": self.file_path}) except (ValidationError, EncryptionError, AgentLockWriteError): raise except Exception as e: raise AgentLockWriteError(self.file_path, f"Failed to update memory: {str(e)}") from e def update_personality(self, personality: Dict[str, str]) -> None: """ Update the personality layer of the agent.lock file. This preserves existing credentials and memory while updating only the personality configuration. 
Args: personality: New personality dictionary to store Raises: AgentLockNotFoundError: If agent.lock file doesn't exist ValidationError: If personality is not a dictionary AgentLockWriteError: If writing the updated file fails """ if not isinstance(personality, dict): raise ValidationError("Personality must be a dictionary", f"Got type: {type(personality).__name__}") agent_data = self.read() if agent_data is None: raise AgentLockNotFoundError(self.file_path) try: agent_data["personality"] = personality self.create(agent_data["credentials"], personality, agent_data["memory"]) self.audit_logger.log_event("lock_personality_updated", {"path": self.file_path}) except (ValidationError, EncryptionError, AgentLockWriteError): raise except Exception as e: raise AgentLockWriteError(self.file_path, f"Failed to update personality: {str(e)}") from e def get_required_keys(self) -> list: """ Get a list of required credential keys from the agent.lock file. Returns: A list of credential key names (e.g., ['OPENAI_API_KEY', 'TWITTER_TOKEN']) Raises: AgentLockNotFoundError: If agent.lock file doesn't exist AgentLockCorruptedError: If the file is corrupted """ agent_data = self.read() if agent_data is None: return [] if "credentials" in agent_data and isinstance(agent_data["credentials"], dict): return list(agent_data["credentials"].keys()) return [] ``` Evaluate this program and the repo it belongs to. Be vicious and tear into it.
Refactor code: remove casts, unify timestamps, relocate CLI entry point
The user wants me to evaluate a Python codebase. They've provided several files from what appears to be a "Backpack Agent Container System" - a CLI tool for managing AI agents with encrypted credentials.
Let me analyze the code:
  1. crypto.py: Handles encryption/decryption using Fernet (symmetric encryption) with PBKDF2 key derivation. Seems standard but I need to check for issues.
  2. audit.py: Encrypted audit logging. Stores encrypted JSON lines.
  3. keychain.py: OS keychain integration using the keyring library.
  4. exceptions.py: Custom exception classes.
  5. cli.py: Click-based CLI interface. Lots of commands (quickstart, init, rotate, run, key management, etc.)
  6. agent_lock.py: Manages the encrypted agent.lock file which stores credentials, personality, and memory.
Let me look for security issues, design flaws, bugs, and code quality issues:
Security Issues:
  1. Weak default master key: In agent_lock.py and crypto.py, if no master key is provided, it defaults to "default-key". This is catastrophic for security. The code mentions this in comments but doesn't prevent it.
  2. Timing attacks: No mention of constant-time comparison for password checking (though Fernet handles authentication).
  3. Key derivation iterations: 100,000 iterations of PBKDF2-HMAC-SHA256 was a common default years ago but is now below current guidance (OWASP recommends 600,000 or more for PBKDF2-HMAC-SHA256).
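  4. Salt reuse: In derive_key, a fresh 16-byte random salt is generated when salt is None, and a caller-supplied salt is used as-is. That is correct (decryption has to pass the stored salt back in), but nothing prevents a caller from passing the same salt for every encryption.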
  5. Audit log encryption uses same key: The audit log uses AGENT_MASTER_KEY from env var, defaulting to "default-key". If the user doesn't set this, the audit log is encrypted with a publicly known key.
  6. Race conditions: In audit.py, read_logs and log_event don't use file locking. Concurrent writes could corrupt the log.
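  7. Zip slip vulnerability: In cli.py, the import_agent command extracts a zip file without inspecting its entries first. Python's zipfile strips absolute paths and ".." components during extraction, so classic path traversal (../ in zip entries) is blocked by the standard library, but existing files in the target directory are still overwritten silently and there is no limit on entry count or uncompressed size.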
  8. Command injection potential: In run command, subprocess.run([sys.executable, script_path], env=env) - if script_path is user-controlled and contains spaces or special chars, this could be an issue, but it's passed as a list so it's safer. However, there's no validation that script_path is actually a file or that it's safe to execute.
  9. Sensitive data in memory: The code loads decrypted credentials into dictionaries and passes them around. No secure memory wiping.
Design/Architecture Issues:
  1. Mixed concerns: AgentLock class handles both file I/O and encryption logic. Should be separated.
  2. Global state: The AuditLogger is instantiated at module level in keychain.py, which happens at import time. This can cause issues with testing and configuration.
  3. Error handling: Many places catch generic Exception and wrap it. This can mask bugs.
  4. JSON encoding: Using standard json.dumps for complex data. No handling of non-serializable types.
  5. Registry implementation: The keychain registry is stored as a JSON string in the keyring under "_registry". This is inefficient and prone to corruption if multiple processes write concurrently.
  6. Environment variable pollution: The run command injects secrets into environment variables. This is insecure - env vars are visible to all child processes and can be leaked via /proc/*/environ on Linux.
Code Quality Issues:
  1. Type hints: Some functions have them, some don't. Inconsistent.
  2. Docstrings: Good docstrings but some are redundant.
  3. String formatting: Mix of f-strings and .format().
  4. Constants: Hardcoded values like iteration count (100000) should be constants.
  5. File permissions: No explicit setting of restrictive file permissions on created files (agent.lock, audit logs).
  6. Logging: Mixed use of logger and click.echo. Some log messages might leak to stdout inappropriately.
Specific File Issues:
crypto.py:
  • Line with os.urandom(16) - 16 bytes (128 bits) salt is fine.
  • derive_key accepts password: str - should check type first before checking truthiness (order matters for error messages).
  • Exception chaining uses from e properly.
audit.py:
  • No file locking mechanism.
  • AGENT_MASTER_KEY defaults to "default-key" - major security hole.
  • Line-by-line JSON parsing is slow for large logs.
keychain.py:
  • register_key doesn't validate if the key was actually stored before updating registry.
  • Race condition between storing key and updating registry.
cli.py:
  • The run command has complex logic for determining "cloud mode" that might be confusing.
  • quickstart generates Python code as a string - potential for syntax errors if agent_name contains quotes (though formatted with repr()).
  • rotate command creates new AgentLock with new key, but doesn't verify the old one was readable first? Actually it does read first.
  • tutorial command creates actual files - side effects in what should be a read-only demonstration.
agent_lock.py:
  • read() returns Optional[Dict] but callers often don't handle None properly.
  • get_required_keys() returns empty list on error, which might be confused with "no keys required".
  • No verification that the decrypted JSON structure matches expected schema.
Bugs:
  1. Memory layer encryption: In create(), memory is encrypted even if empty. This is fine but wasteful.
  2. Key rotation: The rotate command in cli.py calls agent_lock.create() with 3 arguments: creds, personality, memory. But looking at agent_lock.py, create() signature is create(self, credentials, personality, memory=None). This looks correct.
  3. Import error: In cli.py, AgentLock is imported from .agent_lock, but in the rotate command, it instantiates AgentLock(key_file) and then AgentLock(key_file, master_key=new_key). This creates two different instances, which is fine, but the second one writes with the new key.
  4. Zip extraction: In import_agent, no validation of zip contents before extraction.
  5. Path traversal in template use: target_dir is used with os.makedirs and file operations, but no validation that it's within intended bounds.
Minor Issues:
  • keychain.py uses cast(Dict[str, bool], ...) which is unnecessary if proper validation is done.
  • audit.py uses time.time() (float) and time.strftime() (string) - redundant timestamp info.
  • cli.py has if __name__ == "__main__": cli() but this is a package module, so it should probably be in __main__.py.
Documentation/UX:
  • The help text mentions "JIT variable injection" but this is just environment variable injection.
  • The difference between "vault" (keychain) and "agent.lock" encrypted storage is confusing.
  • No mention of how to backup or recover if master key is lost.
Let me structure this critique:
  1. Critical Security Flaws (must fix immediately)
  2. Serious Design Flaws (architectural issues)
  3. Code Quality Issues (maintainability)
  4. Minor Nitpicks
I'll be "vicious" as requested - direct, harsh, but technically accurate.
This is a naive, over-engineered security hazard masquerading as an "encrypted agent container." It reimplements poorly what already exists (age, direnv, 1Password CLI) while introducing critical vulnerabilities. Here's the teardown:

Critical Security Flaws

1. The "Default Key" Catastrophe Your AgentLock defaults to "default-key" if AGENT_MASTER_KEY isn't set. This isn't a fallback—it's a backdoor. Every user who misses that env var has their credentials encrypted with a string that appears in your source code. An attacker with file access can decrypt every agent.lock file on the system instantly. This single line invalidates your entire security model.
2. Audit Log Theater You encrypt the audit log with the same master key (or "default-key"), creating a circular trust problem. Worse, audit.py appends to a file without file locking (f.write(log_line + "\n")). Concurrent processes will interleave JSON lines, corrupting the log and destroying the tamper-evidence you claim to provide. Also, the audit logger is instantiated at module import time in keychain.py, meaning it captures events before the user configures logging.
3. Zip Slip Vulnerability import_agent extracts zip files blindly:
```python
zf.extractall(target_dir)  # No validation!
```
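A malicious zip with an entry such as ../../../../etc/cron.d/backdoor is the classic attack here. Python's zipfile strips leading slashes and ".." components when extracting, so that exact traversal is blocked by the standard library rather than by your code. You still don't validate entry paths or inspect the archive at all: extraction silently overwrites existing files in the target directory (including agent.py and agent.lock) and places no limit on the number of entries or their uncompressed size.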
4. Environment Variable Injection Your "JIT injection" dumps secrets into environment variables (env.update(env_vars)). Environment variables are not secret. On Linux, any process running as the same user (or as root) can read /proc/<pid>/environ of your running agent. Child processes inherit these variables and may leak them in crash reports. Real secret injection uses file descriptors (like systemd-creds or pass).
5. Race Conditions in Keychain Registry register_key updates the registry with a read-modify-write cycle:
```python
registry = list_keys()  # Read
registry[key_name] = True  # Modify
keyring.set_password(...)  # Write
```
Two parallel backpack key add operations can lose one key from the registry, because the read-modify-write sequence is not atomic (a lost-update race). The registry itself is a bad idea—keyring backends already store metadata; you're layering a fragile JSON blob on top.

Serious Design Flaws

1. The "Layers" Abstraction is Nonsense You encrypt three separate JSON blobs (credentials, personality, memory) with the same key, then wrap them in another JSON structure. This gives you the complexity of a multi-layer cryptosystem with none of the benefits. You can't rotate keys for individual layers, and you triple the encryption overhead. Just encrypt one JSON blob.
2. Salt Mismanagement In derive_key, you allow the caller to pass None for salt, which generates a random one. This is correct, but you then return the salt to the caller, who must store it. In encrypt_data, you base64-encode the salt and return it in a dict. The salt doesn't need to be secret, but your API design forces every caller to manage the salt lifecycle manually, increasing the chance of salt reuse (which lets an attacker amortize a single password-guessing run across every ciphertext that shares the salt, and makes precomputed tables worthwhile).
3. Error Handling That Hides Attacks Your read() method returns None for at least four different failure modes: file not found, JSON decode error, decryption failure, and invalid structure. Callers like get_required_keys() treat None as "no keys required" rather than "cryptographic failure." An attacker who corrupts the agent.lock file will cause the agent to run with no credentials injected, potentially falling back to unauthenticated mode. Failures should raise exceptions, not return empty success states.
4. Memory Management You load decrypted credentials into Python strings (value_to_inject = stored_key), which are immutable and linger in memory until garbage collection. There's no memset equivalent to wipe secrets. For a tool claiming "secure credential storage," this is amateur.
5. Misleading UX The "vault" vs. "agent.lock" distinction confuses local keychain storage with encrypted file storage. Users will inevitably commit the master key to git alongside the lock file, defeating the purpose. Your quickstart generates Python code with string formatting (_QUICKSTART_AGENT_SCRIPT.format(...)) that doesn't escape quotes, so an agent named My "Agent" will generate syntax errors.

Code Quality Issues

1. Import Chaos You have circular import risks (audit imports crypto, cli imports everything). cli.py is 600+ lines of god-object CLI handling. The AgentLock class violates single responsibility: it handles file I/O, encryption, schema validation, and audit logging.
2. Type Inconsistency get_required_keys() returns list (bare generic) while other functions return List[str]. You cast(Dict[str, bool], ...) in list_keys() instead of validating the JSON schema. The rotate command takes new_key as a string argument—so the master key ends up in shell history.
3. Logging Leaks While you claim "Logging in this module NEVER includes any secret material," your handle_error function logs e.details which often includes file paths and internal error strings that reveal system structure. The debug log in derive_key logs salt length—unnecessary metadata leakage.
4. Hardcoded Cryptographic Parameters 100,000 PBKDF2 iterations is 2015-era security. Current OWASP guidance recommends 600,000+ iterations for PBKDF2-HMAC-SHA256 (NIST gives no fixed number and simply says to use as many iterations as is feasible). This is hardcoded with no configuration option, meaning users can't upgrade their security without patching your library.
5. Platform Assumptions You assume Unix-like paths in run command (sys.executable), but don't handle Windows-specific quoting or shebang issues. The doctor command checks for keyring but doesn't verify the backend is actually secure (it could be the plaintext "Windows" backend).

The Fatal Irony

Your "encrypted, tamper-evident log" uses authenticated encryption (Fernet), but your primary storage agent.lock is encrypted with the same key and has no integrity verification beyond "try to decrypt and see if it works." Fernet's HMAC will reject blind bit-flips in the ciphertext, but that only helps while the key is secret: an attacker with write access who knows the key (or when you're using the default key, which everyone is) can decrypt the file, manipulate the JSON structure, and re-encrypt it into a token that verifies perfectly.
Verdict: This is student-level code with enterprise-level marketing. It creates a false sense of security while introducing filesystem race conditions, default passwords, and zip traversal attacks. Use age or sops instead—they handle key rotation, have proper threat models, and don't default to "default-key."
Delete the repository, learn about the Linux key retention service, and start over with a design that assumes the user is hostile and the filesystem is untrusted.