前言
开发周期一天半。脑袋里想着这东西写完估计再也不会碰了。于是乎就完全没有在意代码风格和编写习惯,最终造成了如此鬼畜的emmmm程序。
虽然就代码而言有点恐怖,但是就结果而言,它还是十分耐操。至少在它寿终正寝(我们觉得实在烦了)之前,还是没有出什么bug的。
其实是去年就完成的东西了。现在只是正好记起来搬出来而已XD。
寒假过去了1/5了我却是在病魔中度过的,真是太浪费了呀。。
coolq是用了天台封装的。天台就是好啊,要是我肯定才懒得封装这些东西XD,直接暴力怼直接用hhhhhhh
代码在下面。
# -*- coding: utf-8 -*-
import json
import telegram
from coolq import CoolQHttpAPI
from flask import Flask, request
import re
from peewee import *
from playhouse.shortcuts import model_to_dict
# SQLite database file used by the peewee models below.
db = SqliteDatabase("Custom.db")
# Forwarding mode, changed at runtime by the "/switch" command:
#   -1 = both directions connected, 0 = cut off,
#    1 = only TG -> QQ (QQ messages are dropped), 2 = only QQ -> TG (TG messages are dropped).
switch = -1 # -1 is connected, 0 is cut off, 1 is just TG to QQ, 2 is just QQ to TG
class BaseModel(Model):
    """Base peewee model whose Meta points at the module-level ``db`` database."""

    class Meta:
        database = db
class FriendShip(BaseModel):
    """One row per user: a QQ number, a TG username and the shared custom name."""

    ID = PrimaryKeyField()
    QQ = IntegerField(unique=True, null=True)
    TG = CharField(unique=True, null=True)
    NAME = CharField(unique=True, null=True)
    # 0 means the binding is registered (lookups filter on REGISTERED == 0);
    # on creation the column holds the sender code of whoever issued /register.
    REGISTERED = IntegerField(null=False)

    class Meta:
        db_table = "FriendShip"
# Create the FriendShip table when the module is loaded.
FriendShip.create_table()
# Flask application that receives the webhook POSTs.
app = Flask(__name__)
# CoolQ HTTP API client (access token left blank in this listing).
# NOTE(review): if this is the CoolQHttpAPI class listed further down, constructing it
# immediately issues a get_status request - confirm the endpoint is reachable at import time.
api = CoolQHttpAPI("http://other.9bie.org:5700", access_token="")
# Telegram bot client (token left blank in this listing).
bot = telegram.Bot(token="")
def tool_function_check_identifier(ifentifier, type):
    """
    Look up the custom name of an already registered user.

    :param ifentifier: QQ number (when type is 1) or TG username (when type is 2)
    :param type: 1 -> QQ, 2 -> TG; any other value yields False
    :return: the stored custom name, or False when the type is unknown,
             no registered row matches, or the query fails
    """
    if type not in (1, 2):
        return False
    column = FriendShip.QQ if type == 1 else FriendShip.TG
    try:
        # Only rows with REGISTERED == 0 count as confirmed bindings.
        row = FriendShip.select().where(column == ifentifier, FriendShip.REGISTERED == 0)[0]
    except Exception:
        # Best-effort lookup: no matching row (IndexError) or a database error
        # both mean "not registered" to the callers.
        return False
    return row.NAME
def tool_function_confirm_custom_identifier(sender: int, custom_identifier: str, qq_id: int = 0, tg_name: str = ""):
    """
    Confirm a pending binding from the opposite platform.

    A binding created by /register stores the initiating side in REGISTERED
    (1 = QQ, 2 = TG). It can only be confirmed from the other side, which sets
    REGISTERED to 0.

    :param sender: platform issuing the confirmation, 1 -> QQ, 2 -> TG
    :param custom_identifier: custom name given at registration
    :param qq_id: QQ number given at registration
    :param tg_name: TG username given at registration
    :return: True when a pending row was confirmed, otherwise False
    """
    # Confirming side -> side that must have initiated the registration.
    initiators = {1: 2, 2: 1}
    if sender not in initiators:
        return False
    try:
        # Model.update is a classmethod: calling it on a fetched row would
        # update every row in the table, so the filter goes on the UPDATE itself.
        updated = (
            FriendShip.update(REGISTERED=0)
            .where(
                FriendShip.QQ == qq_id,
                FriendShip.TG == tg_name,
                FriendShip.NAME == custom_identifier,
                FriendShip.REGISTERED == initiators[sender],
            )
            .execute()
        )
    except Exception:
        # Best-effort: any database failure is reported as "not confirmed".
        return False
    return updated > 0
