marmots/telnet.py
Jeremy Penner 10edbacf86 WIP robot creation in board editor!
- refactor client - now has a _stack_ of current Games, the top of which
  is active - prevents shutdown of whiteboard when "leaving" to edit
  source code
- add tpers.StaticIndex / inspector.py for interactively querying object
  graphs
2020-07-19 21:31:54 -04:00

194 lines
6.9 KiB
Python

# This file is part of MarMOTS.
#
# MarMOTS is free software: you can redistribute it and/or modify it under the terms of the GNU Affero
# General Public License as published by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# MarMOTS is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the
# implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
# Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License along with MarMOTS. If not,
# see <https://www.gnu.org/licenses/>.
#
# Copyright 2009, 2010, 2011, 2020 Jeremy Penner
from twisted.protocols import basic
from twisted.conch import telnet
from twisted.internet.protocol import Factory,ServerFactory
from twisted.internet import protocol, reactor
from zope.interface import implements
from twisted.python import log
import ansi_cython as ansi
import engine
import sys
import config
class AnsiTelnet(protocol.Protocol):
implements(telnet.ITelnetProtocol)
rgoptRemote = [telnet.SGA, telnet.ECHO] #putty sez: 31 (NAWS), 32 (TERMINAL-SPEED), 24 (TERMINAL-TYPE), 39 (NEW-ENVIRON), 36 (ENVIRON), 3(SGA)
# We might support NAWS at some point -- internally we will always deal in 80x25, but it might be helpful to have
# window resizing work, and provide feedback if the window is too small.
# I don't know what we'd do in response to TERMINAL-SPEED, since we're just going to blast out data as fast as possible anyway.
# NEW-ENVIRON might be useful for sending usernames or cookies automatically from flashterm.
# ENVIRON is an older version of NEW-ENVIRON.
# TERMINAL-TYPE is client-to-server only, and the server only speaks ANSI. Again, maybe useful as a feedback mechanism.
rgoptLocal = [telnet.SGA] #putty sez: 1 (ECHO), 3 (SGA)
def __init__(self, client):
print "created"
self.ast = None
self.client = client
self.Refresh()
def connectionMade(self):
print "connection made"
rgdefr = []
def AddCb(defr):
"""
rgdefr.append(defr)
def cbWaitForHello(result):
print "cb: ", result, len(rgdefr)
rgdefr.remove(defr)
if len(rgdefr) == 0:
self.Hello()
defr.addCallback(cbWaitForHello)
defr.addErrback(cbWaitForHello)
"""
pass
for opt in self.rgoptRemote:
AddCb(self.transport.will(opt))
for opt in self.rgoptLocal:
AddCb(self.transport.do(opt))
print "end: ", len(rgdefr)
self.Hello()
def enableLocal(self, option):
print "enableLocal", ord(option)
return option in self.rgoptLocal
def enableRemote(self, option):
print "enableRemote", ord(option)
return option in self.rgoptRemote
def disableLocal(self, option):
print "disableLocal", option
pass
def disableRemote(self, option):
print "disableRemote", option
pass
def dataReceived(self, data):
if self.ast != None:
data = self.ast + data
ich = 0
ichSplit = 0
fAnsi = False
fEsc = False
for ch in data:
if fAnsi:
if not (ch == ";" or ch.isdigit()):
fAnsi = False
if ch.isalpha():
stRgarg = data[ichSplit + 2:ich]
if len(stRgarg) == 0:
rgarg = []
else:
rgarg = [int(stArg) for stArg in stRgarg.split(";")]
self.RcvAnsi(ch, rgarg)
ichSplit = ich + 1
elif ch == ansi.esc[0]:
fEsc = True
elif fEsc:
fEsc = False
if ch == ansi.esc[1]:
fAnsi = True
if ichSplit < ich - 1:
self.RcvText(data[ichSplit:ich - 1])
ichSplit = ich - 1
ich = ich + 1
if fEsc or fAnsi:
self.ast = data[ichSplit:ich]
else:
self.ast = None
if ichSplit < ich:
self.RcvText(data[ichSplit:ich])
def Hello(self):
print "negotiated!"
self.transport.write(ansi.esc + "2J") #cls
self.transport.write(ansi.esc + "=25l") #hide the cursor
self.client.addDgQuit(self.transport.loseConnection)
self.client.go(self.factory.fnRun, self)
def RcvText(self, text):
for ch in text:
self.client.KeyPressed(ch)
mpcode_key = {"A": ansi.K_UP, "B": ansi.K_DOWN, "C": ansi.K_RIGHT, "D": ansi.K_LEFT,
"K": ansi.K_END, "H": ansi.K_HOME, "V": ansi.K_PGUP, "U": ansi.K_PGDN }
def RcvAnsi(self, code, rgarg):
if code in self.mpcode_key:
self.client.KeyPressed(self.mpcode_key[code])
else:
print "unrecognized code", code, rgarg
def Draw(self, ascr):
self.transport.write(self.ascr.AstDiff(ascr))
self.transport.write(ansi.esc + "H") #cursor stays in top-left corner? ascr should contain a cursor pos?
self.ascr = ascr
def Refresh(self):
self.ascr = ansi.Ascr(config.W, config.H, ansi.achInvd)
def connectionLost(self, reason):
print "lost connection"
self.client.removeDgQuit(self.transport.loseConnection)
self.client.quit()
class DOSNewlineTelnetTransport(telnet.TelnetTransport):
    """TelnetTransport that delivers a trailing carriage return immediately.

    If the parent parser finishes a chunk in its 'newline' state, the state
    is reset to 'data' and a single carriage return is passed on as
    application data rather than being held until more input arrives.
    """

    def dataReceived(self, data):
        """Run the normal telnet parsing, then flush a held carriage return."""
        telnet.TelnetTransport.dataReceived(self, data)
        if self.state != 'newline':
            return
        # The chunk ended while the parser was holding a carriage return.
        self.state = 'data'
        self.applicationDataReceived(b'\r')
