前言
开发周期一天半。脑袋里想着这东西写完估计再也不会碰了。于是乎就完全没有在意代码风格和编写习惯,最终造成了如此鬼畜的 emmmm 程序。
虽然就代码而言有点恐怖,但是就结果而言,它还是十分耐操。至少在它寿终正寝(我们觉得实在烦了)之前,还是没有出什么 bug 的。
其实是去年就完成的东西了。现在只是正好记起来搬出来而已 XD。
寒假已经过去 1/5 了,我却是在病魔中度过的,真是太浪费了呀……
coolq 是用了天台封装的。天台就是好啊,要是我肯定才懒得封装这些东西 XD,直接暴力怼直接用 hhhhhhh
代码在下面。
# -*- coding: utf-8 -*-
"""Relay messages between one QQ group (through CoolQ's HTTP API) and one Telegram group.

Two webhooks feed ``kernerl``: CoolQ event posts and Telegram updates.  A small
SQLite table maps QQ numbers / Telegram usernames to a shared display name.
"""
import json
import telegram
from coolq import CoolQHttpAPI
from flask import Flask, request
import re
from peewee import *
from playhouse.shortcuts import model_to_dict

# The only QQ group and Telegram chat that are bridged.
QQ_GROUP_ID = 558226805
TG_CHAT_ID = -256726247

# Platform codes used for the ``sender`` / ``source`` / ``type`` arguments.
SENDER_QQ = 1
SENDER_TG = 2

# NOTE(review): the published listing never created ``api`` although every QQ
# call goes through it.  The address below is a placeholder (presumably the
# CoolQ HTTP API plugin on its default port) -- confirm before deploying.
COOLQ_API_ADDR = "http://127.0.0.1:5700"

_CQ_IMAGE_RE = re.compile(r"\[CQ:image.*?\]")
_CQ_AT_RE = re.compile(r"\[CQ:at.*?\]")
_CQ_ANY_RE = re.compile(r"\[.*?\]")

db = SqliteDatabase("Custom.db")
# -1: relay both ways, 0: relay nothing, 1: only TG -> QQ, 2: only QQ -> TG
switch = -1


class BaseModel(Model):
    class Meta:
        database = db


class FriendShip(BaseModel):
    ID = PrimaryKeyField()
    QQ = IntegerField(null=True, unique=True)
    TG = CharField(null=True, unique=True)
    NAME = CharField(null=True, unique=True)
    # 0 means confirmed; otherwise the platform code of whoever sent /register.
    REGISTERED = IntegerField(null=False)

    class Meta:
        db_table = "FriendShip"


# ``safe=True`` keeps a restart from failing when the table already exists.
db.create_tables([FriendShip], safe=True)
app = Flask(__name__)
bot = telegram.Bot(token="")
api = CoolQHttpAPI(COOLQ_API_ADDR)


def tool_function_check_identifier(ifentifier, type):
    """Look up the confirmed custom name bound to a platform identifier.

    :param ifentifier: QQ number or Telegram username to look up
    :param type: 1 -> QQ, 2 -> TG
    :return: the custom name, or False when there is no confirmed binding
    """
    columns = {SENDER_QQ: FriendShip.QQ, SENDER_TG: FriendShip.TG}
    column = columns.get(type)
    # A None identifier would be compared as "IS NULL" and could match
    # half-filled rows, so reject it up front.
    if column is None or ifentifier is None:
        return False
    try:
        row = FriendShip.select().where(
            column == ifentifier, FriendShip.REGISTERED == 0
        ).first()
    except Exception as exc:  # best-effort lookup: a DB problem means "unknown"
        print("[DB][check_identifier]%s" % exc)
        return False
    if row is None:
        return False
    return row.NAME or False


def tool_function_confirm_custom_identifier(sender: int, custom_identifier: str, qq_id: int = 0, tg_name: str = ""):
    """Confirm a pending binding that was requested from the *other* platform.

    :param sender: platform code of whoever is confirming (1 -> QQ, 2 -> TG)
    :param custom_identifier: custom name given in the /register request
    :param qq_id: QQ number given in the /register request
    :param tg_name: Telegram username given in the /register request
    :return: True when a matching pending row was marked confirmed, else False
    """
    # The pending row stores the requester's platform, i.e. the opposite side.
    requester = {SENDER_QQ: SENDER_TG, SENDER_TG: SENDER_QQ}.get(sender)
    if requester is None:
        return False
    try:
        # Build the UPDATE on the model class with an explicit WHERE; calling
        # ``update`` on a fetched row would update every row in the table.
        changed = (
            FriendShip.update(REGISTERED=0)
            .where(
                FriendShip.QQ == qq_id,
                FriendShip.TG == tg_name,
                FriendShip.REGISTERED == requester,
                FriendShip.NAME == custom_identifier,
            )
            .execute()
        )
    except Exception as exc:
        print("[DB][confirm_identifier]%s" % exc)
        return False
    return bool(changed)


