QQ<=>TG:一个惨不忍睹的东西

请注意,本文编写于 2133 天前,最后修改于 1641 天前,其中某些信息可能已经过时。

前言

开发周期一天半。脑袋里想着这东西写完估计再也不会碰了。于是乎就完全没有在意代码风格和编写习惯,最终造成了如此鬼畜的emmmm程序。

虽然就代码而言有点恐怖,但是就结果而言,他还是十分耐操。至少在他寿终正寝(我们觉得实在烦了)之前,还是没有出什么bug的。

其实是去年就完成的东西了。现在只是正好记起来搬出来而已XD。

寒假过去了1/5了我却是在病魔中度过的,真是太浪费了呀。。

coolq是用了天台封装的。天台就是好啊,要是我肯定才懒得封装这些东西XD,直接暴力怼直接用hhhhhhh

代码在下面。


# -*- coding: utf-8 -*-
import json
import telegram
from coolq import CoolQHttpAPI
from flask import Flask, request
import re
from peewee import *
from playhouse.shortcuts import model_to_dict

db = SqliteDatabase("Custom.db")
switch = -1  # -1 is connected,0 is cut off , 1 is juest tg to QQ ,2 is juest QQ to tg



class BaseModel(Model):
    class Meta:
        database = db


class FriendShip(BaseModel):
    ID = PrimaryKeyField()
    QQ = IntegerField(null=True, unique=True)
    TG = CharField(null=True, unique=True)
    NAME = CharField(null=True, unique=True)
    REGISTERED = IntegerField(null=False)  # if == 0 is registered.

    class Meta:
        db_table = "FriendShip"


FriendShip.create_table()
app = Flask(__name__)
api = CoolQHttpAPI("http://other.9bie.org:5700", access_token="")
bot = telegram.Bot(token="")


def tool_function_check_identifier(ifentifier, type):
    '''
    Query the regietered custom_ifentifier
    if is not exits,return ""
    :param ifentifier: id
    :param type: 1 ->QQ ,2 ->TG 3....more ->WEB OR OTHER?
    :return: custom_identifier
    '''
    types = {1: FriendShip.QQ, 2: FriendShip.TG}
    try:
        ret = FriendShip.select().where(types[type] == ifentifier, FriendShip.REGISTERED == 0)[0]
    except:
        return False
    return ret.NAME if type != 0 else False


def tool_function_confirm_custom_identifier(sender: int, custom_identifier: str, qq_id: int = 0, tg_name: str = ""):
    senders = {1: 2, 2: 1}
    try:
        ret = FriendShip.select().where(FriendShip.QQ == qq_id, FriendShip.TG == tg_name,
                                        FriendShip.REGISTERED == senders[sender], FriendShip.NAME == custom_identifier)[
            0].update(REGISTERED=0).execute()
    except:
        return False


def tool_function_bind_qq_tg_identifier(custom_identifier: str, qq_id: int, tg_name: str, sender: int) -> bool:
    """
    Unifed the TG and QQ 's name,to hardcore to the local file.
    :param custom_identifier: User coustom name.
    :param qq_id: qq number
    :param tg_name: tg username
    :param sender 1 ->QQ ,2 ->TG 3....more ->WEB OR OTHER?
    :return: T OR F
    """
    try:
        FriendShip.create(QQ=qq_id, TG=tg_name, NAME=custom_identifier, REGISTERED=sender)
    except:
        return False
    return True


def tool_function_txt(start_str, end, html):
    start = html.find(start_str)
    if start >= 0:
        start += len(start_str)
        end = html.find(end, start)
        if end >= 0:
            return html[start:end].strip()


def tool_function_peelphotourl(message_raw):  # return the message only and image_list
    image_list = []
    while message_raw.find("url=") != -1:
        # image_list.append(message_raw[message_raw.find("url=")+4:message_raw.find("]")])
        image_list.append(tool_function_txt("url=", "]", message_raw))
        message_raw = re.sub("\[CQ:image.*?\]", "", message_raw)
        print(message_raw)
    return message_raw, image_list


