marmots/telnet.py

194 lines
6.9 KiB
Python
Raw Normal View History

2020-06-28 15:27:56 +00:00
# This file is part of MarMOTS.
#
# MarMOTS is free software: you can redistribute it and/or modify it under the terms of the GNU Affero
# General Public License as published by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MarMOTS is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
# Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with MarMOTS. If not,
# see <https://www.gnu.org/licenses/>.
#
# Copyright 2009, 2010, 2011, 2020 Jeremy Penner
2011-03-19 00:10:02 +00:00
from twisted.protocols import basic
from twisted.conch import telnet
from twisted.internet.protocol import Factory,ServerFactory
from twisted.internet import protocol, reactor
from zope.interface import implements
from twisted.python import log
import ansi_cython as ansi
2011-03-19 00:10:02 +00:00
import engine
import sys
import config
class AnsiTelnet(protocol.Protocol):
    """Telnet application protocol that speaks ANSI with a single client.

    Incoming data is split into plain text and ANSI escape sequences
    (``ansi.esc`` followed by ``;``-separated numeric arguments and a final
    letter).  Text is forwarded to ``self.client`` one character at a time
    and recognized sequences are forwarded as key codes.  Outgoing screens
    are written as a diff against the last screen that was drawn.
    """
    implements(telnet.ITelnetProtocol)

    # Options announced with transport.will() in connectionMade and accepted
    # by enableRemote.
    # putty sez: 31 (NAWS), 32 (TERMINAL-SPEED), 24 (TERMINAL-TYPE), 39 (NEW-ENVIRON), 36 (ENVIRON), 3(SGA)
    # We might support NAWS at some point -- internally we will always deal in 80x25, but it might be helpful to have
    # window resizing work, and provide feedback if the window is too small.
    # I don't know what we'd do in response to TERMINAL-SPEED, since we're just going to blast out data as fast as
    # possible anyway.
    # NEW-ENVIRON might be useful for sending usernames or cookies automatically from flashterm.
    # ENVIRON is an older version of NEW-ENVIRON.
    # TERMINAL-TYPE is client-to-server only, and the server only speaks ANSI. Again, maybe useful as a feedback
    # mechanism.
    rgoptRemote = [telnet.SGA, telnet.ECHO]

    # Options requested with transport.do() in connectionMade and accepted
    # by enableLocal.
    # putty sez: 1 (ECHO), 3 (SGA)
    rgoptLocal = [telnet.SGA]

    # Final letter of an escape sequence -> key code handed to the client.
    mpcode_key = {"A": ansi.K_UP, "B": ansi.K_DOWN, "C": ansi.K_RIGHT, "D": ansi.K_LEFT,
                  "K": ansi.K_END, "H": ansi.K_HOME, "V": ansi.K_PGUP, "U": ansi.K_PGDN}

    def __init__(self, client):
        """Remember ``client`` and start from a blank reference screen."""
        print("created")
        # Unparsed tail of an escape sequence that was cut off at the end of
        # the previous dataReceived call, or None when nothing is pending.
        self.ast = None
        self.client = client
        self.Refresh()

    def connectionMade(self):
        """Send the option negotiations and start the session right away."""
        print("connection made")
        # The values returned by will()/do() are not waited on: Hello() runs
        # immediately, whatever the peer ends up answering.
        for opt in self.rgoptRemote:
            self.transport.will(opt)
        for opt in self.rgoptLocal:
            self.transport.do(opt)
        # Nothing is ever tracked as pending, so the count is always zero;
        # the line is kept so the debug output stays the same.
        print("end:  0")
        self.Hello()

    def enableLocal(self, option):
        """Return True when ``option`` is listed in ``rgoptLocal``."""
        print("enableLocal %d" % ord(option))
        return option in self.rgoptLocal

    def enableRemote(self, option):
        """Return True when ``option`` is listed in ``rgoptRemote``."""
        print("enableRemote %d" % ord(option))
        return option in self.rgoptRemote

    def disableLocal(self, option):
        """Log the request; no other action is taken."""
        print("disableLocal %s" % (option,))

    def disableRemote(self, option):
        """Log the request; no other action is taken."""
        print("disableRemote %s" % (option,))

    def dataReceived(self, data):
        """Split ``data`` into text runs and escape sequences and dispatch them.

        Text goes to RcvText and complete escape sequences go to RcvAnsi.  If
        ``data`` ends in the middle of an escape sequence, everything from the
        last dispatch point onwards is stored in ``self.ast`` and parsed again,
        together with the new bytes, on the next call.
        """
        if self.ast is not None:
            data = self.ast + data
        ichSplit = 0     # start of the bytes that have not been dispatched yet
        fAnsi = False    # inside the argument list of an escape sequence
        fEsc = False     # previous character was the first character of ansi.esc
        for ich, ch in enumerate(data):
            if fAnsi:
                if ch == ";" or ch.isdigit():
                    continue
                fAnsi = False
                # Only a letter completes the sequence.  Any other character
                # abandons it and leaves its bytes in the pending text range.
                if ch.isalpha():
                    stRgarg = data[ichSplit + 2:ich]
                    if stRgarg:
                        # An empty argument ("ESC[;5H") is read as 0 rather
                        # than letting int("") raise on client input.
                        rgarg = [int(stArg) if stArg else 0 for stArg in stRgarg.split(";")]
                    else:
                        rgarg = []
                    self.RcvAnsi(ch, rgarg)
                    ichSplit = ich + 1
            elif ch == ansi.esc[0]:
                fEsc = True
            elif fEsc:
                fEsc = False
                if ch == ansi.esc[1]:
                    fAnsi = True
                    # Flush the text that precedes the two-character introducer.
                    if ichSplit < ich - 1:
                        self.RcvText(data[ichSplit:ich - 1])
                    ichSplit = ich - 1
        if fEsc or fAnsi:
            self.ast = data[ichSplit:]
        else:
            self.ast = None
            if ichSplit < len(data):
                self.RcvText(data[ichSplit:])

    def Hello(self):
        """Prepare the remote screen and hand the connection to the client."""
        print("negotiated!")
        self.transport.write(ansi.esc + "2J")  # cls
        self.transport.write(ansi.esc + "=25l")  # hide the cursor
        self.client.addDgQuit(self.transport.loseConnection)
        self.client.go(self.factory.fnRun, self)

    def RcvText(self, text):
        """Report every character of ``text`` to the client as a key press."""
        for ch in text:
            self.client.KeyPressed(ch)

    def RcvAnsi(self, code, rgarg):
        """Translate the final letter of an escape sequence into a key press.

        ``rgarg`` holds the numeric arguments of the sequence; it is only used
        when logging a code that has no entry in ``mpcode_key``.
        """
        if code in self.mpcode_key:
            self.client.KeyPressed(self.mpcode_key[code])
        else:
            print("unrecognized code %s %s" % (code, rgarg))

    def Draw(self, ascr):
        """Write the difference between the last drawn screen and ``ascr``."""
        self.transport.write(self.ascr.AstDiff(ascr))
        self.transport.write(ansi.esc + "H")  # cursor stays in top-left corner? ascr should contain a cursor pos?
        self.ascr = ascr

    def Refresh(self):
        """Reset the reference screen that Draw diffs against."""
        # presumably achInvd marks every cell invalid so the next Draw
        # repaints the whole screen -- confirm against ansi.Ascr
        self.ascr = ansi.Ascr(config.W, config.H, ansi.achInvd)

    def connectionLost(self, reason):
        """Unregister the quit hook and tell the client to quit."""
        print("lost connection")
        self.client.removeDgQuit(self.transport.loseConnection)
        self.client.quit()
