QQ<=>TG:一个惨不忍睹的东西

请注意,本文编写于 2270 天前,最后修改于 1777 天前,其中某些信息可能已经过时。

前言

开发周期一天半。脑袋里想着这东西写完估计再也不会碰了。于是乎就完全没有在意代码风格和编写习惯,最终造成了如此鬼畜的 em­mmm 程序。

虽然就代码而言有点恐怖,但是就结果而言,他还是十分耐操。至少在他寿终正寝(我们觉得实在烦了)之前,还是没有出什么 bug 的。

其实是去年就完成的东西了。现在只是正好记起来搬出来而已 XD。

寒假过去了 1/​5 了我却是在病魔中度过的,真是太浪费了呀。。

coolq 是用了天台封装的。天台就是好啊,要是我肯定才懒得封装这些东西 XD,直接暴力怼直接用 hh­h­h­hhh

代码在下面。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
# -*- coding: utf-8 -*-
import json
import telegram
from coolq import CoolQHttpAPI
from flask import Flask, request
import re
from peewee import *
from playhouse.shortcuts import model_to_dict
 
db = SqliteDatabase("Custom.db")
switch = -1  # -1 is connected,0 is cut off , 1 is juest tg to QQ ,2 is juest QQ to tg
 
 
 
class BaseModel(Model):
    class Meta:
        database = db
 
 