def control(message_raw, sender: int):
    if message_raw[:9] == "/register":
        param_ = message_raw.split(" ")
        if len(param_) == 4:  # /register name qq tg
            if tool_function_bind_qq_tg_identifier(custom_identifier=param_[1], qq_id=int(param_[2]), tg_name=param_[3],
                                                   sender=sender):
                if sender == 1:
                    api.send_group_msg(558226805, "ok.")
                else:
                    bot.send_message(chat_id=-256726247, text="ok.")
                return True
        else:
            if sender == 1:
                api.send_group_msg(558226805, "USAGE:\n/register custom_name qq_number tg_username.")
            else:
                bot.send_message(chat_id=-256726247, text="USAGE:\n/register custom_name qq_number tg_username.")
    if message_raw[:13] == "/acc_register":
        param_ = message_raw.split(" ")
        if len(param_) == 4:  # /register name qq tg
            if tool_function_confirm_custom_identifier(custom_identifier=param_[1], qq_id=int(param_[2]),
                                                       tg_name=param_[3], sender=sender):
                if sender == 2:
                    api.send_group_msg(558226805, "ok.")
                else:
                    bot.send_message(chat_id=-256726247, text="ok.")
                return True
        else:
            if sender == 1:
                api.send_group_msg(558226805, "USAGE:\n    /acc_register custom_name qq_number tg_username.")
            else:
                bot.send_message(chat_id=-256726247,
                                 text="USAGE:\n    /acc_register custom_name qq_number tg_username.")
    if message_raw[:len("/switch")] == "/switch":
        param_ = message_raw.split(" ")
        if len(param_) == 2:  # /switch -1~2
            print("[Switch]:%s"%param_[1])
            if int(param_[1]) < -1 or int(param_[1]) > 2:
                if sender == 1:
                    api.send_group_msg(558226805,
                                       "USAGE:\n    /switch -1~2\n    -1 is connect.0 is cut off.1 is juest tg to qq.2 is juest qq to tg")
                else:
                    bot.send_message(chat_id=-256726247,
                                     text="USAGE:\n    /switch -1~2\n    -1 is connect.0 is cut off.1 is juest tg to qq.2 is juest qq to tg")
            else:
                global switch
                switch = int(param_[1])
                api.send_group_msg(558226805,"OK.NOW MOD:%s"%param_[1])
                bot.send_message(chat_id=-256726247,
                                     text="OK.NOW MOD:%s"%param_[1])
        else:
            if sender == 1:
                api.send_group_msg(558226805,
                                   "USAGE:\n    /switch -1~2\n    -1 is connect.0 is cut off.1 is juest tg to qq.2 is juest qq to tg")
            else:
                bot.send_message(chat_id=-256726247,
                                 text="USAGE:\n    /switch -1~2\n    -1 is connect.0 is cut off.1 is juest tg to qq.2 is juest qq to tg")


def kernerl(data, source: int):
    # Source: 1 ->QQ ,2 ->TG 3....more ->WEB OR OTHER?
    if source == 1:
        if data["post_type"] == "message":
            if data["message_type"] == "group":
                if data["group_id"] == 558226805:
                    # check the control commond.
                    # such as "!help"  ,and the other.
                    if control(data["message"], 1):
                        # control break.
                        return
                    if switch == 0 or switch == 1:
                        print("[QQ][RECV][SWITCH]:%d" % switch)
                        return
                    msg = data["message"]

                    name = tool_function_check_identifier(data["user_id"], 1)
                    print("[INFO:QQ]%s-----%s" % (data["user_id"], name))
                    if name == False:
                        info = api.get_group_member_info(
                            558226805, data["user_id"])
                        name = info["card"] or info["nickname"]
                    if "CQ:at" in msg:
                        at = tool_function_check_identifier(tool_function_txt("qq=", "]", msg), 1)
                        if at == False:
                            info = api.get_group_member_info(
                                558226805, tool_function_txt("qq=", "]", msg))
                            at = info["card"] or info["nickname"]
                            msg = re.sub("\[CQ:at.*?\]", "@%s" % api.at(
                                558226805, data["user_id"]), msg)
                        else:
                            msg = re.sub("\[CQ:at.*?\]", "@%s" % at, msg)
                    if "CQ:image" in data["message"]:
                        # send msg have photo
                        msg, image_list = tool_function_peelphotourl(msg)
                        # chat_id = bot.send_message(chat_id=-256726247, text="%s:\n%s" % (
                        #     name, msg)).message_id
                        for i in image_list:
                            if msg != "":
                                bot.send_photo(chat_id=-256726247, photo=i, caption="%s: %s" % (name, msg))
                            else:
                                bot.send_photo(chat_id=-256726247, photo=i, caption="%s: Send a Photo." % (name))
                    elif "message" in data:
                        msg = re.sub("\[.*?\]", "", msg)  # Unkonw.delete
                        bot.send_message(chat_id=-256726247, text="%s: %s" % (
                            name, msg))
    if source == 2:

        if data["chat"]["type"] == "group":
            if data["chat"]["id"] == -256726247:
                if data["from"]["is_bot"] == False:
                    if "text" in data:  # it's text
                        if control(data["text"], 2):
                            # control break.
                            return
                        if switch == 0 or switch == 2:
                            print("[TG][RECV][SWITCH]:%d" % switch)
                            return
                        if "username" not in data["from"]:
                            name = data["from"]["first_name"] + data["from"]["last_name"]
                        else:
                            name = data["from"]["username"]
                        info = tool_function_check_identifier(name, 2)

                        print("[INFO:TG]%s-----%s" % (name, info))
                        if info == False:
                            info = name
                        api.send_group_msg(558226805, "%s: %s" % (info, data["text"]))