def tool_function_bind_qq_tg_identifier(custom_identifier: str, qq_id: int, tg_name: str, sender: int) -> bool:
    """
    Store a pending binding between a QQ number, a TG username and a custom name.

    :param custom_identifier: user-chosen custom name
    :param qq_id: QQ number
    :param tg_name: TG username
    :param sender: platform issuing the request, 1 -> QQ, 2 -> TG; stored in
                   REGISTERED until the binding is confirmed
    :return: True when the row was created, False when creation failed
    """
    try:
        FriendShip.create(QQ=qq_id, TG=tg_name, NAME=custom_identifier, REGISTERED=sender)
    except Exception:
        # Best-effort: a failed insert (for example a violated unique
        # constraint) is reported to the caller as False instead of raising.
        return False
    return True
def tool_function_txt(start_str, end, html):
    """
    Return the text found between two markers.

    :param start_str: marker that precedes the wanted text
    :param end: marker that follows the wanted text (searched after start_str)
    :param html: string to search in
    :return: the stripped text between the markers, or None when either
             marker is missing
    """
    marker_at = html.find(start_str)
    if marker_at < 0:
        return None
    text_from = marker_at + len(start_str)
    text_to = html.find(end, text_from)
    if text_to < 0:
        return None
    return html[text_from:text_to].strip()
def tool_function_peelphotourl(message_raw):  # return the message only and image_list
    """
    Split a QQ message into its text and the URLs of its images.

    Every ``[CQ:image...]`` tag is inspected on its own, so all images are
    collected, and a literal "url=" in the user's text is left alone.

    :param message_raw: raw message text that may contain ``[CQ:image...]`` tags
    :return: (message with all image tags removed, list of image URLs)
    """
    image_tag = re.compile(r"\[CQ:image.*?\]")
    image_list = []
    for tag in image_tag.findall(message_raw):
        # The URL is whatever sits between "url=" and the closing bracket.
        found = re.search(r"url=(.*?)\]", tag)
        if found:
            image_list.append(found.group(1).strip())
    message_raw = image_tag.sub("", message_raw)
    print(message_raw)
    return message_raw, image_list
def _control_reply(sender: int, text: str) -> None:
    """Send ``text`` to the QQ group when sender is 1, otherwise to the TG chat."""
    if sender == 1:
        api.send_group_msg(558226805, text)
    else:
        bot.send_message(chat_id=-256726247, text=text)


def _control_int(text):
    """Parse ``text`` as an int; return None instead of raising on bad input."""
    try:
        return int(text)
    except ValueError:
        return None


