from rest_framework.response import Response from rest_framework.exceptions import ParseError from collections.abc import Sequence import math class Cursor: def __init__(self, value, offset=0, is_prev=False, has_results=None): self.value = value self.offset = int(offset) self.is_prev = bool(is_prev) self.has_results = has_results def __str__(self): return f"{self.value}:{self.offset}:{int(self.is_prev)}" def __eq__(self, other): return all( getattr(self, attr) == getattr(other, attr) for attr in ("value", "offset", "is_prev", "has_results") ) def __repr__(self): return "<{}: value={} offset={} is_prev={}>".format( type(self).__name__, self.value, self.offset, int(self.is_prev), ) def __bool__(self): return bool(self.has_results) @classmethod def from_string(cls, value): bits = value.split(":") if len(bits) != 3: raise ValueError try: value = float(bits[0]) if "." in bits[0] else int(bits[0]) bits = value, int(bits[1]), int(bits[2]) except (TypeError, ValueError): raise ValueError return cls(*bits) class CursorResult(Sequence): def __init__(self, results, next, prev, hits=None, max_hits=None): self.results = results self.next = next self.prev = prev self.hits = hits self.max_hits = max_hits def __len__(self): return len(self.results) def __iter__(self): return iter(self.results) def __getitem__(self, key): return self.results[key] def __repr__(self): return f"<{type(self).__name__}: results={len(self.results)}>" MAX_LIMIT = 100 class BadPaginationError(Exception): pass class OffsetPaginator: """ The Offset paginator using the offset and limit with cursor controls http://example.com/api/users/?cursor=10.0.0&per_page=10 cursor=limit,offset=page, """ def __init__( self, queryset, order_by=None, max_limit=MAX_LIMIT, max_offset=None, on_results=None, ): self.key = ( order_by if order_by is None or isinstance(order_by, (list, tuple, set)) else (order_by,) ) self.queryset = queryset self.max_limit = max_limit self.max_offset = max_offset self.on_results = on_results def get_result(self, limit=100, cursor=None): # offset is page # # value is page limit if cursor is None: cursor = Cursor(0, 0, 0) limit = min(limit, self.max_limit) queryset = self.queryset if self.key: queryset = queryset.order_by(*self.key) page = cursor.offset offset = cursor.offset * cursor.value stop = offset + (cursor.value or limit) + 1 if self.max_offset is not None and offset >= self.max_offset: raise BadPaginationError("Pagination offset too large") if offset < 0: raise BadPaginationError("Pagination offset cannot be negative") results = list(queryset[offset:stop]) if cursor.value != limit: results = results[-(limit + 1) :] next_cursor = Cursor(limit, page + 1, False, len(results) > limit) prev_cursor = Cursor(limit, page - 1, True, page > 0) results = list(results[:limit]) if self.on_results: results = self.on_results(results) max_hits = math.ceil(queryset.count() / limit) return CursorResult( results=results, next=next_cursor, prev=prev_cursor, hits=None, max_hits=max_hits, ) class BasePaginator: """BasePaginator class can be inherited by any View to return a paginated view""" # cursor query parameter name cursor_name = "cursor" # get the per page parameter from request def get_per_page(self, request, default_per_page=100, max_per_page=100): try: per_page = int(request.GET.get("per_page", default_per_page)) except ValueError: raise ParseError(detail="Invalid per_page parameter.") max_per_page = max(max_per_page, default_per_page) if per_page > max_per_page: raise ParseError( detail=f"Invalid per_page value. Cannot exceed {max_per_page}." ) return per_page def paginate( self, request, on_results=None, paginator=None, paginator_cls=OffsetPaginator, default_per_page=100, max_per_page=100, cursor_cls=Cursor, extra_stats=None, controller=None, **paginator_kwargs, ): """Paginate the request""" assert (paginator and not paginator_kwargs) or ( paginator_cls and paginator_kwargs ) per_page = self.get_per_page(request, default_per_page, max_per_page) # Convert the cursor value to integer and float from string input_cursor = None if request.GET.get(self.cursor_name): try: input_cursor = cursor_cls.from_string(request.GET.get(self.cursor_name)) except ValueError: raise ParseError(detail="Invalid cursor parameter.") if not paginator: paginator = paginator_cls(**paginator_kwargs) try: cursor_result = paginator.get_result(limit=per_page, cursor=input_cursor) except BadPaginationError as e: raise ParseError(detail=str(e)) # Serialize result according to the on_result function if on_results: results = on_results(cursor_result.results) else: results = cursor_result.results # Add Manipulation functions to the response if controller is not None: results = controller(results) else: results = results # Return the response response = Response( { "next_cursor": str(cursor_result.next), "prev_cursor": str(cursor_result.prev), "next_page_results": cursor_result.next.has_results, "prev_page_results": cursor_result.prev.has_results, "count": cursor_result.__len__(), "total_pages": cursor_result.max_hits, "extra_stats": extra_stats, "results": results, } ) return response