请求.post()在python websocketapp的\u消息方法中不起作用

2024-06-16 11:50:49 发布

您现在位置:Python中文网/ 问答频道 /正文

我编写了ding()函数,将消息发送到阿里巴巴应用程序DingTalk中的聊天组。函数通过将消息发布到url来发送消息,类似于requests.post(url, message)

当我运行以下代码时(其中requests.post()嵌入在on_message()),我通过ding()发送的消息将无法通过,并且没有错误消息。 但是当我单独使用函数ding()时,它工作得非常好。 我想知道ding()函数中的requests.post是否与websocket连接冲突。你知道吗

ding()定义如下:

def ding(subject_text):
    dingding_url = "https://oapi.dingtalk.com/robot/send?access_token=fe********************b1" 
    headers = {"Content-Type": "application/json; charset=utf-8"}
    post_data = {
        "msgtype": "text", 
        "text": {
            "content": subject_text+'#'
        },
        "at": {
            "atMobiles": ["185xxxx3281"]
        }
    }
    requests.post(dingding_url, headers=headers, data=json.dumps(post_data))

ding()嵌入在on_message()中,如下所示:

def on_message(ws, message): 
    try:
        ding(message)           
    except Exception as e:
        print(e)    

on_message()在下面的WebSocketApp中使用(其中MySocketss是WebSocketApp的子类):

class MySocketss(WebSocketApp):
    def _send_ping(self, interval, event):
        while not event.wait(interval):
            self.sock.last_ping_tm = time.time()
            if self.sock:
                try:
                    self.sock.send('2')
                except Exception as ex:
                    print(ex)
                    break

ws = MySocketss('wss://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=websocket&sid=%s' % a,
                              on_message=on_message,
                              on_error=on_error,
                              on_close=on_close)

如果您需要更多细节和全貌(尽管可能与问题无关),完整代码如下:

def ding(subject_text):
    dingding_url = "https://oapi.dingtalk.com/robot/send?access_token=feb01e99e53b6f7747455239d0ccc11525e57b8ef8bb1f7b5b16f0e17d25ebb1" 
    headers = {"Content-Type": "application/json; charset=utf-8"}
    post_data = {
        "msgtype": "text", 
        "text": {
            "content": subject_text+'#'
        },
        "at": {
            "atMobiles": ["185xxxx3281"]
        }
    }
    requests.post(dingding_url, headers=headers, data=json.dumps(post_data))


class MySocketss(WebSocketApp):
    def _send_ping(self, interval, event):
        while not event.wait(interval):
            self.sock.last_ping_tm = time.time()
            if self.sock:
                try:
                    self.sock.send('2')
                except Exception as ex:
                    print(ex)
                    break

def on_close(ws): 
  print("Connection closed ……")


def on_message(ws, message): 
    try:
        ding(message)           
    except Exception as e:
        print(e)

def on_error(ws, error): 
  print(error)

def on_open(ws): 
  ws.send('2probe')
  ws.send('5')
  ws.send('2')

if __name__ == "__main__":
     s=requests.session()
     headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36'}
     x = s.get('https://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=polling&t=Mu6fSmG', headers=headers)
     a = re.compile('sid":"(.*?)"').findall(x.text)[0]
     s.get('https://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=polling&t=Mu6fSp8&sid=%s' % a,
                     headers=headers)
     s.post('https://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=polling&t=Mu6fSpS&sid=%s' % a,
                      headers=headers,data='27:420["switch_channel",-8200]')
     s.get('https://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=polling&t=Mu6fSpU&sid=%s' % a,
                      headers=headers)

     ws = MySocketss('wss://sscpgoelik.jin10.com:8082/socket.io/?EIO=3&transport=websocket&sid=%s' % a,
                     on_message=on_message,
                     on_error=on_error,
                     on_close=on_close)
     ws.on_open = on_open
     ws.run_forever(ping_interval=15)

有人知道它为什么不起作用以及如何修复吗?你知道吗


Tags: textselfcomsendurlmessagedataws