def tool_function_bind_qq_tg_identifier(custom_identifier: str, qq_id: int, tg_name: str, sender: int) -> bool:
    """Store a pending QQ <-> TG binding under one custom name.

    :param custom_identifier: user-chosen custom name
    :param qq_id: QQ number
    :param tg_name: Telegram username
    :param sender: platform code of the requester (1 -> QQ, 2 -> TG)
    :return: True when the row was created, False otherwise
    """
    try:
        FriendShip.create(QQ=qq_id, TG=tg_name, NAME=custom_identifier, REGISTERED=sender)
    except Exception as exc:  # e.g. a unique-constraint violation
        print("[DB][bind_identifier]%s" % exc)
        return False
    return True


def tool_function_txt(start_str, end, html):
    """Return the stripped text between ``start_str`` and ``end`` in ``html``.

    Returns None when either marker is missing.
    """
    begin = html.find(start_str)
    if begin < 0:
        return None
    begin += len(start_str)
    stop = html.find(end, begin)
    if stop < 0:
        return None
    return html[begin:stop].strip()


def _cq_param(code, key):
    """Extract one parameter value from a single CQ code, or None if absent."""
    value = tool_function_txt(key + "=", "]", code)
    if not value:
        return None
    # Parameters are comma separated; keep only this one.
    value = value.split(",", 1)[0]
    # Undo CQ-code escaping; "&amp;" goes last so nothing is decoded twice.
    for escaped, plain in (("&#91;", "["), ("&#93;", "]"), ("&#44;", ","), ("&amp;", "&")):
        value = value.replace(escaped, plain)
    return value


def tool_function_peelphotourl(message_raw):
    """Split a QQ message into its text and the URLs of its image codes.

    :return: (message with every image code removed, list of image URLs)
    """
    image_list = []
    # Walk the image codes themselves: every image is collected, and a
    # literal "url=" in ordinary text can no longer keep a loop spinning.
    for code in _CQ_IMAGE_RE.findall(message_raw):
        url = _cq_param(code, "url")
        if url:
            image_list.append(url)
    message_raw = _CQ_IMAGE_RE.sub("", message_raw)
    print(message_raw)
    return message_raw, image_list


def _reply(side, text):
    """Send ``text`` to the QQ group when ``side`` is 1, else to the TG chat."""
    if side == SENDER_QQ:
        api.send_group_msg(QQ_GROUP_ID, text)
    else:
        bot.send_message(chat_id=TG_CHAT_ID, text=text)


def _parse_binding_args(message_raw):
    """Parse "<cmd> name qq tg" into (name, qq_int, tg), or None if malformed."""
    parts = message_raw.split(" ")
    if len(parts) != 4:
        return None
    try:
        return parts[1], int(parts[2]), parts[3]
    except ValueError:  # qq_number was not a number
        return None


def control(message_raw, sender: int):
    """Handle the bridge's chat commands.

    :param message_raw: raw message text
    :param sender: platform the message came from (1 -> QQ, 2 -> TG)
    :return: True when a /register or /acc_register succeeded (the caller then
        does not relay the message); a falsy value otherwise
    """
    global switch

    if message_raw.startswith("/register"):
        args = _parse_binding_args(message_raw)
        if args is None:
            _reply(sender, "USAGE:\n/register custom_name qq_number tg_username.")
        elif tool_function_bind_qq_tg_identifier(
                custom_identifier=args[0], qq_id=args[1], tg_name=args[2], sender=sender):
            _reply(sender, "ok.")
            return True

    if message_raw.startswith("/acc_register"):
        args = _parse_binding_args(message_raw)
        if args is None:
            _reply(sender, "USAGE:\n /acc_register custom_name qq_number tg_username.")
        elif tool_function_confirm_custom_identifier(
                custom_identifier=args[0], qq_id=args[1], tg_name=args[2], sender=sender):
            # NOTE(review): kept from the original -- the "ok." goes to the
            # opposite platform (where /register was issued); confirm intended.
            _reply(SENDER_QQ if sender == SENDER_TG else SENDER_TG, "ok.")
            return True

    if message_raw.startswith("/switch"):
        usage = "USAGE:\n /switch -1~2\n -1 is connect.0 is cut off.1 is juest tg to qq.2 is juest qq to tg"
        parts = message_raw.split(" ")
        mode = None
        if len(parts) == 2:
            print("[Switch]:%s" % parts[1])
            try:
                mode = int(parts[1])
            except ValueError:
                mode = None
        if mode is None or not -1 <= mode <= 2:
            _reply(sender, usage)
        else:
            switch = mode
            api.send_group_msg(QQ_GROUP_ID, "OK.NOW MOD:%s" % parts[1])
            bot.send_message(chat_id=TG_CHAT_ID, text="OK.NOW MOD:%s" % parts[1])

    return False