def control(message_raw, sender: int):
    """
    Handle the chat commands /register, /acc_register and /switch.

    Malformed numeric arguments are answered with the usage text instead of
    raising inside the webhook handler.

    :param message_raw: raw message text
    :param sender: platform the message came from, 1 -> QQ, 2 -> TG
    :return: True when a /register or /acc_register command succeeded (the
             caller then stops processing the message), otherwise None
    """
    global switch

    if message_raw.startswith("/register"):
        usage = "USAGE:\n/register custom_name qq_number tg_username."
        param_ = message_raw.split(" ")
        if len(param_) != 4:  # /register name qq tg
            _control_reply(sender, usage)
            return None
        qq_id = _control_int(param_[2])
        if qq_id is None:
            _control_reply(sender, usage)
            return None
        if tool_function_bind_qq_tg_identifier(custom_identifier=param_[1], qq_id=qq_id,
                                               tg_name=param_[3], sender=sender):
            _control_reply(sender, "ok.")
            return True
        return None

    if message_raw.startswith("/acc_register"):
        usage = "USAGE:\n /acc_register custom_name qq_number tg_username."
        param_ = message_raw.split(" ")
        if len(param_) != 4:  # /acc_register name qq tg
            _control_reply(sender, usage)
            return None
        qq_id = _control_int(param_[2])
        if qq_id is None:
            _control_reply(sender, usage)
            return None
        if tool_function_confirm_custom_identifier(custom_identifier=param_[1], qq_id=qq_id,
                                                   tg_name=param_[3], sender=sender):
            # The confirmation is acknowledged on the opposite platform,
            # i.e. the one that issued the original /register.
            _control_reply(1 if sender == 2 else 2, "ok.")
            return True
        return None

    if message_raw.startswith("/switch"):
        usage = "USAGE:\n /switch -1~2\n -1 is connect.0 is cut off.1 is juest tg to qq.2 is juest qq to tg"
        param_ = message_raw.split(" ")
        if len(param_) != 2:  # /switch -1~2
            _control_reply(sender, usage)
            return None
        print("[Switch]:%s" % param_[1])
        mode = _control_int(param_[1])
        if mode is None or mode < -1 or mode > 2:
            _control_reply(sender, usage)
            return None
        switch = mode
        # A mode change is announced on both platforms.
        api.send_group_msg(558226805, "OK.NOW MOD:%s" % param_[1])
        bot.send_message(chat_id=-256726247, text="OK.NOW MOD:%s" % param_[1])
    return None
def _kernerl_member_name(user_id):
    """Return a QQ group member's card, or the nickname when the card is empty."""
    info = api.get_group_member_info(558226805, user_id)
    return info["card"] or info["nickname"]


def _kernerl_mention(match):
    """Replace one ``[CQ:at...]`` tag with "@" plus the mentioned user's name."""
    qq = tool_function_txt("qq=", "]", match.group(0))
    return "@%s" % (tool_function_check_identifier(qq, 1) or _kernerl_member_name(qq))


def _kernerl_qq_to_tg(data):
    """Forward one QQ group message event to the TG chat."""
    if data["post_type"] != "message":
        return
    if data["message_type"] != "group":
        return
    if data["group_id"] != 558226805:
        return
    # Control commands are handled first; a successful one ends processing.
    if control(data["message"], 1):
        return
    if switch in (0, 1):
        print("[QQ][RECV][SWITCH]:%d" % switch)
        return
    msg = data["message"]
    name = tool_function_check_identifier(data["user_id"], 1)
    print("[INFO:QQ]%s-----%s" % (data["user_id"], name))
    if not name:
        name = _kernerl_member_name(data["user_id"])
    if "CQ:at" in msg:
        # A callable replacement resolves each mention separately and keeps
        # backslashes in names from being read as regex escapes.
        msg = re.sub(r"\[CQ:at.*?\]", _kernerl_mention, msg)
    if "CQ:image" in data["message"]:
        # Each image is sent as a photo, with the remaining text as caption.
        msg, image_list = tool_function_peelphotourl(msg)
        for image_url in image_list:
            if msg != "":
                bot.send_photo(chat_id=-256726247, photo=image_url, caption="%s: %s" % (name, msg))
            else:
                bot.send_photo(chat_id=-256726247, photo=image_url, caption="%s: Send a Photo." % (name))
    else:
        msg = re.sub(r"\[.*?\]", "", msg)  # drop any other bracketed tag
        bot.send_message(chat_id=-256726247, text="%s: %s" % (name, msg))


def _kernerl_tg_to_qq(data):
    """Forward one TG group text message to the QQ group."""
    if data["chat"]["type"] != "group":
        return
    if data["chat"]["id"] != -256726247:
        return
    if data["from"]["is_bot"]:
        return
    if "text" not in data:
        return
    if control(data["text"], 2):
        return
    if switch in (0, 2):
        print("[TG][RECV][SWITCH]:%d" % switch)
        return
    author = data["from"]
    if "username" not in author:
        # Guard against a missing last_name key.
        name = author["first_name"] + author.get("last_name", "")
    else:
        name = author["username"]
    info = tool_function_check_identifier(name, 2)
    print("[INFO:TG]%s-----%s" % (name, info))
    if not info:
        info = name
    api.send_group_msg(558226805, "%s: %s" % (info, data["text"]))


