Files
2025-11-16 15:05:19 +03:00

420 lines
13 KiB
Python

"""Python implementation of a Tinode chatbot."""
# For compatibility between python 2 and 3
from __future__ import print_function
import argparse
import base64
from concurrent import futures
from datetime import datetime
import json
import os
try:
from importlib.metadata import version
except ImportError:
# Fallback for Python < 3.8
from importlib_metadata import version
import platform
try:
import Queue as queue
except ImportError:
import queue
import random
import signal
import sys
import time
import grpc
from google.protobuf.json_format import MessageToDict
# Import generated grpc modules
from tinode_grpc import pb
from tinode_grpc import pbx
# Python 2 compatibility: on Python 3 the name `unicode` does not exist,
# so alias it to `str` for the isinstance checks further down.
if sys.version_info >= (3,):
    unicode = str

# Application name and version; both are sent to the server in the user agent.
APP_NAME = "Tino-chatbot"
APP_VERSION = "1.2.3"
# Version of the installed tinode_grpc package.
LIB_VERSION = version("tinode_grpc")

# Maximum length of a string to log. Longer strings are shortened.
MAX_LOG_LEN = 64

# User ID of the current user (None until a login reports it).
botUID = None

# Dictionary which contains the callback bundles to be executed when the
# server response to a request is received, keyed by request id.
onCompletion = {}

# This is needed for gRPC ssl to work correctly.
os.environ["GRPC_SSL_CIPHER_SUITES"] = "HIGH+ECDSA"
def log(*args):
    """Print `args` to stdout prefixed with the current UTC time.

    The timestamp has millisecond precision: 'YYYY-MM-DD HH:MM:SS.mmm'.
    """
    stamp = datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
    # %f yields six digits (microseconds); keep only the first three.
    millis_stamp = stamp[:-3]
    print(millis_stamp, *args)
def add_future(tid, bundle):
    """Register `bundle` for execution once the reply to request `tid` arrives.

    Args:
        tid: request id used as the key in `onCompletion`.
        bundle: dict of callbacks and their argument; an existing entry
            with the same id is overwritten.
    """
    onCompletion[tid] = bundle
def clip_long_string(obj):
    """Shorten long strings for logging.

    Strings longer than MAX_LOG_LEN are replaced by a summary holding the
    length plus the first and last 12 characters. Lists, tuples and dicts
    are processed recursively (lists and tuples both come back as lists);
    any other value is returned unchanged.
    """
    if isinstance(obj, (unicode, str)):
        if len(obj) <= MAX_LOG_LEN:
            return obj
        return ''.join(['<', str(len(obj)), ' bytes: ', obj[:12], '...', obj[-12:], '>'])

    if isinstance(obj, (list, tuple)):
        return [clip_long_string(element) for element in obj]

    if isinstance(obj, dict):
        return {name: clip_long_string(value) for name, value in obj.items()}

    return obj
def to_json(msg):
    """Render the protobuf message `msg` as a JSON string fit for logging.

    The message is converted with MessageToDict and long strings are
    shortened by clip_long_string before serialization.
    """
    as_dict = MessageToDict(msg)
    clipped = clip_long_string(as_dict)
    return json.dumps(clipped)
def exec_future(tid, code, text, params):
    """Resolve or reject the future registered under request id `tid`.

    The bundle is removed from `onCompletion` before it runs, so it fires at
    most once. Unknown ids are ignored.

    Args:
        tid: id of the request the server is responding to.
        code: numeric response code; 200 <= code < 400 counts as success.
        text: response text, used for error reporting.
        params: response parameters handed to the 'onsuccess' callback.
    """
    bundle = onCompletion.pop(tid, None)
    if bundle is None:
        return

    try:
        if 200 <= code < 400:
            # 'onsuccess' is optional, exactly like 'onerror' below.
            onsuccess = bundle.get('onsuccess')
            if onsuccess:
                onsuccess(bundle.get('arg'), params)
        else:
            log("Error: {} {} ({})".format(code, text, tid))
            onerror = bundle.get('onerror')
            if onerror:
                onerror(bundle.get('arg'), {'code': code, 'text': text})
    except Exception as err:
        # A failing callback must not take down the message loop.
        log("Error handling server response", err)
# Active subscriptions: topic name -> True.
subscriptions = {}


