import asyncio
from threading import Thread
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types
API_TOKEN = ''
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
chat_ids = {}
@dp.message_handler()
async def echo(message: types.Message):
# old style:
# await bot.send_message(message.chat.id, message.text)
chat_ids[message.message_id] = message.from_user
text = f'{message.message_id} {message.from_user} {message.text}'
await message.reply(text, reply=False)
async def periodic(sleep_for, queue):
while True:
await asyncio.sleep(sleep_for)
now = datetime.utcnow()
print(f"{now}")
for id in chat_ids:
queue.put_nowait((id, f"{now}"))
# await bot.send_message(id, f"{now}", disable_notification=True)
def run_tick(queue):
newloop = asyncio.new_event_loop()
asyncio.set_event_loop(newloop)
asyncio.run(periodic(3, queue))
if __name__ == '__main__':
queue = asyncio.Queue()
Thread(target=run_tick, args=(queue,), daemon=True).start()
executor.start_polling(dp, skip_updates=True)
イベントが発生したが今のところ失敗した場合、bot.send_messageで登録ユーザーにメッセージを送信したい。これが私が試したものです。
これを行う簡単な方法はありますか?
編集:2020-1-3
@ user4815162342による作業例を次に示します。
import asyncio
from datetime import datetime
from aiogram import Bot, Dispatcher, executor, types
API_TOKEN = ''
bot = Bot(token=API_TOKEN)
dp = Dispatcher(bot)
chat_ids = {}
@dp.message_handler()
async def echo(message: types.Message):
chat_ids[message.from_user.id] = message.from_user
text = f'{message.message_id} {message.from_user} {message.text}'
await message.reply(text, reply=False)
async def periodic(sleep_for):
while True:
await asyncio.sleep(sleep_for)
now = datetime.utcnow()
print(f"{now}")
for id in chat_ids:
await bot.send_message(id, f"{now}", disable_notification=True)
if __name__ == '__main__':
dp.loop.create_task(periodic(10))
executor.start_polling(dp)
最初の問題は、別のスレッドからasyncioコードを呼び出そうとしたことでした。結果として生じるエラーを修正するために、追加のスレッドを保持しながら新しいイベントループを作成しました。ことわざにあるように、今あなたは2つの問題を抱えています。
キューから読み取るコードがないため、キューのアイデアは未完成に見えます。また、非同期キューはイベントループ間またはスレッド間で共有されるように設計されていないため、存在したとしても機能しません。混乱を解消するには、イベントループ内から定期的な更新を実行する方法を見つける必要があります。つまり、この仮定を再検討します。
しかし、自分のタスクをエグゼキュータに追加する方法はありません。
のソースを見ると、Executor
ディスパッチャからイベントループを取得しているように見えloop
ます。ディスパッチャは、パブリックにアクセス可能な属性にイベントループを保持しています。つまりcreate_task
、そのループでメソッドを呼び出すだけでタスクを作成できます。例えば:
if __name__ == '__main__':
dp.loop.create_task(periodic())
executor.start_polling(dp, skip_updates=True)
これperiodic
で、最初の試みと同じように定式化できます。
async def periodic(sleep_for, queue):
while True:
await asyncio.sleep(sleep_for)
now = datetime.utcnow()
for id in chat_ids:
await bot.send_message(id, f"{now}",
disable_notification=True)
を使用していないため、これをテストしていないことに注意してくださいaiogram
。対処する必要があるかもしれない潜在的な問題は、chat_ids
辞書がmessage.message_id
キーとして含まれているように見えるのに対し、をbot.send_message
受け入れることmessage.chat.id
です。
この記事はインターネットから収集されたものであり、転載の際にはソースを示してください。
侵害の場合は、連絡してください[email protected]
コメントを追加