def _qq_display_name(user_id):
    """Return the custom name for a QQ user, else group card, else nickname."""
    name = tool_function_check_identifier(user_id, SENDER_QQ)
    if not name:
        info = api.get_group_member_info(QQ_GROUP_ID, user_id)
        name = info["card"] or info["nickname"]
    return name


def _expand_at_codes(msg):
    """Replace every [CQ:at,...] code in ``msg`` with "@<display name>"."""
    def mention(match):
        target = _cq_param(match.group(0), "qq")
        if not target:
            return ""
        if target == "all":
            return "@all"
        try:
            return "@%s" % _qq_display_name(target)
        except Exception:  # unknown member etc.: fall back to the raw number
            return "@%s" % target

    # A callable replacement keeps backslashes in nicknames from being
    # interpreted as regex escapes.
    return _CQ_AT_RE.sub(mention, msg)


def _relay_qq_event(data):
    """Forward one CoolQ group-message event to the Telegram chat."""
    if data.get("post_type") != "message":
        return
    if data.get("message_type") != "group" or data.get("group_id") != QQ_GROUP_ID:
        return
    if control(data["message"], SENDER_QQ):
        return
    if switch == 0 or switch == 1:
        print("[QQ][RECV][SWITCH]:%d" % switch)
        return

    msg = data["message"]
    known = tool_function_check_identifier(data["user_id"], SENDER_QQ)
    print("[INFO:QQ]%s-----%s" % (data["user_id"], known))
    name = known or _qq_display_name(data["user_id"])

    if "CQ:at" in msg:
        msg = _expand_at_codes(msg)

    if "CQ:image" in data["message"]:
        msg, image_list = tool_function_peelphotourl(msg)
        if msg != "":
            caption = "%s: %s" % (name, msg)
        else:
            caption = "%s: Send a Photo." % (name)
        for url in image_list:
            bot.send_photo(chat_id=TG_CHAT_ID, photo=url, caption=caption)
        if image_list:
            return
        # No usable URL in the image code: fall through and relay the text.

    msg = _CQ_ANY_RE.sub("", msg)  # drop CQ codes Telegram cannot show
    bot.send_message(chat_id=TG_CHAT_ID, text="%s: %s" % (name, msg))


def _relay_tg_event(data):
    """Forward one Telegram text message (as a dict) to the QQ group."""
    chat = data["chat"]
    if chat["type"] != "group" or chat["id"] != TG_CHAT_ID:
        return
    author = data.get("from")
    if not author or author.get("is_bot"):
        return
    if "text" not in data:
        return
    if control(data["text"], SENDER_TG):
        return
    if switch == 0 or switch == 2:
        print("[TG][RECV][SWITCH]:%d" % switch)
        return

    if "username" in author:
        name = author["username"]
    else:
        # last_name is optional on Telegram accounts.
        name = author["first_name"] + author.get("last_name", "")
    info = tool_function_check_identifier(name, SENDER_TG)
    print("[INFO:TG]%s-----%s" % (name, info))
    api.send_group_msg(QQ_GROUP_ID, "%s: %s" % (info or name, data["text"]))


def kernerl(data, source: int):
    """Relay one incoming event to the other platform.

    :param data: event payload as a dict (CoolQ event or Telegram message)
    :param source: 1 -> QQ, 2 -> TG
    """
    if source == SENDER_QQ:
        _relay_qq_event(data)
    if source == SENDER_TG:
        _relay_tg_event(data)


@app.route("/initbot")
def init():
    return "Working..."


def tg_event():
    """Handle a Telegram webhook update carried by the current request."""
    update = telegram.Update.de_json(request.get_json(force=True), bot)
    if update.message is None:
        return "Show me your TOKEN please!"
    kernerl(update.message.to_dict(), SENDER_TG)
    return ""


def qq_event():
    """Handle a CoolQ event post carried by the current request."""
    data = json.loads(request.data.decode("utf-8"))
    kernerl(data, SENDER_QQ)
    return ""