def add_subscription(topic):
    """Mark `topic` as subscribed."""
    subscriptions[topic] = True


def del_subscription(topic):
    """Forget the subscription to `topic`; unknown topics are ignored."""
    if topic in subscriptions:
        del subscriptions[topic]
def subscription_failed(topic, errcode):
    """Handle a rejected subscription request.

    Failures on topics other than 'me' are ignored.

    Args:
        topic: name of the topic whose subscription failed.
        errcode: dict describing the error; its 'code' entry is inspected.

    Raises:
        SystemExit: with status 1 when the 'me' subscription fails with any
            code other than 502.
    """
    if topic != 'me':
        return

    # Failed 'me' subscription means the bot is dysfunctional.
    if errcode.get('code') == 502:
        # Cluster unreachable. Post the None sentinel to break the loop
        # and retry in a few seconds.
        client_post(None)
    else:
        # sys.exit rather than the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        sys.exit(1)
def login_error(unused, errcode):
    """Handle a failed login request.

    Args:
        unused: callback argument of the future; not used.
        errcode: dict describing the error; its 'code' entry is inspected.

    Raises:
        SystemExit: with status 1 for every code except 409.
    """
    # 409 is "already authenticated": not a failure, carry on.
    if errcode.get('code') != 409:
        # sys.exit rather than the site-provided exit(), which is meant for
        # the interactive shell and is not guaranteed to exist.
        sys.exit(1)
def server_version(params):
    """Log the server build and version found in the {hi} response.

    Args:
        params: mapping with byte-string 'build' and 'ver' entries, or None,
            in which case nothing is logged.
    """
    if params is None:
        return
    build = params['build'].decode('ascii')
    ver = params['ver'].decode('ascii')
    log("Server:", build, ver)
def next_id():
    """Return the next request id as a decimal string.

    The counter lives in the function attribute `next_id.tid`.
    """
    current = next_id.tid + 1
    next_id.tid = current
    return str(current)


# Counter start value: the first id handed out is "101".
next_id.tid = 100
# Quotes from the fortune cookie file
quotes = []


def next_quote():
    """Return a random quote, avoiding an immediate repeat when possible.

    Returns:
        One entry of `quotes`. It differs from the previously returned entry
        whenever more than one quote is loaded.

    Raises:
        ValueError: if no quotes have been loaded.
    """
    count = len(quotes)
    if count == 0:
        raise ValueError("no quotes loaded")

    idx = random.randrange(count)
    # Make sure quotes are not repeated. With a single quote a repeat is
    # unavoidable, and re-rolling would never terminate.
    while count > 1 and idx == next_quote.idx:
        idx = random.randrange(count)
    next_quote.idx = idx
    return quotes[idx]


# Index of the most recently returned quote; -1 means none yet, so the very
# first call may return any quote.
next_quote.idx = -1
class Plugin(pbx.PluginServicer):
    """Server-side gRPC endpoints of the chatbot plugin."""

    def Account(self, acc_event, context):
        """Log an account event and acknowledge it with pb.Unused().

        Args:
            acc_event: event whose `action`, `user_id` and `public` fields
                are reported.
            context: gRPC call context; not used.
        """
        # TODO: subscribe to the new user on pb.CREATE.
        labels = {
            pb.CREATE: "created",
            pb.UPDATE: "updated",
            pb.DELETE: "deleted",
        }
        action = labels.get(acc_event.action, "unknown")
        log("Account", action, ":", acc_event.user_id, acc_event.public)
        return pb.Unused()
# Outgoing client messages. A None entry is the sentinel which terminates
# the generator feeding the stream.
queue_out = queue.Queue()


def client_generate():
    """Yield queued outgoing messages, blocking while the queue is empty.

    Every message is logged before it is yielded. The generator finishes
    when it takes the None sentinel off the queue.
    """
    while True:
        msg = queue_out.get()
        if msg is None:
            return
        log("out:", to_json(msg))
        yield msg


def client_post(msg):
    """Queue `msg` for sending; posting None ends client_generate()."""
    queue_out.put(msg)


def client_reset():
    """Discard everything still waiting in the outgoing queue.

    Stale None sentinels are discarded too, so messages queued behind one
    cannot leak into the next session.
    """
    try:
        while True:
            # Non-blocking get: raises queue.Empty once the queue is drained.
            queue_out.get(False)
    except queue.Empty:
        pass
