Coverage for kwai/core/json_api.py: 92%

285 statements  

« prev     ^ index     » next       coverage.py v7.3.0, created at 2023-09-05 17:55 +0000

1"""Module that defines some jsonapi related models.""" 

2import dataclasses 

3from types import NoneType 

4from typing import Any, Callable, Optional, Type, Union, get_args, get_origin 

5 

6from fastapi import Query 

7from pydantic import BaseModel, Extra, Field, create_model 

8 

9 

10class Meta(BaseModel): 

11 """Defines the metadata for the document model. 

12 

13 Attributes: 

14 count: The number of actual resources 

15 offset: The offset of the returned resources (pagination) 

16 limit: The maximum number of returned resources (pagination) 

17 

18 A limit of 0, means there was no limit was set. 

19 """ 

20 

21 count: int | None = None 

22 offset: int | None = None 

23 limit: int | None = None 

24 

25 class Config: 

26 """Allow extra attributes on the Meta object.""" 

27 

28 extra = Extra.allow 

29 

30 

31class DocumentBaseModel(BaseModel): 

32 """A basemodel for a document.""" 

33 

34 meta: Meta | None = None 

35 

36 def dict(self, *args, **kwargs) -> dict[str, Any]: 

37 """Overloads dict to remove 'meta' when it's None.""" 

38 if self.meta is None: 

39 exclude = kwargs.get("exclude", None) 

40 if exclude is None: 

41 kwargs["exclude"] = {"meta"} 

42 else: 

43 kwargs["exclude"].add("meta") 

44 return super().dict(*args, **kwargs) 

45 

46 

47@dataclasses.dataclass(frozen=True, kw_only=True, slots=True) 

48class Attribute: 

49 """Dataclass for storing information about attributes.""" 

50 

51 name: str 

52 getter: Callable 

53 type: Type 

54 optional: bool = dataclasses.field(init=False) 

55 

56 def __post_init__(self): 

57 """Set the optional field.""" 

58 all_types = get_args(self.type) 

59 object.__setattr__(self, "optional", NoneType in all_types) 

60 

61 

62@dataclasses.dataclass(frozen=True, kw_only=True, slots=True) 

63class Relationship: 

64 """Dataclass for storing information about relationships.""" 

65 

66 name: str 

67 getter: Callable 

68 type: Type 

69 optional: bool = dataclasses.field(init=False) 

70 iterable: bool = dataclasses.field(init=False) 

71 resource_type: Type = dataclasses.field(init=False) 

72 

73 def __post_init__(self): 

74 """Initialise the properties that depend on the type.""" 

75 all_types = get_args(self.type) 

76 object.__setattr__(self, "optional", NoneType in all_types) 

77 

78 if self.optional: 

79 non_optional_type = all_types[all_types.index(NoneType) ^ 1] 

80 else: 

81 non_optional_type = self.type 

82 

83 origin = get_origin(non_optional_type) 

84 object.__setattr__(self, "iterable", origin is list) 

85 

86 if self.iterable: 

87 object.__setattr__(self, "resource_type", get_args(non_optional_type)[0]) 

88 else: 

89 object.__setattr__(self, "resource_type", non_optional_type) 

90 

91 

92class Resource: 

93 """A class that is responsible for generating all the models needed for JSON:API. 

94 

95 Use the resource decorator to mark a class as a JSON:API resource. The decorator 

96 will attach an instance of this class to the marked class. 

97 """ 

98 

99 def __init__(self, resource): 

100 self._resource = resource 

101 self._type = "" 

102 self._id_getter = None 

103 self._attributes = {} 

104 self._relationships = {} 

105 

106 self._resource_identifier_model = None 

107 self._attributes_model = None 

108 self._relationships_model = None 

109 self._resource_model = None 

110 self._resource_data_model = None 

111 self._resource_data_model_list = None 

112 self._document_model = None 

113 

114 def get_attribute(self, attribute_name: str) -> Attribute | None: 

115 """Return the attribute definition for the given name. 

116 

117 None is returned, when the attribute does not exist. 

118 """ 

119 return self._attributes.get(attribute_name, None) 

120 

121 def get_resource_attribute(self, attribute_name: str, resource_instance): 

122 """Return the value of the attribute from the resource. 

123 

124 The getter created when scanning the class for the attributes will 

125 be used to retrieve the attribute value from the resource. 

126 """ 

127 attr = self.get_attribute(attribute_name) 

128 

129 if attr is None: 

130 return 

131 return attr.getter(resource_instance) 

132 

