isphere.command.core_command module
A common set of capabilities for each command.
# Copyright (c) 2014-2015 Maximilien Riehl <max@riehl.io> # This work is free. You can redistribute it and/or modify it under the # terms of the Do What The Fuck You Want To Public License, Version 2, # as published by Sam Hocevar. See the COPYING.wtfpl file for more details. # """ A common set of capabilities for each command. """ from __future__ import print_function from cmd2 import Cmd import re from isphere.connection import CachingVSphere from isphere.interactive_wrapper import NotFound try: _input = raw_input except NameError: _input = input class NoOutput(Exception): """ To be raised inside an isphere `eval` command to avoid producing output. """ pass class CoreCommand(Cmd): """ The core capabilities of isphere commands, independent of the item type. """ def __init__(self): self.cache = CachingVSphere(self.hostname, self.username, self.password) Cmd.__init__(self) self.prompt = self.colorize("isphere > ", "green") def preloop(self): """ Called by the `cmd.Cmd` base class before entering the REPL loop. Displays information about the cached items. """ self.cache.fill() print( self.colorize("{0} VMs on {1} ESXis available.".format(self.cache.number_of_vms, self.cache.number_of_esxis), "blue")) print( self.colorize("{0} Distributed Virtual Switches configured.".format( self.cache.number_of_dvses), "blue")) def do_reload(self, _): """Usage: reload Reload VM cache from the vSphere server. Sample usage: `reload` """ self.preloop() def eval(self, line, item_name_generator, item_retriever, local_name): """ Run an eval command. This will retrieve items based on given patterns and execute a python statement against each of these items. The item will be bound to the given name for each of these statements. - line (type `str`): The text line provided by the user. The format should be as follows: <patterns> ! <statement> - item_name_generator (type `callable`): A function that should generate an iterable that represents the available item names for pattern matching. - item_retriever (type `callable`): A function that takes an item name generated by `item_name_generator` as input and returns the actual item. - local_name (type `str`): The name to bind the item to when evaluating the statement. """ try: patterns_and_statement = line.split("!", 1) patterns = patterns_and_statement[0] statement = patterns_and_statement[1] except IndexError: print(self.colorize("Looks like your input was malformed. Try `help eval_*`.", "red")) return for item_name in item_name_generator(patterns): def guard(): raise NoOutput() _globals, _locals = {}, {} try: item = item_retriever(item_name) except NotFound: print(self.colorize("Skipping {item} since it could not be retrieved.".format(item=item_name), "red")) continue _locals[local_name] = item _locals["no_output"] = guard _globals[local_name] = item _globals["no_output"] = guard separator = "-" item_name_header = " {name} ".format( name=item_name[:78]).center( 80, separator) try: result = eval(statement, _globals, _locals) print(self.colorize(item_name_header, "blue")) print(result) except NoOutput: pass except Exception as e: print(self.colorize(item_name_header, "red")) print(self.colorize("Eval failed for {0}: {1}".format(item_name, e), "red")) def compile_and_yield_generic_patterns(self, patterns, pattern_generator, item_count, risky=True): """ Compiles and returns regular expression patterns. Swallows the exception and complains if the patterns are invalid. Optionally (yes by default), warns and prompts in case no pattern was given. - patterns (type `str`): A space delimited sequence of regular expression patterns. - pattern_generator (type `callable`): A function that, given a list of patterns, will return matching item names. - item_count (type `int`): The number of available items before pattern matching. - risky (type `bool`, default `True`): Whether to warn when no patterns are given. """ if not patterns and risky: unformatted_message = "No pattern specified - you're doing this to all {count} items. Proceed? (y/N) " message = unformatted_message.format(count=self.colorize(str(item_count), "red")) if not _input(message).lower() == "y": return [] actual_patterns = patterns.strip().split(" ") try: compiled_patterns = [re.compile(pattern) for pattern in actual_patterns] except Exception as e: print(self.colorize("Invalid regular expression patterns: {0}".format(e), "red")) return [] return pattern_generator(compiled_patterns) @staticmethod def do_EOF(_): """ Exit this prompt. Usage: `EOF` (Ctrl+D also works). """ return True
Classes
class CoreCommand
The core capabilities of isphere commands, independent of the item type.
class CoreCommand(Cmd): """ The core capabilities of isphere commands, independent of the item type. """ def __init__(self): self.cache = CachingVSphere(self.hostname, self.username, self.password) Cmd.__init__(self) self.prompt = self.colorize("isphere > ", "green") def preloop(self): """ Called by the `cmd.Cmd` base class before entering the REPL loop. Displays information about the cached items. """ self.cache.fill() print( self.colorize("{0} VMs on {1} ESXis available.".format(self.cache.number_of_vms, self.cache.number_of_esxis), "blue")) print( self.colorize("{0} Distributed Virtual Switches configured.".format( self.cache.number_of_dvses), "blue")) def do_reload(self, _): """Usage: reload Reload VM cache from the vSphere server. Sample usage: `reload` """ self.preloop() def eval(self, line, item_name_generator, item_retriever, local_name): """ Run an eval command. This will retrieve items based on given patterns and execute a python statement against each of these items. The item will be bound to the given name for each of these statements. - line (type `str`): The text line provided by the user. The format should be as follows: <patterns> ! <statement> - item_name_generator (type `callable`): A function that should generate an iterable that represents the available item names for pattern matching. - item_retriever (type `callable`): A function that takes an item name generated by `item_name_generator` as input and returns the actual item. - local_name (type `str`): The name to bind the item to when evaluating the statement. """ try: patterns_and_statement = line.split("!", 1) patterns = patterns_and_statement[0] statement = patterns_and_statement[1] except IndexError: print(self.colorize("Looks like your input was malformed. Try `help eval_*`.", "red")) return for item_name in item_name_generator(patterns): def guard(): raise NoOutput() _globals, _locals = {}, {} try: item = item_retriever(item_name) except NotFound: print(self.colorize("Skipping {item} since it could not be retrieved.".format(item=item_name), "red")) continue _locals[local_name] = item _locals["no_output"] = guard _globals[local_name] = item _globals["no_output"] = guard separator = "-" item_name_header = " {name} ".format( name=item_name[:78]).center( 80, separator) try: result = eval(statement, _globals, _locals) print(self.colorize(item_name_header, "blue")) print(result) except NoOutput: pass except Exception as e: print(self.colorize(item_name_header, "red")) print(self.colorize("Eval failed for {0}: {1}".format(item_name, e), "red")) def compile_and_yield_generic_patterns(self, patterns, pattern_generator, item_count, risky=True): """ Compiles and returns regular expression patterns. Swallows the exception and complains if the patterns are invalid. Optionally (yes by default), warns and prompts in case no pattern was given. - patterns (type `str`): A space delimited sequence of regular expression patterns. - pattern_generator (type `callable`): A function that, given a list of patterns, will return matching item names. - item_count (type `int`): The number of available items before pattern matching. - risky (type `bool`, default `True`): Whether to warn when no patterns are given. """ if not patterns and risky: unformatted_message = "No pattern specified - you're doing this to all {count} items. Proceed? (y/N) " message = unformatted_message.format(count=self.colorize(str(item_count), "red")) if not _input(message).lower() == "y": return [] actual_patterns = patterns.strip().split(" ") try: compiled_patterns = [re.compile(pattern) for pattern in actual_patterns] except Exception as e: print(self.colorize("Invalid regular expression patterns: {0}".format(e), "red")) return [] return pattern_generator(compiled_patterns) @staticmethod def do_EOF(_): """ Exit this prompt. Usage: `EOF` (Ctrl+D also works). """ return True
Ancestors (in MRO)
- CoreCommand
- cmd2.Cmd
- cmd.Cmd
Class variables
var abbrev
var blankLinesAllowed
var case_insensitive
var colorcodes
var colors
var commentGrammars
var commentInProgress
var continuation_prompt
var current_script_dir
var debug
var defaultExtension
var default_file_name
var default_to_shell
var doc_header
var doc_leader
var echo
var editor
var excludeFromHistory
var feedback_to_output
var identchars
var intro
var kept_state
var lastcmd
var legalChars
var locals_in_py
var misc_header
var multilineCommands
var noSpecialParse
var nohelp
var prefixParser
var quiet
var redirector
var reserved_words
var ruler
var saveparser
var settable
var shortcuts
var terminators
var timing
var undoc_header
var urlre
var use_rawinput
Static methods
def do_EOF(
_)
Exit this prompt.
Usage: EOF
(Ctrl+D also works).
@staticmethod def do_EOF(_): """ Exit this prompt. Usage: `EOF` (Ctrl+D also works). """ return True
Instance variables
var cache
var prompt
Methods
def __init__(
self)
def __init__(self): self.cache = CachingVSphere(self.hostname, self.username, self.password) Cmd.__init__(self) self.prompt = self.colorize("isphere > ", "green")
def cmdloop(
self)
def cmdloop(self): parser = optparse.OptionParser() parser.add_option('-t', '--test', dest='test', action="store_true", help='Test against transcript(s) in FILE (wildcards OK)') (callopts, callargs) = parser.parse_args() if callopts.test: self.runTranscriptTests(callargs) else: if not self.run_commands_at_invocation(callargs): self._cmdloop()
def colorize(
self, val, color)
Given a string (val
), returns that string wrapped in UNIX-style
special characters that turn on (and then off) text color and style.
If the colors
environment paramter is False
, or the application
is running on Windows, will return val
unchanged.
color
should be one of the supported strings (or styles):
red/blue/green/cyan/magenta, bold, underline
def colorize(self, val, color): '''Given a string (``val``), returns that string wrapped in UNIX-style special characters that turn on (and then off) text color and style. If the ``colors`` environment paramter is ``False``, or the application is running on Windows, will return ``val`` unchanged. ``color`` should be one of the supported strings (or styles): red/blue/green/cyan/magenta, bold, underline''' if self.colors and (self.stdout == self.initial_stdout): return self.colorcodes[color][True] + val + self.colorcodes[color][False] return val
def columnize(
self, list, displaywidth=80)
Display a list of strings as a compact set of columns.
Each column is only as wide as necessary. Columns are separated by two spaces (one was not legible enough).
def columnize(self, list, displaywidth=80): """Display a list of strings as a compact set of columns. Each column is only as wide as necessary. Columns are separated by two spaces (one was not legible enough). """ if not list: self.stdout.write("<empty>\n") return nonstrings = [i for i in range(len(list)) if not isinstance(list[i], str)] if nonstrings: raise TypeError, ("list[i] not a string for i in %s" % ", ".join(map(str, nonstrings))) size = len(list) if size == 1: self.stdout.write('%s\n'%str(list[0])) return # Try every row count from 1 upwards for nrows in range(1, len(list)): ncols = (size+nrows-1) // nrows colwidths = [] totwidth = -2 for col in range(ncols): colwidth = 0 for row in range(nrows): i = row + nrows*col if i >= size: break x = list[i] colwidth = max(colwidth, len(x)) colwidths.append(colwidth) totwidth += colwidth + 2 if totwidth > displaywidth: break if totwidth <= displaywidth: break else: nrows = len(list) ncols = 1 colwidths = [0] for row in range(nrows): texts = [] for col in range(ncols): i = row + nrows*col if i >= size: x = "" else: x = list[i] texts.append(x) while texts and not texts[-1]: del texts[-1] for col in range(len(texts)): texts[col] = texts[col].ljust(colwidths[col]) self.stdout.write("%s\n"%str(" ".join(texts)))
def compile_and_yield_generic_patterns(
self, patterns, pattern_generator, item_count, risky=True)
Compiles and returns regular expression patterns. Swallows the exception and complains if the patterns are invalid. Optionally (yes by default), warns and prompts in case no pattern was given.
- patterns (type
str
): A space delimited sequence of regular expression patterns. - pattern_generator (type
callable
): A function that, given a list of patterns, will return matching item names. - item_count (type
int
): The number of available items before pattern matching. - risky (type
bool
, defaultTrue
): Whether to warn when no patterns are given.
def compile_and_yield_generic_patterns(self, patterns, pattern_generator, item_count, risky=True): """ Compiles and returns regular expression patterns. Swallows the exception and complains if the patterns are invalid. Optionally (yes by default), warns and prompts in case no pattern was given. - patterns (type `str`): A space delimited sequence of regular expression patterns. - pattern_generator (type `callable`): A function that, given a list of patterns, will return matching item names. - item_count (type `int`): The number of available items before pattern matching. - risky (type `bool`, default `True`): Whether to warn when no patterns are given. """ if not patterns and risky: unformatted_message = "No pattern specified - you're doing this to all {count} items. Proceed? (y/N) " message = unformatted_message.format(count=self.colorize(str(item_count), "red")) if not _input(message).lower() == "y": return [] actual_patterns = patterns.strip().split(" ") try: compiled_patterns = [re.compile(pattern) for pattern in actual_patterns] except Exception as e: print(self.colorize("Invalid regular expression patterns: {0}".format(e), "red")) return [] return pattern_generator(compiled_patterns)
def complete(
self, text, state)
Return the next possible completion for 'text'.
If a command has not been entered, then complete against command list.
Otherwise try to call complete_
def complete(self, text, state): """Return the next possible completion for 'text'. If a command has not been entered, then complete against command list. Otherwise try to call complete_<command> to get list of completions. """ if state == 0: import readline origline = readline.get_line_buffer() line = origline.lstrip() stripped = len(origline) - len(line) begidx = readline.get_begidx() - stripped endidx = readline.get_endidx() - stripped if begidx>0: cmd, args, foo = self.parseline(line) if cmd == '': compfunc = self.completedefault else: try: compfunc = getattr(self, 'complete_' + cmd) except AttributeError: compfunc = self.completedefault else: compfunc = self.completenames self.completion_matches = compfunc(text, line, begidx, endidx) try: return self.completion_matches[state] except IndexError: return None
def complete_help(
self, *args)
def complete_help(self, *args): commands = set(self.completenames(*args)) topics = set(a[5:] for a in self.get_names() if a.startswith('help_' + args[0])) return list(commands | topics)
def complete_statement(
self, line)
Keep accepting lines of input until the command is complete.
def complete_statement(self, line): """Keep accepting lines of input until the command is complete.""" if (not line) or ( not pyparsing.Or(self.commentGrammars). setParseAction(lambda x: '').transformString(line)): raise EmptyStatement() statement = self.parsed(line) while statement.parsed.multilineCommand and (statement.parsed.terminator == ''): statement = '%s\n%s' % (statement.parsed.raw, self.pseudo_raw_input(self.continuation_prompt)) statement = self.parsed(statement) if not statement.parsed.command: raise EmptyStatement() return statement
def completedefault(
self, *ignored)
Method called to complete an input line when no command-specific complete_*() method is available.
By default, it returns an empty list.
def completedefault(self, *ignored): """Method called to complete an input line when no command-specific complete_*() method is available. By default, it returns an empty list. """ return []
def completenames(
self, text, *ignored)
def completenames(self, text, *ignored): dotext = 'do_'+text return [a[3:] for a in self.get_names() if a.startswith(dotext)]
def default(
self, line)
Called on an input line when the command prefix is not recognized.
If this method is not overridden, it prints an error message and returns.
def default(self, line): """Called on an input line when the command prefix is not recognized. If this method is not overridden, it prints an error message and returns. """ self.stdout.write('*** Unknown syntax: %s\n'%line)
def do__load(
self, arg=None)
Runs script of command(s) from a file or URL.
def do_load(self, arg=None): """Runs script of command(s) from a file or URL.""" if arg is None: targetname = self.default_file_name else: arg = arg.split(None, 1) targetname, args = arg[0], (arg[1:] or [''])[0].strip() try: target = self.read_file_or_url(targetname) except IOError as e: self.perror('Problem accessing script from %s: \n%s' % (targetname, e)) return keepstate = Statekeeper(self, ('stdin','use_rawinput','prompt', 'continuation_prompt','current_script_dir')) self.stdin = target self.use_rawinput = False self.prompt = self.continuation_prompt = '' self.current_script_dir = os.path.split(targetname)[0] stop = self._cmdloop() self.stdin.close() keepstate.restore() self.lastcmd = '' return stop and (stop != self._STOP_SCRIPT_NO_EXIT)
def do__relative_load(
self, arg=None)
Runs commands in script at file or URL; if this is called from within an already-running script, the filename will be interpreted relative to the already-running script's directory.
def do__relative_load(self, arg=None): ''' Runs commands in script at file or URL; if this is called from within an already-running script, the filename will be interpreted relative to the already-running script's directory.''' if arg: arg = arg.split(None, 1) targetname, args = arg[0], (arg[1:] or [''])[0] targetname = os.path.join(self.current_script_dir or '', targetname) self.do__load('%s %s' % (targetname, args))
def do_cmdenvironment(
self, args)
Summary report of interactive parameters.
def do_cmdenvironment(self, args): '''Summary report of interactive parameters.''' self.stdout.write(""" Commands are %(casesensitive)scase-sensitive. Commands may be terminated with: %(terminators)s Settable parameters: %(settable)s\n""" % \ { 'casesensitive': (self.case_insensitive and 'not ') or '', 'terminators': str(self.terminators), 'settable': ' '.join(self.settable) })
def do_ed(
self, arg)
ed: edit most recent command in text editor ed [N]: edit numbered command from history ed [filename]: edit specified file name
commands are run after editor is closed. "set edit (program-name)" or set EDITOR environment variable to control which editing program is used.
def do_ed(self, arg): """ed: edit most recent command in text editor ed [N]: edit numbered command from history ed [filename]: edit specified file name commands are run after editor is closed. "set edit (program-name)" or set EDITOR environment variable to control which editing program is used.""" if not self.editor: raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.") filename = self.default_file_name if arg: try: buffer = self.last_matching(int(arg)) except ValueError: filename = arg buffer = '' else: buffer = self.history[-1] if buffer: f = open(os.path.expanduser(filename), 'w') f.write(buffer or '') f.close() os.system('%s %s' % (self.editor, filename)) self.do__load(filename)
def do_edit(
self, arg)
ed: edit most recent command in text editor ed [N]: edit numbered command from history ed [filename]: edit specified file name
commands are run after editor is closed. "set edit (program-name)" or set EDITOR environment variable to control which editing program is used.
def do_ed(self, arg): """ed: edit most recent command in text editor ed [N]: edit numbered command from history ed [filename]: edit specified file name commands are run after editor is closed. "set edit (program-name)" or set EDITOR environment variable to control which editing program is used.""" if not self.editor: raise EnvironmentError("Please use 'set editor' to specify your text editing program of choice.") filename = self.default_file_name if arg: try: buffer = self.last_matching(int(arg)) except ValueError: filename = arg buffer = '' else: buffer = self.history[-1] if buffer: f = open(os.path.expanduser(filename), 'w') f.write(buffer or '') f.close() os.system('%s %s' % (self.editor, filename)) self.do__load(filename)
def do_eof(
self, arg)
def do_EOF(self, arg): return self._STOP_SCRIPT_NO_EXIT # End of script; should not exit app
def do_exit(
self, arg)
def do_quit(self, arg): return self._STOP_AND_EXIT
def do_help(
self, arg)
def do_help(self, arg): if arg: funcname = self.func_named(arg) if funcname: fn = getattr(self, funcname) try: fn.optionParser.print_help(file=self.stdout) except AttributeError: cmd.Cmd.do_help(self, funcname[3:]) else: cmd.Cmd.do_help(self, arg)
def do_hi(
instance, arg)
history [arg]: lists past commands issued
| no arg: list all | arg is integer: list one history item, by index | arg is string: string search | arg is /enclosed in forward-slashes/: regular expression search
Usage: history [options] (limit on which commands to include)
Options: -h, --help show this help message and exit -s, --script Script format; no separation lines
def new_func(instance, arg): try: opts, newArgList = optionParser.parse_args(arg.split()) # Must find the remaining args in the original argument list, but # mustn't include the command itself #if hasattr(arg, 'parsed') and newArgList[0] == arg.parsed.command: # newArgList = newArgList[1:] newArgs = remaining_args(arg, newArgList) if isinstance(arg, ParsedString): arg = arg.with_args_replaced(newArgs) else: arg = newArgs except optparse.OptParseError as e: print (e) optionParser.print_help() return if hasattr(opts, '_exit'): return None result = func(instance, arg, opts) return result
def do_history(
instance, arg)
history [arg]: lists past commands issued
| no arg: list all | arg is integer: list one history item, by index | arg is string: string search | arg is /enclosed in forward-slashes/: regular expression search
Usage: history [options] (limit on which commands to include)
Options: -h, --help show this help message and exit -s, --script Script format; no separation lines
def new_func(instance, arg): try: opts, newArgList = optionParser.parse_args(arg.split()) # Must find the remaining args in the original argument list, but # mustn't include the command itself #if hasattr(arg, 'parsed') and newArgList[0] == arg.parsed.command: # newArgList = newArgList[1:] newArgs = remaining_args(arg, newArgList) if isinstance(arg, ParsedString): arg = arg.with_args_replaced(newArgs) else: arg = newArgs except optparse.OptParseError as e: print (e) optionParser.print_help() return if hasattr(opts, '_exit'): return None result = func(instance, arg, opts) return result
def do_l(
self, arg)
list [arg]: lists last command issued
no arg -> list most recent command arg is integer -> list one history item, by index a..b, a:b, a:, ..b -> list spans from a (or start) to b (or end) arg is string -> list all commands matching string search arg is /enclosed in forward-slashes/ -> regular expression search
def do_list(self, arg): """list [arg]: lists last command issued no arg -> list most recent command arg is integer -> list one history item, by index a..b, a:b, a:, ..b -> list spans from a (or start) to b (or end) arg is string -> list all commands matching string search arg is /enclosed in forward-slashes/ -> regular expression search """ try: history = self.history.span(arg or '-1') except IndexError: history = self.history.search(arg) for hi in history: self.poutput(hi.pr())
def do_li(
self, arg)
list [arg]: lists last command issued
no arg -> list most recent command arg is integer -> list one history item, by index a..b, a:b, a:, ..b -> list spans from a (or start) to b (or end) arg is string -> list all commands matching string search arg is /enclosed in forward-slashes/ -> regular expression search
def do_list(self, arg): """list [arg]: lists last command issued no arg -> list most recent command arg is integer -> list one history item, by index a..b, a:b, a:, ..b -> list spans from a (or start) to b (or end) arg is string -> list all commands matching string search arg is /enclosed in forward-slashes/ -> regular expression search """ try: history = self.history.span(arg or '-1') except IndexError: history = self.history.search(arg) for hi in history: self.poutput(hi.pr())
def do_list(
self, arg)
list [arg]: lists last command issued
no arg -> list most recent command arg is integer -> list one history item, by index a..b, a:b, a:, ..b -> list spans from a (or start) to b (or end) arg is string -> list all commands matching string search arg is /enclosed in forward-slashes/ -> regular expression search
def do_list(self, arg): """list [arg]: lists last command issued no arg -> list most recent command arg is integer -> list one history item, by index a..b, a:b, a:, ..b -> list spans from a (or start) to b (or end) arg is string -> list all commands matching string search arg is /enclosed in forward-slashes/ -> regular expression search """ try: history = self.history.span(arg or '-1') except IndexError: history = self.history.search(arg) for hi in history: self.poutput(hi.pr())
def do_load(
self, arg=None)
Runs script of command(s) from a file or URL.
def do_load(self, arg=None): """Runs script of command(s) from a file or URL.""" if arg is None: targetname = self.default_file_name else: arg = arg.split(None, 1) targetname, args = arg[0], (arg[1:] or [''])[0].strip() try: target = self.read_file_or_url(targetname) except IOError as e: self.perror('Problem accessing script from %s: \n%s' % (targetname, e)) return keepstate = Statekeeper(self, ('stdin','use_rawinput','prompt', 'continuation_prompt','current_script_dir')) self.stdin = target self.use_rawinput = False self.prompt = self.continuation_prompt = '' self.current_script_dir = os.path.split(targetname)[0] stop = self._cmdloop() self.stdin.close() keepstate.restore() self.lastcmd = '' return stop and (stop != self._STOP_SCRIPT_NO_EXIT)
def do_pause(
self, arg)
Displays the specified text then waits for the user to press RETURN.
def do_pause(self, arg): 'Displays the specified text then waits for the user to press RETURN.' raw_input(arg + '\n')
def do_py(
self, arg)
py Ctrl-D
(Unix) / Ctrl-Z
(Windows), quit()
, '`exit().
Non-python commands can be issued with
cmd("your command").
Run python code from external files with
run("filename.py")``
def do_py(self, arg): ''' py <command>: Executes a Python command. py: Enters interactive Python mode. End with ``Ctrl-D`` (Unix) / ``Ctrl-Z`` (Windows), ``quit()``, '`exit()``. Non-python commands can be issued with ``cmd("your command")``. Run python code from external files with ``run("filename.py")`` ''' self.pystate['self'] = self arg = arg.parsed.raw[2:].strip() localvars = (self.locals_in_py and self.pystate) or {} interp = InteractiveConsole(locals=localvars) interp.runcode('import sys, os;sys.path.insert(0, os.getcwd())') if arg.strip(): interp.runcode(arg) else: def quit(): raise EmbeddedConsoleExit def onecmd_plus_hooks(arg): return self.onecmd_plus_hooks(arg + '\n') def run(arg): try: file = open(arg) interp.runcode(file.read()) file.close() except IOError as e: self.perror(e) self.pystate['quit'] = quit self.pystate['exit'] = quit self.pystate['cmd'] = onecmd_plus_hooks self.pystate['run'] = run try: cprt = 'Type "help", "copyright", "credits" or "license" for more information.' keepstate = Statekeeper(sys, ('stdin','stdout')) sys.stdout = self.stdout sys.stdin = self.stdin interp.interact(banner= "Python %s on %s\n%s\n(%s)\n%s" % (sys.version, sys.platform, cprt, self.__class__.__name__, self.do_py.__doc__)) except EmbeddedConsoleExit: pass keepstate.restore()
def do_q(
self, arg)
def do_quit(self, arg): return self._STOP_AND_EXIT
def do_quit(
self, arg)
def do_quit(self, arg): return self._STOP_AND_EXIT
def do_r(
self, arg)
run [arg]: re-runs an earlier command
no arg -> run most recent command arg is integer -> run one history item, by index arg is string -> run most recent command by string search arg is /enclosed in forward-slashes/ -> run most recent by regex
def do_run(self, arg): """run [arg]: re-runs an earlier command no arg -> run most recent command arg is integer -> run one history item, by index arg is string -> run most recent command by string search arg is /enclosed in forward-slashes/ -> run most recent by regex """ 'run [N]: runs the SQL that was run N commands ago' runme = self.last_matching(arg) self.pfeedback(runme) if runme: stop = self.onecmd_plus_hooks(runme)
def do_reload(
self, _)
Usage: reload Reload VM cache from the vSphere server.
Sample usage: reload
def do_reload(self, _): """Usage: reload Reload VM cache from the vSphere server. Sample usage: `reload` """ self.preloop()
def do_run(
self, arg)
run [arg]: re-runs an earlier command
no arg -> run most recent command arg is integer -> run one history item, by index arg is string -> run most recent command by string search arg is /enclosed in forward-slashes/ -> run most recent by regex
def do_run(self, arg): """run [arg]: re-runs an earlier command no arg -> run most recent command arg is integer -> run one history item, by index arg is string -> run most recent command by string search arg is /enclosed in forward-slashes/ -> run most recent by regex """ 'run [N]: runs the SQL that was run N commands ago' runme = self.last_matching(arg) self.pfeedback(runme) if runme: stop = self.onecmd_plus_hooks(runme)
def do_save(
self, arg)
save [N] [filename.ext]
Saves command from history to file.
| N => Number of command (from history), or *
;
| most recent command if omitted
def do_save(self, arg): """`save [N] [filename.ext]` Saves command from history to file. | N => Number of command (from history), or `*`; | most recent command if omitted""" try: args = self.saveparser.parseString(arg) except pyparsing.ParseException: self.perror('Could not understand save target %s' % arg) raise SyntaxError(self.do_save.__doc__) fname = args.fname or self.default_file_name if args.idx == '*': saveme = '\n\n'.join(self.history[:]) elif args.idx: saveme = self.history[int(args.idx)-1] else: saveme = self.history[-1] try: f = open(os.path.expanduser(fname), 'w') f.write(saveme) f.close() self.pfeedback('Saved to %s' % (fname)) except Exception as e: self.perror('Error saving %s' % (fname)) raise
def do_set(
self, arg)
Sets a cmd2 parameter. Accepts abbreviated parameter names so long as there is no ambiguity. Call without arguments for a list of settable parameters with their values.
def do_set(self, arg): ''' Sets a cmd2 parameter. Accepts abbreviated parameter names so long as there is no ambiguity. Call without arguments for a list of settable parameters with their values.''' try: statement, paramName, val = arg.parsed.raw.split(None, 2) val = val.strip() paramName = paramName.strip().lower() if paramName not in self.settable: hits = [p for p in self.settable if p.startswith(paramName)] if len(hits) == 1: paramName = hits[0] else: return self.do_show(paramName) currentVal = getattr(self, paramName) if (val[0] == val[-1]) and val[0] in ("'", '"'): val = val[1:-1] else: val = cast(currentVal, val) setattr(self, paramName, val) self.stdout.write('%s - was: %s\nnow: %s\n' % (paramName, currentVal, val)) if currentVal != val: try: onchange_hook = getattr(self, '_onchange_%s' % paramName) onchange_hook(old=currentVal, new=val) except AttributeError: pass except (ValueError, AttributeError, NotSettableError) as e: self.do_show(arg)
def do_shell(
self, arg)
execute a command as if at the OS prompt.
def do_shell(self, arg): 'execute a command as if at the OS prompt.' os.system(arg)
def do_shortcuts(
self, args)
Lists single-key shortcuts available.
def do_shortcuts(self, args): """Lists single-key shortcuts available.""" result = "\n".join('%s: %s' % (sc[0], sc[1]) for sc in sorted(self.shortcuts)) self.stdout.write("Single-key shortcuts for other commands:\n%s\n" % (result))
def do_show(
instance, arg)
Shows value of a parameter. Usage: show [options] arg
Options: -h, --help show this help message and exit -l, --long describe function of parameter
def new_func(instance, arg): try: opts, newArgList = optionParser.parse_args(arg.split()) # Must find the remaining args in the original argument list, but # mustn't include the command itself #if hasattr(arg, 'parsed') and newArgList[0] == arg.parsed.command: # newArgList = newArgList[1:] newArgs = remaining_args(arg, newArgList) if isinstance(arg, ParsedString): arg = arg.with_args_replaced(newArgs) else: arg = newArgs except optparse.OptParseError as e: print (e) optionParser.print_help() return if hasattr(opts, '_exit'): return None result = func(instance, arg, opts) return result
def emptyline(
self)
Called when an empty line is entered in response to the prompt.
If this method is not overridden, it repeats the last nonempty command entered.
def emptyline(self): """Called when an empty line is entered in response to the prompt. If this method is not overridden, it repeats the last nonempty command entered. """ if self.lastcmd: return self.onecmd(self.lastcmd)
def eval(
self, line, item_name_generator, item_retriever, local_name)
Run an eval command. This will retrieve items based on given patterns and execute a python statement against each of these items. The item will be bound to the given name for each of these statements.
- line (type
str
): The text line provided by the user. The format should be as follows:! - item_name_generator (type
callable
): A function that should generate an iterable that represents the available item names for pattern matching. - item_retriever (type
callable
): A function that takes an item name generated byitem_name_generator
as input and returns the actual item. - local_name (type
str
): The name to bind the item to when evaluating the statement.
def eval(self, line, item_name_generator, item_retriever, local_name): """ Run an eval command. This will retrieve items based on given patterns and execute a python statement against each of these items. The item will be bound to the given name for each of these statements. - line (type `str`): The text line provided by the user. The format should be as follows: <patterns> ! <statement> - item_name_generator (type `callable`): A function that should generate an iterable that represents the available item names for pattern matching. - item_retriever (type `callable`): A function that takes an item name generated by `item_name_generator` as input and returns the actual item. - local_name (type `str`): The name to bind the item to when evaluating the statement. """ try: patterns_and_statement = line.split("!", 1) patterns = patterns_and_statement[0] statement = patterns_and_statement[1] except IndexError: print(self.colorize("Looks like your input was malformed. Try `help eval_*`.", "red")) return for item_name in item_name_generator(patterns): def guard(): raise NoOutput() _globals, _locals = {}, {} try: item = item_retriever(item_name) except NotFound: print(self.colorize("Skipping {item} since it could not be retrieved.".format(item=item_name), "red")) continue _locals[local_name] = item _locals["no_output"] = guard _globals[local_name] = item _globals["no_output"] = guard separator = "-" item_name_header = " {name} ".format( name=item_name[:78]).center( 80, separator) try: result = eval(statement, _globals, _locals) print(self.colorize(item_name_header, "blue")) print(result) except NoOutput: pass except Exception as e: print(self.colorize(item_name_header, "red")) print(self.colorize("Eval failed for {0}: {1}".format(item_name, e), "red"))
def fileimport(
self, statement, source)
def fileimport(self, statement, source): try: f = open(os.path.expanduser(source)) except IOError: self.stdout.write("Couldn't read from file %s\n" % source) return '' data = f.read() f.close() return data
def func_named(
self, arg)
def func_named(self, arg): result = None target = 'do_' + arg if target in dir(self): result = target else: if self.abbrev: # accept shortened versions of commands funcs = [fname for fname in self.keywords if fname.startswith(arg)] if len(funcs) == 1: result = 'do_' + funcs[0] return result
def get_names(
self)
def get_names(self): # This method used to pull in base class attributes # at a time dir() didn't do it yet. return dir(self.__class__)
def last_matching(
self, arg)
def last_matching(self, arg): try: if arg: return self.history.get(arg)[-1] else: return self.history[-1] except IndexError: return None
def onecmd(
self, line)
Interpret the argument as though it had been typed in response to the prompt.
This may be overridden, but should not normally need to be; see the precmd() and postcmd() methods for useful execution hooks. The return value is a flag indicating whether interpretation of commands by the interpreter should stop.
This (cmd2
) version of onecmd
already override's cmd
's onecmd
.
def onecmd(self, line): """Interpret the argument as though it had been typed in response to the prompt. This may be overridden, but should not normally need to be; see the precmd() and postcmd() methods for useful execution hooks. The return value is a flag indicating whether interpretation of commands by the interpreter should stop. This (`cmd2`) version of `onecmd` already override's `cmd`'s `onecmd`. """ statement = self.parsed(line) self.lastcmd = statement.parsed.raw funcname = self.func_named(statement.parsed.command) if not funcname: return self._default(statement) try: func = getattr(self, funcname) except AttributeError: return self._default(statement) stop = func(statement) return stop
def onecmd_plus_hooks(
self, line)
def onecmd_plus_hooks(self, line): # The outermost level of try/finally nesting can be condensed once # Python 2.4 support can be dropped. stop = 0 try: try: statement = self.complete_statement(line) (stop, statement) = self.postparsing_precmd(statement) if stop: return self.postparsing_postcmd(stop) if statement.parsed.command not in self.excludeFromHistory: self.history.append(statement.parsed.raw) try: self.redirect_output(statement) timestart = datetime.datetime.now() statement = self.precmd(statement) stop = self.onecmd(statement) stop = self.postcmd(stop, statement) if self.timing: self.pfeedback('Elapsed: %s' % str(datetime.datetime.now() - timestart)) finally: self.restore_output(statement) except EmptyStatement: return 0 except Exception as e: self.perror(str(e), statement) finally: return self.postparsing_postcmd(stop)
def parsed(
self, raw, **kwargs)
def parsed(self, raw, **kwargs): if isinstance(raw, ParsedString): p = raw else: # preparse is an overridable hook; default makes no changes s = self.preparse(raw, **kwargs) s = self.inputParser.transformString(s.lstrip()) s = self.commentGrammars.transformString(s) for (shortcut, expansion) in self.shortcuts: if s.lower().startswith(shortcut): s = s.replace(shortcut, expansion + ' ', 1) break result = self.parser.parseString(s) result['raw'] = raw result['command'] = result.multilineCommand or result.command result = self.postparse(result) p = ParsedString(result.args) p.parsed = result p.parser = self.parsed for (key, val) in kwargs.items(): p.parsed[key] = val return p
def parseline(
self, line)
Parse the line into a command name and a string containing the arguments. Returns a tuple containing (command, args, line). 'command' and 'args' may be None if the line couldn't be parsed.
def parseline(self, line): """Parse the line into a command name and a string containing the arguments. Returns a tuple containing (command, args, line). 'command' and 'args' may be None if the line couldn't be parsed. """ line = line.strip() if not line: return None, None, line elif line[0] == '?': line = 'help ' + line[1:] elif line[0] == '!': if hasattr(self, 'do_shell'): line = 'shell ' + line[1:] else: return None, None, line i, n = 0, len(line) while i < n and line[i] in self.identchars: i = i+1 cmd, arg = line[:i], line[i:].strip() return cmd, arg, line
def perror(
self, errmsg, statement=None)
def perror(self, errmsg, statement=None): if self.debug: traceback.print_exc() print (str(errmsg))
def pfeedback(
self, msg)
For printing nonessential feedback. Can be silenced with quiet
.
Inclusion in redirected output is controlled by feedback_to_output
.
def pfeedback(self, msg): """For printing nonessential feedback. Can be silenced with `quiet`. Inclusion in redirected output is controlled by `feedback_to_output`.""" if not self.quiet: if self.feedback_to_output: self.poutput(msg) else: print (msg)
def postcmd(
self, stop, line)
Hook method executed just after a command dispatch is finished.
def postcmd(self, stop, line): """Hook method executed just after a command dispatch is finished.""" return stop
def postloop(
self)
Hook method executed once when the cmdloop() method is about to return.
def postloop(self): """Hook method executed once when the cmdloop() method is about to return. """ pass
def postparse(
self, parseResult)
def postparse(self, parseResult): return parseResult
def postparsing_postcmd(
self, stop)
def postparsing_postcmd(self, stop): return stop
def postparsing_precmd(
self, statement)
def postparsing_precmd(self, statement): stop = 0 return stop, statement
def poutput(
self, msg)
Convenient shortcut for self.stdout.write(); adds newline if necessary.
def poutput(self, msg): '''Convenient shortcut for self.stdout.write(); adds newline if necessary.''' if msg: self.stdout.write(msg) if msg[-1] != '\n': self.stdout.write('\n')
def precmd(
self, line)
Hook method executed just before the command line is interpreted, but after the input prompt is generated and issued.
def precmd(self, line): """Hook method executed just before the command line is interpreted, but after the input prompt is generated and issued. """ return line
def preloop(
self)
Called by the cmd.Cmd
base class before entering the REPL loop.
Displays information about the cached items.
def preloop(self): """ Called by the `cmd.Cmd` base class before entering the REPL loop. Displays information about the cached items. """ self.cache.fill() print( self.colorize("{0} VMs on {1} ESXis available.".format(self.cache.number_of_vms, self.cache.number_of_esxis), "blue")) print( self.colorize("{0} Distributed Virtual Switches configured.".format( self.cache.number_of_dvses), "blue"))
def preparse(
self, raw, **kwargs)
def preparse(self, raw, **kwargs): return raw
def print_topics(
self, header, cmds, cmdlen, maxcol)
def print_topics(self, header, cmds, cmdlen, maxcol): if cmds: self.stdout.write("%s\n"%str(header)) if self.ruler: self.stdout.write("%s\n"%str(self.ruler * len(header))) self.columnize(cmds, maxcol-1) self.stdout.write("\n")
def pseudo_raw_input(
self, prompt)
copied from cmd's cmdloop; like raw_input, but accounts for changed stdin, stdout
def pseudo_raw_input(self, prompt): """copied from cmd's cmdloop; like raw_input, but accounts for changed stdin, stdout""" if self.use_rawinput: try: line = raw_input(prompt) except EOFError: line = 'EOF' else: self.stdout.write(prompt) self.stdout.flush() line = self.stdin.readline() if not len(line): line = 'EOF' else: if line[-1] == '\n': # this was always true in Cmd line = line[:-1] return line
def read_file_or_url(
self, fname)
def read_file_or_url(self, fname): # TODO: not working on localhost if isinstance(fname, file): result = open(fname, 'r') else: match = self.urlre.match(fname) if match: result = urllib.urlopen(match.group(1)) else: fname = os.path.expanduser(fname) try: result = open(os.path.expanduser(fname), 'r') except IOError: result = open('%s.%s' % (os.path.expanduser(fname), self.defaultExtension), 'r') return result
def redirect_output(
self, statement)
def redirect_output(self, statement): if statement.parsed.pipeTo: self.kept_state = Statekeeper(self, ('stdout',)) self.kept_sys = Statekeeper(sys, ('stdout',)) self.redirect = subprocess.Popen(statement.parsed.pipeTo, shell=True, stdout=subprocess.PIPE, stdin=subprocess.PIPE) sys.stdout = self.stdout = self.redirect.stdin elif statement.parsed.output: if (not statement.parsed.outputTo) and (not can_clip): raise EnvironmentError('Cannot redirect to paste buffer; install ``xclip`` and re-run to enable') self.kept_state = Statekeeper(self, ('stdout',)) self.kept_sys = Statekeeper(sys, ('stdout',)) if statement.parsed.outputTo: mode = 'w' if statement.parsed.output == 2 * self.redirector: mode = 'a' sys.stdout = self.stdout = open(os.path.expanduser(statement.parsed.outputTo), mode) else: sys.stdout = self.stdout = tempfile.TemporaryFile(mode="w+") if statement.parsed.output == '>>': self.stdout.write(get_paste_buffer())
def restore_output(
self, statement)
def restore_output(self, statement): if self.kept_state: if statement.parsed.output: if not statement.parsed.outputTo: self.stdout.seek(0) write_to_paste_buffer(self.stdout.read()) elif statement.parsed.pipeTo: for result in self.redirect.communicate(): self.kept_state.stdout.write(result or '') self.stdout.close() self.kept_state.restore() self.kept_sys.restore() self.kept_state = None
def runTranscriptTests(
self, callargs)
def runTranscriptTests(self, callargs): class TestMyAppCase(Cmd2TestCase): CmdApp = self.__class__ self.__class__.testfiles = callargs sys.argv = [sys.argv[0]] # the --test argument upsets unittest.main() testcase = TestMyAppCase() runner = unittest.TextTestRunner() result = runner.run(testcase) result.printErrors()
def run_commands_at_invocation(
self, callargs)
def run_commands_at_invocation(self, callargs): for initial_command in callargs: if self.onecmd_plus_hooks(initial_command + '\n'): return self._STOP_AND_EXIT
def select(
self, options, prompt='Your choice? ')
Presents a numbered menu to the user. Modelled after the bash shell's SELECT. Returns the item chosen.
Argument options
can be:
| a single string -> will be split into one-word options | a list of strings -> will be offered as options | a list of tuples -> interpreted as (value, text), so that the return value can differ from the text advertised to the user
def select(self, options, prompt='Your choice? '): '''Presents a numbered menu to the user. Modelled after the bash shell's SELECT. Returns the item chosen. Argument ``options`` can be: | a single string -> will be split into one-word options | a list of strings -> will be offered as options | a list of tuples -> interpreted as (value, text), so that the return value can differ from the text advertised to the user ''' if isinstance(options, basestring): options = zip(options.split(), options.split()) fulloptions = [] for opt in options: if isinstance(opt, basestring): fulloptions.append((opt, opt)) else: try: fulloptions.append((opt[0], opt[1])) except IndexError: fulloptions.append((opt[0], opt[0])) for (idx, (value, text)) in enumerate(fulloptions): self.poutput(' %2d. %s\n' % (idx+1, text)) while True: response = raw_input(prompt) try: response = int(response) result = fulloptions[response - 1][0] break except ValueError: pass # loop and ask again return result
class NoOutput
To be raised inside an isphere eval
command to avoid producing output.
class NoOutput(Exception): """ To be raised inside an isphere `eval` command to avoid producing output. """ pass
Ancestors (in MRO)
- NoOutput
- exceptions.Exception
- exceptions.BaseException
- __builtin__.object
Class variables
var args
var message