@app.route("/", methods=["POST"])
def webhook():
    """Single POST endpoint shared by both platforms.

    The original registered ``tg_event`` and ``qq_event`` on the same rule, so
    only the first one was ever invoked.  Here the payload decides: bodies
    with a "post_type" key go to ``qq_event`` (the same key ``kernerl`` checks
    for QQ events); everything else goes to ``tg_event``.
    """
    payload = request.get_json(force=True, silent=True)
    if isinstance(payload, dict) and "post_type" in payload:
        return qq_event()
    return tg_event()


if __name__ == "__main__":
    app.run(host='0.0.0.0', port=8088)
COOLQ:
# -*- coding: utf-8 -*-
"""Thin client for the CoolQ HTTP API: one method per API action."""
import json
import requests

# Exceptions raised for known "retcode" values of a failed call.
RETCODE = {
    100: AttributeError("100: 参数缺失或参数无效"),
    102: ValueError("102: 酷Q函数返回的数据无效"),
    103: PermissionError("103: 权限不足或文件系统异常,不符合预期"),
    104: RuntimeError("104: 酷Q凭证失效"),
    201: EnvironmentError("201: 工作线程池未正确初始化")
}


class CoolQHttpAPI:
    """Client bound to one CoolQ HTTP API endpoint.

    Every public method posts its arguments as JSON to the action of the same
    name and returns the ``data`` field of the response (see ``_POST``).
    """

    def __init__(self, addr, access_token=None):
        """Remember the endpoint and verify it is reachable.

        :param addr: base URL of the HTTP API; one trailing "/" is removed
        :param access_token: optional token sent in the Authorization header
        :raises ConnectionError: when ``get_status`` does not report "good"
        """
        self.addr = addr[:-1] if addr.endswith("/") else addr
        self.access_token = access_token
        if not self.get_status()["good"]:
            raise ConnectionError("连接到酷Q失败")

    def _POST(self, api, **kargs):
        """POST ``kargs`` as JSON to action ``api`` and unwrap the response.

        :return: the response's ``data`` field, or None for an "async" status
        :raises ConnectionError: the endpoint cannot be reached
        :raises TimeoutError: no response within 5 seconds
        :raises PermissionError: HTTP 401/403, or retcode 103
        :raises NameError: HTTP 404 (unknown action)
        :raises NotImplementedError: failed call with an unknown retcode
        """
        headers = {"Content-Type": "application/json"}
        if self.access_token:
            headers["Authorization"] = "Token %s" % self.access_token
        try:
            resp = requests.post(
                url=self.addr + "/" + api,
                headers=headers,
                data=json.dumps(kargs),
                timeout=5
            )
        except requests.exceptions.ConnectionError as exc:
            raise ConnectionError("无法连接到酷Q") from exc
        except requests.exceptions.Timeout as exc:
            raise TimeoutError("与酷Q通讯时发生超时") from exc
        if resp.status_code in (401, 403):
            # NOTE(review): presumably the plugin answers 401/403 for a missing
            # or wrong access token -- confirm against its documentation.
            # Without this check such a response reaches resp.json() below.
            raise PermissionError("%s: access token 缺失或不正确" % resp.status_code)
        if resp.status_code == 404:
            raise NameError("API %s 不存在" % api)
        tmp = resp.json()
        if tmp["status"] == "async":
            return None
        if tmp["status"] == "failed":
            known = RETCODE.get(tmp["retcode"])
            if known is None:
                raise NotImplementedError("%s 未知错误" % tmp["retcode"])
            # Raise a fresh copy so the shared instance in RETCODE never
            # carries the traceback of an earlier failure.
            raise type(known)(*known.args)
        return tmp["data"]

    def send_private_msg(self, user_id, message, auto_escape=False):
        return self._POST(
            "send_private_msg",
            user_id=user_id,
            message=message,
            auto_escape=auto_escape
        )

    def send_group_msg(self, group_id, message, auto_escape=False):
        return self._POST(
            "send_group_msg",
            group_id=group_id,
            message=message,
            auto_escape=auto_escape
        )

    def send_discuss_msg(self, discuss_id, message, auto_escape=False):
        # Fixed: the id was previously sent under the key "group_id".
        return self._POST(
            "send_discuss_msg",
            discuss_id=discuss_id,
            message=message,
            auto_escape=auto_escape
        )

    def delete_msg(self, message_id):
        return self._POST("delete_msg", message_id=message_id)

    def send_like(self, user_id, times=1):
        # Fixed: user_id was accepted but never sent.
        return self._POST(
            "send_like",
            user_id=user_id,
            times=times
        )

    def set_group_kick(self, group_id, user_id, reject_add_request=False):
        return self._POST(
            "set_group_kick",
            group_id=group_id,
            user_id=user_id,
            reject_add_request=reject_add_request
        )

    def set_group_ban(self, group_id, user_id, duration=60):
        return self._POST(
            "set_group_ban",
            group_id=group_id,
            user_id=user_id,
            duration=duration
        )

    def set_group_anonymous_ban(self, group_id, anonymous=None, anonymous_flag=None, duration=30):
        """Ban an anonymous member; exactly one identifier form is sent.

        :raises AttributeError: when neither ``anonymous`` nor
            ``anonymous_flag`` is given
        """
        if anonymous:
            return self._POST(
                "set_group_anonymous_ban",
                group_id=group_id,
                anonymous=anonymous,
                duration=duration
            )
        elif anonymous_flag:
            return self._POST(
                "set_group_anonymous_ban",
                group_id=group_id,
                anonymous_flag=anonymous_flag,
                duration=duration
            )
        else:
            raise AttributeError("anonymous与anonymous_flag二选一必填")

    def set_group_whole_ban(self, group_id, enable=True):
        return self._POST(
            "set_group_whole_ban",
            group_id=group_id,
            enable=enable
        )

    def set_group_admin(self, group_id, user_id, enable=True):
        return self._POST(
            "set_group_admin",
            group_id=group_id,
            user_id=user_id,
            enable=enable
        )

    def set_group_anonymous(self, group_id, enable=True):
        return self._POST(
            "set_group_anonymous",
            group_id=group_id,
            enable=enable
        )

    def set_group_card(self, group_id, user_id, card=""):
        return self._POST(
            "set_group_card",
            group_id=group_id,
            user_id=user_id,
            card=card
        )

    def set_group_leave(self, group_id, is_dismiss=False):
        return self._POST(
            "set_group_leave",
            group_id=group_id,
            is_dismiss=is_dismiss
        )

    def set_group_special_title(self, group_id, user_id, special_title="", duration=-1):
        return self._POST(
            "set_group_special_title",
            group_id=group_id,
            user_id=user_id,
            special_title=special_title,
            duration=duration
        )

    def set_discuss_leave(self, discuss_id):
        return self._POST(
            "set_discuss_leave",
            discuss_id=discuss_id
        )

    def set_friend_add_request(self, flag, approve=True, remark=""):
        return self._POST(
            "set_friend_add_request",
            flag=flag,
            approve=approve,
            remark=remark
        )

    def set_group_add_request(self, flag, sub_type, approve=True, reason=""):
        return self._POST(
            "set_group_add_request",
            flag=flag,
            sub_type=sub_type,
            approve=approve,
            reason=reason
        )

    def get_login_info(self):
        return self._POST("get_login_info")

    def get_stranger_info(self, user_id, no_cache=False):
        return self._POST(
            "get_stranger_info",
            user_id=user_id,
            no_cache=no_cache
        )

    def get_group_list(self):
        return self._POST("get_group_list")

    def get_group_member_info(self, group_id, user_id, no_cache=False):
        return self._POST(
            "get_group_member_info",
            group_id=group_id,
            user_id=user_id,
            no_cache=no_cache
        )

    def get_group_member_list(self, group_id):
        return self._POST(
            "get_group_member_list",
            group_id=group_id
        )

    def get_cookies(self):
        return self._POST("get_cookies")

    def get_csrf_token(self):
        return self._POST("get_csrf_token")

    def get_credentials(self):
        return self._POST("get_credentials")

    def get_record(self, file, out_format):
        return self._POST(
            "get_record",
            file=file,
            out_format=out_format
        )

    def get_status(self):
        return self._POST("get_status")

    def get_version_info(self):
        return self._POST("get_version_info")

    def set_restart(self, clean_log=False, clean_cache=False, clean_event=False):
        return self._POST(
            "set_restart",
            clean_log=clean_log,
            clean_cache=clean_cache,
            clean_event=clean_event
        )

    def set_restart_plugin(self, delay=0):
        return self._POST(
            "set_restart_plugin",
            delay=delay
        )

    def clean_data_dir(self, data_dir):
        return self._POST(
            "clean_data_dir",
            data_dir=data_dir
        )

    def clean_plugin_log(self):
        return self._POST("clean_plugin_log")


if __name__ == "__main__":
    # NOTE(review): the original demo used an undefined ``api`` and was cut
    # off before its closing parenthesis; the address is a placeholder.
    api = CoolQHttpAPI("http://127.0.0.1:5700")
    api.send_private_msg(524543577, "Hello")
这是啥东西~