# FANDOM (wiki page header captured together with the source; not part of the program)

import re
import time
from threading import Thread
from urllib.parse import urlencode
import requests
import json
from requests import Session
 
 
class BotLogger:
    """Tiny console logger that also keeps every record in memory.

    Each logging call appends a ``(level, message)`` tuple to ``self.log``
    and prints ``<name> <LABEL> <message>`` to standard output.
    """

    def __init__(self, name):
        """Create a logger whose printed lines are prefixed with *name*."""
        self.name = name
        # Records in call order: (numeric level, message object as passed in).
        self.log = []

    def _emit(self, level, label, msg):
        """Store the record first, then echo it to stdout."""
        self.log.append((level, msg))
        print(" ".join((self.name, label, str(msg))))

    def debug(self, msg):
        """Record *msg* with level 0 and print it tagged ``DEBUG``."""
        self._emit(0, "DEBUG", msg)

    def info(self, msg):
        """Record *msg* with level 1 and print it tagged ``INFO``."""
        self._emit(1, "INFO", msg)

    def warning(self, msg):
        """Record *msg* with level 2 and print it tagged ``WARNING``."""
        self._emit(2, "WARNING", msg)

    def error(self, msg):
        """Record *msg* with level 2 and print it tagged ``ERROR``.

        The stored level is the same number that ``warning`` uses.
        """
        self._emit(2, "ERROR", msg)

    def fatal(self, msg):
        """Record *msg* with level 3 and print it tagged ``FATAL``."""
        self._emit(3, "FATAL", msg)
 
 
class FanBot:
    """Base bot that logs in to a ``<wiki>.wikia.com`` site through ``api.php``.

    The constructor builds the wiki URLs, creates a ``requests`` session and
    a :class:`BotLogger`, and immediately tries to log in.  ``islogged``
    tells whether that succeeded; ``isalive`` is the flag other parts of the
    bot poll to decide whether to keep running.
    """

    USERAGENT = "Fanbot v1.1 by Manka-Manka"

    def __init__(self, username, password, wiki, configfile="config.json"):
        """Set up the bot and log in.

        :param username: account name; also used to build the logger name.
        :param password: account password sent as ``lgpassword``.
        :param wiki: wiki subdomain, inserted into ``http://<wiki>.wikia.com``.
        :param configfile: accepted for compatibility; not read by this class.
        """
        self.isalive = True
        self.username = username
        self.password = password
        self.wiki = wiki
        self.basepath = "http://%s.wikia.com" % wiki
        self.apipath = self.basepath + "/api.php"
        self.wikiapath = self.basepath + "/wikia.php"

        self.session = Session()
        self.logger = BotLogger(name=self.name)
        self.logger.info("Initialization...")
        self.headers = {
            'User-Agent': FanBot.USERAGENT
        }
        self.islogged = False
        self.isawake = True
        self.login()
        if not self.islogged:
            self.logger.fatal("Cant'login. Quitting...")
            self.quit()

    @property
    def name(self):
        """Return ``"<username>-<wiki>"``, used as the logger prefix."""
        return self.username + "-" + self.wiki

    def _post_login(self, payload):
        """POST *payload* to ``api.php`` and return the ``login`` object of the reply.

        Returns an empty dict when the decoded reply has no ``login`` object,
        so callers can use ``.get`` without risking ``KeyError``.
        """
        response = self.session.post(self.apipath, data=payload)
        data = json.loads(response.text)
        self.logger.debug(data)
        login_part = data.get('login') if isinstance(data, dict) else None
        return login_part if isinstance(login_part, dict) else {}

    def login(self):
        """Log in with ``action=login`` and set ``islogged`` on success.

        The first request is sent without a token; the token from its reply
        is then sent back as ``lgtoken`` in a second request.  If the reply
        carries no token the attempt is logged as an error and abandoned
        instead of raising ``KeyError``.
        """
        self.logger.info("Logging in...")
        login_data = {
            'action': 'login',
            'lgname': self.username,
            'lgpassword': self.password,
            'format': 'json'
        }
        first = self._post_login(login_data)
        if first.get('result') == 'Success':
            # Nothing more to confirm when the API accepts the login at once.
            self.islogged = True
            return

        token = first.get('token')
        if token is None:
            self.logger.error("Login failed: no token in API response")
            return

        login_data['lgtoken'] = token
        second = self._post_login(login_data)
        if second.get('result') == 'Success':
            self.islogged = True

    def get_token(self, name):
        """Placeholder; currently does nothing and returns ``None``."""
        pass

    def quit(self):
        """Log the shutdown and clear ``isalive`` so polling loops stop."""
        self.logger.info("Quit")
        self.isalive = False
 
 