133 def get_resource_attributes(self, resource_instance) -> dict[str, Any]: 

134 """Get all attribute values from the resource instance.""" 

135 values = {} 

136 for attr in self._attributes.values(): 

137 values[attr.name] = attr.getter(resource_instance) 

138 return values 

139 

140 def get_relationship(self, relationship_name: str) -> Relationship | None: 

141 """Return the relationship definition for the given name. 

142 

143 None is returned, when the relationship does not exist. 

144 """ 

145 return self._relationships.get(relationship_name, None) 

146 

147 def get_resource_relationships(self, resource_instance) -> dict[str, Any]: 

148 """Get all relationship values from the resource instance.""" 

149 values = {} 

150 for rel in self._relationships.values(): 

151 values[rel.name] = rel.getter(resource_instance) 

152 return values 

153 

154 def get_type(self) -> str: 

155 """Get the resource type.""" 

156 return self._type 

157 

158 def get_resource_object(self, resource_instance) -> tuple: 

159 """Get the resource from the resource instance. 

160 

161 It returns a tuple. The first value will contain the resource object, while 

162 the second value will contain a list of the related resource objects. 

163 """ 

164 attributes = self.get_resource_attributes(resource_instance) 

165 related_resource_objects = set() 

166 

167 relationships = {} 

168 for rel in self._relationships.values(): 

169 rel_value = rel.getter(resource_instance) 

170 

171 relationship_model = self._relationships_model.__fields__[rel.name].type_ 

172 

173 if rel_value is None: 

174 relationship_value = None 

175 elif rel.iterable: 

176 relationship_value = [] 

177 for value in rel_value: 

178 relationship_value.append( 

179 value.__json_api_resource__.get_resource_identifier(value) 

180 ) 

181 # Get all related resources. 

182 related_resource = value.__json_api_resource__.get_resource_object( 

183 value 

184 ) 

185 related_resource_objects.add(related_resource[0]) 

186 for r in related_resource[1]: 

187 related_resource_objects.add(r) 

188 else: 

189 relationship_value = ( 

190 rel_value.__json_api_resource__.get_resource_identifier(rel_value) 

191 ) 

192 # Get all related resources. 

193 related_resource = rel_value.__json_api_resource__.get_resource_object( 

194 rel_value 

195 ) 

196 related_resource_objects.add(related_resource[0]) 

197 for r in related_resource[1]: 

198 related_resource_objects.add(r) 

199 

200 relationships[rel.name] = relationship_model(data=relationship_value) 

201 

202 resource_model = self.get_resource_model() 

203 return ( 

204 resource_model( 

205 id=self.get_resource_id(resource_instance), 

206 attributes=attributes, 

207 relationships=relationships, 

208 ), 

209 related_resource_objects, 

210 ) 

211 

212 def has_id(self) -> bool: 

213 """Check if there is an id available in the resource.""" 

214 return self._id_getter is not None 

215 

216 def get_resource_id(self, resource_instance): 

217 """Return the id of the resource. 

218 

219 The getter created when scanning the class for the id property/method will 

220 be used to retrieve the id from the resource. 

221 """ 

222 return self._id_getter(resource_instance) 

223 

224 def build(self, auto: bool = True) -> "Resource": 

225 """Build the JSONAPI resource models.""" 

226 self._type = getattr(self._resource, "__json_api_resource_type__", "") 

227 

228 assert len(self._type) > 0, ( 

229 "Is this a JSON_API resource? Did you forget the " 

230 "json_api.resource decorator?" 

231 ) 

232 

233 self._scan_class_attributes() 

234 if auto: 

235 if dataclasses.is_dataclass(self._resource): 

236 self._scan_dataclass() 

237 elif issubclass(self._resource, BaseModel): 

238 self._scan_base_model() 

239 

240 self._create_resource_identifier_model() 

241 self._create_resource_model() 

242 self._create_document_model() 

243 

244 assert self._id_getter is not None, ( 

245 "Can't determine the id of the resource. " 

246 "Use the id decorator or define an 'id' field." 

247 ) 

248 

249 return self 

250 

251 def _scan_class_attributes(self): 

252 """Search for attributes, id and relationships in the class. 

253 

254 This method will check the attributes of a class if they are decorated with 

255 attribute, id or relationship decorators. 

256 """ 

257 for attribute_name in dir(self._resource): 

258 class_attribute = getattr(self._resource, attribute_name) 

259 if not callable(class_attribute): 

260 continue 

261 

262 # First check if the jsonapi.attribute decorator was used to define an 

263 # attribute. 