2011-04-12 15:37:52 +00:00
class DOSNewlineTelnetTransport(telnet.TelnetTransport):
    """TelnetTransport that never ends a chunk of data in the 'newline' state.

    NOTE(review): presumably for clients that send a bare carriage return for
    Enter, which the base class would otherwise hold back -- confirm.
    """

    def dataReceived(self, data):
        """Process ``data`` normally, then flush a pending carriage return."""
        telnet.TelnetTransport.dataReceived(self, data)
        if self.state != 'newline':
            return
        # Still in the 'newline' state once the chunk is consumed: go back
        # to plain data and deliver the carriage return to the application.
        self.state = 'data'
        self.applicationDataReceived(b'\r')
2011-03-19 00:10:02 +00:00
class AnsiFactory(ServerFactory):
    """Server factory that builds one telnet transport per connection."""

    def __init__(self, fnRun):
        """Store ``fnRun``; AnsiTelnet.Hello reads it through ``factory.fnRun``."""
        self.fnRun = fnRun

    def buildProtocol(self, addr):
        """Return a transport wrapping an AnsiTelnet with a new engine client."""
        transport = DOSNewlineTelnetTransport(AnsiTelnet, engine.Client())
        transport.factory = self
        print("%s %s" % ("built protocol for ", addr))
        return transport
class CmdProtocol(basic.LineReceiver):
    """Line-based command protocol: each received line is answered by fnCmd."""

    def connectionMade(self):
        """Drop the connection unless its host address is 127.0.0.1."""
        # NOTE(review): getHost() is compared here, while the message talks
        # about where the client connects from -- confirm getPeer() was not
        # intended.
        if self.transport.getHost().host == "127.0.0.1":
            return
        self.transport.write("Sorry, you must connect from localhost")
        self.transport.loseConnection()

    def lineReceived(self, line):
        """Run ``line`` through the factory's fnCmd and send back the result."""
        stResult = self.factory.fnCmd(line)
        self.transport.write(stResult.encode('utf-8') + self.delimiter)
2011-03-19 00:10:02 +00:00
class CmdFactory(ServerFactory):
    """Server factory for the command port."""

    def __init__(self, fnCmd):
        """Store ``fnCmd``, the callable CmdProtocol invokes for each line."""
        self.fnCmd = fnCmd

    def buildProtocol(self, addr):
        """Return a CmdProtocol whose ``factory`` attribute is this factory."""
        protoCmd = CmdProtocol()
        protoCmd.factory = self
        print("%s %s" % ("built cmd protocol for ", addr))
        return protoCmd
def RunServer(fnRun):
    """Start logging to stdout, listen on the telnet port and run the reactor.

    ``fnRun`` is handed to AnsiFactory.  The call returns only when
    ``reactor.run()`` does.
    """
    log.startLogging(sys.stdout)
    factory = AnsiFactory(fnRun)
    reactor.listenTCP(config.PORT_TELNET, factory, backlog=50, interface=config.BIND_HOSTNAME)
    reactor.run()
def RunCmdServer(fnCmd):
    """Listen for command connections on localhost, if a port is configured.

    Does nothing when ``config.PORT_CMDSERVER`` is falsy.  The reactor is not
    started here.
    """
    if not config.PORT_CMDSERVER:
        return
    reactor.listenTCP(config.PORT_CMDSERVER, CmdFactory(fnCmd), backlog=50, interface="localhost")