class ChatBotPingThread:
    """Background thread that posts ``'ping'`` through its bot at a fixed interval."""

    def __init__(self, parent):
        """Prepare (but do not start) the ping thread for the bot *parent*.

        *parent* must provide ``isalive``, ``delay``, ``logger`` and
        ``post(body)``.
        """
        self.bot = parent
        # Daemon thread: a crashed or finished main thread must not be kept
        # alive by the keep-alive loop.
        self.thr = Thread(target=self.__work, daemon=True)

    def start(self):
        """Start the background thread."""
        self.thr.start()

    def __work(self):
        """Ping, log and sleep until the bot's ``isalive`` flag goes false."""
        while self.bot.isalive:
            self.bot.post('ping')
            self.bot.logger.debug('Ping')
            # Use the owning bot's delay rather than any module-level name,
            # so the thread works for every bot instance.
            time.sleep(self.bot.delay)
 
 
class ChatInfo:
    """Holder for chat connection details; every field is a class-level default."""

    # Fields with no value until one is assigned on an instance.
    server_id = room = key = host = None

    # Default port number.
    port = 80

    # Moderator flag; off by default.
    is_mod = False
 
 
class ChatBot(FanBot):
    """Chat client that talks to the wiki's chat server over HTTP polling.

    After the base-class login the constructor asks ``wikia.php``
    (controller ``Chat``) for the chat parameters, opens a ``/socket.io/``
    polling session to obtain a ``sid`` and prepares a
    :class:`ChatBotPingThread`.  Call :meth:`run` to start pinging and
    polling.
    """

    # Default number of seconds between keep-alive pings.
    PING_DELAY = 15
    # Seconds to pause after a failed poll so repeated errors cannot spin
    # the loop at full speed.
    RETRY_DELAY = 1

    def __init__(self, username, password, wiki, configfile="config.json", delay: int=None):
        """Log in, then fetch the chat session data.

        :param username: account name (see :class:`FanBot`).
        :param password: account password.
        :param wiki: wiki subdomain.
        :param configfile: passed through to :class:`FanBot`.
        :param delay: seconds between pings; a falsy value selects
            ``PING_DELAY``.
        """
        # FanBot.__init__ already performs the login, so it is not repeated here.
        super().__init__(username, password, wiki, configfile)

        self.delay = delay or ChatBot.PING_DELAY

        self.headers.update({
            # "Cookie" is the HTTP request header name; "Cookies" is not a
            # header servers read.
            'Cookie': self.get_cookies(),
            'Pragma': 'no-cache',
            'Cache-Control': 'no-cache',
            'Accept': '*/*',
            'Content-Type': 'text/plain;charset=UTF-8'
        })

        self.request_options = {}
        self.plugins = {}
        self.handlers = {}
        self.users = {}
        # Defined here so get()/post() never hit a missing attribute.
        self.time_cachebuster = 0
        self.chatapi = None
        self.ping_thread = ChatBotPingThread(self)

        if not self.islogged:
            # The base class has already reported the failed login; make
            # sure run() and the ping thread see a stopped bot.
            self.isalive = False
            return

        self.fetchinfo()

    def _abort(self, reason):
        """Log *reason* as fatal, call :meth:`quit` and mark the bot as stopped."""
        self.logger.fatal(reason)
        self.quit()
        self.isalive = False

    @staticmethod
    def _format_cookies(pairs):
        """Render ``(name, value)`` pairs as ``"name=value; name=value;"``."""
        return ' '.join("%s=%s;" % pair for pair in pairs)

    def get_cookies(self):
        """Return the session cookies as one ``name=value;`` string.

        When a name occurs more than once the last value wins.
        """
        cookies = dict(self.session.cookies.items())
        return self._format_cookies(cookies.items())

    def fetchinfo(self):
        """Fetch the chat parameters and open the polling session.

        Fills ``request_options`` and ``chatapi``, stores the ``sid`` from
        the first ``/socket.io/`` response, replaces the ``Cookie`` header
        with the cookies of that response and sets ``isalive`` to True.
        On missing data the problem is logged as fatal and the bot is
        stopped.
        """
        self.logger.info("Fetching chat info...")

        info = ChatInfo()
        chat_info = self.session.post(self.wikiapath, data={'controller': 'Chat', 'format': 'json'}).json()
        self.logger.debug(chat_info)

        try:
            info.is_mod = chat_info['isModerator'] == 1
            info.room = chat_info['roomId']
            info.key = chat_info['chatkey']
            info.port = chat_info['chatServerPort']
            info.host = chat_info['chatServerHost']
            info.server_id = self.session.get(
                self.basepath + '/api.php?action=query&meta=siteinfo&siprop=wikidesc&format=json'
            ).json()['query']['wikidesc']['id']
        except (KeyError, TypeError) as err:
            # Same handling as the missing-sid case below instead of an
            # unhandled exception in the constructor.
            self._abort("Chat info is incomplete: %r" % (err,))
            return

        self.request_options = {
            "name": self.username,
            "EIO": 3,
            "transport": 'polling',
            "key": info.key,
            "roomId": info.room,
            "serverId": info.server_id
        }
        self.time_cachebuster = 0
        self.chatapi = "http://%s:%s" % (info.host, info.port)

        self.logger.info("Chat api uri: " + self.chatapi)

        res = self.get()
        payload = res.text
        self.logger.debug(payload)
        # Expected shape: "<digits>:0<json>"; the JSON part carries the sid.
        found = re.match(r'\d+:0(.*)$', payload)
        if found is None:
            self._abort("Sid cannot be found")
            return
        self.request_options['sid'] = json.loads(found.group(1))['sid']
        self.logger.info("Sid: " + self.request_options['sid'])

        chat_cookies = self._format_cookies(res.cookies.items())
        if chat_cookies:
            self.headers['Cookie'] = chat_cookies
        else:
            # Do not send an empty Cookie header.
            self.headers.pop('Cookie', None)

        self.isalive = True

    def _next_query(self):
        """Return a copy of ``request_options`` with a fresh ``t`` value.

        ``t`` is ``"<milliseconds>-<counter>"``; the counter is incremented
        on every call.
        """
        opts = self.request_options.copy()
        opts['t'] = str(int(time.time()*1000)) + "-" + str(self.time_cachebuster)
        self.time_cachebuster += 1
        return opts

    def get(self, path='/socket.io/'):
        """
        Get data from chat
        :param path:
            Relative path for request. Default: '/socket.io/'
        :return:
            Response object
        """
        return requests.get(self.chatapi + path, params=self._next_query(), headers=self.headers)

    def post(self, body):
        """
        Send data to chat
        :param body:
            ``'ping'`` for a keep-alive (prefix ``2``); anything else is sent
            with prefix ``42``. The value is wrapped as
            ``["message", {"id": null, "attrs": body}]`` and length-prefixed.
        :return:
            Response object
        """
        prefix = '2' if body == 'ping' else '42'
        packet = prefix + json.dumps(['message', {'id': None, 'attrs': body}])
        packet = str(len(packet)) + ":" + packet

        return requests.post(
            self.chatapi + '/socket.io/?' + urlencode(self._next_query()),
            data=packet,
            headers=self.headers,
        )

    def run(self):
        """Start the ping thread and poll the chat until the bot stops.

        Every response matching ``...:42<json>`` is decoded; its ``event``
        and ``data`` are printed, and a ``chat:add`` event is answered with
        a fixed message.  A "Session ID unknown" response ends the loop.
        Errors during one poll are logged and the loop continues after
        ``RETRY_DELAY`` seconds.  ``isalive`` is cleared on exit so the ping
        thread stops as well.
        """
        self.ping_thread.start()
        try:
            while self.isalive:
                try:
                    res = self.get()
                    print(res.encoding)
                    body = res.text

                    found = re.match(r'[:\d]+:42(.*)$', body)
                    if found is None:
                        if "Session ID unknown" in body:
                            self.logger.fatal("Error: Session ID unknown")
                            break
                        continue

                    info = json.loads(found.group(1))[1]
                    event = info.get('event')
                    data = info.get('data')

                    if event == 'chat:add':
                        # NOTE(review): this answers every chat:add event,
                        # presumably including the bot's own messages;
                        # confirm before using outside of testing.
                        r = self.post({'msgType': 'chat', 'text': 'Message', 'name': self.username})
                        print(r)

                    print(event, data)

                except Exception as e:
                    # Report the failure instead of hiding it, then back off
                    # briefly before the next poll.
                    self.logger.error("Polling failed: %r" % (e,))
                    time.sleep(ChatBot.RETRY_DELAY)
        finally:
            # Whatever ended the loop, let the ping thread finish too.
            self.isalive = False
 
# Script entry: build the chat bot with placeholder credentials and start
# its polling loop.  The instance stays bound to the module-level name `bot`.
bot = ChatBot(username='UserName', password='Password', wiki='wiki')
bot.run()
# Community content is available under the CC-BY-SA license unless otherwise noted.