Назад к вопросам
Middle
5
```python import asyncio from typing import List, Optional class Chunk: def __init__(self, items: List[dict]): self.items = items @property def size(self) -> int: return len(self.items) class Producer: async def next(self) -> Optional[Chunk]: # возвращает пачку данных или None ... async def commit(self, chunks: List[Chunk]) -> None: # фиксируем успешную обработку chunks ... class Consumer: max_batch_size = 100 async def consume(self, chunks: List[Chunk]) -> None: total_size = sum(chunk.size for chunk in chunks) if total_size > self.max_batch_size: raise ValueError("batch is too large") # обработка данных ... producer = Producer() consumer = Consumer() async def process(): while True: chunk = await producer.next() if chunk is None: break try: await consumer.consume([chunk]) await producer.commit([chunk]) except Exception as e: print("error but continue:", e) continue if __name__ == "__main__": asyncio.run(process()) ``` This code defines asynchronous producer-consumer classes with batch processing and error handling. The task is to implement or complete the logic for producing, consuming, and committing chunks of data, ensuring batch size limits are respected and errors are handled gracefully.
Компании, где спрашивали
Navio
Ответ от нейросети
sobes.tech AI
В представленном коде нужно реализовать методы Producer.next(), Producer.commit() и логику обработки в Consumer.consume().
Producer.next()должен асинхронно возвращать следующий объектChunkс данными илиNone, если данных больше нет.Consumer.consume()проверяет, что суммарный размер пачки не превышаетmax_batch_size, и обрабатывает данные (например, сохраняет или анализирует).Producer.commit()фиксирует успешную обработку пачки, например, обновляя статус в базе.
Пример реализации с имитацией данных:
import asyncio
from typing import List, Optional
class Chunk:
def __init__(self, items: List[dict]):
self.items = items
@property
def size(self) -> int:
return len(self.items)
class Producer:
def __init__(self):
self.data = [
[{'id': i} for i in range(30)],
[{'id': i} for i in range(50)],
[{'id': i} for i in range(120)], # слишком большой чанк
[{'id': i} for i in range(20)],
]
self.index = 0
async def next(self) -> Optional[Chunk]:
await asyncio.sleep(0.1) # имитация задержки
if self.index >= len(self.data):
return None
chunk = Chunk(self.data[self.index])
self.index += 1
return chunk
async def commit(self, chunks: List[Chunk]) -> None:
await asyncio.sleep(0.05) # имитация фиксации
print(f"Committed {sum(chunk.size for chunk in chunks)} items")
class Consumer:
max_batch_size = 100
async def consume(self, chunks: List[Chunk]) -> None:
total_size = sum(chunk.size for chunk in chunks)
if total_size > self.max_batch_size:
raise ValueError(f"batch is too large: {total_size} > {self.max_batch_size}")
await asyncio.sleep(0.1) # имитация обработки
print(f"Consumed {total_size} items")
producer = Producer()
consumer = Consumer()
async def process():
while True:
chunk = await producer.next()
if chunk is None:
break
try:
await consumer.consume([chunk])
await producer.commit([chunk])
except Exception as e:
print("error but continue:", e)
continue
if __name__ == "__main__":
asyncio.run(process())
В этом примере третий чанк превышает лимит, вызывая исключение, которое обрабатывается и позволяет продолжить работу с последующими чанками.