Sobes.tech
Back to questions
Middle
4

```python import asyncio from typing import List, Optional class Chunk: def __init__(self, items: List[dict]): self.items = items @property def size(self) -> int: return len(self.items) class Producer: async def next(self) -> Optional[Chunk]: # возвращает пачку данных или None ... async def commit(self, chunks: List[Chunk]) -> None: # фиксируем успешную обработку chunks ... class Consumer: max_batch_size = 100 async def consume(self, chunks: List[Chunk]) -> None: total_size = sum(chunk.size for chunk in chunks) if total_size > self.max_batch_size: raise ValueError("batch is too large") # обработка данных ... producer = Producer() consumer = Consumer() async def process(): while True: chunk = await producer.next() if chunk is None: break try: await consumer.consume([chunk]) await producer.commit([chunk]) except Exception as e: print("error but continue:", e) continue if __name__ == "__main__": asyncio.run(process()) ``` This code defines asynchronous producer-consumer classes with batch processing and error handling. The task is to implement or complete the logic for producing, consuming, and committing chunks of data, ensuring batch size limits are respected and errors are handled gracefully.

Companies where asked
Navio

Answer from AI

sobes.tech AI

В представленном коде нужно реализовать методы Producer.next(), Producer.commit() и логику обработки в Consumer.consume().

  • Producer.next() должен асинхронно возвращать следующий объект Chunk с данными или None, если данных больше нет.
  • Consumer.consume() проверяет, что суммарный размер пачки не превышает max_batch_size, и обрабатывает данные (например, сохраняет или анализирует).
  • Producer.commit() фиксирует успешную обработку пачки, например, обновляя статус в базе.

Пример реализации с имитацией данных:

import asyncio
from typing import List, Optional

class Chunk:
    def __init__(self, items: List[dict]):
        self.items = items

    @property
    def size(self) -> int:
        return len(self.items)

class Producer:
    def __init__(self):
        self.data = [
            [{'id': i} for i in range(30)],
            [{'id': i} for i in range(50)],
            [{'id': i} for i in range(120)],  # слишком большой чанк
            [{'id': i} for i in range(20)],
        ]
        self.index = 0

    async def next(self) -> Optional[Chunk]:
        await asyncio.sleep(0.1)  # имитация задержки
        if self.index >= len(self.data):
            return None
        chunk = Chunk(self.data[self.index])
        self.index += 1
        return chunk

    async def commit(self, chunks: List[Chunk]) -> None:
        await asyncio.sleep(0.05)  # имитация фиксации
        print(f"Committed {sum(chunk.size for chunk in chunks)} items")

class Consumer:
    max_batch_size = 100

    async def consume(self, chunks: List[Chunk]) -> None:
        total_size = sum(chunk.size for chunk in chunks)
        if total_size > self.max_batch_size:
            raise ValueError(f"batch is too large: {total_size} > {self.max_batch_size}")
        await asyncio.sleep(0.1)  # имитация обработки
        print(f"Consumed {total_size} items")

producer = Producer()
consumer = Consumer()

async def process():
    while True:
        chunk = await producer.next()
        if chunk is None:
            break
        try:
            await consumer.consume([chunk])
            await producer.commit([chunk])
        except Exception as e:
            print("error but continue:", e)
            continue

if __name__ == "__main__":
    asyncio.run(process())

В этом примере третий чанк превышает лимит, вызывая исключение, которое обрабатывается и позволяет продолжить работу с последующими чанками.