Coverage for src/kwai/core/domain/repository/query.py: 100%

3 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module that defines an interface for a query.""" 

2 

from abc import ABC, abstractmethod
from typing import Any, AsyncIterator

5 

6 

7class Query: 

8 """Interface for a query.""" 

9 

10 @abstractmethod 

11 async def count(self) -> int: 

12 """Get the numbers of rows to be returned.""" 

13 raise NotImplementedError 

14 

15 @abstractmethod 

16 async def fetch_one(self) -> dict[str, Any] | None: 

17 """Fetch just one row.""" 

18 raise NotImplementedError 

19 

20 @abstractmethod 

21 def fetch( 

22 self, limit: int | None = None, offset: int | None = None 

23 ) -> AsyncIterator[dict[str, Any]]: 

24 """Fetch all rows. 

25 

26 A generator should be returned to avoid reading all rows at once. 

27 """ 

28 raise NotImplementedError