def hello():
    """Build the {hi} handshake message.

    Registers a future which logs the server version once the server
    replies successfully.
    """
    tid = next_id()
    bundle = {
        'onsuccess': lambda unused, params: server_version(params),
    }
    add_future(tid, bundle)

    user_agent = "".join([
        APP_NAME, "/", APP_VERSION,
        " (", platform.system(), "/", platform.release(), "); gRPC-python/",
        LIB_VERSION,
    ])
    greeting = pb.ClientHi(id=tid, user_agent=user_agent, ver=LIB_VERSION, lang="EN")
    return pb.ClientMsg(hi=greeting)
def login(cookie_file_name, scheme, secret):
    """Build the {login} message for the given authentication scheme.

    Registers a future which calls on_login with `cookie_file_name` on
    success and login_error on failure.
    """
    tid = next_id()
    bundle = {
        'arg': cookie_file_name,
        'onsuccess': lambda fname, params: on_login(fname, params),
        'onerror': lambda unused, errcode: login_error(unused, errcode),
    }
    add_future(tid, bundle)
    request = pb.ClientLogin(id=tid, scheme=scheme, secret=secret)
    return pb.ClientMsg(login=request)
def subscribe(topic):
    """Build the {sub} message for `topic`.

    Registers a future which records the subscription on success and calls
    subscription_failed on error.
    """
    tid = next_id()
    bundle = {
        'arg': topic,
        'onsuccess': lambda name, unused: add_subscription(name),
        'onerror': lambda name, errcode: subscription_failed(name, errcode),
    }
    add_future(tid, bundle)
    request = pb.ClientSub(id=tid, topic=topic)
    return pb.ClientMsg(sub=request)
def leave(topic):
    """Build the {leave} message for `topic`.

    Registers a future which drops the subscription record on success.
    """
    tid = next_id()
    bundle = {
        'arg': topic,
        'onsuccess': lambda name, unused: del_subscription(name),
    }
    add_future(tid, bundle)
    request = pb.ClientLeave(id=tid, topic=topic)
    return pb.ClientMsg(leave=request)
def publish(topic, text):
    """Build the {pub} message which posts `text` to `topic`.

    The content and the "auto" header are JSON-encoded as UTF-8 bytes, and
    the message is sent with no_echo=True. No future is registered for the
    reply.
    """
    tid = next_id()
    head = {"auto": json.dumps(True).encode('utf-8')}
    content = json.dumps(text).encode('utf-8')
    request = pb.ClientPub(id=tid, topic=topic, no_echo=True, head=head, content=content)
    return pb.ClientMsg(pub=request)
def note_read(topic, seq):
    """Build the {note} message marking message `seq` in `topic` as read."""
    notice = pb.ClientNote(topic=topic, what=pb.READ, seq_id=seq)
    return pb.ClientMsg(note=notice)
def init_server(listen):
    """Launch the plugin server which accepts connections from Tinode.

    Args:
        listen: address to bind, passed to add_insecure_port.

    Returns:
        The started gRPC server.
    """
    workers = futures.ThreadPoolExecutor(max_workers=16)
    server = grpc.server(workers)
    pbx.add_PluginServicer_to_server(Plugin(), server)
    server.add_insecure_port(listen)
    server.start()

    log("Plugin server running at '"+listen+"'")
    return server
def init_client(addr, schema, secret, cookie_file_name, secure, ssl_host):
    """Connect to the Tinode server and start the session.

    Opens a secure or insecure channel to `addr`, starts the MessageLoop
    stream fed by client_generate() and queues the {hi} and {login}
    messages.

    Args:
        addr: address of the server gRPC endpoint.
        schema: authentication scheme passed to login().
        secret: authentication secret passed to login().
        cookie_file_name: cookie file name passed to login().
        secure: use an SSL channel when true.
        ssl_host: optional override of the SSL target name.

    Returns:
        The response stream of the MessageLoop call.
    """
    mode = "secure" if secure else ""
    sni = "SNI="+ssl_host if ssl_host else ""
    log("Connecting to", mode, "server at", addr, sni)

    if not secure:
        channel = grpc.insecure_channel(addr)
    else:
        if ssl_host:
            opts = (('grpc.ssl_target_name_override', ssl_host),)
        else:
            opts = None
        channel = grpc.secure_channel(addr, grpc.ssl_channel_credentials(), opts)

    # Call the server
    stream = pbx.NodeStub(channel).MessageLoop(client_generate())

    # Session initialization sequence: {hi}, {login}, {sub topic='me'}
    for opening in (hello, lambda: login(cookie_file_name, schema, secret)):
        client_post(opening())
    return stream