class AnsiFactory(ServerFactory):
def __init__(self, fnRun):
self.fnRun = fnRun
def buildProtocol(self, addr):
p = DOSNewlineTelnetTransport(AnsiTelnet, engine.Client())
p.factory = self
print "built protocol for ", addr
return p
class CmdProtocol(basic.LineReceiver):
    """Line-based command channel: each line goes to the factory's fnCmd."""

    def connectionMade(self):
        """Drop the connection unless its host address is 127.0.0.1."""
        # NOTE(review): getHost() is the address of this end of the
        # connection -- confirm whether getPeer() was intended.
        if self.transport.getHost().host == "127.0.0.1":
            return
        self.transport.write("Sorry, you must connect from localhost")
        self.transport.loseConnection()

    def lineReceived(self, line):
        """Run `line` through fnCmd and send back the UTF-8 encoded result."""
        stReply = self.factory.fnCmd(line)
        self.transport.write(stReply.encode('utf-8') + self.delimiter)
class CmdFactory(ServerFactory):
def __init__(self, fnCmd):
self.fnCmd = fnCmd
def buildProtocol(self, addr):
p = CmdProtocol()
p.factory = self
print "built cmd protocol for ", addr
return p
def RunServer(fnRun):
    """Start logging to stdout, listen for telnet clients and run the reactor.

    fnRun -- stored on the AnsiFactory that serves config.PORT_TELNET on
             config.BIND_HOSTNAME.

    Does not return until the reactor stops.
    """
    log.startLogging(sys.stdout)
    factory = AnsiFactory(fnRun)
    reactor.listenTCP(config.PORT_TELNET, factory, backlog=50,
                      interface=config.BIND_HOSTNAME)
    reactor.run()
def RunCmdServer(fnCmd):
    """Listen for command connections on localhost if a port is configured.

    fnCmd -- stored on the CmdFactory that serves config.PORT_CMDSERVER.

    Does nothing when config.PORT_CMDSERVER is falsy.  The reactor is not
    started here.
    """
    if not config.PORT_CMDSERVER:
        return
    reactor.listenTCP(config.PORT_CMDSERVER, CmdFactory(fnCmd), backlog=50,
                      interface="localhost")