class FriendShip(BaseModel):
    ID = PrimaryKeyField()
    QQ = IntegerField(null=True, unique=True)
    TG = CharField(null=True, unique=True)
    NAME = CharField(null=True, unique=True)
    REGISTERED = IntegerField(null=False# if == 0 is registered.
 
    class Meta:
        db_table = "FriendShip"
 
 
FriendShip.create_table()
app = Flask(__name__)
api = CoolQHttpAPI("http://other.9bie.org:5700", access_token="")
bot = telegram.Bot(token="")
 
 
def tool_function_check_identifier(ifentifier, type):
    '''
    Query the regietered custom_ifentifier
    if is not exits,return ""
    :param ifentifier: id
    :param type: 1 ->QQ ,2 ->TG 3....more ->WEB OR OTHER?
    :return: custom_identifier
    '''
    types = {1: FriendShip.QQ, 2: FriendShip.TG}
    try:
        ret = FriendShip.select().where(types[type] == ifentifier, FriendShip.REGISTERED == 0)[0]
    except:
        return False
    return ret.NAME if type != 0 else False
 
 
def tool_function_confirm_custom_identifier(sender: int, custom_identifier: str, qq_id: int = 0, tg_name: str = ""):
    senders = {1: 2, 2: 1}
    try:
        ret = FriendShip.select().where(FriendShip.QQ == qq_id, FriendShip.TG == tg_name,
                                        FriendShip.REGISTERED == senders[sender], FriendShip.NAME == custom_identifier)[
            0].update(REGISTERED=0).execute()
    except:
        return False
 
 
def tool_function_bind_qq_tg_identifier(custom_identifier: str, qq_id: int, tg_name: str, sender: int) -> bool:
    """
    Unifed the TG and QQ 's name,to hardcore to the local file.
    :param custom_identifier: User coustom name.
    :param qq_id: qq number
    :param tg_name: tg username
    :param sender 1 ->QQ ,2 ->TG 3....more ->WEB OR OTHER?
    :return: T OR F
    """
    try:
        FriendShip.create(QQ=qq_id, TG=tg_name, NAME=custom_identifier, REGISTERED=sender)
    except:
        return False
    return True
 
 
def tool_function_txt(start_str, end, html):
    start = html.find(start_str)
    if start >= 0:
        start += len(start_str)
        end = html.find(end, start)
        if end >= 0:
            return html[start:end].strip()
 
 
def tool_function_peelphotourl(message_raw):  # return the message only and image_list
    image_list = []
    while message_raw.find("url=") != -1:
        # image_list.append(message_raw[message_raw.find("url=")+4:message_raw.find("]")])
        image_list.append(tool_function_txt("url=", "]", message_raw))
        message_raw = re.sub("\[CQ:image.*?\]", "", message_raw)
        print(message_raw)
    return message_raw, image_list
 
 
def control(message_raw, sender: int):
    if message_raw[:9] == "/register":
        param_ = message_raw.split(" ")
        if len(param_) == 4# /register name qq tg
            if tool_function_bind_qq_tg_identifier(custom_identifier=param_[1], qq_id=int(param_[2]), tg_name=param_[3],
                                                   sender=sender):
                if sender == 1:
                    api.send_group_msg(558226805, "ok.")
                else:
                    bot.send_message(chat_id=-256726247, text="ok.")
                return True
        else:
            if sender == 1:
                api.send_group_msg(558226805, "USAGE:\n/register custom_name qq_number tg_username.")
            else:
                bot.send_message(chat_id=-256726247, text="USAGE:\n/register custom_name qq_number tg_username.")
    if message_raw[:13] == "/acc_register":
        param_ = message_raw.split(" ")
        if len(param_) == 4# /register name qq tg
            if tool_function_confirm_custom_identifier(custom_identifier=param_[1], qq_id=int(param_[2]),
                                                       tg_name=param_[3], sender=sender):
                if sender == 2:
                    api.send_group_msg(558226805, "ok.")
                else:
                    bot.send_message(chat_id=-256726247, text="ok.")
                return True
        else:
            if sender == 1:
                api.send_group_msg(558226805, "USAGE:\n    /acc_register custom_name qq_number tg_username.")
            else:
                bot.send_message(chat_id=-256726247,
                                 text="USAGE:\n    /acc_register custom_name qq_number tg_username.")
    if message_raw[:len("/switch")] == "/switch":
        param_ = message_raw.split(" ")
        if len(param_) == 2# /switch -1~2
            print("[Switch]:%s"%param_[1])
            if int(param_[1]) < -1 or int(param_[1]) > 2:
                if sender == 1:
                    api.send_group_msg(558226805,
                                       "USAGE:\n    /switch -1~2\n    -1 is connect.0 is cut off.1 is juest tg to qq.2 is juest qq to tg")
                else:
                    bot.send_message(chat_id=-256726247,
                                     text="USAGE:\n    /switch -1~2\n    -1 is connect.0 is cut off.1 is juest tg to qq.2 is juest qq to tg")
            else:
                global switch
                switch = int(param_[1])
                api.send_group_msg(558226805,"OK.NOW MOD:%s"%param_[1])
                bot.send_message(chat_id=-256726247,
                                     text="OK.NOW MOD:%s"%param_[1])
        else:
            if sender == 1:
                api.send_group_msg(558226805,
                                   "USAGE:\n    /switch -1~2\n    -1 is connect.0 is cut off.1 is juest tg to qq.2 is juest qq to tg")
            else:
                bot.send_message(chat_id=-256726247,
                                 text="USAGE:\n    /switch -1~2\n    -1 is connect.0 is cut off.1 is juest tg to qq.2 is juest qq to tg")
 
 
def kernerl(data, source: int):
    # Source: 1 ->QQ ,2 ->TG 3....more ->WEB OR OTHER?
    if source == 1:
        if data["post_type"] == "message":
            if data["message_type"] == "group":
                if data["group_id"] == 558226805:
                    # check the control commond.
                    # such as "!help"  ,and the other.
                    if control(data["message"], 1):
                        # control break.
                        return
                    if switch == 0 or switch == 1:
                        print("[QQ][RECV][SWITCH]:%d" % switch)
                        return
                    msg = data["message"]
 
                    name = tool_function_check_identifier(data["user_id"], 1)
                    print("[INFO:QQ]%s-----%s" % (data["user_id"], name))
                    if name == False:
                        info = api.get_group_member_info(
                            558226805, data["user_id"])
                        name = info["card"] or info["nickname"]
                    if "CQ:at" in msg:
                        at = tool_function_check_identifier(tool_function_txt("qq=", "]", msg), 1)
                        if at == False:
                            info = api.get_group_member_info(
                                558226805, tool_function_txt("qq=", "]", msg))
                            at = info["card"] or info["nickname"]
                            msg = re.sub("\[CQ:at.*?\]", "@%s" % api.at(
                                558226805, data["user_id"]), msg)
                        else:
                            msg = re.sub("\[CQ:at.*?\]", "@%s" % at, msg)
                    if "CQ:image" in data["message"]:
                        # send msg have photo
                        msg, image_list = tool_function_peelphotourl(msg)
                        # chat_id = bot.send_message(chat_id=-256726247, text="%s:\n%s" % (
                        #     name, msg)).message_id
                        for i in image_list:
                            if msg != "":
                                bot.send_photo(chat_id=-256726247, photo=i, caption="%s: %s" % (name, msg))
                            else:
                                bot.send_photo(chat_id=-256726247, photo=i, caption="%s: Send a Photo." % (name))
                    elif "message" in data:
                        msg = re.sub("\[.*?\]", "", msg)  # Unkonw.delete
                        bot.send_message(chat_id=-256726247, text="%s: %s" % (
                            name, msg))
    if source == 2:
 
        if data["chat"]["type"] == "group":
            if data["chat"]["id"] == -256726247:
                if data["from"]["is_bot"] == False:
                    if "text" in data:  # it's text
                        if control(data["text"], 2):
                            # control break.
                            return
                        if switch == 0 or switch == 2:
                            print("[TG][RECV][SWITCH]:%d" % switch)
                            return
                        if "username" not in data["from"]:
                            name = data["from"]["first_name"] + data["from"]["last_name"]
                        else:
                            name = data["from"]["username"]
                        info = tool_function_check_identifier(name, 2)
 
                        print("[INFO:TG]%s-----%s" % (name, info))
                        if info == False:
                            info = name
                        api.send_group_msg(558226805, "%s: %s" % (info, data["text"]))
 
 
@app.route("/initbot")
def init():
    bot.set_webhook("https://other.9bie.org/")
    return "Working..."
 
 
@app.route("/", methods=["POST"])
def tg_event():
    update = telegram.Update.de_json(request.get_json(force=True), bot)
    if update.message is None:
        return "Show me your TOKEN please!"
    kernerl(update.message.to_dict(), 2)
    return ""
 
 
@app.route("/", methods=["POST"])
def qq_event():
    data = json.loads(request.data.decode("utf-8"))
    kernerl(data, 1)
    return ""
 
 
if __name__ == "__main__":
    # updater = Updater(token='')
    bot = telegram.Bot(token="")
    bot.set_webhook("https://other.9bie.org/")
    # api.send_group_msg(558226805, "QQ<-->TG<status>:\n    START!")
    # bot.send_message(chat_id=-256726247, text="QQ<-->TG<status>:\n    START!")
    app.run(host='0.0.0.0', port=8088)

COOLQ:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
# -*- coding: utf-8 -*-
import json
 
import requests
 
RETCODE = {
    100: AttributeError("100: 参数缺失或参数无效"),
    102: ValueError("102: 酷Q函数返回的数据无效"),
    103: PermissionError("103: 权限不足或文件系统异常,不符合预期"),
    104: RuntimeError("104: 酷Q凭证失效"),
    201: EnvironmentError("201: 工作线程池未正确初始化")
}
 
 
class CoolQHttpAPI:
     
    def __init__(self, addr, access_token=None):
        if addr[-1] == "/":
            self.addr = addr[:-1]
        else:
            self.addr = addr
        self.access_token = access_token
        if not self.get_status()["good"]:
            raise ConnectionError("连接到酷Q失败")
     
    def _POST(self, api, **kargs):
        headers = {"Content-Type": "application/json"}
        if self.access_token:
            headers["Authorization"] = "Token %s" % self.access_token
        try:
            resp = requests.post(
                url=self.addr + "/" + api,
                headers=headers,
                data=json.dumps(kargs),
                timeout=5)
        except requests.exceptions.ConnectionError:
            raise ConnectionError("无法连接到酷Q")
        except requests.exceptions.Timeout:
            raise TimeoutError("与酷Q通讯时发生超时")
        if resp.status_code == 404:
            raise NameError("API %s 不存在" % api)
        tmp = resp.json()
        if tmp["status"] == "async":
            return None
        if tmp["status"] == "failed":
            if tmp["retcode"] in RETCODE:
                raise RETCODE[tmp["retcode"]]
            else:
                raise NotImplementedError("%s 未知错误" % tmp["retcode"])
        return tmp["data"]
     
    def send_private_msg(self, user_id, message, auto_escape=False):
        return self._POST(
            "send_private_msg",
            user_id=user_id,
            message=message,
            auto_escape=auto_escape
        )
     
    def send_group_msg(self, group_id, message, auto_escape=False):
        return self._POST(
            "send_group_msg",
            group_id=group_id,
            message=message,
            auto_escape=auto_escape
        )
     
    def send_discuss_msg(self, discuss_id, message, auto_escape=False):
        return self._POST(
            "send_discuss_msg",
            group_id=discuss_id,
            message=message,
            auto_escape=auto_escape
        )
     
    def delete_msg(self, message_id):
        return self._POST(
            "delete_msg",
            message_id=message_id)
     
    def send_like(self, user_id, times=1):
        return self._POST(
            "send_like",
            times=times
        )
     
    def set_group_kick(self, group_id, user_id, reject_add_request=False):
        return self._POST(
            "set_group_kick",
            group_id=group_id,
            user_id=user_id,
            reject_add_request=reject_add_request
        )
 
    def set_group_ban(self, group_id, user_id, duration=60):
        return self._POST(
            "set_group_ban",
            group_id=group_id,
            user_id=user_id,
            duration=duration
        )
 
    def set_group_anonymous_ban(self, group_id, anonymous=None,
                                anonymous_flag=None, duration=30):
        if anonymous:
            return self._POST(
                "set_group_anonymous_ban",
                group_id=group_id,
                anonymous=anonymous,
                duration=duration
            )
        elif anonymous_flag:
            return self._POST(
                "set_group_anonymous_ban",
                group_id=group_id,
                anonymous_flag=anonymous_flag,
                duration=duration
            )
        else:
            raise AttributeError("anonymous与anonymous_flag二选一必填")
 
    def set_group_whole_ban(self, group_id, enable=True):
        return self._POST(
            "set_group_whole_ban",
            group_id=group_id,
            enable=enable
        )
 
    def set_group_admin(self, group_id, user_id, enable=True):
        return self._POST(
            "set_group_admin",
            group_id=group_id,
            user_id=user_id,
            enable=enable
        )
     
    def set_group_anonymous(self, group_id, enable=True):
        return self._POST(
            "set_group_anonymous",
            group_id=group_id,
            enable=enable
        )
     
    def set_group_card(self, group_id, user_id, card=""):
        return self._POST(
            "set_group_card",
            group_id=group_id,
            user_id=user_id,
            card=card
        )
     
    def set_group_leave(self, group_id, is_dismiss=False):
        return self._POST(
            "set_group_leave",
            group_id=group_id,
            is_dismiss=is_dismiss
        )
     
    def set_group_special_title(self, group_id, user_id, special_title="", duration=-1):
        return self._POST(
            "set_group_special_title",
            group_id=group_id,
            user_id=user_id,
            special_title=special_title,
            duration=duration
        )
     
    def set_discuss_leave(self, discuss_id):
        return self._POST(
            "set_discuss_leave",
            discuss_id=discuss_id
        )
     
    def set_friend_add_request(self, flag, approve=True, remark=""):
        return self._POST(
            "set_friend_add_request",
            flag=flag,
            approve=approve,
            remark=remark
        )
     
    def set_group_add_request(self, flag, sub_type, approve=True, reason=""):
        return self._POST(
            "set_group_add_request",
            flag=flag,
            sub_type=sub_type,
            approve=approve,
            reason=reason
        )
     
    def get_login_info(self):
        return self._POST("get_login_info")
     
    def get_stranger_info(self, user_id, no_cache=False):
        return self._POST(
            "get_stranger_info",
            user_id=user_id,
            no_cache=no_cache
        )
     
    def get_group_list(self):
        return self._POST("get_group_list")
     
    def get_group_member_info(self, group_id, user_id, no_cache=False):
        return self._POST(
            "get_group_member_info",
            group_id=group_id,
            user_id=user_id,
            no_cache=no_cache
        )
 
    def get_group_member_list(self, group_id):
        return self._POST(
            "get_group_member_list",
            group_id=group_id
        )
     
    def get_cookies(self):
        return self._POST("get_cookies")
 
    def get_csrf_token(self):
        return self._POST("get_csrf_token")
     
    def get_credentials(self):
        return self._POST("get_credentials")
     
    def get_record(self, file, out_format):
        return self._POST(
            "get_record",
            file=file,
            out_format=out_format
        )
     
    def get_status(self):
        return self._POST("get_status")
     
    def get_version_info(self):
        return self._POST("get_version_info")
     
    def set_restart(self, clean_log=False, clean_cache=False, clean_event=False):
        return self._POST(
            "set_restart",
            clean_log=clean_log,
            clean_cache=clean_cache,
            clean_event=clean_event
        )
     
    def set_restart_plugin(self, delay=0):
        return self._POST(
            "set_restart_plugin",
            delay=delay
        )
     
    def clean_data_dir(self, data_dir):
        return self._POST(
            "clean_data_dir",
            data_dir=data_dir
        )
     
    def clean_plugin_log(self):
        return self._POST("clean_plugin_log")
 
 
if __name__ == "__main__":
    api = CoolQHttpAPI("http://127.0.0.1:5700")
    api.send_private_msg(524543577, "Hello"

添加新评论

已有 1 条评论

这是啥东西~