264 json_api_attribute_name = getattr( 

265 class_attribute, "__json_api_attribute__", None 

266 ) 

267 if json_api_attribute_name is not None: 

268 assert ( 

269 "return" in class_attribute.__annotations__ 

270 ), f"attribute {json_api_attribute_name} of resource {self._type} misses a return type" 

271 self._attributes[json_api_attribute_name] = Attribute( 

272 name=json_api_attribute_name, 

273 getter=class_attribute, 

274 type=class_attribute.__annotations__["return"], 

275 ) 

276 continue 

277 

278 # Check if json_api.id decorator is used to define a method to get the id 

279 # of a resource. 

280 if hasattr(class_attribute, "__json_api_id__"): 

281 self._id_getter = class_attribute 

282 continue 

283 

284 # Check if jsonapi.relationship is used to define relationships 

285 json_api_relationship_name = getattr( 

286 class_attribute, "__json_api_relationship__", None 

287 ) 

288 if json_api_relationship_name is not None: 

289 assert ( 

290 "return" in class_attribute.__annotations__ 

291 ), f"relationship {json_api_relationship_name} of resource {self._type} misses a return type" 

292 self._relationships[json_api_relationship_name] = Relationship( 

293 name=json_api_relationship_name, 

294 getter=class_attribute, 

295 type=class_attribute.__annotations__["return"], 

296 ) 

297 

298 @classmethod 

299 def _create_getter(cls, attribute_name: str): 

300 """Create a method for getting the value of an attribute.""" 

301 

302 def get(self): 

303 return getattr(self, attribute_name) 

304 

305 return get 

306 

307 def _scan_dataclass(self): 

308 """Search for attributes, id and relationships in a dataclass. 

309 

310 A field with a __json_api_resource__ attribute is a relationship. A field with 

311 the name "id" will be used as id of the resource. All other fields are 

312 attributes. When a field has a name that is already used with an 

313 attribute, id or relationship decorator, it will be skipped. 

314 """ 

315 for field_ in dataclasses.fields(self._resource): 

316 # When the field name is 'id' and there is not a method decorated with 

317 # jsonapi.id, then this will be the id of the resource. 

318 if field_.name == "id": 

319 if self._id_getter is None: 

320 self._id_getter = self._create_getter("id") 

321 continue 

322 

323 # Skip the field when it was already registered with a decorator. 

324 if field_.name in self._attributes or field_.name in self._relationships: 

325 continue 

326 

327 # When the field has a type that was decorated with json_api.resource, 

328 # then the field is a relationship. 

329 if hasattr(field_.type, "__json_api_resource__"): 

330 self._relationships[field_.name] = Relationship( 

331 name=field_.name, 

332 getter=self._create_getter(field_.name), 

333 type=field_.type, 

334 ) 

335 continue 

336 

337 # This is an attribute. 

338 self._attributes[field_.name] = Attribute( 

339 name=field_.name, 

340 getter=self._create_getter(field_.name), 

341 type=field_.type, 

342 ) 

343 

344 def _scan_base_model(self): 

345 """Search for attributes and relationships on a BaseModel class. 

346 

347 A field with a __json_api_resource__ attribute is a relationship. A field with 

348 the name "id" will be used as id of the resource. All other fields are 

349 attributes. When a field has a name that is already used with an 

350 attribute, id or relationship decorator, it will be skipped. 

351 """ 

352 for field_ in self._resource.__fields__.values(): 

353 # When the field name is 'id' and there is not a method decorated with 

354 # jsonapi.id, then this will be the id of the resource. 

355 if field_.name == "id": 

356 if self._id_getter is None: 

357 self._id_getter = self._create_getter("id") 

358 continue 

359 

360 # Skip the field when it was already registered with a decorator. 

361 if field_.name in self._attributes or field_.name in self._relationships: 

362 continue 

363 

364 # When the field has a type that was decorated with json_api.resource, 

365 # then the field is a relationship. 

366 if hasattr(field_.type_, "__json_api_resource__"): 

367 self._relationships[field_.name] = Relationship( 

368 name=field_.name, 

369 getter=self._create_getter(field_.name), 

370 type=field_.annotation, 

371 ) 

372 continue 

373 

374 # This is an attribute. 

375 self._attributes[field_.name] = Attribute( 

376 name=field_.name, 

377 getter=self._create_getter(field_.name), 

378 type=field_.type_, 

379 ) 

380 

381 def __str__(self) -> str: 

382 """Use type as string representation.""" 

383 return f"type: {self._type}" 

384 

