"""Telnet front end that speaks ANSI to clients, plus a line-based command server.

NOTE(review): this module appears to target Python 2 era Twisted (it relies on
``implements`` class advice and writes ``str`` payloads to the transport).
"""

import sys

from twisted.protocols import basic
from twisted.conch import telnet
from twisted.internet.protocol import Factory, ServerFactory
from twisted.internet import protocol, reactor
from zope.interface import implements
from twisted.python import log

import ansi_cython as ansi
import engine
import config


def _ParseAnsiArgs(stRgarg):
    """Turn the ';'-separated numeric parameters of an ANSI sequence into ints.

    stRgarg only ever contains digits and ';' (the caller's scanner guarantees
    this), so the one way conversion can fail is an omitted parameter such as
    "1;;2" or ";".  Omitted parameters are reported as 0 rather than raising,
    because the text comes straight from the remote peer.
    """
    if not stRgarg:
        return []
    return [int(stArg) if stArg else 0 for stArg in stRgarg.split(";")]


class AnsiTelnet(protocol.Protocol):
    """Telnet application protocol that splits client input into text and
    ANSI escape sequences and forwards both to an engine client as key events.
    """
    implements(telnet.ITelnetProtocol)

    # putty sez: 31 (NAWS), 32 (TERMINAL-SPEED), 24 (TERMINAL-TYPE), 39 (NEW-ENVIRON), 36 (ENVIRON), 3(SGA)
    # We might support NAWS at some point -- internally we will always deal in 80x25, but it might be helpful to have
    # window resizing work, and provide feedback if the window is too small.
    # I don't know what we'd do in response to TERMINAL-SPEED, since we're just going to blast out data as fast as
    # possible anyway.
    # NEW-ENVIRON might be useful for sending usernames or cookies automatically from flashterm.
    # ENVIRON is an older version of NEW-ENVIRON.
    # TERMINAL-TYPE is client-to-server only, and the server only speaks ANSI. Again, maybe useful as a feedback
    # mechanism.
    #
    # NOTE(review): connectionMade() sends WILL for rgoptRemote and DO for rgoptLocal, while
    # enableRemote()/enableLocal() consult the list matching their own name.  The naming looks
    # swapped relative to will/do; confirm before changing either list.
    rgoptRemote = [telnet.SGA, telnet.ECHO]
    # putty sez: 1 (ECHO), 3 (SGA)
    rgoptLocal = [telnet.SGA]

    # Final character of a received ANSI sequence -> key constant fired on the client's evKey.
    mpcode_key = {
        "A": ansi.K_UP,
        "B": ansi.K_DOWN,
        "C": ansi.K_RIGHT,
        "D": ansi.K_LEFT,
        "K": ansi.K_END,
        "H": ansi.K_HOME,
        "V": ansi.K_PGUP,
        "U": ansi.K_PGDN,
    }

    def __init__(self, client):
        """Remember the engine client and start from an invalidated screen."""
        log.msg("created")
        # Unparsed tail of the previous dataReceived() call (an unfinished
        # escape sequence), or None when nothing is pending.
        self.ast = None
        self.client = client
        self.evKey = None
        self.evArrow = None
        self.Refresh()

    def connectionMade(self):
        """Start option negotiation and immediately greet the client."""
        log.msg("connection made")
        rgdefr = []

        def AddCb(defr):
            # Intentionally a no-op.  The disabled version collected each
            # negotiation Deferred in rgdefr and called self.Hello() only once
            # every one of them had fired (callback or errback).
            pass

        for opt in self.rgoptRemote:
            AddCb(self.transport.will(opt))
        for opt in self.rgoptLocal:
            AddCb(self.transport.do(opt))
        log.msg("end: ", len(rgdefr))
        # Negotiation results are not awaited (see AddCb), so greet right away.
        self.Hello()

    def enableLocal(self, option):
        """Return whether `option` is listed in rgoptLocal."""
        log.msg("enableLocal", ord(option))
        return option in self.rgoptLocal

    def enableRemote(self, option):
        """Return whether `option` is listed in rgoptRemote."""
        log.msg("enableRemote", ord(option))
        return option in self.rgoptRemote

    def disableLocal(self, option):
        """Log the request; nothing needs to be torn down."""
        log.msg("disableLocal", option)

    def disableRemote(self, option):
        """Log the request; nothing needs to be torn down."""
        log.msg("disableRemote", option)

    def dataReceived(self, data):
        """Split incoming data into plain text and ANSI escape sequences.

        Plain text goes to RcvText(); a completed sequence (ansi.esc, optional
        ';'-separated digits, one alphabetic final character) goes to
        RcvAnsi().  If the data ends in the middle of a possible sequence, the
        unconsumed tail is kept in self.ast and re-parsed together with the
        next chunk.
        """
        if self.ast is not None:
            data = self.ast + data

        ichSplit = 0    # start of the span that has not been dispatched yet
        fAnsi = False   # inside a sequence, after both characters of ansi.esc
        fEsc = False    # previous character was the first character of ansi.esc

        for ich, ch in enumerate(data):
            if fAnsi:
                if not (ch == ";" or ch.isdigit()):
                    fAnsi = False
                    if ch.isalpha():
                        # data[ichSplit:ichSplit + 2] is the introducer itself.
                        rgarg = _ParseAnsiArgs(data[ichSplit + 2:ich])
                        self.RcvAnsi(ch, rgarg)
                        # NOTE(review): the source's indentation was lost; this
                        # assumes the split point only advances for alphabetic
                        # finals, so an aborted sequence is later delivered as
                        # text.  Confirm against the original layout.
                        ichSplit = ich + 1
            elif ch == ansi.esc[0]:
                fEsc = True
            elif fEsc:
                fEsc = False
                if ch == ansi.esc[1]:
                    fAnsi = True
                    # Flush any text that precedes the introducer.
                    if ichSplit < ich - 1:
                        self.RcvText(data[ichSplit:ich - 1])
                    ichSplit = ich - 1

        cch = len(data)
        if fEsc or fAnsi:
            # Possibly incomplete sequence: hold everything from the split
            # point until more data arrives.
            self.ast = data[ichSplit:cch]
        else:
            self.ast = None
            if ichSplit < cch:
                self.RcvText(data[ichSplit:cch])

    def Hello(self):
        """Prepare the client's screen and hand the connection to the engine client."""
        log.msg("negotiated!")
        self.transport.write(ansi.esc + "2J")  # cls
        # NOTE(review): the common hide-cursor sequence uses "?25l"; "=25l" may be
        # client-specific -- left as written.
        self.transport.write(ansi.esc + "=25l")  # hide the cursor
        self.client.dgJoinGame = self.CreateEvents
        self.client.addDgQuit(self.transport.loseConnection)
        self.client.go(self.factory.fnRun, self)

    def RcvText(self, text):
        """Fire the client's key event once per character of `text`, if it exists."""
        if self.client.evKey:
            for ch in text:
                self.client.evKey.fire(ch)

    def RcvAnsi(self, code, rgarg):
        """Fire the key mapped to the sequence's final character `code`.

        Unmapped codes are only logged.  Nothing happens while the client has
        no key event.
        """
        if self.client.evKey:
            if code in self.mpcode_key:
                self.client.evKey.fire(self.mpcode_key[code])
            else:
                log.msg("unrecognized code", code, rgarg)

    def CreateEvents(self, client, game):
        """Give `client` a key event bound to `game` (installed as dgJoinGame in Hello)."""
        client.evKey = engine.Event(game)

    def Draw(self, ascr):
        """Send the difference between the last drawn screen and `ascr`, then remember `ascr`."""
        self.transport.write(self.ascr.AstDiff(ascr))
        # cursor stays in top-left corner? ascr should contain a cursor pos?
        self.transport.write(ansi.esc + "H")
        self.ascr = ascr

    def Refresh(self):
        """Reset the remembered screen to one filled with ansi.achInvd."""
        self.ascr = ansi.Ascr(config.W, config.H, ansi.achInvd)

    def connectionLost(self, reason):
        """Unregister the quit hook added in Hello() and tell the client to quit."""
        log.msg("lost connection")
        self.client.removeDgQuit(self.transport.loseConnection)
        self.client.quit()