def client_message_loop(stream):
    """Read server messages from `stream` and react to them until it ends.

    - {ctrl}: run the future registered for the request id.
    - {data}: unless the bot itself sent it, mark the message as read and
      answer with a quote.
    - {pres} on 'me': subscribe to peers which come online or have new
      messages, and leave the topics of peers which go offline.

    Everything else is ignored. An RPC failure is logged and ends the loop;
    it is not re-raised.

    Args:
        stream: iterable of server messages, as returned by init_client().
    """
    try:
        # Read server responses
        for msg in stream:
            # log() adds the timestamp itself.
            log("in:", to_json(msg))

            if msg.HasField("ctrl"):
                # Run code on command completion
                exec_future(msg.ctrl.id, msg.ctrl.code, msg.ctrl.text, msg.ctrl.params)

            elif msg.HasField("data"):
                # Protection against the bot talking to self from another session.
                if msg.data.from_user_id != botUID:
                    # Mark received message as read
                    client_post(note_read(msg.data.topic, msg.data.seq_id))
                    # Insert a small delay to prevent accidental DoS self-attack.
                    time.sleep(0.1)
                    # Respond with a witty quote
                    client_post(publish(msg.data.topic, next_quote()))

            elif msg.HasField("pres"):
                # Wait for peers to appear online and subscribe to their topics
                if msg.pres.topic == 'me':
                    what = msg.pres.what
                    src = msg.pres.src
                    if what in (pb.ServerPres.ON, pb.ServerPres.MSG):
                        if src not in subscriptions:
                            client_post(subscribe(src))
                    elif what == pb.ServerPres.OFF and src in subscriptions:
                        client_post(leave(src))

            # Ignore everything else
    except grpc.RpcError as err:
        # grpc.RpcError is the public base class of RPC failures; the private
        # grpc._channel._Rendezvous is not available in every grpcio release.
        log("Disconnected:", err)
def read_auth_cookie(cookie_file_name):
    """Read authentication credentials from a JSON cookie file.

    Args:
        cookie_file_name: path of the cookie file.

    Returns:
        A (schema, secret) tuple. `secret` is bytes: the base64-decoded
        value for the 'token' schema, the UTF-8 encoded value otherwise.
        (None, None) when the file holds no 'schema' entry.

    Raises:
        IOError/OSError: if the file cannot be opened.
        ValueError: if the file is not valid JSON.
        AttributeError: if 'schema' is present but 'secret' is missing.
    """
    # `with` closes the file even when json.load raises.
    with open(cookie_file_name, 'r') as cookie:
        params = json.load(cookie)

    schema = params.get("schema")
    if schema is None:
        return None, None

    secret = params.get('secret').encode('utf-8')
    if schema == 'token':
        secret = base64.b64decode(secret)
    return schema, secret
def on_login(cookie_file_name, params):
    """Finish a successful login.

    Subscribes to 'me', records the bot's own user ID and saves the
    authentication token to the cookie file.

    Args:
        cookie_file_name: path of the cookie file to write, or None to skip
            saving.
        params: mapping of the login response parameters (byte-string
            values holding JSON), or None.
    """
    global botUID

    client_post(subscribe('me'))

    if params is None:
        return

    # Record the user ID even when no cookie file is written: the message
    # loop needs it to ignore the bot's own messages.
    if 'user' in params:
        # Strip the first and last character (the quotes of the JSON string).
        botUID = params['user'].decode("ascii")[1:-1]

    if cookie_file_name is None:
        return

    # Protobuf map 'params' is not a python object or dictionary. Convert it.
    nice = {'schema': 'token'}
    for key_in in params:
        # The token is stored under 'secret', the key read_auth_cookie expects.
        key_out = 'secret' if key_in == 'token' else key_in
        nice[key_out] = json.loads(params[key_in].decode('utf-8'))

    try:
        # `with` closes the file even when json.dump raises.
        with open(cookie_file_name, 'w') as cookie:
            json.dump(nice, cookie)
    except Exception as err:
        # Best effort: a cookie which cannot be saved is not fatal.
        log("Failed to save authentication cookie", err)