385 def _create_resource_identifier_model(self) -> None: 

386 """Create a resource identifier model, if it wasn't created yet.""" 

387 if self._resource_identifier_model is not None: 

388 return 

389 

390 def hash_resource(resource_instance): 

391 """Create a hash for a resource.""" 

392 return hash(resource_instance.type + "." + resource_instance.id) 

393 

394 self._resource_identifier_model = create_model( 

395 self.get_model_class_prefix() + "ResourceIdentifier", 

396 **{ 

397 "id": (str | None, Field(default=None)), 

398 "type": (str, Field(const=True, default=self._type)), 

399 }, 

400 ) 

401 self._resource_identifier_model.__hash__ = hash_resource 

402 

403 def get_resource_identifier_model(self): 

404 """Return the resource identifier model. 

405 

406 The resource identifier model contains the id and type of the resource. 

407 """ 

408 return self._resource_identifier_model 

409 

410 def get_resource_identifier(self, resource_instance): 

411 """Get an instance of the resource identifier. 

412 

413 A resource identifier contains the id and the type of the resource. 

414 """ 

415 return self._resource_identifier_model( 

416 id=self.get_resource_id(resource_instance) 

417 ) 

418 

419 def get_model_class_prefix(self) -> str: 

420 """Return the prefix used for creating the model classes. 

421 

422 The prefix is the name of the class without the "Resource" suffix. 

423 """ 

424 return self._resource.__name__.removesuffix("Resource") + "_" 

425 

426 def _create_resource_model(self): 

427 """Create the resource model.""" 

428 if self._resource_model is not None: 

429 return 

430 

431 # Create a model for the attributes of the resource. 

432 attributes = {} 

433 for attr in self._attributes.values(): 

434 if attr.optional: 

435 attributes[attr.name] = (attr.type, Field(default=None)) 

436 else: 

437 attributes[attr.name] = (attr.type, ...) 

438 

439 self._attributes_model = create_model( 

440 self.get_model_class_prefix() + "Attributes", **attributes 

441 ) 

442 

443 resource_model_fields = {"attributes": (self._attributes_model, ...)} 

444 

445 if len(self._relationships) > 0: 

446 # Create a model for the relationships of the resource. 

447 relationships = {} 

448 for rel in self._relationships.values(): 

449 if hasattr(rel.resource_type, "__json_api_resource__"): 

450 rel_type = ( 

451 rel.resource_type.__json_api_resource__.get_resource_identifier_model() 

452 ) 

453 

454 if rel.iterable: 

455 rel_type = list[rel_type] 

456 if rel.optional: 

457 rel_type |= None 

458 

459 relationship_object_model = create_model( 

460 self.get_model_class_prefix() 

461 + rel.name.capitalize() 

462 + "Relationship", 

463 data=(rel_type, ...), 

464 ) 

465 

466 relationships[rel.name] = ( 

467 Optional[relationship_object_model], 

468 None if rel.optional else ..., 

469 ) 

470 

471 self._relationships_model = create_model( 

472 self.get_model_class_prefix() + "Relationships", **relationships 

473 ) 

474 resource_model_fields["relationships"] = ( 

475 self._relationships_model, 

476 ..., 

477 ) 

478 

479 self._resource_model = create_model( 

480 self.get_model_class_prefix() + "Resource", 

481 __base__=self.get_resource_identifier_model(), 

482 **resource_model_fields, 

483 ) 

484 

485 def get_resource_model(self): 

486 """Get the resource model. 

487 

488 The resource model contains the id, type, attributes and relationships of the 

489 resource. 

490 """ 

491 return self._resource_model 

492 

493 def get_resource_data_model(self): 

494 """Get the resource model with data as object.""" 

495 if self._resource_data_model is None: 

496 self._resource_data_model = create_model( 

497 self.get_model_class_prefix() + "DataResourceModel", 

498 data=(self.get_resource_model(), ...), 

499 ) 

500 return self._resource_data_model 

501 

502 def get_resource_data_model_list(self): 

503 """Get resource model with data as list.""" 

504 if self._resource_data_model_list is None: 

505 self._resource_data_model_list = create_model( 

506 self.get_model_class_prefix() + "DataResourceModelList", 

507 data=(list[self.get_resource_model()], ...), 

508 ) 

509 return self._resource_data_model_list 

510 

511 def _create_document_model(self): 

512 """Create the document model. 

513 

514 The document model is the main model of the resource. 

515 """ 

516 if self._document_model is not None: 

517 return 

518 

519 resource_model = self.get_resource_model() 

520 