@app.route("/initbot")
def init():
    bot.set_webhook("https://other.9bie.org/")
    return "Working..."


@app.route("/", methods=["POST"])
def tg_event():
    update = telegram.Update.de_json(request.get_json(force=True), bot)
    if update.message is None:
        return "Show me your TOKEN please!"
    kernerl(update.message.to_dict(), 2)
    return ""


@app.route("/", methods=["POST"])
def qq_event():
    data = json.loads(request.data.decode("utf-8"))
    kernerl(data, 1)
    return ""


if __name__ == "__main__":
    # updater = Updater(token='')
    bot = telegram.Bot(token="")
    bot.set_webhook("https://other.9bie.org/")
    # api.send_group_msg(558226805, "QQ<-->TG<status>:\n    START!")
    # bot.send_message(chat_id=-256726247, text="QQ<-->TG<status>:\n    START!")
    app.run(host='0.0.0.0', port=8088)

COOLQ:

# -*- coding: utf-8 -*-
import json

import requests

RETCODE = {
    100: AttributeError("100: 参数缺失或参数无效"),
    102: ValueError("102: 酷Q函数返回的数据无效"),
    103: PermissionError("103: 权限不足或文件系统异常,不符合预期"),
    104: RuntimeError("104: 酷Q凭证失效"),
    201: EnvironmentError("201: 工作线程池未正确初始化")
}


