Coverage for src/kwai/core/functions.py: 100%

21 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-01-01 00:00 +0000

1"""Module that defines some common functions.""" 

2 

3from itertools import count 

4from typing import Any, AsyncIterator, Callable, Generator 

5 

6 

7async def async_groupby[T, R]( 

8 it: AsyncIterator[T], key: Callable[[T], R] 

9) -> AsyncIterator[tuple[Any, list[T]]]: 

10 """An async groupby.""" 

11 try: 

12 row = await anext(it) 

13 except StopAsyncIteration: 

14 return 

15 

16 group: list[T] = [row] 

17 current_key = key(row) 

18 

19 # Process all other rows 

20 async for row in it: 

21 new_key = key(row) 

22 if new_key != current_key: 

23 yield current_key, group 

24 group = [row] 

25 current_key = new_key 

26 else: 

27 group.append(row) 

28 

29 yield current_key, group 

30 

31 

def generate_filenames(
    prefix: str, suffix: str, places: int = 3
) -> Generator[str, None, None]:
    """Generate sequential filenames with the format <prefix><index><suffix>.

    The index starts at 1 and is padded with leading zeroes to the specified
    number of places. The generator never ends on its own; the caller decides
    when to stop.

    Args:
        prefix: Text placed before the index.
        suffix: Text placed after the index.
        places: Minimum width of the index field.

    Yields:
        The next filename in the sequence.
    """
    # Render the fixed parts once. They are concatenated as-is instead of being
    # embedded in a format template, so braces in prefix/suffix stay literal.
    head = f"{prefix}"
    tail = f"{suffix}"
    index_spec = f"0{places}d"
    for i in count(1):
        yield f"{head}{format(i, index_spec)}{tail}"