def kernerl(data, source: int):
    """
    Relay one incoming event to the other platform.

    :param data: event payload as a dict (CoolQ event for source 1, TG message
                 dict for source 2)
    :param source: 1 -> QQ, 2 -> TG; any other value is ignored
    :return: None
    """
    if source == 1:
        _kernerl_qq_to_tg(data)
    elif source == 2:
        _kernerl_tg_to_qq(data)
@app.route("/initbot")
def init():
    """Set the Telegram webhook URL and return a short status string."""
    webhook_url = "https://other.9bie.org/"
    bot.set_webhook(webhook_url)
    return "Working..."
@app.route("/", methods=["POST"])
def tg_event():
    """
    Receive webhook POSTs on "/" from both Telegram and CoolQ.

    Only one view can serve a given rule and method, so this handler inspects
    the payload and hands CoolQ events over to ``qq_event``.

    :return: response body for the webhook caller
    """
    payload = request.get_json(force=True)
    # kernerl reads "post_type" from every CoolQ event, so its presence is used
    # to tell the two senders apart.
    # NOTE(review): assumes Telegram updates never carry a top-level "post_type" key.
    if isinstance(payload, dict) and "post_type" in payload:
        return qq_event()
    update = telegram.Update.de_json(payload, bot)
    if update is None or update.message is None:
        return "Show me your TOKEN please!"
    kernerl(update.message.to_dict(), 2)
    return ""


def qq_event():
    """
    Relay one CoolQ event taken from the current request body.

    Called by ``tg_event``; it is no longer routed itself because a second
    view on POST "/" would never be dispatched to.

    :return: empty response body
    """
    data = json.loads(request.data.decode("utf-8"))
    kernerl(data, 1)
    return ""
if __name__ == "__main__":
    # updater = Updater(token='')
    # Rebuild the Telegram client and set the webhook before serving.
    bot = telegram.Bot(token="")
    bot.set_webhook("https://other.9bie.org/")
    # Start-up announcements, currently disabled:
    # api.send_group_msg(558226805, "QQ<-->TG<status>:\n START!")
    # bot.send_message(chat_id=-256726247, text="QQ<-->TG<status>:\n START!")
    app.run(port=8088, host="0.0.0.0")
