from twisted.protocols import basic from twisted.conch import telnet from twisted.internet.protocol import Factory,ServerFactory from twisted.internet import protocol, reactor from zope.interface import implements from twisted.python import log import ansi import engine import sys import config class AnsiTelnet(protocol.Protocol): implements(telnet.ITelnetProtocol) rgoptRemote = [telnet.SGA, telnet.ECHO] #putty sez: 31 (NAWS), 32 (TERMINAL-SPEED), 24 (TERMINAL-TYPE), 39 (NEW-ENVIRON), 36 (ENVIRON), 3(SGA) # We might support NAWS at some point -- internally we will always deal in 80x25, but it might be helpful to have # window resizing work, and provide feedback if the window is too small. # I don't know what we'd do in response to TERMINAL-SPEED, since we're just going to blast out data as fast as possible anyway. # NEW-ENVIRON might be useful for sending usernames or cookies automatically from flashterm. # ENVIRON is an older version of NEW-ENVIRON. # TERMINAL-TYPE is client-to-server only, and the server only speaks ANSI. Again, maybe useful as a feedback mechanism. rgoptLocal = [telnet.SGA] #putty sez: 1 (ECHO), 3 (SGA) def __init__(self, client): print "created" self.ast = None self.client = client self.evKey = None self.evArrow = None self.Refresh() def connectionMade(self): print "connection made" rgdefr = [] def AddCb(defr): """ rgdefr.append(defr) def cbWaitForHello(result): print "cb: ", result, len(rgdefr) rgdefr.remove(defr) if len(rgdefr) == 0: self.Hello() defr.addCallback(cbWaitForHello) defr.addErrback(cbWaitForHello) """ pass for opt in self.rgoptRemote: AddCb(self.transport.will(opt)) for opt in self.rgoptLocal: AddCb(self.transport.do(opt)) print "end: ", len(rgdefr) self.Hello() def enableLocal(self, option): print "enableLocal", ord(option) return option in self.rgoptLocal def enableRemote(self, option): print "enableRemote", ord(option) return option in self.rgoptRemote def disableLocal(self, option): print "disableLocal", option pass def disableRemote(self, option): print "disableRemote", option pass def dataReceived(self, data): if self.ast != None: data = self.ast + data ich = 0 ichSplit = 0 fAnsi = False fEsc = False for ch in data: if fAnsi: if not (ch == ";" or ch.isdigit()): fAnsi = False if ch.isalpha(): stRgarg = data[ichSplit + 2:ich] if len(stRgarg) == 0: rgarg = [] else: rgarg = [int(stArg) for stArg in stRgarg.split(";")] self.RcvAnsi(ch, rgarg) ichSplit = ich + 1 elif ch == ansi.esc[0]: fEsc = True elif fEsc: fEsc = False if ch == ansi.esc[1]: fAnsi = True if ichSplit < ich - 1: self.RcvText(data[ichSplit:ich - 1]) ichSplit = ich - 1 ich = ich + 1 if fEsc or fAnsi: self.ast = data[ichSplit:ich] else: self.ast = None if ichSplit < ich: self.RcvText(data[ichSplit:ich]) def Hello(self): print "negotiated!" self.transport.write(ansi.esc + "2J") #cls self.transport.write(ansi.esc + "=25l") #hide the cursor self.client.dgJoinGame = self.CreateEvents self.client.addDgQuit(self.transport.loseConnection) self.client.go(self.factory.fnRun, self) def RcvText(self, text): if self.client.evKey: for ch in text: self.client.evKey.fire(ch) mpcode_key = {"A": ansi.K_UP, "B": ansi.K_DOWN, "C": ansi.K_RIGHT, "D": ansi.K_LEFT, "K": ansi.K_END, "H": ansi.K_HOME, "V": ansi.K_PGUP, "U": ansi.K_PGDN } def RcvAnsi(self, code, rgarg): if self.client.evKey: if code in self.mpcode_key: self.client.evKey.fire(self.mpcode_key[code]) else: print "unrecognized code", code, rgarg def CreateEvents(self, client, game): client.evKey = engine.Event(game) def Draw(self, ascr): self.transport.write(self.ascr.AstDiff(ascr)) self.transport.write(ansi.esc + "H") #cursor stays in top-left corner? ascr should contain a cursor pos? self.ascr = ascr def Refresh(self): self.ascr = ansi.Ascr(config.W, config.H, ansi.achInvd) def connectionLost(self, reason): print "lost connection" self.client.removeDgQuit(self.transport.loseConnection) self.client.quit() class DOSNewlineTelnetTransport(telnet.TelnetTransport): def dataReceived(self, data): telnet.TelnetTransport.dataReceived(self, data) if self.state == 'newline': self.state = 'data' self.applicationDataReceived(b'\r') class AnsiFactory(ServerFactory): def __init__(self, fnRun): self.fnRun = fnRun def buildProtocol(self, addr): p = DOSNewlineTelnetTransport(AnsiTelnet, engine.Client()) p.factory = self print "built protocol for ", addr return p class FlashPolicy(basic.LineReceiver): delimiter = '\0' def lineReceived(self,line): if line == "": self.sendLine("") class CmdProtocol(basic.LineReceiver): def connectionMade(self): if self.transport.getHost().host != "127.0.0.1": self.transport.write("Sorry, you must connect from localhost") self.transport.loseConnection() def lineReceived(self, line): self.transport.write(self.factory.fnCmd(line) + self.delimiter) class CmdFactory(ServerFactory): def __init__(self, fnCmd): self.fnCmd = fnCmd def buildProtocol(self, addr): p = CmdProtocol() p.factory = self print "built cmd protocol for ", addr return p def RunServer(fnRun): log.startLogging(sys.stdout) factoryPolicy = Factory() factoryPolicy.protocol = FlashPolicy reactor.listenTCP(config.PORT_TELNET, AnsiFactory(fnRun), 50, config.BIND_HOSTNAME) if config.PORT_FLASHPOLICY: reactor.listenTCP(config.PORT_FLASHPOLICY, factoryPolicy, 50, config.BIND_HOSTNAME) reactor.run() def RunCmdServer(fnCmd): if config.PORT_CMDSERVER: reactor.listenTCP(config.PORT_CMDSERVER, CmdFactory(fnCmd), 50, "localhost")