class DOSNewlineTelnetTransport(telnet.TelnetTransport):
    """TelnetTransport that delivers a trailing '\\r' without waiting for the next byte."""

    def dataReceived(self, data):
        """Process `data`; if the parser is left in the 'newline' state, emit the '\\r' now."""
        telnet.TelnetTransport.dataReceived(self, data)
        if self.state == 'newline':
            self.state = 'data'
            self.applicationDataReceived(b'\r')


class AnsiFactory(ServerFactory):
    """Builds a telnet transport wrapping an AnsiTelnet with a fresh engine.Client."""

    def __init__(self, fnRun):
        self.fnRun = fnRun

    def buildProtocol(self, addr):
        """Return a DOSNewlineTelnetTransport for the connection from `addr`."""
        p = DOSNewlineTelnetTransport(AnsiTelnet, engine.Client())
        p.factory = self
        log.msg("built protocol for ", addr)
        return p


class CmdProtocol(basic.LineReceiver):
    """Line protocol that passes each received line to the factory's fnCmd."""

    def connectionMade(self):
        """Reject the connection unless the local address is 127.0.0.1."""
        # NOTE(review): getHost() is this side's address; the message suggests
        # the peer (getPeer()) was meant -- confirm before changing.
        if self.transport.getHost().host != "127.0.0.1":
            self.transport.write("Sorry, you must connect from localhost")
            self.transport.loseConnection()

    def lineReceived(self, line):
        """Write fnCmd(line), UTF-8 encoded and followed by the line delimiter."""
        self.transport.write(self.factory.fnCmd(line).encode('utf-8') + self.delimiter)


class CmdFactory(ServerFactory):
    """Builds CmdProtocol instances sharing one command callback."""

    def __init__(self, fnCmd):
        self.fnCmd = fnCmd

    def buildProtocol(self, addr):
        """Return a CmdProtocol for the connection from `addr`."""
        p = CmdProtocol()
        p.factory = self
        log.msg("built cmd protocol for ", addr)
        return p


def RunServer(fnRun):
    """Start logging to stdout, listen for telnet clients and run the reactor."""
    log.startLogging(sys.stdout)
    reactor.listenTCP(config.PORT_TELNET, AnsiFactory(fnRun), 50, config.BIND_HOSTNAME)
    reactor.run()


def RunCmdServer(fnCmd):
    """Listen for command connections on localhost when config.PORT_CMDSERVER is set.

    Does not start the reactor.
    """
    if config.PORT_CMDSERVER:
        reactor.listenTCP(config.PORT_CMDSERVER, CmdFactory(fnCmd), 50, "localhost")