coolq.py(上面 `from coolq import CoolQHttpAPI` 用到的封装):
# -*- coding: utf-8 -*-
import json
import requests
# Maps the "retcode" of a failed API response to the exception that
# CoolQHttpAPI._POST raises for it.
RETCODE = {
# 100: a parameter is missing or invalid
100: AttributeError("100: 参数缺失或参数无效"),
# 102: the data returned by the CoolQ function is invalid
102: ValueError("102: 酷Q函数返回的数据无效"),
# 103: insufficient permission or an unexpected file system error
103: PermissionError("103: 权限不足或文件系统异常,不符合预期"),
# 104: the CoolQ credential has expired
104: RuntimeError("104: 酷Q凭证失效"),
# 201: the worker thread pool was not initialised correctly
201: EnvironmentError("201: 工作线程池未正确初始化")
}
class CoolQHttpAPI:
    """
    Thin client for the CoolQ HTTP API.

    Every public method posts its keyword arguments as JSON to the endpoint
    of the same name and returns the "data" field of the response.
    """

    def __init__(self, addr, access_token=None):
        """
        Store the connection settings and verify the service with get_status.

        :param addr: base URL of the HTTP API; one trailing "/" is dropped
        :param access_token: optional token sent in the Authorization header
        :raises ConnectionError: when get_status does not report "good"
        """
        # endswith() also copes with an empty address, where addr[-1] would raise.
        self.addr = addr[:-1] if addr.endswith("/") else addr
        self.access_token = access_token
        if not self.get_status()["good"]:
            raise ConnectionError("连接到酷Q失败")

    def _POST(self, api, **kargs):
        """
        POST ``kargs`` as JSON to ``<addr>/<api>`` and unwrap the response.

        :param api: endpoint name
        :param kargs: request parameters
        :return: the response's "data" field, or None for an "async" status
        :raises ConnectionError: when the service cannot be reached
        :raises TimeoutError: when the request times out
        :raises NameError: when the endpoint answers 404
        :raises NotImplementedError: for a failed call with an unknown retcode;
                known retcodes raise the exception stored in RETCODE
        """
        headers = {"Content-Type": "application/json"}
        if self.access_token:
            headers["Authorization"] = "Token %s" % self.access_token
        try:
            resp = requests.post(
                url=self.addr + "/" + api,
                headers=headers,
                data=json.dumps(kargs),
                timeout=5)
        except requests.exceptions.ConnectionError as err:
            raise ConnectionError("无法连接到酷Q") from err
        except requests.exceptions.Timeout as err:
            raise TimeoutError("与酷Q通讯时发生超时") from err
        if resp.status_code == 404:
            raise NameError("API %s 不存在" % api)
        tmp = resp.json()
        if tmp["status"] == "async":
            return None
        if tmp["status"] == "failed":
            if tmp["retcode"] in RETCODE:
                raise RETCODE[tmp["retcode"]]
            raise NotImplementedError("%s 未知错误" % tmp["retcode"])
        return tmp["data"]

    def send_private_msg(self, user_id, message, auto_escape=False):
        """Call the send_private_msg endpoint."""
        return self._POST(
            "send_private_msg",
            user_id=user_id,
            message=message,
            auto_escape=auto_escape
        )

    def send_group_msg(self, group_id, message, auto_escape=False):
        """Call the send_group_msg endpoint."""
        return self._POST(
            "send_group_msg",
            group_id=group_id,
            message=message,
            auto_escape=auto_escape
        )

    def send_discuss_msg(self, discuss_id, message, auto_escape=False):
        """Call the send_discuss_msg endpoint; the target is sent as discuss_id."""
        return self._POST(
            "send_discuss_msg",
            discuss_id=discuss_id,
            message=message,
            auto_escape=auto_escape
        )

    def delete_msg(self, message_id):
        """Call the delete_msg endpoint."""
        return self._POST(
            "delete_msg",
            message_id=message_id)

    def send_like(self, user_id, times=1):
        """Call the send_like endpoint for ``user_id``."""
        return self._POST(
            "send_like",
            user_id=user_id,
            times=times
        )

    def set_group_kick(self, group_id, user_id, reject_add_request=False):
        """Call the set_group_kick endpoint."""
        return self._POST(
            "set_group_kick",
            group_id=group_id,
            user_id=user_id,
            reject_add_request=reject_add_request
        )

    def set_group_ban(self, group_id, user_id, duration=60):
        """Call the set_group_ban endpoint."""
        return self._POST(
            "set_group_ban",
            group_id=group_id,
            user_id=user_id,
            duration=duration
        )

    def set_group_anonymous_ban(self, group_id, anonymous=None,
                                anonymous_flag=None, duration=30):
        """
        Call the set_group_anonymous_ban endpoint.

        ``anonymous`` takes precedence when both identifiers are given.

        :raises AttributeError: when neither anonymous nor anonymous_flag is set
        """
        if anonymous:
            return self._POST(
                "set_group_anonymous_ban",
                group_id=group_id,
                anonymous=anonymous,
                duration=duration
            )
        if anonymous_flag:
            return self._POST(
                "set_group_anonymous_ban",
                group_id=group_id,
                anonymous_flag=anonymous_flag,
                duration=duration
            )
        raise AttributeError("anonymous与anonymous_flag二选一必填")

    def set_group_whole_ban(self, group_id, enable=True):
        """Call the set_group_whole_ban endpoint."""
        return self._POST(
            "set_group_whole_ban",
            group_id=group_id,
            enable=enable
        )

    def set_group_admin(self, group_id, user_id, enable=True):
        """Call the set_group_admin endpoint."""
        return self._POST(
            "set_group_admin",
            group_id=group_id,
            user_id=user_id,
            enable=enable
        )

    def set_group_anonymous(self, group_id, enable=True):
        """Call the set_group_anonymous endpoint."""
        return self._POST(
            "set_group_anonymous",
            group_id=group_id,
            enable=enable
        )

    def set_group_card(self, group_id, user_id, card=""):
        """Call the set_group_card endpoint."""
        return self._POST(
            "set_group_card",
            group_id=group_id,
            user_id=user_id,
            card=card
        )

    def set_group_leave(self, group_id, is_dismiss=False):
        """Call the set_group_leave endpoint."""
        return self._POST(
            "set_group_leave",
            group_id=group_id,
            is_dismiss=is_dismiss
        )

    def set_group_special_title(self, group_id, user_id, special_title="", duration=-1):
        """Call the set_group_special_title endpoint."""
        return self._POST(
            "set_group_special_title",
            group_id=group_id,
            user_id=user_id,
            special_title=special_title,
            duration=duration
        )

    def set_discuss_leave(self, discuss_id):
        """Call the set_discuss_leave endpoint."""
        return self._POST(
            "set_discuss_leave",
            discuss_id=discuss_id
        )

    def set_friend_add_request(self, flag, approve=True, remark=""):
        """Call the set_friend_add_request endpoint."""
        return self._POST(
            "set_friend_add_request",
            flag=flag,
            approve=approve,
            remark=remark
        )

    def set_group_add_request(self, flag, sub_type, approve=True, reason=""):
        """Call the set_group_add_request endpoint."""
        return self._POST(
            "set_group_add_request",
            flag=flag,
            sub_type=sub_type,
            approve=approve,
            reason=reason
        )

    def get_login_info(self):
        """Call the get_login_info endpoint."""
        return self._POST("get_login_info")

    def get_stranger_info(self, user_id, no_cache=False):
        """Call the get_stranger_info endpoint."""
        return self._POST(
            "get_stranger_info",
            user_id=user_id,
            no_cache=no_cache
        )

    def get_group_list(self):
        """Call the get_group_list endpoint."""
        return self._POST("get_group_list")

    def get_group_member_info(self, group_id, user_id, no_cache=False):
        """Call the get_group_member_info endpoint."""
        return self._POST(
            "get_group_member_info",
            group_id=group_id,
            user_id=user_id,
            no_cache=no_cache
        )

    def get_group_member_list(self, group_id):
        """Call the get_group_member_list endpoint."""
        return self._POST(
            "get_group_member_list",
            group_id=group_id
        )

    def get_cookies(self):
        """Call the get_cookies endpoint."""
        return self._POST("get_cookies")

    def get_csrf_token(self):
        """Call the get_csrf_token endpoint."""
        return self._POST("get_csrf_token")

    def get_credentials(self):
        """Call the get_credentials endpoint."""
        return self._POST("get_credentials")

    def get_record(self, file, out_format):
        """Call the get_record endpoint."""
        return self._POST(
            "get_record",
            file=file,
            out_format=out_format
        )

    def get_status(self):
        """Call the get_status endpoint."""
        return self._POST("get_status")

    def get_version_info(self):
        """Call the get_version_info endpoint."""
        return self._POST("get_version_info")

    def set_restart(self, clean_log=False, clean_cache=False, clean_event=False):
        """Call the set_restart endpoint."""
        return self._POST(
            "set_restart",
            clean_log=clean_log,
            clean_cache=clean_cache,
            clean_event=clean_event
        )

    def set_restart_plugin(self, delay=0):
        """Call the set_restart_plugin endpoint."""
        return self._POST(
            "set_restart_plugin",
            delay=delay
        )

    def clean_data_dir(self, data_dir):
        """Call the clean_data_dir endpoint."""
        return self._POST(
            "clean_data_dir",
            data_dir=data_dir
        )

    def clean_plugin_log(self):
        """Call the clean_plugin_log endpoint."""
        return self._POST("clean_plugin_log")
if __name__ == "__main__":
    # Manual smoke test: connect to a local CoolQ HTTP API and send one private message.
    api = CoolQHttpAPI("http://127.0.0.1:5700")
    api.send_private_msg(524543577, "Hello")
这是啥东西~