# This file is part of MarMOTS. # # MarMOTS is free software: you can redistribute it and/or modify it under the terms of the GNU Affero # General Public License as published by the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # MarMOTS is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the # implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General # Public License for more details. # # You should have received a copy of the GNU Affero General Public License along with MarMOTS. If not, # see . # # Copyright 2009, 2010, 2011, 2020 Jeremy Penner from twisted.protocols import basic from twisted.conch import telnet from twisted.internet.protocol import Factory,ServerFactory from twisted.internet import protocol, reactor from zope.interface import implements from twisted.python import log import ansi_cython as ansi import engine import sys import config class AnsiTelnet(protocol.Protocol): implements(telnet.ITelnetProtocol) rgoptRemote = [telnet.SGA, telnet.ECHO] #putty sez: 31 (NAWS), 32 (TERMINAL-SPEED), 24 (TERMINAL-TYPE), 39 (NEW-ENVIRON), 36 (ENVIRON), 3(SGA) # We might support NAWS at some point -- internally we will always deal in 80x25, but it might be helpful to have # window resizing work, and provide feedback if the window is too small. # I don't know what we'd do in response to TERMINAL-SPEED, since we're just going to blast out data as fast as possible anyway. # NEW-ENVIRON might be useful for sending usernames or cookies automatically from flashterm. # ENVIRON is an older version of NEW-ENVIRON. # TERMINAL-TYPE is client-to-server only, and the server only speaks ANSI. Again, maybe useful as a feedback mechanism. rgoptLocal = [telnet.SGA] #putty sez: 1 (ECHO), 3 (SGA) def __init__(self, client): print "created" self.ast = None self.client = client self.Refresh() def connectionMade(self): print "connection made" rgdefr = [] def AddCb(defr): """ rgdefr.append(defr) def cbWaitForHello(result): print "cb: ", result, len(rgdefr) rgdefr.remove(defr) if len(rgdefr) == 0: self.Hello() defr.addCallback(cbWaitForHello) defr.addErrback(cbWaitForHello) """ pass for opt in self.rgoptRemote: AddCb(self.transport.will(opt)) for opt in self.rgoptLocal: AddCb(self.transport.do(opt)) print "end: ", len(rgdefr) self.Hello() def enableLocal(self, option): print "enableLocal", ord(option) return option in self.rgoptLocal def enableRemote(self, option): print "enableRemote", ord(option) return option in self.rgoptRemote def disableLocal(self, option): print "disableLocal", option pass def disableRemote(self, option): print "disableRemote", option pass def dataReceived(self, data): if self.ast != None: data = self.ast + data ich = 0 ichSplit = 0 fAnsi = False fEsc = False for ch in data: if fAnsi: if not (ch == ";" or ch.isdigit()): fAnsi = False if ch.isalpha(): stRgarg = data[ichSplit + 2:ich] if len(stRgarg) == 0: rgarg = [] else: rgarg = [int(stArg) for stArg in stRgarg.split(";")] self.RcvAnsi(ch, rgarg) ichSplit = ich + 1 elif ch == ansi.esc[0]: fEsc = True elif fEsc: fEsc = False if ch == ansi.esc[1]: fAnsi = True if ichSplit < ich - 1: self.RcvText(data[ichSplit:ich - 1]) ichSplit = ich - 1 ich = ich + 1 if fEsc or fAnsi: self.ast = data[ichSplit:ich] else: self.ast = None if ichSplit < ich: self.RcvText(data[ichSplit:ich]) def Hello(self): print "negotiated!" self.transport.write(ansi.esc + "2J") #cls self.transport.write(ansi.esc + "=25l") #hide the cursor self.client.addDgQuit(self.transport.loseConnection) self.client.go(self.factory.fnRun, self) def RcvText(self, text): for ch in text: self.client.KeyPressed(ch) mpcode_key = {"A": ansi.K_UP, "B": ansi.K_DOWN, "C": ansi.K_RIGHT, "D": ansi.K_LEFT, "K": ansi.K_END, "H": ansi.K_HOME, "V": ansi.K_PGUP, "U": ansi.K_PGDN } def RcvAnsi(self, code, rgarg): if code in self.mpcode_key: self.client.KeyPressed(self.mpcode_key[code]) else: print "unrecognized code", code, rgarg def Draw(self, ascr): self.transport.write(self.ascr.AstDiff(ascr)) self.transport.write(ansi.esc + "H") #cursor stays in top-left corner? ascr should contain a cursor pos? self.ascr = ascr def Refresh(self): self.ascr = ansi.Ascr(config.W, config.H, ansi.achInvd) def connectionLost(self, reason): print "lost connection" self.client.removeDgQuit(self.transport.loseConnection) self.client.quit() class DOSNewlineTelnetTransport(telnet.TelnetTransport): def dataReceived(self, data): telnet.TelnetTransport.dataReceived(self, data) if self.state == 'newline': self.state = 'data' self.applicationDataReceived(b'\r') class AnsiFactory(ServerFactory): def __init__(self, fnRun): self.fnRun = fnRun def buildProtocol(self, addr): p = DOSNewlineTelnetTransport(AnsiTelnet, engine.Client()) p.factory = self print "built protocol for ", addr return p class CmdProtocol(basic.LineReceiver): def connectionMade(self): if self.transport.getHost().host != "127.0.0.1": self.transport.write("Sorry, you must connect from localhost") self.transport.loseConnection() def lineReceived(self, line): self.transport.write(self.factory.fnCmd(line).encode('utf-8') + self.delimiter) class CmdFactory(ServerFactory): def __init__(self, fnCmd): self.fnCmd = fnCmd def buildProtocol(self, addr): p = CmdProtocol() p.factory = self print "built cmd protocol for ", addr return p def RunServer(fnRun): log.startLogging(sys.stdout) reactor.listenTCP(config.PORT_TELNET, AnsiFactory(fnRun), 50, config.BIND_HOSTNAME) reactor.run() def RunCmdServer(fnCmd): if config.PORT_CMDSERVER: reactor.listenTCP(config.PORT_CMDSERVER, CmdFactory(fnCmd), 50, "localhost")