我编写了ding()
函数,将消息发送到阿里巴巴应用程序DingTalk中的聊天组。函数通过将消息发布到url来发送消息,类似于requests.post(url, message)
当我运行以下代码时(其中requests.post()
嵌入在on_message()
),我通过ding()
发送的消息将无法通过,并且没有错误消息。
但是当我单独使用函数ding()
时,它工作得非常好。
我想知道ding()
函数中的requests.post
是否与websocket连接冲突。你知道吗
ding()
定义如下:
def ding(subject_text):
dingding_url = "https://oapi.dingtalk.com/robot/send?access_token=fe********************b1"
headers = {"Content-Type": "application/json; charset=utf-8"}
post_data = {
"msgtype": "text",
"text": {
"content": subject_text+'#'
},
"at": {
"atMobiles": ["185xxxx3281"]
}
}
requests.post(dingding_url, headers=headers, data=json.dumps(post_data))
ding()
嵌入在on_message()
中,如下所示:
def on_message(ws, message):
try:
ding(message)
except Exception as e:
print(e)
on_message()
在下面的WebSocketApp中使用(其中MySocketss是WebSocketApp的子类):
class MySocketss(WebSocketApp):
def _send_ping(self, interval, event):
while not event.wait(interval):
self.sock.last_ping_tm = time.time()
if self.sock:
try:
self.sock.send('2')
except Exception as ex:
print(ex)
break
ws = MySocketss('wss://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=websocket&sid=%s' % a,
on_message=on_message,
on_error=on_error,
on_close=on_close)
如果您需要更多细节和全貌(尽管可能与问题无关),完整代码如下:
def ding(subject_text):
dingding_url = "https://oapi.dingtalk.com/robot/send?access_token=feb01e99e53b6f7747455239d0ccc11525e57b8ef8bb1f7b5b16f0e17d25ebb1"
headers = {"Content-Type": "application/json; charset=utf-8"}
post_data = {
"msgtype": "text",
"text": {
"content": subject_text+'#'
},
"at": {
"atMobiles": ["185xxxx3281"]
}
}
requests.post(dingding_url, headers=headers, data=json.dumps(post_data))
class MySocketss(WebSocketApp):
def _send_ping(self, interval, event):
while not event.wait(interval):
self.sock.last_ping_tm = time.time()
if self.sock:
try:
self.sock.send('2')
except Exception as ex:
print(ex)
break
def on_close(ws):
print("Connection closed ……")
def on_message(ws, message):
try:
ding(message)
except Exception as e:
print(e)
def on_error(ws, error):
print(error)
def on_open(ws):
ws.send('2probe')
ws.send('5')
ws.send('2')
if __name__ == "__main__":
s=requests.session()
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36'}
x = s.get('https://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=polling&t=Mu6fSmG', headers=headers)
a = re.compile('sid":"(.*?)"').findall(x.text)[0]
s.get('https://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=polling&t=Mu6fSp8&sid=%s' % a,
headers=headers)
s.post('https://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=polling&t=Mu6fSpS&sid=%s' % a,
headers=headers,data='27:420["switch_channel",-8200]')
s.get('https://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=polling&t=Mu6fSpU&sid=%s' % a,
headers=headers)
ws = MySocketss('wss://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=websocket&sid=%s' % a,
on_message=on_message,
on_error=on_error,
on_close=on_close)
ws.on_open = on_open
ws.run_forever(ping_interval=15)
有人知道它为什么不起作用以及如何修复吗?你知道吗
目前没有回答
相关问题 更多 >
编程相关推荐