521 document_fields = { 

522 "data": ( 

523 resource_model | list[resource_model], 

524 Field(default_factory=list), 

525 ), 

526 } 

527 if len(self._relationships) > 0: 

528 # included is a list with all related resource types. 

529 relation_types = () 

530 for rel in self._relationships.values(): 

531 relation_types = relation_types + ( 

532 rel.resource_type.__json_api_resource__.get_resource_model(), 

533 ) 

534 document_fields["included"] = ( 

535 list[Union[relation_types]], # type: ignore 

536 Field(default_factory=list), 

537 ) 

538 

539 self._document_model = create_model( 

540 self.get_model_class_prefix() + "Document", 

541 __base__=DocumentBaseModel, 

542 **document_fields, 

543 ) 

544 

545 def get_document_model(self): 

546 """Get the document model.""" 

547 return self._document_model 

548 

549 def serialize(self, resource_instance): 

550 """Serialize the resource instance into a document model.""" 

551 resource_object, related_objects = self.get_resource_object(resource_instance) 

552 included = set(related_objects) 

553 document_model = self.get_document_model() 

554 

555 return document_model(data=resource_object, included=list(included)) 

556 

557 def serialize_list(self, resource_instances: list[Any]): 

558 """Serialize a list of resources into a document model.""" 

559 included = set() 

560 resources = [] 

561 for resource_instance in resource_instances: 

562 resource_object, related_objects = self.get_resource_object( 

563 resource_instance 

564 ) 

565 resources.append(resource_object) 

566 included = included | related_objects 

567 

568 document_model = self.get_document_model() 

569 return document_model(data=resources, included=list(included)) 

570 

571 

572def resource(type_: str, auto: bool = True): 

573 """Turn a class into a JSONAPI resource with this decorator.""" 

574 

575 def decorator(cls): 

576 cls.__json_api_resource_type__ = type_ 

577 

578 json_api_resource = Resource(cls).build(auto) 

579 cls.__json_api_resource__ = json_api_resource 

580 cls.serialize = lambda resource_instance: json_api_resource.serialize( 

581 resource_instance 

582 ) 

583 cls.serialize_list = lambda resource_list: json_api_resource.serialize_list( 

584 resource_list 

585 ) 

586 cls.get_document_model = lambda: json_api_resource.get_document_model() 

587 cls.get_resource_model = lambda: json_api_resource.get_resource_model() 

588 cls.get_resource_data_model = ( 

589 lambda: json_api_resource.get_resource_data_model() 

590 ) 

591 cls.get_resource_data_model_list = ( 

592 lambda: json_api_resource.get_resource_data_model_list() 

593 ) 

594 

595 return cls 

596 

597 return decorator 

598 

599 

600def attribute(_func=None, *, name: str | None = None): 

601 """Turn a method into an attribute of a resource with this decorator. 

602 

603 When name is omitted, the name of the method is used. Brackets can be omitted when 

604 no name is passed. 

605 """ 

606 

607 def inner_function(fn): 

608 fn.__json_api_attribute__ = name or fn.__name__ 

609 return fn 

610 

611 if _func is None: 

612 return inner_function 

613 

614 return inner_function(_func) 

615 

616 

617def relationship(_func=None, *, name: str | None = None): 

618 """Turn a method into a relationship of a resource with this decorator. 

619 

620 When name is omitted, the name of the method is used. Brackets can be omitted when 

621 no name is passed. 

622 """ 

623 

624 def inner_function(fn): 

625 fn.__json_api_relationship__ = name or fn.__name__ 

626 return fn 

627 

628 if _func is None: 

629 return inner_function 

630 

631 return inner_function(_func) 

632 

633 

634def id(_func=None): 

635 """Mark this method as the way to get the id of the resource with this decorator. 

636 

637 Brackets can be omitted. 

638 """ 

639 

640 def inner_function(fn): 

641 fn.__json_api_id__ = True 

642 return fn 

643 

644 if _func is None: 

645 return inner_function 

646 

647 return inner_function(_func) 

648 

649 

650class PaginationModel(BaseModel): 

651 """A model for pagination query parameters. 

652 

653 Use this as a dependency on a route. This will handle the page[offset] 

654 and page[limit] query parameters. 

655 

656 Attributes: 

657 offset: The value of the page[offset] query parameter. Default is None. 

658 limit: The value of the page[limit] query parameter. Default is None. 

659 """ 

660 

661 offset: int | None = Field(Query(default=None, alias="page[offset]")) 

662 limit: int | None = Field(Query(default=None, alias="page[limit]"))