Commit
Merge pull request #64 from ParclLabs/limits
Limits
zhibindai26 authored Sep 5, 2024
2 parents c057f27 + 082d3fe commit 099d5c9
Showing 18 changed files with 495 additions and 437 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/build_and_test.yml
@@ -34,7 +34,7 @@ jobs:
- name: Run tests 🧪
env:
PARCL_LABS_API_KEY: ${{ secrets.PARCL_LABS_API_KEY }}
PARCL_LABS_API_KEY: ${{ secrets.PARCL_LABS_API_KEY }}
run: |
set -e
pipenv run make test
7 changes: 6 additions & 1 deletion CHANGELOG.md
@@ -1,4 +1,9 @@
### v1.6.0
### v1.6.2
- Fix bug where `limit` parameter was not being applied when `turbo_mode` was enabled.
- Update handling of the `limit` parameter. If the supplied `limit` is greater than the maximum allowed for a particular endpoint, the `limit` now defaults to the maximum allowed value instead of throwing an error.
- Refactor `ParclLabsClient` so that `limit` is not set during client instantiation. Limit should be set when calling specific `retrieve` methods.

### v1.6.1
- Bug fix for handling of 422 validation errors.

### v1.6.0
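A minimal usage sketch of the limit changes described above, assuming the package's public ParclLabsClient entry point; the price_feed service path, parcl_ids, and keyword arguments are illustrative assumptions rather than excerpts from this commit:

from parcllabs import ParclLabsClient

# limit is no longer set at client instantiation (v1.6.2 refactor)
client = ParclLabsClient(
    api_key="your-api-key",
    turbo_mode=True,  # limit is now honored on the turbo_mode path as well
)

# Pass limit on the specific retrieve call instead; this service path is assumed
results = client.price_feed.price_feed.retrieve(
    parcl_ids=[2900187, 2900245],  # illustrative parcl_ids
    limit=100,
)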
634 changes: 316 additions & 318 deletions Pipfile.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion parcllabs/__version__.py
@@ -1 +1 @@
VERSION = "1.6.1"
VERSION = "1.6.2"
2 changes: 2 additions & 0 deletions parcllabs/common.py
@@ -24,6 +24,8 @@

DEFAULT_LIMIT = 12
MAX_POST_LIMIT = 1000
DEFAULT_LIMIT_SMALL = 1000
DEFAULT_LIMIT_LARGE = 10000

VALID_LOCATION_TYPES = [
"COUNTY",
11 changes: 4 additions & 7 deletions parcllabs/parcllabs_client.py
@@ -9,9 +9,8 @@


class ServiceGroup:
def __init__(self, client, limit):
def __init__(self, client):
self._client = client
self._limit = limit
self._services = {}

def add_service(
@@ -23,7 +22,7 @@ def add_service(
alias: Optional[str] = None,
):
service = service_class(
url=url, post_url=post_url, client=self._client, limit=self._limit
url=url, post_url=post_url, client=self._client
)
setattr(self, name, service)
self._services[name] = service
@@ -40,8 +39,8 @@ class ParclLabsClient:
def __init__(
self,
api_key: str,
limit: int = 12,
api_url: str = api_base,
limit: Optional[int] = None,
turbo_mode: bool = False,
num_workers: Optional[int] = None,
):
@@ -52,7 +51,6 @@ def __init__(

self.api_key = api_key
self.api_url = api_url
self.limit = limit
self.estimated_session_credit_usage = 0
self.num_workers = num_workers
self.turbo_mode = turbo_mode
@@ -71,7 +69,7 @@ def _initialize_services(self):
self.property = self._create_property_services()

def _create_service_group(self):
return ServiceGroup(self, self.limit)
return ServiceGroup(self)

def _add_services_to_group(
self, group: ServiceGroup, services: Dict[str, Dict[str, Any]]
@@ -258,7 +256,6 @@ def _create_search_services(self):
services = {
"markets": {
"url": "/v1/search/markets",
"post_url": "/v1/search/markets",
"service_class": SearchMarkets,
},
}
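For readers unfamiliar with the pattern in ServiceGroup.add_service above, the group exposes each registered service as an attribute via setattr, which is what makes dotted call paths such as client.search.markets possible. A standalone sketch with hypothetical names, not the SDK's exact wiring:

class DemoGroup:
    def __init__(self):
        self._services = {}

    def add_service(self, name, service):
        setattr(self, name, service)    # exposes group.<name>
        self._services[name] = service  # keeps a registry for iteration

group = DemoGroup()
group.add_service("markets", object())
assert group.markets is group._services["markets"]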
5 changes: 1 addition & 4 deletions parcllabs/services/data_utils.py
@@ -1,10 +1,7 @@
import pandas as pd
import numpy as np

from parcllabs.common import (
ID_COLUMNS,
DATE_COLUMNS
)
from parcllabs.common import ID_COLUMNS, DATE_COLUMNS


def safe_concat_and_format_dtypes(data_container):
1 change: 0 additions & 1 deletion parcllabs/services/metrics/portfolio_size_service.py
@@ -5,7 +5,6 @@


class PortfolioSizeService(ParclLabsService):

def retrieve(
self,
parcl_ids: List[int],
3 changes: 1 addition & 2 deletions parcllabs/services/metrics/property_type_service.py
@@ -1,11 +1,10 @@
from typing import Any, Mapping, Optional
from parcllabs.common import DEFAULT_LIMIT

from parcllabs.services.parcllabs_service import ParclLabsService
from parcllabs.services.validators import Validators


class PropertyTypeService(ParclLabsService):

def retrieve(
self,
parcl_ids: int,
100 changes: 54 additions & 46 deletions parcllabs/services/parcllabs_service.py
@@ -4,10 +4,9 @@
import platform
from collections import deque

from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.exceptions import RequestException
from typing import Any, Mapping, Optional, List, Dict
from parcllabs.common import DELETE_FROM_OUTPUT, DEFAULT_LIMIT
from parcllabs.common import DELETE_FROM_OUTPUT, DEFAULT_LIMIT_SMALL, DEFAULT_LIMIT_LARGE
from parcllabs.exceptions import NotFoundError
from parcllabs.services.validators import Validators
from parcllabs.services.data_utils import safe_concat_and_format_dtypes
@@ -20,14 +19,13 @@ class ParclLabsService:
"""

def __init__(
self, url: str, client: Any, post_url: str = None, limit: int = DEFAULT_LIMIT
self, url: str, client: Any, post_url: str = None
) -> None:
self.url = url
self.post_url = post_url
self.client = client
if client is None:
raise ValueError("Missing required client object.")
self.limit = limit
self.api_url = client.api_url
self.full_url = self.api_url + self.url
self.full_post_url = self.api_url + self.post_url if post_url else None
@@ -104,19 +102,23 @@ def _make_request(self, method: str, url: str, **kwargs) -> requests.Response:
raise RequestException(f"An unexpected error occurred: {str(e)}")

def _post(
self, url: str, data: Optional[Dict[str, Any]] = None
self,
url: str,
params: Optional[Dict[str, Any]] = None,
data: Optional[Dict[str, Any]] = None,
) -> requests.Response:
"""
Send a POST request to the specified URL with the given data.
Args:
url (str): The URL endpoint to request.
params (dict, optional): The parameters to send in the query string.
data (dict, optional): The data to send in the request body.
Returns:
requests.Response: The response object.
"""
return self._make_request("POST", url, json=data)
return self._make_request("POST", url, params=params, json=data)

def _get(
self, url: str, params: Optional[Dict[str, Any]] = None
@@ -156,17 +158,25 @@ def _fetch(
The result of the fetch operation. The exact return type depends on the specific
fetch method called (_fetch_post, _fetch_get, or _fetch_get_many_parcl_ids).
"""
if params and not params.get("limit"):
params["limit"] = self.limit

params = self._clean_params(params)

if self.client.turbo_mode and self.full_post_url:
# convert the list of parcl_ids into post body params, formatted
# as strings
params = {"parcl_id": [str(pid) for pid in parcl_ids], **params}
return self._fetch_post(params, auto_paginate)
if params.get("limit"):
params["limit"] = self._validate_limit("POST", params["limit"])

data = {"parcl_id": [str(pid) for pid in parcl_ids], **params}
params = {"limit": params["limit"]} if params.get("limit") else {}

print(f"data: {data}, params: {params}")

return self._fetch_post(params, data, auto_paginate)
else:
if params.get("limit"):
params["limit"] = self._validate_limit("GET", params["limit"])

if len(parcl_ids) == 1:
url = self.full_url.format(parcl_id=parcl_ids[0])
return self._fetch_get(url, params, auto_paginate)
@@ -205,10 +215,16 @@ def _fetch_get_many_parcl_ids(

return results

def _fetch_post(self, params: Dict[str, Any], auto_paginate: bool):
response = self._post(self.full_post_url, data=params)
def _fetch_post(
self, params: Dict[str, Any], data: Dict[str, Any], auto_paginate: bool
):
response = self._post(self.full_post_url, params=params, data=data)
return self._process_and_paginate_response(
response, auto_paginate, original_params=params, referring_method="post"
response,
auto_paginate,
original_params=params,
data=data,
referring_method="post",
)

def _fetch_get(self, url: str, params: Dict[str, Any], auto_paginate: bool):
@@ -219,7 +235,12 @@ def _fetch_get(self, url: str, params: Dict[str, Any], auto_paginate: bool):
return result

def _process_and_paginate_response(
self, response, auto_paginate, original_params, referring_method: str = "get"
self,
response,
auto_paginate,
original_params,
data=None,
referring_method: str = "get",
):

if response.status_code == 404:
@@ -236,7 +257,7 @@ def _process_and_paginate_response(
while result["links"].get("next") is not None:
next_url = result["links"]["next"]
if referring_method == "post":
next_response = self._post(next_url, data=original_params)
next_response = self._post(next_url, data=data, params=original_params)
else:
next_response = self._get(next_url, params=original_params)
next_response.raise_for_status()
@@ -264,7 +285,7 @@ def retrieve(
{
"start_date": start_date,
"end_date": end_date,
"limit": limit if limit is not None else self.limit,
"limit": limit if limit else None,
**(params or {}),
}
)
@@ -327,34 +348,21 @@ def error_handling(self, response: requests.Response) -> None:
msg = f"{response.status_code} {type_of_error} Error: {error_message}"
raise requests.RequestException(msg)

    @staticmethod
    def _validate_limit(method, limit):
        if method.upper() == "POST":
            if limit > DEFAULT_LIMIT_LARGE:
                print(
                    f"Supplied limit value is too large for requested endpoint. Setting limit to maximum value of {DEFAULT_LIMIT_LARGE}."
                )
                limit = DEFAULT_LIMIT_LARGE
        elif method.upper() == "GET":
            if limit > DEFAULT_LIMIT_SMALL:
                print(
                    f"Supplied limit value is too large for requested endpoint. Setting limit to maximum value of {DEFAULT_LIMIT_SMALL}."
                )
                limit = DEFAULT_LIMIT_SMALL
        else:
            raise ValueError("Invalid method. Must be either 'GET' or 'POST'.")
        return limit

The following block is removed in this hunk; ParclLabsStreamingService is now imported from parcllabs/services/streaming/parcl_labs_streaming_service.py, per the import changes below:

class ParclLabsStreamingService(ParclLabsService):

    def _convert_text_to_json(self, chunk):
        try:
            return json.loads(chunk)
        except json.JSONDecodeError as e:
            print(f"Error decoding JSON: {e}")
            return None

    def _process_streaming_data(self, data, batch_size=10000, num_workers=None):
        with ThreadPoolExecutor(max_workers=num_workers) as executor:
            chunks = deque(data.strip().split("\n"))
            futures = [
                executor.submit(self._convert_text_to_json, chunk)
                for chunk in chunks
                if chunk
            ]

            buffer = deque()
            for future in as_completed(futures):
                result = future.result()
                if result:
                    buffer.append(result)

                if len(buffer) >= batch_size:
                    yield pd.DataFrame(buffer)
                    buffer.clear()

            if buffer:
                yield pd.DataFrame(buffer)
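A minimal sketch of the clamping rule introduced by _validate_limit above, using the new constants from parcllabs/common.py; the standalone function below mirrors the static method for illustration only:

DEFAULT_LIMIT_SMALL = 1000   # ceiling for GET endpoints
DEFAULT_LIMIT_LARGE = 10000  # ceiling for POST endpoints

def validate_limit(method: str, limit: int) -> int:
    # Mirrors ParclLabsService._validate_limit: clamp to the ceiling instead of raising
    if method.upper() == "POST":
        return min(limit, DEFAULT_LIMIT_LARGE)
    if method.upper() == "GET":
        return min(limit, DEFAULT_LIMIT_SMALL)
    raise ValueError("Invalid method. Must be either 'GET' or 'POST'.")

print(validate_limit("GET", 50_000))   # 1000
print(validate_limit("POST", 50_000))  # 10000
print(validate_limit("GET", 500))      # 500, unchanged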
8 changes: 5 additions & 3 deletions parcllabs/services/properties/property_events_service.py
@@ -12,7 +12,9 @@
safe_concat_and_format_dtypes,
)
from parcllabs.services.validators import Validators
from parcllabs.services.parcllabs_service import ParclLabsStreamingService
from parcllabs.services.streaming.parcl_labs_streaming_service import (
ParclLabsStreamingService,
)
from concurrent.futures import ThreadPoolExecutor, as_completed
from parcllabs.exceptions import (
NotFoundError,
@@ -24,8 +26,8 @@ class PropertyEventsService(ParclLabsStreamingService):
Retrieve parcl_property_id event history.
"""

def __init__(self, limit: int = DEFAULT_LIMIT, *args, **kwargs):
super().__init__(limit=limit, *args, **kwargs)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def retrieve(
self,
4 changes: 3 additions & 1 deletion parcllabs/services/properties/property_search.py
@@ -4,7 +4,9 @@
from typing import List
from parcllabs.services.validators import Validators
from parcllabs.common import VALID_PROPERTY_TYPES_UNIT_SEARCH, VALID_ENTITY_NAMES
from parcllabs.services.parcllabs_service import ParclLabsStreamingService
from parcllabs.services.streaming.parcl_labs_streaming_service import (
ParclLabsStreamingService,
)


class PropertySearch(ParclLabsStreamingService):
8 changes: 4 additions & 4 deletions parcllabs/services/search.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import pandas as pd
from typing import Any, Mapping, Optional, List
from parcllabs.common import (
DEFAULT_LIMIT,
VALID_LOCATION_TYPES,
VALID_US_REGIONS,
VALID_US_STATE_ABBREV,
@@ -18,8 +17,8 @@ class SearchMarkets(ParclLabsService):
Retrieve parcl_id and metadata for geographic markets in the Parcl Labs API.
"""

def __init__(self, limit: int = DEFAULT_LIMIT, *args, **kwargs):
super().__init__(limit=limit, *args, **kwargs)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)

def _as_pd_dataframe(self, data: List[Mapping[str, Any]]) -> Any:
return pd.DataFrame(data)
@@ -115,7 +114,8 @@ def retrieve(
if geoid:
params["geoid"] = geoid

params["limit"] = limit if limit is not None else self.limit
if limit:
params["limit"] = self._validate_limit("GET", limit)

results = self._fetch_get(
url=self.full_url, params=params, auto_paginate=auto_paginate
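A hedged example of how the GET-side clamp surfaces on the markets search path shown above; location_type is an assumed keyword argument based on VALID_LOCATION_TYPES, and the client object is from the usage sketch near the top of this page:

# limit=5000 exceeds the GET ceiling of 1000 (DEFAULT_LIMIT_SMALL),
# so the SDK clamps it and prints a notice instead of raising a validation error.
markets = client.search.markets.retrieve(
    location_type="COUNTY",  # assumed parameter; "COUNTY" is in VALID_LOCATION_TYPES
    limit=5000,
)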
