Очередь в асинхронном коде. Python
Этот код ошибок не выдает, но проблема в том, что он и не печатает ничего.
import okx.Trade as Trade
import okx.MarketData as MarketData
import okx.Account as Account
import time
import asyncio
import websockets
import datetime
import json
import threading
apikey = ""
secretkey = ""
passphrase = ""
tradeAPI = Trade.TradeAPI(apikey, secretkey, passphrase, False, flag='0', debug=False)
marketDataAPI = MarketData.MarketAPI(flag='0', debug=False)
accountAPI = Account.AccountAPI(apikey, secretkey, passphrase, False, flag='0', debug=False)
async def TickersChannel_ws():
""" Websoket подписка на цены"""
url = "wss://ws.okx.com:8443/ws/v5/business"
async with websockets.connect(url) as ws:
subs = {
"op": "subscribe",
"args": [
dict(channel="mark-price-candle1m", instId="SOL-USDT-SWAP")
]
}
await ws.send(json.dumps(subs))
async for msg in ws:
msg = json.loads(msg)
if "event" not in msg:
yield msg.get("data")
async def producer(que_):
async for ticker in TickersChannel_ws():
await que_.put(ticker)
print(f"Добавили {ticker}")
await asyncio.sleep(0.1)
async def consumer(que_):
while True:
if not que_.empty():
x = await que_.get()
await asyncio.sleep(0.1)
print(f"ПОЛУЧИЛИ {x}, размер {que_.qsize()}")
if que_.empty():
print("pusto")
async def main1():
queue = asyncio.Queue()
task1 = asyncio.create_task(producer(queue))
task2 = asyncio.create_task(consumer(queue))
await task1
await task2
asyncio.run(main1())
по отдельности функция producer работает а вместе с consumer ничего не печатает по итогу. Помогите плз
Ответы (2 шт):
Автор решения: Evgenii Evstafev
→ Ссылка
Проблема в том, что await task1 блокирует main1 из-за
бесконечного producer (лучше использовать await que_.put(ticker)), а consumer должен использовать await queue.get() вместо цикла с get_nowait.