def load_quotes(file_name):
    """Append the quotes found in `file_name` to `quotes`, one per line.

    Surrounding whitespace is stripped and blank lines are skipped, so the
    bot never answers with an empty message.

    Args:
        file_name: path of the text file holding the quotes.

    Returns:
        The total number of quotes now loaded.
    """
    with open(file_name) as f:
        stripped = (line.strip() for line in f)
        quotes.extend(text for text in stripped if text)
    return len(quotes)
def run(args):
    """Authenticate, start the plugin server and run the client loop.

    The authentication scheme comes from, in order of preference, the login
    token, the basic login:password pair, or the cookie file. Without a
    scheme an error is logged and the function returns. Otherwise the
    message loop runs forever, reconnecting 3 seconds after each disconnect;
    SIGINT and SIGTERM end the process.

    Args:
        args: parsed command-line arguments; the attributes read are
            login_token, login_basic, login_cookie, quotes, listen, host,
            ssl and ssl_host.
    """
    schema = None
    secret = None

    if args.login_token:
        # Use token to login. The secret itself is deliberately not logged.
        schema = 'token'
        secret = args.login_token.encode('ascii')
        log("Logging in with token")
    elif args.login_basic:
        # Use username:password. The secret itself is deliberately not logged.
        schema = 'basic'
        secret = args.login_basic.encode('utf-8')
        log("Logging in with login:password")
    else:
        # Try reading the cookie file
        try:
            schema, secret = read_auth_cookie(args.login_cookie)
            log("Logging in with cookie file", args.login_cookie)
        except Exception as err:
            log("Failed to read authentication cookie", err)

    if not schema:
        log("Error: authentication scheme not defined")
        return

    # Load random quotes from file
    log("Loaded {} quotes".format(load_quotes(args.quotes)))

    # Start Plugin server
    server = init_server(args.listen)

    # Initialize and launch client
    client = init_client(args.host, schema, secret, args.login_cookie, args.ssl, args.ssl_host)

    # Setup closure for graceful termination. The connections are closed by
    # the `finally` clause below while SystemExit unwinds.
    def exit_gracefully(signo, stack_frame):
        log("Terminated with signal", signo)
        sys.exit(0)

    try:
        # Add signal handlers
        signal.signal(signal.SIGINT, exit_gracefully)
        signal.signal(signal.SIGTERM, exit_gracefully)

        # Run blocking message loop in a cycle to handle
        # server being down.
        while True:
            client_message_loop(client)
            time.sleep(3)
            client_reset()
            client = init_client(args.host, schema, secret, args.login_cookie, args.ssl, args.ssl_host)
    finally:
        # Close connections gracefully before exiting, whatever ended the loop.
        server.stop(None)
        client.cancel()
if __name__ == '__main__':
    # Parse command-line arguments. Extract server host name, listen address,
    # authentication scheme.
    random.seed()

    purpose = "Tino, Tinode's chatbot."
    log(purpose)

    parser = argparse.ArgumentParser(description=purpose)
    # (flag, keyword arguments) pairs, registered in this order.
    options = (
        ('--host', {'default': 'localhost:16060',
                    'help': 'address of Tinode server gRPC endpoint'}),
        ('--ssl', {'action': 'store_true',
                   'help': 'use SSL to connect to the server'}),
        ('--ssl-host', {'help': 'SSL host name to use instead of default (useful for connecting to localhost)'}),
        ('--listen', {'default': '0.0.0.0:40051',
                      'help': 'address to listen on for incoming Plugin API calls'}),
        ('--login-basic', {'help': 'login using basic authentication username:password'}),
        ('--login-token', {'help': 'login using token authentication'}),
        ('--login-cookie', {'default': '.tn-cookie',
                            'help': 'read credentials from the provided cookie file'}),
        ('--quotes', {'default': 'quotes.txt',
                      'help': 'file with messages for the chatbot to use, one message per line'}),
    )
    for flag, settings in options:
        parser.add_argument(flag, **settings)

    args = parser.parse_args()
    run(args)