Servidor SSH con Twisted

Este post fue migrado de un blog hecho con Wordpress. Si se ve mal, dejame un comentario y lo arreglo.

Actualicé este ejemplo, tamibién incluido en el libro de Twisted para que no tire Deprecation Warnings:

from twisted.cred import portal, checkers, credentials

from twisted.conch import error, avatar, recvline, interfaces as conchinterfaces

from twisted.conch.ssh import factory, userauth, connection, keys, session, common

from twisted.conch.insults import insults

from twisted.application import service, internet

from zope.interface import implements

import os

class SSHDemoProtocol(recvline.HistoricRecvLine):

def __init__(self, user):

    self.user = user



def connectionMade(self) :

    recvline.HistoricRecvLine.connectionMade(self)

    self.terminal.write("Welcome to my test SSH server.")

    self.terminal.nextLine()

    self.do_help()

    self.showPrompt()



def showPrompt(self):

    self.terminal.write("$ ")



def getCommandFunc(self, cmd):

    return getattr(self, 'do_' + cmd, None)



def lineReceived(self, line):

    line = line.strip()

    if line:

        cmdAndArgs = line.split()

        cmd = cmdAndArgs[0]

        args = cmdAndArgs[1:]

        func = self.getCommandFunc(cmd)

        if func:

           try:

               func(*args)

           except Exception, e:

               self.terminal.write("Error: %s" % e)

               self.terminal.nextLine()

        else:

           self.terminal.write("No such command.")

           self.terminal.nextLine()

    self.showPrompt()



def do_help(self, cmd=''):

    "Get help on a command. Usage: help command"

    if cmd:

        func = self.getCommandFunc(cmd)

        if func:

            self.terminal.write(func.__doc__)

            self.terminal.nextLine()

            return



    publicMethods = filter(

        lambda funcname: funcname.startswith('do_'), dir(self))

    commands = [cmd.replace('do_', '', 1) for cmd in publicMethods]

    self.terminal.write("Commands: " + " ".join(commands))

    self.terminal.nextLine()



def do_echo(self, *args):

    "Echo a string. Usage: echo my line of text"

    self.terminal.write(" ".join(args))

    self.terminal.nextLine()



def do_whoami(self):

    "Prints your user name. Usage: whoami"

    self.terminal.write(self.user.username)

    self.terminal.nextLine()



def do_quit(self):

    "Ends your session. Usage: quit"

    self.terminal.write("Thanks for playing!")

    self.terminal.nextLine()

    self.terminal.loseConnection()



def do_clear(self):

    "Clears the screen. Usage: clear"

    self.terminal.reset()

class SSHDemoAvatar(avatar.ConchUser):

implements(conchinterfaces.ISession)



def __init__(self, username):

    avatar.ConchUser.__init__(self)

    self.username = username

    self.channelLookup.update({'session':session.SSHSession})



def openShell(self, protocol):

    serverProtocol = insults.ServerProtocol(SSHDemoProtocol, self)

    serverProtocol.makeConnection(protocol)

    protocol.makeConnection(session.wrapProtocol(serverProtocol))



def getPty(self, terminal, windowSize, attrs):

    return None



def execCommand(self, protocol, cmd):

    raise NotImplementedError



def closed(self):

    pass

class SSHDemoRealm:

implements(portal.IRealm)



def requestAvatar(self, avatarId, mind, *interfaces):

    if conchinterfaces.IConchUser in interfaces:

        return interfaces[0], SSHDemoAvatar(avatarId), lambda: None

    else:

        raise Exception, "No supported interfaces found."

def getRSAKeys():

if not (os.path.exists('public.key') and os.path.exists('private.key')):

    # generate a RSA keypair

    print "Generating RSA keypair..."

    from Crypto.PublicKey import RSA

    KEY_LENGTH = 1024

    rsaKey = RSA.generate(KEY_LENGTH, common.entropy.get_bytes)

    publicKeyString = keys.Key(rsaKey).toString()

    privateKeyString = keys.makePrivateKeyString(rsaKey)

    # save keys for next time

    file('public.key', 'w+b').write(publicKeyString)

    file('private.key', 'w+b').write(privateKeyString)

    print "done."

else:

    publicKeyString = file('public.key').read()

    privateKeyString = file('private.key').read()

return publicKeyString, privateKeyString

if name == "main":

sshFactory = factory.SSHFactory()

sshFactory.portal = portal.Portal(SSHDemoRealm())

users = {'admin': 'aaa', 'guest': 'bbb'}

sshFactory.portal.registerChecker(

checkers.InMemoryUsernamePasswordDatabaseDontUse(**users))

pubKeyString, privKeyString = getRSAKeys()

sshFactory.publicKeys = {

    'ssh-rsa': keys.Key.fromString(pubKeyString)}

sshFactory.privateKeys = {

    'ssh-rsa': keys.Key.fromString(privKeyString)}



from twisted.internet import reactor

reactor.listenTCP(2222, sshFactory)

reactor.run()</pre></body></html>

Comentarios

Comments powered by Disqus