class CoolQHttpAPI:
    
    def __init__(self, addr, access_token=None):
        if addr[-1] == "/":
            self.addr = addr[:-1]
        else:
            self.addr = addr
        self.access_token = access_token
        if not self.get_status()["good"]:
            raise ConnectionError("连接到酷Q失败")
    
    def _POST(self, api, **kargs):
        headers = {"Content-Type": "application/json"}
        if self.access_token:
            headers["Authorization"] = "Token %s" % self.access_token
        try:
            resp = requests.post(
                url=self.addr + "/" + api,
                headers=headers,
                data=json.dumps(kargs),
                timeout=5)
        except requests.exceptions.ConnectionError:
            raise ConnectionError("无法连接到酷Q")
        except requests.exceptions.Timeout:
            raise TimeoutError("与酷Q通讯时发生超时")
        if resp.status_code == 404:
            raise NameError("API %s 不存在" % api)
        tmp = resp.json()
        if tmp["status"] == "async":
            return None
        if tmp["status"] == "failed":
            if tmp["retcode"] in RETCODE:
                raise RETCODE[tmp["retcode"]]
            else:
                raise NotImplementedError("%s 未知错误" % tmp["retcode"])
        return tmp["data"]
    
    def send_private_msg(self, user_id, message, auto_escape=False):
        return self._POST(
            "send_private_msg",
            user_id=user_id,
            message=message,
            auto_escape=auto_escape
        )
    
    def send_group_msg(self, group_id, message, auto_escape=False):
        return self._POST(
            "send_group_msg",
            group_id=group_id,
            message=message,
            auto_escape=auto_escape
        )
    
    def send_discuss_msg(self, discuss_id, message, auto_escape=False):
        return self._POST(
            "send_discuss_msg",
            group_id=discuss_id,
            message=message,
            auto_escape=auto_escape
        )
    
    def delete_msg(self, message_id):
        return self._POST(
            "delete_msg",
            message_id=message_id)
    
    def send_like(self, user_id, times=1):
        return self._POST(
            "send_like",
            times=times
        )
    
    def set_group_kick(self, group_id, user_id, reject_add_request=False):
        return self._POST(
            "set_group_kick",
            group_id=group_id,
            user_id=user_id,
            reject_add_request=reject_add_request
        )

    def set_group_ban(self, group_id, user_id, duration=60):
        return self._POST(
            "set_group_ban",
            group_id=group_id,
            user_id=user_id,
            duration=duration
        )

    def set_group_anonymous_ban(self, group_id, anonymous=None, 
                                anonymous_flag=None, duration=30):
        if anonymous:
            return self._POST(
                "set_group_anonymous_ban",
                group_id=group_id,
                anonymous=anonymous,
                duration=duration
            )
        elif anonymous_flag:
            return self._POST(
                "set_group_anonymous_ban",
                group_id=group_id,
                anonymous_flag=anonymous_flag,
                duration=duration
            )
        else:
            raise AttributeError("anonymous与anonymous_flag二选一必填")

    def set_group_whole_ban(self, group_id, enable=True):
        return self._POST(
            "set_group_whole_ban",
            group_id=group_id,
            enable=enable
        )

    def set_group_admin(self, group_id, user_id, enable=True):
        return self._POST(
            "set_group_admin",
            group_id=group_id,
            user_id=user_id,
            enable=enable
        )
    
    def set_group_anonymous(self, group_id, enable=True):
        return self._POST(
            "set_group_anonymous",
            group_id=group_id,
            enable=enable
        )
    
    def set_group_card(self, group_id, user_id, card=""):
        return self._POST(
            "set_group_card",
            group_id=group_id,
            user_id=user_id,
            card=card
        )
    
    def set_group_leave(self, group_id, is_dismiss=False):
        return self._POST(
            "set_group_leave",
            group_id=group_id,
            is_dismiss=is_dismiss
        )
    
    def set_group_special_title(self, group_id, user_id, special_title="", duration=-1):
        return self._POST(
            "set_group_special_title",
            group_id=group_id,
            user_id=user_id,
            special_title=special_title,
            duration=duration
        )
    
    def set_discuss_leave(self, discuss_id):
        return self._POST(
            "set_discuss_leave",
            discuss_id=discuss_id
        )
    
    def set_friend_add_request(self, flag, approve=True, remark=""):
        return self._POST(
            "set_friend_add_request",
            flag=flag,
            approve=approve,
            remark=remark
        )
    
    def set_group_add_request(self, flag, sub_type, approve=True, reason=""):
        return self._POST(
            "set_group_add_request",
            flag=flag,
            sub_type=sub_type,
            approve=approve,
            reason=reason
        )
    
    def get_login_info(self):
        return self._POST("get_login_info")
    
    def get_stranger_info(self, user_id, no_cache=False):
        return self._POST(
            "get_stranger_info",
            user_id=user_id,
            no_cache=no_cache
        )
    
    def get_group_list(self):
        return self._POST("get_group_list")
    
    def get_group_member_info(self, group_id, user_id, no_cache=False):
        return self._POST(
            "get_group_member_info",
            group_id=group_id,
            user_id=user_id,
            no_cache=no_cache
        )

    def get_group_member_list(self, group_id):
        return self._POST(
            "get_group_member_list",
            group_id=group_id
        )
    
    def get_cookies(self):
        return self._POST("get_cookies")

    def get_csrf_token(self):
        return self._POST("get_csrf_token")
    
    def get_credentials(self):
        return self._POST("get_credentials")
    
    def get_record(self, file, out_format):
        return self._POST(
            "get_record",
            file=file,
            out_format=out_format
        )
    
    def get_status(self):
        return self._POST("get_status")
    
    def get_version_info(self):
        return self._POST("get_version_info")
    
    def set_restart(self, clean_log=False, clean_cache=False, clean_event=False):
        return self._POST(
            "set_restart",
            clean_log=clean_log,
            clean_cache=clean_cache,
            clean_event=clean_event
        )
    
    def set_restart_plugin(self, delay=0):
        return self._POST(
            "set_restart_plugin",
            delay=delay
        )
    
    def clean_data_dir(self, data_dir):
        return self._POST(
            "clean_data_dir",
            data_dir=data_dir
        )
    
    def clean_plugin_log(self):
        return self._POST("clean_plugin_log")


if __name__ == "__main__":
    api = CoolQHttpAPI("http://127.0.0.1:5700")
    api.send_private_msg(524543577, "Hello"

添加新评论

已有 1 条评论

这是啥东西~